From 41724c31d4c9e56fc7b979b95707bf9ed2e752c1 Mon Sep 17 00:00:00 2001
From: Jarek Jarcec Cecho
Date: Fri, 5 Sep 2014 08:13:14 +0200
Subject: [PATCH] SQOOP-1491: Remove SqoopAvroRecord (Qian Xu via Jarek Jarcec
 Cecho)

---
 src/java/org/apache/sqoop/avro/AvroUtil.java  | 24 ++++++--
 .../org/apache/sqoop/lib/SqoopAvroRecord.java | 57 -------------------
 .../sqoop/mapreduce/AvroImportMapper.java     | 27 +++------
 .../sqoop/mapreduce/DataDrivenImportJob.java  | 19 ++++---
 .../sqoop/mapreduce/ParquetImportMapper.java  | 22 +++++--
 .../apache/sqoop/mapreduce/ParquetJob.java    |  7 +++
 .../org/apache/sqoop/orm/ClassWriter.java     | 57 ++-----------------
 7 files changed, 69 insertions(+), 144 deletions(-)
 delete mode 100644 src/java/org/apache/sqoop/lib/SqoopAvroRecord.java

diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java
index 4b37d589..811c2407 100644
--- a/src/java/org/apache/sqoop/avro/AvroUtil.java
+++ b/src/java/org/apache/sqoop/avro/AvroUtil.java
@@ -17,6 +17,9 @@
  */
 package org.apache.sqoop.avro;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.sqoop.lib.BlobRef;
 import org.apache.sqoop.lib.ClobRef;
@@ -26,6 +29,7 @@
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.Map;
 
 /**
  * The service class provides methods for creating and converting Avro objects.
@@ -33,14 +37,13 @@
 public final class AvroUtil {
 
   /**
-   * Convert the Avro representation of a Java type (that has already been
-   * converted from the SQL equivalent). Note that the method is taken from
-   * {@link org.apache.sqoop.mapreduce.AvroImportMapper}
+   * Convert Sqoop's Java representation of a SQL value to its Avro representation.
    */
   public static Object toAvro(Object o, boolean bigDecimalFormatString) {
     if (o instanceof BigDecimal) {
       if (bigDecimalFormatString) {
-        return ((BigDecimal)o).toPlainString();
+        // Return a string representation of the BigDecimal without an exponent field.
+        return ((BigDecimal) o).toPlainString();
       } else {
         return o.toString();
       }
@@ -66,4 +69,17 @@ public static Object toAvro(Object o, boolean bigDecimalFormatString) {
     return o;
   }
 
+  /**
+   * Create an Avro GenericRecord from a Sqoop record's field map.
+   */
+  public static GenericRecord toGenericRecord(Map<String, Object> fieldMap,
+      Schema schema, boolean bigDecimalFormatString) {
+    GenericRecord record = new GenericData.Record(schema);
+    for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
+      Object avroObject = toAvro(entry.getValue(), bigDecimalFormatString);
+      record.put(entry.getKey(), avroObject);
+    }
+    return record;
+  }
+
 }
diff --git a/src/java/org/apache/sqoop/lib/SqoopAvroRecord.java b/src/java/org/apache/sqoop/lib/SqoopAvroRecord.java
deleted file mode 100644
index 80875d23..00000000
--- a/src/java/org/apache/sqoop/lib/SqoopAvroRecord.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.lib;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.sqoop.avro.AvroUtil;
-
-/**
- * The abstract class extends {@link org.apache.sqoop.lib.SqoopRecord}. It also
- * implements the interface GenericRecord which is a generic instance of an Avro
- * record schema. Fields are accessible by name as well as by index.
- */
-public abstract class SqoopAvroRecord extends SqoopRecord implements GenericRecord {
-
-  public abstract boolean getBigDecimalFormatString();
-
-  @Override
-  public void put(String key, Object v) {
-    getFieldMap().put(key, v);
-  }
-
-  @Override
-  public Object get(String key) {
-    Object o = getFieldMap().get(key);
-    return AvroUtil.toAvro(o, getBigDecimalFormatString());
-  }
-
-  @Override
-  public void put(int i, Object v) {
-    put(getFieldNameByIndex(i), v);
-  }
-
-  @Override
-  public Object get(int i) {
-    return get(getFieldNameByIndex(i));
-  }
-
-  private String getFieldNameByIndex(int i) {
-    return getSchema().getFields().get(i).name();
-  }
-
-}
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java b/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
index 6adad794..1454ead4 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
@@ -18,22 +18,21 @@
 package org.apache.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.Map;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import com.cloudera.sqoop.lib.LargeObjectLoader;
-import com.cloudera.sqoop.lib.SqoopRecord;
-import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
 import org.apache.sqoop.avro.AvroUtil;
 
+import java.io.IOException;
+import java.sql.SQLException;
+
 /**
  * Imports records by transforming them to Avro records in an Avro data file.
 */
@@ -69,7 +68,9 @@ protected void map(LongWritable key, SqoopRecord val, Context context)
       throw new IOException(sqlE);
     }
 
-    wrapper.datum(toGenericRecord(val));
+    GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(),
+        schema, bigDecimalFormatString);
+    wrapper.datum(outKey);
     context.write(wrapper, NullWritable.get());
   }
 
@@ -80,14 +81,4 @@ protected void cleanup(Context context) throws IOException {
     }
   }
 
-  private GenericRecord toGenericRecord(SqoopRecord val) {
-    Map<String, Object> fieldMap = val.getFieldMap();
-    GenericRecord record = new GenericData.Record(schema);
-    for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
-      Object avro = AvroUtil.toAvro(entry.getValue(), bigDecimalFormatString);
-      record.put(entry.getKey(), avro);
-    }
-    return record;
-  }
-
 }
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index 905ba8c1..19ec5428 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -23,6 +23,8 @@
 import java.sql.SQLException;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -86,11 +88,7 @@ protected void configureMapper(Job job, String tableName,
       job.setOutputValueClass(NullWritable.class);
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.AvroDataFile) {
-      ConnManager connManager = getContext().getConnManager();
-      AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
-          connManager, tableName);
-      Schema schema = generator.generate();
-
+      Schema schema = generateAvroSchema(tableName);
       try {
         writeAvroSchema(schema);
       } catch (final IOException e) {
@@ -103,8 +101,8 @@ protected void configureMapper(Job job, String tableName,
       Configuration conf = job.getConfiguration();
       // An Avro schema is required for creating a dataset that manages
       // Parquet data records. The import will fail if the schema is invalid.
-      Schema schema = new Schema.Parser().parse(conf.get("avro.schema"));
-      String uri = "";
+      Schema schema = generateAvroSchema(tableName);
+      String uri;
       if (options.doHiveImport()) {
         uri = "dataset:hive?dataset=" + options.getTableName();
       } else {
@@ -117,6 +115,13 @@ protected void configureMapper(Job job, String tableName,
     job.setMapperClass(getMapperClass());
   }
 
+  private Schema generateAvroSchema(String tableName) throws IOException {
+    ConnManager connManager = getContext().getConnManager();
+    AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
+        connManager, tableName);
+    return generator.generate();
+  }
+
   private void writeAvroSchema(final Schema schema) throws IOException {
     // Generate schema in JAR output directory.
     final File schemaFile = new File(options.getJarOutputDir(),
         schema.getName() + ".avsc");
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
index effbadd1..fb6af2cb 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
@@ -19,13 +19,14 @@
 package org.apache.sqoop.mapreduce;
 
 import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.lib.SqoopRecord;
 import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.sqoop.lib.SqoopAvroRecord;
+import org.apache.sqoop.avro.AvroUtil;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -34,19 +35,26 @@
  * Imports records by writing them to a Parquet File.
  */
 public class ParquetImportMapper
-    extends AutoProgressMapper<LongWritable, SqoopAvroRecord,
-        GenericRecord, NullWritable> {
+    extends AutoProgressMapper<LongWritable, SqoopRecord,
+        GenericRecord, NullWritable> {
 
+  private Schema schema = null;
+  private boolean bigDecimalFormatString = true;
   private LargeObjectLoader lobLoader = null;
 
   @Override
   protected void setup(Context context)
       throws IOException, InterruptedException {
-    lobLoader = new LargeObjectLoader(context.getConfiguration());
+    Configuration conf = context.getConfiguration();
+    schema = ParquetJob.getAvroSchema(conf);
+    bigDecimalFormatString = conf.getBoolean(
+        ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
+        ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+    lobLoader = new LargeObjectLoader(conf);
   }
 
   @Override
-  protected void map(LongWritable key, SqoopAvroRecord val, Context context)
+  protected void map(LongWritable key, SqoopRecord val, Context context)
       throws IOException, InterruptedException {
     try {
       // Loading of LOBs was delayed until we have a Context.
@@ -55,7 +63,9 @@ protected void map(LongWritable key, SqoopAvroRecord val, Context context)
       throw new IOException(sqlE);
     }
 
-    context.write(val, null);
+    GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(), schema,
+        bigDecimalFormatString);
+    context.write(outKey, null);
   }
 
   @Override
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
index a74432a0..6ef29a1d 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -39,6 +39,12 @@ public final class ParquetJob {
   private ParquetJob() {
   }
 
+  private static final String CONF_AVRO_SCHEMA = "avro.schema";
+
+  public static Schema getAvroSchema(Configuration conf) {
+    return new Schema.Parser().parse(conf.get(CONF_AVRO_SCHEMA));
+  }
+
   /**
    * Configure the import job. The import process will use a Kite dataset to
    * write data records into Parquet format internally. The input key class is
@@ -63,6 +69,7 @@ public static void configureImportJob(Configuration conf, Schema schema,
     } else {
       dataset = createDataset(schema, uri);
     }
+    conf.set(CONF_AVRO_SCHEMA, schema.toString());
 
     DatasetKeyOutputFormat.configure(conf).writeTo(dataset);
   }
diff --git a/src/java/org/apache/sqoop/orm/ClassWriter.java b/src/java/org/apache/sqoop/orm/ClassWriter.java
index 4f9dedd9..94ff5765 100644
--- a/src/java/org/apache/sqoop/orm/ClassWriter.java
+++ b/src/java/org/apache/sqoop/orm/ClassWriter.java
@@ -30,11 +30,9 @@
 import java.util.Properties;
 import java.util.Set;
 
-import org.apache.avro.Schema;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.sqoop.lib.SqoopAvroRecord;
 import org.apache.sqoop.mapreduce.ImportJobBase;
 
 import com.cloudera.sqoop.SqoopOptions;
@@ -1110,26 +1108,6 @@ private void generateSetField(Map<String, Integer> columnTypes,
     }
   }
 
-  private void generateSqoopAvroRecordMethods(String className, Schema schema, StringBuilder sb) {
-    // Define shared immutable attributes as static
-    sb.append("  private final static boolean bigDecimalFormatString;\n");
-    sb.append("  private final static Schema schema;\n");
-    sb.append("  static {\n");
-    sb.append("    bigDecimalFormatString = " + bigDecimalFormatString + ";\n");
-    sb.append("    schema = new Schema.Parser().parse(\"");
-    sb.append(schema.toString().replaceAll("\"", "\\\\\""));
-    sb.append("\");\n");
-    sb.append("  }\n");
-    sb.append("  @Override\n");
-    sb.append("  public boolean getBigDecimalFormatString() {\n");
-    sb.append("    return bigDecimalFormatString;\n");
-    sb.append("  }\n");
-    sb.append("  @Override\n");
-    sb.append("  public Schema getSchema() {\n");
-    sb.append("    return schema;\n");
-    sb.append("  }\n");
-  }
-
   /**
    * Generate the setField() method.
    * @param columnTypes - mapping from column names to sql types
@@ -1750,15 +1728,9 @@ public void generate() throws IOException {
       }
     }
 
-    Schema schema = null;
-    if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
-      schema = generateAvroSchemaForTable(tableName);
-      options.getConf().set("avro.schema", schema.toString());
-    }
-
     // Generate the Java code.
     StringBuilder sb = generateClassForColumns(columnTypes,
-        cleanedColNames, cleanedDbWriteColNames, schema);
+        cleanedColNames, cleanedDbWriteColNames);
     // Write this out to a file in the jar output directory.
     // We'll move it to the user-visible CodeOutputDir after compiling.
     String codeOutDir = options.getJarOutputDir();
@@ -1816,12 +1788,6 @@ public void generate() throws IOException {
     }
   }
 
-  private Schema generateAvroSchemaForTable(String tableName) throws IOException {
-    AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
-        connManager, tableName);
-    return generator.generate();
-  }
-
   protected String[] getColumnNames(Map<String, Integer> columnTypes) {
     String [] colNames = options.getColumns();
     if (null == colNames) {
@@ -1872,18 +1838,15 @@ protected Map<String, Integer> getColumnTypes() throws IOException {
    * @param colNames - ordered list of column names for table.
    * @param dbWriteColNames - ordered list of column names for the db
    *   write() method of the class.
-   * @param schema - If a valid Avro schema is specified, the base class will
-   *   be SqoopAvroRecord
    * @return - A StringBuilder that contains the text of the class code.
    */
   private StringBuilder generateClassForColumns(
       Map<String, Integer> columnTypes,
-      String [] colNames, String [] dbWriteColNames, Schema schema) {
+      String [] colNames, String [] dbWriteColNames) {
     if (colNames.length ==0) {
       throw new IllegalArgumentException("Attempted to generate class with "
           + "no columns!");
     }
-
     StringBuilder sb = new StringBuilder();
     sb.append("// ORM class for table '" + tableName + "'\n");
     sb.append("// WARNING: This class is AUTO-GENERATED. "
@@ -1915,13 +1878,7 @@ private StringBuilder generateClassForColumns(
     sb.append("import " + BlobRef.class.getCanonicalName() + ";\n");
     sb.append("import " + ClobRef.class.getCanonicalName() + ";\n");
     sb.append("import " + LargeObjectLoader.class.getCanonicalName() + ";\n");
-
-    Class baseClass = SqoopRecord.class;
-    if (null != schema) {
-      sb.append("import org.apache.avro.Schema;\n");
-      baseClass = SqoopAvroRecord.class;
-    }
-    sb.append("import " + baseClass.getCanonicalName() + ";\n");
+    sb.append("import " + SqoopRecord.class.getCanonicalName() + ";\n");
     sb.append("import java.sql.PreparedStatement;\n");
     sb.append("import java.sql.ResultSet;\n");
     sb.append("import java.sql.SQLException;\n");
@@ -1941,8 +1898,8 @@ private StringBuilder generateClassForColumns(
     sb.append("\n");
 
     String className = tableNameInfo.getShortClassForTable(tableName);
-    sb.append("public class " + className + " extends " + baseClass.getSimpleName()
-        + " implements DBWritable, Writable {\n");
+    sb.append("public class " + className + " extends SqoopRecord "
+        + " implements DBWritable, Writable {\n");
     sb.append("  private final int PROTOCOL_VERSION = "
         + CLASS_WRITER_VERSION + ";\n");
     sb.append(
@@ -1961,10 +1918,6 @@ private StringBuilder generateClassForColumns(
     generateGetFieldMap(columnTypes, colNames, sb);
     generateSetField(columnTypes, colNames, sb);
 
-    if (baseClass == SqoopAvroRecord.class) {
-      generateSqoopAvroRecordMethods(className, schema, sb);
-    }
-
     // TODO(aaron): Generate hashCode(), compareTo(), equals() so it can be a
     // WritableComparable
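
For illustration, a minimal, self-contained sketch of the new shared helper in action (not part of the patch; the driver class name ToGenericRecordExample, the inline schema, and the sample field map are invented, while AvroUtil.toGenericRecord() and the bigDecimalFormatString behavior come from the change above):

    import java.math.BigDecimal;
    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.sqoop.avro.AvroUtil;

    public class ToGenericRecordExample {
      public static void main(String[] args) {
        // Stand-in for the schema that AvroSchemaGenerator derives from
        // table metadata.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"widget\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"long\"},"
            + "{\"name\":\"price\",\"type\":\"string\"}]}");

        // Field map shaped like the one a generated SqoopRecord returns
        // from getFieldMap().
        Map<String, Object> fieldMap = new LinkedHashMap<String, Object>();
        fieldMap.put("id", 1L);
        fieldMap.put("price", new BigDecimal("1.0E+3"));

        // bigDecimalFormatString = true renders the BigDecimal via
        // toPlainString(), i.e. "1000" instead of "1.0E+3".
        GenericRecord record = AvroUtil.toGenericRecord(fieldMap, schema, true);
        System.out.println(record); // {"id": 1, "price": "1000"}
      }
    }

This is the conversion that both AvroImportMapper and ParquetImportMapper now perform per record, instead of relying on a SqoopAvroRecord base class generated per table.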
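Similarly, a sketch of the schema hand-off that replaces the schema previously baked into each generated SqoopAvroRecord subclass: ParquetJob.configureImportJob() now stores the schema in the job Configuration under "avro.schema", and ParquetImportMapper.setup() re-parses it via ParquetJob.getAvroSchema(). The standalone class below (SchemaRoundTrip, with an invented one-field schema) mirrors that round trip using only the Avro and Hadoop APIs:

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;

    public class SchemaRoundTrip {
      private static final String CONF_AVRO_SCHEMA = "avro.schema";

      public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"widget\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"long\"}]}");

        // What ParquetJob.configureImportJob() does at job-setup time.
        Configuration conf = new Configuration();
        conf.set(CONF_AVRO_SCHEMA, schema.toString());

        // What ParquetImportMapper.setup() does on the task side, via
        // ParquetJob.getAvroSchema(conf).
        Schema restored = new Schema.Parser().parse(conf.get(CONF_AVRO_SCHEMA));
        System.out.println(schema.equals(restored)); // true
      }
    }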