From ae075f2e3769da9e43736a865f51eca96c8db61a Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Sat, 13 Apr 2013 11:41:04 -0700 Subject: [PATCH] SQOOP-844: Sqoop2: HdfsExportPartitioner is not always respecting maximal number of partitions (Vasanth kumar RJ via Jarek Jarcec Cecho) --- .../sqoop/job/etl/HdfsExportPartitioner.java | 4 +++ .../org/apache/sqoop/job/TestHdfsExtract.java | 29 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java index 115ca542..b3590dc3 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java @@ -73,6 +73,10 @@ public List getPartitions(PartitionerContext context, long numInputBytes = getInputSize(conf); maxSplitSize = numInputBytes / context.getMaxPartitions(); + if(numInputBytes % context.getMaxPartitions() != 0 ) { + maxSplitSize += 1; + } + long minSizeNode = 0; long minSizeRack = 0; long maxSize = 0; diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java index 62f3a038..b3e60509 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.OutputStream; import java.io.OutputStreamWriter; +import java.util.List; import junit.framework.TestCase; @@ -38,6 +39,8 @@ import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; +import org.apache.sqoop.job.etl.Partition; +import org.apache.sqoop.job.etl.PartitionerContext; import org.apache.sqoop.job.io.Data; 
import org.apache.sqoop.job.mr.SqoopFileOutputFormat; import org.junit.Test; @@ -54,6 +57,32 @@ public TestHdfsExtract() { indir = INPUT_ROOT + getClass().getSimpleName(); } + /** + * Test case validating the number of partitions created + * based on the input. + * Succeeds if the partition list size is less than or equal to + * the given maximum number of partitions. + * @throws Exception + */ + @Test + public void testHdfsExportPartitioner() throws Exception { + FileUtils.delete(indir); + FileUtils.mkdirs(indir); + createTextInput(null); + Configuration conf = new Configuration(); + conf.set(JobConstants.HADOOP_INPUTDIR, indir); + + HdfsExportPartitioner partitioner = new HdfsExportPartitioner(); + PrefixContext prefixContext = new PrefixContext(conf, ""); + int[] partitionValues = {2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 17}; + + for(int maxPartitions : partitionValues) { + PartitionerContext partCont = new PartitionerContext(prefixContext, maxPartitions); + List partitionList = partitioner.getPartitions(partCont, null, null); + assertTrue(partitionList.size()<=maxPartitions); + } + } + @Test public void testUncompressedText() throws Exception { FileUtils.delete(indir);