diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java index 3b0b568c..afc50164 100644 --- a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java +++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java @@ -371,9 +371,9 @@ private void insertObjectIntoPreparedStatement(PreparedStatement preparedStateme } else if (value instanceof Character) { preparedStatement.setString(parameterIndex, value.toString()); } else if (value instanceof Timestamp) { - preparedStatement.setString(parameterIndex, value.toString()); + preparedStatement.setTimestamp(parameterIndex, (Timestamp) value); } else if (value instanceof BigDecimal) { - preparedStatement.setString(parameterIndex, value.toString()); + preparedStatement.setBigDecimal(parameterIndex, (BigDecimal) value); } else { preparedStatement.setObject(parameterIndex, value); } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java index ff33a4bb..7c943c2b 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java @@ -50,7 +50,7 @@ */ @edu.umd.cs.findbugs.annotations.SuppressWarnings({ "SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING", "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE"}) -public class GenericJdbcExecutor { +public class GenericJdbcExecutor implements AutoCloseable { private static final Logger LOG = Logger.getLogger(GenericJdbcExecutor.class); diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java index edb27548..d56abec3 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java @@ -17,6 +17,7 @@ */ package org.apache.sqoop.connector.jdbc; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -35,7 +36,8 @@ import org.joda.time.LocalDateTime; import org.joda.time.LocalTime; -@edu.umd.cs.findbugs.annotations.SuppressWarnings("SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE") +@edu.umd.cs.findbugs.annotations.SuppressWarnings( + {"SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING", "OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE"}) public class GenericJdbcExtractor extends Extractor { public static final Logger LOG = Logger.getLogger(GenericJdbcExtractor.class); @@ -43,19 +45,14 @@ public class GenericJdbcExtractor extends Extractor sqlTypes; + private List params; - public void setConditions(String conditions) { - this.conditions = conditions; + + public GenericJdbcPartition() { + sqlTypes = new ArrayList<>(); + params = new ArrayList<>(); } - public String getConditions() { - return conditions; + public void setCondition(String condition) { + this.condition = condition; + } + + public String getCondition() { + return condition; + } + + public void addParamsToPreparedStatement(PreparedStatement ps) throws SQLException { + for (int i = 0; i < params.size(); i++) { + ps.setObject(i + 1, params.get(i)); + } + } + + public void addParam(int sqlType, Object param) { + sqlTypes.add(sqlType); + params.add(param); + } + + + public List getParams() { + return params; + } + + public List getSqlTypes() { + return sqlTypes; } @Override public void readFields(DataInput in) throws IOException { - conditions = in.readUTF(); + int numParams = in.readInt(); + condition = in.readUTF(); + for (int i = 0; i < numParams; i++) { + int type = in.readInt(); + sqlTypes.add(type); + switch (type) { + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + case Types.BIGINT: + params.add(in.readLong()); + break; + case Types.REAL: + case Types.FLOAT: + params.add(in.readDouble()); + break; + case Types.DOUBLE: + case Types.NUMERIC: + case Types.DECIMAL: + params.add(new BigDecimal(in.readUTF())); + break; + case Types.BIT: + case Types.BOOLEAN: + params.add(in.readBoolean()); + break; + case Types.DATE: + params.add(new java.sql.Date(in.readLong())); + break; + case Types.TIME: + params.add(new java.sql.Time(in.readLong())); + break; + case Types.TIMESTAMP: + params.add(new java.sql.Timestamp(in.readLong())); + break; + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGVARCHAR: + params.add(in.readUTF()); + break; + default: + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011, + String.valueOf(type)); + } + } } @Override public void write(DataOutput out) throws IOException { - out.writeUTF(conditions); + out.writeInt(sqlTypes.size()); + out.writeUTF(condition); + for (int i = 0; i < sqlTypes.size(); i++) { + int type = sqlTypes.get(i); + out.writeInt(type); + switch (type) { + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + case Types.BIGINT: + out.writeLong((Long) params.get(i)); + break; + case Types.REAL: + case Types.FLOAT: + out.writeDouble((Double) params.get(i)); + break; + case Types.DOUBLE: + case Types.NUMERIC: + case Types.DECIMAL: + out.writeUTF(params.get(i).toString()); + break; + case Types.BIT: + case Types.BOOLEAN: + out.writeBoolean((Boolean) params.get(i)); + break; + case Types.DATE: + out.writeLong(((java.sql.Date) params.get(i)).getTime()); + break; + case Types.TIME: + out.writeLong(((java.sql.Time) params.get(i)).getTime()); + break; + case Types.TIMESTAMP: + out.writeLong(((java.sql.Timestamp) params.get(i)).getTime()); + break; + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGVARCHAR: + out.writeUTF((String) params.get(i)); + break; + default: + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011, + String.valueOf(type)); + } + } } @Override - public String toString() { - return conditions; + public String toString(){ + return asStringWithTimezone(TimeZone.getTimeZone("UTC")); } -} + public String asStringWithTimezone(TimeZone timeZone) { + DateFormat dateDateFormat = new SimpleDateFormat("yyyy-MM-dd"); + + DateFormat timeDateFormat = new SimpleDateFormat("HH:mm:ss"); + timeDateFormat.setTimeZone(timeZone); + + DateFormat timestampDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + timestampDateFormat.setTimeZone(timeZone); + + String condition = getCondition(); + for (int j = 0; j < getParams().size(); j++) { + Object param = getParams().get(j); + int sqlType = getSqlTypes().get(j); + switch (sqlType) { + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + case Types.BIGINT: + case Types.REAL: + case Types.FLOAT: + case Types.DOUBLE: + case Types.NUMERIC: + case Types.DECIMAL: + case Types.BIT: + case Types.BOOLEAN: + condition = condition.replaceFirst("\\?", param.toString()); + break; + case Types.DATE: + condition = condition.replaceFirst("\\?", "'" + dateDateFormat.format(param) + "'"); + break; + case Types.TIME: + condition = condition.replaceFirst("\\?", "'" + timeDateFormat.format(param) + "'"); + break; + case Types.TIMESTAMP: + condition = condition.replaceFirst("\\?", "'" + timestampDateFormat.format(param) + "'"); + break; + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGVARCHAR: + default: + condition = condition.replaceFirst("\\?", "'" + param.toString() + "'"); + break; + } + } + return condition; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + GenericJdbcPartition that = (GenericJdbcPartition) o; + + if (getCondition() != null ? !getCondition().equals(that.getCondition()) + : that.getCondition() != null) + return false; + if (!getSqlTypes().equals(that.getSqlTypes())) return false; + return getParams().equals(that.getParams()); + + } + + @Override + public int hashCode() { + int result = getCondition() != null ? getCondition().hashCode() : 0; + result = 31 * result + getSqlTypes().hashCode(); + result = 31 * result + getParams().hashCode(); + return result; + } +} \ No newline at end of file diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java index 2a42ed45..77b486cd 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java @@ -49,7 +49,7 @@ public class GenericJdbcPartitioner extends Partitioner getPartitions(PartitionerContext context, LinkConfiguration linkConfig, - FromJobConfiguration fromJobConfig) { + FromJobConfiguration fromJobConfig) { List partitions = new LinkedList(); numberPartitions = context.getMaxPartitions(); @@ -65,14 +65,14 @@ public List getPartitions(PartitionerContext context, LinkConfigurati if (partitionMinValue == null && partitionMaxValue == null) { GenericJdbcPartition partition = new GenericJdbcPartition(); - partition.setConditions(partitionColumnName + " IS NULL"); + partition.setCondition(partitionColumnName + " IS NULL"); partitions.add(partition); return partitions; } if (allowNullValueInPartitionColumn) { GenericJdbcPartition partition = new GenericJdbcPartition(); - partition.setConditions(partitionColumnName + " IS NULL"); + partition.setCondition(partitionColumnName + " IS NULL"); partitions.add(partition); if (numberPartitions > 1) { numberPartitions -= 1; @@ -83,48 +83,48 @@ public List getPartitions(PartitionerContext context, LinkConfigurati } switch (partitionColumnType) { - case Types.TINYINT: - case Types.SMALLINT: - case Types.INTEGER: - case Types.BIGINT: - // Integer column - partitions.addAll(partitionIntegerColumn()); - break; + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + case Types.BIGINT: + // Integer column + partitions.addAll(partitionIntegerColumn(partitionColumnType)); + break; - case Types.REAL: - case Types.FLOAT: - case Types.DOUBLE: - // Floating point column - partitions.addAll(partitionFloatingPointColumn()); - break; + case Types.REAL: + case Types.FLOAT: + case Types.DOUBLE: + // Floating point column + partitions.addAll(partitionFloatingPointColumn(partitionColumnType)); + break; - case Types.NUMERIC: - case Types.DECIMAL: - // Decimal column - partitions.addAll(partitionNumericColumn()); - break; + case Types.NUMERIC: + case Types.DECIMAL: + // Decimal column + partitions.addAll(partitionNumericColumn(partitionColumnType)); + break; - case Types.BIT: - case Types.BOOLEAN: - // Boolean column - return partitionBooleanColumn(); + case Types.BIT: + case Types.BOOLEAN: + // Boolean column + return partitionBooleanColumn(); - case Types.DATE: - case Types.TIME: - case Types.TIMESTAMP: - // Date time column - partitions.addAll(partitionDateTimeColumn()); - break; + case Types.DATE: + case Types.TIME: + case Types.TIMESTAMP: + // Date time column + partitions.addAll(partitionDateTimeColumn()); + break; - case Types.CHAR: - case Types.VARCHAR: - case Types.LONGVARCHAR: - // Text column - partitions.addAll(partitionTextColumn()); - break; + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGVARCHAR: + // Text column + partitions.addAll(partitionTextColumn()); + break; - default: - throw new SqoopException( + default: + throw new SqoopException( GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011, String.valueOf(partitionColumnType)); } @@ -135,33 +135,8 @@ public List getPartitions(PartitionerContext context, LinkConfigurati protected List partitionDateTimeColumn() { List partitions = new LinkedList(); - long minDateValue = 0; - long maxDateValue = 0; - SimpleDateFormat sdf = null; - switch(partitionColumnType) { - case Types.DATE: - sdf = new SimpleDateFormat("yyyy-MM-dd"); - minDateValue = Date.valueOf(partitionMinValue).getTime(); - maxDateValue = Date.valueOf(partitionMaxValue).getTime(); - break; - case Types.TIME: - sdf = new SimpleDateFormat("HH:mm:ss"); - minDateValue = Time.valueOf(partitionMinValue).getTime(); - maxDateValue = Time.valueOf(partitionMaxValue).getTime(); - break; - // Here should be the type of Types.TIMESTAMP: - default: - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - minDateValue = Timestamp.valueOf(partitionMinValue).getTime(); - maxDateValue = Timestamp.valueOf(partitionMaxValue).getTime(); - break; - } - - - minDateValue += TimeZone.getDefault().getOffset(minDateValue); - maxDateValue += TimeZone.getDefault().getOffset(maxDateValue); - - sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + long minDateValue = Long.parseLong(partitionMinValue); + long maxDateValue = Long.parseLong(partitionMaxValue); long interval = (maxDateValue - minDateValue) / numberPartitions; long remainder = (maxDateValue - minDateValue) % numberPartitions; @@ -173,57 +148,41 @@ protected List partitionDateTimeColumn() { long lowerBound; long upperBound = minDateValue; - Object objLB = null; - Object objUB = null; - + GenericJdbcPartition partition; for (int i = 1; i < numberPartitions; i++) { lowerBound = upperBound; upperBound = lowerBound + interval; upperBound += (i <= remainder) ? 1 : 0; + partition = new GenericJdbcPartition(); switch(partitionColumnType) { case Types.DATE: - objLB = new Date(lowerBound); - objUB = new Date(upperBound); + constructDateConditions(partition, Types.DATE, new Date(lowerBound), new Date(upperBound), false); break; case Types.TIME: - objLB = new Time(lowerBound); - objUB = new Time(upperBound); - + constructDateConditions(partition, Types.TIME, new Time(lowerBound), new Time(upperBound), false); break; // Here should be the type of Types.TIMESTAMP: default: - objLB = new Timestamp(lowerBound); - objUB = new Timestamp(upperBound); + constructDateConditions(partition, Types.TIMESTAMP, new Timestamp(lowerBound), new Timestamp(upperBound), false); break; } - - GenericJdbcPartition partition = new GenericJdbcPartition(); - partition.setConditions( - constructDateConditions(sdf, objLB, objUB, false)); partitions.add(partition); } + partition = new GenericJdbcPartition(); switch(partitionColumnType) { case Types.DATE: - objLB = new Date(upperBound); - objUB = new Date(maxDateValue); + constructDateConditions(partition, Types.DATE, new Date(upperBound), new Date(maxDateValue), true); break; case Types.TIME: - objLB = new Time(upperBound); - objUB = new Time(maxDateValue); + constructDateConditions(partition, Types.TIME, new Time(upperBound), new Time(maxDateValue), true); break; // Here should be the type of Types.TIMESTAMP: default: - objLB = new Timestamp(upperBound); - objUB = new Timestamp(maxDateValue); + constructDateConditions(partition, Types.TIMESTAMP, new Timestamp(upperBound), new Timestamp(maxDateValue), true); break; } - - - GenericJdbcPartition partition = new GenericJdbcPartition(); - partition.setConditions( - constructDateConditions(sdf, objLB, objUB, true)); partitions.add(partition); return partitions; } @@ -236,7 +195,7 @@ protected List partitionTextColumn() { // Remove common prefix if any as it does not affect outcome. int maxPrefixLen = Math.min(partitionMinValue.length(), - partitionMaxValue.length()); + partitionMaxValue.length()); // Calculate common prefix length int cpLen = 0; @@ -259,8 +218,8 @@ protected List partitionTextColumn() { // Having one single value means that we can create only one single split if(minStringBD.equals(maxStringBD)) { GenericJdbcPartition partition = new GenericJdbcPartition(); - partition.setConditions(constructTextConditions(prefix, 0, 0, - partitionMinValue, partitionMaxValue, true, true)); + constructTextConditions(partition, prefix, 0, 0, + partitionMinValue, partitionMaxValue, true, true); partitions.add(partition); return partitions; } @@ -269,7 +228,7 @@ protected List partitionTextColumn() { List splitPoints = new LinkedList(); BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD), - new BigDecimal(numberPartitions)); + new BigDecimal(numberPartitions)); if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) { splitSize = NUMERIC_MIN_INCREMENT; } @@ -289,12 +248,12 @@ protected List partitionTextColumn() { } if (splitPoints.size() == 0 - || splitPoints.get(0).compareTo(minStringBD) != 0) { + || splitPoints.get(0).compareTo(minStringBD) != 0) { splitPoints.add(0, minStringBD); } if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0 - || splitPoints.size() == 1) { + || splitPoints.size() == 1) { splitPoints.add(maxStringBD); } @@ -304,8 +263,8 @@ protected List partitionTextColumn() { BigDecimal end = splitPoints.get(i); GenericJdbcPartition partition = new GenericJdbcPartition(); - partition.setConditions(constructTextConditions(prefix, start, end, - partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1)); + constructTextConditions(partition, prefix, start, end, + partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1); partitions.add(partition); start = end; @@ -315,7 +274,7 @@ protected List partitionTextColumn() { } - protected List partitionIntegerColumn() { + protected List partitionIntegerColumn(int sqlType) { List partitions = new LinkedList(); long minValue = partitionMinValue == null ? Long.MIN_VALUE @@ -337,20 +296,18 @@ protected List partitionIntegerColumn() { upperBound += (i <= remainder) ? 1 : 0; GenericJdbcPartition partition = new GenericJdbcPartition(); - partition.setConditions( - constructConditions(lowerBound, upperBound, false)); + constructConditions(partition, sqlType, lowerBound, upperBound, false); partitions.add(partition); } GenericJdbcPartition partition = new GenericJdbcPartition(); - partition.setConditions( - constructConditions(upperBound, maxValue, true)); + constructConditions(partition, sqlType, upperBound, maxValue, true); partitions.add(partition); return partitions; } - protected List partitionFloatingPointColumn() { + protected List partitionFloatingPointColumn(int sqlType) { List partitions = new LinkedList(); @@ -367,20 +324,18 @@ protected List partitionFloatingPointColumn() { upperBound = lowerBound + interval; GenericJdbcPartition partition = new GenericJdbcPartition(); - partition.setConditions( - constructConditions(lowerBound, upperBound, false)); + constructConditions(partition, sqlType, lowerBound, upperBound, false); partitions.add(partition); } GenericJdbcPartition partition = new GenericJdbcPartition(); - partition.setConditions( - constructConditions(upperBound, maxValue, true)); + constructConditions(partition, sqlType, upperBound, maxValue, true); partitions.add(partition); return partitions; } - protected List partitionNumericColumn() { + protected List partitionNumericColumn(int sqlType) { List partitions = new LinkedList(); // Having one end in null is not supported if (partitionMinValue == null || partitionMaxValue == null) { @@ -393,7 +348,7 @@ protected List partitionNumericColumn() { // Having one single value means that we can create only one single split if(minValue.equals(maxValue)) { GenericJdbcPartition partition = new GenericJdbcPartition(); - partition.setConditions(constructConditions(minValue)); + constructConditions(partition, sqlType, minValue); partitions.add(partition); return partitions; } @@ -420,17 +375,20 @@ protected List partitionNumericColumn() { } // Turn the split points into a set of intervals. - BigDecimal start = splitPoints.get(0); - for (int i = 1; i < splitPoints.size(); i++) { - BigDecimal end = splitPoints.get(i); + BigDecimal end = splitPoints.get(0); + for (int i = 0; i < numberPartitions - 1; i++) { + BigDecimal start = splitPoints.get(i); + end = splitPoints.get(i + 1); GenericJdbcPartition partition = new GenericJdbcPartition(); - partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1)); + constructConditions(partition, sqlType, start, end, false); partitions.add(partition); - - start = end; } + GenericJdbcPartition partition = new GenericJdbcPartition(); + constructConditions(partition, sqlType, end, maxValue, true); + partitions.add(partition); + return partitions; } @@ -447,9 +405,9 @@ protected List partitionBooleanColumn() { if(minValue.equals(maxValue)) { GenericJdbcPartition partition = new GenericJdbcPartition(); - conditions.append(partitionColumnName).append(" = ") - .append(maxValue); - partition.setConditions(conditions.toString()); + conditions.append(partitionColumnName).append(" = ?"); + partition.setCondition(conditions.toString()); + partition.addParam(Types.BOOLEAN, maxValue); partitions.add(partition); return partitions; } @@ -459,18 +417,18 @@ protected List partitionBooleanColumn() { if (partitionMinValue == null) { conditions = new StringBuilder(); conditions.append(partitionColumnName).append(" IS NULL"); - partition.setConditions(conditions.toString()); + partition.setCondition(conditions.toString()); partitions.add(partition); } partition = new GenericJdbcPartition(); conditions = new StringBuilder(); conditions.append(partitionColumnName).append(" = TRUE"); - partition.setConditions(conditions.toString()); + partition.setCondition(conditions.toString()); partitions.add(partition); partition = new GenericJdbcPartition(); conditions = new StringBuilder(); conditions.append(partitionColumnName).append(" = FALSE"); - partition.setConditions(conditions.toString()); + partition.setCondition(conditions.toString()); partitions.add(partition); return partitions; } @@ -493,54 +451,53 @@ protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) { } } - protected String constructConditions( - Object lowerBound, Object upperBound, boolean lastOne) { + protected void constructConditions( + GenericJdbcPartition partition, int sqlType, Object lowerBound, Object upperBound, boolean lastOne) { StringBuilder conditions = new StringBuilder(); - conditions.append(lowerBound); - conditions.append(" <= "); + partition.addParam(sqlType, lowerBound); + conditions.append("? <= "); conditions.append(partitionColumnName); conditions.append(" AND "); conditions.append(partitionColumnName); - conditions.append(lastOne ? " <= " : " < "); - conditions.append(upperBound); - return conditions.toString(); + conditions.append(lastOne ? " <= ?" : " < ?"); + partition.addParam(sqlType, upperBound); + + partition.setCondition(conditions.toString()); } - protected String constructConditions(Object value) { - return new StringBuilder() - .append(partitionColumnName) - .append(" = ") - .append(value) - .toString() - ; + protected void constructConditions(GenericJdbcPartition partition, int sqlType, Object value) { + String condition = partitionColumnName + " = ?"; + partition.addParam(sqlType, value); + partition.setCondition(condition); } - protected String constructDateConditions(SimpleDateFormat sdf, - Object lowerBound, Object upperBound, boolean lastOne) { + protected void constructDateConditions(GenericJdbcPartition partition, int dateType, + Object lowerBound, Object upperBound, boolean lastOne) { StringBuilder conditions = new StringBuilder(); - conditions.append('\'').append(sdf.format((java.util.Date)lowerBound)).append('\''); - conditions.append(" <= "); + partition.addParam(dateType, lowerBound); + conditions.append("? <= "); conditions.append(partitionColumnName); conditions.append(" AND "); conditions.append(partitionColumnName); - conditions.append(lastOne ? " <= " : " < "); - conditions.append('\'').append(sdf.format((java.util.Date)upperBound)).append('\''); - return conditions.toString(); + conditions.append(lastOne ? " <= ?" : " < ?"); + partition.setCondition(conditions.toString()); + partition.addParam(dateType, upperBound); } - protected String constructTextConditions(String prefix, Object lowerBound, Object upperBound, - String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) { + protected void constructTextConditions(GenericJdbcPartition partition, String prefix, Object lowerBound, Object upperBound, + String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) { StringBuilder conditions = new StringBuilder(); String lbString = prefix + bigDecimalToText((BigDecimal)lowerBound); String ubString = prefix + bigDecimalToText((BigDecimal)upperBound); - conditions.append('\'').append(firstOne ? lowerStringBound : lbString).append('\''); - conditions.append(" <= "); + partition.addParam(Types.VARCHAR, firstOne ? lowerStringBound : lbString); + conditions.append("? <= "); conditions.append(partitionColumnName); conditions.append(" AND "); conditions.append(partitionColumnName); - conditions.append(lastOne ? " <= " : " < "); - conditions.append('\'').append(lastOne ? upperStringBound : ubString).append('\''); - return conditions.toString(); + conditions.append(lastOne ? " <= ?" : " < ?"); + partition.addParam(Types.VARCHAR, lastOne ? upperStringBound : ubString); + + partition.setCondition(conditions.toString()); } /** @@ -607,5 +564,4 @@ private String bigDecimalToText(BigDecimal bd) { return sb.toString(); } - } diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java index 3b521280..9847d5e3 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java @@ -36,6 +36,8 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.sql.Types; + import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; @@ -54,7 +56,7 @@ public class TestExtractor { public TestExtractor() { tableName = getClass().getSimpleName().toUpperCase(); - nullDataTableName = getClass().getSimpleName().toUpperCase() + "NULL"; + nullDataTableName = getClass().getSimpleName().toUpperCase() + "NULL"; } @BeforeMethod(alwaysRun = true) @@ -63,13 +65,13 @@ public void setUp() { if (!executor.existTable(tableName)) { executor.executeUpdate("CREATE TABLE " - + executor.encloseIdentifier(tableName) - + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)"); + + executor.encloseIdentifier(tableName) + + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)"); for (int i = 0; i < NUMBER_OF_ROWS; i++) { int value = START + i; String sql = "INSERT INTO " + executor.encloseIdentifier(tableName) - + " VALUES(" + value + ", " + value + ", '" + value + "', '2004-10-19')"; + + " VALUES(" + value + ", " + value + ", '" + value + "', '2004-10-19')"; executor.executeUpdate(sql); } } @@ -93,7 +95,7 @@ public void testQuery() throws Exception { FromJobConfiguration jobConfig = new FromJobConfiguration(); context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL, - "SELECT * FROM " + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}"); + "SELECT * FROM " + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}"); GenericJdbcPartition partition; @@ -107,15 +109,21 @@ public void testQuery() throws Exception { ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user"); partition = new GenericJdbcPartition(); - partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665"); + partition.setCondition("? <= DCOL AND DCOL < ?"); + partition.addParam(Types.NUMERIC, -50.0); + partition.addParam(Types.NUMERIC, -16.6666666666666665); extractor.extract(extractorContext, linkConfig, jobConfig, partition); partition = new GenericJdbcPartition(); - partition.setConditions("-16.6666666666666665 <= DCOL AND DCOL < 16.666666666666667"); + partition.setCondition("? <= DCOL AND DCOL < ?"); + partition.addParam(Types.NUMERIC, -16.6666666666666665); + partition.addParam(Types.NUMERIC, 16.6666666666666667); extractor.extract(extractorContext, linkConfig, jobConfig, partition); partition = new GenericJdbcPartition(); - partition.setConditions("16.666666666666667 <= DCOL AND DCOL <= 50.0"); + partition.setCondition("? <= DCOL AND DCOL <= ?"); + partition.addParam(Types.NUMERIC, 16.6666666666666667); + partition.addParam(Types.NUMERIC, 50.0); extractor.extract(extractorContext, linkConfig, jobConfig, partition); } @@ -132,8 +140,8 @@ public void testSubquery() throws Exception { FromJobConfiguration jobConfig = new FromJobConfiguration(); context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL, - "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL,SQOOP_SUBQUERY_ALIAS.DATECOL FROM " + "(SELECT * FROM " - + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS"); + "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL,SQOOP_SUBQUERY_ALIAS.DATECOL FROM " + "(SELECT * FROM " + + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS"); GenericJdbcPartition partition; @@ -147,15 +155,21 @@ public void testSubquery() throws Exception { ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user"); partition = new GenericJdbcPartition(); - partition.setConditions("-50 <= ICOL AND ICOL < -16"); + partition.setCondition("? <= ICOL AND ICOL < ?"); + partition.addParam(Types.NUMERIC, -50); + partition.addParam(Types.NUMERIC, -16); extractor.extract(extractorContext, linkConfig, jobConfig, partition); partition = new GenericJdbcPartition(); - partition.setConditions("-16 <= ICOL AND ICOL < 17"); + partition.setCondition("? <= ICOL AND ICOL < ?"); + partition.addParam(Types.NUMERIC, -16); + partition.addParam(Types.NUMERIC, 17); extractor.extract(extractorContext, linkConfig, jobConfig, partition); partition = new GenericJdbcPartition(); - partition.setConditions("17 <= ICOL AND ICOL < 50"); + partition.setCondition("? <= ICOL AND ICOL < ?"); + partition.addParam(Types.NUMERIC, 17); + partition.addParam(Types.NUMERIC, 50); extractor.extract(extractorContext, linkConfig, jobConfig, partition); } @@ -172,9 +186,9 @@ public void testIncorrectSchemaColumnSize() throws Exception { FromJobConfiguration jobConfig = new FromJobConfiguration(); context.setString( - GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL, - "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + "(SELECT * FROM " - + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS"); + GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL, + "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + "(SELECT * FROM " + + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS"); GenericJdbcPartition partition = new GenericJdbcPartition(); @@ -183,7 +197,9 @@ public void testIncorrectSchemaColumnSize() throws Exception { Schema schema = new Schema("TestIncorrectColumns"); ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user"); - partition.setConditions("-50 <= ICOL AND ICOL < -16"); + partition.setCondition("? <= ICOL AND ICOL < ?"); + partition.addParam(Types.NUMERIC, -50); + partition.addParam(Types.NUMERIC, -16); extractor.extract(extractorContext, linkConfig, jobConfig, partition); } @@ -193,7 +209,7 @@ public void testNullValueExtracted() throws Exception { if (!executor.existTable(nullDataTableName)) { executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifier(nullDataTableName) - + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)"); + + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)"); for (int i = 0; i < NUMBER_OF_ROWS; i++) { int value = i; @@ -210,7 +226,7 @@ public void testNullValueExtracted() throws Exception { FromJobConfiguration jobConfig = new FromJobConfiguration(); context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL, - "SELECT * FROM " + executor.encloseIdentifier(nullDataTableName) + " WHERE ${CONDITIONS}"); + "SELECT * FROM " + executor.encloseIdentifier(nullDataTableName) + " WHERE ${CONDITIONS}"); Extractor extractor = new GenericJdbcExtractor(); DummyNullDataWriter writer = new DummyNullDataWriter(); @@ -220,7 +236,9 @@ public void testNullValueExtracted() throws Exception { ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user"); GenericJdbcPartition partition = new GenericJdbcPartition(); - partition.setConditions("-50 <= ICOL AND ICOL < -16"); + partition.setCondition("? <= ICOL AND ICOL < ?"); + partition.addParam(Types.NUMERIC, -50); + partition.addParam(Types.NUMERIC, -16); extractor.extract(extractorContext, linkConfig, jobConfig, partition); } diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcPartition.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcPartition.java new file mode 100644 index 00000000..0a979393 --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcPartition.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import static org.testng.Assert.*; + +import org.testng.annotations.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.math.BigDecimal; +import java.sql.Types; + +public class TestGenericJdbcPartition { + + @Test + public void testSerialization() throws Exception { + GenericJdbcPartition expectedPartition = new GenericJdbcPartition(); + expectedPartition.setCondition("TEST_CONDITION"); + + expectedPartition.addParam(Types.TINYINT, 1L); + expectedPartition.addParam(Types.SMALLINT, 2L); + expectedPartition.addParam(Types.INTEGER, 3L); + expectedPartition.addParam(Types.BIGINT, 4L); + + expectedPartition.addParam(Types.REAL, 1.1); + expectedPartition.addParam(Types.FLOAT, 1.2); + + expectedPartition.addParam(Types.DOUBLE, new BigDecimal(1.3)); + expectedPartition.addParam(Types.NUMERIC, new BigDecimal(1.4)); + expectedPartition.addParam(Types.DECIMAL, new BigDecimal(1.5)); + + expectedPartition.addParam(Types.BIT, true); + expectedPartition.addParam(Types.BOOLEAN, false); + + expectedPartition.addParam(Types.DATE, java.sql.Date.valueOf("2015-12-14")); + expectedPartition.addParam(Types.TIME, java.sql.Time.valueOf("00:00:00")); + expectedPartition.addParam(Types.TIMESTAMP, java.sql.Timestamp.valueOf("2015-12-16 00:00:00")); + + expectedPartition.addParam(Types.CHAR, "a"); + expectedPartition.addParam(Types.VARCHAR, "aa"); + expectedPartition.addParam(Types.LONGVARCHAR, "aaa"); + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutput dataOutput = new DataOutputStream(byteArrayOutputStream); + + expectedPartition.write(dataOutput); + + GenericJdbcPartition actualPartition = new GenericJdbcPartition(); + + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray()); + DataInput dataInput = new DataInputStream(byteArrayInputStream); + + actualPartition.readFields(dataInput); + + assertEquals(actualPartition, expectedPartition); + } + +} diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java index 3a767ab3..b250f945 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java @@ -22,8 +22,12 @@ import java.sql.Time; import java.sql.Timestamp; import java.sql.Types; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.TimeZone; import org.apache.sqoop.common.MutableContext; import org.apache.sqoop.common.MutableMapContext; @@ -249,6 +253,37 @@ public void testNumericUnevenPartition() throws Exception { }); } + // We may round the quotient when calculating splitsize, this tests ensure we catch those values in the final partition + @Test + public void testNumericInaccurateSplit() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL"); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(1))); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(13))); + + LinkConfiguration linkConfig = new LinkConfiguration(); + FromJobConfiguration jobConfig = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 11, null, "test_user"); + List partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig); + + verifyResult(partitions, new String[] { + "1 <= DCOL AND DCOL < 2", + "2 <= DCOL AND DCOL < 3", + "3 <= DCOL AND DCOL < 4", + "4 <= DCOL AND DCOL < 5", + "5 <= DCOL AND DCOL < 6", + "6 <= DCOL AND DCOL < 7", + "7 <= DCOL AND DCOL < 8", + "8 <= DCOL AND DCOL < 9", + "9 <= DCOL AND DCOL < 10", + "10 <= DCOL AND DCOL < 11", + "11 <= DCOL AND DCOL <= 13" + }); + } + @Test public void testNumericSinglePartition() throws Exception { MutableContext context = new MutableMapContext(); @@ -276,10 +311,9 @@ public void testDatePartition() throws Exception { context.setString(GenericJdbcConnectorConstants .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.DATE)); context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, - Date.valueOf("2004-10-20").toString()); - context.setString(GenericJdbcConnectorConstants - .CONNECTOR_JDBC_PARTITION_MAXVALUE, Date.valueOf("2013-10-17") - .toString()); + String.valueOf(Date.valueOf("2004-10-20").getTime())); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf(Date.valueOf("2013-10-17").getTime())); LinkConfiguration linkConfig = new LinkConfiguration(); @@ -300,15 +334,21 @@ public void testDatePartition() throws Exception { @Test public void testTimePartition() throws Exception { + DateFormat timeDateFormat = new SimpleDateFormat("HH:mm:ss"); + timeDateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + + Long startTime = timeDateFormat.parse("01:01:01").getTime(); + Long endTime = timeDateFormat.parse("10:40:50").getTime(); + MutableContext context = new MutableMapContext(); context.setString(GenericJdbcConnectorConstants .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TCOL"); context.setString(GenericJdbcConnectorConstants .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIME)); context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, - Time.valueOf("01:01:01").toString()); + String.valueOf(startTime)); context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, - Time.valueOf("10:40:50").toString()); + String.valueOf(endTime)); LinkConfiguration linkConfig = new LinkConfiguration(); @@ -327,15 +367,21 @@ public void testTimePartition() throws Exception { @Test public void testTimestampPartition() throws Exception { + DateFormat timestampDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + timestampDateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + + Long startTime = timestampDateFormat.parse("2013-01-01 01:01:01.123").getTime(); + Long endTime = timestampDateFormat.parse("2013-12-31 10:40:50.654").getTime(); + MutableContext context = new MutableMapContext(); context.setString(GenericJdbcConnectorConstants .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TSCOL"); context.setString(GenericJdbcConnectorConstants .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIMESTAMP)); context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, - Timestamp.valueOf("2013-01-01 01:01:01.123").toString()); + String.valueOf(startTime)); context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, - Timestamp.valueOf("2013-12-31 10:40:50.654").toString()); + String.valueOf(endTime)); LinkConfiguration linkConfig = new LinkConfiguration(); FromJobConfiguration jobConfig = new FromJobConfiguration(); @@ -513,8 +559,8 @@ private void verifyResult(List partitions, Iterator iterator = partitions.iterator(); for (int i = 0; i < expected.length; i++) { - assertEquals(expected[i], - ((GenericJdbcPartition)iterator.next()).getConditions()); + GenericJdbcPartition partition = ((GenericJdbcPartition) iterator.next()); + assertEquals(partition.toString(), expected[i]); } } -} +} \ No newline at end of file diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java index 2565c3f0..70b6eff7 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java @@ -89,4 +89,10 @@ public void testStagedTransfer() throws Exception { dropTable(); } + // shorter name for oracle + @Override + public TableName getTableName() { + return new TableName("stagedrdbms"); + } + }