diff --git a/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java b/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java
index c4a89856..5999fe4f 100644
--- a/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java
+++ b/src/java/com/cloudera/sqoop/manager/DirectMySQLManager.java
@@ -49,6 +49,7 @@ public DirectMySQLManager(final SqoopOptions options) {
   public void importTable(ImportJobContext context)
      throws IOException, ImportException {
+    context.setConnManager(this);
     if (context.getOptions().getColumns() != null) {
       LOG.warn("Direct-mode import from MySQL does not support column");
       LOG.warn("selection. Falling back to JDBC-based import.");
@@ -97,6 +98,7 @@ public void importTable(ImportJobContext context)
   @Override
   public void exportTable(ExportJobContext context)
       throws IOException, ExportException {
+    context.setConnManager(this);
     MySQLExportJob exportJob = new MySQLExportJob(context);
     exportJob.runExport();
   }
diff --git a/src/java/com/cloudera/sqoop/manager/ExportJobContext.java b/src/java/com/cloudera/sqoop/manager/ExportJobContext.java
index 4e406dd0..2e45a1b1 100644
--- a/src/java/com/cloudera/sqoop/manager/ExportJobContext.java
+++ b/src/java/com/cloudera/sqoop/manager/ExportJobContext.java
@@ -29,6 +29,7 @@ public class ExportJobContext {
   private String tableName;
   private String jarFile;
   private SqoopOptions options;
+  private ConnManager manager;
 
   public ExportJobContext(final String table, final String jar,
       final SqoopOptions opts) {
@@ -53,5 +54,22 @@ public String getJarFile() {
   public SqoopOptions getOptions() {
     return options;
   }
+
+  /**
+   * Set the ConnManager instance to be used during the export's
+   * configuration.
+   */
+  public void setConnManager(ConnManager mgr) {
+    this.manager = mgr;
+  }
+
+  /**
+   * Get the ConnManager instance to use during an export's
+   * configuration stage.
+   */
+  public ConnManager getConnManager() {
+    return this.manager;
+  }
+
 }
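The point of carrying a ConnManager on the context is that JobBase.cacheJars() (further down) needs the manager's getDriverClass() to locate the JDBC driver jar at job-submission time. A minimal stand-in sketch of the handshake, using simplified types that are not Sqoop's own:

    // Simplified stand-ins for ExportJobContext and ConnManager:
    class Context {
      private Manager manager;
      public void setConnManager(Manager mgr) { this.manager = mgr; }
      public Manager getConnManager() { return this.manager; }
    }

    class Manager {
      public void exportTable(Context context) {
        context.setConnManager(this); // producer side: each exportTable() above
        // ... build and run the export job ...
      }
    }

The consumer side is the cacheJars(job, context.getConnManager()) call added to ExportJobBase.runExport() below.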
diff --git a/src/java/com/cloudera/sqoop/manager/OracleManager.java b/src/java/com/cloudera/sqoop/manager/OracleManager.java
index 75b209e8..4824ed73 100644
--- a/src/java/com/cloudera/sqoop/manager/OracleManager.java
+++ b/src/java/com/cloudera/sqoop/manager/OracleManager.java
@@ -302,6 +302,7 @@ public void importTable(ImportJobContext context)
    */
   public void exportTable(ExportJobContext context)
       throws IOException, ExportException {
+    context.setConnManager(this);
     try {
       JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
           (Class) ShimLoader.getShimClass(
diff --git a/src/java/com/cloudera/sqoop/manager/SqlManager.java b/src/java/com/cloudera/sqoop/manager/SqlManager.java
index 7a0fa32f..c07b3a0a 100644
--- a/src/java/com/cloudera/sqoop/manager/SqlManager.java
+++ b/src/java/com/cloudera/sqoop/manager/SqlManager.java
@@ -599,6 +599,7 @@ protected Connection makeConnection() throws SQLException {
    */
   public void exportTable(ExportJobContext context)
       throws IOException, ExportException {
+    context.setConnManager(this);
     JdbcExportJob exportJob = new JdbcExportJob(context);
     exportJob.runExport();
   }
@@ -620,6 +621,7 @@ public void release() {
    */
   public void updateTable(ExportJobContext context)
       throws IOException, ExportException {
+    context.setConnManager(this);
     JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context);
     exportJob.runExport();
   }
diff --git a/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java b/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java
index 1337fad6..c8a161a5 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java
@@ -284,6 +284,7 @@ public void runExport() throws ExportException, IOException {
       configureOutputFormat(job, tableName, tableClassName);
       configureMapper(job, tableName, tableClassName);
       configureNumTasks(job);
+      cacheJars(job, context.getConnManager());
 
       boolean success = runJob(job);
       if (!success) {
diff --git a/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java b/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java
index f4bfe815..09463310 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java
@@ -160,6 +160,7 @@ public void runImport(String tableName, String ormJarFile, String splitByCol,
       configureOutputFormat(job, tableName, tableClassName);
       configureMapper(job, tableName, tableClassName);
       configureNumTasks(job);
+      cacheJars(job, getContext().getConnManager());
 
       jobSetup(job);
       boolean success = runJob(job);
diff --git a/src/java/com/cloudera/sqoop/mapreduce/JobBase.java b/src/java/com/cloudera/sqoop/mapreduce/JobBase.java
index b80cfc97..0613506d 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/JobBase.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/JobBase.java
@@ -18,20 +18,34 @@
 
 package com.cloudera.sqoop.mapreduce;
 
+import java.io.File;
 import java.io.IOException;
+
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.filecache.DistributedCache;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.util.StringUtils;
 
 import com.cloudera.sqoop.SqoopOptions;
+
+import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.shims.HadoopShim;
 import com.cloudera.sqoop.util.ClassLoaderStack;
+import com.cloudera.sqoop.util.Jars;
 
 /**
  * Base class for configuring and running a MapReduce job.
@@ -113,6 +127,80 @@ public void setOptions(SqoopOptions opts) {
     this.options = opts;
   }
 
+  /**
+   * Put jar files required by Sqoop into the DistributedCache.
+   * @param job the Job being submitted.
+   * @param mgr the ConnManager to use.
+   */
+  protected void cacheJars(Job job, ConnManager mgr)
+      throws IOException {
+
+    Configuration conf = job.getConfiguration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Set<String> localUrls = new HashSet<String>();
+
+    addToCache(Jars.getSqoopJarPath(), fs, localUrls);
+    addToCache(Jars.getShimJarPath(), fs, localUrls);
+    addToCache(Jars.getDriverClassJar(mgr), fs, localUrls);
+
+    // Add anything in $SQOOP_HOME/lib, if this is set.
+    String sqoopHome = System.getenv("SQOOP_HOME");
+    if (null != sqoopHome) {
+      File sqoopHomeFile = new File(sqoopHome);
+      File sqoopLibFile = new File(sqoopHomeFile, "lib");
+      if (sqoopLibFile.exists()) {
+        addDirToCache(sqoopLibFile, fs, localUrls);
+      }
+    } else {
+      LOG.warn("SQOOP_HOME is unset. May not be able to find "
+          + "all job dependencies.");
+    }
+
+    // If we didn't put anything in our set, then there's nothing to cache.
+    if (localUrls.isEmpty()) {
+      return;
+    }
+
+    // Add these to the 'tmpjars' array, which the MR JobSubmitter
+    // will upload to HDFS and put in the DistributedCache libjars.
+    String tmpjars = conf.get("tmpjars");
+    StringBuilder sb = new StringBuilder();
+    if (null != tmpjars) {
+      sb.append(tmpjars);
+      sb.append(",");
+    }
+    sb.append(StringUtils.arrayToString(localUrls.toArray(new String[0])));
+    conf.set("tmpjars", sb.toString());
+  }
+
+  private void addToCache(String file, FileSystem fs,
+      Set<String> localUrls) {
+    if (null == file) {
+      return;
+    }
+
+    Path p = new Path(file);
+    String qualified = p.makeQualified(fs).toString();
+    LOG.debug("Adding to job classpath: " + qualified);
+    localUrls.add(qualified);
+  }
+
+  /**
+   * Add the .jar elements of a directory to the DCache classpath,
+   * nonrecursively.
+   */
+  private void addDirToCache(File dir, FileSystem fs,
+      Set<String> localUrls) {
+    if (null == dir) {
+      return;
+    }
+
+    for (File libfile : dir.listFiles()) {
+      if (libfile.exists() && !libfile.isDirectory()
+          && libfile.getName().endsWith("jar")) {
+        addToCache(libfile.toString(), fs, localUrls);
+      }
+    }
+  }
+
   /**
    * If jars must be loaded into the local environment, do so here.
    */
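The "tmpjars" key is the same hook GenericOptionsParser uses for -libjars: at submission time the framework copies each listed file to HDFS and puts it on the tasks' classpath via the DistributedCache. A standalone sketch of the append-don't-clobber behavior cacheJars() relies on (the jar paths are made-up examples):

    import org.apache.hadoop.conf.Configuration;

    public class TmpJarsDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Suppose the user already passed -libjars their-util.jar:
        conf.set("tmpjars", "file:/home/user/their-util.jar");

        // cacheJars() appends rather than overwrites, preserving user jars:
        String existing = conf.get("tmpjars");
        String ours = "file:/usr/lib/sqoop/sqoop.jar"; // made-up path
        conf.set("tmpjars", existing == null ? ours : existing + "," + ours);

        System.out.println(conf.get("tmpjars"));
        // file:/home/user/their-util.jar,file:/usr/lib/sqoop/sqoop.jar
      }
    }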
diff --git a/src/java/com/cloudera/sqoop/orm/CompilationManager.java b/src/java/com/cloudera/sqoop/orm/CompilationManager.java
index 93263002..d6a6ccf4 100644
--- a/src/java/com/cloudera/sqoop/orm/CompilationManager.java
+++ b/src/java/com/cloudera/sqoop/orm/CompilationManager.java
@@ -44,6 +44,8 @@
 import com.cloudera.sqoop.util.FileListing;
 import com.cloudera.sqoop.shims.HadoopShim;
 
+import com.cloudera.sqoop.util.Jars;
+
 /**
  * Manages the compilation of a bunch of .java files into .class files
  * and eventually a jar.
@@ -82,7 +84,7 @@ private String findHadoopCoreJar() {
 
     if (null == hadoopHome) {
       LOG.info("$HADOOP_HOME is not set");
-      return findJarForClass(JobConf.class);
+      return Jars.getJarPathForClass(JobConf.class);
     }
 
     if (!hadoopHome.endsWith(File.separator)) {
@@ -95,7 +97,7 @@ private String findHadoopCoreJar() {
 
     if (null == entries) {
       LOG.warn("HADOOP_HOME appears empty or missing");
-      return findJarForClass(JobConf.class);
+      return Jars.getJarPathForClass(JobConf.class);
     }
 
     for (File f : entries) {
@@ -106,7 +108,7 @@ private String findHadoopCoreJar() {
       }
     }
 
-    return findJarForClass(JobConf.class);
+    return Jars.getJarPathForClass(JobConf.class);
   }
 
   /**
@@ -144,7 +146,7 @@ public void compile() throws IOException {
     }
 
     // find sqoop jar for compilation classpath
-    String sqoopJar = findThisJar();
+    String sqoopJar = Jars.getSqoopJarPath();
     if (null != sqoopJar) {
       sqoopJar = File.pathSeparator + sqoopJar;
     } else {
@@ -299,26 +301,6 @@ public void jar() throws IOException {
       jstream = new JarOutputStream(fstream);
 
       addClassFilesFromDir(new File(jarOutDir), jstream);
-
-      // put our own jar in there in its lib/ subdir
-      String thisJarFile = findThisJar();
-      if (null != thisJarFile) {
-        addLibJar(thisJarFile, jstream);
-      } else {
-        // couldn't find our own jar (we were running from .class files?)
-        LOG.warn("Could not find jar for Sqoop; "
-            + "MapReduce jobs may not run correctly.");
-      }
-
-      String shimJarFile = findShimJar();
-      if (null != shimJarFile) {
-        addLibJar(shimJarFile, jstream);
-      } else {
-        // Couldn't find the shim jar.
-        LOG.warn("Could not find jar for Sqoop shims.");
-        LOG.warn("MapReduce jobs may not run correctly.");
-      }
-
       jstream.finish();
     } finally {
       if (null != jstream) {
@@ -341,23 +323,6 @@ public void jar() throws IOException {
     LOG.debug("Finished writing jar file " + jarFilename);
   }
 
-  /**
-   * Add a jar in the lib/ directory of a JarOutputStream we're building.
-   * @param jarFilename the source jar file to include.
-   * @param jstream the JarOutputStream to write to.
-   */
-  private void addLibJar(String jarFilename, JarOutputStream jstream)
-      throws IOException {
-    File thisJarFileObj = new File(jarFilename);
-    String thisJarBasename = thisJarFileObj.getName();
-    String thisJarEntryName = "lib" + File.separator + thisJarBasename;
-    ZipEntry ze = new ZipEntry(thisJarEntryName);
-    jstream.putNextEntry(ze);
-    copyFileToStream(thisJarFileObj, jstream);
-    jstream.closeEntry();
-  }
-
-
   private static final int BUFFER_SZ = 4096;
 
   /**
@@ -383,46 +348,4 @@ private void copyFileToStream(File f, OutputStream ostream)
       fis.close();
     }
   }
-
-  private String findThisJar() {
-    return findJarForClass(CompilationManager.class);
-  }
-
-  private String findShimJar() {
-    HadoopShim h = HadoopShim.get();
-    if (null == h) {
-      return null;
-    }
-    return findJarForClass(h.getClass());
-  }
-
-  // Method mostly cloned from o.a.h.mapred.JobConf.findContainingJar().
-  private String findJarForClass(Class<?> classObj) {
-    ClassLoader loader = classObj.getClassLoader();
-    String classFile = classObj.getName().replaceAll("\\.", "/") + ".class";
-    try {
-      for (Enumeration<URL> itr = loader.getResources(classFile);
-          itr.hasMoreElements();) {
-        URL url = (URL) itr.nextElement();
-        if ("jar".equals(url.getProtocol())) {
-          String toReturn = url.getPath();
-          if (toReturn.startsWith("file:")) {
-            toReturn = toReturn.substring("file:".length());
-          }
-          // URLDecoder is a misnamed class, since it actually decodes
-          // x-www-form-urlencoded MIME type rather than actual
-          // URL encoding (which the file path has). Therefore it would
-          // decode +s to ' 's which is incorrect (spaces are actually
-          // either unencoded or encoded as "%20"). Replace +s first, so
-          // that they are kept sacred during the decoding process.
-          toReturn = toReturn.replaceAll("\\+", "%2B");
-          toReturn = URLDecoder.decode(toReturn, "UTF-8");
-          return toReturn.replaceAll("!.*$", "");
-        }
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return null;
-  }
 }
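The URLDecoder caveat in the removed comment (which survives in Jars.java below) is easy to verify in isolation. A standalone sketch, using a made-up jar name:

    import java.io.UnsupportedEncodingException;
    import java.net.URLDecoder;

    public class PlusDemo {
      public static void main(String[] args)
          throws UnsupportedEncodingException {
        String path = "/opt/lib/my+tool.jar"; // '+' is literal in file paths

        // Decoded naively, '+' becomes a space and the path is mangled:
        System.out.println(URLDecoder.decode(path, "UTF-8"));
        // -> /opt/lib/my tool.jar

        // Escaping '+' as "%2B" first preserves it, while genuine
        // percent-escapes such as "%20" still decode as intended:
        String safe = path.replaceAll("\\+", "%2B");
        System.out.println(URLDecoder.decode(safe, "UTF-8"));
        // -> /opt/lib/my+tool.jar
      }
    }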
diff --git a/src/java/com/cloudera/sqoop/util/Jars.java b/src/java/com/cloudera/sqoop/util/Jars.java
new file mode 100644
index 00000000..8c3d62b4
--- /dev/null
+++ b/src/java/com/cloudera/sqoop/util/Jars.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.util;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.Enumeration;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.shims.HadoopShim;
+
+/**
+ * Utility class; returns the locations of various jars.
+ */
+public final class Jars {
+
+  public static final Log LOG = LogFactory.getLog(
+      Jars.class.getName());
+
+  private Jars() {
+  }
+
+  /**
+   * @return the path to the main Sqoop jar.
+   */
+  public static String getSqoopJarPath() {
+    return getJarPathForClass(Jars.class);
+  }
+
+  /**
+   * @return the path to the currently-loaded shim jar.
+   */
+  public static String getShimJarPath() {
+    HadoopShim h = HadoopShim.get();
+    if (null == h) {
+      return null;
+    }
+    return getJarPathForClass(h.getClass());
+  }
+
+  /**
+   * Return the jar file path that contains a particular class.
+   * Method mostly cloned from o.a.h.mapred.JobConf.findContainingJar().
+   */
+  public static String getJarPathForClass(Class<?> classObj) {
+    ClassLoader loader = classObj.getClassLoader();
+    String classFile = classObj.getName().replaceAll("\\.", "/") + ".class";
+    try {
+      for (Enumeration<URL> itr = loader.getResources(classFile);
+          itr.hasMoreElements();) {
+        URL url = (URL) itr.nextElement();
+        if ("jar".equals(url.getProtocol())) {
+          String toReturn = url.getPath();
+          if (toReturn.startsWith("file:")) {
+            toReturn = toReturn.substring("file:".length());
+          }
+          // URLDecoder is a misnamed class, since it actually decodes
+          // x-www-form-urlencoded MIME type rather than actual
+          // URL encoding (which the file path has). Therefore it would
+          // decode +s to ' 's which is incorrect (spaces are actually
+          // either unencoded or encoded as "%20"). Replace +s first, so
+          // that they are kept sacred during the decoding process.
+          toReturn = toReturn.replaceAll("\\+", "%2B");
+          toReturn = URLDecoder.decode(toReturn, "UTF-8");
+          return toReturn.replaceAll("!.*$", "");
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return null;
+  }
+
+  /**
+   * Return the path to the jar containing the JDBC driver
+   * for a ConnManager.
+   */
+  public static String getDriverClassJar(ConnManager mgr) {
+    if (null == mgr) {
+      return null;
+    }
+
+    String driverClassName = mgr.getDriverClass();
+    if (null == driverClassName) {
+      return null;
+    }
+
+    try {
+      Class<?> driverClass = Class.forName(driverClassName);
+      return getJarPathForClass(driverClass);
+    } catch (ClassNotFoundException cnfe) {
+      LOG.warn("No such class " + driverClassName + " available.");
+      return null;
+    }
+  }
+}
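Taken together, a caller can resolve every jar a job needs through this one utility. A hedged sketch of how a consumer might use it; JarReport is a hypothetical helper, not part of this patch:

    import com.cloudera.sqoop.manager.ConnManager;
    import com.cloudera.sqoop.util.Jars;

    /** Hypothetical helper: report where a job's dependencies come from. */
    public class JarReport {
      public static void report(ConnManager manager) {
        System.out.println("sqoop jar:  " + Jars.getSqoopJarPath());
        System.out.println("shim jar:   " + Jars.getShimJarPath());
        System.out.println("driver jar: " + Jars.getDriverClassJar(manager));
        // Any of these may be null (e.g. when running from unpacked .class
        // files), which is why JobBase.addToCache() tolerates null input.
      }
    }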