diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
index 32fe077a..2831f4bc 100644
--- a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
@@ -32,7 +32,6 @@
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.sqoop.config.ConfigurationHelper;
 import org.apache.sqoop.lib.DelimiterSet;
 import org.apache.sqoop.manager.ConnManager;
 import org.apache.sqoop.mapreduce.ExportJobBase;
diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java
index 16af84c6..e85e9df6 100644
--- a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java
@@ -35,8 +35,8 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.sqoop.lib.SqoopRecord;
 import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.sqoop.lib.SqoopRecord;
 import org.apache.sqoop.mapreduce.AutoProgressMapper;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
 import org.apache.sqoop.util.LoggingUtils;
@@ -203,7 +203,7 @@ public void map(LongWritable key, Writable value, Context context)
   protected void cleanup(Context context)
       throws IOException, InterruptedException {
     LongWritable taskid =
-      new LongWritable(context.getTaskAttemptID().getTaskID().getId());
+        new LongWritable(context.getTaskAttemptID().getTaskID().getId());
     context.write(taskid, new Text(tmpTableName));
 
     if (writer != null) {
diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java
index 3dc05a7a..88e86bec 100644
--- a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java
@@ -79,7 +79,6 @@ public void reduce(LongWritable key, Iterable<Text> values, Context context)
             + " ( SELECT * FROM " + value + " )");
         stmt.executeUpdate("DROP TABLE " + value);
       }
-      conn.commit();
     } catch (SQLException ex) {
       LoggingUtils.logAll(LOG, "Unable to execute create query.", ex);
       throw new IOException(ex);
@@ -98,6 +97,7 @@ public void reduce(LongWritable key, Iterable<Text> values, Context context)
   protected void cleanup(Context context)
       throws IOException, InterruptedException {
     try {
+      conn.commit();
       conn.close();
     } catch (SQLException ex) {
       LoggingUtils.logAll(LOG, "Unable to load JDBC driver class", ex);
diff --git a/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
index fc5fd6d7..a93da714 100644
--- a/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
+++ b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
@@ -187,53 +187,18 @@ public void testColumnsExport() throws IOException, SQLException {
 
 
   public void testMultiReduceExport() throws IOException, SQLException {
-    String[] genericargs = newStrArray(null, "-Dmapred.reduce.tasks=2");
-    multiFileTestWithGenericArgs(2, 10, 2, genericargs);
+    multiFileTest(2, 10, 2, "-D", "mapred.reduce.tasks=2");
   }
 
 
-  public void testMultiReduceExportWithNewProp() throws IOException, SQLException {
-    String[] genericargs = newStrArray(null, "-Dmapreduce.job.reduces=2");
-    multiFileTestWithGenericArgs(2, 10, 2, genericargs);
+  public void testMultiReduceExportWithNewProp()
+      throws IOException, SQLException {
+    multiFileTest(2, 10, 2, "-D", "mapreduce.job.reduces=2");
   }
 
 
   public void testExportWithTablespace() throws IOException, SQLException {
-    String[] genericargs =
-      newStrArray(null, "-Dpgbulkload.staging.tablespace=" + TABLESPACE);
-    multiFileTestWithGenericArgs(1, 10, 1, genericargs);
-  }
-
-
-  protected void multiFileTestWithGenericArgs(int numFiles,
-                                              int recordsPerMap,
-                                              int numMaps,
-                                              String[] genericargs,
-                                              String... argv)
-    throws IOException, SQLException {
-
-    final int TOTAL_RECORDS = numFiles * recordsPerMap;
-
-    try {
-      LOG.info("Beginning test: numFiles=" + numFiles + "; recordsPerMap="
-          + recordsPerMap + "; numMaps=" + numMaps);
-      LOG.info("  with genericargs: ");
-      for (String arg : genericargs) {
-        LOG.info("    " + arg);
-      }
-
-      for (int i = 0; i < numFiles; i++) {
-        createTextFile(i, recordsPerMap, false);
-      }
-
-      createTable();
-
-      runExport(getArgv(true, 10, 10,
-                        newStrArray(newStrArray(genericargs, argv),
-                                    "-m", "" + numMaps)));
-      verifyExport(TOTAL_RECORDS);
-    } finally {
-      LOG.info("multi-reduce test complete");
-    }
+    multiFileTest(1, 10, 1,
+                  "-D", "pgbulkload.staging.tablespace=" + TABLESPACE);
   }
 }
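
The behavioural change in this patch is the move of conn.commit() from PGBulkloadExportReducer.reduce() to cleanup(): staging tables are still merged and dropped per key, but the reducer now commits once per task instead of once per reduce call, so a failure partway through a task no longer leaves some staging tables merged and dropped while others are not. A minimal sketch of that commit-in-cleanup pattern is shown below; the class name, JDBC URL and target table are illustrative placeholders, not taken from Sqoop (which wires the connection up through DBConfiguration).

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative sketch of the commit-in-cleanup pattern: merge the staging
// table named by each value into the target table, commit once when the
// reducer task finishes.
public class StagingMergeReducer
    extends Reducer<LongWritable, Text, NullWritable, NullWritable> {

  private Connection conn;

  @Override
  protected void setup(Context context) throws IOException {
    try {
      // Placeholder connection; Sqoop builds it from DBConfiguration instead.
      conn = DriverManager.getConnection("jdbc:postgresql://localhost/exportdb");
      conn.setAutoCommit(false);
    } catch (SQLException ex) {
      throw new IOException(ex);
    }
  }

  @Override
  public void reduce(LongWritable key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    try (Statement stmt = conn.createStatement()) {
      for (Text value : values) {
        // Each value is the name of a staging table written by one map task.
        stmt.executeUpdate(
            "INSERT INTO target_table ( SELECT * FROM " + value + " )");
        stmt.executeUpdate("DROP TABLE " + value);
      }
      // Deliberately no commit here: keep every merge in one open transaction.
    } catch (SQLException ex) {
      throw new IOException(ex);
    }
  }

  @Override
  protected void cleanup(Context context) throws IOException {
    try {
      conn.commit();  // single commit covering all tables merged by this task
      conn.close();
    } catch (SQLException ex) {
      throw new IOException(ex);
    }
  }
}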