5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-02 19:50:39 +08:00

SQOOP-362. Allow user to override type mapping when doing Avro import.

(Jarcec Cecho via Arvind Prabhakar)


git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1205233 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arvind Prabhakar 2011-11-22 23:30:10 +00:00
parent f31f577f08
commit 1bf7cbf81a
6 changed files with 71 additions and 16 deletions

View File

@ -21,7 +21,7 @@
import org.apache.hadoop.conf.Configuration;
/**
* Configurable state used by Sqoop tools.
* @deprecated
*/
public class SqoopOptions
extends org.apache.sqoop.SqoopOptions implements Cloneable {
@ -90,6 +90,7 @@ public static void clearNonceDir() {
/**
* {@inheritDoc}.
* @deprecated
*/
public static class InvalidOptionsException
extends org.apache.sqoop.SqoopOptions.InvalidOptionsException {

View File

@ -1011,7 +1011,7 @@ public void setMapColumnHive(String mapColumn) {
parseColumnMapping(mapColumn, mapColumnHive);
}
public void setMapColumn(String mapColumn) {
public void setMapColumnJava(String mapColumn) {
parseColumnMapping(mapColumn, mapColumnJava);
}

View File

@ -30,6 +30,7 @@
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
import java.util.Properties;
/**
* Creates an Avro schema to represent a table from a database.
@ -57,7 +58,7 @@ public Schema generate() throws IOException {
for (String columnName : columnNames) {
String cleanedCol = ClassWriter.toIdentifier(columnName);
int sqlType = columnTypes.get(cleanedCol);
Schema avroSchema = toAvroSchema(sqlType);
Schema avroSchema = toAvroSchema(sqlType, columnName);
Field field = new Field(cleanedCol, avroSchema, null, null);
field.addProp("columnName", columnName);
field.addProp("sqlType", Integer.toString(sqlType));
@ -112,13 +113,44 @@ private Type toAvroType(int sqlType) {
}
}
public Schema toAvroSchema(int sqlType) {
// All types are assumed nullable; make a union of the "true" type for
// a column and NULL.
/**
 * Resolves a user-specified type name (case-insensitive, as given with
 * --map-column-java, e.g. "Integer" or "String") to the corresponding
 * Avro schema type.
 *
 * @param type type name supplied by the user
 * @return the matching Avro Type
 * @throws IllegalArgumentException when the name has no known mapping
 */
private Type toAvroType(String type) {
  if (type.equalsIgnoreCase("INTEGER")) {
    return Type.INT;
  } else if (type.equalsIgnoreCase("LONG")) {
    return Type.LONG;
  } else if (type.equalsIgnoreCase("BOOLEAN")) {
    return Type.BOOLEAN;
  } else if (type.equalsIgnoreCase("FLOAT")) {
    return Type.FLOAT;
  } else if (type.equalsIgnoreCase("DOUBLE")) {
    return Type.DOUBLE;
  } else if (type.equalsIgnoreCase("STRING")) {
    return Type.STRING;
  } else if (type.equalsIgnoreCase("BYTES")) {
    return Type.BYTES;
  }
  // None of the supported names matched the request.
  throw new IllegalArgumentException("Cannot convert to AVRO type " + type);
}
/**
* Will create union, because each type is assumed to be nullable.
*
* @param sqlType Original SQL type (might be overridden by user)
* @param columnName Column name from the query
* @return Schema
*/
/**
 * Builds the Avro schema for one column. Every column is assumed to be
 * nullable, so the result is a union of the column's resolved type and
 * NULL. A user override supplied via --map-column-java (keyed by the
 * original column name) takes precedence over the SQL type mapping.
 *
 * @param sqlType Original SQL type (might be overridden by user)
 * @param columnName Column name from the query; may be null, in which
 *     case no user override is consulted
 * @return union Schema of the resolved type and NULL
 */
public Schema toAvroSchema(int sqlType, String columnName) {
  Properties mappingJava = options.getMapColumnJava();

  // Try to apply any user specified mapping; otherwise fall back to
  // the mapping derived from the SQL type.
  Type targetType;
  if (columnName != null && mappingJava.containsKey(columnName)) {
    targetType = toAvroType((String) mappingJava.get(columnName));
  } else {
    targetType = toAvroType(sqlType);
  }

  // Union the single resolved type with NULL so the field is nullable.
  List<Schema> childSchemas = new ArrayList<Schema>();
  childSchemas.add(Schema.create(targetType));
  childSchemas.add(Schema.create(Schema.Type.NULL));
  return Schema.createUnion(childSchemas);
}
/**
 * Builds the nullable Avro schema for a column when no column name is
 * available, i.e. without consulting any user type override.
 *
 * @param sqlType Original SQL type
 * @return union Schema of the mapped type and NULL
 */
public Schema toAvroSchema(int sqlType) {
  return toAvroSchema(sqlType, null);
}
}

View File

@ -861,7 +861,7 @@ protected void applyCodeGenOptions(CommandLine in, SqoopOptions out,
}
if (in.hasOption(MAP_COLUMN_JAVA)) {
out.setMapColumn(in.getOptionValue(MAP_COLUMN_JAVA));
out.setMapColumnJava(in.getOptionValue(MAP_COLUMN_JAVA));
}
if (!multiTable && in.hasOption(CLASS_NAME_ARG)) {

View File

@ -38,7 +38,6 @@
import com.cloudera.sqoop.Sqoop;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.FileLayout;
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
import com.cloudera.sqoop.cli.RelatedOptions;
import com.cloudera.sqoop.cli.ToolOptions;
@ -838,10 +837,6 @@ protected void validateImportOptions(SqoopOptions options)
"MySQL direct export currently supports only text output format."
+ "Parameters --as-sequencefile and --as-avrodatafile are not "
+ "supported with --direct params in MySQL case.");
} else if (!options.getMapColumnJava().isEmpty()
&& options.getFileLayout() == FileLayout.AvroDataFile) {
throw new InvalidOptionsException(
"Overriding column types is currently not supported with avro.");
}
}

View File

@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.avro.Schema;
@ -56,7 +57,8 @@ public class TestAvroImport extends ImportJobTestCase {
*
* @return the argv as an array of strings.
*/
protected String[] getOutputArgv(boolean includeHadoopFlags) {
protected String[] getOutputArgv(boolean includeHadoopFlags,
String[] extraArgs) {
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
@ -72,6 +74,9 @@ protected String[] getOutputArgv(boolean includeHadoopFlags) {
args.add("--split-by");
args.add("INTFIELD1");
args.add("--as-avrodatafile");
if(extraArgs != null) {
args.addAll(Arrays.asList(extraArgs));
}
return args.toArray(new String[0]);
}
@ -84,7 +89,7 @@ public void testAvroImport() throws IOException {
"'s'", "'0102'", };
createTableWithColTypes(types, vals);
runImport(getOutputArgv(true));
runImport(getOutputArgv(true, null));
Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
DataFileReader<GenericRecord> reader = read(outputFile);
@ -115,6 +120,28 @@ public void testAvroImport() throws IOException {
assertEquals((byte) 2, b.get(1));
}
/**
 * Verifies that --map-column-java overrides the default Avro type
 * mapping: an INT column forced to String must surface as a STRING
 * field carrying the textual value.
 */
public void testOverrideTypeMapping() throws IOException {
  String[] colTypes = { "INT" };
  String[] colVals = { "10" };
  createTableWithColTypes(colTypes, colVals);

  // Force DATA_COL0 to be generated as a Java String.
  String[] overrideArgs = { "--map-column-java", "DATA_COL0=String"};
  runImport(getOutputArgv(true, overrideArgs));

  Path importedFile = new Path(getTablePath(), "part-m-00000.avro");
  DataFileReader<GenericRecord> avroReader = read(importedFile);

  Schema recordSchema = avroReader.getSchema();
  assertEquals(Schema.Type.RECORD, recordSchema.getType());
  List<Field> recordFields = recordSchema.getFields();
  assertEquals(colTypes.length, recordFields.size());
  checkField(recordFields.get(0), "DATA_COL0", Schema.Type.STRING);

  GenericRecord firstRecord = avroReader.next();
  assertEquals("DATA_COL0", new Utf8("10"), firstRecord.get("DATA_COL0"));
}
private void checkField(Field field, String name, Type type) {
assertEquals(name, field.name());
assertEquals(Schema.Type.UNION, field.schema().getType());
@ -127,7 +154,7 @@ public void testNullableAvroImport() throws IOException, SQLException {
String [] vals = { null };
createTableWithColTypes(types, vals);
runImport(getOutputArgv(true));
runImport(getOutputArgv(true, null));
Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
DataFileReader<GenericRecord> reader = read(outputFile);