From 91b121ccaa9e40dc8a86b3a00d68300e7ba4c890 Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Sun, 21 Jul 2013 10:27:55 -0700 Subject: [PATCH] SQOOP-1049: Sqoop2: Record not imported if partition column value is NULL (Mengwei Ding via Jarek Jarcec Cecho) --- .../org/apache/sqoop/model/FormUtils.java | 8 +++-- .../jdbc/GenericJdbcImportPartitioner.java | 34 +++++++++++++++---- .../jdbc/configuration/ImportTableForm.java | 1 + ...eneric-jdbc-connector-resources.properties | 4 +++ .../connector/jdbc/TestImportPartitioner.java | 30 ++++++++++++++++ 5 files changed, 69 insertions(+), 8 deletions(-) diff --git a/common/src/main/java/org/apache/sqoop/model/FormUtils.java b/common/src/main/java/org/apache/sqoop/model/FormUtils.java index f3dee8eb..27db8af3 100644 --- a/common/src/main/java/org/apache/sqoop/model/FormUtils.java +++ b/common/src/main/java/org/apache/sqoop/model/FormUtils.java @@ -363,7 +363,9 @@ public static String toJson(Object configuration) { jsonForm.put(inputName, value); } else if(type.isEnum()) { jsonForm.put(inputName, value.toString()); - } else { + } else if(type == Boolean.class) { + jsonForm.put(inputName, value); + }else { throw new SqoopException(ModelError.MODEL_004, "Unsupported type " + type.getName() + " for input " + formName + "." + inputName); } @@ -450,7 +452,9 @@ public static void fillValues(String json, Object configuration) { inputField.set(formValue, ((Long)jsonInputs.get(inputName)).intValue()); } else if(type.isEnum()) { inputField.set(formValue, Enum.valueOf((Class) inputField.getType(), (String) jsonInputs.get(inputName))); - } else { + } else if(type == Boolean.class) { + inputField.set(formValue, (Boolean) jsonInputs.get(inputName)); + }else { throw new SqoopException(ModelError.MODEL_004, "Unsupported type " + type.getName() + " for input " + formName + "." + inputName); } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java index 7792c574..8d0c4ab0 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java @@ -45,41 +45,58 @@ public class GenericJdbcImportPartitioner extends Partitioner getPartitions(PartitionerContext context,ConnectionConfiguration connection, ImportJobConfiguration job) { + List partitions = new LinkedList(); + numberPartitions = context.getMaxPartitions(); partitionColumnName = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME); partitionColumnType = context.getInt(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -1); partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE); partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE); + partitionColumnNull = job.table.partitionColumnNull; + if (partitionColumnNull == null) { + partitionColumnNull = false; + } + if (partitionMinValue == null && partitionMaxValue == null) { - List partitions = new LinkedList(); GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); partition.setConditions(partitionColumnName + " IS NULL"); partitions.add(partition); return partitions; } + if (partitionColumnNull) { + GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); + partition.setConditions(partitionColumnName + " IS NULL"); + partitions.add(partition); + numberPartitions -= 1; + } + switch (partitionColumnType) { case Types.TINYINT: case Types.SMALLINT: case Types.INTEGER: case Types.BIGINT: // Integer column - return partitionIntegerColumn(); + partitions.addAll(partitionIntegerColumn()); + break; case Types.REAL: case Types.FLOAT: case Types.DOUBLE: // Floating point column - return partitionFloatingPointColumn(); + partitions.addAll(partitionFloatingPointColumn()); + break; case Types.NUMERIC: case Types.DECIMAL: // Decimal column - return partitionNumericColumn(); + partitions.addAll(partitionNumericColumn()); + break; case Types.BIT: case Types.BOOLEAN: @@ -90,20 +107,25 @@ public List getPartitions(PartitionerContext context,ConnectionConfig case Types.TIME: case Types.TIMESTAMP: // Date time column - return partitionDateTimeColumn(); + partitions.addAll(partitionDateTimeColumn()); + break; case Types.CHAR: case Types.VARCHAR: case Types.LONGVARCHAR: // Text column - return partitionTextColumn(); + partitions.addAll(partitionTextColumn()); + break; default: throw new SqoopException( GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011, String.valueOf(partitionColumnType)); } + + return partitions; } + protected List partitionDateTimeColumn() { List partitions = new LinkedList(); diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java index ef272365..0991b283 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java @@ -30,5 +30,6 @@ public class ImportTableForm { @Input(size = 2000) public String sql; @Input(size = 50) public String columns; @Input(size = 50) public String partitionColumn; + @Input public Boolean partitionColumnNull; @Input(size = 50) public String boundaryQuery; } diff --git a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties index 44fc984d..0950e322 100644 --- a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties +++ b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-resources.properties @@ -81,6 +81,10 @@ table.dataDirectory.help = The sub-directory under warehouse for data table.partitionColumn.label = Partition column name table.partitionColumn.help = A specific column for data partition +# Table pcol is null +table.partitionColumnNull.label = Nulls in partition column +table.partitionColumnNull.help = Whether there are null values in partition column + # Table boundary table.boundaryQuery.label = Boundary query table.boundaryQuery.help = The boundary query for data partition diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java index 7ecc9000..a33dd6cf 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java @@ -457,6 +457,36 @@ public void testVarcharPartitionWithCommonPrefix() throws Exception { } + public void testPatitionWithNullValues() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAE"); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ImportJobConfiguration jobConf = new ImportJobConfiguration(); + jobConf.table.partitionColumnNull = true; + + Partitioner partitioner = new GenericJdbcImportPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + + List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "VCCOL IS NULL", + "'AAA' <= VCCOL AND VCCOL < 'AAB'", + "'AAB' <= VCCOL AND VCCOL < 'AAC'", + "'AAC' <= VCCOL AND VCCOL < 'AAD'", + "'AAD' <= VCCOL AND VCCOL <= 'AAE'", + }); + + } + private void verifyResult(List partitions, String[] expected) { assertEquals(expected.length, partitions.size());