diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index d9984af3..3a19aeac 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.sqoop.accumulo.AccumuloConstants; import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration; +import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation; import org.apache.sqoop.tool.BaseSqoopTool; import org.apache.sqoop.util.CredentialsUtil; import org.apache.sqoop.util.LoggingUtils; @@ -52,6 +53,7 @@ import org.apache.sqoop.util.StoredAsProperty; import static org.apache.sqoop.Sqoop.SQOOP_RETHROW_PROPERTY; +import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE; import static org.apache.sqoop.orm.ClassWriter.toJavaIdentifier; /** @@ -458,6 +460,9 @@ public String toString() { @StoredAsProperty("hs2.keytab") private String hs2Keytab; + @StoredAsProperty("parquet.configurator.implementation") + private ParquetJobConfiguratorImplementation parquetConfiguratorImplementation; + public SqoopOptions() { initDefaults(null); } @@ -1152,6 +1157,8 @@ private void initDefaults(Configuration baseConfiguration) { // set escape column mapping to true this.escapeColumnMappingEnabled = true; + + this.parquetConfiguratorImplementation = KITE; } /** @@ -2925,5 +2932,12 @@ public void setHs2Keytab(String hs2Keytab) { this.hs2Keytab = hs2Keytab; } + public ParquetJobConfiguratorImplementation getParquetConfiguratorImplementation() { + return parquetConfiguratorImplementation; + } + + public void setParquetConfiguratorImplementation(ParquetJobConfiguratorImplementation parquetConfiguratorImplementation) { + this.parquetConfiguratorImplementation = parquetConfiguratorImplementation; + } } diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java index 57c20625..1663b1d1 100644 --- a/src/java/org/apache/sqoop/avro/AvroUtil.java +++ b/src/java/org/apache/sqoop/avro/AvroUtil.java @@ -40,6 +40,11 @@ import org.apache.sqoop.lib.BlobRef; import org.apache.sqoop.lib.ClobRef; import org.apache.sqoop.orm.ClassWriter; +import parquet.avro.AvroSchemaConverter; +import parquet.format.converter.ParquetMetadataConverter; +import parquet.hadoop.ParquetFileReader; +import parquet.hadoop.metadata.ParquetMetadata; +import parquet.schema.MessageType; import java.io.IOException; import java.math.BigDecimal; @@ -285,24 +290,7 @@ public static Object fromAvro(Object avroObject, Schema schema, String type) { */ public static Schema getAvroSchema(Path path, Configuration conf) throws IOException { - FileSystem fs = path.getFileSystem(conf); - Path fileToTest; - if (fs.isDirectory(path)) { - FileStatus[] fileStatuses = fs.listStatus(path, new PathFilter() { - @Override - public boolean accept(Path p) { - String name = p.getName(); - return !name.startsWith("_") && !name.startsWith("."); - } - }); - if (fileStatuses.length == 0) { - return null; - } - fileToTest = fileStatuses[0].getPath(); - } else { - fileToTest = path; - } - + Path fileToTest = getFileToTest(path, conf); SeekableInput input = new FsInput(fileToTest, conf); DatumReader reader = new GenericDatumReader(); FileReader fileReader = DataFileReader.openReader(input, reader); @@ -340,8 +328,37 @@ public static LogicalType createDecimalType(Integer precision, Integer scale, Co return LogicalTypes.decimal(precision, scale); } + private static Path 
getFileToTest(Path path, Configuration conf) throws IOException { + FileSystem fs = path.getFileSystem(conf); + if (!fs.isDirectory(path)) { + return path; + } + FileStatus[] fileStatuses = fs.listStatus(path, new PathFilter() { + @Override + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }); + if (fileStatuses.length == 0) { + return null; + } + return fileStatuses[0].getPath(); + } public static Schema parseAvroSchema(String schemaString) { return new Schema.Parser().parse(schemaString); } + + public static Schema getAvroSchemaFromParquetFile(Path path, Configuration conf) throws IOException { + Path fileToTest = getFileToTest(path, conf); + if (fileToTest == null) { + return null; + } + ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(conf, fileToTest, ParquetMetadataConverter.NO_FILTER); + + MessageType parquetSchema = parquetMetadata.getFileMetaData().getSchema(); + AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(); + return avroSchemaConverter.convert(parquetSchema); + } } diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java index c80dd5d9..4c1e8f59 100644 --- a/src/java/org/apache/sqoop/manager/ConnManager.java +++ b/src/java/org/apache/sqoop/manager/ConnManager.java @@ -46,7 +46,6 @@ import org.apache.sqoop.lib.BlobRef; import org.apache.sqoop.lib.ClobRef; import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory; -import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider; import org.apache.sqoop.util.ExportException; import org.apache.sqoop.util.ImportException; @@ -869,7 +868,7 @@ public boolean isDirectModeAccumuloSupported() { } public ParquetJobConfiguratorFactory getParquetJobConfigurator() { - return ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(options.getConf()); + return options.getParquetConfiguratorImplementation().createFactory(); } } diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java index 3b542102..349ca8d8 100644 --- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java @@ -49,6 +49,8 @@ import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator; import org.apache.sqoop.orm.AvroSchemaGenerator; +import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY; + /** * Actually runs a jdbc import job using the ORM files generated by the * sqoop.orm package. Uses DataDrivenDBInputFormat. 
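A minimal usage sketch (not part of the patch; the class name ParquetImplementationSelectionSketch is invented for illustration) of how the enum-driven selection above is meant to work: the ParquetJobConfiguratorImplementation constant stored in SqoopOptions now decides which ParquetJobConfiguratorFactory is instantiated, replacing the removed ParquetJobConfiguratorFactoryProvider.

import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;

public class ParquetImplementationSelectionSketch {
  public static void main(String[] args) {
    SqoopOptions options = new SqoopOptions();            // initDefaults() leaves the implementation at KITE
    options.setParquetConfiguratorImplementation(HADOOP); // opt in to the new Hadoop Parquet configurators
    // The enum constant instantiates the matching factory reflectively via createFactory().
    ParquetJobConfiguratorFactory factory = options.getParquetConfiguratorImplementation().createFactory();
    ParquetImportJobConfigurator importConfigurator = factory.createParquetImportJobConfigurator();
    System.out.println(importConfigurator.getClass().getSimpleName()); // HadoopParquetImportJobConfigurator
  }
}

The same choice can be made per invocation with the new --parquet-configurator-implementation command line option or the parquetjob.configurator.implementation property, both of which are wired up in BaseSqoopTool further down in this patch.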
@@ -114,6 +116,7 @@ protected void configureMapper(Job job, String tableName, Schema schema = generateAvroSchema(tableName, schemaNameOverride); Path destination = getContext().getDestination(); + options.getConf().set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, schema.toString()); parquetImportJobConfigurator.configureMapper(job, schema, options, tableName, destination); } diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java index 17c9ed39..80c06988 100644 --- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java +++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java @@ -152,6 +152,7 @@ protected void configureOutputFormat(Job job, String tableName, String shortName = CodecMap.getCodecShortNameByName(codecName, conf); if (!shortName.equalsIgnoreCase("default")) { conf.set(SQOOP_PARQUET_OUTPUT_CODEC_KEY, shortName); + options.getConf().set(SQOOP_PARQUET_OUTPUT_CODEC_KEY, shortName); } } } diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java index ae53a96b..63fe92a9 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java @@ -25,6 +25,8 @@ public final class ParquetConstants { public static final String SQOOP_PARQUET_OUTPUT_CODEC_KEY = "parquetjob.output.codec"; + public static final String PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY = "parquetjob.configurator.implementation"; + private ParquetConstants() { throw new AssertionError("This class is meant for static use only."); } diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java index 8d7b87f6..d4c50a39 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java @@ -25,6 +25,10 @@ import java.io.IOException; +/** + * This interface defines the type of a product of {@link ParquetJobConfiguratorFactory}. + * The implementations of the methods of this interface help to configure Sqoop Parquet export jobs. + */ public interface ParquetExportJobConfigurator { void configureInputFormat(Job job, Path inputPath) throws IOException; diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java index fa1bc7d1..eb6d08f8 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java @@ -27,6 +27,10 @@ import java.io.IOException; +/** + * This interface defines the type of a product of {@link ParquetJobConfiguratorFactory}. + * The implementations of the methods of this interface help to configure Sqoop Parquet import jobs. 
+ */ public interface ParquetImportJobConfigurator { void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException; diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java index ed5103f1..a97c243e 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java @@ -18,6 +18,14 @@ package org.apache.sqoop.mapreduce.parquet; +/** + * This interface is an abstract factory of objects which configure Sqoop Parquet jobs. + * Every product is responsible for configuring different types of Sqoop jobs. + * + * @see ParquetImportJobConfigurator + * @see ParquetExportJobConfigurator + * @see ParquetMergeJobConfigurator + */ public interface ParquetJobConfiguratorFactory { ParquetImportJobConfigurator createParquetImportJobConfigurator(); diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java new file mode 100644 index 00000000..050c8548 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.parquet; + +import org.apache.sqoop.mapreduce.parquet.hadoop.HadoopParquetJobConfiguratorFactory; +import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetJobConfiguratorFactory; + +/** + * An enum containing all the implementations available for {@link ParquetJobConfiguratorFactory}. + * The enumeration constants are also used to instantiate concrete {@link ParquetJobConfiguratorFactory} objects. + */ +public enum ParquetJobConfiguratorImplementation { + KITE(KiteParquetJobConfiguratorFactory.class), HADOOP(HadoopParquetJobConfiguratorFactory.class); + + private Class configuratorFactoryClass; + + ParquetJobConfiguratorImplementation(Class configuratorFactoryClass) { + this.configuratorFactoryClass = configuratorFactoryClass; + } + + public ParquetJobConfiguratorFactory createFactory() { + try { + return configuratorFactoryClass.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Could not instantiate factory class: " + configuratorFactoryClass, e); + } + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java index 67fdf660..e5fe53f5 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java @@ -24,6 +24,10 @@ import java.io.IOException; +/** + * This interface defines the type of a product of {@link ParquetJobConfiguratorFactory}. + * The implementations of the methods of this interface help to configure Sqoop Parquet merge jobs. + */ public interface ParquetMergeJobConfigurator { void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath, Path finalPath) throws IOException; diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopMergeParquetReducer.java similarity index 60% rename from src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java rename to src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopMergeParquetReducer.java index 2286a520..9104eee1 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopMergeParquetReducer.java @@ -16,19 +16,20 @@ * limitations under the License. */ -package org.apache.sqoop.mapreduce.parquet; +package org.apache.sqoop.mapreduce.parquet.hadoop; -import org.apache.hadoop.conf.Configuration; -import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetJobConfiguratorFactory; +import org.apache.avro.generic.GenericRecord; +import org.apache.sqoop.mapreduce.MergeParquetReducer; -public final class ParquetJobConfiguratorFactoryProvider { +import java.io.IOException; - private ParquetJobConfiguratorFactoryProvider() { - throw new AssertionError("This class is meant for static use only."); +/** + * An implementation of {@link MergeParquetReducer} which depends on the Hadoop Parquet library. 
+ */ +public class HadoopMergeParquetReducer extends MergeParquetReducer { + + @Override + protected void write(Context context, GenericRecord record) throws IOException, InterruptedException { + context.write(null, record); } - - public static ParquetJobConfiguratorFactory createParquetJobConfiguratorFactory(Configuration configuration) { - return new KiteParquetJobConfiguratorFactory(); - } - } diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetExportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetExportJobConfigurator.java new file mode 100644 index 00000000..2180cc20 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetExportJobConfigurator.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.parquet.hadoop; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator; +import parquet.avro.AvroParquetInputFormat; + +import java.io.IOException; + +/** + * An implementation of {@link ParquetExportJobConfigurator} which depends on the Hadoop Parquet library. + */ +public class HadoopParquetExportJobConfigurator implements ParquetExportJobConfigurator { + + @Override + public void configureInputFormat(Job job, Path inputPath) throws IOException { + // do nothing + } + + @Override + public Class getMapperClass() { + return HadoopParquetExportMapper.class; + } + + @Override + public Class getInputFormatClass() { + return AvroParquetInputFormat.class; + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetExportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetExportMapper.java new file mode 100644 index 00000000..f960f217 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetExportMapper.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.parquet.hadoop; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.io.NullWritable; +import org.apache.sqoop.mapreduce.GenericRecordExportMapper; + +import java.io.IOException; + +/** + * An implementation of {@link GenericRecordExportMapper} which depends on the Hadoop Parquet library. + */ +public class HadoopParquetExportMapper extends GenericRecordExportMapper { + + @Override + protected void map(Void key, GenericRecord val, Context context) throws IOException, InterruptedException { + context.write(toSqoopRecord(val), NullWritable.get()); + } + +} diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java new file mode 100644 index 00000000..3f35faf8 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.parquet.hadoop; + +import org.apache.avro.Schema; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator; +import parquet.avro.AvroParquetOutputFormat; +import parquet.hadoop.ParquetOutputFormat; +import parquet.hadoop.metadata.CompressionCodecName; + +import java.io.IOException; + +import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY; + +/** + * An implementation of {@link ParquetImportJobConfigurator} which depends on the Hadoop Parquet library. + */ +public class HadoopParquetImportJobConfigurator implements ParquetImportJobConfigurator { + + private static final Log LOG = LogFactory.getLog(HadoopParquetImportJobConfigurator.class.getName()); + + @Override + public void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException { + configureAvroSchema(job, schema); + configureOutputCodec(job); + } + + @Override + public Class getMapperClass() { + return HadoopParquetImportMapper.class; + } + + @Override + public Class getOutputFormatClass() { + return AvroParquetOutputFormat.class; + } + + void configureOutputCodec(Job job) { + String outputCodec = job.getConfiguration().get(SQOOP_PARQUET_OUTPUT_CODEC_KEY); + if (outputCodec != null) { + LOG.info("Using output codec: " + outputCodec); + ParquetOutputFormat.setCompression(job, CompressionCodecName.fromConf(outputCodec)); + } + } + + void configureAvroSchema(Job job, Schema schema) { + if (LOG.isDebugEnabled()) { + LOG.debug("Using Avro schema: " + schema); + } + AvroParquetOutputFormat.setSchema(job, schema); + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportMapper.java new file mode 100644 index 00000000..7a66d11e --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportMapper.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.parquet.hadoop; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.sqoop.avro.AvroUtil; +import org.apache.sqoop.lib.LargeObjectLoader; +import org.apache.sqoop.mapreduce.ParquetImportMapper; + +import java.io.IOException; + +/** + * An implementation of {@link ParquetImportMapper} which depends on the Hadoop Parquet library. + */ +public class HadoopParquetImportMapper extends ParquetImportMapper { + + private static final Log LOG = LogFactory.getLog(HadoopParquetImportMapper.class.getName()); + + /** + * The key to get the configuration value set by + * parquet.avro.AvroParquetOutputFormat#setSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema) + */ + private static final String HADOOP_PARQUET_AVRO_SCHEMA_KEY = "parquet.avro.schema"; + + @Override + protected LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException { + return new LargeObjectLoader(context.getConfiguration(), FileOutputFormat.getWorkOutputPath(context)); + } + + @Override + protected Schema getAvroSchema(Configuration configuration) { + String schemaString = configuration.get(HADOOP_PARQUET_AVRO_SCHEMA_KEY); + LOG.debug("Found Avro schema: " + schemaString); + return AvroUtil.parseAvroSchema(schemaString); + } + + @Override + protected void write(Context context, GenericRecord record) throws IOException, InterruptedException { + context.write(null, record); + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetJobConfiguratorFactory.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetJobConfiguratorFactory.java new file mode 100644 index 00000000..052ea04b --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetJobConfiguratorFactory.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.parquet.hadoop; + +import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator; +import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator; +import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory; +import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator; + +/** + * A concrete factory implementation which produces configurator objects using the Hadoop Parquet library. + */ +public class HadoopParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory { + + @Override + public ParquetImportJobConfigurator createParquetImportJobConfigurator() { + return new HadoopParquetImportJobConfigurator(); + } + + @Override + public ParquetExportJobConfigurator createParquetExportJobConfigurator() { + return new HadoopParquetExportJobConfigurator(); + } + + @Override + public ParquetMergeJobConfigurator createParquetMergeJobConfigurator() { + return new HadoopParquetMergeJobConfigurator(); + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetMergeJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetMergeJobConfigurator.java new file mode 100644 index 00000000..66ebc5b8 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetMergeJobConfigurator.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce.parquet.hadoop; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaValidationException; +import org.apache.avro.SchemaValidator; +import org.apache.avro.SchemaValidatorBuilder; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.sqoop.mapreduce.MergeParquetMapper; +import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator; +import parquet.avro.AvroParquetInputFormat; + +import java.io.IOException; + +import static java.lang.String.format; +import static java.util.Collections.singleton; +import static org.apache.sqoop.avro.AvroUtil.getAvroSchemaFromParquetFile; +import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY; + +/** + * An implementation of {@link ParquetMergeJobConfigurator} which depends on the Hadoop Parquet library. + */ +public class HadoopParquetMergeJobConfigurator implements ParquetMergeJobConfigurator { + + public static final Log LOG = LogFactory.getLog(HadoopParquetMergeJobConfigurator.class.getName()); + + private final HadoopParquetImportJobConfigurator importJobConfigurator; + + private final HadoopParquetExportJobConfigurator exportJobConfigurator; + + public HadoopParquetMergeJobConfigurator(HadoopParquetImportJobConfigurator importJobConfigurator, HadoopParquetExportJobConfigurator exportJobConfigurator) { + this.importJobConfigurator = importJobConfigurator; + this.exportJobConfigurator = exportJobConfigurator; + } + + public HadoopParquetMergeJobConfigurator() { + this(new HadoopParquetImportJobConfigurator(), new HadoopParquetExportJobConfigurator()); + } + + @Override + public void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath, + Path finalPath) throws IOException { + try { + LOG.info("Trying to merge parquet files"); + job.setOutputKeyClass(Void.class); + job.setMapperClass(MergeParquetMapper.class); + job.setReducerClass(HadoopMergeParquetReducer.class); + job.setOutputValueClass(GenericRecord.class); + + Schema avroSchema = loadAvroSchema(conf, oldPath); + + validateNewPathAvroSchema(getAvroSchemaFromParquetFile(newPath, conf), avroSchema); + + job.setInputFormatClass(exportJobConfigurator.getInputFormatClass()); + AvroParquetInputFormat.setAvroReadSchema(job, avroSchema); + + conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, avroSchema.toString()); + importJobConfigurator.configureAvroSchema(job, avroSchema); + importJobConfigurator.configureOutputCodec(job); + job.setOutputFormatClass(importJobConfigurator.getOutputFormatClass()); + } catch (Exception cnfe) { + throw new IOException(cnfe); + } + } + + private Schema loadAvroSchema(Configuration conf, Path path) throws IOException { + Schema avroSchema = getAvroSchemaFromParquetFile(path, conf); + + if (avroSchema == null) { + throw new RuntimeException("Could not load Avro schema from path: " + path); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Avro schema loaded: " + avroSchema); + } + + return avroSchema; + } + + /** + * This 
method ensures that the Avro schema in the new path is compatible with the Avro schema in the old path. + */ + private void validateNewPathAvroSchema(Schema newPathAvroSchema, Schema avroSchema) { + // If the new path is an empty directory (e.g. in case of a sqoop merge command) then the newPathAvroSchema will + // be null. In that case we just want to proceed without real validation. + if (newPathAvroSchema == null) { + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug(format("Validation Avro schema %s against %s", newPathAvroSchema.toString(), avroSchema.toString())); + } + SchemaValidator schemaValidator = new SchemaValidatorBuilder().mutualReadStrategy().validateAll(); + try { + schemaValidator.validate(newPathAvroSchema, singleton(avroSchema)); + } catch (SchemaValidationException e) { + throw new RuntimeException("Cannot merge files, the Avro schemas are not compatible.", e); + } + } + +} diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java index 7f21205e..02816d77 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java @@ -24,6 +24,9 @@ import java.io.IOException; +/** + * An implementation of {@link MergeParquetReducer} which depends on the Kite Dataset API. + */ public class KiteMergeParquetReducer extends MergeParquetReducer { @Override diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java index ca02c7bd..6ebc5a31 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java @@ -28,6 +28,9 @@ import java.io.IOException; +/** + * An implementation of {@link ParquetExportJobConfigurator} which depends on the Kite Dataset API. + */ public class KiteParquetExportJobConfigurator implements ParquetExportJobConfigurator { @Override diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java index 25555d88..122ff3fc 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java @@ -25,7 +25,7 @@ import java.io.IOException; /** - * Exports Parquet records from a data source. + * An implementation of {@link GenericRecordExportMapper} which depends on the Kite Dataset API. */ public class KiteParquetExportMapper extends GenericRecordExportMapper { diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java index 87828d14..feb3bf19 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java @@ -35,6 +35,9 @@ import java.io.IOException; +/** + * An implementation of {@link ParquetImportJobConfigurator} which depends on the Kite Dataset API. 
+ */ public class KiteParquetImportJobConfigurator implements ParquetImportJobConfigurator { public static final Log LOG = LogFactory.getLog(KiteParquetImportJobConfigurator.class.getName()); diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java index 20adf6e4..0a91e4a2 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java @@ -30,6 +30,9 @@ import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY; +/** + * An implementation of {@link ParquetImportMapper} which depends on the Kite Dataset API. + */ public class KiteParquetImportMapper extends ParquetImportMapper { @Override diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java index 055e1166..bd07c09f 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java @@ -23,6 +23,9 @@ import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory; import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator; +/** + * A concrete factory implementation which produces configurator objects using the Kite Dataset API. + */ public class KiteParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory { @Override diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java index 9fecf282..ed045cd1 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java @@ -48,6 +48,9 @@ import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY; +/** + * An implementation of {@link ParquetMergeJobConfigurator} which depends on the Kite Dataset API. + */ public class KiteParquetMergeJobConfigurator implements ParquetMergeJobConfigurator { public static final Log LOG = LogFactory.getLog(KiteParquetMergeJobConfigurator.class.getName()); diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java index e68bba90..a4768c93 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java @@ -45,7 +45,7 @@ import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY; /** - * Helper class for setting up a Parquet MapReduce job. + * Helper class using the Kite Dataset API for setting up a Parquet MapReduce job. 
*/ public final class KiteParquetUtils { diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java index c62ee98c..8d318327 100644 --- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java +++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java @@ -19,6 +19,9 @@ package org.apache.sqoop.tool; import static java.lang.String.format; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY; +import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.valueOf; import java.io.File; import java.io.FileInputStream; @@ -34,12 +37,11 @@ import org.apache.commons.cli.OptionGroup; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.StringUtils; import org.apache.sqoop.manager.SupportedManagers; import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities; import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory; -import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider; +import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation; import org.apache.sqoop.util.CredentialsUtil; import org.apache.sqoop.util.LoggingUtils; import org.apache.sqoop.util.password.CredentialProviderHelper; @@ -183,6 +185,7 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool { public static final String THROW_ON_ERROR_ARG = "throw-on-error"; public static final String ORACLE_ESCAPING_DISABLED = "oracle-escaping-disabled"; public static final String ESCAPE_MAPPING_COLUMN_NAMES_ENABLED = "escape-mapping-column-names"; + public static final String PARQUET_CONFIGURATOR_IMPLEMENTATION = "parquet-configurator-implementation"; // Arguments for validation. public static final String VALIDATE_ARG = "validate"; @@ -1145,6 +1148,8 @@ protected void applyCommonOptions(CommandLine in, SqoopOptions out) out.setEscapeMappingColumnNamesEnabled(Boolean.parseBoolean(in.getOptionValue( ESCAPE_MAPPING_COLUMN_NAMES_ENABLED))); } + + applyParquetJobConfigurationImplementation(in, out); } private void applyCredentialsOptions(CommandLine in, SqoopOptions out) @@ -1908,7 +1913,27 @@ protected void validateHS2Options(SqoopOptions options) throws SqoopOptions.Inva } - public ParquetJobConfiguratorFactory getParquetJobConfigurator(Configuration configuration) { - return ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(configuration); + private void applyParquetJobConfigurationImplementation(CommandLine in, SqoopOptions out) throws InvalidOptionsException { + String optionValue = in.getOptionValue(PARQUET_CONFIGURATOR_IMPLEMENTATION); + String propertyValue = out.getConf().get(PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY); + + String valueToUse = isBlank(optionValue) ? 
propertyValue : optionValue; + + if (isBlank(valueToUse)) { + LOG.debug("Parquet job configurator implementation is not set, using default value: " + out.getParquetConfiguratorImplementation()); + return; + } + + try { + ParquetJobConfiguratorImplementation parquetConfiguratorImplementation = valueOf(valueToUse.toUpperCase()); + out.setParquetConfiguratorImplementation(parquetConfiguratorImplementation); + LOG.debug("Parquet job configurator implementation set: " + parquetConfiguratorImplementation); + } catch (IllegalArgumentException e) { + throw new InvalidOptionsException(format("Invalid Parquet job configurator implementation is set: %s. Supported values are: %s", valueToUse, Arrays.toString(ParquetJobConfiguratorImplementation.values()))); + } + } + + public ParquetJobConfiguratorFactory getParquetJobConfigurator(SqoopOptions sqoopOptions) { + return sqoopOptions.getParquetConfiguratorImplementation().createFactory(); } } diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java index 2c474b7e..25c3f703 100644 --- a/src/java/org/apache/sqoop/tool/ImportTool.java +++ b/src/java/org/apache/sqoop/tool/ImportTool.java @@ -473,7 +473,7 @@ protected void lastModifiedMerge(SqoopOptions options, ImportJobContext context) loadJars(options.getConf(), context.getJarFile(), context.getTableName()); } - ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options.getConf()).createParquetMergeJobConfigurator(); + ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options).createParquetMergeJobConfigurator(); MergeJob mergeJob = new MergeJob(options, parquetMergeJobConfigurator); if (mergeJob.runMergeJob()) { // Rename destination directory to proper location. diff --git a/src/java/org/apache/sqoop/tool/MergeTool.java b/src/java/org/apache/sqoop/tool/MergeTool.java index 4c20f7d1..e23b4f11 100644 --- a/src/java/org/apache/sqoop/tool/MergeTool.java +++ b/src/java/org/apache/sqoop/tool/MergeTool.java @@ -53,7 +53,7 @@ public MergeTool(String toolName) { public int run(SqoopOptions options) { try { // Configure and execute a MapReduce job to merge these datasets. 
- ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options.getConf()).createParquetMergeJobConfigurator(); + ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options).createParquetMergeJobConfigurator(); MergeJob mergeJob = new MergeJob(options, parquetMergeJobConfigurator); if (!mergeJob.runMergeJob()) { LOG.error("MapReduce job failed!"); diff --git a/src/test/org/apache/sqoop/TestBigDecimalExport.java b/src/test/org/apache/sqoop/TestBigDecimalExport.java index ccea1734..b579d933 100644 --- a/src/test/org/apache/sqoop/TestBigDecimalExport.java +++ b/src/test/org/apache/sqoop/TestBigDecimalExport.java @@ -37,6 +37,7 @@ import org.apache.sqoop.testutil.ExportJobTestCase; import org.junit.Test; +import static java.util.Collections.emptyList; import static org.junit.Assert.assertEquals; /** @@ -57,7 +58,7 @@ private void runBigDecimalExport(String line) writer.close(); String[] types = { "DECIMAL", "NUMERIC" }; - createTableWithColTypes(types, null); + createTableWithColTypes(types, emptyList()); List args = new ArrayList(); diff --git a/src/test/org/apache/sqoop/TestMerge.java b/src/test/org/apache/sqoop/TestMerge.java index 11806fea..2b3280a5 100644 --- a/src/test/org/apache/sqoop/TestMerge.java +++ b/src/test/org/apache/sqoop/TestMerge.java @@ -26,6 +26,8 @@ import java.sql.SQLException; import java.util.Arrays; import java.util.List; + +import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation; import org.apache.sqoop.testutil.CommonArgs; import org.apache.sqoop.testutil.HsqldbTestServer; import org.apache.sqoop.manager.ConnManager; @@ -52,6 +54,8 @@ import org.junit.Before; import org.junit.Test; +import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP; +import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE; import static org.junit.Assert.fail; /** @@ -80,6 +84,8 @@ public class TestMerge extends BaseSqoopTestCase { Arrays.asList(new Integer(1), new Integer(43)), Arrays.asList(new Integer(3), new Integer(313))); + private ParquetJobConfiguratorImplementation parquetJobConfiguratorImplementation = KITE; + @Before public void setUp() { super.setUp(); @@ -112,6 +118,7 @@ public Configuration newConf() { public SqoopOptions getSqoopOptions(Configuration conf) { SqoopOptions options = new SqoopOptions(conf); options.setConnectString(HsqldbTestServer.getDbUrl()); + options.setParquetConfiguratorImplementation(parquetJobConfiguratorImplementation); return options; } @@ -157,7 +164,14 @@ public void testAvroFileMerge() throws Exception { } @Test - public void testParquetFileMerge() throws Exception { + public void testParquetFileMergeHadoop() throws Exception { + parquetJobConfiguratorImplementation = HADOOP; + runMergeTest(SqoopOptions.FileLayout.ParquetFile); + } + + @Test + public void testParquetFileMergeKite() throws Exception { + parquetJobConfiguratorImplementation = KITE; runMergeTest(SqoopOptions.FileLayout.ParquetFile); } diff --git a/src/test/org/apache/sqoop/TestParquetExport.java b/src/test/org/apache/sqoop/TestParquetExport.java index 43dabb57..0fab1880 100644 --- a/src/test/org/apache/sqoop/TestParquetExport.java +++ b/src/test/org/apache/sqoop/TestParquetExport.java @@ -32,6 +32,8 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import parquet.avro.AvroParquetWriter; import java.io.IOException; @@ -42,6 +44,7 @@ import 
java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.UUID; @@ -55,11 +58,23 @@ /** * Test that we can export Parquet Data Files from HDFS into databases. */ +@RunWith(Parameterized.class) public class TestParquetExport extends ExportJobTestCase { + @Parameterized.Parameters(name = "parquetImplementation = {0}") + public static Iterable parquetImplementationParameters() { + return Arrays.asList("kite", "hadoop"); + } + @Rule public ExpectedException thrown = ExpectedException.none(); + private final String parquetImplementation; + + public TestParquetExport(String parquetImplementation) { + this.parquetImplementation = parquetImplementation; + } + /** * @return an argv for the CodeGenTool to use when creating tables to export. */ @@ -478,5 +493,10 @@ public void testMissingParquetFields() throws IOException, SQLException { runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1))); } - + @Override + protected Configuration getConf() { + Configuration conf = super.getConf(); + conf.set("parquetjob.configurator.implementation", parquetImplementation); + return conf; + } } diff --git a/src/test/org/apache/sqoop/TestParquetImport.java b/src/test/org/apache/sqoop/TestParquetImport.java index 27d407aa..b1488e8a 100644 --- a/src/test/org/apache/sqoop/TestParquetImport.java +++ b/src/test/org/apache/sqoop/TestParquetImport.java @@ -19,7 +19,6 @@ package org.apache.sqoop; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.sqoop.testutil.CommonArgs; import org.apache.sqoop.testutil.HsqldbTestServer; import org.apache.sqoop.testutil.ImportJobTestCase; @@ -32,12 +31,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.sqoop.util.ParquetReader; import org.junit.Test; -import parquet.avro.AvroSchemaConverter; -import parquet.format.CompressionCodec; -import parquet.hadoop.Footer; -import parquet.hadoop.ParquetFileReader; -import parquet.hadoop.metadata.ParquetMetadata; -import parquet.schema.MessageType; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import java.io.IOException; import java.nio.ByteBuffer; @@ -46,20 +42,38 @@ import java.util.Arrays; import java.util.List; +import static org.apache.sqoop.avro.AvroUtil.getAvroSchemaFromParquetFile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; /** * Tests --as-parquetfile. */ +@RunWith(Parameterized.class) public class TestParquetImport extends ImportJobTestCase { public static final Log LOG = LogFactory .getLog(TestParquetImport.class.getName()); + private static String PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE = "kite"; + + private static String PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP = "hadoop"; + + @Parameters(name = "parquetImplementation = {0}") + public static Iterable parquetImplementationParameters() { + return Arrays.asList(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE, PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP); + } + + private final String parquetImplementation; + + public TestParquetImport(String parquetImplementation) { + this.parquetImplementation = parquetImplementation; + } + /** * Create the argv to pass to Sqoop. 
* @@ -122,12 +136,30 @@ public void testSnappyCompression() throws IOException { } @Test - public void testDeflateCompression() throws IOException { + public void testHadoopGzipCompression() throws IOException { + assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation)); + runParquetImportTest("gzip"); + } + + @Test + public void testKiteDeflateCompression() throws IOException { + assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE.equals(parquetImplementation)); // The current Kite-based Parquet writing implementation uses GZIP compression codec when Deflate is specified. // See: org.kitesdk.data.spi.filesystem.ParquetAppender.getCompressionCodecName() runParquetImportTest("deflate", "gzip"); } + /** + * This test case is added to document that the deflate codec is not supported with + * the Hadoop Parquet implementation so Sqoop throws an exception when it is specified. + * @throws IOException + */ + @Test(expected = IOException.class) + public void testHadoopDeflateCompression() throws IOException { + assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation)); + runParquetImportTest("deflate"); + } + private void runParquetImportTest(String codec) throws IOException { runParquetImportTest(codec, codec); } @@ -141,9 +173,10 @@ private void runParquetImportTest(String codec, String expectedCodec) throws IOE String [] extraArgs = { "--compression-codec", codec}; runImport(getOutputArgv(true, extraArgs)); - assertEquals(expectedCodec.toUpperCase(), getCompressionType()); + ParquetReader parquetReader = new ParquetReader(getTablePath()); + assertEquals(expectedCodec.toUpperCase(), parquetReader.getCodec().name()); - Schema schema = getSchema(); + Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf()); assertEquals(Type.RECORD, schema.getType()); List fields = schema.getFields(); assertEquals(types.length, fields.size()); @@ -155,7 +188,7 @@ private void runParquetImportTest(String codec, String expectedCodec) throws IOE checkField(fields.get(5), "DATA_COL5", Type.STRING); checkField(fields.get(6), "DATA_COL6", Type.BYTES); - List genericRecords = new ParquetReader(getTablePath()).readAll(); + List genericRecords = parquetReader.readAll(); GenericRecord record1 = genericRecords.get(0); assertNotNull(record1); assertEquals("DATA_COL0", true, record1.get("DATA_COL0")); @@ -181,7 +214,7 @@ public void testOverrideTypeMapping() throws IOException { String [] extraArgs = { "--map-column-java", "DATA_COL0=String"}; runImport(getOutputArgv(true, extraArgs)); - Schema schema = getSchema(); + Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf()); assertEquals(Type.RECORD, schema.getType()); List fields = schema.getFields(); assertEquals(types.length, fields.size()); @@ -202,7 +235,7 @@ public void testFirstUnderscoreInColumnName() throws IOException { runImport(getOutputArgv(true, null)); - Schema schema = getSchema(); + Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf()); assertEquals(Type.RECORD, schema.getType()); List fields = schema.getFields(); assertEquals(types.length, fields.size()); @@ -223,7 +256,7 @@ public void testNonIdentCharactersInColumnName() throws IOException { runImport(getOutputArgv(true, null)); - Schema schema = getSchema(); + Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf()); assertEquals(Type.RECORD, schema.getType()); List fields = schema.getFields(); assertEquals(types.length, fields.size()); @@ -295,29 +328,6 @@ public void 
testOverwriteParquetDatasetFail() throws IOException, SQLException { } } - private String getCompressionType() { - ParquetMetadata parquetMetadata = getOutputMetadata(); - CompressionCodec parquetCompressionCodec = parquetMetadata.getBlocks().get(0).getColumns().get(0).getCodec().getParquetCompressionCodec(); - return parquetCompressionCodec.name(); - } - - private ParquetMetadata getOutputMetadata() { - try { - Configuration config = new Configuration(); - FileStatus fileStatus = getTablePath().getFileSystem(config).getFileStatus(getTablePath()); - List