From 40856655c33f39e67d449f138163e57e7f3d626a Mon Sep 17 00:00:00 2001
From: Jarek Jarcec Cecho
Date: Sat, 4 Feb 2012 21:37:08 +0000
Subject: [PATCH] SQOOP-428. AvroOutputFormat doesn't support compression even
 though documentation claims it does

(Lars Francke via Jarek Jarcec Cecho)

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1240613 13f79535-47bb-0310-9956-ffa450edef68
---
 src/java/com/cloudera/sqoop/io/CodecMap.java  | 11 +++
 src/java/org/apache/sqoop/io/CodecMap.java    | 43 ++++++++++-
 .../org/apache/sqoop/mapreduce/AvroJob.java   | 10 ++-
 .../sqoop/mapreduce/AvroOutputFormat.java     | 73 ++++++++++++++++---
 .../apache/sqoop/mapreduce/ImportJobBase.java | 24 +++++-
 .../com/cloudera/sqoop/TestAvroImport.java    | 49 +++++++++++--
 .../com/cloudera/sqoop/io/TestCodecMap.java   | 17 +++++
 7 files changed, 208 insertions(+), 19 deletions(-)

diff --git a/src/java/com/cloudera/sqoop/io/CodecMap.java b/src/java/com/cloudera/sqoop/io/CodecMap.java
index ffe949bc..647cedcd 100644
--- a/src/java/com/cloudera/sqoop/io/CodecMap.java
+++ b/src/java/com/cloudera/sqoop/io/CodecMap.java
@@ -70,4 +70,15 @@ public static CompressionCodec getCodec(String codecName,
   public static Set<String> getCodecNames() {
     return org.apache.sqoop.io.CodecMap.getCodecNames();
   }
+
+  /**
+   * Return the short name of the codec.
+   * See {@link org.apache.sqoop.io.CodecMap#getCodecShortNameByName(String,
+   * Configuration)}.
+   */
+  public static String getCodecShortNameByName(String codecName,
+      Configuration conf) throws UnsupportedCodecException {
+    return org.apache.sqoop.io.CodecMap
+        .getCodecShortNameByName(codecName, conf);
+  }
 }
diff --git a/src/java/org/apache/sqoop/io/CodecMap.java b/src/java/org/apache/sqoop/io/CodecMap.java
index 5b672061..cec93583 100644
--- a/src/java/org/apache/sqoop/io/CodecMap.java
+++ b/src/java/org/apache/sqoop/io/CodecMap.java
@@ -49,7 +49,7 @@ public final class CodecMap {
     codecNames.put(NONE, null);
     codecNames.put(DEFLATE, "org.apache.hadoop.io.compress.DefaultCodec");
     codecNames.put(LZO, "com.hadoop.compression.lzo.LzoCodec");
-    codecNames.put(LZOP, "com.hadoop.compression.lzo.LzopCodec");
+    codecNames.put(LZOP, "com.hadoop.compression.lzo.LzopCodec");
 
     // add more from Hadoop CompressionCodecFactory
     for (Class<? extends CompressionCodec> cls
@@ -135,7 +135,7 @@ public static Set<String> getCodecNames() {
    * <p>
    * Note: When HADOOP-7323 is available this method can be replaced with a call
    * to CompressionCodecFactory.
-   * @param classname the canonical class name of the codec or the codec alias
+   * @param codecName the canonical class name of the codec or the codec alias
    * @return the codec object or null if none matching the name were found
    */
   private static CompressionCodec getCodecByName(String codecName,
@@ -150,6 +150,45 @@ private static CompressionCodec getCodecByName(String codecName,
     return null;
   }
 
+  /**
+   * Gets the short name for a specified codec. See {@link
+   * #getCodecByName(String, Configuration)} for details. The name returned
+   * here is the shortest possible one that means a {@code Codec} part is
+   * removed as well.
+   *
+   * @param codecName name of the codec to return the short name for
+   * @param conf job configuration object used to get the registered
+   *             compression codecs
+   *
+   * @return the short name of the codec
+   *
+   * @throws com.cloudera.sqoop.io.UnsupportedCodecException
+   *           if no short name could be found
+   */
+  public static String getCodecShortNameByName(String codecName,
+      Configuration conf) throws com.cloudera.sqoop.io.UnsupportedCodecException {
+    if (codecNames.containsKey(codecName)) {
+      return codecName;
+    }
+
+    CompressionCodec codec = getCodecByName(codecName, conf);
+    Class<? extends CompressionCodec> codecClass = null;
+    if (codec != null) {
+      codecClass = codec.getClass();
+    }
+
+    if (codecClass != null) {
+      String simpleName = codecClass.getSimpleName();
+      if (simpleName.endsWith("Codec")) {
+        simpleName =
+          simpleName.substring(0, simpleName.length() - "Codec".length());
+      }
+      return simpleName.toLowerCase();
+    }
+
+    throw new com.cloudera.sqoop.io.UnsupportedCodecException(
+        "Cannot find codec class " + codecName + " for codec " + codecName);
+  }
 
   private static boolean codecMatches(Class<? extends CompressionCodec> cls,
       String codecName) {
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroJob.java b/src/java/org/apache/sqoop/mapreduce/AvroJob.java
index a57aaf1d..bb4755c8 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroJob.java
@@ -27,6 +27,9 @@ public final class AvroJob {
   public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
 
+  /** The configuration key for a job's output schema. */
+  public static final String OUTPUT_SCHEMA = "avro.output.schema";
+
   private AvroJob() {
   }
 
@@ -36,6 +39,11 @@ public static void setMapOutputSchema(Configuration job, Schema s) {
 
   /** Return a job's map output key schema. */
   public static Schema getMapOutputSchema(Configuration job) {
-    return Schema.parse(job.get(MAP_OUTPUT_SCHEMA));
+    return Schema.parse(job.get(MAP_OUTPUT_SCHEMA, job.get(OUTPUT_SCHEMA)));
+  }
+
+  /** Return a job's output key schema. */
+  public static Schema getOutputSchema(Configuration job) {
+    return Schema.parse(job.get(OUTPUT_SCHEMA));
   }
 }
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
index 96befd71..aed1e720 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
@@ -19,33 +19,85 @@
 package org.apache.sqoop.mapreduce;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.Map;
+
 import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
-/** An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files. */
+import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
+import static org.apache.avro.mapred.AvroOutputFormat.EXT;
+import static org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY;
+
+/**
+ * An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files.
+ * <p>
+ * Note: This class is copied from the Avro project in version 1.5.4 and
+ * adapted here to work with the "new" MapReduce API that's required in Sqoop.
+ */
 public class AvroOutputFormat<T>
     extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
 
+  static <T> void configureDataFileWriter(DataFileWriter<T> writer,
+      TaskAttemptContext context) throws UnsupportedEncodingException {
+    if (FileOutputFormat.getCompressOutput(context)) {
+      int level = context.getConfiguration()
+        .getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
+      String codecName = context.getConfiguration()
+        .get(org.apache.avro.mapred.AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
+      CodecFactory factory =
+        codecName.equals(DEFLATE_CODEC) ? CodecFactory.deflateCodec(level)
+          : CodecFactory.fromString(codecName);
+      writer.setCodec(factory);
+    }
+
+    writer.setSyncInterval(context.getConfiguration()
+      .getInt(SYNC_INTERVAL_KEY, DEFAULT_SYNC_INTERVAL));
+
+    // copy metadata from job
+    for (Map.Entry<String, String> e : context.getConfiguration()) {
+      if (e.getKey().startsWith(org.apache.avro.mapred.AvroJob.TEXT_PREFIX)) {
+        writer.setMeta(e.getKey()
+          .substring(org.apache.avro.mapred.AvroJob.TEXT_PREFIX.length()),
+          e.getValue());
+      }
+      if (e.getKey().startsWith(org.apache.avro.mapred.AvroJob.BINARY_PREFIX)) {
+        writer.setMeta(e.getKey()
+          .substring(org.apache.avro.mapred.AvroJob.BINARY_PREFIX.length()),
+          URLDecoder.decode(e.getValue(), "ISO-8859-1").getBytes("ISO-8859-1"));
+      }
+    }
+  }
+
   @Override
   public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
-    TaskAttemptContext context) throws IOException, InterruptedException {
+      TaskAttemptContext context) throws IOException, InterruptedException {
 
-    Schema schema = AvroJob.getMapOutputSchema(context.getConfiguration());
+    boolean isMapOnly = context.getNumReduceTasks() == 0;
+    Schema schema =
+      isMapOnly ? AvroJob.getMapOutputSchema(context.getConfiguration())
+                : AvroJob.getOutputSchema(context.getConfiguration());
 
     final DataFileWriter<T> WRITER =
-      new DataFileWriter<T>(new GenericDatumWriter<T>());
+      new DataFileWriter<T>(new ReflectDatumWriter<T>());
 
-    Path path = getDefaultWorkFile(context,
-      org.apache.avro.mapred.AvroOutputFormat.EXT);
+    configureDataFileWriter(WRITER, context);
+
+    Path path = getDefaultWorkFile(context, EXT);
     WRITER.create(schema,
-      path.getFileSystem(context.getConfiguration()).create(path));
+        path.getFileSystem(context.getConfiguration()).create(path));
 
     return new RecordWriter<AvroWrapper<T>, NullWritable>() {
       @Override
@@ -53,9 +105,10 @@ public void write(AvroWrapper<T> wrapper, NullWritable ignore)
           throws IOException {
         WRITER.append(wrapper.datum());
       }
+
       @Override
-      public void close(TaskAttemptContext context) throws IOException,
-          InterruptedException {
+      public void close(TaskAttemptContext taskAttemptContext)
+          throws IOException, InterruptedException {
         WRITER.close();
       }
     };
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index ed6954a8..7788d352 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -19,6 +19,9 @@
 package org.apache.sqoop.mapreduce;
 
 import java.io.IOException;
+
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.mapred.AvroJob;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -98,7 +101,26 @@ protected void configureOutputFormat(Job job, String tableName,
 
     if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
       SequenceFileOutputFormat.setOutputCompressionType(job,
-          CompressionType.BLOCK);
+        CompressionType.BLOCK);
+    }
+
+    // SQOOP-428: Avro expects not a fully qualified class name but a "short"
+    // name instead (e.g. "snappy") and it needs to be set in a custom
+    // configuration option called "avro.output.codec".
+    // The default codec is "deflate".
+    if (options.getFileLayout() == SqoopOptions.FileLayout.AvroDataFile) {
+      if (codecName != null) {
+        String shortName =
+          CodecMap.getCodecShortNameByName(codecName, job.getConfiguration());
+        // Avro only knows about "deflate" and not "default"
+        if (shortName.equalsIgnoreCase("default")) {
+          shortName = "deflate";
+        }
+        job.getConfiguration().set(AvroJob.OUTPUT_CODEC, shortName);
+      } else {
+        job.getConfiguration()
+          .set(AvroJob.OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC);
+      }
     }
   }
 
diff --git a/src/test/com/cloudera/sqoop/TestAvroImport.java b/src/test/com/cloudera/sqoop/TestAvroImport.java
index 06834fc8..32d1bbea 100644
--- a/src/test/com/cloudera/sqoop/TestAvroImport.java
+++ b/src/test/com/cloudera/sqoop/TestAvroImport.java
@@ -28,6 +28,7 @@
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -82,14 +83,48 @@ protected String[] getOutputArgv(boolean includeHadoopFlags,
   }
 
   public void testAvroImport() throws IOException {
+    avroImportTestHelper(null, null);
+  }
 
-    String [] types = { "BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE",
-      "VARCHAR(6)", "VARBINARY(2)", };
-    String [] vals = { "true", "100", "200", "1.0", "2.0",
-      "'s'", "'0102'", };
+  public void testDeflateCompressedAvroImport() throws IOException {
+    avroImportTestHelper(new String[] {"--compression-codec",
+      "org.apache.hadoop.io.compress.DefaultCodec", }, "deflate");
+  }
+
+  public void testDefaultCompressedAvroImport() throws IOException {
+    avroImportTestHelper(new String[] {"--compress", }, "deflate");
+  }
+
+  public void testUnsupportedCodec() throws IOException {
+    try {
+      avroImportTestHelper(new String[] {"--compression-codec", "foobar", },
+        null);
+      fail("Expected IOException");
+    } catch (IOException e) {
+      // Exception is expected
+    }
+  }
+
+  /**
+   * Helper method that runs an import using Avro with optional command line
+   * arguments and checks that the created file matches the expectations.
+   * <p>
+   * This can be used to test various extra options that are implemented for
+   * the Avro input.
+   *
+   * @param extraArgs extra command line arguments to pass to Sqoop in addition
+   *                  to those that {@link #getOutputArgv(boolean, String[])}
+   *                  returns
+   */
+  private void avroImportTestHelper(String[] extraArgs, String codec)
+    throws IOException {
+    String[] types =
+      {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
+          "VARBINARY(2)", };
+    String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
 
     createTableWithColTypes(types, vals);
 
-    runImport(getOutputArgv(true, null));
+    runImport(getOutputArgv(true, extraArgs));
 
     Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
     DataFileReader<GenericRecord> reader = read(outputFile);
@@ -118,6 +153,10 @@ public void testAvroImport() throws IOException {
     ByteBuffer b = ((ByteBuffer) object);
     assertEquals((byte) 1, b.get(0));
     assertEquals((byte) 2, b.get(1));
+
+    if (codec != null) {
+      assertEquals(codec, reader.getMetaString(DataFileConstants.CODEC));
+    }
   }
 
   public void testOverrideTypeMapping() throws IOException {
diff --git a/src/test/com/cloudera/sqoop/io/TestCodecMap.java b/src/test/com/cloudera/sqoop/io/TestCodecMap.java
index f2f4039d..925c5446 100644
--- a/src/test/com/cloudera/sqoop/io/TestCodecMap.java
+++ b/src/test/com/cloudera/sqoop/io/TestCodecMap.java
@@ -52,6 +52,23 @@ public void testGetCodec() throws IOException {
     verifyCodec(GzipCodec.class, "org.apache.hadoop.io.compress.GzipCodec");
   }
 
+  public void testGetShortName() throws UnsupportedCodecException {
+    verifyShortName("gzip", "org.apache.hadoop.io.compress.GzipCodec");
+    verifyShortName("default", "org.apache.hadoop.io.compress.DefaultCodec");
+    try {
+      verifyShortName("NONE", "bogus");
+      fail("Expected IOException");
+    } catch (UnsupportedCodecException e) {
+      // Exception is expected
+    }
+  }
+
+  private void verifyShortName(String expected, String codecName)
+      throws UnsupportedCodecException {
+    assertEquals(expected,
+        CodecMap.getCodecShortNameByName(codecName, new Configuration()));
+  }
+
   public void testUnrecognizedCodec() {
     try {
       CodecMap.getCodec("bogus", new Configuration());
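For illustration only, here is a minimal standalone sketch of the codec short-name mapping this patch introduces: the alias/suffix handling of CodecMap.getCodecShortNameByName combined with the "default"-to-"deflate" translation done in ImportJobBase. It works on plain strings instead of Hadoop's CompressionCodec classes, and the class and method names below (CodecShortNameSketch, shortName) are hypothetical, not part of the patch or of Sqoop.

    import java.util.HashMap;
    import java.util.Locale;
    import java.util.Map;

    // Hypothetical sketch -- mirrors the patch's logic, not Sqoop code.
    public class CodecShortNameSketch {

      // Registered aliases, loosely mirroring the codecNames map in CodecMap.
      private static final Map<String, String> ALIASES =
          new HashMap<String, String>();
      static {
        ALIASES.put("none", null);
        ALIASES.put("deflate", "org.apache.hadoop.io.compress.DefaultCodec");
        ALIASES.put("lzo", "com.hadoop.compression.lzo.LzoCodec");
      }

      // Derive the Avro-style short name from an alias or a codec class name:
      // take the simple class name, strip a trailing "Codec", lower-case it,
      // and map "default" to "deflate" because Avro's CodecFactory only
      // understands "deflate".
      static String shortName(String codecName) {
        if (ALIASES.containsKey(codecName)) {
          return codecName;
        }
        String simple = codecName.substring(codecName.lastIndexOf('.') + 1);
        if (simple.endsWith("Codec")) {
          simple = simple.substring(0, simple.length() - "Codec".length());
        }
        String name = simple.toLowerCase(Locale.ROOT);
        return "default".equals(name) ? "deflate" : name;
      }

      public static void main(String[] args) {
        // "org.apache.hadoop.io.compress.SnappyCodec" -> "snappy"
        System.out.println(shortName("org.apache.hadoop.io.compress.SnappyCodec"));
        // "org.apache.hadoop.io.compress.DefaultCodec" -> "deflate"
        System.out.println(shortName("org.apache.hadoop.io.compress.DefaultCodec"));
        // "lzo" is already a registered alias -> "lzo"
        System.out.println(shortName("lzo"));
      }
    }

Folding the two steps into one method keeps the sketch short; in the patch they stay separate, so CodecMap remains Avro-agnostic and only ImportJobBase writes the avro.output.codec configuration option.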