mirror of
https://github.com/apache/sqoop.git
synced 2025-05-07 00:39:46 +08:00
SQOOP-1049: Sqoop2: Record not imported if partition column value is NULL
(Mengwei Ding via Jarek Jarcec Cecho)
This commit is contained in:
parent
abb8cf8754
commit
91b121ccaa
@ -363,7 +363,9 @@ public static String toJson(Object configuration) {
|
||||
jsonForm.put(inputName, value);
|
||||
} else if(type.isEnum()) {
|
||||
jsonForm.put(inputName, value.toString());
|
||||
} else {
|
||||
} else if(type == Boolean.class) {
|
||||
jsonForm.put(inputName, value);
|
||||
}else {
|
||||
throw new SqoopException(ModelError.MODEL_004,
|
||||
"Unsupported type " + type.getName() + " for input " + formName + "." + inputName);
|
||||
}
|
||||
@ -450,7 +452,9 @@ public static void fillValues(String json, Object configuration) {
|
||||
inputField.set(formValue, ((Long)jsonInputs.get(inputName)).intValue());
|
||||
} else if(type.isEnum()) {
|
||||
inputField.set(formValue, Enum.valueOf((Class<? extends Enum>) inputField.getType(), (String) jsonInputs.get(inputName)));
|
||||
} else {
|
||||
} else if(type == Boolean.class) {
|
||||
inputField.set(formValue, (Boolean) jsonInputs.get(inputName));
|
||||
}else {
|
||||
throw new SqoopException(ModelError.MODEL_004,
|
||||
"Unsupported type " + type.getName() + " for input " + formName + "." + inputName);
|
||||
}
|
||||
|
@ -45,41 +45,58 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
|
||||
private int partitionColumnType;
|
||||
private String partitionMinValue;
|
||||
private String partitionMaxValue;
|
||||
private Boolean partitionColumnNull;
|
||||
|
||||
@Override
|
||||
public List<Partition> getPartitions(PartitionerContext context,ConnectionConfiguration connection, ImportJobConfiguration job) {
|
||||
List<Partition> partitions = new LinkedList<Partition>();
|
||||
|
||||
numberPartitions = context.getMaxPartitions();
|
||||
partitionColumnName = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME);
|
||||
partitionColumnType = context.getInt(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -1);
|
||||
partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);
|
||||
partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE);
|
||||
|
||||
partitionColumnNull = job.table.partitionColumnNull;
|
||||
if (partitionColumnNull == null) {
|
||||
partitionColumnNull = false;
|
||||
}
|
||||
|
||||
if (partitionMinValue == null && partitionMaxValue == null) {
|
||||
List<Partition> partitions = new LinkedList<Partition>();
|
||||
GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
|
||||
partition.setConditions(partitionColumnName + " IS NULL");
|
||||
partitions.add(partition);
|
||||
return partitions;
|
||||
}
|
||||
|
||||
if (partitionColumnNull) {
|
||||
GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
|
||||
partition.setConditions(partitionColumnName + " IS NULL");
|
||||
partitions.add(partition);
|
||||
numberPartitions -= 1;
|
||||
}
|
||||
|
||||
switch (partitionColumnType) {
|
||||
case Types.TINYINT:
|
||||
case Types.SMALLINT:
|
||||
case Types.INTEGER:
|
||||
case Types.BIGINT:
|
||||
// Integer column
|
||||
return partitionIntegerColumn();
|
||||
partitions.addAll(partitionIntegerColumn());
|
||||
break;
|
||||
|
||||
case Types.REAL:
|
||||
case Types.FLOAT:
|
||||
case Types.DOUBLE:
|
||||
// Floating point column
|
||||
return partitionFloatingPointColumn();
|
||||
partitions.addAll(partitionFloatingPointColumn());
|
||||
break;
|
||||
|
||||
case Types.NUMERIC:
|
||||
case Types.DECIMAL:
|
||||
// Decimal column
|
||||
return partitionNumericColumn();
|
||||
partitions.addAll(partitionNumericColumn());
|
||||
break;
|
||||
|
||||
case Types.BIT:
|
||||
case Types.BOOLEAN:
|
||||
@ -90,20 +107,25 @@ public List<Partition> getPartitions(PartitionerContext context,ConnectionConfig
|
||||
case Types.TIME:
|
||||
case Types.TIMESTAMP:
|
||||
// Date time column
|
||||
return partitionDateTimeColumn();
|
||||
partitions.addAll(partitionDateTimeColumn());
|
||||
break;
|
||||
|
||||
case Types.CHAR:
|
||||
case Types.VARCHAR:
|
||||
case Types.LONGVARCHAR:
|
||||
// Text column
|
||||
return partitionTextColumn();
|
||||
partitions.addAll(partitionTextColumn());
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new SqoopException(
|
||||
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
|
||||
String.valueOf(partitionColumnType));
|
||||
}
|
||||
|
||||
return partitions;
|
||||
}
|
||||
|
||||
protected List<Partition> partitionDateTimeColumn() {
|
||||
List<Partition> partitions = new LinkedList<Partition>();
|
||||
|
||||
|
@ -30,5 +30,6 @@ public class ImportTableForm {
|
||||
@Input(size = 2000) public String sql;
|
||||
@Input(size = 50) public String columns;
|
||||
@Input(size = 50) public String partitionColumn;
|
||||
@Input public Boolean partitionColumnNull;
|
||||
@Input(size = 50) public String boundaryQuery;
|
||||
}
|
||||
|
@ -81,6 +81,10 @@ table.dataDirectory.help = The sub-directory under warehouse for data
|
||||
table.partitionColumn.label = Partition column name
|
||||
table.partitionColumn.help = A specific column for data partition
|
||||
|
||||
# Table partition column may contain NULL values
|
||||
table.partitionColumnNull.label = Nulls in partition column
|
||||
table.partitionColumnNull.help = Whether there are null values in partition column
|
||||
|
||||
# Table boundary
|
||||
table.boundaryQuery.label = Boundary query
|
||||
table.boundaryQuery.help = The boundary query for data partition
|
||||
|
@ -457,6 +457,36 @@ public void testVarcharPartitionWithCommonPrefix() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
/**
 * Checks that the partitioner emits a dedicated "IS NULL" partition when the
 * job configuration declares that the partition column may contain NULLs.
 *
 * The context describes a VARCHAR partition column VCCOL bounded by "AAA" and
 * "AAE". With partitionColumnNull set to true, the expected result is one
 * "VCCOL IS NULL" partition followed by four text-range partitions.
 */
public void testPatitionWithNullValues() throws Exception {
|
||||
// Partition column metadata handed to the partitioner through the context.
MutableContext context = new MutableMapContext();
|
||||
context.setString(GenericJdbcConnectorConstants
|
||||
.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL");
|
||||
context.setString(GenericJdbcConnectorConstants
|
||||
.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR));
|
||||
context.setString(GenericJdbcConnectorConstants
|
||||
.CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA");
|
||||
context.setString(GenericJdbcConnectorConstants
|
||||
.CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAE");
|
||||
|
||||
ConnectionConfiguration connConf = new ConnectionConfiguration();
|
||||
ImportJobConfiguration jobConf = new ImportJobConfiguration();
|
||||
// Declare that the partition column may hold NULL values.
jobConf.table.partitionColumnNull = true;
|
||||
|
||||
Partitioner partitioner = new GenericJdbcImportPartitioner();
|
||||
// NOTE(review): the second argument (5) is presumably the maximum number of
// partitions; confirm against the PartitionerContext constructor.
PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
|
||||
|
||||
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
|
||||
|
||||
// Five conditions in total: the NULL partition comes first, then four ranges
// covering 'AAA' through 'AAE' (only the last range is closed on the right).
verifyResult(partitions, new String[] {
|
||||
"VCCOL IS NULL",
|
||||
"'AAA' <= VCCOL AND VCCOL < 'AAB'",
|
||||
"'AAB' <= VCCOL AND VCCOL < 'AAC'",
|
||||
"'AAC' <= VCCOL AND VCCOL < 'AAD'",
|
||||
"'AAD' <= VCCOL AND VCCOL <= 'AAE'",
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
private void verifyResult(List<Partition> partitions,
|
||||
String[] expected) {
|
||||
assertEquals(expected.length, partitions.size());
|
||||
|
Loading…
Reference in New Issue
Block a user