diff --git a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java index 8d4a097a..63b07041 100644 --- a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java +++ b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java @@ -348,6 +348,8 @@ private String writeCopyCommand(String command) throws IOException { public void importTable(com.cloudera.sqoop.manager.ImportJobContext context) throws IOException, ImportException { + context.setConnManager(this); + String tableName = context.getTableName(); SqoopOptions options = context.getOptions(); diff --git a/src/test/com/cloudera/sqoop/manager/PostgresqlImportTest.java b/src/test/com/cloudera/sqoop/manager/PostgresqlImportTest.java index 3b5731b6..57dd338e 100644 --- a/src/test/com/cloudera/sqoop/manager/PostgresqlImportTest.java +++ b/src/test/com/cloudera/sqoop/manager/PostgresqlImportTest.java @@ -235,15 +235,18 @@ public void setUpData(String tableName, String schema, boolean nullEntry) { return args.toArray(new String[0]); } - private void doImportAndVerify(boolean isDirect, String [] expectedResults, + private void doImportAndVerify(boolean isDirect, String[] expectedResults, String tableName, String... extraArgs) throws IOException { Path warehousePath = new Path(this.getWarehouseDir()); Path tablePath = new Path(warehousePath, tableName); - Path filePath = new Path(tablePath, "part-m-00000"); + + // if importing with merge step, directory should exist and output should be from a reducer + boolean isMerge = Arrays.asList(extraArgs).contains("--merge-key"); + Path filePath = new Path(tablePath, isMerge ? "part-r-00000" : "part-m-00000"); File tableFile = new File(tablePath.toString()); - if (tableFile.exists() && tableFile.isDirectory()) { + if (tableFile.exists() && tableFile.isDirectory() && !isMerge) { // remove the directory before running the import. FileListing.recursiveDeleteDir(tableFile); } @@ -329,6 +332,34 @@ public void testIncrementalImport() throws IOException { doImportAndVerify(false, expectedResults, TABLE_NAME, extraArgs); } + public void testDirectIncrementalImport() throws IOException { + String [] expectedResults = { }; + + String [] extraArgs = { "--incremental", "lastmodified", + "--check-column", "start_date", + }; + + doImportAndVerify(true, expectedResults, TABLE_NAME, extraArgs); + } + + public void testDirectIncrementalImportMerge() throws IOException { + String [] expectedResults = { }; + + String [] extraArgs = { "--incremental", "lastmodified", + "--check-column", "start_date", + }; + + doImportAndVerify(true, expectedResults, TABLE_NAME, extraArgs); + + extraArgs = new String[] { "--incremental", "lastmodified", + "--check-column", "start_date", + "--merge-key", "id", + "--last-value", "2009-04-20" + }; + + doImportAndVerify(true, expectedResults, TABLE_NAME, extraArgs); + } + @Test public void testDifferentSchemaImport() throws IOException { String [] expectedResults = {