diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index 5afd90c1..172e822b 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -18,9 +18,12 @@
 
 package org.apache.sqoop.mapreduce;
 
+import java.io.File;
 import java.io.IOException;
 import java.sql.SQLException;
+
 import org.apache.avro.Schema;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.LongWritable;
@@ -32,6 +35,7 @@
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
+
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.config.ConfigurationHelper;
 import com.cloudera.sqoop.lib.LargeObjectLoader;
@@ -83,12 +87,37 @@ protected void configureMapper(Job job, String tableName,
       AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
           connManager, tableName);
       Schema schema = generator.generate();
+
+      try {
+        writeAvroSchema(schema);
+      } catch (final IOException e) {
+        LOG.error("Error while writing Avro schema.", e);
+      }
+
       AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
     }
 
     job.setMapperClass(getMapperClass());
   }
 
+  private void writeAvroSchema(final Schema schema) throws IOException {
+    // Generate schema in JAR output directory.
+    final File schemaFile = new File(options.getJarOutputDir(), schema.getName() + ".avsc");
+
+    LOG.info("Writing Avro schema file: " + schemaFile);
+    FileUtils.forceMkdir(schemaFile.getParentFile());
+    // Write explicitly as UTF-8: a null encoding falls back to the
+    // platform-default charset and produces non-portable schema files.
+    FileUtils.writeStringToFile(schemaFile, schema.toString(true), "UTF-8");
+
+    // Move schema to code output directory.
+    try {
+      FileUtils.moveFileToDirectory(schemaFile, new File(options.getCodeOutputDir()), true);
+    } catch (final IOException e) {
+      LOG.warn("Could not move Avro schema file to code output directory.", e);
+    }
+  }
+
   @Override
   protected Class getMapperClass() {
     if (options.getHCatTableName() != null) {
diff --git a/src/test/com/cloudera/sqoop/TestAvroImport.java b/src/test/com/cloudera/sqoop/TestAvroImport.java
index 34a7d41a..440e76ce 100644
--- a/src/test/com/cloudera/sqoop/TestAvroImport.java
+++ b/src/test/com/cloudera/sqoop/TestAvroImport.java
@@ -18,6 +18,7 @@
 
 package com.cloudera.sqoop;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.sql.SQLException;
@@ -157,6 +158,8 @@ private void avroImportTestHelper(String[] extraArgs, String codec)
     if (codec != null) {
       assertEquals(codec, reader.getMetaString(DataFileConstants.CODEC));
     }
+
+    checkSchemaFile(schema);
   }
 
   public void testOverrideTypeMapping() throws IOException {
@@ -235,4 +238,9 @@ private DataFileReader read(Path filename) throws IOException {
     return new DataFileReader(fsInput, datumReader);
   }
 
+  private void checkSchemaFile(final Schema schema) throws IOException {
+    final File schemaFile = new File(schema.getName() + ".avsc");
+    assertTrue(schemaFile.exists());
+    assertEquals(schema, new Schema.Parser().parse(schemaFile));
+  }
 }