
SQOOP-336. Avro import does not support varbinary types.

(Tom White via Arvind Prabhakar)


git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1170979 13f79535-47bb-0310-9956-ffa450edef68
Arvind Prabhakar committed 2011-09-15 07:05:25 +00:00
parent 4992ce7515
commit d01117f7b9
2 changed files with 43 additions and 62 deletions


@@ -101,6 +101,7 @@ private Type toAvroType(int sqlType) {
     case Types.TIMESTAMP:
       return Type.LONG;
     case Types.BINARY:
+    case Types.VARBINARY:
       return Type.BYTES;
     default:
       throw new IllegalArgumentException("Cannot convert SQL type "
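The hunk above extends Sqoop's toAvroType mapping so that VARBINARY columns, like BINARY ones, map to Avro's BYTES type. The mapped type is then wrapped in a union with NULL so nullable database values can be represented, which is what the test diff below asserts through its new checkField helper. As a rough illustration, here is a minimal standalone sketch; the class, main method, and union construction are illustrative assumptions, not Sqoop code, and only the BINARY/VARBINARY arm mirrors the hunk above:

// Illustrative sketch: a VARBINARY column ends up as the Avro union ["bytes","null"].
import java.sql.Types;
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;

public class VarbinaryMappingSketch {

  // Mirrors the shape of the patched switch; most case arms omitted.
  static Type toAvroType(int sqlType) {
    switch (sqlType) {
    case Types.BINARY:
    case Types.VARBINARY:
      return Type.BYTES;
    default:
      throw new IllegalArgumentException("Cannot convert SQL type " + sqlType);
    }
  }

  public static void main(String[] args) {
    // Wrap the mapped type in a [type, null] union, the convention the
    // checkField helper in the test below verifies (type first, NULL second).
    Schema nullableBytes = Schema.createUnion(Arrays.asList(
        Schema.create(toAvroType(Types.VARBINARY)),
        Schema.create(Type.NULL)));
    System.out.println(nullableBytes); // prints ["bytes","null"]
  }
}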

@@ -19,19 +19,20 @@
 package com.cloudera.sqoop;

 import java.io.IOException;
-import java.sql.Connection;
+import java.nio.ByteBuffer;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;

 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.FsInput;
+import org.apache.avro.util.Utf8;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -63,7 +64,7 @@ protected String[] getOutputArgv(boolean includeHadoopFlags) {
     }

     args.add("--table");
-    args.add(HsqldbTestServer.getTableName());
+    args.add(getTableName());
     args.add("--connect");
     args.add(HsqldbTestServer.getUrl());
     args.add("--warehouse-dir");
@@ -75,13 +76,14 @@ protected String[] getOutputArgv(boolean includeHadoopFlags) {

     return args.toArray(new String[0]);
   }

-  // this test just uses the two int table.
-  protected String getTableName() {
-    return HsqldbTestServer.getTableName();
-  }
-
   public void testAvroImport() throws IOException {
+    String [] types = { "BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE",
+        "VARCHAR(6)", "VARBINARY(2)", };
+    String [] vals = { "true", "100", "200", "1.0", "2.0",
+        "'s'", "'0102'", };
+    createTableWithColTypes(types, vals);
+
     runImport(getOutputArgv(true));

     Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
@@ -89,72 +91,50 @@ public void testAvroImport() throws IOException {
     Schema schema = reader.getSchema();
     assertEquals(Schema.Type.RECORD, schema.getType());
     List<Field> fields = schema.getFields();
-    assertEquals(2, fields.size());
+    assertEquals(types.length, fields.size());

-    assertEquals("INTFIELD1", fields.get(0).name());
-    assertEquals(Schema.Type.UNION, fields.get(0).schema().getType());
-    assertEquals(Schema.Type.INT,
-        fields.get(0).schema().getTypes().get(0).getType());
-    assertEquals(Schema.Type.NULL,
-        fields.get(0).schema().getTypes().get(1).getType());
-
-    assertEquals("INTFIELD2", fields.get(1).name());
-    assertEquals(Schema.Type.UNION, fields.get(1).schema().getType());
-    assertEquals(Schema.Type.INT,
-        fields.get(1).schema().getTypes().get(0).getType());
-    assertEquals(Schema.Type.NULL,
-        fields.get(1).schema().getTypes().get(1).getType());
+    checkField(fields.get(0), "DATA_COL0", Schema.Type.BOOLEAN);
+    checkField(fields.get(1), "DATA_COL1", Schema.Type.INT);
+    checkField(fields.get(2), "DATA_COL2", Schema.Type.LONG);
+    checkField(fields.get(3), "DATA_COL3", Schema.Type.FLOAT);
+    checkField(fields.get(4), "DATA_COL4", Schema.Type.DOUBLE);
+    checkField(fields.get(5), "DATA_COL5", Schema.Type.STRING);
+    checkField(fields.get(6), "DATA_COL6", Schema.Type.BYTES);

     GenericRecord record1 = reader.next();
-    assertEquals(1, record1.get("INTFIELD1"));
-    assertEquals(8, record1.get("INTFIELD2"));
+    assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
+    assertEquals("DATA_COL1", 100, record1.get("DATA_COL1"));
+    assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2"));
+    assertEquals("DATA_COL3", 1.0f, record1.get("DATA_COL3"));
+    assertEquals("DATA_COL4", 2.0, record1.get("DATA_COL4"));
+    assertEquals("DATA_COL5", new Utf8("s"), record1.get("DATA_COL5"));
+
+    Object object = record1.get("DATA_COL6");
+    assertTrue(object instanceof ByteBuffer);
+    ByteBuffer b = ((ByteBuffer) object);
+    assertEquals((byte) 1, b.get(0));
+    assertEquals((byte) 2, b.get(1));
   }

+  private void checkField(Field field, String name, Type type) {
+    assertEquals(name, field.name());
+    assertEquals(Schema.Type.UNION, field.schema().getType());
+    assertEquals(type, field.schema().getTypes().get(0).getType());
+    assertEquals(Schema.Type.NULL, field.schema().getTypes().get(1).getType());
+  }
+
   public void testNullableAvroImport() throws IOException, SQLException {
-    addNullRecord(); // Add a pair of NULL values to twointtable.
+    String [] types = { "INT" };
+    String [] vals = { null };
+    createTableWithColTypes(types, vals);

     runImport(getOutputArgv(true));

     Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
     DataFileReader<GenericRecord> reader = read(outputFile);

-    boolean foundNullRecord = false;
-
-    // Iterate thru the records in the output file til we find one that
-    // matches (NULL, NULL).
-    for (GenericRecord record : reader) {
-      LOG.debug("Input record: " + record);
-      if (record.get("INTFIELD1") == null && record.get("INTFIELD2") == null) {
-        LOG.debug("Got null record");
-        foundNullRecord = true;
-      }
-    }
-
-    assertTrue(foundNullRecord);
+    GenericRecord record1 = reader.next();
+    assertNull(record1.get("DATA_COL0"));
   }

-  /**
-   * Add a record to the TWOINTTABLE that contains (NULL, NULL).
-   *
-   * @throws SQLException if there's a problem doing the INSERT statement.
-   */
-  private void addNullRecord() throws SQLException {
-    Connection connection = null;
-    Statement st = null;
-    try {
-      connection = this.getManager().getConnection();
-      st = connection.createStatement();
-      st.executeUpdate("INSERT INTO " + getTableName()
-          + " VALUES(NULL, NULL)");
-      connection.commit();
-    } finally {
-      if (null != st) {
-        st.close();
-      }
-      if (null != connection) {
-        connection.close();
-      }
-    }
-  }
-
   private DataFileReader<GenericRecord> read(Path filename) throws IOException {
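The body of the read(Path) helper is truncated above. Assuming it follows the standard Avro reader pattern implied by this file's imports (FsInput, DatumReader, GenericDatumReader, DataFileReader, Configuration), a minimal sketch of such a helper:

// Assumed sketch of the truncated helper; everything past the signature is
// inferred from the imports in the diff, not taken from the commit itself.
private DataFileReader<GenericRecord> read(Path filename) throws IOException {
  // FsInput adapts a Hadoop Path to the SeekableInput that Avro requires.
  Configuration conf = new Configuration();
  FsInput fsInput = new FsInput(filename, conf);
  // GenericDatumReader decodes records using the schema embedded in the file.
  DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
  return new DataFileReader<GenericRecord>(fsInput, datumReader);
}

DataFileReader is both an Iterator and an Iterable over GenericRecord, which is why the old test could iterate with for (GenericRecord record : reader) while the new test simply calls reader.next().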