5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-02 23:21:22 +08:00

SQOOP-2567: SQOOP import for Oracle fails with invalid precision/scale for decimal

(Fero Szabo via Szabolcs Vasas)
This commit is contained in:
Szabolcs Vasas 2018-04-18 11:13:31 +02:00
parent af7a594d98
commit 44ac3012f1
31 changed files with 1187 additions and 340 deletions

View File

@ -465,6 +465,20 @@ To avoid this error, one can use the sqoop.avro.decimal_padding.enable flag
to turn on padding with 0s. This flag has to be used together with the
sqoop.avro.logical_types.decimal.enable flag set to true.
Default precision and scale in avro import
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
All of the databases allow users to specify numeric columns without
a precision or scale. While MS SQL and MySQL translate these into
a valid precision and scale values, Oracle and Postgres don't.
Therefore, when a table contains NUMBER in a table in Oracle or
NUMERIC/DECIMAL in Postgres, one can specify a default precision and scale
to be used in the avro schema by using the +sqoop.avro.logical_types.decimal.default.precision+
and +sqoop.avro.logical_types.decimal.default.scale+ flags.
Avro padding also has to be enabled, if the values are shorter than
the specified default scale.
Large Objects
^^^^^^^^^^^^^
@ -807,3 +821,13 @@ $ sqoop import -Dsqoop.avro.decimal_padding.enable=true -Dsqoop.avro.logical_typ
--target-dir hdfs://nameservice1//etl/target_path --as-avrodatafile --verbose -m 1
----
Enabling logical types in avro import and also turning on padding with 0s, while specifying default precision and scale as well:
----
$ sqoop import -Dsqoop.avro.decimal_padding.enable=true -Dsqoop.avro.logical_types.decimal.enable=true
-Dsqoop.avro.logical_types.decimal.default.precision=38 -Dsqoop.avro.logical_types.decimal.default.scale=10
--connect $CON --username $USER --password $PASS --query "select * from table_name where \$CONDITIONS"
--target-dir hdfs://nameservice1//etl/target_path --as-avrodatafile --verbose -m 1
----

View File

@ -18,6 +18,7 @@
package org.apache.sqoop.avro;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
@ -34,6 +35,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.sqoop.config.ConfigurationConstants;
import org.apache.sqoop.config.ConfigurationHelper;
import org.apache.sqoop.lib.BlobRef;
import org.apache.sqoop.lib.ClobRef;
import org.apache.sqoop.orm.ClassWriter;
@ -308,4 +311,33 @@ public boolean accept(Path p) {
fileReader.close();
return result;
}
/**
* This method checks if the precision is an invalid value, i.e. smaller than 0 and
* if so, tries to overwrite precision and scale with the configured defaults from
* the configuration object. If a default precision is not defined, then throws an Exception.
*
* @param precision precision
* @param scale scale
* @param conf Configuration that contains the default values if the user specified them
* @return an avro decimal type, that can be added as a column type in the avro schema generation
*/
public static LogicalType createDecimalType(Integer precision, Integer scale, Configuration conf) {
if (precision == null || precision <= 0) {
// we check if the user configured default precision and scale and use these values instead of invalid ones.
Integer configuredPrecision = ConfigurationHelper.getIntegerConfigIfExists(conf, ConfigurationConstants.PROP_AVRO_DECIMAL_PRECISION);
if (configuredPrecision != null) {
precision = configuredPrecision;
} else {
throw new RuntimeException("Invalid precision for Avro Schema. Please specify a default precision with the -D" +
ConfigurationConstants.PROP_AVRO_DECIMAL_PRECISION + " flag to avoid this issue.");
}
Integer configuredScale = ConfigurationHelper.getIntegerConfigIfExists(conf, ConfigurationConstants.PROP_AVRO_DECIMAL_SCALE);
if (configuredScale != null) {
scale = configuredScale;
}
}
return LogicalTypes.decimal(precision, scale);
}
}

View File

@ -105,6 +105,16 @@ public final class ConfigurationConstants {
*/
public static final String PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL = "sqoop.avro.logical_types.decimal.enable";
/**
* Default precision for avro schema
*/
public static final String PROP_AVRO_DECIMAL_PRECISION = "sqoop.avro.logical_types.decimal.default.precision";
/**
* Default scale for avro schema
*/
public static final String PROP_AVRO_DECIMAL_SCALE = "sqoop.avro.logical_types.decimal.default.scale";
/**
* Enable padding for avro logical types (decimal support only).
*/

View File

@ -249,4 +249,13 @@ public static boolean isLocalJobTracker(Configuration conf) {
private ConfigurationHelper() {
// Disable explicit object creation
}
public static Integer getIntegerConfigIfExists(Configuration conf, String key) {
Integer config = null;
String configString = conf.get(key, null);
if (configString != null) {
config = Integer.valueOf(configString);
}
return config;
}
}

View File

@ -33,18 +33,18 @@
import java.util.StringTokenizer;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema.Type;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.sqoop.avro.AvroUtil;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.hive.HiveTypes;
import org.apache.sqoop.lib.BlobRef;
import org.apache.sqoop.lib.ClobRef;
import org.apache.sqoop.manager.SqlManager;
import org.apache.sqoop.util.ExportException;
import org.apache.sqoop.util.ImportException;
@ -57,6 +57,8 @@ public abstract class ConnManager {
public static final Log LOG = LogFactory.getLog(SqlManager.class.getName());
protected SqoopOptions options;
/**
* Return a list of all databases on a server.
*/
@ -226,14 +228,17 @@ public Type toAvroType(int sqlType) {
/**
* Resolve a database-specific type to Avro logical data type.
* @param sqlType sql type
* @return avro type
* @param sqlType sql type
* @param precision
* @param scale
* @return avro type
*/
public LogicalType toAvroLogicalType(int sqlType, Integer precision, Integer scale) {
Configuration conf = options.getConf();
switch (sqlType) {
case Types.NUMERIC:
case Types.DECIMAL:
return LogicalTypes.decimal(precision, scale);
return AvroUtil.createDecimalType(precision, scale, conf);
default:
throw new IllegalArgumentException("Cannot convert SQL type "
+ sqlType + " to avro logical type");

View File

@ -43,9 +43,11 @@
import java.util.TimeZone;
import java.util.TreeMap;
import org.apache.avro.LogicalType;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.manager.oracle.OracleUtils;
import org.apache.sqoop.util.LoggingUtils;
@ -1103,5 +1105,12 @@ public String getInputBoundsQuery(String splitByCol, String sanitizedQuery) {
return "SELECT MIN(" + splitByCol + "), MAX(" + splitByCol + ") FROM ("
+ sanitizedQuery + ") t1";
}
@Override
public LogicalType toAvroLogicalType(int sqlType, Integer precision, Integer scale) {
Configuration conf = options.getConf();
return OracleUtils.toAvroLogicalType(sqlType, precision, scale, conf);
}
}

View File

@ -77,7 +77,6 @@ public abstract class SqlManager
protected static final int DEFAULT_FETCH_SIZE = 1000;
protected SqoopOptions options;
private Statement lastStatement;
/**

View File

@ -28,6 +28,7 @@
import java.util.Map;
import java.util.Properties;
import org.apache.avro.LogicalType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.OutputFormat;
@ -648,4 +649,10 @@ public boolean isDirectModeHBaseSupported() {
public boolean isDirectModeAccumuloSupported() {
return true;
}
@Override
public LogicalType toAvroLogicalType(int sqlType, Integer precision, Integer scale) {
Configuration conf = options.getConf();
return OracleUtils.toAvroLogicalType(sqlType, precision, scale, conf);
}
}

View File

@ -18,19 +18,28 @@
package org.apache.sqoop.manager.oracle;
import org.apache.avro.LogicalType;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.avro.AvroUtil;
import org.apache.sqoop.config.ConfigurationConstants;
import org.apache.sqoop.config.ConfigurationHelper;
import java.sql.Types;
/**
* Utility class for Oracle.
*
*/
public final class OracleUtils {
private static final String PERIOD_REGEX = "\\.";
private static final String PERIOD_DELIMITER = ".";
private static final String PERIOD_REGEX = "\\.";
public static boolean isOracleEscapingDisabled(Configuration conf) {
private static final String PERIOD_DELIMITER = ".";
private static final int SCALE_VALUE_NOT_SET = -127;
public static boolean isOracleEscapingDisabled(Configuration conf) {
return conf.getBoolean(SqoopOptions.ORACLE_ESCAPING_DISABLED, true);
}
@ -76,4 +85,47 @@ public static String unescapeIdentifier(final String identifier) {
return identifier;
}
}
public static LogicalType toAvroLogicalType(int sqlType, Integer precision, Integer scale, Configuration conf) {
switch (sqlType) {
case Types.NUMERIC:
case Types.DECIMAL:
// Negative scale means that there are a couple of zeros before the decimal point.
// We need to add it to precision as an offset because negative scales are not allowed in Avro.
if (scale < 0 && isValidScale(scale) && isValidPrecision(precision)) {
precision = precision - scale;
scale = 0;
}
Integer configuredScale = ConfigurationHelper.getIntegerConfigIfExists(
conf, ConfigurationConstants.PROP_AVRO_DECIMAL_SCALE);
if (!isValidScale(scale) && configuredScale == null) {
throw new RuntimeException("Invalid scale for Avro Schema. Please specify a default scale with the -D" +
ConfigurationConstants.PROP_AVRO_DECIMAL_SCALE + " flag to avoid this issue.");
}
// AvroUtil will take care of a precision that's 0.
return AvroUtil.createDecimalType(precision, scale, conf);
default:
throw new IllegalArgumentException("Cannot convert SQL type "
+ sqlType + " to avro logical type");
}
}
/**
* When the scale is not set, Oracle returns it as -127
* @param scale
* @return
*/
public static boolean isValidScale(Integer scale) {
return scale != SCALE_VALUE_NOT_SET;
}
/**
* Oracle returns 0 as precision if it's not set
* @param precision
* @return
*/
public static boolean isValidPrecision(Integer precision) {
return precision >= 1;
}
}

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.importjob;
import java.util.List;
public interface ImportJobTestConfiguration {
String[] getTypes();
String[] getNames();
List<String[]> getSampleData();
String[] getExpectedResults();
}

View File

@ -0,0 +1,209 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.importjob.avro;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.importjob.ImportJobTestConfiguration;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.AvroTestUtils;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.testutil.adapter.DatabaseAdapter;
import org.apache.sqoop.testutil.adapter.MSSQLServerDatabaseAdapter;
import org.apache.sqoop.testutil.adapter.MySqlDatabaseAdapter;
import org.apache.sqoop.testutil.adapter.OracleDatabaseAdapter;
import org.apache.sqoop.testutil.adapter.PostgresDatabaseAdapter;
import org.apache.sqoop.importjob.avro.configuration.MSSQLServerImportJobTestConfiguration;
import org.apache.sqoop.importjob.avro.configuration.MySQLImportJobTestConfiguration;
import org.apache.sqoop.importjob.avro.configuration.OracleImportJobTestConfigurationForNumber;
import org.apache.sqoop.importjob.avro.configuration.OracleImportJobTestConfiguration;
import org.apache.sqoop.importjob.avro.configuration.PostgresqlImportJobTestConfigurationForNumeric;
import org.apache.sqoop.importjob.avro.configuration.PostgresqlImportJobTestConfigurationPaddingShouldSucceed;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
@RunWith(Parameterized.class)
/**
* This test covers the behavior of the Avro import for fixed point decimal types, i.e. NUMBER, NUMERIC
* and DECIMAL.
*
* Oracle and Postgres store numbers without padding, while other DBs store them padded with 0s.
*
* The features tested here affect two phases in Sqoop:
* 1. Avro schema generation
* Default precision and scale are used here to avoid issues with Oracle and Postgres, as these
* don't return valid precision and scale if they weren't specified in the table DDL.
*
* 2. Avro import: padding.
* In case of Oracle and Postgres, Sqoop has to pad the values with 0s to avoid errors.
*/
public class AvroImportForNumericTypesTest extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(
AvroImportForNumericTypesTest.class.getName());
private Configuration conf = new Configuration();
private final ImportJobTestConfiguration configuration;
private final DatabaseAdapter adapter;
private final boolean failWithoutExtraArgs;
private final boolean failWithPadding;
// Constants for the basic test case, that doesn't use extra arguments
// that are required to avoid errors, i.e. padding and default precision and scale.
private final static boolean SUCCEED_WITHOUT_EXTRA_ARGS = false;
private final static boolean FAIL_WITHOUT_EXTRA_ARGS = true;
// Constants for the test case that has padding specified but not default precision and scale.
private final static boolean SUCCEED_WITH_PADDING_ONLY = false;
private final static boolean FAIL_WITH_PADDING_ONLY = true;
@Parameters(name = "Adapter: {0}| Config: {1}| failWithoutExtraArgs: {2}| failWithPadding: {3}")
public static Iterable<? extends Object> testConfigurations() {
DatabaseAdapter postgresAdapter = new PostgresDatabaseAdapter();
OracleDatabaseAdapter oracleDatabaseAdapter = new OracleDatabaseAdapter();
return Arrays.asList(
new Object[] {oracleDatabaseAdapter, new OracleImportJobTestConfigurationForNumber(), FAIL_WITHOUT_EXTRA_ARGS, FAIL_WITH_PADDING_ONLY},
new Object[] {oracleDatabaseAdapter, new OracleImportJobTestConfiguration(), FAIL_WITHOUT_EXTRA_ARGS, SUCCEED_WITH_PADDING_ONLY},
new Object[] { new MySqlDatabaseAdapter(), new MySQLImportJobTestConfiguration(), SUCCEED_WITHOUT_EXTRA_ARGS, SUCCEED_WITH_PADDING_ONLY},
new Object[] { new MSSQLServerDatabaseAdapter(), new MSSQLServerImportJobTestConfiguration(), SUCCEED_WITHOUT_EXTRA_ARGS, SUCCEED_WITH_PADDING_ONLY},
new Object[] { postgresAdapter, new PostgresqlImportJobTestConfigurationForNumeric(), FAIL_WITHOUT_EXTRA_ARGS, FAIL_WITH_PADDING_ONLY},
new Object[] { postgresAdapter, new PostgresqlImportJobTestConfigurationPaddingShouldSucceed(), SUCCEED_WITHOUT_EXTRA_ARGS, SUCCEED_WITH_PADDING_ONLY}
);
}
public AvroImportForNumericTypesTest(DatabaseAdapter adapter, ImportJobTestConfiguration configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
this.adapter = adapter;
this.configuration = configuration;
this.failWithoutExtraArgs = failWithoutExtraArgs;
this.failWithPadding = failWithPaddingOnly;
}
@Rule
public ExpectedException thrown = ExpectedException.none();
@Override
protected Configuration getConf() {
return conf;
}
@Override
protected boolean useHsqldbTestServer() {
return false;
}
@Override
protected String getConnectString() {
return adapter.getConnectionString();
}
@Override
protected SqoopOptions getSqoopOptions(Configuration conf) {
SqoopOptions opts = new SqoopOptions(conf);
adapter.injectConnectionParameters(opts);
return opts;
}
@Override
protected void dropTableIfExists(String table) throws SQLException {
adapter.dropTableIfExists(table, getManager());
}
@Before
public void setUp() {
super.setUp();
String[] names = configuration.getNames();
String[] types = configuration.getTypes();
createTableWithColTypesAndNames(names, types, new String[0]);
List<String[]> inputData = configuration.getSampleData();
for (String[] input : inputData) {
insertIntoTable(names, types, input);
}
}
@After
public void tearDown() {
try {
dropTableIfExists(getTableName());
} catch (SQLException e) {
LOG.warn("Error trying to drop table on tearDown: " + e);
}
super.tearDown();
}
private ArgumentArrayBuilder getArgsBuilder() {
ArgumentArrayBuilder builder = AvroTestUtils.getBuilderForAvroPaddingTest(this);
builder.withOption("connect", getConnectString());
return builder;
}
@Test
public void testAvroImportWithoutPadding() throws IOException {
if (failWithoutExtraArgs) {
thrown.expect(IOException.class);
thrown.expectMessage("Failure during job; return status 1");
}
String[] args = getArgsBuilder().build();
runImport(args);
if (!failWithoutExtraArgs) {
verify();
}
}
@Test
public void testAvroImportWithPadding() throws IOException {
if (failWithPadding) {
thrown.expect(IOException.class);
thrown.expectMessage("Failure during job; return status 1");
}
ArgumentArrayBuilder builder = getArgsBuilder();
builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
runImport(builder.build());
if (!failWithPadding) {
verify();
}
}
@Test
public void testAvroImportWithDefaultPrecisionAndScale() throws IOException {
ArgumentArrayBuilder builder = getArgsBuilder();
builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
builder.withProperty("sqoop.avro.logical_types.decimal.default.precision", "38");
builder.withProperty("sqoop.avro.logical_types.decimal.default.scale", "3");
runImport(builder.build());
verify();
}
private void verify() {
AvroTestUtils.verify(configuration.getExpectedResults(), getConf(), getTablePath());
}
}

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.importjob.avro.configuration;
import org.apache.sqoop.importjob.ImportJobTestConfiguration;
import java.util.ArrayList;
import java.util.List;
public class MSSQLServerImportJobTestConfiguration implements ImportJobTestConfiguration {
@Override
public String[] getTypes() {
String[] columnTypes = {"INT", "NUMERIC", "NUMERIC(20)", "NUMERIC(20,5)", "NUMERIC(20,0)", "NUMERIC(38,5)",
"DECIMAL", "DECIMAL(20)", "DECIMAL(20,5)", "DECIMAL(20,0)", "DECIMAL(38,5)"};
return columnTypes;
}
@Override
public String[] getNames() {
String[] columnNames = {"ID", "N1", "N2", "N3", "N4", "N5", "D1", "D2", "D3", "D4", "D5"};
return columnNames;
}
@Override
public List<String[]> getSampleData() {
List<String[]> inputData = new ArrayList<>();
inputData.add(new String[]{"1", "100.050", "1000000.05", "1000000.05", "1000000.05", "1000000.05",
"100.060", "1000000.05", "1000000.05", "1000000.05", "1000000.05"});
return inputData;
}
@Override
public String[] getExpectedResults() {
String expectedRecord = "{\"ID\": 1, \"N1\": 100, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " +
"\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}";
String[] expectedResult = new String[1];
expectedResult[0] = expectedRecord;
return expectedResult;
}
@Override
public String toString() {
return getClass().getSimpleName();
}
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.importjob.avro.configuration;
import org.apache.sqoop.importjob.ImportJobTestConfiguration;
import java.util.ArrayList;
import java.util.List;
public class MySQLImportJobTestConfiguration implements ImportJobTestConfiguration {
@Override
public String[] getTypes() {
String[] columnTypes = {"INT", "NUMERIC", "NUMERIC(20)", "NUMERIC(20,5)", "NUMERIC(20,0)", "NUMERIC(65,5)",
"DECIMAL", "DECIMAL(20)", "DECIMAL(20,5)", "DECIMAL(20,0)", "DECIMAL(65,5)"};
return columnTypes;
}
@Override
public String[] getNames() {
String[] columnNames = {"ID", "N1", "N2", "N3", "N4", "N5", "D1", "D2", "D3", "D4", "D5"};
return columnNames;
}
@Override
public List<String[]> getSampleData() {
List<String[]> inputData = new ArrayList<>();
inputData.add(new String[]{"1", "100.030", "1000000.05", "1000000.05", "1000000.05", "1000000.05",
"100.040", "1000000.05", "1000000.05", "1000000.05", "1000000.05"});
return inputData;
}
@Override
public String[] getExpectedResults() {
String expectedRecord = "{\"ID\": 1, \"N1\": 100, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " +
"\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}";
String[] expectedResult = new String[1];
expectedResult[0] = expectedRecord;
return expectedResult;
}
@Override
public String toString() {
return getClass().getSimpleName();
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.importjob.avro.configuration;
import org.apache.sqoop.importjob.ImportJobTestConfiguration;
import java.util.ArrayList;
import java.util.List;
/**
* This test configuration intends to cover the fact that oracle stores these types without padding them with 0s,
* therefore when importing into avro, one has to use the padding feature.
*/
public class OracleImportJobTestConfiguration implements ImportJobTestConfiguration {
@Override
public String[] getTypes() {
return new String[]{"INT", "NUMBER(20)", "NUMBER(20,5)", "NUMBER(20,-5)", "NUMBER(*,5)",
"DECIMAL", "DECIMAL(20)", "DECIMAL(20,5)", "DECIMAL(20,-5)", "DECIMAL(*,5)"};
}
@Override
public String[] getNames() {
return new String[]{"ID", "N2", "N3", "N4", "N5", "D1", "D2", "D3", "D4", "D5"};
}
@Override
public List<String[]> getSampleData() {
List<String[]> data = new ArrayList<>();
data.add(new String[]{"1", "1000000.05", "1000000.05", "1000000.05", "1000000.05",
"100.02", "1000000.05", "1000000.05", "1000000.05", "1000000.05"});
return data;
}
@Override
public String[] getExpectedResults() {
String expectedRecord = "{\"ID\": 1, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " +
"\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}";
String[] expectedResult = new String[1];
expectedResult[0] = expectedRecord;
return expectedResult;
}
@Override
public String toString() {
return getClass().getSimpleName();
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.importjob.avro.configuration;
import org.apache.sqoop.importjob.ImportJobTestConfiguration;
import java.util.ArrayList;
import java.util.List;
/**
* This test configuration covers NUMBER without a defined precision and scale.
* This is the type that is probably the most commonly used to store numbers and also the most problematic,
* as Sqoop sees this type with a 0 precision and -127 scale, both invalid values.
* Therefore, NUMBER requires special treatment.
* The user has to specify precision and scale when importing into avro.
*/
public class OracleImportJobTestConfigurationForNumber implements ImportJobTestConfiguration {
@Override
public String[] getTypes() {
return new String[]{"INT", "NUMBER", "NUMBER(20)", "NUMBER(20,5)"};
}
@Override
public String[] getNames() {
return new String[]{"ID", "N1", "N2", "N3"};
}
@Override
public List<String[]> getSampleData() {
List<String[]> data = new ArrayList<>();
data.add(new String[]{"1", "100.01", "100.01", "100.03"});
return data;
}
@Override
public String[] getExpectedResults() {
String expectedRecord = "{\"ID\": 1, \"N1\": 100.010, \"N2\": 100, \"N3\": 100.03000}";
String[] expectedResult = new String[1];
expectedResult[0] = expectedRecord;
return expectedResult;
}
@Override
public String toString() {
return getClass().getSimpleName();
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.importjob.avro.configuration;
import org.apache.sqoop.importjob.ImportJobTestConfiguration;
import java.util.ArrayList;
import java.util.List;
/**
* This test configuration covers the case when postgres returns invalid values
* for precision and scale for NUMERIC. Also, important, that the accompanying columns
* - NUMERIC(20) and NUMERIC(20, 5) don't get modified.
*/
public class PostgresqlImportJobTestConfigurationForNumeric implements ImportJobTestConfiguration {
@Override
public String[] getTypes() {
String[] columnTypes = {"INT", "NUMERIC", "NUMERIC(20)", "NUMERIC(20, 5)"};
return columnTypes;
}
@Override
public String[] getNames() {
String[] columnNames = {"ID", "N1", "N2", "N3"};
return columnNames;
}
@Override
public List<String[]> getSampleData() {
List<String[]> inputData = new ArrayList<>();
inputData.add(new String[]{"1", "100.01", "100.01", "100.01"});
return inputData;
}
@Override
public String[] getExpectedResults() {
String expectedRecord = "{\"ID\": 1, \"N1\": 100.010, \"N2\": 100, \"N3\": 100.01000}";
String[] expectedResult = new String[1];
expectedResult[0] = expectedRecord;
return expectedResult;
}
@Override
public String toString() {
return getClass().getSimpleName();
}
}

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.importjob.avro.configuration;
import org.apache.sqoop.importjob.ImportJobTestConfiguration;
import java.util.ArrayList;
import java.util.List;
public class PostgresqlImportJobTestConfigurationPaddingShouldSucceed implements ImportJobTestConfiguration {
@Override
public String[] getTypes() {
String[] columnTypes = {"INT", "NUMERIC(20)", "NUMERIC(20,5)", "NUMERIC(20,0)", "NUMERIC(1000,5)",
"DECIMAL(20)", "DECIMAL(20)", "DECIMAL(20,5)", "DECIMAL(20,0)", "DECIMAL(1000,5)"};
return columnTypes;
}
@Override
public String[] getNames() {
String[] columnNames = {"ID", "N2", "N3", "N4", "N5", "D1", "D2", "D3", "D4", "D5"};
return columnNames;
}
@Override
public List<String[]> getSampleData() {
List<String[]> inputData = new ArrayList<>();
inputData.add(new String[]{"1", "1000000.05", "1000000.05", "1000000.05", "1000000.05",
"100.02", "1000000.05", "1000000.05", "1000000.05", "1000000.05"});
return inputData;
}
@Override
public String[] getExpectedResults() {
String expectedRecord = "{\"ID\": 1, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " +
"\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}";
String[] expectedResult = new String[1];
expectedResult[0] = expectedRecord;
return expectedResult;
}
@Override
public String toString() {
return getClass().getSimpleName();
}
}

View File

@ -18,9 +18,6 @@
package org.apache.sqoop.manager.mysql;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
@ -37,6 +34,7 @@ public class MySQLLobAvroImportTest extends LobAvroImportTestCase {
public static final Log LOG = LogFactory.getLog(
MySQLLobAvroImportTest.class.getName());
private MySQLTestUtils mySQLTestUtils = new MySQLTestUtils();
@Override
@ -64,16 +62,7 @@ protected SqoopOptions getSqoopOptions(Configuration conf) {
@Override
protected void dropTableIfExists(String table) throws SQLException {
Connection conn = getManager().getConnection();
PreparedStatement statement = conn.prepareStatement(
"DROP TABLE IF EXISTS " + table,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
try {
statement.executeUpdate();
conn.commit();
} finally {
statement.close();
}
mySQLTestUtils.dropTableIfExists(table, getManager());
}
@Override

View File

@ -22,7 +22,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.manager.ConnManager;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
/**
@ -123,4 +128,16 @@ public void addPasswordIfIsSet(SqoopOptions opts) {
}
}
public void dropTableIfExists(String table, ConnManager manager) throws SQLException {
Connection conn = manager.getConnection();
PreparedStatement statement = conn.prepareStatement(
"DROP TABLE IF EXISTS " + table,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
try {
statement.executeUpdate();
conn.commit();
} finally {
statement.close();
}
}
}

View File

@ -1,120 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager.oracle;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.manager.oracle.util.OracleUtils;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.AvroTestUtils;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
public class OracleAvroPaddingImportTest extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(
OracleAvroPaddingImportTest.class.getName());
private Configuration conf = new Configuration();
@Rule
public ExpectedException thrown = ExpectedException.none();
@Override
protected Configuration getConf() {
return conf;
}
@Override
protected boolean useHsqldbTestServer() {
return false;
}
@Override
protected String getConnectString() {
return org.apache.sqoop.manager.oracle.util.OracleUtils.CONNECT_STRING;
}
@Override
protected SqoopOptions getSqoopOptions(Configuration conf) {
SqoopOptions opts = new SqoopOptions(conf);
org.apache.sqoop.manager.oracle.util.OracleUtils.setOracleAuth(opts);
return opts;
}
@Override
protected void dropTableIfExists(String table) throws SQLException {
OracleUtils.dropTable(table, getManager());
}
@Before
public void setUp() {
super.setUp();
String [] names = {"ID", "NAME", "SALARY", "DEPT"};
String [] types = { "INT", "VARCHAR(24)", "DECIMAL(20,5)", "VARCHAR(32)"};
List<String[]> inputData = AvroTestUtils.getInputData();
createTableWithColTypesAndNames(names, types, new String[0]);
insertIntoTable(names, types, inputData.get(0));
insertIntoTable(names, types, inputData.get(1));
insertIntoTable(names, types, inputData.get(2));
}
@After
public void tearDown() {
try {
dropTableIfExists(getTableName());
} catch (SQLException e) {
LOG.warn("Error trying to drop table on tearDown: " + e);
}
super.tearDown();
}
protected ArgumentArrayBuilder getArgsBuilder() {
ArgumentArrayBuilder builder = AvroTestUtils.getBuilderForAvroPaddingTest(this);
builder.withOption("connect", getConnectString());
return builder;
}
@Test
public void testAvroImportWithoutPaddingFails() throws IOException {
thrown.expect(IOException.class);
thrown.expectMessage("Failure during job; return status 1");
String[] args = getArgsBuilder().build();
runImport(args);
}
@Test
public void testAvroImportWithPadding() throws IOException {
ArgumentArrayBuilder builder = getArgsBuilder();
builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
runImport(builder.build());
AvroTestUtils.verify(AvroTestUtils.getExpectedResults(), getConf(), getTablePath());
}
}

View File

@ -27,6 +27,7 @@
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.testutil.SqlUtil;
/**
* Helper methods for Oracle testing.
@ -69,7 +70,9 @@ public static void setOracleSecondaryUserAuth(SqoopOptions options) {
/**
* Drop a table if it exists.
* Use the executeStatement method in {@link SqlUtil} instead.
*/
@Deprecated
public static void dropTable(String tableName, ConnManager manager)
throws SQLException {
Connection connection = null;
@ -99,4 +102,5 @@ public static String getDropTableStatement(String tableName) {
return "BEGIN EXECUTE IMMEDIATE 'DROP TABLE " + tableName + "'; "
+ "exception when others then null; end;";
}
}

View File

@ -92,26 +92,6 @@ public class PostgresqlImportTest extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(
PostgresqlImportTest.class.getName());
static final String HOST_URL = System.getProperty(
"sqoop.test.postgresql.connectstring.host_url",
"jdbc:postgresql://localhost/");
static final String DATABASE_USER = System.getProperty(
"sqoop.test.postgresql.username",
"sqooptest");
static final String DATABASE_NAME = System.getProperty(
"sqoop.test.postgresql.database",
"sqooptest");
static final String PASSWORD = System.getProperty(
"sqoop.test.postgresql.password");
static final String TABLE_NAME = "EMPLOYEES_PG";
static final String NULL_TABLE_NAME = "NULL_EMPLOYEES_PG";
static final String SPECIAL_TABLE_NAME = "EMPLOYEES_PG's";
static final String DIFFERENT_TABLE_NAME = "DIFFERENT_TABLE";
static final String SCHEMA_PUBLIC = "public";
static final String SCHEMA_SPECIAL = "special";
static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
protected Connection connection;
@Override
@ -119,24 +99,16 @@ protected boolean useHsqldbTestServer() {
return false;
}
public String quoteTableOrSchemaName(String tableName) {
return "\"" + tableName + "\"";
}
private String getDropTableStatement(String tableName, String schema) {
return "DROP TABLE IF EXISTS " + quoteTableOrSchemaName(schema) + "." + quoteTableOrSchemaName(tableName);
}
@Before
public void setUp() {
super.setUp();
LOG.debug("Setting up another postgresql test: " + CONNECT_STRING);
LOG.debug("Setting up another postgresql test: " + PostgresqlTestUtil.CONNECT_STRING);
setUpData(TABLE_NAME, SCHEMA_PUBLIC, false);
setUpData(NULL_TABLE_NAME, SCHEMA_PUBLIC, true);
setUpData(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC, false);
setUpData(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL, false);
setUpData(PostgresqlTestUtil.TABLE_NAME, PostgresqlTestUtil.SCHEMA_PUBLIC, false);
setUpData(PostgresqlTestUtil.NULL_TABLE_NAME, PostgresqlTestUtil.SCHEMA_PUBLIC, true);
setUpData(PostgresqlTestUtil.SPECIAL_TABLE_NAME, PostgresqlTestUtil.SCHEMA_PUBLIC, false);
setUpData(PostgresqlTestUtil.DIFFERENT_TABLE_NAME, PostgresqlTestUtil.SCHEMA_SPECIAL, false);
LOG.debug("setUp complete.");
}
@ -145,10 +117,10 @@ public void setUp() {
public void tearDown() {
try {
Statement stmt = connection.createStatement();
stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_PUBLIC));
stmt.executeUpdate(getDropTableStatement(NULL_TABLE_NAME, SCHEMA_PUBLIC));
stmt.executeUpdate(getDropTableStatement(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC));
stmt.executeUpdate(getDropTableStatement(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL));
stmt.executeUpdate(PostgresqlTestUtil.getDropTableStatement(PostgresqlTestUtil.TABLE_NAME, PostgresqlTestUtil.SCHEMA_PUBLIC));
stmt.executeUpdate(PostgresqlTestUtil.getDropTableStatement(PostgresqlTestUtil.NULL_TABLE_NAME, PostgresqlTestUtil.SCHEMA_PUBLIC));
stmt.executeUpdate(PostgresqlTestUtil.getDropTableStatement(PostgresqlTestUtil.SPECIAL_TABLE_NAME, PostgresqlTestUtil.SCHEMA_PUBLIC));
stmt.executeUpdate(PostgresqlTestUtil.getDropTableStatement(PostgresqlTestUtil.DIFFERENT_TABLE_NAME, PostgresqlTestUtil.SCHEMA_SPECIAL));
} catch (SQLException e) {
LOG.error("Can't clean up the database:", e);
}
@ -165,9 +137,9 @@ public void tearDown() {
public void setUpData(String tableName, String schema, boolean nullEntry) {
SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName);
options.setUsername(DATABASE_USER);
options.setPassword(PASSWORD);
SqoopOptions options = new SqoopOptions(PostgresqlTestUtil.CONNECT_STRING, tableName);
options.setUsername(PostgresqlTestUtil.DATABASE_USER);
options.setPassword(PostgresqlTestUtil.PASSWORD);
ConnManager manager = null;
Statement st = null;
@ -257,11 +229,11 @@ public void setUpData(String tableName, String schema, boolean nullEntry) {
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
args.add(CONNECT_STRING);
args.add(PostgresqlTestUtil.CONNECT_STRING);
args.add("--username");
args.add(DATABASE_USER);
args.add(PostgresqlTestUtil.DATABASE_USER);
args.add("--password");
args.add(PASSWORD);
args.add(PostgresqlTestUtil.PASSWORD);
args.add("--where");
args.add("id > 1");
args.add("-m");
@ -328,7 +300,7 @@ public void testJdbcBasedImport() throws IOException {
"3,Fred,2009-01-23,15.0,false,marketing",
};
doImportAndVerify(false, expectedResults, TABLE_NAME);
doImportAndVerify(false, expectedResults, PostgresqlTestUtil.TABLE_NAME);
}
@Test
@ -338,21 +310,21 @@ public void testDirectImport() throws IOException {
"3,Fred,2009-01-23,15,FALSE,marketing",
};
doImportAndVerify(true, expectedResults, TABLE_NAME);
doImportAndVerify(true, expectedResults, PostgresqlTestUtil.TABLE_NAME);
}
@Test
public void testListTables() throws IOException {
SqoopOptions options = new SqoopOptions(new Configuration());
options.setConnectString(CONNECT_STRING);
options.setUsername(DATABASE_USER);
options.setPassword(PASSWORD);
options.setConnectString(PostgresqlTestUtil.CONNECT_STRING);
options.setUsername(PostgresqlTestUtil.DATABASE_USER);
options.setPassword(PostgresqlTestUtil.PASSWORD);
ConnManager mgr = new PostgresqlManager(options);
String[] tables = mgr.listTables();
Arrays.sort(tables);
assertTrue(TABLE_NAME + " is not found!",
Arrays.binarySearch(tables, TABLE_NAME) >= 0);
assertTrue(PostgresqlTestUtil.TABLE_NAME + " is not found!",
Arrays.binarySearch(tables, PostgresqlTestUtil.TABLE_NAME) >= 0);
}
@Test
@ -362,7 +334,7 @@ public void testTableNameWithSpecialCharacter() throws IOException {
"3,Fred,2009-01-23,15.0,false,marketing",
};
doImportAndVerify(false, expectedResults, SPECIAL_TABLE_NAME);
doImportAndVerify(false, expectedResults, PostgresqlTestUtil.SPECIAL_TABLE_NAME);
}
@Test
@ -373,7 +345,7 @@ public void testIncrementalImport() throws IOException {
"--check-column", "start_date",
};
doImportAndVerify(false, expectedResults, TABLE_NAME, extraArgs);
doImportAndVerify(false, expectedResults, PostgresqlTestUtil.TABLE_NAME, extraArgs);
}
@Test
@ -384,7 +356,7 @@ public void testDirectIncrementalImport() throws IOException {
"--check-column", "start_date",
};
doImportAndVerify(true, expectedResults, TABLE_NAME, extraArgs);
doImportAndVerify(true, expectedResults, PostgresqlTestUtil.TABLE_NAME, extraArgs);
}
@Test
@ -395,7 +367,7 @@ public void testDirectIncrementalImportMerge() throws IOException {
"--check-column", "start_date",
};
doImportAndVerify(true, expectedResults, TABLE_NAME, extraArgs);
doImportAndVerify(true, expectedResults, PostgresqlTestUtil.TABLE_NAME, extraArgs);
extraArgs = new String[] { "--incremental", "lastmodified",
"--check-column", "start_date",
@ -403,7 +375,7 @@ public void testDirectIncrementalImportMerge() throws IOException {
"--last-value", "2009-04-20"
};
doImportAndVerify(true, expectedResults, TABLE_NAME, extraArgs);
doImportAndVerify(true, expectedResults, PostgresqlTestUtil.TABLE_NAME, extraArgs);
}
@Test
@ -414,10 +386,10 @@ public void testDifferentSchemaImport() throws IOException {
};
String [] extraArgs = { "--",
"--schema", SCHEMA_SPECIAL,
"--schema", PostgresqlTestUtil.SCHEMA_SPECIAL,
};
doImportAndVerify(false, expectedResults, DIFFERENT_TABLE_NAME, extraArgs);
doImportAndVerify(false, expectedResults, PostgresqlTestUtil.DIFFERENT_TABLE_NAME, extraArgs);
}
@Test
@ -428,10 +400,10 @@ public void testDifferentSchemaImportDirect() throws IOException {
};
String [] extraArgs = { "--",
"--schema", SCHEMA_SPECIAL,
"--schema", PostgresqlTestUtil.SCHEMA_SPECIAL,
};
doImportAndVerify(true, expectedResults, DIFFERENT_TABLE_NAME, extraArgs);
doImportAndVerify(true, expectedResults, PostgresqlTestUtil.DIFFERENT_TABLE_NAME, extraArgs);
}
@Test
@ -447,7 +419,7 @@ public void testNullEscapeCharacters() throws Exception {
"--null-non-string", "\\\\\\\\N",
};
doImportAndVerify(true, expectedResults, NULL_TABLE_NAME, extraArgs);
doImportAndVerify(true, expectedResults, PostgresqlTestUtil.NULL_TABLE_NAME, extraArgs);
}
@Test
@ -463,6 +435,6 @@ public void testDifferentBooleanValues() throws Exception {
"--boolean-false-string", "REAL_FALSE",
};
doImportAndVerify(true, expectedResults, TABLE_NAME, extraArgs);
doImportAndVerify(true, expectedResults, PostgresqlTestUtil.TABLE_NAME, extraArgs);
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager.postgresql;
public class PostgresqlTestUtil {
public static final String HOST_URL = System.getProperty(
"sqoop.test.postgresql.connectstring.host_url",
"jdbc:postgresql://localhost/");
public static final String DATABASE_USER = System.getProperty(
"sqoop.test.postgresql.username",
"sqooptest");
public static final String DATABASE_NAME = System.getProperty(
"sqoop.test.postgresql.database",
"sqooptest");
public static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
public static final String PASSWORD = System.getProperty(
"sqoop.test.postgresql.password");
static final String TABLE_NAME = "EMPLOYEES_PG";
static final String NULL_TABLE_NAME = "NULL_EMPLOYEES_PG";
static final String SPECIAL_TABLE_NAME = "EMPLOYEES_PG's";
static final String DIFFERENT_TABLE_NAME = "DIFFERENT_TABLE";
public static final String SCHEMA_PUBLIC = "public";
public static final String SCHEMA_SPECIAL = "special";
public static String quoteTableOrSchemaName(String tableName) {
return "\"" + tableName + "\"";
}
public static String getDropTableStatement(String tableName, String schema) {
return "DROP TABLE IF EXISTS " + quoteTableOrSchemaName(schema) + "." + quoteTableOrSchemaName(tableName);
}
}

View File

@ -37,11 +37,11 @@ public class MSSQLTestUtils {
public static final Log LOG = LogFactory.getLog(
MSSQLTestUtils.class.getName());
static final String DATABASE_USER = System.getProperty(
public static final String DATABASE_USER = System.getProperty(
"ms.sqlserver.username", "SQOOPUSER");
static final String DATABASE_PASSWORD = System.getProperty(
public static final String DATABASE_PASSWORD = System.getProperty(
"ms.sqlserver.password", "PASSWORD");
static final String DATABASE_NAME = System.getProperty(
public static final String DATABASE_NAME = System.getProperty(
"sqoop.test.sqlserver.database",
"sqooptest");
public static final String HOST_URL = System.getProperty(

View File

@ -1,134 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager.sqlserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.avro.AvroUtil;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.AvroTestUtils;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
public class SQLServerAvroPaddingImportTest extends ImportJobTestCase {
public static final Log LOG = LogFactory.getLog(
SQLServerAvroPaddingImportTest.class.getName());
private Configuration conf = new Configuration();
@Override
protected String getConnectString() {
return MSSQLTestUtils.CONNECT_STRING;
}
@Override
protected Configuration getConf() {
return conf;
}
@Override
protected SqoopOptions getSqoopOptions(Configuration conf) {
SqoopOptions options = new SqoopOptions();
options.setConnectString(MSSQLTestUtils.CONNECT_STRING);
options.setUsername(MSSQLTestUtils.DATABASE_USER);
options.setPassword(MSSQLTestUtils.DATABASE_PASSWORD);
return options;
}
@Override
protected boolean useHsqldbTestServer() {
return false;
}
@Override
protected String dropTableIfExistsCommand(String table) {
return "DROP TABLE IF EXISTS " + manager.escapeTableName(table);
}
@Before
public void setUp() {
super.setUp();
String [] names = {"ID", "NAME", "SALARY", "DEPT"};
String [] types = { "INT", "VARCHAR(24)", "DECIMAL(20,5)", "VARCHAR(32)"};
List<String[]> inputData = AvroTestUtils.getInputData();
createTableWithColTypesAndNames(names, types, new String[0]);
insertIntoTable(names, types, inputData.get(0));
insertIntoTable(names, types, inputData.get(1));
insertIntoTable(names, types, inputData.get(2));
}
@After
public void tearDown() {
try {
dropTableIfExists(getTableName());
} catch (SQLException e) {
LOG.warn("Error trying to drop table on tearDown: " + e);
}
super.tearDown();
}
protected ArgumentArrayBuilder getArgsBuilder() {
ArgumentArrayBuilder builder = AvroTestUtils.getBuilderForAvroPaddingTest(this);
builder.withOption("connect", MSSQLTestUtils.CONNECT_STRING);
builder.withOption("username", MSSQLTestUtils.DATABASE_USER);
builder.withOption("password", MSSQLTestUtils.DATABASE_PASSWORD);
return builder;
}
/**
* Test for avro import with a number value in the table.
* SQL Server stores the values padded in the database, therefore this import should always be successful
* (Oracle for instance doesn't pad numbers in the database, therefore that one fails without the
* sqoop.avro.decimal_padding.enable property)
* @throws IOException
*/
@Test
public void testAvroImportWithoutPaddingFails() throws IOException {
String[] args = getArgsBuilder().build();
runImport(args);
String [] expectedResults = AvroTestUtils.getExpectedResults();
AvroTestUtils.verify(expectedResults, getConf(), getTablePath());
}
/**
* This test covers a different code path than {@link #testAvroImportWithoutPaddingFails()},
* since the BigDecimal values are checked and padded by Sqoop in
* {@link AvroUtil#padBigDecimal(java.math.BigDecimal, org.apache.avro.Schema)}
* No actual padding occurs, as the values coming back from SQL Server are already padded with 0s.
* @throws IOException
*/
@Test
public void testAvroImportWithPadding() throws IOException {
ArgumentArrayBuilder builder = getArgsBuilder();
builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
runImport(builder.build());
String [] expectedResults = AvroTestUtils.getExpectedResults();
AvroTestUtils.verify(expectedResults, getConf(), getTablePath());
}
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.testutil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.manager.ConnManager;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
public class SqlUtil {
public static final Log LOG = LogFactory.getLog(SqlUtil.class.getName());
public static void executeStatement(String statement, ConnManager manager)
throws SQLException {
Connection connection = null;
Statement st = null;
try {
connection = manager.getConnection();
connection.setAutoCommit(false);
st = connection.createStatement();
// create the database table and populate it with data.
st.executeUpdate(statement);
connection.commit();
} finally {
try {
if (null != st) {
st.close();
}
} catch (SQLException sqlE) {
LOG.warn("Got SQLException when closing connection: " + sqlE);
}
}
}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.testutil.adapter;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.manager.ConnManager;
import java.sql.SQLException;
public interface DatabaseAdapter {
String getConnectionString();
SqoopOptions injectConnectionParameters(SqoopOptions options);
void dropTableIfExists(String tableName, ConnManager manager) throws SQLException;
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.testutil.adapter;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.manager.sqlserver.MSSQLTestUtils;
import java.sql.SQLException;
public class MSSQLServerDatabaseAdapter implements DatabaseAdapter {
@Override
public String getConnectionString() {
return MSSQLTestUtils.CONNECT_STRING;
}
@Override
public SqoopOptions injectConnectionParameters(SqoopOptions options) {
options.setConnectString(MSSQLTestUtils.CONNECT_STRING);
options.setUsername(MSSQLTestUtils.DATABASE_USER);
options.setPassword(MSSQLTestUtils.DATABASE_PASSWORD);
return options;
}
@Override
public void dropTableIfExists(String tableName, ConnManager manager) throws SQLException {
String dropTableStatement = "DROP TABLE IF EXISTS " + manager.escapeTableName(tableName);
manager.execAndPrint(dropTableStatement);
}
@Override
public String toString() {
return getClass().getSimpleName();
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.testutil.adapter;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.manager.mysql.MySQLTestUtils;
import java.sql.SQLException;
public class MySqlDatabaseAdapter implements DatabaseAdapter {
private MySQLTestUtils mySQLTestUtils = new MySQLTestUtils();
public SqoopOptions injectConnectionParameters(SqoopOptions options) {
options.setUsername(mySQLTestUtils.getUserName());
mySQLTestUtils.addPasswordIfIsSet(options);
return options;
}
public void dropTableIfExists(String tableName, ConnManager manager) throws SQLException {
mySQLTestUtils.dropTableIfExists(tableName, manager);
}
public String getConnectionString() {
return mySQLTestUtils.getMySqlConnectString();
}
@Override
public String toString() {
return getClass().getSimpleName();
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.testutil.adapter;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.manager.oracle.util.OracleUtils;
import org.apache.sqoop.testutil.SqlUtil;
import java.sql.SQLException;
public class OracleDatabaseAdapter implements DatabaseAdapter {
@Override
public SqoopOptions injectConnectionParameters(SqoopOptions options) {
org.apache.sqoop.manager.oracle.util.OracleUtils.setOracleAuth(options);
return options;
}
@Override
public void dropTableIfExists(String tableName, ConnManager manager) throws SQLException {
String dropTableStatement = OracleUtils.getDropTableStatement(tableName);
SqlUtil.executeStatement(dropTableStatement, manager);
}
@Override
public String getConnectionString() {
return OracleUtils.CONNECT_STRING;
}
@Override
public String toString() {
return getClass().getSimpleName();
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.testutil.adapter;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.manager.postgresql.PostgresqlTestUtil;
import org.apache.sqoop.testutil.SqlUtil;
import java.sql.SQLException;
public class PostgresDatabaseAdapter implements DatabaseAdapter {
@Override
public SqoopOptions injectConnectionParameters(SqoopOptions options) {
options.setUsername(PostgresqlTestUtil.DATABASE_USER);
options.setPassword(PostgresqlTestUtil.PASSWORD);
return options;
}
@Override
public void dropTableIfExists(String tableName, ConnManager manager) throws SQLException {
String dropTableStatement = PostgresqlTestUtil.getDropTableStatement(tableName, PostgresqlTestUtil.SCHEMA_PUBLIC);
SqlUtil.executeStatement(dropTableStatement, manager);
}
@Override
public String getConnectionString() {
return PostgresqlTestUtil.CONNECT_STRING;
}
@Override
public String toString() {
return getClass().getSimpleName();
}
}