diff --git a/ivy.xml b/ivy.xml index 1f587f3e..796ef70a 100644 --- a/ivy.xml +++ b/ivy.xml @@ -114,15 +114,7 @@ under the License. conf="common->default;redist->default"/> - - - - - - - + diff --git a/ivy/libraries.properties b/ivy/libraries.properties index 565a8bf5..c506ca82 100644 --- a/ivy/libraries.properties +++ b/ivy/libraries.properties @@ -20,8 +20,6 @@ avro.version=1.8.1 -kite-data.version=1.1.0 - checkstyle.version=5.0 commons-cli.version=1.2 @@ -62,3 +60,4 @@ hbase.version=1.2.4 hcatalog.version=1.2.1 jackson-databind.version=2.9.5 +parquet.version=1.6.0 diff --git a/src/docs/user/hive-notes.txt b/src/docs/user/hive-notes.txt index af97d94b..d58c4d6e 100644 --- a/src/docs/user/hive-notes.txt +++ b/src/docs/user/hive-notes.txt @@ -28,11 +28,3 @@ direct mapping (for example, +DATE+, +TIME+, and +TIMESTAMP+) will be coerced to +STRING+ in Hive. The +NUMERIC+ and +DECIMAL+ SQL types will be coerced to +DOUBLE+. In these cases, Sqoop will emit a warning in its log messages informing you of the loss of precision. - -Parquet Support in Hive -~~~~~~~~~~~~~~~~~~~~~~~ - -When using the Kite Dataset API based Parquet implementation in order to contact the Hive MetaStore -from a MapReduce job, a delegation token will be fetched and passed. HIVE_CONF_DIR and HIVE_HOME must be set appropriately to add -Hive to the runtime classpath. Otherwise, importing/exporting into Hive in Parquet -format may not work. diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt index a2c16d95..79f71012 100644 --- a/src/docs/user/import.txt +++ b/src/docs/user/import.txt @@ -60,7 +60,7 @@ Argument Description +\--as-sequencefile+ Imports data to SequenceFiles +\--as-textfile+ Imports data as plain text (default) +\--as-parquetfile+ Imports data to Parquet Files -+\--parquet-configurator-implementation+ Sets the implementation used during Parquet import. Supported values: kite, hadoop. ++\--parquet-configurator-implementation+ Sets the implementation used during Parquet import. Supported value: hadoop. +\--boundary-query + Boundary query to use for creating splits +\--columns + Columns to import from table +\--delete-target-dir+ Delete the import target directory\ @@ -448,35 +448,14 @@ and Avro files. Parquet support +++++++++++++++ -Sqoop has two different implementations for importing data in Parquet format: +Sqoop has only one implementation now for importing data in Parquet format which is based on the Parquet Hadoop API. +Note that the legacy Kite Dataset API based implementation is removed so users have to make sure that both ++\--parquet-configurator-implementation+ option and +parquetjob.configurator.implementation+ property are unset or +set to "hadoop". -- Kite Dataset API based implementation (default, legacy) -- Parquet Hadoop API based implementation (recommended) - -The users can specify the desired implementation with the +\--parquet-configurator-implementation+ option: - ----- -$ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-parquetfile --parquet-configurator-implementation kite ----- - ----- -$ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-parquetfile --parquet-configurator-implementation hadoop ----- - -If the +\--parquet-configurator-implementation+ option is not present then Sqoop will check the value of +parquetjob.configurator.implementation+ -property (which can be specified using -D in the Sqoop command or in the site.xml). If that value is also absent Sqoop will -default to Kite Dataset API based implementation. - -The Kite Dataset API based implementation executes the import command on a different code -path than the text import: it creates the Hive table based on the generated Avro schema by connecting to the Hive metastore. -This can be a disadvantage since sometimes moving from the text file format to the Parquet file format can lead to many -unexpected behavioral changes. Kite checks the Hive table schema before importing the data into it so if the user wants -to import some data which has a schema incompatible with the Hive table's schema Sqoop will throw an error. This implementation -uses snappy codec for compression by default and apart from this it supports the bzip codec too. - -The Parquet Hadoop API based implementation builds the Hive CREATE TABLE statement and executes the -LOAD DATA INPATH command just like the text import does. Unlike Kite it also supports connecting to HiveServer2 (using the +\--hs2-url+ option) -so it provides better security features. This implementation does not check the Hive table's schema before importing so +The default Parquet import implementation builds the Hive CREATE TABLE statement and executes the +LOAD DATA INPATH command just like the text import does. It supports connecting to HiveServer2 (using the +\--hs2-url+ option) +but it does not check the Hive table's schema before importing so it is possible that the user can successfully import data into Hive but they get an error during a Hive read operation later. It does not use any compression by default but supports snappy and bzip codecs. @@ -487,6 +466,8 @@ $ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-par --parquet-configurator-implementation hadoop --hs2-url "jdbc:hive2://hs2.foo.com:10000" --hs2-keytab "/path/to/keytab" ---- +Note that +\--parquet-configurator-implementation hadoop+ is now optional. + Enabling Logical Types in Avro and Parquet import for numbers ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index cc1b7528..f97dbfdf 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -53,7 +53,7 @@ import org.apache.sqoop.util.StoredAsProperty; import static org.apache.sqoop.Sqoop.SQOOP_RETHROW_PROPERTY; -import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE; +import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP; import static org.apache.sqoop.orm.ClassWriter.toJavaIdentifier; /** @@ -1161,7 +1161,7 @@ private void initDefaults(Configuration baseConfiguration) { // set escape column mapping to true this.escapeColumnMappingEnabled = true; - this.parquetConfiguratorImplementation = KITE; + this.parquetConfiguratorImplementation = HADOOP; } /** diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java index 050c8548..c6b576da 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java @@ -19,14 +19,13 @@ package org.apache.sqoop.mapreduce.parquet; import org.apache.sqoop.mapreduce.parquet.hadoop.HadoopParquetJobConfiguratorFactory; -import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetJobConfiguratorFactory; /** * An enum containing all the implementations available for {@link ParquetJobConfiguratorFactory}. * The enumeration constants are also used to instantiate concrete {@link ParquetJobConfiguratorFactory} objects. */ public enum ParquetJobConfiguratorImplementation { - KITE(KiteParquetJobConfiguratorFactory.class), HADOOP(HadoopParquetJobConfiguratorFactory.class); + HADOOP(HadoopParquetJobConfiguratorFactory.class); private Class configuratorFactoryClass; diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java deleted file mode 100644 index 02816d77..00000000 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sqoop.mapreduce.parquet.kite; - -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.io.NullWritable; -import org.apache.sqoop.mapreduce.MergeParquetReducer; - -import java.io.IOException; - -/** - * An implementation of {@link MergeParquetReducer} which depends on the Kite Dataset API. - */ -public class KiteMergeParquetReducer extends MergeParquetReducer { - - @Override - protected void write(Context context, GenericRecord record) throws IOException, InterruptedException { - context.write(record, null); - } -} diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java deleted file mode 100644 index 6ebc5a31..00000000 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sqoop.mapreduce.parquet.kite; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator; -import org.apache.sqoop.util.FileSystemUtil; -import org.kitesdk.data.mapreduce.DatasetKeyInputFormat; - -import java.io.IOException; - -/** - * An implementation of {@link ParquetExportJobConfigurator} which depends on the Kite Dataset API. - */ -public class KiteParquetExportJobConfigurator implements ParquetExportJobConfigurator { - - @Override - public void configureInputFormat(Job job, Path inputPath) throws IOException { - String uri = "dataset:" + FileSystemUtil.makeQualified(inputPath, job.getConfiguration()); - DatasetKeyInputFormat.configure(job).readFrom(uri); - } - - @Override - public Class getMapperClass() { - return KiteParquetExportMapper.class; - } - - @Override - public Class getInputFormatClass() { - return DatasetKeyInputFormat.class; - } -} diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java deleted file mode 100644 index 122ff3fc..00000000 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sqoop.mapreduce.parquet.kite; - -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.io.NullWritable; -import org.apache.sqoop.mapreduce.GenericRecordExportMapper; - -import java.io.IOException; - -/** - * An implementation of {@link GenericRecordExportMapper} which depends on the Kite Dataset API. - */ -public class KiteParquetExportMapper extends GenericRecordExportMapper { - - @Override - protected void map(GenericRecord key, NullWritable val, Context context) throws IOException, InterruptedException { - context.write(toSqoopRecord(key), NullWritable.get()); - } - -} diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java deleted file mode 100644 index 7e179a27..00000000 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sqoop.mapreduce.parquet.kite; - -import org.apache.avro.Schema; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.sqoop.SqoopOptions; -import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator; -import org.apache.sqoop.util.FileSystemUtil; -import org.kitesdk.data.Datasets; -import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat; - -import java.io.IOException; - -/** - * An implementation of {@link ParquetImportJobConfigurator} which depends on the Kite Dataset API. - */ -public class KiteParquetImportJobConfigurator implements ParquetImportJobConfigurator { - - public static final Log LOG = LogFactory.getLog(KiteParquetImportJobConfigurator.class.getName()); - - @Override - public void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException { - JobConf conf = (JobConf) job.getConfiguration(); - String uri = getKiteUri(conf, options, tableName, destination); - KiteParquetUtils.WriteMode writeMode; - - if (options.doHiveImport()) { - if (options.doOverwriteHiveTable()) { - writeMode = KiteParquetUtils.WriteMode.OVERWRITE; - } else { - writeMode = KiteParquetUtils.WriteMode.APPEND; - if (Datasets.exists(uri)) { - LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " + - "append data into the existing Hive table. Consider using " + - "--hive-overwrite, if you do NOT intend to do appending."); - } - } - } else { - // Note that there is no such an import argument for overwriting HDFS - // dataset, so overwrite mode is not supported yet. - // Sqoop's append mode means to merge two independent datasets. We - // choose DEFAULT as write mode. - writeMode = KiteParquetUtils.WriteMode.DEFAULT; - } - KiteParquetUtils.configureImportJob(conf, schema, uri, writeMode); - } - - @Override - public Class getMapperClass() { - return KiteParquetImportMapper.class; - } - - @Override - public Class getOutputFormatClass() { - return DatasetKeyOutputFormat.class; - } - - @Override - public boolean isHiveImportNeeded() { - return false; - } - - private String getKiteUri(Configuration conf, SqoopOptions options, String tableName, Path destination) throws IOException { - if (options.doHiveImport()) { - String hiveDatabase = options.getHiveDatabaseName() == null ? "default" : - options.getHiveDatabaseName(); - String hiveTable = options.getHiveTableName() == null ? tableName : - options.getHiveTableName(); - return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable); - } else { - return "dataset:" + FileSystemUtil.makeQualified(destination, conf); - } - } -} diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java deleted file mode 100644 index 0a91e4a2..00000000 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sqoop.mapreduce.parquet.kite; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.sqoop.avro.AvroUtil; -import org.apache.sqoop.lib.LargeObjectLoader; -import org.apache.sqoop.mapreduce.ParquetImportMapper; - -import java.io.IOException; - -import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY; - -/** - * An implementation of {@link ParquetImportMapper} which depends on the Kite Dataset API. - */ -public class KiteParquetImportMapper extends ParquetImportMapper { - - @Override - protected LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); - Path workPath = new Path(conf.get("sqoop.kite.lob.extern.dir", "/tmp/sqoop-parquet-" + context.getTaskAttemptID())); - return new LargeObjectLoader(conf, workPath); - } - - @Override - protected Schema getAvroSchema(Configuration configuration) { - String schemaString = configuration.get(SQOOP_PARQUET_AVRO_SCHEMA_KEY); - return AvroUtil.parseAvroSchema(schemaString); - } - - @Override - protected void write(Context context, GenericRecord record) throws IOException, InterruptedException { - context.write(record, null); - } -} diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java deleted file mode 100644 index bd07c09f..00000000 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sqoop.mapreduce.parquet.kite; - -import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator; -import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator; -import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory; -import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator; - -/** - * A concrete factory implementation which produces configurator objects using the Kite Dataset API. - */ -public class KiteParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory { - - @Override - public ParquetImportJobConfigurator createParquetImportJobConfigurator() { - return new KiteParquetImportJobConfigurator(); - } - - @Override - public ParquetExportJobConfigurator createParquetExportJobConfigurator() { - return new KiteParquetExportJobConfigurator(); - } - - @Override - public ParquetMergeJobConfigurator createParquetMergeJobConfigurator() { - return new KiteParquetMergeJobConfigurator(); - } -} diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java deleted file mode 100644 index ed045cd1..00000000 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sqoop.mapreduce.parquet.kite; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.sqoop.mapreduce.MergeParquetMapper; -import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator; -import org.kitesdk.data.Dataset; -import org.kitesdk.data.DatasetDescriptor; -import org.kitesdk.data.Datasets; -import org.kitesdk.data.Formats; -import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat; -import parquet.avro.AvroParquetInputFormat; -import parquet.avro.AvroSchemaConverter; -import parquet.hadoop.Footer; -import parquet.hadoop.ParquetFileReader; -import parquet.schema.MessageType; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY; - -/** - * An implementation of {@link ParquetMergeJobConfigurator} which depends on the Kite Dataset API. - */ -public class KiteParquetMergeJobConfigurator implements ParquetMergeJobConfigurator { - - public static final Log LOG = LogFactory.getLog(KiteParquetMergeJobConfigurator.class.getName()); - - @Override - public void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath, - Path finalPath) throws IOException { - try { - FileSystem fileSystem = finalPath.getFileSystem(conf); - LOG.info("Trying to merge parquet files"); - job.setOutputKeyClass(GenericRecord.class); - job.setMapperClass(MergeParquetMapper.class); - job.setReducerClass(KiteMergeParquetReducer.class); - job.setOutputValueClass(NullWritable.class); - - List