mirror of
https://github.com/apache/sqoop.git
synced 2025-05-03 00:10:26 +08:00
Shims can be loaded from the classpath, or jars
CompilationManager now packs the active shim jar into the job jar. From: Aaron Kimball <aaron@cloudera.com> git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149885 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9a7dcdeafb
commit
799f7e9070
@ -43,6 +43,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.sqoop.SqoopOptions;
|
import org.apache.hadoop.sqoop.SqoopOptions;
|
||||||
import org.apache.hadoop.sqoop.util.FileListing;
|
import org.apache.hadoop.sqoop.util.FileListing;
|
||||||
|
import org.apache.hadoop.sqoop.shims.HadoopShim;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manages the compilation of a bunch of .java files into .class files
|
* Manages the compilation of a bunch of .java files into .class files
|
||||||
@ -289,18 +290,21 @@ public void jar() throws IOException {
|
|||||||
// put our own jar in there in its lib/ subdir
|
// put our own jar in there in its lib/ subdir
|
||||||
String thisJarFile = findThisJar();
|
String thisJarFile = findThisJar();
|
||||||
if (null != thisJarFile) {
|
if (null != thisJarFile) {
|
||||||
File thisJarFileObj = new File(thisJarFile);
|
addLibJar(thisJarFile, jstream);
|
||||||
String thisJarBasename = thisJarFileObj.getName();
|
|
||||||
String thisJarEntryName = "lib" + File.separator + thisJarBasename;
|
|
||||||
ZipEntry ze = new ZipEntry(thisJarEntryName);
|
|
||||||
jstream.putNextEntry(ze);
|
|
||||||
copyFileToStream(thisJarFileObj, jstream);
|
|
||||||
jstream.closeEntry();
|
|
||||||
} else {
|
} else {
|
||||||
// couldn't find our own jar (we were running from .class files?)
|
// couldn't find our own jar (we were running from .class files?)
|
||||||
LOG.warn("Could not find jar for Sqoop; MapReduce jobs may not run correctly.");
|
LOG.warn("Could not find jar for Sqoop; MapReduce jobs may not run correctly.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String shimJarFile = findShimJar();
|
||||||
|
if (null != shimJarFile) {
|
||||||
|
addLibJar(shimJarFile, jstream);
|
||||||
|
} else {
|
||||||
|
// Couldn't find the shim jar.
|
||||||
|
LOG.warn("Could not find jar for Sqoop shims.");
|
||||||
|
LOG.warn("MapReduce jobs may not run correctly.");
|
||||||
|
}
|
||||||
|
|
||||||
jstream.finish();
|
jstream.finish();
|
||||||
} finally {
|
} finally {
|
||||||
if (null != jstream) {
|
if (null != jstream) {
|
||||||
@ -323,6 +327,22 @@ public void jar() throws IOException {
|
|||||||
LOG.debug("Finished writing jar file " + jarFilename);
|
LOG.debug("Finished writing jar file " + jarFilename);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a jar in the lib/ directory of a JarOutputStream we're building.
|
||||||
|
* @param jarFilename the source jar file to include.
|
||||||
|
* @param jstream the JarOutputStream to write to.
|
||||||
|
*/
|
||||||
|
private void addLibJar(String jarFilename, JarOutputStream jstream)
|
||||||
|
throws IOException {
|
||||||
|
File thisJarFileObj = new File(jarFilename);
|
||||||
|
String thisJarBasename = thisJarFileObj.getName();
|
||||||
|
String thisJarEntryName = "lib" + File.separator + thisJarBasename;
|
||||||
|
ZipEntry ze = new ZipEntry(thisJarEntryName);
|
||||||
|
jstream.putNextEntry(ze);
|
||||||
|
copyFileToStream(thisJarFileObj, jstream);
|
||||||
|
jstream.closeEntry();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private static final int BUFFER_SZ = 4096;
|
private static final int BUFFER_SZ = 4096;
|
||||||
|
|
||||||
@ -353,6 +373,14 @@ private String findThisJar() {
|
|||||||
return findJarForClass(CompilationManager.class);
|
return findJarForClass(CompilationManager.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String findShimJar() {
|
||||||
|
HadoopShim h = HadoopShim.get();
|
||||||
|
if (null == h) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return findJarForClass(h.getClass());
|
||||||
|
}
|
||||||
|
|
||||||
// method mostly cloned from o.a.h.mapred.JobConf.findContainingJar()
|
// method mostly cloned from o.a.h.mapred.JobConf.findContainingJar()
|
||||||
private String findJarForClass(Class<? extends Object> classObj) {
|
private String findJarForClass(Class<? extends Object> classObj) {
|
||||||
ClassLoader loader = classObj.getClassLoader();
|
ClassLoader loader = classObj.getClassLoader();
|
||||||
|
@ -136,22 +136,33 @@ private static <T> T loadShim(List<String> matchExprs,
|
|||||||
if (version.matches(matchExprs.get(i))) {
|
if (version.matches(matchExprs.get(i))) {
|
||||||
String className = classNames.get(i);
|
String className = classNames.get(i);
|
||||||
String jarPattern = jarPatterns.get(i);
|
String jarPattern = jarPatterns.get(i);
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Version matched regular expression: " + matchExprs.get(i));
|
LOG.debug("Version matched regular expression: " + matchExprs.get(i));
|
||||||
LOG.debug("Searching for jar matching: " + jarPattern);
|
|
||||||
LOG.debug("Trying to load class: " + className);
|
LOG.debug("Trying to load class: " + className);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test to see if the class is already on the classpath.
|
||||||
try {
|
try {
|
||||||
|
// If we can load the shim directly, we just do so. In this case,
|
||||||
|
// there's no need to update the Configuration's classloader,
|
||||||
|
// because we didn't modify the classloader stack.
|
||||||
|
return getShimInstance(className, xface);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// Not already present. We'll need to load a jar for this.
|
||||||
|
// Ignore this exception.
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
LOG.debug("Searching for jar matching: " + jarPattern);
|
||||||
loadMatchingShimJar(jarPattern, className);
|
loadMatchingShimJar(jarPattern, className);
|
||||||
ClassLoader cl = Thread.currentThread().getContextClassLoader();
|
LOG.debug("Loading shim from jar");
|
||||||
Class clazz = Class.forName(className, true, cl);
|
T shim = getShimInstance(className, xface);
|
||||||
T shim = xface.cast(clazz.newInstance());
|
|
||||||
|
|
||||||
if (null != conf) {
|
if (null != conf) {
|
||||||
// Set the context classloader for the base Configuration to
|
// Set the context classloader for the base Configuration to
|
||||||
// the current one, so we can load more classes from the shim jar.
|
// the current one, so we can load more classes from the shim jar.
|
||||||
conf.setClassLoader(cl);
|
conf.setClassLoader(Thread.currentThread().getContextClassLoader());
|
||||||
}
|
}
|
||||||
|
|
||||||
return shim;
|
return shim;
|
||||||
@ -166,6 +177,22 @@ private static <T> T loadShim(List<String> matchExprs,
|
|||||||
+ version);
|
+ version);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check the current classloader to see if it can load the prescribed
|
||||||
|
* class name as an instance of 'xface'. If so, create an instance of
|
||||||
|
* the class and return it.
|
||||||
|
* @param className the shim class to attempt to instantiate.
|
||||||
|
* @param xface the interface it must implement.
|
||||||
|
* @return an instance of className.
|
||||||
|
*/
|
||||||
|
private static <T> T getShimInstance(String className, Class<T> xface)
|
||||||
|
throws ClassNotFoundException, InstantiationException,
|
||||||
|
IllegalAccessException {
|
||||||
|
ClassLoader cl = Thread.currentThread().getContextClassLoader();
|
||||||
|
Class clazz = Class.forName(className, true, cl);
|
||||||
|
return xface.cast(clazz.newInstance());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Look through the shim directory for a jar matching 'jarPattern'
|
* Look through the shim directory for a jar matching 'jarPattern'
|
||||||
* and classload it.
|
* and classload it.
|
||||||
|
Loading…
Reference in New Issue
Block a user