SQOOP-2846: Sqoop Export with update-key failing for avro data file
(VISHNU S NAIR via Jarek Jarcec Cecho)
commit 0c8b105481
parent a0b730c77e
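The failure being fixed: an export run with --update-key goes through JdbcUpdateExportJob, which before this commit always parsed its input as text or sequence files, so Avro (and Parquet) data files fed to the update path were handed to the wrong mapper. A minimal repro sketch through Sqoop's Java entry point; the connection string, table name, and export directory below are hypothetical:

    // Hypothetical repro for SQOOP-2846: the same arguments without
    // --update-key worked, but the update path failed on Avro input.
    import org.apache.sqoop.Sqoop;

    public class UpdateExportRepro {
      public static void main(String[] args) {
        String[] sqoopArgs = {
            "export",
            "--connect", "jdbc:mysql://localhost/demo", // hypothetical database
            "--table", "TARGET_TABLE",                  // hypothetical table
            "--export-dir", "/user/demo/avro-data",     // hypothetical Avro data dir
            "--update-key", "ID",
        };
        // Sqoop.runTool parses the arguments and runs the export job.
        System.exit(Sqoop.runTool(sqoopArgs));
      }
    }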
@@ -21,14 +21,23 @@
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.StringTokenizer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
+import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
+
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
 import com.cloudera.sqoop.mapreduce.ExportJobBase;
@@ -40,6 +49,8 @@
  */
 public class JdbcUpdateExportJob extends ExportJobBase {

+  // Fix For Issue [SQOOP-2846]
+  private FileType fileType;
   public static final Log LOG = LogFactory.getLog(
       JdbcUpdateExportJob.class.getName());

@@ -64,11 +75,21 @@ public JdbcUpdateExportJob(final ExportJobContext ctxt,
     super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
   }

+  // Fix For Issue [SQOOP-2846]
   @Override
   protected Class<? extends Mapper> getMapperClass() {
-    if (inputIsSequenceFiles()) {
+    if (isHCatJob) {
+      return SqoopHCatUtilities.getExportMapperClass();
+    }
+    switch (fileType) {
+    case SEQUENCE_FILE:
       return SequenceFileExportMapper.class;
-    } else {
+    case AVRO_DATA_FILE:
+      return AvroExportMapper.class;
+    case PARQUET_FILE:
+      return ParquetExportMapper.class;
+    case UNKNOWN:
+    default:
       return TextExportMapper.class;
     }
   }
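For contrast, the mapper selection this hunk replaces reduces to the following (reconstructed from the removed lines above; a sketch, not the verbatim original): any input that was not a sequence file, including Avro and Parquet data, fell through to TextExportMapper, which cannot decode Avro data files.

    // Reconstructed pre-SQOOP-2846 logic (sketch): file type was a two-way guess.
    @Override
    protected Class<? extends Mapper> getMapperClass() {
      if (inputIsSequenceFiles()) {
        return SequenceFileExportMapper.class;
      } else {
        // Avro and Parquet input also landed here and failed at parse time.
        return TextExportMapper.class;
      }
    }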
@@ -143,5 +164,69 @@ protected void configureOutputFormat(Job job, String tableName,
       throw new IOException("Could not load OutputFormat", cnfe);
     }
   }

+  // Fix For Issue [SQOOP-2846]
+  @Override
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol)
+      throws ClassNotFoundException, IOException {
+
+    fileType = getInputFileType();
+
+    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+
+    if (isHCatJob) {
+      SqoopHCatUtilities.configureExportInputFormat(options, job,
+          context.getConnManager(), tableName, job.getConfiguration());
+      return;
+    } else if (fileType == FileType.AVRO_DATA_FILE) {
+      LOG.debug("Configuring for Avro export");
+      configureGenericRecordExportInputFormat(job, tableName);
+    } else if (fileType == FileType.PARQUET_FILE) {
+      LOG.debug("Configuring for Parquet export");
+      configureGenericRecordExportInputFormat(job, tableName);
+      FileSystem fs = FileSystem.get(job.getConfiguration());
+      String uri = "dataset:" + fs.makeQualified(getInputPath());
+      DatasetKeyInputFormat.configure(job).readFrom(uri);
+    }
+  }
+
+  // Fix For Issue [SQOOP-2846]
+  private void configureGenericRecordExportInputFormat(Job job,
+      String tableName) throws IOException {
+    ConnManager connManager = context.getConnManager();
+    Map<String, Integer> columnTypeInts;
+    if (options.getCall() == null) {
+      columnTypeInts = connManager.getColumnTypes(tableName,
+          options.getSqlQuery());
+    } else {
+      columnTypeInts = connManager.getColumnTypesForProcedure(
+          options.getCall());
+    }
+    MapWritable columnTypes = new MapWritable();
+    for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
+      Text columnName = new Text(e.getKey());
+      Text columnText = new Text(
+          connManager.toJavaType(tableName, e.getKey(), e.getValue()));
+      columnTypes.put(columnName, columnText);
+    }
+    DefaultStringifier.store(job.getConfiguration(), columnTypes,
+        AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
+  }
+
+  // Fix For Issue [SQOOP-2846]
+  @Override
+  protected Class<? extends InputFormat> getInputFormatClass()
+      throws ClassNotFoundException {
+    if (isHCatJob) {
+      return SqoopHCatUtilities.getInputFormatClass();
+    }
+    switch (fileType) {
+    case AVRO_DATA_FILE:
+      return AvroInputFormat.class;
+    case PARQUET_FILE:
+      return DatasetKeyInputFormat.class;
+    default:
+      return super.getInputFormatClass();
+    }
+  }
 }
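The configureGenericRecordExportInputFormat helper above serializes the column-name-to-Java-type map into the job configuration with DefaultStringifier.store. On the mapper side that map can be recovered with the matching load call; a sketch (the exact call site inside AvroExportMapper may differ):

    // Sketch: recovering the column-type map stored by the driver above.
    Configuration conf = context.getConfiguration();
    MapWritable columnTypes = DefaultStringifier.load(conf,
        AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class);
    // Each entry maps a column name to its Java type name, e.g. "MSG" -> "String".
    String javaType = columnTypes.get(new Text("MSG")).toString();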
@@ -33,6 +33,7 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;

@@ -164,10 +165,12 @@ private Schema buildAvroSchema(ColumnGenerator... extraCols) {
     fields.add(buildAvroField("id", Schema.Type.INT));
     fields.add(buildAvroField("msg", Schema.Type.STRING));
     int colNum = 0;
-    for (ColumnGenerator gen : extraCols) {
-      if (gen.getColumnAvroSchema() != null) {
-        fields.add(buildAvroField(forIdx(colNum++),
-            gen.getColumnAvroSchema()));
+    // Issue [SQOOP-2846]
+    if (null != extraCols) {
+      for (ColumnGenerator gen : extraCols) {
+        if (gen.getColumnAvroSchema() != null) {
+          fields.add(buildAvroField(forIdx(colNum++), gen.getColumnAvroSchema()));
+        }
       }
     }
     Schema schema = Schema.createRecord("myschema", null, null, false);
@@ -178,9 +181,12 @@ private Schema buildAvroSchema(ColumnGenerator... extraCols) {
   private void addExtraColumns(GenericRecord record, int rowNum,
       ColumnGenerator[] extraCols) {
     int colNum = 0;
-    for (ColumnGenerator gen : extraCols) {
-      if (gen.getColumnAvroSchema() != null) {
-        record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+    // Issue [SQOOP-2846]
+    if (null != extraCols) {
+      for (ColumnGenerator gen : extraCols) {
+        if (gen.getColumnAvroSchema() != null) {
+          record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+        }
       }
     }
   }
@@ -253,6 +259,39 @@ private void createTable(ColumnGenerator... extraColumns)
     }
   }

+  /**
+   * Create the table definition to export and also insert one record for
+   * identifying the updates. Issue [SQOOP-2846]
+   */
+  private void createTableWithInsert() throws SQLException {
+    Connection conn = getConnection();
+    PreparedStatement statement = conn.prepareStatement(
+        getDropTableStatement(getTableName()),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("CREATE TABLE ");
+    sb.append(getTableName());
+    sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
+    sb.append(")");
+    statement = conn.prepareStatement(sb.toString(),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      Statement statement2 = conn.createStatement();
+      String insertCmd = "INSERT INTO " + getTableName()
+          + " (ID,MSG) VALUES(" + 0 + ",'testMsg');";
+      statement2.execute(insertCmd);
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+  }
+
 /** Verify that on a given row, a column has a given value.
  * @param id the id column specifying the row to test.
  */
@@ -418,6 +457,33 @@ public void testMissingDatabaseFields() throws IOException, SQLException {
     verifyExport(TOTAL_RECORDS);
   }

+  // Test Case for Issue [SQOOP-2846]
+  public void testAvroWithUpsert() throws IOException, SQLException {
+    String[] argv = { "--update-key", "ID", "--update-mode", "allowinsert" };
+    final int TOTAL_RECORDS = 2;
+    // ColumnGenerator gen = colGenerator("100",
+    //     Schema.create(Schema.Type.STRING), null, "VARCHAR(64)");
+    createAvroFile(0, TOTAL_RECORDS, null);
+    createTableWithInsert();
+    try {
+      runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    } catch (Exception e) {
+      // expected
+      assertTrue(true);
+    }
+  }
+
+  // Test Case for Issue [SQOOP-2846]
+  public void testAvroWithUpdateKey() throws IOException, SQLException {
+    String[] argv = { "--update-key", "ID" };
+    final int TOTAL_RECORDS = 1;
+    // ColumnGenerator gen = colGenerator("100",
+    //     Schema.create(Schema.Type.STRING), null, "VARCHAR(64)");
+    createAvroFile(0, TOTAL_RECORDS, null);
+    createTableWithInsert();
+    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    verifyExport(getMsgPrefix() + "0");
+  }
+
   public void testMissingAvroFields() throws IOException, SQLException {
     String[] argv = {};
     final int TOTAL_RECORDS = 1;
@@ -32,6 +32,7 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;

@@ -143,10 +144,11 @@ private Schema buildSchema(ColumnGenerator... extraCols) {
     fields.add(buildField("id", Schema.Type.INT));
     fields.add(buildField("msg", Schema.Type.STRING));
     int colNum = 0;
-    for (ColumnGenerator gen : extraCols) {
-      if (gen.getColumnParquetSchema() != null) {
-        fields.add(buildParquetField(forIdx(colNum++),
-            gen.getColumnParquetSchema()));
+    if (null != extraCols) {
+      for (ColumnGenerator gen : extraCols) {
+        if (gen.getColumnParquetSchema() != null) {
+          fields.add(buildParquetField(forIdx(colNum++), gen.getColumnParquetSchema()));
+        }
       }
     }
     Schema schema = Schema.createRecord("myschema", null, null, false);
@@ -157,9 +159,11 @@ private Schema buildSchema(ColumnGenerator... extraCols) {
   private void addExtraColumns(GenericRecord record, int rowNum,
       ColumnGenerator[] extraCols) {
     int colNum = 0;
-    for (ColumnGenerator gen : extraCols) {
-      if (gen.getColumnParquetSchema() != null) {
-        record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+    if (null != extraCols) {
+      for (ColumnGenerator gen : extraCols) {
+        if (gen.getColumnParquetSchema() != null) {
+          record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+        }
       }
     }
   }
@@ -232,6 +236,38 @@ private void createTable(ColumnGenerator... extraColumns)
     }
   }

+  /**
+   * Create the table definition to export and also insert one record for
+   * identifying the updates. Issue [SQOOP-2846]
+   */
+  private void createTableWithInsert() throws SQLException {
+    Connection conn = getConnection();
+    PreparedStatement statement = conn.prepareStatement(
+        getDropTableStatement(getTableName()),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("CREATE TABLE ");
+    sb.append(getTableName());
+    sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
+    sb.append(")");
+    statement = conn.prepareStatement(sb.toString(),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      Statement statement2 = conn.createStatement();
+      String insertCmd = "INSERT INTO " + getTableName()
+          + " (ID,MSG) VALUES(" + 0 + ",'testMsg');";
+      statement2.execute(insertCmd);
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+  }
+
 /** Verify that on a given row, a column has a given value.
  * @param id the id column specifying the row to test.
  */
@@ -369,6 +405,30 @@ public void testMissingDatabaseFields() throws IOException, SQLException {
     verifyExport(TOTAL_RECORDS);
   }

+  public void testParquetWithUpdateKey() throws IOException, SQLException {
+    String[] argv = { "--update-key", "ID" };
+    final int TOTAL_RECORDS = 1;
+    createParquetFile(0, TOTAL_RECORDS, null);
+    createTableWithInsert();
+    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    verifyExport(getMsgPrefix() + "0");
+  }
+
+  // Test Case for Issue [SQOOP-2846]
+  public void testParquetWithUpsert() throws IOException, SQLException {
+    String[] argv = { "--update-key", "ID", "--update-mode", "allowinsert" };
+    final int TOTAL_RECORDS = 2;
+    // ColumnGenerator gen = colGenerator("100",
+    //     Schema.create(Schema.Type.STRING), null, "VARCHAR(64)");
+    createParquetFile(0, TOTAL_RECORDS, null);
+    createTableWithInsert();
+    try {
+      runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    } catch (Exception e) {
+      // expected
+      assertTrue(true);
+    }
+  }
+
   public void testMissingParquetFields() throws IOException, SQLException {
     String[] argv = {};
     final int TOTAL_RECORDS = 1;
@@ -301,8 +301,31 @@ protected void verifyExport(int expectedNumRecords, Connection conn)
       statement.close();
     }

-    assertEquals("Invalid msg field for min value", getMsgPrefix() + maxVal,
-        maxMsg);
+    assertEquals("Invalid msg field for min value", getMsgPrefix() + maxVal, maxMsg);
+  }
+
+  // Verify export method for checking updates: Issue [SQOOP-2846]
+  protected void verifyExport(String expectedValue) throws IOException, SQLException {
+    Connection conn = getConnection();
+    LOG.info("Verifying export: " + getTableName());
+    // Check that we got back the expected msg value.
+    PreparedStatement statement = conn.prepareStatement(
+        "SELECT MSG FROM " + getTableName(),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    String actualValue = null;
+    ResultSet rs = null;
+    try {
+      rs = statement.executeQuery();
+      try {
+        rs.next();
+        actualValue = rs.getString(1);
+      } finally {
+        rs.close();
+      }
+    } finally {
+      statement.close();
+    }
+
+    assertEquals("Got back unexpected msg value", expectedValue, actualValue);
   }

 /**