5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-06 18:59:35 +08:00

SQOOP-2157: Sqoop2: Kite: Add simple integration test for TO direction

(Jarek Jarcec Cecho via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2015-03-18 23:40:54 -07:00
parent fbad49ebdc
commit 24feea1850
2 changed files with 102 additions and 0 deletions

View File

@ -48,6 +48,7 @@ public void initialize(InitializerContext context,
LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) { LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
String uri = ConfigUtil.buildDatasetUri( String uri = ConfigUtil.buildDatasetUri(
linkConfig.linkConfig, toJobConfig.toJobConfig); linkConfig.linkConfig, toJobConfig.toJobConfig);
LOG.info("Generated final dataset URI: " + uri);
if (Datasets.exists(uri)) { if (Datasets.exists(uri)) {
LOG.error("Overwrite an existing dataset is not expected in new create mode."); LOG.error("Overwrite an existing dataset is not expected in new create mode.");
throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001); throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001);

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.connector.kite;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.common.FileFormat;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
*/
public class FromRDBMSToKiteTest extends ConnectorTestCase {
@BeforeMethod(alwaysRun = true)
public void createTable() {
createAndLoadTableCities();
}
@AfterMethod(alwaysRun = true)
public void dropTable() {
super.dropTable();
}
/**
* Kite requires that last directory is dataset name and one to last is namespace.
*
* Both names have special requirements ([A-Za-z_][A-Za-z0-9_]*), so we're inserting
* "namespace" constant in namespace filed, to preserve our (Sqoop integration tests)
* directory structures.
*/
@Override
public String getMapreduceDirectory() {
return HdfsUtils.joinPathFragments(hadoopCluster.getTestDirectory(), getClass().getName(), "namespace", name).replaceAll("/$", "");
}
@Test
public void testCities() throws Exception {
// RDBMS link
MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
fillRdbmsLinkConfig(rdbmsLink);
saveLink(rdbmsLink);
// Kite link
MLink kiteLink = getClient().createLink("kite-connector");
kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.authority").setValue(hdfsClient.getUri().getAuthority());
saveLink(kiteLink);
// Job creation
MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), kiteLink.getPersistenceId());
// Set rdbms "FROM" config
MConfigList fromConfig = job.getJobConfig(Direction.FROM);
fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName()));
fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
// TODO: Kite have troubles with some data types, so we're limiting the columns to int only
fromConfig.getStringInput("fromJobConfig.columns").setValue(provider.escapeColumnName("id"));
// Fill the Kite "TO" config
MConfigList toConfig = job.getJobConfig(Direction.TO);
toConfig.getStringInput("toJobConfig.uri").setValue("dataset:hdfs:" + getMapreduceDirectory());
toConfig.getEnumInput("toJobConfig.fileFormat").setValue(FileFormat.CSV);
// driver config
MDriverConfig driverConfig = job.getDriverConfig();
driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);
saveJob(job);
executeJob(job);
// Assert correct output
assertTo(
"\"1\"",
"\"2\"",
"\"3\"",
"\"4\""
);
}
}