
SQOOP-3328: Implement an alternative solution for Parquet reading and writing

(Szabolcs Vasas via Boglarka Egyed)
Boglarka Egyed 2018-06-28 16:41:01 +02:00
parent f4f9543010
commit 8e45d2b38d
40 changed files with 1034 additions and 102 deletions
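
At a glance, the commit introduces a ParquetJobConfiguratorImplementation enum (KITE, HADOOP) that can be selected with the new --parquet-configurator-implementation option or the parquetjob.configurator.implementation property, with KITE remaining the default. Below is a minimal sketch (not part of the diff; the class name is made up for illustration) of how the selected implementation is turned into a configurator factory, using only the setters and the enum added in this commit:

    import org.apache.sqoop.SqoopOptions;
    import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
    import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;

    public class ParquetImplementationSelectionSketch {
      public static void main(String[] args) {
        // The default stays KITE (see initDefaults in SqoopOptions below); switching to the
        // new Hadoop-based writer is a single setter call.
        SqoopOptions options = new SqoopOptions();
        options.setParquetConfiguratorImplementation(ParquetJobConfiguratorImplementation.HADOOP);

        // The enum constant doubles as a factory: createFactory() instantiates the
        // matching ParquetJobConfiguratorFactory implementation.
        ParquetJobConfiguratorFactory factory =
            options.getParquetConfiguratorImplementation().createFactory();

        // The factory then hands out the import, export and merge configurators
        // consumed by the MapReduce job classes changed in this commit.
        System.out.println(factory.getClass().getSimpleName());
      }
    }
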

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.accumulo.AccumuloConstants;
import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;
import org.apache.sqoop.tool.BaseSqoopTool;
import org.apache.sqoop.util.CredentialsUtil;
import org.apache.sqoop.util.LoggingUtils;
@ -52,6 +53,7 @@
import org.apache.sqoop.util.StoredAsProperty;
import static org.apache.sqoop.Sqoop.SQOOP_RETHROW_PROPERTY;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
import static org.apache.sqoop.orm.ClassWriter.toJavaIdentifier;
/**
@ -458,6 +460,9 @@ public String toString() {
@StoredAsProperty("hs2.keytab")
private String hs2Keytab;
@StoredAsProperty("parquet.configurator.implementation")
private ParquetJobConfiguratorImplementation parquetConfiguratorImplementation;
public SqoopOptions() {
initDefaults(null);
}
@ -1152,6 +1157,8 @@ private void initDefaults(Configuration baseConfiguration) {
// set escape column mapping to true
this.escapeColumnMappingEnabled = true;
this.parquetConfiguratorImplementation = KITE;
}
/**
@ -2925,5 +2932,12 @@ public void setHs2Keytab(String hs2Keytab) {
this.hs2Keytab = hs2Keytab;
}
public ParquetJobConfiguratorImplementation getParquetConfiguratorImplementation() {
return parquetConfiguratorImplementation;
}
public void setParquetConfiguratorImplementation(ParquetJobConfiguratorImplementation parquetConfiguratorImplementation) {
this.parquetConfiguratorImplementation = parquetConfiguratorImplementation;
}
}

View File

@ -40,6 +40,11 @@
import org.apache.sqoop.lib.BlobRef;
import org.apache.sqoop.lib.ClobRef;
import org.apache.sqoop.orm.ClassWriter;
import parquet.avro.AvroSchemaConverter;
import parquet.format.converter.ParquetMetadataConverter;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.MessageType;
import java.io.IOException;
import java.math.BigDecimal;
@ -285,24 +290,7 @@ public static Object fromAvro(Object avroObject, Schema schema, String type) {
*/
public static Schema getAvroSchema(Path path, Configuration conf)
throws IOException {
FileSystem fs = path.getFileSystem(conf);
Path fileToTest;
if (fs.isDirectory(path)) {
FileStatus[] fileStatuses = fs.listStatus(path, new PathFilter() {
@Override
public boolean accept(Path p) {
String name = p.getName();
return !name.startsWith("_") && !name.startsWith(".");
}
});
if (fileStatuses.length == 0) {
return null;
}
fileToTest = fileStatuses[0].getPath();
} else {
fileToTest = path;
}
Path fileToTest = getFileToTest(path, conf);
SeekableInput input = new FsInput(fileToTest, conf);
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
FileReader<GenericRecord> fileReader = DataFileReader.openReader(input, reader);
@ -340,8 +328,37 @@ public static LogicalType createDecimalType(Integer precision, Integer scale, Co
return LogicalTypes.decimal(precision, scale);
}
private static Path getFileToTest(Path path, Configuration conf) throws IOException {
FileSystem fs = path.getFileSystem(conf);
if (!fs.isDirectory(path)) {
return path;
}
FileStatus[] fileStatuses = fs.listStatus(path, new PathFilter() {
@Override
public boolean accept(Path p) {
String name = p.getName();
return !name.startsWith("_") && !name.startsWith(".");
}
});
if (fileStatuses.length == 0) {
return null;
}
return fileStatuses[0].getPath();
}
public static Schema parseAvroSchema(String schemaString) {
return new Schema.Parser().parse(schemaString);
}
public static Schema getAvroSchemaFromParquetFile(Path path, Configuration conf) throws IOException {
Path fileToTest = getFileToTest(path, conf);
if (fileToTest == null) {
return null;
}
ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(conf, fileToTest, ParquetMetadataConverter.NO_FILTER);
MessageType parquetSchema = parquetMetadata.getFileMetaData().getSchema();
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
return avroSchemaConverter.convert(parquetSchema);
}
}
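
For reference, a minimal usage sketch of the new getAvroSchemaFromParquetFile helper above (not part of the diff; the path and class name are hypothetical). The helper picks the first non-hidden data file under the path, reads the Parquet footer and converts its schema back to Avro, returning null for an empty directory:

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.sqoop.avro.AvroUtil;

    public class ParquetSchemaLookupSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical import directory produced by a Sqoop Parquet import.
        Schema schema = AvroUtil.getAvroSchemaFromParquetFile(new Path("/tmp/sqoop-import"), conf);
        if (schema == null) {
          System.out.println("No data files found under the path.");
        } else {
          System.out.println(schema.toString(true));
        }
      }
    }
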

View File

@ -46,7 +46,6 @@
import org.apache.sqoop.lib.BlobRef;
import org.apache.sqoop.lib.ClobRef;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;
import org.apache.sqoop.util.ExportException;
import org.apache.sqoop.util.ImportException;
@ -869,7 +868,7 @@ public boolean isDirectModeAccumuloSupported() {
}
public ParquetJobConfiguratorFactory getParquetJobConfigurator() {
return ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(options.getConf());
return options.getParquetConfiguratorImplementation().createFactory();
}
}

View File

@ -49,6 +49,8 @@
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
import org.apache.sqoop.orm.AvroSchemaGenerator;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
/**
* Actually runs a jdbc import job using the ORM files generated by the
* sqoop.orm package. Uses DataDrivenDBInputFormat.
@ -114,6 +116,7 @@ protected void configureMapper(Job job, String tableName,
Schema schema = generateAvroSchema(tableName, schemaNameOverride);
Path destination = getContext().getDestination();
options.getConf().set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, schema.toString());
parquetImportJobConfigurator.configureMapper(job, schema, options, tableName, destination);
}

View File

@ -152,6 +152,7 @@ protected void configureOutputFormat(Job job, String tableName,
String shortName = CodecMap.getCodecShortNameByName(codecName, conf);
if (!shortName.equalsIgnoreCase("default")) {
conf.set(SQOOP_PARQUET_OUTPUT_CODEC_KEY, shortName);
options.getConf().set(SQOOP_PARQUET_OUTPUT_CODEC_KEY, shortName);
}
}
}

View File

@ -25,6 +25,8 @@ public final class ParquetConstants {
public static final String SQOOP_PARQUET_OUTPUT_CODEC_KEY = "parquetjob.output.codec";
public static final String PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY = "parquetjob.configurator.implementation";
private ParquetConstants() {
throw new AssertionError("This class is meant for static use only.");
}

View File

@ -25,6 +25,10 @@
import java.io.IOException;
/**
* This interface defines the type of a product of {@link ParquetJobConfiguratorFactory}.
* The implementations of the methods of this interface help to configure Sqoop Parquet export jobs.
*/
public interface ParquetExportJobConfigurator {
void configureInputFormat(Job job, Path inputPath) throws IOException;

View File

@ -27,6 +27,10 @@
import java.io.IOException;
/**
* This interface defines the type of a product of {@link ParquetJobConfiguratorFactory}.
* The implementations of the methods of this interface help to configure Sqoop Parquet import jobs.
*/
public interface ParquetImportJobConfigurator {
void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException;

View File

@ -18,6 +18,14 @@
package org.apache.sqoop.mapreduce.parquet;
/**
* This interface is an abstract factory of objects which configure Sqoop Parquet jobs.
* Every product is responsible for configuring different types of Sqoop jobs.
*
* @see ParquetImportJobConfigurator
* @see ParquetExportJobConfigurator
* @see ParquetMergeJobConfigurator
*/
public interface ParquetJobConfiguratorFactory {
ParquetImportJobConfigurator createParquetImportJobConfigurator();
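
A hedged sketch of how the abstract factory described above is meant to be consumed (not part of the diff; the class name is made up). Callers depend only on the interface, while the concrete family — Kite or Hadoop — is fixed once, when the factory is created:

    import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
    import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
    import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
    import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
    import org.apache.sqoop.mapreduce.parquet.hadoop.HadoopParquetJobConfiguratorFactory;

    public class ParquetFactorySketch {
      public static void main(String[] args) {
        // One factory per implementation family; each create* method returns the
        // configurator for one kind of Sqoop job.
        ParquetJobConfiguratorFactory factory = new HadoopParquetJobConfiguratorFactory();
        ParquetImportJobConfigurator importConfigurator = factory.createParquetImportJobConfigurator();
        ParquetExportJobConfigurator exportConfigurator = factory.createParquetExportJobConfigurator();
        ParquetMergeJobConfigurator mergeConfigurator = factory.createParquetMergeJobConfigurator();
        System.out.println(importConfigurator.getClass().getSimpleName());
        System.out.println(exportConfigurator.getClass().getSimpleName());
        System.out.println(mergeConfigurator.getClass().getSimpleName());
      }
    }
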

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet;
import org.apache.sqoop.mapreduce.parquet.hadoop.HadoopParquetJobConfiguratorFactory;
import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetJobConfiguratorFactory;
/**
* An enum containing all the implementations available for {@link ParquetJobConfiguratorFactory}.
* The enumeration constants are also used to instantiate concrete {@link ParquetJobConfiguratorFactory} objects.
*/
public enum ParquetJobConfiguratorImplementation {
KITE(KiteParquetJobConfiguratorFactory.class), HADOOP(HadoopParquetJobConfiguratorFactory.class);
private Class<? extends ParquetJobConfiguratorFactory> configuratorFactoryClass;
ParquetJobConfiguratorImplementation(Class<? extends ParquetJobConfiguratorFactory> configuratorFactoryClass) {
this.configuratorFactoryClass = configuratorFactoryClass;
}
public ParquetJobConfiguratorFactory createFactory() {
try {
return configuratorFactoryClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException("Could not instantiate factory class: " + configuratorFactoryClass, e);
}
}
}

View File

@ -24,6 +24,10 @@
import java.io.IOException;
/**
* This interface defines the type of a product of {@link ParquetJobConfiguratorFactory}.
* The implementations of the methods of this interface help to configure Sqoop Parquet merge jobs.
*/
public interface ParquetMergeJobConfigurator {
void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath, Path finalPath) throws IOException;

View File

@ -16,19 +16,20 @@
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet;
package org.apache.sqoop.mapreduce.parquet.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetJobConfiguratorFactory;
import org.apache.avro.generic.GenericRecord;
import org.apache.sqoop.mapreduce.MergeParquetReducer;
public final class ParquetJobConfiguratorFactoryProvider {
import java.io.IOException;
private ParquetJobConfiguratorFactoryProvider() {
throw new AssertionError("This class is meant for static use only.");
/**
* An implementation of {@link MergeParquetReducer} which depends on the Hadoop Parquet library.
*/
public class HadoopMergeParquetReducer extends MergeParquetReducer<Void, GenericRecord> {
@Override
protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
context.write(null, record);
}
public static ParquetJobConfiguratorFactory createParquetJobConfiguratorFactory(Configuration configuration) {
return new KiteParquetJobConfiguratorFactory();
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet.hadoop;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
import parquet.avro.AvroParquetInputFormat;
import java.io.IOException;
/**
* An implementation of {@link ParquetExportJobConfigurator} which depends on the Hadoop Parquet library.
*/
public class HadoopParquetExportJobConfigurator implements ParquetExportJobConfigurator {
@Override
public void configureInputFormat(Job job, Path inputPath) throws IOException {
// do nothing
}
@Override
public Class<? extends Mapper> getMapperClass() {
return HadoopParquetExportMapper.class;
}
@Override
public Class<? extends InputFormat> getInputFormatClass() {
return AvroParquetInputFormat.class;
}
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet.hadoop;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.mapreduce.GenericRecordExportMapper;
import java.io.IOException;
/**
* An implementation of {@link GenericRecordExportMapper} which depends on the Hadoop Parquet library.
*/
public class HadoopParquetExportMapper extends GenericRecordExportMapper<Void, GenericRecord> {
@Override
protected void map(Void key, GenericRecord val, Context context) throws IOException, InterruptedException {
context.write(toSqoopRecord(val), NullWritable.get());
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet.hadoop;
import org.apache.avro.Schema;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
import parquet.avro.AvroParquetOutputFormat;
import parquet.hadoop.ParquetOutputFormat;
import parquet.hadoop.metadata.CompressionCodecName;
import java.io.IOException;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;
/**
* An implementation of {@link ParquetImportJobConfigurator} which depends on the Hadoop Parquet library.
*/
public class HadoopParquetImportJobConfigurator implements ParquetImportJobConfigurator {
private static final Log LOG = LogFactory.getLog(HadoopParquetImportJobConfigurator.class.getName());
@Override
public void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException {
configureAvroSchema(job, schema);
configureOutputCodec(job);
}
@Override
public Class<? extends Mapper> getMapperClass() {
return HadoopParquetImportMapper.class;
}
@Override
public Class<? extends OutputFormat> getOutputFormatClass() {
return AvroParquetOutputFormat.class;
}
void configureOutputCodec(Job job) {
String outputCodec = job.getConfiguration().get(SQOOP_PARQUET_OUTPUT_CODEC_KEY);
if (outputCodec != null) {
LOG.info("Using output codec: " + outputCodec);
ParquetOutputFormat.setCompression(job, CompressionCodecName.fromConf(outputCodec));
}
}
void configureAvroSchema(Job job, Schema schema) {
if (LOG.isDebugEnabled()) {
LOG.debug("Using Avro schema: " + schema);
}
AvroParquetOutputFormat.setSchema(job, schema);
}
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet.hadoop;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.sqoop.avro.AvroUtil;
import org.apache.sqoop.lib.LargeObjectLoader;
import org.apache.sqoop.mapreduce.ParquetImportMapper;
import java.io.IOException;
/**
* An implementation of {@link ParquetImportMapper} which depends on the Hadoop Parquet library.
*/
public class HadoopParquetImportMapper extends ParquetImportMapper<NullWritable, GenericRecord> {
private static final Log LOG = LogFactory.getLog(HadoopParquetImportMapper.class.getName());
/**
* The key to get the configuration value set by
* parquet.avro.AvroParquetOutputFormat#setSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
*/
private static final String HADOOP_PARQUET_AVRO_SCHEMA_KEY = "parquet.avro.schema";
@Override
protected LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException {
return new LargeObjectLoader(context.getConfiguration(), FileOutputFormat.getWorkOutputPath(context));
}
@Override
protected Schema getAvroSchema(Configuration configuration) {
String schemaString = configuration.get(HADOOP_PARQUET_AVRO_SCHEMA_KEY);
LOG.debug("Found Avro schema: " + schemaString);
return AvroUtil.parseAvroSchema(schemaString);
}
@Override
protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
context.write(null, record);
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet.hadoop;
import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
/**
* A concrete factory implementation which produces configurator objects using the Hadoop Parquet library.
*/
public class HadoopParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory {
@Override
public ParquetImportJobConfigurator createParquetImportJobConfigurator() {
return new HadoopParquetImportJobConfigurator();
}
@Override
public ParquetExportJobConfigurator createParquetExportJobConfigurator() {
return new HadoopParquetExportJobConfigurator();
}
@Override
public ParquetMergeJobConfigurator createParquetMergeJobConfigurator() {
return new HadoopParquetMergeJobConfigurator();
}
}

View File

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet.hadoop;
import org.apache.avro.Schema;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.mapreduce.MergeParquetMapper;
import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
import parquet.avro.AvroParquetInputFormat;
import java.io.IOException;
import static java.lang.String.format;
import static java.util.Collections.singleton;
import static org.apache.sqoop.avro.AvroUtil.getAvroSchemaFromParquetFile;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
/**
* An implementation of {@link ParquetMergeJobConfigurator} which depends on the Hadoop Parquet library.
*/
public class HadoopParquetMergeJobConfigurator implements ParquetMergeJobConfigurator {
public static final Log LOG = LogFactory.getLog(HadoopParquetMergeJobConfigurator.class.getName());
private final HadoopParquetImportJobConfigurator importJobConfigurator;
private final HadoopParquetExportJobConfigurator exportJobConfigurator;
public HadoopParquetMergeJobConfigurator(HadoopParquetImportJobConfigurator importJobConfigurator, HadoopParquetExportJobConfigurator exportJobConfigurator) {
this.importJobConfigurator = importJobConfigurator;
this.exportJobConfigurator = exportJobConfigurator;
}
public HadoopParquetMergeJobConfigurator() {
this(new HadoopParquetImportJobConfigurator(), new HadoopParquetExportJobConfigurator());
}
@Override
public void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
Path finalPath) throws IOException {
try {
LOG.info("Trying to merge parquet files");
job.setOutputKeyClass(Void.class);
job.setMapperClass(MergeParquetMapper.class);
job.setReducerClass(HadoopMergeParquetReducer.class);
job.setOutputValueClass(GenericRecord.class);
Schema avroSchema = loadAvroSchema(conf, oldPath);
validateNewPathAvroSchema(getAvroSchemaFromParquetFile(newPath, conf), avroSchema);
job.setInputFormatClass(exportJobConfigurator.getInputFormatClass());
AvroParquetInputFormat.setAvroReadSchema(job, avroSchema);
conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, avroSchema.toString());
importJobConfigurator.configureAvroSchema(job, avroSchema);
importJobConfigurator.configureOutputCodec(job);
job.setOutputFormatClass(importJobConfigurator.getOutputFormatClass());
} catch (Exception cnfe) {
throw new IOException(cnfe);
}
}
private Schema loadAvroSchema(Configuration conf, Path path) throws IOException {
Schema avroSchema = getAvroSchemaFromParquetFile(path, conf);
if (avroSchema == null) {
throw new RuntimeException("Could not load Avro schema from path: " + path);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Avro schema loaded: " + avroSchema);
}
return avroSchema;
}
/**
* This method ensures that the Avro schema in the new path is compatible with the Avro schema in the old path.
*/
private void validateNewPathAvroSchema(Schema newPathAvroSchema, Schema avroSchema) {
// If the new path is an empty directory (e.g. in case of a sqoop merge command) then the newPathAvroSchema will
// be null. In that case we just want to proceed without real validation.
if (newPathAvroSchema == null) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug(format("Validation Avro schema %s against %s", newPathAvroSchema.toString(), avroSchema.toString()));
}
SchemaValidator schemaValidator = new SchemaValidatorBuilder().mutualReadStrategy().validateAll();
try {
schemaValidator.validate(newPathAvroSchema, singleton(avroSchema));
} catch (SchemaValidationException e) {
throw new RuntimeException("Cannot merge files, the Avro schemas are not compatible.", e);
}
}
}

View File

@ -24,6 +24,9 @@
import java.io.IOException;
/**
* An implementation of {@link MergeParquetReducer} which depends on the Kite Dataset API.
*/
public class KiteMergeParquetReducer extends MergeParquetReducer<GenericRecord, NullWritable> {
@Override

View File

@ -28,6 +28,9 @@
import java.io.IOException;
/**
* An implementation of {@link ParquetExportJobConfigurator} which depends on the Kite Dataset API.
*/
public class KiteParquetExportJobConfigurator implements ParquetExportJobConfigurator {
@Override

View File

@ -25,7 +25,7 @@
import java.io.IOException;
/**
* Exports Parquet records from a data source.
* An implementation of {@link GenericRecordExportMapper} which depends on the Kite Dataset API.
*/
public class KiteParquetExportMapper extends GenericRecordExportMapper<GenericRecord, NullWritable> {

View File

@ -35,6 +35,9 @@
import java.io.IOException;
/**
* An implementation of {@link ParquetImportJobConfigurator} which depends on the Kite Dataset API.
*/
public class KiteParquetImportJobConfigurator implements ParquetImportJobConfigurator {
public static final Log LOG = LogFactory.getLog(KiteParquetImportJobConfigurator.class.getName());

View File

@ -30,6 +30,9 @@
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
/**
* An implementation of {@link ParquetImportMapper} which depends on the Kite Dataset API.
*/
public class KiteParquetImportMapper extends ParquetImportMapper<GenericRecord, Void> {
@Override

View File

@ -23,6 +23,9 @@
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
/**
* A concrete factory implementation which produces configurator objects using the Kite Dataset API.
*/
public class KiteParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory {
@Override

View File

@ -48,6 +48,9 @@
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
/**
* An implementation of {@link ParquetMergeJobConfigurator} which depends on the Kite Dataset API.
*/
public class KiteParquetMergeJobConfigurator implements ParquetMergeJobConfigurator {
public static final Log LOG = LogFactory.getLog(KiteParquetMergeJobConfigurator.class.getName());

View File

@ -45,7 +45,7 @@
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;
/**
* Helper class for setting up a Parquet MapReduce job.
* Helper class using the Kite Dataset API for setting up a Parquet MapReduce job.
*/
public final class KiteParquetUtils {

View File

@ -19,6 +19,9 @@
package org.apache.sqoop.tool;
import static java.lang.String.format;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.valueOf;
import java.io.File;
import java.io.FileInputStream;
@ -34,12 +37,11 @@
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.sqoop.manager.SupportedManagers;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;
import org.apache.sqoop.util.CredentialsUtil;
import org.apache.sqoop.util.LoggingUtils;
import org.apache.sqoop.util.password.CredentialProviderHelper;
@ -183,6 +185,7 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
public static final String THROW_ON_ERROR_ARG = "throw-on-error";
public static final String ORACLE_ESCAPING_DISABLED = "oracle-escaping-disabled";
public static final String ESCAPE_MAPPING_COLUMN_NAMES_ENABLED = "escape-mapping-column-names";
public static final String PARQUET_CONFIGURATOR_IMPLEMENTATION = "parquet-configurator-implementation";
// Arguments for validation.
public static final String VALIDATE_ARG = "validate";
@ -1145,6 +1148,8 @@ protected void applyCommonOptions(CommandLine in, SqoopOptions out)
out.setEscapeMappingColumnNamesEnabled(Boolean.parseBoolean(in.getOptionValue(
ESCAPE_MAPPING_COLUMN_NAMES_ENABLED)));
}
applyParquetJobConfigurationImplementation(in, out);
}
private void applyCredentialsOptions(CommandLine in, SqoopOptions out)
@ -1908,7 +1913,27 @@ protected void validateHS2Options(SqoopOptions options) throws SqoopOptions.Inva
}
public ParquetJobConfiguratorFactory getParquetJobConfigurator(Configuration configuration) {
return ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(configuration);
private void applyParquetJobConfigurationImplementation(CommandLine in, SqoopOptions out) throws InvalidOptionsException {
String optionValue = in.getOptionValue(PARQUET_CONFIGURATOR_IMPLEMENTATION);
String propertyValue = out.getConf().get(PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY);
String valueToUse = isBlank(optionValue) ? propertyValue : optionValue;
if (isBlank(valueToUse)) {
LOG.debug("Parquet job configurator implementation is not set, using default value: " + out.getParquetConfiguratorImplementation());
return;
}
try {
ParquetJobConfiguratorImplementation parquetConfiguratorImplementation = valueOf(valueToUse.toUpperCase());
out.setParquetConfiguratorImplementation(parquetConfiguratorImplementation);
LOG.debug("Parquet job configurator implementation set: " + parquetConfiguratorImplementation);
} catch (IllegalArgumentException e) {
throw new InvalidOptionsException(format("Invalid Parquet job configurator implementation is set: %s. Supported values are: %s", valueToUse, Arrays.toString(ParquetJobConfiguratorImplementation.values())));
}
}
public ParquetJobConfiguratorFactory getParquetJobConfigurator(SqoopOptions sqoopOptions) {
return sqoopOptions.getParquetConfiguratorImplementation().createFactory();
}
}
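
To illustrate the precedence implemented in applyParquetJobConfigurationImplementation above (a sketch under stated assumptions, not part of the diff): the --parquet-configurator-implementation option wins over the parquetjob.configurator.implementation configuration property, matching is case-insensitive, and the KITE default is kept when neither is set. The class name and the local variables below are made up for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;

    public class ParquetImplementationResolutionSketch {
      public static void main(String[] args) {
        // Route 1: the value arrives through the Hadoop Configuration
        // (the new tests set it this way via ArgumentArrayBuilder.withProperty).
        Configuration conf = new Configuration();
        conf.set("parquetjob.configurator.implementation", "hadoop");

        // Route 2: the value from the parsed --parquet-configurator-implementation
        // option; pretend the flag was not given here.
        String fromCommandLine = null;

        String valueToUse = (fromCommandLine != null)
            ? fromCommandLine
            : conf.get("parquetjob.configurator.implementation");
        ParquetJobConfiguratorImplementation implementation = (valueToUse == null)
            ? ParquetJobConfiguratorImplementation.KITE
            : ParquetJobConfiguratorImplementation.valueOf(valueToUse.toUpperCase());
        System.out.println(implementation);
      }
    }
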

View File

@ -473,7 +473,7 @@ protected void lastModifiedMerge(SqoopOptions options, ImportJobContext context)
loadJars(options.getConf(), context.getJarFile(), context.getTableName());
}
ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options.getConf()).createParquetMergeJobConfigurator();
ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options).createParquetMergeJobConfigurator();
MergeJob mergeJob = new MergeJob(options, parquetMergeJobConfigurator);
if (mergeJob.runMergeJob()) {
// Rename destination directory to proper location.

View File

@ -53,7 +53,7 @@ public MergeTool(String toolName) {
public int run(SqoopOptions options) {
try {
// Configure and execute a MapReduce job to merge these datasets.
ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options.getConf()).createParquetMergeJobConfigurator();
ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options).createParquetMergeJobConfigurator();
MergeJob mergeJob = new MergeJob(options, parquetMergeJobConfigurator);
if (!mergeJob.runMergeJob()) {
LOG.error("MapReduce job failed!");

View File

@ -37,6 +37,7 @@
import org.apache.sqoop.testutil.ExportJobTestCase;
import org.junit.Test;
import static java.util.Collections.emptyList;
import static org.junit.Assert.assertEquals;
/**
@ -57,7 +58,7 @@ private void runBigDecimalExport(String line)
writer.close();
String[] types =
{ "DECIMAL", "NUMERIC" };
createTableWithColTypes(types, null);
createTableWithColTypes(types, emptyList());
List<String> args = new ArrayList<String>();

View File

@ -26,6 +26,8 @@
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;
import org.apache.sqoop.testutil.CommonArgs;
import org.apache.sqoop.testutil.HsqldbTestServer;
import org.apache.sqoop.manager.ConnManager;
@ -52,6 +54,8 @@
import org.junit.Before;
import org.junit.Test;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
import static org.junit.Assert.fail;
/**
@ -80,6 +84,8 @@ public class TestMerge extends BaseSqoopTestCase {
Arrays.asList(new Integer(1), new Integer(43)),
Arrays.asList(new Integer(3), new Integer(313)));
private ParquetJobConfiguratorImplementation parquetJobConfiguratorImplementation = KITE;
@Before
public void setUp() {
super.setUp();
@ -112,6 +118,7 @@ public Configuration newConf() {
public SqoopOptions getSqoopOptions(Configuration conf) {
SqoopOptions options = new SqoopOptions(conf);
options.setConnectString(HsqldbTestServer.getDbUrl());
options.setParquetConfiguratorImplementation(parquetJobConfiguratorImplementation);
return options;
}
@ -157,7 +164,14 @@ public void testAvroFileMerge() throws Exception {
}
@Test
public void testParquetFileMerge() throws Exception {
public void testParquetFileMergeHadoop() throws Exception {
parquetJobConfiguratorImplementation = HADOOP;
runMergeTest(SqoopOptions.FileLayout.ParquetFile);
}
@Test
public void testParquetFileMergeKite() throws Exception {
parquetJobConfiguratorImplementation = KITE;
runMergeTest(SqoopOptions.FileLayout.ParquetFile);
}

View File

@ -32,6 +32,8 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import parquet.avro.AvroParquetWriter;
import java.io.IOException;
@ -42,6 +44,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
@ -55,11 +58,23 @@
/**
* Test that we can export Parquet Data Files from HDFS into databases.
*/
@RunWith(Parameterized.class)
public class TestParquetExport extends ExportJobTestCase {
@Parameterized.Parameters(name = "parquetImplementation = {0}")
public static Iterable<? extends Object> parquetImplementationParameters() {
return Arrays.asList("kite", "hadoop");
}
@Rule
public ExpectedException thrown = ExpectedException.none();
private final String parquetImplementation;
public TestParquetExport(String parquetImplementation) {
this.parquetImplementation = parquetImplementation;
}
/**
* @return an argv for the CodeGenTool to use when creating tables to export.
*/
@ -478,5 +493,10 @@ public void testMissingParquetFields() throws IOException, SQLException {
runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
}
@Override
protected Configuration getConf() {
Configuration conf = super.getConf();
conf.set("parquetjob.configurator.implementation", parquetImplementation);
return conf;
}
}

View File

@ -19,7 +19,6 @@
package org.apache.sqoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.sqoop.testutil.CommonArgs;
import org.apache.sqoop.testutil.HsqldbTestServer;
import org.apache.sqoop.testutil.ImportJobTestCase;
@ -32,12 +31,9 @@
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.util.ParquetReader;
import org.junit.Test;
import parquet.avro.AvroSchemaConverter;
import parquet.format.CompressionCodec;
import parquet.hadoop.Footer;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.MessageType;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -46,20 +42,38 @@
import java.util.Arrays;
import java.util.List;
import static org.apache.sqoop.avro.AvroUtil.getAvroSchemaFromParquetFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
/**
* Tests --as-parquetfile.
*/
@RunWith(Parameterized.class)
public class TestParquetImport extends ImportJobTestCase {
public static final Log LOG = LogFactory
.getLog(TestParquetImport.class.getName());
private static String PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE = "kite";
private static String PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP = "hadoop";
@Parameters(name = "parquetImplementation = {0}")
public static Iterable<? extends Object> parquetImplementationParameters() {
return Arrays.asList(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE, PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP);
}
private final String parquetImplementation;
public TestParquetImport(String parquetImplementation) {
this.parquetImplementation = parquetImplementation;
}
/**
* Create the argv to pass to Sqoop.
*
@ -122,12 +136,30 @@ public void testSnappyCompression() throws IOException {
}
@Test
public void testDeflateCompression() throws IOException {
public void testHadoopGzipCompression() throws IOException {
assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation));
runParquetImportTest("gzip");
}
@Test
public void testKiteDeflateCompression() throws IOException {
assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE.equals(parquetImplementation));
// The current Kite-based Parquet writing implementation uses GZIP compression codec when Deflate is specified.
// See: org.kitesdk.data.spi.filesystem.ParquetAppender.getCompressionCodecName()
runParquetImportTest("deflate", "gzip");
}
/**
* This test case is added to document that the deflate codec is not supported with
* the Hadoop Parquet implementation so Sqoop throws an exception when it is specified.
* @throws IOException
*/
@Test(expected = IOException.class)
public void testHadoopDeflateCompression() throws IOException {
assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation));
runParquetImportTest("deflate");
}
private void runParquetImportTest(String codec) throws IOException {
runParquetImportTest(codec, codec);
}
@ -141,9 +173,10 @@ private void runParquetImportTest(String codec, String expectedCodec) throws IOE
String [] extraArgs = { "--compression-codec", codec};
runImport(getOutputArgv(true, extraArgs));
assertEquals(expectedCodec.toUpperCase(), getCompressionType());
ParquetReader parquetReader = new ParquetReader(getTablePath());
assertEquals(expectedCodec.toUpperCase(), parquetReader.getCodec().name());
Schema schema = getSchema();
Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf());
assertEquals(Type.RECORD, schema.getType());
List<Field> fields = schema.getFields();
assertEquals(types.length, fields.size());
@ -155,7 +188,7 @@ private void runParquetImportTest(String codec, String expectedCodec) throws IOE
checkField(fields.get(5), "DATA_COL5", Type.STRING);
checkField(fields.get(6), "DATA_COL6", Type.BYTES);
List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
List<GenericRecord> genericRecords = parquetReader.readAll();
GenericRecord record1 = genericRecords.get(0);
assertNotNull(record1);
assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
@ -181,7 +214,7 @@ public void testOverrideTypeMapping() throws IOException {
String [] extraArgs = { "--map-column-java", "DATA_COL0=String"};
runImport(getOutputArgv(true, extraArgs));
Schema schema = getSchema();
Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf());
assertEquals(Type.RECORD, schema.getType());
List<Field> fields = schema.getFields();
assertEquals(types.length, fields.size());
@ -202,7 +235,7 @@ public void testFirstUnderscoreInColumnName() throws IOException {
runImport(getOutputArgv(true, null));
Schema schema = getSchema();
Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf());
assertEquals(Type.RECORD, schema.getType());
List<Field> fields = schema.getFields();
assertEquals(types.length, fields.size());
@ -223,7 +256,7 @@ public void testNonIdentCharactersInColumnName() throws IOException {
runImport(getOutputArgv(true, null));
Schema schema = getSchema();
Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf());
assertEquals(Type.RECORD, schema.getType());
List<Field> fields = schema.getFields();
assertEquals(types.length, fields.size());
@ -295,29 +328,6 @@ public void testOverwriteParquetDatasetFail() throws IOException, SQLException {
}
}
private String getCompressionType() {
ParquetMetadata parquetMetadata = getOutputMetadata();
CompressionCodec parquetCompressionCodec = parquetMetadata.getBlocks().get(0).getColumns().get(0).getCodec().getParquetCompressionCodec();
return parquetCompressionCodec.name();
}
private ParquetMetadata getOutputMetadata() {
try {
Configuration config = new Configuration();
FileStatus fileStatus = getTablePath().getFileSystem(config).getFileStatus(getTablePath());
List<Footer> footers = ParquetFileReader.readFooters(config, fileStatus, false);
return footers.get(0).getParquetMetadata();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private Schema getSchema() {
MessageType parquetSchema = getOutputMetadata().getFileMetaData().getSchema();
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
return avroSchemaConverter.convert(parquetSchema);
}
private void checkField(Field field, String name, Type type) {
assertEquals(name, field.name());
assertEquals(Type.UNION, field.schema().getType());
@ -325,4 +335,10 @@ private void checkField(Field field, String name, Type type) {
assertEquals(type, field.schema().getTypes().get(1).getType());
}
@Override
protected Configuration getConf() {
Configuration conf = super.getConf();
conf.set("parquetjob.configurator.implementation", parquetImplementation);
return conf;
}
}

View File

@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.util.ParquetReader;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import parquet.hadoop.metadata.CompressionCodecName;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static parquet.hadoop.metadata.CompressionCodecName.GZIP;
public class TestParquetIncrementalImportMerge extends ImportJobTestCase {
private static final String[] TEST_COLUMN_TYPES = {"INTEGER", "VARCHAR(32)", "CHAR(64)", "TIMESTAMP"};
private static final String[] ALTERNATIVE_TEST_COLUMN_TYPES = {"INTEGER", "INTEGER", "INTEGER", "TIMESTAMP"};
private static final String INITIAL_RECORDS_TIMESTAMP = "2018-06-14 15:00:00.000";
private static final String NEW_RECORDS_TIMESTAMP = "2018-06-14 16:00:00.000";
private static final List<List<Object>> INITIAL_RECORDS = Arrays.<List<Object>>asList(
Arrays.<Object>asList(2006, "Germany", "Italy", INITIAL_RECORDS_TIMESTAMP),
Arrays.<Object>asList(2014, "Brazil", "Hungary", INITIAL_RECORDS_TIMESTAMP)
);
private static final List<Object> ALTERNATIVE_INITIAL_RECORD = Arrays.<Object>asList(1, 2, 3, INITIAL_RECORDS_TIMESTAMP);
private static final List<List<Object>> NEW_RECORDS = Arrays.<List<Object>>asList(
Arrays.<Object>asList(2010, "South Africa", "Spain", NEW_RECORDS_TIMESTAMP),
Arrays.<Object>asList(2014, "Brazil", "Germany", NEW_RECORDS_TIMESTAMP)
);
private static final List<String> EXPECTED_MERGED_RECORDS = asList(
"2006,Germany,Italy," + timeFromString(INITIAL_RECORDS_TIMESTAMP),
"2010,South Africa,Spain," + timeFromString(NEW_RECORDS_TIMESTAMP),
"2014,Brazil,Germany," + timeFromString(NEW_RECORDS_TIMESTAMP)
);
private static final List<String> EXPECTED_INITIAL_RECORDS = asList(
"2006,Germany,Italy," + timeFromString(INITIAL_RECORDS_TIMESTAMP),
"2014,Brazil,Hungary," + timeFromString(INITIAL_RECORDS_TIMESTAMP)
);
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Override
public void setUp() {
super.setUp();
createTableWithRecords(TEST_COLUMN_TYPES, INITIAL_RECORDS);
}
@Test
public void testSimpleMerge() throws Exception {
String[] args = initialImportArgs(getConnectString(), getTableName(), getTablePath().toString()).build();
runImport(args);
clearTable(getTableName());
insertRecordsIntoTable(TEST_COLUMN_TYPES, NEW_RECORDS);
args = incrementalImportArgs(getConnectString(), getTableName(), getTablePath().toString(), getColName(3), getColName(0), INITIAL_RECORDS_TIMESTAMP).build();
runImport(args);
List<String> result = new ParquetReader(getTablePath()).readAllInCsvSorted();
assertEquals(EXPECTED_MERGED_RECORDS, result);
}
@Test
public void testMergeWhenTheIncrementalImportDoesNotImportAnyRows() throws Exception {
String[] args = initialImportArgs(getConnectString(), getTableName(), getTablePath().toString()).build();
runImport(args);
clearTable(getTableName());
args = incrementalImportArgs(getConnectString(), getTableName(), getTablePath().toString(), getColName(3), getColName(0), INITIAL_RECORDS_TIMESTAMP).build();
runImport(args);
List<String> result = new ParquetReader(getTablePath()).readAllInCsvSorted();
assertEquals(EXPECTED_INITIAL_RECORDS, result);
}
@Test
public void testMergeWithIncompatibleSchemas() throws Exception {
String targetDir = getWarehouseDir() + "/testMergeWithIncompatibleSchemas";
String[] args = initialImportArgs(getConnectString(), getTableName(), targetDir).build();
runImport(args);
incrementTableNum();
createTableWithColTypes(ALTERNATIVE_TEST_COLUMN_TYPES, ALTERNATIVE_INITIAL_RECORD);
args = incrementalImportArgs(getConnectString(), getTableName(), targetDir, getColName(3), getColName(0), INITIAL_RECORDS_TIMESTAMP).build();
expectedException.expectMessage("Cannot merge files, the Avro schemas are not compatible.");
runImportThrowingException(args);
}
@Test
public void testMergedFilesHaveCorrectCodec() throws Exception {
String[] args = initialImportArgs(getConnectString(), getTableName(), getTablePath().toString())
.withOption("compression-codec", "snappy")
.build();
runImport(args);
args = incrementalImportArgs(getConnectString(), getTableName(), getTablePath().toString(), getColName(3), getColName(0), INITIAL_RECORDS_TIMESTAMP)
.withOption("compression-codec", "gzip")
.build();
runImport(args);
CompressionCodecName compressionCodec = new ParquetReader(getTablePath()).getCodec();
assertEquals(GZIP, compressionCodec);
}
private ArgumentArrayBuilder initialImportArgs(String connectString, String tableName, String targetDir) {
return new ArgumentArrayBuilder()
.withProperty("parquetjob.configurator.implementation", "hadoop")
.withOption("connect", connectString)
.withOption("table", tableName)
.withOption("num-mappers", "1")
.withOption("target-dir", targetDir)
.withOption("as-parquetfile");
}
private ArgumentArrayBuilder incrementalImportArgs(String connectString, String tableName, String targetDir, String checkColumn, String mergeKey, String lastValue) {
return initialImportArgs(connectString, tableName, targetDir)
.withOption("incremental", "lastmodified")
.withOption("check-column", checkColumn)
.withOption("merge-key", mergeKey)
.withOption("last-value", lastValue);
}
private static long timeFromString(String timeStampString) {
try {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
return format.parse(timeStampString).getTime();
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -90,6 +90,7 @@ public void setup() {
excludedFieldsFromClone.add("layout");
excludedFieldsFromClone.add("activeSqoopTool");
excludedFieldsFromClone.add("hbaseNullIncrementalMode");
excludedFieldsFromClone.add("parquetConfiguratorImplementation");
}
@After

View File

@ -18,7 +18,6 @@
package org.apache.sqoop.hive;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
import org.apache.sqoop.hive.minicluster.KerberosAuthenticationConfiguration;
@ -67,7 +66,7 @@ public void testImport() throws Exception {
List<Object> columnValues = Arrays.<Object>asList("test", 42, "somestring");
String[] types = {"VARCHAR(32)", "INTEGER", "CHAR(64)"};
createTableWithColTypes(types, toStringArray(columnValues));
createTableWithColTypes(types, columnValues);
String[] args = new ArgumentArrayBuilder()
.withProperty(YarnConfiguration.RM_PRINCIPAL, miniKdcInfrastructure.getTestPrincipal())
@ -83,19 +82,4 @@ public void testImport() throws Exception {
List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
assertEquals(columnValues, rows.get(0));
}
private String[] toStringArray(List<Object> columnValues) {
String[] result = new String[columnValues.size()];
for (int i = 0; i < columnValues.size(); i++) {
if (columnValues.get(i) instanceof String) {
result[i] = StringUtils.wrap((String) columnValues.get(i), '\'');
} else {
result[i] = columnValues.get(i).toString();
}
}
return result;
}
}

View File

@ -42,7 +42,9 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import static org.apache.commons.lang3.StringUtils.wrap;
import static org.junit.Assert.fail;
/**
@ -427,6 +429,16 @@ protected void insertIntoTable(String[] colTypes, String[] vals) {
insertIntoTable(null, colTypes, vals);
}
protected void insertIntoTable(String[] colTypes, List<Object> record) {
insertIntoTable(null, colTypes, toStringArray(record));
}
protected void insertRecordsIntoTable(String[] colTypes, List<List<Object>> records) {
for (List<Object> record : records) {
insertIntoTable(colTypes, record);
}
}
protected void insertIntoTable(String[] columns, String[] colTypes, String[] vals) {
assert colTypes != null;
assert colTypes.length == vals.length;
@ -575,6 +587,17 @@ protected void createTableWithColTypes(String [] colTypes, String [] vals) {
createTableWithColTypesAndNames(colNames, colTypes, vals);
}
protected void createTableWithColTypes(String [] colTypes, List<Object> record) {
createTableWithColTypes(colTypes, toStringArray(record));
}
protected void createTableWithRecords(String [] colTypes, List<List<Object>> records) {
createTableWithColTypes(colTypes, records.get(0));
for (int i = 1; i < records.size(); i++) {
insertIntoTable(colTypes, records.get(i));
}
}
/**
* Create a table with a single column and put a data element in it.
* @param colType the type of the column to create
@ -627,4 +650,28 @@ protected void removeTableDir() {
return ObjectArrays.concat(entries, moreEntries, String.class);
}
protected void clearTable(String tableName) throws SQLException {
String truncateCommand = "DELETE FROM " + tableName;
Connection conn = getManager().getConnection();
try (PreparedStatement statement = conn.prepareStatement(truncateCommand)){
statement.executeUpdate();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private String[] toStringArray(List<Object> columnValues) {
String[] result = new String[columnValues.size()];
for (int i = 0; i < columnValues.size(); i++) {
if (columnValues.get(i) instanceof String) {
result[i] = wrap((String) columnValues.get(i), '\'');
} else {
result[i] = columnValues.get(i).toString();
}
}
return result;
}
}

View File

@ -37,6 +37,7 @@
import org.apache.sqoop.util.ClassLoaderStack;
import org.junit.Before;
import static org.apache.sqoop.Sqoop.SQOOP_RETHROW_PROPERTY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@ -219,12 +220,7 @@ protected void runImport(SqoopTool tool, String [] argv) throws IOException {
// run the tool through the normal entry-point.
int ret;
try {
Configuration conf = getConf();
//Need to disable OraOop for existing tests
conf.set("oraoop.disabled", "true");
SqoopOptions opts = getSqoopOptions(conf);
Sqoop sqoop = new Sqoop(tool, conf, opts);
ret = Sqoop.runSqoop(sqoop, argv);
ret = runSqoopTool(tool, argv, getSqoopOptions(getConf()));
} catch (Exception e) {
LOG.error("Got exception running Sqoop: " + e.toString());
e.printStackTrace();
@ -237,9 +233,40 @@ protected void runImport(SqoopTool tool, String [] argv) throws IOException {
}
}
private int runSqoopTool(SqoopTool tool, String [] argv, SqoopOptions sqoopOptions) {
Configuration conf = getConf();
//Need to disable OraOop for existing tests
conf.set("oraoop.disabled", "true");
Sqoop sqoop = new Sqoop(tool, conf, sqoopOptions);
return Sqoop.runSqoop(sqoop, argv);
}
protected int runImportThrowingException(SqoopTool tool, String [] argv) {
String oldRethrowProperty = System.getProperty(SQOOP_RETHROW_PROPERTY);
System.setProperty(SQOOP_RETHROW_PROPERTY, "true");
SqoopOptions sqoopOptions = getSqoopOptions(getConf());
sqoopOptions.setThrowOnError(true);
try {
return runSqoopTool(tool, argv, sqoopOptions);
} finally {
if (oldRethrowProperty == null) {
System.clearProperty(SQOOP_RETHROW_PROPERTY);
} else {
System.setProperty(SQOOP_RETHROW_PROPERTY, oldRethrowProperty);
}
}
}
/** run an import using the default ImportTool. */
protected void runImport(String [] argv) throws IOException {
runImport(new ImportTool(), argv);
}
protected void runImportThrowingException(String [] argv) {
runImportThrowingException(new ImportTool(), argv);
}
}

View File

@ -18,15 +18,21 @@
package org.apache.sqoop.tool;
import org.apache.commons.cli.CommandLine;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestBaseSqoopTool {
@ -35,11 +41,13 @@ public class TestBaseSqoopTool {
private BaseSqoopTool testBaseSqoopTool;
private SqoopOptions testSqoopOptions;
private CommandLine mockCommandLine;
@Before
public void setup() {
testBaseSqoopTool = mock(BaseSqoopTool.class, Mockito.CALLS_REAL_METHODS);
testSqoopOptions = new SqoopOptions();
mockCommandLine = mock(CommandLine.class);
}
@Test
@ -69,4 +77,61 @@ public void testRethrowIfRequiredWithRethrowPropertySetAndException() {
testBaseSqoopTool.rethrowIfRequired(testSqoopOptions, expectedCauseException);
}
@Test
public void testApplyCommonOptionsSetsParquetJobConfigurationImplementationFromCommandLine() throws Exception {
ParquetJobConfiguratorImplementation expectedValue = HADOOP;
when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn(expectedValue.toString());
testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
assertEquals(expectedValue, testSqoopOptions.getParquetConfiguratorImplementation());
}
@Test
public void testApplyCommonOptionsSetsParquetJobConfigurationImplementationFromCommandLineCaseInsensitively() throws Exception {
String hadoopImplementationLowercase = "haDooP";
when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn(hadoopImplementationLowercase);
testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
assertEquals(HADOOP, testSqoopOptions.getParquetConfiguratorImplementation());
}
@Test
public void testApplyCommonOptionsSetsParquetJobConfigurationImplementationFromConfiguration() throws Exception {
ParquetJobConfiguratorImplementation expectedValue = HADOOP;
testSqoopOptions.getConf().set("parquetjob.configurator.implementation", expectedValue.toString());
testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
assertEquals(expectedValue, testSqoopOptions.getParquetConfiguratorImplementation());
}
@Test
public void testApplyCommonOptionsPrefersParquetJobConfigurationImplementationFromCommandLine() throws Exception {
ParquetJobConfiguratorImplementation expectedValue = HADOOP;
testSqoopOptions.getConf().set("parquetjob.configurator.implementation", "kite");
when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn(expectedValue.toString());
testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
assertEquals(expectedValue, testSqoopOptions.getParquetConfiguratorImplementation());
}
@Test
public void testApplyCommonOptionsThrowsWhenInvalidParquetJobConfigurationImplementationIsSet() throws Exception {
when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn("this_is_definitely_not_valid");
exception.expectMessage("Invalid Parquet job configurator implementation is set: this_is_definitely_not_valid. Supported values are: [KITE, HADOOP]");
testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
}
@Test
public void testApplyCommonOptionsDoesNotChangeDefaultParquetJobConfigurationImplementationWhenNothingIsSet() throws Exception {
testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
assertEquals(KITE, testSqoopOptions.getParquetConfiguratorImplementation());
}
}

View File

@ -20,8 +20,16 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import parquet.avro.AvroParquetReader;
import parquet.hadoop.Footer;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.hadoop.util.HiddenFileFilter;
import java.io.IOException;
import java.util.ArrayDeque;
@ -29,8 +37,10 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import static java.util.Arrays.asList;
import static org.apache.sqoop.util.FileSystemUtil.isFile;
import static org.apache.sqoop.util.FileSystemUtil.listFiles;
@ -95,6 +105,49 @@ public List<String> readAllInCsv() {
return result;
}
public List<String> readAllInCsvSorted() {
List<String> result = readAllInCsv();
Collections.sort(result);
return result;
}
public CompressionCodecName getCodec() {
List<Footer> footers = getFooters();
Iterator<Footer> footersIterator = footers.iterator();
if (footersIterator.hasNext()) {
Footer footer = footersIterator.next();
Iterator<BlockMetaData> blockMetaDataIterator = footer.getParquetMetadata().getBlocks().iterator();
if (blockMetaDataIterator.hasNext()) {
BlockMetaData blockMetaData = blockMetaDataIterator.next();
Iterator<ColumnChunkMetaData> columnChunkMetaDataIterator = blockMetaData.getColumns().iterator();
if (columnChunkMetaDataIterator.hasNext()) {
ColumnChunkMetaData columnChunkMetaData = columnChunkMetaDataIterator.next();
return columnChunkMetaData.getCodec();
}
}
}
return null;
}
private List<Footer> getFooters() {
final List<Footer> footers;
try {
FileSystem fs = pathToRead.getFileSystem(configuration);
List<FileStatus> statuses = asList(fs.listStatus(pathToRead, HiddenFileFilter.INSTANCE));
footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses, false);
} catch (IOException e) {
throw new RuntimeException(e);
}
return footers;
}
private String convertToCsv(GenericRecord record) {
StringBuilder result = new StringBuilder();
for (int i = 0; i < record.getSchema().getFields().size(); i++) {