
SQOOP-2652. Sqoop2: Add test case for S3 incremental import to HDFS

(Jarcec via Hari)
Hari Shreedharan 2015-11-12 11:45:52 -08:00
parent 8ea1c8580e
commit ede322199d
2 changed files with 76 additions and 11 deletions
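The new test only runs when an S3 bucket and credentials are supplied from outside the build; otherwise it skips itself via assumeNotNull(). A minimal sketch of how the BUCKET/ACCESS/SECRET constants used below are likely wired, assuming they are read from JVM system properties (the property names here are illustrative, not taken from the commit):

// Hypothetical wiring of the external S3 configuration; the property names are assumptions
public static final String PROPERTY_BUCKET = "org.apache.sqoop.test.s3.bucket";
public static final String PROPERTY_ACCESS = "org.apache.sqoop.test.s3.access";
public static final String PROPERTY_SECRET = "org.apache.sqoop.test.s3.secret";

public static final String BUCKET = System.getProperty(PROPERTY_BUCKET);
public static final String ACCESS = System.getProperty(PROPERTY_ACCESS);
public static final String SECRET = System.getProperty(PROPERTY_SECRET);

// Target bucket expressed through the Hadoop S3A filesystem scheme
public static final String BUCKET_URL = "s3a://" + BUCKET;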


@@ -22,6 +22,7 @@
import java.lang.reflect.Method;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
@@ -237,6 +238,13 @@ protected void assertToFiles(int expectedFiles) throws IOException {
* @throws IOException
*/
protected void createFromFile(String filename, String...lines) throws IOException {
createFromFile(hdfsClient, filename, lines);
}
/**
* Create file on given HDFS instance with given lines
*/
protected void createFromFile(FileSystem hdfsClient, String filename, String...lines) throws IOException {
HdfsUtils.createFile(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), filename), lines);
}
}
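The new createFromFile overload lets a test pass an explicit FileSystem instead of the default hdfsClient, so the same helper can seed input files on S3. A usage sketch mirroring how the S3 test below calls it:

// Write a seed file into the test's mapreduce directory on S3 instead of the local HDFS cluster
createFromFile(s3Client, "input-0001",
    "1,'USA','2004-10-23','San Francisco'",
    "2,'USA','2004-10-24','Sunnyvale'");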


@@ -20,6 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
@@ -53,20 +54,26 @@ public class S3Test extends ConnectorTestCase {
public static final String BUCKET_URL = "s3a://" + BUCKET;
@Test
public void test() throws Exception {
// Verify presence of external configuration
// S3 client (HDFS interface) to be used in the tests
private FileSystem s3Client;
public void setUpS3Client() throws Exception {
assumeNotNull(BUCKET, PROPERTY_BUCKET);
assumeNotNull(ACCESS, PROPERTY_ACCESS);
assumeNotNull(SECRET, PROPERTY_SECRET);
createAndLoadTableCities();
Configuration hadoopConf = new Configuration();
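// Point the default filesystem at the S3 bucket and supply S3A credentials, so FileSystem.get() returns a client rooted at the bucket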
hadoopConf.set("fs.defaultFS", BUCKET_URL);
hadoopConf.set("fs.s3a.access.key", ACCESS);
hadoopConf.set("fs.s3a.secret.key", SECRET);
FileSystem s3Client = FileSystem.get(hadoopConf);
s3Client = FileSystem.get(hadoopConf);
}
@Test
public void testImportExport() throws Exception {
setUpS3Client();
s3Client.delete(new Path(getMapreduceDirectory()), true);
createAndLoadTableCities();
// RDBMS link
MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
@@ -82,8 +89,6 @@ public void test() throws Exception {
hdfsLink.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides").setValue(configOverrides);
saveLink(hdfsLink);
s3Client.delete(new Path(getMapreduceDirectory()), true);
// DB -> S3
MJob db2aws = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
fillRdbmsFromConfig(db2aws, "id");
@@ -120,9 +125,61 @@ public void test() throws Exception {
assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
}
/**
* Skip this test if given value is null
*/
@Test
public void testIncrementalRead() throws Exception {
setUpS3Client();
s3Client.delete(new Path(getMapreduceDirectory()), true);
// S3 link
MLink s3Link = getClient().createLink("hdfs-connector");
s3Link.getConnectorLinkConfig().getStringInput("linkConfig.uri").setValue(BUCKET_URL);
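// Pass the S3A credentials through linkConfig.configOverrides so they end up in the Hadoop configuration the connector uses at job time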
Map<String, String> configOverrides = new HashMap<>();
configOverrides.put("fs.s3a.access.key", ACCESS);
configOverrides.put("fs.s3a.secret.key", SECRET);
s3Link.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides").setValue(configOverrides);
saveLink(s3Link);
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLink);
saveLink(hdfsLink);
// S3 -> HDFS
MJob aws2hdfs = getClient().createJob(s3Link.getPersistenceId(), hdfsLink.getPersistenceId());
fillHdfsFromConfig(aws2hdfs);
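// NEW_FILES incremental mode: each execution reads only files added since the previous run; appendMode below keeps the first run's output in the target directory so the second run's rows land alongside it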
aws2hdfs.getFromJobConfig().getEnumInput("incremental.incrementalType").setValue(IncrementalType.NEW_FILES);
fillHdfsToConfig(aws2hdfs, ToFormat.TEXT_FILE);
aws2hdfs.getToJobConfig().getBooleanInput("toJobConfig.appendMode").setValue(true);
saveJob(aws2hdfs);
// First import (first file)
createFromFile(s3Client, "input-0001",
"1,'USA','2004-10-23','San Francisco'",
"2,'USA','2004-10-24','Sunnyvale'"
);
executeJob(aws2hdfs);
HdfsAsserts.assertMapreduceOutput(hdfsClient, getMapreduceDirectory(),
"1,'USA','2004-10-23','San Francisco'",
"2,'USA','2004-10-24','Sunnyvale'"
);
// Second import (second file)
createFromFile(s3Client, "input-0002",
"3,'Czech Republic','2004-10-25','Brno'",
"4,'USA','2004-10-26','Palo Alto'"
);
executeJob(aws2hdfs);
HdfsAsserts.assertMapreduceOutput(hdfsClient, getMapreduceDirectory(),
"1,'USA','2004-10-23','San Francisco'",
"2,'USA','2004-10-24','Sunnyvale'",
"3,'Czech Republic','2004-10-25','Brno'",
"4,'USA','2004-10-26','Palo Alto'"
);
}
void assumeNotNull(String value, String key) {
if(value == null) {
throw new SkipException("Missing value for " + key);