From d01117f7b9d069ac50a737e1a817b4ba6cc49c6b Mon Sep 17 00:00:00 2001 From: Arvind Prabhakar Date: Thu, 15 Sep 2011 07:05:25 +0000 Subject: [PATCH] SQOOP-336. Avro import does not support varbinary types. (Tom White via Arvind Prabhakar) git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1170979 13f79535-47bb-0310-9956-ffa450edef68 --- .../sqoop/orm/AvroSchemaGenerator.java | 1 + .../com/cloudera/sqoop/TestAvroImport.java | 104 +++++++----------- 2 files changed, 43 insertions(+), 62 deletions(-) diff --git a/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java b/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java index 734f007b..50796ca6 100644 --- a/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java +++ b/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java @@ -101,6 +101,7 @@ private Type toAvroType(int sqlType) { case Types.TIMESTAMP: return Type.LONG; case Types.BINARY: + case Types.VARBINARY: return Type.BYTES; default: throw new IllegalArgumentException("Cannot convert SQL type " diff --git a/src/test/com/cloudera/sqoop/TestAvroImport.java b/src/test/com/cloudera/sqoop/TestAvroImport.java index abbc05d1..b0473c82 100644 --- a/src/test/com/cloudera/sqoop/TestAvroImport.java +++ b/src/test/com/cloudera/sqoop/TestAvroImport.java @@ -19,19 +19,20 @@ package com.cloudera.sqoop; import java.io.IOException; -import java.sql.Connection; +import java.nio.ByteBuffer; import java.sql.SQLException; -import java.sql.Statement; import java.util.ArrayList; import java.util.List; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.mapred.FsInput; +import org.apache.avro.util.Utf8; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -63,7 +64,7 @@ protected String[] getOutputArgv(boolean includeHadoopFlags) { } args.add("--table"); - args.add(HsqldbTestServer.getTableName()); + args.add(getTableName()); args.add("--connect"); args.add(HsqldbTestServer.getUrl()); args.add("--warehouse-dir"); @@ -75,13 +76,14 @@ protected String[] getOutputArgv(boolean includeHadoopFlags) { return args.toArray(new String[0]); } - // this test just uses the two int table. - protected String getTableName() { - return HsqldbTestServer.getTableName(); - } - public void testAvroImport() throws IOException { + String [] types = { "BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", + "VARCHAR(6)", "VARBINARY(2)", }; + String [] vals = { "true", "100", "200", "1.0", "2.0", + "'s'", "'0102'", }; + createTableWithColTypes(types, vals); + runImport(getOutputArgv(true)); Path outputFile = new Path(getTablePath(), "part-m-00000.avro"); @@ -89,72 +91,50 @@ public void testAvroImport() throws IOException { Schema schema = reader.getSchema(); assertEquals(Schema.Type.RECORD, schema.getType()); List fields = schema.getFields(); - assertEquals(2, fields.size()); + assertEquals(types.length, fields.size()); - assertEquals("INTFIELD1", fields.get(0).name()); - assertEquals(Schema.Type.UNION, fields.get(0).schema().getType()); - assertEquals(Schema.Type.INT, - fields.get(0).schema().getTypes().get(0).getType()); - assertEquals(Schema.Type.NULL, - fields.get(0).schema().getTypes().get(1).getType()); - - assertEquals("INTFIELD2", fields.get(1).name()); - assertEquals(Schema.Type.UNION, fields.get(1).schema().getType()); - assertEquals(Schema.Type.INT, - fields.get(1).schema().getTypes().get(0).getType()); - assertEquals(Schema.Type.NULL, - fields.get(1).schema().getTypes().get(1).getType()); + checkField(fields.get(0), "DATA_COL0", Schema.Type.BOOLEAN); + checkField(fields.get(1), "DATA_COL1", Schema.Type.INT); + checkField(fields.get(2), "DATA_COL2", Schema.Type.LONG); + checkField(fields.get(3), "DATA_COL3", Schema.Type.FLOAT); + checkField(fields.get(4), "DATA_COL4", Schema.Type.DOUBLE); + checkField(fields.get(5), "DATA_COL5", Schema.Type.STRING); + checkField(fields.get(6), "DATA_COL6", Schema.Type.BYTES); GenericRecord record1 = reader.next(); - assertEquals(1, record1.get("INTFIELD1")); - assertEquals(8, record1.get("INTFIELD2")); + assertEquals("DATA_COL0", true, record1.get("DATA_COL0")); + assertEquals("DATA_COL1", 100, record1.get("DATA_COL1")); + assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2")); + assertEquals("DATA_COL3", 1.0f, record1.get("DATA_COL3")); + assertEquals("DATA_COL4", 2.0, record1.get("DATA_COL4")); + assertEquals("DATA_COL5", new Utf8("s"), record1.get("DATA_COL5")); + Object object = record1.get("DATA_COL6"); + assertTrue(object instanceof ByteBuffer); + ByteBuffer b = ((ByteBuffer) object); + assertEquals((byte) 1, b.get(0)); + assertEquals((byte) 2, b.get(1)); + } + + private void checkField(Field field, String name, Type type) { + assertEquals(name, field.name()); + assertEquals(Schema.Type.UNION, field.schema().getType()); + assertEquals(type, field.schema().getTypes().get(0).getType()); + assertEquals(Schema.Type.NULL, field.schema().getTypes().get(1).getType()); } public void testNullableAvroImport() throws IOException, SQLException { - addNullRecord(); // Add a pair of NULL values to twointtable. + String [] types = { "INT" }; + String [] vals = { null }; + createTableWithColTypes(types, vals); + runImport(getOutputArgv(true)); Path outputFile = new Path(getTablePath(), "part-m-00000.avro"); DataFileReader reader = read(outputFile); - boolean foundNullRecord = false; - // Iterate thru the records in the output file til we find one that - // matches (NULL, NULL). - for (GenericRecord record : reader) { - LOG.debug("Input record: " + record); - if (record.get("INTFIELD1") == null && record.get("INTFIELD2") == null) { - LOG.debug("Got null record"); - foundNullRecord = true; - } - } + GenericRecord record1 = reader.next(); + assertNull(record1.get("DATA_COL0")); - assertTrue(foundNullRecord); - } - - /** - * Add a record to the TWOINTTABLE that contains (NULL, NULL). - * - * @throws SQLException if there's a problem doing the INSERT statement. - */ - private void addNullRecord() throws SQLException { - Connection connection = null; - Statement st = null; - try { - connection = this.getManager().getConnection(); - st = connection.createStatement(); - st.executeUpdate("INSERT INTO " + getTableName() - + " VALUES(NULL, NULL)"); - - connection.commit(); - } finally { - if (null != st) { - st.close(); - } - - if (null != connection) { - connection.close(); - } - } } private DataFileReader read(Path filename) throws IOException {