mirror of
https://github.com/apache/sqoop.git
synced 2025-05-08 12:19:59 +08:00
SQOOP-2490: Sqoop2: Add extra jars to job
(Abraham Fine via Jarek Jarcec Cecho)
This commit is contained in:
parent
26ae15c200
commit
d3fbe102ac
@ -81,6 +81,11 @@ public final class ConfigurationConstants {
|
||||
*/
|
||||
public static final String CLASSPATH = "org.apache.sqoop.classpath.extra";
|
||||
|
||||
/**
|
||||
* Add external jars to job classpath.
|
||||
*/
|
||||
public static final String JOB_CLASSPATH = "org.apache.sqoop.classpath.job";
|
||||
|
||||
/**
|
||||
* Enable Sqoop App to kill Tomcat in case that it will fail to load.
|
||||
*/
|
||||
|
@ -38,6 +38,7 @@
|
||||
import org.apache.sqoop.common.MapContext;
|
||||
import org.apache.sqoop.common.SqoopException;
|
||||
import org.apache.sqoop.error.code.CoreError;
|
||||
import org.apache.sqoop.utils.ClassUtils;
|
||||
|
||||
import static org.apache.sqoop.utils.ContextUtils.getUniqueStrings;
|
||||
|
||||
@ -189,7 +190,7 @@ public synchronized void initialize() {
|
||||
|
||||
initialized = true;
|
||||
|
||||
configureClassLoader(ConfigurationConstants.CLASSPATH);
|
||||
configureClassLoader();
|
||||
}
|
||||
|
||||
public synchronized MapContext getContext() {
|
||||
@ -229,44 +230,53 @@ public synchronized void destroy() {
|
||||
|
||||
/**
|
||||
* Load extra classpath from sqoop configuration.
|
||||
* @param classpathProperty
|
||||
*/
|
||||
private synchronized void configureClassLoader(String classpathProperty) {
|
||||
LOG.info("Adding jars to current classloader from property: " + classpathProperty);
|
||||
private synchronized void configureClassLoader() {
|
||||
LOG.info("Adding jars to current classloader from property: " + ConfigurationConstants.CLASSPATH);
|
||||
List<URL> urls = getJarsForProperty(ConfigurationConstants.CLASSPATH);
|
||||
|
||||
// Chain the current thread classloader so that
|
||||
// configured classpath adds to existing classloader.
|
||||
// Existing classpath is not changed.
|
||||
ClassLoader currentThreadClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
if (currentThreadClassLoader == null) {
|
||||
throw new SqoopException(CoreError.CORE_0009, "No thread context classloader to override.");
|
||||
}
|
||||
URLClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]),
|
||||
currentThreadClassLoader);
|
||||
Thread.currentThread().setContextClassLoader(classLoader);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the list of jars for a given property
|
||||
*
|
||||
* @param classpathProperty The property corresponding to the list of jars
|
||||
* @return A list of URLs pointing to the jars
|
||||
*/
|
||||
public synchronized List<URL> getJarsForProperty(String classpathProperty) {
|
||||
List<URL> urls = new LinkedList<>();
|
||||
|
||||
// CSV URL list separated by ":".
|
||||
String paths = getContext().getString(classpathProperty);
|
||||
|
||||
if (StringUtils.isEmpty(paths)) {
|
||||
LOG.debug("Property " + classpathProperty + " is null or empty. Not adding any extra jars.");
|
||||
return;
|
||||
}
|
||||
|
||||
ClassLoader currentThreadClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
if (currentThreadClassLoader == null) {
|
||||
throw new SqoopException(CoreError.CORE_0009, "No thread context classloader to override.");
|
||||
}
|
||||
|
||||
Set<String> pathSet = getUniqueStrings(paths);
|
||||
List<URL> urls = new LinkedList<URL>();
|
||||
|
||||
for (String path : pathSet) {
|
||||
try {
|
||||
LOG.debug("Found jar in path: " + path);
|
||||
URL url = new File(path).toURI().toURL();
|
||||
urls.add(url);
|
||||
LOG.debug("Using URL: " + url.toString());
|
||||
} catch (MalformedURLException e) {
|
||||
throw new SqoopException(CoreError.CORE_0009, "Malformed URL found.", e);
|
||||
LOG.debug("Property " + classpathProperty + " is null or empty.");
|
||||
} else {
|
||||
Set<String> pathSet = getUniqueStrings(paths);
|
||||
for (String path : pathSet) {
|
||||
try {
|
||||
LOG.debug("Found jar in path: " + path);
|
||||
URL url = new File(path).toURI().toURL();
|
||||
urls.add(url);
|
||||
LOG.debug("Using URL: " + url.toString());
|
||||
} catch (MalformedURLException e) {
|
||||
throw new SqoopException(CoreError.CORE_0009, "Malformed URL found.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Chain the current thread classloader so that
|
||||
// configured classpath adds to existing classloader.
|
||||
// Existing classpath is not changed.
|
||||
URLClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]),
|
||||
currentThreadClassLoader);
|
||||
Thread.currentThread().setContextClassLoader(classLoader);
|
||||
return urls;
|
||||
}
|
||||
|
||||
private synchronized void configureLogging() {
|
||||
|
@ -17,8 +17,11 @@
|
||||
*/
|
||||
package org.apache.sqoop.driver;
|
||||
|
||||
import java.net.URL;
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.sqoop.common.Direction;
|
||||
@ -27,6 +30,7 @@
|
||||
import org.apache.sqoop.connector.ConnectorManager;
|
||||
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
|
||||
import org.apache.sqoop.connector.spi.SqoopConnector;
|
||||
import org.apache.sqoop.core.ConfigurationConstants;
|
||||
import org.apache.sqoop.core.Reconfigurable;
|
||||
import org.apache.sqoop.core.SqoopConfiguration;
|
||||
import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
|
||||
@ -162,6 +166,11 @@ public static void setInstance(JobManager newInstance) {
|
||||
*/
|
||||
private String notificationBaseUrl;
|
||||
|
||||
/**
|
||||
* Additional jars to be made available during job execution
|
||||
*/
|
||||
private List<URL> extraJobClasspath;
|
||||
|
||||
/**
|
||||
* Set notification base URL.
|
||||
*
|
||||
@ -210,6 +219,8 @@ public synchronized void destroy() {
|
||||
if (executionEngine != null) {
|
||||
executionEngine.destroy();
|
||||
}
|
||||
|
||||
extraJobClasspath = null;
|
||||
}
|
||||
|
||||
public synchronized void initialize() {
|
||||
@ -241,6 +252,8 @@ public synchronized void initialize() {
|
||||
executionEngineClassName);
|
||||
}
|
||||
|
||||
extraJobClasspath = SqoopConfiguration.getInstance().getJarsForProperty(ConfigurationConstants.JOB_CLASSPATH);
|
||||
|
||||
// We need to make sure that user has configured compatible combination of
|
||||
// submission engine and execution engine
|
||||
if (!submissionEngine
|
||||
@ -372,6 +385,8 @@ private JobRequest createJobRequest(MSubmission submission, MJob job) {
|
||||
|
||||
// set all the jars
|
||||
addStandardJars(jobRequest);
|
||||
addJobJars(jobRequest);
|
||||
addExtraJarsFromSqoopConfig(jobRequest);
|
||||
addConnectorClass(jobRequest, fromConnector);
|
||||
addConnectorClass(jobRequest, toConnector);
|
||||
addConnectorIDFClass(jobRequest, fromConnector.getIntermediateDataFormat());
|
||||
@ -419,6 +434,21 @@ private void addStandardJars(JobRequest jobRequest) {
|
||||
jobRequest.addJarForClass(executionEngine.getClass());
|
||||
}
|
||||
|
||||
private void addExtraJarsFromSqoopConfig(JobRequest jobRequest) {
|
||||
Set<String> jars = new HashSet<>();
|
||||
for (URL jarURL : extraJobClasspath){
|
||||
jars.add(jarURL.toString());
|
||||
}
|
||||
jobRequest.addJars(jars);
|
||||
}
|
||||
|
||||
private void addJobJars(JobRequest jobRequest) {
|
||||
JobConfiguration jobConfiguration = (JobConfiguration) jobRequest.getDriverConfig();
|
||||
if (jobConfiguration.jarConfig.extraJars != null) {
|
||||
jobRequest.addJars(new HashSet<>(jobConfiguration.jarConfig.extraJars));
|
||||
}
|
||||
}
|
||||
|
||||
MSubmission createJobSubmission(HttpEventContext ctx, long jobId) {
|
||||
MSubmission summary = new MSubmission(jobId);
|
||||
summary.setCreationUser(ctx.getUsername());
|
||||
@ -698,6 +728,8 @@ public synchronized void configurationChanged() {
|
||||
"You might need to restart the server.");
|
||||
}
|
||||
|
||||
extraJobClasspath = SqoopConfiguration.getInstance().getJarsForProperty(ConfigurationConstants.JOB_CLASSPATH);
|
||||
|
||||
// Set up worker threads
|
||||
purgeThreshold = newContext.getLong(
|
||||
DriverConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
|
||||
|
@ -0,0 +1,30 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.sqoop.driver.configuration;
|
||||
|
||||
import org.apache.sqoop.model.ConfigClass;
|
||||
import org.apache.sqoop.model.Input;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ConfigClass
|
||||
public class JarConfig {
|
||||
// A list of the FQDNs of additional jars that are needed to execute the job
|
||||
@Input public List<String> extraJars;
|
||||
}
|
@ -28,7 +28,12 @@ public class JobConfiguration {
|
||||
@Config
|
||||
public ThrottlingConfig throttlingConfig;
|
||||
|
||||
@Config
|
||||
public JarConfig jarConfig;
|
||||
|
||||
|
||||
public JobConfiguration() {
|
||||
throttlingConfig = new ThrottlingConfig();
|
||||
jarConfig = new JarConfig();
|
||||
}
|
||||
}
|
||||
|
@ -242,6 +242,7 @@ public Object doIt(Connection conn) {
|
||||
// so let's just compare the structure to see if we need upgrade.
|
||||
if(!mDriver.equals(existingDriver)) {
|
||||
if (autoUpgrade) {
|
||||
mDriver.setPersistenceId(existingDriver.getPersistenceId());
|
||||
upgradeDriver(mDriver);
|
||||
return mDriver;
|
||||
} else {
|
||||
|
@ -28,3 +28,10 @@ throttlingConfig.numExtractors.help = Number of extractors that Sqoop will use
|
||||
throttlingConfig.numLoaders.label = Loaders
|
||||
throttlingConfig.numLoaders.help = Number of loaders that Sqoop will use
|
||||
|
||||
# Jar Configuration
|
||||
#
|
||||
jarConfig.label = Classpath configuration
|
||||
jarConfig.help = Classpath configuration specific to the driver
|
||||
|
||||
jarConfig.extraJars.label = Extra mapper jars
|
||||
jarConfig.extraJars.help = A list of the FQDNs of additional jars that are needed to execute the job
|
||||
|
@ -0,0 +1,63 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.sqoop.driver;
|
||||
|
||||
import org.apache.sqoop.driver.configuration.JobConfiguration;
|
||||
import org.apache.sqoop.model.ConfigUtils;
|
||||
import org.apache.sqoop.model.MConfig;
|
||||
import org.apache.sqoop.model.MInput;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.ResourceBundle;
|
||||
|
||||
import static org.testng.Assert.assertNotNull;
|
||||
import static org.testng.Assert.assertTrue;
|
||||
|
||||
public class TestJobConfiguration {
|
||||
|
||||
@Test
|
||||
public void testBundleForLink() {
|
||||
verifyBundleForConfigurationClass(Driver.getInstance().getBundle(Locale
|
||||
.getDefault()), JobConfiguration.class);
|
||||
}
|
||||
|
||||
void verifyBundleForConfigurationClass(ResourceBundle bundle, Class klass) {
|
||||
assertNotNull(bundle);
|
||||
assertNotNull(klass);
|
||||
|
||||
List<MConfig> configs = ConfigUtils.toConfigs(klass);
|
||||
|
||||
for(MConfig config : configs) {
|
||||
assertNotNull(config.getHelpKey());
|
||||
assertNotNull(config.getLabelKey());
|
||||
|
||||
assertTrue(bundle.containsKey(config.getHelpKey()), "Can't find help for " + config.getName());
|
||||
assertTrue(bundle.containsKey(config.getLabelKey()), "Can't find label for " + config.getName());
|
||||
|
||||
for(MInput input : config.getInputs()) {
|
||||
assertNotNull(input.getHelpKey());
|
||||
assertNotNull(input.getLabelKey());
|
||||
|
||||
assertTrue(bundle.containsKey(input.getHelpKey()), "Can't find help for " + input.getName());
|
||||
assertTrue(bundle.containsKey(input.getLabelKey()), "Can't find label for " + input.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
5
dist/src/main/server/conf/sqoop.properties
vendored
5
dist/src/main/server/conf/sqoop.properties
vendored
@ -179,3 +179,8 @@ org.apache.sqoop.connector.external.loadpath=
|
||||
# ":" separated list of jars to be included in sqoop.
|
||||
#
|
||||
org.apache.sqoop.classpath.extra=
|
||||
|
||||
# Sqoop extra classpath to be included with all jobs
|
||||
# ":" separated list of jars to be included in map job classpath.
|
||||
#
|
||||
org.apache.sqoop.classpath.job=
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.sqoop.test.minicluster;
|
||||
|
||||
import org.apache.commons.collections.MapUtils;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.sqoop.core.ConfigurationConstants;
|
||||
@ -138,6 +139,7 @@ protected void prepareTemporaryPath() throws Exception {
|
||||
mapToProperties(sqoopProperties, getSecurityConfiguration());
|
||||
mapToProperties(sqoopProperties, getConnectorManagerConfiguration());
|
||||
mapToProperties(sqoopProperties, getDriverManagerConfiguration());
|
||||
mapToProperties(sqoopProperties, getClasspathConfiguration());
|
||||
|
||||
FileUtils.writeLines(f, sqoopProperties);
|
||||
|
||||
@ -215,4 +217,8 @@ protected Map<String, String> getDriverManagerConfiguration() {
|
||||
properties.put(ConfigurationConstants.DRIVER_AUTO_UPGRADE, "true");
|
||||
return properties;
|
||||
}
|
||||
|
||||
protected Map<String, String> getClasspathConfiguration() {
|
||||
return MapUtils.EMPTY_MAP;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,333 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.sqoop.integration.classpath;
|
||||
|
||||
import org.apache.commons.collections.ListUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.sqoop.client.SqoopClient;
|
||||
import org.apache.sqoop.core.ConfigurationConstants;
|
||||
import org.apache.sqoop.model.MDriverConfig;
|
||||
import org.apache.sqoop.model.MJob;
|
||||
import org.apache.sqoop.model.MLink;
|
||||
import org.apache.sqoop.test.minicluster.JettySqoopMiniCluster;
|
||||
import org.apache.sqoop.test.testcases.ConnectorTestCase;
|
||||
import org.apache.sqoop.test.utils.HdfsUtils;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import javax.tools.JavaCompiler;
|
||||
import javax.tools.JavaFileObject;
|
||||
import javax.tools.StandardJavaFileManager;
|
||||
import javax.tools.StandardLocation;
|
||||
import javax.tools.ToolProvider;
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.jar.Attributes;
|
||||
import java.util.jar.JarEntry;
|
||||
import java.util.jar.JarOutputStream;
|
||||
import java.util.jar.Manifest;
|
||||
|
||||
public class ClasspathTest extends ConnectorTestCase {
|
||||
|
||||
private static final String TEST_CONNECTOR_JAR_NAME = "test-connector.jar";
|
||||
private static final String TEST_DEPENDENCY_JAR_NAME = "test-dependency.jar";
|
||||
|
||||
private ClassLoader classLoader;
|
||||
|
||||
public static class DerbySqoopMiniCluster extends JettySqoopMiniCluster {
|
||||
|
||||
private String extraClasspath;
|
||||
private String jobExtraClasspath;
|
||||
|
||||
public DerbySqoopMiniCluster(String temporaryPath, Configuration configuration, String extraClasspath, String jobExtraClasspath) throws Exception {
|
||||
super(temporaryPath, configuration);
|
||||
this.extraClasspath = extraClasspath;
|
||||
this.jobExtraClasspath = jobExtraClasspath;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, String> getClasspathConfiguration() {
|
||||
Map<String, String> properties = new HashMap<>();
|
||||
|
||||
if (extraClasspath != null) {
|
||||
properties.put(ConfigurationConstants.CLASSPATH, extraClasspath);
|
||||
}
|
||||
if (jobExtraClasspath != null) {
|
||||
properties.put(ConfigurationConstants.JOB_CLASSPATH, jobExtraClasspath);
|
||||
}
|
||||
|
||||
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
|
||||
class JarContents {
|
||||
private List<File> sourceFiles;
|
||||
private List<File> properitesFiles;
|
||||
|
||||
public JarContents(List<File> sourceFiles, List<File> properitesFiles){
|
||||
this.sourceFiles = sourceFiles;
|
||||
this.properitesFiles = properitesFiles;
|
||||
}
|
||||
|
||||
public List<File> getSourceFiles() {
|
||||
return sourceFiles;
|
||||
}
|
||||
|
||||
public List<File> getProperitesFiles() {
|
||||
return properitesFiles;
|
||||
}
|
||||
}
|
||||
|
||||
public void startSqoopMiniCluster(String extraClasspath, String jobExtraClasspath) throws Exception {
|
||||
// And use them for new Derby repo instance
|
||||
setCluster(new DerbySqoopMiniCluster(HdfsUtils.joinPathFragments(super.getSqoopMiniClusterTemporaryPath(), getTestName()), hadoopCluster.getConfiguration(), extraClasspath, jobExtraClasspath));
|
||||
|
||||
// Start server
|
||||
getCluster().start();
|
||||
|
||||
// Initialize Sqoop Client API
|
||||
setClient(new SqoopClient(getServerUrl()));
|
||||
}
|
||||
|
||||
@BeforeMethod
|
||||
public void captureClasspath() {
|
||||
classLoader = Thread.currentThread().getContextClassLoader();
|
||||
}
|
||||
|
||||
@AfterMethod
|
||||
public void restoreClasspath(){
|
||||
Thread.currentThread().setContextClassLoader(classLoader);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClasspathSqoopProperties() throws Exception {
|
||||
Map<String, String> jarMap = compileTestConnectorAndDependency();
|
||||
startSqoopMiniCluster(jarMap.get(TEST_CONNECTOR_JAR_NAME), jarMap.get
|
||||
(TEST_DEPENDENCY_JAR_NAME));
|
||||
createAndLoadTableCities();
|
||||
|
||||
MJob job = prepareJob();
|
||||
|
||||
prepareDriverConfig(job);
|
||||
|
||||
saveJob(job);
|
||||
|
||||
executeJob(job);
|
||||
|
||||
stopSqoop();
|
||||
deleteJars(jarMap);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClasspathDriverInput() throws Exception{
|
||||
Map<String, String> jarMap = compileTestConnectorAndDependency();
|
||||
startSqoopMiniCluster(jarMap.get(TEST_CONNECTOR_JAR_NAME), null);
|
||||
createAndLoadTableCities();
|
||||
|
||||
MJob job = prepareJob();
|
||||
|
||||
MDriverConfig driverConfig = prepareDriverConfig(job);
|
||||
|
||||
List<String> extraJars = new ArrayList<>();
|
||||
extraJars.add("file:" + jarMap.get(TEST_DEPENDENCY_JAR_NAME));
|
||||
driverConfig.getListInput("jarConfig.extraJars").setValue(extraJars);
|
||||
|
||||
saveJob(job);
|
||||
|
||||
executeJob(job);
|
||||
|
||||
stopSqoop();
|
||||
deleteJars(jarMap);
|
||||
}
|
||||
|
||||
private MJob prepareJob() {
|
||||
MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
|
||||
fillRdbmsLinkConfig(rdbmsConnection);
|
||||
saveLink(rdbmsConnection);
|
||||
|
||||
MLink testConnection = getClient().createLink("test-connector");
|
||||
saveLink(testConnection);
|
||||
|
||||
MJob job = getClient().createJob(rdbmsConnection.getPersistenceId(), testConnection.getPersistenceId());
|
||||
|
||||
fillRdbmsFromConfig(job, "id");
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
private MDriverConfig prepareDriverConfig(MJob job) {
|
||||
MDriverConfig driverConfig = job.getDriverConfig();
|
||||
driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3);
|
||||
|
||||
return driverConfig;
|
||||
}
|
||||
|
||||
private Map<String, String> compileTestConnectorAndDependency() throws Exception {
|
||||
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
|
||||
if (compiler == null) {
|
||||
throw new IllegalStateException(
|
||||
"Cannot find the system Java compiler. "
|
||||
+ "Check that your class path includes tools.jar");
|
||||
}
|
||||
|
||||
Path outputDir = Files.createTempDirectory(null);
|
||||
|
||||
Map<String, JarContents> sourceFileToJarMap = new HashMap<>();
|
||||
|
||||
ClassLoader classLoader = getClass().getClassLoader();
|
||||
List<File> sourceFiles = new ArrayList<>();
|
||||
File file = new File(classLoader.getResource("TestConnector/TestConnector.java").getFile());
|
||||
sourceFiles.add(file);
|
||||
file = new File(classLoader.getResource("TestConnector/TestLinkConfiguration.java").getFile());
|
||||
sourceFiles.add(file);
|
||||
file = new File(classLoader.getResource("TestConnector/TestLoader.java").getFile());
|
||||
sourceFiles.add(file);
|
||||
file = new File(classLoader.getResource("TestConnector/TestToDestroyer.java").getFile());
|
||||
sourceFiles.add(file);
|
||||
file = new File(classLoader.getResource("TestConnector/TestToInitializer.java").getFile());
|
||||
sourceFiles.add(file);
|
||||
file = new File(classLoader.getResource("TestConnector/TestToJobConfiguration.java").getFile());
|
||||
sourceFiles.add(file);
|
||||
|
||||
List<File> propertiesFiles = new ArrayList<>();
|
||||
file = new File(classLoader.getResource("TestConnector/sqoopconnector.properties").getFile());
|
||||
propertiesFiles.add(file);
|
||||
sourceFileToJarMap.put("test-connector.jar", new JarContents(sourceFiles, propertiesFiles));
|
||||
|
||||
sourceFiles = new ArrayList<>();
|
||||
file = new File(classLoader.getResource("TestConnector/TestDependency.java").getFile());
|
||||
sourceFiles.add(file);
|
||||
sourceFileToJarMap.put("test-dependency.jar", new JarContents(sourceFiles, ListUtils.EMPTY_LIST));
|
||||
|
||||
return buildJar(outputDir.toString(), sourceFileToJarMap);
|
||||
}
|
||||
|
||||
private Map<String, String> buildJar(String outputDir, Map<String, JarContents> sourceFileToJarMap) throws Exception {
|
||||
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
|
||||
StandardJavaFileManager fileManager = compiler.getStandardFileManager
|
||||
(null, null, null);
|
||||
|
||||
List<File> sourceFiles = new ArrayList<>();
|
||||
for(JarContents jarContents : sourceFileToJarMap.values()) {
|
||||
sourceFiles.addAll(jarContents.sourceFiles);
|
||||
}
|
||||
|
||||
fileManager.setLocation(StandardLocation.CLASS_OUTPUT,
|
||||
Arrays.asList(new File(outputDir.toString())));
|
||||
|
||||
Iterable<? extends JavaFileObject> compilationUnits1 =
|
||||
fileManager.getJavaFileObjectsFromFiles(sourceFiles);
|
||||
|
||||
boolean compiled = compiler.getTask(null, fileManager, null, null, null, compilationUnits1).call();
|
||||
if (!compiled) {
|
||||
throw new RuntimeException("failed to compile");
|
||||
}
|
||||
|
||||
for(Map.Entry<String, JarContents> jarNameAndContents : sourceFileToJarMap.entrySet()) {
|
||||
Manifest manifest = new Manifest();
|
||||
manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
|
||||
manifest.getMainAttributes().put(Attributes.Name.CLASS_PATH, ".");
|
||||
|
||||
|
||||
JarOutputStream target = new JarOutputStream(new FileOutputStream(outputDir.toString() + File.separator + jarNameAndContents.getKey()), manifest);
|
||||
List<String> classesForJar = new ArrayList<>();
|
||||
for(File sourceFile : jarNameAndContents.getValue().getSourceFiles()) {
|
||||
//split the file on dot to get the filename from FILENAME.java
|
||||
String fileName = sourceFile.getName().split("\\.")[0];
|
||||
classesForJar.add(fileName);
|
||||
}
|
||||
|
||||
File dir = new File(outputDir);
|
||||
File[] directoryListing = dir.listFiles();
|
||||
for (File compiledClass : directoryListing) {
|
||||
String classFileName = compiledClass.getName().split("\\$")[0].split("\\.")[0];
|
||||
if (classesForJar.contains(classFileName)){
|
||||
addFileToJar(compiledClass, target);
|
||||
}
|
||||
}
|
||||
|
||||
for (File propertiesFile : jarNameAndContents.getValue().getProperitesFiles()) {
|
||||
addFileToJar(propertiesFile, target);
|
||||
}
|
||||
|
||||
target.close();
|
||||
|
||||
|
||||
}
|
||||
//delete non jar files
|
||||
File dir = new File(outputDir);
|
||||
File[] directoryListing = dir.listFiles();
|
||||
for (File file : directoryListing) {
|
||||
String extension = file.getName().split("\\.")[1];
|
||||
if (!extension.equals("jar")) {
|
||||
file.delete();
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, String> jarMap = new HashMap<>();
|
||||
jarMap.put(TEST_CONNECTOR_JAR_NAME, outputDir.toString() + File.separator
|
||||
+ TEST_CONNECTOR_JAR_NAME);
|
||||
jarMap.put(TEST_DEPENDENCY_JAR_NAME, outputDir.toString() + File.separator + TEST_DEPENDENCY_JAR_NAME);
|
||||
return jarMap;
|
||||
}
|
||||
|
||||
private void addFileToJar(File source, JarOutputStream target) throws Exception {
|
||||
JarEntry entry = new JarEntry(source.getName());
|
||||
entry.setTime(source.lastModified());
|
||||
target.putNextEntry(entry);
|
||||
BufferedInputStream in = new BufferedInputStream(new FileInputStream(source));
|
||||
|
||||
long bufferSize = source.length();
|
||||
if (bufferSize < Integer.MIN_VALUE || bufferSize > Integer.MAX_VALUE) {
|
||||
throw new RuntimeException("file to large to be added to jar");
|
||||
}
|
||||
|
||||
byte[] buffer = new byte[(int) bufferSize];
|
||||
while (true) {
|
||||
int count = in.read(buffer);
|
||||
if (count == -1)
|
||||
break;
|
||||
target.write(buffer, 0, count);
|
||||
}
|
||||
target.closeEntry();
|
||||
if (in != null) in.close();
|
||||
}
|
||||
|
||||
private void deleteJars(Map<String, String> jarMap) throws Exception {
|
||||
for (String jarPath : jarMap.values()) {
|
||||
(new File(jarPath)).delete();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startSqoop() throws Exception {
|
||||
// Do nothing so that Sqoop isn't started before Suite.
|
||||
}
|
||||
}
|
89
test/src/test/resources/TestConnector/TestConnector.java
Normal file
89
test/src/test/resources/TestConnector/TestConnector.java
Normal file
@ -0,0 +1,89 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.commons.collections.ListUtils;
|
||||
import org.apache.sqoop.common.Direction;
|
||||
import org.apache.sqoop.connector.ftp.FtpLoader;
|
||||
import org.apache.sqoop.connector.ftp.FtpToDestroyer;
|
||||
import org.apache.sqoop.connector.ftp.FtpToInitializer;
|
||||
import org.apache.sqoop.connector.jdbc.GenericJdbcConnectorConstants;
|
||||
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
|
||||
import org.apache.sqoop.connector.spi.SqoopConnector;
|
||||
import org.apache.sqoop.job.etl.From;
|
||||
import org.apache.sqoop.job.etl.To;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.ResourceBundle;
|
||||
|
||||
/**
|
||||
* A connector that does not do anything
|
||||
*/
|
||||
public class TestConnector extends SqoopConnector {
|
||||
|
||||
private static final To TO = new To(TestToInitializer.class,
|
||||
TestLoader.class,
|
||||
TestToDestroyer.class);
|
||||
|
||||
@Override
|
||||
public String getVersion() {
|
||||
return "1.0";
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceBundle getBundle(Locale locale) {
|
||||
return ResourceBundle.getBundle(
|
||||
GenericJdbcConnectorConstants.RESOURCE_BUNDLE_NAME, locale);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class getLinkConfigurationClass() {
|
||||
return TestLinkConfiguration.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class getJobConfigurationClass(Direction direction) {
|
||||
switch (direction) {
|
||||
case TO:
|
||||
return TestToJobConfiguration.class;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public From getFrom() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public To getTo() {
|
||||
return TO;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Direction> getSupportedDirections() {
|
||||
return Arrays.asList(Direction.TO);
|
||||
}
|
||||
}
|
23
test/src/test/resources/TestConnector/TestDependency.java
Normal file
23
test/src/test/resources/TestConnector/TestDependency.java
Normal file
@ -0,0 +1,23 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
public class TestDependency {
|
||||
public TestDependency() {
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.sqoop.model.ConfigurationClass;
|
||||
|
||||
@ConfigurationClass
|
||||
public class TestLinkConfiguration {
|
||||
public TestLinkConfiguration() {
|
||||
}
|
||||
}
|
45
test/src/test/resources/TestConnector/TestLoader.java
Normal file
45
test/src/test/resources/TestConnector/TestLoader.java
Normal file
@ -0,0 +1,45 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.sqoop.etl.io.DataReader;
|
||||
import org.apache.sqoop.job.etl.Loader;
|
||||
import org.apache.sqoop.job.etl.LoaderContext;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
public class TestLoader extends Loader<TestLinkConfiguration, TestToJobConfiguration> {
|
||||
|
||||
private long rowsWritten = 0;
|
||||
|
||||
|
||||
@Override
|
||||
public void load(LoaderContext context, TestLinkConfiguration linkConfiguration,
|
||||
TestToJobConfiguration toJobConfig) throws Exception {
|
||||
DataReader reader = context.getDataReader();
|
||||
//This will break if the TestDependency jar is not loaded
|
||||
TestDependency testDependency = new TestDependency();
|
||||
while (reader.readTextRecord() != null){
|
||||
rowsWritten++;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRowsWritten() {
|
||||
return rowsWritten;
|
||||
}
|
||||
}
|
27
test/src/test/resources/TestConnector/TestToDestroyer.java
Normal file
27
test/src/test/resources/TestConnector/TestToDestroyer.java
Normal file
@ -0,0 +1,27 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.sqoop.job.etl.Destroyer;
|
||||
import org.apache.sqoop.job.etl.DestroyerContext;
|
||||
|
||||
public class TestToDestroyer extends Destroyer<TestLinkConfiguration, TestToJobConfiguration> {
|
||||
@Override
|
||||
public void destroy(DestroyerContext context, TestLinkConfiguration linkConfig,
|
||||
TestToJobConfiguration jobConfig) {
|
||||
}
|
||||
}
|
27
test/src/test/resources/TestConnector/TestToInitializer.java
Normal file
27
test/src/test/resources/TestConnector/TestToInitializer.java
Normal file
@ -0,0 +1,27 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.sqoop.job.etl.Initializer;
|
||||
import org.apache.sqoop.job.etl.InitializerContext;
|
||||
|
||||
public class TestToInitializer extends Initializer<TestLinkConfiguration, TestToJobConfiguration> {
|
||||
@Override
|
||||
public void initialize(InitializerContext context, TestLinkConfiguration linkConfig,
|
||||
TestToJobConfiguration jobConfig) {
|
||||
}
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.sqoop.model.ConfigurationClass;
|
||||
|
||||
@ConfigurationClass
|
||||
public class TestToJobConfiguration {
|
||||
public TestToJobConfiguration() {
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Test Connector Properties
|
||||
org.apache.sqoop.connector.class = TestConnector
|
||||
org.apache.sqoop.connector.name = test-connector
|
33
test/src/test/resources/classpath-tests-suite.xml
Normal file
33
test/src/test/resources/classpath-tests-suite.xml
Normal file
@ -0,0 +1,33 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
|
||||
|
||||
<suite name="ClasspathTests" verbose="2" parallel="false">
|
||||
|
||||
<listeners>
|
||||
<listener class-name="org.apache.sqoop.test.testng.SqoopTestListener" />
|
||||
</listeners>
|
||||
|
||||
<test name="ClasspathTests">
|
||||
<packages>
|
||||
<package name="org.apache.sqoop.integration.classpath"/>
|
||||
</packages>
|
||||
</test>
|
||||
|
||||
</suite>
|
Loading…
Reference in New Issue
Block a user