5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-05 21:03:00 +08:00

SQOOP-2727: Sqoop2: Use PreparedStatements for constructing GenericJDBCPartitions

(Abraham Fine via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-12-16 20:35:17 +01:00
parent 0ab7c05e11
commit 6e33d6afb4
10 changed files with 539 additions and 213 deletions

View File

@ -371,9 +371,9 @@ private void insertObjectIntoPreparedStatement(PreparedStatement preparedStateme
} else if (value instanceof Character) {
preparedStatement.setString(parameterIndex, value.toString());
} else if (value instanceof Timestamp) {
preparedStatement.setString(parameterIndex, value.toString());
preparedStatement.setTimestamp(parameterIndex, (Timestamp) value);
} else if (value instanceof BigDecimal) {
preparedStatement.setString(parameterIndex, value.toString());
preparedStatement.setBigDecimal(parameterIndex, (BigDecimal) value);
} else {
preparedStatement.setObject(parameterIndex, value);
}

View File

@ -50,7 +50,7 @@
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings({
"SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING", "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE"})
public class GenericJdbcExecutor {
public class GenericJdbcExecutor implements AutoCloseable {
private static final Logger LOG = Logger.getLogger(GenericJdbcExecutor.class);

View File

@ -17,6 +17,7 @@
*/
package org.apache.sqoop.connector.jdbc;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@ -35,7 +36,8 @@
import org.joda.time.LocalDateTime;
import org.joda.time.LocalTime;
@edu.umd.cs.findbugs.annotations.SuppressWarnings("SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE")
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
{"SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING", "OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE"})
public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobConfiguration, GenericJdbcPartition> {
public static final Logger LOG = Logger.getLogger(GenericJdbcExtractor.class);
@ -43,19 +45,14 @@ public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobCo
private long rowsRead = 0;
@Override
public void extract(ExtractorContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig, GenericJdbcPartition partition) {
GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig);
String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL);
String conditions = partition.getConditions();
query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions);
LOG.info("Using query: " + query);
rowsRead = 0;
Schema schema = context.getSchema();
Column[] schemaColumns = schema.getColumnsArray();
try (Statement statement = executor.getConnection().createStatement(
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ResultSet resultSet = statement.executeQuery(query);) {
try (
GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig);
PreparedStatement preparedStatement = createPreparedStatement(context, executor, partition);
ResultSet resultSet = preparedStatement.executeQuery()
) {
ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount();
if (schemaColumns.length != columnCount) {
@ -96,8 +93,6 @@ public void extract(ExtractorContext context, LinkConfiguration linkConfig, From
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e);
} finally {
executor.close();
}
}
@ -106,4 +101,16 @@ public long getRowsRead() {
return rowsRead;
}
/**
 * Builds the extraction statement for a single partition.
 *
 * <p>The query template is read from the context, its conditions token is
 * replaced by the partition's (parameterized) condition, and the partition's
 * bound values are then applied to the resulting statement.
 *
 * <p>Ownership of the returned statement passes to the caller, which must close
 * it. If binding the parameters fails, the statement is closed here before the
 * failure is propagated, because the caller never receives a reference to it.
 *
 * @param context   extractor context holding the query template
 * @param executor  executor supplying the JDBC connection
 * @param partition partition providing the condition text and its parameters
 * @return a prepared statement with all partition parameters bound
 * @throws SQLException if the statement cannot be prepared or a parameter cannot be bound
 */
private PreparedStatement createPreparedStatement(ExtractorContext context, GenericJdbcExecutor executor, GenericJdbcPartition partition) throws SQLException {
  String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL);
  String condition = partition.getCondition();
  query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, condition);
  LOG.info("Creating PreparedStatement with query: " + query);

  PreparedStatement preparedStatement = executor.getConnection().prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
  try {
    partition.addParamsToPreparedStatement(preparedStatement);
  } catch (SQLException | RuntimeException e) {
    // The statement has not been handed to the caller yet, so nobody else can release it.
    try {
      preparedStatement.close();
    } catch (SQLException closeFailure) {
      // Keep the binding failure as the primary error; attach the close failure to it.
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
  return preparedStatement;
}
}

View File

@ -259,10 +259,6 @@ private void configurePartitionProperties(MutableContext context, LinkConfigurat
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
}
// Boundaries for the job
String min = rs.getString(1);
String max = rs.getString(2);
// Type of the partition column
ResultSetMetaData rsmd = rs.getMetaData();
if (rsmd.getColumnCount() != 2) {
@ -270,6 +266,28 @@ private void configurePartitionProperties(MutableContext context, LinkConfigurat
}
int columnType = rsmd.getColumnType(1);
String min;
String max;
// Boundaries for the job
switch (columnType) {
case Types.DATE:
min = String.valueOf(rs.getDate(1).getTime());
max = String.valueOf(rs.getDate(2).getTime());
break;
case Types.TIMESTAMP:
min = String.valueOf(rs.getTimestamp(1).getTime());
max = String.valueOf(rs.getTimestamp(2).getTime());
break;
case Types.TIME:
min = String.valueOf(rs.getTime(1).getTime());
max = String.valueOf(rs.getTime(2).getTime());
break;
default:
min = rs.getString(1);
max = rs.getString(2);
}
LOG.info("Boundaries for the job: min=" + min + ", max=" + max + ", columnType=" + columnType);
context.setInteger(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, columnType);

View File

@ -20,34 +20,231 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.error.code.GenericJdbcConnectorError;
import org.apache.sqoop.job.etl.Partition;
@edu.umd.cs.findbugs.annotations.SuppressWarnings("DB_DUPLICATE_SWITCH_CLAUSES")
public class GenericJdbcPartition extends Partition {
private String conditions;
private String condition;
private List<Integer> sqlTypes;
private List<Object> params;
public void setConditions(String conditions) {
this.conditions = conditions;
public GenericJdbcPartition() {
sqlTypes = new ArrayList<>();
params = new ArrayList<>();
}
public String getConditions() {
return conditions;
public void setCondition(String condition) {
this.condition = condition;
}
/**
 * Returns the condition text stored on this partition.
 *
 * @return the value last passed to {@code setCondition}, or {@code null} if none was set
 */
public String getCondition() {
  return this.condition;
}
/**
 * Binds every stored parameter to the given statement, in insertion order.
 *
 * <p>The first stored parameter is bound at JDBC index 1, the second at index 2,
 * and so on. Values are bound with {@code setObject(int, Object)}; the recorded
 * SQL types are not passed to the driver.
 *
 * @param ps statement to receive the parameters
 * @throws SQLException if the driver rejects a parameter
 */
public void addParamsToPreparedStatement(PreparedStatement ps) throws SQLException {
  int parameterIndex = 1; // JDBC parameter indices are 1-based
  for (Object value : params) {
    ps.setObject(parameterIndex, value);
    parameterIndex++;
  }
}
/**
 * Appends one parameter together with its SQL type.
 *
 * <p>The type and the value are stored at the same position of two parallel
 * lists, so the n-th type always describes the n-th value.
 *
 * @param sqlType type code recorded for the value (compared against
 *                {@code java.sql.Types} constants elsewhere in this class)
 * @param param   the value to record
 */
public void addParam(int sqlType, Object param) {
  this.sqlTypes.add(sqlType);
  this.params.add(param);
}
/**
 * Returns the list of parameter values recorded so far.
 *
 * <p>The internal list itself is returned, not a copy.
 *
 * @return the stored parameter values, in insertion order
 */
public List<Object> getParams() {
  return this.params;
}
/**
 * Returns the list of SQL type codes recorded so far.
 *
 * <p>The internal list itself is returned, not a copy.
 *
 * @return the stored type codes, in insertion order
 */
public List<Integer> getSqlTypes() {
  return this.sqlTypes;
}
@Override
public void readFields(DataInput in) throws IOException {
conditions = in.readUTF();
int numParams = in.readInt();
condition = in.readUTF();
for (int i = 0; i < numParams; i++) {
int type = in.readInt();
sqlTypes.add(type);
switch (type) {
case Types.TINYINT:
case Types.SMALLINT:
case Types.INTEGER:
case Types.BIGINT:
params.add(in.readLong());
break;
case Types.REAL:
case Types.FLOAT:
params.add(in.readDouble());
break;
case Types.DOUBLE:
case Types.NUMERIC:
case Types.DECIMAL:
params.add(new BigDecimal(in.readUTF()));
break;
case Types.BIT:
case Types.BOOLEAN:
params.add(in.readBoolean());
break;
case Types.DATE:
params.add(new java.sql.Date(in.readLong()));
break;
case Types.TIME:
params.add(new java.sql.Time(in.readLong()));
break;
case Types.TIMESTAMP:
params.add(new java.sql.Timestamp(in.readLong()));
break;
case Types.CHAR:
case Types.VARCHAR:
case Types.LONGVARCHAR:
params.add(in.readUTF());
break;
default:
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
String.valueOf(type));
}
}
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(conditions);
out.writeInt(sqlTypes.size());
out.writeUTF(condition);
for (int i = 0; i < sqlTypes.size(); i++) {
int type = sqlTypes.get(i);
out.writeInt(type);
switch (type) {
case Types.TINYINT:
case Types.SMALLINT:
case Types.INTEGER:
case Types.BIGINT:
out.writeLong((Long) params.get(i));
break;
case Types.REAL:
case Types.FLOAT:
out.writeDouble((Double) params.get(i));
break;
case Types.DOUBLE:
case Types.NUMERIC:
case Types.DECIMAL:
out.writeUTF(params.get(i).toString());
break;
case Types.BIT:
case Types.BOOLEAN:
out.writeBoolean((Boolean) params.get(i));
break;
case Types.DATE:
out.writeLong(((java.sql.Date) params.get(i)).getTime());
break;
case Types.TIME:
out.writeLong(((java.sql.Time) params.get(i)).getTime());
break;
case Types.TIMESTAMP:
out.writeLong(((java.sql.Timestamp) params.get(i)).getTime());
break;
case Types.CHAR:
case Types.VARCHAR:
case Types.LONGVARCHAR:
out.writeUTF((String) params.get(i));
break;
default:
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
String.valueOf(type));
}
}
}
@Override
public String toString() {
return conditions;
public String toString(){
return asStringWithTimezone(TimeZone.getTimeZone("UTC"));
}
}
/**
 * Renders the condition as readable text by substituting each {@code ?}
 * placeholder with the corresponding stored parameter.
 *
 * <p>Placeholders are filled left to right in a single pass over the condition
 * text. Substituted values are inserted literally: characters such as
 * {@code $}, {@code \} or {@code ?} inside a value are copied as-is and are
 * never re-interpreted as regex syntax or as further placeholders.
 *
 * <p>Numeric and boolean types are written bare; date, time, timestamp, text and
 * any other type are wrapped in single quotes. A {@code null} parameter is
 * written as {@code NULL}. Placeholders without a matching parameter are left as
 * {@code ?}; surplus parameters are ignored.
 *
 * <p>Only the time and timestamp formatters are switched to {@code timeZone};
 * the date formatter keeps the formatter default.
 * NOTE(review): confirm that DATE values are meant to ignore {@code timeZone}.
 *
 * @param timeZone zone used when formatting TIME and TIMESTAMP parameters
 * @return the rendered condition, or {@code null} if no condition has been set
 */
public String asStringWithTimezone(TimeZone timeZone) {
  String template = getCondition();
  if (template == null) {
    return null;
  }

  DateFormat dateDateFormat = new SimpleDateFormat("yyyy-MM-dd");
  DateFormat timeDateFormat = new SimpleDateFormat("HH:mm:ss");
  timeDateFormat.setTimeZone(timeZone);
  DateFormat timestampDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  timestampDateFormat.setTimeZone(timeZone);

  List<Object> values = getParams();
  List<Integer> types = getSqlTypes();
  StringBuilder rendered = new StringBuilder(template.length() + 16 * values.size());

  int next = 0; // index of the next parameter to substitute
  for (int pos = 0; pos < template.length(); pos++) {
    char current = template.charAt(pos);
    if (current != '?' || next >= values.size()) {
      rendered.append(current);
      continue;
    }

    Object param = values.get(next);
    int sqlType = types.get(next);
    next++;

    if (param == null) {
      rendered.append("NULL");
      continue;
    }

    switch (sqlType) {
      case Types.TINYINT:
      case Types.SMALLINT:
      case Types.INTEGER:
      case Types.BIGINT:
      case Types.REAL:
      case Types.FLOAT:
      case Types.DOUBLE:
      case Types.NUMERIC:
      case Types.DECIMAL:
      case Types.BIT:
      case Types.BOOLEAN:
        rendered.append(param.toString());
        break;
      case Types.DATE:
        rendered.append('\'').append(dateDateFormat.format(param)).append('\'');
        break;
      case Types.TIME:
        rendered.append('\'').append(timeDateFormat.format(param)).append('\'');
        break;
      case Types.TIMESTAMP:
        rendered.append('\'').append(timestampDateFormat.format(param)).append('\'');
        break;
      case Types.CHAR:
      case Types.VARCHAR:
      case Types.LONGVARCHAR:
      default:
        rendered.append('\'').append(param.toString()).append('\'');
        break;
    }
  }
  return rendered.toString();
}
/**
 * Two partitions are equal when they are of exactly the same class and have
 * equal condition text, equal SQL type lists and equal parameter lists.
 *
 * @param o object to compare with
 * @return {@code true} if {@code o} is an equal partition
 */
@Override
public boolean equals(Object o) {
  if (o == this) {
    return true;
  }
  if (o == null || o.getClass() != getClass()) {
    return false;
  }

  GenericJdbcPartition other = (GenericJdbcPartition) o;

  // Null-safe comparison of the condition text.
  String mine = getCondition();
  String theirs = other.getCondition();
  boolean sameCondition = (mine == null) ? (theirs == null) : mine.equals(theirs);

  return sameCondition
      && getSqlTypes().equals(other.getSqlTypes())
      && getParams().equals(other.getParams());
}
/**
 * Combines the condition text, the SQL type list and the parameter list with
 * the usual 31-multiplier scheme; a missing condition contributes 0.
 *
 * @return hash code consistent with {@link #equals(Object)}
 */
@Override
public int hashCode() {
  String text = getCondition();
  int hash = (text == null) ? 0 : text.hashCode();
  hash = 31 * hash + getSqlTypes().hashCode();
  return 31 * hash + getParams().hashCode();
}
}

View File

@ -49,7 +49,7 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
@Override
public List<Partition> getPartitions(PartitionerContext context, LinkConfiguration linkConfig,
FromJobConfiguration fromJobConfig) {
FromJobConfiguration fromJobConfig) {
List<Partition> partitions = new LinkedList<Partition>();
numberPartitions = context.getMaxPartitions();
@ -65,14 +65,14 @@ public List<Partition> getPartitions(PartitionerContext context, LinkConfigurati
if (partitionMinValue == null && partitionMaxValue == null) {
GenericJdbcPartition partition = new GenericJdbcPartition();
partition.setConditions(partitionColumnName + " IS NULL");
partition.setCondition(partitionColumnName + " IS NULL");
partitions.add(partition);
return partitions;
}
if (allowNullValueInPartitionColumn) {
GenericJdbcPartition partition = new GenericJdbcPartition();
partition.setConditions(partitionColumnName + " IS NULL");
partition.setCondition(partitionColumnName + " IS NULL");
partitions.add(partition);
if (numberPartitions > 1) {
numberPartitions -= 1;
@ -83,48 +83,48 @@ public List<Partition> getPartitions(PartitionerContext context, LinkConfigurati
}
switch (partitionColumnType) {
case Types.TINYINT:
case Types.SMALLINT:
case Types.INTEGER:
case Types.BIGINT:
// Integer column
partitions.addAll(partitionIntegerColumn());
break;
case Types.TINYINT:
case Types.SMALLINT:
case Types.INTEGER:
case Types.BIGINT:
// Integer column
partitions.addAll(partitionIntegerColumn(partitionColumnType));
break;
case Types.REAL:
case Types.FLOAT:
case Types.DOUBLE:
// Floating point column
partitions.addAll(partitionFloatingPointColumn());
break;
case Types.REAL:
case Types.FLOAT:
case Types.DOUBLE:
// Floating point column
partitions.addAll(partitionFloatingPointColumn(partitionColumnType));
break;
case Types.NUMERIC:
case Types.DECIMAL:
// Decimal column
partitions.addAll(partitionNumericColumn());
break;
case Types.NUMERIC:
case Types.DECIMAL:
// Decimal column
partitions.addAll(partitionNumericColumn(partitionColumnType));
break;
case Types.BIT:
case Types.BOOLEAN:
// Boolean column
return partitionBooleanColumn();
case Types.BIT:
case Types.BOOLEAN:
// Boolean column
return partitionBooleanColumn();
case Types.DATE:
case Types.TIME:
case Types.TIMESTAMP:
// Date time column
partitions.addAll(partitionDateTimeColumn());
break;
case Types.DATE:
case Types.TIME:
case Types.TIMESTAMP:
// Date time column
partitions.addAll(partitionDateTimeColumn());
break;
case Types.CHAR:
case Types.VARCHAR:
case Types.LONGVARCHAR:
// Text column
partitions.addAll(partitionTextColumn());
break;
case Types.CHAR:
case Types.VARCHAR:
case Types.LONGVARCHAR:
// Text column
partitions.addAll(partitionTextColumn());
break;
default:
throw new SqoopException(
default:
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
String.valueOf(partitionColumnType));
}
@ -135,33 +135,8 @@ public List<Partition> getPartitions(PartitionerContext context, LinkConfigurati
protected List<Partition> partitionDateTimeColumn() {
List<Partition> partitions = new LinkedList<Partition>();
long minDateValue = 0;
long maxDateValue = 0;
SimpleDateFormat sdf = null;
switch(partitionColumnType) {
case Types.DATE:
sdf = new SimpleDateFormat("yyyy-MM-dd");
minDateValue = Date.valueOf(partitionMinValue).getTime();
maxDateValue = Date.valueOf(partitionMaxValue).getTime();
break;
case Types.TIME:
sdf = new SimpleDateFormat("HH:mm:ss");
minDateValue = Time.valueOf(partitionMinValue).getTime();
maxDateValue = Time.valueOf(partitionMaxValue).getTime();
break;
// Here should be the type of Types.TIMESTAMP:
default:
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
minDateValue = Timestamp.valueOf(partitionMinValue).getTime();
maxDateValue = Timestamp.valueOf(partitionMaxValue).getTime();
break;
}
minDateValue += TimeZone.getDefault().getOffset(minDateValue);
maxDateValue += TimeZone.getDefault().getOffset(maxDateValue);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
long minDateValue = Long.parseLong(partitionMinValue);
long maxDateValue = Long.parseLong(partitionMaxValue);
long interval = (maxDateValue - minDateValue) / numberPartitions;
long remainder = (maxDateValue - minDateValue) % numberPartitions;
@ -173,57 +148,41 @@ protected List<Partition> partitionDateTimeColumn() {
long lowerBound;
long upperBound = minDateValue;
Object objLB = null;
Object objUB = null;
GenericJdbcPartition partition;
for (int i = 1; i < numberPartitions; i++) {
lowerBound = upperBound;
upperBound = lowerBound + interval;
upperBound += (i <= remainder) ? 1 : 0;
partition = new GenericJdbcPartition();
switch(partitionColumnType) {
case Types.DATE:
objLB = new Date(lowerBound);
objUB = new Date(upperBound);
constructDateConditions(partition, Types.DATE, new Date(lowerBound), new Date(upperBound), false);
break;
case Types.TIME:
objLB = new Time(lowerBound);
objUB = new Time(upperBound);
constructDateConditions(partition, Types.TIME, new Time(lowerBound), new Time(upperBound), false);
break;
// Here should be the type of Types.TIMESTAMP:
default:
objLB = new Timestamp(lowerBound);
objUB = new Timestamp(upperBound);
constructDateConditions(partition, Types.TIMESTAMP, new Timestamp(lowerBound), new Timestamp(upperBound), false);
break;
}
GenericJdbcPartition partition = new GenericJdbcPartition();
partition.setConditions(
constructDateConditions(sdf, objLB, objUB, false));
partitions.add(partition);
}
partition = new GenericJdbcPartition();
switch(partitionColumnType) {
case Types.DATE:
objLB = new Date(upperBound);
objUB = new Date(maxDateValue);
constructDateConditions(partition, Types.DATE, new Date(upperBound), new Date(maxDateValue), true);
break;
case Types.TIME:
objLB = new Time(upperBound);
objUB = new Time(maxDateValue);
constructDateConditions(partition, Types.TIME, new Time(upperBound), new Time(maxDateValue), true);
break;
// Here should be the type of Types.TIMESTAMP:
default:
objLB = new Timestamp(upperBound);
objUB = new Timestamp(maxDateValue);
constructDateConditions(partition, Types.TIMESTAMP, new Timestamp(upperBound), new Timestamp(maxDateValue), true);
break;
}
GenericJdbcPartition partition = new GenericJdbcPartition();
partition.setConditions(
constructDateConditions(sdf, objLB, objUB, true));
partitions.add(partition);
return partitions;
}
@ -236,7 +195,7 @@ protected List<Partition> partitionTextColumn() {
// Remove common prefix if any as it does not affect outcome.
int maxPrefixLen = Math.min(partitionMinValue.length(),
partitionMaxValue.length());
partitionMaxValue.length());
// Calculate common prefix length
int cpLen = 0;
@ -259,8 +218,8 @@ protected List<Partition> partitionTextColumn() {
// Having one single value means that we can create only one single split
if(minStringBD.equals(maxStringBD)) {
GenericJdbcPartition partition = new GenericJdbcPartition();
partition.setConditions(constructTextConditions(prefix, 0, 0,
partitionMinValue, partitionMaxValue, true, true));
constructTextConditions(partition, prefix, 0, 0,
partitionMinValue, partitionMaxValue, true, true);
partitions.add(partition);
return partitions;
}
@ -269,7 +228,7 @@ protected List<Partition> partitionTextColumn() {
List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD),
new BigDecimal(numberPartitions));
new BigDecimal(numberPartitions));
if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
splitSize = NUMERIC_MIN_INCREMENT;
}
@ -289,12 +248,12 @@ protected List<Partition> partitionTextColumn() {
}
if (splitPoints.size() == 0
|| splitPoints.get(0).compareTo(minStringBD) != 0) {
|| splitPoints.get(0).compareTo(minStringBD) != 0) {
splitPoints.add(0, minStringBD);
}
if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0
|| splitPoints.size() == 1) {
|| splitPoints.size() == 1) {
splitPoints.add(maxStringBD);
}
@ -304,8 +263,8 @@ protected List<Partition> partitionTextColumn() {
BigDecimal end = splitPoints.get(i);
GenericJdbcPartition partition = new GenericJdbcPartition();
partition.setConditions(constructTextConditions(prefix, start, end,
partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1));
constructTextConditions(partition, prefix, start, end,
partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1);
partitions.add(partition);
start = end;
@ -315,7 +274,7 @@ protected List<Partition> partitionTextColumn() {
}
protected List<Partition> partitionIntegerColumn() {
protected List<Partition> partitionIntegerColumn(int sqlType) {
List<Partition> partitions = new LinkedList<Partition>();
long minValue = partitionMinValue == null ? Long.MIN_VALUE
@ -337,20 +296,18 @@ protected List<Partition> partitionIntegerColumn() {
upperBound += (i <= remainder) ? 1 : 0;
GenericJdbcPartition partition = new GenericJdbcPartition();
partition.setConditions(
constructConditions(lowerBound, upperBound, false));
constructConditions(partition, sqlType, lowerBound, upperBound, false);
partitions.add(partition);
}
GenericJdbcPartition partition = new GenericJdbcPartition();
partition.setConditions(
constructConditions(upperBound, maxValue, true));
constructConditions(partition, sqlType, upperBound, maxValue, true);
partitions.add(partition);
return partitions;
}
protected List<Partition> partitionFloatingPointColumn() {
protected List<Partition> partitionFloatingPointColumn(int sqlType) {
List<Partition> partitions = new LinkedList<Partition>();
@ -367,20 +324,18 @@ protected List<Partition> partitionFloatingPointColumn() {
upperBound = lowerBound + interval;
GenericJdbcPartition partition = new GenericJdbcPartition();
partition.setConditions(
constructConditions(lowerBound, upperBound, false));
constructConditions(partition, sqlType, lowerBound, upperBound, false);
partitions.add(partition);
}
GenericJdbcPartition partition = new GenericJdbcPartition();
partition.setConditions(
constructConditions(upperBound, maxValue, true));
constructConditions(partition, sqlType, upperBound, maxValue, true);
partitions.add(partition);
return partitions;
}
protected List<Partition> partitionNumericColumn() {
protected List<Partition> partitionNumericColumn(int sqlType) {
List<Partition> partitions = new LinkedList<Partition>();
// Having one end in null is not supported
if (partitionMinValue == null || partitionMaxValue == null) {
@ -393,7 +348,7 @@ protected List<Partition> partitionNumericColumn() {
// Having one single value means that we can create only one single split
if(minValue.equals(maxValue)) {
GenericJdbcPartition partition = new GenericJdbcPartition();
partition.setConditions(constructConditions(minValue));
constructConditions(partition, sqlType, minValue);
partitions.add(partition);
return partitions;
}
@ -420,17 +375,20 @@ protected List<Partition> partitionNumericColumn() {
}
// Turn the split points into a set of intervals.
BigDecimal start = splitPoints.get(0);
for (int i = 1; i < splitPoints.size(); i++) {
BigDecimal end = splitPoints.get(i);
BigDecimal end = splitPoints.get(0);
for (int i = 0; i < numberPartitions - 1; i++) {
BigDecimal start = splitPoints.get(i);
end = splitPoints.get(i + 1);
GenericJdbcPartition partition = new GenericJdbcPartition();
partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1));
constructConditions(partition, sqlType, start, end, false);
partitions.add(partition);
start = end;
}
GenericJdbcPartition partition = new GenericJdbcPartition();
constructConditions(partition, sqlType, end, maxValue, true);
partitions.add(partition);
return partitions;
}
@ -447,9 +405,9 @@ protected List<Partition> partitionBooleanColumn() {
if(minValue.equals(maxValue)) {
GenericJdbcPartition partition = new GenericJdbcPartition();
conditions.append(partitionColumnName).append(" = ")
.append(maxValue);
partition.setConditions(conditions.toString());
conditions.append(partitionColumnName).append(" = ?");
partition.setCondition(conditions.toString());
partition.addParam(Types.BOOLEAN, maxValue);
partitions.add(partition);
return partitions;
}
@ -459,18 +417,18 @@ protected List<Partition> partitionBooleanColumn() {
if (partitionMinValue == null) {
conditions = new StringBuilder();
conditions.append(partitionColumnName).append(" IS NULL");
partition.setConditions(conditions.toString());
partition.setCondition(conditions.toString());
partitions.add(partition);
}
partition = new GenericJdbcPartition();
conditions = new StringBuilder();
conditions.append(partitionColumnName).append(" = TRUE");
partition.setConditions(conditions.toString());
partition.setCondition(conditions.toString());
partitions.add(partition);
partition = new GenericJdbcPartition();
conditions = new StringBuilder();
conditions.append(partitionColumnName).append(" = FALSE");
partition.setConditions(conditions.toString());
partition.setCondition(conditions.toString());
partitions.add(partition);
return partitions;
}
@ -493,54 +451,53 @@ protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) {
}
}
protected String constructConditions(
Object lowerBound, Object upperBound, boolean lastOne) {
protected void constructConditions(
GenericJdbcPartition partition, int sqlType, Object lowerBound, Object upperBound, boolean lastOne) {
StringBuilder conditions = new StringBuilder();
conditions.append(lowerBound);
conditions.append(" <= ");
partition.addParam(sqlType, lowerBound);
conditions.append("? <= ");
conditions.append(partitionColumnName);
conditions.append(" AND ");
conditions.append(partitionColumnName);
conditions.append(lastOne ? " <= " : " < ");
conditions.append(upperBound);
return conditions.toString();
conditions.append(lastOne ? " <= ?" : " < ?");
partition.addParam(sqlType, upperBound);
partition.setCondition(conditions.toString());
}
protected String constructConditions(Object value) {
return new StringBuilder()
.append(partitionColumnName)
.append(" = ")
.append(value)
.toString()
;
protected void constructConditions(GenericJdbcPartition partition, int sqlType, Object value) {
String condition = partitionColumnName + " = ?";
partition.addParam(sqlType, value);
partition.setCondition(condition);
}
protected String constructDateConditions(SimpleDateFormat sdf,
Object lowerBound, Object upperBound, boolean lastOne) {
protected void constructDateConditions(GenericJdbcPartition partition, int dateType,
Object lowerBound, Object upperBound, boolean lastOne) {
StringBuilder conditions = new StringBuilder();
conditions.append('\'').append(sdf.format((java.util.Date)lowerBound)).append('\'');
conditions.append(" <= ");
partition.addParam(dateType, lowerBound);
conditions.append("? <= ");
conditions.append(partitionColumnName);
conditions.append(" AND ");
conditions.append(partitionColumnName);
conditions.append(lastOne ? " <= " : " < ");
conditions.append('\'').append(sdf.format((java.util.Date)upperBound)).append('\'');
return conditions.toString();
conditions.append(lastOne ? " <= ?" : " < ?");
partition.setCondition(conditions.toString());
partition.addParam(dateType, upperBound);
}
protected String constructTextConditions(String prefix, Object lowerBound, Object upperBound,
String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) {
protected void constructTextConditions(GenericJdbcPartition partition, String prefix, Object lowerBound, Object upperBound,
String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) {
StringBuilder conditions = new StringBuilder();
String lbString = prefix + bigDecimalToText((BigDecimal)lowerBound);
String ubString = prefix + bigDecimalToText((BigDecimal)upperBound);
conditions.append('\'').append(firstOne ? lowerStringBound : lbString).append('\'');
conditions.append(" <= ");
partition.addParam(Types.VARCHAR, firstOne ? lowerStringBound : lbString);
conditions.append("? <= ");
conditions.append(partitionColumnName);
conditions.append(" AND ");
conditions.append(partitionColumnName);
conditions.append(lastOne ? " <= " : " < ");
conditions.append('\'').append(lastOne ? upperStringBound : ubString).append('\'');
return conditions.toString();
conditions.append(lastOne ? " <= ?" : " < ?");
partition.addParam(Types.VARCHAR, lastOne ? upperStringBound : ubString);
partition.setCondition(conditions.toString());
}
/**
@ -607,5 +564,4 @@ private String bigDecimalToText(BigDecimal bd) {
return sb.toString();
}
}

View File

@ -36,6 +36,8 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.sql.Types;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
@ -54,7 +56,7 @@ public class TestExtractor {
public TestExtractor() {
tableName = getClass().getSimpleName().toUpperCase();
nullDataTableName = getClass().getSimpleName().toUpperCase() + "NULL";
nullDataTableName = getClass().getSimpleName().toUpperCase() + "NULL";
}
@BeforeMethod(alwaysRun = true)
@ -63,13 +65,13 @@ public void setUp() {
if (!executor.existTable(tableName)) {
executor.executeUpdate("CREATE TABLE "
+ executor.encloseIdentifier(tableName)
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)");
+ executor.encloseIdentifier(tableName)
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)");
for (int i = 0; i < NUMBER_OF_ROWS; i++) {
int value = START + i;
String sql = "INSERT INTO " + executor.encloseIdentifier(tableName)
+ " VALUES(" + value + ", " + value + ", '" + value + "', '2004-10-19')";
+ " VALUES(" + value + ", " + value + ", '" + value + "', '2004-10-19')";
executor.executeUpdate(sql);
}
}
@ -93,7 +95,7 @@ public void testQuery() throws Exception {
FromJobConfiguration jobConfig = new FromJobConfiguration();
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
"SELECT * FROM " + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}");
"SELECT * FROM " + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}");
GenericJdbcPartition partition;
@ -107,15 +109,21 @@ public void testQuery() throws Exception {
ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
partition = new GenericJdbcPartition();
partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665");
partition.setCondition("? <= DCOL AND DCOL < ?");
partition.addParam(Types.NUMERIC, -50.0);
partition.addParam(Types.NUMERIC, -16.6666666666666665);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
partition = new GenericJdbcPartition();
partition.setConditions("-16.6666666666666665 <= DCOL AND DCOL < 16.666666666666667");
partition.setCondition("? <= DCOL AND DCOL < ?");
partition.addParam(Types.NUMERIC, -16.6666666666666665);
partition.addParam(Types.NUMERIC, 16.6666666666666667);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
partition = new GenericJdbcPartition();
partition.setConditions("16.666666666666667 <= DCOL AND DCOL <= 50.0");
partition.setCondition("? <= DCOL AND DCOL <= ?");
partition.addParam(Types.NUMERIC, 16.6666666666666667);
partition.addParam(Types.NUMERIC, 50.0);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
}
@ -132,8 +140,8 @@ public void testSubquery() throws Exception {
FromJobConfiguration jobConfig = new FromJobConfiguration();
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
"SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL,SQOOP_SUBQUERY_ALIAS.DATECOL FROM " + "(SELECT * FROM "
+ executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
"SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL,SQOOP_SUBQUERY_ALIAS.DATECOL FROM " + "(SELECT * FROM "
+ executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
GenericJdbcPartition partition;
@ -147,15 +155,21 @@ public void testSubquery() throws Exception {
ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
partition = new GenericJdbcPartition();
partition.setConditions("-50 <= ICOL AND ICOL < -16");
partition.setCondition("? <= ICOL AND ICOL < ?");
partition.addParam(Types.NUMERIC, -50);
partition.addParam(Types.NUMERIC, -16);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
partition = new GenericJdbcPartition();
partition.setConditions("-16 <= ICOL AND ICOL < 17");
partition.setCondition("? <= ICOL AND ICOL < ?");
partition.addParam(Types.NUMERIC, -16);
partition.addParam(Types.NUMERIC, 17);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
partition = new GenericJdbcPartition();
partition.setConditions("17 <= ICOL AND ICOL < 50");
partition.setCondition("? <= ICOL AND ICOL < ?");
partition.addParam(Types.NUMERIC, 17);
partition.addParam(Types.NUMERIC, 50);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
}
@ -172,9 +186,9 @@ public void testIncorrectSchemaColumnSize() throws Exception {
FromJobConfiguration jobConfig = new FromJobConfiguration();
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
"SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + "(SELECT * FROM "
+ executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
"SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + "(SELECT * FROM "
+ executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
GenericJdbcPartition partition = new GenericJdbcPartition();
@ -183,7 +197,9 @@ public void testIncorrectSchemaColumnSize() throws Exception {
Schema schema = new Schema("TestIncorrectColumns");
ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
partition.setConditions("-50 <= ICOL AND ICOL < -16");
partition.setCondition("? <= ICOL AND ICOL < ?");
partition.addParam(Types.NUMERIC, -50);
partition.addParam(Types.NUMERIC, -16);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
}
@ -193,7 +209,7 @@ public void testNullValueExtracted() throws Exception {
if (!executor.existTable(nullDataTableName)) {
executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifier(nullDataTableName)
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)");
+ "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)");
for (int i = 0; i < NUMBER_OF_ROWS; i++) {
int value = i;
@ -210,7 +226,7 @@ public void testNullValueExtracted() throws Exception {
FromJobConfiguration jobConfig = new FromJobConfiguration();
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
"SELECT * FROM " + executor.encloseIdentifier(nullDataTableName) + " WHERE ${CONDITIONS}");
"SELECT * FROM " + executor.encloseIdentifier(nullDataTableName) + " WHERE ${CONDITIONS}");
Extractor extractor = new GenericJdbcExtractor();
DummyNullDataWriter writer = new DummyNullDataWriter();
@ -220,7 +236,9 @@ public void testNullValueExtracted() throws Exception {
ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
GenericJdbcPartition partition = new GenericJdbcPartition();
partition.setConditions("-50 <= ICOL AND ICOL < -16");
partition.setCondition("? <= ICOL AND ICOL < ?");
partition.addParam(Types.NUMERIC, -50);
partition.addParam(Types.NUMERIC, -16);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.jdbc;
import static org.testng.Assert.*;
import org.testng.annotations.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.math.BigDecimal;
import java.sql.Types;
/**
 * Unit test for the {@code write}/{@code readFields} serialization of
 * {@link GenericJdbcPartition}.
 */
public class TestGenericJdbcPartition {

  /**
   * Writes a partition carrying a condition and parameters of integer,
   * floating-point, decimal, boolean, date/time and character SQL types to an
   * in-memory buffer, reads it back into a fresh partition and asserts that the
   * result equals the original.
   */
  @Test
  public void testSerialization() throws Exception {
    GenericJdbcPartition expectedPartition = new GenericJdbcPartition();
    expectedPartition.setCondition("TEST_CONDITION");

    // Integer types
    expectedPartition.addParam(Types.TINYINT, 1L);
    expectedPartition.addParam(Types.SMALLINT, 2L);
    expectedPartition.addParam(Types.INTEGER, 3L);
    expectedPartition.addParam(Types.BIGINT, 4L);

    // Floating-point types
    expectedPartition.addParam(Types.REAL, 1.1);
    expectedPartition.addParam(Types.FLOAT, 1.2);

    // Decimal types. The String constructor is used so that each BigDecimal holds
    // exactly the decimal value written here; the double constructor would instead
    // capture the binary floating-point approximation of the literal.
    expectedPartition.addParam(Types.DOUBLE, new BigDecimal("1.3"));
    expectedPartition.addParam(Types.NUMERIC, new BigDecimal("1.4"));
    expectedPartition.addParam(Types.DECIMAL, new BigDecimal("1.5"));

    // Boolean types
    expectedPartition.addParam(Types.BIT, true);
    expectedPartition.addParam(Types.BOOLEAN, false);

    // Date and time types
    expectedPartition.addParam(Types.DATE, java.sql.Date.valueOf("2015-12-14"));
    expectedPartition.addParam(Types.TIME, java.sql.Time.valueOf("00:00:00"));
    expectedPartition.addParam(Types.TIMESTAMP, java.sql.Timestamp.valueOf("2015-12-16 00:00:00"));

    // Character types
    expectedPartition.addParam(Types.CHAR, "a");
    expectedPartition.addParam(Types.VARCHAR, "aa");
    expectedPartition.addParam(Types.LONGVARCHAR, "aaa");

    // Serialize the expected partition into an in-memory byte buffer.
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    DataOutput dataOutput = new DataOutputStream(byteArrayOutputStream);
    expectedPartition.write(dataOutput);

    // Deserialize the written bytes into a fresh partition.
    GenericJdbcPartition actualPartition = new GenericJdbcPartition();
    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
    DataInput dataInput = new DataInputStream(byteArrayInputStream);
    actualPartition.readFields(dataInput);

    assertEquals(actualPartition, expectedPartition);
  }
}

View File

@ -22,8 +22,12 @@
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TimeZone;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
@ -249,6 +253,37 @@ public void testNumericUnevenPartition() throws Exception {
});
}
// The quotient may be rounded when the split size is calculated; this test ensures
// that the values affected by that rounding are still caught by the final partition.
@Test
public void testNumericInaccurateSplit() throws Exception {
  final String minValue = new BigDecimal(1).toString();
  final String maxValue = new BigDecimal(13).toString();

  // Partition the NUMERIC column DCOL over [1, 13].
  MutableContext partitionContext = new MutableMapContext();
  partitionContext.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL");
  partitionContext.setString(
      GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
  partitionContext.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, minValue);
  partitionContext.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, maxValue);

  LinkConfiguration linkConfiguration = new LinkConfiguration();
  FromJobConfiguration fromJobConfiguration = new FromJobConfiguration();

  Partitioner numericPartitioner = new GenericJdbcPartitioner();
  PartitionerContext numericPartitionerContext = new PartitionerContext(partitionContext, 11, null, "test_user");
  List<Partition> actualPartitions =
      numericPartitioner.getPartitions(numericPartitionerContext, linkConfiguration, fromJobConfiguration);

  // Ten unit-wide ranges followed by a closing range that extends up to the maximum.
  String[] expectedConditions = {
      "1 <= DCOL AND DCOL < 2",
      "2 <= DCOL AND DCOL < 3",
      "3 <= DCOL AND DCOL < 4",
      "4 <= DCOL AND DCOL < 5",
      "5 <= DCOL AND DCOL < 6",
      "6 <= DCOL AND DCOL < 7",
      "7 <= DCOL AND DCOL < 8",
      "8 <= DCOL AND DCOL < 9",
      "9 <= DCOL AND DCOL < 10",
      "10 <= DCOL AND DCOL < 11",
      "11 <= DCOL AND DCOL <= 13"
  };
  verifyResult(actualPartitions, expectedConditions);
}
@Test
public void testNumericSinglePartition() throws Exception {
MutableContext context = new MutableMapContext();
@ -276,10 +311,9 @@ public void testDatePartition() throws Exception {
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.DATE));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
Date.valueOf("2004-10-20").toString());
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_MAXVALUE, Date.valueOf("2013-10-17")
.toString());
String.valueOf(Date.valueOf("2004-10-20").getTime()));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
String.valueOf(Date.valueOf("2013-10-17").getTime()));
LinkConfiguration linkConfig = new LinkConfiguration();
@ -300,15 +334,21 @@ public void testDatePartition() throws Exception {
@Test
public void testTimePartition() throws Exception {
DateFormat timeDateFormat = new SimpleDateFormat("HH:mm:ss");
timeDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
Long startTime = timeDateFormat.parse("01:01:01").getTime();
Long endTime = timeDateFormat.parse("10:40:50").getTime();
MutableContext context = new MutableMapContext();
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TCOL");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIME));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
Time.valueOf("01:01:01").toString());
String.valueOf(startTime));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
Time.valueOf("10:40:50").toString());
String.valueOf(endTime));
LinkConfiguration linkConfig = new LinkConfiguration();
@ -327,15 +367,21 @@ public void testTimePartition() throws Exception {
@Test
public void testTimestampPartition() throws Exception {
DateFormat timestampDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
timestampDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
Long startTime = timestampDateFormat.parse("2013-01-01 01:01:01.123").getTime();
Long endTime = timestampDateFormat.parse("2013-12-31 10:40:50.654").getTime();
MutableContext context = new MutableMapContext();
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TSCOL");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIMESTAMP));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
Timestamp.valueOf("2013-01-01 01:01:01.123").toString());
String.valueOf(startTime));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
Timestamp.valueOf("2013-12-31 10:40:50.654").toString());
String.valueOf(endTime));
LinkConfiguration linkConfig = new LinkConfiguration();
FromJobConfiguration jobConfig = new FromJobConfiguration();
@ -513,8 +559,8 @@ private void verifyResult(List<Partition> partitions,
Iterator<Partition> iterator = partitions.iterator();
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i],
((GenericJdbcPartition)iterator.next()).getConditions());
GenericJdbcPartition partition = ((GenericJdbcPartition) iterator.next());
assertEquals(partition.toString(), expected[i]);
}
}
}
}

View File

@ -89,4 +89,10 @@ public void testStagedTransfer() throws Exception {
dropTable();
}
// Use a shorter table name for Oracle.
@Override
public TableName getTableName() {
  final TableName shortTableName = new TableName("stagedrdbms");
  return shortTableName;
}
}