mirror of https://github.com/apache/sqoop.git synced 2025-05-02 05:30:45 +08:00

SQOOP-1493: Add ability to import/export true decimal in Avro instead of serializing it to String

(Abraham Elmahrek via Jarek Jarcec Cecho)
Jarek Jarcec Cecho 2016-01-29 08:42:23 -08:00
parent c9c3f6a3a2
commit d25eb2e190
13 changed files with 172 additions and 20 deletions

View File

@ -362,3 +362,7 @@ For lib/slf4j-api-<version>.jar:
For lib/snappy-java-<version>.jar:
The Apache License, Version 2.0
Some parts of the code were copied from the Apache Hive Project:
The Apache License, Version 2.0

View File

@ -18,7 +18,7 @@
# This properties file lists the versions of the various artifacts we use.
# It drives ivy and the generation of a maven POM
avro.version=1.7.5
avro.version=1.8.0-SNAPSHOT
kite-data.version=1.0.0

View File

@ -17,6 +17,7 @@
*/
package org.apache.sqoop.avro;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
@ -50,12 +51,29 @@
* The service class provides methods for creating and converting Avro objects.
*/
public final class AvroUtil {
public static boolean isDecimal(Schema.Field field) {
return isDecimal(field.schema());
}
public static boolean isDecimal(Schema schema) {
if (schema.getType().equals(Schema.Type.UNION)) {
for (Schema type : schema.getTypes()) {
if (isDecimal(type)) {
return true;
}
}
return false;
} else {
return "decimal".equals(schema.getProp(LogicalType.LOGICAL_TYPE_PROP));
}
}
/**
* Convert Sqoop's Java representation to an Avro representation.
*/
public static Object toAvro(Object o, boolean bigDecimalFormatString) {
if (o instanceof BigDecimal) {
public static Object toAvro(Object o, Schema.Field field, boolean bigDecimalFormatString) {
if (o instanceof BigDecimal && !isDecimal(field)) {
if (bigDecimalFormatString) {
// Returns a string representation of this without an exponent field.
return ((BigDecimal) o).toPlainString();
@ -111,8 +129,9 @@ public static GenericRecord toGenericRecord(Map<String, Object> fieldMap,
Schema schema, boolean bigDecimalFormatString) {
GenericRecord record = new GenericData.Record(schema);
for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
Object avroObject = toAvro(entry.getValue(), bigDecimalFormatString);
String avroColumn = toAvroColumn(entry.getKey());
Schema.Field field = schema.getField(avroColumn);
Object avroObject = toAvro(entry.getValue(), field, bigDecimalFormatString);
record.put(avroColumn, avroObject);
}
return record;
@ -187,7 +206,12 @@ public static Object fromAvro(Object avroObject, Schema schema, String type) {
throw new IllegalArgumentException("Only support union with null");
}
case FIXED:
return new BytesWritable(((GenericFixed) avroObject).bytes());
if (isDecimal(schema)) {
// Should automatically be a BigDecimal object.
return avroObject;
} else {
return new BytesWritable(((GenericFixed) avroObject).bytes());
}
case RECORD:
case ARRAY:
case MAP:
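
For illustration only (not part of this commit's diff; the class wrapper and usage are assumed), the new isDecimal helper keys off the "logicalType" schema property that LogicalTypes.decimal(...) attaches, and recurses into unions so nullable columns are classified correctly:

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.sqoop.avro.AvroUtil;

public class IsDecimalSketch {
  public static void main(String[] args) {
    // A bytes schema annotated with the decimal logical type, as the schema generator produces.
    Schema decimalBytes = LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES));
    System.out.println(decimalBytes.getProp(LogicalType.LOGICAL_TYPE_PROP));   // decimal
    System.out.println(AvroUtil.isDecimal(decimalBytes));                      // true
    System.out.println(AvroUtil.isDecimal(Schema.create(Schema.Type.STRING))); // false
  }
}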

View File

@ -100,6 +100,11 @@ public final class ConfigurationConstants {
*/
public static final String PROP_SPLIT_LIMIT = "split.limit";
/**
* Enable avro logical types (decimal support only).
*/
public static final String PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL = "sqoop.avro.logical_types.decimal.enable";
private ConfigurationConstants() {
// Disable Explicit Object Creation
}
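
The flag is read from the Hadoop Configuration, so it can presumably be supplied either programmatically or through a generic -D option on the sqoop command line (-Dsqoop.avro.logical_types.decimal.enable=true). A minimal programmatic sketch, with the off-by-default behavior assumed from the schema generator code further below:

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.config.ConfigurationConstants;

public class EnableDecimalFlagSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Off unless explicitly enabled; only then does the schema generator emit decimal logical types.
    conf.setBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL, true);
    System.out.println(conf.getBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL, false));
  }
}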

View File

@ -32,6 +32,8 @@
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema.Type;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -222,6 +224,22 @@ public Type toAvroType(int sqlType) {
}
}
/**
* Resolve a database-specific type to Avro logical data type.
* @param sqlType sql type
* @param precision fixed-point precision
* @param scale fixed-point scale
* @return avro logical type
*/
public LogicalType toAvroLogicalType(int sqlType, Integer precision, Integer scale) {
switch (sqlType) {
case Types.NUMERIC:
case Types.DECIMAL:
return LogicalTypes.decimal(precision, scale);
default:
throw new IllegalArgumentException("Cannot convert SQL type "
+ sqlType + " to avro logical type");
}
}
/**
* Return java type for SQL type.
* @param tableName table name
@ -258,6 +276,20 @@ public Type toAvroType(String tableName, String columnName, int sqlType) {
return toAvroType(sqlType);
}
/**
* Return avro logical type for SQL type.
* @param tableName table name
* @param columnName column name
* @param sqlType sql type
* @param precision precision
* @param scale scale
* @return avro logical type
*/
public LogicalType toAvroLogicalType(String tableName, String columnName, int sqlType, Integer precision, Integer scale) {
// ignore table name and column name by default.
return toAvroLogicalType(sqlType, precision, scale);
}
/**
* Return an unordered mapping from colname to sqltype for
* all columns in a table.
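
A short sketch of what the new default mapping produces (assuming the Avro 1.8 LogicalTypes API imported above; not code from this commit): NUMERIC and DECIMAL columns map to a decimal logical type carrying the column's precision and scale, while any other SQL type falls through to the IllegalArgumentException branch:

import org.apache.avro.LogicalTypes;

public class LogicalTypeMappingSketch {
  public static void main(String[] args) {
    // Equivalent of toAvroLogicalType(Types.DECIMAL, 10, 2) under the default switch above.
    LogicalTypes.Decimal decimal = LogicalTypes.decimal(10, 2);
    System.out.println(decimal.getName());                                  // decimal
    System.out.println(decimal.getPrecision() + "," + decimal.getScale());  // 10,2
  }
}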

View File

@ -18,8 +18,10 @@
package org.apache.sqoop.mapreduce;
import org.apache.avro.Conversions;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
@ -30,6 +32,14 @@
public class AvroExportMapper
extends GenericRecordExportMapper<AvroWrapper<GenericRecord>, NullWritable> {
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
// Add decimal support
ReflectData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
}
@Override
protected void map(AvroWrapper<GenericRecord> key, NullWritable value,
Context context) throws IOException, InterruptedException {
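
Conversions.DecimalConversion is the piece that moves values between java.math.BigDecimal and the encoded two's-complement bytes of a decimal column; registering it on ReflectData lets the export mapper work with BigDecimal objects directly. A standalone sketch of the round trip (assumed Avro 1.8 Conversions API, not code from this commit):

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class DecimalConversionSketch {
  public static void main(String[] args) {
    Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
    LogicalTypes.Decimal logicalType = LogicalTypes.decimal(4, 2);
    Schema bytesSchema = logicalType.addToSchema(Schema.create(Schema.Type.BYTES));
    // Encode a BigDecimal whose scale matches the logical type, then decode it back.
    ByteBuffer encoded = conversion.toBytes(new BigDecimal("22.00"), bytesSchema, logicalType);
    BigDecimal decoded = conversion.fromBytes(encoded, bytesSchema, logicalType);
    System.out.println(decoded); // 22.00
  }
}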

View File

@ -68,8 +68,7 @@ protected void map(LongWritable key, SqoopRecord val, Context context)
throw new IOException(sqlE);
}
GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(),
schema, bigDecimalFormatString);
GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(), schema, bigDecimalFormatString);
wrapper.datum(outKey);
context.write(wrapper, NullWritable.get());
}

View File

@ -23,10 +23,12 @@
import java.net.URLDecoder;
import java.util.Map;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
@ -34,9 +36,9 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import static org.apache.avro.file.CodecFactory.DEFAULT_DEFLATE_LEVEL;
import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
import static org.apache.avro.mapred.AvroOutputFormat.EXT;
import static org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY;
@ -53,6 +55,7 @@ public class AvroOutputFormat<T>
static <T> void configureDataFileWriter(DataFileWriter<T> writer,
TaskAttemptContext context) throws UnsupportedEncodingException {
if (FileOutputFormat.getCompressOutput(context)) {
// Default level must be greater than 0.
int level = context.getConfiguration()
.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
String codecName = context.getConfiguration()
@ -90,6 +93,9 @@ public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
isMapOnly ? AvroJob.getMapOutputSchema(context.getConfiguration())
: AvroJob.getOutputSchema(context.getConfiguration());
// Add decimal support
ReflectData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
final DataFileWriter<T> WRITER =
new DataFileWriter<T>(new ReflectDatumWriter<T>());

View File

@ -21,7 +21,9 @@
import com.cloudera.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
import com.cloudera.sqoop.orm.ClassWriter;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;
@ -76,6 +78,9 @@ protected void setup(Context context) throws IOException, InterruptedException {
columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
MapWritable.class);
// Add decimal support
GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
}
protected SqoopRecord toSqoopRecord(GenericRecord record) throws IOException {
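
On the import/merge side the registration works the other way around: with the conversion attached to GenericData, a datum reader built on that GenericData hands decimal-annotated fields back as BigDecimal rather than raw bytes, which is why AvroUtil.fromAvro can pass such objects through unchanged. A hedged wiring sketch (the helper name and usage are hypothetical):

import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

public class DecimalAwareReaderSketch {
  // Hypothetical helper: a reader that materializes decimal fields as BigDecimal.
  static DatumReader<GenericRecord> decimalAwareReader(Schema schema) {
    GenericData genericData = GenericData.get();
    genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
    return new GenericDatumReader<GenericRecord>(schema, schema, genericData);
  }
}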

View File

@ -19,11 +19,13 @@
package org.apache.sqoop.orm;
import java.io.IOException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
@ -34,6 +36,7 @@
import com.cloudera.sqoop.manager.ConnManager;
import org.apache.sqoop.avro.AvroUtil;
import org.apache.sqoop.config.ConfigurationConstants;
import org.codehaus.jackson.node.NullNode;
/**
@ -44,6 +47,20 @@ public class AvroSchemaGenerator {
public static final Log LOG =
LogFactory.getLog(AvroSchemaGenerator.class.getName());
/**
* Map precision to the number of bytes needed for binary conversion.
* @see <a href="https://github.com/apache/hive/blob/release-1.1/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java#L90">Apache Hive</a>.
*/
public static final int MAX_PRECISION = 38;
public static final int PRECISION_TO_BYTE_COUNT[] = new int[MAX_PRECISION];
static {
for (int prec = 1; prec <= MAX_PRECISION; prec++) {
// Estimated number of bytes needed.
PRECISION_TO_BYTE_COUNT[prec - 1] = (int)
Math.ceil((Math.log(Math.pow(10, prec) - 1) / Math.log(2) + 1) / 8);
}
}
private final SqoopOptions options;
private final ConnManager connManager;
private final String tableName;
@ -65,14 +82,18 @@ public Schema generate() throws IOException {
public Schema generate(String schemaNameOverride) throws IOException {
ClassWriter classWriter = new ClassWriter(options, connManager,
tableName, null);
Map<String, List<Integer>> columnInfo = classWriter.getColumnInfo();
Map<String, Integer> columnTypes = classWriter.getColumnTypes();
String[] columnNames = classWriter.getColumnNames(columnTypes);
List<Field> fields = new ArrayList<Field>();
for (String columnName : columnNames) {
String cleanedCol = AvroUtil.toAvroIdentifier(ClassWriter.toJavaIdentifier(columnName));
int sqlType = columnTypes.get(columnName);
Schema avroSchema = toAvroSchema(sqlType, columnName);
List<Integer> columnInfoList = columnInfo.get(columnName);
int sqlType = columnInfoList.get(0);
Integer precision = columnInfoList.get(1);
Integer scale = columnInfoList.get(2);
Schema avroSchema = toAvroSchema(sqlType, columnName, precision, scale);
Field field = new Field(cleanedCol, avroSchema, null, NullNode.getInstance());
field.addProp("columnName", columnName);
field.addProp("sqlType", Integer.toString(sqlType));
@ -98,17 +119,27 @@ public Schema generate(String schemaNameOverride) throws IOException {
*
* @param sqlType Original SQL type (might be overridden by user)
* @param columnName Column name from the query
* @param precision Fixed point precision
* @param scale Fixed point scale
* @return Schema
*/
public Schema toAvroSchema(int sqlType, String columnName) {
public Schema toAvroSchema(int sqlType, String columnName, Integer precision, Integer scale) {
List<Schema> childSchemas = new ArrayList<Schema>();
childSchemas.add(Schema.create(Schema.Type.NULL));
childSchemas.add(Schema.create(toAvroType(columnName, sqlType)));
if (options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL, false)
&& isLogicalType(sqlType)) {
childSchemas.add(
toAvroLogicalType(columnName, sqlType, precision, scale)
.addToSchema(Schema.create(Type.BYTES))
);
} else {
childSchemas.add(Schema.create(toAvroType(columnName, sqlType)));
}
return Schema.createUnion(childSchemas);
}
public Schema toAvroSchema(int sqlType) {
return toAvroSchema(sqlType, null);
return toAvroSchema(sqlType, null, null, null);
}
private Type toAvroType(String columnName, int sqlType) {
@ -134,4 +165,18 @@ private Type toAvroType(String columnName, int sqlType) {
return connManager.toAvroType(tableName, columnName, sqlType);
}
private LogicalType toAvroLogicalType(String columnName, int sqlType, Integer precision, Integer scale) {
return connManager.toAvroLogicalType(tableName, columnName, sqlType, precision, scale);
}
private static boolean isLogicalType(int sqlType) {
switch(sqlType) {
case Types.DECIMAL:
case Types.NUMERIC:
return true;
default:
return false;
}
}
}
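
Taken together, with sqoop.avro.logical_types.decimal.enable=true a DECIMAL(10,2) column should now be generated as a union of null and a decimal-annotated bytes schema instead of a plain string (the byte-count table estimates fixed sizes: e.g. precision 9 works out to ceil((log2(10^9 - 1) + 1) / 8) = 4 bytes, and the maximum precision of 38 to 16 bytes). An illustrative sketch of the expected schema shape, built directly with the Avro API rather than by running the generator:

import java.util.Arrays;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class GeneratedDecimalSchemaSketch {
  public static void main(String[] args) {
    Schema decimalPart = LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES));
    Schema union = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), decimalPart));
    // Expected JSON: ["null",{"type":"bytes","logicalType":"decimal","precision":10,"scale":2}]
    System.out.println(union.toString());
  }
}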

View File

@ -26,6 +26,7 @@
import java.io.Writer;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@ -1848,6 +1849,14 @@ protected Map<String, Integer> getColumnTypes() throws IOException {
}
}
protected Map<String, List<Integer>> getColumnInfo() throws IOException {
if (options.getCall() == null) {
return connManager.getColumnInfo(tableName, options.getSqlQuery());
} else {
return connManager.getColumnInfoForProcedure(options.getCall());
}
}
/**
* Generate the ORM code for a table object containing the named columns.
* @param columnTypes - mapping from column names to sql types
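
AvroSchemaGenerator consumes the returned lists positionally, so the assumed contract of getColumnInfo() is one entry per column of the form [sqlType, precision, scale]. A standalone sketch of that shape (the column name and values are hypothetical):

import java.sql.Types;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ColumnInfoSketch {
  public static void main(String[] args) {
    // name -> [sqlType, precision, scale], e.g. for a column declared DECIMAL(10,2)
    Map<String, List<Integer>> columnInfo = new HashMap<String, List<Integer>>();
    columnInfo.put("PRICE", Arrays.asList(Types.DECIMAL, 10, 2));

    List<Integer> info = columnInfo.get("PRICE");
    int sqlType = info.get(0);        // java.sql.Types.DECIMAL
    Integer precision = info.get(1);  // 10
    Integer scale = info.get(2);      // 2
    System.out.println(sqlType + " " + precision + " " + scale);
  }
}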

View File

@ -27,6 +27,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
@ -35,6 +36,8 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.file.DataFileWriter;
@ -301,6 +304,8 @@ protected void assertColMinAndMax(String colName, ColumnGenerator generator)
}
public void testSupportedAvroTypes() throws IOException, SQLException {
GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
String[] argv = {};
final int TOTAL_RECORDS = 1 * 10;
@ -308,6 +313,8 @@ public void testSupportedAvroTypes() throws IOException, SQLException {
Schema fixed = Schema.createFixed("myfixed", null, null, 2);
Schema enumeration = Schema.createEnum("myenum", null, null,
Lists.newArrayList("a", "b"));
Schema decimalSchema = LogicalTypes.decimal(3,2)
.addToSchema(Schema.createFixed("dec1", null, null, 2));
ColumnGenerator[] gens = new ColumnGenerator[] {
colGenerator(true, Schema.create(Schema.Type.BOOLEAN), true, "BIT"),
@ -323,6 +330,10 @@ public void testSupportedAvroTypes() throws IOException, SQLException {
b, "BINARY(2)"),
colGenerator(new GenericData.EnumSymbol(enumeration, "a"), enumeration,
"a", "VARCHAR(8)"),
colGenerator(new BigDecimal("2.00"), decimalSchema,
new BigDecimal("2.00"), "DECIMAL(3,2)"),
colGenerator("22.00", Schema.create(Schema.Type.STRING),
new BigDecimal("22.00"), "DECIMAL(4,2)"),
};
createAvroFile(0, TOTAL_RECORDS, gens);
createTable(gens);

View File

@ -118,12 +118,12 @@ public void testUnsupportedCodec() throws IOException {
* to those that {@link #getOutputArgv(boolean, String[])}
* returns
*/
private void avroImportTestHelper(String[] extraArgs, String codec)
throws IOException {
protected void avroImportTestHelper(String[] extraArgs, String codec)
throws IOException {
String[] types =
{"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
"VARBINARY(2)", };
String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
"VARBINARY(2)", "DECIMAL(3,2)"};
String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", "'1.00'"};
createTableWithColTypes(types, vals);
runImport(getOutputArgv(true, extraArgs));
@ -142,6 +142,7 @@ private void avroImportTestHelper(String[] extraArgs, String codec)
checkField(fields.get(4), "DATA_COL4", Schema.Type.DOUBLE);
checkField(fields.get(5), "DATA_COL5", Schema.Type.STRING);
checkField(fields.get(6), "DATA_COL6", Schema.Type.BYTES);
checkField(fields.get(7), "DATA_COL7", Schema.Type.STRING);
GenericRecord record1 = reader.next();
assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
@ -155,6 +156,7 @@ private void avroImportTestHelper(String[] extraArgs, String codec)
ByteBuffer b = ((ByteBuffer) object);
assertEquals((byte) 1, b.get(0));
assertEquals((byte) 2, b.get(1));
assertEquals("DATA_COL7", "1.00", record1.get("DATA_COL7").toString());
if (codec != null) {
assertEquals(codec, reader.getMetaString(DataFileConstants.CODEC));
@ -248,7 +250,7 @@ public void testNonIdentCharactersInColumnName() throws IOException {
assertEquals("TEST_A_V_R_O", 2015, record1.get("TEST_A_V_R_O"));
}
private void checkField(Field field, String name, Type type) {
protected void checkField(Field field, String name, Type type) {
assertEquals(name, field.name());
assertEquals(Schema.Type.UNION, field.schema().getType());
assertEquals(Schema.Type.NULL, field.schema().getTypes().get(0).getType());
@ -270,7 +272,7 @@ public void testNullableAvroImport() throws IOException, SQLException {
}
private DataFileReader<GenericRecord> read(Path filename) throws IOException {
protected DataFileReader<GenericRecord> read(Path filename) throws IOException {
Configuration conf = new Configuration();
if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
@ -281,7 +283,7 @@ private DataFileReader<GenericRecord> read(Path filename) throws IOException {
return new DataFileReader<GenericRecord>(fsInput, datumReader);
}
private void checkSchemaFile(final Schema schema) throws IOException {
protected void checkSchemaFile(final Schema schema) throws IOException {
final File schemaFile = new File(schema.getName() + ".avsc");
assertTrue(schemaFile.exists());
assertEquals(schema, new Schema.Parser().parse(schemaFile));