5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-02 11:11:18 +08:00

SQOOP-207. Support import as Avro Data Files.

(Tom White via Arvind Prabhakar)

From: Arvind Prabhakar <arvind@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1150047 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Bayer 2011-07-22 20:04:40 +00:00
parent 5eaafd3243
commit 44ffd1e75b
21 changed files with 576 additions and 85 deletions

View File

@ -77,7 +77,8 @@ fi
# Add HBase to dependency list
if [ -e "$HBASE_HOME/bin/hbase" ]; then
SQOOP_CLASSPATH=`$HBASE_HOME/bin/hbase classpath`:${SQOOP_CLASSPATH}
TMP_SQOOP_CLASSPATH=${SQOOP_CLASSPATH}:`$HBASE_HOME/bin/hbase classpath`
SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}
fi
ZOOCFGDIR=${ZOOCFGDIR:-/etc/zookeeper}

14
ivy.xml
View File

@ -84,6 +84,20 @@
conf="common->default"/>
<dependency org="commons-io" name="commons-io" rev="${commons-io.version}"
conf="common->default;redist->default"/>
<dependency org="org.apache.avro" name="avro" rev="${avro.version}"
conf="common->default;redist->default">
<exclude org="org.slf4j" module="slf4j-api"/>
<exclude org="org.mortbay.jetty" module="jetty"/>
<exclude org="org.jboss.netty" module="netty"/>
<exclude org="org.apache.velocity" module="velocity"/>
</dependency>
<dependency org="org.apache.avro" name="avro-mapred" rev="${avro.version}"
conf="common->default;redist->default">
<exclude org="org.slf4j" module="slf4j-api"/>
<exclude org="org.mortbay.jetty" module="jetty"/>
<exclude org="org.jboss.netty" module="netty"/>
<exclude org="org.apache.velocity" module="velocity"/>
</dependency>
<!-- dependencies for static analysis -->
<dependency org="checkstyle" name="checkstyle" rev="${checkstyle.version}"

View File

@ -16,6 +16,8 @@
# This properties file lists the versions of the various artifacts we use.
# It drives ivy and the generation of a maven POM
avro.version=1.5.1
checkstyle.version=5.0
commons-cli.version=1.2

View File

@ -23,6 +23,9 @@ Import control options
--append::
Append data to an existing HDFS dataset
----as-avrodatafile::
Imports data to Avro Data Files
--as-sequencefile::
Imports data to SequenceFiles

View File

@ -27,6 +27,9 @@ include::common-args.txt[]
Import control options
~~~~~~~~~~~~~~~~~~~~~~
----as-avrodatafile::
Imports data to Avro Data Files
--as-sequencefile::
Imports data to SequenceFiles

View File

@ -75,6 +75,7 @@ Common arguments:
--verbose Print more information while working
Import control arguments:
--as-avrodatafile Imports data to Avro Data Files
--as-sequencefile Imports data to SequenceFiles
--as-textfile Imports data as plain text (default)
...

View File

@ -40,7 +40,8 @@ multiple Hive installations, or +hive+ is not in your +$PATH+, use the
*+\--hive-home+* option to identify the Hive installation directory.
Sqoop will use +$HIVE_HOME/bin/hive+ from here.
NOTE: This function is incompatible with +\--as-sequencefile+.
NOTE: This function is incompatible with +\--as-avrodatafile+ and
+\--as-sequencefile+.
Even though Hive supports escaping characters, it does not
handle escaping of new-line character. Also, it does not support

View File

@ -44,6 +44,7 @@ include::common-args.txt[]
`----------------------------`---------------------------------------
Argument Description
---------------------------------------------------------------------
+\--as-avrodatafile+ Imports data to Avro Data Files
+\--as-sequencefile+ Imports data to SequenceFiles
+\--as-textfile+ Imports data as plain text (default)
+\--direct+ Use direct import fast path

View File

@ -54,6 +54,7 @@ Argument Description
-------------------------------------------------------------------------
+\--append+ Append data to an existing dataset\
in HDFS
+\--as-avrodatafile+ Imports data to Avro Data Files
+\--as-sequencefile+ Imports data to SequenceFiles
+\--as-textfile+ Imports data as plain text (default)
+\--columns <col,col,col...>+ Columns to import from table

View File

@ -77,7 +77,8 @@ public String toString() {
/** Selects in-HDFS destination file format. */
public enum FileLayout {
TextFile,
SequenceFile
SequenceFile,
AvroDataFile
}
/**

View File

@ -0,0 +1,97 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.mapreduce;
import com.cloudera.sqoop.lib.BlobRef;
import com.cloudera.sqoop.lib.ClobRef;
import com.cloudera.sqoop.lib.SqoopRecord;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
/**
* Imports records by transforming them to Avro records in an Avro data file.
*/
public class AvroImportMapper
extends AutoProgressMapper<LongWritable, SqoopRecord,
AvroWrapper<GenericRecord>, NullWritable> {
private final AvroWrapper<GenericRecord> wrapper =
new AvroWrapper<GenericRecord>();
private Schema schema;
@Override
protected void setup(Context context) {
schema = AvroJob.getMapOutputSchema(context.getConfiguration());
}
@Override
protected void map(LongWritable key, SqoopRecord val, Context context)
throws IOException, InterruptedException {
wrapper.datum(toGenericRecord(val));
context.write(wrapper, NullWritable.get());
}
private GenericRecord toGenericRecord(SqoopRecord val) {
Map<String, Object> fieldMap = val.getFieldMap();
GenericRecord record = new GenericData.Record(schema);
for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
record.put(entry.getKey(), toAvro(entry.getValue()));
}
return record;
}
/**
* Convert the Avro representation of a Java type (that has already been
* converted from the SQL equivalent).
* @param o
* @return
*/
private Object toAvro(Object o) {
if (o instanceof BigDecimal) {
return o.toString();
} else if (o instanceof Date) {
return ((Date) o).getTime();
} else if (o instanceof Time) {
return ((Time) o).getTime();
} else if (o instanceof Timestamp) {
return ((Timestamp) o).getTime();
} else if (o instanceof ClobRef) {
throw new UnsupportedOperationException("ClobRef not suported");
} else if (o instanceof BlobRef) {
throw new UnsupportedOperationException("BlobRef not suported");
}
// primitive types (Integer, etc) are left unchanged
return o;
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.mapreduce;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
/**
* Helper class for setting up an Avro MapReduce job.
*/
public final class AvroJob {
public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
private AvroJob() {
}
public static void setMapOutputSchema(Configuration job, Schema s) {
job.set(MAP_OUTPUT_SCHEMA, s.toString());
}
/** Return a job's map output key schema. */
public static Schema getMapOutputSchema(Configuration job) {
return Schema.parse(job.get(MAP_OUTPUT_SCHEMA));
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.mapreduce;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/** An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files. */
public class AvroOutputFormat<T>
extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
@Override
public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
TaskAttemptContext context) throws IOException, InterruptedException {
Schema schema = AvroJob.getMapOutputSchema(context.getConfiguration());
final DataFileWriter<T> WRITER =
new DataFileWriter<T>(new GenericDatumWriter<T>());
Path path = getDefaultWorkFile(context,
org.apache.avro.mapred.AvroOutputFormat.EXT);
WRITER.create(schema,
path.getFileSystem(context.getConfiguration()).create(path));
return new RecordWriter<AvroWrapper<T>, NullWritable>() {
@Override
public void write(AvroWrapper<T> wrapper, NullWritable ignore)
throws IOException {
WRITER.append(wrapper.datum());
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
WRITER.close();
}
};
}
}

View File

@ -18,9 +18,19 @@
package com.cloudera.sqoop.mapreduce;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.lib.LargeObjectLoader;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ImportJobContext;
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import com.cloudera.sqoop.orm.AvroSchemaGenerator;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.avro.Schema;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
@ -32,14 +42,6 @@
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.lib.LargeObjectLoader;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ImportJobContext;
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
/**
* Actually runs a jdbc import job using the ORM files generated by the
* sqoop.orm package. Uses DataDrivenDBInputFormat.
@ -68,6 +70,13 @@ protected void configureMapper(Job job, String tableName,
// other types, we just use the defaults.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.AvroDataFile) {
ConnManager connManager = getContext().getConnManager();
AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
connManager, tableName);
Schema schema = generator.generate();
AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
}
job.setMapperClass(getMapperClass());
@ -80,6 +89,9 @@ protected Class<? extends Mapper> getMapperClass() {
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.SequenceFile) {
return SequenceFileImportMapper.class;
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.AvroDataFile) {
return AvroImportMapper.class;
}
return null;
@ -93,6 +105,9 @@ protected Class<? extends OutputFormat> getOutputFormatClass()
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.SequenceFile) {
return SequenceFileOutputFormat.class;
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.AvroDataFile) {
return AvroOutputFormat.class;
}
return null;

View File

@ -0,0 +1,112 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.orm;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
import java.io.IOException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
/**
* Creates an Avro schema to represent a table from a database.
*/
public class AvroSchemaGenerator {
private final SqoopOptions options;
private final ConnManager connManager;
private final String tableName;
public AvroSchemaGenerator(final SqoopOptions opts, final ConnManager connMgr,
final String table) {
this.options = opts;
this.connManager = connMgr;
this.tableName = table;
}
public Schema generate() throws IOException {
ClassWriter classWriter = new ClassWriter(options, connManager,
tableName, null);
Map<String, Integer> columnTypes = classWriter.getColumnTypes();
String[] columnNames = classWriter.getColumnNames(columnTypes);
List<Field> fields = new ArrayList<Field>();
for (String columnName : columnNames) {
String cleanedCol = ClassWriter.toIdentifier(columnName);
int sqlType = columnTypes.get(cleanedCol);
Schema avroSchema = toAvroSchema(sqlType);
Field field = new Field(cleanedCol, avroSchema, null, null);
field.addProp("columnName", columnName);
field.addProp("sqlType", Integer.toString(sqlType));
fields.add(field);
}
String doc = "Sqoop import of " + tableName;
Schema schema = Schema.createRecord(tableName, doc, null, false);
schema.setFields(fields);
schema.addProp("tableName", tableName);
return schema;
}
private Type toAvroType(int sqlType) {
switch (sqlType) {
case Types.TINYINT:
case Types.SMALLINT:
case Types.INTEGER:
return Type.INT;
case Types.BIGINT:
return Type.LONG;
case Types.BIT:
case Types.BOOLEAN:
return Type.BOOLEAN;
case Types.REAL:
return Type.FLOAT;
case Types.FLOAT:
case Types.DOUBLE:
return Type.DOUBLE;
case Types.NUMERIC:
case Types.DECIMAL:
return Type.STRING;
case Types.CHAR:
case Types.VARCHAR:
case Types.LONGVARCHAR:
return Type.STRING;
case Types.DATE:
case Types.TIME:
case Types.TIMESTAMP:
return Type.LONG;
case Types.BINARY:
return Type.BYTES;
default:
throw new IllegalArgumentException("Cannot convert SQL type "
+ sqlType);
}
}
public Schema toAvroSchema(int sqlType) {
return Schema.create(toAvroType(sqlType));
}
}

View File

@ -161,8 +161,8 @@ private static boolean isReservedWord(String word) {
}
/**
* Coerce a candidate name for an identifier into one which will
* definitely compile.
* Coerce a candidate name for an identifier into one which is a valid
* Java or Avro identifier.
*
* Ensures that the returned identifier matches [A-Za-z_][A-Za-z0-9_]*
* and is not a reserved word.
@ -204,8 +204,22 @@ public static String toIdentifier(String candidate) {
}
}
}
return sb.toString();
}
String output = sb.toString();
/**
* Coerce a candidate name for an identifier into one which will
* definitely compile.
*
* Ensures that the returned identifier matches [A-Za-z_][A-Za-z0-9_]*
* and is not a reserved word.
*
* @param candidate A string we want to use as an identifier
* @return A string naming an identifier which compiles and is
* similar to the candidate.
*/
public static String toJavaIdentifier(String candidate) {
String output = toIdentifier(candidate);
if (isReservedWord(output)) {
// e.g., 'class' -> '_class';
return "_" + output;
@ -998,66 +1012,22 @@ private void generateHadoopWrite(Map<String, Integer> columnTypes,
String [] cleanedColNames = new String[colNames.length];
for (int i = 0; i < colNames.length; i++) {
String col = colNames[i];
String identifier = toIdentifier(col);
String identifier = toJavaIdentifier(col);
cleanedColNames[i] = identifier;
}
return cleanedColNames;
}
private Map<String, Integer> setupColumnTypes() throws IOException {
Map<String, Integer> columnTypes;
if (null != tableName) {
// We're generating a class based on a table import.
columnTypes = connManager.getColumnTypes(tableName);
} else {
// This is based on an arbitrary query.
String query = this.options.getSqlQuery();
if (query.indexOf(SqlManager.SUBSTITUTE_TOKEN) == -1) {
throw new IOException("Query [" + query + "] must contain '"
+ SqlManager.SUBSTITUTE_TOKEN + "' in WHERE clause.");
}
/**
* Generate the ORM code for the class.
*/
public void generate() throws IOException {
Map<String, Integer> columnTypes = getColumnTypes();
columnTypes = connManager.getColumnTypesForQuery(query);
}
return columnTypes;
}
String[] colNames = getColumnNames(columnTypes);
private String[] setupColNames(Map<String, Integer> columnTypes) {
String [] colNames = options.getColumns();
if (null == colNames) {
if (null != tableName) {
// Table-based import. Read column names from table.
colNames = connManager.getColumnNames(tableName);
} else {
// Infer/assign column names for arbitrary query.
colNames = connManager.getColumnNamesForQuery(
this.options.getSqlQuery());
}
} else {
// These column names were provided by the user. They may not be in
// the same case as the keys in the columnTypes map. So make sure
// we add the appropriate aliases in that map.
for (String userColName : colNames) {
for (Map.Entry<String, Integer> typeEntry : columnTypes.entrySet()) {
String typeColName = typeEntry.getKey();
if (typeColName.equalsIgnoreCase(userColName)
&& !typeColName.equals(userColName)) {
// We found the correct-case equivalent.
columnTypes.put(userColName, typeEntry.getValue());
// No need to continue iteration; only one could match.
// Also, the use of put() just invalidated the iterator.
break;
}
}
}
}
return colNames;
}
private String[] setupCleanedColNames(Map<String, Integer> columnTypes,
String[] colNames) {
// Translate all the column names into names that are safe to
// use as identifiers.
String [] cleanedColNames = cleanColNames(colNames);
@ -1082,16 +1052,6 @@ private String[] setupCleanedColNames(Map<String, Integer> columnTypes,
}
columnTypes.put(identifier, type);
}
return cleanedColNames;
}
/**
* Generate the ORM code for the class.
*/
public void generate() throws IOException {
Map<String, Integer> columnTypes = setupColumnTypes();
String[] colNames = setupColNames(columnTypes);
String[] cleanedColNames = setupCleanedColNames(columnTypes, colNames);
// The db write() method may use column names in a different
// order. If this is set in the options, pull it out here and
@ -1186,6 +1146,56 @@ public void generate() throws IOException {
}
}
protected String[] getColumnNames(Map<String, Integer> columnTypes) {
String [] colNames = options.getColumns();
if (null == colNames) {
if (null != tableName) {
// Table-based import. Read column names from table.
colNames = connManager.getColumnNames(tableName);
} else {
// Infer/assign column names for arbitrary query.
colNames = connManager.getColumnNamesForQuery(
this.options.getSqlQuery());
}
} else {
// These column names were provided by the user. They may not be in
// the same case as the keys in the columnTypes map. So make sure
// we add the appropriate aliases in that map.
for (String userColName : colNames) {
for (Map.Entry<String, Integer> typeEntry : columnTypes.entrySet()) {
String typeColName = typeEntry.getKey();
if (typeColName.equalsIgnoreCase(userColName)
&& !typeColName.equals(userColName)) {
// We found the correct-case equivalent.
columnTypes.put(userColName, typeEntry.getValue());
// No need to continue iteration; only one could match.
// Also, the use of put() just invalidated the iterator.
break;
}
}
}
}
return colNames;
}
protected Map<String, Integer> getColumnTypes() throws IOException {
Map<String, Integer> columnTypes;
if (null != tableName) {
// We're generating a class based on a table import.
columnTypes = connManager.getColumnTypes(tableName);
} else {
// This is based on an arbitrary query.
String query = this.options.getSqlQuery();
if (query.indexOf(SqlManager.SUBSTITUTE_TOKEN) == -1) {
throw new IOException("Query [" + query + "] must contain '"
+ SqlManager.SUBSTITUTE_TOKEN + "' in WHERE clause.");
}
columnTypes = connManager.getColumnTypesForQuery(query);
}
return columnTypes;
}
/**
* Generate the ORM code for a table object containing the named columns.
* @param columnTypes - mapping from column names to sql types

View File

@ -95,7 +95,7 @@ public String getClassForTable(String tableName) {
// no specific class; no specific package.
// Just make sure it's a legal identifier.
return ClassWriter.toIdentifier(queryName);
return ClassWriter.toJavaIdentifier(queryName);
}
/**

View File

@ -86,6 +86,7 @@ public abstract class BaseSqoopTool extends SqoopTool {
public static final String FMT_SEQUENCEFILE_ARG = "as-sequencefile";
public static final String FMT_TEXTFILE_ARG = "as-textfile";
public static final String FMT_AVRODATAFILE_ARG = "as-avrodatafile";
public static final String HIVE_IMPORT_ARG = "hive-import";
public static final String HIVE_TABLE_ARG = "hive-table";
public static final String HIVE_OVERWRITE_ARG = "hive-overwrite";

View File

@ -509,6 +509,10 @@ protected RelatedOptions getImportOptions() {
.withDescription("Imports data as plain text (default)")
.withLongOpt(FMT_TEXTFILE_ARG)
.create());
importOpts.addOption(OptionBuilder
.withDescription("Imports data to Avro data files")
.withLongOpt(FMT_AVRODATAFILE_ARG)
.create());
importOpts.addOption(OptionBuilder.withArgName("n")
.hasArg().withDescription("Use 'n' map tasks to import in parallel")
.withLongOpt(NUM_MAPPERS_ARG)
@ -699,6 +703,10 @@ public void applyOptions(CommandLine in, SqoopOptions out)
out.setFileLayout(SqoopOptions.FileLayout.TextFile);
}
if (in.hasOption(FMT_AVRODATAFILE_ARG)) {
out.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
}
if (in.hasOption(NUM_MAPPERS_ARG)) {
out.setNumMappers(Integer.parseInt(in.getOptionValue(NUM_MAPPERS_ARG)));
}

View File

@ -0,0 +1,113 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop;
import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.testutil.HsqldbTestServer;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
/**
* Tests --as-avrodatafile.
*/
public class TestAvroImport extends ImportJobTestCase {
public static final Log LOG = LogFactory
.getLog(TestAvroImport.class.getName());
/**
* Create the argv to pass to Sqoop.
*
* @return the argv as an array of strings.
*/
protected String[] getOutputArgv(boolean includeHadoopFlags) {
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
CommonArgs.addHadoopFlags(args);
}
args.add("--table");
args.add(HsqldbTestServer.getTableName());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--split-by");
args.add("INTFIELD1");
args.add("--as-avrodatafile");
return args.toArray(new String[0]);
}
// this test just uses the two int table.
protected String getTableName() {
return HsqldbTestServer.getTableName();
}
public void testAvroImport() throws IOException {
runImport(getOutputArgv(true));
Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
DataFileReader<GenericRecord> reader = read(outputFile);
Schema schema = reader.getSchema();
assertEquals(Schema.Type.RECORD, schema.getType());
List<Field> fields = schema.getFields();
assertEquals(2, fields.size());
assertEquals("INTFIELD1", fields.get(0).name());
assertEquals(Schema.Type.INT, fields.get(0).schema().getType());
assertEquals("INTFIELD2", fields.get(1).name());
assertEquals(Schema.Type.INT, fields.get(1).schema().getType());
GenericRecord record1 = reader.next();
assertEquals(1, record1.get("INTFIELD1"));
assertEquals(8, record1.get("INTFIELD2"));
}
private DataFileReader<GenericRecord> read(Path filename) throws IOException {
Configuration conf = new Configuration();
if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
}
FsInput fsInput = new FsInput(filename, conf);
DatumReader<GenericRecord> datumReader =
new GenericDatumReader<GenericRecord>();
return new DataFileReader<GenericRecord>(fsInput, datumReader);
}
}

View File

@ -291,7 +291,7 @@ public void testSetPackageName() {
// Test the SQL identifier -> Java identifier conversion.
@Test
public void testIdentifierConversion() {
public void testJavaIdentifierConversion() {
assertNull(ClassWriter.getIdentifierStrForChar(' '));
assertNull(ClassWriter.getIdentifierStrForChar('\t'));
assertNull(ClassWriter.getIdentifierStrForChar('\r'));
@ -300,14 +300,15 @@ public void testIdentifierConversion() {
assertEquals("_", ClassWriter.getIdentifierStrForChar('-'));
assertEquals("_", ClassWriter.getIdentifierStrForChar('_'));
assertEquals("foo", ClassWriter.toIdentifier("foo"));
assertEquals("_class", ClassWriter.toIdentifier("class"));
assertEquals("_class", ClassWriter.toIdentifier("cla ss"));
assertEquals("_int", ClassWriter.toIdentifier("int"));
assertEquals("thisismanywords", ClassWriter.toIdentifier(
assertEquals("foo", ClassWriter.toJavaIdentifier("foo"));
assertEquals("_class", ClassWriter.toJavaIdentifier("class"));
assertEquals("_class", ClassWriter.toJavaIdentifier("cla ss"));
assertEquals("_int", ClassWriter.toJavaIdentifier("int"));
assertEquals("thisismanywords", ClassWriter.toJavaIdentifier(
"this is many words"));
assertEquals("_9isLegalInSql", ClassWriter.toIdentifier("9isLegalInSql"));
assertEquals("___", ClassWriter.toIdentifier("___"));
assertEquals("_9isLegalInSql", ClassWriter.toJavaIdentifier(
"9isLegalInSql"));
assertEquals("___", ClassWriter.toJavaIdentifier("___"));
}
@Test