diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml
index 8df9f11a..fa4330a1 100644
--- a/connector/connector-hdfs/pom.xml
+++ b/connector/connector-hdfs/pom.xml
@@ -34,6 +34,11 @@ limitations under the License.
   </properties>
 
   <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>sqoop-spi</artifactId>
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index fc12381a..74470710 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -42,8 +42,6 @@
  * Extract from HDFS.
  * Default field delimiter of a record is comma.
  */
-
-
 public class HdfsExtractor extends Extractor<ConnectionConfiguration, FromJobConfiguration, HdfsPartition> {
 
   public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/FileUtils.java
similarity index 81%
rename from execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
rename to connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/FileUtils.java
index e685883e..8c19d016 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/FileUtils.java
@@ -15,15 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.sqoop.job;
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import java.util.LinkedList;
+import java.util.List;
 
 public class FileUtils {
 
@@ -62,6 +65,16 @@ public static OutputStream create(String fileName) throws IOException {
     return fs.create(filepath, false);
   }
 
+  public static Path[] listDir(String directory) throws IOException {
+    Path dirpath = new Path(directory);
+    FileSystem fs = dirpath.getFileSystem(new Configuration());
+    List<Path> paths = new LinkedList<Path>();
+    for (FileStatus fileStatus : fs.listStatus(dirpath)) {
+      paths.add(fileStatus.getPath());
+    }
+    return paths.toArray(new Path[paths.size()]);
+  }
+
   private FileUtils() {
     // Disable explicit object creation
   }
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
new file mode 100644
index 00000000..6ed40872
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.sqoop.common.PrefixContext;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
+import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.SEQUENCE_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.TEXT_FILE;
+
+@RunWith(Parameterized.class)
+public class TestExtractor extends TestHdfsBase {
+  private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
+  private static final int NUMBER_OF_FILES = 5;
+  private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
+
+  private OutputFormat outputFileType;
+  private Class<? extends CompressionCodec> compressionClass;
+  private final String inputDirectory;
+  private Extractor extractor;
+
+  public TestExtractor(OutputFormat outputFileType,
+                       Class<? extends CompressionCodec> compressionClass)
+      throws Exception {
+    this.inputDirectory = INPUT_ROOT + getClass().getSimpleName();
+    this.outputFileType = outputFileType;
+    this.compressionClass = compressionClass;
+    this.extractor = new HdfsExtractor();
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    List<Object[]> parameters = new ArrayList<Object[]>();
+    for (Class compressionClass : new Class[]{null, DefaultCodec.class, BZip2Codec.class}) {
+      for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
+        parameters.add(new Object[]{outputFileType, compressionClass});
+      }
+    }
+    return parameters;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    FileUtils.mkdirs(inputDirectory);
+    switch (this.outputFileType) {
+      case TEXT_FILE:
+        createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+        break;
+
+      case SEQUENCE_FILE:
+        createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+        break;
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtils.delete(inputDirectory);
+  }
+
+  @Test
+  public void testExtractor() throws Exception {
+    Configuration conf = new Configuration();
+    PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
+    ExtractorContext context = new ExtractorContext(prefixContext, new DataWriter() {
+      private long index = 1L;
+
+      @Override
+      public void writeArrayRecord(Object[] array) {
+        throw new AssertionError("Should not be writing array.");
+      }
+
+      @Override
+      public void writeStringRecord(String text) {
+        Assert.assertEquals(index + "," + index + ".0,'" + index++ + "'", text);
+      }
+
+      @Override
+      public void writeRecord(Object obj) {
+        throw new AssertionError("Should not be writing object.");
+      }
+    }, null);
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    FromJobConfiguration jobConf = new FromJobConfiguration();
+
+    HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory));
+
+    extractor.extract(context, connConf, jobConf, partition);
+  }
+}
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
new file mode 100644
index 00000000..0cc2b8b1
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+
+public class TestHdfsBase {
+
+  protected HdfsPartition createPartition(Path[] paths) throws IOException {
+    long[] offsets = new long[paths.length];
+    long[] lengths = new long[paths.length];
+    String[] locations = new String[paths.length];
+    FileSystem fs = FileSystem.get(new Configuration());
+
+    for (int i = 0; i < offsets.length; ++i) {
+      locations[i] = paths[i].getName();
+      lengths[i] = fs.getFileStatus(paths[i]).getLen();
+    }
+
+    return new HdfsPartition(paths, offsets, lengths, locations);
+  }
+
+  protected void createTextInput(String indir,
+                                 Class<? extends CompressionCodec> clz,
+                                 int numberOfFiles,
+                                 int numberOfRows)
+      throws IOException, InstantiationException, IllegalAccessException {
+    Configuration conf = new Configuration();
+
+    CompressionCodec codec = null;
+    String extension = "";
+    if (clz != null) {
+      codec = clz.newInstance();
+      if (codec instanceof Configurable) {
+        ((Configurable) codec).setConf(conf);
+      }
+      extension = codec.getDefaultExtension();
+    }
+
+    int index = 1;
+    for (int fi = 0; fi < numberOfFiles; fi++) {
+      String fileName = indir + "/" + "part-r-" + padZeros(fi, 5) + extension;
+      OutputStream filestream = FileUtils.create(fileName);
+      BufferedWriter filewriter;
+      if (codec != null) {
+        filewriter = new BufferedWriter(new OutputStreamWriter(
+            codec.createOutputStream(filestream, codec.createCompressor()),
+            "UTF-8"));
+      } else {
+        filewriter = new BufferedWriter(new OutputStreamWriter(
+            filestream, "UTF-8"));
+      }
+
+      for (int ri = 0; ri < numberOfRows; ri++) {
+        String row = index + "," + (double)index + ",'" + index + "'";
+        filewriter.write(row + HdfsConstants.DEFAULT_RECORD_DELIMITER);
+        index++;
+      }
+
+      filewriter.close();
+    }
+  }
+
+  protected void createSequenceInput(String indir,
+                                     Class<? extends CompressionCodec> clz,
+                                     int numberOfFiles,
+                                     int numberOfRows)
+      throws IOException, InstantiationException, IllegalAccessException {
+    Configuration conf = new Configuration();
+
+    CompressionCodec codec = null;
+    if (clz != null) {
+      codec = clz.newInstance();
+      if (codec instanceof Configurable) {
+        ((Configurable) codec).setConf(conf);
+      }
+    }
+
+    int index = 1;
+    for (int fi = 0; fi < numberOfFiles; fi++) {
+      Path filepath = new Path(indir,
+          "part-r-" + padZeros(fi, 5) + ".seq");
+      SequenceFile.Writer filewriter;
+      if (codec != null) {
+        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+            conf, filepath, Text.class, NullWritable.class,
+            SequenceFile.CompressionType.BLOCK, codec);
+      } else {
+        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+            conf, filepath, Text.class, NullWritable.class, SequenceFile.CompressionType.NONE);
+      }
+
+      Text text = new Text();
+      for (int ri = 0; ri < numberOfRows; ri++) {
+        String row = index + "," + (double)index + ",'" + index + "'";
+        text.set(row);
+        filewriter.append(text, NullWritable.get());
+        index++;
+      }
+
+      filewriter.close();
+    }
+  }
+
+  private String padZeros(int number, int digits) {
+    String string = String.valueOf(number);
+    for (int i = (digits - string.length()); i > 0; i--) {
+      string = "0" + string;
+    }
+    return string;
+  }
+}
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
new file mode 100644
index 00000000..79cf1f18
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.sqoop.common.PrefixContext;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.OutputCompression;
+import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.SEQUENCE_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.TEXT_FILE;
+
+@RunWith(Parameterized.class)
+public class TestLoader extends TestHdfsBase {
+  private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
+  private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
+
+  private OutputFormat outputFormat;
+  private OutputCompression compression;
+  private final String outputDirectory;
+  private Loader loader;
+
+  public TestLoader(OutputFormat outputFormat,
+                    OutputCompression compression)
+      throws Exception {
+    this.outputDirectory = INPUT_ROOT + getClass().getSimpleName();
+    this.outputFormat = outputFormat;
+    this.compression = compression;
+    this.loader = new HdfsLoader();
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    List<Object[]> parameters = new ArrayList<Object[]>();
+    for (OutputCompression compression : new OutputCompression[]{
+        OutputCompression.DEFAULT,
+        OutputCompression.BZIP2,
+        OutputCompression.NONE
+    }) {
+      for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
+        parameters.add(new Object[]{outputFileType, compression});
+      }
+    }
+    return parameters;
+  }
+
+  @Before
+  public void setUp() throws Exception {}
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtils.delete(outputDirectory);
+  }
+
+  @Test
+  public void testLoader() throws Exception {
+    FileSystem fs = FileSystem.get(new Configuration());
+
+    Configuration conf = new Configuration();
+    PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
+    LoaderContext context = new LoaderContext(prefixContext, new DataReader() {
+      private long index = 0L;
+
+      @Override
+      public Object[] readArrayRecord() {
+        return null;
+      }
+
+      @Override
+      public String readTextRecord() {
+        if (index++ < NUMBER_OF_ROWS_PER_FILE) {
+          return index + "," + (double)index + ",'" + index + "'";
+        } else {
+          return null;
+        }
+      }
+
+      @Override
+      public Object readContent() {
+        return null;
+      }
+    }, null);
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ToJobConfiguration jobConf = new ToJobConfiguration();
+    jobConf.output.outputDirectory = outputDirectory;
+    jobConf.output.compression = compression;
+    jobConf.output.outputFormat = outputFormat;
+    Path outputPath = new Path(outputDirectory);
+
+    loader.load(context, connConf, jobConf);
+    Assert.assertEquals(1, fs.listStatus(outputPath).length);
+
+    for (FileStatus status : fs.listStatus(outputPath)) {
+      verifyOutput(fs, status.getPath());
+    }
+
+    loader.load(context, connConf, jobConf);
+    Assert.assertEquals(2, fs.listStatus(outputPath).length);
+    loader.load(context, connConf, jobConf);
+    loader.load(context, connConf, jobConf);
+    loader.load(context, connConf, jobConf);
+    Assert.assertEquals(5, fs.listStatus(outputPath).length);
+  }
+
+  private void verifyOutput(FileSystem fs, Path file) throws IOException {
+    Configuration conf = new Configuration();
+    FSDataInputStream fsin = fs.open(file);
+    CompressionCodec codec;
+
+    switch(outputFormat) {
+      case TEXT_FILE:
+        codec = (new CompressionCodecFactory(conf)).getCodec(file);
+
+        // Verify compression
+        switch(compression) {
+          case BZIP2:
+            Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("BZip2") != -1);
+            break;
+
+          case DEFAULT:
+            Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Deflate") != -1);
+            break;
+
+          case NONE:
+          default:
+            Assert.assertNull(codec);
+            break;
+        }
+
+        InputStreamReader in;
+        if (codec == null) {
+          in = new InputStreamReader(fsin);
+        } else {
+          in = new InputStreamReader(codec.createInputStream(fsin, codec.createDecompressor()));
+        }
+        BufferedReader textReader = new BufferedReader(in);
+
+        for (int i = 1; i <= NUMBER_OF_ROWS_PER_FILE; ++i) {
+          Assert.assertEquals(i + "," + (double)i + ",'" + i + "'", textReader.readLine());
+        }
+        break;
+
+      case SEQUENCE_FILE:
+        SequenceFile.Reader sequenceReader = new SequenceFile.Reader(fs, file, conf);
+        codec = sequenceReader.getCompressionCodec();
+
+        // Verify compression
+        switch(compression) {
+          case BZIP2:
+            Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("BZip2") != -1);
+            break;
+
+          case DEFAULT:
+            Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Default") != -1);
+            break;
+
+          case NONE:
+          default:
+            Assert.assertNull(codec);
+            break;
+        }
+
+        Text line = new Text();
+        int index = 1;
+        while (sequenceReader.next(line)) {
+          Assert.assertEquals(index + "," + (double)index + ",'" + index++ + "'", line.toString());
+          line = new Text();
+        }
+        break;
+    }
+  }
+}
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
new file mode 100644
index 00000000..ae93b0ab
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.sqoop.common.PrefixContext;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.*;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class TestPartitioner extends TestHdfsBase {
+  private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
+  private static final int NUMBER_OF_FILES = 5;
+  private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
+
+  private OutputFormat outputFileType;
+  private Class<? extends CompressionCodec> compressionClass;
+  private Partitioner partitioner;
+
+  private final String inputDirectory;
+
+  public TestPartitioner(OutputFormat outputFileType, Class<? extends CompressionCodec> compressionClass) {
+    this.inputDirectory = INPUT_ROOT + getClass().getSimpleName();
+    this.outputFileType = outputFileType;
+    this.compressionClass = compressionClass;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    partitioner = new HdfsPartitioner();
+    FileUtils.mkdirs(inputDirectory);
+
+    switch (this.outputFileType) {
+      case TEXT_FILE:
+        createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+        break;
+
+      case SEQUENCE_FILE:
+        createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+        break;
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtils.delete(inputDirectory);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    List<Object[]> parameters = new ArrayList<Object[]>();
+    for (Class compressionClass : new Class[]{null, DefaultCodec.class, BZip2Codec.class}) {
+      for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
+        parameters.add(new Object[]{outputFileType, compressionClass});
+      }
+    }
+    return parameters;
+  }
+
+  @Test
+  public void testPartitioner() {
+    Configuration conf = new Configuration();
+    PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
+    PartitionerContext context = new PartitionerContext(prefixContext, 5, null);
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    FromJobConfiguration jobConf = new FromJobConfiguration();
+
+    jobConf.input.inputDirectory = inputDirectory;
+
+    List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf);
+
+    if (this.compressionClass == null) {
+      assertEquals(5, partitions.size());
+    } else {
+      assertEquals(3, partitions.size());
+    }
+  }
+}
diff --git a/connector/connector-hdfs/src/test/resources/log4j.properties b/connector/connector-hdfs/src/test/resources/log4j.properties
new file mode 100644
index 00000000..44ffced2
--- /dev/null
+++ b/connector/connector-hdfs/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=DEBUG, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
deleted file mode 100644
index 2accf771..00000000
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job; - -import junit.framework.TestCase; - -//import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; -//import org.apache.sqoop.job.etl.HdfsExportExtractor; -//import org.apache.sqoop.job.etl.HdfsExportPartitioner; - -public class TestHdfsExtract extends TestCase { - -// private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/"; -// private static final int NUMBER_OF_FILES = 5; -// private static final int NUMBER_OF_ROWS_PER_FILE = 1000; -// -// private final String indir; -// -// public TestHdfsExtract() { -// indir = INPUT_ROOT + getClass().getSimpleName(); -// } -// -// @Override -// public void setUp() throws IOException { -// FileUtils.mkdirs(indir); -// } -// -// @Override -// public void tearDown() throws IOException { -// FileUtils.delete(indir); -// } -// -// /** -// * Test case for validating the number of partitions creation -// * based on input. -// * Success if the partitions list size is less or equal to -// * given max partition. -// * @throws Exception -// */ -// @Test -// public void testHdfsExportPartitioner() throws Exception { -// createTextInput(null); -// Configuration conf = new Configuration(); -// conf.set(JobConstants.HADOOP_INPUTDIR, indir); -// -// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, -// CSVIntermediateDataFormat.class.getName()); -// HdfsExportPartitioner partitioner = new HdfsExportPartitioner(); -// PrefixContext prefixContext = new PrefixContext(conf, ""); -// int[] partitionValues = {2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 17}; -// -// for(int maxPartitions : partitionValues) { -// PartitionerContext partCont = new PartitionerContext(prefixContext, maxPartitions, null); -// List partitionList = partitioner.getPartitions(partCont, null, null); -// assertTrue(partitionList.size()<=maxPartitions); -// } -// } -// -// @Test -// public void testUncompressedText() throws Exception { -// createTextInput(null); -// -// JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration()); -// } -// -// @Test -// public void testDefaultCompressedText() throws Exception { -// createTextInput(SqoopFileOutputFormat.DEFAULT_CODEC); -// -// JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration()); -// } -// -// @Test -// public void testBZip2CompressedText() throws Exception { -// createTextInput(BZip2Codec.class); -// -// JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration()); -// } -// -// @Test -// public void testDefaultCompressedSequence() throws Exception { -// createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC); -// -// JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration()); -// } -// -// @Test -// public void testUncompressedSequence() throws Exception { -// createSequenceInput(null); -// -// JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration()); -// } -// -// private Schema createSchema() { -// Schema schema = new Schema("Test"); -// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) -// .addColumn(new org.apache.sqoop.schema.type.Text("3")); -// return schema; -// } -// -// private Configuration createConf() { -// Configuration conf = new Configuration(); -// ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT); -// conf.setIfUnset(JobConstants.JOB_ETL_PARTITIONER, -// HdfsExportPartitioner.class.getName()); -// 
conf.setIfUnset(JobConstants.JOB_ETL_EXTRACTOR, -// HdfsExportExtractor.class.getName()); -// conf.setIfUnset(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); -// conf.setIfUnset(Constants.JOB_ETL_NUMBER_PARTITIONS, "4"); -// conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT, -// CSVIntermediateDataFormat.class.getName()); -// conf.setIfUnset(JobConstants.HADOOP_INPUTDIR, indir); -// return conf; -// } -// -// private Job createJob(Configuration conf, Schema schema) throws Exception { -// Job job = new Job(conf); -// ConfigurationUtils.setConnectorSchema(job, schema); -// job.getConfiguration().set(JobConstants.INTERMEDIATE_DATA_FORMAT, -// CSVIntermediateDataFormat.class.getName()); -// return job; -// } -// -// private void createTextInput(Class clz) -// throws IOException, InstantiationException, IllegalAccessException { -// Configuration conf = new Configuration(); -// -// CompressionCodec codec = null; -// String extension = ""; -// if (clz != null) { -// codec = clz.newInstance(); -// if (codec instanceof Configurable) { -// ((Configurable) codec).setConf(conf); -// } -// extension = codec.getDefaultExtension(); -// } -// -// int index = 1; -// for (int fi=0; fi clz) -// throws IOException, InstantiationException, IllegalAccessException { -// Configuration conf = new Configuration(); -// -// CompressionCodec codec = null; -// if (clz != null) { -// codec = clz.newInstance(); -// if (codec instanceof Configurable) { -// ((Configurable) codec).setConf(conf); -// } -// } -// -// int index = 1; -// for (int fi=0; fi0; i--) { -// string = "0" + string; -// } -// return string; -// } -// -// public static class DummyLoader extends Loader { -// @Override -// public void load(LoaderContext context, Object oc, Object oj) throws Exception { -// int index = 1; -// int sum = 0; -// Object[] array; -// while ((array = context.getDataReader().readArrayRecord()) != null) { -// sum += Integer.valueOf(array[0].toString()); -// index++; -// }; -// -// int numbers = NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE; -// assertEquals((1+numbers)*numbers/2, sum); -// -// assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1); -// } -// } - -} diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java deleted file mode 100644 index 8eba0490..00000000 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java +++ /dev/null @@ -1,250 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.job; - -import junit.framework.TestCase; - -//import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; -//import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; -//import org.apache.sqoop.job.etl.HdfsTextImportLoader; - -public class TestHdfsLoad extends TestCase { - -// private static final String OUTPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/"; -// private static final String OUTPUT_FILE = "part-r-00000"; -// private static final int START_ID = 1; -// private static final int NUMBER_OF_IDS = 9; -// private static final int NUMBER_OF_ROWS_PER_ID = 10; -// -// private String outdir; -// -// public TestHdfsLoad() { -// outdir = OUTPUT_ROOT + "/" + getClass().getSimpleName(); -// } -// -// public void testUncompressedText() throws Exception { -// FileUtils.delete(outdir); -// -// Configuration conf = new Configuration(); -// ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT); -// conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); -// conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); -// conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); -// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, -// CSVIntermediateDataFormat.class.getName()); -// conf.set(JobConstants.HADOOP_OUTDIR, outdir); -// Schema schema = new Schema("Test"); -// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) -// .addColumn(new org.apache.sqoop.schema.type.Text("3")); -// -// Job job = new Job(conf); -// ConfigurationUtils.setConnectorSchema(job, schema); -// JobUtils.runJob(job.getConfiguration()); -// -// String fileName = outdir + "/" + OUTPUT_FILE; -// InputStream filestream = FileUtils.open(fileName); -// BufferedReader filereader = new BufferedReader(new InputStreamReader( -// filestream, Charsets.UTF_8)); -// verifyOutputText(filereader); -// } -// -// public void testCompressedText() throws Exception { -// FileUtils.delete(outdir); -// -// Configuration conf = new Configuration(); -// ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT); -// conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); -// conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); -// conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); -// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, -// CSVIntermediateDataFormat.class.getName()); -// conf.set(JobConstants.HADOOP_OUTDIR, outdir); -// conf.setBoolean(JobConstants.HADOOP_COMPRESS, true); -// -// Schema schema = new Schema("Test"); -// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) -// .addColumn(new org.apache.sqoop.schema.type.Text("3")); -// -// Job job = new Job(conf); -// ConfigurationUtils.setConnectorSchema(job, schema); -// JobUtils.runJob(job.getConfiguration()); -// -// Class codecClass = conf.getClass( -// JobConstants.HADOOP_COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC) -// .asSubclass(CompressionCodec.class); -// CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); -// String fileName = outdir + "/" + OUTPUT_FILE + codec.getDefaultExtension(); -// InputStream filestream = codec.createInputStream(FileUtils.open(fileName)); -// BufferedReader filereader = new BufferedReader(new InputStreamReader( -// filestream, Charsets.UTF_8)); -// verifyOutputText(filereader); -// } -// -// private void verifyOutputText(BufferedReader reader) throws IOException { -// String actual = null; -// String 
expected; -// Data data = new Data(); -// int index = START_ID*NUMBER_OF_ROWS_PER_ID; -// while ((actual = reader.readLine()) != null){ -// data.setContent(new Object[] { -// index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) }, -// Data.ARRAY_RECORD); -// expected = data.toString(); -// index++; -// -// assertEquals(expected, actual); -// } -// reader.close(); -// -// assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID, -// index-START_ID*NUMBER_OF_ROWS_PER_ID); -// } -// -// public void testUncompressedSequence() throws Exception { -// FileUtils.delete(outdir); -// -// Configuration conf = new Configuration(); -// ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT); -// conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); -// conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); -// conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName()); -// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, -// CSVIntermediateDataFormat.class.getName()); -// conf.set(JobConstants.HADOOP_OUTDIR, outdir); -// -// Schema schema = new Schema("Test"); -// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) -// .addColumn(new org.apache.sqoop.schema.type.Text("3")); -// -// Job job = new Job(conf); -// ConfigurationUtils.setConnectorSchema(job, schema); -// JobUtils.runJob(job.getConfiguration()); -// -// Path filepath = new Path(outdir, -// OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION); -// SequenceFile.Reader filereader = new SequenceFile.Reader( -// filepath.getFileSystem(conf), filepath, conf); -// verifyOutputSequence(filereader); -// } -// -// public void testCompressedSequence() throws Exception { -// FileUtils.delete(outdir); -// -// Configuration conf = new Configuration(); -// ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT); -// conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); -// conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); -// conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName()); -// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, -// CSVIntermediateDataFormat.class.getName()); -// conf.set(JobConstants.HADOOP_OUTDIR, outdir); -// conf.setBoolean(JobConstants.HADOOP_COMPRESS, true); -// -// Schema schema = new Schema("Test"); -// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) -// .addColumn(new org.apache.sqoop.schema.type.Text("3")); -// -// Job job = new Job(conf); -// ConfigurationUtils.setConnectorSchema(job, schema); -// JobUtils.runJob(job.getConfiguration()); -// Path filepath = new Path(outdir, -// OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION); -// SequenceFile.Reader filereader = new SequenceFile.Reader(filepath.getFileSystem(conf), filepath, conf); -// verifyOutputSequence(filereader); -// } -// -// private void verifyOutputSequence(SequenceFile.Reader reader) throws IOException { -// int index = START_ID*NUMBER_OF_ROWS_PER_ID; -// Text actual = new Text(); -// Text expected = new Text(); -// Data data = new Data(); -// while (reader.next(actual)){ -// data.setContent(new Object[] { -// index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) }, -// Data.ARRAY_RECORD); -// expected.set(data.toString()); -// index++; -// -// assertEquals(expected.toString(), actual.toString()); -// } -// reader.close(); -// -// assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID, -// index-START_ID*NUMBER_OF_ROWS_PER_ID); -// } -// 
-// public static class DummyPartition extends Partition { -// private int id; -// -// public void setId(int id) { -// this.id = id; -// } -// -// public int getId() { -// return id; -// } -// -// @Override -// public void readFields(DataInput in) throws IOException { -// id = in.readInt(); -// } -// -// @Override -// public void write(DataOutput out) throws IOException { -// out.writeInt(id); -// } -// -// @Override -// public String toString() { -// return Integer.toString(id); -// } -// } -// -// public static class DummyPartitioner extends Partitioner { -// @Override -// public List getPartitions(PartitionerContext context, Object oc, Object oj) { -// List partitions = new LinkedList(); -// for (int id = START_ID; id <= NUMBER_OF_IDS; id++) { -// DummyPartition partition = new DummyPartition(); -// partition.setId(id); -// partitions.add(partition); -// } -// return partitions; -// } -// } -// -// public static class DummyExtractor extends Extractor { -// @Override -// public void extract(ExtractorContext context, Object oc, Object oj, Object partition) { -// int id = ((DummyPartition)partition).getId(); -// for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) { -// Object[] array = new Object[] { -// id * NUMBER_OF_ROWS_PER_ID + row, -// (double) (id * NUMBER_OF_ROWS_PER_ID + row), -// new String(new byte[]{(byte)(id * NUMBER_OF_ROWS_PER_ID + row + 127)}, Charsets.ISO_8859_1) -// }; -// context.getDataWriter().writeArrayRecord(array); -// } -// } -// -// @Override -// public long getRowsRead() { -// return NUMBER_OF_ROWS_PER_ID; -// } -// } -}