
SQOOP-428. AvroOutputFormat doesn't support compression even though documentation claims it does

(Lars Francke via Jarek Jarcec Cecho)


git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1240613 13f79535-47bb-0310-9956-ffa450edef68
Jarek Jarcec Cecho 2012-02-04 21:37:08 +00:00
parent 2c5127213d
commit 40856655c3
7 changed files with 208 additions and 19 deletions
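For context, a minimal driver sketch (not part of this commit) of the two settings that the patched AvroOutputFormat honours; the job name and the "snappy" value are purely illustrative, and the old-style Job constructor of that era is assumed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvroCompressionDemo {
  public static void main(String[] args) throws Exception {
    // Hypothetical driver fragment: names and values are examples only.
    Job job = new Job(new Configuration(), "avro-compression-demo");
    // Generic Hadoop switch that configureDataFileWriter() now checks.
    FileOutputFormat.setCompressOutput(job, true);
    // Avro's own option; it takes a short codec name such as "deflate" or "snappy".
    job.getConfiguration().set(org.apache.avro.mapred.AvroJob.OUTPUT_CODEC, "snappy");
  }
}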

View File

@@ -70,4 +70,15 @@ public static CompressionCodec getCodec(String codecName,
public static Set<String> getCodecNames() {
return org.apache.sqoop.io.CodecMap.getCodecNames();
}
/**
* Return the short name of the codec.
* See {@link org.apache.sqoop.io.CodecMap#getCodecShortNameByName(String,
* Configuration)}.
*/
public static String getCodecShortNameByName(String codecName,
Configuration conf) throws UnsupportedCodecException {
return org.apache.sqoop.io.CodecMap
.getCodecShortNameByName(codecName, conf);
}
}
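A usage sketch (illustration only) of the delegating method above, assuming the stock Hadoop codecs are on the classpath and the default Configuration.

import org.apache.hadoop.conf.Configuration;
import com.cloudera.sqoop.io.CodecMap;

public class FacadeShortNameDemo {
  public static void main(String[] args) throws Exception {
    // A registered Sqoop alias is returned unchanged...
    System.out.println(CodecMap.getCodecShortNameByName("gzip", new Configuration()));
    // ...while a fully qualified class name is reduced to the Avro-friendly "gzip".
    System.out.println(CodecMap.getCodecShortNameByName(
        "org.apache.hadoop.io.compress.GzipCodec", new Configuration()));
  }
}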

View File

@@ -49,7 +49,7 @@ public final class CodecMap {
codecNames.put(NONE, null);
codecNames.put(DEFLATE, "org.apache.hadoop.io.compress.DefaultCodec");
codecNames.put(LZO, "com.hadoop.compression.lzo.LzoCodec");
codecNames.put(LZOP, "com.hadoop.compression.lzo.LzopCodec");
// add more from Hadoop CompressionCodecFactory
for (Class<? extends CompressionCodec> cls
@@ -135,7 +135,7 @@ public static Set<String> getCodecNames() {
* <p>
* Note: When HADOOP-7323 is available this method can be replaced with a call
* to CompressionCodecFactory.
* @param classname the canonical class name of the codec or the codec alias
* @param codecName the canonical class name of the codec or the codec alias
* @return the codec object or null if none matching the name were found
*/
private static CompressionCodec getCodecByName(String codecName,
@@ -150,6 +150,45 @@ private static CompressionCodec getCodecByName(String codecName,
return null;
}
/**
* Gets the short name for a specified codec. See {@link
* #getCodecByName(String, Configuration)} for details. The name returned
* here is the shortest possible one, meaning that a trailing {@code Codec}
* suffix is removed as well.
*
* @param codecName name of the codec to return the short name for
* @param conf job configuration object used to get the registered
* compression codecs
*
* @return the short name of the codec
*
* @throws com.cloudera.sqoop.io.UnsupportedCodecException
* if no short name could be found
*/
public static String getCodecShortNameByName(String codecName,
Configuration conf) throws com.cloudera.sqoop.io.UnsupportedCodecException {
if (codecNames.containsKey(codecName)) {
return codecName;
}
CompressionCodec codec = getCodecByName(codecName, conf);
Class<? extends CompressionCodec> codecClass = null;
if (codec != null) {
codecClass = codec.getClass();
}
if (codecClass != null) {
String simpleName = codecClass.getSimpleName();
if (simpleName.endsWith("Codec")) {
simpleName =
simpleName.substring(0, simpleName.length() - "Codec".length());
}
return simpleName.toLowerCase();
}
throw new com.cloudera.sqoop.io.UnsupportedCodecException(
"Cannot find codec class " + codecName + " for codec " + codecName);
}
private static boolean codecMatches(Class<? extends CompressionCodec> cls,
String codecName) {
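For illustration only, the naming rule implemented above in isolation: take the codec's simple class name, strip a trailing "Codec", and lower-case the remainder.

public class SuffixRuleDemo {
  public static void main(String[] args) {
    // GzipCodec is used here only as a familiar example class.
    String simpleName =
        org.apache.hadoop.io.compress.GzipCodec.class.getSimpleName();
    if (simpleName.endsWith("Codec")) {
      simpleName = simpleName.substring(0, simpleName.length() - "Codec".length());
    }
    System.out.println(simpleName.toLowerCase()); // prints "gzip"
  }
}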

View File

@@ -27,6 +27,9 @@
public final class AvroJob {
public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
/** The configuration key for a job's output schema. */
public static final String OUTPUT_SCHEMA = "avro.output.schema";
private AvroJob() {
}
@@ -36,6 +39,11 @@ public static void setMapOutputSchema(Configuration job, Schema s) {
/** Return a job's map output key schema. */
public static Schema getMapOutputSchema(Configuration job) {
return Schema.parse(job.get(MAP_OUTPUT_SCHEMA));
return Schema.parse(job.get(MAP_OUTPUT_SCHEMA, job.get(OUTPUT_SCHEMA)));
}
/** Return a job's output key schema. */
public static Schema getOutputSchema(Configuration job) {
return Schema.parse(job.get(OUTPUT_SCHEMA));
}
}
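A small sketch (not part of the patch) of how the new key is meant to be used, assuming Sqoop's AvroJob in org.apache.sqoop.mapreduce and an illustrative record schema.

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.mapreduce.AvroJob;

public class OutputSchemaDemo {
  public static void main(String[] args) {
    // The record definition below is made up for the example.
    Schema schema = Schema.parse("{\"type\":\"record\",\"name\":\"Demo\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
    Configuration conf = new Configuration();
    // The schema travels through the job configuration as its JSON form.
    conf.set(AvroJob.OUTPUT_SCHEMA, schema.toString());
    // getOutputSchema() parses it back; getMapOutputSchema() now falls back
    // to it when no map output schema was set.
    System.out.println(AvroJob.getOutputSchema(conf));
  }
}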

View File

@@ -19,33 +19,85 @@
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/** An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files. */
import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
import static org.apache.avro.mapred.AvroOutputFormat.EXT;
import static org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY;
/**
* An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files.
* <p/>
* Note: This class is copied from the Avro project in version 1.5.4 and
* adapted here to work with the "new" MapReduce API that's required in Sqoop.
*/
public class AvroOutputFormat<T>
extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
static <T> void configureDataFileWriter(DataFileWriter<T> writer,
TaskAttemptContext context) throws UnsupportedEncodingException {
if (FileOutputFormat.getCompressOutput(context)) {
int level = context.getConfiguration()
.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
String codecName = context.getConfiguration()
.get(org.apache.avro.mapred.AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
CodecFactory factory =
codecName.equals(DEFLATE_CODEC) ? CodecFactory.deflateCodec(level)
: CodecFactory.fromString(codecName);
writer.setCodec(factory);
}
writer.setSyncInterval(context.getConfiguration()
.getInt(SYNC_INTERVAL_KEY, DEFAULT_SYNC_INTERVAL));
// copy metadata from job
for (Map.Entry<String, String> e : context.getConfiguration()) {
if (e.getKey().startsWith(org.apache.avro.mapred.AvroJob.TEXT_PREFIX)) {
writer.setMeta(e.getKey()
.substring(org.apache.avro.mapred.AvroJob.TEXT_PREFIX.length()),
e.getValue());
}
if (e.getKey().startsWith(org.apache.avro.mapred.AvroJob.BINARY_PREFIX)) {
writer.setMeta(e.getKey()
.substring(org.apache.avro.mapred.AvroJob.BINARY_PREFIX.length()),
URLDecoder.decode(e.getValue(), "ISO-8859-1").getBytes("ISO-8859-1"));
}
}
}
@Override
public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
TaskAttemptContext context) throws IOException, InterruptedException {
Schema schema = AvroJob.getMapOutputSchema(context.getConfiguration());
boolean isMapOnly = context.getNumReduceTasks() == 0;
Schema schema =
isMapOnly ? AvroJob.getMapOutputSchema(context.getConfiguration())
: AvroJob.getOutputSchema(context.getConfiguration());
final DataFileWriter<T> WRITER =
new DataFileWriter<T>(new GenericDatumWriter<T>());
new DataFileWriter<T>(new ReflectDatumWriter<T>());
Path path = getDefaultWorkFile(context,
org.apache.avro.mapred.AvroOutputFormat.EXT);
configureDataFileWriter(WRITER, context);
Path path = getDefaultWorkFile(context, EXT);
WRITER.create(schema,
path.getFileSystem(context.getConfiguration()).create(path));
return new RecordWriter<AvroWrapper<T>, NullWritable>() {
@Override
@@ -53,9 +105,10 @@ public void write(AvroWrapper<T> wrapper, NullWritable ignore)
throws IOException {
WRITER.append(wrapper.datum());
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
public void close(TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
WRITER.close();
}
};
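The codec branch in configureDataFileWriter() mirrors Avro's factory API; below is a standalone sketch of the same decision, with the level value standing in for whatever DEFLATE_LEVEL_KEY would supply.

import org.apache.avro.file.CodecFactory;

public class CodecSelectionDemo {
  public static void main(String[] args) {
    String codecName = "deflate"; // or any name CodecFactory.fromString() knows
    int deflateLevel = 6;         // illustrative value for the deflate level
    // "deflate" gets an explicit level; anything else is resolved by name.
    CodecFactory factory = "deflate".equals(codecName)
        ? CodecFactory.deflateCodec(deflateLevel)
        : CodecFactory.fromString(codecName);
    System.out.println(factory);
  }
}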

View File

@@ -19,6 +19,9 @@
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.mapred.AvroJob;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -98,7 +101,26 @@ protected void configureOutputFormat(Job job, String tableName,
if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
}
// SQOOP-428: Avro expects a "short" codec name (e.g. "snappy") rather than
// a fully qualified class name, and it has to be set in a dedicated
// configuration option called "avro.output.codec".
// The default codec is "deflate".
if (options.getFileLayout() == SqoopOptions.FileLayout.AvroDataFile) {
if (codecName != null) {
String shortName =
CodecMap.getCodecShortNameByName(codecName, job.getConfiguration());
// Avro only knows about "deflate" and not "default"
if (shortName.equalsIgnoreCase("default")) {
shortName = "deflate";
}
job.getConfiguration().set(AvroJob.OUTPUT_CODEC, shortName);
} else {
job.getConfiguration()
.set(AvroJob.OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC);
}
}
}
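Illustration of the mapping performed above (not part of the patch): the user-facing codec class name is reduced to Avro's short name, and "default" is rewritten to "deflate" because Avro does not recognise the former.

import org.apache.avro.mapred.AvroJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.io.CodecMap;

public class Sqoop428MappingDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // DefaultCodec maps to the short name "default"...
    String shortName = CodecMap.getCodecShortNameByName(
        "org.apache.hadoop.io.compress.DefaultCodec", conf);
    // ...which Avro only understands as "deflate".
    if (shortName.equalsIgnoreCase("default")) {
      shortName = "deflate";
    }
    conf.set(AvroJob.OUTPUT_CODEC, shortName);
    System.out.println(conf.get(AvroJob.OUTPUT_CODEC)); // prints "deflate"
  }
}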

View File

@@ -28,6 +28,7 @@
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@@ -82,14 +83,48 @@ protected String[] getOutputArgv(boolean includeHadoopFlags,
}
public void testAvroImport() throws IOException {
avroImportTestHelper(null, null);
}
String [] types = { "BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE",
"VARCHAR(6)", "VARBINARY(2)", };
String [] vals = { "true", "100", "200", "1.0", "2.0",
"'s'", "'0102'", };
public void testDeflateCompressedAvroImport() throws IOException {
avroImportTestHelper(new String[] {"--compression-codec",
"org.apache.hadoop.io.compress.DefaultCodec", }, "deflate");
}
public void testDefaultCompressedAvroImport() throws IOException {
avroImportTestHelper(new String[] {"--compress", }, "deflate");
}
public void testUnsupportedCodec() throws IOException {
try {
avroImportTestHelper(new String[] {"--compression-codec", "foobar", },
null);
fail("Expected IOException");
} catch (IOException e) {
// Exception is expected
}
}
/**
* Helper method that runs an import using Avro with optional command line
* arguments and checks that the created file matches the expectations.
* <p/>
* This can be used to test various extra options that are implemented for
* the Avro input.
*
* @param extraArgs extra command line arguments to pass to Sqoop in addition
* to those that {@link #getOutputArgv(boolean, String[])}
* returns
* @param codec the codec short name expected in the Avro file metadata,
* or null if no compression check should be performed
*/
private void avroImportTestHelper(String[] extraArgs, String codec)
throws IOException {
String[] types =
{"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
"VARBINARY(2)", };
String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
createTableWithColTypes(types, vals);
runImport(getOutputArgv(true, null));
runImport(getOutputArgv(true, extraArgs));
Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
DataFileReader<GenericRecord> reader = read(outputFile);
@@ -118,6 +153,10 @@ public void testAvroImport() throws IOException {
ByteBuffer b = ((ByteBuffer) object);
assertEquals((byte) 1, b.get(0));
assertEquals((byte) 2, b.get(1));
if (codec != null) {
assertEquals(codec, reader.getMetaString(DataFileConstants.CODEC));
}
}
public void testOverrideTypeMapping() throws IOException {
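Outside the test harness, the same codec check can be made against any Avro data file; a sketch using a placeholder path.

import java.io.File;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class CodecMetadataDemo {
  public static void main(String[] args) throws Exception {
    // "part-m-00000.avro" is a placeholder; point it at a file produced by
    // the import. Every Avro data file records its codec under "avro.codec".
    DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
        new File("part-m-00000.avro"), new GenericDatumReader<GenericRecord>());
    System.out.println(reader.getMetaString(DataFileConstants.CODEC));
    reader.close();
  }
}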

View File

@@ -52,6 +52,23 @@ public void testGetCodec() throws IOException {
verifyCodec(GzipCodec.class, "org.apache.hadoop.io.compress.GzipCodec");
}
public void testGetShortName() throws UnsupportedCodecException {
verifyShortName("gzip", "org.apache.hadoop.io.compress.GzipCodec");
verifyShortName("default", "org.apache.hadoop.io.compress.DefaultCodec");
try {
verifyShortName("NONE", "bogus");
fail("Expected IOException");
} catch (UnsupportedCodecException e) {
// Exception is expected
}
}
private void verifyShortName(String expected, String codecName)
throws UnsupportedCodecException {
assertEquals(expected,
CodecMap.getCodecShortNameByName(codecName, new Configuration()));
}
public void testUnrecognizedCodec() {
try {
CodecMap.getCodec("bogus", new Configuration());