diff --git a/assemblies/pom.xml b/assemblies/pom.xml
new file mode 100644
index 00000000..ebc79417
--- /dev/null
+++ b/assemblies/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache</groupId>
+    <artifactId>sqoop</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.sqoop</groupId>
+  <artifactId>sqoop-assemblies</artifactId>
+  <name>Sqoop Assemblies</name>
+</project>
diff --git a/assemblies/src/main/resources/assemblies/sqoop-connector.xml b/assemblies/src/main/resources/assemblies/sqoop-connector.xml
new file mode 100644
index 00000000..3c3c5523
--- /dev/null
+++ b/assemblies/src/main/resources/assemblies/sqoop-connector.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
+  <id>sqoop-connector</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <dependencySets>
+    <dependencySet>
+      <outputDirectory>lib</outputDirectory>
+      <unpack>false</unpack>
+      <useProjectArtifact>false</useProjectArtifact>
+      <useTransitiveDependencies>true</useTransitiveDependencies>
+      <includes>
+        <include>*</include>
+      </includes>
+    </dependencySet>
+  </dependencySets>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/classes/</directory>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>**/*</include>
+      </includes>
+    </fileSet>
+  </fileSets>
+</assembly>
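The descriptor above builds each connector as a self-contained jar: connector classes at the root, dependency jars embedded under lib/ — the exact layout ConnectorClassLoader consumes in the next file. A minimal sketch (not part of the patch; the jar name is a hypothetical example) that lists the embedded dependency jars:

import java.io.IOException;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class ListEmbeddedJars {
  public static void main(String[] args) throws IOException {
    // Connector classes sit at the jar root; dependency jars live under lib/.
    try (JarFile jar = new JarFile("sqoop-connector-example-2.0.0-SNAPSHOT.jar")) { // hypothetical name
      Enumeration<JarEntry> entries = jar.entries();
      while (entries.hasMoreElements()) {
        String name = entries.nextElement().getName();
        if (name.startsWith("lib/") && name.endsWith(".jar")) {
          System.out.println("embedded dependency: " + name);
        }
      }
    }
  }
}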
diff --git a/common/src/main/java/org/apache/sqoop/classloader/ConnectorClassLoader.java b/common/src/main/java/org/apache/sqoop/classloader/ConnectorClassLoader.java
index 370de2a1..fa5ec7df 100644
--- a/common/src/main/java/org/apache/sqoop/classloader/ConnectorClassLoader.java
+++ b/common/src/main/java/org/apache/sqoop/classloader/ConnectorClassLoader.java
@@ -125,11 +125,11 @@ public ByteCode(byte[] bytes,
private final ClassLoader parent;
  private final List<String> systemClasses;
- public ConnectorClassLoader(URL[] urls, ClassLoader parent,
+ public ConnectorClassLoader(URL url, ClassLoader parent,
      List<String> systemClasses, boolean overrideDefaultSystemClasses) throws IOException {
- super(urls, parent);
+ super(new URL[] {url}, parent);
if (LOG.isDebugEnabled()) {
- LOG.debug("urls: " + Arrays.toString(urls));
+ LOG.debug("url: " + url);
LOG.debug("system classes: " + systemClasses);
}
this.parent = parent;
@@ -147,17 +147,17 @@ public ConnectorClassLoader(URL[] urls, ClassLoader parent,
LOG.info("system classes: " + this.systemClasses);
urlFactory = new ConnectorURLFactory(this);
- load(urls);
+ load(url);
}
- public ConnectorClassLoader(String classpath, ClassLoader parent,
+ public ConnectorClassLoader(String connectorJar, ClassLoader parent,
      List<String> systemClasses) throws IOException {
- this(constructUrlsFromClasspath(classpath), parent, systemClasses, true);
+ this(connectorJar, parent, systemClasses, true);
}
- public ConnectorClassLoader(String classpath, ClassLoader parent,
+ public ConnectorClassLoader(String connectorJar, ClassLoader parent,
      List<String> systemClasses, boolean overrideDefaultSystemClasses) throws IOException {
- this(constructUrlsFromClasspath(classpath), parent, systemClasses, overrideDefaultSystemClasses);
+ this(new File(connectorJar).toURI().toURL(), parent, systemClasses, overrideDefaultSystemClasses);
}
static URL[] constructUrlsFromClasspath(String classpath)
@@ -485,50 +485,48 @@ private String resolve(String name) {
return resource;
}
- private void load(URL[] urls) throws IOException {
- for (URL url : urls) {
- String jarName = url.getPath();
- JarFile jarFile = null;
- try {
- jarFile = new JarFile(jarName);
- Manifest manifest = jarFile.getManifest();
+ private void load(URL connectorUrl) throws IOException {
+ String jarName = connectorUrl.getPath();
+ JarFile jarFile = null;
+ try {
+ jarFile = new JarFile(jarName);
+ Manifest manifest = jarFile.getManifest();
-        Enumeration<JarEntry> entryEnum = jarFile.entries();
- while (entryEnum.hasMoreElements()) {
- JarEntry entry = entryEnum.nextElement();
- if (entry.isDirectory()) {
- continue;
- }
-
- String entryName = entry.getName();
- InputStream is = jarFile.getInputStream(entry);
- if (is == null) {
- throw new IOException("Unable to load resource " + entryName);
- }
- try {
- if (entryName.startsWith(LIB_PREFIX)) {
- LOG.debug("Caching " + entryName);
- loadBytesFromJar(is, entryName);
- } else if (entryName.endsWith(CLASS)) {
- // A plain vanilla class file rooted at the top of the jar file.
- loadBytes(entry, is, "/", manifest);
- LOG.debug("Loaded class: " + jarFile.getName() + "!/" + entry.getName());
- } else {
- // A resource
- loadBytes(entry, is, "/", manifest);
- LOG.debug("Loaded resource: " + jarFile.getName() + "!/" + entry.getName());
- }
- } finally {
- is.close();
- }
+      Enumeration<JarEntry> entryEnum = jarFile.entries();
+ while (entryEnum.hasMoreElements()) {
+ JarEntry entry = entryEnum.nextElement();
+ if (entry.isDirectory()) {
+ continue;
}
- } finally {
- if (jarFile != null) {
- try {
- jarFile.close();
- } catch (IOException e) {
- LOG.debug("Exception closing jarFile: " + jarName, e);
+
+ String entryName = entry.getName();
+ InputStream is = jarFile.getInputStream(entry);
+ if (is == null) {
+ throw new IOException("Unable to load resource " + entryName);
+ }
+ try {
+ if (entryName.startsWith(LIB_PREFIX)) {
+ LOG.debug("Caching " + entryName);
+ loadBytesFromJar(is, entryName);
+ } else if (entryName.endsWith(CLASS)) {
+ // A plain vanilla class file rooted at the top of the jar file.
+ loadBytes(entry, is, "/", manifest);
+ LOG.debug("Loaded class: " + jarFile.getName() + "!/" + entry.getName());
+ } else {
+ // A resource
+ loadBytes(entry, is, "/", manifest);
+ LOG.debug("Loaded resource: " + jarFile.getName() + "!/" + entry.getName());
}
+ } finally {
+ is.close();
+ }
+ }
+ } finally {
+ if (jarFile != null) {
+ try {
+ jarFile.close();
+ } catch (IOException e) {
+ LOG.debug("Exception closing jarFile: " + jarName, e);
}
}
}
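The constructor now takes a single connector jar path instead of a general classpath. A quick usage sketch (not part of the patch; the jar path and class name are illustrative assumptions, and the "java." system-class prefix mirrors the tests below):

import java.io.IOException;
import java.util.Arrays;

import org.apache.sqoop.classloader.ConnectorClassLoader;

public class ConnectorClassLoaderSketch {
  public static void main(String[] args) throws IOException, ClassNotFoundException {
    // Only classes matching the system-class list delegate to the parent;
    // everything else resolves from the connector jar and its embedded lib/ jars.
    ClassLoader parent = Thread.currentThread().getContextClassLoader();
    ConnectorClassLoader loader = new ConnectorClassLoader(
        "/opt/sqoop/connectors/sqoop-connector-example.jar", // hypothetical path
        parent,
        Arrays.asList("java."));
    Class<?> connectorClass = loader.loadClass("org.example.ExampleConnector"); // hypothetical class
    System.out.println(connectorClass.getClassLoader());
  }
}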
diff --git a/common/src/main/java/org/apache/sqoop/error/code/ConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/ConnectorError.java
index 2f17d95c..cc983686 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/ConnectorError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/ConnectorError.java
@@ -59,7 +59,12 @@ public enum ConnectorError implements ErrorCode {
+ "changed since it was registered previously."),
/** A connector is not assigned with a valid id yet. */
- CONN_0010("A connector is not assigned with a valid id yet");
+ CONN_0010("A connector is not assigned with a valid id yet"),
+
+ /** Failed to create ConnectorClassLoader. */
+ CONN_0011("Failed to create ConnectorClassLoader"),
+
+ ;
private final String message;
diff --git a/common/src/main/java/org/apache/sqoop/error/code/MRExecutionError.java b/common/src/main/java/org/apache/sqoop/error/code/MRExecutionError.java
index 21e5c823..0ccc84a8 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/MRExecutionError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/MRExecutionError.java
@@ -79,6 +79,9 @@ public enum MRExecutionError implements ErrorCode {
/** Got invalid number of partitions from Partitioner */
MAPRED_EXEC_0025("Retrieved invalid number of partitions from Partitioner"),
+ /** Unable to find connector jar */
+ MAPRED_EXEC_0026("Unable to find connector jar"),
+
;
private final String message;
diff --git a/common/src/main/java/org/apache/sqoop/utils/ClassUtils.java b/common/src/main/java/org/apache/sqoop/utils/ClassUtils.java
index ed689884..d26afdd9 100644
--- a/common/src/main/java/org/apache/sqoop/utils/ClassUtils.java
+++ b/common/src/main/java/org/apache/sqoop/utils/ClassUtils.java
@@ -34,6 +34,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
+import java.util.concurrent.Callable;
@InterfaceAudience.Public
@InterfaceStability.Unstable
@@ -271,6 +272,25 @@ public static ClassLoader getClassLoader() {
return classLoader;
}
+  public static Object executeWithClassLoader(ClassLoader loader, Callable<?> callable) {
+ ClassLoader oldContextClassLoader = Thread.currentThread().getContextClassLoader();
+ if (loader != null) {
+ Thread.currentThread().setContextClassLoader(loader);
+ }
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ if (e instanceof SqoopException) {
+ throw (SqoopException) e;
+ } else {
+ throw new SqoopException(CoreError.CORE_0000, e);
+ }
+ } finally {
+ // Restore the old context ClassLoader
+ Thread.currentThread().setContextClassLoader(oldContextClassLoader);
+ }
+ }
+
private ClassUtils() {
// Disable explicit object creation
}
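The helper above swaps in a ClassLoader for the duration of a task and always restores the previous context loader. A minimal sketch of the call pattern (not part of the patch; a real caller would pass a ConnectorClassLoader, while null keeps the current context loader per the null check above):

import java.util.concurrent.Callable;

import org.apache.sqoop.utils.ClassUtils;

public class ExecuteWithClassLoaderSketch {
  public static void main(String[] args) {
    ClassLoader loader = null; // stand-in for a ConnectorClassLoader
    Object name = ClassUtils.executeWithClassLoader(loader, new Callable<String>() {
      @Override
      public String call() {
        // Runs with "loader" installed as the thread context ClassLoader;
        // the previous loader is restored in the finally block.
        return Thread.currentThread().getContextClassLoader().toString();
      }
    });
    System.out.println(name);
  }
}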
diff --git a/common/src/test/java/org/apache/sqoop/classloader/TestConnectorClassLoader.java b/common/src/test/java/org/apache/sqoop/classloader/TestConnectorClassLoader.java
index 87edf3d2..be12d569 100644
--- a/common/src/test/java/org/apache/sqoop/classloader/TestConnectorClassLoader.java
+++ b/common/src/test/java/org/apache/sqoop/classloader/TestConnectorClassLoader.java
@@ -143,7 +143,7 @@ public void testGetResource() throws IOException {
ClassLoader currentClassLoader = getClass().getClassLoader();
ClassLoader connectorClassloader = new ConnectorClassLoader(
- new URL[] { testJar }, currentClassLoader, null, false);
+ testJar, currentClassLoader, null, false);
assertNull(currentClassLoader.getResourceAsStream("resource.txt"),
"Resource should be null for current classloader");
@@ -164,7 +164,7 @@ public void testGetResources() throws IOException {
ClassLoader currentClassLoader = getClass().getClassLoader();
ClassLoader connectorClassloader = new ConnectorClassLoader(
- new URL[] { testJar }, currentClassLoader, null, false);
+ testJar, currentClassLoader, null, false);
    List<String> resourceContents = new ArrayList<String>();
resourceContents.add("hello A");
@@ -185,7 +185,7 @@ public void testLoadClass() throws Exception {
ClassLoader currentClassLoader = getClass().getClassLoader();
ClassLoader connectorClassloader = new ConnectorClassLoader(
- new URL[] { testJar }, currentClassLoader, null, false);
+ testJar, currentClassLoader, null, false);
try {
currentClassLoader.loadClass("A");
diff --git a/common/src/test/java/org/apache/sqoop/utils/TestClassUtils.java b/common/src/test/java/org/apache/sqoop/utils/TestClassUtils.java
index ec48f823..fc24d9a6 100644
--- a/common/src/test/java/org/apache/sqoop/utils/TestClassUtils.java
+++ b/common/src/test/java/org/apache/sqoop/utils/TestClassUtils.java
@@ -171,6 +171,7 @@ public void testLoadClass() {
@Test
public void testLoadClassWithClassLoader() throws Exception {
String classpath = ClassUtils.jarForClass(testAClass);
+ classpath = classpath.startsWith("file:") ? classpath.substring("file:".length()) : classpath;
assertNotEquals(testAClass, ClassUtils.loadClassWithClassLoader(testAClass.getName(),
new ConnectorClassLoader(classpath, getClass().getClassLoader(), Arrays.asList("java."))));
}
diff --git a/connector/connector-ftp/pom.xml b/connector/connector-ftp/pom.xml
index 41ea0264..a3302667 100644
--- a/connector/connector-ftp/pom.xml
+++ b/connector/connector-ftp/pom.xml
@@ -44,6 +44,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>connector-sdk</artifactId>
+      <scope>provided</scope>
     </dependency>
@@ -66,10 +67,18 @@ limitations under the License.
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <version>${slf4j.version}</version>
-    </dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
diff --git a/connector/connector-generic-jdbc/pom.xml b/connector/connector-generic-jdbc/pom.xml
index 052f06d4..8d054c1a 100644
--- a/connector/connector-generic-jdbc/pom.xml
+++ b/connector/connector-generic-jdbc/pom.xml
@@ -36,6 +36,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>connector-sdk</artifactId>
+      <scope>provided</scope>
     </dependency>
@@ -60,4 +61,12 @@ limitations under the License.
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml
index 022a024d..d6957508 100644
--- a/connector/connector-hdfs/pom.xml
+++ b/connector/connector-hdfs/pom.xml
@@ -40,6 +40,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>connector-sdk</artifactId>
+      <scope>provided</scope>
     </dependency>
@@ -68,4 +69,12 @@ limitations under the License.
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
diff --git a/connector/connector-kafka/pom.xml b/connector/connector-kafka/pom.xml
index e0f06846..5f411814 100644
--- a/connector/connector-kafka/pom.xml
+++ b/connector/connector-kafka/pom.xml
@@ -33,6 +33,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>connector-sdk</artifactId>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
@@ -41,6 +42,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>sqoop-common-test</artifactId>
+      <scope>test</scope>
     </dependency>
@@ -50,4 +52,12 @@ limitations under the License.
-</project>
\ No newline at end of file
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java
index 923d1aa5..7b1f5686 100644
--- a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java
+++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java
@@ -22,9 +22,6 @@
import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
-import org.apache.sqoop.utils.ClassUtils;
-
-import java.util.Set;
public class KafkaToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> {
@@ -34,18 +31,4 @@ public class KafkaToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> {
-  public Set<String> getJars(InitializerContext context, LinkConfiguration
- linkConfiguration, ToJobConfiguration toJobConfiguration) {
-    Set<String> jars = super.getJars(context, linkConfiguration, toJobConfiguration);
- // Jars for Kafka, Scala and Yammer (required by Kafka)
- jars.add(ClassUtils.jarForClass("kafka.javaapi.producer.Producer"));
- jars.add(ClassUtils.jarForClass("scala.collection.immutable.StringLike"));
- jars.add(ClassUtils.jarForClass("com.yammer.metrics.Metrics"));
- return jars;
- }
-
-
}
diff --git a/connector/connector-kite/pom.xml b/connector/connector-kite/pom.xml
index d8eaa8ea..07924453 100644
--- a/connector/connector-kite/pom.xml
+++ b/connector/connector-kite/pom.xml
@@ -36,10 +36,12 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>connector-sdk</artifactId>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
@@ -97,4 +99,12 @@ limitations under the License.
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
index 28c5bac2..4502d592 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
@@ -56,24 +56,11 @@ public void initialize(InitializerContext context,
  public Set<String> getJars(InitializerContext context,
LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
    Set<String> jars = super.getJars(context, linkConfig, fromJobConfig);
- jars.add(ClassUtils.jarForClass("org.kitesdk.data.Datasets"));
- jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.databind.JsonNode"));
- jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.core.TreeNode"));
- jars.add(ClassUtils.jarForClass("parquet.hadoop.metadata.CompressionCodecName"));
- jars.add(ClassUtils.jarForClass("parquet.format.CompressionCodec"));
- jars.add(ClassUtils.jarForClass("parquet.avro.AvroParquetWriter"));
- jars.add(ClassUtils.jarForClass("parquet.column.ParquetProperties"));
- jars.add(ClassUtils.jarForClass("parquet.Version"));
- jars.add(ClassUtils.jarForClass("parquet.org.codehaus.jackson.type.TypeReference"));
- jars.add(ClassUtils.jarForClass("parquet.bytes.CapacityByteArrayOutputStream"));
- jars.add(ClassUtils.jarForClass("parquet.encoding.Generator"));
- jars.add(ClassUtils.jarForClass("au.com.bytecode.opencsv.CSVWriter"));
+
if (fromJobConfig.fromJobConfig.uri.startsWith("dataset:hive")) {
// @TODO(Abe): Remove a deps that aren't used?
jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.conf.HiveConf"));
jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.serde2.SerDe"));
- jars.add(ClassUtils.jarForClass("org.kitesdk.data.spi.hive.MetaStoreUtil"));
- jars.add(ClassUtils.jarForClass("org.kitesdk.compat.DynConstructors"));
jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.metastore.Warehouse"));
jars.add(ClassUtils.jarForClass("org.apache.hive.common.HiveCompat"));
jars.add(ClassUtils.jarForClass("com.facebook.fb303.FacebookService"));
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
index 50daba08..effab196 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
@@ -19,7 +19,6 @@
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.common.FileFormat;
import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
@@ -59,28 +58,11 @@ public void initialize(InitializerContext context,
  public Set<String> getJars(InitializerContext context,
LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
    Set<String> jars = super.getJars(context, linkConfig, toJobConfig);
- jars.add(ClassUtils.jarForClass("org.kitesdk.data.Formats"));
- jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.databind.JsonNode"));
- jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.core.TreeNode"));
- if (FileFormat.CSV.equals(toJobConfig.toJobConfig.fileFormat)) {
- jars.add(ClassUtils.jarForClass("au.com.bytecode.opencsv.CSVWriter"));
- }
- if (FileFormat.PARQUET.equals(toJobConfig.toJobConfig.fileFormat)) {
- jars.add(ClassUtils.jarForClass("parquet.hadoop.metadata.CompressionCodecName"));
- jars.add(ClassUtils.jarForClass("parquet.format.CompressionCodec"));
- jars.add(ClassUtils.jarForClass("parquet.avro.AvroParquetWriter"));
- jars.add(ClassUtils.jarForClass("parquet.column.ParquetProperties"));
- jars.add(ClassUtils.jarForClass("parquet.Version"));
- jars.add(ClassUtils.jarForClass("parquet.org.codehaus.jackson.type.TypeReference"));
- jars.add(ClassUtils.jarForClass("parquet.bytes.CapacityByteArrayOutputStream"));
- jars.add(ClassUtils.jarForClass("parquet.encoding.Generator"));
- }
+
if (toJobConfig.toJobConfig.uri.startsWith("dataset:hive")) {
// @TODO(Abe): Remove a deps that aren't used?
jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.conf.HiveConf"));
jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.serde2.SerDe"));
- jars.add(ClassUtils.jarForClass("org.kitesdk.data.spi.hive.MetaStoreUtil"));
- jars.add(ClassUtils.jarForClass("org.kitesdk.compat.DynConstructors"));
jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.metastore.Warehouse"));
jars.add(ClassUtils.jarForClass("org.apache.hive.common.HiveCompat"));
jars.add(ClassUtils.jarForClass("com.facebook.fb303.FacebookService"));
diff --git a/connector/connector-oracle-jdbc/pom.xml b/connector/connector-oracle-jdbc/pom.xml
index 8186b3ab..4262cb22 100644
--- a/connector/connector-oracle-jdbc/pom.xml
+++ b/connector/connector-oracle-jdbc/pom.xml
@@ -36,6 +36,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>connector-sdk</artifactId>
+      <scope>provided</scope>
     </dependency>
@@ -125,6 +126,11 @@ limitations under the License.
             none
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
index 85ba8be1..6733906c 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
@@ -37,6 +37,16 @@
@InterfaceStability.Evolving
public abstract class SqoopConnector {
+ private String connectorName;
+
+ public void setConnectorName(String connectorName) {
+ this.connectorName = connectorName;
+ }
+
+ public String getConnectorName() {
+ return this.connectorName;
+ }
+
/**
* Retrieve connector version.
*
diff --git a/connector/connector-sftp/pom.xml b/connector/connector-sftp/pom.xml
index 312ac61d..8db1af50 100644
--- a/connector/connector-sftp/pom.xml
+++ b/connector/connector-sftp/pom.xml
@@ -40,6 +40,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>connector-sdk</artifactId>
+      <scope>provided</scope>
     </dependency>
@@ -49,4 +50,12 @@ limitations under the License.
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
diff --git a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpToInitializer.java b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpToInitializer.java
index bfb51ac9..7f2fde1b 100644
--- a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpToInitializer.java
+++ b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpToInitializer.java
@@ -23,9 +23,6 @@
import org.apache.sqoop.connector.sftp.configuration.ToJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
-import org.apache.sqoop.utils.ClassUtils;
-
-import java.util.Set;
/**
* Perform any required initialization before execution of job.
@@ -43,19 +40,4 @@ public void initialize(InitializerContext context, LinkConfiguration linkConfig,
LOG.info("Running SFTP Connector TO initializer.");
// do nothing at this point
}
-
- /**
- * {@inheritDoc}
- */
- @Override
-  public Set<String> getJars(InitializerContext context,
- LinkConfiguration linkConfiguration,
- ToJobConfiguration toJobConfiguration) {
-    Set<String> jars =
- super.getJars(context, linkConfiguration, toJobConfiguration);
- // Jar for jsch library:
- jars.add(ClassUtils.jarForClass("com.jcraft.jsch.JSch"));
- return jars;
- }
-
}
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
index 1899bb7f..367bb806 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
@@ -71,13 +71,18 @@ public ConnectorHandler(URL configFileUrl) {
throw new SqoopException(ConnectorError.CONN_0008, connectorClassName);
}
-    Class<?> connectorClass = ClassUtils.loadClass(connectorClassName);
+ String connectorJarPath = ConnectorManagerUtils.getConnectorJarPath(configFileUrl);
+ ClassLoader connectorClassLoader = ConnectorManagerUtils
+ .createConnectorClassLoader(connectorJarPath, null);
+    Class<?> connectorClass = ClassUtils.loadClassWithClassLoader(
+ connectorClassName, connectorClassLoader);
if(connectorClass == null) {
throw new SqoopException(ConnectorError.CONN_0005, connectorClassName);
}
try {
connector = (SqoopConnector) connectorClass.newInstance();
+ connector.setConnectorName(connectorUniqueName);
} catch (IllegalAccessException ex) {
throw new SqoopException(ConnectorError.CONN_0005,
connectorClassName, ex);
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
index f19f391d..5663445f 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
@@ -159,7 +159,11 @@ public SqoopConnector getSqoopConnector(long connectorId) {
}
public SqoopConnector getSqoopConnector(String uniqueName) {
- return handlerMap.get(uniqueName).getSqoopConnector();
+ if (handlerMap != null && handlerMap.get(uniqueName) != null) {
+ return handlerMap.get(uniqueName).getSqoopConnector();
+ } else {
+ return null;
+ }
}
public synchronized void initialize() {
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManagerUtils.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManagerUtils.java
index 9f9be575..064da356 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManagerUtils.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManagerUtils.java
@@ -17,14 +17,17 @@
*/
package org.apache.sqoop.connector;
+import org.apache.sqoop.classloader.ConnectorClassLoader;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.core.ConfigurationConstants;
-import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.error.code.ConnectorError;
import java.io.File;
import java.io.IOException;
import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
@@ -96,4 +99,27 @@ static boolean isConnectorJar(File file) {
throw new RuntimeException(e);
}
}
+
+ public static String getConnectorJarPath(URL configFileUrl) {
+ return configFileUrl.getPath().substring("file:".length(),
+ configFileUrl.getPath().length() -
+ ("!/" + ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES).length());
+ }
+
+ public static ClassLoader createConnectorClassLoader(
+      final String connectorJarPath, final List<String> systemClasses) {
+ try {
+ return AccessController.doPrivileged(
+          new PrivilegedExceptionAction<ClassLoader>() {
+ @Override
+ public ClassLoader run() throws IOException {
+ return new ConnectorClassLoader(connectorJarPath,
+ Thread.currentThread().getContextClassLoader(),
+ systemClasses, false);
+ }
+ });
+ } catch (PrivilegedActionException e) {
+ throw new SqoopException(ConnectorError.CONN_0011, e);
+ }
+ }
}
\ No newline at end of file
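A worked example of the path arithmetic in getConnectorJarPath (not part of the patch; the path is illustrative, assuming the standard sqoopconnector.properties file name):

import java.net.MalformedURLException;
import java.net.URL;

import org.apache.sqoop.connector.ConnectorManagerUtils;

public class ConnectorJarPathSketch {
  public static void main(String[] args) throws MalformedURLException {
    // getPath() on this URL returns
    //   file:/opt/sqoop/connectors/sqoop-connector-x.jar!/sqoopconnector.properties
    // and getConnectorJarPath() strips the leading "file:" plus the trailing
    // "!/sqoopconnector.properties", leaving the bare jar path.
    URL configFileUrl = new URL(
        "jar:file:/opt/sqoop/connectors/sqoop-connector-x.jar!/sqoopconnector.properties");
    System.out.println(ConnectorManagerUtils.getConnectorJarPath(configFileUrl));
    // prints: /opt/sqoop/connectors/sqoop-connector-x.jar
  }
}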
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index d3a750ea..39a02600 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -22,6 +22,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Callable;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
@@ -509,10 +510,19 @@ MJob getJob(String jobName) {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- private void initializeConnector(JobRequest jobRequest, Direction direction, Initializer initializer, InitializerContext initializerContext) {
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+ private void initializeConnector(final JobRequest jobRequest, final Direction direction,
+ final Initializer initializer, final InitializerContext initializerContext) {
// Initialize submission from the connector perspective
- initializer.initialize(initializerContext, jobRequest.getConnectorLinkConfig(direction),
- jobRequest.getJobConfig(direction));
+ ClassUtils.executeWithClassLoader(initializer.getClass().getClassLoader(),
+        new Callable<Void>() {
+ @Override
+ public Void call() {
+ initializer.initialize(initializerContext, jobRequest.getConnectorLinkConfig(direction),
+ jobRequest.getJobConfig(direction));
+ return null;
+ }
+ });
}
@SuppressWarnings({ "unchecked", "rawtypes" })
diff --git a/core/src/test/java/org/apache/sqoop/connector/TestConnectorManagerUtils.java b/core/src/test/java/org/apache/sqoop/connector/TestConnectorManagerUtils.java
index 423b3df3..89ca60f7 100644
--- a/core/src/test/java/org/apache/sqoop/connector/TestConnectorManagerUtils.java
+++ b/core/src/test/java/org/apache/sqoop/connector/TestConnectorManagerUtils.java
@@ -18,6 +18,7 @@
*/
package org.apache.sqoop.connector;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -71,4 +72,9 @@ public void testIsBlacklisted() throws Exception {
assertFalse(ConnectorManagerUtils.isBlacklisted(url, blacklistedConnectors));
}
+ @Test
+ public void testGetConnectorJarPath() throws Exception {
+ URL url = new URL("jar:file:" + testConnectorPath + "!/" + ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES);
+ assertEquals(ConnectorManagerUtils.getConnectorJarPath(url), testConnectorPath);
+ }
}
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 3acd4a18..d6f0ff75 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -81,6 +81,11 @@ public void prepareJob(JobRequest jobRequest) {
if(mrJobRequest.getExtractors() != null) {
context.setInteger(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());
}
+
+ context.setString(MRJobConstants.JOB_CONNECTOR_FROM_NAME,
+ mrJobRequest.getConnector(Direction.FROM).getConnectorName());
+ context.setString(MRJobConstants.JOB_CONNECTOR_TO_NAME,
+ mrJobRequest.getConnector(Direction.TO).getConnectorName());
}
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
index 737ceda6..89681983 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
@@ -66,6 +66,12 @@ public final class MRJobConstants extends Constants {
public static final String PREFIX_CONNECTOR_TO_CONTEXT =
PREFIX_JOB_CONFIG + "connector.to.context.";
+ public static final String JOB_CONNECTOR_FROM_NAME = PREFIX_JOB_CONFIG
+ + "connector.from.name";
+
+ public static final String JOB_CONNECTOR_TO_NAME = PREFIX_JOB_CONFIG
+ + "connector.to.name";
+
// Hadoop specific constants
// We're using constants from Hadoop 1. Hadoop 2 has different names, but
// provides backward compatibility layer for those names as well.
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRUtils.java
new file mode 100644
index 00000000..3e557784
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRUtils.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.mr;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.ConnectorManagerUtils;
+import org.apache.sqoop.core.ConfigurationConstants;
+import org.apache.sqoop.error.code.MRExecutionError;
+import org.apache.sqoop.job.MRJobConstants;
+
+public final class MRUtils {
+ private static ClassLoader fromConnectorClassLoader;
+ private static ClassLoader toConnectorClassLoader;
+
+ private static boolean connectorClassLoaderInited = false;
+
+ public static synchronized void initConnectorClassLoaders(Configuration conf) {
+ if (connectorClassLoaderInited) {
+ return;
+ }
+
+ // Create ConnectorClassLoader for from connector
+ String partitionClass = conf.get(MRJobConstants.JOB_ETL_PARTITION);
+ String fromConnectorName = conf.get(MRJobConstants.JOB_CONNECTOR_FROM_NAME);
+ if (!StringUtils.isBlank(fromConnectorName)) {
+ fromConnectorClassLoader = ConnectorManagerUtils.createConnectorClassLoader(
+ getConnectorJarName(fromConnectorName), Arrays.asList(partitionClass));
+ }
+
+ // Create ConnectorClassLoader for to connector
+ String toConnectorName = conf.get(MRJobConstants.JOB_CONNECTOR_TO_NAME);
+ if (!StringUtils.isBlank(toConnectorName)) {
+ toConnectorClassLoader = ConnectorManagerUtils.createConnectorClassLoader(
+ getConnectorJarName(toConnectorName), null);
+ }
+
+ connectorClassLoaderInited = true;
+ }
+
+ public static ClassLoader getConnectorClassLoader(Direction direction) {
+ switch (direction) {
+ case FROM:
+ return fromConnectorClassLoader;
+ case TO:
+ return toConnectorClassLoader;
+ }
+
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static String getConnectorJarName(String connectorName) {
+    List<URL> configFileUrls = ConnectorManagerUtils.getConnectorConfigs(Collections.EMPTY_SET);
+ try {
+ for (URL configFileUrl : configFileUrls) {
+ Properties properties = new Properties();
+ properties.load(configFileUrl.openStream());
+ if (connectorName.equals(properties.getProperty(ConfigurationConstants.CONNPROP_CONNECTOR_NAME))) {
+ String connectorJarPath = ConnectorManagerUtils.getConnectorJarPath(configFileUrl);
+ return connectorJarPath.substring(connectorJarPath.lastIndexOf("/") + 1);
+ }
+ }
+ } catch (IOException e) {
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0026, connectorName, e);
+ }
+
+ throw new SqoopException(MRExecutionError.MAPRED_EXEC_0026, connectorName);
+ }
+}
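The class above is consumed from MR task-side code; a condensed sketch of the intended call pattern (not part of the patch — the real usages are the SqoopMapper, SqoopInputFormat and SqoopNullOutputFormat changes below):

package org.apache.sqoop.job.mr; // same package as MRUtils

import java.util.concurrent.Callable;

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.utils.ClassUtils;

public class MRUtilsUsageSketch {
  static void runExtractorSide(Configuration conf) {
    // Idempotent: safe to call from mapper, input format and output committer.
    MRUtils.initConnectorClassLoaders(conf);
    ClassUtils.executeWithClassLoader(MRUtils.getConnectorClassLoader(Direction.FROM),
        new Callable<Void>() {
          @Override
          public Void call() {
            // extractor-side work resolves classes from the FROM connector jar here
            return null;
          }
        });
  }
}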
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index 0623f7b6..c471ae9d 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.Callable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
@@ -32,6 +33,8 @@
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.ConnectorManager;
+import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.error.code.MRExecutionError;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.etl.Partition;
  public RecordReader<SqoopSplit, NullWritable> createRecordReader(
return new SqoopRecordReader();
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings("unchecked")
@Override
-  public List<InputSplit> getSplits(JobContext context)
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+  public List<InputSplit> getSplits(final JobContext context)
+ throws IOException, InterruptedException {
+ SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector(
+ context.getConfiguration().get(MRJobConstants.JOB_CONNECTOR_FROM_NAME));
+ ClassLoader loader = null;
+ if (connector != null) {
+ loader = connector.getClass().getClassLoader();
+ }
+
+    return (List<InputSplit>) ClassUtils.executeWithClassLoader(loader,
+        new Callable<List<InputSplit>>() {
+ @Override
+          public List<InputSplit> call() throws IOException, InterruptedException {
+ return getSplitsInternal(context);
+ }
+ });
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+  private List<InputSplit> getSplitsInternal(JobContext context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 7d209925..6db645ed 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -18,13 +18,13 @@
package org.apache.sqoop.job.mr;
import java.io.IOException;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.MRConstants;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
@@ -61,9 +61,24 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable> {
  private IntermediateDataFormat<Object> toIDF = null;
private Matcher matcher;
- @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public void run(Context context) throws IOException, InterruptedException {
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+ public void run(final Context context) throws IOException, InterruptedException {
+ // Set context ClassLoader for this thread to the ClassLoader for from connector
+ MRUtils.initConnectorClassLoaders(context.getConfiguration());
+
+ ClassUtils.executeWithClassLoader(MRUtils.getConnectorClassLoader(Direction.FROM),
+        new Callable<Void>() {
+ @Override
+ public Void call() throws IOException, InterruptedException {
+ runInternal(context);
+ return null;
+ }
+ });
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private void runInternal(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
String extractorName = conf.get(MRJobConstants.JOB_ETL_EXTRACTOR);
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
index 8c8526bb..9fb81552 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
@@ -19,6 +19,7 @@
package org.apache.sqoop.job.mr;
import java.io.IOException;
+import java.util.concurrent.Callable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
@@ -32,6 +33,7 @@
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.io.SqoopWritable;
+import org.apache.sqoop.utils.ClassUtils;
/**
* An output format for MapReduce job.
@@ -58,7 +60,8 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
private static class SqoopDestroyerOutputCommitter extends OutputCommitter {
@Override
- public void setupJob(JobContext jobContext) {
+ public void setupJob(JobContext jobContext) throws IOException {
+ MRUtils.initConnectorClassLoaders(jobContext.getConfiguration());
}
@Override
@@ -73,10 +76,26 @@ public void abortJob(JobContext jobContext, JobStatus.State state) throws IOExce
invokeDestroyerExecutor(jobContext, false);
}
- private void invokeDestroyerExecutor(JobContext jobContext, boolean success) {
- Configuration config = jobContext.getConfiguration();
- SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.FROM, jobContext.getConfiguration().get(MRJobConstants.SUBMITTING_USER));
- SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.TO, jobContext.getConfiguration().get(MRJobConstants.SUBMITTING_USER));
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+ private void invokeDestroyerExecutor(final JobContext jobContext, final boolean success) {
+ final Configuration config = jobContext.getConfiguration();
+
+ ClassUtils.executeWithClassLoader(MRUtils.getConnectorClassLoader(Direction.FROM),
+          new Callable<Void>() {
+ @Override
+ public Void call() throws IOException, InterruptedException {
+ SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.FROM, jobContext.getConfiguration().get(MRJobConstants.SUBMITTING_USER));
+ return null;
+ }
+ });
+ ClassUtils.executeWithClassLoader(MRUtils.getConnectorClassLoader(Direction.TO),
+          new Callable<Void>() {
+ @Override
+ public Void call() throws IOException, InterruptedException {
+ SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.TO, jobContext.getConfiguration().get(MRJobConstants.SUBMITTING_USER));
+ return null;
+ }
+ });
}
@Override
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 623d1f4e..b2839822 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -243,32 +243,43 @@ public ConsumerThread(final JobContext context) {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
public void run() {
LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");
try {
- DataReader reader = new SqoopOutputFormatDataReader();
- Configuration conf = context.getConfiguration();
- Loader loader = (Loader) ClassUtils.instantiate(loaderName);
+ final DataReader reader = new SqoopOutputFormatDataReader();
+ final Configuration conf = context.getConfiguration();
- // Objects that should be passed to the Loader
- PrefixContext subContext = new PrefixContext(conf,
- MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
- Object connectorLinkConfig = MRConfigurationUtils
- .getConnectorLinkConfig(Direction.TO, conf);
- Object connectorToJobConfig = MRConfigurationUtils
- .getConnectorJobConfig(Direction.TO, conf);
- // Using the TO schema since the SqoopDataWriter in the SqoopMapper
- // encapsulates the toDataFormat
+ // Set context ClassLoader for this thread to the ClassLoader for to connector
+ MRUtils.initConnectorClassLoaders(conf);
+ ClassLoader classLoader = MRUtils.getConnectorClassLoader(Direction.TO);
+ ClassUtils.executeWithClassLoader(classLoader,
+          new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ Loader loader = (Loader) ClassUtils.instantiate(loaderName);
- // Create loader context
- LoaderContext loaderContext = new LoaderContext(subContext, reader, matcher.getToSchema(), context.getConfiguration().get(MRJobConstants.SUBMITTING_USER));
+ // Objects that should be passed to the Loader
+ PrefixContext subContext = new PrefixContext(conf,
+ MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
+ Object connectorLinkConfig = MRConfigurationUtils
+ .getConnectorLinkConfig(Direction.TO, conf);
+ Object connectorToJobConfig = MRConfigurationUtils
+ .getConnectorJobConfig(Direction.TO, conf);
+ // Using the TO schema since the SqoopDataWriter in the SqoopMapper
+ // encapsulates the toDataFormat
- LOG.info("Running loader class " + loaderName);
- loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig);
- LOG.info("Loader has finished");
- ((TaskAttemptContext) jobctx).getCounter(SqoopCounters.ROWS_WRITTEN).increment(
- loader.getRowsWritten());
+ // Create loader context
+ LoaderContext loaderContext = new LoaderContext(subContext, reader, matcher.getToSchema(), context.getConfiguration().get(MRJobConstants.SUBMITTING_USER));
+ LOG.info("Running loader class " + loaderName);
+ loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig);
+ LOG.info("Loader has finished");
+ ((TaskAttemptContext) jobctx).getCounter(SqoopCounters.ROWS_WRITTEN).increment(
+ loader.getRowsWritten());
+ return null;
+ }
+ });
} catch (Throwable t) {
readerFinished = true;
LOG.error("Error while loading data out of MR job.", t);
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 24636434..46976120 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -66,6 +66,7 @@ public class TestMapReduce {
public void testSqoopInputFormat() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ conf.set(MRJobConstants.JOB_ETL_PARTITION, DummyPartition.class.getName());
conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
@@ -86,6 +87,7 @@ public void testSqoopInputFormat() throws Exception {
public void testSqoopMapper() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ conf.set(MRJobConstants.JOB_ETL_PARTITION, DummyPartition.class.getName());
conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
@@ -105,6 +107,7 @@ public void testSqoopMapper() throws Exception {
public void testNullOutputFormat() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ conf.set(MRJobConstants.JOB_ETL_PARTITION, DummyPartition.class.getName());
conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
index d0b41d17..75a989d5 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
@@ -127,6 +127,7 @@ public static Object[][] data() {
public void testSchemaMatching() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ conf.set(MRJobConstants.JOB_ETL_PARTITION, DummyPartition.class.getName());
conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index 3dee8f6b..58dabbb8 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -182,7 +182,8 @@ public void testWhenLoaderThrows() throws Throwable {
writer.write(writable, null);
}
} catch (SqoopException ex) {
- throw ex.getCause();
+ Assert.assertTrue(ex.getCause() instanceof SqoopException);
+ throw ex.getCause().getCause();
}
}
@@ -267,7 +268,8 @@ public void testThrowingContinuousLoader() throws Throwable {
}
writer.close(null);
} catch (SqoopException ex) {
- throw ex.getCause();
+ Assert.assertTrue(ex.getCause() instanceof SqoopException);
+ throw ex.getCause().getCause();
}
}
diff --git a/pom.xml b/pom.xml
index 460273a1..d7b0dd5e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,6 +124,8 @@ limitations under the License.
2.4.0
1.7
2.4.0
+
+    <maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
@@ -697,6 +699,7 @@ limitations under the License.
+    <module>assemblies</module>
     <module>common</module>
     <module>common-test</module>
     <module>core</module>
@@ -803,6 +806,34 @@ limitations under the License.
          <artifactId>maven-bundle-plugin</artifactId>
          <version>${felix.version}</version>
        </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-assembly-plugin</artifactId>
+          <version>${maven-assembly-plugin.version}</version>
+          <dependencies>
+            <dependency>
+              <groupId>org.apache.sqoop</groupId>
+              <artifactId>sqoop-assemblies</artifactId>
+              <version>${project.version}</version>
+            </dependency>
+          </dependencies>
+          <executions>
+            <execution>
+              <id>make-assembly</id>
+              <phase>package</phase>
+              <goals>
+                <goal>single</goal>
+              </goals>
+              <configuration>
+                <finalName>${project.artifactId}-${project.version}</finalName>
+                <appendAssemblyId>false</appendAssemblyId>
+                <descriptorRefs>
+                  <descriptorRef>sqoop-connector</descriptorRef>
+                </descriptorRefs>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
diff --git a/server/pom.xml b/server/pom.xml
index 370a6a2c..c24183c3 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -80,36 +80,78 @@ limitations under the License.
    <dependency>
      <groupId>org.apache.sqoop.connector</groupId>
      <artifactId>sqoop-connector-generic-jdbc</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.sqoop.connector</groupId>
      <artifactId>sqoop-connector-hdfs</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.sqoop.connector</groupId>
      <artifactId>sqoop-connector-kite</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.sqoop.connector</groupId>
      <artifactId>sqoop-connector-kafka</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.sqoop.connector</groupId>
      <artifactId>sqoop-connector-sftp</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.sqoop.connector</groupId>
      <artifactId>sqoop-connector-ftp</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.sqoop.connector</groupId>
      <artifactId>sqoop-connector-oracle-jdbc</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
    </dependency>
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index f396783b..c03bf39a 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -47,7 +47,6 @@
import org.apache.sqoop.job.mr.MRConfigurationUtils;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.SubmissionError;
-import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;