mirror of
https://github.com/apache/sqoop.git
synced 2025-05-19 02:10:54 +08:00
SQOOP-785: Add splitter implementation for NUMERIC/DECIMAL type
(Jarcec Cecho via Cheolsoo Park)
This commit is contained in:
parent
c8af63092d
commit
641a9c856f
@ -65,7 +65,12 @@ public enum GenericJdbcConnectorError implements ErrorCode {
|
||||
/** The table columns cannot be specified when
|
||||
* the table sql is specified during export. */
|
||||
GENERIC_JDBC_CONNECTOR_0014("The table columns cannot be specified "
|
||||
+ "when the table sql is specified during export");
|
||||
+ "when the table sql is specified during export"),
|
||||
|
||||
/** Unsupported values in partition column */
|
||||
GENERIC_JDBC_CONNECTOR_0015("Partition column contains unsupported values"),
|
||||
|
||||
;
|
||||
|
||||
private final String message;
|
||||
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.sqoop.connector.jdbc;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Types;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
@ -30,6 +31,8 @@
|
||||
|
||||
public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
|
||||
|
||||
private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);
|
||||
|
||||
private long numberPartitions;
|
||||
private String partitionColumnName;
|
||||
private int partitionColumnType;
|
||||
@ -61,7 +64,7 @@ public List<Partition> getPartitions(PartitionerContext context,ConnectionConfig
|
||||
case Types.NUMERIC:
|
||||
case Types.DECIMAL:
|
||||
// Decimal column
|
||||
// TODO: Add partition function
|
||||
return partitionNumericColumn();
|
||||
|
||||
case Types.BIT:
|
||||
case Types.BOOLEAN:
|
||||
@ -163,6 +166,68 @@ protected List<Partition> partitionFloatingPointColumn() {
|
||||
return partitions;
|
||||
}
|
||||
|
||||
protected List<Partition> partitionNumericColumn() {
|
||||
List<Partition> partitions = new LinkedList<Partition>();
|
||||
|
||||
// All null valeus will result in single partition
|
||||
if (partitionMinValue == null && partitionMaxValue == null) {
|
||||
GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
|
||||
partition.setConditions(partitionColumnName + "IS NULL");
|
||||
partitions.add(partition);
|
||||
return partitions;
|
||||
}
|
||||
|
||||
// Having one end in null is not supported
|
||||
if (partitionMinValue == null || partitionMaxValue == null) {
|
||||
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
|
||||
}
|
||||
|
||||
BigDecimal minValue = new BigDecimal(partitionMinValue);
|
||||
BigDecimal maxValue = new BigDecimal(partitionMaxValue);
|
||||
|
||||
// Get all the split points together.
|
||||
List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
|
||||
|
||||
BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions));
|
||||
if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
|
||||
splitSize = NUMERIC_MIN_INCREMENT;
|
||||
}
|
||||
|
||||
BigDecimal curVal = minValue;
|
||||
|
||||
while (curVal.compareTo(maxValue) <= 0) {
|
||||
splitPoints.add(curVal);
|
||||
curVal = curVal.add(splitSize);
|
||||
}
|
||||
|
||||
if (splitPoints.get(splitPoints.size() - 1).compareTo(maxValue) != 0 || splitPoints.size() == 1) {
|
||||
splitPoints.remove(splitPoints.size() - 1);
|
||||
splitPoints.add(maxValue);
|
||||
}
|
||||
|
||||
// Turn the split points into a set of intervals.
|
||||
BigDecimal start = splitPoints.get(0);
|
||||
for (int i = 1; i < splitPoints.size(); i++) {
|
||||
BigDecimal end = splitPoints.get(i);
|
||||
|
||||
GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
|
||||
partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1));
|
||||
partitions.add(partition);
|
||||
|
||||
start = end;
|
||||
}
|
||||
|
||||
return partitions;
|
||||
}
|
||||
|
||||
protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) {
|
||||
try {
|
||||
return numerator.divide(denominator);
|
||||
} catch (ArithmeticException ae) {
|
||||
return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
|
||||
}
|
||||
}
|
||||
|
||||
protected String constructConditions(
|
||||
Object lowerBound, Object upperBound, boolean lastOne) {
|
||||
StringBuilder conditions = new StringBuilder();
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.sqoop.connector.jdbc;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Types;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@ -178,7 +179,6 @@ public void testFloatingPointUnevenPartition() throws Exception {
|
||||
context.setString(
|
||||
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
|
||||
String.valueOf((double)(START + NUMBER_OF_ROWS - 1)));
|
||||
context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "3");
|
||||
|
||||
ConnectionConfiguration connConf = new ConnectionConfiguration();
|
||||
ImportJobConfiguration jobConf = new ImportJobConfiguration();
|
||||
@ -194,6 +194,50 @@ public void testFloatingPointUnevenPartition() throws Exception {
|
||||
});
|
||||
}
|
||||
|
||||
public void testNumericEvenPartition() throws Exception {
|
||||
MutableContext context = new MutableMapContext();
|
||||
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "ICOL");
|
||||
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
|
||||
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(START));
|
||||
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(START + NUMBER_OF_ROWS - 1));
|
||||
|
||||
ConnectionConfiguration connConf = new ConnectionConfiguration();
|
||||
ImportJobConfiguration jobConf = new ImportJobConfiguration();
|
||||
|
||||
Partitioner partitioner = new GenericJdbcImportPartitioner();
|
||||
PartitionerContext partitionerContext = new PartitionerContext(context, 5);
|
||||
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
|
||||
|
||||
verifyResult(partitions, new String[] {
|
||||
"-5 <= ICOL AND ICOL < -3",
|
||||
"-3 <= ICOL AND ICOL < -1",
|
||||
"-1 <= ICOL AND ICOL < 1",
|
||||
"1 <= ICOL AND ICOL < 3",
|
||||
"3 <= ICOL AND ICOL <= 5"
|
||||
});
|
||||
}
|
||||
|
||||
public void testNumericUnevenPartition() throws Exception {
|
||||
MutableContext context = new MutableMapContext();
|
||||
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL");
|
||||
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
|
||||
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START)));
|
||||
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START + NUMBER_OF_ROWS - 1)));
|
||||
|
||||
ConnectionConfiguration connConf = new ConnectionConfiguration();
|
||||
ImportJobConfiguration jobConf = new ImportJobConfiguration();
|
||||
|
||||
Partitioner partitioner = new GenericJdbcImportPartitioner();
|
||||
PartitionerContext partitionerContext = new PartitionerContext(context, 3);
|
||||
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
|
||||
|
||||
verifyResult(partitions, new String[]{
|
||||
"-5 <= DCOL AND DCOL < -2",
|
||||
"-2 <= DCOL AND DCOL < 1",
|
||||
"1 <= DCOL AND DCOL <= 5"
|
||||
});
|
||||
}
|
||||
|
||||
private void verifyResult(List<Partition> partitions,
|
||||
String[] expected) {
|
||||
assertEquals(expected.length, partitions.size());
|
||||
|
Loading…
Reference in New Issue
Block a user