diff --git a/LICENSE.txt b/LICENSE.txt
index c36c7ad0..48b2c3bc 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -362,3 +362,7 @@ For lib/slf4j-api-.jar:
 For lib/snappy-java-.jar:
 
   The Apache License, Version 2.0
+
+Some parts of the code were copied from the Apache Hive Project:
+
+  The Apache License, Version 2.0
\ No newline at end of file
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 4b2f9ced..d8f1dfc1 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -18,7 +18,7 @@
 # This properties file lists the versions of the various artifacts we use.
 # It drives ivy and the generation of a maven POM
 
-avro.version=1.7.5
+avro.version=1.8.0-SNAPSHOT
 
 kite-data.version=1.0.0
diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java
index dffbf6e0..90cc9d04 100644
--- a/src/java/org/apache/sqoop/avro/AvroUtil.java
+++ b/src/java/org/apache/sqoop/avro/AvroUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.avro;
 
+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.FileReader;
@@ -50,12 +51,29 @@
  * The service class provides methods for creating and converting Avro objects.
  */
 public final class AvroUtil {
+  public static boolean isDecimal(Schema.Field field) {
+    return isDecimal(field.schema());
+  }
+
+  public static boolean isDecimal(Schema schema) {
+    if (schema.getType().equals(Schema.Type.UNION)) {
+      for (Schema type : schema.getTypes()) {
+        if (isDecimal(type)) {
+          return true;
+        }
+      }
+
+      return false;
+    } else {
+      return "decimal".equals(schema.getProp(LogicalType.LOGICAL_TYPE_PROP));
+    }
+  }
 
   /**
    * Convert Sqoop's Java representation to Avro representation.
    */
-  public static Object toAvro(Object o, boolean bigDecimalFormatString) {
-    if (o instanceof BigDecimal) {
+  public static Object toAvro(Object o, Schema.Field field, boolean bigDecimalFormatString) {
+    if (o instanceof BigDecimal && !isDecimal(field)) {
       if (bigDecimalFormatString) {
         // Returns a string representation of this without an exponent field.
         return ((BigDecimal) o).toPlainString();
@@ -111,8 +129,9 @@ public static GenericRecord toGenericRecord(Map<String, Object> fieldMap,
       Schema schema, boolean bigDecimalFormatString) {
     GenericRecord record = new GenericData.Record(schema);
     for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
-      Object avroObject = toAvro(entry.getValue(), bigDecimalFormatString);
       String avroColumn = toAvroColumn(entry.getKey());
+      Schema.Field field = schema.getField(avroColumn);
+      Object avroObject = toAvro(entry.getValue(), field, bigDecimalFormatString);
       record.put(avroColumn, avroObject);
     }
     return record;
@@ -187,7 +206,12 @@ public static Object fromAvro(Object avroObject, Schema schema, String type) {
         throw new IllegalArgumentException("Only support union with null");
       }
     case FIXED:
-      return new BytesWritable(((GenericFixed) avroObject).bytes());
+      if (isDecimal(schema)) {
+        // Should automatically be a BigDecimal object.
+        return avroObject;
+      } else {
+        return new BytesWritable(((GenericFixed) avroObject).bytes());
+      }
     case RECORD:
     case ARRAY:
    case MAP:
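Side note on the isDecimal() check above: it keys off the "logicalType" schema property rather than the physical Avro type, and recurses into unions, so the nullable ["null", decimal] columns that the schema generator emits are recognized. A minimal sketch of the behavior, assuming Avro 1.8 and the patched AvroUtil on the classpath (the column schema here is made up):

    import java.util.Arrays;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.sqoop.avro.AvroUtil;

    public class IsDecimalSketch {
      public static void main(String[] args) {
        // A nullable DECIMAL(10,2) column, as the schema generator would emit it.
        Schema decimal = LogicalTypes.decimal(10, 2)
            .addToSchema(Schema.create(Schema.Type.BYTES));
        Schema nullable = Schema.createUnion(
            Arrays.asList(Schema.create(Schema.Type.NULL), decimal));
        // The union itself carries no logical type; the recursive
        // check finds the decimal branch.
        System.out.println(AvroUtil.isDecimal(decimal));  // true
        System.out.println(AvroUtil.isDecimal(nullable)); // true
      }
    }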
diff --git a/src/java/org/apache/sqoop/config/ConfigurationConstants.java b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
index e19c17b4..bd6e99b0 100644
--- a/src/java/org/apache/sqoop/config/ConfigurationConstants.java
+++ b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
@@ -100,6 +100,11 @@ public final class ConfigurationConstants {
    */
   public static final String PROP_SPLIT_LIMIT = "split.limit";
 
+  /**
+   * Enable Avro logical types (decimal support only).
+   */
+  public static final String PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL = "sqoop.avro.logical_types.decimal.enable";
+
   private ConfigurationConstants() {
     // Disable Explicit Object Creation
   }
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java
index d9569c59..f98feb3e 100644
--- a/src/java/org/apache/sqoop/manager/ConnManager.java
+++ b/src/java/org/apache/sqoop/manager/ConnManager.java
@@ -32,6 +32,8 @@
 import java.util.Set;
 import java.util.StringTokenizer;
 
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema.Type;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -222,6 +224,22 @@ public Type toAvroType(int sqlType) {
     }
   }
 
+  /**
+   * Resolve a database-specific type to an Avro logical data type.
+   * @param sqlType sql type
+   * @return avro logical type
+   */
+  public LogicalType toAvroLogicalType(int sqlType, Integer precision, Integer scale) {
+    switch (sqlType) {
+    case Types.NUMERIC:
+    case Types.DECIMAL:
+      return LogicalTypes.decimal(precision, scale);
+    default:
+      throw new IllegalArgumentException("Cannot convert SQL type "
+          + sqlType + " to avro logical type");
+    }
+  }
+
   /**
    * Return java type for SQL type.
    * @param tableName table name
@@ -258,6 +276,20 @@ public Type toAvroType(String tableName, String columnName, int sqlType) {
     return toAvroType(sqlType);
   }
 
+  /**
+   * Return avro logical type for SQL type.
+   * @param tableName table name
+   * @param columnName column name
+   * @param sqlType sql type
+   * @param precision precision
+   * @param scale scale
+   * @return avro logical type
+   */
+  public LogicalType toAvroLogicalType(String tableName, String columnName, int sqlType, Integer precision, Integer scale) {
+    // Ignore table name and column name by default.
+    return toAvroLogicalType(sqlType, precision, scale);
+  }
+
   /**
    * Return an unordered mapping from colname to sqltype for
    * all columns in a table.
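For reference, the decimal logical type returned by the new toAvroLogicalType() hook records precision and scale as schema properties once attached to a base schema. A small sketch of what the default mapping produces, assuming Avro 1.8 (the DECIMAL(18,4) column is hypothetical):

    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    public class DecimalSchemaSketch {
      public static void main(String[] args) {
        // Equivalent of toAvroLogicalType(Types.DECIMAL, 18, 4) attached
        // to a BYTES schema, as AvroSchemaGenerator does further below.
        Schema s = LogicalTypes.decimal(18, 4)
            .addToSchema(Schema.create(Schema.Type.BYTES));
        System.out.println(s);
        // {"type":"bytes","logicalType":"decimal","precision":18,"scale":4}
      }
    }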
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java b/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
index 20f056a9..76c3458b 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
@@ -18,8 +18,10 @@
 
 package org.apache.sqoop.mapreduce;
 
+import org.apache.avro.Conversions;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.hadoop.io.NullWritable;
 
 import java.io.IOException;
@@ -30,6 +32,14 @@
 public class AvroExportMapper
     extends GenericRecordExportMapper<AvroWrapper<GenericRecord>, NullWritable> {
 
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+
+    // Add decimal support
+    ReflectData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
+  }
+
   @Override
   protected void map(AvroWrapper<GenericRecord> key, NullWritable value,
       Context context) throws IOException, InterruptedException {
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java b/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
index 0ea5ca41..450f947a 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
@@ -68,8 +68,7 @@ protected void map(LongWritable key, SqoopRecord val, Context context)
       throw new IOException(sqlE);
     }
 
-    GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(),
-        schema, bigDecimalFormatString);
+    GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(), schema, bigDecimalFormatString);
     wrapper.datum(outKey);
     context.write(wrapper, NullWritable.get());
   }
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
index aed1e720..d95feb05 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
@@ -23,10 +23,12 @@
 import java.net.URLDecoder;
 import java.util.Map;
 
+import org.apache.avro.Conversions;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
@@ -34,9 +36,9 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
+import static org.apache.avro.file.CodecFactory.DEFAULT_DEFLATE_LEVEL;
 import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
 import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
-import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
 import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
 import static org.apache.avro.mapred.AvroOutputFormat.EXT;
 import static org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY;
@@ -53,6 +55,7 @@ public class AvroOutputFormat
   static void configureDataFileWriter(DataFileWriter<T> writer,
       TaskAttemptContext context) throws UnsupportedEncodingException {
     if (FileOutputFormat.getCompressOutput(context)) {
+      // Default level must be greater than 0.
      int level = context.getConfiguration()
          .getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
       String codecName = context.getConfiguration()
@@ -90,6 +93,9 @@ public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
       isMapOnly
         ? AvroJob.getMapOutputSchema(context.getConfiguration())
         : AvroJob.getOutputSchema(context.getConfiguration());
 
+    // Add decimal support
+    ReflectData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
+
     final DataFileWriter<T> WRITER =
       new DataFileWriter<T>(new ReflectDatumWriter<T>());
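The Conversions.DecimalConversion registered in the mappers and output format above is what encodes BigDecimal values as bytes on the write path and decodes them on the read path. A standalone round trip, assuming Avro 1.8 (the values are arbitrary):

    import java.math.BigDecimal;
    import java.nio.ByteBuffer;
    import org.apache.avro.Conversions;
    import org.apache.avro.LogicalType;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    public class DecimalRoundTripSketch {
      public static void main(String[] args) {
        Conversions.DecimalConversion conv = new Conversions.DecimalConversion();
        Schema schema = LogicalTypes.decimal(4, 2)
            .addToSchema(Schema.create(Schema.Type.BYTES));
        LogicalType lt = schema.getLogicalType();
        // Write path: BigDecimal -> two's-complement unscaled bytes.
        ByteBuffer encoded = conv.toBytes(new BigDecimal("12.34"), schema, lt);
        // Read path: bytes -> BigDecimal with the schema's scale.
        System.out.println(conv.fromBytes(encoded, schema, lt)); // 12.34
      }
    }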
diff --git a/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java b/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java
index ab263c16..b60ee426 100644
--- a/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/GenericRecordExportMapper.java
@@ -21,7 +21,9 @@
 import com.cloudera.sqoop.lib.SqoopRecord;
 import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
 import com.cloudera.sqoop.orm.ClassWriter;
+import org.apache.avro.Conversions;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DefaultStringifier;
@@ -76,6 +78,9 @@ protected void setup(Context context) throws IOException, InterruptedException {
     columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
         MapWritable.class);
+
+    // Add decimal support
+    GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
   }
 
   protected SqoopRecord toSqoopRecord(GenericRecord record) throws IOException {
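The PRECISION_TO_BYTE_COUNT table added to AvroSchemaGenerator below (borrowed from Hive) estimates how many two's-complement bytes the unscaled value of a given decimal precision needs: the bits of 10^prec - 1 plus one sign bit, rounded up to whole bytes. A quick check of the formula with a few sample precisions:

    public class PrecisionBytesSketch {
      public static void main(String[] args) {
        for (int prec : new int[] {1, 9, 18, 38}) {
          // Same expression as the static initializer below.
          int bytes = (int) Math.ceil(
              (Math.log(Math.pow(10, prec) - 1) / Math.log(2) + 1) / 8);
          System.out.println("precision " + prec + " -> " + bytes + " bytes");
        }
        // precision 1 -> 1, 9 -> 4, 18 -> 8, 38 -> 16
      }
    }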
diff --git a/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java b/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java
index 0a693d02..3c31c43a 100644
--- a/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java
+++ b/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java
@@ -19,11 +19,13 @@
 package org.apache.sqoop.orm;
 
 import java.io.IOException;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
@@ -34,6 +36,7 @@
 
 import com.cloudera.sqoop.manager.ConnManager;
 import org.apache.sqoop.avro.AvroUtil;
+import org.apache.sqoop.config.ConfigurationConstants;
 import org.codehaus.jackson.node.NullNode;
 
 /**
@@ -44,6 +47,20 @@ public class AvroSchemaGenerator {
   public static final Log LOG =
     LogFactory.getLog(AvroSchemaGenerator.class.getName());
 
+  /**
+   * Map precision to the number of bytes needed for binary conversion.
+   * @see Apache Hive
+   */
+  public static final int MAX_PRECISION = 38;
+  public static final int[] PRECISION_TO_BYTE_COUNT = new int[MAX_PRECISION];
+  static {
+    for (int prec = 1; prec <= MAX_PRECISION; prec++) {
+      // Estimated number of bytes needed.
+      PRECISION_TO_BYTE_COUNT[prec - 1] = (int)
+          Math.ceil((Math.log(Math.pow(10, prec) - 1) / Math.log(2) + 1) / 8);
+    }
+  }
+
   private final SqoopOptions options;
   private final ConnManager connManager;
   private final String tableName;
@@ -65,14 +82,18 @@ public Schema generate() throws IOException {
   public Schema generate(String schemaNameOverride) throws IOException {
     ClassWriter classWriter = new ClassWriter(options, connManager,
         tableName, null);
+    Map<String, List<Integer>> columnInfo = classWriter.getColumnInfo();
     Map<String, Integer> columnTypes = classWriter.getColumnTypes();
     String[] columnNames = classWriter.getColumnNames(columnTypes);
 
     List<Field> fields = new ArrayList<Field>();
     for (String columnName : columnNames) {
       String cleanedCol = AvroUtil.toAvroIdentifier(ClassWriter.toJavaIdentifier(columnName));
-      int sqlType = columnTypes.get(columnName);
-      Schema avroSchema = toAvroSchema(sqlType, columnName);
+      List<Integer> columnInfoList = columnInfo.get(columnName);
+      int sqlType = columnInfoList.get(0);
+      Integer precision = columnInfoList.get(1);
+      Integer scale = columnInfoList.get(2);
+      Schema avroSchema = toAvroSchema(sqlType, columnName, precision, scale);
       Field field = new Field(cleanedCol, avroSchema, null, NullNode.getInstance());
       field.addProp("columnName", columnName);
       field.addProp("sqlType", Integer.toString(sqlType));
@@ -98,17 +119,27 @@ public Schema generate(String schemaNameOverride) throws IOException {
    *
    * @param sqlType Original SQL type (might be overridden by user)
    * @param columnName Column name from the query
+   * @param precision Fixed point precision
+   * @param scale Fixed point scale
    * @return Schema
    */
-  public Schema toAvroSchema(int sqlType, String columnName) {
+  public Schema toAvroSchema(int sqlType, String columnName, Integer precision, Integer scale) {
     List<Schema> childSchemas = new ArrayList<Schema>();
     childSchemas.add(Schema.create(Schema.Type.NULL));
-    childSchemas.add(Schema.create(toAvroType(columnName, sqlType)));
+    if (options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL, false)
+        && isLogicalType(sqlType)) {
+      childSchemas.add(
+          toAvroLogicalType(columnName, sqlType, precision, scale)
+              .addToSchema(Schema.create(Type.BYTES))
+      );
+    } else {
+      childSchemas.add(Schema.create(toAvroType(columnName, sqlType)));
+    }
     return Schema.createUnion(childSchemas);
   }
 
   public Schema toAvroSchema(int sqlType) {
-    return toAvroSchema(sqlType, null);
+    return toAvroSchema(sqlType, null, null, null);
   }
 
   private Type toAvroType(String columnName, int sqlType) {
@@ -134,4 +165,18 @@ private Type toAvroType(String columnName, int sqlType) {
     return connManager.toAvroType(tableName, columnName, sqlType);
   }
+
+  private LogicalType toAvroLogicalType(String columnName, int sqlType, Integer precision, Integer scale) {
+    return connManager.toAvroLogicalType(tableName, columnName, sqlType, precision, scale);
+  }
+
+  private static boolean isLogicalType(int sqlType) {
+    switch (sqlType) {
+    case Types.DECIMAL:
+    case Types.NUMERIC:
+      return true;
+    default:
+      return false;
+    }
+  }
 }
diff --git a/src/java/org/apache/sqoop/orm/ClassWriter.java b/src/java/org/apache/sqoop/orm/ClassWriter.java
index bf40d2cf..5202408a 100644
--- a/src/java/org/apache/sqoop/orm/ClassWriter.java
+++ b/src/java/org/apache/sqoop/orm/ClassWriter.java
@@ -26,6 +26,7 @@
 import java.io.Writer;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -1848,6 +1849,14 @@ protected Map<String, Integer> getColumnTypes() throws IOException {
     }
   }
 
+  protected Map<String, List<Integer>> getColumnInfo() throws IOException {
+    if (options.getCall() == null) {
+      return connManager.getColumnInfo(tableName, options.getSqlQuery());
+    } else {
+      return connManager.getColumnInfoForProcedure(options.getCall());
+    }
+  }
+
   /**
    * Generate the ORM code for a table object containing the named columns.
    * @param columnTypes - mapping from column names to sql types
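ClassWriter.getColumnInfo() above delegates to the ConnManager; the per-column list is assumed here to be positional — {sqlType, precision, scale} — which is how AvroSchemaGenerator.generate() unpacks it. A hypothetical entry for a DECIMAL(10,2) column named PRICE:

    import java.sql.Types;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ColumnInfoSketch {
      public static void main(String[] args) {
        // Hypothetical shape of connManager.getColumnInfo(...) output.
        Map<String, List<Integer>> columnInfo = new HashMap<String, List<Integer>>();
        columnInfo.put("PRICE", Arrays.asList(Types.DECIMAL, 10, 2));

        List<Integer> info = columnInfo.get("PRICE");
        int sqlType = info.get(0);       // java.sql.Types.DECIMAL
        Integer precision = info.get(1); // 10
        Integer scale = info.get(2);     // 2
        System.out.println(sqlType + " / " + precision + " / " + scale);
      }
    }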
diff --git a/src/test/com/cloudera/sqoop/TestAvroExport.java b/src/test/com/cloudera/sqoop/TestAvroExport.java
index 53030485..137a6e1a 100644
--- a/src/test/com/cloudera/sqoop/TestAvroExport.java
+++ b/src/test/com/cloudera/sqoop/TestAvroExport.java
@@ -27,6 +27,7 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -35,6 +36,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.file.DataFileWriter;
@@ -301,6 +304,8 @@ protected void assertColMinAndMax(String colName, ColumnGenerator generator)
   }
 
   public void testSupportedAvroTypes() throws IOException, SQLException {
+    GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
+
     String[] argv = {};
     final int TOTAL_RECORDS = 1 * 10;
 
@@ -308,6 +313,8 @@
     Schema fixed = Schema.createFixed("myfixed", null, null, 2);
     Schema enumeration = Schema.createEnum("myenum", null, null,
         Lists.newArrayList("a", "b"));
+    Schema decimalSchema = LogicalTypes.decimal(3, 2)
+        .addToSchema(Schema.createFixed("dec1", null, null, 2));
 
     ColumnGenerator[] gens = new ColumnGenerator[] {
       colGenerator(true, Schema.create(Schema.Type.BOOLEAN), true, "BIT"),
@@ -323,6 +330,10 @@
         b, "BINARY(2)"),
       colGenerator(new GenericData.EnumSymbol(enumeration, "a"), enumeration,
         "a", "VARCHAR(8)"),
+      colGenerator(new BigDecimal("2.00"), decimalSchema,
+        new BigDecimal("2.00"), "DECIMAL(3,2)"),
+      colGenerator("22.00", Schema.create(Schema.Type.STRING),
+        new BigDecimal("22.00"), "DECIMAL(4,2)"),
     };
     createAvroFile(0, TOTAL_RECORDS, gens);
     createTable(gens);
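The decimalSchema used in testSupportedAvroTypes above stores a DECIMAL(3,2) in a two-byte fixed; once the DecimalConversion is registered, GenericData can map a BigDecimal in and out of that fixed. A sketch of that conversion in isolation, assuming Avro 1.8:

    import java.math.BigDecimal;
    import org.apache.avro.Conversions;
    import org.apache.avro.LogicalType;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericFixed;

    public class FixedDecimalSketch {
      public static void main(String[] args) {
        Schema decimalSchema = LogicalTypes.decimal(3, 2)
            .addToSchema(Schema.createFixed("dec1", null, null, 2));
        LogicalType lt = decimalSchema.getLogicalType();
        Conversions.DecimalConversion conv = new Conversions.DecimalConversion();
        // 2.00 has unscaled value 200, which fits in two bytes.
        GenericFixed encoded = conv.toFixed(new BigDecimal("2.00"), decimalSchema, lt);
        System.out.println(conv.fromFixed(encoded, decimalSchema, lt)); // 2.00
      }
    }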
diff --git a/src/test/com/cloudera/sqoop/TestAvroImport.java b/src/test/com/cloudera/sqoop/TestAvroImport.java
index af4b4816..00d7a951 100644
--- a/src/test/com/cloudera/sqoop/TestAvroImport.java
+++ b/src/test/com/cloudera/sqoop/TestAvroImport.java
@@ -118,12 +118,12 @@ public void testUnsupportedCodec() throws IOException {
    * to those that {@link #getOutputArgv(boolean, String[])}
    * returns
    */
-  private void avroImportTestHelper(String[] extraArgs, String codec)
-      throws IOException {
+  protected void avroImportTestHelper(String[] extraArgs, String codec)
+      throws IOException {
     String[] types = {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
-      "VARBINARY(2)", };
-    String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
+      "VARBINARY(2)", "DECIMAL(3,2)"};
+    String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", "'1.00'"};
     createTableWithColTypes(types, vals);
 
     runImport(getOutputArgv(true, extraArgs));
@@ -142,6 +142,7 @@ private void avroImportTestHelper(String[] extraArgs, String codec)
     checkField(fields.get(4), "DATA_COL4", Schema.Type.DOUBLE);
     checkField(fields.get(5), "DATA_COL5", Schema.Type.STRING);
     checkField(fields.get(6), "DATA_COL6", Schema.Type.BYTES);
+    checkField(fields.get(7), "DATA_COL7", Schema.Type.STRING);
 
     GenericRecord record1 = reader.next();
     assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
@@ -155,6 +156,7 @@
     ByteBuffer b = ((ByteBuffer) object);
     assertEquals((byte) 1, b.get(0));
     assertEquals((byte) 2, b.get(1));
+    assertEquals("DATA_COL7", "1.00", record1.get("DATA_COL7").toString());
 
     if (codec != null) {
       assertEquals(codec, reader.getMetaString(DataFileConstants.CODEC));
@@ -248,7 +250,7 @@ public void testNonIdentCharactersInColumnName() throws IOException {
     assertEquals("TEST_A_V_R_O", 2015, record1.get("TEST_A_V_R_O"));
   }
 
-  private void checkField(Field field, String name, Type type) {
+  protected void checkField(Field field, String name, Type type) {
     assertEquals(name, field.name());
     assertEquals(Schema.Type.UNION, field.schema().getType());
     assertEquals(Schema.Type.NULL, field.schema().getTypes().get(0).getType());
@@ -270,7 +272,7 @@ public void testNullableAvroImport() throws IOException, SQLException {
 
   }
 
-  private DataFileReader<GenericRecord> read(Path filename) throws IOException {
+  protected DataFileReader<GenericRecord> read(Path filename) throws IOException {
     Configuration conf = new Configuration();
     if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
       conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
@@ -281,7 +283,7 @@
     return new DataFileReader<GenericRecord>(fsInput, datumReader);
   }
 
-  private void checkSchemaFile(final Schema schema) throws IOException {
+  protected void checkSchemaFile(final Schema schema) throws IOException {
     final File schemaFile = new File(schema.getName() + ".avsc");
     assertTrue(schemaFile.exists());
     assertEquals(schema, new Schema.Parser().parse(schemaFile));
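Assuming the new property is picked up like any other Hadoop configuration flag, logical decimal support would be enabled per job with a generic -D option, along the lines of (connection details elided):

    sqoop import -Dsqoop.avro.logical_types.decimal.enable=true \
        --connect <jdbc-url> --table <table> --as-avrodatafile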