
SQOOP-35. Bundle job dependencies via DistributedCache.

Sqoop, shim, JDBC driver, and $SQOOP_HOME/lib/*.jar are now all
placed in the DistributedCache for each job.
Removes generation of the 'lib/' subdir in generated jar files.

From: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149943 13f79535-47bb-0310-9956-ffa450edef68
Andrew Bayer 2011-07-22 20:04:06 +00:00
parent 09d4d46d34
commit 41cee93c40
9 changed files with 236 additions and 83 deletions
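
For readers unfamiliar with the mechanism this change relies on: jar paths listed (comma-separated) under the 'tmpjars' configuration key are uploaded by the MapReduce job submitter and placed on each task's classpath via the DistributedCache. The sketch below is illustrative only — it is not part of this commit, and the jar path is a placeholder — but it shows the same 'tmpjars' manipulation that the cacheJars() method added to JobBase performs in the diff that follows.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative sketch only -- not part of this commit.
public class TmpJarsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem localFs = FileSystem.getLocal(conf);

    // Qualify a local jar path (placeholder path, for illustration only).
    // makeQualified() does not require the file to exist.
    String jar = new Path("/tmp/some-dependency.jar")
        .makeQualified(localFs).toString();

    // Append to whatever is already listed; entries are comma-separated.
    String existing = conf.get("tmpjars");
    conf.set("tmpjars", (existing == null) ? jar : existing + "," + jar);

    System.out.println("tmpjars = " + conf.get("tmpjars"));
    // A Job constructed from this Configuration ships the listed jars
    // to the cluster via the DistributedCache when it is submitted.
  }
}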

View File

@@ -49,6 +49,7 @@ public DirectMySQLManager(final SqoopOptions options) {
   public void importTable(ImportJobContext context)
       throws IOException, ImportException {
+    context.setConnManager(this);
     if (context.getOptions().getColumns() != null) {
       LOG.warn("Direct-mode import from MySQL does not support column");
       LOG.warn("selection. Falling back to JDBC-based import.");
@@ -97,6 +98,7 @@ public void importTable(ImportJobContext context)
   @Override
   public void exportTable(ExportJobContext context)
       throws IOException, ExportException {
+    context.setConnManager(this);
     MySQLExportJob exportJob = new MySQLExportJob(context);
     exportJob.runExport();
   }

View File

@@ -29,6 +29,7 @@ public class ExportJobContext {
   private String tableName;
   private String jarFile;
   private SqoopOptions options;
+  private ConnManager manager;

   public ExportJobContext(final String table, final String jar,
       final SqoopOptions opts) {
@@ -53,5 +54,22 @@ public String getJarFile() {
   public SqoopOptions getOptions() {
     return options;
   }
+
+  /**
+   * Set the ConnManager instance to be used during the export's
+   * configuration.
+   */
+  public void setConnManager(ConnManager mgr) {
+    this.manager = mgr;
+  }
+
+  /**
+   * Get the ConnManager instance to use during an export's
+   * configuration stage.
+   */
+  public ConnManager getConnManager() {
+    return this.manager;
+  }
 }

View File

@@ -302,6 +302,7 @@ public void importTable(ImportJobContext context)
    */
   public void exportTable(ExportJobContext context)
       throws IOException, ExportException {
+    context.setConnManager(this);
     try {
       JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
           (Class<? extends OutputFormat>) ShimLoader.getShimClass(

View File

@@ -599,6 +599,7 @@ protected Connection makeConnection() throws SQLException {
    */
   public void exportTable(ExportJobContext context)
       throws IOException, ExportException {
+    context.setConnManager(this);
     JdbcExportJob exportJob = new JdbcExportJob(context);
     exportJob.runExport();
   }
@@ -620,6 +621,7 @@ public void release() {
    */
   public void updateTable(ExportJobContext context)
       throws IOException, ExportException {
+    context.setConnManager(this);
     JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context);
     exportJob.runExport();
   }

View File

@@ -284,6 +284,7 @@ public void runExport() throws ExportException, IOException {
     configureOutputFormat(job, tableName, tableClassName);
     configureMapper(job, tableName, tableClassName);
     configureNumTasks(job);
+    cacheJars(job, context.getConnManager());
     boolean success = runJob(job);
     if (!success) {

View File

@@ -160,6 +160,7 @@ public void runImport(String tableName, String ormJarFile, String splitByCol,
     configureOutputFormat(job, tableName, tableClassName);
     configureMapper(job, tableName, tableClassName);
     configureNumTasks(job);
+    cacheJars(job, getContext().getConnManager());
     jobSetup(job);
     boolean success = runJob(job);

View File

@@ -18,20 +18,34 @@
 package com.cloudera.sqoop.mapreduce;

+import java.io.File;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.util.StringUtils;

 import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.shims.HadoopShim;
 import com.cloudera.sqoop.util.ClassLoaderStack;
+import com.cloudera.sqoop.util.Jars;

 /**
  * Base class for configuring and running a MapReduce job.
@@ -113,6 +127,80 @@ public void setOptions(SqoopOptions opts) {
     this.options = opts;
   }

+  /**
+   * Put jar files required by Sqoop into the DistributedCache.
+   * @param job the Job being submitted.
+   * @param mgr the ConnManager to use.
+   */
+  protected void cacheJars(Job job, ConnManager mgr)
+      throws IOException {
+
+    Configuration conf = job.getConfiguration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Set<String> localUrls = new HashSet<String>();
+
+    addToCache(Jars.getSqoopJarPath(), fs, localUrls);
+    addToCache(Jars.getShimJarPath(), fs, localUrls);
+    addToCache(Jars.getDriverClassJar(mgr), fs, localUrls);
+
+    // Add anything in $SQOOP_HOME/lib, if this is set.
+    String sqoopHome = System.getenv("SQOOP_HOME");
+    if (null != sqoopHome) {
+      File sqoopHomeFile = new File(sqoopHome);
+      File sqoopLibFile = new File(sqoopHomeFile, "lib");
+      if (sqoopLibFile.exists()) {
+        addDirToCache(sqoopLibFile, fs, localUrls);
+      }
+    } else {
+      LOG.warn("SQOOP_HOME is unset. May not be able to find "
+          + "all job dependencies.");
+    }
+
+    // If we didn't put anything in our set, then there's nothing to cache.
+    if (localUrls.isEmpty()) {
+      return;
+    }
+
+    // Add these to the 'tmpjars' array, which the MR JobSubmitter
+    // will upload to HDFS and put in the DistributedCache libjars.
+    String tmpjars = conf.get("tmpjars");
+    StringBuilder sb = new StringBuilder();
+    if (null != tmpjars) {
+      sb.append(tmpjars);
+      sb.append(",");
+    }
+    sb.append(StringUtils.arrayToString(localUrls.toArray(new String[0])));
+    conf.set("tmpjars", sb.toString());
+  }
+
+  private void addToCache(String file, FileSystem fs, Set<String> localUrls) {
+    if (null == file) {
+      return;
+    }
+
+    Path p = new Path(file);
+    String qualified = p.makeQualified(fs).toString();
+    LOG.debug("Adding to job classpath: " + qualified);
+    localUrls.add(qualified);
+  }
+
+  /**
+   * Add the .jar elements of a directory to the DCache classpath,
+   * nonrecursively.
+   */
+  private void addDirToCache(File dir, FileSystem fs, Set<String> localUrls) {
+    if (null == dir) {
+      return;
+    }
+
+    for (File libfile : dir.listFiles()) {
+      if (libfile.exists() && !libfile.isDirectory()
+          && libfile.getName().endsWith("jar")) {
+        addToCache(libfile.toString(), fs, localUrls);
+      }
+    }
+  }
+
   /**
    * If jars must be loaded into the local environment, do so here.
    */

View File

@@ -44,6 +44,8 @@
 import com.cloudera.sqoop.util.FileListing;
 import com.cloudera.sqoop.shims.HadoopShim;
+import com.cloudera.sqoop.util.Jars;

 /**
  * Manages the compilation of a bunch of .java files into .class files
  * and eventually a jar.
@@ -82,7 +84,7 @@ private String findHadoopCoreJar() {
     if (null == hadoopHome) {
       LOG.info("$HADOOP_HOME is not set");
-      return findJarForClass(JobConf.class);
+      return Jars.getJarPathForClass(JobConf.class);
     }

     if (!hadoopHome.endsWith(File.separator)) {
@@ -95,7 +97,7 @@ private String findHadoopCoreJar() {
     if (null == entries) {
       LOG.warn("HADOOP_HOME appears empty or missing");
-      return findJarForClass(JobConf.class);
+      return Jars.getJarPathForClass(JobConf.class);
     }

     for (File f : entries) {
@@ -106,7 +108,7 @@ private String findHadoopCoreJar() {
       }
     }

-    return findJarForClass(JobConf.class);
+    return Jars.getJarPathForClass(JobConf.class);
   }

   /**
@@ -144,7 +146,7 @@ public void compile() throws IOException {
     }

     // find sqoop jar for compilation classpath
-    String sqoopJar = findThisJar();
+    String sqoopJar = Jars.getSqoopJarPath();
     if (null != sqoopJar) {
       sqoopJar = File.pathSeparator + sqoopJar;
     } else {
@@ -299,26 +301,6 @@ public void jar() throws IOException {
       jstream = new JarOutputStream(fstream);

       addClassFilesFromDir(new File(jarOutDir), jstream);
-
-      // put our own jar in there in its lib/ subdir
-      String thisJarFile = findThisJar();
-      if (null != thisJarFile) {
-        addLibJar(thisJarFile, jstream);
-      } else {
-        // couldn't find our own jar (we were running from .class files?)
-        LOG.warn("Could not find jar for Sqoop; "
-            + "MapReduce jobs may not run correctly.");
-      }
-
-      String shimJarFile = findShimJar();
-      if (null != shimJarFile) {
-        addLibJar(shimJarFile, jstream);
-      } else {
-        // Couldn't find the shim jar.
-        LOG.warn("Could not find jar for Sqoop shims.");
-        LOG.warn("MapReduce jobs may not run correctly.");
-      }

       jstream.finish();
     } finally {
       if (null != jstream) {
@@ -341,23 +323,6 @@ public void jar() throws IOException {
     LOG.debug("Finished writing jar file " + jarFilename);
   }

-  /**
-   * Add a jar in the lib/ directory of a JarOutputStream we're building.
-   * @param jarFilename the source jar file to include.
-   * @param jstream the JarOutputStream to write to.
-   */
-  private void addLibJar(String jarFilename, JarOutputStream jstream)
-      throws IOException {
-    File thisJarFileObj = new File(jarFilename);
-    String thisJarBasename = thisJarFileObj.getName();
-    String thisJarEntryName = "lib" + File.separator + thisJarBasename;
-    ZipEntry ze = new ZipEntry(thisJarEntryName);
-    jstream.putNextEntry(ze);
-    copyFileToStream(thisJarFileObj, jstream);
-    jstream.closeEntry();
-  }
-
   private static final int BUFFER_SZ = 4096;

   /**
@@ -383,46 +348,4 @@ private void copyFileToStream(File f, OutputStream ostream)
       fis.close();
     }
   }
-
-  private String findThisJar() {
-    return findJarForClass(CompilationManager.class);
-  }
-
-  private String findShimJar() {
-    HadoopShim h = HadoopShim.get();
-    if (null == h) {
-      return null;
-    }
-
-    return findJarForClass(h.getClass());
-  }
-
-  // Method mostly cloned from o.a.h.mapred.JobConf.findContainingJar().
-  private String findJarForClass(Class<? extends Object> classObj) {
-    ClassLoader loader = classObj.getClassLoader();
-    String classFile = classObj.getName().replaceAll("\\.", "/") + ".class";
-    try {
-      for (Enumeration<URL> itr = loader.getResources(classFile);
-          itr.hasMoreElements();) {
-        URL url = (URL) itr.nextElement();
-        if ("jar".equals(url.getProtocol())) {
-          String toReturn = url.getPath();
-          if (toReturn.startsWith("file:")) {
-            toReturn = toReturn.substring("file:".length());
-          }
-          // URLDecoder is a misnamed class, since it actually decodes
-          // x-www-form-urlencoded MIME type rather than actual
-          // URL encoding (which the file path has). Therefore it would
-          // decode +s to ' 's which is incorrect (spaces are actually
-          // either unencoded or encoded as "%20"). Replace +s first, so
-          // that they are kept sacred during the decoding process.
-          toReturn = toReturn.replaceAll("\\+", "%2B");
-          toReturn = URLDecoder.decode(toReturn, "UTF-8");
-          return toReturn.replaceAll("!.*$", "");
-        }
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return null;
-  }
 }

View File

@@ -0,0 +1,117 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.util;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.Enumeration;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.shims.HadoopShim;
+
+/**
+ * Utility class; returns the locations of various jars.
+ */
+public final class Jars {
+
+  public static final Log LOG = LogFactory.getLog(
+      Jars.class.getName());
+
+  private Jars() {
+  }
+
+  /**
+   * @return the path to the main Sqoop jar.
+   */
+  public static String getSqoopJarPath() {
+    return getJarPathForClass(Jars.class);
+  }
+
+  /**
+   * @return the path to the currently-loaded shim jar.
+   */
+  public static String getShimJarPath() {
+    HadoopShim h = HadoopShim.get();
+    if (null == h) {
+      return null;
+    }
+
+    return getJarPathForClass(h.getClass());
+  }
+
+  /**
+   * Return the jar file path that contains a particular class.
+   * Method mostly cloned from o.a.h.mapred.JobConf.findContainingJar().
+   */
+  public static String getJarPathForClass(Class<? extends Object> classObj) {
+    ClassLoader loader = classObj.getClassLoader();
+    String classFile = classObj.getName().replaceAll("\\.", "/") + ".class";
+    try {
+      for (Enumeration<URL> itr = loader.getResources(classFile);
+          itr.hasMoreElements();) {
+        URL url = (URL) itr.nextElement();
+        if ("jar".equals(url.getProtocol())) {
+          String toReturn = url.getPath();
+          if (toReturn.startsWith("file:")) {
+            toReturn = toReturn.substring("file:".length());
+          }
+          // URLDecoder is a misnamed class, since it actually decodes
+          // x-www-form-urlencoded MIME type rather than actual
+          // URL encoding (which the file path has). Therefore it would
+          // decode +s to ' 's which is incorrect (spaces are actually
+          // either unencoded or encoded as "%20"). Replace +s first, so
+          // that they are kept sacred during the decoding process.
+          toReturn = toReturn.replaceAll("\\+", "%2B");
+          toReturn = URLDecoder.decode(toReturn, "UTF-8");
+          return toReturn.replaceAll("!.*$", "");
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return null;
+  }
+
+  /**
+   * Return the path to the jar containing the JDBC driver
+   * for a ConnManager.
+   */
+  public static String getDriverClassJar(ConnManager mgr) {
+    if (null == mgr) {
+      return null;
+    }
+
+    String driverClassName = mgr.getDriverClass();
+    if (null == driverClassName) {
+      return null;
+    }
+
+    try {
+      Class<? extends Object> driverClass = Class.forName(driverClassName);
+      return getJarPathForClass(driverClass);
+    } catch (ClassNotFoundException cnfe) {
+      LOG.warn("No such class " + driverClassName + " available.");
+      return null;
+    }
+  }
+}
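
As a closing illustration (not part of this commit), a hypothetical snippet exercising the new Jars utility; the class name JarsDemo is made up for this sketch, and it assumes the Sqoop jars are on the classpath.

package com.cloudera.sqoop.util;

// Hypothetical usage sketch: print the jar locations that
// JobBase.cacheJars() would consider shipping with a job.
public final class JarsDemo {
  private JarsDemo() {
  }

  public static void main(String[] args) {
    System.out.println("Sqoop jar: " + Jars.getSqoopJarPath());
    System.out.println("Shim jar:  " + Jars.getShimJarPath());
  }
}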