From bf09850c33721f8d7629a121b15c7f57da9e295f Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Thu, 24 Sep 2015 21:22:17 -0700
Subject: [PATCH] SQOOP-2524. Sqoop2: Add S3 support to HDFS Connector (Jarcec via Hari)

---
 .../org/apache/sqoop/common/MapContext.java   |   2 +-
 .../apache/sqoop/common/TestMapContext.java   |   4 +
 .../sqoop/common/TestMutableMapContext.java   |   4 +
 connector/connector-hdfs/pom.xml              |   9 +-
 .../connector/hdfs/HdfsFromInitializer.java   |   3 +-
 .../connector/hdfs/HdfsToInitializer.java     |   3 +-
 pom.xml                                       |   6 +
 test/pom.xml                                  |   6 +
 .../minicluster/TomcatSqoopMiniCluster.java   |   4 +-
 .../integration/connector/hdfs/S3Test.java    | 132 ++++++++++++++++++
 10 files changed, 168 insertions(+), 5 deletions(-)
 create mode 100644 test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java

diff --git a/common/src/main/java/org/apache/sqoop/common/MapContext.java b/common/src/main/java/org/apache/sqoop/common/MapContext.java
index fc722c04..31675306 100644
--- a/common/src/main/java/org/apache/sqoop/common/MapContext.java
+++ b/common/src/main/java/org/apache/sqoop/common/MapContext.java
@@ -36,7 +36,7 @@ public class MapContext implements ImmutableContext {
   private final Map<String, String> options;
 
   public MapContext(Map<String, String> options) {
-    this.options = options;
+    this.options = options == null ? new HashMap<String, String>() : options;
   }
 
   protected Map<String, String> getOptions() {
diff --git a/common/src/test/java/org/apache/sqoop/common/TestMapContext.java b/common/src/test/java/org/apache/sqoop/common/TestMapContext.java
index 2a27c0c0..22f42a45 100644
--- a/common/src/test/java/org/apache/sqoop/common/TestMapContext.java
+++ b/common/src/test/java/org/apache/sqoop/common/TestMapContext.java
@@ -23,6 +23,7 @@
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static junit.framework.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -40,6 +41,9 @@ public void testInitalization() {
     options.put("testkey", "testvalue");
     MapContext mc = new MapContext(options);
     Assert.assertEquals("testvalue", mc.getString("testkey"));
+
+    MapContext nullMc = new MapContext(null);
+    assertNull(nullMc.getString("random.key.property"));
   }
 
   /**
diff --git a/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java b/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java
index db7fa347..3cbe7be5 100644
--- a/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java
+++ b/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java
@@ -50,6 +50,10 @@ public void testConstructors() {
     assertEquals(context.getLong("long", -1), 1L);
     assertEquals(context.getInt("integer", -1), 13);
     assertEquals(context.getBoolean("boolean", false), true);
+
+    context = new MutableMapContext(null);
+    context.setString("key", "value");
+    assertEquals(context.getString("key"), "value");
   }
 
   @Test
diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml
index 512b54c3..a28989c1 100644
--- a/connector/connector-hdfs/pom.xml
+++ b/connector/connector-hdfs/pom.xml
@@ -59,6 +59,13 @@ limitations under the License.
       <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
       <scope>provided</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
@@ -77,4 +84,4 @@ limitations under the License.
       </plugin>
     </plugins>
   </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
index 6c943a89..e98e02b7 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
@@ -51,8 +52,8 @@ public void initialize(InitializerContext context, LinkConfiguration linkConfig,
     assert jobConfig.incremental != null;
 
     Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
+    HdfsUtils.contextToConfiguration(new MapContext(linkConfig.linkConfig.configOverrides), configuration);
     HdfsUtils.configurationToContext(configuration, context.getContext());
-    context.getContext().setAll(linkConfig.linkConfig.configOverrides);
 
     boolean incremental = jobConfig.incremental.incrementalType != null &&
         jobConfig.incremental.incrementalType == IncrementalType.NEW_FILES;
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index 5bb09283..29cf3b98 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
@@ -47,8 +48,8 @@ public void initialize(InitializerContext context, LinkConfiguration linkConfig,
     assert jobConfig.toJobConfig.outputDirectory != null;
 
     Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
+    HdfsUtils.contextToConfiguration(new MapContext(linkConfig.linkConfig.configOverrides), configuration);
     HdfsUtils.configurationToContext(configuration, context.getContext());
-    context.getContext().setAll(linkConfig.linkConfig.configOverrides);
 
     boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode);
 
diff --git a/pom.xml b/pom.xml
index 6e334a76..ef3f5f43 100644
--- a/pom.xml
+++ b/pom.xml
@@ -425,6 +425,12 @@ limitations under the License.
         <artifactId>hadoop-auth</artifactId>
         <version>${hadoop.2.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-aws</artifactId>
+        <version>${hadoop.2.version}</version>
+        <scope>provided</scope>
+      </dependency>
       <dependency>
         <groupId>org.apache.hive</groupId>
         <artifactId>hive-jdbc</artifactId>
diff --git a/test/pom.xml b/test/pom.xml
index 3e11f597..8218477f 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -112,6 +112,12 @@ limitations under the License.
       <artifactId>hadoop-auth</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-jdbc</artifactId>
diff --git a/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java b/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java
index 83f42b65..a0ef78a9 100644
--- a/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java
+++ b/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java
@@ -131,7 +131,9 @@ public void start() throws Exception {
         jar.contains("sqljdbc") ||      // Microsoft SQL Server driver
         jar.contains("libfb303") ||     // Facebook thrift lib
         jar.contains("datanucleus-") || // Data nucleus libs
-        jar.contains("google")          // Google libraries (guava, ...)
+        jar.contains("google") ||       // Google libraries (guava, ...)
+        jar.contains("joda-time") ||    // Joda time
+        jar.contains("aws-java-sdk")    // Amazon AWS SDK (S3, ...)
       ) {
         extraClassPath.add(jar);
       }
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
new file mode 100644
index 00000000..73a9acf8
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.asserts.HdfsAsserts;
+import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.testng.SkipException;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Run with something like:
+ *
+ * mvn clean test -pl test -Dtest=S3Test
+ *   -Dorg.apache.sqoop.integration.connector.hdfs.s3.bucket=test-bucket
+ *   -Dorg.apache.sqoop.integration.connector.hdfs.s3.access=AKI...
+ *   -Dorg.apache.sqoop.integration.connector.hdfs.s3.secret=93JKx...
+ */
+public class S3Test extends ConnectorTestCase {
+
+  public static final String PROPERTY_BUCKET = "org.apache.sqoop.integration.connector.hdfs.s3.bucket";
+  public static final String PROPERTY_ACCESS = "org.apache.sqoop.integration.connector.hdfs.s3.access";
+  public static final String PROPERTY_SECRET = "org.apache.sqoop.integration.connector.hdfs.s3.secret";
+
+  public static final String BUCKET = System.getProperty(PROPERTY_BUCKET);
+  public static final String ACCESS = System.getProperty(PROPERTY_ACCESS);
+  public static final String SECRET = System.getProperty(PROPERTY_SECRET);
+
+  public static final String BUCKET_URL = "s3a://" + BUCKET;
+
+  @Test
+  public void test() throws Exception {
+    // Verify presence external configuration
+    assumeNotNull(BUCKET, PROPERTY_BUCKET);
+    assumeNotNull(ACCESS, PROPERTY_ACCESS);
+    assumeNotNull(SECRET, PROPERTY_SECRET);
+
+    createAndLoadTableCities();
+
+    Configuration hadoopConf = new Configuration();
+    hadoopConf.set("fs.defaultFS", BUCKET_URL);
+    hadoopConf.set("fs.s3a.access.key", ACCESS);
+    hadoopConf.set("fs.s3a.secret.key", SECRET);
+    FileSystem s3Client = FileSystem.get(hadoopConf);
+
+    // RDBMS link
+    MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLink);
+    saveLink(rdbmsLink);
+
+    // HDFS link
+    MLink hdfsLink = getClient().createLink("hdfs-connector");
+    hdfsLink.getConnectorLinkConfig().getStringInput("linkConfig.uri").setValue(BUCKET_URL);
+    Map<String, String> configOverrides = new HashMap<>();
+    configOverrides.put("fs.s3a.access.key", ACCESS);
+    configOverrides.put("fs.s3a.secret.key", SECRET);
+    hdfsLink.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides").setValue(configOverrides);
+    saveLink(hdfsLink);
+
+    s3Client.delete(new Path(getMapreduceDirectory()), true);
+
+    // DB -> S3
+    MJob db2aws = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
+    fillRdbmsFromConfig(db2aws, "id");
+    fillHdfsToConfig(db2aws, ToFormat.TEXT_FILE);
+
+    saveJob(db2aws);
+    executeJob(db2aws);
+
+    // Verifying locally imported data
+    HdfsAsserts.assertMapreduceOutput(s3Client, getMapreduceDirectory(),
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'",
+      "3,'Czech Republic','2004-10-25','Brno'",
+      "4,'USA','2004-10-26','Palo Alto'"
+    );
+
+    // This re-creates the table completely
+    createTableCities();
+    assertEquals(provider.rowCount(getTableName()), 0);
+
+    // S3 -> DB
+    MJob aws2db = getClient().createJob(hdfsLink.getPersistenceId(), rdbmsLink.getPersistenceId());
+    fillHdfsFromConfig(aws2db);
+    fillRdbmsToConfig(aws2db);
+
+    saveJob(aws2db);
+    executeJob(aws2db);
+
+    // Final verification
+    assertEquals(4L, provider.rowCount(getTableName()));
+    assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
+    assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
+    assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
+    assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
+  }
+
+  /**
+   * Skip this test if given value is null
+   */
+  void assumeNotNull(String value, String key) {
+    if(value == null) {
+      throw new SkipException("Missing value for " + key);
+    }
+  }
+
+}
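
A minimal sketch of the configOverrides flow that the initializer hunks above introduce: the link's
override map is wrapped in a MapContext (now null-safe per the MapContext change) and copied into the
Hadoop Configuration via HdfsUtils.contextToConfiguration, which is how per-link S3 credentials reach
the S3A filesystem. This assumes contextToConfiguration is accessible from the caller's package; the
class name ConfigOverridesSketch and the key values are placeholders, not part of the patch.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.connector.hdfs.HdfsUtils;

public class ConfigOverridesSketch {
  public static void main(String[] args) {
    // Per-link overrides, as a user would set them on linkConfig.configOverrides.
    // The property names are real s3a keys; the values are placeholders.
    Map<String, String> overrides = new HashMap<>();
    overrides.put("fs.s3a.access.key", "AKI...");
    overrides.put("fs.s3a.secret.key", "93JKx...");

    Configuration conf = new Configuration();
    // new MapContext(null) is also safe after this patch, so a link without
    // overrides takes the same path with no null check at the call site.
    HdfsUtils.contextToConfiguration(new MapContext(overrides), conf);

    // The Configuration handed to the filesystem now carries the credentials.
    System.out.println(conf.get("fs.s3a.access.key"));
  }
}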