diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java index ae1b60dd..e9cea2db 100644 --- a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java +++ b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java @@ -23,8 +23,8 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.Blob; import java.sql.SQLException; -import java.sql.Statement; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; @@ -37,6 +37,7 @@ public class ProviderAsserts { private static final Logger LOG = Logger.getLogger(ProviderAsserts.class); + private static final String ERROR_MESSAGE_PRFIX = "Columns do not match on position: "; /** * Assert row in the table. @@ -59,8 +60,11 @@ public static void assertRow(DatabaseProvider provider, TableName tableName, Ob if (expectedValue == null) { assertNull(actualValue); } else { - assertEquals(expectedValue.toString(), actualValue.toString(), - "Columns do not match on position: " + i); + if (expectedValue instanceof Blob) { + assertBlob(rs.getBlob(i), (Blob) expectedValue, i); + } else { + assertEquals(expectedValue.toString(), actualValue.toString(), ERROR_MESSAGE_PRFIX + i); + } } i++; } @@ -74,6 +78,12 @@ public static void assertRow(DatabaseProvider provider, TableName tableName, Ob } } + private static void assertBlob(Blob actualValue, Blob expectedValue, int colPosition) throws SQLException { + byte[] actual = actualValue.getBytes(1, (int)actualValue.length()); + byte[] expected = expectedValue.getBytes(1, (int)expectedValue.length()); + assertEquals(actual, expected, ERROR_MESSAGE_PRFIX + colPosition); + } + private ProviderAsserts() { // Instantiation is prohibited } diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java index afc50164..f3efa924 100644 --- a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java +++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java @@ -24,7 +24,6 @@ import java.math.BigDecimal; import java.sql.Connection; -import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -32,6 +31,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; +import java.sql.Blob; import java.util.LinkedList; import java.util.List; @@ -157,7 +157,7 @@ public String getJdbcDriver() { * * @return */ - public DatabaseTypeList getDatabaseTypes() { + public DatabaseTypeList getDatabaseTypes() throws Exception { return new DefaultTypeList(); } @@ -304,7 +304,9 @@ public void insertRow(TableName tableName, Object ...values) { * Return rows that match given conditions. * * @param tableName Table name - * @param conditions Conditions in form of double values - column name and value, for example: "id", 1 or "last_update_date", null + * @param conditions Conditions in form of double values - column name and value, for example: + * "id", 1 or "last_update_date", null. + * For Blob data type, it can't be used as a condition in where clause directly, skip it. * @return PreparedStatement representing the requested query */ public PreparedStatement getRowsPreparedStatement(TableName tableName, Object[] conditions) { @@ -326,9 +328,10 @@ public PreparedStatement getRowsPreparedStatement(TableName tableName, Object[] throw new RuntimeException("Each odd item should be a string with column name."); } - if(value == null) { + // Blob can't be used in where clause directly, skip the where clause for Blob + if (value == null) { conditionList.add(escapeColumnName((String) columnName) + " IS NULL"); - } else { + } else if (! (value instanceof Blob)) { conditionList.add(escapeColumnName((String) columnName) + " = ?"); } } @@ -340,7 +343,8 @@ public PreparedStatement getRowsPreparedStatement(TableName tableName, Object[] PreparedStatement preparedStatement = getConnection().prepareStatement(sb.toString()); for(int i = 1; i < conditions.length; i += 2) { Object value = conditions[i]; - if (value != null) { + // skip the Blob data type + if (value != null && ! (value instanceof Blob)) { insertObjectIntoPreparedStatement(preparedStatement, i, value); } } @@ -374,6 +378,8 @@ private void insertObjectIntoPreparedStatement(PreparedStatement preparedStateme preparedStatement.setTimestamp(parameterIndex, (Timestamp) value); } else if (value instanceof BigDecimal) { preparedStatement.setBigDecimal(parameterIndex, (BigDecimal) value); + } else if (value instanceof Blob) { + preparedStatement.setBlob(parameterIndex, (Blob) value); } else { preparedStatement.setObject(parameterIndex, value); } diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/DerbyProvider.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/DerbyProvider.java index 8f3e434c..839e5618 100644 --- a/common-test/src/main/java/org/apache/sqoop/common/test/db/DerbyProvider.java +++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/DerbyProvider.java @@ -169,7 +169,7 @@ public String getConnectionPassword() { } @Override - public DatabaseTypeList getDatabaseTypes() { + public DatabaseTypeList getDatabaseTypes() throws Exception { return new DerbyTypeList(); } } diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/types/DerbyTypeList.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/types/DerbyTypeList.java index 642651de..fc02b833 100644 --- a/common-test/src/main/java/org/apache/sqoop/common/test/db/types/DerbyTypeList.java +++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/types/DerbyTypeList.java @@ -17,14 +17,17 @@ */ package org.apache.sqoop.common.test.db.types; +import javax.sql.rowset.serial.SerialBlob; +import java.io.UnsupportedEncodingException; import java.math.BigDecimal; import java.math.RoundingMode; +import java.sql.SQLException; /** * Source: https://db.apache.org/derby/docs/10.7/ref/crefsqlj31068.html */ public class DerbyTypeList extends DatabaseTypeList { - public DerbyTypeList() { + public DerbyTypeList() throws SQLException, UnsupportedEncodingException { super(); // Numeric types @@ -106,6 +109,10 @@ public DerbyTypeList() { .build()); // BLOB + add(DatabaseType.builder("BLOB(1K)") + .addExample("", new SerialBlob("test data".getBytes("ISO-8859-1")), "'test data'") + .build()); + // CLOB // Time // Timestamp diff --git a/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java b/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java index 3a3f9e87..ee385c04 100644 --- a/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java +++ b/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java @@ -31,6 +31,7 @@ import org.apache.sqoop.schema.type.Array; import org.apache.sqoop.schema.type.Binary; import org.apache.sqoop.schema.type.Bit; +import org.apache.sqoop.schema.type.Blob; import org.apache.sqoop.schema.type.Column; import org.apache.sqoop.schema.type.ColumnType; import org.apache.sqoop.schema.type.Date; @@ -238,6 +239,9 @@ private static Column restoreColumn(JSONObject obj) { case BIT: output = new Bit(name); break; + case BLOB: + output = new Blob(name); + break; case DATE: output = new Date(name); break; diff --git a/common/src/main/java/org/apache/sqoop/schema/type/Blob.java b/common/src/main/java/org/apache/sqoop/schema/type/Blob.java new file mode 100644 index 00000000..17d5e6b0 --- /dev/null +++ b/common/src/main/java/org/apache/sqoop/schema/type/Blob.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.schema.type; + +public class Blob extends Binary { + + public Blob(String name) { + super(name); + } + + @Override + public ColumnType getType() { + return ColumnType.BLOB; + } + + @Override + public String toString() { + return new StringBuilder("Blob{") + .append(super.toString()) + .append("}") + .toString(); + } + +} diff --git a/common/src/main/java/org/apache/sqoop/schema/type/ColumnType.java b/common/src/main/java/org/apache/sqoop/schema/type/ColumnType.java index 9e415bfd..ac98ee89 100644 --- a/common/src/main/java/org/apache/sqoop/schema/type/ColumnType.java +++ b/common/src/main/java/org/apache/sqoop/schema/type/ColumnType.java @@ -29,6 +29,7 @@ public enum ColumnType { ARRAY, BINARY, BIT, + BLOB, DATE, DATE_TIME, DECIMAL, diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java index 0235f28a..41af1773 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java @@ -21,7 +21,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.sql.Statement; +import java.sql.Blob; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; @@ -68,6 +68,11 @@ public void extract(ExtractorContext context, LinkConfiguration linkConfig, From // check type of the column Column schemaColumn = schemaColumns[i]; switch (schemaColumn.getType()) { + case BLOB: + // convert the blob to byte[] + Blob blob = resultSet.getBlob(i + 1); + array[i] = blob.getBytes(1, (int)blob.length()); + break; case DATE: // convert the sql date to JODA time as prescribed the Sqoop IDF spec array[i] = LocalDate.fromDateFields(resultSet.getDate(i + 1)); @@ -83,7 +88,6 @@ public void extract(ExtractorContext context, LinkConfiguration linkConfig, From default: //for anything else array[i] = resultSet.getObject(i + 1); - } } context.getDataWriter().writeArrayRecord(array); diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java index a6ffa7ce..f8f9f0dc 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java @@ -20,6 +20,7 @@ import org.apache.sqoop.schema.type.Column; import org.apache.sqoop.schema.type.Binary; import org.apache.sqoop.schema.type.Bit; +import org.apache.sqoop.schema.type.Blob; import org.apache.sqoop.schema.type.Date; import org.apache.sqoop.schema.type.DateTime; import org.apache.sqoop.schema.type.Decimal; @@ -88,9 +89,11 @@ public static Column sqlTypeToSchemaType(int sqlType, String columnName, int pre case Types.BOOLEAN: return new Bit(columnName); + case Types.BLOB: + return new Blob(columnName); + case Types.BINARY: case Types.VARBINARY: - case Types.BLOB: case Types.LONGVARBINARY: return new Binary(columnName); diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java index fc251003..9baa7433 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java @@ -608,6 +608,7 @@ public static String toCSV(Object[] objectArray, Schema schema, String nullValue csvString.append(toCSVString(objectArray[i].toString())); break; case BINARY: + case BLOB: case UNKNOWN: csvString.append(toCSVByteArray((byte[]) objectArray[i])); break; @@ -714,6 +715,7 @@ private static Object toObject(String csvString, Column column) { returnValue = toText(csvString); break; case BINARY: + case BLOB: // Unknown is treated as a binary type case UNKNOWN: returnValue = toByteArray(csvString);