diff --git a/bin/configure-sqoop b/bin/configure-sqoop
index 31a75ebc..a00e8fb3 100755
--- a/bin/configure-sqoop
+++ b/bin/configure-sqoop
@@ -77,7 +77,8 @@ fi
# Add HBase to dependency list
if [ -e "$HBASE_HOME/bin/hbase" ]; then
- SQOOP_CLASSPATH=`$HBASE_HOME/bin/hbase classpath`:${SQOOP_CLASSPATH}
+ TMP_SQOOP_CLASSPATH=${SQOOP_CLASSPATH}:`$HBASE_HOME/bin/hbase classpath`
+ SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}
fi
ZOOCFGDIR=${ZOOCFGDIR:-/etc/zookeeper}
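Appending the output of `hbase classpath` rather than prepending it keeps Sqoop's own jars, including the Avro artifacts added below, ahead of anything HBase bundles. A quick way to eyeball the resulting order, as a sketch that assumes HADOOP_HOME and HBASE_HOME are set so the script runs through:

    $ bash -x bin/configure-sqoop 2>&1 | grep 'SQOOP_CLASSPATH=' | tail -1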
diff --git a/ivy.xml b/ivy.xml
index 48daccb1..b6c4830d 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -84,7 +84,21 @@
conf="common->default"/>
-
+ <dependency org="org.apache.avro" name="avro" rev="${avro.version}"
+ conf="common->default"/>
+
+ <dependency org="org.apache.avro" name="avro-mapred" rev="${avro.version}"
+ conf="common->default"/>
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 94ccd370..0f1de10a 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -16,6 +16,8 @@
# This properties file lists the versions of the various artifacts we use.
# It drives ivy and the generation of a maven POM
+avro.version=1.5.1
+
checkstyle.version=5.0
commons-cli.version=1.2
diff --git a/src/docs/man/import-args.txt b/src/docs/man/import-args.txt
index 377f4036..b6055eac 100644
--- a/src/docs/man/import-args.txt
+++ b/src/docs/man/import-args.txt
@@ -23,6 +23,9 @@ Import control options
--append::
Append data to an existing HDFS dataset
+--as-avrodatafile::
+ Imports data to Avro Data Files
+
--as-sequencefile::
Imports data to SequenceFiles
diff --git a/src/docs/man/sqoop-import-all-tables.txt b/src/docs/man/sqoop-import-all-tables.txt
index be1142b3..ad3890ab 100644
--- a/src/docs/man/sqoop-import-all-tables.txt
+++ b/src/docs/man/sqoop-import-all-tables.txt
@@ -27,6 +27,9 @@ include::common-args.txt[]
Import control options
~~~~~~~~~~~~~~~~~~~~~~
+--as-avrodatafile::
+ Imports data to Avro Data Files
+
--as-sequencefile::
Imports data to SequenceFiles
diff --git a/src/docs/user/help.txt b/src/docs/user/help.txt
index cec89d88..d7f8d868 100644
--- a/src/docs/user/help.txt
+++ b/src/docs/user/help.txt
@@ -75,6 +75,7 @@ Common arguments:
--verbose Print more information while working
Import control arguments:
+ --as-avrodatafile Imports data to Avro Data Files
--as-sequencefile Imports data to SequenceFiles
--as-textfile Imports data as plain text (default)
...
diff --git a/src/docs/user/hive.txt b/src/docs/user/hive.txt
index 83744672..059d7cb7 100644
--- a/src/docs/user/hive.txt
+++ b/src/docs/user/hive.txt
@@ -40,7 +40,8 @@ multiple Hive installations, or +hive+ is not in your +$PATH+, use the
*+\--hive-home+* option to identify the Hive installation directory.
Sqoop will use +$HIVE_HOME/bin/hive+ from here.
-NOTE: This function is incompatible with +\--as-sequencefile+.
+NOTE: This function is incompatible with +\--as-avrodatafile+ and
++\--as-sequencefile+.
Even though Hive supports escaping characters, it does not
handle escaping of new-line character. Also, it does not support
diff --git a/src/docs/user/import-all-tables.txt b/src/docs/user/import-all-tables.txt
index 7674226f..b52fac31 100644
--- a/src/docs/user/import-all-tables.txt
+++ b/src/docs/user/import-all-tables.txt
@@ -44,6 +44,7 @@ include::common-args.txt[]
`----------------------------`---------------------------------------
Argument Description
---------------------------------------------------------------------
++\--as-avrodatafile+ Imports data to Avro Data Files
+\--as-sequencefile+ Imports data to SequenceFiles
+\--as-textfile+ Imports data as plain text (default)
+\--direct+ Use direct import fast path
diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt
index 24cda633..80c4f2d5 100644
--- a/src/docs/user/import.txt
+++ b/src/docs/user/import.txt
@@ -54,6 +54,7 @@ Argument Description
-------------------------------------------------------------------------
+\--append+ Append data to an existing dataset\
in HDFS
++\--as-avrodatafile+ Imports data to Avro Data Files
+\--as-sequencefile+ Imports data to SequenceFiles
+\--as-textfile+ Imports data as plain text (default)
+\--columns <col,col,col...>+ Columns to import from table
diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java
index 5aa3ec4e..ac3e488a 100644
--- a/src/java/com/cloudera/sqoop/SqoopOptions.java
+++ b/src/java/com/cloudera/sqoop/SqoopOptions.java
@@ -77,7 +77,8 @@ public String toString() {
/** Selects in-HDFS destination file format. */
public enum FileLayout {
TextFile,
- SequenceFile
+ SequenceFile,
+ AvroDataFile
}
/**
diff --git a/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java b/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java
new file mode 100644
index 00000000..a084a20a
--- /dev/null
+++ b/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import com.cloudera.sqoop.lib.BlobRef;
+import com.cloudera.sqoop.lib.ClobRef;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * Imports records by transforming them to Avro records in an Avro data file.
+ */
+public class AvroImportMapper extends AutoProgressMapper<LongWritable,
+ SqoopRecord, AvroWrapper<GenericRecord>, NullWritable> {
+
+ private final AvroWrapper<GenericRecord> wrapper =
+ new AvroWrapper<GenericRecord>();
+ private Schema schema;
+
+ @Override
+ protected void setup(Context context) {
+ schema = AvroJob.getMapOutputSchema(context.getConfiguration());
+ }
+
+ @Override
+ protected void map(LongWritable key, SqoopRecord val, Context context)
+ throws IOException, InterruptedException {
+ wrapper.datum(toGenericRecord(val));
+ context.write(wrapper, NullWritable.get());
+ }
+
+
+ private GenericRecord toGenericRecord(SqoopRecord val) {
+ Map<String, Object> fieldMap = val.getFieldMap();
+ GenericRecord record = new GenericData.Record(schema);
+ for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
+ record.put(entry.getKey(), toAvro(entry.getValue()));
+ }
+ return record;
+ }
+
+ /**
+ * Convert a Java value (already mapped from its SQL equivalent) to the
+ * representation Avro serializes for the corresponding schema type.
+ * @param o the Java value for a single column
+ * @return the value to write into the Avro record
+ */
+ private Object toAvro(Object o) {
+ if (o instanceof BigDecimal) {
+ return o.toString();
+ } else if (o instanceof Date) {
+ return ((Date) o).getTime();
+ } else if (o instanceof Time) {
+ return ((Time) o).getTime();
+ } else if (o instanceof Timestamp) {
+ return ((Timestamp) o).getTime();
+ } else if (o instanceof ClobRef) {
+ throw new UnsupportedOperationException("ClobRef not supported");
+ } else if (o instanceof BlobRef) {
+ throw new UnsupportedOperationException("BlobRef not supported");
+ }
+ // primitive types (Integer, etc) are left unchanged
+ return o;
+ }
+
+
+}
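As a standalone illustration of the conversions above (not part of the patch; the schema and values are hypothetical), the following sketch builds the kind of GenericRecord this mapper emits:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    // Sketch: applies the same value mapping as AvroImportMapper.toAvro().
    public class AvroConversionSketch {
      public static void main(String[] args) {
        Schema schema = Schema.parse(          // same parse call AvroJob uses
            "{\"type\":\"record\",\"name\":\"EMPLOYEES\",\"fields\":["
            + "{\"name\":\"ID\",\"type\":\"int\"},"
            + "{\"name\":\"START_DATE\",\"type\":\"long\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("ID", 42);                  // primitives pass through unchanged
        record.put("START_DATE",               // java.sql.Date -> epoch milliseconds
            java.sql.Date.valueOf("2011-06-01").getTime());
        System.out.println(record);            // JSON-style rendering of the datum
      }
    }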
diff --git a/src/java/com/cloudera/sqoop/mapreduce/AvroJob.java b/src/java/com/cloudera/sqoop/mapreduce/AvroJob.java
new file mode 100644
index 00000000..9fb84804
--- /dev/null
+++ b/src/java/com/cloudera/sqoop/mapreduce/AvroJob.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Helper class for setting up an Avro MapReduce job.
+ */
+public final class AvroJob {
+ public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
+
+ private AvroJob() {
+ }
+
+ public static void setMapOutputSchema(Configuration job, Schema s) {
+ job.set(MAP_OUTPUT_SCHEMA, s.toString());
+ }
+
+ /** Return a job's map output key schema. */
+ public static Schema getMapOutputSchema(Configuration job) {
+ return Schema.parse(job.get(MAP_OUTPUT_SCHEMA));
+ }
+}
diff --git a/src/java/com/cloudera/sqoop/mapreduce/AvroOutputFormat.java b/src/java/com/cloudera/sqoop/mapreduce/AvroOutputFormat.java
new file mode 100644
index 00000000..dceee6bb
--- /dev/null
+++ b/src/java/com/cloudera/sqoop/mapreduce/AvroOutputFormat.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/** An {@link org.apache.hadoop.mapreduce.OutputFormat} for Avro data files. */
+public class AvroOutputFormat
+ extends FileOutputFormat<AvroWrapper<GenericRecord>, NullWritable> {
+
+ @Override
+ public RecordWriter<AvroWrapper<GenericRecord>, NullWritable> getRecordWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException {
+
+ Schema schema = AvroJob.getMapOutputSchema(context.getConfiguration());
+
+ final DataFileWriter<GenericRecord> writer =
+ new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>());
+
+ Path path = getDefaultWorkFile(context,
+ org.apache.avro.mapred.AvroOutputFormat.EXT);
+ writer.create(schema,
+ path.getFileSystem(context.getConfiguration()).create(path));
+
+ return new RecordWriter<AvroWrapper<GenericRecord>, NullWritable>() {
+ @Override
+ public void write(AvroWrapper<GenericRecord> wrapper, NullWritable ignore)
+ throws IOException {
+ writer.append(wrapper.datum());
+ }
+ @Override
+ public void close(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ writer.close();
+ }
+ };
+ }
+
+}
diff --git a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
index 471cc21b..661dde8d 100644
--- a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
@@ -18,9 +18,19 @@
package com.cloudera.sqoop.mapreduce;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import com.cloudera.sqoop.orm.AvroSchemaGenerator;
+
import java.io.IOException;
import java.sql.SQLException;
+import org.apache.avro.Schema;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
@@ -32,14 +42,6 @@
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.config.ConfigurationHelper;
-import com.cloudera.sqoop.lib.LargeObjectLoader;
-import com.cloudera.sqoop.manager.ConnManager;
-import com.cloudera.sqoop.manager.ImportJobContext;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
-
/**
* Actually runs a jdbc import job using the ORM files generated by the
* sqoop.orm package. Uses DataDrivenDBInputFormat.
@@ -68,6 +70,13 @@ protected void configureMapper(Job job, String tableName,
// other types, we just use the defaults.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
+ } else if (options.getFileLayout()
+ == SqoopOptions.FileLayout.AvroDataFile) {
+ ConnManager connManager = getContext().getConnManager();
+ AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
+ connManager, tableName);
+ Schema schema = generator.generate();
+ AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
}
job.setMapperClass(getMapperClass());
@@ -80,6 +89,9 @@ protected Class<? extends Mapper> getMapperClass() {
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.SequenceFile) {
return SequenceFileImportMapper.class;
+ } else if (options.getFileLayout()
+ == SqoopOptions.FileLayout.AvroDataFile) {
+ return AvroImportMapper.class;
}
return null;
@@ -93,6 +105,9 @@ protected Class<? extends OutputFormat> getOutputFormatClass()
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.SequenceFile) {
return SequenceFileOutputFormat.class;
+ } else if (options.getFileLayout()
+ == SqoopOptions.FileLayout.AvroDataFile) {
+ return AvroOutputFormat.class;
}
return null;
diff --git a/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java b/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java
new file mode 100644
index 00000000..1788141b
--- /dev/null
+++ b/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.orm;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ConnManager;
+
+import java.io.IOException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+
+/**
+ * Creates an Avro schema to represent a table from a database.
+ */
+public class AvroSchemaGenerator {
+
+ private final SqoopOptions options;
+ private final ConnManager connManager;
+ private final String tableName;
+
+ public AvroSchemaGenerator(final SqoopOptions opts, final ConnManager connMgr,
+ final String table) {
+ this.options = opts;
+ this.connManager = connMgr;
+ this.tableName = table;
+ }
+
+ public Schema generate() throws IOException {
+ ClassWriter classWriter = new ClassWriter(options, connManager,
+ tableName, null);
+ Map<String, Integer> columnTypes = classWriter.getColumnTypes();
+ String[] columnNames = classWriter.getColumnNames(columnTypes);
+
+ List<Field> fields = new ArrayList<Field>();
+ for (String columnName : columnNames) {
+ String cleanedCol = ClassWriter.toIdentifier(columnName);
+ int sqlType = columnTypes.get(cleanedCol);
+ Schema avroSchema = toAvroSchema(sqlType);
+ Field field = new Field(cleanedCol, avroSchema, null, null);
+ field.addProp("columnName", columnName);
+ field.addProp("sqlType", Integer.toString(sqlType));
+ fields.add(field);
+ }
+ String doc = "Sqoop import of " + tableName;
+ Schema schema = Schema.createRecord(tableName, doc, null, false);
+ schema.setFields(fields);
+ schema.addProp("tableName", tableName);
+ return schema;
+ }
+
+ private Type toAvroType(int sqlType) {
+ switch (sqlType) {
+ case Types.TINYINT:
+ case Types.SMALLINT:
+ case Types.INTEGER:
+ return Type.INT;
+ case Types.BIGINT:
+ return Type.LONG;
+ case Types.BIT:
+ case Types.BOOLEAN:
+ return Type.BOOLEAN;
+ case Types.REAL:
+ return Type.FLOAT;
+ case Types.FLOAT:
+ case Types.DOUBLE:
+ return Type.DOUBLE;
+ case Types.NUMERIC:
+ case Types.DECIMAL:
+ return Type.STRING;
+ case Types.CHAR:
+ case Types.VARCHAR:
+ case Types.LONGVARCHAR:
+ return Type.STRING;
+ case Types.DATE:
+ case Types.TIME:
+ case Types.TIMESTAMP:
+ return Type.LONG;
+ case Types.BINARY:
+ return Type.BYTES;
+ default:
+ throw new IllegalArgumentException("Cannot convert SQL type "
+ + sqlType);
+ }
+ }
+
+ public Schema toAvroSchema(int sqlType) {
+ return Schema.create(toAvroType(sqlType));
+ }
+
+}
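For reference, the schema this generator emits for a two-int-column table (like the HSQLDB table TestAvroImport uses below) would look roughly as follows; the table name here is hypothetical, and sqlType 4 is java.sql.Types.INTEGER. Note the plain, non-union field types: this version does not mark columns as nullable.

    {
      "type" : "record",
      "name" : "SOMETABLE",
      "doc" : "Sqoop import of SOMETABLE",
      "fields" : [
        {"name" : "INTFIELD1", "type" : "int", "columnName" : "INTFIELD1", "sqlType" : "4"},
        {"name" : "INTFIELD2", "type" : "int", "columnName" : "INTFIELD2", "sqlType" : "4"}
      ],
      "tableName" : "SOMETABLE"
    }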
diff --git a/src/java/com/cloudera/sqoop/orm/ClassWriter.java b/src/java/com/cloudera/sqoop/orm/ClassWriter.java
index 61a0d693..6445706d 100644
--- a/src/java/com/cloudera/sqoop/orm/ClassWriter.java
+++ b/src/java/com/cloudera/sqoop/orm/ClassWriter.java
@@ -161,8 +161,8 @@ private static boolean isReservedWord(String word) {
}
/**
- * Coerce a candidate name for an identifier into one which will
- * definitely compile.
+ * Coerce a candidate name for an identifier into one which is a valid
+ * Java or Avro identifier.
*
* Ensures that the returned identifier matches [A-Za-z_][A-Za-z0-9_]*
* and is not a reserved word.
@@ -204,8 +204,22 @@ public static String toIdentifier(String candidate) {
}
}
}
+ return sb.toString();
+ }
- String output = sb.toString();
+ /**
+ * Coerce a candidate name for an identifier into one which will
+ * definitely compile.
+ *
+ * Ensures that the returned identifier matches [A-Za-z_][A-Za-z0-9_]*
+ * and is not a reserved word.
+ *
+ * @param candidate A string we want to use as an identifier
+ * @return A string naming an identifier which compiles and is
+ * similar to the candidate.
+ */
+ public static String toJavaIdentifier(String candidate) {
+ String output = toIdentifier(candidate);
if (isReservedWord(output)) {
// e.g., 'class' -> '_class';
return "_" + output;
@@ -998,66 +1012,22 @@ private void generateHadoopWrite(Map<String, Integer> columnTypes,
String [] cleanedColNames = new String[colNames.length];
for (int i = 0; i < colNames.length; i++) {
String col = colNames[i];
- String identifier = toIdentifier(col);
+ String identifier = toJavaIdentifier(col);
cleanedColNames[i] = identifier;
}
return cleanedColNames;
}
- private Map<String, Integer> setupColumnTypes() throws IOException {
- Map<String, Integer> columnTypes;
- if (null != tableName) {
- // We're generating a class based on a table import.
- columnTypes = connManager.getColumnTypes(tableName);
- } else {
- // This is based on an arbitrary query.
- String query = this.options.getSqlQuery();
- if (query.indexOf(SqlManager.SUBSTITUTE_TOKEN) == -1) {
- throw new IOException("Query [" + query + "] must contain '"
- + SqlManager.SUBSTITUTE_TOKEN + "' in WHERE clause.");
- }
+ /**
+ * Generate the ORM code for the class.
+ */
+ public void generate() throws IOException {
+ Map<String, Integer> columnTypes = getColumnTypes();
- columnTypes = connManager.getColumnTypesForQuery(query);
- }
- return columnTypes;
- }
+ String[] colNames = getColumnNames(columnTypes);
- private String[] setupColNames(Map<String, Integer> columnTypes) {
- String [] colNames = options.getColumns();
- if (null == colNames) {
- if (null != tableName) {
- // Table-based import. Read column names from table.
- colNames = connManager.getColumnNames(tableName);
- } else {
- // Infer/assign column names for arbitrary query.
- colNames = connManager.getColumnNamesForQuery(
- this.options.getSqlQuery());
- }
- } else {
- // These column names were provided by the user. They may not be in
- // the same case as the keys in the columnTypes map. So make sure
- // we add the appropriate aliases in that map.
- for (String userColName : colNames) {
- for (Map.Entry<String, Integer> typeEntry : columnTypes.entrySet()) {
- String typeColName = typeEntry.getKey();
- if (typeColName.equalsIgnoreCase(userColName)
- && !typeColName.equals(userColName)) {
- // We found the correct-case equivalent.
- columnTypes.put(userColName, typeEntry.getValue());
- // No need to continue iteration; only one could match.
- // Also, the use of put() just invalidated the iterator.
- break;
- }
- }
- }
- }
- return colNames;
- }
-
- private String[] setupCleanedColNames(Map<String, Integer> columnTypes,
- String[] colNames) {
// Translate all the column names into names that are safe to
// use as identifiers.
String [] cleanedColNames = cleanColNames(colNames);
@@ -1082,16 +1052,6 @@ private String[] setupCleanedColNames(Map<String, Integer> columnTypes,
}
columnTypes.put(identifier, type);
}
- return cleanedColNames;
- }
-
- /**
- * Generate the ORM code for the class.
- */
- public void generate() throws IOException {
- Map<String, Integer> columnTypes = setupColumnTypes();
- String[] colNames = setupColNames(columnTypes);
- String[] cleanedColNames = setupCleanedColNames(columnTypes, colNames);
// The db write() method may use column names in a different
// order. If this is set in the options, pull it out here and
@@ -1186,6 +1146,56 @@ public void generate() throws IOException {
}
}
+ protected String[] getColumnNames(Map<String, Integer> columnTypes) {
+ String [] colNames = options.getColumns();
+ if (null == colNames) {
+ if (null != tableName) {
+ // Table-based import. Read column names from table.
+ colNames = connManager.getColumnNames(tableName);
+ } else {
+ // Infer/assign column names for arbitrary query.
+ colNames = connManager.getColumnNamesForQuery(
+ this.options.getSqlQuery());
+ }
+ } else {
+ // These column names were provided by the user. They may not be in
+ // the same case as the keys in the columnTypes map. So make sure
+ // we add the appropriate aliases in that map.
+ for (String userColName : colNames) {
+ for (Map.Entry<String, Integer> typeEntry : columnTypes.entrySet()) {
+ String typeColName = typeEntry.getKey();
+ if (typeColName.equalsIgnoreCase(userColName)
+ && !typeColName.equals(userColName)) {
+ // We found the correct-case equivalent.
+ columnTypes.put(userColName, typeEntry.getValue());
+ // No need to continue iteration; only one could match.
+ // Also, the use of put() just invalidated the iterator.
+ break;
+ }
+ }
+ }
+ }
+ return colNames;
+ }
+
+ protected Map<String, Integer> getColumnTypes() throws IOException {
+ Map<String, Integer> columnTypes;
+ if (null != tableName) {
+ // We're generating a class based on a table import.
+ columnTypes = connManager.getColumnTypes(tableName);
+ } else {
+ // This is based on an arbitrary query.
+ String query = this.options.getSqlQuery();
+ if (query.indexOf(SqlManager.SUBSTITUTE_TOKEN) == -1) {
+ throw new IOException("Query [" + query + "] must contain '"
+ + SqlManager.SUBSTITUTE_TOKEN + "' in WHERE clause.");
+ }
+
+ columnTypes = connManager.getColumnTypesForQuery(query);
+ }
+ return columnTypes;
+ }
+
/**
* Generate the ORM code for a table object containing the named columns.
* @param columnTypes - mapping from column names to sql types
diff --git a/src/java/com/cloudera/sqoop/orm/TableClassName.java b/src/java/com/cloudera/sqoop/orm/TableClassName.java
index d141e2fc..93ab0806 100644
--- a/src/java/com/cloudera/sqoop/orm/TableClassName.java
+++ b/src/java/com/cloudera/sqoop/orm/TableClassName.java
@@ -95,7 +95,7 @@ public String getClassForTable(String tableName) {
// no specific class; no specific package.
// Just make sure it's a legal identifier.
- return ClassWriter.toIdentifier(queryName);
+ return ClassWriter.toJavaIdentifier(queryName);
}
/**
diff --git a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
index 2bc0fe9a..a69f9aac 100644
--- a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
@@ -86,6 +86,7 @@ public abstract class BaseSqoopTool extends SqoopTool {
public static final String FMT_SEQUENCEFILE_ARG = "as-sequencefile";
public static final String FMT_TEXTFILE_ARG = "as-textfile";
+ public static final String FMT_AVRODATAFILE_ARG = "as-avrodatafile";
public static final String HIVE_IMPORT_ARG = "hive-import";
public static final String HIVE_TABLE_ARG = "hive-table";
public static final String HIVE_OVERWRITE_ARG = "hive-overwrite";
diff --git a/src/java/com/cloudera/sqoop/tool/ImportTool.java b/src/java/com/cloudera/sqoop/tool/ImportTool.java
index 2245d03b..66e60bda 100644
--- a/src/java/com/cloudera/sqoop/tool/ImportTool.java
+++ b/src/java/com/cloudera/sqoop/tool/ImportTool.java
@@ -509,6 +509,10 @@ protected RelatedOptions getImportOptions() {
.withDescription("Imports data as plain text (default)")
.withLongOpt(FMT_TEXTFILE_ARG)
.create());
+ importOpts.addOption(OptionBuilder
+ .withDescription("Imports data to Avro data files")
+ .withLongOpt(FMT_AVRODATAFILE_ARG)
+ .create());
importOpts.addOption(OptionBuilder.withArgName("n")
.hasArg().withDescription("Use 'n' map tasks to import in parallel")
.withLongOpt(NUM_MAPPERS_ARG)
@@ -699,6 +703,10 @@ public void applyOptions(CommandLine in, SqoopOptions out)
out.setFileLayout(SqoopOptions.FileLayout.TextFile);
}
+ if (in.hasOption(FMT_AVRODATAFILE_ARG)) {
+ out.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
+ }
+
if (in.hasOption(NUM_MAPPERS_ARG)) {
out.setNumMappers(Integer.parseInt(in.getOptionValue(NUM_MAPPERS_ARG)));
}
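With the option registered and applied above, an Avro import is a one-flag change on the command line; a sketch with placeholder connect string and table:

    $ sqoop import --connect jdbc:mysql://db.example.com/corp \
        --table EMPLOYEES --as-avrodatafile

The output directory then holds part-m-NNNNN.avro files, which TestAvroImport below reads back.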
diff --git a/src/test/com/cloudera/sqoop/TestAvroImport.java b/src/test/com/cloudera/sqoop/TestAvroImport.java
new file mode 100644
index 00000000..0e17f2e5
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/TestAvroImport.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop;
+
+import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.HsqldbTestServer;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Tests --as-avrodatafile.
+ */
+public class TestAvroImport extends ImportJobTestCase {
+
+ public static final Log LOG = LogFactory
+ .getLog(TestAvroImport.class.getName());
+
+ /**
+ * Create the argv to pass to Sqoop.
+ *
+ * @return the argv as an array of strings.
+ */
+ protected String[] getOutputArgv(boolean includeHadoopFlags) {
+ ArrayList<String> args = new ArrayList<String>();
+
+ if (includeHadoopFlags) {
+ CommonArgs.addHadoopFlags(args);
+ }
+
+ args.add("--table");
+ args.add(HsqldbTestServer.getTableName());
+ args.add("--connect");
+ args.add(HsqldbTestServer.getUrl());
+ args.add("--warehouse-dir");
+ args.add(getWarehouseDir());
+ args.add("--split-by");
+ args.add("INTFIELD1");
+ args.add("--as-avrodatafile");
+
+ return args.toArray(new String[0]);
+ }
+
+ // This test just uses the table with two int columns.
+ protected String getTableName() {
+ return HsqldbTestServer.getTableName();
+ }
+
+ public void testAvroImport() throws IOException {
+
+ runImport(getOutputArgv(true));
+
+ Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
+ DataFileReader<GenericRecord> reader = read(outputFile);
+ Schema schema = reader.getSchema();
+ assertEquals(Schema.Type.RECORD, schema.getType());
+ List<Field> fields = schema.getFields();
+ assertEquals(2, fields.size());
+
+ assertEquals("INTFIELD1", fields.get(0).name());
+ assertEquals(Schema.Type.INT, fields.get(0).schema().getType());
+
+ assertEquals("INTFIELD2", fields.get(1).name());
+ assertEquals(Schema.Type.INT, fields.get(1).schema().getType());
+
+ GenericRecord record1 = reader.next();
+ assertEquals(1, record1.get("INTFIELD1"));
+ assertEquals(8, record1.get("INTFIELD2"));
+ }
+
+ private DataFileReader<GenericRecord> read(Path filename) throws IOException {
+ Configuration conf = new Configuration();
+ if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
+ conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
+ }
+ FsInput fsInput = new FsInput(filename, conf);
+ DatumReader<GenericRecord> datumReader =
+ new GenericDatumReader<GenericRecord>();
+ return new DataFileReader<GenericRecord>(fsInput, datumReader);
+ }
+
+}
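Beyond this programmatic check, imported files can be inspected by hand. Assuming an avro-tools jar matching avro.version is available, something like:

    $ java -jar avro-tools-1.5.1.jar tojson part-m-00000.avro
    {"INTFIELD1": 1, "INTFIELD2": 8}

The first record's values match the assertions in testAvroImport above.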
diff --git a/src/test/com/cloudera/sqoop/orm/TestClassWriter.java b/src/test/com/cloudera/sqoop/orm/TestClassWriter.java
index 23dbd10c..761c8a33 100644
--- a/src/test/com/cloudera/sqoop/orm/TestClassWriter.java
+++ b/src/test/com/cloudera/sqoop/orm/TestClassWriter.java
@@ -291,7 +291,7 @@ public void testSetPackageName() {
// Test the SQL identifier -> Java identifier conversion.
@Test
- public void testIdentifierConversion() {
+ public void testJavaIdentifierConversion() {
assertNull(ClassWriter.getIdentifierStrForChar(' '));
assertNull(ClassWriter.getIdentifierStrForChar('\t'));
assertNull(ClassWriter.getIdentifierStrForChar('\r'));
@@ -300,14 +300,15 @@ public void testIdentifierConversion() {
assertEquals("_", ClassWriter.getIdentifierStrForChar('-'));
assertEquals("_", ClassWriter.getIdentifierStrForChar('_'));
- assertEquals("foo", ClassWriter.toIdentifier("foo"));
- assertEquals("_class", ClassWriter.toIdentifier("class"));
- assertEquals("_class", ClassWriter.toIdentifier("cla ss"));
- assertEquals("_int", ClassWriter.toIdentifier("int"));
- assertEquals("thisismanywords", ClassWriter.toIdentifier(
+ assertEquals("foo", ClassWriter.toJavaIdentifier("foo"));
+ assertEquals("_class", ClassWriter.toJavaIdentifier("class"));
+ assertEquals("_class", ClassWriter.toJavaIdentifier("cla ss"));
+ assertEquals("_int", ClassWriter.toJavaIdentifier("int"));
+ assertEquals("thisismanywords", ClassWriter.toJavaIdentifier(
"this is many words"));
- assertEquals("_9isLegalInSql", ClassWriter.toIdentifier("9isLegalInSql"));
- assertEquals("___", ClassWriter.toIdentifier("___"));
+ assertEquals("_9isLegalInSql", ClassWriter.toJavaIdentifier(
+ "9isLegalInSql"));
+ assertEquals("___", ClassWriter.toJavaIdentifier("___"));
}
@Test