
SQOOP-1622: Copying from staging table should be in single transaction for pg_bulkload connector

(Masatake Iwasaki via Jarek Jarcec Cecho)
Jarek Jarcec Cecho 2014-11-20 06:07:25 -08:00
parent 2b4e4d9bf0
commit cee138c21f
4 changed files with 9 additions and 45 deletions

src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java

@@ -32,7 +32,6 @@
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.sqoop.config.ConfigurationHelper;
 import org.apache.sqoop.lib.DelimiterSet;
 import org.apache.sqoop.manager.ConnManager;
 import org.apache.sqoop.mapreduce.ExportJobBase;

src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java

@@ -35,8 +35,8 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.sqoop.lib.SqoopRecord;
 import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.sqoop.lib.SqoopRecord;
 import org.apache.sqoop.mapreduce.AutoProgressMapper;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
 import org.apache.sqoop.util.LoggingUtils;
@@ -203,7 +203,7 @@ public void map(LongWritable key, Writable value, Context context)
   protected void cleanup(Context context)
       throws IOException, InterruptedException {
     LongWritable taskid =
-      new LongWritable(context.getTaskAttemptID().getTaskID().getId());
+        new LongWritable(context.getTaskAttemptID().getTaskID().getId());
     context.write(taskid, new Text(tmpTableName));
     if (writer != null) {

src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java

@@ -79,7 +79,6 @@ public void reduce(LongWritable key, Iterable<Text> values, Context context)
                            + " ( SELECT * FROM " + value + " )");
         stmt.executeUpdate("DROP TABLE " + value);
       }
-      conn.commit();
     } catch (SQLException ex) {
       LoggingUtils.logAll(LOG, "Unable to execute create query.", ex);
       throw new IOException(ex);
@@ -98,6 +97,7 @@ public void reduce(LongWritable key, Iterable<Text> values, Context context)
   protected void cleanup(Context context)
       throws IOException, InterruptedException {
     try {
+      conn.commit();
       conn.close();
     } catch (SQLException ex) {
       LoggingUtils.logAll(LOG, "Unable to load JDBC driver class", ex);
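
Taken together, the two reducer hunks move conn.commit() out of reduce() and into cleanup(): every staging table a reducer receives is inserted into the export table and dropped inside one transaction, and nothing becomes visible until the task ends. A minimal sketch of the resulting control flow, assuming a connection opened in setup() with auto-commit disabled and a hypothetical exportTable field (neither appears in the hunks above):

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch only: conn is assumed to be opened in setup() with
// conn.setAutoCommit(false); exportTable is a hypothetical stand-in for
// the destination table name used by the real reducer.
public class SingleTransactionExportReducer
    extends Reducer<LongWritable, Text, NullWritable, NullWritable> {

  private Connection conn;
  private String exportTable;

  @Override
  protected void reduce(LongWritable key, Iterable<Text> values,
                        Context context)
      throws IOException, InterruptedException {
    try (Statement stmt = conn.createStatement()) {
      for (Text value : values) {
        // Merge one task's staging table, then drop it. No commit here:
        // all staging tables stay pending in the same transaction.
        stmt.executeUpdate("INSERT INTO " + exportTable
            + " ( SELECT * FROM " + value + " )");
        stmt.executeUpdate("DROP TABLE " + value);
      }
    } catch (SQLException ex) {
      throw new IOException(ex);
    }
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    try {
      conn.commit();  // one commit covers every merged staging table
      conn.close();
    } catch (SQLException ex) {
      throw new IOException(ex);
    }
  }
}

If the reducer dies before cleanup() runs, the pending inserts and drops roll back with the transaction, which is the atomicity the commit message asks for.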

src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java

@@ -187,53 +187,18 @@ public void testColumnsExport() throws IOException, SQLException {
   public void testMultiReduceExport() throws IOException, SQLException {
-    String[] genericargs = newStrArray(null, "-Dmapred.reduce.tasks=2");
-    multiFileTestWithGenericArgs(2, 10, 2, genericargs);
+    multiFileTest(2, 10, 2, "-D", "mapred.reduce.tasks=2");
   }
 
-  public void testMultiReduceExportWithNewProp() throws IOException, SQLException {
-    String[] genericargs = newStrArray(null, "-Dmapreduce.job.reduces=2");
-    multiFileTestWithGenericArgs(2, 10, 2, genericargs);
+  public void testMultiReduceExportWithNewProp()
+      throws IOException, SQLException {
+    multiFileTest(2, 10, 2, "-D", "mapreduce.job.reduces=2");
   }
 
   public void testExportWithTablespace() throws IOException, SQLException {
-    String[] genericargs =
-        newStrArray(null, "-Dpgbulkload.staging.tablespace=" + TABLESPACE);
-    multiFileTestWithGenericArgs(1, 10, 1, genericargs);
-  }
-
-  protected void multiFileTestWithGenericArgs(int numFiles,
-                                              int recordsPerMap,
-                                              int numMaps,
-                                              String[] genericargs,
-                                              String... argv)
-      throws IOException, SQLException {
-    final int TOTAL_RECORDS = numFiles * recordsPerMap;
-    try {
-      LOG.info("Beginning test: numFiles=" + numFiles + "; recordsPerMap="
-          + recordsPerMap + "; numMaps=" + numMaps);
-      LOG.info("  with genericargs: ");
-      for (String arg : genericargs) {
-        LOG.info("    " + arg);
-      }
-      for (int i = 0; i < numFiles; i++) {
-        createTextFile(i, recordsPerMap, false);
-      }
-      createTable();
-      runExport(getArgv(true, 10, 10,
-                        newStrArray(newStrArray(genericargs, argv),
-                                    "-m", "" + numMaps)));
-      verifyExport(TOTAL_RECORDS);
-    } finally {
-      LOG.info("multi-reduce test complete");
-    }
+    multiFileTest(1, 10, 1,
+                  "-D", "pgbulkload.staging.tablespace=" + TABLESPACE);
   }
 }
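
The deleted multiFileTestWithGenericArgs helper is folded into a single multiFileTest entry point that takes generic options as leading varargs. Its body is not part of this diff, so the following is only a plausible reconstruction from the removed code, reusing the helpers (createTextFile, createTable, runExport, getArgv, newStrArray, verifyExport, LOG) that the old version already called:

// Hypothetical reconstruction: multiFileTest's real body is not shown in
// this commit, so the argument plumbing here is an assumption.
protected void multiFileTest(int numFiles, int recordsPerMap, int numMaps,
                             String... argv)
    throws IOException, SQLException {
  final int TOTAL_RECORDS = numFiles * recordsPerMap;
  try {
    for (int i = 0; i < numFiles; i++) {
      createTextFile(i, recordsPerMap, false);  // one input file per map
    }
    createTable();
    // Options such as "-D", "mapred.reduce.tasks=2" now arrive through
    // argv instead of a separate genericargs array.
    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + numMaps)));
    verifyExport(TOTAL_RECORDS);
  } finally {
    LOG.info("multi-file test complete");
  }
}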