diff --git a/src/java/org/apache/sqoop/hive/HiveTypes.java b/src/java/org/apache/sqoop/hive/HiveTypes.java
index ad00535e..554a0360 100644
--- a/src/java/org/apache/sqoop/hive/HiveTypes.java
+++ b/src/java/org/apache/sqoop/hive/HiveTypes.java
@@ -20,6 +20,7 @@
 import java.sql.Types;

+import org.apache.avro.Schema;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

@@ -28,6 +29,15 @@
  */
 public final class HiveTypes {

+  private static final String HIVE_TYPE_TINYINT = "TINYINT";
+  private static final String HIVE_TYPE_INT = "INT";
+  private static final String HIVE_TYPE_BIGINT = "BIGINT";
+  private static final String HIVE_TYPE_FLOAT = "FLOAT";
+  private static final String HIVE_TYPE_DOUBLE = "DOUBLE";
+  private static final String HIVE_TYPE_STRING = "STRING";
+  private static final String HIVE_TYPE_BOOLEAN = "BOOLEAN";
+  private static final String HIVE_TYPE_BINARY = "BINARY";
+
   public static final Log LOG = LogFactory.getLog(HiveTypes.class.getName());

   private HiveTypes() { }
@@ -41,7 +51,7 @@ public static String toHiveType(int sqlType) {
     switch (sqlType) {
       case Types.INTEGER:
       case Types.SMALLINT:
-        return "INT";
+        return HIVE_TYPE_INT;
       case Types.VARCHAR:
       case Types.CHAR:
       case Types.LONGVARCHAR:
@@ -52,20 +62,20 @@ public static String toHiveType(int sqlType) {
       case Types.TIME:
       case Types.TIMESTAMP:
       case Types.CLOB:
-        return "STRING";
+        return HIVE_TYPE_STRING;
       case Types.NUMERIC:
       case Types.DECIMAL:
       case Types.FLOAT:
       case Types.DOUBLE:
       case Types.REAL:
-        return "DOUBLE";
+        return HIVE_TYPE_DOUBLE;
       case Types.BIT:
       case Types.BOOLEAN:
-        return "BOOLEAN";
+        return HIVE_TYPE_BOOLEAN;
       case Types.TINYINT:
-        return "TINYINT";
+        return HIVE_TYPE_TINYINT;
       case Types.BIGINT:
-        return "BIGINT";
+        return HIVE_TYPE_BIGINT;
       default:
         // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT,
         // BLOB, ARRAY, STRUCT, REF, JAVA_OBJECT.
@@ -73,6 +83,29 @@ public static String toHiveType(int sqlType) {
     }
   }

+  public static String toHiveType(Schema.Type avroType) {
+    switch (avroType) {
+      case BOOLEAN:
+        return HIVE_TYPE_BOOLEAN;
+      case INT:
+        return HIVE_TYPE_INT;
+      case LONG:
+        return HIVE_TYPE_BIGINT;
+      case FLOAT:
+        return HIVE_TYPE_FLOAT;
+      case DOUBLE:
+        return HIVE_TYPE_DOUBLE;
+      case STRING:
+      case ENUM:
+        return HIVE_TYPE_STRING;
+      case BYTES:
+      case FIXED:
+        return HIVE_TYPE_BINARY;
+      default:
+        return null;
+    }
+  }
+
   /**
    * @return true if a sql type can't be translated to a precise match
    * in Hive, and we have to cast it to something more generic.
diff --git a/src/java/org/apache/sqoop/hive/TableDefWriter.java b/src/java/org/apache/sqoop/hive/TableDefWriter.java
index 27d988c5..b21dfe53 100644
--- a/src/java/org/apache/sqoop/hive/TableDefWriter.java
+++ b/src/java/org/apache/sqoop/hive/TableDefWriter.java
@@ -20,24 +20,31 @@
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Date;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Properties;

+import org.apache.avro.Schema;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.avro.AvroUtil;
 import org.apache.sqoop.io.CodecMap;
 import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.manager.ConnManager;
 import org.apache.sqoop.util.FileSystemUtil;

+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
+
 /**
  * Creates (Hive-specific) SQL DDL statements to create tables to hold data
  * we're importing from another source.
@@ -56,6 +63,7 @@ public class TableDefWriter {
   private String inputTableName;
   private String outputTableName;
   private boolean commentsEnabled;
+  private Schema avroSchema;

   /**
    * Creates a new TableDefWriter to generate a Hive CREATE TABLE statement.
@@ -82,6 +90,9 @@ public TableDefWriter(final SqoopOptions opts, final ConnManager connMgr,
    * Get the column names to import.
    */
   private String [] getColumnNames() {
+    if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
+      return getColumnNamesFromAvroSchema();
+    }
     String [] colNames = options.getColumns();
     if (null != colNames) {
       return colNames; // user-specified column names.
@@ -92,6 +103,16 @@ public TableDefWriter(final SqoopOptions opts, final ConnManager connMgr,
     }
   }

+  private String[] getColumnNamesFromAvroSchema() {
+    List<String> result = new ArrayList<>();
+
+    for (Schema.Field field : getAvroSchema().getFields()) {
+      result.add(field.name());
+    }
+
+    return result.toArray(new String[result.size()]);
+  }
+
   /**
    * @return the CREATE TABLE statement for the table to load into hive.
    */
@@ -108,6 +129,7 @@ public String getCreateTableStmt() throws IOException {
     }

     String [] colNames = getColumnNames();
+    Map<String, Schema.Type> columnNameToAvroType = getColumnNameToAvroTypeMapping();
     StringBuilder sb = new StringBuilder();
     if (options.doFailIfHiveTableExists()) {
       if (isHiveExternalTableSet) {
@@ -158,22 +180,18 @@ public String getCreateTableStmt() throws IOException {
       first = false;

-      Integer colType = columnTypes.get(col);
-      String hiveColType = userMapping.getProperty(col);
-      if (hiveColType == null) {
-        hiveColType = connManager.toHiveType(inputTableName, col, colType);
-      }
-      if (null == hiveColType) {
-        throw new IOException("Hive does not support the SQL type for column "
-            + col);
+      String hiveColType;
+      if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
+        Integer colType = columnTypes.get(col);
+        hiveColType = getHiveColumnTypeForTextTable(userMapping, col, colType);
+      } else if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
+        hiveColType = HiveTypes.toHiveType(columnNameToAvroType.get(col));
+      } else {
+        throw new RuntimeException("File format is not supported for Hive tables.");
       }

       sb.append('`').append(col).append("` ").append(hiveColType);
-      if (HiveTypes.isHiveTypeImprovised(colType)) {
-        LOG.warn(
-            "Column " + col + " had to be cast to a less precise type in Hive");
-      }
     }

     sb.append(") ");
@@ -190,19 +208,23 @@ public String getCreateTableStmt() throws IOException {
           .append(" STRING) ");
     }

-    sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '");
-    sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
-    sb.append("' LINES TERMINATED BY '");
-    sb.append(getHiveOctalCharCode((int) options.getOutputRecordDelim()));
-    String codec = options.getCompressionCodec();
-    if (codec != null && (codec.equals(CodecMap.LZOP)
-        || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
-      sb.append("' STORED AS INPUTFORMAT " + "'com.hadoop.mapred.DeprecatedLzoTextInputFormat'");
-      sb.append(" OUTPUTFORMAT " + "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
+    if (SqoopOptions.FileLayout.ParquetFile.equals(options.getFileLayout())) {
+      sb.append("STORED AS PARQUET");
     } else {
-      sb.append("' STORED AS TEXTFILE");
+      sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '");
+      sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
+      sb.append("' LINES TERMINATED BY '");
+      sb.append(getHiveOctalCharCode((int) options.getOutputRecordDelim()));
+      String codec = options.getCompressionCodec();
+      if (codec != null && (codec.equals(CodecMap.LZOP)
+          || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
+        sb.append("' STORED AS INPUTFORMAT " + "'com.hadoop.mapred.DeprecatedLzoTextInputFormat'");
+        sb.append(" OUTPUTFORMAT " + "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
+      } else {
+        sb.append("' STORED AS TEXTFILE");
+      }
     }

     if (isHiveExternalTableSet) {
@@ -214,6 +236,50 @@ public String getCreateTableStmt() throws IOException {
     return sb.toString();
   }

+  private Map<String, Schema.Type> getColumnNameToAvroTypeMapping() {
+    if (options.getFileLayout() != SqoopOptions.FileLayout.ParquetFile) {
+      return Collections.emptyMap();
+    }
+    Map<String, Schema.Type> result = new HashMap<>();
+    Schema avroSchema = getAvroSchema();
+    for (Schema.Field field : avroSchema.getFields()) {
+      result.put(field.name(), getNonNullAvroType(field.schema()));
+    }
+
+    return result;
+  }
+
+  private Schema.Type getNonNullAvroType(Schema schema) {
+    if (schema.getType() != Schema.Type.UNION) {
+      return schema.getType();
+    }
+
+    for (Schema subSchema : schema.getTypes()) {
+      if (subSchema.getType() != Schema.Type.NULL) {
+        return subSchema.getType();
+      }
+    }
+
+    return null;
+  }
+
+  private String getHiveColumnTypeForTextTable(Properties userMapping, String columnName, Integer columnType) throws IOException {
+    String hiveColType = userMapping.getProperty(columnName);
+    if (hiveColType == null) {
+      hiveColType = connManager.toHiveType(inputTableName, columnName, columnType);
+    }
+    if (null == hiveColType) {
+      throw new IOException("Hive does not support the SQL type for column "
+          + columnName);
+    }
+
+    if (HiveTypes.isHiveTypeImprovised(columnType)) {
+      LOG.warn(
+          "Column " + columnName + " had to be cast to a less precise type in Hive");
+    }
+    return hiveColType;
+  }
+
   /**
    * @return the LOAD DATA statement to import the data in HDFS into hive.
    */
@@ -320,5 +386,14 @@ String getOutputTableName() {
   boolean isCommentsEnabled() {
     return commentsEnabled;
   }
+
+  Schema getAvroSchema() {
+    if (avroSchema == null) {
+      String schemaString = options.getConf().get(SQOOP_PARQUET_AVRO_SCHEMA_KEY);
+      avroSchema = AvroUtil.parseAvroSchema(schemaString);
+    }
+
+    return avroSchema;
+  }
 }
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java
index eb6d08f8..30db6ed7 100644
--- a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java
@@ -39,4 +39,5 @@ public interface ParquetImportJobConfigurator {

   Class getOutputFormatClass();

+  boolean isHiveImportNeeded();
 }
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java
index 3f35faf8..90b910a3 100644
--- a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java
@@ -58,6 +58,11 @@ public Class getOutputFormatClass() {
     return AvroParquetOutputFormat.class;
   }

+  @Override
+  public boolean isHiveImportNeeded() {
+    return true;
+  }
+
   void configureOutputCodec(Job job) {
     String outputCodec = job.getConfiguration().get(SQOOP_PARQUET_OUTPUT_CODEC_KEY);
     if (outputCodec != null) {
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
index feb3bf19..7e179a27 100644
--- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
@@ -79,6 +79,11 @@ public Class getOutputFormatClass() {
     return DatasetKeyOutputFormat.class;
   }

+  @Override
+  public boolean isHiveImportNeeded() {
+    return false;
+  }
+
   private String getKiteUri(Configuration conf, SqoopOptions options, String tableName, Path destination) throws IOException {
     if (options.doHiveImport()) {
       String hiveDatabase = options.getHiveDatabaseName() == null ? "default" :
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index e505c267..87fc5e98 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -21,6 +21,7 @@
 import static java.lang.String.format;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY;
+import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
 import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.valueOf;

 import java.io.File;
@@ -1586,12 +1587,13 @@ protected void validateHiveOptions(SqoopOptions options)
           + "importing into SequenceFile format.");
     }

-    // Hive import and create hive table not compatible for ParquetFile format
+    // Hive import and create hive table not compatible for ParquetFile format when using Kite
     if (options.doHiveImport()
         && options.doFailIfHiveTableExists()
-        && options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
+        && options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile
+        && options.getParquetConfiguratorImplementation() == KITE) {
       throw new InvalidOptionsException("Hive import and create hive table is not compatible with "
-          + "importing into ParquetFile format.");
+          + "importing into ParquetFile format using Kite.");
     }

     if (options.doHiveImport()
@@ -1902,7 +1904,6 @@ protected void validateHasDirectConnectorOption(SqoopOptions options) throws Sqo
   protected void validateHS2Options(SqoopOptions options) throws SqoopOptions.InvalidOptionsException {
     final String withoutTemplate = "The %s option cannot be used without the %s option.";
-    final String withTemplate = "The %s option cannot be used with the %s option.";

     if (isSet(options.getHs2Url()) && !options.doHiveImport()) {
       throw new InvalidOptionsException(format(withoutTemplate, HS2_URL_ARG, HIVE_IMPORT_ARG));
@@ -1915,11 +1916,6 @@ protected void validateHS2Options(SqoopOptions options) throws SqoopOptions.Inva
     if (isSet(options.getHs2Keytab()) && !isSet(options.getHs2User())) {
       throw new InvalidOptionsException(format(withoutTemplate, HS2_KEYTAB_ARG, HS2_USER_ARG));
     }
-
-    if (isSet(options.getHs2Url()) && (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile)) {
-      throw new InvalidOptionsException(format(withTemplate, HS2_URL_ARG, FMT_PARQUETFILE_ARG));
-    }
-
   }

   private void applyParquetJobConfigurationImplementation(CommandLine in, SqoopOptions out) throws InvalidOptionsException {
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index 25c3f703..f7310b93 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -46,6 +46,7 @@
 import org.apache.sqoop.hive.HiveClientFactory;
 import org.apache.sqoop.manager.ImportJobContext;
 import org.apache.sqoop.mapreduce.MergeJob;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
 import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
 import org.apache.sqoop.metastore.JobData;
 import org.apache.sqoop.metastore.JobStorage;
@@ -541,13 +542,9 @@ protected boolean importTable(SqoopOptions options) throws IOException, ImportEx
     }

     // If the user wants this table to be in Hive, perform that post-load.
-    if (options.doHiveImport()) {
-      // For Parquet file, the import action will create hive table directly via
-      // kite. So there is no need to do hive import as a post step again.
-      if (options.getFileLayout() != SqoopOptions.FileLayout.ParquetFile) {
-        HiveClient hiveClient = hiveClientFactory.createHiveClient(options, manager);
-        hiveClient.importTable();
-      }
+    if (isHiveImportNeeded(options)) {
+      HiveClient hiveClient = hiveClientFactory.createHiveClient(options, manager);
+      hiveClient.importTable();
     }

     saveIncrementalState(options);
@@ -1192,5 +1189,18 @@ public void validateOptions(SqoopOptions options)
     validateHCatalogOptions(options);
     validateAccumuloOptions(options);
   }
+
+  private boolean isHiveImportNeeded(SqoopOptions options) {
+    if (!options.doHiveImport()) {
+      return false;
+    }
+
+    if (options.getFileLayout() != SqoopOptions.FileLayout.ParquetFile) {
+      return true;
+    }
+
+    ParquetJobConfiguratorFactory parquetJobConfigurator = getParquetJobConfigurator(options);
+    return parquetJobConfigurator.createParquetImportJobConfigurator().isHiveImportNeeded();
+  }
 }
diff --git a/src/test/org/apache/sqoop/TestParquetIncrementalImportMerge.java b/src/test/org/apache/sqoop/TestParquetIncrementalImportMerge.java
index d8d3af40..adad0cc1 100644
--- a/src/test/org/apache/sqoop/TestParquetIncrementalImportMerge.java
+++ b/src/test/org/apache/sqoop/TestParquetIncrementalImportMerge.java
@@ -26,8 +26,6 @@
 import org.junit.rules.ExpectedException;
 import parquet.hadoop.metadata.CompressionCodecName;

-import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.List;

@@ -157,13 +155,4 @@ private ArgumentArrayBuilder incrementalImportArgs(String connectString, String
         .withOption("merge-key", mergeKey)
         .withOption("last-value", lastValue);
   }
-
-  private static long timeFromString(String timeStampString) {
-    try {
-      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-      return format.parse(timeStampString).getTime();
-    } catch (ParseException e) {
-      throw new RuntimeException(e);
-    }
-  }
 }
diff --git a/src/test/org/apache/sqoop/hive/TestHiveServer2ParquetImport.java b/src/test/org/apache/sqoop/hive/TestHiveServer2ParquetImport.java
new file mode 100644
index 00000000..b55179a4
--- /dev/null
+++ b/src/test/org/apache/sqoop/hive/TestHiveServer2ParquetImport.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
+import org.apache.sqoop.hive.minicluster.NoAuthenticationConfiguration;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.HiveServer2TestUtil;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.util.ParquetReader;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static java.util.Arrays.deepEquals;
+import static org.apache.sqoop.testutil.BaseSqoopTestCase.timeFromString;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Enclosed.class)
+public class TestHiveServer2ParquetImport {
+
+  private static final String[] TEST_COLUMN_NAMES = {"C1_VARCHAR", "C2#INTEGER", "3C_CHAR"};
+
+  private static final String[] TEST_COLUMN_TYPES = {"VARCHAR(32)", "INTEGER", "CHAR(64)"};
+
+  private static final String[] TEST_COLUMN_ALL_TYPES = {"INTEGER", "BIGINT", "DOUBLE", "DECIMAL(10, 2)", "BOOLEAN", "TIMESTAMP", "BINARY", "VARCHAR(100)", "CHAR(100)"};
+
+  private static final List<Object> TEST_COLUMN_ALL_TYPES_VALUES = Arrays.asList(10, 12345678910123L, 12.34, 456842.45, "TRUE", "2018-06-14 15:00:00.000", "abcdef", "testVarchar", "testChar");
+
+  private static final Object[] EXPECTED_TEST_COLUMN_ALL_TYPES_VALUES = {10, 12345678910123L, 12.34, "456842.45", true, timeFromString("2018-06-14 15:00:00.000"), decodeHex("abcdef"), "testVarchar", "testChar"};
+
+  private static final List<Object> TEST_COLUMN_VALUES = Arrays.asList("test", 42, "somestring");
+
+  private static final List<Object> TEST_COLUMN_VALUES_MAPPED = Arrays.asList("test", "42", "somestring");
+
+  private static final List<Object> TEST_COLUMN_VALUES_LINE2 = Arrays.asList("test2", 4242, "somestring2");
+
+  private static HiveMiniCluster hiveMiniCluster;
+
+  private static HiveServer2TestUtil hiveServer2TestUtil;
+
+  @RunWith(Parameterized.class)
+  public static class ParquetCompressionCodecTestCase extends ImportJobTestCase {
+
+    @Parameters(name = "compressionCodec = {0}")
+    public static Iterable<? extends Object> authenticationParameters() {
+      return Arrays.asList("snappy", "gzip");
+    }
+
+    @BeforeClass
+    public static void beforeClass() {
+      startHiveMiniCluster();
+    }
+
+    @AfterClass
+    public static void afterClass() {
+      stopHiveMiniCluster();
+    }
+
+    private final String compressionCodec;
+
+    public ParquetCompressionCodecTestCase(String compressionCodec) {
+      this.compressionCodec = compressionCodec;
+    }
+
+    @Override
+    @Before
+    public void setUp() {
+      super.setUp();
+
+      createTableWithColTypesAndNames(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES);
+    }
+
+    @Test
+    public void testHiveImportAsParquetWithCompressionCodecCanBeLoaded() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("compression-codec", compressionCodec)
+          .build();
+
+      runImport(args);
+
+      List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
+      assertThat(rows, hasItems(TEST_COLUMN_VALUES));
+    }
+
+    @Test
+    public void testImportedFilesHaveCorrectCodec() throws Exception {
+      Path tablePath = new Path(hiveMiniCluster.getTempFolderPath() + "/" + getTableName().toLowerCase());
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("compression-codec", compressionCodec)
+          .build();
+
+      runImport(args);
+
+      CompressionCodecName codec = new ParquetReader(tablePath).getCodec();
+      assertEquals(compressionCodec, codec.name().toLowerCase());
+    }
+  }
+
+  public static class GeneralParquetTestCase extends ImportJobTestCase {
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @BeforeClass
+    public static void beforeClass() {
+      startHiveMiniCluster();
+    }
+
+    @AfterClass
+    public static void afterClass() {
+      stopHiveMiniCluster();
+    }
+
+    @Override
+    @Before
+    public void setUp() {
+      super.setUp();
+
+      createTableWithColTypesAndNames(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES);
+    }
+
+    @Test
+    public void testNormalHiveImportAsParquet() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName()).build();
+
+      runImport(args);
+
+      List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
+      assertThat(rows, hasItems(TEST_COLUMN_VALUES));
+    }
+
+    @Test
+    public void testHiveImportAsParquetWithMapColumnJavaAndOriginalColumnNameSucceeds() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("map-column-java", "C2#INTEGER=String")
+          .build();
+
+      runImport(args);
+
+      List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
+      assertThat(rows, hasItems(TEST_COLUMN_VALUES_MAPPED));
+    }
+
+    /**
+     * This test case documents that the Avro identifier(C2_INTEGER)
+     * of a special column name(C2#INTEGER) cannot be used in map-column-java.
+     * The reason is that org.apache.sqoop.orm.AvroSchemaGenerator#toAvroType(java.lang.String, int),
+     * which maps the Avro schema type, uses the original column name and
+     * not the Avro identifier, but org.apache.sqoop.orm.ClassWriter#toJavaType(java.lang.String, int)
+     * can map the DAO class field types based on the Avro identifier too, so there will be a discrepancy
+     * between the generated Avro schema types and the DAO class field types.
+     */
+    @Test
+    public void testHiveImportAsParquetWithMapColumnJavaAndAvroIdentifierFails() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("map-column-java", "C2_INTEGER=String")
+          .build();
+
+      expectedException.expect(IOException.class);
+      runImport(args);
+    }
+
+    /**
+     * This test case documents that a mapping with the Avro identifier(C2_INTEGER)
+     * of a special column name(C2#INTEGER) is ignored in map-column-hive.
+     * The reason is that the column type of the Avro schema and the Hive table must
+     * be equal, and if we were able to override the Hive column type using map-column-hive,
+     * the inconsistency would cause a Hive error during reading.
+     */
+    @Test
+    public void testHiveImportAsParquetWithMapColumnHiveAndAvroIdentifierIgnoresMapping() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("map-column-hive", "C2_INTEGER=STRING")
+          .build();
+
+      runImport(args);
+
+      List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
+      assertThat(rows, hasItems(TEST_COLUMN_VALUES));
+    }
+
+    /**
+     * This test case documents that the special column name(C2#INTEGER)
+     * cannot be used in map-column-hive.
+     * The reason is that Sqoop uses the Avro identifier(C2_INTEGER) as the Hive column
+     * name and there is a check in org.apache.sqoop.hive.TableDefWriter#getCreateTableStmt()
+     * which verifies that all the columns in map-column-hive are actually valid column names.
+     * Since C2_INTEGER is used instead of C2#INTEGER, the check will fail on the latter.
+     */
+    @Test
+    public void testHiveImportAsParquetWithMapColumnHiveAndOriginalColumnNameFails() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("map-column-hive", "C2#INTEGER=STRING")
+          .build();
+
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("No column by the name C2#INTEGERfound while importing data");
+
+      runImportThrowingException(args);
+    }
+
+    @Test
+    public void testAllDataTypesHiveImportAsParquet() throws Exception {
+      setCurTableName("all_datatypes_table");
+      createTableWithColTypes(TEST_COLUMN_ALL_TYPES, TEST_COLUMN_ALL_TYPES_VALUES);
+      String[] args = commonArgs(getConnectString(), getTableName()).build();
+
+      runImport(args);
+
+      // The result contains a byte[] so we have to use Arrays.deepEquals() to assert.
+      Object[] firstRow = hiveServer2TestUtil.loadRawRowsFromTable(getTableName()).iterator().next().toArray();
+      assertTrue(deepEquals(EXPECTED_TEST_COLUMN_ALL_TYPES_VALUES, firstRow));
+    }
+
+    @Test
+    public void testAppendHiveImportAsParquet() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName()).build();
+
+      runImport(args);
+
+      insertIntoTable(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES_LINE2);
+
+      runImport(args);
+
+      List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
+      assertThat(rows, hasItems(TEST_COLUMN_VALUES, TEST_COLUMN_VALUES_LINE2));
+    }
+
+    @Test
+    public void testCreateOverwriteHiveImportAsParquet() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("hive-overwrite")
+          .build();
+
+      runImport(args);
+
+      // Recreate the test table to contain different test data.
+      dropTableIfExists(getTableName());
+      createTableWithColTypesAndNames(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES_LINE2);
+
+      runImport(args);
+
+      List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
+      assertEquals(asList(TEST_COLUMN_VALUES_LINE2), rows);
+    }
+
+    /**
+     * The --create-hive-table option is now supported with the Hadoop Parquet writer implementation.
+     */
+    @Test
+    public void testCreateHiveImportAsParquet() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("create-hive-table")
+          .build();
+
+      runImport(args);
+
+      expectedException.expectMessage("Error executing Hive import.");
+      runImportThrowingException(args);
+    }
+
+    /**
+     * This scenario works fine since the Hadoop Parquet writer implementation does not
+     * check the Parquet schema of the existing files. The exception will be thrown
+     * by Hive when it tries to read the files with different schema.
+     */
+    @Test
+    public void testHiveImportAsParquetWhenTableExistsWithIncompatibleSchema() throws Exception {
+      String hiveTableName = "hiveImportAsParquetWhenTableExistsWithIncompatibleSchema";
+      String[] incompatibleSchemaTableTypes = {"INTEGER", "INTEGER", "INTEGER"};
+      List<Object> incompatibleSchemaTableData = Arrays.asList(100, 200, 300);
+
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("hive-table", hiveTableName)
+          .build();
+
+      runImport(args);
+
+      // We make sure we create a new table in the test RDBMS.
+      incrementTableNum();
+      createTableWithColTypes(incompatibleSchemaTableTypes, incompatibleSchemaTableData);
+
+      // Recreate the argument array to pick up the new RDBMS table name.
+      args = commonArgs(getConnectString(), getTableName())
+          .withOption("hive-table", hiveTableName)
+          .build();
+
+      runImport(args);
+    }
+
+  }
+
+  private static ArgumentArrayBuilder commonArgs(String connectString, String tableName) {
+    return new ArgumentArrayBuilder()
+        .withProperty("parquetjob.configurator.implementation", "hadoop")
+        .withOption("connect", connectString)
+        .withOption("table", tableName)
+        .withOption("hive-import")
+        .withOption("hs2-url", hiveMiniCluster.getUrl())
+        .withOption("num-mappers", "1")
+        .withOption("as-parquetfile")
+        .withOption("delete-target-dir");
+  }
+
+  public static void startHiveMiniCluster() {
+    hiveMiniCluster = new HiveMiniCluster(new NoAuthenticationConfiguration());
+    hiveMiniCluster.start();
+    hiveServer2TestUtil = new HiveServer2TestUtil(hiveMiniCluster.getUrl());
+  }
+
+  public static void stopHiveMiniCluster() {
+    hiveMiniCluster.stop();
+  }
+
+  private static byte[] decodeHex(String hexString) {
+    try {
+      return Hex.decodeHex(hexString.toCharArray());
+    } catch (DecoderException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/src/test/org/apache/sqoop/hive/TestHiveServer2TextImport.java b/src/test/org/apache/sqoop/hive/TestHiveServer2TextImport.java
index 3d115ab3..410724f3 100644
--- a/src/test/org/apache/sqoop/hive/TestHiveServer2TextImport.java
+++ b/src/test/org/apache/sqoop/hive/TestHiveServer2TextImport.java
@@ -75,6 +75,7 @@ public void testImport() throws Exception {
         .withOption("hive-import")
         .withOption("hs2-url", hiveMiniCluster.getUrl())
         .withOption("split-by", getColName(1))
+        .withOption("delete-target-dir")
         .build();

     runImport(args);
diff --git a/src/test/org/apache/sqoop/hive/TestHiveTypesForAvroTypeMapping.java b/src/test/org/apache/sqoop/hive/TestHiveTypesForAvroTypeMapping.java
new file mode 100644
index 00000000..276e9eaa
--- /dev/null
+++ b/src/test/org/apache/sqoop/hive/TestHiveTypesForAvroTypeMapping.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import org.apache.avro.Schema;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+
+import static org.apache.sqoop.hive.HiveTypes.toHiveType;
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestHiveTypesForAvroTypeMapping {
+
+  private final String hiveType;
+  private final Schema.Type avroType;
+
+  @Parameters(name = "hiveType = {0}, avroType = {1}")
+  public static Iterable<? extends Object> parameters() {
+    return Arrays.asList(
+        new Object[] {"BOOLEAN", Schema.Type.BOOLEAN},
+        new Object[] {"INT", Schema.Type.INT},
+        new Object[] {"BIGINT", Schema.Type.LONG},
+        new Object[] {"FLOAT", Schema.Type.FLOAT},
+        new Object[] {"DOUBLE", Schema.Type.DOUBLE},
+        new Object[] {"STRING", Schema.Type.ENUM},
+        new Object[] {"STRING", Schema.Type.STRING},
+        new Object[] {"BINARY", Schema.Type.BYTES},
+        new Object[] {"BINARY", Schema.Type.FIXED});
+  }
+
+  public TestHiveTypesForAvroTypeMapping(String hiveType, Schema.Type avroType) {
+    this.hiveType = hiveType;
+    this.avroType = avroType;
+  }
+
+  @Test
+  public void testAvroTypeToHiveTypeMapping() throws Exception {
+    assertEquals(hiveType, toHiveType(avroType));
+  }
+}
diff --git a/src/test/org/apache/sqoop/hive/TestTableDefWriter.java b/src/test/org/apache/sqoop/hive/TestTableDefWriter.java
index 3ea61f64..626ad22f 100644
--- a/src/test/org/apache/sqoop/hive/TestTableDefWriter.java
+++ b/src/test/org/apache/sqoop/hive/TestTableDefWriter.java
@@ -36,6 +38,8 @@
 import java.sql.Types;

+import static org.apache.sqoop.SqoopOptions.FileLayout.ParquetFile;
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -51,6 +53,10 @@
  */
 public class TestTableDefWriter {

+  private static final String TEST_AVRO_SCHEMA = "{\"type\":\"record\",\"name\":\"IMPORT_TABLE_1\",\"fields\":[{\"name\":\"C1_VARCHAR\",\"type\":[\"null\",\"string\"]},{\"name\":\"C2_INTEGER\",\"type\":[\"null\",\"int\"]},{\"name\":\"_3C_CHAR\",\"type\":[\"null\",\"string\"]}]}";
+
+  private static final String EXPECTED_CREATE_PARQUET_TABLE_STMNT = "CREATE TABLE IF NOT EXISTS `outputTable` ( `C1_VARCHAR` STRING, `C2_INTEGER` INT, `_3C_CHAR` STRING) STORED AS PARQUET";
+
   public static final Log LOG = LogFactory.getLog(
       TestTableDefWriter.class.getName());
@@ -256,6 +262,14 @@ public void testGetCreateTableStmtDiscardsConnection() throws Exception {
     verify(connManager).discardConnection(true);
   }

+  @Test
+  public void testGetCreateTableStmtWithAvroSchema() throws Exception {
+    options.setFileLayout(ParquetFile);
+    options.getConf().set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, TEST_AVRO_SCHEMA);
+
+    assertEquals(EXPECTED_CREATE_PARQUET_TABLE_STMNT, writer.getCreateTableStmt());
+  }
+
   private void setUpMockConnManager(String tableName, Map<String, Integer> typeMap) {
     when(connManager.getColumnTypes(tableName)).thenReturn(typeMap);
     when(connManager.getColumnNames(tableName)).thenReturn(typeMap.keySet().toArray(new String[]{}));
diff --git a/src/test/org/apache/sqoop/testutil/BaseSqoopTestCase.java b/src/test/org/apache/sqoop/testutil/BaseSqoopTestCase.java
index ac6db0b1..1730698e 100644
--- a/src/test/org/apache/sqoop/testutil/BaseSqoopTestCase.java
+++ b/src/test/org/apache/sqoop/testutil/BaseSqoopTestCase.java
@@ -41,6 +41,8 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.List;

@@ -322,6 +324,10 @@ protected void createTableWithColTypesAndNames(String[] colNames,
     createTableWithColTypesAndNames(getTableName(), colNames, colTypes, vals);
   }

+  protected void createTableWithColTypesAndNames(String[] colNames, String[] colTypes, List<Object> record) {
+    createTableWithColTypesAndNames(getTableName(), colNames, colTypes, toStringArray(record));
+  }
+
   /**
    * Create a table with a set of columns with their names and add a row of values.
    * @param newTableName The name of the new table
@@ -439,6 +445,10 @@ protected void insertRecordsIntoTable(String[] colTypes, List<List<Object>> reco
     }
   }

+  protected void insertIntoTable(String[] columns, String[] colTypes, List<Object> record) {
+    insertIntoTable(columns, colTypes, toStringArray(record));
+  }
+
   protected void insertIntoTable(String[] columns, String[] colTypes, String[] vals) {
     assert colTypes != null;
     assert colTypes.length == vals.length;
@@ -674,4 +684,13 @@ private String[] toStringArray(List<Object> columnValues) {
     return result;
   }

+
+  public static long timeFromString(String timeStampString) {
+    try {
+      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+      return format.parse(timeStampString).getTime();
+    } catch (ParseException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git a/src/test/org/apache/sqoop/tool/TestHiveServer2OptionValidations.java b/src/test/org/apache/sqoop/tool/TestHiveServer2OptionValidations.java
index 4d3f9389..ed4b5a49 100644
--- a/src/test/org/apache/sqoop/tool/TestHiveServer2OptionValidations.java
+++ b/src/test/org/apache/sqoop/tool/TestHiveServer2OptionValidations.java
@@ -137,16 +137,4 @@ public void testValidateOptionsSucceedsWhenHs2UrlIsUsedWithHiveImportAndHs2UserB
     sqoopTool.validateOptions(sqoopOptions);
   }

-  @Test
-  public void testValidateOptionsFailsWhenHs2UrlIsUsedWithParquetFormat() throws Exception {
-    expectedException.expect(SqoopOptions.InvalidOptionsException.class);
-    expectedException.expectMessage("The hs2-url option cannot be used with the as-parquetfile option.");
-
-    when(sqoopOptions.doHiveImport()).thenReturn(true);
-    when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
-    when(sqoopOptions.getFileLayout()).thenReturn(ParquetFile);
-
-    sqoopTool.validateOptions(sqoopOptions);
-  }
-
 }