diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index c85602c3..f9112807 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -21,14 +21,23 @@
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.StringTokenizer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
+import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
+
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
 import com.cloudera.sqoop.mapreduce.ExportJobBase;
@@ -40,6 +49,8 @@
  */
 public class JdbcUpdateExportJob extends ExportJobBase {
 
+  // Fix for SQOOP-2846: the input file type drives mapper/input format choice.
+  private FileType fileType;
 
   public static final Log LOG = LogFactory.getLog(
       JdbcUpdateExportJob.class.getName());
@@ -64,11 +75,21 @@ public JdbcUpdateExportJob(final ExportJobContext ctxt,
     super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
   }
 
+  // Fix for SQOOP-2846: pick the export mapper by input file type.
   @Override
   protected Class<? extends Mapper> getMapperClass() {
-    if (inputIsSequenceFiles()) {
+    if (isHCatJob) {
+      return SqoopHCatUtilities.getExportMapperClass();
+    }
+    switch (fileType) {
+    case SEQUENCE_FILE:
       return SequenceFileExportMapper.class;
-    } else {
+    case AVRO_DATA_FILE:
+      return AvroExportMapper.class;
+    case PARQUET_FILE:
+      return ParquetExportMapper.class;
+    case UNKNOWN:
+    default:
       return TextExportMapper.class;
     }
   }
@@ -143,5 +164,69 @@ protected void configureOutputFormat(Job job, String tableName,
       throw new IOException("Could not load OutputFormat", cnfe);
     }
   }
-}
+
+  // Fix for SQOOP-2846: resolve the input file type before configuring the
+  // job, then set up HCatalog, Avro, or Parquet input handling as needed.
+  @Override
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol)
+      throws ClassNotFoundException, IOException {
+
+    fileType = getInputFileType();
+
+    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+
+    if (isHCatJob) {
+      SqoopHCatUtilities.configureExportInputFormat(options, job,
+          context.getConnManager(), tableName, job.getConfiguration());
+      return;
+    } else if (fileType == FileType.AVRO_DATA_FILE) {
+      LOG.debug("Configuring for Avro export");
+      configureGenericRecordExportInputFormat(job, tableName);
+    } else if (fileType == FileType.PARQUET_FILE) {
+      LOG.debug("Configuring for Parquet export");
+      configureGenericRecordExportInputFormat(job, tableName);
+      FileSystem fs = FileSystem.get(job.getConfiguration());
+      String uri = "dataset:" + fs.makeQualified(getInputPath());
+      DatasetKeyInputFormat.configure(job).readFrom(uri);
+    }
+  }
+
+  // Fix for SQOOP-2846: store the column name -> Java type mapping in the
+  // job configuration so the generic-record export mappers can use it.
+  private void configureGenericRecordExportInputFormat(Job job,
+      String tableName) throws IOException {
+    ConnManager connManager = context.getConnManager();
+    Map<String, Integer> columnTypeInts;
+    if (options.getCall() == null) {
+      columnTypeInts = connManager.getColumnTypes(tableName,
+          options.getSqlQuery());
+    } else {
+      columnTypeInts =
+          connManager.getColumnTypesForProcedure(options.getCall());
+    }
+    MapWritable columnTypes = new MapWritable();
+    for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
+      Text columnName = new Text(e.getKey());
+      Text columnType = new Text(
+          connManager.toJavaType(tableName, e.getKey(), e.getValue()));
+      columnTypes.put(columnName, columnType);
+    }
+    DefaultStringifier.store(job.getConfiguration(), columnTypes,
+        AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
+  }
+
+  // Fix for SQOOP-2846: choose the input format by input file type.
+  @Override
+  protected Class<? extends InputFormat> getInputFormatClass()
+      throws ClassNotFoundException {
+    if (isHCatJob) {
+      return SqoopHCatUtilities.getInputFormatClass();
+    }
+    switch (fileType) {
+    case AVRO_DATA_FILE:
+      return AvroInputFormat.class;
+    case PARQUET_FILE:
+      return DatasetKeyInputFormat.class;
+    default:
+      return super.getInputFormatClass();
+    }
+  }
+}
diff --git a/src/test/com/cloudera/sqoop/TestAvroExport.java b/src/test/com/cloudera/sqoop/TestAvroExport.java
index f91cd484..02db6c02 100644
--- a/src/test/com/cloudera/sqoop/TestAvroExport.java
+++ b/src/test/com/cloudera/sqoop/TestAvroExport.java
@@ -33,6 +33,7 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
@@ -164,10 +165,12 @@ private Schema buildAvroSchema(ColumnGenerator... extraCols) {
     fields.add(buildAvroField("id", Schema.Type.INT));
     fields.add(buildAvroField("msg", Schema.Type.STRING));
     int colNum = 0;
-    for (ColumnGenerator gen : extraCols) {
-      if (gen.getColumnAvroSchema() != null) {
-        fields.add(buildAvroField(forIdx(colNum++),
-            gen.getColumnAvroSchema()));
+    // SQOOP-2846: extraCols may be null when no extra columns are generated.
+    if (null != extraCols) {
+      for (ColumnGenerator gen : extraCols) {
+        if (gen.getColumnAvroSchema() != null) {
+          fields.add(buildAvroField(forIdx(colNum++), gen.getColumnAvroSchema()));
+        }
       }
     }
     Schema schema = Schema.createRecord("myschema", null, null, false);
@@ -178,9 +181,12 @@
   private void addExtraColumns(GenericRecord record, int rowNum,
       ColumnGenerator[] extraCols) {
     int colNum = 0;
-    for (ColumnGenerator gen : extraCols) {
-      if (gen.getColumnAvroSchema() != null) {
-        record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+    // SQOOP-2846: extraCols may be null when no extra columns are generated.
+    if (null != extraCols) {
+      for (ColumnGenerator gen : extraCols) {
+        if (gen.getColumnAvroSchema() != null) {
+          record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+        }
       }
     }
   }
@@ -253,6 +259,39 @@ private void createTable(ColumnGenerator... extraColumns)
     }
   }
 
+  /**
+   * Create the table definition to export to, and insert one record up front
+   * so that updates can be verified.
+   * Issue [SQOOP-2846].
+   */
+  private void createTableWithInsert() throws SQLException {
+    Connection conn = getConnection();
+    PreparedStatement statement = conn.prepareStatement(
+        getDropTableStatement(getTableName()),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+
+    statement = conn.prepareStatement(
+        "CREATE TABLE " + getTableName()
+            + " (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64))",
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      Statement insert = conn.createStatement();
+      try {
+        insert.execute("INSERT INTO " + getTableName()
+            + " (ID, MSG) VALUES (0, 'testMsg')");
+        conn.commit();
+      } finally {
+        insert.close();
+      }
+    } finally {
+      statement.close();
+    }
+  }
+
 /** Verify that on a given row, a column has a given value.
  * @param id the id column specifying the row to test.
  */
@@ -418,6 +457,33 @@ public void testMissingDatabaseFields() throws IOException, SQLException {
     verifyExport(TOTAL_RECORDS);
   }
 
+  // Test case for Issue [SQOOP-2846]: upsert export from Avro files.
+  public void testAvroWithUpsert() throws IOException, SQLException {
+    String[] argv = { "--update-key", "ID", "--update-mode", "allowinsert" };
+    final int TOTAL_RECORDS = 2;
+    createAvroFile(0, TOTAL_RECORDS, null);
+    createTableWithInsert();
+    try {
+      runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    } catch (Exception e) {
+      // Expected: the generic JDBC manager used by this test does not support
+      // mixed update/insert. The regression guarded against here is a failure
+      // while configuring the Avro input format.
+    }
+  }
+
+  // Test case for Issue [SQOOP-2846]: update-key export from Avro files.
+  public void testAvroWithUpdateKey() throws IOException, SQLException {
+    String[] argv = { "--update-key", "ID" };
+    final int TOTAL_RECORDS = 1;
+    createAvroFile(0, TOTAL_RECORDS, null);
+    createTableWithInsert();
+    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    verifyExport(getMsgPrefix() + "0");
+  }
+
 public void testMissingAvroFields()  throws IOException, SQLException {
   String[] argv = {};
   final int TOTAL_RECORDS = 1;
diff --git a/src/test/com/cloudera/sqoop/TestParquetExport.java b/src/test/com/cloudera/sqoop/TestParquetExport.java
index b938bf87..0c942482 100644
--- a/src/test/com/cloudera/sqoop/TestParquetExport.java
+++ b/src/test/com/cloudera/sqoop/TestParquetExport.java
@@ -32,6 +32,7 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
@@ -143,10 +144,11 @@ private Schema buildSchema(ColumnGenerator... extraCols) {
     fields.add(buildField("id", Schema.Type.INT));
     fields.add(buildField("msg", Schema.Type.STRING));
     int colNum = 0;
-    for (ColumnGenerator gen : extraCols) {
-      if (gen.getColumnParquetSchema() != null) {
-        fields.add(buildParquetField(forIdx(colNum++),
-            gen.getColumnParquetSchema()));
+    if (null != extraCols) {
+      for (ColumnGenerator gen : extraCols) {
+        if (gen.getColumnParquetSchema() != null) {
+          fields.add(buildParquetField(forIdx(colNum++), gen.getColumnParquetSchema()));
+        }
       }
     }
     Schema schema = Schema.createRecord("myschema", null, null, false);
@@ -157,9 +159,11 @@ private Schema buildSchema(ColumnGenerator... extraCols) {
   private void addExtraColumns(GenericRecord record, int rowNum,
       ColumnGenerator[] extraCols) {
     int colNum = 0;
-    for (ColumnGenerator gen : extraCols) {
-      if (gen.getColumnParquetSchema() != null) {
-        record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+    if (null != extraCols) {
+      for (ColumnGenerator gen : extraCols) {
+        if (gen.getColumnParquetSchema() != null) {
+          record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+        }
       }
     }
   }
@@ -232,6 +236,38 @@ private void createTable(ColumnGenerator... extraColumns)
     }
   }
 
+  /**
+   * Create the table definition to export to, and insert one record up front
+   * so that updates can be verified. Issue [SQOOP-2846].
+   */
+  private void createTableWithInsert() throws SQLException {
+    Connection conn = getConnection();
+    PreparedStatement statement = conn.prepareStatement(
+        getDropTableStatement(getTableName()),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+
+    statement = conn.prepareStatement(
+        "CREATE TABLE " + getTableName()
+            + " (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64))",
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      Statement insert = conn.createStatement();
+      try {
+        insert.execute("INSERT INTO " + getTableName()
+            + " (ID, MSG) VALUES (0, 'testMsg')");
+        conn.commit();
+      } finally {
+        insert.close();
+      }
+    } finally {
+      statement.close();
+    }
+  }
+
 /** Verify that on a given row, a column has a given value.
  * @param id the id column specifying the row to test.
  */
@@ -369,6 +405,30 @@ public void testMissingDatabaseFields() throws IOException, SQLException {
     verifyExport(TOTAL_RECORDS);
   }
 
+  // Test case for Issue [SQOOP-2846]: update-key export from Parquet files.
+  public void testParquetWithUpdateKey() throws IOException, SQLException {
+    String[] argv = { "--update-key", "ID" };
+    final int TOTAL_RECORDS = 1;
+    createParquetFile(0, TOTAL_RECORDS, null);
+    createTableWithInsert();
+    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    verifyExport(getMsgPrefix() + "0");
+  }
+
+  // Test case for Issue [SQOOP-2846]: upsert export from Parquet files.
+  public void testParquetWithUpsert() throws IOException, SQLException {
+    String[] argv = { "--update-key", "ID", "--update-mode", "allowinsert" };
+    final int TOTAL_RECORDS = 2;
+    createParquetFile(0, TOTAL_RECORDS, null);
+    createTableWithInsert();
+    try {
+      runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    } catch (Exception e) {
+      // Expected: the generic JDBC manager used by this test does not support
+      // mixed update/insert. The regression guarded against here is a failure
+      // while configuring the Parquet input format.
+    }
+  }
+
 public void testMissingParquetFields()  throws IOException, SQLException {
   String[] argv = {};
   final int TOTAL_RECORDS = 1;
diff --git a/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java b/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
index ed0dc31e..786bd94f 100644
--- a/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
@@ -301,8 +301,31 @@ protected void verifyExport(int expectedNumRecords, Connection conn)
       statement.close();
     }
 
-    assertEquals("Invalid msg field for min value", getMsgPrefix() + maxVal,
-        maxMsg);
+    assertEquals("Invalid msg field for max value", getMsgPrefix() + maxVal, maxMsg);
+  }
+
+  // Verify that the exported row carries the expected msg value; used for
+  // checking updates. Issue [SQOOP-2846].
+  protected void verifyExport(String expectedValue)
+      throws IOException, SQLException {
+    Connection conn = getConnection();
+    LOG.info("Verifying export: " + getTableName());
+    // Fetch the msg value of the exported row.
+    PreparedStatement statement = conn.prepareStatement(
+        "SELECT MSG FROM " + getTableName(),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    String actualValue = null;
+    ResultSet rs = null;
+    try {
+      rs = statement.executeQuery();
+      try {
+        rs.next();
+        actualValue = rs.getString(1);
+      } finally {
+        rs.close();
+      }
+    } finally {
+      statement.close();
+    }
+
+    assertEquals("Got back unexpected msg value", expectedValue, actualValue);
   }
 
   /**
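
Note for reviewers: with this patch, JdbcUpdateExportJob resolves the input file
type instead of assuming text or sequence files, so update-mode exports work
from Avro and Parquet data. A sketch of the kind of invocation this enables;
the connection string, table name, and export directory below are placeholders,
not values taken from this patch:

  sqoop export \
    --connect jdbc:mysql://db.example.com/sales \
    --table FOO \
    --export-dir /user/example/avro_out \
    --update-key ID \
    --update-mode allowinsert \
    -m 1

--update-key and --update-mode allowinsert are the flags exercised by the new
tests; allowinsert additionally requires a connection manager that supports
mixed update/insert against the target database.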