5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-03 03:21:55 +08:00

SQOOP-3335: Add Hive support to the new Parquet writing implementation

(Szabolcs Vasas via Boglarka Egyed)
This commit is contained in:
Boglarka Egyed 2018-07-16 13:41:12 +02:00
parent a6bedca4b2
commit e639053251
14 changed files with 625 additions and 70 deletions

View File

@ -20,6 +20,7 @@
import java.sql.Types;
import org.apache.avro.Schema;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -28,6 +29,15 @@
*/
public final class HiveTypes {
private static final String HIVE_TYPE_TINYINT = "TINYINT";
private static final String HIVE_TYPE_INT = "INT";
private static final String HIVE_TYPE_BIGINT = "BIGINT";
private static final String HIVE_TYPE_FLOAT = "FLOAT";
private static final String HIVE_TYPE_DOUBLE = "DOUBLE";
private static final String HIVE_TYPE_STRING = "STRING";
private static final String HIVE_TYPE_BOOLEAN = "BOOLEAN";
private static final String HIVE_TYPE_BINARY = "BINARY";
public static final Log LOG = LogFactory.getLog(HiveTypes.class.getName());
private HiveTypes() { }
@ -41,7 +51,7 @@ public static String toHiveType(int sqlType) {
switch (sqlType) {
case Types.INTEGER:
case Types.SMALLINT:
return "INT";
return HIVE_TYPE_INT;
case Types.VARCHAR:
case Types.CHAR:
case Types.LONGVARCHAR:
@ -52,20 +62,20 @@ public static String toHiveType(int sqlType) {
case Types.TIME:
case Types.TIMESTAMP:
case Types.CLOB:
return "STRING";
return HIVE_TYPE_STRING;
case Types.NUMERIC:
case Types.DECIMAL:
case Types.FLOAT:
case Types.DOUBLE:
case Types.REAL:
return "DOUBLE";
return HIVE_TYPE_DOUBLE;
case Types.BIT:
case Types.BOOLEAN:
return "BOOLEAN";
return HIVE_TYPE_BOOLEAN;
case Types.TINYINT:
return "TINYINT";
return HIVE_TYPE_TINYINT;
case Types.BIGINT:
return "BIGINT";
return HIVE_TYPE_BIGINT;
default:
// TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT,
// BLOB, ARRAY, STRUCT, REF, JAVA_OBJECT.
@ -73,6 +83,29 @@ public static String toHiveType(int sqlType) {
}
}
public static String toHiveType(Schema.Type avroType) {
switch (avroType) {
case BOOLEAN:
return HIVE_TYPE_BOOLEAN;
case INT:
return HIVE_TYPE_INT;
case LONG:
return HIVE_TYPE_BIGINT;
case FLOAT:
return HIVE_TYPE_FLOAT;
case DOUBLE:
return HIVE_TYPE_DOUBLE;
case STRING:
case ENUM:
return HIVE_TYPE_STRING;
case BYTES:
case FIXED:
return HIVE_TYPE_BINARY;
default:
return null;
}
}
/**
* @return true if a sql type can't be translated to a precise match
* in Hive, and we have to cast it to something more generic.

View File

@ -20,24 +20,31 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Date;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.avro.AvroUtil;
import org.apache.sqoop.io.CodecMap;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.util.FileSystemUtil;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
/**
* Creates (Hive-specific) SQL DDL statements to create tables to hold data
* we're importing from another source.
@ -56,6 +63,7 @@ public class TableDefWriter {
private String inputTableName;
private String outputTableName;
private boolean commentsEnabled;
private Schema avroSchema;
/**
* Creates a new TableDefWriter to generate a Hive CREATE TABLE statement.
@ -82,6 +90,9 @@ public TableDefWriter(final SqoopOptions opts, final ConnManager connMgr,
* Get the column names to import.
*/
private String [] getColumnNames() {
if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
return getColumnNamesFromAvroSchema();
}
String [] colNames = options.getColumns();
if (null != colNames) {
return colNames; // user-specified column names.
@ -92,6 +103,16 @@ public TableDefWriter(final SqoopOptions opts, final ConnManager connMgr,
}
}
private String[] getColumnNamesFromAvroSchema() {
List<String> result = new ArrayList<>();
for (Schema.Field field : getAvroSchema().getFields()) {
result.add(field.name());
}
return result.toArray(new String[result.size()]);
}
/**
* @return the CREATE TABLE statement for the table to load into hive.
*/
@ -108,6 +129,7 @@ public String getCreateTableStmt() throws IOException {
}
String [] colNames = getColumnNames();
Map<String, Schema.Type> columnNameToAvroType = getColumnNameToAvroTypeMapping();
StringBuilder sb = new StringBuilder();
if (options.doFailIfHiveTableExists()) {
if (isHiveExternalTableSet) {
@ -158,22 +180,18 @@ public String getCreateTableStmt() throws IOException {
first = false;
Integer colType = columnTypes.get(col);
String hiveColType = userMapping.getProperty(col);
if (hiveColType == null) {
hiveColType = connManager.toHiveType(inputTableName, col, colType);
}
if (null == hiveColType) {
throw new IOException("Hive does not support the SQL type for column "
+ col);
String hiveColType;
if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
Integer colType = columnTypes.get(col);
hiveColType = getHiveColumnTypeForTextTable(userMapping, col, colType);
} else if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
hiveColType = HiveTypes.toHiveType(columnNameToAvroType.get(col));
} else {
throw new RuntimeException("File format is not supported for Hive tables.");
}
sb.append('`').append(col).append("` ").append(hiveColType);
if (HiveTypes.isHiveTypeImprovised(colType)) {
LOG.warn(
"Column " + col + " had to be cast to a less precise type in Hive");
}
}
sb.append(") ");
@ -190,19 +208,23 @@ public String getCreateTableStmt() throws IOException {
.append(" STRING) ");
}
sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '");
sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
sb.append("' LINES TERMINATED BY '");
sb.append(getHiveOctalCharCode((int) options.getOutputRecordDelim()));
String codec = options.getCompressionCodec();
if (codec != null && (codec.equals(CodecMap.LZOP)
|| codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
sb.append("' STORED AS INPUTFORMAT "
+ "'com.hadoop.mapred.DeprecatedLzoTextInputFormat'");
sb.append(" OUTPUTFORMAT "
+ "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
if (SqoopOptions.FileLayout.ParquetFile.equals(options.getFileLayout())) {
sb.append("STORED AS PARQUET");
} else {
sb.append("' STORED AS TEXTFILE");
sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '");
sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
sb.append("' LINES TERMINATED BY '");
sb.append(getHiveOctalCharCode((int) options.getOutputRecordDelim()));
String codec = options.getCompressionCodec();
if (codec != null && (codec.equals(CodecMap.LZOP)
|| codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
sb.append("' STORED AS INPUTFORMAT "
+ "'com.hadoop.mapred.DeprecatedLzoTextInputFormat'");
sb.append(" OUTPUTFORMAT "
+ "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
} else {
sb.append("' STORED AS TEXTFILE");
}
}
if (isHiveExternalTableSet) {
@ -214,6 +236,50 @@ public String getCreateTableStmt() throws IOException {
return sb.toString();
}
private Map<String, Schema.Type> getColumnNameToAvroTypeMapping() {
if (options.getFileLayout() != SqoopOptions.FileLayout.ParquetFile) {
return Collections.emptyMap();
}
Map<String, Schema.Type> result = new HashMap<>();
Schema avroSchema = getAvroSchema();
for (Schema.Field field : avroSchema.getFields()) {
result.put(field.name(), getNonNullAvroType(field.schema()));
}
return result;
}
private Schema.Type getNonNullAvroType(Schema schema) {
if (schema.getType() != Schema.Type.UNION) {
return schema.getType();
}
for (Schema subSchema : schema.getTypes()) {
if (subSchema.getType() != Schema.Type.NULL) {
return subSchema.getType();
}
}
return null;
}
private String getHiveColumnTypeForTextTable(Properties userMapping, String columnName, Integer columnType) throws IOException {
String hiveColType = userMapping.getProperty(columnName);
if (hiveColType == null) {
hiveColType = connManager.toHiveType(inputTableName, columnName, columnType);
}
if (null == hiveColType) {
throw new IOException("Hive does not support the SQL type for column "
+ columnName);
}
if (HiveTypes.isHiveTypeImprovised(columnType)) {
LOG.warn(
"Column " + columnName + " had to be cast to a less precise type in Hive");
}
return hiveColType;
}
/**
* @return the LOAD DATA statement to import the data in HDFS into hive.
*/
@ -320,5 +386,14 @@ String getOutputTableName() {
boolean isCommentsEnabled() {
return commentsEnabled;
}
Schema getAvroSchema() {
if (avroSchema == null) {
String schemaString = options.getConf().get(SQOOP_PARQUET_AVRO_SCHEMA_KEY);
avroSchema = AvroUtil.parseAvroSchema(schemaString);
}
return avroSchema;
}
}

View File

@ -39,4 +39,5 @@ public interface ParquetImportJobConfigurator {
Class<? extends OutputFormat> getOutputFormatClass();
boolean isHiveImportNeeded();
}

View File

@ -58,6 +58,11 @@ public Class<? extends OutputFormat> getOutputFormatClass() {
return AvroParquetOutputFormat.class;
}
@Override
public boolean isHiveImportNeeded() {
return true;
}
void configureOutputCodec(Job job) {
String outputCodec = job.getConfiguration().get(SQOOP_PARQUET_OUTPUT_CODEC_KEY);
if (outputCodec != null) {

View File

@ -79,6 +79,11 @@ public Class<? extends OutputFormat> getOutputFormatClass() {
return DatasetKeyOutputFormat.class;
}
@Override
public boolean isHiveImportNeeded() {
return false;
}
private String getKiteUri(Configuration conf, SqoopOptions options, String tableName, Path destination) throws IOException {
if (options.doHiveImport()) {
String hiveDatabase = options.getHiveDatabaseName() == null ? "default" :

View File

@ -21,6 +21,7 @@
import static java.lang.String.format;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.valueOf;
import java.io.File;
@ -1586,12 +1587,13 @@ protected void validateHiveOptions(SqoopOptions options)
+ "importing into SequenceFile format.");
}
// Hive import and create hive table not compatible for ParquetFile format
// Hive import and create hive table not compatible for ParquetFile format when using Kite
if (options.doHiveImport()
&& options.doFailIfHiveTableExists()
&& options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
&& options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile
&& options.getParquetConfiguratorImplementation() == KITE) {
throw new InvalidOptionsException("Hive import and create hive table is not compatible with "
+ "importing into ParquetFile format.");
+ "importing into ParquetFile format using Kite.");
}
if (options.doHiveImport()
@ -1902,7 +1904,6 @@ protected void validateHasDirectConnectorOption(SqoopOptions options) throws Sqo
protected void validateHS2Options(SqoopOptions options) throws SqoopOptions.InvalidOptionsException {
final String withoutTemplate = "The %s option cannot be used without the %s option.";
final String withTemplate = "The %s option cannot be used with the %s option.";
if (isSet(options.getHs2Url()) && !options.doHiveImport()) {
throw new InvalidOptionsException(format(withoutTemplate, HS2_URL_ARG, HIVE_IMPORT_ARG));
@ -1915,11 +1916,6 @@ protected void validateHS2Options(SqoopOptions options) throws SqoopOptions.Inva
if (isSet(options.getHs2Keytab()) && !isSet(options.getHs2User())) {
throw new InvalidOptionsException(format(withoutTemplate, HS2_KEYTAB_ARG, HS2_USER_ARG));
}
if (isSet(options.getHs2Url()) && (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile)) {
throw new InvalidOptionsException(format(withTemplate, HS2_URL_ARG, FMT_PARQUETFILE_ARG));
}
}
private void applyParquetJobConfigurationImplementation(CommandLine in, SqoopOptions out) throws InvalidOptionsException {

View File

@ -46,6 +46,7 @@
import org.apache.sqoop.hive.HiveClientFactory;
import org.apache.sqoop.manager.ImportJobContext;
import org.apache.sqoop.mapreduce.MergeJob;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
import org.apache.sqoop.metastore.JobData;
import org.apache.sqoop.metastore.JobStorage;
@ -541,13 +542,9 @@ protected boolean importTable(SqoopOptions options) throws IOException, ImportEx
}
// If the user wants this table to be in Hive, perform that post-load.
if (options.doHiveImport()) {
// For Parquet file, the import action will create hive table directly via
// kite. So there is no need to do hive import as a post step again.
if (options.getFileLayout() != SqoopOptions.FileLayout.ParquetFile) {
HiveClient hiveClient = hiveClientFactory.createHiveClient(options, manager);
hiveClient.importTable();
}
if (isHiveImportNeeded(options)) {
HiveClient hiveClient = hiveClientFactory.createHiveClient(options, manager);
hiveClient.importTable();
}
saveIncrementalState(options);
@ -1192,5 +1189,18 @@ public void validateOptions(SqoopOptions options)
validateHCatalogOptions(options);
validateAccumuloOptions(options);
}
private boolean isHiveImportNeeded(SqoopOptions options) {
if (!options.doHiveImport()) {
return false;
}
if (options.getFileLayout() != SqoopOptions.FileLayout.ParquetFile) {
return true;
}
ParquetJobConfiguratorFactory parquetJobConfigurator = getParquetJobConfigurator(options);
return parquetJobConfigurator.createParquetImportJobConfigurator().isHiveImportNeeded();
}
}

View File

@ -26,8 +26,6 @@
import org.junit.rules.ExpectedException;
import parquet.hadoop.metadata.CompressionCodecName;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;
@ -157,13 +155,4 @@ private ArgumentArrayBuilder incrementalImportArgs(String connectString, String
.withOption("merge-key", mergeKey)
.withOption("last-value", lastValue);
}
private static long timeFromString(String timeStampString) {
try {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
return format.parse(timeStampString).getTime();
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,358 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
import org.apache.sqoop.hive.minicluster.NoAuthenticationConfiguration;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.HiveServer2TestUtil;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.util.ParquetReader;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import parquet.hadoop.metadata.CompressionCodecName;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import static java.util.Arrays.asList;
import static java.util.Arrays.deepEquals;
import static org.apache.sqoop.testutil.BaseSqoopTestCase.timeFromString;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@RunWith(Enclosed.class)
public class TestHiveServer2ParquetImport {
private static final String[] TEST_COLUMN_NAMES = {"C1_VARCHAR", "C2#INTEGER", "3C_CHAR"};
private static final String[] TEST_COLUMN_TYPES = {"VARCHAR(32)", "INTEGER", "CHAR(64)"};
private static final String[] TEST_COLUMN_ALL_TYPES = {"INTEGER", "BIGINT", "DOUBLE", "DECIMAL(10, 2)", "BOOLEAN", "TIMESTAMP", "BINARY", "VARCHAR(100)", "CHAR(100)"};
private static final List<Object> TEST_COLUMN_ALL_TYPES_VALUES = Arrays.<Object>asList(10, 12345678910123L, 12.34, 456842.45, "TRUE", "2018-06-14 15:00:00.000", "abcdef", "testVarchar", "testChar");
private static final Object[] EXPECTED_TEST_COLUMN_ALL_TYPES_VALUES = {10, 12345678910123L, 12.34, "456842.45", true, timeFromString("2018-06-14 15:00:00.000"), decodeHex("abcdef"), "testVarchar", "testChar"};
private static final List<Object> TEST_COLUMN_VALUES = Arrays.<Object>asList("test", 42, "somestring");
private static final List<Object> TEST_COLUMN_VALUES_MAPPED = Arrays.<Object>asList("test", "42", "somestring");
private static final List<Object> TEST_COLUMN_VALUES_LINE2 = Arrays.<Object>asList("test2", 4242, "somestring2");
private static HiveMiniCluster hiveMiniCluster;
private static HiveServer2TestUtil hiveServer2TestUtil;
@RunWith(Parameterized.class)
public static class ParquetCompressionCodecTestCase extends ImportJobTestCase {
@Parameters(name = "compressionCodec = {0}")
public static Iterable<? extends Object> authenticationParameters() {
return Arrays.asList("snappy", "gzip");
}
@BeforeClass
public static void beforeClass() {
startHiveMiniCluster();
}
@AfterClass
public static void afterClass() {
stopHiveMiniCluster();
}
private final String compressionCodec;
public ParquetCompressionCodecTestCase(String compressionCodec) {
this.compressionCodec = compressionCodec;
}
@Override
@Before
public void setUp() {
super.setUp();
createTableWithColTypesAndNames(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES);
}
@Test
public void testHiveImportAsParquetWithCompressionCodecCanBeLoaded() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("compression-codec", compressionCodec)
.build();
runImport(args);
List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
assertThat(rows, hasItems(TEST_COLUMN_VALUES));
}
@Test
public void testImportedFilesHaveCorrectCodec() throws Exception {
Path tablePath = new Path(hiveMiniCluster.getTempFolderPath() + "/" + getTableName().toLowerCase());
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("compression-codec", compressionCodec)
.build();
runImport(args);
CompressionCodecName codec = new ParquetReader(tablePath).getCodec();
assertEquals(compressionCodec, codec.name().toLowerCase());
}
}
public static class GeneralParquetTestCase extends ImportJobTestCase {
@Rule
public ExpectedException expectedException = ExpectedException.none();
@BeforeClass
public static void beforeClass() {
startHiveMiniCluster();
}
@AfterClass
public static void afterClass() {
stopHiveMiniCluster();
}
@Override
@Before
public void setUp() {
super.setUp();
createTableWithColTypesAndNames(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES);
}
@Test
public void testNormalHiveImportAsParquet() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName()).build();
runImport(args);
List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
assertThat(rows, hasItems(TEST_COLUMN_VALUES));
}
@Test
public void testHiveImportAsParquetWithMapColumnJavaAndOriginalColumnNameSucceeds() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("map-column-java", "C2#INTEGER=String")
.build();
runImport(args);
List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
assertThat(rows, hasItems(TEST_COLUMN_VALUES_MAPPED));
}
/**
* This test case documents that the Avro identifier(C2_INTEGER)
* of a special column name(C2#INTEGER) cannot be used in map-column-java.
* The reason is that org.apache.sqoop.orm.AvroSchemaGenerator#toAvroType(java.lang.String, int)
* which maps the Avro schema type uses the original column name and
* not the Avro identifier but org.apache.sqoop.orm.ClassWriter#toJavaType(java.lang.String, int)
* can map the DAO class field types based on the Avro identifier too so there will be a discrepancy
* between the generated Avro schema types and the DAO class field types.
*/
@Test
public void testHiveImportAsParquetWithMapColumnJavaAndAvroIdentifierFails() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("map-column-java", "C2_INTEGER=String")
.build();
expectedException.expect(IOException.class);
runImport(args);
}
/**
* This test case documents that a mapping with the Avro identifier(C2_INTEGER)
* of a special column name(C2#INTEGER) is ignored in map-column-hive.
* The reason is that the column type of the Avro schema and the Hive table must
* be equal and if we would be able to override the Hive column type using map-column-hive
* the inconsistency would cause a Hive error during reading.
*/
@Test
public void testHiveImportAsParquetWithMapColumnHiveAndAvroIdentifierIgnoresMapping() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("map-column-hive", "C2_INTEGER=STRING")
.build();
runImport(args);
List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
assertThat(rows, hasItems(TEST_COLUMN_VALUES));
}
/**
* This test case documents that the special column name(C2#INTEGER)
* cannot be used in map-column-hive.
* The reason is that Sqoop uses the Avro identifier(C2_INTEGER) as Hive column
* name and there is a check in org.apache.sqoop.hive.TableDefWriter#getCreateTableStmt()
* which verifies that all the columns in map-column-hive are actually valid column names.
* Since C2_INTEGER is used instead of C2#INTEGER the check will fail on the latter.
*/
@Test
public void testHiveImportAsParquetWithMapColumnHiveAndOriginalColumnNameFails() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("map-column-hive", "C2#INTEGER=STRING")
.build();
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("No column by the name C2#INTEGERfound while importing data");
runImportThrowingException(args);
}
@Test
public void testAllDataTypesHiveImportAsParquet() throws Exception {
setCurTableName("all_datatypes_table");
createTableWithColTypes(TEST_COLUMN_ALL_TYPES, TEST_COLUMN_ALL_TYPES_VALUES);
String[] args = commonArgs(getConnectString(), getTableName()).build();
runImport(args);
// The result contains a byte[] so we have to use Arrays.deepEquals() to assert.
Object[] firstRow = hiveServer2TestUtil.loadRawRowsFromTable(getTableName()).iterator().next().toArray();
assertTrue(deepEquals(EXPECTED_TEST_COLUMN_ALL_TYPES_VALUES, firstRow));
}
@Test
public void testAppendHiveImportAsParquet() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName()).build();
runImport(args);
insertIntoTable(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES_LINE2);
runImport(args);
List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
assertThat(rows, hasItems(TEST_COLUMN_VALUES, TEST_COLUMN_VALUES_LINE2));
}
@Test
public void testCreateOverwriteHiveImportAsParquet() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("hive-overwrite")
.build();
runImport(args);
// Recreate the test table to contain different test data.
dropTableIfExists(getTableName());
createTableWithColTypesAndNames(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES_LINE2);
runImport(args);
List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
assertEquals(asList(TEST_COLUMN_VALUES_LINE2), rows);
}
/**
* --create-hive-table option is now supported with the Hadoop Parquet writer implementation.
*/
@Test
public void testCreateHiveImportAsParquet() throws Exception {
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("create-hive-table")
.build();
runImport(args);
expectedException.expectMessage("Error executing Hive import.");
runImportThrowingException(args);
}
/**
* This scenario works fine since the Hadoop Parquet writer implementation does not
* check the Parquet schema of the existing files. The exception will be thrown
* by Hive when it tries to read the files with different schema.
*/
@Test
public void testHiveImportAsParquetWhenTableExistsWithIncompatibleSchema() throws Exception {
String hiveTableName = "hiveImportAsParquetWhenTableExistsWithIncompatibleSchema";
String[] incompatibleSchemaTableTypes = {"INTEGER", "INTEGER", "INTEGER"};
List<Object> incompatibleSchemaTableData = Arrays.<Object>asList(100, 200, 300);
String[] args = commonArgs(getConnectString(), getTableName())
.withOption("hive-table", hiveTableName)
.build();
runImport(args);
// We make sure we create a new table in the test RDBMS.
incrementTableNum();
createTableWithColTypes(incompatibleSchemaTableTypes, incompatibleSchemaTableData);
// Recreate the argument array to pick up the new RDBMS table name.
args = commonArgs(getConnectString(), getTableName())
.withOption("hive-table", hiveTableName)
.build();
runImport(args);
}
}
private static ArgumentArrayBuilder commonArgs(String connectString, String tableName) {
return new ArgumentArrayBuilder()
.withProperty("parquetjob.configurator.implementation", "hadoop")
.withOption("connect", connectString)
.withOption("table", tableName)
.withOption("hive-import")
.withOption("hs2-url", hiveMiniCluster.getUrl())
.withOption("num-mappers", "1")
.withOption("as-parquetfile")
.withOption("delete-target-dir");
}
public static void startHiveMiniCluster() {
hiveMiniCluster = new HiveMiniCluster(new NoAuthenticationConfiguration());
hiveMiniCluster.start();
hiveServer2TestUtil = new HiveServer2TestUtil(hiveMiniCluster.getUrl());
}
public static void stopHiveMiniCluster() {
hiveMiniCluster.stop();
}
private static byte[] decodeHex(String hexString) {
try {
return Hex.decodeHex(hexString.toCharArray());
} catch (DecoderException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -75,6 +75,7 @@ public void testImport() throws Exception {
.withOption("hive-import")
.withOption("hs2-url", hiveMiniCluster.getUrl())
.withOption("split-by", getColName(1))
.withOption("delete-target-dir")
.build();
runImport(args);

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive;
import org.apache.avro.Schema;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.util.Arrays;
import static org.apache.sqoop.hive.HiveTypes.toHiveType;
import static org.junit.Assert.*;
@RunWith(Parameterized.class)
public class TestHiveTypesForAvroTypeMapping {
private final String hiveType;
private final Schema.Type avroType;
@Parameters(name = "hiveType = {0}, avroType = {1}")
public static Iterable<? extends Object> parameters() {
return Arrays.asList(
new Object[] {"BOOLEAN", Schema.Type.BOOLEAN},
new Object[] {"INT", Schema.Type.INT},
new Object[] {"BIGINT", Schema.Type.LONG},
new Object[] {"FLOAT", Schema.Type.FLOAT},
new Object[] {"DOUBLE", Schema.Type.DOUBLE},
new Object[] {"STRING", Schema.Type.ENUM},
new Object[] {"STRING", Schema.Type.STRING},
new Object[] {"BINARY", Schema.Type.BYTES},
new Object[] {"BINARY", Schema.Type.FIXED});
}
public TestHiveTypesForAvroTypeMapping(String hiveType, Schema.Type avroType) {
this.hiveType = hiveType;
this.avroType = avroType;
}
@Test
public void testAvroTypeToHiveTypeMapping() throws Exception {
assertEquals(hiveType, toHiveType(avroType));
}
}

View File

@ -36,6 +36,8 @@
import java.sql.Types;
import static org.apache.sqoop.SqoopOptions.FileLayout.ParquetFile;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@ -51,6 +53,10 @@
*/
public class TestTableDefWriter {
private static final String TEST_AVRO_SCHEMA = "{\"type\":\"record\",\"name\":\"IMPORT_TABLE_1\",\"fields\":[{\"name\":\"C1_VARCHAR\",\"type\":[\"null\",\"string\"]},{\"name\":\"C2_INTEGER\",\"type\":[\"null\",\"int\"]},{\"name\":\"_3C_CHAR\",\"type\":[\"null\",\"string\"]}]}";
private static final String EXPECTED_CREATE_PARQUET_TABLE_STMNT = "CREATE TABLE IF NOT EXISTS `outputTable` ( `C1_VARCHAR` STRING, `C2_INTEGER` INT, `_3C_CHAR` STRING) STORED AS PARQUET";
public static final Log LOG = LogFactory.getLog(
TestTableDefWriter.class.getName());
@ -256,6 +262,14 @@ public void testGetCreateTableStmtDiscardsConnection() throws Exception {
verify(connManager).discardConnection(true);
}
@Test
public void testGetCreateTableStmtWithAvroSchema() throws Exception {
options.setFileLayout(ParquetFile);
options.getConf().set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, TEST_AVRO_SCHEMA);
assertEquals(EXPECTED_CREATE_PARQUET_TABLE_STMNT, writer.getCreateTableStmt());
}
private void setUpMockConnManager(String tableName, Map<String, Integer> typeMap) {
when(connManager.getColumnTypes(tableName)).thenReturn(typeMap);
when(connManager.getColumnNames(tableName)).thenReturn(typeMap.keySet().toArray(new String[]{}));

View File

@ -41,6 +41,8 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;
@ -322,6 +324,10 @@ protected void createTableWithColTypesAndNames(String[] colNames,
createTableWithColTypesAndNames(getTableName(), colNames, colTypes, vals);
}
protected void createTableWithColTypesAndNames(String[] colNames, String[] colTypes, List<Object> record) {
createTableWithColTypesAndNames(getTableName(), colNames, colTypes, toStringArray(record));
}
/**
* Create a table with a set of columns with their names and add a row of values.
* @param newTableName The name of the new table
@ -439,6 +445,10 @@ protected void insertRecordsIntoTable(String[] colTypes, List<List<Object>> reco
}
}
protected void insertIntoTable(String[] columns, String[] colTypes, List<Object> record) {
insertIntoTable(columns, colTypes, toStringArray(record));
}
protected void insertIntoTable(String[] columns, String[] colTypes, String[] vals) {
assert colTypes != null;
assert colTypes.length == vals.length;
@ -674,4 +684,13 @@ private String[] toStringArray(List<Object> columnValues) {
return result;
}
public static long timeFromString(String timeStampString) {
try {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
return format.parse(timeStampString).getTime();
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -137,16 +137,4 @@ public void testValidateOptionsSucceedsWhenHs2UrlIsUsedWithHiveImportAndHs2UserB
sqoopTool.validateOptions(sqoopOptions);
}
@Test
public void testValidateOptionsFailsWhenHs2UrlIsUsedWithParquetFormat() throws Exception {
expectedException.expect(SqoopOptions.InvalidOptionsException.class);
expectedException.expectMessage("The hs2-url option cannot be used with the as-parquetfile option.");
when(sqoopOptions.doHiveImport()).thenReturn(true);
when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
when(sqoopOptions.getFileLayout()).thenReturn(ParquetFile);
sqoopTool.validateOptions(sqoopOptions);
}
}