5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-02 07:59:37 +08:00

SQOOP-3319: Extract code using Kite into separate classes

(Szabolcs Vasas via Boglarka Egyed)
This commit is contained in:
Boglarka Egyed 2018-05-29 10:17:25 +02:00
parent ad7d046ef1
commit 3233db8e1c
43 changed files with 720 additions and 208 deletions

View File

@ -340,4 +340,8 @@ public static LogicalType createDecimalType(Integer precision, Integer scale, Co
return LogicalTypes.decimal(precision, scale);
}
public static Schema parseAvroSchema(String schemaString) {
return new Schema.Parser().parse(schemaString);
}
}

View File

@ -45,6 +45,8 @@
import org.apache.sqoop.hive.HiveTypes;
import org.apache.sqoop.lib.BlobRef;
import org.apache.sqoop.lib.ClobRef;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;
import org.apache.sqoop.util.ExportException;
import org.apache.sqoop.util.ImportException;
@ -866,5 +868,8 @@ public boolean isDirectModeAccumuloSupported() {
return false;
}
public ParquetJobConfiguratorFactory getParquetJobConfigurator() {
return ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(options.getConf());
}
}

View File

@ -65,7 +65,7 @@ public void exportTable(
throws IOException, ExportException {
context.setConnManager(this);
JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
ExportBatchOutputFormat.class);
ExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
@ -80,7 +80,7 @@ public void upsertTable(
context.setConnManager(this);
JdbcUpsertExportJob exportJob = new JdbcUpsertExportJob(context,
CubridUpsertOutputFormat.class);
CubridUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}

View File

@ -111,7 +111,7 @@ public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
ExportBatchOutputFormat.class);
ExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}

View File

@ -585,7 +585,8 @@ public void exportTable(ExportJobContext context)
new PostgreSQLCopyExportJob(context,
null,
ExportInputFormat.class,
NullOutputFormat.class);
NullOutputFormat.class,
getParquetJobConfigurator().createParquetExportJobConfigurator());
job.runExport();
}
}

View File

@ -90,7 +90,7 @@ public void importTable(org.apache.sqoop.manager.ImportJobContext context)
importer = new AccumuloImportJob(opts, context);
} else {
// Import to HDFS.
importer = new MainframeImportJob(opts, context);
importer = new MainframeImportJob(opts, context, getParquetJobConfigurator().createParquetImportJobConfigurator());
}
importer.setInputFormatClass(MainframeDatasetInputFormat.class);

View File

@ -138,7 +138,7 @@ public void upsertTable(org.apache.sqoop.manager.ExportJobContext context)
LOG.warn("documentation for additional limitations.");
JdbcUpsertExportJob exportJob =
new JdbcUpsertExportJob(context, MySQLUpsertOutputFormat.class);
new JdbcUpsertExportJob(context, MySQLUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}

View File

@ -462,7 +462,7 @@ public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
JdbcExportJob exportJob = new JdbcExportJob(context,
null, null, ExportBatchOutputFormat.class);
null, null, ExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
@ -474,7 +474,7 @@ public void upsertTable(org.apache.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
JdbcUpsertExportJob exportJob =
new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class);
new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}

View File

@ -181,10 +181,10 @@ public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
JdbcExportJob exportJob;
if (isNonResilientOperation()) {
exportJob = new JdbcExportJob(context, null, null,
SqlServerExportBatchOutputFormat.class);
SqlServerExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
} else {
exportJob = new JdbcExportJob(context, null, null,
SQLServerResilientExportOutputFormat.class);
SQLServerResilientExportOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
configureConnectionRecoveryForExport(context);
}
exportJob.runExport();
@ -202,7 +202,7 @@ public void updateTable(
} else {
context.setConnManager(this);
JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context, null,
null, SQLServerResilientUpdateOutputFormat.class);
null, SQLServerResilientUpdateOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
configureConnectionRecoveryForUpdate(context);
exportJob.runExport();
}
@ -223,7 +223,7 @@ public void upsertTable(org.apache.sqoop.manager.ExportJobContext context)
}
JdbcUpsertExportJob exportJob =
new JdbcUpsertExportJob(context, SqlServerUpsertOutputFormat.class);
new JdbcUpsertExportJob(context, SqlServerUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}

View File

@ -682,7 +682,7 @@ public void importTable(org.apache.sqoop.manager.ImportJobContext context)
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),
context);
context, getParquetJobConfigurator().createParquetImportJobConfigurator());
}
checkTableImportOptions(context);
@ -725,7 +725,7 @@ public void importQuery(org.apache.sqoop.manager.ImportJobContext context)
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),
context);
context, getParquetJobConfigurator().createParquetImportJobConfigurator());
}
String splitCol = getSplitColumn(opts, null);
@ -926,7 +926,7 @@ protected int getMetadataIsolationLevel() {
public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
JdbcExportJob exportJob = new JdbcExportJob(context);
JdbcExportJob exportJob = new JdbcExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
@ -935,7 +935,7 @@ public void callTable(org.apache.sqoop.manager.ExportJobContext context)
throws IOException,
ExportException {
context.setConnManager(this);
JdbcCallExportJob exportJob = new JdbcCallExportJob(context);
JdbcCallExportJob exportJob = new JdbcCallExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
@ -960,7 +960,7 @@ public void updateTable(
org.apache.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context);
JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}

View File

@ -321,7 +321,7 @@ public void exportTable(ExportJobContext context) throws IOException,
throw ex;
}
JdbcExportJob exportJob =
new JdbcExportJob(context, null, null, oraOopOutputFormatClass);
new JdbcExportJob(context, null, null, oraOopOutputFormatClass, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
@ -343,7 +343,7 @@ public void updateTable(ExportJobContext context) throws IOException,
}
JdbcUpdateExportJob exportJob =
new JdbcUpdateExportJob(context, null, null, oraOopOutputFormatClass);
new JdbcUpdateExportJob(context, null, null, oraOopOutputFormatClass, getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}

View File

@ -26,8 +26,6 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
@ -48,10 +46,8 @@
import org.apache.sqoop.mapreduce.ImportJobBase;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
import org.apache.sqoop.orm.AvroSchemaGenerator;
import org.apache.sqoop.util.FileSystemUtil;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
/**
* Actually runs a jdbc import job using the ORM files generated by the
@ -62,15 +58,24 @@ public class DataDrivenImportJob extends ImportJobBase {
public static final Log LOG = LogFactory.getLog(
DataDrivenImportJob.class.getName());
@SuppressWarnings("unchecked")
public DataDrivenImportJob(final SqoopOptions opts) {
super(opts, null, DataDrivenDBInputFormat.class, null, null);
private final ParquetImportJobConfigurator parquetImportJobConfigurator;
public DataDrivenImportJob(final SqoopOptions opts,
final Class<? extends InputFormat> inputFormatClass,
ImportJobContext context, ParquetImportJobConfigurator parquetImportJobConfigurator) {
super(opts, null, inputFormatClass, null, context);
this.parquetImportJobConfigurator = parquetImportJobConfigurator;
}
public DataDrivenImportJob(final SqoopOptions opts,
final Class<? extends InputFormat> inputFormatClass,
ImportJobContext context) {
super(opts, null, inputFormatClass, null, context);
this(opts, inputFormatClass, context, null);
}
@SuppressWarnings("unchecked")
public DataDrivenImportJob(final SqoopOptions opts) {
this(opts, DataDrivenDBInputFormat.class, null);
}
@Override
@ -101,53 +106,20 @@ protected void configureMapper(Job job, String tableName,
AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.ParquetFile) {
JobConf conf = (JobConf)job.getConfiguration();
// Kite SDK requires an Avro schema to represent the data structure of
// target dataset. If the schema name equals to generated java class name,
// the import will fail. So we use table name as schema name and add a
// prefix "codegen_" to generated java class to avoid the conflict.
final String schemaNameOverride = tableName;
Schema schema = generateAvroSchema(tableName, schemaNameOverride);
String uri = getKiteUri(conf, tableName);
ParquetJob.WriteMode writeMode;
Path destination = getContext().getDestination();
if (options.doHiveImport()) {
if (options.doOverwriteHiveTable()) {
writeMode = ParquetJob.WriteMode.OVERWRITE;
} else {
writeMode = ParquetJob.WriteMode.APPEND;
if (Datasets.exists(uri)) {
LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " +
"append data into the existing Hive table. Consider using " +
"--hive-overwrite, if you do NOT intend to do appending.");
}
}
} else {
// Note that there is no such an import argument for overwriting HDFS
// dataset, so overwrite mode is not supported yet.
// Sqoop's append mode means to merge two independent datasets. We
// choose DEFAULT as write mode.
writeMode = ParquetJob.WriteMode.DEFAULT;
}
ParquetJob.configureImportJob(conf, schema, uri, writeMode);
parquetImportJobConfigurator.configureMapper(job, schema, options, tableName, destination);
}
job.setMapperClass(getMapperClass());
}
private String getKiteUri(Configuration conf, String tableName) throws IOException {
if (options.doHiveImport()) {
String hiveDatabase = options.getHiveDatabaseName() == null ? "default" :
options.getHiveDatabaseName();
String hiveTable = options.getHiveTableName() == null ? tableName :
options.getHiveTableName();
return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
} else {
Path destination = getContext().getDestination();
return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
}
}
private Schema generateAvroSchema(String tableName,
String schemaNameOverride) throws IOException {
ConnManager connManager = getContext().getConnManager();
@ -187,7 +159,7 @@ protected Class<? extends Mapper> getMapperClass() {
return AvroImportMapper.class;
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.ParquetFile) {
return ParquetImportMapper.class;
return parquetImportJobConfigurator.getMapperClass();
}
return null;
@ -210,7 +182,7 @@ protected Class<? extends OutputFormat> getOutputFormatClass()
return AvroOutputFormat.class;
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.ParquetFile) {
return DatasetKeyOutputFormat.class;
return parquetImportJobConfigurator.getOutputFormatClass();
}
return null;

View File

@ -49,6 +49,8 @@
import java.sql.SQLException;
import java.util.Date;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;
/**
* Base class for running an import MapReduce job.
* Allows dependency injection, etc, for easy customization of import job types.
@ -149,7 +151,7 @@ protected void configureOutputFormat(Job job, String tableName,
Configuration conf = job.getConfiguration();
String shortName = CodecMap.getCodecShortNameByName(codecName, conf);
if (!shortName.equalsIgnoreCase("default")) {
conf.set(ParquetJob.CONF_OUTPUT_CODEC, shortName);
conf.set(SQOOP_PARQUET_OUTPUT_CODEC_KEY, shortName);
}
}
}

View File

@ -32,6 +32,7 @@
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.manager.ExportJobContext;
import com.google.common.base.Strings;
import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
/**
* Run an export using JDBC (JDBC-based ExportCallOutputFormat) to
@ -43,15 +44,16 @@ public class JdbcCallExportJob extends JdbcExportJob {
public static final Log LOG = LogFactory.getLog(
JdbcCallExportJob.class.getName());
public JdbcCallExportJob(final ExportJobContext context) {
super(context, null, null, ExportCallOutputFormat.class);
public JdbcCallExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator) {
super(context, null, null, ExportCallOutputFormat.class, parquetExportJobConfigurator);
}
public JdbcCallExportJob(final ExportJobContext ctxt,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
final Class<? extends OutputFormat> outputFormatClass) {
super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
final Class<? extends OutputFormat> outputFormatClass,
final ParquetExportJobConfigurator parquetExportJobConfigurator) {
super(ctxt, mapperClass, inputFormatClass, outputFormatClass, parquetExportJobConfigurator);
}
/**

View File

@ -32,11 +32,10 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
import java.io.IOException;
import java.util.Map;
import org.apache.sqoop.util.FileSystemUtil;
/**
* Run an export using JDBC (JDBC-based ExportOutputFormat).
@ -45,18 +44,23 @@ public class JdbcExportJob extends ExportJobBase {
private FileType fileType;
private ParquetExportJobConfigurator parquetExportJobConfigurator;
public static final Log LOG = LogFactory.getLog(
JdbcExportJob.class.getName());
public JdbcExportJob(final ExportJobContext context) {
public JdbcExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator) {
super(context);
this.parquetExportJobConfigurator = parquetExportJobConfigurator;
}
public JdbcExportJob(final ExportJobContext ctxt,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
final Class<? extends OutputFormat> outputFormatClass) {
final Class<? extends OutputFormat> outputFormatClass,
final ParquetExportJobConfigurator parquetExportJobConfigurator) {
super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
this.parquetExportJobConfigurator = parquetExportJobConfigurator;
}
@Override
@ -78,8 +82,7 @@ protected void configureInputFormat(Job job, String tableName,
} else if (fileType == FileType.PARQUET_FILE) {
LOG.debug("Configuring for Parquet export");
configureGenericRecordExportInputFormat(job, tableName);
String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
DatasetKeyInputFormat.configure(job).readFrom(uri);
parquetExportJobConfigurator.configureInputFormat(job, getInputPath());
}
}
@ -120,7 +123,7 @@ protected Class<? extends InputFormat> getInputFormatClass()
case AVRO_DATA_FILE:
return AvroInputFormat.class;
case PARQUET_FILE:
return DatasetKeyInputFormat.class;
return parquetExportJobConfigurator.getInputFormatClass();
default:
return super.getInputFormatClass();
}
@ -137,7 +140,7 @@ protected Class<? extends Mapper> getMapperClass() {
case AVRO_DATA_FILE:
return AvroExportMapper.class;
case PARQUET_FILE:
return ParquetExportMapper.class;
return parquetExportJobConfigurator.getMapperClass();
case UNKNOWN:
default:
return TextExportMapper.class;

View File

@ -33,15 +33,13 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.manager.ExportJobContext;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.mapreduce.db.DBOutputFormat;
import org.apache.sqoop.util.FileSystemUtil;
import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
/**
* Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
@ -53,6 +51,8 @@ public class JdbcUpdateExportJob extends ExportJobBase {
public static final Log LOG = LogFactory.getLog(
JdbcUpdateExportJob.class.getName());
private ParquetExportJobConfigurator parquetExportJobConfigurator;
/**
* Return an instance of the UpdateOutputFormat class object loaded
* from the shim jar.
@ -62,16 +62,19 @@ private static Class<? extends OutputFormat> getUpdateOutputFormat()
return UpdateOutputFormat.class;
}
public JdbcUpdateExportJob(final ExportJobContext context)
public JdbcUpdateExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator)
throws IOException {
super(context, null, null, getUpdateOutputFormat());
this.parquetExportJobConfigurator = parquetExportJobConfigurator;
}
public JdbcUpdateExportJob(final ExportJobContext ctxt,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
final Class<? extends OutputFormat> outputFormatClass) {
final Class<? extends OutputFormat> outputFormatClass,
final ParquetExportJobConfigurator parquetExportJobConfigurator) {
super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
this.parquetExportJobConfigurator = parquetExportJobConfigurator;
}
// Fix For Issue [SQOOP-2846]
@ -86,7 +89,7 @@ protected Class<? extends Mapper> getMapperClass() {
case AVRO_DATA_FILE:
return AvroExportMapper.class;
case PARQUET_FILE:
return ParquetExportMapper.class;
return parquetExportJobConfigurator.getMapperClass();
case UNKNOWN:
default:
return TextExportMapper.class;
@ -186,8 +189,7 @@ protected void configureInputFormat(Job job, String tableName, String tableClass
} else if (fileType == FileType.PARQUET_FILE) {
LOG.debug("Configuring for Parquet export");
configureGenericRecordExportInputFormat(job, tableName);
String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
DatasetKeyInputFormat.configure(job).readFrom(uri);
parquetExportJobConfigurator.configureInputFormat(job, getInputPath());
}
}
@ -222,7 +224,7 @@ protected Class<? extends InputFormat> getInputFormatClass() throws ClassNotFoun
case AVRO_DATA_FILE:
return AvroInputFormat.class;
case PARQUET_FILE:
return DatasetKeyInputFormat.class;
return parquetExportJobConfigurator.getInputFormatClass();
default:
return super.getInputFormatClass();
}

View File

@ -30,6 +30,7 @@
import org.apache.sqoop.manager.ExportJobContext;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.mapreduce.db.DBOutputFormat;
import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
/**
* Run an update/insert export using JDBC (JDBC-based UpsertOutputFormat).
@ -40,9 +41,10 @@ public class JdbcUpsertExportJob extends JdbcUpdateExportJob {
JdbcUpsertExportJob.class.getName());
public JdbcUpsertExportJob(final ExportJobContext context,
final Class<? extends OutputFormat> outputFormatClass)
final Class<? extends OutputFormat> outputFormatClass,
final ParquetExportJobConfigurator parquetExportJobConfigurator)
throws IOException {
super(context, null, null, outputFormatClass);
super(context, null, null, outputFormatClass, parquetExportJobConfigurator);
}
@Override

View File

@ -19,18 +19,12 @@
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.LocatedFileStatus;
@ -44,17 +38,8 @@
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.sqoop.avro.AvroUtil;
import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
import org.apache.sqoop.util.Jars;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Formats;
import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
import parquet.avro.AvroParquetInputFormat;
import parquet.avro.AvroSchemaConverter;
import parquet.hadoop.Footer;
import parquet.hadoop.ParquetFileReader;
import parquet.schema.MessageType;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.mapreduce.JobBase;
@ -79,10 +64,11 @@ public class MergeJob extends JobBase {
*/
public static final String MERGE_SQOOP_RECORD_KEY = "sqoop.merge.class";
public static final String PARQUET_AVRO_SCHEMA = "parquetjob.avro.schema";
private final ParquetMergeJobConfigurator parquetMergeJobConfigurator;
public MergeJob(final SqoopOptions opts) {
public MergeJob(final SqoopOptions opts, final ParquetMergeJobConfigurator parquetMergeJobConfigurator) {
super(opts, null, null, null);
this.parquetMergeJobConfigurator = parquetMergeJobConfigurator;
}
public boolean runMergeJob() throws IOException {
@ -147,7 +133,7 @@ public boolean runMergeJob() throws IOException {
case PARQUET_FILE:
Path finalPath = new Path(options.getTargetDir());
finalPath = FileSystemUtil.makeQualified(finalPath, jobConf);
configueParquetMergeJob(jobConf, job, oldPath, newPath, finalPath);
parquetMergeJobConfigurator.configureParquetMergeJob(jobConf, job, oldPath, newPath, finalPath);
break;
case AVRO_DATA_FILE:
configueAvroMergeJob(conf, job, oldPath, newPath);
@ -198,51 +184,6 @@ private void configueAvroMergeJob(Configuration conf, Job job, Path oldPath, Pat
job.setReducerClass(MergeAvroReducer.class);
AvroJob.setOutputSchema(job.getConfiguration(), oldPathSchema);
}
private void configueParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
Path finalPath) throws IOException {
try {
FileSystem fileSystem = finalPath.getFileSystem(conf);
LOG.info("Trying to merge parquet files");
job.setOutputKeyClass(org.apache.avro.generic.GenericRecord.class);
job.setMapperClass(MergeParquetMapper.class);
job.setReducerClass(MergeParquetReducer.class);
job.setOutputValueClass(NullWritable.class);
List<Footer> footers = new ArrayList<Footer>();
FileStatus oldPathfileStatus = fileSystem.getFileStatus(oldPath);
FileStatus newPathfileStatus = fileSystem.getFileStatus(oldPath);
footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), oldPathfileStatus, true));
footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), newPathfileStatus, true));
MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
Schema avroSchema = avroSchemaConverter.convert(schema);
if (!fileSystem.exists(finalPath)) {
Dataset dataset = createDataset(avroSchema, "dataset:" + finalPath);
DatasetKeyOutputFormat.configure(job).overwrite(dataset);
} else {
DatasetKeyOutputFormat.configure(job).overwrite(new URI("dataset:" + finalPath));
}
job.setInputFormatClass(AvroParquetInputFormat.class);
AvroParquetInputFormat.setAvroReadSchema(job, avroSchema);
conf.set(PARQUET_AVRO_SCHEMA, avroSchema.toString());
Class<DatasetKeyOutputFormat> outClass = DatasetKeyOutputFormat.class;
job.setOutputFormatClass(outClass);
} catch (Exception cnfe) {
throw new IOException(cnfe);
}
}
public static Dataset createDataset(Schema schema, String uri) {
DatasetDescriptor descriptor =
new DatasetDescriptor.Builder().schema(schema).format(Formats.PARQUET).build();
return Datasets.create(uri, descriptor, GenericRecord.class);
}
}

View File

@ -27,16 +27,16 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.sqoop.avro.AvroUtil;
import org.apache.sqoop.lib.SqoopRecord;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
public class MergeParquetReducer extends Reducer<Text, MergeRecord,GenericRecord,NullWritable> {
public abstract class MergeParquetReducer<KEYOUT, VALUEOUT> extends Reducer<Text, MergeRecord, KEYOUT, VALUEOUT> {
private Schema schema = null;
private boolean bigDecimalFormatString = true;
@ -44,7 +44,7 @@ public class MergeParquetReducer extends Reducer<Text, MergeRecord,GenericRecord
@Override
protected void setup(Context context) throws IOException, InterruptedException {
schema = new Schema.Parser().parse(context.getConfiguration().get("parquetjob.avro.schema"));
schema = new Schema.Parser().parse(context.getConfiguration().get(SQOOP_PARQUET_AVRO_SCHEMA_KEY));
bigDecimalFormatString = context.getConfiguration().getBoolean(
ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
}
@ -67,9 +67,12 @@ public void reduce(Text key, Iterable<MergeRecord> vals, Context context)
}
if (null != bestRecord) {
GenericRecord outKey = AvroUtil.toGenericRecord(bestRecord.getFieldMap(), schema,
GenericRecord record = AvroUtil.toGenericRecord(bestRecord.getFieldMap(), schema,
bigDecimalFormatString);
context.write(outKey, null);
write(context, record);
}
}
protected abstract void write(Context context, GenericRecord record) throws IOException, InterruptedException;
}

View File

@ -23,10 +23,7 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.sqoop.avro.AvroUtil;
import java.io.IOException;
@ -35,9 +32,9 @@
/**
* Imports records by writing them to a Parquet File.
*/
public class ParquetImportMapper
public abstract class ParquetImportMapper<KEYOUT, VALOUT>
extends AutoProgressMapper<LongWritable, SqoopRecord,
GenericRecord, NullWritable> {
KEYOUT, VALOUT> {
private Schema schema = null;
private boolean bigDecimalFormatString = true;
@ -47,11 +44,11 @@ public class ParquetImportMapper
protected void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
schema = ParquetJob.getAvroSchema(conf);
schema = getAvroSchema(conf);
bigDecimalFormatString = conf.getBoolean(
ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
lobLoader = new LargeObjectLoader(conf, new Path(conf.get("sqoop.kite.lob.extern.dir", "/tmp/sqoop-parquet-" + context.getTaskAttemptID())));
lobLoader = createLobLoader(context);
}
@Override
@ -64,9 +61,9 @@ protected void map(LongWritable key, SqoopRecord val, Context context)
throw new IOException(sqlE);
}
GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(), schema,
GenericRecord record = AvroUtil.toGenericRecord(val.getFieldMap(), schema,
bigDecimalFormatString);
context.write(outKey, null);
write(context, record);
}
@Override
@ -76,4 +73,9 @@ protected void cleanup(Context context) throws IOException {
}
}
protected abstract LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException;
protected abstract Schema getAvroSchema(Configuration configuration);
protected abstract void write(Context context, GenericRecord record) throws IOException, InterruptedException;
}

View File

@ -30,6 +30,7 @@
import org.apache.sqoop.manager.ImportJobContext;
import org.apache.sqoop.mapreduce.DataDrivenImportJob;
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
/**
* Import data from a mainframe dataset, using MainframeDatasetInputFormat.
@ -39,8 +40,8 @@ public class MainframeImportJob extends DataDrivenImportJob {
private static final Log LOG = LogFactory.getLog(
MainframeImportJob.class.getName());
public MainframeImportJob(final SqoopOptions opts, ImportJobContext context) {
super(opts, MainframeDatasetInputFormat.class, context);
public MainframeImportJob(final SqoopOptions opts, ImportJobContext context, ParquetImportJobConfigurator parquetImportJobConfigurator) {
super(opts, MainframeDatasetInputFormat.class, context, parquetImportJobConfigurator);
}
@Override

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet;
public final class ParquetConstants {
public static final String SQOOP_PARQUET_AVRO_SCHEMA_KEY = "parquetjob.avro.schema";
public static final String SQOOP_PARQUET_OUTPUT_CODEC_KEY = "parquetjob.output.codec";
private ParquetConstants() {
throw new AssertionError("This class is meant for static use only.");
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public interface ParquetExportJobConfigurator {
void configureInputFormat(Job job, Path inputPath) throws IOException;
Class<? extends Mapper> getMapperClass();
Class<? extends InputFormat> getInputFormatClass();
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.sqoop.SqoopOptions;
import java.io.IOException;
public interface ParquetImportJobConfigurator {
void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException;
Class<? extends Mapper> getMapperClass();
Class<? extends OutputFormat> getOutputFormatClass();
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet;
public interface ParquetJobConfiguratorFactory {
ParquetImportJobConfigurator createParquetImportJobConfigurator();
ParquetExportJobConfigurator createParquetExportJobConfigurator();
ParquetMergeJobConfigurator createParquetMergeJobConfigurator();
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetJobConfiguratorFactory;
public final class ParquetJobConfiguratorFactoryProvider {
private ParquetJobConfiguratorFactoryProvider() {
throw new AssertionError("This class is meant for static use only.");
}
public static ParquetJobConfiguratorFactory createParquetJobConfiguratorFactory(Configuration configuration) {
return new KiteParquetJobConfiguratorFactory();
}
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
public interface ParquetMergeJobConfigurator {
void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath, Path finalPath) throws IOException;
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet.kite;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.mapreduce.MergeParquetReducer;
import java.io.IOException;
public class KiteMergeParquetReducer extends MergeParquetReducer<GenericRecord, NullWritable> {
@Override
protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
context.write(record, null);
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet.kite;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
import org.apache.sqoop.util.FileSystemUtil;
import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
import java.io.IOException;
public class KiteParquetExportJobConfigurator implements ParquetExportJobConfigurator {
@Override
public void configureInputFormat(Job job, Path inputPath) throws IOException {
String uri = "dataset:" + FileSystemUtil.makeQualified(inputPath, job.getConfiguration());
DatasetKeyInputFormat.configure(job).readFrom(uri);
}
@Override
public Class<? extends Mapper> getMapperClass() {
return KiteParquetExportMapper.class;
}
@Override
public Class<? extends InputFormat> getInputFormatClass() {
return DatasetKeyInputFormat.class;
}
}

View File

@ -16,27 +16,21 @@
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
package org.apache.sqoop.mapreduce.parquet.kite;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.mapreduce.GenericRecordExportMapper;
import java.io.IOException;
/**
* Exports Parquet records from a data source.
*/
public class ParquetExportMapper
extends GenericRecordExportMapper<GenericRecord, NullWritable> {
public class KiteParquetExportMapper extends GenericRecordExportMapper<GenericRecord, NullWritable> {
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
}
@Override
protected void map(GenericRecord key, NullWritable val,
Context context) throws IOException, InterruptedException {
protected void map(GenericRecord key, NullWritable val, Context context) throws IOException, InterruptedException {
context.write(toSqoopRecord(key), NullWritable.get());
}

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet.kite;
import org.apache.avro.Schema;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
import org.apache.sqoop.util.FileSystemUtil;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
import java.io.IOException;
public class KiteParquetImportJobConfigurator implements ParquetImportJobConfigurator {
public static final Log LOG = LogFactory.getLog(KiteParquetImportJobConfigurator.class.getName());
@Override
public void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException {
JobConf conf = (JobConf) job.getConfiguration();
String uri = getKiteUri(conf, options, tableName, destination);
KiteParquetUtils.WriteMode writeMode;
if (options.doHiveImport()) {
if (options.doOverwriteHiveTable()) {
writeMode = KiteParquetUtils.WriteMode.OVERWRITE;
} else {
writeMode = KiteParquetUtils.WriteMode.APPEND;
if (Datasets.exists(uri)) {
LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " +
"append data into the existing Hive table. Consider using " +
"--hive-overwrite, if you do NOT intend to do appending.");
}
}
} else {
// Note that there is no such an import argument for overwriting HDFS
// dataset, so overwrite mode is not supported yet.
// Sqoop's append mode means to merge two independent datasets. We
// choose DEFAULT as write mode.
writeMode = KiteParquetUtils.WriteMode.DEFAULT;
}
KiteParquetUtils.configureImportJob(conf, schema, uri, writeMode);
}
@Override
public Class<? extends Mapper> getMapperClass() {
return KiteParquetImportMapper.class;
}
@Override
public Class<? extends OutputFormat> getOutputFormatClass() {
return DatasetKeyOutputFormat.class;
}
private String getKiteUri(Configuration conf, SqoopOptions options, String tableName, Path destination) throws IOException {
if (options.doHiveImport()) {
String hiveDatabase = options.getHiveDatabaseName() == null ? "default" :
options.getHiveDatabaseName();
String hiveTable = options.getHiveTableName() == null ? tableName :
options.getHiveTableName();
return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
} else {
return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
}
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet.kite;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.avro.AvroUtil;
import org.apache.sqoop.lib.LargeObjectLoader;
import org.apache.sqoop.mapreduce.ParquetImportMapper;
import java.io.IOException;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
public class KiteParquetImportMapper extends ParquetImportMapper<GenericRecord, Void> {
@Override
protected LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
Path workPath = new Path(conf.get("sqoop.kite.lob.extern.dir", "/tmp/sqoop-parquet-" + context.getTaskAttemptID()));
return new LargeObjectLoader(conf, workPath);
}
@Override
protected Schema getAvroSchema(Configuration configuration) {
String schemaString = configuration.get(SQOOP_PARQUET_AVRO_SCHEMA_KEY);
return AvroUtil.parseAvroSchema(schemaString);
}
@Override
protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
context.write(record, null);
}
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet.kite;
import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
public class KiteParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory {
@Override
public ParquetImportJobConfigurator createParquetImportJobConfigurator() {
return new KiteParquetImportJobConfigurator();
}
@Override
public ParquetExportJobConfigurator createParquetExportJobConfigurator() {
return new KiteParquetExportJobConfigurator();
}
@Override
public ParquetMergeJobConfigurator createParquetMergeJobConfigurator() {
return new KiteParquetMergeJobConfigurator();
}
}

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.parquet.kite;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.mapreduce.MergeParquetMapper;
import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Formats;
import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
import parquet.avro.AvroParquetInputFormat;
import parquet.avro.AvroSchemaConverter;
import parquet.hadoop.Footer;
import parquet.hadoop.ParquetFileReader;
import parquet.schema.MessageType;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
public class KiteParquetMergeJobConfigurator implements ParquetMergeJobConfigurator {
public static final Log LOG = LogFactory.getLog(KiteParquetMergeJobConfigurator.class.getName());
@Override
public void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
Path finalPath) throws IOException {
try {
FileSystem fileSystem = finalPath.getFileSystem(conf);
LOG.info("Trying to merge parquet files");
job.setOutputKeyClass(GenericRecord.class);
job.setMapperClass(MergeParquetMapper.class);
job.setReducerClass(KiteMergeParquetReducer.class);
job.setOutputValueClass(NullWritable.class);
List<Footer> footers = new ArrayList<Footer>();
FileStatus oldPathfileStatus = fileSystem.getFileStatus(oldPath);
FileStatus newPathfileStatus = fileSystem.getFileStatus(oldPath);
footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), oldPathfileStatus, true));
footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), newPathfileStatus, true));
MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
Schema avroSchema = avroSchemaConverter.convert(schema);
if (!fileSystem.exists(finalPath)) {
Dataset dataset = createDataset(avroSchema, "dataset:" + finalPath);
DatasetKeyOutputFormat.configure(job).overwrite(dataset);
} else {
DatasetKeyOutputFormat.configure(job).overwrite(new URI("dataset:" + finalPath));
}
job.setInputFormatClass(AvroParquetInputFormat.class);
AvroParquetInputFormat.setAvroReadSchema(job, avroSchema);
conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, avroSchema.toString());
Class<DatasetKeyOutputFormat> outClass = DatasetKeyOutputFormat.class;
job.setOutputFormatClass(outClass);
} catch (Exception cnfe) {
throw new IOException(cnfe);
}
}
public static Dataset createDataset(Schema schema, String uri) {
DatasetDescriptor descriptor =
new DatasetDescriptor.Builder().schema(schema).format(Formats.PARQUET).build();
return Datasets.create(uri, descriptor, GenericRecord.class);
}
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
package org.apache.sqoop.mapreduce.parquet.kite;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@ -41,12 +41,15 @@
import java.io.IOException;
import java.lang.reflect.Method;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;
/**
* Helper class for setting up a Parquet MapReduce job.
*/
public final class ParquetJob {
public final class KiteParquetUtils {
public static final Log LOG = LogFactory.getLog(ParquetJob.class.getName());
public static final Log LOG = LogFactory.getLog(KiteParquetUtils.class.getName());
public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
@ -66,22 +69,16 @@ public final class ParquetJob {
private static final String HIVE_URI_PREFIX = "dataset:hive";
private ParquetJob() {
private KiteParquetUtils() {
}
private static final String CONF_AVRO_SCHEMA = "parquetjob.avro.schema";
static final String CONF_OUTPUT_CODEC = "parquetjob.output.codec";
enum WriteMode {
public enum WriteMode {
DEFAULT, APPEND, OVERWRITE
};
public static Schema getAvroSchema(Configuration conf) {
return new Schema.Parser().parse(conf.get(CONF_AVRO_SCHEMA));
}
public static CompressionType getCompressionType(Configuration conf) {
CompressionType defaults = Formats.PARQUET.getDefaultCompressionType();
String codec = conf.get(CONF_OUTPUT_CODEC, defaults.getName());
String codec = conf.get(SQOOP_PARQUET_OUTPUT_CODEC_KEY, defaults.getName());
try {
return CompressionType.forName(codec);
} catch (IllegalArgumentException ex) {
@ -129,7 +126,7 @@ public static void configureImportJob(JobConf conf, Schema schema,
} else {
dataset = createDataset(schema, getCompressionType(conf), uri);
}
conf.set(CONF_AVRO_SCHEMA, schema.toString());
conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, schema.toString());
DatasetKeyOutputFormat.ConfigBuilder builder =
DatasetKeyOutputFormat.configure(conf);

View File

@ -32,7 +32,7 @@
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.sqoop.lib.DelimiterSet;
import org.apache.sqoop.mapreduce.JdbcExportJob;
import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
/**
@ -42,15 +42,16 @@ public class PostgreSQLCopyExportJob extends JdbcExportJob {
public static final Log LOG =
LogFactory.getLog(PostgreSQLCopyExportJob.class.getName());
public PostgreSQLCopyExportJob(final ExportJobContext context) {
super(context);
public PostgreSQLCopyExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator) {
super(context, parquetExportJobConfigurator);
}
public PostgreSQLCopyExportJob(final ExportJobContext ctxt,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
final Class<? extends OutputFormat> outputFormatClass) {
super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
final Class<? extends OutputFormat> outputFormatClass,
final ParquetExportJobConfigurator parquetExportJobConfigurator) {
super(ctxt, mapperClass, inputFormatClass, outputFormatClass, parquetExportJobConfigurator);
}
@Override

View File

@ -34,9 +34,12 @@
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.sqoop.manager.SupportedManagers;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;
import org.apache.sqoop.util.CredentialsUtil;
import org.apache.sqoop.util.LoggingUtils;
import org.apache.sqoop.util.password.CredentialProviderHelper;
@ -1904,4 +1907,8 @@ protected void validateHS2Options(SqoopOptions options) throws SqoopOptions.Inva
}
}
public ParquetJobConfiguratorFactory getParquetJobConfigurator(Configuration configuration) {
return ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(configuration);
}
}

View File

@ -46,6 +46,7 @@
import org.apache.sqoop.hive.HiveClientFactory;
import org.apache.sqoop.manager.ImportJobContext;
import org.apache.sqoop.mapreduce.MergeJob;
import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
import org.apache.sqoop.metastore.JobData;
import org.apache.sqoop.metastore.JobStorage;
import org.apache.sqoop.metastore.JobStorageFactory;
@ -472,7 +473,8 @@ protected void lastModifiedMerge(SqoopOptions options, ImportJobContext context)
loadJars(options.getConf(), context.getJarFile(), context.getTableName());
}
MergeJob mergeJob = new MergeJob(options);
ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options.getConf()).createParquetMergeJobConfigurator();
MergeJob mergeJob = new MergeJob(options, parquetMergeJobConfigurator);
if (mergeJob.runMergeJob()) {
// Rename destination directory to proper location.
Path tmpDir = getOutputPath(options, context.getTableName());

View File

@ -30,6 +30,7 @@
import org.apache.sqoop.cli.RelatedOptions;
import org.apache.sqoop.cli.ToolOptions;
import org.apache.sqoop.mapreduce.MergeJob;
import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
import org.apache.sqoop.util.LoggingUtils;
/**
@ -52,7 +53,8 @@ public MergeTool(String toolName) {
public int run(SqoopOptions options) {
try {
// Configure and execute a MapReduce job to merge these datasets.
MergeJob mergeJob = new MergeJob(options);
ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options.getConf()).createParquetMergeJobConfigurator();
MergeJob mergeJob = new MergeJob(options, parquetMergeJobConfigurator);
if (!mergeJob.runMergeJob()) {
LOG.error("MapReduce job failed!");
return 1;

View File

@ -32,10 +32,12 @@
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.util.ParquetReader;
import org.junit.Test;
import parquet.avro.AvroSchemaConverter;
import parquet.format.CompressionCodec;
import parquet.hadoop.Footer;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.MessageType;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -311,8 +313,9 @@ private ParquetMetadata getOutputMetadata() {
}
private Schema getSchema() {
String schemaString = getOutputMetadata().getFileMetaData().getKeyValueMetaData().get("parquet.avro.schema");
return new Schema.Parser().parse(schemaString);
MessageType parquetSchema = getOutputMetadata().getFileMetaData().getSchema();
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
return avroSchemaConverter.convert(parquetSchema);
}
private void checkField(Field field, String name, Type type) {

View File

@ -35,7 +35,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.avro.AvroSchemaMismatchException;
import org.apache.sqoop.mapreduce.ParquetJob;
import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetUtils;
import org.apache.sqoop.util.ParquetReader;
import org.junit.After;
import org.junit.Before;
@ -404,7 +404,7 @@ public void testHiveImportAsParquetWhenTableExistsWithIncompatibleSchema() throw
createTableWithColTypes(types, vals);
thrown.expect(AvroSchemaMismatchException.class);
thrown.expectMessage(ParquetJob.INCOMPATIBLE_AVRO_SCHEMA_MSG + ParquetJob.HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG);
thrown.expectMessage(KiteParquetUtils.INCOMPATIBLE_AVRO_SCHEMA_MSG + KiteParquetUtils.HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG);
SqoopOptions sqoopOptions = getSqoopOptions(getConf());
sqoopOptions.setThrowOnError(true);
@ -422,7 +422,7 @@ private void createHiveDataSet(String tableName) {
.name(getColName(2)).type().nullable().stringType().noDefault()
.endRecord();
String dataSetUri = "dataset:hive:/default/" + tableName;
ParquetJob.createDataset(dataSetSchema, ParquetJob.getCompressionType(new Configuration()), dataSetUri);
KiteParquetUtils.createDataset(dataSetSchema, KiteParquetUtils.getCompressionType(new Configuration()), dataSetUri);
}
/**

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
import org.junit.Test;
import org.apache.sqoop.SqoopOptions;
@ -117,7 +118,7 @@ private JdbcExportJob stubJdbcExportJob(SqoopOptions opts, final FileType inputF
when(mockConnManager.getColumnTypes(anyString(), anyString())).thenReturn(columnTypeInts);
when(mockConnManager.toJavaType(anyString(), anyString(), anyInt())).thenReturn("String");
when(mockContext.getConnManager()).thenReturn(mockConnManager);
JdbcExportJob jdbcExportJob = new JdbcExportJob(mockContext) {
JdbcExportJob jdbcExportJob = new JdbcExportJob(mockContext, mock(ParquetExportJobConfigurator.class)) {
@Override
protected FileType getInputFileType() {
return inputFileType;

View File

@ -19,6 +19,7 @@
package org.apache.sqoop.mapreduce.mainframe;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@ -27,6 +28,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
import org.junit.Before;
import org.junit.Test;
@ -56,7 +58,7 @@ public void testGetMainframeDatasetImportMapperClass()
Path path = new Path("dummyPath");
ImportJobContext context = new ImportJobContext(tableName, jarFile,
options, path);
mfImportJob = new MainframeImportJob(options, context);
mfImportJob = new MainframeImportJob(options, context, mock(ParquetImportJobConfigurator.class));
// To access protected method by means of reflection
Class[] types = {};
@ -79,7 +81,7 @@ public void testSuperMapperClass() throws SecurityException,
options.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
ImportJobContext context = new ImportJobContext(tableName, jarFile,
options, path);
avroImportJob = new MainframeImportJob(options, context);
avroImportJob = new MainframeImportJob(options, context, mock(ParquetImportJobConfigurator.class));
// To access protected method by means of reflection
Class[] types = {};