diff --git a/common/src/main/java/org/apache/sqoop/schema/Schema.java b/common/src/main/java/org/apache/sqoop/schema/Schema.java
index 40c362c..3aa3aea 100644
--- a/common/src/main/java/org/apache/sqoop/schema/Schema.java
+++ b/common/src/main/java/org/apache/sqoop/schema/Schema.java
@@ -122,12 +122,7 @@ public Schema setCreationDate(Date creationDate) {
   }
 
   public boolean isEmpty() {
-    if (columns.size()==0) {
-      return true;
-    } else {
-      return false;
-    }
-
+    return columns.size() == 0;
   }
 
   public String toString() {
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index e0e4061..e65edd9 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -67,9 +67,15 @@ private final List stringFieldIndices = new ArrayList();
   private final List byteFieldIndices = new ArrayList();
 
-  private Schema schema;
+  public CSVIntermediateDataFormat() {
+  }
+
+  public CSVIntermediateDataFormat(Schema schema) {
+    setSchema(schema);
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -166,7 +172,7 @@ public void setSchema(Schema schema) {
    */
   @Override
   public Object[] getObjectData() {
-    if (schema.isEmpty()) {
+    if (schema == null || schema.isEmpty()) {
       throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
     }
 
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index 72e95ed..fcf6c3c 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -41,11 +41,11 @@ private final String BYTE_FIELD_ENCODING = "ISO-8859-1";
 
-  private IntermediateDataFormat data;
+  private IntermediateDataFormat dataFormat;
 
   @Before
   public void setUp() {
-    data = new CSVIntermediateDataFormat();
+    dataFormat = new CSVIntermediateDataFormat();
   }
 
   private String getByteFieldString(byte[] byteFieldData) {
@@ -61,8 +61,8 @@ private String getByteFieldString(byte[] byteFieldData) {
   public void testStringInStringOut() {
     String testData = "10,34,'54','random data'," +
       getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + ",'" + String.valueOf(0x0A) + "'";
-    data.setTextData(testData);
-    assertEquals(testData, data.getTextData());
+    dataFormat.setTextData(testData);
+    assertEquals(testData, dataFormat.getTextData());
   }
 
   @Test
@@ -74,10 +74,10 @@ public void testNullStringInObjectOut() {
       .addColumn(new Text("4"))
       .addColumn(new Binary("5"))
       .addColumn(new Text("6"));
-    data.setSchema(schema);
-    data.setTextData(null);
+    dataFormat.setSchema(schema);
+    dataFormat.setTextData(null);
 
-    Object[] out = data.getObjectData();
+    Object[] out = dataFormat.getObjectData();
 
     assertNull(out);
   }
@@ -91,10 +91,10 @@ public void testEmptyStringInObjectOut() {
       .addColumn(new Text("4"))
       .addColumn(new Binary("5"))
       .addColumn(new Text("6"));
-    data.setSchema(schema);
-    data.setTextData("");
+    dataFormat.setSchema(schema);
+    dataFormat.setTextData("");
 
-    data.getObjectData();
+    dataFormat.getObjectData();
   }
 
   @Test
@@ -111,10 +111,10 @@ public void testStringInObjectOut() {
       .addColumn(new Binary("5"))
       .addColumn(new Text("6"));
-    data.setSchema(schema);
-    data.setTextData(testData);
+    dataFormat.setSchema(schema);
+    dataFormat.setTextData(testData);
 
-    Object[] out = data.getObjectData();
+    Object[] out = dataFormat.getObjectData();
 
     assertEquals(new Long(10),out[0]);
     assertEquals(new Long(34),out[1]);
@@ -134,7 +134,7 @@ public void testObjectInStringOut() {
       .addColumn(new Text("4"))
       .addColumn(new Binary("5"))
       .addColumn(new Text("6"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
     Object[] in = new Object[6];
@@ -145,12 +145,12 @@ public void testObjectInStringOut() {
     in[4] = byteFieldData;
     in[5] = new String(new char[] { 0x0A });
 
-    data.setObjectData(in);
+    dataFormat.setObjectData(in);
 
     //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
     String testData = "10,34,'54','random data'," +
       getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
-    assertEquals(testData, data.getTextData());
+    assertEquals(testData, dataFormat.getTextData());
   }
 
   @Test
@@ -164,7 +164,7 @@ public void testObjectInObjectOut() {
       .addColumn(new Text("4"))
       .addColumn(new Binary("5"))
       .addColumn(new Text("6"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     Object[] in = new Object[6];
     in[0] = new Long(10);
@@ -177,9 +177,9 @@ public void testObjectInObjectOut() {
     System.arraycopy(in,0,inCopy,0,in.length);
 
     // Modifies the input array, so we use the copy to confirm
-    data.setObjectData(in);
+    dataFormat.setObjectData(in);
 
-    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+    assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
   }
 
   @Test
@@ -191,7 +191,7 @@ public void testObjectWithNullInStringOut() {
       .addColumn(new Text("4"))
       .addColumn(new Binary("5"))
       .addColumn(new Text("6"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
     Object[] in = new Object[6];
@@ -202,12 +202,12 @@ public void testObjectWithNullInStringOut() {
     in[4] = byteFieldData;
     in[5] = new String(new char[] { 0x0A });
 
-    data.setObjectData(in);
+    dataFormat.setObjectData(in);
 
     //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
     String testData = "10,34,NULL,'random data'," +
       getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
-    assertEquals(testData, data.getTextData());
+    assertEquals(testData, dataFormat.getTextData());
   }
 
   @Test
@@ -215,7 +215,7 @@ public void testStringFullRangeOfCharacters() {
     Schema schema = new Schema("test");
     schema.addColumn(new Text("1"));
 
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     char[] allCharArr = new char[256];
     for(int i = 0; i < allCharArr.length; ++i) {
@@ -228,17 +228,17 @@ public void testStringFullRangeOfCharacters() {
     System.arraycopy(in, 0, inCopy, 0, in.length);
 
     // Modifies the input array, so we use the copy to confirm
-    data.setObjectData(in);
+    dataFormat.setObjectData(in);
 
-    assertEquals(strData, data.getObjectData()[0]);
-    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+    assertEquals(strData, dataFormat.getObjectData()[0]);
+    assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
   }
 
   @Test
   public void testByteArrayFullRangeOfCharacters() {
     Schema schema = new Schema("test");
     schema.addColumn(new Binary("1"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     byte[] allCharByteArr = new byte[256];
     for (int i = 0; i < allCharByteArr.length; ++i) {
@@ -250,32 +250,32 @@ public void testByteArrayFullRangeOfCharacters() {
     System.arraycopy(in, 0, inCopy, 0, in.length);
 
     // Modifies the input array, so we use the copy to confirm
-    data.setObjectData(in);
-    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+    dataFormat.setObjectData(in);
+    assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
   }
 
   @Test
   public void testDate() {
     Schema schema = new Schema("test");
     schema.addColumn(new Date("1"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
-    data.setTextData("2014-10-01");
-    assertEquals("2014-10-01", data.getObjectData()[0].toString());
+    dataFormat.setTextData("2014-10-01");
+    assertEquals("2014-10-01", dataFormat.getObjectData()[0].toString());
   }
 
   @Test
   public void testDateTime() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     for (String dateTime : new String[]{
         "2014-10-01T12:00:00",
         "2014-10-01T12:00:00.000"
     }) {
-      data.setTextData(dateTime);
-      assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString());
+      dataFormat.setTextData(dateTime);
+      assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString());
     }
   }
 
@@ -289,14 +289,14 @@ public void testDateTime() {
   public void testDateTimeISO8601Alternative() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     for (String dateTime : new String[]{
         "2014-10-01 12:00:00",
         "2014-10-01 12:00:00.000"
     }) {
-      data.setTextData(dateTime);
-      assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString());
+      dataFormat.setTextData(dateTime);
+      assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString());
     }
   }
 
@@ -304,20 +304,20 @@ public void testDateTimeISO8601Alternative() {
   public void testBit() {
     Schema schema = new Schema("test");
     schema.addColumn(new Bit("1"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     for (String trueBit : new String[]{
         "true", "TRUE", "1"
     }) {
-      data.setTextData(trueBit);
-      assertTrue((Boolean) data.getObjectData()[0]);
+      dataFormat.setTextData(trueBit);
+      assertTrue((Boolean) dataFormat.getObjectData()[0]);
     }
 
     for (String falseBit : new String[]{
         "false", "FALSE", "0"
     }) {
-      data.setTextData(falseBit);
-      assertFalse((Boolean) data.getObjectData()[0]);
+      dataFormat.setTextData(falseBit);
+      assertFalse((Boolean) dataFormat.getObjectData()[0]);
     }
   }
 
@@ -326,9 +326,23 @@ public void testEmptySchema() {
     String testData = "10,34,'54','random data'," +
       getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + ",'\\n'";
     Schema schema = new Schema("Test");
-    data.setSchema(schema);
-    data.setTextData(testData);
+    dataFormat.setSchema(schema);
+    dataFormat.setTextData(testData);
 
-    Object[] out = data.getObjectData();
+    @SuppressWarnings("unused")
+    Object[] out = dataFormat.getObjectData();
+  }
+
+  @Test(expected = SqoopException.class)
+  public void testNullSchema() {
+    dataFormat.setSchema(null);
+    @SuppressWarnings("unused")
+    Object[] out = dataFormat.getObjectData();
+  }
+
+  @Test(expected = SqoopException.class)
+  public void testNotSettingSchema() {
+    @SuppressWarnings("unused")
+    Object[] out = dataFormat.getObjectData();
   }
 }
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
deleted file mode 100644
index 139883e..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/**
- *
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job.io; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.regex.Matcher; - -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.io.WritableUtils; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.job.MRExecutionError; - -public class Data implements WritableComparable { - - // The content is an Object to accommodate different kinds of data. - // For example, it can be: - // - Object[] for an array of object record - // - String for a text of CSV record - private volatile Object content = null; - - public static final int EMPTY_DATA = 0; - public static final int CSV_RECORD = 1; - public static final int ARRAY_RECORD = 2; - private int type = EMPTY_DATA; - - public static final String CHARSET_NAME = "UTF-8"; - - public static final char DEFAULT_RECORD_DELIMITER = '\n'; - public static final char DEFAULT_FIELD_DELIMITER = ','; - public static final char DEFAULT_STRING_DELIMITER = '\''; - public static final char DEFAULT_STRING_ESCAPE = '\\'; - private char fieldDelimiter = DEFAULT_FIELD_DELIMITER; - private char stringDelimiter = DEFAULT_STRING_DELIMITER; - private char stringEscape = DEFAULT_STRING_ESCAPE; - private String escapedStringDelimiter = String.valueOf(new char[] { - stringEscape, stringDelimiter - }); - - private int[] fieldTypes = null; - - public void setFieldDelimiter(char fieldDelimiter) { - this.fieldDelimiter = fieldDelimiter; - } - - public void setFieldTypes(int[] fieldTypes) { - this.fieldTypes = fieldTypes; - } - - public void setContent(Object content, int type) { - switch (type) { - case EMPTY_DATA: - case CSV_RECORD: - case ARRAY_RECORD: - this.type = type; - this.content = content; - break; - default: - throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); - } - } - - public Object getContent(int targetType) { - switch (targetType) { - case CSV_RECORD: - return format(); - case ARRAY_RECORD: - return parse(); - default: - throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType)); - } - } - - public int getType() { - return type; - } - - public boolean isEmpty() { - return (type == EMPTY_DATA); - } - - @Override - public String toString() { - return (String)getContent(CSV_RECORD); - } - - @Override - public int compareTo(Data other) { - byte[] myBytes = toString().getBytes(Charset.forName(CHARSET_NAME)); - byte[] otherBytes = other.toString().getBytes( - Charset.forName(CHARSET_NAME)); - return WritableComparator.compareBytes( - myBytes, 0, myBytes.length, 
otherBytes, 0, otherBytes.length); - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof Data)) { - return false; - } - - Data data = (Data)other; - if (type != data.getType()) { - return false; - } - - return toString().equals(data.toString()); - } - - @Override - public int hashCode() { - int result = super.hashCode(); - switch (type) { - case CSV_RECORD: - result += 31 * content.hashCode(); - return result; - case ARRAY_RECORD: - Object[] array = (Object[])content; - for (int i = 0; i < array.length; i++) { - result += 31 * array[i].hashCode(); - } - return result; - default: - throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); - } - } - - @Override - public void readFields(DataInput in) throws IOException { - type = readType(in); - switch (type) { - case CSV_RECORD: - readCsv(in); - break; - case ARRAY_RECORD: - readArray(in); - break; - default: - throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); - } - } - - @Override - public void write(DataOutput out) throws IOException { - writeType(out, type); - switch (type) { - case CSV_RECORD: - writeCsv(out); - break; - case ARRAY_RECORD: - writeArray(out); - break; - default: - throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); - } - } - - private int readType(DataInput in) throws IOException { - return WritableUtils.readVInt(in); - } - - private void writeType(DataOutput out, int type) throws IOException { - WritableUtils.writeVInt(out, type); - } - - private void readCsv(DataInput in) throws IOException { - content = in.readUTF(); - } - - private void writeCsv(DataOutput out) throws IOException { - out.writeUTF((String)content); - } - - private void readArray(DataInput in) throws IOException { - // read number of columns - int columns = in.readInt(); - content = new Object[columns]; - Object[] array = (Object[])content; - // read each column - for (int i = 0; i < array.length; i++) { - int type = readType(in); - switch (type) { - case FieldTypes.UTF: - array[i] = in.readUTF(); - break; - - case FieldTypes.BIN: - int length = in.readInt(); - byte[] bytes = new byte[length]; - in.readFully(bytes); - array[i] = bytes; - break; - - case FieldTypes.DOUBLE: - array[i] = in.readDouble(); - break; - - case FieldTypes.FLOAT: - array[i] = in.readFloat(); - break; - - case FieldTypes.LONG: - array[i] = in.readLong(); - break; - - case FieldTypes.INT: - array[i] = in.readInt(); - break; - - case FieldTypes.SHORT: - array[i] = in.readShort(); - break; - - case FieldTypes.CHAR: - array[i] = in.readChar(); - break; - - case FieldTypes.BYTE: - array[i] = in.readByte(); - break; - - case FieldTypes.BOOLEAN: - array[i] = in.readBoolean(); - break; - - case FieldTypes.NULL: - array[i] = null; - break; - - default: - throw new IOException( - new SqoopException(MRExecutionError.MAPRED_EXEC_0012, Integer.toString(type)) - ); - } - } - } - - private void writeArray(DataOutput out) throws IOException { - Object[] array = (Object[])content; - // write number of columns - out.writeInt(array.length); - // write each column - for (int i = 0; i < array.length; i++) { - if (array[i] instanceof String) { - writeType(out, FieldTypes.UTF); - out.writeUTF((String)array[i]); - - } else if (array[i] instanceof byte[]) { - writeType(out, FieldTypes.BIN); - out.writeInt(((byte[])array[i]).length); - out.write((byte[])array[i]); - - } else if (array[i] instanceof Double) { - writeType(out, FieldTypes.DOUBLE); - out.writeDouble((Double)array[i]); - - } 
else if (array[i] instanceof Float) { - writeType(out, FieldTypes.FLOAT); - out.writeFloat((Float)array[i]); - - } else if (array[i] instanceof Long) { - writeType(out, FieldTypes.LONG); - out.writeLong((Long)array[i]); - - } else if (array[i] instanceof Integer) { - writeType(out, FieldTypes.INT); - out.writeInt((Integer)array[i]); - - } else if (array[i] instanceof Short) { - writeType(out, FieldTypes.SHORT); - out.writeShort((Short)array[i]); - - } else if (array[i] instanceof Character) { - writeType(out, FieldTypes.CHAR); - out.writeChar((Character)array[i]); - - } else if (array[i] instanceof Byte) { - writeType(out, FieldTypes.BYTE); - out.writeByte((Byte)array[i]); - - } else if (array[i] instanceof Boolean) { - writeType(out, FieldTypes.BOOLEAN); - out.writeBoolean((Boolean)array[i]); - - } else if (array[i] == null) { - writeType(out, FieldTypes.NULL); - - } else { - throw new IOException( - new SqoopException(MRExecutionError.MAPRED_EXEC_0012, - array[i].getClass().getName() - ) - ); - } - } - } - - private String format() { - switch (type) { - case EMPTY_DATA: - return null; - - case CSV_RECORD: - if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) { - return (String)content; - } else { - // TODO: need to exclude the case where comma is part of a string. - return ((String)content).replaceAll( - String.valueOf(DEFAULT_FIELD_DELIMITER), - String.valueOf(fieldDelimiter)); - } - - case ARRAY_RECORD: - StringBuilder sb = new StringBuilder(); - Object[] array = (Object[])content; - for (int i = 0; i < array.length; i++) { - if (i != 0) { - sb.append(fieldDelimiter); - } - - if (array[i] instanceof String) { - sb.append(stringDelimiter); - sb.append(escape((String)array[i])); - sb.append(stringDelimiter); - } else if (array[i] instanceof byte[]) { - sb.append(Arrays.toString((byte[])array[i])); - } else { - sb.append(String.valueOf(array[i])); - } - } - return sb.toString(); - - default: - throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); - } - } - - private Object[] parse() { - switch (type) { - case EMPTY_DATA: - return null; - - case CSV_RECORD: - ArrayList list = new ArrayList(); - char[] record = ((String)content).toCharArray(); - int start = 0; - int position = start; - boolean stringDelimited = false; - boolean arrayDelimited = false; - int index = 0; - while (position < record.length) { - if (record[position] == fieldDelimiter) { - if (!stringDelimited && !arrayDelimited) { - index = parseField(list, record, start, position, index); - start = position + 1; - } - } else if (record[position] == stringDelimiter) { - if (!stringDelimited) { - stringDelimited = true; - } - else if (position > 0 && record[position-1] != stringEscape) { - stringDelimited = false; - } - } else if (record[position] == '[') { - if (!stringDelimited) { - arrayDelimited = true; - } - } else if (record[position] == ']') { - if (!stringDelimited) { - arrayDelimited = false; - } - } - position++; - } - parseField(list, record, start, position, index); - return list.toArray(); - - case ARRAY_RECORD: - return (Object[])content; - - default: - throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); - } - } - - private int parseField(ArrayList list, char[] record, - int start, int end, int index) { - String field = String.valueOf(record, start, end-start).trim(); - - int fieldType; - if (fieldTypes == null) { - fieldType = guessType(field); - } else { - fieldType = fieldTypes[index]; - } - - switch (fieldType) { - case FieldTypes.UTF: - if (field.charAt(0) 
!= stringDelimiter || - field.charAt(field.length()-1) != stringDelimiter) { - throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022); - } - list.add(index, unescape(field.substring(1, field.length()-1))); - break; - - case FieldTypes.BIN: - if (field.charAt(0) != '[' || - field.charAt(field.length()-1) != ']') { - throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022); - } - String[] splits = - field.substring(1, field.length()-1).split(String.valueOf(',')); - byte[] bytes = new byte[splits.length]; - for (int i=0; i> input, - Class> mapper, - Class> output) - throws IOException, InterruptedException, ClassNotFoundException { - Job job = new Job(conf); - job.setInputFormatClass(input); - job.setMapperClass(mapper); - job.setMapOutputKeyClass(SqoopWritable.class); - job.setMapOutputValueClass(NullWritable.class); - job.setOutputFormatClass(output); - job.setOutputKeyClass(SqoopWritable.class); - job.setOutputValueClass(NullWritable.class); - - boolean ret = job.waitForCompletion(true); - - // Hadoop 1.0 (and 0.20) have nasty bug when job committer is not called in LocalJobRuner - if (isHadoop1()) { - callOutputCommitter(job, output); - } - - return ret; - } - - /** - * Call output format on given job manually. - */ - private static void callOutputCommitter(Job job, Class> outputFormat) throws IOException, InterruptedException { - OutputCommitter committer = ((OutputFormat)ClassUtils.instantiate(outputFormat)).getOutputCommitter(null); - - JobContext jobContext = mock(JobContext.class); - when(jobContext.getConfiguration()).thenReturn(job.getConfiguration()); - - committer.commitJob(jobContext); - } - - /** - * Detect Hadoop 1.0 installation - * - * @return True if and only if this is Hadoop 1 and below - */ - public static boolean isHadoop1() { - String version = org.apache.hadoop.util.VersionInfo.getVersion(); - if (version.matches("\\b0\\.20\\..+\\b") - || version.matches("\\b1\\.\\d\\.\\d")) { - return true; - } - return false; - } - - private JobUtils() { - // Disable explicit object creation - } - -} diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java index 78ae4ec..bbac7d2 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java @@ -37,6 +37,7 @@ import org.apache.sqoop.common.Direction; import org.apache.sqoop.connector.common.EmptyConfiguration; import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.job.etl.Destroyer; import org.apache.sqoop.job.etl.DestroyerContext; import org.apache.sqoop.job.etl.Extractor; @@ -46,17 +47,13 @@ import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; import org.apache.sqoop.job.etl.PartitionerContext; -import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.job.mr.MRConfigurationUtils; import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopMapper; import org.apache.sqoop.job.mr.SqoopNullOutputFormat; import org.apache.sqoop.job.mr.SqoopSplit; -import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.type.FixedPoint; -import org.apache.sqoop.schema.type.FloatingPoint; -import org.apache.sqoop.schema.type.Text; +import org.apache.sqoop.job.util.MRJobTestUtil; import org.junit.Assert; import 
org.junit.Test; @@ -67,11 +64,10 @@ private static final int NUMBER_OF_ROWS_PER_PARTITION = 10; @Test - public void testInputFormat() throws Exception { + public void testSqoopInputFormat() throws Exception { Configuration conf = new Configuration(); conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); - conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, - CSVIntermediateDataFormat.class.getName()); + conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); Job job = new Job(conf); SqoopInputFormat inputformat = new SqoopInputFormat(); @@ -79,51 +75,47 @@ public void testInputFormat() throws Exception { assertEquals(9, splits.size()); for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { - SqoopSplit split = (SqoopSplit)splits.get(id-1); - DummyPartition partition = (DummyPartition)split.getPartition(); + SqoopSplit split = (SqoopSplit) splits.get(id - 1); + DummyPartition partition = (DummyPartition) split.getPartition(); assertEquals(id, partition.getId()); } } @Test - public void testMapper() throws Exception { + public void testSqoopMapper() throws Exception { Configuration conf = new Configuration(); conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); - conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, - CSVIntermediateDataFormat.class.getName()); - Schema schema = new Schema("Test"); - schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) - .addColumn(new org.apache.sqoop.schema.type.Text("3")); - + conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); Job job = new Job(conf); - MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); - MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema); - boolean success = JobUtils.runJob(job.getConfiguration(), - SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class); + // from and to have the same schema in this test case + MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema()); + MRConfigurationUtils.setConnectorSchema(Direction.TO, job, MRJobTestUtil.getTestSchema()); + boolean success = MRJobTestUtil.runJob(job.getConfiguration(), + SqoopInputFormat.class, + SqoopMapper.class, + DummyOutputFormat.class); Assert.assertEquals("Job failed!", true, success); } @Test - public void testOutputFormat() throws Exception { + public void testNullOutputFormat() throws Exception { Configuration conf = new Configuration(); conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName()); conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName()); - conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, - CSVIntermediateDataFormat.class.getName()); - Schema schema = new Schema("Test"); - schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) - .addColumn(new Text("3")); + conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); Job job = new Job(conf); - MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); - MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema); - boolean success = 
JobUtils.runJob(job.getConfiguration(), - SqoopInputFormat.class, SqoopMapper.class, - SqoopNullOutputFormat.class); + // from and to have the same schema in this test case + MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema()); + MRConfigurationUtils.setConnectorSchema(Direction.TO, job, MRJobTestUtil.getTestSchema()); + boolean success = MRJobTestUtil.runJob(job.getConfiguration(), + SqoopInputFormat.class, + SqoopMapper.class, + SqoopNullOutputFormat.class); Assert.assertEquals("Job failed!", true, success); // Make sure both destroyers get called. @@ -171,15 +163,17 @@ public String toString() { } } - public static class DummyExtractor extends Extractor { + public static class DummyExtractor extends + Extractor { @Override - public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, DummyPartition partition) { - int id = ((DummyPartition)partition).getId(); + public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, + DummyPartition partition) { + int id = ((DummyPartition) partition).getId(); for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) { - context.getDataWriter().writeArrayRecord(new Object[] { - id * NUMBER_OF_ROWS_PER_PARTITION + row, - (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row), - String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)}); + context.getDataWriter().writeArrayRecord( + new Object[] { id * NUMBER_OF_ROWS_PER_PARTITION + row, + (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row), + String.valueOf(id * NUMBER_OF_ROWS_PER_PARTITION + row) }); } } @@ -189,16 +183,14 @@ public long getRowsRead() { } } - public static class DummyOutputFormat - extends OutputFormat { + public static class DummyOutputFormat extends OutputFormat { @Override public void checkOutputSpecs(JobContext context) { // do nothing } @Override - public RecordWriter getRecordWriter( - TaskAttemptContext context) { + public RecordWriter getRecordWriter(TaskAttemptContext context) { return new DummyRecordWriter(); } @@ -207,22 +199,17 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) { return new DummyOutputCommitter(); } - public static class DummyRecordWriter - extends RecordWriter { - private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; - private Data data = new Data(); + public static class DummyRecordWriter extends RecordWriter { + private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION; + // should I use a dummy IDF for testing? 
+ private IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); @Override public void write(SqoopWritable key, NullWritable value) { - - data.setContent(new Object[] { - index, - (double) index, - String.valueOf(index)}, - Data.ARRAY_RECORD); + String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'"; + dataFormat.setTextData(testData); index++; - - assertEquals(data.toString(), key.toString()); + assertEquals(dataFormat.getTextData().toString(), key.toString()); } @Override @@ -233,16 +220,20 @@ public void close(TaskAttemptContext context) { public static class DummyOutputCommitter extends OutputCommitter { @Override - public void setupJob(JobContext jobContext) { } + public void setupJob(JobContext jobContext) { + } @Override - public void setupTask(TaskAttemptContext taskContext) { } + public void setupTask(TaskAttemptContext taskContext) { + } @Override - public void commitTask(TaskAttemptContext taskContext) { } + public void commitTask(TaskAttemptContext taskContext) { + } @Override - public void abortTask(TaskAttemptContext taskContext) { } + public void abortTask(TaskAttemptContext taskContext) { + } @Override public boolean needsTaskCommit(TaskAttemptContext taskContext) { @@ -251,39 +242,34 @@ public boolean needsTaskCommit(TaskAttemptContext taskContext) { } } + // it is writing to the target. public static class DummyLoader extends Loader { - private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; - private Data expected = new Data(); + private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION; + private IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); @Override - public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) throws Exception{ + public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) + throws Exception { String data; while ((data = context.getDataReader().readTextRecord()) != null) { - expected.setContent(new Object[] { - index, - (double) index, - String.valueOf(index)}, - Data.ARRAY_RECORD); + String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'"; + dataFormat.setTextData(testData); index++; - assertEquals(expected.toString(), data); + assertEquals(dataFormat.getTextData().toString(), data); } } } public static class DummyFromDestroyer extends Destroyer { - public static int count = 0; - @Override public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) { count++; } } - public static class DummyToDestroyer extends Destroyer { - + public static class DummyToDestroyer extends Destroyer { public static int count = 0; - @Override public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) { count++; diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java index 04fb692..a64a4a6 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java @@ -38,16 +38,17 @@ import org.apache.sqoop.common.Direction; import org.apache.sqoop.connector.common.EmptyConfiguration; import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; import org.apache.sqoop.job.etl.Partition; import 
org.apache.sqoop.job.etl.Partitioner; import org.apache.sqoop.job.etl.PartitionerContext; -import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.job.mr.MRConfigurationUtils; import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopMapper; +import org.apache.sqoop.job.util.MRJobTestUtil; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.type.FixedPoint; import org.apache.sqoop.schema.type.FloatingPoint; @@ -121,6 +122,7 @@ public TestMatching(Schema from, return parameters; } + @SuppressWarnings("deprecation") @Test public void testSchemaMatching() throws Exception { Configuration conf = new Configuration(); @@ -132,9 +134,9 @@ public void testSchemaMatching() throws Exception { Job job = new Job(conf); MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from); MRConfigurationUtils.setConnectorSchema(Direction.TO, job, to); - JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, + MRJobTestUtil.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class); - boolean success = JobUtils.runJob(job.getConfiguration(), + boolean success = MRJobTestUtil.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class); if (from.getName().split("-")[1].equals("EMPTY")) { @@ -233,19 +235,14 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) { public static class DummyRecordWriter extends RecordWriter { private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; - private Data data = new Data(); + private IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); @Override public void write(SqoopWritable key, NullWritable value) { - - data.setContent(new Object[] { - index, - (double) index, - String.valueOf(index)}, - Data.ARRAY_RECORD); + String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'"; + dataFormat.setTextData(testData); index++; - - assertEquals(data.toString(), key.toString()); + assertEquals(dataFormat.getTextData().toString(), key.toString()); } @Override diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java deleted file mode 100644 index 68ce5ed..0000000 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.sqoop.job.io; - -import com.google.common.base.Charsets; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; - -import org.apache.hadoop.conf.Configuration; -import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; -import org.apache.sqoop.job.MRJobConstants; -import org.junit.Assert; -import org.junit.Test; - -public class SqoopWritableTest { - - private final SqoopWritable writable = new SqoopWritable(); - - @Test - public void testStringInStringOut() { - String testData = "Live Long and prosper"; - writable.setString(testData); - Assert.assertEquals(testData,writable.getString()); - } - - @Test - public void testDataWritten() throws IOException { - String testData = "One ring to rule them all"; - byte[] testDataBytes = testData.getBytes(Charsets.UTF_8); - writable.setString(testData); - ByteArrayOutputStream ostream = new ByteArrayOutputStream(); - DataOutput out = new DataOutputStream(ostream); - writable.write(out); - byte[] written = ostream.toByteArray(); - InputStream instream = new ByteArrayInputStream(written); - DataInput in = new DataInputStream(instream); - String readData = in.readUTF(); - Assert.assertEquals(testData, readData); - } - - @Test - public void testDataRead() throws IOException { - String testData = "Brandywine Bridge - 20 miles!"; - ByteArrayOutputStream ostream = new ByteArrayOutputStream(); - DataOutput out = new DataOutputStream(ostream); - out.writeUTF(testData); - InputStream instream = new ByteArrayInputStream(ostream.toByteArray()); - DataInput in = new DataInputStream(instream); - writable.readFields(in); - Assert.assertEquals(testData, writable.getString()); - } - - @Test - public void testWriteReadUsingStream() throws IOException { - String testData = "You shall not pass"; - ByteArrayOutputStream ostream = new ByteArrayOutputStream(); - DataOutput out = new DataOutputStream(ostream); - writable.setString(testData); - writable.write(out); - byte[] written = ostream.toByteArray(); - - //Don't test what the data is, test that SqoopWritable can read it. - InputStream instream = new ByteArrayInputStream(written); - SqoopWritable newWritable = new SqoopWritable(); - DataInput in = new DataInputStream(instream); - newWritable.readFields(in); - Assert.assertEquals(testData, newWritable.getString()); - ostream.close(); - instream.close(); - } - -} diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java deleted file mode 100644 index 4e23bcb..0000000 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job.io; - -import java.util.Arrays; - -import org.junit.Assert; -import org.junit.Test; - -public class TestData { - - private static final double TEST_NUMBER = Math.PI + 100; - @Test - public void testArrayToCsv() throws Exception { - Data data = new Data(); - String expected; - String actual; - - // with special characters: - expected = - Long.valueOf((long)TEST_NUMBER) + "," + - Double.valueOf(TEST_NUMBER) + "," + - "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," + - Arrays.toString(new byte[] {1, 2, 3, 4, 5}); - data.setContent(new Object[] { - Long.valueOf((long)TEST_NUMBER), - Double.valueOf(TEST_NUMBER), - String.valueOf(TEST_NUMBER) + "',s", - new byte[] {1, 2, 3, 4, 5} }, - Data.ARRAY_RECORD); - actual = (String)data.getContent(Data.CSV_RECORD); - assertEquals(expected, actual); - - // with null characters: - expected = - Long.valueOf((long)TEST_NUMBER) + "," + - Double.valueOf(TEST_NUMBER) + "," + - "null" + "," + - Arrays.toString(new byte[] {1, 2, 3, 4, 5}); - data.setContent(new Object[] { - Long.valueOf((long)TEST_NUMBER), - Double.valueOf(TEST_NUMBER), - null, - new byte[] {1, 2, 3, 4, 5} }, - Data.ARRAY_RECORD); - actual = (String)data.getContent(Data.CSV_RECORD); - assertEquals(expected, actual); - } - - @Test - public void testCsvToArray() throws Exception { - Data data = new Data(); - Object[] expected; - Object[] actual; - - // with special characters: - expected = new Object[] { - Long.valueOf((long)TEST_NUMBER), - Double.valueOf(TEST_NUMBER), - String.valueOf(TEST_NUMBER) + "',s", - new byte[] {1, 2, 3, 4, 5} }; - data.setContent( - Long.valueOf((long)TEST_NUMBER) + "," + - Double.valueOf(TEST_NUMBER) + "," + - "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," + - Arrays.toString(new byte[] {1, 2, 3, 4, 5}), - Data.CSV_RECORD); - actual = (Object[])data.getContent(Data.ARRAY_RECORD); - assertEquals(expected.length, actual.length); - for (int c=0; c writer = executor.getRecordWriter(); - IntermediateDataFormat data = new CSVIntermediateDataFormat(); + IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); SqoopWritable writable = new SqoopWritable(); try { for (int count = 0; count < 100; count++) { - data.setTextData(String.valueOf(count)); - writable.setString(data.getTextData()); + dataFormat.setTextData(String.valueOf(count)); + writable.setString(dataFormat.getTextData()); writer.write(writable, null); } } catch (SqoopException ex) { @@ -149,7 +150,7 @@ public void testSuccessfulContinuousLoader() throws Throwable { SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName()); RecordWriter writer = executor.getRecordWriter(); - IntermediateDataFormat data = new CSVIntermediateDataFormat(); + IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); SqoopWritable writable = new SqoopWritable(); for (int i = 0; i < 10; i++) { StringBuilder builder = new StringBuilder(); @@ -159,8 +160,8 @@ public void testSuccessfulContinuousLoader() throws Throwable { builder.append(","); } } - data.setTextData(builder.toString()); - 
writable.setString(data.getTextData()); + dataFormat.setTextData(builder.toString()); + writable.setString(dataFormat.getTextData()); writer.write(writable, null); } writer.close(null); @@ -171,7 +172,7 @@ public void testSuccessfulLoader() throws Throwable { SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName()); RecordWriter writer = executor.getRecordWriter(); - IntermediateDataFormat data = new CSVIntermediateDataFormat(); + IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); SqoopWritable writable = new SqoopWritable(); StringBuilder builder = new StringBuilder(); for (int count = 0; count < 100; count++) { @@ -180,8 +181,8 @@ public void testSuccessfulLoader() throws Throwable { builder.append(","); } } - data.setTextData(builder.toString()); - writable.setString(data.getTextData()); + dataFormat.setTextData(builder.toString()); + writable.setString(dataFormat.getTextData()); writer.write(writable, null); //Allow writer to complete. @@ -196,7 +197,7 @@ public void testThrowingContinuousLoader() throws Throwable { SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName()); RecordWriter writer = executor.getRecordWriter(); - IntermediateDataFormat data = new CSVIntermediateDataFormat(); + IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); SqoopWritable writable = new SqoopWritable(); try { for (int i = 0; i < 10; i++) { @@ -207,8 +208,8 @@ public void testThrowingContinuousLoader() throws Throwable { builder.append(","); } } - data.setTextData(builder.toString()); - writable.setString(data.getTextData()); + dataFormat.setTextData(builder.toString()); + writable.setString(dataFormat.getTextData()); writer.write(writable, null); } writer.close(null); diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java new file mode 100644 index 0000000..5d5359e --- /dev/null +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.sqoop.job.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.job.io.SqoopWritable;
+import org.apache.sqoop.job.mr.SqoopSplit;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.apache.sqoop.utils.ClassUtils;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MRJobTestUtil {
+
+  @SuppressWarnings("deprecation")
+  public static boolean runJob(Configuration conf,
+      Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
+      Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper,
+      Class<? extends OutputFormat<SqoopWritable, NullWritable>> output) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    Job job = new Job(conf);
+    job.setInputFormatClass(input);
+    job.setMapperClass(mapper);
+    job.setMapOutputKeyClass(SqoopWritable.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setOutputFormatClass(output);
+    job.setOutputKeyClass(SqoopWritable.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    boolean ret = job.waitForCompletion(true);
+
+    // Hadoop 1.0 (and 0.20) have nasty bug when job committer is not called in
+    // LocalJobRuner
+    if (isHadoop1()) {
+      callOutputCommitter(job, output);
+    }
+
+    return ret;
+  }
+
+  public static Schema getTestSchema() {
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+        .addColumn(new Text("3"));
+    return schema;
+  }
+
+  public static IntermediateDataFormat getTestIDF() {
+    return new CSVIntermediateDataFormat(getTestSchema());
+  }
+
+  /**
+   * Call output format on given job manually.
+   */
+  private static void callOutputCommitter(Job job,
+      Class<? extends OutputFormat<SqoopWritable, NullWritable>> outputFormat) throws IOException,
+      InterruptedException {
+    OutputCommitter committer = ((OutputFormat) ClassUtils.instantiate(outputFormat))
+        .getOutputCommitter(null);
+
+    JobContext jobContext = mock(JobContext.class);
+    when(jobContext.getConfiguration()).thenReturn(job.getConfiguration());
+
+    committer.commitJob(jobContext);
+  }
+
+  /**
+   * Detect Hadoop 1.0 installation
+   *
+   * @return True if and only if this is Hadoop 1 and below
+   */
+  public static boolean isHadoop1() {
+    String version = org.apache.hadoop.util.VersionInfo.getVersion();
+    if (version.matches("\\b0\\.20\\..+\\b") || version.matches("\\b1\\.\\d\\.\\d")) {
+      return true;
+    }
+    return false;
+  }
+
+  private MRJobTestUtil() {
+    // Disable explicit object creation
+  }
+
+}