5
0
Mirror of https://github.com/apache/sqoop.git (synced 2025-05-04 00:43:42 +08:00)

SQOOP-2634: Sqoop2: Provide classpath isolation for connectors and its dependencies

(Dian Fu via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-12-23 11:21:15 +01:00
parent f4909a5634
commit c60d44f940
38 changed files with 578 additions and 164 deletions

34
assemblies/pom.xml Normal file
View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Module POM for the Sqoop assemblies artifact (assemblies/pom.xml).
Presumably this module packages shared maven-assembly-plugin descriptors
(such as the sqoop-connector descriptor) for reuse by the connector
modules that declare the assembly plugin - TODO confirm; no <packaging>
element is declared here, so Maven's default (jar) applies.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherits from the top-level org.apache:sqoop aggregator POM. -->
<parent>
<groupId>org.apache</groupId>
<artifactId>sqoop</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-assemblies</artifactId>
<name>Sqoop Assemblies</name>
</project>

View File

@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Assembly descriptor "sqoop-connector": builds a single self-contained
connector jar whose runtime dependencies are embedded under lib/ inside
the jar, while the connector's own compiled classes sit at the jar root.
This layout is what the ConnectorClassLoader expects when it isolates a
connector's classpath from the rest of Sqoop.
-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
<id>sqoop-connector</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<!-- Every runtime dependency jar is copied, unexploded, into lib/
inside the assembled jar; the module's own artifact is excluded
because its classes are added by the fileSet below. -->
<outputDirectory>lib</outputDirectory>
<unpack>false</unpack>
<useProjectArtifact>false</useProjectArtifact>
<useStrictFiltering>true</useStrictFiltering>
<includes>
<include>*</include>
</includes>
</dependencySet>
</dependencySets>
<fileSets>
<fileSet>
<!-- The connector's compiled classes and resources go at the root
of the assembled jar. -->
<directory>${basedir}/target/classes/</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>**/*</include>
</includes>
</fileSet>
</fileSets>
</assembly>

View File

@ -125,11 +125,11 @@ public ByteCode(byte[] bytes,
private final ClassLoader parent;
private final List<String> systemClasses;
public ConnectorClassLoader(URL[] urls, ClassLoader parent,
public ConnectorClassLoader(URL url, ClassLoader parent,
List<String> systemClasses, boolean overrideDefaultSystemClasses) throws IOException {
super(urls, parent);
super(new URL[] {url}, parent);
if (LOG.isDebugEnabled()) {
LOG.debug("urls: " + Arrays.toString(urls));
LOG.debug("url: " + url);
LOG.debug("system classes: " + systemClasses);
}
this.parent = parent;
@ -147,17 +147,17 @@ public ConnectorClassLoader(URL[] urls, ClassLoader parent,
LOG.info("system classes: " + this.systemClasses);
urlFactory = new ConnectorURLFactory(this);
load(urls);
load(url);
}
public ConnectorClassLoader(String classpath, ClassLoader parent,
public ConnectorClassLoader(String connectorJar, ClassLoader parent,
List<String> systemClasses) throws IOException {
this(constructUrlsFromClasspath(classpath), parent, systemClasses, true);
this(connectorJar, parent, systemClasses, true);
}
public ConnectorClassLoader(String classpath, ClassLoader parent,
public ConnectorClassLoader(String connectorJar, ClassLoader parent,
List<String> systemClasses, boolean overrideDefaultSystemClasses) throws IOException {
this(constructUrlsFromClasspath(classpath), parent, systemClasses, overrideDefaultSystemClasses);
this(new File(connectorJar).toURI().toURL(), parent, systemClasses, overrideDefaultSystemClasses);
}
static URL[] constructUrlsFromClasspath(String classpath)
@ -485,9 +485,8 @@ private String resolve(String name) {
return resource;
}
private void load(URL[] urls) throws IOException {
for (URL url : urls) {
String jarName = url.getPath();
private void load(URL connectorUrl) throws IOException {
String jarName = connectorUrl.getPath();
JarFile jarFile = null;
try {
jarFile = new JarFile(jarName);
@ -532,7 +531,6 @@ private void load(URL[] urls) throws IOException {
}
}
}
}
private void loadBytesFromJar(InputStream is, String jar) throws IOException {
JarInputStream jis = new JarInputStream(is);

View File

@ -59,7 +59,12 @@ public enum ConnectorError implements ErrorCode {
+ "changed since it was registered previously."),
/** A connector is not assigned with a valid id yet. */
CONN_0010("A connector is not assigned with a valid id yet");
CONN_0010("A connector is not assigned with a valid id yet"),
/** Failed to create ConnectorClassLoader. */
CONN_0011("Failed to create ConnectorClassLoader"),
;
private final String message;

View File

@ -79,6 +79,9 @@ public enum MRExecutionError implements ErrorCode {
/** Got invalid number of partitions from Partitioner */
MAPRED_EXEC_0025("Retrieved invalid number of partitions from Partitioner"),
/** Unable to find connector jar */
MAPRED_EXEC_0026("Unable to find connector jar"),
;
private final String message;

View File

@ -34,6 +34,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
@InterfaceAudience.Public
@InterfaceStability.Unstable
@ -271,6 +272,25 @@ public static ClassLoader getClassLoader() {
return classLoader;
}
/**
 * Runs the given callable with the supplied ClassLoader installed as the
 * thread context ClassLoader, restoring the previous context ClassLoader
 * when the call completes (normally or exceptionally).
 *
 * @param loader context ClassLoader to install for the duration of the
 *               call; when null, the current context ClassLoader is left
 *               in place
 * @param callable the work to execute
 * @return whatever the callable returns
 * @throws SqoopException rethrown unchanged when the callable throws one;
 *         any other exception is wrapped as CoreError.CORE_0000
 */
public static Object executeWithClassLoader(ClassLoader loader, Callable<?> callable) {
  Thread current = Thread.currentThread();
  ClassLoader previousLoader = current.getContextClassLoader();
  if (loader != null) {
    current.setContextClassLoader(loader);
  }
  try {
    return callable.call();
  } catch (SqoopException e) {
    // Already the domain exception type; propagate as-is.
    throw e;
  } catch (Exception e) {
    throw new SqoopException(CoreError.CORE_0000, e);
  } finally {
    // Always restore the previous context ClassLoader.
    current.setContextClassLoader(previousLoader);
  }
}
private ClassUtils() {
// Disable explicit object creation
}

View File

@ -143,7 +143,7 @@ public void testGetResource() throws IOException {
ClassLoader currentClassLoader = getClass().getClassLoader();
ClassLoader connectorClassloader = new ConnectorClassLoader(
new URL[] { testJar }, currentClassLoader, null, false);
testJar, currentClassLoader, null, false);
assertNull(currentClassLoader.getResourceAsStream("resource.txt"),
"Resource should be null for current classloader");
@ -164,7 +164,7 @@ public void testGetResources() throws IOException {
ClassLoader currentClassLoader = getClass().getClassLoader();
ClassLoader connectorClassloader = new ConnectorClassLoader(
new URL[] { testJar }, currentClassLoader, null, false);
testJar, currentClassLoader, null, false);
List<String> resourceContents = new ArrayList<String>();
resourceContents.add("hello A");
@ -185,7 +185,7 @@ public void testLoadClass() throws Exception {
ClassLoader currentClassLoader = getClass().getClassLoader();
ClassLoader connectorClassloader = new ConnectorClassLoader(
new URL[] { testJar }, currentClassLoader, null, false);
testJar, currentClassLoader, null, false);
try {
currentClassLoader.loadClass("A");

View File

@ -171,6 +171,7 @@ public void testLoadClass() {
@Test
public void testLoadClassWithClassLoader() throws Exception {
String classpath = ClassUtils.jarForClass(testAClass);
classpath = classpath.startsWith("file:") ? classpath.substring("file:".length()) : classpath;
assertNotEquals(testAClass, ClassUtils.loadClassWithClassLoader(testAClass.getName(),
new ConnectorClassLoader(classpath, getClass().getClassLoader(), Arrays.asList("java."))));
}

View File

@ -44,6 +44,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>connector-sdk</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
@ -72,4 +73,12 @@ limitations under the License.
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -36,6 +36,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>connector-sdk</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
@ -60,4 +61,12 @@ limitations under the License.
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -40,6 +40,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>connector-sdk</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
@ -68,4 +69,12 @@ limitations under the License.
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -33,6 +33,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>connector-sdk</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
@ -41,6 +42,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-common-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@ -50,4 +52,12 @@ limitations under the License.
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -22,9 +22,6 @@
import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.utils.ClassUtils;
import java.util.Set;
public class KafkaToInitializer extends Initializer<LinkConfiguration,ToJobConfiguration> {
@ -34,18 +31,4 @@ public class KafkaToInitializer extends Initializer<LinkConfiguration,ToJobConfi
public void initialize(InitializerContext context,LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration) {
LOG.info("Running Kafka Connector initializer. This does nothing except log this message.");
}
@Override
public Set<String> getJars(InitializerContext context, LinkConfiguration
linkConfiguration, ToJobConfiguration toJobConfiguration) {
Set<String> jars = super.getJars(context, linkConfiguration, toJobConfiguration);
// Jars for Kafka, Scala and Yammer (required by Kafka)
jars.add(ClassUtils.jarForClass("kafka.javaapi.producer.Producer"));
jars.add(ClassUtils.jarForClass("scala.collection.immutable.StringLike"));
jars.add(ClassUtils.jarForClass("com.yammer.metrics.Metrics"));
return jars;
}
}

View File

@ -36,10 +36,12 @@ limitations under the License.
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>connector-sdk</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
@ -97,4 +99,12 @@ limitations under the License.
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -56,24 +56,11 @@ public void initialize(InitializerContext context,
public Set<String> getJars(InitializerContext context,
LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
Set<String> jars = super.getJars(context, linkConfig, fromJobConfig);
jars.add(ClassUtils.jarForClass("org.kitesdk.data.Datasets"));
jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.databind.JsonNode"));
jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.core.TreeNode"));
jars.add(ClassUtils.jarForClass("parquet.hadoop.metadata.CompressionCodecName"));
jars.add(ClassUtils.jarForClass("parquet.format.CompressionCodec"));
jars.add(ClassUtils.jarForClass("parquet.avro.AvroParquetWriter"));
jars.add(ClassUtils.jarForClass("parquet.column.ParquetProperties"));
jars.add(ClassUtils.jarForClass("parquet.Version"));
jars.add(ClassUtils.jarForClass("parquet.org.codehaus.jackson.type.TypeReference"));
jars.add(ClassUtils.jarForClass("parquet.bytes.CapacityByteArrayOutputStream"));
jars.add(ClassUtils.jarForClass("parquet.encoding.Generator"));
jars.add(ClassUtils.jarForClass("au.com.bytecode.opencsv.CSVWriter"));
if (fromJobConfig.fromJobConfig.uri.startsWith("dataset:hive")) {
// @TODO(Abe): Remove a deps that aren't used?
jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.conf.HiveConf"));
jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.serde2.SerDe"));
jars.add(ClassUtils.jarForClass("org.kitesdk.data.spi.hive.MetaStoreUtil"));
jars.add(ClassUtils.jarForClass("org.kitesdk.compat.DynConstructors"));
jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.metastore.Warehouse"));
jars.add(ClassUtils.jarForClass("org.apache.hive.common.HiveCompat"));
jars.add(ClassUtils.jarForClass("com.facebook.fb303.FacebookService"));

View File

@ -19,7 +19,6 @@
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.common.FileFormat;
import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
@ -59,28 +58,11 @@ public void initialize(InitializerContext context,
public Set<String> getJars(InitializerContext context,
LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
Set<String> jars = super.getJars(context, linkConfig, toJobConfig);
jars.add(ClassUtils.jarForClass("org.kitesdk.data.Formats"));
jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.databind.JsonNode"));
jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.core.TreeNode"));
if (FileFormat.CSV.equals(toJobConfig.toJobConfig.fileFormat)) {
jars.add(ClassUtils.jarForClass("au.com.bytecode.opencsv.CSVWriter"));
}
if (FileFormat.PARQUET.equals(toJobConfig.toJobConfig.fileFormat)) {
jars.add(ClassUtils.jarForClass("parquet.hadoop.metadata.CompressionCodecName"));
jars.add(ClassUtils.jarForClass("parquet.format.CompressionCodec"));
jars.add(ClassUtils.jarForClass("parquet.avro.AvroParquetWriter"));
jars.add(ClassUtils.jarForClass("parquet.column.ParquetProperties"));
jars.add(ClassUtils.jarForClass("parquet.Version"));
jars.add(ClassUtils.jarForClass("parquet.org.codehaus.jackson.type.TypeReference"));
jars.add(ClassUtils.jarForClass("parquet.bytes.CapacityByteArrayOutputStream"));
jars.add(ClassUtils.jarForClass("parquet.encoding.Generator"));
}
if (toJobConfig.toJobConfig.uri.startsWith("dataset:hive")) {
// @TODO(Abe): Remove a deps that aren't used?
jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.conf.HiveConf"));
jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.serde2.SerDe"));
jars.add(ClassUtils.jarForClass("org.kitesdk.data.spi.hive.MetaStoreUtil"));
jars.add(ClassUtils.jarForClass("org.kitesdk.compat.DynConstructors"));
jars.add(ClassUtils.jarForClass("org.apache.hadoop.hive.metastore.Warehouse"));
jars.add(ClassUtils.jarForClass("org.apache.hive.common.HiveCompat"));
jars.add(ClassUtils.jarForClass("com.facebook.fb303.FacebookService"));

View File

@ -36,6 +36,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>connector-sdk</artifactId>
<scope>provided</scope>
</dependency>
<!-- Test dependencies -->
@ -125,6 +126,11 @@ limitations under the License.
<excludedGroups>none</excludedGroups>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
</plugin>
</plugins>
</build>
</profile>

View File

@ -37,6 +37,16 @@
@InterfaceStability.Evolving
public abstract class SqoopConnector {
private String connectorName;
/** Records the unique name this connector was registered under. */
public void setConnectorName(String connectorName) {
this.connectorName = connectorName;
}
/** Returns the unique name this connector was registered under, or null if never set. */
public String getConnectorName() {
return this.connectorName;
}
/**
* Retrieve connector version.
*

View File

@ -40,6 +40,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>connector-sdk</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
@ -49,4 +50,12 @@ limitations under the License.
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -23,9 +23,6 @@
import org.apache.sqoop.connector.sftp.configuration.ToJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.utils.ClassUtils;
import java.util.Set;
/**
* Perform any required initialization before execution of job.
@ -43,19 +40,4 @@ public void initialize(InitializerContext context, LinkConfiguration linkConfig,
LOG.info("Running SFTP Connector TO initializer.");
// do nothing at this point
}
/**
* {@inheritDoc}
*/
@Override
public Set<String> getJars(InitializerContext context,
LinkConfiguration linkConfiguration,
ToJobConfiguration toJobConfiguration) {
Set<String> jars =
super.getJars(context, linkConfiguration, toJobConfiguration);
// Jar for jsch library:
jars.add(ClassUtils.jarForClass("com.jcraft.jsch.JSch"));
return jars;
}
}

View File

@ -71,13 +71,18 @@ public ConnectorHandler(URL configFileUrl) {
throw new SqoopException(ConnectorError.CONN_0008, connectorClassName);
}
Class<?> connectorClass = ClassUtils.loadClass(connectorClassName);
String connectorJarPath = ConnectorManagerUtils.getConnectorJarPath(configFileUrl);
ClassLoader connectorClassLoader = ConnectorManagerUtils
.createConnectorClassLoader(connectorJarPath, null);
Class<?> connectorClass = ClassUtils.loadClassWithClassLoader(
connectorClassName, connectorClassLoader);
if(connectorClass == null) {
throw new SqoopException(ConnectorError.CONN_0005, connectorClassName);
}
try {
connector = (SqoopConnector) connectorClass.newInstance();
connector.setConnectorName(connectorUniqueName);
} catch (IllegalAccessException ex) {
throw new SqoopException(ConnectorError.CONN_0005,
connectorClassName, ex);

View File

@ -159,7 +159,11 @@ public SqoopConnector getSqoopConnector(long connectorId) {
}
/**
 * Returns the connector registered under the given unique name.
 *
 * @param uniqueName unique connector name used at registration time
 * @return the connector, or null when the handler registry is not
 *         initialized or no connector is registered under that name
 */
public SqoopConnector getSqoopConnector(String uniqueName) {
  if (handlerMap == null) {
    return null;
  }
  // Single lookup instead of the original's three handlerMap.get() calls.
  ConnectorHandler handler = handlerMap.get(uniqueName);
  return handler == null ? null : handler.getSqoopConnector();
}
public synchronized void initialize() {

View File

@ -17,14 +17,17 @@
*/
package org.apache.sqoop.connector;
import org.apache.sqoop.classloader.ConnectorClassLoader;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.core.ConfigurationConstants;
import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.error.code.ConnectorError;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
@ -96,4 +99,27 @@ static boolean isConnectorJar(File file) {
throw new RuntimeException(e);
}
}
/**
 * Extracts the filesystem path of a connector jar from the jar: URL of the
 * connector properties file embedded inside it.
 *
 * Assumes the URL has the form
 * jar:file:&lt;path-to-jar&gt;!/&lt;FILENAME_CONNECTOR_PROPERTIES&gt;, so that
 * getPath() returns "file:&lt;path&gt;!/&lt;props&gt;"; the leading "file:" and the
 * trailing "!/&lt;props&gt;" are stripped to leave &lt;path&gt;.
 * NOTE(review): no validation is performed — behavior for URLs that do not
 * match this shape is undefined; confirm all callers pass jar: URLs.
 */
public static String getConnectorJarPath(URL configFileUrl) {
return configFileUrl.getPath().substring("file:".length(),
configFileUrl.getPath().length() -
("!/" + ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES).length());
}
/**
 * Creates a ConnectorClassLoader for the given connector jar, parented to
 * the current thread's context ClassLoader, inside a privileged action so
 * classloader creation works under a security manager.
 *
 * @param connectorJarPath filesystem path of the connector jar
 * @param systemClasses class-name prefixes to always delegate to the
 *        parent; null presumably means "use the loader's defaults" —
 *        TODO confirm against ConnectorClassLoader
 * @return the new classloader
 * @throws SqoopException (CONN_0011) when classloader creation fails
 */
public static ClassLoader createConnectorClassLoader(
final String connectorJarPath, final List<String> systemClasses) {
try {
return AccessController.doPrivileged(
new PrivilegedExceptionAction<ClassLoader>() {
@Override
public ClassLoader run() throws IOException {
// false: do not override the default system classes list.
return new ConnectorClassLoader(connectorJarPath,
Thread.currentThread().getContextClassLoader(),
systemClasses, false);
}
});
} catch (PrivilegedActionException e) {
throw new SqoopException(ConnectorError.CONN_0011, e);
}
}
}

View File

@ -22,6 +22,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
@ -509,10 +510,19 @@ MJob getJob(String jobName) {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void initializeConnector(JobRequest jobRequest, Direction direction, Initializer initializer, InitializerContext initializerContext) {
@edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
private void initializeConnector(final JobRequest jobRequest, final Direction direction,
final Initializer initializer, final InitializerContext initializerContext) {
// Initialize submission from the connector perspective
ClassUtils.executeWithClassLoader(initializer.getClass().getClassLoader(),
new Callable<Void>() {
@Override
public Void call() {
initializer.initialize(initializerContext, jobRequest.getConnectorLinkConfig(direction),
jobRequest.getJobConfig(direction));
return null;
}
});
}
@SuppressWarnings({ "unchecked", "rawtypes" })

View File

@ -18,6 +18,7 @@
*/
package org.apache.sqoop.connector;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@ -71,4 +72,9 @@ public void testIsBlacklisted() throws Exception {
assertFalse(ConnectorManagerUtils.isBlacklisted(url, blacklistedConnectors));
}
// Builds a jar: URL from a known connector jar path and verifies that
// getConnectorJarPath recovers exactly the original path.
@Test
public void testGetConnectorJarPath() throws Exception {
URL url = new URL("jar:file:" + testConnectorPath + "!/" + ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES);
assertEquals(ConnectorManagerUtils.getConnectorJarPath(url), testConnectorPath);
}
}

View File

@ -81,6 +81,11 @@ public void prepareJob(JobRequest jobRequest) {
if(mrJobRequest.getExtractors() != null) {
context.setInteger(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());
}
context.setString(MRJobConstants.JOB_CONNECTOR_FROM_NAME,
mrJobRequest.getConnector(Direction.FROM).getConnectorName());
context.setString(MRJobConstants.JOB_CONNECTOR_TO_NAME,
mrJobRequest.getConnector(Direction.TO).getConnectorName());
}

View File

@ -66,6 +66,12 @@ public final class MRJobConstants extends Constants {
public static final String PREFIX_CONNECTOR_TO_CONTEXT =
PREFIX_JOB_CONFIG + "connector.to.context.";
public static final String JOB_CONNECTOR_FROM_NAME = PREFIX_JOB_CONFIG
+ "connector.from.name";
public static final String JOB_CONNECTOR_TO_NAME = PREFIX_JOB_CONFIG
+ "connector.to.name";
// Hadoop specific constants
// We're using constants from Hadoop 1. Hadoop 2 has different names, but
// provides backward compatibility layer for those names as well.

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.mr;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManagerUtils;
import org.apache.sqoop.core.ConfigurationConstants;
import org.apache.sqoop.error.code.MRExecutionError;
import org.apache.sqoop.job.MRJobConstants;
/**
 * Utilities for the MapReduce execution engine to create and look up the
 * classloaders that isolate connector code from the Sqoop/Hadoop classpath.
 *
 * The classloaders are created at most once per JVM and cached in static
 * fields; {@link #initConnectorClassLoaders(Configuration)} must be invoked
 * before {@link #getConnectorClassLoader(Direction)} is used.
 */
public final class MRUtils {

  // volatile: initialization happens inside a synchronized method, but
  // getConnectorClassLoader reads without synchronization, so the fields
  // must be published safely to other threads.
  private static volatile ClassLoader fromConnectorClassLoader;
  private static volatile ClassLoader toConnectorClassLoader;
  private static volatile boolean connectorClassLoaderInited = false;

  /**
   * Creates the FROM/TO connector classloaders from the connector names
   * recorded in the job configuration. Idempotent: only the first call has
   * any effect.
   *
   * @param conf job configuration carrying JOB_CONNECTOR_FROM_NAME /
   *             JOB_CONNECTOR_TO_NAME (and the partition class for FROM)
   */
  public static synchronized void initConnectorClassLoaders(Configuration conf) {
    if (connectorClassLoaderInited) {
      return;
    }
    // Create ConnectorClassLoader for from connector
    String partitionClass = conf.get(MRJobConstants.JOB_ETL_PARTITION);
    String fromConnectorName = conf.get(MRJobConstants.JOB_CONNECTOR_FROM_NAME);
    if (!StringUtils.isBlank(fromConnectorName)) {
      // The partition class is passed as a "system class" so the framework
      // and the connector classloader agree on its definition.
      // NOTE(review): if JOB_ETL_PARTITION is unset this passes a singleton
      // list containing null — confirm ConnectorClassLoader tolerates that.
      fromConnectorClassLoader = ConnectorManagerUtils.createConnectorClassLoader(
          getConnectorJarName(fromConnectorName), Arrays.asList(partitionClass));
    }
    // Create ConnectorClassLoader for to connector
    String toConnectorName = conf.get(MRJobConstants.JOB_CONNECTOR_TO_NAME);
    if (!StringUtils.isBlank(toConnectorName)) {
      toConnectorClassLoader = ConnectorManagerUtils.createConnectorClassLoader(
          getConnectorJarName(toConnectorName), null);
    }
    connectorClassLoaderInited = true;
  }

  /**
   * Returns the connector classloader for the given direction, or null when
   * none was created (no connector name in the configuration, or
   * initConnectorClassLoaders not yet called).
   */
  public static ClassLoader getConnectorClassLoader(Direction direction) {
    switch (direction) {
    case FROM:
      return fromConnectorClassLoader;
    case TO:
      return toConnectorClassLoader;
    }
    return null;
  }

  /**
   * Resolves the jar file name for the named connector by scanning every
   * connector properties file visible on the classpath.
   *
   * @param connectorName unique connector name to look up
   * @return the bare jar file name (path component after the last '/')
   * @throws SqoopException (MAPRED_EXEC_0026) when no matching connector is
   *         found or a properties file cannot be read
   */
  @SuppressWarnings("unchecked")
  private static String getConnectorJarName(String connectorName) {
    List<URL> configFileUrls = ConnectorManagerUtils.getConnectorConfigs(Collections.EMPTY_SET);
    try {
      for (URL configFileUrl : configFileUrls) {
        Properties properties = new Properties();
        // try-with-resources: the original leaked the stream returned by
        // openStream().
        try (InputStream in = configFileUrl.openStream()) {
          properties.load(in);
        }
        if (connectorName.equals(properties.getProperty(ConfigurationConstants.CONNPROP_CONNECTOR_NAME))) {
          String connectorJarPath = ConnectorManagerUtils.getConnectorJarPath(configFileUrl);
          return connectorJarPath.substring(connectorJarPath.lastIndexOf("/") + 1);
        }
      }
    } catch (IOException e) {
      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0026, connectorName, e);
    }
    throw new SqoopException(MRExecutionError.MAPRED_EXEC_0026, connectorName);
  }

  private MRUtils() {
    // Utility class — no instances.
  }
}

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
@ -32,6 +33,8 @@
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.error.code.MRExecutionError;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.etl.Partition;
@ -54,9 +57,29 @@ public RecordReader<SqoopSplit, NullWritable> createRecordReader(
return new SqoopRecordReader();
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings("unchecked")
@Override
public List<InputSplit> getSplits(JobContext context)
@edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
public List<InputSplit> getSplits(final JobContext context)
throws IOException, InterruptedException {
SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector(
context.getConfiguration().get(MRJobConstants.JOB_CONNECTOR_FROM_NAME));
ClassLoader loader = null;
if (connector != null) {
loader = connector.getClass().getClassLoader();
}
return (List<InputSplit>) ClassUtils.executeWithClassLoader(loader,
new Callable<List<InputSplit>>() {
@Override
public List<InputSplit> call() throws IOException, InterruptedException {
return getSplitsInternal(context);
}
});
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private List<InputSplit> getSplitsInternal(JobContext context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();

View File

@ -18,13 +18,13 @@
package org.apache.sqoop.job.mr;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.MRConstants;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
@ -61,9 +61,24 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
private IntermediateDataFormat<Object> toIDF = null;
private Matcher matcher;
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void run(Context context) throws IOException, InterruptedException {
@edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
public void run(final Context context) throws IOException, InterruptedException {
// Set context ClassLoader for this thread to the ClassLoader for from connector
MRUtils.initConnectorClassLoaders(context.getConfiguration());
ClassUtils.executeWithClassLoader(MRUtils.getConnectorClassLoader(Direction.FROM),
new Callable<Void>() {
@Override
public Void call() throws IOException, InterruptedException {
runInternal(context);
return null;
}
});
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void runInternal(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
String extractorName = conf.get(MRJobConstants.JOB_ETL_EXTRACTOR);

View File

@ -19,6 +19,7 @@
package org.apache.sqoop.job.mr;
import java.io.IOException;
import java.util.concurrent.Callable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
@ -32,6 +33,7 @@
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.utils.ClassUtils;
/**
* An output format for MapReduce job.
@ -58,7 +60,8 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
private static class SqoopDestroyerOutputCommitter extends OutputCommitter {
@Override
public void setupJob(JobContext jobContext) {
public void setupJob(JobContext jobContext) throws IOException {
MRUtils.initConnectorClassLoaders(jobContext.getConfiguration());
}
@Override
@ -73,10 +76,26 @@ public void abortJob(JobContext jobContext, JobStatus.State state) throws IOExce
invokeDestroyerExecutor(jobContext, false);
}
private void invokeDestroyerExecutor(JobContext jobContext, boolean success) {
Configuration config = jobContext.getConfiguration();
@edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
private void invokeDestroyerExecutor(final JobContext jobContext, final boolean success) {
final Configuration config = jobContext.getConfiguration();
ClassUtils.executeWithClassLoader(MRUtils.getConnectorClassLoader(Direction.FROM),
new Callable<Void>() {
@Override
public Void call() throws IOException, InterruptedException {
SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.FROM, jobContext.getConfiguration().get(MRJobConstants.SUBMITTING_USER));
return null;
}
});
ClassUtils.executeWithClassLoader(MRUtils.getConnectorClassLoader(Direction.TO),
new Callable<Void>() {
@Override
public Void call() throws IOException, InterruptedException {
SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.TO, jobContext.getConfiguration().get(MRJobConstants.SUBMITTING_USER));
return null;
}
});
}
@Override

View File

@ -243,11 +243,20 @@ public ConsumerThread(final JobContext context) {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
public void run() {
LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");
try {
DataReader reader = new SqoopOutputFormatDataReader();
Configuration conf = context.getConfiguration();
final DataReader reader = new SqoopOutputFormatDataReader();
final Configuration conf = context.getConfiguration();
// Set context ClassLoader for this thread to the ClassLoader for to connector
MRUtils.initConnectorClassLoaders(conf);
ClassLoader classLoader = MRUtils.getConnectorClassLoader(Direction.TO);
ClassUtils.executeWithClassLoader(classLoader,
new Callable<Void>() {
@Override
public Void call() throws Exception {
Loader loader = (Loader) ClassUtils.instantiate(loaderName);
// Objects that should be passed to the Loader
@ -268,7 +277,9 @@ public void run() {
LOG.info("Loader has finished");
((TaskAttemptContext) jobctx).getCounter(SqoopCounters.ROWS_WRITTEN).increment(
loader.getRowsWritten());
return null;
}
});
} catch (Throwable t) {
readerFinished = true;
LOG.error("Error while loading data out of MR job.", t);

View File

@ -66,6 +66,7 @@ public class TestMapReduce {
public void testSqoopInputFormat() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(MRJobConstants.JOB_ETL_PARTITION, DummyPartition.class.getName());
conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
@ -86,6 +87,7 @@ public void testSqoopInputFormat() throws Exception {
public void testSqoopMapper() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(MRJobConstants.JOB_ETL_PARTITION, DummyPartition.class.getName());
conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
@ -105,6 +107,7 @@ public void testSqoopMapper() throws Exception {
public void testNullOutputFormat() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(MRJobConstants.JOB_ETL_PARTITION, DummyPartition.class.getName());
conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());

View File

@ -127,6 +127,7 @@ public static Object[][] data() {
public void testSchemaMatching() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(MRJobConstants.JOB_ETL_PARTITION, DummyPartition.class.getName());
conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());

View File

@ -182,7 +182,8 @@ public void testWhenLoaderThrows() throws Throwable {
writer.write(writable, null);
}
} catch (SqoopException ex) {
throw ex.getCause();
Assert.assertTrue(ex.getCause() instanceof SqoopException);
throw ex.getCause().getCause();
}
}
@ -267,7 +268,8 @@ public void testThrowingContinuousLoader() throws Throwable {
}
writer.close(null);
} catch (SqoopException ex) {
throw ex.getCause();
Assert.assertTrue(ex.getCause() instanceof SqoopException);
throw ex.getCause().getCause();
}
}

31
pom.xml
View File

@ -124,6 +124,8 @@ limitations under the License.
<groovy.version>2.4.0</groovy.version>
<jansi.version>1.7</jansi.version>
<felix.version>2.4.0</felix.version>
<!-- maven plugin versions -->
<maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
</properties>
<dependencies>
@ -697,6 +699,7 @@ limitations under the License.
</dependencyManagement>
<modules>
<module>assemblies</module>
<module>common</module>
<module>common-test</module>
<module>core</module>
@ -803,6 +806,34 @@ limitations under the License.
<artifactId>maven-bundle-plugin</artifactId>
<version>${felix.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven-assembly-plugin.version}</version>
<dependencies>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-assemblies</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-${project.version}</finalName>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>sqoop-connector</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>

View File

@ -80,36 +80,78 @@ limitations under the License.
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-generic-jdbc</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-hdfs</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-kite</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-sftp</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-ftp</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-oracle-jdbc</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

View File

@ -47,7 +47,6 @@
import org.apache.sqoop.job.mr.MRConfigurationUtils;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.SubmissionError;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;