
SQOOP-2762: Sqoop2: Provide test infrastructure base class for connector loading tests

(Dian Fu via Colin Ma)
Author: Colin Ma
Date: 2016-01-29 15:48:26 +08:00
parent 25d236c315
commit 66a86353ca
7 changed files with 64 additions and 593 deletions
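
The diffs below migrate the connector-loading tests from the ConnectorTestCase/JettyTestCase hierarchy (both deleted here) to the infrastructure-based SqoopTestCase. A minimal sketch of the resulting test style, assembled from the usage visible in the diffs below (the example class and method names themselves are hypothetical):

import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.testng.annotations.Test;

// Tests now declare the infrastructure they depend on via @Infrastructure
// and extend SqoopTestCase, instead of inheriting KDC/Hadoop/Sqoop setup
// from ConnectorTestCase.
@Test(groups = "no-real-cluster")
@Infrastructure(dependencies = {KdcInfrastructureProvider.class})
public class ExampleConnectorLoadingTest extends SqoopTestCase {
  @Test
  public void testSomething() throws Exception {
    // Providers declared above are provisioned before this method runs.
  }
}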

View File

@@ -26,6 +26,7 @@
import org.apache.sqoop.job.etl.From;
import org.apache.sqoop.job.etl.To;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.MRUtils;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
@@ -101,4 +102,10 @@ protected void addDependencies(MRJobRequest jobrequest) {
// Guava
jobrequest.addJarForClass(ThreadFactoryBuilder.class);
}
@Override
public void destroy() {
super.destroy();
MRUtils.destroy();
}
}

View File

@@ -92,4 +92,8 @@ private static String getConnectorJarName(String connectorName) {
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0026, connectorName);
}
public static synchronized void destroy() {
connectorClassLoaderInited = false;
}
}
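
MRUtils.destroy() resets the one-shot class loader guard so that the connector class loader can be re-initialized within a single JVM, which matters once multiple mini-cluster tests share a process. A minimal sketch of the guard/reset pattern, assuming an init method whose name and body this diff does not show:

// Sketch only: the initialization side is an assumption; the diff above
// shows just the flag and the synchronized destroy() that resets it.
public final class MRUtilsSketch {
  private static boolean connectorClassLoaderInited = false;

  public static synchronized void initConnectorClassLoaderIfNeeded() {
    if (connectorClassLoaderInited) {
      return; // already set up in this JVM
    }
    // ... build and install the connector class loader ...
    connectorClassLoaderInited = true;
  }

  public static synchronized void destroy() {
    // Reset so the next execution engine instance re-initializes.
    connectorClassLoaderInited = false;
  }
}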

View File

@@ -1,285 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.test.testcases;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotSame;
import org.apache.log4j.Logger;
import org.apache.sqoop.client.SubmissionCallback;
import org.apache.sqoop.common.test.asserts.ProviderAsserts;
import org.apache.sqoop.common.test.db.DatabaseProvider;
import org.apache.sqoop.common.test.db.DatabaseProviderFactory;
import org.apache.sqoop.common.test.db.TableName;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MPersistableEntity;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.test.data.Cities;
import org.apache.sqoop.test.data.ShortStories;
import org.apache.sqoop.test.data.UbuntuReleases;
import org.apache.sqoop.test.utils.SqoopUtils;
import org.apache.sqoop.validation.Status;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
/**
* Base test case suitable for connector testing.
*
* In addition to the Jetty-based test case it will also create and initialize
* the database provider prior to every test execution.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings({"MS_PKGPROTECT", "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD"})
abstract public class ConnectorTestCase extends JettyTestCase {
private static final Logger LOG = Logger.getLogger(ConnectorTestCase.class);
protected static DatabaseProvider provider;
/**
* Default submission callback that prints status updates about the submission.
*/
protected static final SubmissionCallback DEFAULT_SUBMISSION_CALLBACKS = new SubmissionCallback() {
@Override
public void submitted(MSubmission submission) {
LOG.info("Submission submitted: " + submission);
}
@Override
public void updated(MSubmission submission) {
LOG.info("Submission updated: " + submission);
}
@Override
public void finished(MSubmission submission) {
LOG.info("Submission finished: " + submission);
}
};
@BeforeSuite(alwaysRun = true)
public void startProvider() throws Exception {
provider = DatabaseProviderFactory.getProvider(System.getProperties());
LOG.info("Starting database provider: " + provider.getClass().getName());
provider.start();
}
@AfterSuite(alwaysRun = true)
public void stopProvider() {
LOG.info("Stopping database provider: " + provider.getClass().getName());
provider.stop();
}
public TableName getTableName() {
return new TableName(getClass().getSimpleName());
}
protected void createTable(String primaryKey, String ...columns) {
provider.createTable(getTableName(), primaryKey, columns);
}
protected void dropTable() {
provider.dropTable(getTableName());
}
protected void insertRow(Object ...values) {
provider.insertRow(getTableName(), values);
}
protected long rowCount() {
return provider.rowCount(getTableName());
}
protected void dumpTable() {
provider.dumpTable(getTableName());
}
/**
* Fill link config based on currently active provider.
*
* @param link MLink object to fill
*/
protected void fillRdbmsLinkConfig(MLink link) {
MConfigList configs = link.getConnectorLinkConfig();
configs.getStringInput("linkConfig.jdbcDriver").setValue(provider.getJdbcDriver());
configs.getStringInput("linkConfig.connectionString").setValue(provider.getConnectionUrl());
configs.getStringInput("linkConfig.username").setValue(provider.getConnectionUsername());
configs.getStringInput("linkConfig.password").setValue(provider.getConnectionPassword());
}
protected void fillRdbmsFromConfig(MJob job, String partitionColumn) {
MConfigList fromConfig = job.getFromJobConfig();
fromConfig.getStringInput("fromJobConfig.tableName").setValue(getTableName().getTableName());
fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(partitionColumn);
}
protected void fillRdbmsToConfig(MJob job) {
MConfigList toConfig = job.getToJobConfig();
toConfig.getStringInput("toJobConfig.tableName").setValue(getTableName().getTableName());
}
protected void fillHdfsLink(MLink link) {
MConfigList configs = link.getConnectorLinkConfig();
configs.getStringInput("linkConfig.confDir").setValue(getCluster().getConfigurationPath());
}
/**
* Fill TO config with specific storage and output type.
*
* @param job MJob object to fill
* @param output Output type that should be set
*/
protected void fillHdfsToConfig(MJob job, ToFormat output) {
MConfigList toConfig = job.getToJobConfig();
toConfig.getEnumInput("toJobConfig.outputFormat").setValue(output);
toConfig.getStringInput("toJobConfig.outputDirectory").setValue(getMapreduceDirectory());
}
/**
* Fill FROM config
*
* @param job MJob object to fill
*/
protected void fillHdfsFromConfig(MJob job) {
MConfigList fromConfig = job.getFromJobConfig();
fromConfig.getStringInput("fromJobConfig.inputDirectory").setValue(getMapreduceDirectory());
}
/**
* Fill Driver config.
* @param job MJob object to fill
*/
protected void fillDriverConfig(MJob job) {
job.getDriverConfig().getStringInput("throttlingConfig.numExtractors").setValue("3");
}
/**
* Create table cities.
*/
protected void createTableCities() {
new Cities(provider, getTableName()).createTables();
}
/**
* Create table cities and load a few rows.
*/
protected void createAndLoadTableCities() {
new Cities(provider, getTableName()).createTables().loadBasicData();
}
/**
* Create table for ubuntu releases.
*/
protected void createTableUbuntuReleases() {
new UbuntuReleases(provider, getTableName()).createTables();
}
/**
* Create table for ubuntu releases and load basic data.
*/
protected void createAndLoadTableUbuntuReleases() {
new UbuntuReleases(provider, getTableName()).createTables().loadBasicData();
}
/**
* Create table for short stories.
*/
protected void createTableShortStories() {
new ShortStories(provider, getTableName()).createTables();
}
/**
* Create table for short stories and load basic data.
*/
protected void createAndLoadTableShortStories() {
new ShortStories(provider, getTableName()).createTables().loadBasicData();
}
/**
* Assert row in testing table.
*
* @param conditions Column name and value pairs that the database provider uses to identify the row
* @param values Values that are expected in the table (with corresponding types)
*/
protected void assertRow(Object[] conditions, Object ...values) {
ProviderAsserts.assertRow(provider, getTableName(), conditions, values);
}
/**
* Assert row in table "cities".
*
* @param values Values that are expected
*/
protected void assertRowInCities(Object... values) {
assertRow(new Object[]{"id", values[0]}, values);
}
/**
* Create link.
*
* With asserts to make sure that it was created correctly.
*
* @param link MLink object to save
*/
protected void saveLink(MLink link) {
SqoopUtils.fillObjectName(link);
assertEquals(getClient().saveLink(link), Status.OK);
}
/**
* Create job.
*
* With asserts to make sure that it was created correctly.
*
* @param job MJob object to save
*/
protected void saveJob(MJob job) {
SqoopUtils.fillObjectName(job);
assertEquals(getClient().saveJob(job), Status.OK);
}
/**
* Run job with given job name.
*
* @param jobName Job Name
* @throws Exception
*/
protected void executeJob(String jobName) throws Exception {
MSubmission finalSubmission = getClient().startJob(jobName, DEFAULT_SUBMISSION_CALLBACKS, 100);
if(finalSubmission.getStatus().isFailure()) {
LOG.error("Submission has failed: " + finalSubmission.getError().getErrorSummary());
LOG.error("Corresponding error details: " + finalSubmission.getError().getErrorDetails());
}
assertEquals(finalSubmission.getStatus(), SubmissionStatus.SUCCEEDED,
"Submission finished with error: " + finalSubmission.getError().getErrorSummary());
}
/**
* Run given job.
*
* @param job Job object
* @throws Exception
*/
protected void executeJob(MJob job) throws Exception {
executeJob(job.getName());
}
}
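
For context on what the deleted base class provided, a condensed sketch of how a typical connector test consumed these helpers (the test class, connector names, and asserted values are illustrative; the helper calls are the methods defined above):

public class ExampleRdbmsToHdfsTest extends ConnectorTestCase {
  @Test
  public void testImportCities() throws Exception {
    createAndLoadTableCities();

    // Link to the database provider started in @BeforeSuite
    MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
    fillRdbmsLinkConfig(rdbmsLink);
    saveLink(rdbmsLink);

    MLink hdfsLink = getClient().createLink("hdfs-connector");
    fillHdfsLink(hdfsLink);
    saveLink(hdfsLink);

    MJob job = getClient().createJob(rdbmsLink.getName(), hdfsLink.getName());
    fillRdbmsFromConfig(job, "id");
    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
    fillDriverConfig(job);
    saveJob(job);

    executeJob(job);
    assertToFiles(3); // numExtractors is set to "3" by fillDriverConfig()
  }
}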

View File

@@ -1,274 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.test.testcases;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.test.asserts.HdfsAsserts;
import org.apache.sqoop.test.hadoop.HadoopMiniClusterRunner;
import org.apache.sqoop.test.hadoop.HadoopRunner;
import org.apache.sqoop.test.hadoop.HadoopRunnerFactory;
import org.apache.sqoop.test.kdc.KdcRunner;
import org.apache.sqoop.test.kdc.KdcRunnerFactory;
import org.apache.sqoop.test.kdc.MiniKdcRunner;
import org.apache.sqoop.test.minicluster.JettySqoopMiniCluster;
import org.apache.sqoop.test.minicluster.SqoopMiniCluster;
import org.apache.sqoop.test.minicluster.SqoopMiniClusterFactory;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.testng.ITest;
import org.testng.ITestContext;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeSuite;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
/**
* Basic test case that will bootstrap a Sqoop server running in an embedded
* Jetty process.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings({"MS_PKGPROTECT", "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD"})
abstract public class JettyTestCase implements ITest {
private static final Logger LOG = Logger.getLogger(JettyTestCase.class);
public String methodName;
/**
* Temporary base path that will be used for tests.
*
* By default we will look for the sqoop.integration.tmpdir property that is
* filled in by maven. If the test is not started from maven (e.g. from an IDE)
* we will pick up the configured java.io.tmpdir value. The last resort is the
* /tmp/ directory in case no property is set.
*/
protected static final String TMP_PATH_BASE =
System.getProperty("sqoop.integration.tmpdir", System.getProperty("java.io.tmpdir", "/tmp")) + "/sqoop-cargo-tests";
/**
* Temporary directory that will be used by the test.
*
* We will take TMP_PATH_BASE and append the test suite name. For example:
*
* TMP_PATH_BASE/TestConnectorsSuite
*/
private static String tmpPath;
/**
* Hadoop cluster
*/
protected static HadoopRunner hadoopCluster;
/**
* Hadoop client
*/
protected static FileSystem hdfsClient;
/**
* Jetty based Sqoop mini cluster
*/
private static SqoopMiniCluster cluster;
/**
* Sqoop client API.
*/
private static SqoopClient client;
/**
* Kdc
*/
private static KdcRunner kdc;
/**
* Use the method name as the test name
*/
public String getTestName() {
return methodName;
}
@BeforeMethod(alwaysRun = true)
public void setupMehodNameAndOutputPath(Method method) throws Exception {
methodName = method.getName();
hdfsClient.delete(new Path(getMapreduceDirectory()), true);
}
@BeforeSuite(alwaysRun = true)
public void setupSuite(ITestContext context) throws Exception {
tmpPath = HdfsUtils.joinPathFragments(TMP_PATH_BASE, context.getSuite().getName());
LOG.debug("Temporary Directory: " + getTemporaryPath());
FileUtils.deleteDirectory(new File(getTemporaryPath()));
startKdc();
startHadoop();
startSqoop();
}
@AfterSuite(alwaysRun = true)
public void tearDownSuite() throws Exception {
stopSqoop();
stopHadoop();
stopKdc();
}
protected void startKdc() throws Exception {
kdc = KdcRunnerFactory.getKdc(System.getProperties(), MiniKdcRunner.class);
kdc.setTemporaryPath(getTemporaryPath());
kdc.start();
}
protected void stopKdc() throws Exception {
kdc.stop();
}
protected void startHadoop() throws Exception {
// Start Hadoop Clusters
hadoopCluster = HadoopRunnerFactory.getHadoopCluster(System.getProperties(), HadoopMiniClusterRunner.class);
hadoopCluster.setTemporaryPath(getTemporaryPath());
hadoopCluster.setConfiguration(hadoopCluster.prepareConfiguration(new JobConf()));
hadoopCluster.start();
// Initialize Hdfs Client
hdfsClient = FileSystem.get(hadoopCluster.getConfiguration());
LOG.debug("HDFS Client: " + hdfsClient);
}
protected void startSqoop() throws Exception {
// Start server
cluster = createSqoopMiniCluster();
cluster.start();
// Initialize Sqoop Client API
setClient(new SqoopClient(getServerUrl()));
}
protected void stopSqoop() throws Exception {
if (cluster != null) {
cluster.stop();
}
}
protected void stopHadoop() throws Exception {
hadoopCluster.stop();
}
/**
* Create Sqoop MiniCluster instance that should be used for this test.
*
* @return New instance of test mini cluster
*/
public SqoopMiniCluster createSqoopMiniCluster() throws Exception {
return SqoopMiniClusterFactory.getSqoopMiniCluster(System.getProperties(), JettySqoopMiniCluster.class,
getSqoopMiniClusterTemporaryPath(), hadoopCluster.getConfiguration(), kdc);
}
/**
* Return SqoopClient configured to talk to testing server.
*
* @return shared SqoopClient instance
*/
public static SqoopClient getClient() {
return client;
}
public static void setClient(SqoopClient sqoopClient) throws Exception {
client = sqoopClient;
kdc.authenticateWithSqoopServer(sqoopClient);
}
public static SqoopMiniCluster getCluster() {
return cluster;
}
public static void setCluster(JettySqoopMiniCluster sqoopMiniClusterluster) {
cluster = sqoopMiniClusterluster;
}
public static String getTemporaryPath() {
return tmpPath;
}
public static String getSqoopMiniClusterTemporaryPath() {
return HdfsUtils.joinPathFragments(getTemporaryPath(), "sqoop-mini-cluster");
}
/**
* Return testing server URL.
*
* @return URL of the running Sqoop mini cluster server
*/
public static String getServerUrl() {
return cluster.getServerUrl();
}
/**
* Return mapreduce base directory.
*
* @return base directory for mapreduce input/output of the current test
*/
public String getMapreduceDirectory() {
return HdfsUtils.joinPathFragments(hadoopCluster.getTestDirectory(), getClass().getName(), getTestName());
}
/**
* Assert that the execution has generated the following lines.
*
* As the lines can be spread across multiple files, the ordering does not
* make a difference.
*
* @param lines
* @throws IOException
*/
protected void assertTo(String... lines) throws IOException {
// TODO(VB): fix this to be not directly dependent on hdfs/MR
HdfsAsserts.assertMapreduceOutput(hdfsClient, getMapreduceDirectory(), lines);
}
/**
* Verify number of TO files.
*
* @param expectedFiles Expected number of files
*/
protected void assertToFiles(int expectedFiles) throws IOException {
// TODO(VB): fix this to be not directly dependent on hdfs/MR
HdfsAsserts.assertMapreduceOutputFiles(hdfsClient, getMapreduceDirectory(), expectedFiles);
}
/**
* Create FROM file with specified content.
*
* @param filename Input file name
* @param lines Individual lines that should be written into the file
* @throws IOException
*/
protected void createFromFile(String filename, String...lines) throws IOException {
createFromFile(hdfsClient, filename, lines);
}
/**
* Create file on given HDFS instance with given lines
*/
protected void createFromFile(FileSystem hdfsClient, String filename, String...lines) throws IOException {
HdfsUtils.createFile(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), filename), lines);
}
}
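
Before this commit, a test that needed its own mini cluster configuration had to neutralize the suite-level server start inherited from this class; that is exactly the hook the test diffs below delete. The pre-change pattern, as it appears in the removed lines further down:

// Removed by this commit: override the suite hook so the shared server
// never starts, then start a custom cluster per test method instead.
@Override
public void startSqoop() throws Exception {
  // Do nothing so that Sqoop isn't started before Suite.
}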

View File

@@ -18,20 +18,26 @@
package org.apache.sqoop.integration.connectorloading;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.core.ConfigurationConstants;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.minicluster.JettySqoopMiniCluster;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.minicluster.SqoopMiniCluster;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Map;
@Test(groups = "no-real-cluster")
public class BlacklistedConnectorTest extends ConnectorTestCase {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class})
public class BlacklistedConnectorTest extends SqoopTestCase {
private SqoopMiniCluster sqoopMiniCluster;
public static class DerbySqoopMiniCluster extends JettySqoopMiniCluster {
public DerbySqoopMiniCluster(String temporaryPath, Configuration configuration) throws Exception {
super(temporaryPath, configuration);
@@ -46,33 +52,30 @@ protected Map<String, String> getBlacklistedConnectorConfiguration() {
}
}
@BeforeMethod(dependsOnMethods = { "init" })
public void startSqoopMiniCluster() throws Exception {
// And use them for new Derby repo instance
setCluster(new DerbySqoopMiniCluster(HdfsUtils.joinPathFragments(super
.getSqoopMiniClusterTemporaryPath(), getTestName()), hadoopCluster.getConfiguration()));
sqoopMiniCluster = new DerbySqoopMiniCluster(HdfsUtils.joinPathFragments(super
.getTemporaryPath(), getTestName()), getHadoopConf());
KdcInfrastructureProvider kdcProvider = getInfrastructureProvider(KdcInfrastructureProvider.class);
if (kdcProvider != null) {
sqoopMiniCluster.setKdc(kdcProvider.getInstance());
}
// Start server
getCluster().start();
sqoopMiniCluster.start();
// Initialize Sqoop Client API
setClient(new SqoopClient(getServerUrl()));
initSqoopClient(sqoopMiniCluster.getServerUrl());
}
@Test(expectedExceptions = {SqoopException.class})
public void testCreateLinkWithNonexistantConnector() throws Exception {
startSqoopMiniCluster();
getClient().createLink("generic-jdbc-connector");
}
@AfterMethod
public void stopCluster() throws Exception {
if (getCluster() != null) {
getCluster().stop();
}
}
@Override
public void startSqoop() throws Exception {
// Do nothing so that Sqoop isn't started before Suite.
sqoopMiniCluster.stop();
}
}
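
The hunk above truncates the body of getBlacklistedConnectorConfiguration(). A hedged reconstruction of its likely shape (the property key is an assumption; only the method signature is visible in this diff):

// Assumed shape of the truncated override: the Derby mini cluster
// blacklists the generic JDBC connector, so createLink() on it fails with
// SqoopException, which testCreateLinkWithNonexistantConnector expects.
@Override
protected Map<String, String> getBlacklistedConnectorConfiguration() {
  Map<String, String> properties = new HashMap<>();
  // Hypothetical key; the real constant lives in ConfigurationConstants.
  properties.put("org.apache.sqoop.connector.blacklist", "generic-jdbc-connector");
  return properties;
}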

View File

@@ -19,13 +19,16 @@
package org.apache.sqoop.integration.connectorloading;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.core.ConfigurationConstants;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.minicluster.JettySqoopMiniCluster;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.minicluster.SqoopMiniCluster;
import org.apache.sqoop.test.utils.ConnectorUtils;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.testng.annotations.AfterMethod;
@@ -38,7 +41,8 @@
import java.util.Map;
@Test(groups = "no-real-cluster")
public class ClasspathTest extends ConnectorTestCase {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class ClasspathTest extends SqoopTestCase {
private static final String TEST_CONNECTOR_JAR_NAME = "test-connector.jar";
private static final String TEST_DEPENDENCY_JAR_NAME = "test-dependency.jar";
@@ -61,6 +65,7 @@ public class ClasspathTest extends ConnectorTestCase {
};
private ClassLoader classLoader;
private SqoopMiniCluster sqoopMiniCluster;
public static class DerbySqoopMiniCluster extends JettySqoopMiniCluster {
@@ -91,13 +96,18 @@ protected Map<String, String> getClasspathConfiguration() {
public void startSqoopMiniCluster(String extraClasspath, String jobExtraClasspath) throws Exception {
// And use them for new Derby repo instance
setCluster(new DerbySqoopMiniCluster(HdfsUtils.joinPathFragments(super.getSqoopMiniClusterTemporaryPath(), getTestName()), hadoopCluster.getConfiguration(), extraClasspath, jobExtraClasspath));
sqoopMiniCluster = new DerbySqoopMiniCluster(HdfsUtils.joinPathFragments
(super.getTemporaryPath(), getTestName()), getHadoopConf(), extraClasspath, jobExtraClasspath);
KdcInfrastructureProvider kdcProvider = getInfrastructureProvider(KdcInfrastructureProvider.class);
if (kdcProvider != null) {
sqoopMiniCluster.setKdc(kdcProvider.getInstance());
}
// Start server
getCluster().start();
sqoopMiniCluster.start();
// Initialize Sqoop Client API
setClient(new SqoopClient(getServerUrl()));
initSqoopClient(sqoopMiniCluster.getServerUrl());
}
@BeforeMethod
@@ -185,8 +195,7 @@ private MDriverConfig prepareDriverConfig(MJob job) {
return driverConfig;
}
@Override
public void startSqoop() throws Exception {
// Do nothing so that Sqoop isn't started before Suite.
private void stopSqoop() throws Exception {
sqoopMiniCluster.stop();
}
}
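
ClasspathTest's getClasspathConfiguration() override is likewise truncated by the hunks above; a hedged sketch under the same caveats (both property keys are assumptions):

// Assumed shape: the mini cluster exposes the extra classpath entries the
// test compiled its test-connector jars onto.
@Override
protected Map<String, String> getClasspathConfiguration() {
  Map<String, String> properties = new HashMap<>();
  if (extraClasspath != null) {
    properties.put("org.apache.sqoop.classpath.extra", extraClasspath);
  }
  if (jobExtraClasspath != null) {
    properties.put("org.apache.sqoop.classpath.job", jobExtraClasspath);
  }
  return properties;
}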

View File

@@ -24,13 +24,15 @@
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.core.ConfigurationConstants;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.minicluster.JettySqoopMiniCluster;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.minicluster.SqoopMiniCluster;
import org.apache.sqoop.test.utils.ConnectorUtils;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.testng.annotations.AfterMethod;
@@ -38,7 +40,8 @@
import org.testng.annotations.Test;
@Test(groups = "no-real-cluster")
public class ConnectorClasspathIsolationTest extends ConnectorTestCase {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class})
public class ConnectorClasspathIsolationTest extends SqoopTestCase {
private static final String TEST_FROM_CONNECTOR_JAR_NAME = "test-from-connector.jar";
private static final String TEST_TO_CONNECTOR_JAR_NAME = "test-to-connector.jar";
@@ -82,6 +85,7 @@ public class ConnectorClasspathIsolationTest extends ConnectorTestCase {
};
private ClassLoader classLoader;
private SqoopMiniCluster sqoopMiniCluster;
public static class DerbySqoopMiniCluster extends JettySqoopMiniCluster {
@@ -106,13 +110,17 @@ protected Map<String, String> getClasspathConfiguration() {
public void startSqoopMiniCluster(String extraClasspath) throws Exception {
// And use them for new Derby repo instance
setCluster(new DerbySqoopMiniCluster(HdfsUtils.joinPathFragments(super.getSqoopMiniClusterTemporaryPath(), getTestName()), hadoopCluster.getConfiguration(), extraClasspath));
sqoopMiniCluster = new DerbySqoopMiniCluster(HdfsUtils.joinPathFragments(super.getTemporaryPath(), getTestName()), getHadoopConf(), extraClasspath);
KdcInfrastructureProvider kdcProvider = getInfrastructureProvider(KdcInfrastructureProvider.class);
if (kdcProvider != null) {
sqoopMiniCluster.setKdc(kdcProvider.getInstance());
}
// Start server
getCluster().start();
sqoopMiniCluster.start();
// Initialize Sqoop Client API
setClient(new SqoopClient(getServerUrl()));
initSqoopClient(sqoopMiniCluster.getServerUrl());
}
@BeforeMethod
@@ -175,8 +183,7 @@ private MDriverConfig prepareDriverConfig(MJob job) {
return driverConfig;
}
@Override
public void startSqoop() throws Exception {
// Do nothing so that Sqoop isn't started before Suite.
private void stopSqoop() throws Exception {
sqoopMiniCluster.stop();
}
}
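
With the startSqoop() no-op override gone, these tests manage the mini cluster per test method. A sketch of the resulting lifecycle, based on the methods in the diff above (the @AfterMethod wiring is an assumption; the diff imports AfterMethod but truncates the method that calls stopSqoop()):

// Each test method builds its own Derby-backed mini cluster with the
// classpath it needs, and tears it down afterwards.
@Test
public void testConnectorIsolation() throws Exception {
  startSqoopMiniCluster(extraClasspathJar); // hypothetical local variable
  // ... create links and a job, then execute it ...
}

@AfterMethod
public void tearDown() throws Exception {
  stopSqoop(); // stops sqoopMiniCluster, as defined above
}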