mirror of
https://github.com/apache/sqoop.git
synced 2025-05-10 22:13:07 +08:00
SQOOP-2797: Sqoop2: Add new schema object for the Blob
(Colin Ma via Jarek Jarcec Cecho)
This commit is contained in:
parent
7e092f5bf4
commit
cea627fa14
@ -23,8 +23,8 @@
|
|||||||
|
|
||||||
import java.sql.PreparedStatement;
|
import java.sql.PreparedStatement;
|
||||||
import java.sql.ResultSet;
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.Blob;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.Statement;
|
|
||||||
|
|
||||||
import static org.testng.Assert.assertEquals;
|
import static org.testng.Assert.assertEquals;
|
||||||
import static org.testng.Assert.assertNull;
|
import static org.testng.Assert.assertNull;
|
||||||
@ -37,6 +37,7 @@
|
|||||||
public class ProviderAsserts {
|
public class ProviderAsserts {
|
||||||
|
|
||||||
private static final Logger LOG = Logger.getLogger(ProviderAsserts.class);
|
private static final Logger LOG = Logger.getLogger(ProviderAsserts.class);
|
||||||
|
private static final String ERROR_MESSAGE_PRFIX = "Columns do not match on position: ";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assert row in the table.
|
* Assert row in the table.
|
||||||
@ -59,8 +60,11 @@ public static void assertRow(DatabaseProvider provider, TableName tableName, Ob
|
|||||||
if (expectedValue == null) {
|
if (expectedValue == null) {
|
||||||
assertNull(actualValue);
|
assertNull(actualValue);
|
||||||
} else {
|
} else {
|
||||||
assertEquals(expectedValue.toString(), actualValue.toString(),
|
if (expectedValue instanceof Blob) {
|
||||||
"Columns do not match on position: " + i);
|
assertBlob(rs.getBlob(i), (Blob) expectedValue, i);
|
||||||
|
} else {
|
||||||
|
assertEquals(expectedValue.toString(), actualValue.toString(), ERROR_MESSAGE_PRFIX + i);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
@ -74,6 +78,12 @@ public static void assertRow(DatabaseProvider provider, TableName tableName, Ob
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void assertBlob(Blob actualValue, Blob expectedValue, int colPosition) throws SQLException {
|
||||||
|
byte[] actual = actualValue.getBytes(1, (int)actualValue.length());
|
||||||
|
byte[] expected = expectedValue.getBytes(1, (int)expectedValue.length());
|
||||||
|
assertEquals(actual, expected, ERROR_MESSAGE_PRFIX + colPosition);
|
||||||
|
}
|
||||||
|
|
||||||
private ProviderAsserts() {
|
private ProviderAsserts() {
|
||||||
// Instantiation is prohibited
|
// Instantiation is prohibited
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,6 @@
|
|||||||
|
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.Date;
|
|
||||||
import java.sql.DriverManager;
|
import java.sql.DriverManager;
|
||||||
import java.sql.PreparedStatement;
|
import java.sql.PreparedStatement;
|
||||||
import java.sql.ResultSet;
|
import java.sql.ResultSet;
|
||||||
@ -32,6 +31,7 @@
|
|||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.Statement;
|
import java.sql.Statement;
|
||||||
import java.sql.Timestamp;
|
import java.sql.Timestamp;
|
||||||
|
import java.sql.Blob;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ -157,7 +157,7 @@ public String getJdbcDriver() {
|
|||||||
*
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public DatabaseTypeList getDatabaseTypes() {
|
public DatabaseTypeList getDatabaseTypes() throws Exception {
|
||||||
return new DefaultTypeList();
|
return new DefaultTypeList();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -304,7 +304,9 @@ public void insertRow(TableName tableName, Object ...values) {
|
|||||||
* Return rows that match given conditions.
|
* Return rows that match given conditions.
|
||||||
*
|
*
|
||||||
* @param tableName Table name
|
* @param tableName Table name
|
||||||
* @param conditions Conditions in form of double values - column name and value, for example: "id", 1 or "last_update_date", null
|
* @param conditions Conditions in form of double values - column name and value, for example:
|
||||||
|
* "id", 1 or "last_update_date", null.
|
||||||
|
* For Blob data type, it can't be used as a condition in where clause directly, skip it.
|
||||||
* @return PreparedStatement representing the requested query
|
* @return PreparedStatement representing the requested query
|
||||||
*/
|
*/
|
||||||
public PreparedStatement getRowsPreparedStatement(TableName tableName, Object[] conditions) {
|
public PreparedStatement getRowsPreparedStatement(TableName tableName, Object[] conditions) {
|
||||||
@ -326,9 +328,10 @@ public PreparedStatement getRowsPreparedStatement(TableName tableName, Object[]
|
|||||||
throw new RuntimeException("Each odd item should be a string with column name.");
|
throw new RuntimeException("Each odd item should be a string with column name.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Blob can't be used in where clause directly, skip the where clause for Blob
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
conditionList.add(escapeColumnName((String) columnName) + " IS NULL");
|
conditionList.add(escapeColumnName((String) columnName) + " IS NULL");
|
||||||
} else {
|
} else if (! (value instanceof Blob)) {
|
||||||
conditionList.add(escapeColumnName((String) columnName) + " = ?");
|
conditionList.add(escapeColumnName((String) columnName) + " = ?");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -340,7 +343,8 @@ public PreparedStatement getRowsPreparedStatement(TableName tableName, Object[]
|
|||||||
PreparedStatement preparedStatement = getConnection().prepareStatement(sb.toString());
|
PreparedStatement preparedStatement = getConnection().prepareStatement(sb.toString());
|
||||||
for(int i = 1; i < conditions.length; i += 2) {
|
for(int i = 1; i < conditions.length; i += 2) {
|
||||||
Object value = conditions[i];
|
Object value = conditions[i];
|
||||||
if (value != null) {
|
// skip the Blob data type
|
||||||
|
if (value != null && ! (value instanceof Blob)) {
|
||||||
insertObjectIntoPreparedStatement(preparedStatement, i, value);
|
insertObjectIntoPreparedStatement(preparedStatement, i, value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -374,6 +378,8 @@ private void insertObjectIntoPreparedStatement(PreparedStatement preparedStateme
|
|||||||
preparedStatement.setTimestamp(parameterIndex, (Timestamp) value);
|
preparedStatement.setTimestamp(parameterIndex, (Timestamp) value);
|
||||||
} else if (value instanceof BigDecimal) {
|
} else if (value instanceof BigDecimal) {
|
||||||
preparedStatement.setBigDecimal(parameterIndex, (BigDecimal) value);
|
preparedStatement.setBigDecimal(parameterIndex, (BigDecimal) value);
|
||||||
|
} else if (value instanceof Blob) {
|
||||||
|
preparedStatement.setBlob(parameterIndex, (Blob) value);
|
||||||
} else {
|
} else {
|
||||||
preparedStatement.setObject(parameterIndex, value);
|
preparedStatement.setObject(parameterIndex, value);
|
||||||
}
|
}
|
||||||
|
@ -169,7 +169,7 @@ public String getConnectionPassword() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DatabaseTypeList getDatabaseTypes() {
|
public DatabaseTypeList getDatabaseTypes() throws Exception {
|
||||||
return new DerbyTypeList();
|
return new DerbyTypeList();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,14 +17,17 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.sqoop.common.test.db.types;
|
package org.apache.sqoop.common.test.db.types;
|
||||||
|
|
||||||
|
import javax.sql.rowset.serial.SerialBlob;
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.math.RoundingMode;
|
import java.math.RoundingMode;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Source: https://db.apache.org/derby/docs/10.7/ref/crefsqlj31068.html
|
* Source: https://db.apache.org/derby/docs/10.7/ref/crefsqlj31068.html
|
||||||
*/
|
*/
|
||||||
public class DerbyTypeList extends DatabaseTypeList {
|
public class DerbyTypeList extends DatabaseTypeList {
|
||||||
public DerbyTypeList() {
|
public DerbyTypeList() throws SQLException, UnsupportedEncodingException {
|
||||||
super();
|
super();
|
||||||
|
|
||||||
// Numeric types
|
// Numeric types
|
||||||
@ -106,6 +109,10 @@ public DerbyTypeList() {
|
|||||||
.build());
|
.build());
|
||||||
|
|
||||||
// BLOB
|
// BLOB
|
||||||
|
add(DatabaseType.builder("BLOB(1K)")
|
||||||
|
.addExample("", new SerialBlob("test data".getBytes("ISO-8859-1")), "'test data'")
|
||||||
|
.build());
|
||||||
|
|
||||||
// CLOB
|
// CLOB
|
||||||
// Time
|
// Time
|
||||||
// Timestamp
|
// Timestamp
|
||||||
|
@ -31,6 +31,7 @@
|
|||||||
import org.apache.sqoop.schema.type.Array;
|
import org.apache.sqoop.schema.type.Array;
|
||||||
import org.apache.sqoop.schema.type.Binary;
|
import org.apache.sqoop.schema.type.Binary;
|
||||||
import org.apache.sqoop.schema.type.Bit;
|
import org.apache.sqoop.schema.type.Bit;
|
||||||
|
import org.apache.sqoop.schema.type.Blob;
|
||||||
import org.apache.sqoop.schema.type.Column;
|
import org.apache.sqoop.schema.type.Column;
|
||||||
import org.apache.sqoop.schema.type.ColumnType;
|
import org.apache.sqoop.schema.type.ColumnType;
|
||||||
import org.apache.sqoop.schema.type.Date;
|
import org.apache.sqoop.schema.type.Date;
|
||||||
@ -238,6 +239,9 @@ private static Column restoreColumn(JSONObject obj) {
|
|||||||
case BIT:
|
case BIT:
|
||||||
output = new Bit(name);
|
output = new Bit(name);
|
||||||
break;
|
break;
|
||||||
|
case BLOB:
|
||||||
|
output = new Blob(name);
|
||||||
|
break;
|
||||||
case DATE:
|
case DATE:
|
||||||
output = new Date(name);
|
output = new Date(name);
|
||||||
break;
|
break;
|
||||||
|
39
common/src/main/java/org/apache/sqoop/schema/type/Blob.java
Normal file
39
common/src/main/java/org/apache/sqoop/schema/type/Blob.java
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.sqoop.schema.type;
|
||||||
|
|
||||||
|
public class Blob extends Binary {
|
||||||
|
|
||||||
|
public Blob(String name) {
|
||||||
|
super(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ColumnType getType() {
|
||||||
|
return ColumnType.BLOB;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return new StringBuilder("Blob{")
|
||||||
|
.append(super.toString())
|
||||||
|
.append("}")
|
||||||
|
.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -29,6 +29,7 @@ public enum ColumnType {
|
|||||||
ARRAY,
|
ARRAY,
|
||||||
BINARY,
|
BINARY,
|
||||||
BIT,
|
BIT,
|
||||||
|
BLOB,
|
||||||
DATE,
|
DATE,
|
||||||
DATE_TIME,
|
DATE_TIME,
|
||||||
DECIMAL,
|
DECIMAL,
|
||||||
|
@ -21,7 +21,7 @@
|
|||||||
import java.sql.ResultSet;
|
import java.sql.ResultSet;
|
||||||
import java.sql.ResultSetMetaData;
|
import java.sql.ResultSetMetaData;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.Statement;
|
import java.sql.Blob;
|
||||||
|
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.sqoop.common.SqoopException;
|
import org.apache.sqoop.common.SqoopException;
|
||||||
@ -68,6 +68,11 @@ public void extract(ExtractorContext context, LinkConfiguration linkConfig, From
|
|||||||
// check type of the column
|
// check type of the column
|
||||||
Column schemaColumn = schemaColumns[i];
|
Column schemaColumn = schemaColumns[i];
|
||||||
switch (schemaColumn.getType()) {
|
switch (schemaColumn.getType()) {
|
||||||
|
case BLOB:
|
||||||
|
// convert the blob to byte[]
|
||||||
|
Blob blob = resultSet.getBlob(i + 1);
|
||||||
|
array[i] = blob.getBytes(1, (int)blob.length());
|
||||||
|
break;
|
||||||
case DATE:
|
case DATE:
|
||||||
// convert the sql date to JODA time as prescribed the Sqoop IDF spec
|
// convert the sql date to JODA time as prescribed the Sqoop IDF spec
|
||||||
array[i] = LocalDate.fromDateFields(resultSet.getDate(i + 1));
|
array[i] = LocalDate.fromDateFields(resultSet.getDate(i + 1));
|
||||||
@ -83,7 +88,6 @@ public void extract(ExtractorContext context, LinkConfiguration linkConfig, From
|
|||||||
default:
|
default:
|
||||||
//for anything else
|
//for anything else
|
||||||
array[i] = resultSet.getObject(i + 1);
|
array[i] = resultSet.getObject(i + 1);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
context.getDataWriter().writeArrayRecord(array);
|
context.getDataWriter().writeArrayRecord(array);
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
import org.apache.sqoop.schema.type.Column;
|
import org.apache.sqoop.schema.type.Column;
|
||||||
import org.apache.sqoop.schema.type.Binary;
|
import org.apache.sqoop.schema.type.Binary;
|
||||||
import org.apache.sqoop.schema.type.Bit;
|
import org.apache.sqoop.schema.type.Bit;
|
||||||
|
import org.apache.sqoop.schema.type.Blob;
|
||||||
import org.apache.sqoop.schema.type.Date;
|
import org.apache.sqoop.schema.type.Date;
|
||||||
import org.apache.sqoop.schema.type.DateTime;
|
import org.apache.sqoop.schema.type.DateTime;
|
||||||
import org.apache.sqoop.schema.type.Decimal;
|
import org.apache.sqoop.schema.type.Decimal;
|
||||||
@ -88,9 +89,11 @@ public static Column sqlTypeToSchemaType(int sqlType, String columnName, int pre
|
|||||||
case Types.BOOLEAN:
|
case Types.BOOLEAN:
|
||||||
return new Bit(columnName);
|
return new Bit(columnName);
|
||||||
|
|
||||||
|
case Types.BLOB:
|
||||||
|
return new Blob(columnName);
|
||||||
|
|
||||||
case Types.BINARY:
|
case Types.BINARY:
|
||||||
case Types.VARBINARY:
|
case Types.VARBINARY:
|
||||||
case Types.BLOB:
|
|
||||||
case Types.LONGVARBINARY:
|
case Types.LONGVARBINARY:
|
||||||
return new Binary(columnName);
|
return new Binary(columnName);
|
||||||
|
|
||||||
|
@ -608,6 +608,7 @@ public static String toCSV(Object[] objectArray, Schema schema, String nullValue
|
|||||||
csvString.append(toCSVString(objectArray[i].toString()));
|
csvString.append(toCSVString(objectArray[i].toString()));
|
||||||
break;
|
break;
|
||||||
case BINARY:
|
case BINARY:
|
||||||
|
case BLOB:
|
||||||
case UNKNOWN:
|
case UNKNOWN:
|
||||||
csvString.append(toCSVByteArray((byte[]) objectArray[i]));
|
csvString.append(toCSVByteArray((byte[]) objectArray[i]));
|
||||||
break;
|
break;
|
||||||
@ -714,6 +715,7 @@ private static Object toObject(String csvString, Column column) {
|
|||||||
returnValue = toText(csvString);
|
returnValue = toText(csvString);
|
||||||
break;
|
break;
|
||||||
case BINARY:
|
case BINARY:
|
||||||
|
case BLOB:
|
||||||
// Unknown is treated as a binary type
|
// Unknown is treated as a binary type
|
||||||
case UNKNOWN:
|
case UNKNOWN:
|
||||||
returnValue = toByteArray(csvString);
|
returnValue = toByteArray(csvString);
|
||||||
|
Loading…
Reference in New Issue
Block a user