
SQOOP-3382: Add parquet numeric support for Parquet in hdfs import

(Fero Szabo via Szabolcs Vasas)
Committed by Szabolcs Vasas on 2018-11-14 11:29:02 +01:00
parent 6dd6a4fc86
commit 00a02dec2f
17 changed files with 352 additions and 98 deletions

View File

@ -96,6 +96,11 @@ public final class ConfigurationConstants {
*/
public static final String PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL = "sqoop.avro.logical_types.decimal.enable";
/**
* Enable parquet logical types (decimal support only).
*/
public static final String PROP_ENABLE_PARQUET_LOGICAL_TYPE_DECIMAL = "sqoop.parquet.logical_types.decimal.enable";
/**
* Default precision for avro schema
*/
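
The new flag sits next to the existing Avro decimal switch. A minimal sketch of how it could be set, assuming it is passed as an ordinary Hadoop property (for example via -D on the sqoop command line, which is how the tests below set it through ArgumentArrayBuilder.withProperty); only the class and constant names come from this change, the rest is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.config.ConfigurationConstants;

public class ParquetDecimalFlagSketch {
  public static void main(String[] args) {
    // Rough equivalent of passing -Dsqoop.parquet.logical_types.decimal.enable=true
    // to a sqoop import (assumption for illustration).
    Configuration conf = new Configuration();
    conf.setBoolean(ConfigurationConstants.PROP_ENABLE_PARQUET_LOGICAL_TYPE_DECIMAL, true);
    System.out.println(conf.getBoolean(
        ConfigurationConstants.PROP_ENABLE_PARQUET_LOGICAL_TYPE_DECIMAL, false)); // true
  }
}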

View File

@ -18,6 +18,9 @@
package org.apache.sqoop.mapreduce;
import org.apache.avro.Conversions;
import org.apache.avro.generic.GenericData;
import org.apache.sqoop.config.ConfigurationConstants;
import org.apache.sqoop.lib.LargeObjectLoader;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.avro.Schema;
@ -39,6 +42,7 @@ public abstract class ParquetImportMapper<KEYOUT, VALOUT>
private Schema schema = null;
private boolean bigDecimalFormatString = true;
private LargeObjectLoader lobLoader = null;
private boolean bigDecimalPadding;
@Override
protected void setup(Context context)
@ -49,6 +53,8 @@ protected void setup(Context context)
ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
lobLoader = createLobLoader(context);
GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
bigDecimalPadding = conf.getBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_DECIMAL_PADDING, false);
}
@Override
@ -62,7 +68,7 @@ protected void map(LongWritable key, SqoopRecord val, Context context)
}
GenericRecord record = AvroUtil.toGenericRecord(val.getFieldMap(), schema,
bigDecimalFormatString);
bigDecimalFormatString, bigDecimalPadding);
write(context, record);
}
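
The mapper now registers Avro's DecimalConversion before writing records. A self-contained sketch of what that conversion does for a bytes-backed decimal logical type; the schema and values here are made up for illustration and are not part of the patch:

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class DecimalConversionSketch {
  public static void main(String[] args) {
    // A decimal(10,2) logical type carried on a BYTES schema, the shape Sqoop generates.
    Schema decimalSchema = LogicalTypes.decimal(10, 2)
        .addToSchema(Schema.create(Schema.Type.BYTES));
    Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
    // Encode a BigDecimal whose scale matches the logical type, then decode it back.
    ByteBuffer encoded = conversion.toBytes(
        new BigDecimal("1000000.05"), decimalSchema, LogicalTypes.decimal(10, 2));
    BigDecimal decoded = conversion.fromBytes(
        encoded, decimalSchema, LogicalTypes.decimal(10, 2));
    System.out.println(decoded); // 1000000.05
  }
}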

View File

@ -25,7 +25,9 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.parquet.avro.GenericDataSupplier;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.config.ConfigurationConstants;
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import org.apache.parquet.hadoop.ParquetOutputFormat;
@ -46,6 +48,18 @@ public class HadoopParquetImportJobConfigurator implements ParquetImportJobConfi
public void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException {
configureAvroSchema(job, schema);
configureOutputCodec(job);
configureLogicalTypeSupport(job, options);
}
/**
* Configurations needed for logical types, i.e. decimal in parquet.
* @param job
* @param options
*/
private void configureLogicalTypeSupport(Job job, SqoopOptions options) {
if (options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_PARQUET_LOGICAL_TYPE_DECIMAL, false)) {
AvroParquetOutputFormat.setAvroDataSupplier(job, GenericDataSupplier.class);
}
}
@Override
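
For context, this is roughly the write-side wiring the configurator ends up with when the flag is on. A hedged sketch: the helper method and its boolean parameter are illustrative, and it assumes configureAvroSchema delegates to AvroParquetOutputFormat.setSchema; only setSchema and setAvroDataSupplier mirror real parquet-avro calls:

import org.apache.avro.Schema;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import org.apache.parquet.avro.GenericDataSupplier;

public class ParquetWriteConfigSketch {
  // Illustrative helper, not part of the patch.
  public static void configure(Job job, Schema schema, boolean decimalSupportEnabled) {
    // The Avro schema drives the Parquet file schema.
    AvroParquetOutputFormat.setSchema(job, schema);
    if (decimalSupportEnabled) {
      // GenericDataSupplier hands parquet-avro a GenericData model, so logical-type
      // conversions (decimal) are available when Avro records are written as Parquet.
      AvroParquetOutputFormat.setAvroDataSupplier(job, GenericDataSupplier.class);
    }
  }
}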

View File

@ -39,6 +39,9 @@
import org.apache.sqoop.config.ConfigurationConstants;
import org.codehaus.jackson.node.NullNode;
import static org.apache.sqoop.SqoopOptions.FileLayout.AvroDataFile;
import static org.apache.sqoop.SqoopOptions.FileLayout.ParquetFile;
/**
* Creates an Avro schema to represent a table from a database.
*/
@ -126,8 +129,7 @@ public Schema generate(String schemaNameOverride) throws IOException {
public Schema toAvroSchema(int sqlType, String columnName, Integer precision, Integer scale) {
List<Schema> childSchemas = new ArrayList<Schema>();
childSchemas.add(Schema.create(Schema.Type.NULL));
if (options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL, false)
&& isLogicalType(sqlType)) {
if (isLogicalTypeConversionEnabled() && isLogicalType(sqlType)) {
childSchemas.add(
toAvroLogicalType(columnName, sqlType, precision, scale)
.addToSchema(Schema.create(Type.BYTES))
@ -138,6 +140,20 @@ && isLogicalType(sqlType)) {
return Schema.createUnion(childSchemas);
}
/**
* @return True if this is a parquet import and parquet logical types are enabled,
* or if this is an avro import and avro logical types are enabled. False otherwise.
*/
private boolean isLogicalTypeConversionEnabled() {
if (ParquetFile.equals(options.getFileLayout())) {
return options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_PARQUET_LOGICAL_TYPE_DECIMAL, false);
}
else if (AvroDataFile.equals(options.getFileLayout())) {
return options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL, false);
}
return false;
}
public Schema toAvroSchema(int sqlType) {
return toAvroSchema(sqlType, null, null, null);
}
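
With logical types enabled, the union built above for a decimal column is equivalent to the following. An illustrative sketch for a hypothetical NUMERIC(20,5) column, not code from the patch:

import java.util.Arrays;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class DecimalUnionSketch {
  public static void main(String[] args) {
    Schema decimal = LogicalTypes.decimal(20, 5)
        .addToSchema(Schema.create(Schema.Type.BYTES));
    Schema union = Schema.createUnion(
        Arrays.asList(Schema.create(Schema.Type.NULL), decimal));
    // Prints roughly: ["null",{"type":"bytes","logicalType":"decimal","precision":20,"scale":5}]
    System.out.println(union);
  }
}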

View File

@ -16,13 +16,21 @@
* limitations under the License.
*/
package org.apache.sqoop.importjob.avro;
package org.apache.sqoop.importjob;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.importjob.ImportJobTestConfiguration;
import org.apache.sqoop.importjob.configuration.AvroTestConfiguration;
import org.apache.sqoop.importjob.configuration.MSSQLServerImportJobTestConfiguration;
import org.apache.sqoop.importjob.configuration.MySQLImportJobTestConfiguration;
import org.apache.sqoop.importjob.configuration.OracleImportJobTestConfiguration;
import org.apache.sqoop.importjob.configuration.ParquetTestConfiguration;
import org.apache.sqoop.importjob.configuration.PostgresqlImportJobTestConfigurationForNumeric;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.AvroTestUtils;
import org.apache.sqoop.testutil.ImportJobTestCase;
@ -31,12 +39,9 @@
import org.apache.sqoop.testutil.adapter.MySqlDatabaseAdapter;
import org.apache.sqoop.testutil.adapter.OracleDatabaseAdapter;
import org.apache.sqoop.testutil.adapter.PostgresDatabaseAdapter;
import org.apache.sqoop.importjob.avro.configuration.MSSQLServerImportJobTestConfiguration;
import org.apache.sqoop.importjob.avro.configuration.MySQLImportJobTestConfiguration;
import org.apache.sqoop.importjob.avro.configuration.OracleImportJobTestConfigurationForNumber;
import org.apache.sqoop.importjob.avro.configuration.OracleImportJobTestConfiguration;
import org.apache.sqoop.importjob.avro.configuration.PostgresqlImportJobTestConfigurationForNumeric;
import org.apache.sqoop.importjob.avro.configuration.PostgresqlImportJobTestConfigurationPaddingShouldSucceed;
import org.apache.sqoop.importjob.configuration.OracleImportJobTestConfigurationForNumber;
import org.apache.sqoop.importjob.configuration.PostgresqlImportJobTestConfigurationPaddingShouldSucceed;
import org.apache.sqoop.util.ParquetReader;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -51,6 +56,10 @@
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.apache.sqoop.SqoopOptions.FileLayout.AvroDataFile;
import static org.apache.sqoop.SqoopOptions.FileLayout.ParquetFile;
@RunWith(Parameterized.class)
/**
* This test covers the behavior of the Avro import for fixed point decimal types, i.e. NUMBER, NUMERIC
@ -59,21 +68,20 @@
* Oracle and Postgres store numbers without padding, while other DBs store them padded with 0s.
*
* The features tested here affect two phases in Sqoop:
* 1. Avro schema generation
* 1. Avro schema generation during avro and parquet import
* Default precision and scale are used here to avoid issues with Oracle and Postgres, as these
* don't return valid precision and scale if they weren't specified in the table DDL.
*
* 2. Avro import: padding.
* 2. Decimal padding during avro or parquet import
* In case of Oracle and Postgres, Sqoop has to pad the values with 0s to avoid errors.
*/
public class AvroImportForNumericTypesTest extends ImportJobTestCase {
public class NumericTypesImportTest<T extends AvroTestConfiguration & ParquetTestConfiguration> extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(
AvroImportForNumericTypesTest.class.getName());
public static final Log LOG = LogFactory.getLog(NumericTypesImportTest.class.getName());
private Configuration conf = new Configuration();
private final ImportJobTestConfiguration configuration;
private final T configuration;
private final DatabaseAdapter adapter;
private final boolean failWithoutExtraArgs;
private final boolean failWithPadding;
@ -86,6 +94,7 @@ public class AvroImportForNumericTypesTest extends ImportJobTestCase {
// Constants for the test case that has padding specified but not default precision and scale.
private final static boolean SUCCEED_WITH_PADDING_ONLY = false;
private final static boolean FAIL_WITH_PADDING_ONLY = true;
private Path tableDirPath;
@Parameters(name = "Adapter: {0}| Config: {1}| failWithoutExtraArgs: {2}| failWithPadding: {3}")
public static Iterable<? extends Object> testConfigurations() {
@ -101,7 +110,7 @@ public static Iterable<? extends Object> testConfigurations() {
);
}
public AvroImportForNumericTypesTest(DatabaseAdapter adapter, ImportJobTestConfiguration configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
public NumericTypesImportTest(DatabaseAdapter adapter, T configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
this.adapter = adapter;
this.configuration = configuration;
this.failWithoutExtraArgs = failWithoutExtraArgs;
@ -148,6 +157,7 @@ public void setUp() {
for (String[] input : inputData) {
insertIntoTable(names, types, input);
}
tableDirPath = new Path(getWarehouseDir() + "/" + getTableName());
}
@After
@ -160,51 +170,158 @@ public void tearDown() {
super.tearDown();
}
private ArgumentArrayBuilder getArgsBuilder() {
ArgumentArrayBuilder builder = AvroTestUtils.getBuilderForAvroPaddingTest(this);
builder.withOption("connect", getConnectString());
return builder;
private ArgumentArrayBuilder getArgsBuilder(SqoopOptions.FileLayout fileLayout) {
ArgumentArrayBuilder builder = new ArgumentArrayBuilder();
if (AvroDataFile.equals(fileLayout)) {
builder.withOption("as-avrodatafile");
}
else if (ParquetFile.equals(fileLayout)) {
builder.withOption("as-parquetfile");
}
return builder.withCommonHadoopFlags(true)
.withOption("warehouse-dir", getWarehouseDir())
.withOption("num-mappers", "1")
.withOption("table", getTableName())
.withOption("connect", getConnectString());
}
/**
* Adds properties to the given arg builder for decimal precision and scale.
* @param builder
*/
private void addPrecisionAndScale(ArgumentArrayBuilder builder) {
builder.withProperty("sqoop.avro.logical_types.decimal.default.precision", "38");
builder.withProperty("sqoop.avro.logical_types.decimal.default.scale", "3");
}
/**
* Enables padding for decimals in avro and parquet import.
* @param builder
*/
private void addPadding(ArgumentArrayBuilder builder) {
builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
}
private void addEnableAvroDecimal(ArgumentArrayBuilder builder) {
builder.withProperty("sqoop.avro.logical_types.decimal.enable", "true");
}
private void addEnableParquetDecimal(ArgumentArrayBuilder builder) {
builder.withProperty("sqoop.parquet.logical_types.decimal.enable", "true");
}
private void configureJunitToExpectFailure(boolean failWithPadding) {
if (failWithPadding) {
thrown.expect(IOException.class);
thrown.expectMessage("Failure during job; return status 1");
}
}
@Test
public void testAvroImportWithoutPadding() throws IOException {
if (failWithoutExtraArgs) {
thrown.expect(IOException.class);
thrown.expectMessage("Failure during job; return status 1");
}
String[] args = getArgsBuilder().build();
configureJunitToExpectFailure(failWithoutExtraArgs);
ArgumentArrayBuilder builder = getArgsBuilder(AvroDataFile);
addEnableAvroDecimal(builder);
String[] args = builder.build();
runImport(args);
if (!failWithoutExtraArgs) {
verify();
verify(AvroDataFile);
}
}
@Test
public void testAvroImportWithPadding() throws IOException {
if (failWithPadding) {
thrown.expect(IOException.class);
thrown.expectMessage("Failure during job; return status 1");
}
ArgumentArrayBuilder builder = getArgsBuilder();
builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
configureJunitToExpectFailure(failWithPadding);
ArgumentArrayBuilder builder = getArgsBuilder(AvroDataFile);
addEnableAvroDecimal(builder);
addPadding(builder);
runImport(builder.build());
if (!failWithPadding) {
verify();
verify(AvroDataFile);
}
}
@Test
public void testAvroImportWithDefaultPrecisionAndScale() throws IOException {
ArgumentArrayBuilder builder = getArgsBuilder();
builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
builder.withProperty("sqoop.avro.logical_types.decimal.default.precision", "38");
builder.withProperty("sqoop.avro.logical_types.decimal.default.scale", "3");
ArgumentArrayBuilder builder = getArgsBuilder(AvroDataFile);
addEnableAvroDecimal(builder);
addPadding(builder);
addPrecisionAndScale(builder);
runImport(builder.build());
verify();
verify(AvroDataFile);
}
private void verify() {
AvroTestUtils.registerDecimalConversionUsageForVerification();
AvroTestUtils.verify(configuration.getExpectedResults(), getConf(), getTablePath());
@Test
public void testParquetImportWithoutPadding() throws IOException {
configureJunitToExpectFailure(failWithoutExtraArgs);
ArgumentArrayBuilder builder = getArgsBuilder(ParquetFile);
addEnableParquetDecimal(builder);
String[] args = builder.build();
runImport(args);
if (!failWithoutExtraArgs) {
verify(ParquetFile);
}
}
@Test
public void testParquetImportWithPadding() throws IOException {
configureJunitToExpectFailure(failWithPadding);
ArgumentArrayBuilder builder = getArgsBuilder(ParquetFile);
addEnableParquetDecimal(builder);
addPadding(builder);
runImport(builder.build());
if (!failWithPadding) {
verify(ParquetFile);
}
}
@Test
public void testParquetImportWithDefaultPrecisionAndScale() throws IOException {
ArgumentArrayBuilder builder = getArgsBuilder(ParquetFile);
addEnableParquetDecimal(builder);
addPadding(builder);
addPrecisionAndScale(builder);
runImport(builder.build());
verify(ParquetFile);
}
private void verify(SqoopOptions.FileLayout fileLayout) {
if (AvroDataFile.equals(fileLayout)) {
AvroTestUtils.registerDecimalConversionUsageForVerification();
AvroTestUtils.verify(configuration.getExpectedResultsForAvro(), getConf(), getTablePath());
} else if (ParquetFile.equals(fileLayout)) {
verifyParquetFile();
}
}
private void verifyParquetFile() {
verifyParquetSchema();
verifyParquetContent();
}
private void verifyParquetContent() {
ParquetReader reader = new ParquetReader(tableDirPath);
assertEquals(Arrays.asList(configuration.getExpectedResultsForParquet()), reader.readAllInCsvSorted());
}
private void verifyParquetSchema() {
ParquetReader reader = new ParquetReader(tableDirPath);
MessageType parquetSchema = reader.readParquetSchema();
String[] types = configuration.getTypes();
for (int i = 0; i < types.length; i ++) {
String type = types[i];
if (isNumericSqlType(type)) {
OriginalType parquetFieldType = parquetSchema.getFields().get(i).getOriginalType();
assertEquals(OriginalType.DECIMAL, parquetFieldType);
}
}
}
private boolean isNumericSqlType(String type) {
return type.toUpperCase().startsWith("DECIMAL")
|| type.toUpperCase().startsWith("NUMBER")
|| type.toUpperCase().startsWith("NUMERIC");
}
}
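
The padding behaviour described in the class javadoc can be shown with a small worked example. A sketch under the assumption that padding means raising the BigDecimal scale to the column scale before the Avro decimal conversion (which rejects mismatched scales):

import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalPaddingSketch {
  public static void main(String[] args) {
    // Oracle/Postgres return the value without trailing zeros for a NUMERIC(20,5) column.
    BigDecimal fromDb = new BigDecimal("1000000.05");          // scale 2
    // Padding raises the scale to match the schema without changing the numeric value.
    BigDecimal padded = fromDb.setScale(5, RoundingMode.UNNECESSARY);
    System.out.println(padded); // 1000000.05000, as in the expected results above
  }
}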

View File

@ -24,6 +24,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.importjob.configuration.GenericImportJobSplitByTestConfiguration;
import org.apache.sqoop.importjob.configuration.ImportJobTestConfiguration;
import org.apache.sqoop.importjob.configuration.ParquetTestConfiguration;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.testutil.adapter.DatabaseAdapter;
@ -55,7 +57,7 @@ public class SplitByImportTest extends ImportJobTestCase {
private Configuration conf = new Configuration();
private final ImportJobTestConfiguration configuration;
private final ParquetTestConfiguration configuration;
private final DatabaseAdapter adapter;
@Parameters(name = "Adapter: {0}| Config: {1}")
@ -69,7 +71,7 @@ public static Iterable<? extends Object> testConfigurations() {
);
}
public SplitByImportTest(DatabaseAdapter adapter, ImportJobTestConfiguration configuration) {
public SplitByImportTest(DatabaseAdapter adapter, ParquetTestConfiguration configuration) {
this.adapter = adapter;
this.configuration = configuration;
}
@ -148,6 +150,6 @@ public void testSplitBy() throws IOException {
private void verifyParquetFile() {
ParquetReader reader = new ParquetReader(new Path(getWarehouseDir() + "/" + getTableName()), getConf());
assertEquals(asList(configuration.getExpectedResults()), reader.readAllInCsvSorted());
assertEquals(asList(configuration.getExpectedResultsForParquet()), reader.readAllInCsvSorted());
}
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.importjob.configuration;
public interface AvroTestConfiguration extends ImportJobTestConfiguration {
String[] getExpectedResultsForAvro();
}

View File

@ -19,7 +19,6 @@
package org.apache.sqoop.importjob.configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.sqoop.importjob.ImportJobTestConfiguration;
import java.util.ArrayList;
import java.util.List;
@ -28,7 +27,7 @@
* This test configuration intends to cover the fact that oracle stores these types without padding them with 0s,
* therefore when importing into avro, one has to use the padding feature.
*/
public class GenericImportJobSplitByTestConfiguration implements ImportJobTestConfiguration {
public class GenericImportJobSplitByTestConfiguration implements ImportJobTestConfiguration, ParquetTestConfiguration {
public static final String NAME_COLUMN = "NAME";
public static final char SEPARATOR = ',';
@ -65,7 +64,7 @@ public List<String[]> getSampleData() {
}
@Override
public String[] getExpectedResults() {
public String[] getExpectedResultsForParquet() {
return data.stream()
.map(element -> StringUtils.join(element, SEPARATOR))
.toArray(String[]::new);

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.sqoop.importjob;
package org.apache.sqoop.importjob.configuration;
import java.util.List;
@ -27,6 +27,4 @@ public interface ImportJobTestConfiguration {
String[] getNames();
List<String[]> getSampleData();
String[] getExpectedResults();
}

View File

@ -16,14 +16,12 @@
* limitations under the License.
*/
package org.apache.sqoop.importjob.avro.configuration;
import org.apache.sqoop.importjob.ImportJobTestConfiguration;
package org.apache.sqoop.importjob.configuration;
import java.util.ArrayList;
import java.util.List;
public class MSSQLServerImportJobTestConfiguration implements ImportJobTestConfiguration {
public class MSSQLServerImportJobTestConfiguration implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
@Override
public String[] getTypes() {
@ -47,7 +45,7 @@ public List<String[]> getSampleData() {
}
@Override
public String[] getExpectedResults() {
public String[] getExpectedResultsForAvro() {
String expectedRecord = "{\"ID\": 1, \"N1\": 100, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " +
"\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}";
String[] expectedResult = new String[1];
@ -55,6 +53,14 @@ public String[] getExpectedResults() {
return expectedResult;
}
@Override
public String[] getExpectedResultsForParquet() {
String expectedRecord = "1,100,1000000,1000000.05000,1000000,1000000.05000,100,1000000,1000000.05000,1000000,1000000.05000";
String[] expectedResult = new String[1];
expectedResult[0] = expectedRecord;
return expectedResult;
}
@Override
public String toString() {
return getClass().getSimpleName();

View File

@ -16,14 +16,12 @@
* limitations under the License.
*/
package org.apache.sqoop.importjob.avro.configuration;
import org.apache.sqoop.importjob.ImportJobTestConfiguration;
package org.apache.sqoop.importjob.configuration;
import java.util.ArrayList;
import java.util.List;
public class MySQLImportJobTestConfiguration implements ImportJobTestConfiguration {
public class MySQLImportJobTestConfiguration implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
@Override
public String[] getTypes() {
@ -46,9 +44,8 @@ public List<String[]> getSampleData() {
return inputData;
}
@Override
public String[] getExpectedResults() {
public String[] getExpectedResultsForAvro() {
String expectedRecord = "{\"ID\": 1, \"N1\": 100, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " +
"\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}";
String[] expectedResult = new String[1];
@ -56,6 +53,14 @@ public String[] getExpectedResults() {
return expectedResult;
}
@Override
public String[] getExpectedResultsForParquet() {
String expectedRecord = "1,100,1000000,1000000.05000,1000000,1000000.05000,100,1000000,1000000.05000,1000000,1000000.05000";
String[] expectedResult = new String[1];
expectedResult[0] = expectedRecord;
return expectedResult;
}
@Override
public String toString() {
return getClass().getSimpleName();

View File

@ -16,9 +16,7 @@
* limitations under the License.
*/
package org.apache.sqoop.importjob.avro.configuration;
import org.apache.sqoop.importjob.ImportJobTestConfiguration;
package org.apache.sqoop.importjob.configuration;
import java.util.ArrayList;
import java.util.List;
@ -27,7 +25,7 @@
* This test configuration intends to cover the fact that oracle stores these types without padding them with 0s,
* therefore when importing into avro, one has to use the padding feature.
*/
public class OracleImportJobTestConfiguration implements ImportJobTestConfiguration {
public class OracleImportJobTestConfiguration implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
@Override
public String[] getTypes() {
@ -49,7 +47,7 @@ public List<String[]> getSampleData() {
}
@Override
public String[] getExpectedResults() {
public String[] getExpectedResultsForAvro() {
String expectedRecord = "{\"ID\": 1, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " +
"\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}";
String[] expectedResult = new String[1];
@ -57,6 +55,14 @@ public String[] getExpectedResults() {
return expectedResult;
}
@Override
public String[] getExpectedResultsForParquet() {
String expectedRecord = "1,1000000,1000000.05000,1000000,1000000.05000,100,1000000,1000000.05000,1000000,1000000.05000";
String[] expectedResult = new String[1];
expectedResult[0] = expectedRecord;
return expectedResult;
}
@Override
public String toString() {
return getClass().getSimpleName();

View File

@ -16,9 +16,7 @@
* limitations under the License.
*/
package org.apache.sqoop.importjob.avro.configuration;
import org.apache.sqoop.importjob.ImportJobTestConfiguration;
package org.apache.sqoop.importjob.configuration;
import java.util.ArrayList;
import java.util.List;
@ -30,7 +28,7 @@
* Therefore, NUMBER requires special treatment.
* The user has to specify precision and scale when importing into avro.
*/
public class OracleImportJobTestConfigurationForNumber implements ImportJobTestConfiguration {
public class OracleImportJobTestConfigurationForNumber implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
@Override
@ -51,13 +49,21 @@ public List<String[]> getSampleData() {
}
@Override
public String[] getExpectedResults() {
public String[] getExpectedResultsForAvro() {
String expectedRecord = "{\"ID\": 1, \"N1\": 100.010, \"N2\": 100, \"N3\": 100.03000}";
String[] expectedResult = new String[1];
expectedResult[0] = expectedRecord;
return expectedResult;
}
@Override
public String[] getExpectedResultsForParquet() {
String expectedRecord = "1,100.010,100,100.03000";
String[] expectedResult = new String[1];
expectedResult[0] = expectedRecord;
return expectedResult;
}
@Override
public String toString() {
return getClass().getSimpleName();

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.importjob.configuration;
public interface ParquetTestConfiguration extends ImportJobTestConfiguration{
String[] getExpectedResultsForParquet();
}
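
A hypothetical configuration tying the two new interfaces together, assuming ImportJobTestConfiguration now declares only getTypes(), getNames() and getSampleData(); the class, column names and values are made up for illustration:

import java.util.Collections;
import java.util.List;

public class ExampleDecimalTestConfiguration
    implements AvroTestConfiguration, ParquetTestConfiguration {
  @Override public String[] getTypes() { return new String[]{"INT", "NUMERIC(20,5)"}; }
  @Override public String[] getNames() { return new String[]{"ID", "N1"}; }
  @Override public List<String[]> getSampleData() {
    return Collections.singletonList(new String[]{"1", "100.05"});
  }
  @Override public String[] getExpectedResultsForAvro() {
    return new String[]{"{\"ID\": 1, \"N1\": 100.05000}"};
  }
  @Override public String[] getExpectedResultsForParquet() {
    return new String[]{"1,100.05000"};
  }
}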

View File

@ -16,9 +16,7 @@
* limitations under the License.
*/
package org.apache.sqoop.importjob.avro.configuration;
import org.apache.sqoop.importjob.ImportJobTestConfiguration;
package org.apache.sqoop.importjob.configuration;
import java.util.ArrayList;
import java.util.List;
@ -28,7 +26,7 @@
* for precision and scale for NUMERIC. Also, important, that the accompanying columns
* - NUMERIC(20) and NUMERIC(20, 5) don't get modified.
*/
public class PostgresqlImportJobTestConfigurationForNumeric implements ImportJobTestConfiguration {
public class PostgresqlImportJobTestConfigurationForNumeric implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
@Override
public String[] getTypes() {
@ -50,13 +48,21 @@ public List<String[]> getSampleData() {
}
@Override
public String[] getExpectedResults() {
public String[] getExpectedResultsForAvro() {
String expectedRecord = "{\"ID\": 1, \"N1\": 100.010, \"N2\": 100, \"N3\": 100.01000}";
String[] expectedResult = new String[1];
expectedResult[0] = expectedRecord;
return expectedResult;
}
@Override
public String[] getExpectedResultsForParquet() {
String expectedRecord = "1,100.010,100,100.01000";
String[] expectedResult = new String[1];
expectedResult[0] = expectedRecord;
return expectedResult;
}
@Override
public String toString() {
return getClass().getSimpleName();

View File

@ -16,14 +16,12 @@
* limitations under the License.
*/
package org.apache.sqoop.importjob.avro.configuration;
import org.apache.sqoop.importjob.ImportJobTestConfiguration;
package org.apache.sqoop.importjob.configuration;
import java.util.ArrayList;
import java.util.List;
public class PostgresqlImportJobTestConfigurationPaddingShouldSucceed implements ImportJobTestConfiguration {
public class PostgresqlImportJobTestConfigurationPaddingShouldSucceed implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
@Override
public String[] getTypes() {
@ -47,7 +45,7 @@ public List<String[]> getSampleData() {
}
@Override
public String[] getExpectedResults() {
public String[] getExpectedResultsForAvro() {
String expectedRecord = "{\"ID\": 1, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " +
"\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}";
String[] expectedResult = new String[1];
@ -55,6 +53,14 @@ public String[] getExpectedResults() {
return expectedResult;
}
@Override
public String[] getExpectedResultsForParquet() {
String expectedRecord = "1,1000000,1000000.05000,1000000,1000000.05000,100,1000000,1000000.05000,1000000,1000000.05000";
String[] expectedResult = new String[1];
expectedResult[0] = expectedRecord;
return expectedResult;
}
@Override
public String toString() {
return getClass().getSimpleName();

View File

@ -18,6 +18,8 @@
package org.apache.sqoop.util;
import org.apache.avro.Conversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -29,7 +31,9 @@
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.ArrayDeque;
@ -65,7 +69,7 @@ public ParquetReader(Path pathToRead) {
this(pathToRead, new Configuration());
}
public GenericRecord next() throws IOException {
private GenericRecord next() throws IOException {
GenericRecord result = reader.read();
if (result != null) {
return result;
@ -113,29 +117,38 @@ public List<String> readAllInCsvSorted() {
}
public CompressionCodecName getCodec() {
List<Footer> footers = getFooters();
ParquetMetadata parquetMetadata = getParquetMetadata();
Iterator<Footer> footersIterator = footers.iterator();
if (footersIterator.hasNext()) {
Footer footer = footersIterator.next();
Iterator<BlockMetaData> blockMetaDataIterator = parquetMetadata.getBlocks().iterator();
if (blockMetaDataIterator.hasNext()) {
BlockMetaData blockMetaData = blockMetaDataIterator.next();
Iterator<BlockMetaData> blockMetaDataIterator = footer.getParquetMetadata().getBlocks().iterator();
if (blockMetaDataIterator.hasNext()) {
BlockMetaData blockMetaData = blockMetaDataIterator.next();
Iterator<ColumnChunkMetaData> columnChunkMetaDataIterator = blockMetaData.getColumns().iterator();
Iterator<ColumnChunkMetaData> columnChunkMetaDataIterator = blockMetaData.getColumns().iterator();
if (columnChunkMetaDataIterator.hasNext()) {
ColumnChunkMetaData columnChunkMetaData = columnChunkMetaDataIterator.next();
if (columnChunkMetaDataIterator.hasNext()) {
ColumnChunkMetaData columnChunkMetaData = columnChunkMetaDataIterator.next();
return columnChunkMetaData.getCodec();
}
return columnChunkMetaData.getCodec();
}
}
return null;
}
public MessageType readParquetSchema() {
try {
ParquetMetadata parquetMetadata = getParquetMetadata();
return parquetMetadata.getFileMetaData().getSchema();
} finally {
close();
}
}
private ParquetMetadata getParquetMetadata() {
return getFooters().stream().findFirst().get().getParquetMetadata();
}
private List<Footer> getFooters() {
final List<Footer> footers;
try {
@ -163,7 +176,8 @@ private void initReader(Path file) {
if (reader != null) {
reader.close();
}
this.reader = AvroParquetReader.<GenericRecord>builder(file).build();
GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
this.reader = AvroParquetReader.<GenericRecord>builder(file).withDataModel(GenericData.get()).build();
} catch (IOException e) {
throw new RuntimeException(e);
}
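
Reading the file back requires the same logical-type awareness on the reader side. A hedged sketch of what the withDataModel call above buys: without a model carrying DecimalConversion, decimal columns come back as raw bytes; with it, they come back as BigDecimal. The file path here is a made-up placeholder:

import org.apache.avro.Conversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;

public class DecimalAwareReadSketch {
  public static void main(String[] args) throws Exception {
    GenericData model = GenericData.get();
    model.addLogicalTypeConversion(new Conversions.DecimalConversion());
    try (org.apache.parquet.hadoop.ParquetReader<GenericRecord> reader =
             AvroParquetReader.<GenericRecord>builder(
                 new Path("/tmp/sqoop-import/part-m-00000.parquet")) // hypothetical path
                 .withDataModel(model)
                 .build()) {
      GenericRecord record = reader.read();
      System.out.println(record); // decimal fields print as BigDecimal values
    }
  }
}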