diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java index 54e2ef48..8ca298da 100644 --- a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java +++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java @@ -67,6 +67,7 @@ private KafkaConsumer getKafkaConsumer() { } public void initTopicList(List topics) { + kafkaConsumer = new KafkaConsumer(); getKafkaConsumer().initTopicList(topics); } @@ -85,7 +86,6 @@ public void prepare() throws Exception { } catch (InterruptedException e) { // ignore } - getKafkaConsumer(); logger.info("Completed the prepare phase."); } diff --git a/test/src/main/java/org/apache/sqoop/test/hive/MetastoreServerRunner.java b/test/src/main/java/org/apache/sqoop/test/hive/MetastoreServerRunner.java index 8a822b53..3aa05139 100644 --- a/test/src/main/java/org/apache/sqoop/test/hive/MetastoreServerRunner.java +++ b/test/src/main/java/org/apache/sqoop/test/hive/MetastoreServerRunner.java @@ -23,7 +23,6 @@ import org.apache.sqoop.common.test.utils.NetworkUtils; import java.io.ByteArrayOutputStream; -import java.io.FileNotFoundException; import java.io.PrintStream; import java.io.UnsupportedEncodingException; import java.util.UUID; @@ -41,6 +40,11 @@ public abstract class MetastoreServerRunner { private final int port; private final String warehouseDirectory; + /** + * Temporary path that can be used as a root for other directories of metastore. + */ + private String temporaryPath; + public MetastoreServerRunner(String hostname, int port) throws Exception { this.hostname = hostname; this.warehouseDirectory = "/user/hive/" + UUID.randomUUID(); @@ -131,12 +135,30 @@ public int getPort() { return this.port; } + /** + * Get temporary path. + * + * @return + */ + public String getTemporaryPath() { + return temporaryPath; + } + + /** + * Set temporary path. + * + * @param temporaryPath + */ + public void setTemporaryPath(String temporaryPath) { + this.temporaryPath = temporaryPath; + } + private void printConfig() { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try { - config.logVars(new PrintStream(baos.toString("UTF-8"), "UTF-8")); + config.logVars(new PrintStream(baos, false, "UTF-8")); LOG.debug("Hive server runner configuration:\n" + baos.toString("UTF-8")); - } catch (UnsupportedEncodingException |FileNotFoundException e) { + } catch (UnsupportedEncodingException e) { LOG.warn("Error to print the Hive server runner configuration.", e); } } diff --git a/test/src/main/java/org/apache/sqoop/test/infrastructure/SqoopTestCase.java b/test/src/main/java/org/apache/sqoop/test/infrastructure/SqoopTestCase.java index 2c6a537d..71cdc220 100644 --- a/test/src/main/java/org/apache/sqoop/test/infrastructure/SqoopTestCase.java +++ b/test/src/main/java/org/apache/sqoop/test/infrastructure/SqoopTestCase.java @@ -20,19 +20,23 @@ import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; import org.apache.sqoop.client.SqoopClient; import org.apache.sqoop.client.SubmissionCallback; +import org.apache.sqoop.common.test.asserts.ProviderAsserts; import org.apache.sqoop.common.test.db.DatabaseProvider; import org.apache.sqoop.common.test.db.TableName; +import org.apache.sqoop.common.test.kafka.TestUtil; import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; -import org.apache.sqoop.model.MPersistableEntity; import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.submission.SubmissionStatus; +import org.apache.sqoop.test.asserts.HdfsAsserts; import org.apache.sqoop.test.data.Cities; import org.apache.sqoop.test.data.ShortStories; import org.apache.sqoop.test.data.UbuntuReleases; @@ -45,6 +49,7 @@ import org.apache.sqoop.test.utils.HdfsUtils; import org.apache.sqoop.test.utils.SqoopUtils; import org.apache.sqoop.validation.Status; +import org.testng.Assert; import org.testng.ITest; import org.testng.ITestContext; import org.testng.ITestNGMethod; @@ -52,16 +57,22 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeSuite; +import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.lang.reflect.Method; import java.net.URL; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; +import kafka.message.MessageAndMetadata; + +import static org.apache.sqoop.connector.common.SqoopIDFUtils.toText; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotSame; /** * Use Infrastructure annotation to boot up miniclusters. @@ -98,12 +109,16 @@ public void finished(MSubmission submission) { private static String suiteName; - private String methodName; + protected String methodName; private SqoopClient client; private DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token(); + protected FileSystem hdfsClient; + + protected DatabaseProvider provider; + @BeforeSuite public static void findSuiteName(ITestContext context) { suiteName = context.getSuite().getName(); @@ -269,11 +284,9 @@ public void fillRdbmsLinkConfig(MLink link) { * @param partitionColumn */ public void fillRdbmsFromConfig(MJob job, String partitionColumn) { - DatabaseProvider provider = getInfrastructureProvider(DatabaseInfrastructureProvider.class).getInstance(); - MConfigList fromConfig = job.getFromJobConfig(); - fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName().getTableName())); - fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName(partitionColumn)); + fromConfig.getStringInput("fromJobConfig.tableName").setValue(getTableName().getTableName()); + fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(partitionColumn); } /** @@ -281,10 +294,8 @@ public void fillRdbmsFromConfig(MJob job, String partitionColumn) { * @param job */ public void fillRdbmsToConfig(MJob job) { - DatabaseProvider provider = getInfrastructureProvider(DatabaseInfrastructureProvider.class).getInstance(); - MConfigList toConfig = job.getToJobConfig(); - toConfig.getStringInput("toJobConfig.tableName").setValue(provider.escapeTableName(getTableName().getTableName())); + toConfig.getStringInput("toJobConfig.tableName").setValue(getTableName().getTableName()); } /** @@ -318,6 +329,12 @@ public void fillHdfsToConfig(MJob job, ToFormat output) { toConfig.getStringInput("toJobConfig.outputDirectory").setValue(getMapreduceDirectory()); } + public void fillHdfsLink(MLink link) { + MConfigList configs = link.getConnectorLinkConfig(); + configs.getStringInput("linkConfig.confDir").setValue( + (getInfrastructureProvider(SqoopInfrastructureProvider.class)).getInstance().getConfigurationPath()); + } + public String getSqoopServerUrl() { if (getInfrastructureProvider(SqoopInfrastructureProvider.class) == null) { return null; @@ -335,11 +352,8 @@ public DelegationTokenAuthenticatedURL.Token getAuthToken() { return authToken; } - /** - * Create a sqoop client - */ @BeforeMethod - public void initSqoopClient() throws Exception { + public void init() throws Exception { String serverUrl = getSqoopServerUrl(); if (serverUrl != null) { @@ -351,6 +365,15 @@ public void initSqoopClient() throws Exception { kdcProvider.getInstance().authenticateWithSqoopServer(new URL(serverUrl), authToken); } } + + if (getInfrastructureProvider(HadoopInfrastructureProvider.class) != null) { + hdfsClient = FileSystem.get(getInfrastructureProvider(HadoopInfrastructureProvider.class).getHadoopConfiguration()); + hdfsClient.delete(new Path(getMapreduceDirectory()), true); + } + + if (getInfrastructureProvider(DatabaseInfrastructureProvider.class) != null) { + provider = getInfrastructureProvider(DatabaseInfrastructureProvider.class).getInstance(); + } } /** @@ -389,6 +412,16 @@ public void executeJob(String jobName) throws Exception { assertEquals(SubmissionStatus.SUCCEEDED, finalSubmission.getStatus(), "Submission finished with error: " + finalSubmission.getError().getErrorSummary()); } + /** + * Run given job. + * + * @param job Job object + * @throws Exception + */ + protected void executeJob(MJob job) throws Exception { + executeJob(job.getName()); + } + /** * Fetch table name to be used by this test. * @return TableName @@ -493,4 +526,108 @@ public void clearLink() { getClient().deleteLink(link.getName()); } } + + /** + * Assert that execution has generated following lines. + * + * As the lines can be spread between multiple files the ordering do not make + * a difference. + * + * @param lines + * @throws IOException + */ + protected void assertTo(String... lines) throws IOException { + // TODO(VB): fix this to be not directly dependent on hdfs/MR + HdfsAsserts.assertMapreduceOutput(hdfsClient, getMapreduceDirectory(), lines); + } + + /** + * Verify number of TO files. + * + * @param expectedFiles Expected number of files + */ + protected void assertToFiles(int expectedFiles) throws IOException { + // TODO(VB): fix this to be not directly dependent on hdfs/MR + HdfsAsserts.assertMapreduceOutputFiles(hdfsClient, getMapreduceDirectory(), expectedFiles); + } + + /** + * Assert row in testing table. + * + * @param conditions Conditions in config that are expected by the database provider + * @param values Values that are expected in the table (with corresponding types) + */ + protected void assertRow(Object[] conditions, Object ...values) { + DatabaseProvider provider = getInfrastructureProvider(DatabaseInfrastructureProvider.class).getInstance(); + ProviderAsserts.assertRow(provider, getTableName(), conditions, values); + } + + /** + * Assert row in table "cities". + * + * @param values Values that are expected + */ + protected void assertRowInCities(Object... values) { + assertRow(new Object[]{"id", values[0]}, values); + } + + /** + * Create FROM file with specified content. + * + * @param filename Input file name + * @param lines Individual lines that should be written into the file + * @throws IOException + */ + protected void createFromFile(String filename, String...lines) throws IOException { + createFromFile(hdfsClient, filename, lines); + } + + /** + * Create file on given HDFS instance with given lines + */ + protected void createFromFile(FileSystem hdfsClient, String filename, String...lines) throws IOException { + HdfsUtils.createFile(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), filename), lines); + } + + /** + * Create table cities. + */ + protected void createTableCities() { + DatabaseProvider provider = getInfrastructureProvider(DatabaseInfrastructureProvider.class).getInstance(); + new Cities(provider, getTableName()).createTables(); + } + + protected void fillKafkaLinkConfig(MLink link) { + MConfigList configs = link.getConnectorLinkConfig(); + configs.getStringInput("linkConfig.brokerList").setValue(TestUtil.getInstance().getKafkaServerUrl()); + configs.getStringInput("linkConfig.zookeeperConnect").setValue(TestUtil.getInstance().getZkUrl()); + + } + + protected void fillKafkaToConfig(MJob job, String topic){ + MConfigList toConfig = job.getToJobConfig(); + toConfig.getStringInput("toJobConfig.topic").setValue(topic); + List topics = new ArrayList(1); + topics.add(topic); + TestUtil.getInstance().initTopicList(topics); + } + + /** + * Compare strings in content to the messages in Kafka topic + * @param content + * @throws UnsupportedEncodingException + */ + protected void validateContent(String[] content, String topic) throws UnsupportedEncodingException { + + Set inputSet = new HashSet(Arrays.asList(content)); + Set outputSet = new HashSet(); + + for(int i = 0; i < content.length; i++) { + MessageAndMetadata fetchedMsg = + TestUtil.getInstance().getNextMessageFromConsumer(topic); + outputSet.add(toText(new String(fetchedMsg.message(), "UTF-8"))); + } + + Assert.assertEquals(inputSet, outputSet); + } } diff --git a/test/src/main/java/org/apache/sqoop/test/infrastructure/providers/HiveInfrastructureProvider.java b/test/src/main/java/org/apache/sqoop/test/infrastructure/providers/HiveInfrastructureProvider.java new file mode 100644 index 00000000..0db2891f --- /dev/null +++ b/test/src/main/java/org/apache/sqoop/test/infrastructure/providers/HiveInfrastructureProvider.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.test.infrastructure.providers; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.log4j.Logger; +import org.apache.sqoop.common.test.db.HiveProvider; +import org.apache.sqoop.test.hive.HiveServerRunner; +import org.apache.sqoop.test.hive.HiveServerRunnerFactory; +import org.apache.sqoop.test.hive.InternalHiveServerRunner; +import org.apache.sqoop.test.hive.InternalMetastoreServerRunner; +import org.apache.sqoop.test.hive.MetastoreServerRunner; +import org.apache.sqoop.test.hive.MetastoreServerRunnerFactory; +import org.apache.sqoop.test.kdc.KdcRunner; +import org.apache.sqoop.test.utils.HdfsUtils; + +public class HiveInfrastructureProvider extends InfrastructureProvider { + + private static final Logger LOG = Logger.getLogger(HiveInfrastructureProvider.class); + + private HiveServerRunner hiveServerRunner; + private MetastoreServerRunner metastoreServerRunner; + private HiveProvider hiveProvider; + + private FileSystem hdfsClient; + + public HiveInfrastructureProvider() { + try { + metastoreServerRunner = MetastoreServerRunnerFactory.getRunner(System.getProperties(), InternalMetastoreServerRunner.class); + hiveServerRunner = HiveServerRunnerFactory.getRunner(System.getProperties(), InternalHiveServerRunner.class); + } catch (Exception e) { + LOG.error("Error fetching Hadoop runner.", e); + } + } + + @Override + public void start() { + try { + LOG.info("Starting Metastore Server: " + metastoreServerRunner.getClass().getName()); + metastoreServerRunner.start(); + + LOG.info("Starting Hive Server: " + hiveServerRunner.getClass().getName()); + hiveServerRunner.start(); + + LOG.info("Starting Hive Provider: " + HiveProvider.class.getName()); + hiveProvider = new HiveProvider(hiveServerRunner.getUrl()); + hiveProvider.start(); + } catch (Exception e) { + LOG.error("Error starting hive runner.", e); + } + } + + @Override + public void stop() { + try { + if (hiveProvider != null) { + LOG.info("Stopping Hive Provider: " + hiveProvider.getClass().getName()); + hiveProvider.stop(); + } + + if (hiveServerRunner != null) { + LOG.info("Stopping Hive Server: " + hiveServerRunner.getClass().getName()); + hiveServerRunner.stop(); + } + + if (metastoreServerRunner != null) { + LOG.info("Stopping Metastore Server: " + metastoreServerRunner.getClass().getName()); + metastoreServerRunner.stop(); + } + } catch(Exception e) { + LOG.error("Could not stop hive runner.", e); + } + } + + @Override + public void setHadoopConfiguration(Configuration conf) { + try { + hdfsClient = FileSystem.get(conf); + String databasePath = HdfsUtils.joinPathFragments(getRootPath(), "metastore_db"); + metastoreServerRunner.setConfiguration(metastoreServerRunner.prepareConfiguration(conf)); + metastoreServerRunner.getConfiguration().set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, + "jdbc:derby:;databaseName=" + databasePath + ";create=true"); + ensureWarehouseDirectory(metastoreServerRunner.getConfiguration()); + hiveServerRunner.setConfiguration(hiveServerRunner.prepareConfiguration(metastoreServerRunner.getConfiguration())); + } catch (Exception e) { + LOG.error("Could not set configuration.", e); + } + } + + @Override + public Configuration getHadoopConfiguration() { + return hiveServerRunner.getConfiguration(); + } + + @Override + public void setRootPath(String path) { + metastoreServerRunner.setTemporaryPath(path); + } + + @Override + public String getRootPath() { + return metastoreServerRunner.getTemporaryPath(); + } + + @Override + public void setKdc(KdcRunner kdc) { + // Do nothing for the time being. Need to handle this when we support kerberos enabled MiniCluster. + } + + public MetastoreServerRunner getHiveMetastore() { + return metastoreServerRunner; + } + + public HiveServerRunner getHiveServer() { + return hiveServerRunner; + } + + public HiveProvider getHiveProvider() { + return hiveProvider; + } + + private void ensureWarehouseDirectory(Configuration conf) throws Exception { + String warehouseDirectory = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname); + StringBuilder dir = new StringBuilder(); + for (String part : warehouseDirectory.split("/")) { + dir.append(part).append("/"); + Path path = new Path(dir.toString()); + if (!hdfsClient.exists(path)) { + hdfsClient.mkdirs(path); + } + } + hdfsClient.setPermission(new Path(dir.toString()), new FsPermission((short)01777)); + } +} diff --git a/test/src/main/java/org/apache/sqoop/test/infrastructure/providers/KafkaInfrastructureProvider.java b/test/src/main/java/org/apache/sqoop/test/infrastructure/providers/KafkaInfrastructureProvider.java new file mode 100644 index 00000000..e2a32d7a --- /dev/null +++ b/test/src/main/java/org/apache/sqoop/test/infrastructure/providers/KafkaInfrastructureProvider.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.test.infrastructure.providers; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.sqoop.common.test.kafka.TestUtil; +import org.apache.sqoop.test.kdc.KdcRunner; + +public class KafkaInfrastructureProvider extends InfrastructureProvider { + + private static final Logger LOG = Logger.getLogger(KafkaInfrastructureProvider.class); + + private static TestUtil testUtil = TestUtil.getInstance(); + protected String topic; + + @Override + public void start() { + // starts Kafka server and its dependent zookeeper + try { + testUtil.prepare(); + } catch (Exception e) { + LOG.error("Error starting kafka.", e); + } + } + + @Override + public void stop() { + try { + testUtil.tearDown(); + } catch (IOException e) { + LOG.error("Error stopping kafka.", e); + } + } + + @Override + public void setHadoopConfiguration(Configuration conf) { + // do nothing + } + + @Override + public Configuration getHadoopConfiguration() { + return null; + } + + @Override + public void setRootPath(String path) { + // do nothing + } + + @Override + public String getRootPath() { + return null; + } + + @Override + public void setKdc(KdcRunner kdc) { + // do nothing + } + +} diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/HiveConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/HiveConnectorTestCase.java deleted file mode 100644 index 162adf87..00000000 --- a/test/src/main/java/org/apache/sqoop/test/testcases/HiveConnectorTestCase.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.test.testcases; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.log4j.Logger; -import org.apache.sqoop.common.test.db.HiveProvider; -import org.apache.sqoop.test.hive.InternalHiveServerRunner; -import org.apache.sqoop.test.hive.HiveServerRunner; -import org.apache.sqoop.test.hive.HiveServerRunnerFactory; -import org.apache.sqoop.test.hive.InternalMetastoreServerRunner; -import org.apache.sqoop.test.hive.MetastoreServerRunner; -import org.apache.sqoop.test.hive.MetastoreServerRunnerFactory; -import org.apache.sqoop.test.utils.HdfsUtils; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; - -public class HiveConnectorTestCase extends ConnectorTestCase { - private static final Logger LOG = Logger.getLogger(HiveConnectorTestCase.class); - - protected HiveServerRunner hiveServerRunner; - protected MetastoreServerRunner metastoreServerRunner; - protected HiveProvider hiveProvider; - - private void ensureWarehouseDirectory(Configuration conf) throws Exception { - String warehouseDirectory = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname); - StringBuilder dir = new StringBuilder(); - for (String part : warehouseDirectory.split("/")) { - dir.append(part).append("/"); - Path path = new Path(dir.toString()); - if (!hdfsClient.exists(path)) { - hdfsClient.mkdirs(path); - } - } - hdfsClient.setPermission(new Path(dir.toString()), new FsPermission((short)01777)); - } - - @BeforeMethod(alwaysRun = true) - public void startHive() throws Exception { - String databasePath = HdfsUtils.joinPathFragments(getTemporaryPath(), "metastore_db"); - metastoreServerRunner = MetastoreServerRunnerFactory.getRunner(System.getProperties(), InternalMetastoreServerRunner.class); - metastoreServerRunner.setConfiguration(metastoreServerRunner.prepareConfiguration(hadoopCluster.getConfiguration())); - metastoreServerRunner.getConfiguration().set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, - "jdbc:derby:;databaseName=" + databasePath + ";create=true"); - ensureWarehouseDirectory(metastoreServerRunner.getConfiguration()); - LOG.info("Starting Metastore Server: " + metastoreServerRunner.getClass().getName()); - metastoreServerRunner.start(); - - hiveServerRunner = HiveServerRunnerFactory.getRunner(System.getProperties(), InternalHiveServerRunner.class); - hiveServerRunner.setConfiguration(hiveServerRunner.prepareConfiguration(metastoreServerRunner.getConfiguration())); - LOG.info("Starting Hive Server: " + hiveServerRunner.getClass().getName()); - hiveServerRunner.start(); - - LOG.info("Starting Hive Provider: " + provider.getClass().getName()); - hiveProvider = new HiveProvider(hiveServerRunner.getUrl()); - hiveProvider.start(); - } - - @AfterMethod(alwaysRun = true) - public void stopHive() throws Exception { - if (hiveProvider != null) { - LOG.info("Stopping Hive Provider: " + provider.getClass().getName()); - hiveProvider.stop(); - } - - if (hiveServerRunner != null) { - LOG.info("Stopping Hive Server: " + hiveServerRunner.getClass().getName()); - hiveServerRunner.stop(); - } - - if (metastoreServerRunner != null) { - LOG.info("Stopping Metastore Server: " + metastoreServerRunner.getClass().getName()); - metastoreServerRunner.stop(); - } - } -} diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java deleted file mode 100644 index f15c07ec..00000000 --- a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.test.testcases; - -import kafka.message.MessageAndMetadata; -import org.apache.sqoop.model.MConfigList; -import org.apache.sqoop.model.MJob; -import org.apache.sqoop.model.MLink; -import org.testng.Assert; -import org.apache.sqoop.common.test.kafka.TestUtil; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.HashSet; -import java.util.Arrays; - -import static org.apache.sqoop.connector.common.SqoopIDFUtils.toText; - -public class KafkaConnectorTestCase extends ConnectorTestCase { - private static TestUtil testUtil = TestUtil.getInstance(); - protected String topic; - - @BeforeClass(alwaysRun = true) - public void startKafka() throws Exception { - // starts Kafka server and its dependent zookeeper - testUtil.prepare(); - } - - @AfterClass(alwaysRun = true) - public void stopKafka() throws IOException { - testUtil.tearDown(); - } - - protected void fillKafkaLinkConfig(MLink link) { - MConfigList configs = link.getConnectorLinkConfig(); - configs.getStringInput("linkConfig.brokerList").setValue(testUtil.getKafkaServerUrl()); - configs.getStringInput("linkConfig.zookeeperConnect").setValue(testUtil.getZkUrl()); - - } - - protected void fillKafkaToConfig(MJob job){ - MConfigList toConfig = job.getToJobConfig(); - toConfig.getStringInput("toJobConfig.topic").setValue(topic); - List topics = new ArrayList(1); - topics.add(topic); - testUtil.initTopicList(topics); - } - - /** - * Compare strings in content to the messages in Kafka topic - * @param content - * @throws UnsupportedEncodingException - */ - protected void validateContent(String[] content) throws UnsupportedEncodingException { - - Set inputSet = new HashSet(Arrays.asList(content)); - Set outputSet = new HashSet(); - - for(String str: content) { - MessageAndMetadata fetchedMsg = - testUtil.getNextMessageFromConsumer(topic); - outputSet.add(toText(new String(fetchedMsg.message(), "UTF-8"))); - } - - Assert.assertEquals(inputSet, outputSet); - } -} diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java index fa660d52..68855257 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java @@ -17,17 +17,20 @@ */ package org.apache.sqoop.integration.connector.hdfs; -import org.apache.sqoop.common.Direction; import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; -import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.testng.annotations.Test; -/** - */ -public class AppendModeTest extends ConnectorTestCase { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) +public class AppendModeTest extends SqoopTestCase { @Test public void test() throws Exception { diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java index c9531318..c6ce1e87 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java @@ -23,14 +23,19 @@ import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; import org.apache.sqoop.test.asserts.HdfsAsserts; -import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.apache.sqoop.test.utils.HdfsUtils; import org.testng.annotations.Test; /** * Test schemaless to schemaless transfer by using two hdfs connectors */ -public class FromHDFSToHDFSTest extends ConnectorTestCase { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class}) +public class FromHDFSToHDFSTest extends SqoopTestCase { @Test public void test() throws Exception { diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java index d48c6d2e..37306e2e 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java @@ -20,7 +20,12 @@ import org.apache.sqoop.connector.hdfs.configuration.IncrementalType; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; -import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -29,7 +34,8 @@ import static org.testng.Assert.assertEquals; -public class HdfsIncrementalReadTest extends ConnectorTestCase { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) +public class HdfsIncrementalReadTest extends SqoopTestCase { @BeforeMethod(alwaysRun = true) public void createTable() { diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/InformalJobNameExecuteTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/InformalJobNameExecuteTest.java index cd2ed6c1..0e14b7aa 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/InformalJobNameExecuteTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/InformalJobNameExecuteTest.java @@ -21,14 +21,20 @@ import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; -import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.testng.annotations.*; import java.sql.Timestamp; import static org.testng.Assert.assertEquals; -public class InformalJobNameExecuteTest extends ConnectorTestCase { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) +public class InformalJobNameExecuteTest extends SqoopTestCase { private String jobName; diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java index 2759b42a..e5e3d267 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java @@ -19,14 +19,17 @@ import org.apache.hadoop.fs.Path; import org.apache.sqoop.client.ClientError; -import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.error.code.HdfsConnectorError; -import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; -import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -34,9 +37,8 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -/** - */ -public class OutputDirectoryTest extends ConnectorTestCase { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) +public class OutputDirectoryTest extends SqoopTestCase { @Test public void testOutputDirectoryIsAFile() throws Exception { createAndLoadTableCities(); diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java index f98870db..c8576999 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java @@ -25,7 +25,12 @@ import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; import org.apache.sqoop.test.asserts.HdfsAsserts; -import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.testng.SkipException; import org.testng.annotations.Test; @@ -42,7 +47,8 @@ * -Dorg.apache.sqoop.integration.connector.hdfs.s3.access=AKI... * -Dorg.apache.sqoop.integration.connector.hdfs.s3.secret=93JKx... */ -public class S3Test extends ConnectorTestCase { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) +public class S3Test extends SqoopTestCase { public static final String PROPERTY_BUCKET = "org.apache.sqoop.integration.connector.hdfs.s3.bucket"; public static final String PROPERTY_ACCESS = "org.apache.sqoop.integration.connector.hdfs.s3.access"; diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hive/FromRDBMSToKiteHiveTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hive/FromRDBMSToKiteHiveTest.java index e9644227..ec9f733b 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/hive/FromRDBMSToKiteHiveTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/hive/FromRDBMSToKiteHiveTest.java @@ -18,16 +18,20 @@ package org.apache.sqoop.integration.connector.hive; import org.apache.sqoop.common.test.asserts.ProviderAsserts; -import org.apache.sqoop.common.test.db.DatabaseProvider; -import org.apache.sqoop.common.test.db.DatabaseProviderFactory; +import org.apache.sqoop.common.test.db.HiveProvider; import org.apache.sqoop.common.test.db.TableName; import org.apache.sqoop.connector.common.FileFormat; import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; -import org.apache.sqoop.test.testcases.HiveConnectorTestCase; -import org.testng.ITest; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HiveInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -38,14 +42,14 @@ import java.util.List; @Test(groups = {"slow", "no-real-cluster"}) -public class FromRDBMSToKiteHiveTest extends HiveConnectorTestCase implements ITest { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, HiveInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) +public class FromRDBMSToKiteHiveTest extends SqoopTestCase { private String testName; private FileFormat fileFormat; private MLink rdbmsLink; private MLink kiteLink; - private String hiveTableName; @Factory(dataProvider="rdbms-to-kite-hive-test") public FromRDBMSToKiteHiveTest(FileFormat fileFormat) { @@ -54,7 +58,6 @@ public FromRDBMSToKiteHiveTest(FileFormat fileFormat) { @DataProvider(name="rdbms-to-kite-hive-test", parallel=true) public static Object[][] data() throws Exception { - DatabaseProvider provider = DatabaseProviderFactory.getProvider(System.getProperties()); return new Object[][]{ {FileFormat.AVRO}, {FileFormat.PARQUET} @@ -99,7 +102,7 @@ public void createLinks() { // Kite link kiteLink = getClient().createLink("kite-connector"); kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.authority") - .setValue(metastoreServerRunner.getAuthority()); + .setValue(getInfrastructureProvider(HiveInfrastructureProvider.class).getHiveMetastore().getAuthority()); saveLink(kiteLink); } @@ -126,6 +129,7 @@ public void testCities() throws Exception { saveJob(job); executeJob(job); + HiveProvider hiveProvider = getInfrastructureProvider(HiveInfrastructureProvider.class).getHiveProvider(); // Assert correct output ProviderAsserts.assertRow(hiveProvider, new TableName(getHiveTableName()), new Object[]{"id", 1}, "1"); ProviderAsserts.assertRow(hiveProvider, new TableName(getHiveTableName()), new Object[]{"id", 2}, "2"); diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/AllTypesTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/AllTypesTest.java index ac5a61af..9c0ee847 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/AllTypesTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/AllTypesTest.java @@ -27,9 +27,13 @@ import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; -import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.apache.sqoop.test.utils.ParametrizedUtils; -import org.testng.ITest; import org.testng.ITestContext; import org.testng.annotations.DataProvider; import org.testng.annotations.Factory; @@ -43,7 +47,8 @@ * Test transfer of all supported data types. */ @Test(groups = "slow") -public class AllTypesTest extends ConnectorTestCase implements ITest { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) +public class AllTypesTest extends SqoopTestCase { private static String testName; diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java index 07eaba11..933bc08d 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java @@ -17,13 +17,15 @@ */ package org.apache.sqoop.integration.connector.jdbc.generic; -import org.apache.sqoop.common.Direction; -import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MLink; -import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MJob; -import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -32,10 +34,8 @@ import static org.testng.Assert.assertEquals; -/** - * - */ -public class FromHDFSToRDBMSTest extends ConnectorTestCase { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) +public class FromHDFSToRDBMSTest extends SqoopTestCase { @BeforeMethod(alwaysRun = true) public void createTable() { createTableCities(); diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java index 4cb0edcc..7e660918 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java @@ -22,7 +22,12 @@ import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; -import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.testng.annotations.Test; import java.util.List; @@ -30,7 +35,8 @@ /** * Import simple table with various configurations. */ -public class FromRDBMSToHDFSTest extends ConnectorTestCase { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) +public class FromRDBMSToHDFSTest extends SqoopTestCase { @Test public void testCities() throws Exception { diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java index 0c04fd95..83012eb1 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java @@ -23,22 +23,23 @@ import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; -import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.apache.sqoop.test.utils.ParametrizedUtils; -import org.testng.ITest; import org.testng.ITestContext; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Factory; import org.testng.annotations.Test; -import java.lang.reflect.Method; - import static org.testng.Assert.assertEquals; -/** - */ -public class IncrementalReadTest extends ConnectorTestCase implements ITest { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) +public class IncrementalReadTest extends SqoopTestCase { public static Object[] COLUMNS = new Object [][] { // column - last value - new max value diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/PartitionerTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/PartitionerTest.java index 8129c6a3..e5e886e2 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/PartitionerTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/PartitionerTest.java @@ -18,15 +18,17 @@ package org.apache.sqoop.integration.connector.jdbc.generic; import com.google.common.collect.Iterables; -import org.apache.sqoop.common.Direction; import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MLink; -import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MJob; -import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.apache.sqoop.test.utils.ParametrizedUtils; -import org.testng.ITest; import org.testng.annotations.DataProvider; import org.testng.annotations.Factory; import org.testng.annotations.Test; @@ -35,7 +37,8 @@ * */ @Test(groups = "slow") -public class PartitionerTest extends ConnectorTestCase implements ITest { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) +public class PartitionerTest extends SqoopTestCase { /** * Columns that we will use as partition column with maximal number of diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java index 70b6eff7..890fc105 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java @@ -19,21 +19,23 @@ import static org.testng.Assert.assertEquals; -import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.test.db.TableName; import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; import org.apache.sqoop.test.data.Cities; -import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.testng.annotations.Test; import java.sql.Timestamp; -/** - * - */ -public class TableStagedRDBMSTest extends ConnectorTestCase { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) +public class TableStagedRDBMSTest extends SqoopTestCase { @Test public void testStagedTransfer() throws Exception { diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java index ea8c7b9f..5e349d1d 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromHDFSToKafkaTest.java @@ -20,11 +20,17 @@ import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; -import org.apache.sqoop.test.testcases.KafkaConnectorTestCase; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KafkaInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.testng.annotations.Test; @Test(groups = "no-real-cluster") -public class FromHDFSToKafkaTest extends KafkaConnectorTestCase { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, KafkaInfrastructureProvider.class, SqoopInfrastructureProvider.class}) +public class FromHDFSToKafkaTest extends SqoopTestCase { public static final String[] input = { "A BIRD came down the walk:", @@ -32,9 +38,10 @@ public class FromHDFSToKafkaTest extends KafkaConnectorTestCase { "He bit an angle-worm in halves", "And ate the fellow raw." }; + @Test - public void testBasic() throws Exception { - topic = getTestName(); + public void testFromHDFSToKafka() throws Exception { + String topic = getTestName(); createFromFile("input-0001",input); @@ -53,7 +60,7 @@ public void testBasic() throws Exception { // Job connector configs fillHdfsFromConfig(job); - fillKafkaToConfig(job); + fillKafkaToConfig(job, topic); // driver config MDriverConfig driverConfig = job.getDriverConfig(); @@ -63,7 +70,7 @@ public void testBasic() throws Exception { executeJob(job); // this will assert the content of the array matches the content of the topic - validateContent(input); + validateContent(input, topic); } diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java index a34378af..9ae1334b 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java @@ -17,16 +17,20 @@ */ package org.apache.sqoop.integration.connector.kafka; -import org.apache.sqoop.common.Direction; -import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; -import org.apache.sqoop.test.testcases.KafkaConnectorTestCase; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KafkaInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.testng.annotations.Test; @Test(groups = "no-real-cluster") -public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, DatabaseInfrastructureProvider.class, KafkaInfrastructureProvider.class, SqoopInfrastructureProvider.class}) +public class FromRDBMSToKafkaTest extends SqoopTestCase { private static final String[] input = { "1,'USA','2004-10-23 00:00:00.000','San Francisco'", @@ -36,8 +40,8 @@ public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase { }; @Test - public void testBasic() throws Exception { - topic = getTestName(); + public void testFromRDBMSToKafka() throws Exception { + String topic = getTestName(); createAndLoadTableCities(); @@ -58,7 +62,7 @@ public void testBasic() throws Exception { fillRdbmsFromConfig(job, "id"); // set Kafka "TO" job config - fillKafkaToConfig(job); + fillKafkaToConfig(job, topic); // driver config MDriverConfig driverConfig = job.getDriverConfig(); @@ -68,7 +72,7 @@ public void testBasic() throws Exception { executeJob(job); // this will assert the content of the array matches the content of the topic - validateContent(input); + validateContent(input, topic); } diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java index be9fef10..10f36144 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java @@ -22,7 +22,12 @@ import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; -import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; import org.apache.sqoop.test.utils.HdfsUtils; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -31,7 +36,8 @@ import java.util.List; @Test -public class FromRDBMSToKiteTest extends ConnectorTestCase { +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) +public class FromRDBMSToKiteTest extends SqoopTestCase { @BeforeMethod(alwaysRun = true) public void createTable() { createAndLoadTableCities(); @@ -51,7 +57,7 @@ public void dropTable() { */ @Override public String getMapreduceDirectory() { - return HdfsUtils.joinPathFragments(hadoopCluster.getTestDirectory(), getClass().getName(), "namespace", getTestName()).replaceAll("/$", ""); + return HdfsUtils.joinPathFragments(getInfrastructureProvider(HadoopInfrastructureProvider.class).getInstance().getTestDirectory(), getClass().getName(), "namespace", getTestName()).replaceAll("/$", ""); } @Test @@ -64,7 +70,8 @@ public void testCities() throws Exception { // Kite link MLink kiteLink = getClient().createLink("kite-connector"); kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.authority").setValue(hdfsClient.getUri().getAuthority()); - kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.confDir").setValue(getCluster().getConfigurationPath()); + kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.confDir").setValue( + getInfrastructureProvider(SqoopInfrastructureProvider.class).getInstance().getConfigurationPath()); saveLink(kiteLink); // Job creation