5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 01:00:46 +08:00

SQOOP-1826: NPE in ImportTool.lastModifiedMerge during postgres import

(Ricky Nguyen via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-01-06 10:14:38 +01:00
parent 8b6eb33f17
commit c32010f440
2 changed files with 36 additions and 3 deletions

View File

@ -348,6 +348,8 @@ private String writeCopyCommand(String command) throws IOException {
public void importTable(com.cloudera.sqoop.manager.ImportJobContext context) public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
throws IOException, ImportException { throws IOException, ImportException {
context.setConnManager(this);
String tableName = context.getTableName(); String tableName = context.getTableName();
SqoopOptions options = context.getOptions(); SqoopOptions options = context.getOptions();

View File

@ -235,15 +235,18 @@ public void setUpData(String tableName, String schema, boolean nullEntry) {
return args.toArray(new String[0]); return args.toArray(new String[0]);
} }
private void doImportAndVerify(boolean isDirect, String [] expectedResults, private void doImportAndVerify(boolean isDirect, String[] expectedResults,
String tableName, String... extraArgs) throws IOException { String tableName, String... extraArgs) throws IOException {
Path warehousePath = new Path(this.getWarehouseDir()); Path warehousePath = new Path(this.getWarehouseDir());
Path tablePath = new Path(warehousePath, tableName); Path tablePath = new Path(warehousePath, tableName);
Path filePath = new Path(tablePath, "part-m-00000");
// if importing with merge step, directory should exist and output should be from a reducer
boolean isMerge = Arrays.asList(extraArgs).contains("--merge-key");
Path filePath = new Path(tablePath, isMerge ? "part-r-00000" : "part-m-00000");
File tableFile = new File(tablePath.toString()); File tableFile = new File(tablePath.toString());
if (tableFile.exists() && tableFile.isDirectory()) { if (tableFile.exists() && tableFile.isDirectory() && !isMerge) {
// remove the directory before running the import. // remove the directory before running the import.
FileListing.recursiveDeleteDir(tableFile); FileListing.recursiveDeleteDir(tableFile);
} }
@ -329,6 +332,34 @@ public void testIncrementalImport() throws IOException {
doImportAndVerify(false, expectedResults, TABLE_NAME, extraArgs); doImportAndVerify(false, expectedResults, TABLE_NAME, extraArgs);
} }
public void testDirectIncrementalImport() throws IOException {
String [] expectedResults = { };
String [] extraArgs = { "--incremental", "lastmodified",
"--check-column", "start_date",
};
doImportAndVerify(true, expectedResults, TABLE_NAME, extraArgs);
}
public void testDirectIncrementalImportMerge() throws IOException {
String [] expectedResults = { };
String [] extraArgs = { "--incremental", "lastmodified",
"--check-column", "start_date",
};
doImportAndVerify(true, expectedResults, TABLE_NAME, extraArgs);
extraArgs = new String[] { "--incremental", "lastmodified",
"--check-column", "start_date",
"--merge-key", "id",
"--last-value", "2009-04-20"
};
doImportAndVerify(true, expectedResults, TABLE_NAME, extraArgs);
}
@Test @Test
public void testDifferentSchemaImport() throws IOException { public void testDifferentSchemaImport() throws IOException {
String [] expectedResults = { String [] expectedResults = {