diff --git a/SQOOP-1348.patch b/SQOOP-1348.patch new file mode 100644 index 00000000..7834a3f3 --- /dev/null +++ b/SQOOP-1348.patch @@ -0,0 +1,1844 @@ +diff --git a/common/src/main/java/org/apache/sqoop/schema/Schema.java b/common/src/main/java/org/apache/sqoop/schema/Schema.java +index 40c362c..3aa3aea 100644 +--- a/common/src/main/java/org/apache/sqoop/schema/Schema.java ++++ b/common/src/main/java/org/apache/sqoop/schema/Schema.java +@@ -122,12 +122,7 @@ public Schema setCreationDate(Date creationDate) { + } + + public boolean isEmpty() { +- if (columns.size()==0) { +- return true; +- } else { +- return false; +- } +- ++ return columns.size() == 0; + } + + public String toString() { +diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +index e0e4061..e65edd9 100644 +--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java ++++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +@@ -67,9 +67,15 @@ + + private final List<Integer> stringFieldIndices = new ArrayList<Integer>(); + private final List<Integer> byteFieldIndices = new ArrayList<Integer>(); +- + private Schema schema; + ++ public CSVIntermediateDataFormat() { ++ } ++ ++ public CSVIntermediateDataFormat(Schema schema) { ++ setSchema(schema); ++ } ++ + /** + * {@inheritDoc} + */ +@@ -166,7 +172,7 @@ public void setSchema(Schema schema) { + */ + @Override + public Object[] getObjectData() { +- if (schema.isEmpty()) { ++ if (schema == null || schema.isEmpty()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006); + } + +diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +index 72e95ed..fcf6c3c 100644 +--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java ++++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +@@ -41,11 +41,11 @@ + + private final String BYTE_FIELD_ENCODING = "ISO-8859-1"; + +- private IntermediateDataFormat<?> data; ++ private IntermediateDataFormat<?> dataFormat; + + @Before + public void setUp() { +- data = new CSVIntermediateDataFormat(); + dataFormat = new CSVIntermediateDataFormat(); + } + + private String getByteFieldString(byte[] byteFieldData) { +@@ -61,8 +61,8 @@ private String getByteFieldString(byte[] byteFieldData) { + public void testStringInStringOut() { + String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + + ",'" + String.valueOf(0x0A) + "'"; +- data.setTextData(testData); +- assertEquals(testData, data.getTextData()); ++ dataFormat.setTextData(testData); ++ assertEquals(testData, dataFormat.getTextData()); + } + + @Test +@@ -74,10 +74,10 @@ public void testNullStringInObjectOut() { + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); +- data.setSchema(schema); +- data.setTextData(null); ++ dataFormat.setSchema(schema); ++ dataFormat.setTextData(null); + +- Object[] out = data.getObjectData(); ++ Object[] out = dataFormat.getObjectData(); + + assertNull(out); + } +@@ -91,10 +91,10 @@ public void testEmptyStringInObjectOut() { + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + 
.addColumn(new Text("6")); +- data.setSchema(schema); +- data.setTextData(""); ++ dataFormat.setSchema(schema); ++ dataFormat.setTextData(""); + +- data.getObjectData(); ++ dataFormat.getObjectData(); + } + + @Test +@@ -111,10 +111,10 @@ public void testStringInObjectOut() { + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + +- data.setSchema(schema); +- data.setTextData(testData); ++ dataFormat.setSchema(schema); ++ dataFormat.setTextData(testData); + +- Object[] out = data.getObjectData(); ++ Object[] out = dataFormat.getObjectData(); + + assertEquals(new Long(10),out[0]); + assertEquals(new Long(34),out[1]); +@@ -134,7 +134,7 @@ public void testObjectInStringOut() { + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; + Object[] in = new Object[6]; +@@ -145,12 +145,12 @@ public void testObjectInStringOut() { + in[4] = byteFieldData; + in[5] = new String(new char[] { 0x0A }); + +- data.setObjectData(in); ++ dataFormat.setObjectData(in); + + //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements + String testData = "10,34,'54','random data'," + + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'"; +- assertEquals(testData, data.getTextData()); ++ assertEquals(testData, dataFormat.getTextData()); + } + + @Test +@@ -164,7 +164,7 @@ public void testObjectInObjectOut() { + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + Object[] in = new Object[6]; + in[0] = new Long(10); +@@ -177,9 +177,9 @@ public void testObjectInObjectOut() { + System.arraycopy(in,0,inCopy,0,in.length); + + // Modifies the input array, so we use the copy to confirm +- data.setObjectData(in); ++ dataFormat.setObjectData(in); + +- assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); ++ assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData())); + } + + @Test +@@ -191,7 +191,7 @@ public void testObjectWithNullInStringOut() { + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; + Object[] in = new Object[6]; +@@ -202,12 +202,12 @@ public void testObjectWithNullInStringOut() { + in[4] = byteFieldData; + in[5] = new String(new char[] { 0x0A }); + +- data.setObjectData(in); ++ dataFormat.setObjectData(in); + + //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements + String testData = "10,34,NULL,'random data'," + + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'"; +- assertEquals(testData, data.getTextData()); ++ assertEquals(testData, dataFormat.getTextData()); + } + + @Test +@@ -215,7 +215,7 @@ public void testStringFullRangeOfCharacters() { + Schema schema = new Schema("test"); + schema.addColumn(new Text("1")); + +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + char[] allCharArr = new char[256]; + for(int i = 0; i < allCharArr.length; ++i) { +@@ -228,17 +228,17 @@ public void testStringFullRangeOfCharacters() { + System.arraycopy(in, 0, inCopy, 0, in.length); + + // Modifies the input array, so we use the copy to confirm +- data.setObjectData(in); ++ dataFormat.setObjectData(in); + +- assertEquals(strData, data.getObjectData()[0]); +- assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); 
++ assertEquals(strData, dataFormat.getObjectData()[0]); ++ assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData())); + } + + @Test + public void testByteArrayFullRangeOfCharacters() { + Schema schema = new Schema("test"); + schema.addColumn(new Binary("1")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + byte[] allCharByteArr = new byte[256]; + for (int i = 0; i < allCharByteArr.length; ++i) { +@@ -250,32 +250,32 @@ public void testByteArrayFullRangeOfCharacters() { + System.arraycopy(in, 0, inCopy, 0, in.length); + + // Modifies the input array, so we use the copy to confirm +- data.setObjectData(in); +- assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); ++ dataFormat.setObjectData(in); ++ assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData())); + } + + @Test + public void testDate() { + Schema schema = new Schema("test"); + schema.addColumn(new Date("1")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + +- data.setTextData("2014-10-01"); +- assertEquals("2014-10-01", data.getObjectData()[0].toString()); ++ dataFormat.setTextData("2014-10-01"); ++ assertEquals("2014-10-01", dataFormat.getObjectData()[0].toString()); + } + + @Test + public void testDateTime() { + Schema schema = new Schema("test"); + schema.addColumn(new DateTime("1")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + for (String dateTime : new String[]{ + "2014-10-01T12:00:00", + "2014-10-01T12:00:00.000" + }) { +- data.setTextData(dateTime); +- assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString()); ++ dataFormat.setTextData(dateTime); ++ assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString()); + } + } + +@@ -289,14 +289,14 @@ public void testDateTime() { + public void testDateTimeISO8601Alternative() { + Schema schema = new Schema("test"); + schema.addColumn(new DateTime("1")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + for (String dateTime : new String[]{ + "2014-10-01 12:00:00", + "2014-10-01 12:00:00.000" + }) { +- data.setTextData(dateTime); +- assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString()); ++ dataFormat.setTextData(dateTime); ++ assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString()); + } + } + +@@ -304,20 +304,20 @@ public void testDateTimeISO8601Alternative() { + public void testBit() { + Schema schema = new Schema("test"); + schema.addColumn(new Bit("1")); +- data.setSchema(schema); ++ dataFormat.setSchema(schema); + + for (String trueBit : new String[]{ + "true", "TRUE", "1" + }) { +- data.setTextData(trueBit); +- assertTrue((Boolean) data.getObjectData()[0]); ++ dataFormat.setTextData(trueBit); ++ assertTrue((Boolean) dataFormat.getObjectData()[0]); + } + + for (String falseBit : new String[]{ + "false", "FALSE", "0" + }) { +- data.setTextData(falseBit); +- assertFalse((Boolean) data.getObjectData()[0]); ++ dataFormat.setTextData(falseBit); ++ assertFalse((Boolean) dataFormat.getObjectData()[0]); + } + } + +@@ -326,9 +326,23 @@ public void testEmptySchema() { + String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + + ",'\\n'"; + Schema schema = new Schema("Test"); +- data.setSchema(schema); +- data.setTextData(testData); ++ dataFormat.setSchema(schema); ++ dataFormat.setTextData(testData); + +- Object[] out = data.getObjectData(); ++ @SuppressWarnings("unused") ++ Object[] out = dataFormat.getObjectData(); ++ } ++ ++ @Test(expected = SqoopException.class) ++ 
public void testNullSchema() { ++ dataFormat.setSchema(null); ++ @SuppressWarnings("unused") ++ Object[] out = dataFormat.getObjectData(); ++ } ++ ++ @Test(expected = SqoopException.class) ++ public void testNotSettingSchema() { ++ @SuppressWarnings("unused") ++ Object[] out = dataFormat.getObjectData(); + } + } +diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java +deleted file mode 100644 +index 139883e..0000000 +--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java ++++ /dev/null +@@ -1,529 +0,0 @@ +-/** +- * Licensed to the Apache Software Foundation (ASF) under one +- * or more contributor license agreements. See the NOTICE file +- * distributed with this work for additional information +- * regarding copyright ownership. The ASF licenses this file +- * to you under the Apache License, Version 2.0 (the +- * "License"); you may not use this file except in compliance +- * with the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +-package org.apache.sqoop.job.io; +- +-import java.io.DataInput; +-import java.io.DataOutput; +-import java.io.IOException; +-import java.nio.charset.Charset; +-import java.util.ArrayList; +-import java.util.Arrays; +-import java.util.regex.Matcher; +- +-import org.apache.hadoop.io.WritableComparable; +-import org.apache.hadoop.io.WritableComparator; +-import org.apache.hadoop.io.WritableUtils; +-import org.apache.sqoop.common.SqoopException; +-import org.apache.sqoop.job.MRExecutionError; +- +-public class Data implements WritableComparable { +- +- // The content is an Object to accommodate different kinds of data. 
+- // For example, it can be: +- // - Object[] for an array of object record +- // - String for a text of CSV record +- private volatile Object content = null; +- +- public static final int EMPTY_DATA = 0; +- public static final int CSV_RECORD = 1; +- public static final int ARRAY_RECORD = 2; +- private int type = EMPTY_DATA; +- +- public static final String CHARSET_NAME = "UTF-8"; +- +- public static final char DEFAULT_RECORD_DELIMITER = '\n'; +- public static final char DEFAULT_FIELD_DELIMITER = ','; +- public static final char DEFAULT_STRING_DELIMITER = '\''; +- public static final char DEFAULT_STRING_ESCAPE = '\\'; +- private char fieldDelimiter = DEFAULT_FIELD_DELIMITER; +- private char stringDelimiter = DEFAULT_STRING_DELIMITER; +- private char stringEscape = DEFAULT_STRING_ESCAPE; +- private String escapedStringDelimiter = String.valueOf(new char[] { +- stringEscape, stringDelimiter +- }); +- +- private int[] fieldTypes = null; +- +- public void setFieldDelimiter(char fieldDelimiter) { +- this.fieldDelimiter = fieldDelimiter; +- } +- +- public void setFieldTypes(int[] fieldTypes) { +- this.fieldTypes = fieldTypes; +- } +- +- public void setContent(Object content, int type) { +- switch (type) { +- case EMPTY_DATA: +- case CSV_RECORD: +- case ARRAY_RECORD: +- this.type = type; +- this.content = content; +- break; +- default: +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); +- } +- } +- +- public Object getContent(int targetType) { +- switch (targetType) { +- case CSV_RECORD: +- return format(); +- case ARRAY_RECORD: +- return parse(); +- default: +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType)); +- } +- } +- +- public int getType() { +- return type; +- } +- +- public boolean isEmpty() { +- return (type == EMPTY_DATA); +- } +- +- @Override +- public String toString() { +- return (String)getContent(CSV_RECORD); +- } +- +- @Override +- public int compareTo(Data other) { +- byte[] myBytes = toString().getBytes(Charset.forName(CHARSET_NAME)); +- byte[] otherBytes = other.toString().getBytes( +- Charset.forName(CHARSET_NAME)); +- return WritableComparator.compareBytes( +- myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length); +- } +- +- @Override +- public boolean equals(Object other) { +- if (!(other instanceof Data)) { +- return false; +- } +- +- Data data = (Data)other; +- if (type != data.getType()) { +- return false; +- } +- +- return toString().equals(data.toString()); +- } +- +- @Override +- public int hashCode() { +- int result = super.hashCode(); +- switch (type) { +- case CSV_RECORD: +- result += 31 * content.hashCode(); +- return result; +- case ARRAY_RECORD: +- Object[] array = (Object[])content; +- for (int i = 0; i < array.length; i++) { +- result += 31 * array[i].hashCode(); +- } +- return result; +- default: +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); +- } +- } +- +- @Override +- public void readFields(DataInput in) throws IOException { +- type = readType(in); +- switch (type) { +- case CSV_RECORD: +- readCsv(in); +- break; +- case ARRAY_RECORD: +- readArray(in); +- break; +- default: +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); +- } +- } +- +- @Override +- public void write(DataOutput out) throws IOException { +- writeType(out, type); +- switch (type) { +- case CSV_RECORD: +- writeCsv(out); +- break; +- case ARRAY_RECORD: +- writeArray(out); +- break; +- default: +- throw new 
SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); +- } +- } +- +- private int readType(DataInput in) throws IOException { +- return WritableUtils.readVInt(in); +- } +- +- private void writeType(DataOutput out, int type) throws IOException { +- WritableUtils.writeVInt(out, type); +- } +- +- private void readCsv(DataInput in) throws IOException { +- content = in.readUTF(); +- } +- +- private void writeCsv(DataOutput out) throws IOException { +- out.writeUTF((String)content); +- } +- +- private void readArray(DataInput in) throws IOException { +- // read number of columns +- int columns = in.readInt(); +- content = new Object[columns]; +- Object[] array = (Object[])content; +- // read each column +- for (int i = 0; i < array.length; i++) { +- int type = readType(in); +- switch (type) { +- case FieldTypes.UTF: +- array[i] = in.readUTF(); +- break; +- +- case FieldTypes.BIN: +- int length = in.readInt(); +- byte[] bytes = new byte[length]; +- in.readFully(bytes); +- array[i] = bytes; +- break; +- +- case FieldTypes.DOUBLE: +- array[i] = in.readDouble(); +- break; +- +- case FieldTypes.FLOAT: +- array[i] = in.readFloat(); +- break; +- +- case FieldTypes.LONG: +- array[i] = in.readLong(); +- break; +- +- case FieldTypes.INT: +- array[i] = in.readInt(); +- break; +- +- case FieldTypes.SHORT: +- array[i] = in.readShort(); +- break; +- +- case FieldTypes.CHAR: +- array[i] = in.readChar(); +- break; +- +- case FieldTypes.BYTE: +- array[i] = in.readByte(); +- break; +- +- case FieldTypes.BOOLEAN: +- array[i] = in.readBoolean(); +- break; +- +- case FieldTypes.NULL: +- array[i] = null; +- break; +- +- default: +- throw new IOException( +- new SqoopException(MRExecutionError.MAPRED_EXEC_0012, Integer.toString(type)) +- ); +- } +- } +- } +- +- private void writeArray(DataOutput out) throws IOException { +- Object[] array = (Object[])content; +- // write number of columns +- out.writeInt(array.length); +- // write each column +- for (int i = 0; i < array.length; i++) { +- if (array[i] instanceof String) { +- writeType(out, FieldTypes.UTF); +- out.writeUTF((String)array[i]); +- +- } else if (array[i] instanceof byte[]) { +- writeType(out, FieldTypes.BIN); +- out.writeInt(((byte[])array[i]).length); +- out.write((byte[])array[i]); +- +- } else if (array[i] instanceof Double) { +- writeType(out, FieldTypes.DOUBLE); +- out.writeDouble((Double)array[i]); +- +- } else if (array[i] instanceof Float) { +- writeType(out, FieldTypes.FLOAT); +- out.writeFloat((Float)array[i]); +- +- } else if (array[i] instanceof Long) { +- writeType(out, FieldTypes.LONG); +- out.writeLong((Long)array[i]); +- +- } else if (array[i] instanceof Integer) { +- writeType(out, FieldTypes.INT); +- out.writeInt((Integer)array[i]); +- +- } else if (array[i] instanceof Short) { +- writeType(out, FieldTypes.SHORT); +- out.writeShort((Short)array[i]); +- +- } else if (array[i] instanceof Character) { +- writeType(out, FieldTypes.CHAR); +- out.writeChar((Character)array[i]); +- +- } else if (array[i] instanceof Byte) { +- writeType(out, FieldTypes.BYTE); +- out.writeByte((Byte)array[i]); +- +- } else if (array[i] instanceof Boolean) { +- writeType(out, FieldTypes.BOOLEAN); +- out.writeBoolean((Boolean)array[i]); +- +- } else if (array[i] == null) { +- writeType(out, FieldTypes.NULL); +- +- } else { +- throw new IOException( +- new SqoopException(MRExecutionError.MAPRED_EXEC_0012, +- array[i].getClass().getName() +- ) +- ); +- } +- } +- } +- +- private String format() { +- switch (type) { +- case EMPTY_DATA: +- return null; 
+- +- case CSV_RECORD: +- if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) { +- return (String)content; +- } else { +- // TODO: need to exclude the case where comma is part of a string. +- return ((String)content).replaceAll( +- String.valueOf(DEFAULT_FIELD_DELIMITER), +- String.valueOf(fieldDelimiter)); +- } +- +- case ARRAY_RECORD: +- StringBuilder sb = new StringBuilder(); +- Object[] array = (Object[])content; +- for (int i = 0; i < array.length; i++) { +- if (i != 0) { +- sb.append(fieldDelimiter); +- } +- +- if (array[i] instanceof String) { +- sb.append(stringDelimiter); +- sb.append(escape((String)array[i])); +- sb.append(stringDelimiter); +- } else if (array[i] instanceof byte[]) { +- sb.append(Arrays.toString((byte[])array[i])); +- } else { +- sb.append(String.valueOf(array[i])); +- } +- } +- return sb.toString(); +- +- default: +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); +- } +- } +- +- private Object[] parse() { +- switch (type) { +- case EMPTY_DATA: +- return null; +- +- case CSV_RECORD: +- ArrayList list = new ArrayList(); +- char[] record = ((String)content).toCharArray(); +- int start = 0; +- int position = start; +- boolean stringDelimited = false; +- boolean arrayDelimited = false; +- int index = 0; +- while (position < record.length) { +- if (record[position] == fieldDelimiter) { +- if (!stringDelimited && !arrayDelimited) { +- index = parseField(list, record, start, position, index); +- start = position + 1; +- } +- } else if (record[position] == stringDelimiter) { +- if (!stringDelimited) { +- stringDelimited = true; +- } +- else if (position > 0 && record[position-1] != stringEscape) { +- stringDelimited = false; +- } +- } else if (record[position] == '[') { +- if (!stringDelimited) { +- arrayDelimited = true; +- } +- } else if (record[position] == ']') { +- if (!stringDelimited) { +- arrayDelimited = false; +- } +- } +- position++; +- } +- parseField(list, record, start, position, index); +- return list.toArray(); +- +- case ARRAY_RECORD: +- return (Object[])content; +- +- default: +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); +- } +- } +- +- private int parseField(ArrayList list, char[] record, +- int start, int end, int index) { +- String field = String.valueOf(record, start, end-start).trim(); +- +- int fieldType; +- if (fieldTypes == null) { +- fieldType = guessType(field); +- } else { +- fieldType = fieldTypes[index]; +- } +- +- switch (fieldType) { +- case FieldTypes.UTF: +- if (field.charAt(0) != stringDelimiter || +- field.charAt(field.length()-1) != stringDelimiter) { +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022); +- } +- list.add(index, unescape(field.substring(1, field.length()-1))); +- break; +- +- case FieldTypes.BIN: +- if (field.charAt(0) != '[' || +- field.charAt(field.length()-1) != ']') { +- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022); +- } +- String[] splits = +- field.substring(1, field.length()-1).split(String.valueOf(',')); +- byte[] bytes = new byte[splits.length]; +- for (int i=0; i> input, +- Class> mapper, +- Class> output) +- throws IOException, InterruptedException, ClassNotFoundException { +- Job job = new Job(conf); +- job.setInputFormatClass(input); +- job.setMapperClass(mapper); +- job.setMapOutputKeyClass(SqoopWritable.class); +- job.setMapOutputValueClass(NullWritable.class); +- job.setOutputFormatClass(output); +- job.setOutputKeyClass(SqoopWritable.class); +- job.setOutputValueClass(NullWritable.class); +- +- boolean ret 
= job.waitForCompletion(true); +- +- // Hadoop 1.0 (and 0.20) have nasty bug when job committer is not called in LocalJobRuner +- if (isHadoop1()) { +- callOutputCommitter(job, output); +- } +- +- return ret; +- } +- +- /** +- * Call output format on given job manually. +- */ +- private static void callOutputCommitter(Job job, Class> outputFormat) throws IOException, InterruptedException { +- OutputCommitter committer = ((OutputFormat)ClassUtils.instantiate(outputFormat)).getOutputCommitter(null); +- +- JobContext jobContext = mock(JobContext.class); +- when(jobContext.getConfiguration()).thenReturn(job.getConfiguration()); +- +- committer.commitJob(jobContext); +- } +- +- /** +- * Detect Hadoop 1.0 installation +- * +- * @return True if and only if this is Hadoop 1 and below +- */ +- public static boolean isHadoop1() { +- String version = org.apache.hadoop.util.VersionInfo.getVersion(); +- if (version.matches("\\b0\\.20\\..+\\b") +- || version.matches("\\b1\\.\\d\\.\\d")) { +- return true; +- } +- return false; +- } +- +- private JobUtils() { +- // Disable explicit object creation +- } +- +-} +diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +index 78ae4ec..bbac7d2 100644 +--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java ++++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +@@ -37,6 +37,7 @@ + import org.apache.sqoop.common.Direction; + import org.apache.sqoop.connector.common.EmptyConfiguration; + import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; ++import org.apache.sqoop.connector.idf.IntermediateDataFormat; + import org.apache.sqoop.job.etl.Destroyer; + import org.apache.sqoop.job.etl.DestroyerContext; + import org.apache.sqoop.job.etl.Extractor; +@@ -46,17 +47,13 @@ + import org.apache.sqoop.job.etl.Partition; + import org.apache.sqoop.job.etl.Partitioner; + import org.apache.sqoop.job.etl.PartitionerContext; +-import org.apache.sqoop.job.io.Data; + import org.apache.sqoop.job.io.SqoopWritable; + import org.apache.sqoop.job.mr.MRConfigurationUtils; + import org.apache.sqoop.job.mr.SqoopInputFormat; + import org.apache.sqoop.job.mr.SqoopMapper; + import org.apache.sqoop.job.mr.SqoopNullOutputFormat; + import org.apache.sqoop.job.mr.SqoopSplit; +-import org.apache.sqoop.schema.Schema; +-import org.apache.sqoop.schema.type.FixedPoint; +-import org.apache.sqoop.schema.type.FloatingPoint; +-import org.apache.sqoop.schema.type.Text; ++import org.apache.sqoop.job.util.MRJobTestUtil; + import org.junit.Assert; + import org.junit.Test; + +@@ -67,11 +64,10 @@ + private static final int NUMBER_OF_ROWS_PER_PARTITION = 10; + + @Test +- public void testInputFormat() throws Exception { ++ public void testSqoopInputFormat() throws Exception { + Configuration conf = new Configuration(); + conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); +- conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, +- CSVIntermediateDataFormat.class.getName()); ++ conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + Job job = new Job(conf); + + SqoopInputFormat inputformat = new SqoopInputFormat(); +@@ -79,51 +75,47 @@ public void testInputFormat() throws Exception { + assertEquals(9, splits.size()); + + for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { +- SqoopSplit split = (SqoopSplit)splits.get(id-1); +- DummyPartition partition = 
(DummyPartition)split.getPartition(); ++ SqoopSplit split = (SqoopSplit) splits.get(id - 1); ++ DummyPartition partition = (DummyPartition) split.getPartition(); + assertEquals(id, partition.getId()); + } + } + + @Test +- public void testMapper() throws Exception { ++ public void testSqoopMapper() throws Exception { + Configuration conf = new Configuration(); + conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); +- conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, +- CSVIntermediateDataFormat.class.getName()); +- Schema schema = new Schema("Test"); +- schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) +- .addColumn(new org.apache.sqoop.schema.type.Text("3")); +- ++ conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + Job job = new Job(conf); +- MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); +- MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema); +- boolean success = JobUtils.runJob(job.getConfiguration(), +- SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class); ++ // from and to have the same schema in this test case ++ MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema()); ++ MRConfigurationUtils.setConnectorSchema(Direction.TO, job, MRJobTestUtil.getTestSchema()); ++ boolean success = MRJobTestUtil.runJob(job.getConfiguration(), ++ SqoopInputFormat.class, ++ SqoopMapper.class, ++ DummyOutputFormat.class); + Assert.assertEquals("Job failed!", true, success); + } + + @Test +- public void testOutputFormat() throws Exception { ++ public void testNullOutputFormat() throws Exception { + Configuration conf = new Configuration(); + conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); + conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); + conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName()); + conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName()); +- conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, +- CSVIntermediateDataFormat.class.getName()); +- Schema schema = new Schema("Test"); +- schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) +- .addColumn(new Text("3")); ++ conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + + Job job = new Job(conf); +- MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); +- MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema); +- boolean success = JobUtils.runJob(job.getConfiguration(), +- SqoopInputFormat.class, SqoopMapper.class, +- SqoopNullOutputFormat.class); ++ // from and to have the same schema in this test case ++ MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema()); ++ MRConfigurationUtils.setConnectorSchema(Direction.TO, job, MRJobTestUtil.getTestSchema()); ++ boolean success = MRJobTestUtil.runJob(job.getConfiguration(), ++ SqoopInputFormat.class, ++ SqoopMapper.class, ++ SqoopNullOutputFormat.class); + Assert.assertEquals("Job failed!", true, success); + + // Make sure both destroyers get called. 
+@@ -171,15 +163,17 @@ public String toString() { + } + } + +- public static class DummyExtractor extends Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> { ++ public static class DummyExtractor extends ++ Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> { + @Override +- public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, DummyPartition partition) { +- int id = ((DummyPartition)partition).getId(); ++ public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, ++ DummyPartition partition) { ++ int id = ((DummyPartition) partition).getId(); + for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) { +- context.getDataWriter().writeArrayRecord(new Object[] { +- id * NUMBER_OF_ROWS_PER_PARTITION + row, +- (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row), +- String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)}); ++ context.getDataWriter().writeArrayRecord( ++ new Object[] { id * NUMBER_OF_ROWS_PER_PARTITION + row, ++ (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row), ++ String.valueOf(id * NUMBER_OF_ROWS_PER_PARTITION + row) }); + } + } + +@@ -189,16 +183,14 @@ public long getRowsRead() { + } + } + +- public static class DummyOutputFormat +- extends OutputFormat<SqoopWritable, NullWritable> { ++ public static class DummyOutputFormat extends OutputFormat<SqoopWritable, NullWritable> { + @Override + public void checkOutputSpecs(JobContext context) { + // do nothing + } + + @Override +- public RecordWriter<SqoopWritable, NullWritable> getRecordWriter( +- TaskAttemptContext context) { ++ public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(TaskAttemptContext context) { + return new DummyRecordWriter(); + } + +@@ -207,22 +199,17 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) { + return new DummyOutputCommitter(); + } + +- public static class DummyRecordWriter +- extends RecordWriter<SqoopWritable, NullWritable> { +- private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; +- private Data data = new Data(); ++ public static class DummyRecordWriter extends RecordWriter<SqoopWritable, NullWritable> { ++ private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION; ++ // should I use a dummy IDF for testing? ++ private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); + + @Override + public void write(SqoopWritable key, NullWritable value) { +- +- data.setContent(new Object[] { +- index, +- (double) index, +- String.valueOf(index)}, +- Data.ARRAY_RECORD); ++ String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'"; ++ dataFormat.setTextData(testData); + index++; +- +- assertEquals(data.toString(), key.toString()); ++ assertEquals(dataFormat.getTextData().toString(), key.toString()); + } + + @Override +@@ -233,16 +220,20 @@ public void close(TaskAttemptContext context) { + + public static class DummyOutputCommitter extends OutputCommitter { + @Override +- public void setupJob(JobContext jobContext) { } ++ public void setupJob(JobContext jobContext) { ++ } + + @Override +- public void setupTask(TaskAttemptContext taskContext) { } ++ public void setupTask(TaskAttemptContext taskContext) { ++ } + + @Override +- public void commitTask(TaskAttemptContext taskContext) { } ++ public void commitTask(TaskAttemptContext taskContext) { ++ } + + @Override +- public void abortTask(TaskAttemptContext taskContext) { } ++ public void abortTask(TaskAttemptContext taskContext) { ++ } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) { +@@ -251,39 +242,34 @@ public boolean needsTaskCommit(TaskAttemptContext taskContext) { + } + } + ++ // Loader that writes to the target. 
+ public static class DummyLoader extends Loader<EmptyConfiguration, EmptyConfiguration> { +- private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; +- private Data expected = new Data(); ++ private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION; ++ private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); + + @Override +- public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) throws Exception{ ++ public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) ++ throws Exception { + String data; + while ((data = context.getDataReader().readTextRecord()) != null) { +- expected.setContent(new Object[] { +- index, +- (double) index, +- String.valueOf(index)}, +- Data.ARRAY_RECORD); ++ String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'"; ++ dataFormat.setTextData(testData); + index++; +- assertEquals(expected.toString(), data); ++ assertEquals(dataFormat.getTextData().toString(), data); + } + } + } + + public static class DummyFromDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> { +- + public static int count = 0; +- + @Override + public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) { + count++; + } + } + +- public static class DummyToDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> { +- ++ public static class DummyToDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> { + public static int count = 0; +- + @Override + public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) { + count++; + } +diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java +index 04fb692..a64a4a6 100644 +--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java ++++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java +@@ -38,16 +38,17 @@ + import org.apache.sqoop.common.Direction; + import org.apache.sqoop.connector.common.EmptyConfiguration; + import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; ++import org.apache.sqoop.connector.idf.IntermediateDataFormat; + import org.apache.sqoop.job.etl.Extractor; + import org.apache.sqoop.job.etl.ExtractorContext; + import org.apache.sqoop.job.etl.Partition; + import org.apache.sqoop.job.etl.Partitioner; + import org.apache.sqoop.job.etl.PartitionerContext; +-import org.apache.sqoop.job.io.Data; + import org.apache.sqoop.job.io.SqoopWritable; + import org.apache.sqoop.job.mr.MRConfigurationUtils; + import org.apache.sqoop.job.mr.SqoopInputFormat; + import org.apache.sqoop.job.mr.SqoopMapper; ++import org.apache.sqoop.job.util.MRJobTestUtil; + import org.apache.sqoop.schema.Schema; + import org.apache.sqoop.schema.type.FixedPoint; + import org.apache.sqoop.schema.type.FloatingPoint; +@@ -121,6 +122,7 @@ public TestMatching(Schema from, + return parameters; + } + ++ @SuppressWarnings("deprecation") + @Test + public void testSchemaMatching() throws Exception { + Configuration conf = new Configuration(); +@@ -132,9 +134,9 @@ public void testSchemaMatching() throws Exception { + Job job = new Job(conf); + MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from); + MRConfigurationUtils.setConnectorSchema(Direction.TO, job, to); +- JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, ++ MRJobTestUtil.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, + DummyOutputFormat.class); +- boolean success = JobUtils.runJob(job.getConfiguration(), ++ boolean success = 
MRJobTestUtil.runJob(job.getConfiguration(), + SqoopInputFormat.class, SqoopMapper.class, + DummyOutputFormat.class); + if (from.getName().split("-")[1].equals("EMPTY")) { +@@ -233,19 +235,14 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) { + public static class DummyRecordWriter + extends RecordWriter { + private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; +- private Data data = new Data(); ++ private IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); + + @Override + public void write(SqoopWritable key, NullWritable value) { +- +- data.setContent(new Object[] { +- index, +- (double) index, +- String.valueOf(index)}, +- Data.ARRAY_RECORD); ++ String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'"; ++ dataFormat.setTextData(testData); + index++; +- +- assertEquals(data.toString(), key.toString()); ++ assertEquals(dataFormat.getTextData().toString(), key.toString()); + } + + @Override +diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java +deleted file mode 100644 +index 68ce5ed..0000000 +--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java ++++ /dev/null +@@ -1,95 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one +- * or more contributor license agreements. See the NOTICE file +- * distributed with this work for additional information +- * regarding copyright ownership. The ASF licenses this file +- * to you under the Apache License, Version 2.0 (the +- * "License"); you may not use this file except in compliance +- * with the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, +- * software distributed under the License is distributed on an +- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +- * KIND, either express or implied. See the License for the +- * specific language governing permissions and limitations +- * under the License. 
+- */ +-package org.apache.sqoop.job.io; +- +-import com.google.common.base.Charsets; +- +-import java.io.ByteArrayInputStream; +-import java.io.ByteArrayOutputStream; +-import java.io.DataInput; +-import java.io.DataInputStream; +-import java.io.DataOutput; +-import java.io.DataOutputStream; +-import java.io.IOException; +-import java.io.InputStream; +- +-import org.apache.hadoop.conf.Configuration; +-import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; +-import org.apache.sqoop.job.MRJobConstants; +-import org.junit.Assert; +-import org.junit.Test; +- +-public class SqoopWritableTest { +- +- private final SqoopWritable writable = new SqoopWritable(); +- +- @Test +- public void testStringInStringOut() { +- String testData = "Live Long and prosper"; +- writable.setString(testData); +- Assert.assertEquals(testData,writable.getString()); +- } +- +- @Test +- public void testDataWritten() throws IOException { +- String testData = "One ring to rule them all"; +- byte[] testDataBytes = testData.getBytes(Charsets.UTF_8); +- writable.setString(testData); +- ByteArrayOutputStream ostream = new ByteArrayOutputStream(); +- DataOutput out = new DataOutputStream(ostream); +- writable.write(out); +- byte[] written = ostream.toByteArray(); +- InputStream instream = new ByteArrayInputStream(written); +- DataInput in = new DataInputStream(instream); +- String readData = in.readUTF(); +- Assert.assertEquals(testData, readData); +- } +- +- @Test +- public void testDataRead() throws IOException { +- String testData = "Brandywine Bridge - 20 miles!"; +- ByteArrayOutputStream ostream = new ByteArrayOutputStream(); +- DataOutput out = new DataOutputStream(ostream); +- out.writeUTF(testData); +- InputStream instream = new ByteArrayInputStream(ostream.toByteArray()); +- DataInput in = new DataInputStream(instream); +- writable.readFields(in); +- Assert.assertEquals(testData, writable.getString()); +- } +- +- @Test +- public void testWriteReadUsingStream() throws IOException { +- String testData = "You shall not pass"; +- ByteArrayOutputStream ostream = new ByteArrayOutputStream(); +- DataOutput out = new DataOutputStream(ostream); +- writable.setString(testData); +- writable.write(out); +- byte[] written = ostream.toByteArray(); +- +- //Don't test what the data is, test that SqoopWritable can read it. +- InputStream instream = new ByteArrayInputStream(written); +- SqoopWritable newWritable = new SqoopWritable(); +- DataInput in = new DataInputStream(instream); +- newWritable.readFields(in); +- Assert.assertEquals(testData, newWritable.getString()); +- ostream.close(); +- instream.close(); +- } +- +-} +diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java +deleted file mode 100644 +index 4e23bcb..0000000 +--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java ++++ /dev/null +@@ -1,117 +0,0 @@ +-/** +- * Licensed to the Apache Software Foundation (ASF) under one +- * or more contributor license agreements. See the NOTICE file +- * distributed with this work for additional information +- * regarding copyright ownership. The ASF licenses this file +- * to you under the Apache License, Version 2.0 (the +- * "License"); you may not use this file except in compliance +- * with the License. 
You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +-package org.apache.sqoop.job.io; +- +-import java.util.Arrays; +- +-import org.junit.Assert; +-import org.junit.Test; +- +-public class TestData { +- +- private static final double TEST_NUMBER = Math.PI + 100; +- @Test +- public void testArrayToCsv() throws Exception { +- Data data = new Data(); +- String expected; +- String actual; +- +- // with special characters: +- expected = +- Long.valueOf((long)TEST_NUMBER) + "," + +- Double.valueOf(TEST_NUMBER) + "," + +- "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," + +- Arrays.toString(new byte[] {1, 2, 3, 4, 5}); +- data.setContent(new Object[] { +- Long.valueOf((long)TEST_NUMBER), +- Double.valueOf(TEST_NUMBER), +- String.valueOf(TEST_NUMBER) + "',s", +- new byte[] {1, 2, 3, 4, 5} }, +- Data.ARRAY_RECORD); +- actual = (String)data.getContent(Data.CSV_RECORD); +- assertEquals(expected, actual); +- +- // with null characters: +- expected = +- Long.valueOf((long)TEST_NUMBER) + "," + +- Double.valueOf(TEST_NUMBER) + "," + +- "null" + "," + +- Arrays.toString(new byte[] {1, 2, 3, 4, 5}); +- data.setContent(new Object[] { +- Long.valueOf((long)TEST_NUMBER), +- Double.valueOf(TEST_NUMBER), +- null, +- new byte[] {1, 2, 3, 4, 5} }, +- Data.ARRAY_RECORD); +- actual = (String)data.getContent(Data.CSV_RECORD); +- assertEquals(expected, actual); +- } +- +- @Test +- public void testCsvToArray() throws Exception { +- Data data = new Data(); +- Object[] expected; +- Object[] actual; +- +- // with special characters: +- expected = new Object[] { +- Long.valueOf((long)TEST_NUMBER), +- Double.valueOf(TEST_NUMBER), +- String.valueOf(TEST_NUMBER) + "',s", +- new byte[] {1, 2, 3, 4, 5} }; +- data.setContent( +- Long.valueOf((long)TEST_NUMBER) + "," + +- Double.valueOf(TEST_NUMBER) + "," + +- "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," + +- Arrays.toString(new byte[] {1, 2, 3, 4, 5}), +- Data.CSV_RECORD); +- actual = (Object[])data.getContent(Data.ARRAY_RECORD); +- assertEquals(expected.length, actual.length); +- for (int c=0; c writer = executor.getRecordWriter(); +- IntermediateDataFormat data = new CSVIntermediateDataFormat(); ++ IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); + SqoopWritable writable = new SqoopWritable(); + try { + for (int count = 0; count < 100; count++) { +- data.setTextData(String.valueOf(count)); +- writable.setString(data.getTextData()); ++ dataFormat.setTextData(String.valueOf(count)); ++ writable.setString(dataFormat.getTextData()); + writer.write(writable, null); + } + } catch (SqoopException ex) { +@@ -149,7 +150,7 @@ public void testSuccessfulContinuousLoader() throws Throwable { + SqoopOutputFormatLoadExecutor executor = new + SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName()); + RecordWriter writer = executor.getRecordWriter(); +- IntermediateDataFormat data = new CSVIntermediateDataFormat(); ++ IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); + SqoopWritable writable = new SqoopWritable(); + for (int i = 0; i < 10; i++) { + StringBuilder builder = new StringBuilder(); +@@ -159,8 +160,8 @@ public void testSuccessfulContinuousLoader() 
throws Throwable { + SqoopOutputFormatLoadExecutor executor = new + SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName()); + RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); +- IntermediateDataFormat<?> data = new CSVIntermediateDataFormat(); ++ IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); + SqoopWritable writable = new SqoopWritable(); + for (int i = 0; i < 10; i++) { + StringBuilder builder = new StringBuilder(); +@@ -159,8 +160,8 @@ public void testSuccessfulContinuousLoader() throws Throwable { + builder.append(","); + } + } +- data.setTextData(builder.toString()); +- writable.setString(data.getTextData()); ++ dataFormat.setTextData(builder.toString()); ++ writable.setString(dataFormat.getTextData()); + writer.write(writable, null); + } + writer.close(null); +@@ -171,7 +172,7 @@ public void testSuccessfulLoader() throws Throwable { + SqoopOutputFormatLoadExecutor executor = new + SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName()); + RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); +- IntermediateDataFormat<?> data = new CSVIntermediateDataFormat(); ++ IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); + SqoopWritable writable = new SqoopWritable(); + StringBuilder builder = new StringBuilder(); + for (int count = 0; count < 100; count++) { +@@ -180,8 +181,8 @@ public void testSuccessfulLoader() throws Throwable { + builder.append(","); + } + } +- data.setTextData(builder.toString()); +- writable.setString(data.getTextData()); ++ dataFormat.setTextData(builder.toString()); ++ writable.setString(dataFormat.getTextData()); + writer.write(writable, null); + + //Allow writer to complete. +@@ -196,7 +197,7 @@ public void testThrowingContinuousLoader() throws Throwable { + SqoopOutputFormatLoadExecutor executor = new + SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName()); + RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); +- IntermediateDataFormat<?> data = new CSVIntermediateDataFormat(); ++ IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); + SqoopWritable writable = new SqoopWritable(); + try { + for (int i = 0; i < 10; i++) { +@@ -207,8 +208,8 @@ public void testThrowingContinuousLoader() throws Throwable { + builder.append(","); + } + } +- data.setTextData(builder.toString()); +- writable.setString(data.getTextData()); ++ dataFormat.setTextData(builder.toString()); ++ writable.setString(dataFormat.getTextData()); + writer.write(writable, null); + } + writer.close(null); +diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java +new file mode 100644 +index 0000000..5d5359e +--- /dev/null ++++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java +@@ -0,0 +1,114 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++package org.apache.sqoop.job.util; ++ ++import java.io.IOException; ++ ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.io.NullWritable; ++import org.apache.hadoop.mapreduce.InputFormat; ++import org.apache.hadoop.mapreduce.Job; ++import org.apache.hadoop.mapreduce.JobContext; ++import org.apache.hadoop.mapreduce.Mapper; ++import org.apache.hadoop.mapreduce.OutputCommitter; ++import org.apache.hadoop.mapreduce.OutputFormat; ++import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; ++import org.apache.sqoop.connector.idf.IntermediateDataFormat; ++import org.apache.sqoop.job.io.SqoopWritable; ++import org.apache.sqoop.job.mr.SqoopSplit; ++import org.apache.sqoop.schema.Schema; ++import org.apache.sqoop.schema.type.FixedPoint; ++import org.apache.sqoop.schema.type.FloatingPoint; ++import org.apache.sqoop.schema.type.Text; ++import org.apache.sqoop.utils.ClassUtils; ++ ++import static org.mockito.Mockito.mock; ++import static org.mockito.Mockito.when; ++ ++public class MRJobTestUtil { ++ ++ @SuppressWarnings("deprecation") ++ public static boolean runJob(Configuration conf, ++ Class<? extends InputFormat<SqoopSplit, NullWritable>> input, ++ Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper, ++ Class<? extends OutputFormat<SqoopWritable, NullWritable>> output) throws IOException, ++ InterruptedException, ClassNotFoundException { ++ Job job = new Job(conf); ++ job.setInputFormatClass(input); ++ job.setMapperClass(mapper); ++ job.setMapOutputKeyClass(SqoopWritable.class); ++ job.setMapOutputValueClass(NullWritable.class); ++ job.setOutputFormatClass(output); ++ job.setOutputKeyClass(SqoopWritable.class); ++ job.setOutputValueClass(NullWritable.class); ++ ++ boolean ret = job.waitForCompletion(true); ++ ++ // Hadoop 1.0 (and 0.20) have a nasty bug where the job committer is not called in ++ // LocalJobRunner ++ if (isHadoop1()) { ++ callOutputCommitter(job, output); ++ } ++ ++ return ret; ++ } ++ ++ public static Schema getTestSchema() { ++ Schema schema = new Schema("Test"); ++ schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) ++ .addColumn(new Text("3")); ++ return schema; ++ } ++ ++ public static IntermediateDataFormat<?> getTestIDF() { ++ return new CSVIntermediateDataFormat(getTestSchema()); ++ } ++ ++ /** ++ * Call output format on given job manually. 
++   */
++  private static void callOutputCommitter(Job job,
++      Class<? extends OutputFormat<SqoopWritable, NullWritable>> outputFormat) throws IOException,
++      InterruptedException {
++    OutputCommitter committer = ((OutputFormat) ClassUtils.instantiate(outputFormat))
++        .getOutputCommitter(null);
++
++    JobContext jobContext = mock(JobContext.class);
++    when(jobContext.getConfiguration()).thenReturn(job.getConfiguration());
++
++    committer.commitJob(jobContext);
++  }
++
++  /**
++   * Detect a Hadoop 1.x (or earlier) installation.
++   *
++   * @return true if and only if this is Hadoop 1.x or earlier
++   */
++  public static boolean isHadoop1() {
++    String version = org.apache.hadoop.util.VersionInfo.getVersion();
++    if (version.matches("\\b0\\.20\\..+\\b") || version.matches("\\b1\\.\\d\\.\\d")) {
++      return true;
++    }
++    return false;
++  }
++
++  private MRJobTestUtil() {
++    // Disable explicit object creation
++  }
++
++}
diff --git a/common/src/main/java/org/apache/sqoop/schema/Schema.java b/common/src/main/java/org/apache/sqoop/schema/Schema.java
index 40c362c8..3aa3aea7 100644
--- a/common/src/main/java/org/apache/sqoop/schema/Schema.java
+++ b/common/src/main/java/org/apache/sqoop/schema/Schema.java
@@ -122,12 +122,7 @@ public List<Column> getColumns() {
   }
 
   public boolean isEmpty() {
-    if (columns.size()==0) {
-      return true;
-    } else {
-      return false;
-    }
-
+    return columns.size() == 0;
   }
 
   public String toString() {
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index e0e40619..e65edd9a 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -67,9 +67,15 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
 
   private final List<Integer> stringFieldIndices = new ArrayList<Integer>();
   private final List<Integer> byteFieldIndices = new ArrayList<Integer>();
-
   private Schema schema;
 
+  public CSVIntermediateDataFormat() {
+  }
+
+  public CSVIntermediateDataFormat(Schema schema) {
+    setSchema(schema);
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -166,7 +172,7 @@ private String[] getFields() {
    */
   @Override
   public Object[] getObjectData() {
-    if (schema.isEmpty()) {
+    if (schema == null || schema.isEmpty()) {
       throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
     }
 
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index 72e95ed6..fcf6c3c1 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -41,11 +41,11 @@ public class TestCSVIntermediateDataFormat {
 
   private final String BYTE_FIELD_ENCODING = "ISO-8859-1";
 
-  private IntermediateDataFormat<?> data;
+  private IntermediateDataFormat<?> dataFormat;
 
   @Before
   public void setUp() {
-    data = new CSVIntermediateDataFormat();
+    dataFormat = new CSVIntermediateDataFormat();
   }
 
   private String getByteFieldString(byte[] byteFieldData) {
@@ -61,8 +61,8 @@ private String getByteFieldString(byte[] byteFieldData) {
   public void testStringInStringOut() {
     String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
         + ",'" + String.valueOf(0x0A) + "'";
-    data.setTextData(testData);
-    assertEquals(testData, data.getTextData());
+    dataFormat.setTextData(testData);
+    assertEquals(testData, dataFormat.getTextData());
   }
 
   @Test
@@ -74,10 +74,10 @@ public void testNullStringInObjectOut() {
       .addColumn(new Text("4"))
       .addColumn(new Binary("5"))
       .addColumn(new Text("6"));
-    data.setSchema(schema);
-    data.setTextData(null);
+    dataFormat.setSchema(schema);
+    dataFormat.setTextData(null);
 
-    Object[] out = data.getObjectData();
+    Object[] out = dataFormat.getObjectData();
 
     assertNull(out);
   }
@@ -91,10 +91,10 @@ public void testEmptyStringInObjectOut() {
       .addColumn(new Text("4"))
       .addColumn(new Binary("5"))
       .addColumn(new Text("6"));
-    data.setSchema(schema);
-    data.setTextData("");
+    dataFormat.setSchema(schema);
+    dataFormat.setTextData("");
 
-    data.getObjectData();
+    dataFormat.getObjectData();
   }
 
   @Test
@@ -111,10 +111,10 @@ public void testStringInObjectOut() {
       .addColumn(new Binary("5"))
       .addColumn(new Text("6"));
 
-    data.setSchema(schema);
-    data.setTextData(testData);
+    dataFormat.setSchema(schema);
+    dataFormat.setTextData(testData);
 
-    Object[] out = data.getObjectData();
+    Object[] out = dataFormat.getObjectData();
 
     assertEquals(new Long(10),out[0]);
     assertEquals(new Long(34),out[1]);
@@ -134,7 +134,7 @@ public void testObjectInStringOut() {
       .addColumn(new Text("4"))
       .addColumn(new Binary("5"))
       .addColumn(new Text("6"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
     Object[] in = new Object[6];
@@ -145,12 +145,12 @@ public void testObjectInStringOut() {
     in[4] = byteFieldData;
     in[5] = new String(new char[] { 0x0A });
 
-    data.setObjectData(in);
+    dataFormat.setObjectData(in);
 
     //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
     String testData = "10,34,'54','random data'," +
         getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
-    assertEquals(testData, data.getTextData());
+    assertEquals(testData, dataFormat.getTextData());
   }
 
   @Test
@@ -164,7 +164,7 @@ public void testObjectInObjectOut() {
       .addColumn(new Text("4"))
       .addColumn(new Binary("5"))
       .addColumn(new Text("6"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     Object[] in = new Object[6];
     in[0] = new Long(10);
@@ -177,9 +177,9 @@ public void testObjectInObjectOut() {
     System.arraycopy(in,0,inCopy,0,in.length);
 
     // Modifies the input array, so we use the copy to confirm
-    data.setObjectData(in);
+    dataFormat.setObjectData(in);
 
-    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+    assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
   }
 
   @Test
@@ -191,7 +191,7 @@ public void testObjectWithNullInStringOut() {
       .addColumn(new Text("4"))
       .addColumn(new Binary("5"))
       .addColumn(new Text("6"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
     Object[] in = new Object[6];
@@ -202,12 +202,12 @@ public void testObjectWithNullInStringOut() {
     in[4] = byteFieldData;
     in[5] = new String(new char[] { 0x0A });
 
-    data.setObjectData(in);
+    dataFormat.setObjectData(in);
 
     //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
     String testData = "10,34,NULL,'random data'," +
         getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
-    assertEquals(testData, data.getTextData());
+    assertEquals(testData, dataFormat.getTextData());
   }
 
   @Test
@@ -215,7 +215,7 @@ public void testStringFullRangeOfCharacters() {
     Schema schema = new Schema("test");
     schema.addColumn(new Text("1"));
 
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     char[] allCharArr = new char[256];
     for(int i = 0; i < allCharArr.length; ++i) {
@@ -228,17 +228,17 @@ public void testStringFullRangeOfCharacters() {
     System.arraycopy(in, 0, inCopy, 0, in.length);
 
     // Modifies the input array, so we use the copy to confirm
-    data.setObjectData(in);
+    dataFormat.setObjectData(in);
 
-    assertEquals(strData, data.getObjectData()[0]);
-    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+    assertEquals(strData, dataFormat.getObjectData()[0]);
+    assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
   }
 
   @Test
   public void testByteArrayFullRangeOfCharacters() {
     Schema schema = new Schema("test");
     schema.addColumn(new Binary("1"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     byte[] allCharByteArr = new byte[256];
     for (int i = 0; i < allCharByteArr.length; ++i) {
@@ -250,32 +250,32 @@ public void testByteArrayFullRangeOfCharacters() {
     System.arraycopy(in, 0, inCopy, 0, in.length);
 
     // Modifies the input array, so we use the copy to confirm
-    data.setObjectData(in);
-    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+    dataFormat.setObjectData(in);
+    assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
   }
 
   @Test
   public void testDate() {
     Schema schema = new Schema("test");
     schema.addColumn(new Date("1"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
-    data.setTextData("2014-10-01");
-    assertEquals("2014-10-01", data.getObjectData()[0].toString());
+    dataFormat.setTextData("2014-10-01");
+    assertEquals("2014-10-01", dataFormat.getObjectData()[0].toString());
   }
 
   @Test
   public void testDateTime() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     for (String dateTime : new String[]{
       "2014-10-01T12:00:00",
       "2014-10-01T12:00:00.000"
     }) {
-      data.setTextData(dateTime);
-      assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString());
+      dataFormat.setTextData(dateTime);
+      assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString());
     }
   }
 
@@ -289,14 +289,14 @@ public void testDateTime() {
   public void testDateTimeISO8601Alternative() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     for (String dateTime : new String[]{
       "2014-10-01 12:00:00",
       "2014-10-01 12:00:00.000"
    }) {
-      data.setTextData(dateTime);
-      assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString());
+      dataFormat.setTextData(dateTime);
+      assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString());
     }
   }
 
@@ -304,20 +304,20 @@ public void testDateTimeISO8601Alternative() {
   public void testBit() {
     Schema schema = new Schema("test");
     schema.addColumn(new Bit("1"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     for (String trueBit : new String[]{
       "true", "TRUE", "1"
     }) {
-      data.setTextData(trueBit);
-      assertTrue((Boolean) data.getObjectData()[0]);
+      dataFormat.setTextData(trueBit);
+      assertTrue((Boolean) dataFormat.getObjectData()[0]);
     }
 
     for (String falseBit : new String[]{
       "false", "FALSE", "0"
     }) {
-      data.setTextData(falseBit);
-      assertFalse((Boolean) data.getObjectData()[0]);
+      dataFormat.setTextData(falseBit);
+      assertFalse((Boolean) dataFormat.getObjectData()[0]);
     }
   }
 
@@ -326,9 +326,23 @@ public void testEmptySchema() {
     String testData = "10,34,'54','random data'," +
         getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + ",'\\n'";
     Schema schema = new Schema("Test");
-    data.setSchema(schema);
-    data.setTextData(testData);
+    dataFormat.setSchema(schema);
+    dataFormat.setTextData(testData);
 
-    Object[] out = data.getObjectData();
+    @SuppressWarnings("unused")
+    Object[] out = dataFormat.getObjectData();
+  }
+
+  @Test(expected = SqoopException.class)
+  public void testNullSchema() {
+    dataFormat.setSchema(null);
+    @SuppressWarnings("unused")
+    Object[] out = dataFormat.getObjectData();
+  }
+
+  @Test(expected = SqoopException.class)
+  public void testNotSettingSchema() {
+    @SuppressWarnings("unused")
+    Object[] out = dataFormat.getObjectData();
   }
 }
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
deleted file mode 100644
index 139883e7..00000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.regex.Matcher;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.MRExecutionError;
-
-public class Data implements WritableComparable<Data> {
-
-  // The content is an Object to accommodate different kinds of data.
-  // For example, it can be:
-  // - Object[] for an array of object record
-  // - String for a text of CSV record
-  private volatile Object content = null;
-
-  public static final int EMPTY_DATA = 0;
-  public static final int CSV_RECORD = 1;
-  public static final int ARRAY_RECORD = 2;
-  private int type = EMPTY_DATA;
-
-  public static final String CHARSET_NAME = "UTF-8";
-
-  public static final char DEFAULT_RECORD_DELIMITER = '\n';
-  public static final char DEFAULT_FIELD_DELIMITER = ',';
-  public static final char DEFAULT_STRING_DELIMITER = '\'';
-  public static final char DEFAULT_STRING_ESCAPE = '\\';
-  private char fieldDelimiter = DEFAULT_FIELD_DELIMITER;
-  private char stringDelimiter = DEFAULT_STRING_DELIMITER;
-  private char stringEscape = DEFAULT_STRING_ESCAPE;
-  private String escapedStringDelimiter = String.valueOf(new char[] {
-      stringEscape, stringDelimiter
-  });
-
-  private int[] fieldTypes = null;
-
-  public void setFieldDelimiter(char fieldDelimiter) {
-    this.fieldDelimiter = fieldDelimiter;
-  }
-
-  public void setFieldTypes(int[] fieldTypes) {
-    this.fieldTypes = fieldTypes;
-  }
-
-  public void setContent(Object content, int type) {
-    switch (type) {
-    case EMPTY_DATA:
-    case CSV_RECORD:
-    case ARRAY_RECORD:
-      this.type = type;
-      this.content = content;
-      break;
-    default:
-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
-    }
-  }
-
-  public Object getContent(int targetType) {
-    switch (targetType) {
-    case CSV_RECORD:
-      return format();
-    case ARRAY_RECORD:
-      return parse();
-    default:
-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType));
-    }
-  }
-
-  public int getType() {
-    return type;
-  }
-
-  public boolean isEmpty() {
-    return (type == EMPTY_DATA);
-  }
-
-  @Override
-  public String toString() {
-    return (String)getContent(CSV_RECORD);
-  }
-
-  @Override
-  public int compareTo(Data other) {
-    byte[] myBytes = toString().getBytes(Charset.forName(CHARSET_NAME));
-    byte[] otherBytes = other.toString().getBytes(
-        Charset.forName(CHARSET_NAME));
-    return WritableComparator.compareBytes(
-        myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof Data)) {
-      return false;
-    }
-
-    Data data = (Data)other;
-    if (type != data.getType()) {
-      return false;
-    }
-
-    return toString().equals(data.toString());
-  }
-
-  @Override
-  public int hashCode() {
-    int result = super.hashCode();
-    switch (type) {
-    case CSV_RECORD:
-      result += 31 * content.hashCode();
-      return result;
-    case ARRAY_RECORD:
-      Object[] array = (Object[])content;
-      for (int i = 0; i < array.length; i++) {
-        result += 31 * array[i].hashCode();
-      }
-      return result;
-    default:
-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    type = readType(in);
-    switch (type) {
-    case CSV_RECORD:
-      readCsv(in);
-      break;
-    case ARRAY_RECORD:
-      readArray(in);
-      break;
-    default:
-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    writeType(out, type);
-    switch (type) {
-    case CSV_RECORD:
-      writeCsv(out);
-      break;
-    case ARRAY_RECORD:
-      writeArray(out);
-      break;
-    default:
-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
-    }
-  }
-
-  private int readType(DataInput in) throws IOException {
-    return WritableUtils.readVInt(in);
-  }
-
-  private void writeType(DataOutput out, int type) throws IOException {
-    WritableUtils.writeVInt(out, type);
-  }
-
-  private void readCsv(DataInput in) throws IOException {
-    content = in.readUTF();
-  }
-
-  private void writeCsv(DataOutput out) throws IOException {
-    out.writeUTF((String)content);
-  }
-
-  private void readArray(DataInput in) throws IOException {
-    // read number of columns
-    int columns = in.readInt();
-    content = new Object[columns];
-    Object[] array = (Object[])content;
-    // read each column
-    for (int i = 0; i < array.length; i++) {
-      int type = readType(in);
-      switch (type) {
-      case FieldTypes.UTF:
-        array[i] = in.readUTF();
-        break;
-
-      case FieldTypes.BIN:
-        int length = in.readInt();
-        byte[] bytes = new byte[length];
-        in.readFully(bytes);
-        array[i] = bytes;
-        break;
-
-      case FieldTypes.DOUBLE:
-        array[i] = in.readDouble();
-        break;
-
-      case FieldTypes.FLOAT:
-        array[i] = in.readFloat();
-        break;
-
-      case FieldTypes.LONG:
-        array[i] = in.readLong();
-        break;
-
-      case FieldTypes.INT:
-        array[i] = in.readInt();
-        break;
-
-      case FieldTypes.SHORT:
-        array[i] = in.readShort();
-        break;
-
-      case FieldTypes.CHAR:
-        array[i] = in.readChar();
-        break;
-
-      case FieldTypes.BYTE:
-        array[i] = in.readByte();
-        break;
-
-      case FieldTypes.BOOLEAN:
-        array[i] = in.readBoolean();
-        break;
-
-      case FieldTypes.NULL:
-        array[i] = null;
-        break;
-
-      default:
-        throw new IOException(
-            new SqoopException(MRExecutionError.MAPRED_EXEC_0012, Integer.toString(type))
-        );
-      }
-    }
-  }
-
-  private void writeArray(DataOutput out) throws IOException {
-    Object[] array = (Object[])content;
-    // write number of columns
-    out.writeInt(array.length);
-    // write each column
-    for (int i = 0; i < array.length; i++) {
-      if (array[i] instanceof String) {
-        writeType(out, FieldTypes.UTF);
-        out.writeUTF((String)array[i]);
-
-      } else if (array[i] instanceof byte[]) {
-        writeType(out, FieldTypes.BIN);
-        out.writeInt(((byte[])array[i]).length);
-        out.write((byte[])array[i]);
-
-      } else if (array[i] instanceof Double) {
-        writeType(out, FieldTypes.DOUBLE);
-        out.writeDouble((Double)array[i]);
-
-      } else if (array[i] instanceof Float) {
-        writeType(out, FieldTypes.FLOAT);
-        out.writeFloat((Float)array[i]);
-
-      } else if (array[i] instanceof Long) {
-        writeType(out, FieldTypes.LONG);
-        out.writeLong((Long)array[i]);
-
-      } else if (array[i] instanceof Integer) {
-        writeType(out, FieldTypes.INT);
-        out.writeInt((Integer)array[i]);
-
-      } else if (array[i] instanceof Short) {
-        writeType(out, FieldTypes.SHORT);
-        out.writeShort((Short)array[i]);
-
-      } else if (array[i] instanceof Character) {
-        writeType(out, FieldTypes.CHAR);
-        out.writeChar((Character)array[i]);
-
-      } else if (array[i] instanceof Byte) {
-        writeType(out, FieldTypes.BYTE);
-        out.writeByte((Byte)array[i]);
-
-      } else if (array[i] instanceof Boolean) {
-        writeType(out, FieldTypes.BOOLEAN);
-        out.writeBoolean((Boolean)array[i]);
-
-      } else if (array[i] == null) {
-        writeType(out, FieldTypes.NULL);
-
-      } else {
-        throw new IOException(
-            new SqoopException(MRExecutionError.MAPRED_EXEC_0012,
-                array[i].getClass().getName()
-            )
-        );
-      }
-    }
-  }
-
-  private String format() {
-    switch (type) {
-    case EMPTY_DATA:
-      return null;
-
-    case CSV_RECORD:
-      if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) {
-        return (String)content;
-      } else {
-        // TODO: need to exclude the case where comma is part of a string.
-        return ((String)content).replaceAll(
-            String.valueOf(DEFAULT_FIELD_DELIMITER),
-            String.valueOf(fieldDelimiter));
-      }
-
-    case ARRAY_RECORD:
-      StringBuilder sb = new StringBuilder();
-      Object[] array = (Object[])content;
-      for (int i = 0; i < array.length; i++) {
-        if (i != 0) {
-          sb.append(fieldDelimiter);
-        }
-
-        if (array[i] instanceof String) {
-          sb.append(stringDelimiter);
-          sb.append(escape((String)array[i]));
-          sb.append(stringDelimiter);
-        } else if (array[i] instanceof byte[]) {
-          sb.append(Arrays.toString((byte[])array[i]));
-        } else {
-          sb.append(String.valueOf(array[i]));
-        }
-      }
-      return sb.toString();
-
-    default:
-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
-    }
-  }
-
-  private Object[] parse() {
-    switch (type) {
-    case EMPTY_DATA:
-      return null;
-
-    case CSV_RECORD:
-      ArrayList<Object> list = new ArrayList<Object>();
-      char[] record = ((String)content).toCharArray();
-      int start = 0;
-      int position = start;
-      boolean stringDelimited = false;
-      boolean arrayDelimited = false;
-      int index = 0;
-      while (position < record.length) {
-        if (record[position] == fieldDelimiter) {
-          if (!stringDelimited && !arrayDelimited) {
-            index = parseField(list, record, start, position, index);
-            start = position + 1;
-          }
-        } else if (record[position] == stringDelimiter) {
-          if (!stringDelimited) {
-            stringDelimited = true;
-          }
-          else if (position > 0 && record[position-1] != stringEscape) {
-            stringDelimited = false;
-          }
-        } else if (record[position] == '[') {
-          if (!stringDelimited) {
-            arrayDelimited = true;
-          }
-        } else if (record[position] == ']') {
-          if (!stringDelimited) {
-            arrayDelimited = false;
-          }
-        }
-        position++;
-      }
-      parseField(list, record, start, position, index);
-      return list.toArray();
-
-    case ARRAY_RECORD:
-      return (Object[])content;
-
-    default:
-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
-    }
-  }
-
-  private int parseField(ArrayList<Object> list, char[] record,
-      int start, int end, int index) {
-    String field = String.valueOf(record, start, end-start).trim();
-
-    int fieldType;
-    if (fieldTypes == null) {
-      fieldType = guessType(field);
-    } else {
-      fieldType = fieldTypes[index];
-    }
-
-    switch (fieldType) {
-    case FieldTypes.UTF:
-      if (field.charAt(0) != stringDelimiter ||
-          field.charAt(field.length()-1) != stringDelimiter) {
-        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
-      }
-      list.add(index, unescape(field.substring(1, field.length()-1)));
-      break;
-
-    case FieldTypes.BIN:
-      if (field.charAt(0) != '[' ||
-          field.charAt(field.length()-1) != ']') {
-        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
-      }
-      String[] splits =
-          field.substring(1, field.length()-1).split(String.valueOf(','));
-      byte[] bytes = new byte[splits.length];
-      for (int i=0; i
 getPartitions(PartitionerContext context, Object oc, Obje
     }
   }
-  public static class DummyExtractor extends Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> {
+  public static class DummyExtractor extends
+      Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> {
     @Override
-    public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, DummyPartition partition) {
-      int id = ((DummyPartition)partition).getId();
+    public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj,
+        DummyPartition partition) {
+      int id = ((DummyPartition) partition).getId();
       for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
-        context.getDataWriter().writeArrayRecord(new Object[] {
-          id * NUMBER_OF_ROWS_PER_PARTITION + row,
-          (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
-          String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
+        context.getDataWriter().writeArrayRecord(
+            new Object[] { id * NUMBER_OF_ROWS_PER_PARTITION + row,
+                (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
+                String.valueOf(id * NUMBER_OF_ROWS_PER_PARTITION + row) });
       }
     }
@@ -189,16 +183,14 @@ public long getRowsRead() {
     }
   }
 
-  public static class DummyOutputFormat
-      extends OutputFormat<SqoopWritable, NullWritable> {
+  public static class DummyOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {
     @Override
     public void checkOutputSpecs(JobContext context) {
      // do nothing
    }
 
     @Override
-    public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
-        TaskAttemptContext context) {
+    public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(TaskAttemptContext context) {
       return new DummyRecordWriter();
     }
 
@@ -207,22 +199,17 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
       return new DummyOutputCommitter();
     }
 
-    public static class DummyRecordWriter
-        extends RecordWriter<SqoopWritable, NullWritable> {
-      private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
-      private Data data = new Data();
+    public static class DummyRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {
+      private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION;
+      // Use a dummy IDF for testing.
+      private IntermediateDataFormat<String> dataFormat = MRJobTestUtil.getTestIDF();
 
       @Override
       public void write(SqoopWritable key, NullWritable value) {
-
-        data.setContent(new Object[] {
-          index,
-          (double) index,
-          String.valueOf(index)},
-          Data.ARRAY_RECORD);
+        String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'";
+        dataFormat.setTextData(testData);
         index++;
-
-        assertEquals(data.toString(), key.toString());
+        assertEquals(dataFormat.getTextData().toString(), key.toString());
       }
 
       @Override
@@ -233,16 +220,20 @@ public void close(TaskAttemptContext context) {
 
     public static class DummyOutputCommitter extends OutputCommitter {
       @Override
-      public void setupJob(JobContext jobContext) { }
+      public void setupJob(JobContext jobContext) {
+      }
 
       @Override
-      public void setupTask(TaskAttemptContext taskContext) { }
+      public void setupTask(TaskAttemptContext taskContext) {
+      }
 
       @Override
-      public void commitTask(TaskAttemptContext taskContext) { }
+      public void commitTask(TaskAttemptContext taskContext) {
+      }
 
       @Override
-      public void abortTask(TaskAttemptContext taskContext) { }
+      public void abortTask(TaskAttemptContext taskContext) {
+      }
 
       @Override
       public boolean needsTaskCommit(TaskAttemptContext taskContext) {
@@ -251,39 +242,34 @@ public boolean needsTaskCommit(TaskAttemptContext taskContext) {
     }
   }
 
+  // Verifies the records written to the target.
   public static class DummyLoader extends Loader<EmptyConfiguration, EmptyConfiguration> {
-    private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
-    private Data expected = new Data();
+    private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION;
+    private IntermediateDataFormat<String> dataFormat = MRJobTestUtil.getTestIDF();
 
     @Override
-    public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) throws Exception{
+    public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj)
+        throws Exception {
      String data;
      while ((data = context.getDataReader().readTextRecord()) != null) {
-        expected.setContent(new Object[] {
-          index,
-          (double) index,
-          String.valueOf(index)},
-          Data.ARRAY_RECORD);
+        String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'";
+        dataFormat.setTextData(testData);
         index++;
-        assertEquals(expected.toString(), data);
+        assertEquals(dataFormat.getTextData().toString(), data);
       }
     }
   }
 
   public static class DummyFromDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> {
-
     public static int count = 0;
-
     @Override
     public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) {
       count++;
     }
   }
 
-  public static class DummyToDestroyer extends Destroyer {
-
+  public static class DummyToDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> {
     public static int count = 0;
-
     @Override
     public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) {
       count++;
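Both dummy classes above now rebuild the expected record as CSV text instead of through the removed Data class: the integer index, its double form, and the single-quoted string form, joined by commas. The repeated expression is equivalent to this hypothetical helper (not part of the patch):

    // expectedRow(5) returns "5,5.0,'5'"
    private static String expectedRow(int index) {
      return "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'";
    }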
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
index 04fb6927..a64a4a64 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
@@ -38,16 +38,17 @@
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
-import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.io.SqoopWritable;
 import org.apache.sqoop.job.mr.MRConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
+import org.apache.sqoop.job.util.MRJobTestUtil;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.FloatingPoint;
@@ -121,6 +122,7 @@ public static Collection<Object[]> data() {
     return parameters;
   }
 
+  @SuppressWarnings("deprecation")
   @Test
   public void testSchemaMatching() throws Exception {
     Configuration conf = new Configuration();
@@ -132,9 +134,9 @@ public void testSchemaMatching() throws Exception {
     Job job = new Job(conf);
     MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
     MRConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
-    JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
+    MRJobTestUtil.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
         DummyOutputFormat.class);
-    boolean success = JobUtils.runJob(job.getConfiguration(),
+    boolean success = MRJobTestUtil.runJob(job.getConfiguration(),
         SqoopInputFormat.class, SqoopMapper.class,
         DummyOutputFormat.class);
     if (from.getName().split("-")[1].equals("EMPTY")) {
@@ -233,19 +235,14 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
 
     public static class DummyRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {
       private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
-      private Data data = new Data();
+      private IntermediateDataFormat<String> dataFormat = MRJobTestUtil.getTestIDF();
 
       @Override
       public void write(SqoopWritable key, NullWritable value) {
-
-        data.setContent(new Object[] {
-          index,
-          (double) index,
-          String.valueOf(index)},
-          Data.ARRAY_RECORD);
+        String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'";
+        dataFormat.setTextData(testData);
         index++;
-
-        assertEquals(data.toString(), key.toString());
+        assertEquals(dataFormat.getTextData().toString(), key.toString());
       }
 
       @Override
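TestMatching's record writer now checks against the shared fixture as well. A sketch of what MRJobTestUtil provides, assuming the schema and IDF defined at the end of this patch (the literal record is illustrative):

    Schema schema = MRJobTestUtil.getTestSchema();    // FixedPoint "1", FloatingPoint "2", Text "3"
    IntermediateDataFormat<String> idf = MRJobTestUtil.getTestIDF();
    idf.setTextData("1,1.0,'1'");
    assertEquals("1,1.0,'1'", idf.getTextData());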
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
deleted file mode 100644
index 4e23bcb7..00000000
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.io;
-
-import java.util.Arrays;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestData {
-
-  private static final double TEST_NUMBER = Math.PI + 100;
-  @Test
-  public void testArrayToCsv() throws Exception {
-    Data data = new Data();
-    String expected;
-    String actual;
-
-    // with special characters:
-    expected =
-        Long.valueOf((long)TEST_NUMBER) + "," +
-        Double.valueOf(TEST_NUMBER) + "," +
-        "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
-        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
-    data.setContent(new Object[] {
-        Long.valueOf((long)TEST_NUMBER),
-        Double.valueOf(TEST_NUMBER),
-        String.valueOf(TEST_NUMBER) + "',s",
-        new byte[] {1, 2, 3, 4, 5} },
-        Data.ARRAY_RECORD);
-    actual = (String)data.getContent(Data.CSV_RECORD);
-    assertEquals(expected, actual);
-
-    // with null characters:
-    expected =
-        Long.valueOf((long)TEST_NUMBER) + "," +
-        Double.valueOf(TEST_NUMBER) + "," +
-        "null" + "," +
-        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
-    data.setContent(new Object[] {
-        Long.valueOf((long)TEST_NUMBER),
-        Double.valueOf(TEST_NUMBER),
-        null,
-        new byte[] {1, 2, 3, 4, 5} },
-        Data.ARRAY_RECORD);
-    actual = (String)data.getContent(Data.CSV_RECORD);
-    assertEquals(expected, actual);
-  }
-
-  @Test
-  public void testCsvToArray() throws Exception {
-    Data data = new Data();
-    Object[] expected;
-    Object[] actual;
-
-    // with special characters:
-    expected = new Object[] {
-        Long.valueOf((long)TEST_NUMBER),
-        Double.valueOf(TEST_NUMBER),
-        String.valueOf(TEST_NUMBER) + "',s",
-        new byte[] {1, 2, 3, 4, 5} };
-    data.setContent(
-        Long.valueOf((long)TEST_NUMBER) + "," +
-        Double.valueOf(TEST_NUMBER) + "," +
-        "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
-        Arrays.toString(new byte[] {1, 2, 3, 4, 5}),
-        Data.CSV_RECORD);
-    actual = (Object[])data.getContent(Data.ARRAY_RECORD);
-    assertEquals(expected.length, actual.length);
-    for (int c=0; c
 writer = executor.getRecordWriter();
-    IntermediateDataFormat<String> data = new CSVIntermediateDataFormat();
+    IntermediateDataFormat<String> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable();
     try {
       for (int count = 0; count < 100; count++) {
-        data.setTextData(String.valueOf(count));
-        writable.setString(data.getTextData());
+        dataFormat.setTextData(String.valueOf(count));
+        writable.setString(dataFormat.getTextData());
         writer.write(writable, null);
       }
     } catch (SqoopException ex) {
@@ -149,7 +150,7 @@ public void testSuccessfulContinuousLoader() throws Throwable {
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
-    IntermediateDataFormat<String> data = new CSVIntermediateDataFormat();
+    IntermediateDataFormat<String> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable();
     for (int i = 0; i < 10; i++) {
       StringBuilder builder = new StringBuilder();
@@ -159,8 +160,8 @@ public void testSuccessfulContinuousLoader() throws Throwable {
           builder.append(",");
         }
       }
-      data.setTextData(builder.toString());
-      writable.setString(data.getTextData());
+      dataFormat.setTextData(builder.toString());
+      writable.setString(dataFormat.getTextData());
       writer.write(writable, null);
     }
     writer.close(null);
@@ -171,7 +172,7 @@ public void testSuccessfulLoader() throws Throwable {
     SqoopOutputFormatLoadExecutor executor = new
        SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
-    IntermediateDataFormat<String> data = new CSVIntermediateDataFormat();
+    IntermediateDataFormat<String> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable();
     StringBuilder builder = new StringBuilder();
     for (int count = 0; count < 100; count++) {
@@ -180,8 +181,8 @@ public void testSuccessfulLoader() throws Throwable {
         builder.append(",");
       }
     }
-    data.setTextData(builder.toString());
-    writable.setString(data.getTextData());
+    dataFormat.setTextData(builder.toString());
+    writable.setString(dataFormat.getTextData());
     writer.write(writable, null);
 
     //Allow writer to complete.
@@ -196,7 +197,7 @@ public void testThrowingContinuousLoader() throws Throwable {
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
-    IntermediateDataFormat<String> data = new CSVIntermediateDataFormat();
+    IntermediateDataFormat<String> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable();
     try {
       for (int i = 0; i < 10; i++) {
@@ -207,8 +208,8 @@ public void testThrowingContinuousLoader() throws Throwable {
            builder.append(",");
          }
        }
-        data.setTextData(builder.toString());
-        writable.setString(data.getTextData());
+        dataFormat.setTextData(builder.toString());
+        writable.setString(dataFormat.getTextData());
         writer.write(writable, null);
       }
       writer.close(null);
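The rename below turns JobUtils into MRJobTestUtil without changing its local-job driver. The call pattern, as used by TestMatching earlier in this patch (a sketch only; a real test still has to set the connector schemas on the Configuration first):

    Configuration conf = new Configuration();
    boolean success = MRJobTestUtil.runJob(conf, SqoopInputFormat.class,
        SqoopMapper.class, DummyOutputFormat.class);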
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java
similarity index 62%
rename from execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
rename to execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java
index dafdeb4c..5d5359e5 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.sqoop.job;
+package org.apache.sqoop.job.util;
 
 import java.io.IOException;
 
@@ -27,20 +27,27 @@
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.job.io.SqoopWritable;
 import org.apache.sqoop.job.mr.SqoopSplit;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Text;
 import org.apache.sqoop.utils.ClassUtils;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class JobUtils {
+public class MRJobTestUtil {
 
+  @SuppressWarnings("deprecation")
   public static boolean runJob(Configuration conf,
-    Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
-    Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper,
-    Class<? extends OutputFormat<SqoopWritable, NullWritable>> output)
-    throws IOException, InterruptedException, ClassNotFoundException {
+      Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
+      Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper,
+      Class<? extends OutputFormat<SqoopWritable, NullWritable>> output) throws IOException,
+      InterruptedException, ClassNotFoundException {
     Job job = new Job(conf);
     job.setInputFormatClass(input);
     job.setMapperClass(mapper);
@@ -52,7 +59,8 @@ public static boolean runJob(Configuration conf,
 
     boolean ret = job.waitForCompletion(true);
 
-    // Hadoop 1.0 (and 0.20) have nasty bug when job committer is not called in LocalJobRuner
+    // Hadoop 1.0 (and 0.20) have a nasty bug where the job committer is not called in
+    // LocalJobRunner
     if (isHadoop1()) {
       callOutputCommitter(job, output);
     }
@@ -60,11 +68,25 @@ public static boolean runJob(Configuration conf,
     return ret;
   }
 
+  public static Schema getTestSchema() {
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+        .addColumn(new Text("3"));
+    return schema;
+  }
+
+  public static IntermediateDataFormat<String> getTestIDF() {
+    return new CSVIntermediateDataFormat(getTestSchema());
+  }
+
   /**
    * Call output format on given job manually.
    */
-  private static void callOutputCommitter(Job job, Class<? extends OutputFormat<SqoopWritable, NullWritable>> outputFormat) throws IOException, InterruptedException {
-    OutputCommitter committer = ((OutputFormat)ClassUtils.instantiate(outputFormat)).getOutputCommitter(null);
+  private static void callOutputCommitter(Job job,
+      Class<? extends OutputFormat<SqoopWritable, NullWritable>> outputFormat) throws IOException,
+      InterruptedException {
+    OutputCommitter committer = ((OutputFormat) ClassUtils.instantiate(outputFormat))
+        .getOutputCommitter(null);
 
     JobContext jobContext = mock(JobContext.class);
     when(jobContext.getConfiguration()).thenReturn(job.getConfiguration());
@@ -79,14 +101,13 @@ private static void callOutputCommitter(Job job, Class