diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
index a9cff77b..187ad214 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
@@ -48,6 +48,7 @@ public void initialize(InitializerContext context, LinkConfiguration linkConfig,
       ToJobConfiguration toJobConfig) {
     String uri = ConfigUtil.buildDatasetUri(
         linkConfig.linkConfig, toJobConfig.toJobConfig);
+    LOG.info("Generated final dataset URI: " + uri);
     if (Datasets.exists(uri)) {
       LOG.error("Overwrite an existing dataset is not expected in new create mode.");
       throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001);
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
new file mode 100644
index 00000000..1d4e7e6a
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.kite;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.connector.common.FileFormat;
+import org.apache.sqoop.model.MConfigList;
+import org.apache.sqoop.model.MDriverConfig;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.utils.HdfsUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test of data transfer from RDBMS to the Kite connector (HDFS dataset). */
+public class FromRDBMSToKiteTest extends ConnectorTestCase {
+  @BeforeMethod(alwaysRun = true)
+  public void createTable() {
+    createAndLoadTableCities();
+  }
+
+  @AfterMethod(alwaysRun = true)
+  public void dropTable() {
+    super.dropTable();
+  }
+
+  /**
+   * Kite requires that the last directory is the dataset name and the next-to-last is the namespace.
+   *
+   * Both names have special requirements ([A-Za-z_][A-Za-z0-9_]*), so we're inserting
+   * "namespace" constant in the namespace field, to preserve our (Sqoop integration tests)
+   * directory structures.
+   */
+  @Override
+  public String getMapreduceDirectory() {
+    return HdfsUtils.joinPathFragments(hadoopCluster.getTestDirectory(), getClass().getName(), "namespace", name).replaceAll("/$", "");
+  }
+
+  @Test
+  public void testCities() throws Exception {
+    // RDBMS link
+    MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLink);
+    saveLink(rdbmsLink);
+
+    // Kite link
+    MLink kiteLink = getClient().createLink("kite-connector");
+    kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.authority").setValue(hdfsClient.getUri().getAuthority());
+    saveLink(kiteLink);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), kiteLink.getPersistenceId());
+
+    // Set rdbms "FROM" config
+    MConfigList fromConfig = job.getJobConfig(Direction.FROM);
+    fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName()));
+    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
+    // TODO: Kite has trouble with some data types, so we're limiting the columns to int only
+    fromConfig.getStringInput("fromJobConfig.columns").setValue(provider.escapeColumnName("id"));
+
+    // Fill the Kite "TO" config
+    MConfigList toConfig = job.getJobConfig(Direction.TO);
+    toConfig.getStringInput("toJobConfig.uri").setValue("dataset:hdfs:" + getMapreduceDirectory());
+    toConfig.getEnumInput("toJobConfig.fileFormat").setValue(FileFormat.CSV);
+
+    // driver config
+    MDriverConfig driverConfig = job.getDriverConfig();
+    driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);
+
+    saveJob(job);
+
+    executeJob(job);
+
+    // Assert correct output
+    assertTo(
+        "\"1\"",
+        "\"2\"",
+        "\"3\"",
+        "\"4\""
+    );
+  }
+
+}