
SQOOP-1487: Sqoop2: From/To: Refactor/Create HDFS connector test cases

(Abraham Elmahrek via Jarek Jarcec Cecho)
Jarek Jarcec Cecho 2014-09-03 09:40:11 +02:00 committed by Abraham Elmahrek
parent 8fee134f7f
commit c0b22b1d62
10 changed files with 637 additions and 498 deletions
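The three new test classes in this change (TestExtractor, TestLoader, TestPartitioner) share the same JUnit 4 Parameterized pattern: a static data() method returns rows of constructor arguments and JUnit instantiates the class once per row, giving a file-format by compression test matrix. A minimal standalone sketch of that pattern follows; the class and parameter names here are placeholders and are not part of the commit.

// Illustrative sketch of the JUnit 4 parameterized pattern used by the new tests.
import java.util.Arrays;
import java.util.Collection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class MatrixTest {
  private final String fileType;    // stands in for OutputFormat (TEXT_FILE, SEQUENCE_FILE)
  private final String compression; // stands in for a compression codec choice
  public MatrixTest(String fileType, String compression) {
    this.fileType = fileType;
    this.compression = compression;
  }
  // JUnit constructs MatrixTest once per Object[] row returned here,
  // so every @Test method runs once per (fileType, compression) combination.
  @Parameterized.Parameters
  public static Collection<Object[]> data() {
    return Arrays.asList(new Object[][]{
        {"TEXT_FILE", "NONE"}, {"TEXT_FILE", "BZIP2"},
        {"SEQUENCE_FILE", "NONE"}, {"SEQUENCE_FILE", "BZIP2"},
    });
  }
  @Test
  public void runsOncePerRow() {
    System.out.println(fileType + " / " + compression);
  }
}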


@@ -34,6 +34,11 @@ limitations under the License.
<!-- TODO: Hardcoding Hadoop200 for now -->
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-spi</artifactId>


@@ -42,8 +42,6 @@
* Extract from HDFS.
* Default field delimiter of a record is comma.
*/
public class HdfsExtractor extends Extractor<ConnectionConfiguration, FromJobConfiguration, HdfsPartition> {
public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);


@@ -15,15 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.sqoop.job;
+package org.apache.sqoop.connector.hdfs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import java.util.LinkedList;
+import java.util.List;
public class FileUtils {
@@ -62,6 +65,16 @@ public static OutputStream create(String fileName) throws IOException {
return fs.create(filepath, false);
}
+public static Path[] listDir(String directory) throws IOException {
+Path dirpath = new Path(directory);
+FileSystem fs = dirpath.getFileSystem(new Configuration());
+List<Path> paths = new LinkedList<Path>();
+for (FileStatus fileStatus : fs.listStatus(dirpath)) {
+paths.add(fileStatus.getPath());
+}
+return paths.toArray(new Path[paths.size()]);
+}
private FileUtils() {
// Disable explicit object creation
}


@@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.SEQUENCE_FILE;
import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.TEXT_FILE;
@RunWith(Parameterized.class)
public class TestExtractor extends TestHdfsBase {
private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
private static final int NUMBER_OF_FILES = 5;
private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
private OutputFormat outputFileType;
private Class<? extends CompressionCodec> compressionClass;
private final String inputDirectory;
private Extractor extractor;
public TestExtractor(OutputFormat outputFileType,
Class<? extends CompressionCodec> compressionClass)
throws Exception {
this.inputDirectory = INPUT_ROOT + getClass().getSimpleName();
this.outputFileType = outputFileType;
this.compressionClass = compressionClass;
this.extractor = new HdfsExtractor();
}
@Parameterized.Parameters
public static Collection<Object[]> data() {
List<Object[]> parameters = new ArrayList<Object[]>();
for (Class<?> compressionClass : new Class<?>[]{null, DefaultCodec.class, BZip2Codec.class}) {
for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
parameters.add(new Object[]{outputFileType, compressionClass});
}
}
return parameters;
}
@Before
public void setUp() throws Exception {
FileUtils.mkdirs(inputDirectory);
switch (this.outputFileType) {
case TEXT_FILE:
createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
break;
case SEQUENCE_FILE:
createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
break;
}
}
@After
public void tearDown() throws IOException {
FileUtils.delete(inputDirectory);
}
@Test
public void testExtractor() throws Exception {
Configuration conf = new Configuration();
PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
ExtractorContext context = new ExtractorContext(prefixContext, new DataWriter() {
private long index = 1L;
@Override
public void writeArrayRecord(Object[] array) {
throw new AssertionError("Should not be writing array.");
}
@Override
public void writeStringRecord(String text) {
Assert.assertEquals(index + "," + index + ".0,'" + index++ + "'", text);
}
@Override
public void writeRecord(Object obj) {
throw new AssertionError("Should not be writing object.");
}
}, null);
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory));
extractor.extract(context, connConf, jobConf, partition);
}
}
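Note on the writeStringRecord assertion above: it lines up with the rows written by createTextInput in TestHdfsBase (further down in this diff) because Java renders (double) index as "1.0", "2.0", ..., which is exactly the string that index + ".0" rebuilds. A tiny self-contained check, illustrative only and not part of the commit:

public class RecordFormatCheck {
  public static void main(String[] args) {
    int index = 1;
    // How createTextInput builds a row: concatenating (double) index yields "1.0".
    String written = index + "," + (double) index + ",'" + index + "'";
    // How TestExtractor rebuilds the expected row.
    String expected = index + "," + index + ".0,'" + index + "'";
    System.out.println(written);                  // 1,1.0,'1'
    System.out.println(written.equals(expected)); // true
  }
}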


@@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
public class TestHdfsBase {
protected HdfsPartition createPartition(Path[] paths) throws IOException {
long[] offsets = new long[paths.length];
long[] lengths = new long[paths.length];
String[] locations = new String[paths.length];
FileSystem fs = FileSystem.get(new Configuration());
for (int i = 0; i < offsets.length; ++i) {
locations[i] = paths[i].getName();
lengths[i] = fs.getFileStatus(paths[i]).getLen();
}
return new HdfsPartition(paths, offsets, lengths, locations);
}
protected void createTextInput(String indir,
Class<? extends CompressionCodec> clz,
int numberOfFiles,
int numberOfRows)
throws IOException, InstantiationException, IllegalAccessException {
Configuration conf = new Configuration();
CompressionCodec codec = null;
String extension = "";
if (clz != null) {
codec = clz.newInstance();
if (codec instanceof Configurable) {
((Configurable) codec).setConf(conf);
}
extension = codec.getDefaultExtension();
}
int index = 1;
for (int fi = 0; fi < numberOfFiles; fi++) {
String fileName = indir + "/" + "part-r-" + padZeros(fi, 5) + extension;
OutputStream filestream = FileUtils.create(fileName);
BufferedWriter filewriter;
if (codec != null) {
filewriter = new BufferedWriter(new OutputStreamWriter(
codec.createOutputStream(filestream, codec.createCompressor()),
"UTF-8"));
} else {
filewriter = new BufferedWriter(new OutputStreamWriter(
filestream, "UTF-8"));
}
for (int ri = 0; ri < numberOfRows; ri++) {
String row = index + "," + (double)index + ",'" + index + "'";
filewriter.write(row + HdfsConstants.DEFAULT_RECORD_DELIMITER);
index++;
}
filewriter.close();
}
}
protected void createSequenceInput(String indir,
Class<? extends CompressionCodec> clz,
int numberOfFiles,
int numberOfRows)
throws IOException, InstantiationException, IllegalAccessException {
Configuration conf = new Configuration();
CompressionCodec codec = null;
if (clz != null) {
codec = clz.newInstance();
if (codec instanceof Configurable) {
((Configurable) codec).setConf(conf);
}
}
int index = 1;
for (int fi = 0; fi < numberOfFiles; fi++) {
Path filepath = new Path(indir,
"part-r-" + padZeros(fi, 5) + ".seq");
SequenceFile.Writer filewriter;
if (codec != null) {
filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
conf, filepath, Text.class, NullWritable.class,
SequenceFile.CompressionType.BLOCK, codec);
} else {
filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
conf, filepath, Text.class, NullWritable.class, SequenceFile.CompressionType.NONE);
}
Text text = new Text();
for (int ri = 0; ri < numberOfRows; ri++) {
String row = index + "," + (double)index + ",'" + index + "'";
text.set(row);
filewriter.append(text, NullWritable.get());
index++;
}
filewriter.close();
}
}
private String padZeros(int number, int digits) {
String string = String.valueOf(number);
for (int i = (digits - string.length()); i > 0; i--) {
string = "0" + string;
}
return string;
}
}
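Side note on the padZeros helper above: the same left zero padding can be written with String.format. A minimal equivalent, shown only as a possible later cleanup and not part of this commit:

public class PadZerosSketch {
  // "%0<digits>d" left-pads the decimal value with zeros to the given width;
  // like the loop in TestHdfsBase, wider numbers pass through unchanged.
  static String padZeros(int number, int digits) {
    return String.format("%0" + digits + "d", number);
  }
  public static void main(String[] args) {
    System.out.println(padZeros(7, 5));      // 00007
    System.out.println(padZeros(123456, 5)); // 123456
  }
}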


@@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.OutputCompression;
import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.SEQUENCE_FILE;
import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.TEXT_FILE;
@RunWith(Parameterized.class)
public class TestLoader extends TestHdfsBase {
private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
private OutputFormat outputFormat;
private OutputCompression compression;
private final String outputDirectory;
private Loader loader;
public TestLoader(OutputFormat outputFormat,
OutputCompression compression)
throws Exception {
this.outputDirectory = INPUT_ROOT + getClass().getSimpleName();
this.outputFormat = outputFormat;
this.compression = compression;
this.loader = new HdfsLoader();
}
@Parameterized.Parameters
public static Collection<Object[]> data() {
List<Object[]> parameters = new ArrayList<Object[]>();
for (OutputCompression compression : new OutputCompression[]{
OutputCompression.DEFAULT,
OutputCompression.BZIP2,
OutputCompression.NONE
}) {
for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
parameters.add(new Object[]{outputFileType, compression});
}
}
return parameters;
}
@Before
public void setUp() throws Exception {}
@After
public void tearDown() throws IOException {
FileUtils.delete(outputDirectory);
}
@Test
public void testLoader() throws Exception {
FileSystem fs = FileSystem.get(new Configuration());
Configuration conf = new Configuration();
PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
LoaderContext context = new LoaderContext(prefixContext, new DataReader() {
private long index = 0L;
@Override
public Object[] readArrayRecord() {
return null;
}
@Override
public String readTextRecord() {
if (index++ < NUMBER_OF_ROWS_PER_FILE) {
return index + "," + (double)index + ",'" + index + "'";
} else {
return null;
}
}
@Override
public Object readContent() {
return null;
}
}, null);
ConnectionConfiguration connConf = new ConnectionConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
jobConf.output.outputDirectory = outputDirectory;
jobConf.output.compression = compression;
jobConf.output.outputFormat = outputFormat;
Path outputPath = new Path(outputDirectory);
loader.load(context, connConf, jobConf);
Assert.assertEquals(1, fs.listStatus(outputPath).length);
for (FileStatus status : fs.listStatus(outputPath)) {
verifyOutput(fs, status.getPath());
}
loader.load(context, connConf, jobConf);
Assert.assertEquals(2, fs.listStatus(outputPath).length);
loader.load(context, connConf, jobConf);
loader.load(context, connConf, jobConf);
loader.load(context, connConf, jobConf);
Assert.assertEquals(5, fs.listStatus(outputPath).length);
}
private void verifyOutput(FileSystem fs, Path file) throws IOException {
Configuration conf = new Configuration();
FSDataInputStream fsin = fs.open(file);
CompressionCodec codec;
switch(outputFormat) {
case TEXT_FILE:
codec = (new CompressionCodecFactory(conf)).getCodec(file);
// Verify compression
switch(compression) {
case BZIP2:
Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("BZip2") != -1);
break;
case DEFAULT:
Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Deflate") != -1);
break;
case NONE:
default:
Assert.assertNull(codec);
break;
}
InputStreamReader in;
if (codec == null) {
in = new InputStreamReader(fsin);
} else {
in = new InputStreamReader(codec.createInputStream(fsin, codec.createDecompressor()));
}
BufferedReader textReader = new BufferedReader(in);
for (int i = 1; i <= NUMBER_OF_ROWS_PER_FILE; ++i) {
Assert.assertEquals(i + "," + (double)i + ",'" + i + "'", textReader.readLine());
}
break;
case SEQUENCE_FILE:
SequenceFile.Reader sequenceReader = new SequenceFile.Reader(fs, file, conf);
codec = sequenceReader.getCompressionCodec();
// Verify compression
switch(compression) {
case BZIP2:
Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("BZip2") != -1);
break;
case DEFAULT:
Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Default") != -1);
break;
case NONE:
default:
Assert.assertNull(codec);
break;
}
Text line = new Text();
int index = 1;
while (sequenceReader.next(line)) {
Assert.assertEquals(index + "," + (double)index + ",'" + index++ + "'", line.toString());
line = new Text();
}
break;
}
}
}
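verifyOutput above leans on CompressionCodecFactory resolving a codec from the output file's extension (".bz2", ".deflate", and so on) and returning null when no registered extension matches. A small standalone sketch of that lookup, assuming the stock Hadoop codecs are on the classpath; it is illustrative and not part of the commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
public class CodecLookupSketch {
  public static void main(String[] args) {
    CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
    // Lookup is purely by file name extension.
    CompressionCodec byExtension = factory.getCodec(new Path("part-r-00000.bz2"));
    CompressionCodec none = factory.getCodec(new Path("part-r-00000"));
    // Typically BZip2Codec when that codec is registered; the exact codec set
    // depends on io.compression.codecs and what is discovered on the classpath.
    System.out.println(byExtension == null ? "no codec" : byExtension.getClass().getSimpleName());
    System.out.println(none == null ? "no codec" : none.getClass().getSimpleName()); // no codec
  }
}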


@@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.*;
import static org.junit.Assert.assertEquals;
@RunWith(Parameterized.class)
public class TestPartitioner extends TestHdfsBase {
private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
private static final int NUMBER_OF_FILES = 5;
private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
private OutputFormat outputFileType;
private Class<? extends CompressionCodec> compressionClass;
private Partitioner partitioner;
private final String inputDirectory;
public TestPartitioner(OutputFormat outputFileType, Class<? extends CompressionCodec> compressionClass) {
this.inputDirectory = INPUT_ROOT + getClass().getSimpleName();
this.outputFileType = outputFileType;
this.compressionClass = compressionClass;
}
@Before
public void setUp() throws Exception {
partitioner = new HdfsPartitioner();
FileUtils.mkdirs(inputDirectory);
switch (this.outputFileType) {
case TEXT_FILE:
createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
break;
case SEQUENCE_FILE:
createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
break;
}
}
@After
public void tearDown() throws IOException {
FileUtils.delete(inputDirectory);
}
@Parameterized.Parameters
public static Collection<Object[]> data() {
List<Object[]> parameters = new ArrayList<Object[]>();
for (Class<?> compressionClass : new Class<?>[]{null, DefaultCodec.class, BZip2Codec.class}) {
for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
parameters.add(new Object[]{outputFileType, compressionClass});
}
}
return parameters;
}
@Test
public void testPartitioner() {
Configuration conf = new Configuration();
PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
PartitionerContext context = new PartitionerContext(prefixContext, 5, null);
ConnectionConfiguration connConf = new ConnectionConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
jobConf.input.inputDirectory = inputDirectory;
List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf);
if (this.compressionClass == null) {
assertEquals(5, partitions.size());
} else {
assertEquals(3, partitions.size());
}
}
}


@@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=DEBUG, A1
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n


@@ -1,241 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job;
import junit.framework.TestCase;
//import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
//import org.apache.sqoop.job.etl.HdfsExportExtractor;
//import org.apache.sqoop.job.etl.HdfsExportPartitioner;
public class TestHdfsExtract extends TestCase {
// private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
// private static final int NUMBER_OF_FILES = 5;
// private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
//
// private final String indir;
//
// public TestHdfsExtract() {
// indir = INPUT_ROOT + getClass().getSimpleName();
// }
//
// @Override
// public void setUp() throws IOException {
// FileUtils.mkdirs(indir);
// }
//
// @Override
// public void tearDown() throws IOException {
// FileUtils.delete(indir);
// }
//
// /**
// * Test case for validating the number of partitions creation
// * based on input.
// * Success if the partitions list size is less or equal to
// * given max partition.
// * @throws Exception
// */
// @Test
// public void testHdfsExportPartitioner() throws Exception {
// createTextInput(null);
// Configuration conf = new Configuration();
// conf.set(JobConstants.HADOOP_INPUTDIR, indir);
//
// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
// CSVIntermediateDataFormat.class.getName());
// HdfsExportPartitioner partitioner = new HdfsExportPartitioner();
// PrefixContext prefixContext = new PrefixContext(conf, "");
// int[] partitionValues = {2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 17};
//
// for(int maxPartitions : partitionValues) {
// PartitionerContext partCont = new PartitionerContext(prefixContext, maxPartitions, null);
// List<Partition> partitionList = partitioner.getPartitions(partCont, null, null);
// assertTrue(partitionList.size()<=maxPartitions);
// }
// }
//
// @Test
// public void testUncompressedText() throws Exception {
// createTextInput(null);
//
// JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
// }
//
// @Test
// public void testDefaultCompressedText() throws Exception {
// createTextInput(SqoopFileOutputFormat.DEFAULT_CODEC);
//
// JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
// }
//
// @Test
// public void testBZip2CompressedText() throws Exception {
// createTextInput(BZip2Codec.class);
//
// JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
// }
//
// @Test
// public void testDefaultCompressedSequence() throws Exception {
// createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC);
//
// JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
// }
//
// @Test
// public void testUncompressedSequence() throws Exception {
// createSequenceInput(null);
//
// JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
// }
//
// private Schema createSchema() {
// Schema schema = new Schema("Test");
// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
// .addColumn(new org.apache.sqoop.schema.type.Text("3"));
// return schema;
// }
//
// private Configuration createConf() {
// Configuration conf = new Configuration();
// ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
// conf.setIfUnset(JobConstants.JOB_ETL_PARTITIONER,
// HdfsExportPartitioner.class.getName());
// conf.setIfUnset(JobConstants.JOB_ETL_EXTRACTOR,
// HdfsExportExtractor.class.getName());
// conf.setIfUnset(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
// conf.setIfUnset(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
// conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT,
// CSVIntermediateDataFormat.class.getName());
// conf.setIfUnset(JobConstants.HADOOP_INPUTDIR, indir);
// return conf;
// }
//
// private Job createJob(Configuration conf, Schema schema) throws Exception {
// Job job = new Job(conf);
// ConfigurationUtils.setConnectorSchema(job, schema);
// job.getConfiguration().set(JobConstants.INTERMEDIATE_DATA_FORMAT,
// CSVIntermediateDataFormat.class.getName());
// return job;
// }
//
// private void createTextInput(Class<? extends CompressionCodec> clz)
// throws IOException, InstantiationException, IllegalAccessException {
// Configuration conf = new Configuration();
//
// CompressionCodec codec = null;
// String extension = "";
// if (clz != null) {
// codec = clz.newInstance();
// if (codec instanceof Configurable) {
// ((Configurable) codec).setConf(conf);
// }
// extension = codec.getDefaultExtension();
// }
//
// int index = 1;
// for (int fi=0; fi<NUMBER_OF_FILES; fi++) {
// String fileName = indir + "/" + "part-r-" + padZeros(fi, 5) + extension;
// OutputStream filestream = FileUtils.create(fileName);
// BufferedWriter filewriter;
// if (codec != null) {
// filewriter = new BufferedWriter(new OutputStreamWriter(
// codec.createOutputStream(filestream, codec.createCompressor()),
// Data.CHARSET_NAME));
// } else {
// filewriter = new BufferedWriter(new OutputStreamWriter(
// filestream, Data.CHARSET_NAME));
// }
//
// for (int ri=0; ri<NUMBER_OF_ROWS_PER_FILE; ri++) {
// String row = index + "," + (double)index + ",'" + index + "'";
// filewriter.write(row + Data.DEFAULT_RECORD_DELIMITER);
// index++;
// }
//
// filewriter.close();
// }
// }
//
// private void createSequenceInput(Class<? extends CompressionCodec> clz)
// throws IOException, InstantiationException, IllegalAccessException {
// Configuration conf = new Configuration();
//
// CompressionCodec codec = null;
// if (clz != null) {
// codec = clz.newInstance();
// if (codec instanceof Configurable) {
// ((Configurable) codec).setConf(conf);
// }
// }
//
// int index = 1;
// for (int fi=0; fi<NUMBER_OF_FILES; fi++) {
// Path filepath = new Path(indir,
// "part-r-" + padZeros(fi, 5) + HdfsSequenceImportLoader.EXTENSION);
// SequenceFile.Writer filewriter;
// if (codec != null) {
// filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
// conf, filepath, Text.class, NullWritable.class,
// CompressionType.BLOCK, codec);
// } else {
// filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
// conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
// }
//
// Text text = new Text();
// for (int ri=0; ri<NUMBER_OF_ROWS_PER_FILE; ri++) {
// String row = index + "," + (double)index + ",'" + index + "'";
// text.set(row);
// filewriter.append(text, NullWritable.get());
// index++;
// }
//
// filewriter.close();
// }
// }
//
// private String padZeros(int number, int digits) {
// String string = String.valueOf(number);
// for (int i=(digits-string.length()); i>0; i--) {
// string = "0" + string;
// }
// return string;
// }
//
// public static class DummyLoader extends Loader {
// @Override
// public void load(LoaderContext context, Object oc, Object oj) throws Exception {
// int index = 1;
// int sum = 0;
// Object[] array;
// while ((array = context.getDataReader().readArrayRecord()) != null) {
// sum += Integer.valueOf(array[0].toString());
// index++;
// };
//
// int numbers = NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE;
// assertEquals((1+numbers)*numbers/2, sum);
//
// assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1);
// }
// }
}


@@ -1,250 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job;
import junit.framework.TestCase;
//import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
//import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
//import org.apache.sqoop.job.etl.HdfsTextImportLoader;
public class TestHdfsLoad extends TestCase {
// private static final String OUTPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
// private static final String OUTPUT_FILE = "part-r-00000";
// private static final int START_ID = 1;
// private static final int NUMBER_OF_IDS = 9;
// private static final int NUMBER_OF_ROWS_PER_ID = 10;
//
// private String outdir;
//
// public TestHdfsLoad() {
// outdir = OUTPUT_ROOT + "/" + getClass().getSimpleName();
// }
//
// public void testUncompressedText() throws Exception {
// FileUtils.delete(outdir);
//
// Configuration conf = new Configuration();
// ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
// conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
// conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
// conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
// CSVIntermediateDataFormat.class.getName());
// conf.set(JobConstants.HADOOP_OUTDIR, outdir);
// Schema schema = new Schema("Test");
// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
// .addColumn(new org.apache.sqoop.schema.type.Text("3"));
//
// Job job = new Job(conf);
// ConfigurationUtils.setConnectorSchema(job, schema);
// JobUtils.runJob(job.getConfiguration());
//
// String fileName = outdir + "/" + OUTPUT_FILE;
// InputStream filestream = FileUtils.open(fileName);
// BufferedReader filereader = new BufferedReader(new InputStreamReader(
// filestream, Charsets.UTF_8));
// verifyOutputText(filereader);
// }
//
// public void testCompressedText() throws Exception {
// FileUtils.delete(outdir);
//
// Configuration conf = new Configuration();
// ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
// conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
// conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
// conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
// CSVIntermediateDataFormat.class.getName());
// conf.set(JobConstants.HADOOP_OUTDIR, outdir);
// conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
//
// Schema schema = new Schema("Test");
// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
// .addColumn(new org.apache.sqoop.schema.type.Text("3"));
//
// Job job = new Job(conf);
// ConfigurationUtils.setConnectorSchema(job, schema);
// JobUtils.runJob(job.getConfiguration());
//
// Class<? extends CompressionCodec> codecClass = conf.getClass(
// JobConstants.HADOOP_COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
// .asSubclass(CompressionCodec.class);
// CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
// String fileName = outdir + "/" + OUTPUT_FILE + codec.getDefaultExtension();
// InputStream filestream = codec.createInputStream(FileUtils.open(fileName));
// BufferedReader filereader = new BufferedReader(new InputStreamReader(
// filestream, Charsets.UTF_8));
// verifyOutputText(filereader);
// }
//
// private void verifyOutputText(BufferedReader reader) throws IOException {
// String actual = null;
// String expected;
// Data data = new Data();
// int index = START_ID*NUMBER_OF_ROWS_PER_ID;
// while ((actual = reader.readLine()) != null){
// data.setContent(new Object[] {
// index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) },
// Data.ARRAY_RECORD);
// expected = data.toString();
// index++;
//
// assertEquals(expected, actual);
// }
// reader.close();
//
// assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID,
// index-START_ID*NUMBER_OF_ROWS_PER_ID);
// }
//
// public void testUncompressedSequence() throws Exception {
// FileUtils.delete(outdir);
//
// Configuration conf = new Configuration();
// ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
// conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
// conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
// conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
// CSVIntermediateDataFormat.class.getName());
// conf.set(JobConstants.HADOOP_OUTDIR, outdir);
//
// Schema schema = new Schema("Test");
// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
// .addColumn(new org.apache.sqoop.schema.type.Text("3"));
//
// Job job = new Job(conf);
// ConfigurationUtils.setConnectorSchema(job, schema);
// JobUtils.runJob(job.getConfiguration());
//
// Path filepath = new Path(outdir,
// OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
// SequenceFile.Reader filereader = new SequenceFile.Reader(
// filepath.getFileSystem(conf), filepath, conf);
// verifyOutputSequence(filereader);
// }
//
// public void testCompressedSequence() throws Exception {
// FileUtils.delete(outdir);
//
// Configuration conf = new Configuration();
// ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
// conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
// conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
// conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
// CSVIntermediateDataFormat.class.getName());
// conf.set(JobConstants.HADOOP_OUTDIR, outdir);
// conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
//
// Schema schema = new Schema("Test");
// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
// .addColumn(new org.apache.sqoop.schema.type.Text("3"));
//
// Job job = new Job(conf);
// ConfigurationUtils.setConnectorSchema(job, schema);
// JobUtils.runJob(job.getConfiguration());
// Path filepath = new Path(outdir,
// OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
// SequenceFile.Reader filereader = new SequenceFile.Reader(filepath.getFileSystem(conf), filepath, conf);
// verifyOutputSequence(filereader);
// }
//
// private void verifyOutputSequence(SequenceFile.Reader reader) throws IOException {
// int index = START_ID*NUMBER_OF_ROWS_PER_ID;
// Text actual = new Text();
// Text expected = new Text();
// Data data = new Data();
// while (reader.next(actual)){
// data.setContent(new Object[] {
// index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) },
// Data.ARRAY_RECORD);
// expected.set(data.toString());
// index++;
//
// assertEquals(expected.toString(), actual.toString());
// }
// reader.close();
//
// assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID,
// index-START_ID*NUMBER_OF_ROWS_PER_ID);
// }
//
// public static class DummyPartition extends Partition {
// private int id;
//
// public void setId(int id) {
// this.id = id;
// }
//
// public int getId() {
// return id;
// }
//
// @Override
// public void readFields(DataInput in) throws IOException {
// id = in.readInt();
// }
//
// @Override
// public void write(DataOutput out) throws IOException {
// out.writeInt(id);
// }
//
// @Override
// public String toString() {
// return Integer.toString(id);
// }
// }
//
// public static class DummyPartitioner extends Partitioner {
// @Override
// public List<Partition> getPartitions(PartitionerContext context, Object oc, Object oj) {
// List<Partition> partitions = new LinkedList<Partition>();
// for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
// DummyPartition partition = new DummyPartition();
// partition.setId(id);
// partitions.add(partition);
// }
// return partitions;
// }
// }
//
// public static class DummyExtractor extends Extractor {
// @Override
// public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
// int id = ((DummyPartition)partition).getId();
// for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
// Object[] array = new Object[] {
// id * NUMBER_OF_ROWS_PER_ID + row,
// (double) (id * NUMBER_OF_ROWS_PER_ID + row),
// new String(new byte[]{(byte)(id * NUMBER_OF_ROWS_PER_ID + row + 127)}, Charsets.ISO_8859_1)
// };
// context.getDataWriter().writeArrayRecord(array);
// }
// }
//
// @Override
// public long getRowsRead() {
// return NUMBER_OF_ROWS_PER_ID;
// }
// }
}