diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/JettyTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/JettyTestCase.java
index 2349f1cd..bd4ba6a2 100644
--- a/test/src/main/java/org/apache/sqoop/test/testcases/JettyTestCase.java
+++ b/test/src/main/java/org/apache/sqoop/test/testcases/JettyTestCase.java
@@ -22,6 +22,7 @@
 import java.lang.reflect.Method;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.log4j.Logger;
@@ -237,6 +238,13 @@ protected void assertToFiles(int expectedFiles) throws IOException {
    * @throws IOException
    */
   protected void createFromFile(String filename, String...lines) throws IOException {
+    createFromFile(hdfsClient, filename, lines);
+  }
+
+  /**
+   * Create file on given HDFS instance with given lines
+   */
+  protected void createFromFile(FileSystem hdfsClient, String filename, String...lines) throws IOException {
     HdfsUtils.createFile(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), filename), lines);
   }
 }
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
index 73a9acf8..7fec3106 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
@@ -53,20 +54,26 @@ public class S3Test extends ConnectorTestCase {
 
   public static final String BUCKET_URL = "s3a://" + BUCKET;
 
-  @Test
-  public void test() throws Exception {
-    // Verify presence external configuration
+  // S3 client (HDFS interface) to be used in the tests
+  private FileSystem s3Client;
+
+  public void setUpS3Client() throws Exception {
     assumeNotNull(BUCKET, PROPERTY_BUCKET);
     assumeNotNull(ACCESS, PROPERTY_ACCESS);
     assumeNotNull(SECRET, PROPERTY_SECRET);
 
-    createAndLoadTableCities();
-
     Configuration hadoopConf = new Configuration();
     hadoopConf.set("fs.defaultFS", BUCKET_URL);
     hadoopConf.set("fs.s3a.access.key", ACCESS);
     hadoopConf.set("fs.s3a.secret.key", SECRET);
-    FileSystem s3Client = FileSystem.get(hadoopConf);
+    s3Client = FileSystem.get(hadoopConf);
+  }
+
+  @Test
+  public void testImportExport() throws Exception {
+    setUpS3Client();
+    s3Client.delete(new Path(getMapreduceDirectory()), true);
+    createAndLoadTableCities();
 
     // RDBMS link
     MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
@@ -82,8 +89,6 @@ public void test() throws Exception {
     hdfsLink.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides").setValue(configOverrides);
     saveLink(hdfsLink);
 
-    s3Client.delete(new Path(getMapreduceDirectory()), true);
-
     // DB -> S3
     MJob db2aws = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
     fillRdbmsFromConfig(db2aws, "id");
@@ -120,9 +125,61 @@
     assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
   }
 
-  /**
-   * Skip this test if given value is null
-   */
+  @Test
+  public void testIncrementalRead() throws Exception {
+    setUpS3Client();
+    s3Client.delete(new Path(getMapreduceDirectory()), true);
+
+    // S3 link
+    MLink s3Link = getClient().createLink("hdfs-connector");
+    s3Link.getConnectorLinkConfig().getStringInput("linkConfig.uri").setValue(BUCKET_URL);
+    Map<String, String> configOverrides = new HashMap<>();
+    configOverrides.put("fs.s3a.access.key", ACCESS);
+    configOverrides.put("fs.s3a.secret.key", SECRET);
+    s3Link.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides").setValue(configOverrides);
+    saveLink(s3Link);
+
+    // HDFS link
+    MLink hdfsLink = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsLink);
+    saveLink(hdfsLink);
+
+    // S3 -> HDFS
+    MJob aws2hdfs = getClient().createJob(s3Link.getPersistenceId(), hdfsLink.getPersistenceId());
+    fillHdfsFromConfig(aws2hdfs);
+    aws2hdfs.getFromJobConfig().getEnumInput("incremental.incrementalType").setValue(IncrementalType.NEW_FILES);
+
+    fillHdfsToConfig(aws2hdfs, ToFormat.TEXT_FILE);
+    aws2hdfs.getToJobConfig().getBooleanInput("toJobConfig.appendMode").setValue(true);
+    saveJob(aws2hdfs);
+
+    // First import (first file)
+    createFromFile(s3Client, "input-0001",
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'"
+    );
+    executeJob(aws2hdfs);
+
+    HdfsAsserts.assertMapreduceOutput(hdfsClient, getMapreduceDirectory(),
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'"
+    );
+
+    // Second import (second file)
+    createFromFile(s3Client, "input-0002",
+      "3,'Czech Republic','2004-10-25','Brno'",
+      "4,'USA','2004-10-26','Palo Alto'"
+    );
+    executeJob(aws2hdfs);
+
+    HdfsAsserts.assertMapreduceOutput(hdfsClient, getMapreduceDirectory(),
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'",
+      "3,'Czech Republic','2004-10-25','Brno'",
+      "4,'USA','2004-10-26','Palo Alto'"
+    );
+  }
+
   void assumeNotNull(String value, String key) {
     if(value == null) {
       throw new SkipException("Missing value for " + key);