
SQOOP-2524. Sqoop2: Add S3 support to HDFS Connector

(Jarcec via Hari)
Hari Shreedharan 2015-09-24 21:22:17 -07:00
parent a70975c669
commit bf09850c33
10 changed files with 168 additions and 5 deletions
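
In practice, this change lets an existing hdfs-connector link point at an S3 bucket: the link URI takes an s3a:// location, and the S3 credentials travel through the linkConfig.configOverrides map, which the From/To initializers now merge into the Hadoop Configuration. Below is a minimal client-side sketch of that usage, modelled on the S3Test added in this commit; the bucket name, credential values, and the connected SqoopClient instance are placeholders, not part of the patch.

// Sketch only, not part of this commit: configuring an hdfs-connector link
// against S3 the way the new S3Test does. Bucket, keys, and the connected
// SqoopClient are assumptions for illustration.
import java.util.HashMap;
import java.util.Map;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MLink;

public class S3LinkSketch {
  public static MLink createS3Link(SqoopClient client) {
    MLink s3Link = client.createLink("hdfs-connector");
    // Point the connector at the bucket instead of an HDFS namenode
    s3Link.getConnectorLinkConfig().getStringInput("linkConfig.uri").setValue("s3a://example-bucket");

    // Credentials go through the configOverrides map, which the initializers
    // now copy into the Hadoop Configuration
    Map<String, String> overrides = new HashMap<>();
    overrides.put("fs.s3a.access.key", "<access-key>");  // placeholder
    overrides.put("fs.s3a.secret.key", "<secret-key>");  // placeholder
    s3Link.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides").setValue(overrides);

    client.saveLink(s3Link);
    return s3Link;
  }
}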


@@ -36,7 +36,7 @@ public class MapContext implements ImmutableContext {
private final Map<String, String> options;
public MapContext(Map<String, String> options) {
this.options = options;
this.options = options == null ? new HashMap<String, String>() : options;
}
protected Map<String, String> getOptions() {


@@ -23,6 +23,7 @@
import org.testng.Assert;
import org.testng.annotations.Test;
import static junit.framework.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -40,6 +41,9 @@ public void testInitalization() {
options.put("testkey", "testvalue");
MapContext mc = new MapContext(options);
Assert.assertEquals("testvalue", mc.getString("testkey"));
MapContext nullMc = new MapContext(null);
assertNull(nullMc.getString("random.key.property"));
}
/**


@@ -50,6 +50,10 @@ public void testConstructors() {
assertEquals(context.getLong("long", -1), 1L);
assertEquals(context.getInt("integer", -1), 13);
assertEquals(context.getBoolean("boolean", false), true);
context = new MutableMapContext(null);
context.setString("key", "value");
assertEquals(context.getString("key"), "value");
}
@Test


@@ -59,6 +59,13 @@ limitations under the License.
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>


@@ -21,6 +21,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
@@ -51,8 +52,8 @@ public void initialize(InitializerContext context, LinkConfiguration linkConfig,
assert jobConfig.incremental != null;
Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
HdfsUtils.contextToConfiguration(new MapContext(linkConfig.linkConfig.configOverrides), configuration);
HdfsUtils.configurationToContext(configuration, context.getContext());
context.getContext().setAll(linkConfig.linkConfig.configOverrides);
boolean incremental = jobConfig.incremental.incrementalType != null && jobConfig.incremental.incrementalType == IncrementalType.NEW_FILES;


@@ -22,6 +22,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
@@ -47,8 +48,8 @@ public void initialize(InitializerContext context, LinkConfiguration linkConfig,
assert jobConfig.toJobConfig.outputDirectory != null;
Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
HdfsUtils.contextToConfiguration(new MapContext(linkConfig.linkConfig.configOverrides), configuration);
HdfsUtils.configurationToContext(configuration, context.getContext());
context.getContext().setAll(linkConfig.linkConfig.configOverrides);
boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode);


@@ -425,6 +425,12 @@ limitations under the License.
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.2.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>


@@ -112,6 +112,12 @@ limitations under the License.
<artifactId>hadoop-auth</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>


@@ -131,7 +131,9 @@ public void start() throws Exception {
jar.contains("sqljdbc") || // Microsoft SQL Server driver
jar.contains("libfb303") || // Facebook thrift lib
jar.contains("datanucleus-") || // Data nucleus libs
jar.contains("google") // Google libraries (guava, ...)
jar.contains("google") || // Google libraries (guava, ...)
jar.contains("joda-time") || // Joda time
jar.contains("aws-java-sdk") // Amazon AWS SDK (S3, ...)
) {
extraClassPath.add(jar);
}


@@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.connector.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.asserts.HdfsAsserts;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.testng.SkipException;
import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Map;
import static org.testng.Assert.assertEquals;
/**
* Run with something like:
*
* mvn clean test -pl test -Dtest=S3Test
* -Dorg.apache.sqoop.integration.connector.hdfs.s3.bucket=test-bucket
* -Dorg.apache.sqoop.integration.connector.hdfs.s3.access=AKI...
* -Dorg.apache.sqoop.integration.connector.hdfs.s3.secret=93JKx...
*/
public class S3Test extends ConnectorTestCase {
public static final String PROPERTY_BUCKET = "org.apache.sqoop.integration.connector.hdfs.s3.bucket";
public static final String PROPERTY_ACCESS = "org.apache.sqoop.integration.connector.hdfs.s3.access";
public static final String PROPERTY_SECRET = "org.apache.sqoop.integration.connector.hdfs.s3.secret";
public static final String BUCKET = System.getProperty(PROPERTY_BUCKET);
public static final String ACCESS = System.getProperty(PROPERTY_ACCESS);
public static final String SECRET = System.getProperty(PROPERTY_SECRET);
public static final String BUCKET_URL = "s3a://" + BUCKET;
@Test
public void test() throws Exception {
// Verify presence of external configuration
assumeNotNull(BUCKET, PROPERTY_BUCKET);
assumeNotNull(ACCESS, PROPERTY_ACCESS);
assumeNotNull(SECRET, PROPERTY_SECRET);
createAndLoadTableCities();
Configuration hadoopConf = new Configuration();
hadoopConf.set("fs.defaultFS", BUCKET_URL);
hadoopConf.set("fs.s3a.access.key", ACCESS);
hadoopConf.set("fs.s3a.secret.key", SECRET);
FileSystem s3Client = FileSystem.get(hadoopConf);
// RDBMS link
MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
fillRdbmsLinkConfig(rdbmsLink);
saveLink(rdbmsLink);
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
hdfsLink.getConnectorLinkConfig().getStringInput("linkConfig.uri").setValue(BUCKET_URL);
Map<String, String> configOverrides = new HashMap<>();
configOverrides.put("fs.s3a.access.key", ACCESS);
configOverrides.put("fs.s3a.secret.key", SECRET);
hdfsLink.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides").setValue(configOverrides);
saveLink(hdfsLink);
s3Client.delete(new Path(getMapreduceDirectory()), true);
// DB -> S3
MJob db2aws = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
fillRdbmsFromConfig(db2aws, "id");
fillHdfsToConfig(db2aws, ToFormat.TEXT_FILE);
saveJob(db2aws);
executeJob(db2aws);
// Verifying locally imported data
HdfsAsserts.assertMapreduceOutput(s3Client, getMapreduceDirectory(),
"1,'USA','2004-10-23','San Francisco'",
"2,'USA','2004-10-24','Sunnyvale'",
"3,'Czech Republic','2004-10-25','Brno'",
"4,'USA','2004-10-26','Palo Alto'"
);
// This re-creates the table completely
createTableCities();
assertEquals(provider.rowCount(getTableName()), 0);
// S3 -> DB
MJob aws2db = getClient().createJob(hdfsLink.getPersistenceId(), rdbmsLink.getPersistenceId());
fillHdfsFromConfig(aws2db);
fillRdbmsToConfig(aws2db);
saveJob(aws2db);
executeJob(aws2db);
// Final verification
assertEquals(4L, provider.rowCount(getTableName()));
assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
}
/**
* Skip this test if given value is null
*/
void assumeNotNull(String value, String key) {
if(value == null) {
throw new SkipException("Missing value for " + key);
}
}
}