5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-19 18:31:03 +08:00

SQOOP-2294: Change to Avro schema name breaks some use cases

(Qian Xu via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-04-21 07:02:37 -07:00
parent 08c61a11e1
commit 67e8c9a9e8
3 changed files with 25 additions and 10 deletions

View File

@ -23,8 +23,6 @@
import java.sql.SQLException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -88,7 +86,8 @@ protected void configureMapper(Job job, String tableName,
job.setOutputValueClass(NullWritable.class);
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.AvroDataFile) {
Schema schema = generateAvroSchema(tableName);
final String schemaNameOverride = null;
Schema schema = generateAvroSchema(tableName, schemaNameOverride);
try {
writeAvroSchema(schema);
} catch (final IOException e) {
@ -99,9 +98,12 @@ protected void configureMapper(Job job, String tableName,
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.ParquetFile) {
Configuration conf = job.getConfiguration();
// An Avro schema is required for creating a dataset that manages
// Parquet data records. The import will fail, if schema is invalid.
Schema schema = generateAvroSchema(tableName);
// Kite SDK requires an Avro schema to represent the data structure of
// target dataset. If the schema name equals to generated java class name,
// the import will fail. So we use table name as schema name and add a
// prefix "codegen_" to generated java class to avoid the conflict.
final String schemaNameOverride = tableName;
Schema schema = generateAvroSchema(tableName, schemaNameOverride);
String uri = getKiteUri(conf, tableName);
ParquetJob.configureImportJob(conf, schema, uri, options.isAppendMode(),
options.doHiveImport() && options.doOverwriteHiveTable());
@ -123,11 +125,12 @@ private String getKiteUri(Configuration conf, String tableName) throws IOExcepti
}
}
private Schema generateAvroSchema(String tableName) throws IOException {
private Schema generateAvroSchema(String tableName,
String schemaNameOverride) throws IOException {
ConnManager connManager = getContext().getConnManager();
AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
connManager, tableName);
return generator.generate();
return generator.generate(schemaNameOverride);
}
private void writeAvroSchema(final Schema schema) throws IOException {

View File

@ -55,7 +55,7 @@ public AvroSchemaGenerator(final SqoopOptions opts, final ConnManager connMgr,
this.tableName = table;
}
public Schema generate() throws IOException {
public Schema generate(String schemaNameOverride) throws IOException {
ClassWriter classWriter = new ClassWriter(options, connManager,
tableName, null);
Map<String, Integer> columnTypes = classWriter.getColumnTypes();
@ -75,7 +75,8 @@ public Schema generate() throws IOException {
TableClassName tableClassName = new TableClassName(options);
String shortClassName = tableClassName.getShortClassForTable(tableName);
String avroTableName = (tableName == null ? TableClassName.QUERY_RESULT : tableName);
String avroName = "sqoop_import_" + (shortClassName == null ? avroTableName : shortClassName);
String avroName = schemaNameOverride != null ? schemaNameOverride :
(shortClassName == null ? avroTableName : shortClassName);
String avroNamespace = tableClassName.getPackageForTable();
String doc = "Sqoop import of " + avroTableName;

View File

@ -90,6 +90,17 @@ public String generateORM(SqoopOptions options, String tableName)
return null;
}
LOG.info("Beginning code generation");
if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
String className = options.getClassName() != null ?
options.getClassName() : options.getTableName();
if (className.equalsIgnoreCase(options.getTableName())) {
className = "codegen_" + className;
options.setClassName(className);
LOG.info("Will generate java class as " + options.getClassName());
}
}
CompilationManager compileMgr = new CompilationManager(options);
ClassWriter classWriter = new ClassWriter(options, manager, tableName,
compileMgr);