SQOOP-3329: Remove Kite dependency from the Sqoop project
(Szabolcs Vasas via Boglarka Egyed)
This commit is contained in:
parent 17461e91db
commit 739bbce485
@@ -114,15 +114,7 @@ under the License.
            conf="common->default;redist->default"/>
<dependency org="org.apache.commons" name="commons-lang3" rev="${commons-lang3.version}"
            conf="common->default;redist->default"/>
<dependency org="org.kitesdk" name="kite-data-mapreduce" rev="${kite-data.version}"
            conf="common->default;redist->default">
  <exclude org="org.apache.avro" module="avro" />
</dependency>
<dependency org="org.kitesdk" name="kite-data-hive" rev="${kite-data.version}"
            conf="common->default;redist->default">
  <exclude org="com.twitter" module="parquet-hive-bundle"/>
  <exclude org="org.apache.avro" module="avro" />
</dependency>
<dependency org="com.twitter" name="parquet-avro" rev="${parquet.version}" conf="common->default;redist->default"/>

<dependency org="com.fasterxml.jackson.core" name="jackson-databind" rev="${jackson-databind.version}"
            conf="common->default;redist->default" />
@@ -20,8 +20,6 @@

avro.version=1.8.1

kite-data.version=1.1.0

checkstyle.version=5.0

commons-cli.version=1.2
@@ -62,3 +60,4 @@ hbase.version=1.2.4

hcatalog.version=1.2.1

jackson-databind.version=2.9.5
parquet.version=1.6.0
@@ -28,11 +28,3 @@ direct mapping (for example, +DATE+, +TIME+, and +TIMESTAMP+) will be coerced to
+STRING+ in Hive. The +NUMERIC+ and +DECIMAL+ SQL types will be coerced to
+DOUBLE+. In these cases, Sqoop will emit a warning in its log messages
informing you of the loss of precision.

Parquet Support in Hive
~~~~~~~~~~~~~~~~~~~~~~~

When using the Kite Dataset API based Parquet implementation in order to contact the Hive MetaStore
from a MapReduce job, a delegation token will be fetched and passed. HIVE_CONF_DIR and HIVE_HOME must be set appropriately to add
Hive to the runtime classpath. Otherwise, importing/exporting into Hive in Parquet
format may not work.
@@ -60,7 +60,7 @@ Argument Description
+\--as-sequencefile+                       Imports data to SequenceFiles
+\--as-textfile+                           Imports data as plain text (default)
+\--as-parquetfile+                        Imports data to Parquet Files
+\--parquet-configurator-implementation+   Sets the implementation used during Parquet import. Supported values: kite, hadoop.
+\--parquet-configurator-implementation+   Sets the implementation used during Parquet import. Supported value: hadoop.
+\--boundary-query <statement>+            Boundary query to use for creating splits
+\--columns <col,col,col...>+              Columns to import from table
+\--delete-target-dir+                     Delete the import target directory\
@@ -448,35 +448,14 @@ and Avro files.
Parquet support
+++++++++++++++

Sqoop has two different implementations for importing data in Parquet format:
Sqoop has only one implementation now for importing data in Parquet format which is based on the Parquet Hadoop API.
Note that the legacy Kite Dataset API based implementation is removed so users have to make sure that both
+\--parquet-configurator-implementation+ option and +parquetjob.configurator.implementation+ property are unset or
set to "hadoop".

- Kite Dataset API based implementation (default, legacy)
- Parquet Hadoop API based implementation (recommended)

The users can specify the desired implementation with the +\--parquet-configurator-implementation+ option:

----
$ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-parquetfile --parquet-configurator-implementation kite
----

----
$ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-parquetfile --parquet-configurator-implementation hadoop
----

If the +\--parquet-configurator-implementation+ option is not present then Sqoop will check the value of +parquetjob.configurator.implementation+
property (which can be specified using -D in the Sqoop command or in the site.xml). If that value is also absent Sqoop will
default to Kite Dataset API based implementation.
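The property named above is an ordinary Hadoop configuration key, so it can also be set programmatically. The sketch below mirrors what the test classes touched later in this commit do (they call +conf.set+ with the same key); the wrapper class and method name are illustrative only, not part of Sqoop.

----
import org.apache.hadoop.conf.Configuration;

// Illustrative helper (not part of Sqoop): equivalent to passing
// -Dparquetjob.configurator.implementation=hadoop on the sqoop command line.
public final class ParquetImplementationConf {

  private ParquetImplementationConf() {
  }

  public static Configuration withHadoopParquetImplementation() {
    Configuration conf = new Configuration();
    conf.set("parquetjob.configurator.implementation", "hadoop");
    return conf;
  }
}
----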
The Kite Dataset API based implementation executes the import command on a different code
path than the text import: it creates the Hive table based on the generated Avro schema by connecting to the Hive metastore.
This can be a disadvantage since sometimes moving from the text file format to the Parquet file format can lead to many
unexpected behavioral changes. Kite checks the Hive table schema before importing the data into it so if the user wants
to import some data which has a schema incompatible with the Hive table's schema Sqoop will throw an error. This implementation
uses snappy codec for compression by default and apart from this it supports the bzip codec too.

The Parquet Hadoop API based implementation builds the Hive CREATE TABLE statement and executes the
LOAD DATA INPATH command just like the text import does. Unlike Kite it also supports connecting to HiveServer2 (using the +\--hs2-url+ option)
so it provides better security features. This implementation does not check the Hive table's schema before importing so
The default Parquet import implementation builds the Hive CREATE TABLE statement and executes the
LOAD DATA INPATH command just like the text import does. It supports connecting to HiveServer2 (using the +\--hs2-url+ option)
but it does not check the Hive table's schema before importing so
it is possible that the user can successfully import data into Hive but they get an error during a Hive read operation later.
It does not use any compression by default but supports snappy and bzip codecs.

@@ -487,6 +466,8 @@ $ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-par
--parquet-configurator-implementation hadoop --hs2-url "jdbc:hive2://hs2.foo.com:10000" --hs2-keytab "/path/to/keytab"
----

Note that +\--parquet-configurator-implementation hadoop+ is now optional.

Enabling Logical Types in Avro and Parquet import for numbers
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

@@ -53,7 +53,7 @@
import org.apache.sqoop.util.StoredAsProperty;

import static org.apache.sqoop.Sqoop.SQOOP_RETHROW_PROPERTY;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;
import static org.apache.sqoop.orm.ClassWriter.toJavaIdentifier;

/**
@@ -1161,7 +1161,7 @@ private void initDefaults(Configuration baseConfiguration) {
    // set escape column mapping to true
    this.escapeColumnMappingEnabled = true;

    this.parquetConfiguratorImplementation = KITE;
    this.parquetConfiguratorImplementation = HADOOP;
  }

  /**
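Because the hunk above flips the built-in default from KITE to HADOOP, callers that build SqoopOptions programmatically no longer need to select an implementation at all. A minimal sketch using only the constructor and setter that TestMerge exercises later in this commit; the wrapper class is assumed for illustration.

----
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.SqoopOptions;

import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;

// Sketch only: after this commit new SqoopOptions instances already default to HADOOP,
// so the explicit setter call below is redundant but harmless.
final class ParquetDefaultSketch {

  static SqoopOptions newParquetOptions() {
    SqoopOptions options = new SqoopOptions(new Configuration());
    options.setParquetConfiguratorImplementation(HADOOP);
    return options;
  }
}
----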
@@ -19,14 +19,13 @@
package org.apache.sqoop.mapreduce.parquet;

import org.apache.sqoop.mapreduce.parquet.hadoop.HadoopParquetJobConfiguratorFactory;
import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetJobConfiguratorFactory;

/**
 * An enum containing all the implementations available for {@link ParquetJobConfiguratorFactory}.
 * The enumeration constants are also used to instantiate concrete {@link ParquetJobConfiguratorFactory} objects.
 */
public enum ParquetJobConfiguratorImplementation {
  KITE(KiteParquetJobConfiguratorFactory.class), HADOOP(HadoopParquetJobConfiguratorFactory.class);
  HADOOP(HadoopParquetJobConfiguratorFactory.class);

  private Class<? extends ParquetJobConfiguratorFactory> configuratorFactoryClass;
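The remaining constant still carries its factory class, and configuration strings are matched against the constant names. The sketch below relies only on standard enum semantics (+valueOf+); the upper-casing and the wrapper class are assumptions for illustration and not the actual option-parsing code in BaseSqoopTool.

----
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;

final class ImplementationLookupSketch {

  static ParquetJobConfiguratorImplementation parse(String value) {
    // "hadoop" resolves to HADOOP; a stale "kite" now throws IllegalArgumentException,
    // which surfaces as the "Invalid Parquet job configurator implementation is set"
    // error asserted in TestBaseSqoopTool at the bottom of this commit.
    return ParquetJobConfiguratorImplementation.valueOf(value.toUpperCase());
  }
}
----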
@ -1,36 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.sqoop.mapreduce.parquet.kite;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
import org.apache.sqoop.mapreduce.MergeParquetReducer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* An implementation of {@link MergeParquetReducer} which depends on the Kite Dataset API.
|
||||
*/
|
||||
public class KiteMergeParquetReducer extends MergeParquetReducer<GenericRecord, NullWritable> {
|
||||
|
||||
@Override
|
||||
protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
|
||||
context.write(record, null);
|
||||
}
|
||||
}
|
@ -1,51 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.sqoop.mapreduce.parquet.kite;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapreduce.InputFormat;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
|
||||
import org.apache.sqoop.util.FileSystemUtil;
|
||||
import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* An implementation of {@link ParquetExportJobConfigurator} which depends on the Kite Dataset API.
|
||||
*/
|
||||
public class KiteParquetExportJobConfigurator implements ParquetExportJobConfigurator {
|
||||
|
||||
@Override
|
||||
public void configureInputFormat(Job job, Path inputPath) throws IOException {
|
||||
String uri = "dataset:" + FileSystemUtil.makeQualified(inputPath, job.getConfiguration());
|
||||
DatasetKeyInputFormat.configure(job).readFrom(uri);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends Mapper> getMapperClass() {
|
||||
return KiteParquetExportMapper.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends InputFormat> getInputFormatClass() {
|
||||
return DatasetKeyInputFormat.class;
|
||||
}
|
||||
}
|
@ -1,37 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.sqoop.mapreduce.parquet.kite;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
import org.apache.sqoop.mapreduce.GenericRecordExportMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* An implementation of {@link GenericRecordExportMapper} which depends on the Kite Dataset API.
|
||||
*/
|
||||
public class KiteParquetExportMapper extends GenericRecordExportMapper<GenericRecord, NullWritable> {
|
||||
|
||||
@Override
|
||||
protected void map(GenericRecord key, NullWritable val, Context context) throws IOException, InterruptedException {
|
||||
context.write(toSqoopRecord(key), NullWritable.get());
|
||||
}
|
||||
|
||||
}
|
@ -1,98 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.sqoop.mapreduce.parquet.kite;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
import org.apache.hadoop.mapreduce.OutputFormat;
|
||||
import org.apache.sqoop.SqoopOptions;
|
||||
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
|
||||
import org.apache.sqoop.util.FileSystemUtil;
|
||||
import org.kitesdk.data.Datasets;
|
||||
import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* An implementation of {@link ParquetImportJobConfigurator} which depends on the Kite Dataset API.
|
||||
*/
|
||||
public class KiteParquetImportJobConfigurator implements ParquetImportJobConfigurator {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(KiteParquetImportJobConfigurator.class.getName());
|
||||
|
||||
@Override
|
||||
public void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException {
|
||||
JobConf conf = (JobConf) job.getConfiguration();
|
||||
String uri = getKiteUri(conf, options, tableName, destination);
|
||||
KiteParquetUtils.WriteMode writeMode;
|
||||
|
||||
if (options.doHiveImport()) {
|
||||
if (options.doOverwriteHiveTable()) {
|
||||
writeMode = KiteParquetUtils.WriteMode.OVERWRITE;
|
||||
} else {
|
||||
writeMode = KiteParquetUtils.WriteMode.APPEND;
|
||||
if (Datasets.exists(uri)) {
|
||||
LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " +
|
||||
"append data into the existing Hive table. Consider using " +
|
||||
"--hive-overwrite, if you do NOT intend to do appending.");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Note that there is no such an import argument for overwriting HDFS
|
||||
// dataset, so overwrite mode is not supported yet.
|
||||
// Sqoop's append mode means to merge two independent datasets. We
|
||||
// choose DEFAULT as write mode.
|
||||
writeMode = KiteParquetUtils.WriteMode.DEFAULT;
|
||||
}
|
||||
KiteParquetUtils.configureImportJob(conf, schema, uri, writeMode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends Mapper> getMapperClass() {
|
||||
return KiteParquetImportMapper.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends OutputFormat> getOutputFormatClass() {
|
||||
return DatasetKeyOutputFormat.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isHiveImportNeeded() {
|
||||
return false;
|
||||
}
|
||||
|
||||
private String getKiteUri(Configuration conf, SqoopOptions options, String tableName, Path destination) throws IOException {
|
||||
if (options.doHiveImport()) {
|
||||
String hiveDatabase = options.getHiveDatabaseName() == null ? "default" :
|
||||
options.getHiveDatabaseName();
|
||||
String hiveTable = options.getHiveTableName() == null ? tableName :
|
||||
options.getHiveTableName();
|
||||
return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
|
||||
} else {
|
||||
return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,55 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.sqoop.mapreduce.parquet.kite;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.sqoop.avro.AvroUtil;
|
||||
import org.apache.sqoop.lib.LargeObjectLoader;
|
||||
import org.apache.sqoop.mapreduce.ParquetImportMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
|
||||
|
||||
/**
|
||||
* An implementation of {@link ParquetImportMapper} which depends on the Kite Dataset API.
|
||||
*/
|
||||
public class KiteParquetImportMapper extends ParquetImportMapper<GenericRecord, Void> {
|
||||
|
||||
@Override
|
||||
protected LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException {
|
||||
Configuration conf = context.getConfiguration();
|
||||
Path workPath = new Path(conf.get("sqoop.kite.lob.extern.dir", "/tmp/sqoop-parquet-" + context.getTaskAttemptID()));
|
||||
return new LargeObjectLoader(conf, workPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Schema getAvroSchema(Configuration configuration) {
|
||||
String schemaString = configuration.get(SQOOP_PARQUET_AVRO_SCHEMA_KEY);
|
||||
return AvroUtil.parseAvroSchema(schemaString);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
|
||||
context.write(record, null);
|
||||
}
|
||||
}
|
@ -1,45 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.sqoop.mapreduce.parquet.kite;
|
||||
|
||||
import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
|
||||
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
|
||||
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
|
||||
import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
|
||||
|
||||
/**
|
||||
* A concrete factory implementation which produces configurator objects using the Kite Dataset API.
|
||||
*/
|
||||
public class KiteParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory {
|
||||
|
||||
@Override
|
||||
public ParquetImportJobConfigurator createParquetImportJobConfigurator() {
|
||||
return new KiteParquetImportJobConfigurator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ParquetExportJobConfigurator createParquetExportJobConfigurator() {
|
||||
return new KiteParquetExportJobConfigurator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ParquetMergeJobConfigurator createParquetMergeJobConfigurator() {
|
||||
return new KiteParquetMergeJobConfigurator();
|
||||
}
|
||||
}
|
@ -1,103 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.sqoop.mapreduce.parquet.kite;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.sqoop.mapreduce.MergeParquetMapper;
|
||||
import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
|
||||
import org.kitesdk.data.Dataset;
|
||||
import org.kitesdk.data.DatasetDescriptor;
|
||||
import org.kitesdk.data.Datasets;
|
||||
import org.kitesdk.data.Formats;
|
||||
import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
|
||||
import parquet.avro.AvroParquetInputFormat;
|
||||
import parquet.avro.AvroSchemaConverter;
|
||||
import parquet.hadoop.Footer;
|
||||
import parquet.hadoop.ParquetFileReader;
|
||||
import parquet.schema.MessageType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
|
||||
|
||||
/**
|
||||
* An implementation of {@link ParquetMergeJobConfigurator} which depends on the Kite Dataset API.
|
||||
*/
|
||||
public class KiteParquetMergeJobConfigurator implements ParquetMergeJobConfigurator {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(KiteParquetMergeJobConfigurator.class.getName());
|
||||
|
||||
@Override
|
||||
public void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
|
||||
Path finalPath) throws IOException {
|
||||
try {
|
||||
FileSystem fileSystem = finalPath.getFileSystem(conf);
|
||||
LOG.info("Trying to merge parquet files");
|
||||
job.setOutputKeyClass(GenericRecord.class);
|
||||
job.setMapperClass(MergeParquetMapper.class);
|
||||
job.setReducerClass(KiteMergeParquetReducer.class);
|
||||
job.setOutputValueClass(NullWritable.class);
|
||||
|
||||
List<Footer> footers = new ArrayList<Footer>();
|
||||
FileStatus oldPathfileStatus = fileSystem.getFileStatus(oldPath);
|
||||
FileStatus newPathfileStatus = fileSystem.getFileStatus(oldPath);
|
||||
footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), oldPathfileStatus, true));
|
||||
footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), newPathfileStatus, true));
|
||||
|
||||
MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
|
||||
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
|
||||
Schema avroSchema = avroSchemaConverter.convert(schema);
|
||||
|
||||
if (!fileSystem.exists(finalPath)) {
|
||||
Dataset dataset = createDataset(avroSchema, "dataset:" + finalPath);
|
||||
DatasetKeyOutputFormat.configure(job).overwrite(dataset);
|
||||
} else {
|
||||
DatasetKeyOutputFormat.configure(job).overwrite(new URI("dataset:" + finalPath));
|
||||
}
|
||||
|
||||
job.setInputFormatClass(AvroParquetInputFormat.class);
|
||||
AvroParquetInputFormat.setAvroReadSchema(job, avroSchema);
|
||||
|
||||
conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, avroSchema.toString());
|
||||
Class<DatasetKeyOutputFormat> outClass = DatasetKeyOutputFormat.class;
|
||||
|
||||
job.setOutputFormatClass(outClass);
|
||||
} catch (Exception cnfe) {
|
||||
throw new IOException(cnfe);
|
||||
}
|
||||
}
|
||||
|
||||
public static Dataset createDataset(Schema schema, String uri) {
|
||||
DatasetDescriptor descriptor =
|
||||
new DatasetDescriptor.Builder().schema(schema).format(Formats.PARQUET).build();
|
||||
return Datasets.create(uri, descriptor, GenericRecord.class);
|
||||
}
|
||||
}
|
@ -1,217 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.sqoop.mapreduce.parquet.kite;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.sqoop.avro.AvroSchemaMismatchException;
|
||||
import org.apache.sqoop.hive.HiveConfig;
|
||||
import org.kitesdk.data.CompressionType;
|
||||
import org.kitesdk.data.Dataset;
|
||||
import org.kitesdk.data.DatasetDescriptor;
|
||||
import org.kitesdk.data.Datasets;
|
||||
import org.kitesdk.data.Formats;
|
||||
import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
|
||||
import org.kitesdk.data.spi.SchemaValidationUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
|
||||
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;
|
||||
|
||||
/**
|
||||
* Helper class using the Kite Dataset API for setting up a Parquet MapReduce job.
|
||||
*/
|
||||
public final class KiteParquetUtils {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(KiteParquetUtils.class.getName());
|
||||
|
||||
public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
|
||||
|
||||
public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled";
|
||||
// Purposefully choosing the same token alias as the one Oozie chooses.
|
||||
// Make sure we don't generate a new delegation token if oozie
|
||||
// has already generated one.
|
||||
public static final String HIVE_METASTORE_TOKEN_ALIAS = "HCat Token";
|
||||
|
||||
public static final String INCOMPATIBLE_AVRO_SCHEMA_MSG = "Target dataset was created with an incompatible Avro schema. ";
|
||||
|
||||
public static final String HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG = "You tried to import to an already existing Hive table in " +
|
||||
"Parquet format. Sqoop maps date/timestamp SQL types to int/bigint Hive types during Hive Parquet import" +
|
||||
" but it is possible that date/timestamp types were mapped to strings during table" +
|
||||
" creation. Consider using Sqoop option --map-column-java resolve the mismatch" +
|
||||
" (e.g. --map-column-java date_field1=String,timestamp_field1=String).";
|
||||
|
||||
private static final String HIVE_URI_PREFIX = "dataset:hive";
|
||||
|
||||
private KiteParquetUtils() {
|
||||
}
|
||||
|
||||
public enum WriteMode {
|
||||
DEFAULT, APPEND, OVERWRITE
|
||||
};
|
||||
|
||||
public static CompressionType getCompressionType(Configuration conf) {
|
||||
CompressionType defaults = Formats.PARQUET.getDefaultCompressionType();
|
||||
String codec = conf.get(SQOOP_PARQUET_OUTPUT_CODEC_KEY, defaults.getName());
|
||||
try {
|
||||
return CompressionType.forName(codec);
|
||||
} catch (IllegalArgumentException ex) {
|
||||
LOG.warn(String.format(
|
||||
"Unsupported compression type '%s'. Fallback to '%s'.",
|
||||
codec, defaults));
|
||||
}
|
||||
return defaults;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the import job. The import process will use a Kite dataset to
|
||||
* write data records into Parquet format internally. The input key class is
|
||||
* {@link org.apache.sqoop.lib.SqoopRecord}. The output key is
|
||||
* {@link org.apache.avro.generic.GenericRecord}.
|
||||
*/
|
||||
public static void configureImportJob(JobConf conf, Schema schema,
|
||||
String uri, WriteMode writeMode) throws IOException {
|
||||
Dataset dataset;
|
||||
|
||||
// Add hive delegation token only if we don't already have one.
|
||||
if (isHiveImport(uri)) {
|
||||
Configuration hiveConf = HiveConfig.getHiveConf(conf);
|
||||
if (isSecureMetastore(hiveConf)) {
|
||||
// Copy hive configs to job config
|
||||
HiveConfig.addHiveConfigs(hiveConf, conf);
|
||||
|
||||
if (conf.getCredentials().getToken(new Text(HIVE_METASTORE_TOKEN_ALIAS)) == null) {
|
||||
addHiveDelegationToken(conf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (Datasets.exists(uri)) {
|
||||
if (WriteMode.DEFAULT.equals(writeMode)) {
|
||||
throw new IOException("Destination exists! " + uri);
|
||||
}
|
||||
|
||||
dataset = Datasets.load(uri);
|
||||
Schema writtenWith = dataset.getDescriptor().getSchema();
|
||||
if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
|
||||
String exceptionMessage = buildAvroSchemaMismatchMessage(isHiveImport(uri));
|
||||
throw new AvroSchemaMismatchException(exceptionMessage, writtenWith, schema);
|
||||
}
|
||||
} else {
|
||||
dataset = createDataset(schema, getCompressionType(conf), uri);
|
||||
}
|
||||
conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, schema.toString());
|
||||
|
||||
DatasetKeyOutputFormat.ConfigBuilder builder =
|
||||
DatasetKeyOutputFormat.configure(conf);
|
||||
if (WriteMode.OVERWRITE.equals(writeMode)) {
|
||||
builder.overwrite(dataset);
|
||||
} else if (WriteMode.APPEND.equals(writeMode)) {
|
||||
builder.appendTo(dataset);
|
||||
} else {
|
||||
builder.writeTo(dataset);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isHiveImport(String importUri) {
|
||||
return importUri.startsWith(HIVE_URI_PREFIX);
|
||||
}
|
||||
|
||||
public static Dataset createDataset(Schema schema,
|
||||
CompressionType compressionType, String uri) {
|
||||
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
|
||||
.schema(schema)
|
||||
.format(Formats.PARQUET)
|
||||
.compressionType(compressionType)
|
||||
.build();
|
||||
return Datasets.create(uri, descriptor, GenericRecord.class);
|
||||
}
|
||||
|
||||
private static boolean isSecureMetastore(Configuration conf) {
|
||||
return conf != null && conf.getBoolean(HIVE_METASTORE_SASL_ENABLED, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add hive delegation token to credentials store.
|
||||
* @param conf
|
||||
*/
|
||||
private static void addHiveDelegationToken(JobConf conf) {
|
||||
// Need to use reflection since there's no compile time dependency on the client libs.
|
||||
Class<?> HiveConfClass;
|
||||
Class<?> HiveMetaStoreClientClass;
|
||||
|
||||
try {
|
||||
HiveMetaStoreClientClass = Class.forName(HIVE_METASTORE_CLIENT_CLASS);
|
||||
} catch (ClassNotFoundException ex) {
|
||||
LOG.error("Could not load " + HIVE_METASTORE_CLIENT_CLASS
|
||||
+ " when adding hive delegation token. "
|
||||
+ "Make sure HIVE_CONF_DIR is set correctly.", ex);
|
||||
throw new RuntimeException("Couldn't fetch delegation token.", ex);
|
||||
}
|
||||
|
||||
try {
|
||||
HiveConfClass = Class.forName(HiveConfig.HIVE_CONF_CLASS);
|
||||
} catch (ClassNotFoundException ex) {
|
||||
LOG.error("Could not load " + HiveConfig.HIVE_CONF_CLASS
|
||||
+ " when adding hive delegation token."
|
||||
+ " Make sure HIVE_CONF_DIR is set correctly.", ex);
|
||||
throw new RuntimeException("Couldn't fetch delegation token.", ex);
|
||||
}
|
||||
|
||||
try {
|
||||
Object client = HiveMetaStoreClientClass.getConstructor(HiveConfClass).newInstance(
|
||||
HiveConfClass.getConstructor(Configuration.class, Class.class).newInstance(conf, Configuration.class)
|
||||
);
|
||||
// getDelegationToken(String kerberosPrincial)
|
||||
Method getDelegationTokenMethod = HiveMetaStoreClientClass.getMethod("getDelegationToken", String.class);
|
||||
Object tokenStringForm = getDelegationTokenMethod.invoke(client, UserGroupInformation.getLoginUser().getShortUserName());
|
||||
|
||||
// Load token
|
||||
Token<DelegationTokenIdentifier> metastoreToken = new Token<DelegationTokenIdentifier>();
|
||||
metastoreToken.decodeFromUrlString(tokenStringForm.toString());
|
||||
conf.getCredentials().addToken(new Text(HIVE_METASTORE_TOKEN_ALIAS), metastoreToken);
|
||||
|
||||
LOG.debug("Successfully fetched hive metastore delegation token. " + metastoreToken);
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Couldn't fetch delegation token.", ex);
|
||||
throw new RuntimeException("Couldn't fetch delegation token.", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private static String buildAvroSchemaMismatchMessage(boolean hiveImport) {
|
||||
String exceptionMessage = INCOMPATIBLE_AVRO_SCHEMA_MSG;
|
||||
|
||||
if (hiveImport) {
|
||||
exceptionMessage += HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG;
|
||||
}
|
||||
|
||||
return exceptionMessage;
|
||||
}
|
||||
|
||||
}
|
@@ -21,7 +21,6 @@
import static java.lang.String.format;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.valueOf;

import java.io.File;
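For readers tracing the behaviour change, the imports above are the pieces BaseSqoopTool uses to pick an implementation. Below is a condensed sketch of the resolution order described in the user guide section of this commit (explicit option, then the configuration property, then the HADOOP default); it is not Sqoop's actual code, and the helper class and upper-casing are assumptions.

----
import org.apache.commons.cli.CommandLine;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;

final class ImplementationResolutionSketch {

  private ImplementationResolutionSketch() {
  }

  static ParquetJobConfiguratorImplementation resolve(CommandLine cli, Configuration conf) {
    String value = cli.getOptionValue("parquet-configurator-implementation");
    if (isBlank(value)) {
      value = conf.get(PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY);
    }
    // Upper-casing is an assumption; the enum constants are upper case.
    return isBlank(value) ? HADOOP : ParquetJobConfiguratorImplementation.valueOf(value.toUpperCase());
  }
}
----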
@@ -1587,15 +1586,6 @@ protected void validateHiveOptions(SqoopOptions options)
          + "importing into SequenceFile format.");
    }

    // Hive import and create hive table not compatible for ParquetFile format when using Kite
    if (options.doHiveImport()
        && options.doFailIfHiveTableExists()
        && options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile
        && options.getParquetConfiguratorImplementation() == KITE) {
      throw new InvalidOptionsException("Hive import and create hive table is not compatible with "
          + "importing into ParquetFile format using Kite.");
    }

    if (options.doHiveImport()
        && options.getIncrementalMode().equals(IncrementalMode.DateLastModified)) {
      throw new InvalidOptionsException(HIVE_IMPORT_WITH_LASTMODIFIED_NOT_SUPPORTED);
@ -27,7 +27,6 @@
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;
|
||||
import org.apache.sqoop.testutil.CommonArgs;
|
||||
import org.apache.sqoop.testutil.HsqldbTestServer;
|
||||
import org.apache.sqoop.manager.ConnManager;
|
||||
@ -54,8 +53,6 @@
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;
|
||||
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
@ -84,8 +81,6 @@ public class TestMerge extends BaseSqoopTestCase {
|
||||
Arrays.asList(new Integer(1), new Integer(43)),
|
||||
Arrays.asList(new Integer(3), new Integer(313)));
|
||||
|
||||
private ParquetJobConfiguratorImplementation parquetJobConfiguratorImplementation = KITE;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
super.setUp();
|
||||
@ -118,7 +113,6 @@ public Configuration newConf() {
|
||||
public SqoopOptions getSqoopOptions(Configuration conf) {
|
||||
SqoopOptions options = new SqoopOptions(conf);
|
||||
options.setConnectString(HsqldbTestServer.getDbUrl());
|
||||
options.setParquetConfiguratorImplementation(parquetJobConfiguratorImplementation);
|
||||
|
||||
return options;
|
||||
}
|
||||
@ -164,14 +158,7 @@ public void testAvroFileMerge() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParquetFileMergeHadoop() throws Exception {
|
||||
parquetJobConfiguratorImplementation = HADOOP;
|
||||
runMergeTest(SqoopOptions.FileLayout.ParquetFile);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParquetFileMergeKite() throws Exception {
|
||||
parquetJobConfiguratorImplementation = KITE;
|
||||
public void testParquetFileMerge() throws Exception {
|
||||
runMergeTest(SqoopOptions.FileLayout.ParquetFile);
|
||||
}
|
||||
|
||||
|
@ -18,9 +18,6 @@
|
||||
|
||||
package org.apache.sqoop;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.sqoop.testutil.ExportJobTestCase;
|
||||
import com.google.common.collect.Lists;
|
||||
@ -32,8 +29,6 @@
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import parquet.avro.AvroParquetWriter;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -44,7 +39,6 @@
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@ -58,23 +52,11 @@
|
||||
/**
|
||||
* Test that we can export Parquet Data Files from HDFS into databases.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestParquetExport extends ExportJobTestCase {
|
||||
|
||||
@Parameterized.Parameters(name = "parquetImplementation = {0}")
|
||||
public static Iterable<? extends Object> parquetImplementationParameters() {
|
||||
return Arrays.asList("kite", "hadoop");
|
||||
}
|
||||
|
||||
@Rule
|
||||
public ExpectedException thrown = ExpectedException.none();
|
||||
|
||||
private final String parquetImplementation;
|
||||
|
||||
public TestParquetExport(String parquetImplementation) {
|
||||
this.parquetImplementation = parquetImplementation;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return an argv for the CodeGenTool to use when creating tables to export.
|
||||
*/
|
||||
@ -144,8 +126,6 @@ public Schema getColumnParquetSchema() {
|
||||
|
||||
/**
|
||||
* Create a data file that gets exported to the db.
|
||||
* Sqoop uses Kite to export Parquet files so it requires a Kite metadata directory to be present next to the files
|
||||
* but since we do not use Kite in our test cases anymore we generate the .metadata directory here.
|
||||
* @param numRecords how many records to write to the file.
|
||||
*/
|
||||
protected void createParquetFile(int numRecords,
|
||||
@ -153,7 +133,6 @@ protected void createParquetFile(int numRecords,
|
||||
|
||||
Schema schema = buildSchema(extraCols);
|
||||
|
||||
createMetadataDir(schema);
|
||||
String fileName = UUID.randomUUID().toString() + ".parquet";
|
||||
Path filePath = new Path(getTablePath(), fileName);
|
||||
try (AvroParquetWriter parquetWriter = new AvroParquetWriter(filePath, schema, SNAPPY, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE)) {
|
||||
@ -167,25 +146,6 @@ protected void createParquetFile(int numRecords,
|
||||
}
|
||||
}
|
||||
|
||||
private void createMetadataDir(Schema schema) throws IOException {
|
||||
final String descriptorFileTemplate = "location=file\\:%s\n" +
|
||||
" version=1\n" +
|
||||
" compressionType=snappy\n" +
|
||||
" format=parquet\n";
|
||||
Path metadataDirPath = new Path(getTablePath(), ".metadata");
|
||||
Path schemaFile = new Path(metadataDirPath, "schema.avsc");
|
||||
Path descriptorFile = new Path(metadataDirPath, "descriptor.properties");
|
||||
FileSystem fileSystem = getTablePath().getFileSystem(new Configuration());
|
||||
fileSystem.mkdirs(metadataDirPath);
|
||||
|
||||
try (FSDataOutputStream fileOs = fileSystem.create(schemaFile)) {
|
||||
fileOs.write(schema.toString().getBytes());
|
||||
}
|
||||
try (FSDataOutputStream fileOs = fileSystem.create(descriptorFile)) {
|
||||
fileOs.write(String.format(descriptorFileTemplate, getTablePath()).getBytes());
|
||||
}
|
||||
}
|
||||
|
||||
private Schema buildSchema(ColumnGenerator... extraCols) {
|
||||
List<Field> fields = new ArrayList<Field>();
|
||||
fields.add(buildField("id", Schema.Type.INT));
|
||||
@ -492,11 +452,4 @@ public void testMissingParquetFields() throws IOException, SQLException {
|
||||
thrown.reportMissingExceptionWithMessage("Expected Exception on missing Parquet fields");
|
||||
runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration getConf() {
|
||||
Configuration conf = super.getConf();
|
||||
conf.set("parquetjob.configurator.implementation", parquetImplementation);
|
||||
return conf;
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,6 @@
|
||||
|
||||
package org.apache.sqoop;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.sqoop.testutil.CommonArgs;
|
||||
import org.apache.sqoop.testutil.HsqldbTestServer;
|
||||
import org.apache.sqoop.testutil.ImportJobTestCase;
|
||||
@ -31,9 +30,6 @@
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.sqoop.util.ParquetReader;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
@ -48,32 +44,15 @@
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
|
||||
/**
|
||||
* Tests --as-parquetfile.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestParquetImport extends ImportJobTestCase {
|
||||
|
||||
public static final Log LOG = LogFactory
|
||||
.getLog(TestParquetImport.class.getName());
|
||||
|
||||
private static String PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE = "kite";
|
||||
|
||||
private static String PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP = "hadoop";
|
||||
|
||||
@Parameters(name = "parquetImplementation = {0}")
|
||||
public static Iterable<? extends Object> parquetImplementationParameters() {
|
||||
return Arrays.asList(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE, PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP);
|
||||
}
|
||||
|
||||
private final String parquetImplementation;
|
||||
|
||||
public TestParquetImport(String parquetImplementation) {
|
||||
this.parquetImplementation = parquetImplementation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the argv to pass to Sqoop.
|
||||
*
|
||||
@ -136,27 +115,17 @@ public void testSnappyCompression() throws IOException {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHadoopGzipCompression() throws IOException {
|
||||
assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation));
|
||||
public void testGzipCompression() throws IOException {
|
||||
runParquetImportTest("gzip");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKiteDeflateCompression() throws IOException {
|
||||
assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE.equals(parquetImplementation));
|
||||
// The current Kite-based Parquet writing implementation uses GZIP compression codec when Deflate is specified.
|
||||
// See: org.kitesdk.data.spi.filesystem.ParquetAppender.getCompressionCodecName()
|
||||
runParquetImportTest("deflate", "gzip");
|
||||
}
|
||||
|
||||
/**
|
||||
* This test case is added to document that the deflate codec is not supported with
|
||||
* the Hadoop Parquet implementation so Sqoop throws an exception when it is specified.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test(expected = IOException.class)
|
||||
public void testHadoopDeflateCompression() throws IOException {
|
||||
assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation));
|
||||
public void testDeflateCompression() throws IOException {
|
||||
runParquetImportTest("deflate");
|
||||
}
|
||||
|
||||
@ -334,11 +303,4 @@ private void checkField(Field field, String name, Type type) {
|
||||
assertEquals(Type.NULL, field.schema().getTypes().get(0).getType());
|
||||
assertEquals(type, field.schema().getTypes().get(1).getType());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration getConf() {
|
||||
Configuration conf = super.getConf();
|
||||
conf.set("parquetjob.configurator.implementation", parquetImplementation);
|
||||
return conf;
|
||||
}
|
||||
}
|
||||
|
@ -23,20 +23,12 @@
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.sqoop.Sqoop;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.SchemaBuilder;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.sqoop.avro.AvroSchemaMismatchException;
|
||||
import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetUtils;
|
||||
import org.apache.sqoop.util.ParquetReader;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
@ -54,7 +46,6 @@
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import static java.util.Collections.sort;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
@ -284,54 +275,6 @@ public void testNormalHiveImport() throws IOException {
|
||||
getArgv(false, null), new ImportTool());
|
||||
}
|
||||
|
||||
/** Test that strings and ints are handled in the normal fashion as parquet
|
||||
* file. */
|
||||
@Test
|
||||
public void testNormalHiveImportAsParquet() throws IOException {
|
||||
final String TABLE_NAME = "normal_hive_import_as_parquet";
|
||||
setCurTableName(TABLE_NAME);
|
||||
setNumCols(3);
|
||||
String [] types = getTypes();
|
||||
String [] vals = { "'test'", "42", "'somestring'" };
|
||||
String [] extraArgs = {"--as-parquetfile"};
|
||||
|
||||
runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs),
|
||||
new ImportTool());
|
||||
verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});
|
||||
}
|
||||
|
||||
private void verifyHiveDataset(Object[][] valsArray) {
|
||||
List<String> expected = getExpectedLines(valsArray);
|
||||
List<String> result = new ParquetReader(getTablePath()).readAllInCsv();
|
||||
|
||||
sort(expected);
|
||||
sort(result);
|
||||
|
||||
assertEquals(expected, result);
|
||||
}
|
||||
|
||||
private List<String> getExpectedLines(Object[][] valsArray) {
|
||||
List<String> expectations = new ArrayList<>();
|
||||
if (valsArray != null) {
|
||||
for (Object[] vals : valsArray) {
|
||||
expectations.add(toCsv(vals));
|
||||
}
|
||||
}
|
||||
return expectations;
|
||||
}
|
||||
|
||||
private String toCsv(Object[] vals) {
|
||||
StringBuilder result = new StringBuilder();
|
||||
|
||||
for (Object val : vals) {
|
||||
result.append(val).append(",");
|
||||
}
|
||||
|
||||
result.deleteCharAt(result.length() - 1);
|
||||
|
||||
return result.toString();
|
||||
}
|
||||
|
||||
/** Test that table is created in hive with no data import. */
|
||||
@Test
|
||||
public void testCreateOnlyHiveImport() throws IOException {
|
||||
@ -365,108 +308,6 @@ public void testCreateOverwriteHiveImport() throws IOException {
|
||||
new CreateHiveTableTool());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that table is created in hive and replaces the existing table if
|
||||
* any.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateOverwriteHiveImportAsParquet() throws IOException {
|
||||
final String TABLE_NAME = "create_overwrite_hive_import_as_parquet";
|
||||
setCurTableName(TABLE_NAME);
|
||||
setNumCols(3);
|
||||
String [] types = getTypes();
|
||||
String [] vals = { "'test'", "42", "'somestring'" };
|
||||
String [] extraArgs = {"--as-parquetfile"};
|
||||
ImportTool tool = new ImportTool();
|
||||
|
||||
runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs), tool);
|
||||
verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});
|
||||
|
||||
String [] valsToOverwrite = { "'test2'", "24", "'somestring2'" };
|
||||
String [] extraArgsForOverwrite = {"--as-parquetfile", "--hive-overwrite"};
|
||||
runImportTest(TABLE_NAME, types, valsToOverwrite, "",
|
||||
getArgv(false, extraArgsForOverwrite), tool);
|
||||
verifyHiveDataset(new Object[][] {{"test2", 24, "somestring2"}});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHiveImportAsParquetWhenTableExistsWithIncompatibleSchema() throws Exception {
|
||||
final String TABLE_NAME = "HIVE_IMPORT_AS_PARQUET_EXISTING_TABLE";
|
||||
setCurTableName(TABLE_NAME);
|
||||
setNumCols(3);
|
||||
|
||||
String [] types = { "VARCHAR(32)", "INTEGER", "DATE" };
|
||||
String [] vals = { "'test'", "42", "'2009-12-31'" };
|
||||
String [] extraArgs = {"--as-parquetfile"};
|
||||
|
||||
createHiveDataSet(TABLE_NAME);
|
||||
|
||||
createTableWithColTypes(types, vals);
|
||||
|
||||
thrown.expect(AvroSchemaMismatchException.class);
|
||||
thrown.expectMessage(KiteParquetUtils.INCOMPATIBLE_AVRO_SCHEMA_MSG + KiteParquetUtils.HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG);
|
||||
|
||||
SqoopOptions sqoopOptions = getSqoopOptions(getConf());
|
||||
sqoopOptions.setThrowOnError(true);
|
||||
Sqoop sqoop = new Sqoop(new ImportTool(), getConf(), sqoopOptions);
|
||||
sqoop.run(getArgv(false, extraArgs));
|
||||
|
||||
}
|
||||
|
||||
private void createHiveDataSet(String tableName) {
|
||||
Schema dataSetSchema = SchemaBuilder
|
||||
.record(tableName)
|
||||
.fields()
|
||||
.name(getColName(0)).type().nullable().stringType().noDefault()
|
||||
.name(getColName(1)).type().nullable().stringType().noDefault()
|
||||
.name(getColName(2)).type().nullable().stringType().noDefault()
|
||||
.endRecord();
|
||||
String dataSetUri = "dataset:hive:/default/" + tableName;
|
||||
KiteParquetUtils.createDataset(dataSetSchema, KiteParquetUtils.getCompressionType(new Configuration()), dataSetUri);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that records are appended to an existing table.
|
||||
*/
|
||||
@Test
|
||||
public void testAppendHiveImportAsParquet() throws IOException {
|
||||
final String TABLE_NAME = "append_hive_import_as_parquet";
|
||||
setCurTableName(TABLE_NAME);
|
||||
setNumCols(3);
|
||||
String [] types = getTypes();
|
||||
String [] vals = { "'test'", "42", "'somestring'" };
|
||||
String [] extraArgs = {"--as-parquetfile"};
|
||||
String [] args = getArgv(false, extraArgs);
|
||||
ImportTool tool = new ImportTool();
|
||||
|
||||
runImportTest(TABLE_NAME, types, vals, "", args, tool);
|
||||
verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});
|
||||
|
||||
String [] valsToAppend = { "'test2'", "4242", "'somestring2'" };
|
||||
runImportTest(TABLE_NAME, types, valsToAppend, "", args, tool);
|
||||
verifyHiveDataset(new Object[][] {
|
||||
{"test2", 4242, "somestring2"}, {"test", 42, "somestring"}});
|
||||
}
|
||||
|
||||
/**
|
||||
* Test hive create and --as-parquetfile options validation.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateHiveImportAsParquet() throws ParseException, InvalidOptionsException {
|
||||
final String TABLE_NAME = "CREATE_HIVE_IMPORT_AS_PARQUET";
|
||||
setCurTableName(TABLE_NAME);
|
||||
setNumCols(3);
|
||||
String [] extraArgs = {"--as-parquetfile", "--create-hive-table"};
|
||||
ImportTool tool = new ImportTool();
|
||||
|
||||
thrown.expect(InvalidOptionsException.class);
|
||||
thrown.reportMissingExceptionWithMessage("Expected InvalidOptionsException during Hive table creation with " +
|
||||
"--as-parquetfile");
|
||||
tool.validateOptions(tool.parseArguments(getArgv(false, extraArgs), null,
|
||||
null, true));
|
||||
}
|
||||
|
||||
|
||||
/** Test that dates are coerced properly to strings. */
|
||||
@Test
|
||||
public void testDate() throws IOException {
|
||||
|
@@ -28,7 +28,6 @@
import org.mockito.Mockito;

import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
@@ -124,7 +123,7 @@ public void testApplyCommonOptionsPrefersParquetJobConfigurationImplementationFr
  public void testApplyCommonOptionsThrowsWhenInvalidParquetJobConfigurationImplementationIsSet() throws Exception {
    when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn("this_is_definitely_not_valid");

    exception.expectMessage("Invalid Parquet job configurator implementation is set: this_is_definitely_not_valid. Supported values are: [KITE, HADOOP]");
    exception.expectMessage("Invalid Parquet job configurator implementation is set: this_is_definitely_not_valid. Supported values are: [HADOOP]");
    testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
  }

@@ -132,6 +131,6 @@ public void testApplyCommonOptionsThrowsWhenInvalidParquetJobConfigurationImplem
  public void testApplyCommonOptionsDoesNotChangeDefaultParquetJobConfigurationImplementationWhenNothingIsSet() throws Exception {
    testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);

    assertEquals(KITE, testSqoopOptions.getParquetConfiguratorImplementation());
    assertEquals(HADOOP, testSqoopOptions.getParquetConfiguratorImplementation());
  }
}