From 641a9c856f2bfdcb3612a028d522969507c1e336 Mon Sep 17 00:00:00 2001
From: Cheolsoo Park
Date: Thu, 14 Feb 2013 23:48:42 -0800
Subject: [PATCH] SQOOP-785: Add splitter implementation for NUMERIC/DECIMAL type

(Jarcec Cecho via Cheolsoo Park)
---
 .../jdbc/GenericJdbcConnectorError.java       |  7 +-
 .../jdbc/GenericJdbcImportPartitioner.java    | 91 ++++++++++++++++++-
 .../connector/jdbc/TestImportPartitioner.java | 46 ++++++++++++-
 3 files changed, 141 insertions(+), 3 deletions(-)

diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
index 1fcea5f3..f2ac979d 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
@@ -65,7 +65,12 @@ public enum GenericJdbcConnectorError implements ErrorCode {
   /** The table columns cannot be specified when
    *  the table sql is specified during export. */
   GENERIC_JDBC_CONNECTOR_0014("The table columns cannot be specified "
-      + "when the table sql is specified during export");
+      + "when the table sql is specified during export"),
+
+  /** Unsupported values in partition column */
+  GENERIC_JDBC_CONNECTOR_0015("Partition column contains unsupported values"),
+
+  ;
 
   private final String message;
 
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
index d276c577..6d1a9fd7 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
+import java.math.BigDecimal;
 import java.sql.Types;
 import java.util.LinkedList;
 import java.util.List;
@@ -30,6 +31,8 @@
 
 public class GenericJdbcImportPartitioner extends Partitioner {
 
+  private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);
+
   private long numberPartitions;
   private String partitionColumnName;
   private int partitionColumnType;
@@ -61,7 +64,7 @@ public List getPartitions(PartitionerContext context,ConnectionConfig
     case Types.NUMERIC:
     case Types.DECIMAL:
       // Decimal column
-      // TODO: Add partition function
+      return partitionNumericColumn();
 
     case Types.BIT:
     case Types.BOOLEAN:
@@ -163,6 +166,92 @@ protected List partitionFloatingPointColumn() {
     return partitions;
   }
 
+  /**
+   * Partitions a NUMERIC/DECIMAL column by cutting the interval between
+   * partitionMinValue and partitionMaxValue into consecutive ranges with a
+   * step of (maxValue - minValue) / numberPartitions.
+   *
+   * When both boundaries are null, a single "IS NULL" partition is returned.
+   *
+   * @return partitions whose conditions cover the whole interval
+   * @throws SqoopException if exactly one of the boundaries is null
+   */
+  protected List partitionNumericColumn() {
+    List partitions = new LinkedList();
+
+    // All null values will result in single partition
+    if (partitionMinValue == null && partitionMaxValue == null) {
+      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+      partition.setConditions(partitionColumnName + " IS NULL");
+      partitions.add(partition);
+      return partitions;
+    }
+
+    // Having one end in null is not supported
+    if (partitionMinValue == null || partitionMaxValue == null) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
+    }
+
+    BigDecimal minValue = new BigDecimal(partitionMinValue);
+    BigDecimal maxValue = new BigDecimal(partitionMaxValue);
+
+    // Get all the split points together.
+    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
+
+    BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions));
+    if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
+      splitSize = NUMERIC_MIN_INCREMENT;
+    }
+
+    BigDecimal curVal = minValue;
+
+    while (curVal.compareTo(maxValue) <= 0) {
+      splitPoints.add(curVal);
+      curVal = curVal.add(splitSize);
+    }
+
+    if (splitPoints.get(splitPoints.size() - 1).compareTo(maxValue) != 0 || splitPoints.size() == 1) {
+      splitPoints.remove(splitPoints.size() - 1);
+      splitPoints.add(maxValue);
+    }
+
+    // Turn the split points into a set of intervals.
+    BigDecimal start = splitPoints.get(0);
+    for (int i = 1; i < splitPoints.size(); i++) {
+      BigDecimal end = splitPoints.get(i);
+
+      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+      partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1));
+      partitions.add(partition);
+
+      start = end;
+    }
+
+    return partitions;
+  }
+
+  /**
+   * Divides two decimals, preferring the exact quotient.
+   *
+   * If the exact quotient has a non-terminating decimal expansion, the
+   * division is repeated with ROUND_HALF_UP, which keeps the scale of the
+   * numerator.
+   *
+   * NOTE(review): with a scale-0 numerator the rounded quotient can be 0
+   * (for example 1 / 3); confirm that callers cope with a zero result.
+   *
+   * @param numerator value to be divided
+   * @param denominator value to divide by
+   * @return exact quotient if it exists, rounded quotient otherwise
+   */
+  protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) {
+    try {
+      return numerator.divide(denominator);
+    } catch (ArithmeticException ae) {
+      return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
+    }
+  }
+
   protected String constructConditions(
       Object lowerBound, Object upperBound, boolean lastOne) {
     StringBuilder conditions = new StringBuilder();
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
index 43eb1c29..3150e7ce 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
+import java.math.BigDecimal;
 import java.sql.Types;
 import java.util.Iterator;
 import java.util.List;
@@ -178,7 +179,6 @@ public void testFloatingPointUnevenPartition() throws Exception {
     context.setString(
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
         String.valueOf((double)(START + NUMBER_OF_ROWS - 1)));
-    context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "3");
 
     ConnectionConfiguration connConf = new ConnectionConfiguration();
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
@@ -194,6 +194,50 @@ public void testFloatingPointUnevenPartition() throws Exception {
     });
   }
 
+  public void testNumericEvenPartition() throws Exception {
+    MutableContext context = new MutableMapContext();
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "ICOL");
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(START));
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(START + NUMBER_OF_ROWS - 1));
+
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    PartitionerContext partitionerContext = new PartitionerContext(context, 5);
+    List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+
+    verifyResult(partitions, new String[] {
+        "-5 <= ICOL AND ICOL < -3",
+        "-3 <= ICOL AND ICOL < -1",
+        "-1 <= ICOL AND ICOL < 1",
+        "1 <= ICOL AND ICOL < 3",
+        "3 <= ICOL AND ICOL <= 5"
+    });
+  }
+
+  public void testNumericUnevenPartition() throws Exception {
+    MutableContext context = new MutableMapContext();
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL");
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START)));
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START + NUMBER_OF_ROWS - 1)));
+
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3);
+    List partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+
+    verifyResult(partitions, new String[]{
+        "-5 <= DCOL AND DCOL < -2",
+        "-2 <= DCOL AND DCOL < 1",
+        "1 <= DCOL AND DCOL <= 5"
+    });
+  }
+
   private void verifyResult(List partitions,
       String[] expected) {
     assertEquals(expected.length, partitions.size());