From 4a5bd295d828eaa37b3401872dff83addf36193f Mon Sep 17 00:00:00 2001
From: Abraham Elmahrek
Date: Tue, 23 Jun 2015 13:47:03 -0700
Subject: [PATCH] SQOOP-2382: Sqoop2: Arithmetic exception in partitioner when
 allow null is true (Banmeet Singh via Abraham Elmahrek)

---
 .../sqoop/job/etl/PartitionerContext.java          | 28 +++++++++++
 .../jdbc/GenericJdbcPartitioner.java               |  7 ++-
 docs/src/site/sphinx/Sqoop5MinutesDemo.rst         |  4 +-
 .../apache/sqoop/job/mr/SqoopInputFormat.java      |  4 +-
 .../jdbc/generic/FromRDBMSToHDFSTest.java          | 50 +++++++++++++++++++
 5 files changed, 90 insertions(+), 3 deletions(-)

diff --git a/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java b/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
index 67fffd6d..bb52bb24 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
@@ -35,6 +35,8 @@ public class PartitionerContext extends TransferableContext {
 
   private Schema schema;
 
+  private boolean skipMaxPartitionCheck = false;
+
   public PartitionerContext(ImmutableContext context, long maxPartitions, Schema schema) {
     super(context);
     this.maxPartitions = maxPartitions;
@@ -53,6 +55,32 @@ public long getMaxPartitions() {
     return maxPartitions;
   }
 
+  /**
+   * Set the flag indicating whether to skip the check that the number of
+   * splits is at most the number of extractors specified by the user.
+   *
+   * Needed in case the user specifies the number of extractors as 1 and
+   * also allows null values in the partitioning column.
+   *
+   * @param skipMaxPartitionCheck whether to skip the max-partition check
+   */
+  public void setSkipMaxPartitionCheck(boolean skipMaxPartitionCheck) {
+    this.skipMaxPartitionCheck = skipMaxPartitionCheck;
+  }
+
+  /**
+   * Return the flag indicating whether to skip the check that the number of
+   * splits is at most the number of extractors specified by the user.
+   *
+   * Needed in case the user specifies the number of extractors as 1 and
+   * also allows null values in the partitioning column.
+   *
+   * @return true if the max-partition check should be skipped
+   */
+  public boolean getSkipMaxPartitionCheck() {
+    return this.skipMaxPartitionCheck;
+  }
+
   /**
    * Return schema associated with this step.
    *
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
index 23b57c0f..a99b3a96 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
@@ -74,7 +74,12 @@ public List<Partition> getPartitions(PartitionerContext context, LinkConfigurat
       GenericJdbcPartition partition = new GenericJdbcPartition();
       partition.setConditions(partitionColumnName + " IS NULL");
       partitions.add(partition);
-      numberPartitions -= 1;
+      if (numberPartitions > 1) {
+        numberPartitions -= 1;
+      }
+      else {
+        context.setSkipMaxPartitionCheck(true);
+      }
     }
 
     switch (partitionColumnType) {
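
The guard above is the heart of SQOOP-2382: the partitioner first reserves one
split for rows where the partition column IS NULL, then divides the column
range among the remaining partitions. With one extractor and nulls allowed,
the old unconditional decrement left numberPartitions at 0, and the later
range division failed with an ArithmeticException. Below is a minimal
standalone sketch (illustrative only, not Sqoop source) of that arithmetic;
the division of the form (max - min) / numberPartitions is an assumption
standing in for the actual range split in GenericJdbcPartitioner, which sits
outside this hunk:

    // Standalone sketch: shows why the unconditional decrement failed and
    // what the guarded version does instead.
    public class NullPartitionSketch {

      static long splitInterval(long min, long max, int extractors, boolean allowNulls) {
        int numberPartitions = extractors;
        if (allowNulls) {
          // Old code did: numberPartitions -= 1;
          // With extractors == 1 that left 0 partitions, and the division
          // below threw java.lang.ArithmeticException: / by zero.
          if (numberPartitions > 1) {
            numberPartitions -= 1;
          }
          // else: keep one partition for the non-null range; the framework
          // is told to skip the max-partition check instead.
        }
        return (max - min) / numberPartitions; // never divides by zero now
      }

      public static void main(String[] args) {
        // One extractor, nulls allowed: the pre-patch code crashed here.
        System.out.println(splitInterval(0, 100, 1, true)); // prints 100
      }
    }
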
diff --git a/docs/src/site/sphinx/Sqoop5MinutesDemo.rst b/docs/src/site/sphinx/Sqoop5MinutesDemo.rst
index fc9a43b2..19115a2c 100644
--- a/docs/src/site/sphinx/Sqoop5MinutesDemo.rst
+++ b/docs/src/site/sphinx/Sqoop5MinutesDemo.rst
@@ -184,7 +184,9 @@ Next, we can use the two link Ids to associate the ``From`` and ``To`` for the j
       Loaders:(Optional) 2
     New job was successfully created with validation status OK and persistent id 1
 
-Our new job object was created with assigned id 1.
+Our new job object was created with assigned id 1. Note that if null values are allowed in the partition column,
+at least two extractors are needed for Sqoop to carry out the data transfer. If one extractor is specified in this
+scenario, Sqoop will ignore the setting and continue with two extractors.
 
 Start Job ( a.k.a Data transfer )
 =================================
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index 732ee0a5..5aef878e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -80,7 +80,9 @@ public List<InputSplit> getSplits(JobContext context)
       splits.add(split);
     }
 
-    if(splits.size() > maxPartitions) {
+    // SQOOP-2382: Skip this check when the number of extractors is set to 1
+    // and null values are allowed in the partitioning column.
+    if (splits.size() > maxPartitions && !partitionerContext.getSkipMaxPartitionCheck()) {
       throw new SqoopException(MRExecutionError.MAPRED_EXEC_0025,
         String.format("Got %d, max was %d", splits.size(), maxPartitions));
     }
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
index 52d62a64..1272ed98 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
@@ -270,4 +270,54 @@ public void testDuplicateColumns() throws Exception {
     // Clean up testing table
     dropTable();
   }
+
+  @Test
+  public void testAllowNullsWithOneExtractor() throws Exception {
+    // Integration test case for SQOOP-2382: the server must not throw an
+    // exception when null values are allowed in the partitioning column
+    // and the number of extractors is set to only 1.
+
+    createAndLoadTableCities();
+
+    // RDBMS link
+    MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsConnection);
+    saveLink(rdbmsConnection);
+
+    // HDFS link
+    MLink hdfsConnection = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsConnection);
+    saveLink(hdfsConnection);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsConnection.getPersistenceId(), hdfsConnection.getPersistenceId());
+
+    // Set the rdbms "FROM" config
+    fillRdbmsFromConfig(job, "id");
+
+    MConfigList configs = job.getFromJobConfig();
+    configs.getBooleanInput("fromJobConfig.allowNullValueInPartitionColumn").setValue(true);
+
+    // Fill the hdfs "TO" config
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+
+    // Driver config: run with a single extractor
+    MDriverConfig driverConfig = job.getDriverConfig();
+    driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);
+
+    saveJob(job);
+
+    executeJob(job);
+
+    // Assert correct output
+    assertTo(
+        "1,'USA','2004-10-23','San Francisco'",
+        "2,'USA','2004-10-24','Sunnyvale'",
+        "3,'Czech Republic','2004-10-25','Brno'",
+        "4,'USA','2004-10-26','Palo Alto'"
+    );
+
+    // Clean up testing table
+    dropTable();
+  }
 }
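
Taken together, the partitioner flag and the relaxed limit in SqoopInputFormat
let a one-extractor job proceed with two splits (the extra IS NULL split plus
the non-null range). For reference, a self-contained sketch of the adjusted
check; SplitLimitSketch, validateSplits, and IllegalStateException are
simplified stand-ins for Sqoop's SqoopInputFormat and SqoopException, not the
actual classes:

    import java.util.Arrays;
    import java.util.List;

    // Simplified stand-in for the SqoopInputFormat change: the split-count
    // limit is bypassed when the partitioner flagged the extra NULL split.
    public class SplitLimitSketch {

      static void validateSplits(List<String> splits, long maxPartitions,
                                 boolean skipMaxPartitionCheck) {
        // With extractors == 1 and nulls allowed, the partitioner legitimately
        // emits maxPartitions + 1 splits, so the hard limit is skipped.
        if (splits.size() > maxPartitions && !skipMaxPartitionCheck) {
          throw new IllegalStateException(
              String.format("Got %d, max was %d", splits.size(), maxPartitions));
        }
      }

      public static void main(String[] args) {
        // Two splits against maxPartitions == 1, as in the integration test.
        List<String> splits = Arrays.asList("id IS NULL", "1 <= id AND id <= 4");
        validateSplits(splits, 1, true); // passes because the check is skipped
        System.out.println("Accepted " + splits.size() + " splits with max 1");
      }
    }

This is the scenario testAllowNullsWithOneExtractor exercises end to end: one
configured extractor, nulls allowed, and the job still completes with all four
rows transferred.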