mirror of
https://github.com/apache/sqoop.git
synced 2025-05-04 20:30:06 +08:00
SQOOP-2382: Sqoop2: Arithmetic exception in partitioner when allow null is true
(Banmeet Singh via Abraham Elmahrek)
This commit is contained in:
parent
87855a3768
commit
4a5bd295d8
@ -35,6 +35,8 @@ public class PartitionerContext extends TransferableContext {
|
|||||||
|
|
||||||
private Schema schema;
|
private Schema schema;
|
||||||
|
|
||||||
|
private boolean skipMaxPartitionCheck = false;
|
||||||
|
|
||||||
public PartitionerContext(ImmutableContext context, long maxPartitions, Schema schema) {
|
public PartitionerContext(ImmutableContext context, long maxPartitions, Schema schema) {
|
||||||
super(context);
|
super(context);
|
||||||
this.maxPartitions = maxPartitions;
|
this.maxPartitions = maxPartitions;
|
||||||
@ -53,6 +55,32 @@ public long getMaxPartitions() {
|
|||||||
return maxPartitions;
|
return maxPartitions;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set flag indicating whether to skip check that number of splits
|
||||||
|
* < max extractors specified by user.
|
||||||
|
*
|
||||||
|
* Needed in case user specifies number of extractors as 1 as well as
|
||||||
|
* allows null values in partitioning column
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public void setSkipMaxPartitionCheck(boolean skipMaxPartitionCheck) {
|
||||||
|
this.skipMaxPartitionCheck = skipMaxPartitionCheck;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return flag indicating whether to skip the check that number of splits
|
||||||
|
* < max extractors specified by user.
|
||||||
|
*
|
||||||
|
* Needed in case user specifies number of extractors as 1 as well as
|
||||||
|
* allows null values in partitioning column
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public boolean getSkipMaxPartitionCheck() {
|
||||||
|
return this.skipMaxPartitionCheck;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return schema associated with this step.
|
* Return schema associated with this step.
|
||||||
*
|
*
|
||||||
|
@ -74,7 +74,12 @@ public List<Partition> getPartitions(PartitionerContext context, LinkConfigurati
|
|||||||
GenericJdbcPartition partition = new GenericJdbcPartition();
|
GenericJdbcPartition partition = new GenericJdbcPartition();
|
||||||
partition.setConditions(partitionColumnName + " IS NULL");
|
partition.setConditions(partitionColumnName + " IS NULL");
|
||||||
partitions.add(partition);
|
partitions.add(partition);
|
||||||
numberPartitions -= 1;
|
if (numberPartitions > 1) {
|
||||||
|
numberPartitions -= 1;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
context.setSkipMaxPartitionCheck(true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (partitionColumnType) {
|
switch (partitionColumnType) {
|
||||||
|
@ -184,7 +184,9 @@ Next, we can use the two link Ids to associate the ``From`` and ``To`` for the j
|
|||||||
Loaders:(Optional) 2
|
Loaders:(Optional) 2
|
||||||
New job was successfully created with validation status OK and persistent id 1
|
New job was successfully created with validation status OK and persistent id 1
|
||||||
|
|
||||||
Our new job object was created with assigned id 1.
|
Our new job object was created with assigned id 1. Note that if null value is allowed for the partition column,
|
||||||
|
at least 2 extractors are needed for Sqoop to carry out the data transfer. On specifying 1 extractor in this
|
||||||
|
scenario, Sqoop shall ignore this setting and continue with 2 extractors.
|
||||||
|
|
||||||
Start Job ( a.k.a Data transfer )
|
Start Job ( a.k.a Data transfer )
|
||||||
=================================
|
=================================
|
||||||
|
@ -80,7 +80,9 @@ public List<InputSplit> getSplits(JobContext context)
|
|||||||
splits.add(split);
|
splits.add(split);
|
||||||
}
|
}
|
||||||
|
|
||||||
if(splits.size() > maxPartitions) {
|
//SQOOP-2382: Need to skip this check in case extractors is set to 1
|
||||||
|
// and null values are allowed in partitioning column
|
||||||
|
if(splits.size() > maxPartitions && (false == partitionerContext.getSkipMaxPartitionCheck())) {
|
||||||
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0025,
|
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0025,
|
||||||
String.format("Got %d, max was %d", splits.size(), maxPartitions));
|
String.format("Got %d, max was %d", splits.size(), maxPartitions));
|
||||||
}
|
}
|
||||||
|
@ -270,4 +270,54 @@ public void testDuplicateColumns() throws Exception {
|
|||||||
// Clean up testing table
|
// Clean up testing table
|
||||||
dropTable();
|
dropTable();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAllowNullsWithOneExtractor() throws Exception {
|
||||||
|
//Integration test case for SQOOP-2382
|
||||||
|
//Server must not throw an exception when null values are allowed in the
|
||||||
|
//partitioning column and number of extractors is set to only 1
|
||||||
|
|
||||||
|
createAndLoadTableCities();
|
||||||
|
|
||||||
|
// RDBMS link
|
||||||
|
MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
|
||||||
|
fillRdbmsLinkConfig(rdbmsConnection);
|
||||||
|
saveLink(rdbmsConnection);
|
||||||
|
|
||||||
|
// HDFS link
|
||||||
|
MLink hdfsConnection = getClient().createLink("hdfs-connector");
|
||||||
|
fillHdfsLink(hdfsConnection);
|
||||||
|
saveLink(hdfsConnection);
|
||||||
|
|
||||||
|
// Job creation
|
||||||
|
MJob job = getClient().createJob(rdbmsConnection.getPersistenceId(), hdfsConnection.getPersistenceId());
|
||||||
|
|
||||||
|
// Set rdbms "FROM" config
|
||||||
|
fillRdbmsFromConfig(job, "id");
|
||||||
|
|
||||||
|
MConfigList configs = job.getFromJobConfig();
|
||||||
|
configs.getBooleanInput("fromJobConfig.allowNullValueInPartitionColumn").setValue(true);
|
||||||
|
|
||||||
|
// fill the hdfs "TO" config
|
||||||
|
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
|
||||||
|
// driver config
|
||||||
|
MDriverConfig driverConfig = job.getDriverConfig();
|
||||||
|
driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);
|
||||||
|
|
||||||
|
saveJob(job);
|
||||||
|
|
||||||
|
executeJob(job);
|
||||||
|
|
||||||
|
// Assert correct output
|
||||||
|
assertTo(
|
||||||
|
"1,'USA','2004-10-23','San Francisco'",
|
||||||
|
"2,'USA','2004-10-24','Sunnyvale'",
|
||||||
|
"3,'Czech Republic','2004-10-25','Brno'",
|
||||||
|
"4,'USA','2004-10-26','Palo Alto'"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Clean up testing table
|
||||||
|
dropTable();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user