5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-20 19:00:48 +08:00

SQOOP-2542: Sqoop2: Provide test infrastructure base class for connector tests

(Dian Fu via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-12-30 05:16:40 -08:00
parent a03adec290
commit 8d63df7145
23 changed files with 551 additions and 275 deletions

View File

@ -67,6 +67,7 @@ private KafkaConsumer getKafkaConsumer() {
}
public void initTopicList(List<String> topics) {
kafkaConsumer = new KafkaConsumer();
getKafkaConsumer().initTopicList(topics);
}
@ -85,7 +86,6 @@ public void prepare() throws Exception {
} catch (InterruptedException e) {
// ignore
}
getKafkaConsumer();
logger.info("Completed the prepare phase.");
}

View File

@ -23,7 +23,6 @@
import org.apache.sqoop.common.test.utils.NetworkUtils;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.util.UUID;
@ -41,6 +40,11 @@ public abstract class MetastoreServerRunner {
private final int port;
private final String warehouseDirectory;
/**
* Temporary path that can be used as a root for other directories of metastore.
*/
private String temporaryPath;
public MetastoreServerRunner(String hostname, int port) throws Exception {
this.hostname = hostname;
this.warehouseDirectory = "/user/hive/" + UUID.randomUUID();
@ -131,12 +135,30 @@ public int getPort() {
return this.port;
}
/**
* Get temporary path.
*
* @return
*/
public String getTemporaryPath() {
return temporaryPath;
}
/**
* Set temporary path.
*
* @param temporaryPath
*/
public void setTemporaryPath(String temporaryPath) {
this.temporaryPath = temporaryPath;
}
private void printConfig() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
config.logVars(new PrintStream(baos.toString("UTF-8"), "UTF-8"));
config.logVars(new PrintStream(baos, false, "UTF-8"));
LOG.debug("Hive server runner configuration:\n" + baos.toString("UTF-8"));
} catch (UnsupportedEncodingException |FileNotFoundException e) {
} catch (UnsupportedEncodingException e) {
LOG.warn("Error to print the Hive server runner configuration.", e);
}
}

View File

@ -20,19 +20,23 @@
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.client.SubmissionCallback;
import org.apache.sqoop.common.test.asserts.ProviderAsserts;
import org.apache.sqoop.common.test.db.DatabaseProvider;
import org.apache.sqoop.common.test.db.TableName;
import org.apache.sqoop.common.test.kafka.TestUtil;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MPersistableEntity;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.test.asserts.HdfsAsserts;
import org.apache.sqoop.test.data.Cities;
import org.apache.sqoop.test.data.ShortStories;
import org.apache.sqoop.test.data.UbuntuReleases;
@ -45,6 +49,7 @@
import org.apache.sqoop.test.utils.HdfsUtils;
import org.apache.sqoop.test.utils.SqoopUtils;
import org.apache.sqoop.validation.Status;
import org.testng.Assert;
import org.testng.ITest;
import org.testng.ITestContext;
import org.testng.ITestNGMethod;
@ -52,16 +57,22 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeSuite;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import kafka.message.MessageAndMetadata;
import static org.apache.sqoop.connector.common.SqoopIDFUtils.toText;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotSame;
/**
* Use Infrastructure annotation to boot up miniclusters.
@ -98,12 +109,16 @@ public void finished(MSubmission submission) {
private static String suiteName;
private String methodName;
protected String methodName;
private SqoopClient client;
private DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
protected FileSystem hdfsClient;
protected DatabaseProvider provider;
@BeforeSuite
public static void findSuiteName(ITestContext context) {
suiteName = context.getSuite().getName();
@ -269,11 +284,9 @@ public void fillRdbmsLinkConfig(MLink link) {
* @param partitionColumn
*/
public void fillRdbmsFromConfig(MJob job, String partitionColumn) {
DatabaseProvider provider = getInfrastructureProvider(DatabaseInfrastructureProvider.class).getInstance();
MConfigList fromConfig = job.getFromJobConfig();
fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName().getTableName()));
fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName(partitionColumn));
fromConfig.getStringInput("fromJobConfig.tableName").setValue(getTableName().getTableName());
fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(partitionColumn);
}
/**
@ -281,10 +294,8 @@ public void fillRdbmsFromConfig(MJob job, String partitionColumn) {
* @param job
*/
public void fillRdbmsToConfig(MJob job) {
DatabaseProvider provider = getInfrastructureProvider(DatabaseInfrastructureProvider.class).getInstance();
MConfigList toConfig = job.getToJobConfig();
toConfig.getStringInput("toJobConfig.tableName").setValue(provider.escapeTableName(getTableName().getTableName()));
toConfig.getStringInput("toJobConfig.tableName").setValue(getTableName().getTableName());
}
/**
@ -318,6 +329,12 @@ public void fillHdfsToConfig(MJob job, ToFormat output) {
toConfig.getStringInput("toJobConfig.outputDirectory").setValue(getMapreduceDirectory());
}
public void fillHdfsLink(MLink link) {
MConfigList configs = link.getConnectorLinkConfig();
configs.getStringInput("linkConfig.confDir").setValue(
(getInfrastructureProvider(SqoopInfrastructureProvider.class)).getInstance().getConfigurationPath());
}
public String getSqoopServerUrl() {
if (getInfrastructureProvider(SqoopInfrastructureProvider.class) == null) {
return null;
@ -335,11 +352,8 @@ public DelegationTokenAuthenticatedURL.Token getAuthToken() {
return authToken;
}
/**
* Create a sqoop client
*/
@BeforeMethod
public void initSqoopClient() throws Exception {
public void init() throws Exception {
String serverUrl = getSqoopServerUrl();
if (serverUrl != null) {
@ -351,6 +365,15 @@ public void initSqoopClient() throws Exception {
kdcProvider.getInstance().authenticateWithSqoopServer(new URL(serverUrl), authToken);
}
}
if (getInfrastructureProvider(HadoopInfrastructureProvider.class) != null) {
hdfsClient = FileSystem.get(getInfrastructureProvider(HadoopInfrastructureProvider.class).getHadoopConfiguration());
hdfsClient.delete(new Path(getMapreduceDirectory()), true);
}
if (getInfrastructureProvider(DatabaseInfrastructureProvider.class) != null) {
provider = getInfrastructureProvider(DatabaseInfrastructureProvider.class).getInstance();
}
}
/**
@ -389,6 +412,16 @@ public void executeJob(String jobName) throws Exception {
assertEquals(SubmissionStatus.SUCCEEDED, finalSubmission.getStatus(), "Submission finished with error: " + finalSubmission.getError().getErrorSummary());
}
/**
* Run given job.
*
* @param job Job object
* @throws Exception
*/
protected void executeJob(MJob job) throws Exception {
executeJob(job.getName());
}
/**
* Fetch table name to be used by this test.
* @return TableName
@ -493,4 +526,108 @@ public void clearLink() {
getClient().deleteLink(link.getName());
}
}
/**
* Assert that execution has generated following lines.
*
* As the lines can be spread between multiple files the ordering do not make
* a difference.
*
* @param lines
* @throws IOException
*/
protected void assertTo(String... lines) throws IOException {
// TODO(VB): fix this to be not directly dependent on hdfs/MR
HdfsAsserts.assertMapreduceOutput(hdfsClient, getMapreduceDirectory(), lines);
}
/**
* Verify number of TO files.
*
* @param expectedFiles Expected number of files
*/
protected void assertToFiles(int expectedFiles) throws IOException {
// TODO(VB): fix this to be not directly dependent on hdfs/MR
HdfsAsserts.assertMapreduceOutputFiles(hdfsClient, getMapreduceDirectory(), expectedFiles);
}
/**
* Assert row in testing table.
*
* @param conditions Conditions in config that are expected by the database provider
* @param values Values that are expected in the table (with corresponding types)
*/
protected void assertRow(Object[] conditions, Object ...values) {
DatabaseProvider provider = getInfrastructureProvider(DatabaseInfrastructureProvider.class).getInstance();
ProviderAsserts.assertRow(provider, getTableName(), conditions, values);
}
/**
* Assert row in table "cities".
*
* @param values Values that are expected
*/
protected void assertRowInCities(Object... values) {
assertRow(new Object[]{"id", values[0]}, values);
}
/**
* Create FROM file with specified content.
*
* @param filename Input file name
* @param lines Individual lines that should be written into the file
* @throws IOException
*/
protected void createFromFile(String filename, String...lines) throws IOException {
createFromFile(hdfsClient, filename, lines);
}
/**
* Create file on given HDFS instance with given lines
*/
protected void createFromFile(FileSystem hdfsClient, String filename, String...lines) throws IOException {
HdfsUtils.createFile(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), filename), lines);
}
/**
* Create table cities.
*/
protected void createTableCities() {
DatabaseProvider provider = getInfrastructureProvider(DatabaseInfrastructureProvider.class).getInstance();
new Cities(provider, getTableName()).createTables();
}
protected void fillKafkaLinkConfig(MLink link) {
MConfigList configs = link.getConnectorLinkConfig();
configs.getStringInput("linkConfig.brokerList").setValue(TestUtil.getInstance().getKafkaServerUrl());
configs.getStringInput("linkConfig.zookeeperConnect").setValue(TestUtil.getInstance().getZkUrl());
}
protected void fillKafkaToConfig(MJob job, String topic){
MConfigList toConfig = job.getToJobConfig();
toConfig.getStringInput("toJobConfig.topic").setValue(topic);
List<String> topics = new ArrayList<String>(1);
topics.add(topic);
TestUtil.getInstance().initTopicList(topics);
}
/**
* Compare strings in content to the messages in Kafka topic
* @param content
* @throws UnsupportedEncodingException
*/
protected void validateContent(String[] content, String topic) throws UnsupportedEncodingException {
Set<String> inputSet = new HashSet<String>(Arrays.asList(content));
Set<String> outputSet = new HashSet<String>();
for(int i = 0; i < content.length; i++) {
MessageAndMetadata<byte[],byte[]> fetchedMsg =
TestUtil.getInstance().getNextMessageFromConsumer(topic);
outputSet.add(toText(new String(fetchedMsg.message(), "UTF-8")));
}
Assert.assertEquals(inputSet, outputSet);
}
}

View File

@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.test.infrastructure.providers;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.test.db.HiveProvider;
import org.apache.sqoop.test.hive.HiveServerRunner;
import org.apache.sqoop.test.hive.HiveServerRunnerFactory;
import org.apache.sqoop.test.hive.InternalHiveServerRunner;
import org.apache.sqoop.test.hive.InternalMetastoreServerRunner;
import org.apache.sqoop.test.hive.MetastoreServerRunner;
import org.apache.sqoop.test.hive.MetastoreServerRunnerFactory;
import org.apache.sqoop.test.kdc.KdcRunner;
import org.apache.sqoop.test.utils.HdfsUtils;
public class HiveInfrastructureProvider extends InfrastructureProvider {
private static final Logger LOG = Logger.getLogger(HiveInfrastructureProvider.class);
private HiveServerRunner hiveServerRunner;
private MetastoreServerRunner metastoreServerRunner;
private HiveProvider hiveProvider;
private FileSystem hdfsClient;
public HiveInfrastructureProvider() {
try {
metastoreServerRunner = MetastoreServerRunnerFactory.getRunner(System.getProperties(), InternalMetastoreServerRunner.class);
hiveServerRunner = HiveServerRunnerFactory.getRunner(System.getProperties(), InternalHiveServerRunner.class);
} catch (Exception e) {
LOG.error("Error fetching Hadoop runner.", e);
}
}
@Override
public void start() {
try {
LOG.info("Starting Metastore Server: " + metastoreServerRunner.getClass().getName());
metastoreServerRunner.start();
LOG.info("Starting Hive Server: " + hiveServerRunner.getClass().getName());
hiveServerRunner.start();
LOG.info("Starting Hive Provider: " + HiveProvider.class.getName());
hiveProvider = new HiveProvider(hiveServerRunner.getUrl());
hiveProvider.start();
} catch (Exception e) {
LOG.error("Error starting hive runner.", e);
}
}
@Override
public void stop() {
try {
if (hiveProvider != null) {
LOG.info("Stopping Hive Provider: " + hiveProvider.getClass().getName());
hiveProvider.stop();
}
if (hiveServerRunner != null) {
LOG.info("Stopping Hive Server: " + hiveServerRunner.getClass().getName());
hiveServerRunner.stop();
}
if (metastoreServerRunner != null) {
LOG.info("Stopping Metastore Server: " + metastoreServerRunner.getClass().getName());
metastoreServerRunner.stop();
}
} catch(Exception e) {
LOG.error("Could not stop hive runner.", e);
}
}
@Override
public void setHadoopConfiguration(Configuration conf) {
try {
hdfsClient = FileSystem.get(conf);
String databasePath = HdfsUtils.joinPathFragments(getRootPath(), "metastore_db");
metastoreServerRunner.setConfiguration(metastoreServerRunner.prepareConfiguration(conf));
metastoreServerRunner.getConfiguration().set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
"jdbc:derby:;databaseName=" + databasePath + ";create=true");
ensureWarehouseDirectory(metastoreServerRunner.getConfiguration());
hiveServerRunner.setConfiguration(hiveServerRunner.prepareConfiguration(metastoreServerRunner.getConfiguration()));
} catch (Exception e) {
LOG.error("Could not set configuration.", e);
}
}
@Override
public Configuration getHadoopConfiguration() {
return hiveServerRunner.getConfiguration();
}
@Override
public void setRootPath(String path) {
metastoreServerRunner.setTemporaryPath(path);
}
@Override
public String getRootPath() {
return metastoreServerRunner.getTemporaryPath();
}
@Override
public void setKdc(KdcRunner kdc) {
// Do nothing for the time being. Need to handle this when we support kerberos enabled MiniCluster.
}
public MetastoreServerRunner getHiveMetastore() {
return metastoreServerRunner;
}
public HiveServerRunner getHiveServer() {
return hiveServerRunner;
}
public HiveProvider getHiveProvider() {
return hiveProvider;
}
private void ensureWarehouseDirectory(Configuration conf) throws Exception {
String warehouseDirectory = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
StringBuilder dir = new StringBuilder();
for (String part : warehouseDirectory.split("/")) {
dir.append(part).append("/");
Path path = new Path(dir.toString());
if (!hdfsClient.exists(path)) {
hdfsClient.mkdirs(path);
}
}
hdfsClient.setPermission(new Path(dir.toString()), new FsPermission((short)01777));
}
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.test.infrastructure.providers;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.test.kafka.TestUtil;
import org.apache.sqoop.test.kdc.KdcRunner;
public class KafkaInfrastructureProvider extends InfrastructureProvider {
private static final Logger LOG = Logger.getLogger(KafkaInfrastructureProvider.class);
private static TestUtil testUtil = TestUtil.getInstance();
protected String topic;
@Override
public void start() {
// starts Kafka server and its dependent zookeeper
try {
testUtil.prepare();
} catch (Exception e) {
LOG.error("Error starting kafka.", e);
}
}
@Override
public void stop() {
try {
testUtil.tearDown();
} catch (IOException e) {
LOG.error("Error stopping kafka.", e);
}
}
@Override
public void setHadoopConfiguration(Configuration conf) {
// do nothing
}
@Override
public Configuration getHadoopConfiguration() {
return null;
}
@Override
public void setRootPath(String path) {
// do nothing
}
@Override
public String getRootPath() {
return null;
}
@Override
public void setKdc(KdcRunner kdc) {
// do nothing
}
}

View File

@ -1,94 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.test.testcases;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.test.db.HiveProvider;
import org.apache.sqoop.test.hive.InternalHiveServerRunner;
import org.apache.sqoop.test.hive.HiveServerRunner;
import org.apache.sqoop.test.hive.HiveServerRunnerFactory;
import org.apache.sqoop.test.hive.InternalMetastoreServerRunner;
import org.apache.sqoop.test.hive.MetastoreServerRunner;
import org.apache.sqoop.test.hive.MetastoreServerRunnerFactory;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
public class HiveConnectorTestCase extends ConnectorTestCase {
private static final Logger LOG = Logger.getLogger(HiveConnectorTestCase.class);
protected HiveServerRunner hiveServerRunner;
protected MetastoreServerRunner metastoreServerRunner;
protected HiveProvider hiveProvider;
private void ensureWarehouseDirectory(Configuration conf) throws Exception {
String warehouseDirectory = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
StringBuilder dir = new StringBuilder();
for (String part : warehouseDirectory.split("/")) {
dir.append(part).append("/");
Path path = new Path(dir.toString());
if (!hdfsClient.exists(path)) {
hdfsClient.mkdirs(path);
}
}
hdfsClient.setPermission(new Path(dir.toString()), new FsPermission((short)01777));
}
@BeforeMethod(alwaysRun = true)
public void startHive() throws Exception {
String databasePath = HdfsUtils.joinPathFragments(getTemporaryPath(), "metastore_db");
metastoreServerRunner = MetastoreServerRunnerFactory.getRunner(System.getProperties(), InternalMetastoreServerRunner.class);
metastoreServerRunner.setConfiguration(metastoreServerRunner.prepareConfiguration(hadoopCluster.getConfiguration()));
metastoreServerRunner.getConfiguration().set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
"jdbc:derby:;databaseName=" + databasePath + ";create=true");
ensureWarehouseDirectory(metastoreServerRunner.getConfiguration());
LOG.info("Starting Metastore Server: " + metastoreServerRunner.getClass().getName());
metastoreServerRunner.start();
hiveServerRunner = HiveServerRunnerFactory.getRunner(System.getProperties(), InternalHiveServerRunner.class);
hiveServerRunner.setConfiguration(hiveServerRunner.prepareConfiguration(metastoreServerRunner.getConfiguration()));
LOG.info("Starting Hive Server: " + hiveServerRunner.getClass().getName());
hiveServerRunner.start();
LOG.info("Starting Hive Provider: " + provider.getClass().getName());
hiveProvider = new HiveProvider(hiveServerRunner.getUrl());
hiveProvider.start();
}
@AfterMethod(alwaysRun = true)
public void stopHive() throws Exception {
if (hiveProvider != null) {
LOG.info("Stopping Hive Provider: " + provider.getClass().getName());
hiveProvider.stop();
}
if (hiveServerRunner != null) {
LOG.info("Stopping Hive Server: " + hiveServerRunner.getClass().getName());
hiveServerRunner.stop();
}
if (metastoreServerRunner != null) {
LOG.info("Stopping Metastore Server: " + metastoreServerRunner.getClass().getName());
metastoreServerRunner.stop();
}
}
}

View File

@ -1,87 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.test.testcases;
import kafka.message.MessageAndMetadata;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.testng.Assert;
import org.apache.sqoop.common.test.kafka.TestUtil;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.HashSet;
import java.util.Arrays;
import static org.apache.sqoop.connector.common.SqoopIDFUtils.toText;
public class KafkaConnectorTestCase extends ConnectorTestCase {
private static TestUtil testUtil = TestUtil.getInstance();
protected String topic;
@BeforeClass(alwaysRun = true)
public void startKafka() throws Exception {
// starts Kafka server and its dependent zookeeper
testUtil.prepare();
}
@AfterClass(alwaysRun = true)
public void stopKafka() throws IOException {
testUtil.tearDown();
}
protected void fillKafkaLinkConfig(MLink link) {
MConfigList configs = link.getConnectorLinkConfig();
configs.getStringInput("linkConfig.brokerList").setValue(testUtil.getKafkaServerUrl());
configs.getStringInput("linkConfig.zookeeperConnect").setValue(testUtil.getZkUrl());
}
protected void fillKafkaToConfig(MJob job){
MConfigList toConfig = job.getToJobConfig();
toConfig.getStringInput("toJobConfig.topic").setValue(topic);
List<String> topics = new ArrayList<String>(1);
topics.add(topic);
testUtil.initTopicList(topics);
}
/**
* Compare strings in content to the messages in Kafka topic
* @param content
* @throws UnsupportedEncodingException
*/
protected void validateContent(String[] content) throws UnsupportedEncodingException {
Set<String> inputSet = new HashSet<String>(Arrays.asList(content));
Set<String> outputSet = new HashSet<String>();
for(String str: content) {
MessageAndMetadata<byte[],byte[]> fetchedMsg =
testUtil.getNextMessageFromConsumer(topic);
outputSet.add(toText(new String(fetchedMsg.message(), "UTF-8")));
}
Assert.assertEquals(inputSet, outputSet);
}
}

View File

@ -17,17 +17,20 @@
*/
package org.apache.sqoop.integration.connector.hdfs;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.testng.annotations.Test;
/**
*/
public class AppendModeTest extends ConnectorTestCase {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class AppendModeTest extends SqoopTestCase {
@Test
public void test() throws Exception {

View File

@ -23,14 +23,19 @@
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.asserts.HdfsAsserts;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.testng.annotations.Test;
/**
* Test schemaless to schemaless transfer by using two hdfs connectors
*/
public class FromHDFSToHDFSTest extends ConnectorTestCase {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class})
public class FromHDFSToHDFSTest extends SqoopTestCase {
@Test
public void test() throws Exception {

View File

@ -20,7 +20,12 @@
import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@ -29,7 +34,8 @@
import static org.testng.Assert.assertEquals;
public class HdfsIncrementalReadTest extends ConnectorTestCase {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class HdfsIncrementalReadTest extends SqoopTestCase {
@BeforeMethod(alwaysRun = true)
public void createTable() {

View File

@ -21,14 +21,20 @@
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.testng.annotations.*;
import java.sql.Timestamp;
import static org.testng.Assert.assertEquals;
public class InformalJobNameExecuteTest extends ConnectorTestCase {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class InformalJobNameExecuteTest extends SqoopTestCase {
private String jobName;

View File

@ -19,14 +19,17 @@
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.client.ClientError;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@ -34,9 +37,8 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
/**
*/
public class OutputDirectoryTest extends ConnectorTestCase {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class OutputDirectoryTest extends SqoopTestCase {
@Test
public void testOutputDirectoryIsAFile() throws Exception {
createAndLoadTableCities();

View File

@ -25,7 +25,12 @@
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.asserts.HdfsAsserts;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.testng.SkipException;
import org.testng.annotations.Test;
@ -42,7 +47,8 @@
* -Dorg.apache.sqoop.integration.connector.hdfs.s3.access=AKI...
* -Dorg.apache.sqoop.integration.connector.hdfs.s3.secret=93JKx...
*/
public class S3Test extends ConnectorTestCase {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class S3Test extends SqoopTestCase {
public static final String PROPERTY_BUCKET = "org.apache.sqoop.integration.connector.hdfs.s3.bucket";
public static final String PROPERTY_ACCESS = "org.apache.sqoop.integration.connector.hdfs.s3.access";

View File

@ -18,16 +18,20 @@
package org.apache.sqoop.integration.connector.hive;
import org.apache.sqoop.common.test.asserts.ProviderAsserts;
import org.apache.sqoop.common.test.db.DatabaseProvider;
import org.apache.sqoop.common.test.db.DatabaseProviderFactory;
import org.apache.sqoop.common.test.db.HiveProvider;
import org.apache.sqoop.common.test.db.TableName;
import org.apache.sqoop.connector.common.FileFormat;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.HiveConnectorTestCase;
import org.testng.ITest;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HiveInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
@ -38,14 +42,14 @@
import java.util.List;
@Test(groups = {"slow", "no-real-cluster"})
public class FromRDBMSToKiteHiveTest extends HiveConnectorTestCase implements ITest {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, HiveInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class FromRDBMSToKiteHiveTest extends SqoopTestCase {
private String testName;
private FileFormat fileFormat;
private MLink rdbmsLink;
private MLink kiteLink;
private String hiveTableName;
@Factory(dataProvider="rdbms-to-kite-hive-test")
public FromRDBMSToKiteHiveTest(FileFormat fileFormat) {
@ -54,7 +58,6 @@ public FromRDBMSToKiteHiveTest(FileFormat fileFormat) {
@DataProvider(name="rdbms-to-kite-hive-test", parallel=true)
public static Object[][] data() throws Exception {
DatabaseProvider provider = DatabaseProviderFactory.getProvider(System.getProperties());
return new Object[][]{
{FileFormat.AVRO},
{FileFormat.PARQUET}
@ -99,7 +102,7 @@ public void createLinks() {
// Kite link
kiteLink = getClient().createLink("kite-connector");
kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.authority")
.setValue(metastoreServerRunner.getAuthority());
.setValue(getInfrastructureProvider(HiveInfrastructureProvider.class).getHiveMetastore().getAuthority());
saveLink(kiteLink);
}
@ -126,6 +129,7 @@ public void testCities() throws Exception {
saveJob(job);
executeJob(job);
HiveProvider hiveProvider = getInfrastructureProvider(HiveInfrastructureProvider.class).getHiveProvider();
// Assert correct output
ProviderAsserts.assertRow(hiveProvider, new TableName(getHiveTableName()), new Object[]{"id", 1}, "1");
ProviderAsserts.assertRow(hiveProvider, new TableName(getHiveTableName()), new Object[]{"id", 2}, "2");

View File

@ -27,9 +27,13 @@
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.apache.sqoop.test.utils.ParametrizedUtils;
import org.testng.ITest;
import org.testng.ITestContext;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
@ -43,7 +47,8 @@
* Test transfer of all supported data types.
*/
@Test(groups = "slow")
public class AllTypesTest extends ConnectorTestCase implements ITest {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class AllTypesTest extends SqoopTestCase {
private static String testName;

View File

@ -17,13 +17,15 @@
*/
package org.apache.sqoop.integration.connector.jdbc.generic;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MJob;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@ -32,10 +34,8 @@
import static org.testng.Assert.assertEquals;
/**
*
*/
public class FromHDFSToRDBMSTest extends ConnectorTestCase {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class FromHDFSToRDBMSTest extends SqoopTestCase {
@BeforeMethod(alwaysRun = true)
public void createTable() {
createTableCities();

View File

@ -22,7 +22,12 @@
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.testng.annotations.Test;
import java.util.List;
@ -30,7 +35,8 @@
/**
* Import simple table with various configurations.
*/
public class FromRDBMSToHDFSTest extends ConnectorTestCase {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class FromRDBMSToHDFSTest extends SqoopTestCase {
@Test
public void testCities() throws Exception {

View File

@ -23,22 +23,23 @@
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.apache.sqoop.test.utils.ParametrizedUtils;
import org.testng.ITest;
import org.testng.ITestContext;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import java.lang.reflect.Method;
import static org.testng.Assert.assertEquals;
/**
*/
public class IncrementalReadTest extends ConnectorTestCase implements ITest {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class IncrementalReadTest extends SqoopTestCase {
public static Object[] COLUMNS = new Object [][] {
// column - last value - new max value

View File

@ -18,15 +18,17 @@
package org.apache.sqoop.integration.connector.jdbc.generic;
import com.google.common.collect.Iterables;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.apache.sqoop.test.utils.ParametrizedUtils;
import org.testng.ITest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
@ -35,7 +37,8 @@
*
*/
@Test(groups = "slow")
public class PartitionerTest extends ConnectorTestCase implements ITest {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class PartitionerTest extends SqoopTestCase {
/**
* Columns that we will use as partition column with maximal number of

View File

@ -19,21 +19,23 @@
import static org.testng.Assert.assertEquals;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.test.db.TableName;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.data.Cities;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.testng.annotations.Test;
import java.sql.Timestamp;
/**
*
*/
public class TableStagedRDBMSTest extends ConnectorTestCase {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class TableStagedRDBMSTest extends SqoopTestCase {
@Test
public void testStagedTransfer() throws Exception {

View File

@ -20,11 +20,17 @@
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.KafkaConnectorTestCase;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KafkaInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.testng.annotations.Test;
@Test(groups = "no-real-cluster")
public class FromHDFSToKafkaTest extends KafkaConnectorTestCase {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, KafkaInfrastructureProvider.class, SqoopInfrastructureProvider.class})
public class FromHDFSToKafkaTest extends SqoopTestCase {
public static final String[] input = {
"A BIRD came down the walk:",
@ -32,9 +38,10 @@ public class FromHDFSToKafkaTest extends KafkaConnectorTestCase {
"He bit an angle-worm in halves",
"And ate the fellow raw."
};
@Test
public void testBasic() throws Exception {
topic = getTestName();
public void testFromHDFSToKafka() throws Exception {
String topic = getTestName();
createFromFile("input-0001",input);
@ -53,7 +60,7 @@ public void testBasic() throws Exception {
// Job connector configs
fillHdfsFromConfig(job);
fillKafkaToConfig(job);
fillKafkaToConfig(job, topic);
// driver config
MDriverConfig driverConfig = job.getDriverConfig();
@ -63,7 +70,7 @@ public void testBasic() throws Exception {
executeJob(job);
// this will assert the content of the array matches the content of the topic
validateContent(input);
validateContent(input, topic);
}

View File

@ -17,16 +17,20 @@
*/
package org.apache.sqoop.integration.connector.kafka;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.KafkaConnectorTestCase;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KafkaInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.testng.annotations.Test;
@Test(groups = "no-real-cluster")
public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, DatabaseInfrastructureProvider.class, KafkaInfrastructureProvider.class, SqoopInfrastructureProvider.class})
public class FromRDBMSToKafkaTest extends SqoopTestCase {
private static final String[] input = {
"1,'USA','2004-10-23 00:00:00.000','San Francisco'",
@ -36,8 +40,8 @@ public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase {
};
@Test
public void testBasic() throws Exception {
topic = getTestName();
public void testFromRDBMSToKafka() throws Exception {
String topic = getTestName();
createAndLoadTableCities();
@ -58,7 +62,7 @@ public void testBasic() throws Exception {
fillRdbmsFromConfig(job, "id");
// set Kafka "TO" job config
fillKafkaToConfig(job);
fillKafkaToConfig(job, topic);
// driver config
MDriverConfig driverConfig = job.getDriverConfig();
@ -68,7 +72,7 @@ public void testBasic() throws Exception {
executeJob(job);
// this will assert the content of the array matches the content of the topic
validateContent(input);
validateContent(input, topic);
}

View File

@ -22,7 +22,12 @@
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@ -31,7 +36,8 @@
import java.util.List;
@Test
public class FromRDBMSToKiteTest extends ConnectorTestCase {
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class FromRDBMSToKiteTest extends SqoopTestCase {
@BeforeMethod(alwaysRun = true)
public void createTable() {
createAndLoadTableCities();
@ -51,7 +57,7 @@ public void dropTable() {
*/
@Override
public String getMapreduceDirectory() {
return HdfsUtils.joinPathFragments(hadoopCluster.getTestDirectory(), getClass().getName(), "namespace", getTestName()).replaceAll("/$", "");
return HdfsUtils.joinPathFragments(getInfrastructureProvider(HadoopInfrastructureProvider.class).getInstance().getTestDirectory(), getClass().getName(), "namespace", getTestName()).replaceAll("/$", "");
}
@Test
@ -64,7 +70,8 @@ public void testCities() throws Exception {
// Kite link
MLink kiteLink = getClient().createLink("kite-connector");
kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.authority").setValue(hdfsClient.getUri().getAuthority());
kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.confDir").setValue(getCluster().getConfigurationPath());
kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.confDir").setValue(
getInfrastructureProvider(SqoopInfrastructureProvider.class).getInstance().getConfigurationPath());
saveLink(kiteLink);
// Job creation