Mirror of https://github.com/apache/sqoop.git
SQOOP-844: Sqoop2: HdfsExportPartitioner is not always respecting maximal number of partitions
(Vasanth kumar RJ via Jarek Jarcec Cecho)
commit ae075f2e37
parent 3865f7deec
@@ -73,6 +73,10 @@ public List<Partition> getPartitions(PartitionerContext context,
     long numInputBytes = getInputSize(conf);
     maxSplitSize = numInputBytes / context.getMaxPartitions();
+
+    if(numInputBytes % context.getMaxPartitions() != 0 ) {
+      maxSplitSize += 1;
+    }
 
     long minSizeNode = 0;
     long minSizeRack = 0;
     long maxSize = 0;
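Note on the fix above: `maxSplitSize = numInputBytes / context.getMaxPartitions()` uses integer division, which rounds down, so whenever the input size is not an exact multiple of the requested maximum, splits of that size need one partition more than asked for. Adding 1 to the split size when there is a remainder turns the computation into a ceiling division, which keeps the split count at or below the maximum. A minimal standalone sketch of the arithmetic (the values below are hypothetical, not taken from the commit):

    // Hypothetical numbers to illustrate the rounding problem (not from the commit).
    long numInputBytes = 10;   // total input size in bytes
    long maxPartitions = 3;    // requested maximum number of partitions

    // Plain integer division rounds down: 10 / 3 = 3, and splits of
    // 3 bytes cover 10 bytes only with 4 partitions (3 + 3 + 3 + 1).
    long maxSplitSize = numInputBytes / maxPartitions;

    // Rounding up on a remainder gives ceiling division: maxSplitSize = 4,
    // and 10 bytes fit into 3 partitions (4 + 4 + 2), respecting the maximum.
    if (numInputBytes % maxPartitions != 0) {
      maxSplitSize += 1;
    }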
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
+import java.util.List;
 
 import junit.framework.TestCase;
 
@@ -38,6 +39,8 @@
 import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.PartitionerContext;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
 import org.junit.Test;
@@ -54,6 +57,32 @@ public TestHdfsExtract() {
     indir = INPUT_ROOT + getClass().getSimpleName();
   }
 
+  /**
+   * Test case for validating the number of partitions creation
+   * based on input.
+   * Success if the partitions list size is less or equal to
+   * given max partition.
+   * @throws Exception
+   */
+  @Test
+  public void testHdfsExportPartitioner() throws Exception {
+    FileUtils.delete(indir);
+    FileUtils.mkdirs(indir);
+    createTextInput(null);
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
+
+    HdfsExportPartitioner partitioner = new HdfsExportPartitioner();
+    PrefixContext prefixContext = new PrefixContext(conf, "");
+    int[] partitionValues = {2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 17};
+
+    for(int maxPartitions : partitionValues) {
+      PartitionerContext partCont = new PartitionerContext(prefixContext, maxPartitions);
+      List<Partition> partitionList = partitioner.getPartitions(partCont, null, null);
+      assertTrue(partitionList.size()<=maxPartitions);
+    }
+  }
+
   @Test
   public void testUncompressedText() throws Exception {
     FileUtils.delete(indir);