From 44ffd1e75b38664277ec136109cac41ddc00bdd3 Mon Sep 17 00:00:00 2001
From: Andrew Bayer
Date: Fri, 22 Jul 2011 20:04:40 +0000
Subject: [PATCH] SQOOP-207. Support import as Avro Data Files.

(Tom White via Arvind Prabhakar)

From: Arvind Prabhakar

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1150047 13f79535-47bb-0310-9956-ffa450edef68
---
 bin/configure-sqoop                           |   3 +-
 ivy.xml                                       |  16 +-
 ivy/libraries.properties                      |   2 +
 src/docs/man/import-args.txt                  |   3 +
 src/docs/man/sqoop-import-all-tables.txt      |   3 +
 src/docs/user/help.txt                        |   1 +
 src/docs/user/hive.txt                        |   3 +-
 src/docs/user/import-all-tables.txt           |   1 +
 src/docs/user/import.txt                      |   1 +
 src/java/com/cloudera/sqoop/SqoopOptions.java |   3 +-
 .../sqoop/mapreduce/AvroImportMapper.java     |  97 ++++++++++++
 .../com/cloudera/sqoop/mapreduce/AvroJob.java |  41 ++++++
 .../sqoop/mapreduce/AvroOutputFormat.java     |  65 +++++++++
 .../sqoop/mapreduce/DataDrivenImportJob.java  |  31 +++-
 .../sqoop/orm/AvroSchemaGenerator.java        | 112 ++++++++++++++
 .../com/cloudera/sqoop/orm/ClassWriter.java   | 138 ++++++++++--------
 .../cloudera/sqoop/orm/TableClassName.java    |   2 +-
 .../cloudera/sqoop/tool/BaseSqoopTool.java    |   1 +
 .../com/cloudera/sqoop/tool/ImportTool.java   |   8 +
 .../com/cloudera/sqoop/TestAvroImport.java    | 113 ++++++++++++++
 .../cloudera/sqoop/orm/TestClassWriter.java   |  17 ++-
 21 files changed, 576 insertions(+), 85 deletions(-)
 create mode 100644 src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java
 create mode 100644 src/java/com/cloudera/sqoop/mapreduce/AvroJob.java
 create mode 100644 src/java/com/cloudera/sqoop/mapreduce/AvroOutputFormat.java
 create mode 100644 src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java
 create mode 100644 src/test/com/cloudera/sqoop/TestAvroImport.java

diff --git a/bin/configure-sqoop b/bin/configure-sqoop
index 31a75ebc..a00e8fb3 100755
--- a/bin/configure-sqoop
+++ b/bin/configure-sqoop
@@ -77,7 +77,8 @@ fi

 # Add HBase to dependency list
 if [ -e "$HBASE_HOME/bin/hbase" ]; then
-  SQOOP_CLASSPATH=`$HBASE_HOME/bin/hbase classpath`:${SQOOP_CLASSPATH}
+  TMP_SQOOP_CLASSPATH=${SQOOP_CLASSPATH}:`$HBASE_HOME/bin/hbase classpath`
+  SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}
 fi

 ZOOCFGDIR=${ZOOCFGDIR:-/etc/zookeeper}
diff --git a/ivy.xml b/ivy.xml
index 48daccb1..b6c4830d 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -84,7 +84,21 @@ conf="common->default"/>
-
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 94ccd370..0f1de10a 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -16,6 +16,8 @@

 # This properties file lists the versions of the various artifacts we use.
 # It drives ivy and the generation of a maven POM
+avro.version=1.5.1
+
 checkstyle.version=5.0

 commons-cli.version=1.2
diff --git a/src/docs/man/import-args.txt b/src/docs/man/import-args.txt
index 377f4036..b6055eac 100644
--- a/src/docs/man/import-args.txt
+++ b/src/docs/man/import-args.txt
@@ -23,6 +23,9 @@ Import control options
 --append::
   Append data to an existing HDFS dataset

+--as-avrodatafile::
+  Imports data to Avro Data Files
+
 --as-sequencefile::
   Imports data to SequenceFiles
diff --git a/src/docs/man/sqoop-import-all-tables.txt b/src/docs/man/sqoop-import-all-tables.txt
index be1142b3..ad3890ab 100644
--- a/src/docs/man/sqoop-import-all-tables.txt
+++ b/src/docs/man/sqoop-import-all-tables.txt
@@ -27,6 +27,9 @@ include::common-args.txt[]
 Import control options
 ~~~~~~~~~~~~~~~~~~~~~~

+--as-avrodatafile::
+  Imports data to Avro Data Files
+
 --as-sequencefile::
   Imports data to SequenceFiles
diff --git a/src/docs/user/help.txt b/src/docs/user/help.txt
index cec89d88..d7f8d868 100644
--- a/src/docs/user/help.txt
+++ b/src/docs/user/help.txt
@@ -75,6 +75,7 @@ Common arguments:
 --verbose                        Print more information while working

 Import control arguments:
+   --as-avrodatafile             Imports data to Avro Data Files
    --as-sequencefile             Imports data to SequenceFiles
    --as-textfile                 Imports data as plain text (default)
 ...
diff --git a/src/docs/user/hive.txt b/src/docs/user/hive.txt
index 83744672..059d7cb7 100644
--- a/src/docs/user/hive.txt
+++ b/src/docs/user/hive.txt
@@ -40,7 +40,8 @@ multiple Hive installations, or +hive+ is not in your +$PATH+, use the
 *+\--hive-home+* option to identify the Hive installation directory.
 Sqoop will use +$HIVE_HOME/bin/hive+ from here.

-NOTE: This function is incompatible with +\--as-sequencefile+.
+NOTE: This function is incompatible with +\--as-avrodatafile+ and
++\--as-sequencefile+.

 Even though Hive supports escaping characters, it does not
 handle escaping of new-line character. Also, it does not support
diff --git a/src/docs/user/import-all-tables.txt b/src/docs/user/import-all-tables.txt
index 7674226f..b52fac31 100644
--- a/src/docs/user/import-all-tables.txt
+++ b/src/docs/user/import-all-tables.txt
@@ -44,6 +44,7 @@ include::common-args.txt[]
 `----------------------------`---------------------------------------
 Argument                     Description
 ---------------------------------------------------------------------
++\--as-avrodatafile+         Imports data to Avro Data Files
 +\--as-sequencefile+         Imports data to SequenceFiles
 +\--as-textfile+             Imports data as plain text (default)
 +\--direct+                  Use direct import fast path
diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt
index 24cda633..80c4f2d5 100644
--- a/src/docs/user/import.txt
+++ b/src/docs/user/import.txt
@@ -54,6 +54,7 @@ Argument                     Description
-------------------------------------------------------------------------
 +\--append+                  Append data to an existing dataset\
                              in HDFS
++\--as-avrodatafile+         Imports data to Avro Data Files
 +\--as-sequencefile+         Imports data to SequenceFiles
 +\--as-textfile+             Imports data as plain text (default)
 +\--columns <col,col,col...>+ Columns to import from table
diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java
index 5aa3ec4e..ac3e488a 100644
--- a/src/java/com/cloudera/sqoop/SqoopOptions.java
+++ b/src/java/com/cloudera/sqoop/SqoopOptions.java
@@ -77,7 +77,8 @@ public String toString() {

   /** Selects in-HDFS destination file format.
    */
   public enum FileLayout {
     TextFile,
-    SequenceFile
+    SequenceFile,
+    AvroDataFile
   }

diff --git a/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java b/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java
new file mode 100644
index 00000000..a084a20a
--- /dev/null
+++ b/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import com.cloudera.sqoop.lib.BlobRef;
+import com.cloudera.sqoop.lib.ClobRef;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * Imports records by transforming them to Avro records in an Avro data file.
+ */
+public class AvroImportMapper
+    extends AutoProgressMapper<LongWritable, SqoopRecord,
+    AvroWrapper<GenericRecord>, NullWritable> {
+
+  private final AvroWrapper<GenericRecord> wrapper =
+    new AvroWrapper<GenericRecord>();
+  private Schema schema;
+
+  @Override
+  protected void setup(Context context) {
+    schema = AvroJob.getMapOutputSchema(context.getConfiguration());
+  }
+
+  @Override
+  protected void map(LongWritable key, SqoopRecord val, Context context)
+      throws IOException, InterruptedException {
+    wrapper.datum(toGenericRecord(val));
+    context.write(wrapper, NullWritable.get());
+  }
+
+  private GenericRecord toGenericRecord(SqoopRecord val) {
+    Map<String, Object> fieldMap = val.getFieldMap();
+    GenericRecord record = new GenericData.Record(schema);
+    for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
+      record.put(entry.getKey(), toAvro(entry.getValue()));
+    }
+    return record;
+  }
+
+  /**
+   * Convert a Java type (that has already been converted from the SQL
+   * equivalent) to its Avro representation.
+   * @param o the Java value to convert
+   * @return the Avro-compatible equivalent of the value
+   */
+  private Object toAvro(Object o) {
+    if (o instanceof BigDecimal) {
+      return o.toString();
+    } else if (o instanceof Date) {
+      return ((Date) o).getTime();
+    } else if (o instanceof Time) {
+      return ((Time) o).getTime();
+    } else if (o instanceof Timestamp) {
+      return ((Timestamp) o).getTime();
+    } else if (o instanceof ClobRef) {
+      throw new UnsupportedOperationException("ClobRef not supported");
+    } else if (o instanceof BlobRef) {
+      throw new UnsupportedOperationException("BlobRef not supported");
+    }
+    // primitive types (Integer, etc) are left unchanged
+    return o;
+  }
+
+}
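A minimal standalone sketch of the value conversion toAvro() performs above; the class name and sample values here are hypothetical, and only a subset of the handled types is shown:

    import java.math.BigDecimal;
    import java.sql.Timestamp;

    public class ToAvroSketch {
      // Mirrors AvroImportMapper.toAvro(): BigDecimal becomes its String
      // form, java.sql date/time types become epoch milliseconds, and
      // other values pass through unchanged.
      static Object toAvro(Object o) {
        if (o instanceof BigDecimal) {
          return o.toString();
        } else if (o instanceof Timestamp) {
          return ((Timestamp) o).getTime();
        }
        return o;
      }

      public static void main(String[] args) {
        System.out.println(toAvro(new BigDecimal("9.99"))); // 9.99 (as a String)
        System.out.println(toAvro(new Timestamp(0L)));      // 0
        System.out.println(toAvro(42));                     // 42
      }
    }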
diff --git a/src/java/com/cloudera/sqoop/mapreduce/AvroJob.java b/src/java/com/cloudera/sqoop/mapreduce/AvroJob.java
new file mode 100644
index 00000000..9fb84804
--- /dev/null
+++ b/src/java/com/cloudera/sqoop/mapreduce/AvroJob.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Helper class for setting up an Avro MapReduce job.
+ */
+public final class AvroJob {
+
+  public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
+
+  private AvroJob() {
+  }
+
+  public static void setMapOutputSchema(Configuration job, Schema s) {
+    job.set(MAP_OUTPUT_SCHEMA, s.toString());
+  }
+
+  /** Return a job's map output key schema. */
+  public static Schema getMapOutputSchema(Configuration job) {
+    return Schema.parse(job.get(MAP_OUTPUT_SCHEMA));
+  }
+}
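The helper above simply serializes the schema into the job configuration as JSON text. A small round-trip sketch (assuming the AvroJob class above is on the classpath):

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;

    import com.cloudera.sqoop.mapreduce.AvroJob;

    public class AvroJobRoundTrip {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        Schema s = Schema.create(Schema.Type.INT);
        // Stores s.toString() under "avro.map.output.schema"...
        AvroJob.setMapOutputSchema(conf, s);
        // ...and parses it back into a Schema object.
        System.out.println(AvroJob.getMapOutputSchema(conf).equals(s)); // true
      }
    }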
+ */ + +package com.cloudera.sqoop.mapreduce; + +import java.io.IOException; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.mapred.AvroWrapper; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +/** An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files. */ +public class AvroOutputFormat + extends FileOutputFormat, NullWritable> { + + @Override + public RecordWriter, NullWritable> getRecordWriter( + TaskAttemptContext context) throws IOException, InterruptedException { + + Schema schema = AvroJob.getMapOutputSchema(context.getConfiguration()); + + final DataFileWriter WRITER = + new DataFileWriter(new GenericDatumWriter()); + + Path path = getDefaultWorkFile(context, + org.apache.avro.mapred.AvroOutputFormat.EXT); + WRITER.create(schema, + path.getFileSystem(context.getConfiguration()).create(path)); + + return new RecordWriter, NullWritable>() { + @Override + public void write(AvroWrapper wrapper, NullWritable ignore) + throws IOException { + WRITER.append(wrapper.datum()); + } + @Override + public void close(TaskAttemptContext context) throws IOException, + InterruptedException { + WRITER.close(); + } + }; + } + +} diff --git a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java index 471cc21b..661dde8d 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java +++ b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java @@ -18,9 +18,19 @@ package com.cloudera.sqoop.mapreduce; +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.config.ConfigurationHelper; +import com.cloudera.sqoop.lib.LargeObjectLoader; +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.ImportJobContext; +import com.cloudera.sqoop.mapreduce.db.DBConfiguration; +import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat; +import com.cloudera.sqoop.orm.AvroSchemaGenerator; + import java.io.IOException; import java.sql.SQLException; +import org.apache.avro.Schema; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.NullWritable; @@ -32,14 +42,6 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import com.cloudera.sqoop.SqoopOptions; -import com.cloudera.sqoop.config.ConfigurationHelper; -import com.cloudera.sqoop.lib.LargeObjectLoader; -import com.cloudera.sqoop.manager.ConnManager; -import com.cloudera.sqoop.manager.ImportJobContext; -import com.cloudera.sqoop.mapreduce.db.DBConfiguration; -import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat; - /** * Actually runs a jdbc import job using the ORM files generated by the * sqoop.orm package. Uses DataDrivenDBInputFormat. @@ -68,6 +70,13 @@ protected void configureMapper(Job job, String tableName, // other types, we just use the defaults. 
diff --git a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
index 471cc21b..661dde8d 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
@@ -18,9 +18,19 @@

 package com.cloudera.sqoop.mapreduce;

+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import com.cloudera.sqoop.orm.AvroSchemaGenerator;
+
 import java.io.IOException;
 import java.sql.SQLException;

+import org.apache.avro.Schema;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.NullWritable;
@@ -32,14 +42,6 @@
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.config.ConfigurationHelper;
-import com.cloudera.sqoop.lib.LargeObjectLoader;
-import com.cloudera.sqoop.manager.ConnManager;
-import com.cloudera.sqoop.manager.ImportJobContext;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
-
 /**
  * Actually runs a jdbc import job using the ORM files generated by the
  * sqoop.orm package. Uses DataDrivenDBInputFormat.
@@ -68,6 +70,13 @@ protected void configureMapper(Job job, String tableName,
       // other types, we just use the defaults.
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(NullWritable.class);
+    } else if (options.getFileLayout()
+        == SqoopOptions.FileLayout.AvroDataFile) {
+      ConnManager connManager = getContext().getConnManager();
+      AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
+          connManager, tableName);
+      Schema schema = generator.generate();
+      AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
     }

     job.setMapperClass(getMapperClass());
@@ -80,6 +89,9 @@ protected Class<? extends Mapper> getMapperClass() {
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.SequenceFile) {
       return SequenceFileImportMapper.class;
+    } else if (options.getFileLayout()
+        == SqoopOptions.FileLayout.AvroDataFile) {
+      return AvroImportMapper.class;
     }

     return null;
@@ -93,6 +105,9 @@ protected Class<? extends OutputFormat> getOutputFormatClass()
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.SequenceFile) {
       return SequenceFileOutputFormat.class;
+    } else if (options.getFileLayout()
+        == SqoopOptions.FileLayout.AvroDataFile) {
+      return AvroOutputFormat.class;
     }

     return null;
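Taken together, the three branches added above amount to the following setup when the Avro layout is selected. This is an illustrative sketch only (the surrounding Job and ConnManager plumbing is assumed to exist), not code from the patch:

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.hadoop.mapreduce.Job;

    import com.cloudera.sqoop.SqoopOptions;
    import com.cloudera.sqoop.manager.ConnManager;
    import com.cloudera.sqoop.mapreduce.AvroImportMapper;
    import com.cloudera.sqoop.mapreduce.AvroJob;
    import com.cloudera.sqoop.mapreduce.AvroOutputFormat;
    import com.cloudera.sqoop.orm.AvroSchemaGenerator;

    public class AvroWiringSketch {
      // Roughly what DataDrivenImportJob now does when
      // options.getFileLayout() == SqoopOptions.FileLayout.AvroDataFile.
      static void configureForAvro(Job job, SqoopOptions options,
          ConnManager connManager, String tableName) throws IOException {
        Schema schema =
            new AvroSchemaGenerator(options, connManager, tableName).generate();
        AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
        job.setMapperClass(AvroImportMapper.class);
        job.setOutputFormatClass(AvroOutputFormat.class);
      }
    }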
+ */ +public class AvroSchemaGenerator { + + private final SqoopOptions options; + private final ConnManager connManager; + private final String tableName; + + public AvroSchemaGenerator(final SqoopOptions opts, final ConnManager connMgr, + final String table) { + this.options = opts; + this.connManager = connMgr; + this.tableName = table; + } + + public Schema generate() throws IOException { + ClassWriter classWriter = new ClassWriter(options, connManager, + tableName, null); + Map columnTypes = classWriter.getColumnTypes(); + String[] columnNames = classWriter.getColumnNames(columnTypes); + + List fields = new ArrayList(); + for (String columnName : columnNames) { + String cleanedCol = ClassWriter.toIdentifier(columnName); + int sqlType = columnTypes.get(cleanedCol); + Schema avroSchema = toAvroSchema(sqlType); + Field field = new Field(cleanedCol, avroSchema, null, null); + field.addProp("columnName", columnName); + field.addProp("sqlType", Integer.toString(sqlType)); + fields.add(field); + } + String doc = "Sqoop import of " + tableName; + Schema schema = Schema.createRecord(tableName, doc, null, false); + schema.setFields(fields); + schema.addProp("tableName", tableName); + return schema; + } + + private Type toAvroType(int sqlType) { + switch (sqlType) { + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + return Type.INT; + case Types.BIGINT: + return Type.LONG; + case Types.BIT: + case Types.BOOLEAN: + return Type.BOOLEAN; + case Types.REAL: + return Type.FLOAT; + case Types.FLOAT: + case Types.DOUBLE: + return Type.DOUBLE; + case Types.NUMERIC: + case Types.DECIMAL: + return Type.STRING; + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGVARCHAR: + return Type.STRING; + case Types.DATE: + case Types.TIME: + case Types.TIMESTAMP: + return Type.LONG; + case Types.BINARY: + return Type.BYTES; + default: + throw new IllegalArgumentException("Cannot convert SQL type " + + sqlType); + } + } + + public Schema toAvroSchema(int sqlType) { + return Schema.create(toAvroType(sqlType)); + } + +} diff --git a/src/java/com/cloudera/sqoop/orm/ClassWriter.java b/src/java/com/cloudera/sqoop/orm/ClassWriter.java index 61a0d693..6445706d 100644 --- a/src/java/com/cloudera/sqoop/orm/ClassWriter.java +++ b/src/java/com/cloudera/sqoop/orm/ClassWriter.java @@ -161,8 +161,8 @@ private static boolean isReservedWord(String word) { } /** - * Coerce a candidate name for an identifier into one which will - * definitely compile. + * Coerce a candidate name for an identifier into one which is a valid + * Java or Avro identifier. * * Ensures that the returned identifier matches [A-Za-z_][A-Za-z0-9_]* * and is not a reserved word. @@ -204,8 +204,22 @@ public static String toIdentifier(String candidate) { } } } + return sb.toString(); + } - String output = sb.toString(); + /** + * Coerce a candidate name for an identifier into one which will + * definitely compile. + * + * Ensures that the returned identifier matches [A-Za-z_][A-Za-z0-9_]* + * and is not a reserved word. + * + * @param candidate A string we want to use as an identifier + * @return A string naming an identifier which compiles and is + * similar to the candidate. 
+ */ + public static String toJavaIdentifier(String candidate) { + String output = toIdentifier(candidate); if (isReservedWord(output)) { // e.g., 'class' -> '_class'; return "_" + output; @@ -998,66 +1012,22 @@ private void generateHadoopWrite(Map columnTypes, String [] cleanedColNames = new String[colNames.length]; for (int i = 0; i < colNames.length; i++) { String col = colNames[i]; - String identifier = toIdentifier(col); + String identifier = toJavaIdentifier(col); cleanedColNames[i] = identifier; } return cleanedColNames; } - private Map setupColumnTypes() throws IOException { - Map columnTypes; - if (null != tableName) { - // We're generating a class based on a table import. - columnTypes = connManager.getColumnTypes(tableName); - } else { - // This is based on an arbitrary query. - String query = this.options.getSqlQuery(); - if (query.indexOf(SqlManager.SUBSTITUTE_TOKEN) == -1) { - throw new IOException("Query [" + query + "] must contain '" - + SqlManager.SUBSTITUTE_TOKEN + "' in WHERE clause."); - } + /** + * Generate the ORM code for the class. + */ + public void generate() throws IOException { + Map columnTypes = getColumnTypes(); - columnTypes = connManager.getColumnTypesForQuery(query); - } - return columnTypes; - } + String[] colNames = getColumnNames(columnTypes); - private String[] setupColNames(Map columnTypes) { - String [] colNames = options.getColumns(); - if (null == colNames) { - if (null != tableName) { - // Table-based import. Read column names from table. - colNames = connManager.getColumnNames(tableName); - } else { - // Infer/assign column names for arbitrary query. - colNames = connManager.getColumnNamesForQuery( - this.options.getSqlQuery()); - } - } else { - // These column names were provided by the user. They may not be in - // the same case as the keys in the columnTypes map. So make sure - // we add the appropriate aliases in that map. - for (String userColName : colNames) { - for (Map.Entry typeEntry : columnTypes.entrySet()) { - String typeColName = typeEntry.getKey(); - if (typeColName.equalsIgnoreCase(userColName) - && !typeColName.equals(userColName)) { - // We found the correct-case equivalent. - columnTypes.put(userColName, typeEntry.getValue()); - // No need to continue iteration; only one could match. - // Also, the use of put() just invalidated the iterator. - break; - } - } - } - } - return colNames; - } - - private String[] setupCleanedColNames(Map columnTypes, - String[] colNames) { // Translate all the column names into names that are safe to // use as identifiers. String [] cleanedColNames = cleanColNames(colNames); @@ -1082,16 +1052,6 @@ private String[] setupCleanedColNames(Map columnTypes, } columnTypes.put(identifier, type); } - return cleanedColNames; - } - - /** - * Generate the ORM code for the class. - */ - public void generate() throws IOException { - Map columnTypes = setupColumnTypes(); - String[] colNames = setupColNames(columnTypes); - String[] cleanedColNames = setupCleanedColNames(columnTypes, colNames); // The db write() method may use column names in a different // order. If this is set in the options, pull it out here and @@ -1186,6 +1146,56 @@ public void generate() throws IOException { } } + protected String[] getColumnNames(Map columnTypes) { + String [] colNames = options.getColumns(); + if (null == colNames) { + if (null != tableName) { + // Table-based import. Read column names from table. + colNames = connManager.getColumnNames(tableName); + } else { + // Infer/assign column names for arbitrary query. 
+        colNames = connManager.getColumnNamesForQuery(
+            this.options.getSqlQuery());
+      }
+    } else {
+      // These column names were provided by the user. They may not be in
+      // the same case as the keys in the columnTypes map. So make sure
+      // we add the appropriate aliases in that map.
+      for (String userColName : colNames) {
+        for (Map.Entry<String, Integer> typeEntry : columnTypes.entrySet()) {
+          String typeColName = typeEntry.getKey();
+          if (typeColName.equalsIgnoreCase(userColName)
+              && !typeColName.equals(userColName)) {
+            // We found the correct-case equivalent.
+            columnTypes.put(userColName, typeEntry.getValue());
+            // No need to continue iteration; only one could match.
+            // Also, the use of put() just invalidated the iterator.
+            break;
+          }
+        }
+      }
+    }
+    return colNames;
+  }
+
+  protected Map<String, Integer> getColumnTypes() throws IOException {
+    Map<String, Integer> columnTypes;
+    if (null != tableName) {
+      // We're generating a class based on a table import.
+      columnTypes = connManager.getColumnTypes(tableName);
+    } else {
+      // This is based on an arbitrary query.
+      String query = this.options.getSqlQuery();
+      if (query.indexOf(SqlManager.SUBSTITUTE_TOKEN) == -1) {
+        throw new IOException("Query [" + query + "] must contain '"
+            + SqlManager.SUBSTITUTE_TOKEN + "' in WHERE clause.");
+      }
+
+      columnTypes = connManager.getColumnTypesForQuery(query);
+    }
+    return columnTypes;
+  }
+
   /**
    * Generate the ORM code for a table object containing the named columns.
    * @param columnTypes - mapping from column names to sql types
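The practical effect of splitting toIdentifier() from toJavaIdentifier() above: Avro field names may keep any SQL-legal name, including Java reserved words, while generated Java identifiers still get the underscore escape. A hedged sketch, with expected values taken from the updated TestClassWriter assertions at the end of this patch:

    import com.cloudera.sqoop.orm.ClassWriter;

    public class IdentifierSketch {
      public static void main(String[] args) {
        // toIdentifier() only enforces [A-Za-z_][A-Za-z0-9_]*...
        System.out.println(ClassWriter.toIdentifier("class"));      // class
        // ...while toJavaIdentifier() also escapes reserved words.
        System.out.println(ClassWriter.toJavaIdentifier("class"));  // _class
        System.out.println(ClassWriter.toJavaIdentifier("cla ss")); // _class
      }
    }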
diff --git a/src/java/com/cloudera/sqoop/orm/TableClassName.java b/src/java/com/cloudera/sqoop/orm/TableClassName.java
index d141e2fc..93ab0806 100644
--- a/src/java/com/cloudera/sqoop/orm/TableClassName.java
+++ b/src/java/com/cloudera/sqoop/orm/TableClassName.java
@@ -95,7 +95,7 @@ public String getClassForTable(String tableName) {

     // no specific class; no specific package.
     // Just make sure it's a legal identifier.
-    return ClassWriter.toIdentifier(queryName);
+    return ClassWriter.toJavaIdentifier(queryName);
   }

   /**
diff --git a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
index 2bc0fe9a..a69f9aac 100644
--- a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
@@ -86,6 +86,7 @@ public abstract class BaseSqoopTool extends SqoopTool {

   public static final String FMT_SEQUENCEFILE_ARG = "as-sequencefile";
   public static final String FMT_TEXTFILE_ARG = "as-textfile";
+  public static final String FMT_AVRODATAFILE_ARG = "as-avrodatafile";
   public static final String HIVE_IMPORT_ARG = "hive-import";
   public static final String HIVE_TABLE_ARG = "hive-table";
   public static final String HIVE_OVERWRITE_ARG = "hive-overwrite";
diff --git a/src/java/com/cloudera/sqoop/tool/ImportTool.java b/src/java/com/cloudera/sqoop/tool/ImportTool.java
index 2245d03b..66e60bda 100644
--- a/src/java/com/cloudera/sqoop/tool/ImportTool.java
+++ b/src/java/com/cloudera/sqoop/tool/ImportTool.java
@@ -509,6 +509,10 @@ protected RelatedOptions getImportOptions() {
         .withDescription("Imports data as plain text (default)")
         .withLongOpt(FMT_TEXTFILE_ARG)
         .create());
+    importOpts.addOption(OptionBuilder
+        .withDescription("Imports data to Avro data files")
+        .withLongOpt(FMT_AVRODATAFILE_ARG)
+        .create());
     importOpts.addOption(OptionBuilder.withArgName("n")
         .hasArg().withDescription("Use 'n' map tasks to import in parallel")
         .withLongOpt(NUM_MAPPERS_ARG)
@@ -699,6 +703,10 @@ public void applyOptions(CommandLine in, SqoopOptions out)
       out.setFileLayout(SqoopOptions.FileLayout.TextFile);
     }

+    if (in.hasOption(FMT_AVRODATAFILE_ARG)) {
+      out.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
+    }
+
     if (in.hasOption(NUM_MAPPERS_ARG)) {
       out.setNumMappers(Integer.parseInt(in.getOptionValue(NUM_MAPPERS_ARG)));
     }
+ */ + +package com.cloudera.sqoop; + +import com.cloudera.sqoop.testutil.BaseSqoopTestCase; +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.testutil.HsqldbTestServer; +import com.cloudera.sqoop.testutil.ImportJobTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.FsInput; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +/** + * Tests --as-avrodatafile. + */ +public class TestAvroImport extends ImportJobTestCase { + + public static final Log LOG = LogFactory + .getLog(TestAvroImport.class.getName()); + + /** + * Create the argv to pass to Sqoop. + * + * @return the argv as an array of strings. + */ + protected String[] getOutputArgv(boolean includeHadoopFlags) { + ArrayList args = new ArrayList(); + + if (includeHadoopFlags) { + CommonArgs.addHadoopFlags(args); + } + + args.add("--table"); + args.add(HsqldbTestServer.getTableName()); + args.add("--connect"); + args.add(HsqldbTestServer.getUrl()); + args.add("--warehouse-dir"); + args.add(getWarehouseDir()); + args.add("--split-by"); + args.add("INTFIELD1"); + args.add("--as-avrodatafile"); + + return args.toArray(new String[0]); + } + + // this test just uses the two int table. + protected String getTableName() { + return HsqldbTestServer.getTableName(); + } + + public void testAvroImport() throws IOException { + + runImport(getOutputArgv(true)); + + Path outputFile = new Path(getTablePath(), "part-m-00000.avro"); + DataFileReader reader = read(outputFile); + Schema schema = reader.getSchema(); + assertEquals(Schema.Type.RECORD, schema.getType()); + List fields = schema.getFields(); + assertEquals(2, fields.size()); + + assertEquals("INTFIELD1", fields.get(0).name()); + assertEquals(Schema.Type.INT, fields.get(0).schema().getType()); + + assertEquals("INTFIELD2", fields.get(1).name()); + assertEquals(Schema.Type.INT, fields.get(1).schema().getType()); + + GenericRecord record1 = reader.next(); + assertEquals(1, record1.get("INTFIELD1")); + assertEquals(8, record1.get("INTFIELD2")); + } + + private DataFileReader read(Path filename) throws IOException { + Configuration conf = new Configuration(); + if (!BaseSqoopTestCase.isOnPhysicalCluster()) { + conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS); + } + FsInput fsInput = new FsInput(filename, conf); + DatumReader datumReader = + new GenericDatumReader(); + return new DataFileReader(fsInput, datumReader); + } + +} diff --git a/src/test/com/cloudera/sqoop/orm/TestClassWriter.java b/src/test/com/cloudera/sqoop/orm/TestClassWriter.java index 23dbd10c..761c8a33 100644 --- a/src/test/com/cloudera/sqoop/orm/TestClassWriter.java +++ b/src/test/com/cloudera/sqoop/orm/TestClassWriter.java @@ -291,7 +291,7 @@ public void testSetPackageName() { // Test the SQL identifier -> Java identifier conversion. 
diff --git a/src/test/com/cloudera/sqoop/orm/TestClassWriter.java b/src/test/com/cloudera/sqoop/orm/TestClassWriter.java
index 23dbd10c..761c8a33 100644
--- a/src/test/com/cloudera/sqoop/orm/TestClassWriter.java
+++ b/src/test/com/cloudera/sqoop/orm/TestClassWriter.java
@@ -291,7 +291,7 @@ public void testSetPackageName() {

   // Test the SQL identifier -> Java identifier conversion.
   @Test
-  public void testIdentifierConversion() {
+  public void testJavaIdentifierConversion() {
     assertNull(ClassWriter.getIdentifierStrForChar(' '));
     assertNull(ClassWriter.getIdentifierStrForChar('\t'));
     assertNull(ClassWriter.getIdentifierStrForChar('\r'));
@@ -300,14 +300,15 @@ public void testIdentifierConversion() {
     assertEquals("_", ClassWriter.getIdentifierStrForChar('-'));
     assertEquals("_", ClassWriter.getIdentifierStrForChar('_'));

-    assertEquals("foo", ClassWriter.toIdentifier("foo"));
-    assertEquals("_class", ClassWriter.toIdentifier("class"));
-    assertEquals("_class", ClassWriter.toIdentifier("cla ss"));
-    assertEquals("_int", ClassWriter.toIdentifier("int"));
-    assertEquals("thisismanywords", ClassWriter.toIdentifier(
+    assertEquals("foo", ClassWriter.toJavaIdentifier("foo"));
+    assertEquals("_class", ClassWriter.toJavaIdentifier("class"));
+    assertEquals("_class", ClassWriter.toJavaIdentifier("cla ss"));
+    assertEquals("_int", ClassWriter.toJavaIdentifier("int"));
+    assertEquals("thisismanywords", ClassWriter.toJavaIdentifier(
         "this is many words"));
-    assertEquals("_9isLegalInSql", ClassWriter.toIdentifier("9isLegalInSql"));
-    assertEquals("___", ClassWriter.toIdentifier("___"));
+    assertEquals("_9isLegalInSql", ClassWriter.toJavaIdentifier(
+        "9isLegalInSql"));
+    assertEquals("___", ClassWriter.toJavaIdentifier("___"));
   }

   @Test