diff --git a/SQOOP-1348.patch b/SQOOP-1348.patch deleted file mode 100644 index 7834a3f3..00000000 --- a/SQOOP-1348.patch +++ /dev/null @@ -1,1844 +0,0 @@ -diff --git a/common/src/main/java/org/apache/sqoop/schema/Schema.java b/common/src/main/java/org/apache/sqoop/schema/Schema.java -index 40c362c..3aa3aea 100644 ---- a/common/src/main/java/org/apache/sqoop/schema/Schema.java -+++ b/common/src/main/java/org/apache/sqoop/schema/Schema.java -@@ -122,12 +122,7 @@ public Schema setCreationDate(Date creationDate) { - } - - public boolean isEmpty() { -- if (columns.size()==0) { -- return true; -- } else { -- return false; -- } -- -+ return columns.size() == 0; - } - - public String toString() { -diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java -index e0e4061..e65edd9 100644 ---- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java -+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java -@@ -67,9 +67,15 @@ - - private final List stringFieldIndices = new ArrayList(); - private final List byteFieldIndices = new ArrayList(); -- - private Schema schema; - -+ public CSVIntermediateDataFormat() { -+ } -+ -+ public CSVIntermediateDataFormat(Schema schema) { -+ setSchema(schema); -+ } -+ - /** - * {@inheritDoc} - */ -@@ -166,7 +172,7 @@ public void setSchema(Schema schema) { - */ - @Override - public Object[] getObjectData() { -- if (schema.isEmpty()) { -+ if (schema == null || schema.isEmpty()) { - throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006); - } - -diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java -index 72e95ed..fcf6c3c 100644 ---- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java -+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java -@@ -41,11 +41,11 @@ - - private final String BYTE_FIELD_ENCODING = "ISO-8859-1"; - -- private IntermediateDataFormat data; -+ private IntermediateDataFormat dataFormat; - - @Before - public void setUp() { -- data = new CSVIntermediateDataFormat(); -+ dataFormat = new CSVIntermediateDataFormat(); - } - - private String getByteFieldString(byte[] byteFieldData) { -@@ -61,8 +61,8 @@ private String getByteFieldString(byte[] byteFieldData) { - public void testStringInStringOut() { - String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) - + ",'" + String.valueOf(0x0A) + "'"; -- data.setTextData(testData); -- assertEquals(testData, data.getTextData()); -+ dataFormat.setTextData(testData); -+ assertEquals(testData, dataFormat.getTextData()); - } - - @Test -@@ -74,10 +74,10 @@ public void testNullStringInObjectOut() { - .addColumn(new Text("4")) - .addColumn(new Binary("5")) - .addColumn(new Text("6")); -- data.setSchema(schema); -- data.setTextData(null); -+ dataFormat.setSchema(schema); -+ dataFormat.setTextData(null); - -- Object[] out = data.getObjectData(); -+ Object[] out = dataFormat.getObjectData(); - - assertNull(out); - } -@@ -91,10 +91,10 @@ public void testEmptyStringInObjectOut() { - .addColumn(new Text("4")) - .addColumn(new Binary("5")) - .addColumn(new Text("6")); -- data.setSchema(schema); -- data.setTextData(""); -+ dataFormat.setSchema(schema); -+ dataFormat.setTextData(""); - -- data.getObjectData(); -+ dataFormat.getObjectData(); - } - - @Test -@@ -111,10 +111,10 @@ public void testStringInObjectOut() { - .addColumn(new Binary("5")) - .addColumn(new Text("6")); - -- data.setSchema(schema); -- data.setTextData(testData); -+ dataFormat.setSchema(schema); -+ dataFormat.setTextData(testData); - -- Object[] out = data.getObjectData(); -+ Object[] out = dataFormat.getObjectData(); - - assertEquals(new Long(10),out[0]); - assertEquals(new Long(34),out[1]); -@@ -134,7 +134,7 @@ public void testObjectInStringOut() { - .addColumn(new Text("4")) - .addColumn(new Binary("5")) - .addColumn(new Text("6")); -- data.setSchema(schema); -+ dataFormat.setSchema(schema); - - byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; - Object[] in = new Object[6]; -@@ -145,12 +145,12 @@ public void testObjectInStringOut() { - in[4] = byteFieldData; - in[5] = new String(new char[] { 0x0A }); - -- data.setObjectData(in); -+ dataFormat.setObjectData(in); - - //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements - String testData = "10,34,'54','random data'," + - getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'"; -- assertEquals(testData, data.getTextData()); -+ assertEquals(testData, dataFormat.getTextData()); - } - - @Test -@@ -164,7 +164,7 @@ public void testObjectInObjectOut() { - .addColumn(new Text("4")) - .addColumn(new Binary("5")) - .addColumn(new Text("6")); -- data.setSchema(schema); -+ dataFormat.setSchema(schema); - - Object[] in = new Object[6]; - in[0] = new Long(10); -@@ -177,9 +177,9 @@ public void testObjectInObjectOut() { - System.arraycopy(in,0,inCopy,0,in.length); - - // Modifies the input array, so we use the copy to confirm -- data.setObjectData(in); -+ dataFormat.setObjectData(in); - -- assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); -+ assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData())); - } - - @Test -@@ -191,7 +191,7 @@ public void testObjectWithNullInStringOut() { - .addColumn(new Text("4")) - .addColumn(new Binary("5")) - .addColumn(new Text("6")); -- data.setSchema(schema); -+ dataFormat.setSchema(schema); - - byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; - Object[] in = new Object[6]; -@@ -202,12 +202,12 @@ public void testObjectWithNullInStringOut() { - in[4] = byteFieldData; - in[5] = new String(new char[] { 0x0A }); - -- data.setObjectData(in); -+ dataFormat.setObjectData(in); - - //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements - String testData = "10,34,NULL,'random data'," + - getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'"; -- assertEquals(testData, data.getTextData()); -+ assertEquals(testData, dataFormat.getTextData()); - } - - @Test -@@ -215,7 +215,7 @@ public void testStringFullRangeOfCharacters() { - Schema schema = new Schema("test"); - schema.addColumn(new Text("1")); - -- data.setSchema(schema); -+ dataFormat.setSchema(schema); - - char[] allCharArr = new char[256]; - for(int i = 0; i < allCharArr.length; ++i) { -@@ -228,17 +228,17 @@ public void testStringFullRangeOfCharacters() { - System.arraycopy(in, 0, inCopy, 0, in.length); - - // Modifies the input array, so we use the copy to confirm -- data.setObjectData(in); -+ dataFormat.setObjectData(in); - -- assertEquals(strData, data.getObjectData()[0]); -- assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); -+ assertEquals(strData, dataFormat.getObjectData()[0]); -+ assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData())); - } - - @Test - public void testByteArrayFullRangeOfCharacters() { - Schema schema = new Schema("test"); - schema.addColumn(new Binary("1")); -- data.setSchema(schema); -+ dataFormat.setSchema(schema); - - byte[] allCharByteArr = new byte[256]; - for (int i = 0; i < allCharByteArr.length; ++i) { -@@ -250,32 +250,32 @@ public void testByteArrayFullRangeOfCharacters() { - System.arraycopy(in, 0, inCopy, 0, in.length); - - // Modifies the input array, so we use the copy to confirm -- data.setObjectData(in); -- assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); -+ dataFormat.setObjectData(in); -+ assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData())); - } - - @Test - public void testDate() { - Schema schema = new Schema("test"); - schema.addColumn(new Date("1")); -- data.setSchema(schema); -+ dataFormat.setSchema(schema); - -- data.setTextData("2014-10-01"); -- assertEquals("2014-10-01", data.getObjectData()[0].toString()); -+ dataFormat.setTextData("2014-10-01"); -+ assertEquals("2014-10-01", dataFormat.getObjectData()[0].toString()); - } - - @Test - public void testDateTime() { - Schema schema = new Schema("test"); - schema.addColumn(new DateTime("1")); -- data.setSchema(schema); -+ dataFormat.setSchema(schema); - - for (String dateTime : new String[]{ - "2014-10-01T12:00:00", - "2014-10-01T12:00:00.000" - }) { -- data.setTextData(dateTime); -- assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString()); -+ dataFormat.setTextData(dateTime); -+ assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString()); - } - } - -@@ -289,14 +289,14 @@ public void testDateTime() { - public void testDateTimeISO8601Alternative() { - Schema schema = new Schema("test"); - schema.addColumn(new DateTime("1")); -- data.setSchema(schema); -+ dataFormat.setSchema(schema); - - for (String dateTime : new String[]{ - "2014-10-01 12:00:00", - "2014-10-01 12:00:00.000" - }) { -- data.setTextData(dateTime); -- assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString()); -+ dataFormat.setTextData(dateTime); -+ assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString()); - } - } - -@@ -304,20 +304,20 @@ public void testDateTimeISO8601Alternative() { - public void testBit() { - Schema schema = new Schema("test"); - schema.addColumn(new Bit("1")); -- data.setSchema(schema); -+ dataFormat.setSchema(schema); - - for (String trueBit : new String[]{ - "true", "TRUE", "1" - }) { -- data.setTextData(trueBit); -- assertTrue((Boolean) data.getObjectData()[0]); -+ dataFormat.setTextData(trueBit); -+ assertTrue((Boolean) dataFormat.getObjectData()[0]); - } - - for (String falseBit : new String[]{ - "false", "FALSE", "0" - }) { -- data.setTextData(falseBit); -- assertFalse((Boolean) data.getObjectData()[0]); -+ dataFormat.setTextData(falseBit); -+ assertFalse((Boolean) dataFormat.getObjectData()[0]); - } - } - -@@ -326,9 +326,23 @@ public void testEmptySchema() { - String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) - + ",'\\n'"; - Schema schema = new Schema("Test"); -- data.setSchema(schema); -- data.setTextData(testData); -+ dataFormat.setSchema(schema); -+ dataFormat.setTextData(testData); - -- Object[] out = data.getObjectData(); -+ @SuppressWarnings("unused") -+ Object[] out = dataFormat.getObjectData(); -+ } -+ -+ @Test(expected = SqoopException.class) -+ public void testNullSchema() { -+ dataFormat.setSchema(null); -+ @SuppressWarnings("unused") -+ Object[] out = dataFormat.getObjectData(); -+ } -+ -+ @Test(expected = SqoopException.class) -+ public void testNotSettingSchema() { -+ @SuppressWarnings("unused") -+ Object[] out = dataFormat.getObjectData(); - } - } -diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java -deleted file mode 100644 -index 139883e..0000000 ---- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java -+++ /dev/null -@@ -1,529 +0,0 @@ --/** -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ --package org.apache.sqoop.job.io; -- --import java.io.DataInput; --import java.io.DataOutput; --import java.io.IOException; --import java.nio.charset.Charset; --import java.util.ArrayList; --import java.util.Arrays; --import java.util.regex.Matcher; -- --import org.apache.hadoop.io.WritableComparable; --import org.apache.hadoop.io.WritableComparator; --import org.apache.hadoop.io.WritableUtils; --import org.apache.sqoop.common.SqoopException; --import org.apache.sqoop.job.MRExecutionError; -- --public class Data implements WritableComparable { -- -- // The content is an Object to accommodate different kinds of data. -- // For example, it can be: -- // - Object[] for an array of object record -- // - String for a text of CSV record -- private volatile Object content = null; -- -- public static final int EMPTY_DATA = 0; -- public static final int CSV_RECORD = 1; -- public static final int ARRAY_RECORD = 2; -- private int type = EMPTY_DATA; -- -- public static final String CHARSET_NAME = "UTF-8"; -- -- public static final char DEFAULT_RECORD_DELIMITER = '\n'; -- public static final char DEFAULT_FIELD_DELIMITER = ','; -- public static final char DEFAULT_STRING_DELIMITER = '\''; -- public static final char DEFAULT_STRING_ESCAPE = '\\'; -- private char fieldDelimiter = DEFAULT_FIELD_DELIMITER; -- private char stringDelimiter = DEFAULT_STRING_DELIMITER; -- private char stringEscape = DEFAULT_STRING_ESCAPE; -- private String escapedStringDelimiter = String.valueOf(new char[] { -- stringEscape, stringDelimiter -- }); -- -- private int[] fieldTypes = null; -- -- public void setFieldDelimiter(char fieldDelimiter) { -- this.fieldDelimiter = fieldDelimiter; -- } -- -- public void setFieldTypes(int[] fieldTypes) { -- this.fieldTypes = fieldTypes; -- } -- -- public void setContent(Object content, int type) { -- switch (type) { -- case EMPTY_DATA: -- case CSV_RECORD: -- case ARRAY_RECORD: -- this.type = type; -- this.content = content; -- break; -- default: -- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); -- } -- } -- -- public Object getContent(int targetType) { -- switch (targetType) { -- case CSV_RECORD: -- return format(); -- case ARRAY_RECORD: -- return parse(); -- default: -- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType)); -- } -- } -- -- public int getType() { -- return type; -- } -- -- public boolean isEmpty() { -- return (type == EMPTY_DATA); -- } -- -- @Override -- public String toString() { -- return (String)getContent(CSV_RECORD); -- } -- -- @Override -- public int compareTo(Data other) { -- byte[] myBytes = toString().getBytes(Charset.forName(CHARSET_NAME)); -- byte[] otherBytes = other.toString().getBytes( -- Charset.forName(CHARSET_NAME)); -- return WritableComparator.compareBytes( -- myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length); -- } -- -- @Override -- public boolean equals(Object other) { -- if (!(other instanceof Data)) { -- return false; -- } -- -- Data data = (Data)other; -- if (type != data.getType()) { -- return false; -- } -- -- return toString().equals(data.toString()); -- } -- -- @Override -- public int hashCode() { -- int result = super.hashCode(); -- switch (type) { -- case CSV_RECORD: -- result += 31 * content.hashCode(); -- return result; -- case ARRAY_RECORD: -- Object[] array = (Object[])content; -- for (int i = 0; i < array.length; i++) { -- result += 31 * array[i].hashCode(); -- } -- return result; -- default: -- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); -- } -- } -- -- @Override -- public void readFields(DataInput in) throws IOException { -- type = readType(in); -- switch (type) { -- case CSV_RECORD: -- readCsv(in); -- break; -- case ARRAY_RECORD: -- readArray(in); -- break; -- default: -- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); -- } -- } -- -- @Override -- public void write(DataOutput out) throws IOException { -- writeType(out, type); -- switch (type) { -- case CSV_RECORD: -- writeCsv(out); -- break; -- case ARRAY_RECORD: -- writeArray(out); -- break; -- default: -- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); -- } -- } -- -- private int readType(DataInput in) throws IOException { -- return WritableUtils.readVInt(in); -- } -- -- private void writeType(DataOutput out, int type) throws IOException { -- WritableUtils.writeVInt(out, type); -- } -- -- private void readCsv(DataInput in) throws IOException { -- content = in.readUTF(); -- } -- -- private void writeCsv(DataOutput out) throws IOException { -- out.writeUTF((String)content); -- } -- -- private void readArray(DataInput in) throws IOException { -- // read number of columns -- int columns = in.readInt(); -- content = new Object[columns]; -- Object[] array = (Object[])content; -- // read each column -- for (int i = 0; i < array.length; i++) { -- int type = readType(in); -- switch (type) { -- case FieldTypes.UTF: -- array[i] = in.readUTF(); -- break; -- -- case FieldTypes.BIN: -- int length = in.readInt(); -- byte[] bytes = new byte[length]; -- in.readFully(bytes); -- array[i] = bytes; -- break; -- -- case FieldTypes.DOUBLE: -- array[i] = in.readDouble(); -- break; -- -- case FieldTypes.FLOAT: -- array[i] = in.readFloat(); -- break; -- -- case FieldTypes.LONG: -- array[i] = in.readLong(); -- break; -- -- case FieldTypes.INT: -- array[i] = in.readInt(); -- break; -- -- case FieldTypes.SHORT: -- array[i] = in.readShort(); -- break; -- -- case FieldTypes.CHAR: -- array[i] = in.readChar(); -- break; -- -- case FieldTypes.BYTE: -- array[i] = in.readByte(); -- break; -- -- case FieldTypes.BOOLEAN: -- array[i] = in.readBoolean(); -- break; -- -- case FieldTypes.NULL: -- array[i] = null; -- break; -- -- default: -- throw new IOException( -- new SqoopException(MRExecutionError.MAPRED_EXEC_0012, Integer.toString(type)) -- ); -- } -- } -- } -- -- private void writeArray(DataOutput out) throws IOException { -- Object[] array = (Object[])content; -- // write number of columns -- out.writeInt(array.length); -- // write each column -- for (int i = 0; i < array.length; i++) { -- if (array[i] instanceof String) { -- writeType(out, FieldTypes.UTF); -- out.writeUTF((String)array[i]); -- -- } else if (array[i] instanceof byte[]) { -- writeType(out, FieldTypes.BIN); -- out.writeInt(((byte[])array[i]).length); -- out.write((byte[])array[i]); -- -- } else if (array[i] instanceof Double) { -- writeType(out, FieldTypes.DOUBLE); -- out.writeDouble((Double)array[i]); -- -- } else if (array[i] instanceof Float) { -- writeType(out, FieldTypes.FLOAT); -- out.writeFloat((Float)array[i]); -- -- } else if (array[i] instanceof Long) { -- writeType(out, FieldTypes.LONG); -- out.writeLong((Long)array[i]); -- -- } else if (array[i] instanceof Integer) { -- writeType(out, FieldTypes.INT); -- out.writeInt((Integer)array[i]); -- -- } else if (array[i] instanceof Short) { -- writeType(out, FieldTypes.SHORT); -- out.writeShort((Short)array[i]); -- -- } else if (array[i] instanceof Character) { -- writeType(out, FieldTypes.CHAR); -- out.writeChar((Character)array[i]); -- -- } else if (array[i] instanceof Byte) { -- writeType(out, FieldTypes.BYTE); -- out.writeByte((Byte)array[i]); -- -- } else if (array[i] instanceof Boolean) { -- writeType(out, FieldTypes.BOOLEAN); -- out.writeBoolean((Boolean)array[i]); -- -- } else if (array[i] == null) { -- writeType(out, FieldTypes.NULL); -- -- } else { -- throw new IOException( -- new SqoopException(MRExecutionError.MAPRED_EXEC_0012, -- array[i].getClass().getName() -- ) -- ); -- } -- } -- } -- -- private String format() { -- switch (type) { -- case EMPTY_DATA: -- return null; -- -- case CSV_RECORD: -- if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) { -- return (String)content; -- } else { -- // TODO: need to exclude the case where comma is part of a string. -- return ((String)content).replaceAll( -- String.valueOf(DEFAULT_FIELD_DELIMITER), -- String.valueOf(fieldDelimiter)); -- } -- -- case ARRAY_RECORD: -- StringBuilder sb = new StringBuilder(); -- Object[] array = (Object[])content; -- for (int i = 0; i < array.length; i++) { -- if (i != 0) { -- sb.append(fieldDelimiter); -- } -- -- if (array[i] instanceof String) { -- sb.append(stringDelimiter); -- sb.append(escape((String)array[i])); -- sb.append(stringDelimiter); -- } else if (array[i] instanceof byte[]) { -- sb.append(Arrays.toString((byte[])array[i])); -- } else { -- sb.append(String.valueOf(array[i])); -- } -- } -- return sb.toString(); -- -- default: -- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); -- } -- } -- -- private Object[] parse() { -- switch (type) { -- case EMPTY_DATA: -- return null; -- -- case CSV_RECORD: -- ArrayList list = new ArrayList(); -- char[] record = ((String)content).toCharArray(); -- int start = 0; -- int position = start; -- boolean stringDelimited = false; -- boolean arrayDelimited = false; -- int index = 0; -- while (position < record.length) { -- if (record[position] == fieldDelimiter) { -- if (!stringDelimited && !arrayDelimited) { -- index = parseField(list, record, start, position, index); -- start = position + 1; -- } -- } else if (record[position] == stringDelimiter) { -- if (!stringDelimited) { -- stringDelimited = true; -- } -- else if (position > 0 && record[position-1] != stringEscape) { -- stringDelimited = false; -- } -- } else if (record[position] == '[') { -- if (!stringDelimited) { -- arrayDelimited = true; -- } -- } else if (record[position] == ']') { -- if (!stringDelimited) { -- arrayDelimited = false; -- } -- } -- position++; -- } -- parseField(list, record, start, position, index); -- return list.toArray(); -- -- case ARRAY_RECORD: -- return (Object[])content; -- -- default: -- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); -- } -- } -- -- private int parseField(ArrayList list, char[] record, -- int start, int end, int index) { -- String field = String.valueOf(record, start, end-start).trim(); -- -- int fieldType; -- if (fieldTypes == null) { -- fieldType = guessType(field); -- } else { -- fieldType = fieldTypes[index]; -- } -- -- switch (fieldType) { -- case FieldTypes.UTF: -- if (field.charAt(0) != stringDelimiter || -- field.charAt(field.length()-1) != stringDelimiter) { -- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022); -- } -- list.add(index, unescape(field.substring(1, field.length()-1))); -- break; -- -- case FieldTypes.BIN: -- if (field.charAt(0) != '[' || -- field.charAt(field.length()-1) != ']') { -- throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022); -- } -- String[] splits = -- field.substring(1, field.length()-1).split(String.valueOf(',')); -- byte[] bytes = new byte[splits.length]; -- for (int i=0; i> input, -- Class> mapper, -- Class> output) -- throws IOException, InterruptedException, ClassNotFoundException { -- Job job = new Job(conf); -- job.setInputFormatClass(input); -- job.setMapperClass(mapper); -- job.setMapOutputKeyClass(SqoopWritable.class); -- job.setMapOutputValueClass(NullWritable.class); -- job.setOutputFormatClass(output); -- job.setOutputKeyClass(SqoopWritable.class); -- job.setOutputValueClass(NullWritable.class); -- -- boolean ret = job.waitForCompletion(true); -- -- // Hadoop 1.0 (and 0.20) have nasty bug when job committer is not called in LocalJobRuner -- if (isHadoop1()) { -- callOutputCommitter(job, output); -- } -- -- return ret; -- } -- -- /** -- * Call output format on given job manually. -- */ -- private static void callOutputCommitter(Job job, Class> outputFormat) throws IOException, InterruptedException { -- OutputCommitter committer = ((OutputFormat)ClassUtils.instantiate(outputFormat)).getOutputCommitter(null); -- -- JobContext jobContext = mock(JobContext.class); -- when(jobContext.getConfiguration()).thenReturn(job.getConfiguration()); -- -- committer.commitJob(jobContext); -- } -- -- /** -- * Detect Hadoop 1.0 installation -- * -- * @return True if and only if this is Hadoop 1 and below -- */ -- public static boolean isHadoop1() { -- String version = org.apache.hadoop.util.VersionInfo.getVersion(); -- if (version.matches("\\b0\\.20\\..+\\b") -- || version.matches("\\b1\\.\\d\\.\\d")) { -- return true; -- } -- return false; -- } -- -- private JobUtils() { -- // Disable explicit object creation -- } -- --} -diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java -index 78ae4ec..bbac7d2 100644 ---- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java -+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java -@@ -37,6 +37,7 @@ - import org.apache.sqoop.common.Direction; - import org.apache.sqoop.connector.common.EmptyConfiguration; - import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; -+import org.apache.sqoop.connector.idf.IntermediateDataFormat; - import org.apache.sqoop.job.etl.Destroyer; - import org.apache.sqoop.job.etl.DestroyerContext; - import org.apache.sqoop.job.etl.Extractor; -@@ -46,17 +47,13 @@ - import org.apache.sqoop.job.etl.Partition; - import org.apache.sqoop.job.etl.Partitioner; - import org.apache.sqoop.job.etl.PartitionerContext; --import org.apache.sqoop.job.io.Data; - import org.apache.sqoop.job.io.SqoopWritable; - import org.apache.sqoop.job.mr.MRConfigurationUtils; - import org.apache.sqoop.job.mr.SqoopInputFormat; - import org.apache.sqoop.job.mr.SqoopMapper; - import org.apache.sqoop.job.mr.SqoopNullOutputFormat; - import org.apache.sqoop.job.mr.SqoopSplit; --import org.apache.sqoop.schema.Schema; --import org.apache.sqoop.schema.type.FixedPoint; --import org.apache.sqoop.schema.type.FloatingPoint; --import org.apache.sqoop.schema.type.Text; -+import org.apache.sqoop.job.util.MRJobTestUtil; - import org.junit.Assert; - import org.junit.Test; - -@@ -67,11 +64,10 @@ - private static final int NUMBER_OF_ROWS_PER_PARTITION = 10; - - @Test -- public void testInputFormat() throws Exception { -+ public void testSqoopInputFormat() throws Exception { - Configuration conf = new Configuration(); - conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); -- conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, -- CSVIntermediateDataFormat.class.getName()); -+ conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); - Job job = new Job(conf); - - SqoopInputFormat inputformat = new SqoopInputFormat(); -@@ -79,51 +75,47 @@ public void testInputFormat() throws Exception { - assertEquals(9, splits.size()); - - for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { -- SqoopSplit split = (SqoopSplit)splits.get(id-1); -- DummyPartition partition = (DummyPartition)split.getPartition(); -+ SqoopSplit split = (SqoopSplit) splits.get(id - 1); -+ DummyPartition partition = (DummyPartition) split.getPartition(); - assertEquals(id, partition.getId()); - } - } - - @Test -- public void testMapper() throws Exception { -+ public void testSqoopMapper() throws Exception { - Configuration conf = new Configuration(); - conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); - conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); -- conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, -- CSVIntermediateDataFormat.class.getName()); -- Schema schema = new Schema("Test"); -- schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) -- .addColumn(new org.apache.sqoop.schema.type.Text("3")); -- -+ conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); - Job job = new Job(conf); -- MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); -- MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema); -- boolean success = JobUtils.runJob(job.getConfiguration(), -- SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class); -+ // from and to have the same schema in this test case -+ MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema()); -+ MRConfigurationUtils.setConnectorSchema(Direction.TO, job, MRJobTestUtil.getTestSchema()); -+ boolean success = MRJobTestUtil.runJob(job.getConfiguration(), -+ SqoopInputFormat.class, -+ SqoopMapper.class, -+ DummyOutputFormat.class); - Assert.assertEquals("Job failed!", true, success); - } - - @Test -- public void testOutputFormat() throws Exception { -+ public void testNullOutputFormat() throws Exception { - Configuration conf = new Configuration(); - conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); - conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); - conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); - conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName()); - conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName()); -- conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, -- CSVIntermediateDataFormat.class.getName()); -- Schema schema = new Schema("Test"); -- schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) -- .addColumn(new Text("3")); -+ conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); - - Job job = new Job(conf); -- MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); -- MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema); -- boolean success = JobUtils.runJob(job.getConfiguration(), -- SqoopInputFormat.class, SqoopMapper.class, -- SqoopNullOutputFormat.class); -+ // from and to have the same schema in this test case -+ MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema()); -+ MRConfigurationUtils.setConnectorSchema(Direction.TO, job, MRJobTestUtil.getTestSchema()); -+ boolean success = MRJobTestUtil.runJob(job.getConfiguration(), -+ SqoopInputFormat.class, -+ SqoopMapper.class, -+ SqoopNullOutputFormat.class); - Assert.assertEquals("Job failed!", true, success); - - // Make sure both destroyers get called. -@@ -171,15 +163,17 @@ public String toString() { - } - } - -- public static class DummyExtractor extends Extractor { -+ public static class DummyExtractor extends -+ Extractor { - @Override -- public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, DummyPartition partition) { -- int id = ((DummyPartition)partition).getId(); -+ public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, -+ DummyPartition partition) { -+ int id = ((DummyPartition) partition).getId(); - for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) { -- context.getDataWriter().writeArrayRecord(new Object[] { -- id * NUMBER_OF_ROWS_PER_PARTITION + row, -- (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row), -- String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)}); -+ context.getDataWriter().writeArrayRecord( -+ new Object[] { id * NUMBER_OF_ROWS_PER_PARTITION + row, -+ (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row), -+ String.valueOf(id * NUMBER_OF_ROWS_PER_PARTITION + row) }); - } - } - -@@ -189,16 +183,14 @@ public long getRowsRead() { - } - } - -- public static class DummyOutputFormat -- extends OutputFormat { -+ public static class DummyOutputFormat extends OutputFormat { - @Override - public void checkOutputSpecs(JobContext context) { - // do nothing - } - - @Override -- public RecordWriter getRecordWriter( -- TaskAttemptContext context) { -+ public RecordWriter getRecordWriter(TaskAttemptContext context) { - return new DummyRecordWriter(); - } - -@@ -207,22 +199,17 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) { - return new DummyOutputCommitter(); - } - -- public static class DummyRecordWriter -- extends RecordWriter { -- private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; -- private Data data = new Data(); -+ public static class DummyRecordWriter extends RecordWriter { -+ private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION; -+ // should I use a dummy IDF for testing? -+ private IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); - - @Override - public void write(SqoopWritable key, NullWritable value) { -- -- data.setContent(new Object[] { -- index, -- (double) index, -- String.valueOf(index)}, -- Data.ARRAY_RECORD); -+ String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'"; -+ dataFormat.setTextData(testData); - index++; -- -- assertEquals(data.toString(), key.toString()); -+ assertEquals(dataFormat.getTextData().toString(), key.toString()); - } - - @Override -@@ -233,16 +220,20 @@ public void close(TaskAttemptContext context) { - - public static class DummyOutputCommitter extends OutputCommitter { - @Override -- public void setupJob(JobContext jobContext) { } -+ public void setupJob(JobContext jobContext) { -+ } - - @Override -- public void setupTask(TaskAttemptContext taskContext) { } -+ public void setupTask(TaskAttemptContext taskContext) { -+ } - - @Override -- public void commitTask(TaskAttemptContext taskContext) { } -+ public void commitTask(TaskAttemptContext taskContext) { -+ } - - @Override -- public void abortTask(TaskAttemptContext taskContext) { } -+ public void abortTask(TaskAttemptContext taskContext) { -+ } - - @Override - public boolean needsTaskCommit(TaskAttemptContext taskContext) { -@@ -251,39 +242,34 @@ public boolean needsTaskCommit(TaskAttemptContext taskContext) { - } - } - -+ // it is writing to the target. - public static class DummyLoader extends Loader { -- private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; -- private Data expected = new Data(); -+ private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION; -+ private IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); - - @Override -- public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) throws Exception{ -+ public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) -+ throws Exception { - String data; - while ((data = context.getDataReader().readTextRecord()) != null) { -- expected.setContent(new Object[] { -- index, -- (double) index, -- String.valueOf(index)}, -- Data.ARRAY_RECORD); -+ String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'"; -+ dataFormat.setTextData(testData); - index++; -- assertEquals(expected.toString(), data); -+ assertEquals(dataFormat.getTextData().toString(), data); - } - } - } - - public static class DummyFromDestroyer extends Destroyer { -- - public static int count = 0; -- - @Override - public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) { - count++; - } - } - -- public static class DummyToDestroyer extends Destroyer { -- -+ public static class DummyToDestroyer extends Destroyer { - public static int count = 0; -- - @Override - public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) { - count++; -diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java -index 04fb692..a64a4a6 100644 ---- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java -+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java -@@ -38,16 +38,17 @@ - import org.apache.sqoop.common.Direction; - import org.apache.sqoop.connector.common.EmptyConfiguration; - import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; -+import org.apache.sqoop.connector.idf.IntermediateDataFormat; - import org.apache.sqoop.job.etl.Extractor; - import org.apache.sqoop.job.etl.ExtractorContext; - import org.apache.sqoop.job.etl.Partition; - import org.apache.sqoop.job.etl.Partitioner; - import org.apache.sqoop.job.etl.PartitionerContext; --import org.apache.sqoop.job.io.Data; - import org.apache.sqoop.job.io.SqoopWritable; - import org.apache.sqoop.job.mr.MRConfigurationUtils; - import org.apache.sqoop.job.mr.SqoopInputFormat; - import org.apache.sqoop.job.mr.SqoopMapper; -+import org.apache.sqoop.job.util.MRJobTestUtil; - import org.apache.sqoop.schema.Schema; - import org.apache.sqoop.schema.type.FixedPoint; - import org.apache.sqoop.schema.type.FloatingPoint; -@@ -121,6 +122,7 @@ public TestMatching(Schema from, - return parameters; - } - -+ @SuppressWarnings("deprecation") - @Test - public void testSchemaMatching() throws Exception { - Configuration conf = new Configuration(); -@@ -132,9 +134,9 @@ public void testSchemaMatching() throws Exception { - Job job = new Job(conf); - MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from); - MRConfigurationUtils.setConnectorSchema(Direction.TO, job, to); -- JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, -+ MRJobTestUtil.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, - DummyOutputFormat.class); -- boolean success = JobUtils.runJob(job.getConfiguration(), -+ boolean success = MRJobTestUtil.runJob(job.getConfiguration(), - SqoopInputFormat.class, SqoopMapper.class, - DummyOutputFormat.class); - if (from.getName().split("-")[1].equals("EMPTY")) { -@@ -233,19 +235,14 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) { - public static class DummyRecordWriter - extends RecordWriter { - private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; -- private Data data = new Data(); -+ private IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); - - @Override - public void write(SqoopWritable key, NullWritable value) { -- -- data.setContent(new Object[] { -- index, -- (double) index, -- String.valueOf(index)}, -- Data.ARRAY_RECORD); -+ String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'"; -+ dataFormat.setTextData(testData); - index++; -- -- assertEquals(data.toString(), key.toString()); -+ assertEquals(dataFormat.getTextData().toString(), key.toString()); - } - - @Override -diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java -deleted file mode 100644 -index 68ce5ed..0000000 ---- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java -+++ /dev/null -@@ -1,95 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, -- * software distributed under the License is distributed on an -- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -- * KIND, either express or implied. See the License for the -- * specific language governing permissions and limitations -- * under the License. -- */ --package org.apache.sqoop.job.io; -- --import com.google.common.base.Charsets; -- --import java.io.ByteArrayInputStream; --import java.io.ByteArrayOutputStream; --import java.io.DataInput; --import java.io.DataInputStream; --import java.io.DataOutput; --import java.io.DataOutputStream; --import java.io.IOException; --import java.io.InputStream; -- --import org.apache.hadoop.conf.Configuration; --import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; --import org.apache.sqoop.job.MRJobConstants; --import org.junit.Assert; --import org.junit.Test; -- --public class SqoopWritableTest { -- -- private final SqoopWritable writable = new SqoopWritable(); -- -- @Test -- public void testStringInStringOut() { -- String testData = "Live Long and prosper"; -- writable.setString(testData); -- Assert.assertEquals(testData,writable.getString()); -- } -- -- @Test -- public void testDataWritten() throws IOException { -- String testData = "One ring to rule them all"; -- byte[] testDataBytes = testData.getBytes(Charsets.UTF_8); -- writable.setString(testData); -- ByteArrayOutputStream ostream = new ByteArrayOutputStream(); -- DataOutput out = new DataOutputStream(ostream); -- writable.write(out); -- byte[] written = ostream.toByteArray(); -- InputStream instream = new ByteArrayInputStream(written); -- DataInput in = new DataInputStream(instream); -- String readData = in.readUTF(); -- Assert.assertEquals(testData, readData); -- } -- -- @Test -- public void testDataRead() throws IOException { -- String testData = "Brandywine Bridge - 20 miles!"; -- ByteArrayOutputStream ostream = new ByteArrayOutputStream(); -- DataOutput out = new DataOutputStream(ostream); -- out.writeUTF(testData); -- InputStream instream = new ByteArrayInputStream(ostream.toByteArray()); -- DataInput in = new DataInputStream(instream); -- writable.readFields(in); -- Assert.assertEquals(testData, writable.getString()); -- } -- -- @Test -- public void testWriteReadUsingStream() throws IOException { -- String testData = "You shall not pass"; -- ByteArrayOutputStream ostream = new ByteArrayOutputStream(); -- DataOutput out = new DataOutputStream(ostream); -- writable.setString(testData); -- writable.write(out); -- byte[] written = ostream.toByteArray(); -- -- //Don't test what the data is, test that SqoopWritable can read it. -- InputStream instream = new ByteArrayInputStream(written); -- SqoopWritable newWritable = new SqoopWritable(); -- DataInput in = new DataInputStream(instream); -- newWritable.readFields(in); -- Assert.assertEquals(testData, newWritable.getString()); -- ostream.close(); -- instream.close(); -- } -- --} -diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java -deleted file mode 100644 -index 4e23bcb..0000000 ---- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java -+++ /dev/null -@@ -1,117 +0,0 @@ --/** -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ --package org.apache.sqoop.job.io; -- --import java.util.Arrays; -- --import org.junit.Assert; --import org.junit.Test; -- --public class TestData { -- -- private static final double TEST_NUMBER = Math.PI + 100; -- @Test -- public void testArrayToCsv() throws Exception { -- Data data = new Data(); -- String expected; -- String actual; -- -- // with special characters: -- expected = -- Long.valueOf((long)TEST_NUMBER) + "," + -- Double.valueOf(TEST_NUMBER) + "," + -- "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," + -- Arrays.toString(new byte[] {1, 2, 3, 4, 5}); -- data.setContent(new Object[] { -- Long.valueOf((long)TEST_NUMBER), -- Double.valueOf(TEST_NUMBER), -- String.valueOf(TEST_NUMBER) + "',s", -- new byte[] {1, 2, 3, 4, 5} }, -- Data.ARRAY_RECORD); -- actual = (String)data.getContent(Data.CSV_RECORD); -- assertEquals(expected, actual); -- -- // with null characters: -- expected = -- Long.valueOf((long)TEST_NUMBER) + "," + -- Double.valueOf(TEST_NUMBER) + "," + -- "null" + "," + -- Arrays.toString(new byte[] {1, 2, 3, 4, 5}); -- data.setContent(new Object[] { -- Long.valueOf((long)TEST_NUMBER), -- Double.valueOf(TEST_NUMBER), -- null, -- new byte[] {1, 2, 3, 4, 5} }, -- Data.ARRAY_RECORD); -- actual = (String)data.getContent(Data.CSV_RECORD); -- assertEquals(expected, actual); -- } -- -- @Test -- public void testCsvToArray() throws Exception { -- Data data = new Data(); -- Object[] expected; -- Object[] actual; -- -- // with special characters: -- expected = new Object[] { -- Long.valueOf((long)TEST_NUMBER), -- Double.valueOf(TEST_NUMBER), -- String.valueOf(TEST_NUMBER) + "',s", -- new byte[] {1, 2, 3, 4, 5} }; -- data.setContent( -- Long.valueOf((long)TEST_NUMBER) + "," + -- Double.valueOf(TEST_NUMBER) + "," + -- "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," + -- Arrays.toString(new byte[] {1, 2, 3, 4, 5}), -- Data.CSV_RECORD); -- actual = (Object[])data.getContent(Data.ARRAY_RECORD); -- assertEquals(expected.length, actual.length); -- for (int c=0; c writer = executor.getRecordWriter(); -- IntermediateDataFormat data = new CSVIntermediateDataFormat(); -+ IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); - SqoopWritable writable = new SqoopWritable(); - try { - for (int count = 0; count < 100; count++) { -- data.setTextData(String.valueOf(count)); -- writable.setString(data.getTextData()); -+ dataFormat.setTextData(String.valueOf(count)); -+ writable.setString(dataFormat.getTextData()); - writer.write(writable, null); - } - } catch (SqoopException ex) { -@@ -149,7 +150,7 @@ public void testSuccessfulContinuousLoader() throws Throwable { - SqoopOutputFormatLoadExecutor executor = new - SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName()); - RecordWriter writer = executor.getRecordWriter(); -- IntermediateDataFormat data = new CSVIntermediateDataFormat(); -+ IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); - SqoopWritable writable = new SqoopWritable(); - for (int i = 0; i < 10; i++) { - StringBuilder builder = new StringBuilder(); -@@ -159,8 +160,8 @@ public void testSuccessfulContinuousLoader() throws Throwable { - builder.append(","); - } - } -- data.setTextData(builder.toString()); -- writable.setString(data.getTextData()); -+ dataFormat.setTextData(builder.toString()); -+ writable.setString(dataFormat.getTextData()); - writer.write(writable, null); - } - writer.close(null); -@@ -171,7 +172,7 @@ public void testSuccessfulLoader() throws Throwable { - SqoopOutputFormatLoadExecutor executor = new - SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName()); - RecordWriter writer = executor.getRecordWriter(); -- IntermediateDataFormat data = new CSVIntermediateDataFormat(); -+ IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); - SqoopWritable writable = new SqoopWritable(); - StringBuilder builder = new StringBuilder(); - for (int count = 0; count < 100; count++) { -@@ -180,8 +181,8 @@ public void testSuccessfulLoader() throws Throwable { - builder.append(","); - } - } -- data.setTextData(builder.toString()); -- writable.setString(data.getTextData()); -+ dataFormat.setTextData(builder.toString()); -+ writable.setString(dataFormat.getTextData()); - writer.write(writable, null); - - //Allow writer to complete. -@@ -196,7 +197,7 @@ public void testThrowingContinuousLoader() throws Throwable { - SqoopOutputFormatLoadExecutor executor = new - SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName()); - RecordWriter writer = executor.getRecordWriter(); -- IntermediateDataFormat data = new CSVIntermediateDataFormat(); -+ IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); - SqoopWritable writable = new SqoopWritable(); - try { - for (int i = 0; i < 10; i++) { -@@ -207,8 +208,8 @@ public void testThrowingContinuousLoader() throws Throwable { - builder.append(","); - } - } -- data.setTextData(builder.toString()); -- writable.setString(data.getTextData()); -+ dataFormat.setTextData(builder.toString()); -+ writable.setString(dataFormat.getTextData()); - writer.write(writable, null); - } - writer.close(null); -diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java -new file mode 100644 -index 0000000..5d5359e ---- /dev/null -+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java -@@ -0,0 +1,114 @@ -+/** -+ * Licensed to the Apache Software Foundation (ASF) under one -+ * or more contributor license agreements. See the NOTICE file -+ * distributed with this work for additional information -+ * regarding copyright ownership. The ASF licenses this file -+ * to you under the Apache License, Version 2.0 (the -+ * "License"); you may not use this file except in compliance -+ * with the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+package org.apache.sqoop.job.util; -+ -+import java.io.IOException; -+ -+import org.apache.hadoop.conf.Configuration; -+import org.apache.hadoop.io.NullWritable; -+import org.apache.hadoop.mapreduce.InputFormat; -+import org.apache.hadoop.mapreduce.Job; -+import org.apache.hadoop.mapreduce.JobContext; -+import org.apache.hadoop.mapreduce.Mapper; -+import org.apache.hadoop.mapreduce.OutputCommitter; -+import org.apache.hadoop.mapreduce.OutputFormat; -+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; -+import org.apache.sqoop.connector.idf.IntermediateDataFormat; -+import org.apache.sqoop.job.io.SqoopWritable; -+import org.apache.sqoop.job.mr.SqoopSplit; -+import org.apache.sqoop.schema.Schema; -+import org.apache.sqoop.schema.type.FixedPoint; -+import org.apache.sqoop.schema.type.FloatingPoint; -+import org.apache.sqoop.schema.type.Text; -+import org.apache.sqoop.utils.ClassUtils; -+ -+import static org.mockito.Mockito.mock; -+import static org.mockito.Mockito.when; -+ -+public class MRJobTestUtil { -+ -+ @SuppressWarnings("deprecation") -+ public static boolean runJob(Configuration conf, -+ Class> input, -+ Class> mapper, -+ Class> output) throws IOException, -+ InterruptedException, ClassNotFoundException { -+ Job job = new Job(conf); -+ job.setInputFormatClass(input); -+ job.setMapperClass(mapper); -+ job.setMapOutputKeyClass(SqoopWritable.class); -+ job.setMapOutputValueClass(NullWritable.class); -+ job.setOutputFormatClass(output); -+ job.setOutputKeyClass(SqoopWritable.class); -+ job.setOutputValueClass(NullWritable.class); -+ -+ boolean ret = job.waitForCompletion(true); -+ -+ // Hadoop 1.0 (and 0.20) have nasty bug when job committer is not called in -+ // LocalJobRuner -+ if (isHadoop1()) { -+ callOutputCommitter(job, output); -+ } -+ -+ return ret; -+ } -+ -+ public static Schema getTestSchema() { -+ Schema schema = new Schema("Test"); -+ schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) -+ .addColumn(new Text("3")); -+ return schema; -+ } -+ -+ public static IntermediateDataFormat getTestIDF() { -+ return new CSVIntermediateDataFormat(getTestSchema()); -+ } -+ -+ /** -+ * Call output format on given job manually. -+ */ -+ private static void callOutputCommitter(Job job, -+ Class> outputFormat) throws IOException, -+ InterruptedException { -+ OutputCommitter committer = ((OutputFormat) ClassUtils.instantiate(outputFormat)) -+ .getOutputCommitter(null); -+ -+ JobContext jobContext = mock(JobContext.class); -+ when(jobContext.getConfiguration()).thenReturn(job.getConfiguration()); -+ -+ committer.commitJob(jobContext); -+ } -+ -+ /** -+ * Detect Hadoop 1.0 installation -+ * -+ * @return True if and only if this is Hadoop 1 and below -+ */ -+ public static boolean isHadoop1() { -+ String version = org.apache.hadoop.util.VersionInfo.getVersion(); -+ if (version.matches("\\b0\\.20\\..+\\b") || version.matches("\\b1\\.\\d\\.\\d")) { -+ return true; -+ } -+ return false; -+ } -+ -+ private MRJobTestUtil() { -+ // Disable explicit object creation -+ } -+ -+}