From 00a02dec2f7507f813ee4899096c470ba1112a9e Mon Sep 17 00:00:00 2001 From: Szabolcs Vasas Date: Wed, 14 Nov 2018 11:29:02 +0100 Subject: [PATCH] SQOOP-3382: Add parquet numeric support for Parquet in hdfs import (Fero Szabo via Szabolcs Vasas) --- .../sqoop/config/ConfigurationConstants.java | 5 + .../sqoop/mapreduce/ParquetImportMapper.java | 8 +- .../HadoopParquetImportJobConfigurator.java | 14 ++ .../apache/sqoop/orm/AvroSchemaGenerator.java | 20 +- ...sTest.java => NumericTypesImportTest.java} | 197 ++++++++++++++---- .../sqoop/importjob/SplitByImportTest.java | 8 +- .../configuration/AvroTestConfiguration.java | 24 +++ ...ericImportJobSplitByTestConfiguration.java | 5 +- .../ImportJobTestConfiguration.java | 4 +- ...MSSQLServerImportJobTestConfiguration.java | 16 +- .../MySQLImportJobTestConfiguration.java | 17 +- .../OracleImportJobTestConfiguration.java | 16 +- ...leImportJobTestConfigurationForNumber.java | 16 +- .../ParquetTestConfiguration.java | 24 +++ ...lImportJobTestConfigurationForNumeric.java | 16 +- ...TestConfigurationPaddingShouldSucceed.java | 16 +- .../org/apache/sqoop/util/ParquetReader.java | 44 ++-- 17 files changed, 352 insertions(+), 98 deletions(-) rename src/test/org/apache/sqoop/importjob/{avro/AvroImportForNumericTypesTest.java => NumericTypesImportTest.java} (52%) create mode 100644 src/test/org/apache/sqoop/importjob/configuration/AvroTestConfiguration.java rename src/test/org/apache/sqoop/importjob/{ => configuration}/ImportJobTestConfiguration.java (93%) rename src/test/org/apache/sqoop/importjob/{avro => }/configuration/MSSQLServerImportJobTestConfiguration.java (81%) rename src/test/org/apache/sqoop/importjob/{avro => }/configuration/MySQLImportJobTestConfiguration.java (81%) rename src/test/org/apache/sqoop/importjob/{avro => }/configuration/OracleImportJobTestConfiguration.java (82%) rename src/test/org/apache/sqoop/importjob/{avro => }/configuration/OracleImportJobTestConfigurationForNumber.java (83%) create mode 100644 src/test/org/apache/sqoop/importjob/configuration/ParquetTestConfiguration.java rename src/test/org/apache/sqoop/importjob/{avro => }/configuration/PostgresqlImportJobTestConfigurationForNumeric.java (82%) rename src/test/org/apache/sqoop/importjob/{avro => }/configuration/PostgresqlImportJobTestConfigurationPaddingShouldSucceed.java (80%) diff --git a/src/java/org/apache/sqoop/config/ConfigurationConstants.java b/src/java/org/apache/sqoop/config/ConfigurationConstants.java index 3724f250..75928461 100644 --- a/src/java/org/apache/sqoop/config/ConfigurationConstants.java +++ b/src/java/org/apache/sqoop/config/ConfigurationConstants.java @@ -96,6 +96,11 @@ public final class ConfigurationConstants { */ public static final String PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL = "sqoop.avro.logical_types.decimal.enable"; + /** + * Enable parquet logical types (decimal support only). 
+ */ + public static final String PROP_ENABLE_PARQUET_LOGICAL_TYPE_DECIMAL = "sqoop.parquet.logical_types.decimal.enable"; + /** * Default precision for avro schema */ diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java index 62334f8a..b386079f 100644 --- a/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java @@ -18,6 +18,9 @@ package org.apache.sqoop.mapreduce; +import org.apache.avro.Conversions; +import org.apache.avro.generic.GenericData; +import org.apache.sqoop.config.ConfigurationConstants; import org.apache.sqoop.lib.LargeObjectLoader; import org.apache.sqoop.lib.SqoopRecord; import org.apache.avro.Schema; @@ -39,6 +42,7 @@ public abstract class ParquetImportMapper private Schema schema = null; private boolean bigDecimalFormatString = true; private LargeObjectLoader lobLoader = null; + private boolean bigDecimalPadding; @Override protected void setup(Context context) @@ -49,6 +53,8 @@ protected void setup(Context context) ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT); lobLoader = createLobLoader(context); + GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion()); + bigDecimalPadding = conf.getBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_DECIMAL_PADDING, false); } @Override @@ -62,7 +68,7 @@ protected void map(LongWritable key, SqoopRecord val, Context context) } GenericRecord record = AvroUtil.toGenericRecord(val.getFieldMap(), schema, - bigDecimalFormatString); + bigDecimalFormatString, bigDecimalPadding); write(context, record); } diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java index e8215430..aa9740b1 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java @@ -25,7 +25,9 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.parquet.avro.GenericDataSupplier; import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.config.ConfigurationConstants; import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator; import org.apache.parquet.avro.AvroParquetOutputFormat; import org.apache.parquet.hadoop.ParquetOutputFormat; @@ -46,6 +48,18 @@ public class HadoopParquetImportJobConfigurator implements ParquetImportJobConfi public void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException { configureAvroSchema(job, schema); configureOutputCodec(job); + configureLogicalTypeSupport(job, options); + } + + /** + * Configurations needed for logical types, i.e. decimal in parquet. 
+ * @param job + * @param options + */ + private void configureLogicalTypeSupport(Job job, SqoopOptions options) { + if (options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_PARQUET_LOGICAL_TYPE_DECIMAL, false)) { + AvroParquetOutputFormat.setAvroDataSupplier(job, GenericDataSupplier.class); + } } @Override diff --git a/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java b/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java index 7a2a5f9c..05ac46c0 100644 --- a/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java +++ b/src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java @@ -39,6 +39,9 @@ import org.apache.sqoop.config.ConfigurationConstants; import org.codehaus.jackson.node.NullNode; +import static org.apache.sqoop.SqoopOptions.FileLayout.AvroDataFile; +import static org.apache.sqoop.SqoopOptions.FileLayout.ParquetFile; + /** * Creates an Avro schema to represent a table from a database. */ @@ -126,8 +129,7 @@ public Schema generate(String schemaNameOverride) throws IOException { public Schema toAvroSchema(int sqlType, String columnName, Integer precision, Integer scale) { List childSchemas = new ArrayList(); childSchemas.add(Schema.create(Schema.Type.NULL)); - if (options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL, false) - && isLogicalType(sqlType)) { + if (isLogicalTypeConversionEnabled() && isLogicalType(sqlType)) { childSchemas.add( toAvroLogicalType(columnName, sqlType, precision, scale) .addToSchema(Schema.create(Type.BYTES)) @@ -138,6 +140,20 @@ && isLogicalType(sqlType)) { return Schema.createUnion(childSchemas); } + /** + * @return True if this is a parquet import and parquet logical types are enabled, + * or if this is an avro import and avro logical types are enabled. False otherwise. + */ + private boolean isLogicalTypeConversionEnabled() { + if (ParquetFile.equals(options.getFileLayout())) { + return options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_PARQUET_LOGICAL_TYPE_DECIMAL, false); + } + else if (AvroDataFile.equals(options.getFileLayout())) { + return options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL, false); + } + return false; + } + public Schema toAvroSchema(int sqlType) { return toAvroSchema(sqlType, null, null, null); } diff --git a/src/test/org/apache/sqoop/importjob/avro/AvroImportForNumericTypesTest.java b/src/test/org/apache/sqoop/importjob/NumericTypesImportTest.java similarity index 52% rename from src/test/org/apache/sqoop/importjob/avro/AvroImportForNumericTypesTest.java rename to src/test/org/apache/sqoop/importjob/NumericTypesImportTest.java index ff13dc3b..0714441c 100644 --- a/src/test/org/apache/sqoop/importjob/avro/AvroImportForNumericTypesTest.java +++ b/src/test/org/apache/sqoop/importjob/NumericTypesImportTest.java @@ -16,13 +16,21 @@ * limitations under the License. 
*/ -package org.apache.sqoop.importjob.avro; +package org.apache.sqoop.importjob; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; import org.apache.sqoop.SqoopOptions; -import org.apache.sqoop.importjob.ImportJobTestConfiguration; +import org.apache.sqoop.importjob.configuration.AvroTestConfiguration; +import org.apache.sqoop.importjob.configuration.MSSQLServerImportJobTestConfiguration; +import org.apache.sqoop.importjob.configuration.MySQLImportJobTestConfiguration; +import org.apache.sqoop.importjob.configuration.OracleImportJobTestConfiguration; +import org.apache.sqoop.importjob.configuration.ParquetTestConfiguration; +import org.apache.sqoop.importjob.configuration.PostgresqlImportJobTestConfigurationForNumeric; import org.apache.sqoop.testutil.ArgumentArrayBuilder; import org.apache.sqoop.testutil.AvroTestUtils; import org.apache.sqoop.testutil.ImportJobTestCase; @@ -31,12 +39,9 @@ import org.apache.sqoop.testutil.adapter.MySqlDatabaseAdapter; import org.apache.sqoop.testutil.adapter.OracleDatabaseAdapter; import org.apache.sqoop.testutil.adapter.PostgresDatabaseAdapter; -import org.apache.sqoop.importjob.avro.configuration.MSSQLServerImportJobTestConfiguration; -import org.apache.sqoop.importjob.avro.configuration.MySQLImportJobTestConfiguration; -import org.apache.sqoop.importjob.avro.configuration.OracleImportJobTestConfigurationForNumber; -import org.apache.sqoop.importjob.avro.configuration.OracleImportJobTestConfiguration; -import org.apache.sqoop.importjob.avro.configuration.PostgresqlImportJobTestConfigurationForNumeric; -import org.apache.sqoop.importjob.avro.configuration.PostgresqlImportJobTestConfigurationPaddingShouldSucceed; +import org.apache.sqoop.importjob.configuration.OracleImportJobTestConfigurationForNumber; +import org.apache.sqoop.importjob.configuration.PostgresqlImportJobTestConfigurationPaddingShouldSucceed; +import org.apache.sqoop.util.ParquetReader; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -51,6 +56,10 @@ import java.util.Arrays; import java.util.List; +import static org.junit.Assert.assertEquals; +import static org.apache.sqoop.SqoopOptions.FileLayout.AvroDataFile; +import static org.apache.sqoop.SqoopOptions.FileLayout.ParquetFile; + @RunWith(Parameterized.class) /** * This test covers the behavior of the Avro import for fixed point decimal types, i.e. NUMBER, NUMERIC @@ -59,21 +68,20 @@ * Oracle and Postgres store numbers without padding, while other DBs store them padded with 0s. * * The features tested here affect two phases in Sqoop: - * 1. Avro schema generation + * 1. Avro schema generation during avro and parquet import * Default precision and scale are used here to avoid issues with Oracle and Postgres, as these * don't return valid precision and scale if they weren't specified in the table DDL. * - * 2. Avro import: padding. + * 2. Decimal padding during avro or parquet import * In case of Oracle and Postgres, Sqoop has to pad the values with 0s to avoid errors. 
*/ -public class AvroImportForNumericTypesTest extends ImportJobTestCase { +public class NumericTypesImportTest extends ImportJobTestCase { - public static final Log LOG = LogFactory.getLog( - AvroImportForNumericTypesTest.class.getName()); + public static final Log LOG = LogFactory.getLog(NumericTypesImportTest.class.getName()); private Configuration conf = new Configuration(); - private final ImportJobTestConfiguration configuration; + private final T configuration; private final DatabaseAdapter adapter; private final boolean failWithoutExtraArgs; private final boolean failWithPadding; @@ -86,6 +94,7 @@ public class AvroImportForNumericTypesTest extends ImportJobTestCase { // Constants for the test case that has padding specified but not default precision and scale. private final static boolean SUCCEED_WITH_PADDING_ONLY = false; private final static boolean FAIL_WITH_PADDING_ONLY = true; + private Path tableDirPath; @Parameters(name = "Adapter: {0}| Config: {1}| failWithoutExtraArgs: {2}| failWithPadding: {3}") public static Iterable testConfigurations() { @@ -101,7 +110,7 @@ public static Iterable testConfigurations() { ); } - public AvroImportForNumericTypesTest(DatabaseAdapter adapter, ImportJobTestConfiguration configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) { + public NumericTypesImportTest(DatabaseAdapter adapter, T configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) { this.adapter = adapter; this.configuration = configuration; this.failWithoutExtraArgs = failWithoutExtraArgs; @@ -148,6 +157,7 @@ public void setUp() { for (String[] input : inputData) { insertIntoTable(names, types, input); } + tableDirPath = new Path(getWarehouseDir() + "/" + getTableName()); } @After @@ -160,51 +170,158 @@ public void tearDown() { super.tearDown(); } - private ArgumentArrayBuilder getArgsBuilder() { - ArgumentArrayBuilder builder = AvroTestUtils.getBuilderForAvroPaddingTest(this); - builder.withOption("connect", getConnectString()); - return builder; + private ArgumentArrayBuilder getArgsBuilder(SqoopOptions.FileLayout fileLayout) { + ArgumentArrayBuilder builder = new ArgumentArrayBuilder(); + if (AvroDataFile.equals(fileLayout)) { + builder.withOption("as-avrodatafile"); + } + else if (ParquetFile.equals(fileLayout)) { + builder.withOption("as-parquetfile"); + } + + return builder.withCommonHadoopFlags(true) + .withOption("warehouse-dir", getWarehouseDir()) + .withOption("num-mappers", "1") + .withOption("table", getTableName()) + .withOption("connect", getConnectString()); + } + + /** + * Adds properties to the given arg builder for decimal precision and scale. + * @param builder + */ + private void addPrecisionAndScale(ArgumentArrayBuilder builder) { + builder.withProperty("sqoop.avro.logical_types.decimal.default.precision", "38"); + builder.withProperty("sqoop.avro.logical_types.decimal.default.scale", "3"); + } + + /** + * Enables padding for decimals in avro and parquet import. 
+ * @param builder + */ + private void addPadding(ArgumentArrayBuilder builder) { + builder.withProperty("sqoop.avro.decimal_padding.enable", "true"); + } + + private void addEnableAvroDecimal(ArgumentArrayBuilder builder) { + builder.withProperty("sqoop.avro.logical_types.decimal.enable", "true"); + } + + private void addEnableParquetDecimal(ArgumentArrayBuilder builder) { + builder.withProperty("sqoop.parquet.logical_types.decimal.enable", "true"); + } + + private void configureJunitToExpectFailure(boolean failWithPadding) { + if (failWithPadding) { + thrown.expect(IOException.class); + thrown.expectMessage("Failure during job; return status 1"); + } } @Test public void testAvroImportWithoutPadding() throws IOException { - if (failWithoutExtraArgs) { - thrown.expect(IOException.class); - thrown.expectMessage("Failure during job; return status 1"); - } - String[] args = getArgsBuilder().build(); + configureJunitToExpectFailure(failWithoutExtraArgs); + ArgumentArrayBuilder builder = getArgsBuilder(AvroDataFile); + addEnableAvroDecimal(builder); + String[] args = builder.build(); runImport(args); if (!failWithoutExtraArgs) { - verify(); + verify(AvroDataFile); } } @Test public void testAvroImportWithPadding() throws IOException { - if (failWithPadding) { - thrown.expect(IOException.class); - thrown.expectMessage("Failure during job; return status 1"); - } - ArgumentArrayBuilder builder = getArgsBuilder(); - builder.withProperty("sqoop.avro.decimal_padding.enable", "true"); + configureJunitToExpectFailure(failWithPadding); + ArgumentArrayBuilder builder = getArgsBuilder(AvroDataFile); + addEnableAvroDecimal(builder); + addPadding(builder); runImport(builder.build()); if (!failWithPadding) { - verify(); + verify(AvroDataFile); } } @Test public void testAvroImportWithDefaultPrecisionAndScale() throws IOException { - ArgumentArrayBuilder builder = getArgsBuilder(); - builder.withProperty("sqoop.avro.decimal_padding.enable", "true"); - builder.withProperty("sqoop.avro.logical_types.decimal.default.precision", "38"); - builder.withProperty("sqoop.avro.logical_types.decimal.default.scale", "3"); + ArgumentArrayBuilder builder = getArgsBuilder(AvroDataFile); + addEnableAvroDecimal(builder); + addPadding(builder); + addPrecisionAndScale(builder); runImport(builder.build()); - verify(); + verify(AvroDataFile); } - private void verify() { - AvroTestUtils.registerDecimalConversionUsageForVerification(); - AvroTestUtils.verify(configuration.getExpectedResults(), getConf(), getTablePath()); + @Test + public void testParquetImportWithoutPadding() throws IOException { + configureJunitToExpectFailure(failWithoutExtraArgs); + ArgumentArrayBuilder builder = getArgsBuilder(ParquetFile); + addEnableParquetDecimal(builder); + String[] args = builder.build(); + runImport(args); + if (!failWithoutExtraArgs) { + verify(ParquetFile); + } + } + + @Test + public void testParquetImportWithPadding() throws IOException { + configureJunitToExpectFailure(failWithPadding); + ArgumentArrayBuilder builder = getArgsBuilder(ParquetFile); + addEnableParquetDecimal(builder); + addPadding(builder); + runImport(builder.build()); + if (!failWithPadding) { + verify(ParquetFile); + } + } + + @Test + public void testParquetImportWithDefaultPrecisionAndScale() throws IOException { + ArgumentArrayBuilder builder = getArgsBuilder(ParquetFile); + addEnableParquetDecimal(builder); + addPadding(builder); + addPrecisionAndScale(builder); + runImport(builder.build()); + verify(ParquetFile); + } + + private void verify(SqoopOptions.FileLayout 
fileLayout) { + if (AvroDataFile.equals(fileLayout)) { + AvroTestUtils.registerDecimalConversionUsageForVerification(); + AvroTestUtils.verify(configuration.getExpectedResultsForAvro(), getConf(), getTablePath()); + } else if (ParquetFile.equals(fileLayout)) { + verifyParquetFile(); + } + } + + private void verifyParquetFile() { + verifyParquetSchema(); + verifyParquetContent(); + } + + private void verifyParquetContent() { + ParquetReader reader = new ParquetReader(tableDirPath); + assertEquals(Arrays.asList(configuration.getExpectedResultsForParquet()), reader.readAllInCsvSorted()); + } + + private void verifyParquetSchema() { + ParquetReader reader = new ParquetReader(tableDirPath); + MessageType parquetSchema = reader.readParquetSchema(); + + String[] types = configuration.getTypes(); + for (int i = 0; i < types.length; i ++) { + String type = types[i]; + if (isNumericSqlType(type)) { + OriginalType parquetFieldType = parquetSchema.getFields().get(i).getOriginalType(); + assertEquals(OriginalType.DECIMAL, parquetFieldType); + } + } + } + + private boolean isNumericSqlType(String type) { + return type.toUpperCase().startsWith("DECIMAL") + || type.toUpperCase().startsWith("NUMBER") + || type.toUpperCase().startsWith("NUMERIC"); } } diff --git a/src/test/org/apache/sqoop/importjob/SplitByImportTest.java b/src/test/org/apache/sqoop/importjob/SplitByImportTest.java index 7977c0b0..c6fe4f2e 100644 --- a/src/test/org/apache/sqoop/importjob/SplitByImportTest.java +++ b/src/test/org/apache/sqoop/importjob/SplitByImportTest.java @@ -24,6 +24,8 @@ import org.apache.hadoop.fs.Path; import org.apache.sqoop.SqoopOptions; import org.apache.sqoop.importjob.configuration.GenericImportJobSplitByTestConfiguration; +import org.apache.sqoop.importjob.configuration.ImportJobTestConfiguration; +import org.apache.sqoop.importjob.configuration.ParquetTestConfiguration; import org.apache.sqoop.testutil.ArgumentArrayBuilder; import org.apache.sqoop.testutil.ImportJobTestCase; import org.apache.sqoop.testutil.adapter.DatabaseAdapter; @@ -55,7 +57,7 @@ public class SplitByImportTest extends ImportJobTestCase { private Configuration conf = new Configuration(); - private final ImportJobTestConfiguration configuration; + private final ParquetTestConfiguration configuration; private final DatabaseAdapter adapter; @Parameters(name = "Adapter: {0}| Config: {1}") @@ -69,7 +71,7 @@ public static Iterable testConfigurations() { ); } - public SplitByImportTest(DatabaseAdapter adapter, ImportJobTestConfiguration configuration) { + public SplitByImportTest(DatabaseAdapter adapter, ParquetTestConfiguration configuration) { this.adapter = adapter; this.configuration = configuration; } @@ -148,6 +150,6 @@ public void testSplitBy() throws IOException { private void verifyParquetFile() { ParquetReader reader = new ParquetReader(new Path(getWarehouseDir() + "/" + getTableName()), getConf()); - assertEquals(asList(configuration.getExpectedResults()), reader.readAllInCsvSorted()); + assertEquals(asList(configuration.getExpectedResultsForParquet()), reader.readAllInCsvSorted()); } } diff --git a/src/test/org/apache/sqoop/importjob/configuration/AvroTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/AvroTestConfiguration.java new file mode 100644 index 00000000..10088996 --- /dev/null +++ b/src/test/org/apache/sqoop/importjob/configuration/AvroTestConfiguration.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.importjob.configuration; + +public interface AvroTestConfiguration extends ImportJobTestConfiguration { + + String[] getExpectedResultsForAvro(); +} diff --git a/src/test/org/apache/sqoop/importjob/configuration/GenericImportJobSplitByTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/GenericImportJobSplitByTestConfiguration.java index f137b56b..e99b526e 100644 --- a/src/test/org/apache/sqoop/importjob/configuration/GenericImportJobSplitByTestConfiguration.java +++ b/src/test/org/apache/sqoop/importjob/configuration/GenericImportJobSplitByTestConfiguration.java @@ -19,7 +19,6 @@ package org.apache.sqoop.importjob.configuration; import org.apache.commons.lang3.StringUtils; -import org.apache.sqoop.importjob.ImportJobTestConfiguration; import java.util.ArrayList; import java.util.List; @@ -28,7 +27,7 @@ * This test configuration intends to cover the fact that oracle stores these types without padding them with 0s, * therefore when importing into avro, one has to use the padding feature. */ -public class GenericImportJobSplitByTestConfiguration implements ImportJobTestConfiguration { +public class GenericImportJobSplitByTestConfiguration implements ImportJobTestConfiguration, ParquetTestConfiguration { public static final String NAME_COLUMN = "NAME"; public static final char SEPARATOR = ','; @@ -65,7 +64,7 @@ public List getSampleData() { } @Override - public String[] getExpectedResults() { + public String[] getExpectedResultsForParquet() { return data.stream() .map(element -> StringUtils.join(element, SEPARATOR)) .toArray(String[]::new); diff --git a/src/test/org/apache/sqoop/importjob/ImportJobTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/ImportJobTestConfiguration.java similarity index 93% rename from src/test/org/apache/sqoop/importjob/ImportJobTestConfiguration.java rename to src/test/org/apache/sqoop/importjob/configuration/ImportJobTestConfiguration.java index 14de910b..a57f2f51 100644 --- a/src/test/org/apache/sqoop/importjob/ImportJobTestConfiguration.java +++ b/src/test/org/apache/sqoop/importjob/configuration/ImportJobTestConfiguration.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.sqoop.importjob; +package org.apache.sqoop.importjob.configuration; import java.util.List; @@ -27,6 +27,4 @@ public interface ImportJobTestConfiguration { String[] getNames(); List getSampleData(); - - String[] getExpectedResults(); } diff --git a/src/test/org/apache/sqoop/importjob/avro/configuration/MSSQLServerImportJobTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/MSSQLServerImportJobTestConfiguration.java similarity index 81% rename from src/test/org/apache/sqoop/importjob/avro/configuration/MSSQLServerImportJobTestConfiguration.java rename to src/test/org/apache/sqoop/importjob/configuration/MSSQLServerImportJobTestConfiguration.java index 182d2967..4ad7defe 100644 --- a/src/test/org/apache/sqoop/importjob/avro/configuration/MSSQLServerImportJobTestConfiguration.java +++ b/src/test/org/apache/sqoop/importjob/configuration/MSSQLServerImportJobTestConfiguration.java @@ -16,14 +16,12 @@ * limitations under the License. */ -package org.apache.sqoop.importjob.avro.configuration; - -import org.apache.sqoop.importjob.ImportJobTestConfiguration; +package org.apache.sqoop.importjob.configuration; import java.util.ArrayList; import java.util.List; -public class MSSQLServerImportJobTestConfiguration implements ImportJobTestConfiguration { +public class MSSQLServerImportJobTestConfiguration implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration { @Override public String[] getTypes() { @@ -47,7 +45,7 @@ public List getSampleData() { } @Override - public String[] getExpectedResults() { + public String[] getExpectedResultsForAvro() { String expectedRecord = "{\"ID\": 1, \"N1\": 100, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " + "\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}"; String[] expectedResult = new String[1]; @@ -55,6 +53,14 @@ public String[] getExpectedResults() { return expectedResult; } + @Override + public String[] getExpectedResultsForParquet() { + String expectedRecord = "1,100,1000000,1000000.05000,1000000,1000000.05000,100,1000000,1000000.05000,1000000,1000000.05000"; + String[] expectedResult = new String[1]; + expectedResult[0] = expectedRecord; + return expectedResult; + } + @Override public String toString() { return getClass().getSimpleName(); diff --git a/src/test/org/apache/sqoop/importjob/avro/configuration/MySQLImportJobTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/MySQLImportJobTestConfiguration.java similarity index 81% rename from src/test/org/apache/sqoop/importjob/avro/configuration/MySQLImportJobTestConfiguration.java rename to src/test/org/apache/sqoop/importjob/configuration/MySQLImportJobTestConfiguration.java index e9bf9912..fbcbdebe 100644 --- a/src/test/org/apache/sqoop/importjob/avro/configuration/MySQLImportJobTestConfiguration.java +++ b/src/test/org/apache/sqoop/importjob/configuration/MySQLImportJobTestConfiguration.java @@ -16,14 +16,12 @@ * limitations under the License. 
*/ -package org.apache.sqoop.importjob.avro.configuration; - -import org.apache.sqoop.importjob.ImportJobTestConfiguration; +package org.apache.sqoop.importjob.configuration; import java.util.ArrayList; import java.util.List; -public class MySQLImportJobTestConfiguration implements ImportJobTestConfiguration { +public class MySQLImportJobTestConfiguration implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration { @Override public String[] getTypes() { @@ -46,9 +44,8 @@ public List getSampleData() { return inputData; } - @Override - public String[] getExpectedResults() { + public String[] getExpectedResultsForAvro() { String expectedRecord = "{\"ID\": 1, \"N1\": 100, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " + "\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}"; String[] expectedResult = new String[1]; @@ -56,6 +53,14 @@ public String[] getExpectedResults() { return expectedResult; } + @Override + public String[] getExpectedResultsForParquet() { + String expectedRecord = "1,100,1000000,1000000.05000,1000000,1000000.05000,100,1000000,1000000.05000,1000000,1000000.05000"; + String[] expectedResult = new String[1]; + expectedResult[0] = expectedRecord; + return expectedResult; + } + @Override public String toString() { return getClass().getSimpleName(); diff --git a/src/test/org/apache/sqoop/importjob/avro/configuration/OracleImportJobTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfiguration.java similarity index 82% rename from src/test/org/apache/sqoop/importjob/avro/configuration/OracleImportJobTestConfiguration.java rename to src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfiguration.java index b7bad08c..303a5239 100644 --- a/src/test/org/apache/sqoop/importjob/avro/configuration/OracleImportJobTestConfiguration.java +++ b/src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfiguration.java @@ -16,9 +16,7 @@ * limitations under the License. */ -package org.apache.sqoop.importjob.avro.configuration; - -import org.apache.sqoop.importjob.ImportJobTestConfiguration; +package org.apache.sqoop.importjob.configuration; import java.util.ArrayList; import java.util.List; @@ -27,7 +25,7 @@ * This test configuration intends to cover the fact that oracle stores these types without padding them with 0s, * therefore when importing into avro, one has to use the padding feature. 
*/ -public class OracleImportJobTestConfiguration implements ImportJobTestConfiguration { +public class OracleImportJobTestConfiguration implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration { @Override public String[] getTypes() { @@ -49,7 +47,7 @@ public List getSampleData() { } @Override - public String[] getExpectedResults() { + public String[] getExpectedResultsForAvro() { String expectedRecord = "{\"ID\": 1, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " + "\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}"; String[] expectedResult = new String[1]; @@ -57,6 +55,14 @@ public String[] getExpectedResults() { return expectedResult; } + @Override + public String[] getExpectedResultsForParquet() { + String expectedRecord = "1,1000000,1000000.05000,1000000,1000000.05000,100,1000000,1000000.05000,1000000,1000000.05000"; + String[] expectedResult = new String[1]; + expectedResult[0] = expectedRecord; + return expectedResult; + } + @Override public String toString() { return getClass().getSimpleName(); diff --git a/src/test/org/apache/sqoop/importjob/avro/configuration/OracleImportJobTestConfigurationForNumber.java b/src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfigurationForNumber.java similarity index 83% rename from src/test/org/apache/sqoop/importjob/avro/configuration/OracleImportJobTestConfigurationForNumber.java rename to src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfigurationForNumber.java index 465e61f4..96dd0770 100644 --- a/src/test/org/apache/sqoop/importjob/avro/configuration/OracleImportJobTestConfigurationForNumber.java +++ b/src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfigurationForNumber.java @@ -16,9 +16,7 @@ * limitations under the License. */ -package org.apache.sqoop.importjob.avro.configuration; - -import org.apache.sqoop.importjob.ImportJobTestConfiguration; +package org.apache.sqoop.importjob.configuration; import java.util.ArrayList; import java.util.List; @@ -30,7 +28,7 @@ * Therefore, NUMBER requires special treatment. * The user has to specify precision and scale when importing into avro. 
*/ -public class OracleImportJobTestConfigurationForNumber implements ImportJobTestConfiguration { +public class OracleImportJobTestConfigurationForNumber implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration { @Override @@ -51,13 +49,21 @@ public List getSampleData() { } @Override - public String[] getExpectedResults() { + public String[] getExpectedResultsForAvro() { String expectedRecord = "{\"ID\": 1, \"N1\": 100.010, \"N2\": 100, \"N3\": 100.03000}"; String[] expectedResult = new String[1]; expectedResult[0] = expectedRecord; return expectedResult; } + @Override + public String[] getExpectedResultsForParquet() { + String expectedRecord = "1,100.010,100,100.03000"; + String[] expectedResult = new String[1]; + expectedResult[0] = expectedRecord; + return expectedResult; + } + @Override public String toString() { return getClass().getSimpleName(); diff --git a/src/test/org/apache/sqoop/importjob/configuration/ParquetTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/ParquetTestConfiguration.java new file mode 100644 index 00000000..3c161d19 --- /dev/null +++ b/src/test/org/apache/sqoop/importjob/configuration/ParquetTestConfiguration.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.importjob.configuration; + +public interface ParquetTestConfiguration extends ImportJobTestConfiguration{ + + String[] getExpectedResultsForParquet(); +} diff --git a/src/test/org/apache/sqoop/importjob/avro/configuration/PostgresqlImportJobTestConfigurationForNumeric.java b/src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationForNumeric.java similarity index 82% rename from src/test/org/apache/sqoop/importjob/avro/configuration/PostgresqlImportJobTestConfigurationForNumeric.java rename to src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationForNumeric.java index 66715c17..8ba0bdc3 100644 --- a/src/test/org/apache/sqoop/importjob/avro/configuration/PostgresqlImportJobTestConfigurationForNumeric.java +++ b/src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationForNumeric.java @@ -16,9 +16,7 @@ * limitations under the License. */ -package org.apache.sqoop.importjob.avro.configuration; - -import org.apache.sqoop.importjob.ImportJobTestConfiguration; +package org.apache.sqoop.importjob.configuration; import java.util.ArrayList; import java.util.List; @@ -28,7 +26,7 @@ * for precision and scale for NUMERIC. Also, important, that the accompanying columns * - NUMERIC(20) and NUMERIC(20, 5) don't get modified. 
*/ -public class PostgresqlImportJobTestConfigurationForNumeric implements ImportJobTestConfiguration { +public class PostgresqlImportJobTestConfigurationForNumeric implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration { @Override public String[] getTypes() { @@ -50,13 +48,21 @@ public List getSampleData() { } @Override - public String[] getExpectedResults() { + public String[] getExpectedResultsForAvro() { String expectedRecord = "{\"ID\": 1, \"N1\": 100.010, \"N2\": 100, \"N3\": 100.01000}"; String[] expectedResult = new String[1]; expectedResult[0] = expectedRecord; return expectedResult; } + @Override + public String[] getExpectedResultsForParquet() { + String expectedRecord = "1,100.010,100,100.01000"; + String[] expectedResult = new String[1]; + expectedResult[0] = expectedRecord; + return expectedResult; + } + @Override public String toString() { return getClass().getSimpleName(); diff --git a/src/test/org/apache/sqoop/importjob/avro/configuration/PostgresqlImportJobTestConfigurationPaddingShouldSucceed.java b/src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationPaddingShouldSucceed.java similarity index 80% rename from src/test/org/apache/sqoop/importjob/avro/configuration/PostgresqlImportJobTestConfigurationPaddingShouldSucceed.java rename to src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationPaddingShouldSucceed.java index ec4db41b..45eaf04d 100644 --- a/src/test/org/apache/sqoop/importjob/avro/configuration/PostgresqlImportJobTestConfigurationPaddingShouldSucceed.java +++ b/src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationPaddingShouldSucceed.java @@ -16,14 +16,12 @@ * limitations under the License. */ -package org.apache.sqoop.importjob.avro.configuration; - -import org.apache.sqoop.importjob.ImportJobTestConfiguration; +package org.apache.sqoop.importjob.configuration; import java.util.ArrayList; import java.util.List; -public class PostgresqlImportJobTestConfigurationPaddingShouldSucceed implements ImportJobTestConfiguration { +public class PostgresqlImportJobTestConfigurationPaddingShouldSucceed implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration { @Override public String[] getTypes() { @@ -47,7 +45,7 @@ public List getSampleData() { } @Override - public String[] getExpectedResults() { + public String[] getExpectedResultsForAvro() { String expectedRecord = "{\"ID\": 1, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " + "\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}"; String[] expectedResult = new String[1]; @@ -55,6 +53,14 @@ public String[] getExpectedResults() { return expectedResult; } + @Override + public String[] getExpectedResultsForParquet() { + String expectedRecord = "1,1000000,1000000.05000,1000000,1000000.05000,100,1000000,1000000.05000,1000000,1000000.05000"; + String[] expectedResult = new String[1]; + expectedResult[0] = expectedRecord; + return expectedResult; + } + @Override public String toString() { return getClass().getSimpleName(); diff --git a/src/test/org/apache/sqoop/util/ParquetReader.java b/src/test/org/apache/sqoop/util/ParquetReader.java index 908ce566..727be583 100644 --- a/src/test/org/apache/sqoop/util/ParquetReader.java +++ b/src/test/org/apache/sqoop/util/ParquetReader.java @@ -18,6 +18,8 @@ package org.apache.sqoop.util; +import org.apache.avro.Conversions; +import 
org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -29,7 +31,9 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.HiddenFileFilter; +import org.apache.parquet.schema.MessageType; import java.io.IOException; import java.util.ArrayDeque; @@ -65,7 +69,7 @@ public ParquetReader(Path pathToRead) { this(pathToRead, new Configuration()); } - public GenericRecord next() throws IOException { + private GenericRecord next() throws IOException { GenericRecord result = reader.read(); if (result != null) { return result; @@ -113,29 +117,38 @@ public List readAllInCsvSorted() { } public CompressionCodecName getCodec() { - List
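Usage note (a sketch under stated assumptions, not part of the committed patch): when a Parquet import runs with the properties this patch introduces -- sqoop.parquet.logical_types.decimal.enable=true, optionally sqoop.avro.decimal_padding.enable=true and the sqoop.avro.logical_types.decimal.default.precision / .scale overrides exercised by the tests above -- fixed-point columns are written as binary fields annotated with the DECIMAL logical type. The snippet below shows one way such files could be read back as java.math.BigDecimal. It assumes parquet-avro's AvroParquetReader builder API; the class name DecimalReadBackSketch and the path argument are illustrative only.

import org.apache.avro.Conversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class DecimalReadBackSketch {
  public static void main(String[] args) throws Exception {
    // Same registration the patched ParquetImportMapper performs on the write side:
    // it lets the Avro data model materialize decimal-annotated binary as BigDecimal.
    GenericData model = GenericData.get();
    model.addLogicalTypeConversion(new Conversions.DecimalConversion());

    // Path to one of the imported .parquet files under <warehouse-dir>/<table>/
    Path importedFile = new Path(args[0]);
    try (ParquetReader<GenericRecord> reader =
             AvroParquetReader.<GenericRecord>builder(importedFile)
                 .withDataModel(model)
                 .build()) {
      GenericRecord record;
      while ((record = reader.read()) != null) {
        System.out.println(record); // decimal columns print as BigDecimal values
      }
    }
  }
}

On the write side the patch reaches the same result by having HadoopParquetImportJobConfigurator set AvroParquetOutputFormat's data supplier to GenericDataSupplier whenever sqoop.parquet.logical_types.decimal.enable is set, so the registered DecimalConversion is picked up by the job's output format.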