diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java
index 603cc631..57c20625 100644
--- a/src/java/org/apache/sqoop/avro/AvroUtil.java
+++ b/src/java/org/apache/sqoop/avro/AvroUtil.java
@@ -340,4 +340,8 @@ public static LogicalType createDecimalType(Integer precision, Integer scale, Co
     return LogicalTypes.decimal(precision, scale);
   }
+
+  public static Schema parseAvroSchema(String schemaString) {
+    return new Schema.Parser().parse(schemaString);
+  }
 }
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java
index d7d6279a..c80dd5d9 100644
--- a/src/java/org/apache/sqoop/manager/ConnManager.java
+++ b/src/java/org/apache/sqoop/manager/ConnManager.java
@@ -45,6 +45,8 @@
 import org.apache.sqoop.hive.HiveTypes;
 import org.apache.sqoop.lib.BlobRef;
 import org.apache.sqoop.lib.ClobRef;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;
 import org.apache.sqoop.util.ExportException;
 import org.apache.sqoop.util.ImportException;
 
@@ -866,5 +868,8 @@ public boolean isDirectModeAccumuloSupported() {
     return false;
   }
 
+  public ParquetJobConfiguratorFactory getParquetJobConfigurator() {
+    return ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(options.getConf());
+  }
 }
diff --git a/src/java/org/apache/sqoop/manager/CubridManager.java b/src/java/org/apache/sqoop/manager/CubridManager.java
index e27f616c..a75268f3 100644
--- a/src/java/org/apache/sqoop/manager/CubridManager.java
+++ b/src/java/org/apache/sqoop/manager/CubridManager.java
@@ -65,7 +65,7 @@ public void exportTable(
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
-        ExportBatchOutputFormat.class);
+        ExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
@@ -80,7 +80,7 @@ public void upsertTable(
     context.setConnManager(this);
 
     JdbcUpsertExportJob exportJob = new JdbcUpsertExportJob(context,
-        CubridUpsertOutputFormat.class);
+        CubridUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
diff --git a/src/java/org/apache/sqoop/manager/Db2Manager.java b/src/java/org/apache/sqoop/manager/Db2Manager.java
index 7ff68ce0..c78946e6 100644
--- a/src/java/org/apache/sqoop/manager/Db2Manager.java
+++ b/src/java/org/apache/sqoop/manager/Db2Manager.java
@@ -111,7 +111,7 @@ public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
-        ExportBatchOutputFormat.class);
+        ExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
diff --git a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
index c05e1c19..70b9b43d 100644
--- a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
+++ b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
@@ -585,7 +585,8 @@ public void exportTable(ExportJobContext context)
         new PostgreSQLCopyExportJob(context,
             null,
             ExportInputFormat.class,
-            NullOutputFormat.class);
+            NullOutputFormat.class,
+            getParquetJobConfigurator().createParquetExportJobConfigurator());
     job.runExport();
   }
 }
diff --git a/src/java/org/apache/sqoop/manager/MainframeManager.java b/src/java/org/apache/sqoop/manager/MainframeManager.java
index a6002ef4..4e8be155 100644
--- a/src/java/org/apache/sqoop/manager/MainframeManager.java
+++ b/src/java/org/apache/sqoop/manager/MainframeManager.java
@@ -90,7 +90,7 @@ public void importTable(org.apache.sqoop.manager.ImportJobContext context)
       importer = new AccumuloImportJob(opts, context);
     } else {
       // Import to HDFS.
-      importer = new MainframeImportJob(opts, context);
+      importer = new MainframeImportJob(opts, context, getParquetJobConfigurator().createParquetImportJobConfigurator());
     }
 
     importer.setInputFormatClass(MainframeDatasetInputFormat.class);
diff --git a/src/java/org/apache/sqoop/manager/MySQLManager.java b/src/java/org/apache/sqoop/manager/MySQLManager.java
index 2d177071..992c4615 100644
--- a/src/java/org/apache/sqoop/manager/MySQLManager.java
+++ b/src/java/org/apache/sqoop/manager/MySQLManager.java
@@ -138,7 +138,7 @@ public void upsertTable(org.apache.sqoop.manager.ExportJobContext context)
     LOG.warn("documentation for additional limitations.");
 
     JdbcUpsertExportJob exportJob =
-        new JdbcUpsertExportJob(context, MySQLUpsertOutputFormat.class);
+        new JdbcUpsertExportJob(context, MySQLUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
diff --git a/src/java/org/apache/sqoop/manager/OracleManager.java b/src/java/org/apache/sqoop/manager/OracleManager.java
index b7005d46..cdc6c54c 100644
--- a/src/java/org/apache/sqoop/manager/OracleManager.java
+++ b/src/java/org/apache/sqoop/manager/OracleManager.java
@@ -462,7 +462,7 @@ public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcExportJob exportJob = new JdbcExportJob(context,
-        null, null, ExportBatchOutputFormat.class);
+        null, null, ExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
@@ -474,7 +474,7 @@ public void upsertTable(org.apache.sqoop.manager.ExportJobContext context)
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcUpsertExportJob exportJob =
-        new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class);
+        new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
diff --git a/src/java/org/apache/sqoop/manager/SQLServerManager.java b/src/java/org/apache/sqoop/manager/SQLServerManager.java
index d57a4935..b136087f 100644
--- a/src/java/org/apache/sqoop/manager/SQLServerManager.java
+++ b/src/java/org/apache/sqoop/manager/SQLServerManager.java
@@ -181,10 +181,10 @@ public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
     JdbcExportJob exportJob;
     if (isNonResilientOperation()) {
       exportJob = new JdbcExportJob(context, null, null,
-          SqlServerExportBatchOutputFormat.class);
+          SqlServerExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     } else {
       exportJob = new JdbcExportJob(context, null, null,
-          SQLServerResilientExportOutputFormat.class);
+          SQLServerResilientExportOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
       configureConnectionRecoveryForExport(context);
     }
     exportJob.runExport();
@@ -202,7 +202,7 @@ public void updateTable(
     } else {
       context.setConnManager(this);
       JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context, null,
-          null, SQLServerResilientUpdateOutputFormat.class);
+          null, SQLServerResilientUpdateOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
       configureConnectionRecoveryForUpdate(context);
       exportJob.runExport();
     }
@@ -223,7 +223,7 @@ public void upsertTable(org.apache.sqoop.manager.ExportJobContext context)
     }
 
     JdbcUpsertExportJob exportJob =
-        new JdbcUpsertExportJob(context, SqlServerUpsertOutputFormat.class);
+        new JdbcUpsertExportJob(context, SqlServerUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java
index 45720988..d82332ae 100644
--- a/src/java/org/apache/sqoop/manager/SqlManager.java
+++ b/src/java/org/apache/sqoop/manager/SqlManager.java
@@ -682,7 +682,7 @@ public void importTable(org.apache.sqoop.manager.ImportJobContext context)
     } else {
       // Import to HDFS.
       importer = new DataDrivenImportJob(opts, context.getInputFormat(),
-          context);
+          context, getParquetJobConfigurator().createParquetImportJobConfigurator());
     }
 
     checkTableImportOptions(context);
@@ -725,7 +725,7 @@ public void importQuery(org.apache.sqoop.manager.ImportJobContext context)
     } else {
       // Import to HDFS.
       importer = new DataDrivenImportJob(opts, context.getInputFormat(),
-          context);
+          context, getParquetJobConfigurator().createParquetImportJobConfigurator());
     }
 
     String splitCol = getSplitColumn(opts, null);
@@ -926,7 +926,7 @@ protected int getMetadataIsolationLevel() {
   public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
       throws IOException, ExportException {
     context.setConnManager(this);
-    JdbcExportJob exportJob = new JdbcExportJob(context);
+    JdbcExportJob exportJob = new JdbcExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
@@ -935,7 +935,7 @@ public void callTable(org.apache.sqoop.manager.ExportJobContext context)
       throws IOException, ExportException {
     context.setConnManager(this);
-    JdbcCallExportJob exportJob = new JdbcCallExportJob(context);
+    JdbcCallExportJob exportJob = new JdbcCallExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
@@ -960,7 +960,7 @@ public void updateTable(
       org.apache.sqoop.manager.ExportJobContext context)
       throws IOException, ExportException {
     context.setConnManager(this);
-    JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context);
+    JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java b/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
index 10524e3a..95eaacf3 100644
--- a/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
@@ -321,7 +321,7 @@ public void exportTable(ExportJobContext context) throws IOException,
       throw ex;
     }
     JdbcExportJob exportJob =
-        new JdbcExportJob(context, null, null, oraOopOutputFormatClass);
+        new JdbcExportJob(context, null, null, oraOopOutputFormatClass, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
@@ -343,7 +343,7 @@ public void updateTable(ExportJobContext context) throws IOException,
     }
 
     JdbcUpdateExportJob exportJob =
-        new JdbcUpdateExportJob(context, null, null, oraOopOutputFormatClass);
+        new JdbcUpdateExportJob(context, null, null, oraOopOutputFormatClass, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index a5962ba4..3b542102 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -26,8 +26,6 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -48,10 +46,8 @@
 import org.apache.sqoop.mapreduce.ImportJobBase;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
 import org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
 import org.apache.sqoop.orm.AvroSchemaGenerator;
-import org.apache.sqoop.util.FileSystemUtil;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
 
 /**
  * Actually runs a jdbc import job using the ORM files generated by the
@@ -62,15 +58,24 @@ public class DataDrivenImportJob extends ImportJobBase {
   public static final Log LOG = LogFactory.getLog(
       DataDrivenImportJob.class.getName());
 
-  @SuppressWarnings("unchecked")
-  public DataDrivenImportJob(final SqoopOptions opts) {
-    super(opts, null, DataDrivenDBInputFormat.class, null, null);
+  private final ParquetImportJobConfigurator parquetImportJobConfigurator;
+
+  public DataDrivenImportJob(final SqoopOptions opts,
+      final Class inputFormatClass,
+      ImportJobContext context, ParquetImportJobConfigurator parquetImportJobConfigurator) {
+    super(opts, null, inputFormatClass, null, context);
+    this.parquetImportJobConfigurator = parquetImportJobConfigurator;
   }
 
   public DataDrivenImportJob(final SqoopOptions opts,
      final Class inputFormatClass,
      ImportJobContext context) {
-    super(opts, null, inputFormatClass, null, context);
+    this(opts, inputFormatClass, context, null);
+  }
+
+  @SuppressWarnings("unchecked")
+  public DataDrivenImportJob(final SqoopOptions opts) {
+    this(opts, DataDrivenDBInputFormat.class, null);
   }
 
   @Override
@@ -101,53 +106,20 @@ protected void configureMapper(Job job, String tableName,
       AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.ParquetFile) {
-      JobConf conf = (JobConf)job.getConfiguration();
       // Kite SDK requires an Avro schema to represent the data structure of
       // target dataset. If the schema name equals to generated java class name,
       // the import will fail. So we use table name as schema name and add a
       // prefix "codegen_" to generated java class to avoid the conflict.
       final String schemaNameOverride = tableName;
       Schema schema = generateAvroSchema(tableName, schemaNameOverride);
-      String uri = getKiteUri(conf, tableName);
-      ParquetJob.WriteMode writeMode;
+      Path destination = getContext().getDestination();
 
-      if (options.doHiveImport()) {
-        if (options.doOverwriteHiveTable()) {
-          writeMode = ParquetJob.WriteMode.OVERWRITE;
-        } else {
-          writeMode = ParquetJob.WriteMode.APPEND;
-          if (Datasets.exists(uri)) {
-            LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " +
-                "append data into the existing Hive table. Consider using " +
-                "--hive-overwrite, if you do NOT intend to do appending.");
-          }
-        }
-      } else {
-        // Note that there is no such an import argument for overwriting HDFS
-        // dataset, so overwrite mode is not supported yet.
-        // Sqoop's append mode means to merge two independent datasets. We
-        // choose DEFAULT as write mode.
-        writeMode = ParquetJob.WriteMode.DEFAULT;
-      }
-      ParquetJob.configureImportJob(conf, schema, uri, writeMode);
+      parquetImportJobConfigurator.configureMapper(job, schema, options, tableName, destination);
     }
 
     job.setMapperClass(getMapperClass());
   }
 
-  private String getKiteUri(Configuration conf, String tableName) throws IOException {
-    if (options.doHiveImport()) {
-      String hiveDatabase = options.getHiveDatabaseName() == null ? "default" :
-          options.getHiveDatabaseName();
-      String hiveTable = options.getHiveTableName() == null ? tableName :
-          options.getHiveTableName();
-      return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
-    } else {
-      Path destination = getContext().getDestination();
-      return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
-    }
-  }
-
   private Schema generateAvroSchema(String tableName,
       String schemaNameOverride) throws IOException {
     ConnManager connManager = getContext().getConnManager();
@@ -187,7 +159,7 @@ protected Class getMapperClass() {
       return AvroImportMapper.class;
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.ParquetFile) {
-      return ParquetImportMapper.class;
+      return parquetImportJobConfigurator.getMapperClass();
     }
 
     return null;
@@ -210,7 +182,7 @@ protected Class getOutputFormatClass()
       return AvroOutputFormat.class;
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.ParquetFile) {
-      return DatasetKeyOutputFormat.class;
+      return parquetImportJobConfigurator.getOutputFormatClass();
     }
 
     return null;
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index fb5d0541..17c9ed39 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -49,6 +49,8 @@
 import java.sql.SQLException;
 import java.util.Date;
 
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;
+
 /**
  * Base class for running an import MapReduce job.
  * Allows dependency injection, etc, for easy customization of import job types.
@@ -149,7 +151,7 @@ protected void configureOutputFormat(Job job, String tableName,
         Configuration conf = job.getConfiguration();
         String shortName = CodecMap.getCodecShortNameByName(codecName, conf);
         if (!shortName.equalsIgnoreCase("default")) {
-          conf.set(ParquetJob.CONF_OUTPUT_CODEC, shortName);
+          conf.set(SQOOP_PARQUET_OUTPUT_CODEC_KEY, shortName);
         }
       }
     }
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
index b7eea936..be82aeda 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
@@ -32,6 +32,7 @@
 import org.apache.sqoop.manager.ConnManager;
 import org.apache.sqoop.manager.ExportJobContext;
 import com.google.common.base.Strings;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 
 /**
  * Run an export using JDBC (JDBC-based ExportCallOutputFormat) to
@@ -43,15 +44,16 @@ public class JdbcCallExportJob extends JdbcExportJob {
   public static final Log LOG = LogFactory.getLog(
       JdbcCallExportJob.class.getName());
 
-  public JdbcCallExportJob(final ExportJobContext context) {
-    super(context, null, null, ExportCallOutputFormat.class);
+  public JdbcCallExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator) {
+    super(context, null, null, ExportCallOutputFormat.class, parquetExportJobConfigurator);
   }
 
   public JdbcCallExportJob(final ExportJobContext ctxt,
       final Class mapperClass,
       final Class inputFormatClass,
-      final Class outputFormatClass) {
-    super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+      final Class outputFormatClass,
+      final ParquetExportJobConfigurator parquetExportJobConfigurator) {
+    super(ctxt, mapperClass, inputFormatClass, outputFormatClass, parquetExportJobConfigurator);
   }
 
   /**
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index 37198363..e283548e 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
@@ -32,11 +32,10 @@
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
-import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 
 import java.io.IOException;
 import java.util.Map;
-import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Run an export using JDBC (JDBC-based ExportOutputFormat).
@@ -45,18 +44,23 @@ public class JdbcExportJob extends ExportJobBase {
 
   private FileType fileType;
 
+  private ParquetExportJobConfigurator parquetExportJobConfigurator;
+
   public static final Log LOG = LogFactory.getLog(
       JdbcExportJob.class.getName());
 
-  public JdbcExportJob(final ExportJobContext context) {
+  public JdbcExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator) {
     super(context);
+    this.parquetExportJobConfigurator = parquetExportJobConfigurator;
   }
 
   public JdbcExportJob(final ExportJobContext ctxt,
       final Class mapperClass,
       final Class inputFormatClass,
-      final Class outputFormatClass) {
+      final Class outputFormatClass,
+      final ParquetExportJobConfigurator parquetExportJobConfigurator) {
     super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+    this.parquetExportJobConfigurator = parquetExportJobConfigurator;
   }
 
   @Override
@@ -78,8 +82,7 @@ protected void configureInputFormat(Job job, String tableName,
     } else if (fileType == FileType.PARQUET_FILE) {
       LOG.debug("Configuring for Parquet export");
       configureGenericRecordExportInputFormat(job, tableName);
-      String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
-      DatasetKeyInputFormat.configure(job).readFrom(uri);
+      parquetExportJobConfigurator.configureInputFormat(job, getInputPath());
     }
 
   }
@@ -120,7 +123,7 @@ protected Class getInputFormatClass()
       case AVRO_DATA_FILE:
         return AvroInputFormat.class;
       case PARQUET_FILE:
-        return DatasetKeyInputFormat.class;
+        return parquetExportJobConfigurator.getInputFormatClass();
       default:
         return super.getInputFormatClass();
     }
@@ -137,7 +140,7 @@ protected Class getMapperClass() {
       case AVRO_DATA_FILE:
         return AvroExportMapper.class;
       case PARQUET_FILE:
-        return ParquetExportMapper.class;
+        return parquetExportJobConfigurator.getMapperClass();
       case UNKNOWN:
       default:
         return TextExportMapper.class;
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index 86069c46..f901d378 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -33,15 +33,13 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
-import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
 
 import org.apache.sqoop.manager.ConnManager;
 import org.apache.sqoop.manager.ExportJobContext;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
 import org.apache.sqoop.mapreduce.db.DBOutputFormat;
-import org.apache.sqoop.util.FileSystemUtil;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 
 /**
  * Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
@@ -53,6 +51,8 @@ public class JdbcUpdateExportJob extends ExportJobBase {
   public static final Log LOG = LogFactory.getLog(
       JdbcUpdateExportJob.class.getName());
 
+  private ParquetExportJobConfigurator parquetExportJobConfigurator;
+
   /**
    * Return an instance of the UpdateOutputFormat class object loaded
    * from the shim jar.
@@ -62,16 +62,19 @@ private static Class getUpdateOutputFormat()
     return UpdateOutputFormat.class;
   }
 
-  public JdbcUpdateExportJob(final ExportJobContext context)
+  public JdbcUpdateExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator)
       throws IOException {
     super(context, null, null, getUpdateOutputFormat());
+    this.parquetExportJobConfigurator = parquetExportJobConfigurator;
   }
 
   public JdbcUpdateExportJob(final ExportJobContext ctxt,
       final Class mapperClass,
       final Class inputFormatClass,
-      final Class outputFormatClass) {
+      final Class outputFormatClass,
+      final ParquetExportJobConfigurator parquetExportJobConfigurator) {
     super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+    this.parquetExportJobConfigurator = parquetExportJobConfigurator;
   }
 
   // Fix For Issue [SQOOP-2846]
@@ -86,7 +89,7 @@ protected Class getMapperClass() {
       case AVRO_DATA_FILE:
         return AvroExportMapper.class;
       case PARQUET_FILE:
-        return ParquetExportMapper.class;
+        return parquetExportJobConfigurator.getMapperClass();
       case UNKNOWN:
       default:
         return TextExportMapper.class;
@@ -186,8 +189,7 @@ protected void configureInputFormat(Job job, String tableName, String tableClass
     } else if (fileType == FileType.PARQUET_FILE) {
       LOG.debug("Configuring for Parquet export");
       configureGenericRecordExportInputFormat(job, tableName);
-      String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
-      DatasetKeyInputFormat.configure(job).readFrom(uri);
+      parquetExportJobConfigurator.configureInputFormat(job, getInputPath());
     }
 
   }
@@ -222,7 +224,7 @@ protected Class getInputFormatClass() throws ClassNotFoun
       case AVRO_DATA_FILE:
         return AvroInputFormat.class;
       case PARQUET_FILE:
-        return DatasetKeyInputFormat.class;
+        return parquetExportJobConfigurator.getInputFormatClass();
       default:
         return super.getInputFormatClass();
     }
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
index 9a8c17a9..4db86da6 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
@@ -30,6 +30,7 @@
 import org.apache.sqoop.manager.ExportJobContext;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
 import org.apache.sqoop.mapreduce.db.DBOutputFormat;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 
 /**
  * Run an update/insert export using JDBC (JDBC-based UpsertOutputFormat).
@@ -40,9 +41,10 @@ public class JdbcUpsertExportJob extends JdbcUpdateExportJob {
       JdbcUpsertExportJob.class.getName());
 
   public JdbcUpsertExportJob(final ExportJobContext context,
-      final Class outputFormatClass)
+      final Class outputFormatClass,
+      final ParquetExportJobConfigurator parquetExportJobConfigurator)
       throws IOException {
-    super(context, null, null, outputFormatClass);
+    super(context, null, null, outputFormatClass, parquetExportJobConfigurator);
   }
 
   @Override
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeJob.java b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
index bb21b64d..c26a0901 100644
--- a/src/java/org/apache/sqoop/mapreduce/MergeJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
@@ -19,18 +19,12 @@
 package org.apache.sqoop.mapreduce;
 
 import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.FileReader;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.FsInput;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -44,17 +38,8 @@
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.sqoop.avro.AvroUtil;
 import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
 import org.apache.sqoop.util.Jars;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Formats;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
-import parquet.avro.AvroParquetInputFormat;
-import parquet.avro.AvroSchemaConverter;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetFileReader;
-import parquet.schema.MessageType;
 
 import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.mapreduce.JobBase;
@@ -79,10 +64,11 @@ public class MergeJob extends JobBase {
    */
  public static final String MERGE_SQOOP_RECORD_KEY = "sqoop.merge.class";
 
-  public static final String PARQUET_AVRO_SCHEMA = "parquetjob.avro.schema";
+  private final ParquetMergeJobConfigurator parquetMergeJobConfigurator;
 
-  public MergeJob(final SqoopOptions opts) {
+  public MergeJob(final SqoopOptions opts, final ParquetMergeJobConfigurator parquetMergeJobConfigurator) {
     super(opts, null, null, null);
+    this.parquetMergeJobConfigurator = parquetMergeJobConfigurator;
   }
 
   public boolean runMergeJob() throws IOException {
@@ -147,7 +133,7 @@ public boolean runMergeJob() throws IOException {
         case PARQUET_FILE:
           Path finalPath = new Path(options.getTargetDir());
           finalPath = FileSystemUtil.makeQualified(finalPath, jobConf);
-          configueParquetMergeJob(jobConf, job, oldPath, newPath, finalPath);
+          parquetMergeJobConfigurator.configureParquetMergeJob(jobConf, job, oldPath, newPath, finalPath);
           break;
         case AVRO_DATA_FILE:
           configueAvroMergeJob(conf, job, oldPath, newPath);
@@ -198,51 +184,6 @@ private void configueAvroMergeJob(Configuration conf, Job job, Path oldPath, Pat
     job.setReducerClass(MergeAvroReducer.class);
     AvroJob.setOutputSchema(job.getConfiguration(), oldPathSchema);
   }
-
-  private void configueParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
-      Path finalPath) throws IOException {
-    try {
-      FileSystem fileSystem = finalPath.getFileSystem(conf);
-      LOG.info("Trying to merge parquet files");
-      job.setOutputKeyClass(org.apache.avro.generic.GenericRecord.class);
-      job.setMapperClass(MergeParquetMapper.class);
-      job.setReducerClass(MergeParquetReducer.class);
-      job.setOutputValueClass(NullWritable.class);
-
-      List
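
For context, a minimal sketch of the wiring pattern this patch applies across the connection managers (illustrative only, not part of the patch; it reuses only the constructors and factory methods that appear in the diff above):

  // Illustrative sketch: how a ConnManager subclass builds an export job
  // after the Kite-specific wiring is replaced by the configurator abstraction.
  public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
      throws IOException, ExportException {
    context.setConnManager(this);
    // getParquetJobConfigurator() is the new ConnManager method, backed by
    // ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(options.getConf()).
    JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
        ExportBatchOutputFormat.class,
        getParquetJobConfigurator().createParquetExportJobConfigurator());
    exportJob.runExport();
  }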