From 89dcbe879e81c75a27603d8cd2b2458cc0ce62a0 Mon Sep 17 00:00:00 2001 From: Abraham Elmahrek Date: Fri, 9 Jan 2015 11:16:05 -0800 Subject: [PATCH] SQOOP-1956: Sqoop2: Cleanup IDF implementations (Veena Basavaraj via Abraham Elmahrek) --- .../connector/jdbc/util/SqlTypesUtils.java | 2 +- .../connector/jdbc/TestFromInitializer.java | 2 +- .../sqoop/connector/common/SqoopIDFUtils.java | 52 +++--- .../idf/AVROIntermediateDataFormat.java | 22 +-- .../idf/CSVIntermediateDataFormat.java | 171 ++++++++---------- .../connector/idf/IntermediateDataFormat.java | 63 +++---- .../idf/JSONIntermediateDataFormat.java | 31 ++-- .../connector/common/TestSqoopIDFUtils.java | 24 +-- .../idf/TestCSVIntermediateDataFormat.java | 155 ++++++++-------- .../org/apache/sqoop/job/TestMatching.java | 12 +- .../sqoop/job/io/TestSqoopWritable.java | 7 +- .../mr/TestSqoopOutputFormatLoadExecutor.java | 10 +- .../apache/sqoop/job/util/MRJobTestUtil.java | 2 +- 13 files changed, 272 insertions(+), 281 deletions(-) diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java index 9cfee467..c0ca7f2c 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java @@ -49,7 +49,7 @@ public static Column sqlTypeToSchemaType(int sqlType, String columnName) { case Types.SMALLINT: case Types.TINYINT: case Types.INTEGER: - return new FixedPoint(columnName); + return new FixedPoint(columnName).setByteSize(2L); case Types.CLOB: case Types.VARCHAR: diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java index e8c0f0b8..5bdcd99c 
100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java @@ -102,7 +102,7 @@ public void setUp() { */ public Schema getSchema(String name) { return new Schema(name) - .addColumn(new FixedPoint("ICOL")) + .addColumn(new FixedPoint("ICOL").setByteSize(2L)) .addColumn(new FloatingPoint("DCOL")) .addColumn(new Text("VCOL")) ; diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java index 26ff6297..800630f7 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java @@ -94,7 +94,7 @@ public class SqoopIDFUtils { // ******** Number Column Type utils*********** - public static String encodeToCSVFixedPoint(Object obj, Column column) { + public static String toCSVFixedPoint(Object obj, Column column) { Long byteSize = ((FixedPoint) column).getByteSize(); if (byteSize != null && byteSize <= Integer.SIZE) { return ((Integer) obj).toString(); @@ -114,7 +114,7 @@ public static Object toFixedPoint(String csvString, Column column) { return returnValue; } - public static String encodeToCSVFloatingPoint(Object obj, Column column) { + public static String toCSVFloatingPoint(Object obj, Column column) { Long byteSize = ((FloatingPoint) column).getByteSize(); if (byteSize != null && byteSize <= Float.SIZE) { return ((Float) obj).toString(); @@ -134,7 +134,7 @@ public static Object toFloatingPoint(String csvString, Column column) { return returnValue; } - public static String encodeToCSVDecimal(Object obj) { + public static String toCSVDecimal(Object obj) { return ((BigDecimal) obj).toString(); } @@ -143,7 +143,7 @@ public static Object 
toDecimal(String csvString, Column column) { } // ********** BIT Column Type utils****************** - public static String encodeToCSVBit(Object obj) { + public static String toCSVBit(Object obj) { String bitStringValue = obj.toString(); if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) { return bitStringValue; @@ -164,16 +164,16 @@ public static Object toBit(String csvString) { // *********** DATE and TIME Column Type utils ********** - public static String encodeToCSVDate(Object obj) { + public static String toCSVDate(Object obj) { org.joda.time.LocalDate date = (org.joda.time.LocalDate) obj; - return encloseWithQuote(df.print(date)); + return encloseWithQuotes(df.print(date)); } - public static String encodeToCSVTime(Object obj, Column col) { + public static String toCSVTime(Object obj, Column col) { if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) { - return encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) obj)); + return encloseWithQuotes(tfWithFraction.print((org.joda.time.LocalTime) obj)); } else { - return encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) obj)); + return encloseWithQuotes(tfWithNoFraction.print((org.joda.time.LocalTime) obj)); } } @@ -187,27 +187,27 @@ public static Object toTime(String csvString, Column column) { // *********** DATE TIME Column Type utils ********** - public static String encodeToCSVLocalDateTime(Object obj, Column col) { + public static String toCSVLocalDateTime(Object obj, Column col) { org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) obj; org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col; if (column.hasFraction()) { - return encloseWithQuote(dtfWithFractionNoTimeZone.print(localDateTime)); + return encloseWithQuotes(dtfWithFractionNoTimeZone.print(localDateTime)); } else { - return encloseWithQuote(dtfWithNoFractionAndTimeZone.print(localDateTime)); + return 
encloseWithQuotes(dtfWithNoFractionAndTimeZone.print(localDateTime)); } } - public static String encodeToCSVDateTime(Object obj, Column col) { + public static String toCSVDateTime(Object obj, Column col) { org.joda.time.DateTime dateTime = (org.joda.time.DateTime) obj; org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col; if (column.hasFraction() && column.hasTimezone()) { - return encloseWithQuote(dtfWithFractionAndTimeZone.print(dateTime)); + return encloseWithQuotes(dtfWithFractionAndTimeZone.print(dateTime)); } else if (column.hasFraction() && !column.hasTimezone()) { - return encloseWithQuote(dtfWithFractionNoTimeZone.print(dateTime)); + return encloseWithQuotes(dtfWithFractionNoTimeZone.print(dateTime)); } else if (column.hasTimezone()) { - return encloseWithQuote(dtfWithNoFractionWithTimeZone.print(dateTime)); + return encloseWithQuotes(dtfWithNoFractionWithTimeZone.print(dateTime)); } else { - return encloseWithQuote(dtfWithNoFractionAndTimeZone.print(dateTime)); + return encloseWithQuotes(dtfWithNoFractionAndTimeZone.print(dateTime)); } } @@ -256,10 +256,10 @@ public static Long toDateTimeInMillis(String csvString, Column column) { // ************ MAP Column Type utils********* @SuppressWarnings("unchecked") - public static String encodeToCSVMap(Map map, Column column) { + public static String toCSVMap(Map map, Column column) { JSONObject object = new JSONObject(); object.putAll(map); - return encloseWithQuote(object.toJSONString()); + return encloseWithQuotes(object.toJSONString()); } public static Map toMap(String csvString) { @@ -314,7 +314,7 @@ else if (value instanceof JSONObject) { // ************ LIST Column Type utils********* @SuppressWarnings("unchecked") - public static String encodeToCSVList(Object[] list, Column column) { + public static String toCSVList(Object[] list, Column column) { List elementList = new ArrayList(); for (int n = 0; n < list.length; n++) { Column listType = ((AbstractComplexListType) 
column).getListType(); @@ -332,7 +332,7 @@ public static String encodeToCSVList(Object[] list, Column column) { } JSONArray array = new JSONArray(); array.addAll(elementList); - return encloseWithQuote(array.toJSONString()); + return encloseWithQuotes(array.toJSONString()); } public static Object[] toList(String csvString) { @@ -397,7 +397,7 @@ private static String getRegExp(String string) { return string.replaceAll("\\\\", Matcher.quoteReplacement("\\\\")); } - public static String encodeToCSVString(String string) { + public static String toCSVString(String string) { int j = 0; String replacement = string; try { @@ -408,7 +408,7 @@ public static String encodeToCSVString(String string) { throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + " " + replacement + " " + String.valueOf(j) + " " + e.getMessage()); } - return encloseWithQuote(replacement); + return encloseWithQuotes(replacement); } public static String toText(String string) { @@ -429,10 +429,10 @@ public static String toText(String string) { // ************ BINARY Column type utils********* - public static String encodeToCSVByteArray(Object obj) { + public static String toCSVByteArray(Object obj) { byte[] bytes = (byte[]) obj; try { - return encodeToCSVString(new String(bytes, BYTE_FIELD_CHARSET)); + return toCSVString(new String(bytes, BYTE_FIELD_CHARSET)); } catch (UnsupportedEncodingException e) { // We should never hit this case. // This character set should be distributed with Java. 
@@ -455,7 +455,7 @@ public static byte[] toByteArray(String csvString) { // *********** SQOOP CSV standard encoding utils******************** - public static String encloseWithQuote(String string) { + public static String encloseWithQuotes(String string) { StringBuilder builder = new StringBuilder(); builder.append(QUOTE_CHARACTER).append(string).append(QUOTE_CHARACTER); return builder.toString(); diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java index b12b59ad..f0dd914a 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java @@ -302,47 +302,47 @@ private String toCSV(GenericRecord record) { case ARRAY: case SET: List objList = (List) obj; - csvString.append(encodeToCSVList(toObjectArray(objList), cols[i])); + csvString.append(toCSVList(toObjectArray(objList), cols[i])); break; case MAP: Map objMap = (Map) obj; - csvString.append(encodeToCSVMap(objMap, cols[i])); + csvString.append(toCSVMap(objMap, cols[i])); break; case ENUM: case TEXT: - csvString.append(encodeToCSVString(obj.toString())); + csvString.append(toCSVString(obj.toString())); break; case BINARY: case UNKNOWN: - csvString.append(encodeToCSVByteArray(getBytesFromByteBuffer(obj))); + csvString.append(toCSVByteArray(getBytesFromByteBuffer(obj))); break; case FIXED_POINT: - csvString.append(encodeToCSVFixedPoint(obj, cols[i])); + csvString.append(toCSVFixedPoint(obj, cols[i])); break; case FLOATING_POINT: - csvString.append(encodeToCSVFloatingPoint(obj, cols[i])); + csvString.append(toCSVFloatingPoint(obj, cols[i])); break; case DECIMAL: // stored as string - csvString.append(encodeToCSVDecimal(obj)); + csvString.append(toCSVDecimal(obj)); break; case DATE: // stored as long Long 
dateInMillis = (Long) obj; - csvString.append(encodeToCSVDate(new org.joda.time.LocalDate(dateInMillis))); + csvString.append(toCSVDate(new org.joda.time.LocalDate(dateInMillis))); break; case TIME: // stored as long Long timeInMillis = (Long) obj; - csvString.append(encodeToCSVTime(new org.joda.time.LocalTime(timeInMillis), cols[i])); + csvString.append(toCSVTime(new org.joda.time.LocalTime(timeInMillis), cols[i])); break; case DATE_TIME: // stored as long Long dateTimeInMillis = (Long) obj; - csvString.append(encodeToCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), cols[i])); + csvString.append(toCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), cols[i])); break; case BIT: - csvString.append(encodeToCSVBit(obj)); + csvString.append(toCSVBit(obj)); break; default: throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java index 33b5d0a6..856a4bb2 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java @@ -20,45 +20,39 @@ import static org.apache.sqoop.connector.common.SqoopIDFUtils.*; -import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.type.AbstractComplexListType; import org.apache.sqoop.schema.type.Column; -import org.apache.sqoop.utils.ClassUtils; -import org.joda.time.DateTime; -import org.joda.time.LocalDate; -import org.joda.time.LocalDateTime; -import org.joda.time.LocalTime; -import org.json.simple.JSONValue; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import 
java.util.HashSet; import java.util.Map; import java.util.Set; /** * A concrete implementation for the {@link #IntermediateDataFormat} that * represents each row of the data source as a comma separates list. Each - * element in the CSV represents a specific column value encoded as string using the sqoop specified rules. - * The methods allow serializing to this string and deserializing the string to its - * corresponding java object based on the {@link #Schema} and its - * {@link #Column} types. + * element in the CSV represents a specific column value encoded as a string using + * the Sqoop-specified rules. The methods allow serializing to this string and + * deserializing the string to its corresponding Java object based on the + * {@link Schema} and its {@link Column} types. * */ public class CSVIntermediateDataFormat extends IntermediateDataFormat { public static final Logger LOG = Logger.getLogger(CSVIntermediateDataFormat.class); + // need this default constructor for reflection magic used in execution engine public CSVIntermediateDataFormat() { } public CSVIntermediateDataFormat(Schema schema) { - setSchema(schema); + super.setSchema(schema); } + /** * {@inheritDoc} */ @@ -80,11 +74,7 @@ public void setCSVTextData(String csvText) { */ @Override public Object[] getObjectData() { - if (schema == null || schema.isEmpty()) { - throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002); - } - - // fieldStringArray represents the csv fields parsed into string array + super.validateSchema(schema); String[] csvStringArray = parseCSVString(this.data); if (csvStringArray == null) { @@ -92,14 +82,13 @@ public Object[] getObjectData() { } if (csvStringArray.length != schema.getColumnsArray().length) { - throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + getCSVTextData() - + " has the wrong number of fields."); + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, + 
"The data " + getCSVTextData() + " has the wrong number of fields."); } Object[] objectArray = new Object[csvStringArray.length]; Column[] columnArray = schema.getColumnsArray(); for (int i = 0; i < csvStringArray.length; i++) { - // check for NULL field and bail out immediately if (csvStringArray[i].equals(NULL_VALUE)) { objectArray[i] = null; continue; @@ -109,7 +98,6 @@ public Object[] getObjectData() { return objectArray; } - private Object toObject(String csvString, Column column) { Object returnValue = null; @@ -163,18 +151,9 @@ private Object toObject(String csvString, Column column) { */ @Override public void setObjectData(Object[] data) { - Set nullValueIndices = new HashSet(); - Column[] columnArray = schema.getColumnsArray(); - // check for null - for (int i = 0; i < data.length; i++) { - if (data[i] == null) { - nullValueIndices.add(i); - data[i] = NULL_VALUE; - } - } - // ignore the null values while encoding the object array into csv string - encodeToCSVText(data, columnArray, nullValueIndices); - this.data = StringUtils.join(data, CSV_SEPARATOR_CHARACTER); + super.validateSchema(schema); + // convert object array to csv text + this.data = toCSV(data); } @@ -200,60 +179,75 @@ public void read(DataInput in) throws IOException { * array * * @param objectArray - * @param columnArray - * @param nullValueIndices */ @SuppressWarnings("unchecked") - private void encodeToCSVText(Object[] objectArray, Column[] columnArray, Set nullValueIndices) { - for (int i : bitTypeColumnIndices) { - if (!nullValueIndices.contains(i)) { - objectArray[i] = encodeToCSVBit(objectArray[i]); - } - } - for (int i : stringTypeColumnIndices) { - if (!nullValueIndices.contains(i)) { - objectArray[i] = encodeToCSVString((String) objectArray[i]); - } - } - for (int i : dateTimeTypeColumnIndices) { - if (!nullValueIndices.contains(i)) { - Column col = columnArray[i]; - if (objectArray[i] instanceof org.joda.time.DateTime) { - org.joda.time.DateTime dateTime = (org.joda.time.DateTime) 
objectArray[i]; - // check for fraction and time zone and then use the right formatter - objectArray[i] = encodeToCSVDateTime(dateTime, col); - } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) { - org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i]; - objectArray[i] = encodeToCSVLocalDateTime(localDateTime, col); + private String toCSV(Object[] objectArray) { + + Column[] columnArray = schema.getColumnsArray(); + + StringBuilder csvString = new StringBuilder(); + for (int i = 0; i < columnArray.length; i++) { + Object obj = objectArray[i]; + if (obj == null) { + csvString.append(NULL_VALUE); + } else { + switch (columnArray[i].getType()) { + case ARRAY: + case SET: + csvString.append(toCSVList((Object[]) obj, (AbstractComplexListType) columnArray[i])); + break; + case MAP: + csvString.append(toCSVMap((Map) obj, columnArray[i])); + break; + case ENUM: + case TEXT: + csvString.append(toCSVString(obj.toString())); + break; + case BINARY: + case UNKNOWN: + csvString.append(toCSVByteArray((byte[]) obj)); + break; + case FIXED_POINT: + csvString.append(toCSVFixedPoint(obj, columnArray[i])); + break; + case FLOATING_POINT: + csvString.append(toCSVFloatingPoint(obj, columnArray[i])); + break; + case DECIMAL: + csvString.append(toCSVDecimal(obj)); + break; + // dates and times are written as quoted strings in the joda time format + case DATE: + csvString.append(toCSVDate(obj)); + break; + case TIME: + csvString.append(toCSVTime(obj, columnArray[i])); + break; + case DATE_TIME: + if (objectArray[i] instanceof org.joda.time.DateTime) { + org.joda.time.DateTime dateTime = (org.joda.time.DateTime) obj; + // check for fraction and time zone and then use the right formatter + csvString.append(toCSVDateTime(dateTime, columnArray[i])); + } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) { + org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) obj; + csvString.append(toCSVLocalDateTime(localDateTime, 
columnArray[i])); + } + break; + case BIT: + csvString.append(toCSVBit(obj)); + break; + default: + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, + "Column type from schema was not recognized for " + columnArray[i].getType()); } } - } - for (int i : dateTypeColumnIndices) { - if (!nullValueIndices.contains(i)) { - objectArray[i] = encodeToCSVDate(objectArray[i]); - } - } - for (int i : timeTypeColumnIndices) { - Column col = columnArray[i]; - if (!nullValueIndices.contains(i)) { - objectArray[i] = encodeToCSVTime(objectArray[i], col); - } - } - for (int i : byteTypeColumnIndices) { - if (!nullValueIndices.contains(i)) { - objectArray[i] = encodeToCSVByteArray((byte[]) objectArray[i]); - } - } - for (int i : listTypeColumnIndices) { - if (!nullValueIndices.contains(i)) { - objectArray[i] = encodeToCSVList((Object[]) objectArray[i], (AbstractComplexListType) columnArray[i]); - } - } - for (int i : mapTypeColumnIndices) { - if (!nullValueIndices.contains(i)) { - objectArray[i] = encodeToCSVMap((Map) objectArray[i], columnArray[i]); + if (i < columnArray.length - 1) { + csvString.append(CSV_SEPARATOR_CHARACTER); } + } + + return csvString.toString(); } /** @@ -261,15 +255,6 @@ private void encodeToCSVText(Object[] objectArray, Column[] columnArray, Set getJars() { - - Set jars = super.getJars(); - // Add JODA classes for IDF date/time handling - jars.add(ClassUtils.jarForClass(LocalDate.class)); - jars.add(ClassUtils.jarForClass(LocalDateTime.class)); - jars.add(ClassUtils.jarForClass(DateTime.class)); - jars.add(ClassUtils.jarForClass(LocalTime.class)); - // Add JSON parsing jar - jars.add(ClassUtils.jarForClass(JSONValue.class)); - return jars; + return super.getJars(); } -} \ No newline at end of file +} diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java index 
055b41ce..261a4627 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java @@ -18,12 +18,14 @@ */ package org.apache.sqoop.connector.idf; -import static org.apache.sqoop.connector.common.SqoopIDFUtils.isColumnListType; -import static org.apache.sqoop.connector.common.SqoopIDFUtils.isColumnStringType; - +import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.type.Column; -import org.apache.sqoop.schema.type.ColumnType; +import org.apache.sqoop.utils.ClassUtils; +import org.joda.time.DateTime; +import org.joda.time.LocalDate; +import org.joda.time.LocalDateTime; +import org.joda.time.LocalTime; +import org.json.simple.JSONValue; import java.io.DataInput; import java.io.DataOutput; @@ -51,16 +53,6 @@ public abstract class IntermediateDataFormat { protected Schema schema; - protected final Set stringTypeColumnIndices = new HashSet(); - protected final Set bitTypeColumnIndices = new HashSet(); - protected final Set byteTypeColumnIndices = new HashSet(); - protected final Set listTypeColumnIndices = new HashSet(); - protected final Set mapTypeColumnIndices = new HashSet(); - protected final Set dateTimeTypeColumnIndices = new HashSet(); - protected final Set dateTypeColumnIndices = new HashSet(); - protected final Set timeTypeColumnIndices = new HashSet(); - - /** * Get one row of data. 
* @@ -134,32 +126,13 @@ public void setData(T obj) { * - the schema used for serializing/de-serializing data */ public void setSchema(Schema schema) { - if (schema == null) { - // TODO(SQOOP-1956): throw an exception since working without a schema is dangerous - return; - } + validateSchema(schema); this.schema = schema; - Column[] columns = schema.getColumnsArray(); - int i = 0; - for (Column col : columns) { - if (isColumnStringType(col)) { - stringTypeColumnIndices.add(i); - } else if (col.getType() == ColumnType.BIT) { - bitTypeColumnIndices.add(i); - } else if (col.getType() == ColumnType.DATE) { - dateTypeColumnIndices.add(i); - } else if (col.getType() == ColumnType.TIME) { - timeTypeColumnIndices.add(i); - } else if (col.getType() == ColumnType.DATE_TIME) { - dateTimeTypeColumnIndices.add(i); - } else if (col.getType() == ColumnType.BINARY) { - byteTypeColumnIndices.add(i); - } else if (isColumnListType(col)) { - listTypeColumnIndices.add(i); - } else if (col.getType() == ColumnType.MAP) { - mapTypeColumnIndices.add(i); - } - i++; + } + + protected void validateSchema(Schema schema) { + if (schema == null) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002); } } @@ -186,7 +159,15 @@ public void setSchema(Schema schema) { * @return set of jars */ public Set getJars() { - return new HashSet(); + Set jars = new HashSet(); + // Add JODA classes for IDF date/time handling + jars.add(ClassUtils.jarForClass(LocalDate.class)); + jars.add(ClassUtils.jarForClass(LocalDateTime.class)); + jars.add(ClassUtils.jarForClass(DateTime.class)); + jars.add(ClassUtils.jarForClass(LocalTime.class)); + // Add JSON parsing jar + jars.add(ClassUtils.jarForClass(JSONValue.class)); + return jars; } @Override diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java index 90294f0a..b937d876 
100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java @@ -41,6 +41,10 @@ */ public class JSONIntermediateDataFormat extends IntermediateDataFormat { + // need this default constructor for reflection magic used in execution engine + public JSONIntermediateDataFormat() { + } + // We need schema at all times public JSONIntermediateDataFormat(Schema schema) { setSchema(schema); @@ -110,6 +114,7 @@ public Set getJars() { Set jars = super.getJars(); jars.add(ClassUtils.jarForClass(JSONObject.class)); + jars.add(ClassUtils.jarForClass(JSONArray.class)); return jars; } @@ -241,16 +246,16 @@ private JSONObject toJSON(Object[] data) { // stored in JSON as the same format as csv strings in the joda time // format case DATE_TIME: - object.put(cols[i].getName(), removeQuotes(encodeToCSVDateTime(data[i], cols[i]))); + object.put(cols[i].getName(), removeQuotes(toCSVDateTime(data[i], cols[i]))); break; case TIME: - object.put(cols[i].getName(), removeQuotes(encodeToCSVTime(data[i], cols[i]))); + object.put(cols[i].getName(), removeQuotes(toCSVTime(data[i], cols[i]))); break; case DATE: - object.put(cols[i].getName(), removeQuotes(encodeToCSVDate(data[i]))); + object.put(cols[i].getName(), removeQuotes(toCSVDate(data[i]))); break; case BIT: - object.put(cols[i].getName(), Boolean.valueOf(encodeToCSVBit(data[i]))); + object.put(cols[i].getName(), Boolean.valueOf(toCSVBit(data[i]))); break; default: throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, @@ -278,40 +283,40 @@ private String toCSV(JSONObject json) { case SET: // stored as JSON array JSONArray array = (JSONArray) obj; - csvString.append(encloseWithQuote(array.toJSONString())); + csvString.append(encloseWithQuotes(array.toJSONString())); break; case MAP: // stored as JSON object - 
csvString.append(encloseWithQuote((((JSONObject) obj).toJSONString()))); + csvString.append(encloseWithQuotes((((JSONObject) obj).toJSONString()))); break; case ENUM: case TEXT: - csvString.append(encodeToCSVString(obj.toString())); + csvString.append(toCSVString(obj.toString())); break; case BINARY: case UNKNOWN: - csvString.append(encodeToCSVByteArray(Base64.decodeBase64(obj.toString()))); + csvString.append(toCSVByteArray(Base64.decodeBase64(obj.toString()))); break; case FIXED_POINT: - csvString.append(encodeToCSVFixedPoint(obj, cols[i])); + csvString.append(toCSVFixedPoint(obj, cols[i])); break; case FLOATING_POINT: - csvString.append(encodeToCSVFloatingPoint(obj, cols[i])); + csvString.append(toCSVFloatingPoint(obj, cols[i])); break; case DECIMAL: - csvString.append(encodeToCSVDecimal(obj)); + csvString.append(toCSVDecimal(obj)); break; // stored in JSON as strings in the joda time format case DATE: case TIME: case DATE_TIME: - csvString.append(encloseWithQuote(obj.toString())); + csvString.append(encloseWithQuotes(obj.toString())); break; // 0/1 will be stored as they are in JSON, even though valid values in // JSON // are true/false case BIT: - csvString.append(encodeToCSVBit(obj)); + csvString.append(toCSVBit(obj)); break; default: throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java index 71db8dae..1cef714f 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java @@ -45,7 +45,7 @@ public static String getByteFieldString(byte[] byteFieldData) { @Test public void testEncloseStringWithQuotes() { String test = "test"; - String quotedText = encloseWithQuote(test); + String 
quotedText = encloseWithQuotes(test); assertEquals(quotedText, "'test'"); } @@ -53,7 +53,7 @@ public void testEncloseStringWithQuotes() { @Test public void testStringWithQuotesToEncloseStringWithQuotes() { String test = "'test'"; - String quotedText = encloseWithQuote(test); + String quotedText = encloseWithQuotes(test); assertEquals(quotedText, "''test''"); } @@ -82,28 +82,28 @@ public void testStingWithNoQuotesRemoveQuotes() { @Test public void testExample1EncodeToCSVString() { String test = "test"; - String encodedText = encodeToCSVString(test); + String encodedText = toCSVString(test); assertEquals(encodedText, "'test'"); } @Test public void testExample2EncodeToCSVString() { String test = "test,test1"; - String encodedText = encodeToCSVString(test); + String encodedText = toCSVString(test); assertEquals(encodedText, "'test,test1'"); } @Test public void testExample3EncodeToCSVString() { String test = "test,'test1"; - String encodedText = encodeToCSVString(test); + String encodedText = toCSVString(test); assertEquals(encodedText, "'test,\\'test1'"); } @Test public void testExample4EncodeToCSVString() { String test = "test,\"test1"; - String encodedText = encodeToCSVString(test); + String encodedText = toCSVString(test); assertEquals(encodedText, "'test,\\\"test1'"); } @@ -117,7 +117,7 @@ public void testExample4ToString() { public void testExample5EncodeToCSVString() { String test = new String(new char[] { 0x0A }); - String encodedText = encodeToCSVString(test); + String encodedText = toCSVString(test); assertEquals(encodedText, "'\\n'"); } @@ -130,7 +130,7 @@ public void testExample5ToString() { public void testExample6EncodeToCSVString() { String test = new String(new char[] { 0x0D }); - String encodedText = encodeToCSVString(test); + String encodedText = toCSVString(test); assertEquals(encodedText, "'\\r'"); } @@ -138,7 +138,7 @@ public void testExample6EncodeToCSVString() { public void testEncodeByteToCSVString() { // byte[0] = \r byte[1] = -112, byte[1] = 54 
- 2's complements byte[] bytes = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54 }; - String encodedText = encodeToCSVByteArray(bytes); + String encodedText = toCSVByteArray(bytes); String expectedText = getByteFieldString(bytes).replaceAll("\r", "\\\\r"); assertEquals(encodedText, expectedText); } @@ -149,7 +149,7 @@ public void testEncodeArrayIntegersToCSVString() { list.add(1); list.add(2); AbstractComplexListType array = new Array("a", new Text("t")); - String encodedText = encodeToCSVList(list.toArray(), array); + String encodedText = toCSVList(list.toArray(), array); assertEquals(encodedText, "'[1,2]'"); } @@ -159,7 +159,7 @@ public void testEncodeArrayStringsToCSVString() { list.add("A"); list.add("B"); AbstractComplexListType array = new Array("a", new Text("t")); - String encodedText = encodeToCSVList(list.toArray(), array); + String encodedText = toCSVList(list.toArray(), array); assertEquals(encodedText, "'[\"A\",\"B\"]'"); } @@ -172,7 +172,7 @@ public void testEncodeMapToCSVString() { map.put("A", list); org.apache.sqoop.schema.type.Map mapCol = new org.apache.sqoop.schema.type.Map("a", new Text("t"), new Array("r", new Text( "tr"))); - String encodedText = encodeToCSVMap(map, mapCol); + String encodedText = toCSVMap(map, mapCol); assertEquals(encodedText, "'{\"A\":[\"A\",\"B\"]}'"); } diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java index e116f3cc..fca410f0 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java @@ -55,7 +55,6 @@ public class TestCSVIntermediateDataFormat { @BeforeMethod public void setUp() { - dataFormat = new CSVIntermediateDataFormat(); } @@ -70,7 +69,7 @@ public void 
testNullInputAsCSVTextInObjectArrayOut() { .addColumn(new Text("4")) .addColumn(new Binary("5")) .addColumn(new Text("6")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData(null); Object[] out = dataFormat.getObjectData(); assertNull(out); @@ -81,7 +80,7 @@ public void testEmptyInputAsCSVTextInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new FixedPoint("1")).addColumn(new FixedPoint("2")).addColumn(new Text("3")).addColumn(new Text("4")) .addColumn(new Binary("5")).addColumn(new Text("6")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData(""); dataFormat.getObjectData(); } @@ -98,12 +97,13 @@ public void testNullValueAsObjectArrayInAndCSVTextOut() { .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4"))) .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null, null }; dataFormat.setObjectData(in); String csvText = dataFormat.getCSVTextData(); String[] textValues = csvText.split(","); + assertEquals(14, textValues.length); for (String text : textValues) { assertEquals(text, NULL_VALUE); } @@ -121,11 +121,12 @@ public void testNullValueAsObjectArrayInAndObjectArrayOut() { .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4"))) .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null, null }; dataFormat.setObjectData(in); Object[] out = dataFormat.getObjectData(); + assertEquals(14, out.length); for (Object obj : out) { 
assertEquals(obj, null); } @@ -143,7 +144,7 @@ public void testNullValueAsCSVTextInAndObjectArrayOut() { .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4"))) .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL" }; dataFormat.setCSVTextData(StringUtils.join(test, ",")); @@ -166,7 +167,7 @@ public void testNullValueAsCSVTextInAndCSVTextOut() { .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4"))) .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL" }; dataFormat.setCSVTextData(StringUtils.join(test, ",")); @@ -182,17 +183,30 @@ public void testNullValueAsCSVTextInAndCSVTextOut() { @Test public void testInputAsCSVTextInCSVTextOut() { - String testData = "'ENUM',10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) - + ",'" + String.valueOf(0x0A) + "'"; + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Enum("1")).addColumn(new FixedPoint("2")) + .addColumn(new FixedPoint("3")).addColumn(new Text("4")).addColumn(new Text("5")) + .addColumn(new Binary("6")).addColumn(new Text("7")); + + String testData = "'ENUM',10,34,'54','random data'," + + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A) + + "'"; + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData(testData); assertEquals(testData, dataFormat.getCSVTextData()); } - @Test public void 
testInputAsCSVTextInAndDataOut() { - String testData = "'ENUM',10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) - + ",'" + String.valueOf(0x0A) + "'"; + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Enum("1")).addColumn(new FixedPoint("2")) + .addColumn(new FixedPoint("3")).addColumn(new Text("4")).addColumn(new Text("5")) + .addColumn(new Binary("6")).addColumn(new Text("7")); + + String testData = "'ENUM',10,34,'54','random data'," + + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A) + + "'"; + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData(testData); assertEquals(testData, dataFormat.getData()); } @@ -204,7 +218,7 @@ public void testInputAsCSVTextInObjectOutWithSingleColumn() { Schema schema = new Schema("test"); schema.addColumn(new Text("text")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData(testData); Object[] out = dataFormat.getObjectData(); @@ -226,7 +240,7 @@ public void testInputAsCSVTextInObjectOut() { .addColumn(new Text("6")) .addColumn(new org.apache.sqoop.schema.type.Enum("7")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData(testData); Object[] out = dataFormat.getObjectData(); @@ -251,7 +265,7 @@ public void testInputAsObjectArayInCSVTextOrDataOut() { .addColumn(new Binary("5")) .addColumn(new Text("6")) .addColumn(new org.apache.sqoop.schema.type.Enum("7")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; Object[] in = new Object[7]; @@ -285,7 +299,7 @@ public void testObjectArrayInObjectArrayOut() { .addColumn(new Text("6")) .addColumn(new org.apache.sqoop.schema.type.Enum("7")); - dataFormat.setSchema(schema); + dataFormat = new 
CSVIntermediateDataFormat(schema); Object[] in = new Object[7]; in[0] = new Long(10); @@ -315,7 +329,7 @@ public void testObjectArrayWithNullInCSVTextOut() { .addColumn(new Text("6")) .addColumn(new org.apache.sqoop.schema.type.Enum("7")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; Object[] in = new Object[7]; @@ -340,7 +354,7 @@ public void testStringFullRangeOfCharacters() { Schema schema = new Schema("test"); schema.addColumn(new Text("1")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); char[] allCharArr = new char[256]; for(int i = 0; i < allCharArr.length; ++i) { @@ -363,7 +377,7 @@ public void testStringFullRangeOfCharacters() { public void testByteArrayFullRangeOfCharacters() { Schema schema = new Schema("test"); schema.addColumn(new Binary("1")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); byte[] allCharByteArr = new byte[256]; for (int i = 0; i < allCharByteArr.length; ++i) { @@ -385,7 +399,7 @@ public void testByteArrayFullRangeOfCharacters() { public void testTimeWithCSVTextInCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new Time("1", false)); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData("'12:00:00'"); assertEquals("'12:00:00'", dataFormat.getCSVTextData()); } @@ -394,7 +408,7 @@ public void testTimeWithCSVTextInCSVTextOut() { public void testTimeWithCSVTextInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new Time("1", false)); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData("'12:59:59'"); org.joda.time.LocalTime time = new org.joda.time.LocalTime(12, 59, 59); assertEquals(time.toString(), dataFormat.getObjectData()[0].toString()); @@ -404,7 +418,7 @@ public void 
testTimeWithCSVTextInObjectArrayOut() { public void testTimeWithObjectArrayInCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new Time("1", true)).addColumn(new Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); org.joda.time.LocalTime time = new org.joda.time.LocalTime(15, 0, 0); Object[] in = { time, "test" }; dataFormat.setObjectData(in); @@ -415,7 +429,7 @@ public void testTimeWithObjectArrayInCSVTextOut() { public void testTimeWithObjectArrayInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new Time("1", true)); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); org.joda.time.LocalTime time = new org.joda.time.LocalTime(2, 23, 33); Object[] in = { time }; dataFormat.setObjectData(in); @@ -428,7 +442,7 @@ public void testTimeWithObjectArrayInObjectArrayOut() { public void testDateWithCSVTextInCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new Date("1")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData("'2014-10-01'"); assertEquals("'2014-10-01'", dataFormat.getCSVTextData()); } @@ -437,7 +451,7 @@ public void testDateWithCSVTextInCSVTextOut() { public void testDateWithCSVTextInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new Date("1")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData("'2014-10-01'"); org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01); assertEquals(date.toString(), dataFormat.getObjectData()[0].toString()); @@ -447,7 +461,7 @@ public void testDateWithCSVTextInObjectArrayOut() { public void testDateWithObjectArrayInCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new Date("1")).addColumn(new Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); org.joda.time.LocalDate 
date = new org.joda.time.LocalDate(2014, 10, 01); Object[] in = { date, "test" }; dataFormat.setObjectData(in); @@ -458,7 +472,7 @@ public void testDateWithObjectArrayInCSVTextOut() { public void testDateWithObjectArrayInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new Date("1")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01); Object[] in = { date }; dataFormat.setObjectData(in); @@ -471,7 +485,7 @@ public void testDateWithObjectArrayInObjectArrayOut() { public void testDateTimeWithCSVTextInCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new DateTime("1", false, false)); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData("'2014-10-01 12:00:00'"); assertEquals("'2014-10-01 12:00:00'", dataFormat.getCSVTextData()); @@ -481,7 +495,7 @@ public void testDateTimeWithCSVTextInCSVTextOut() { public void testDateTimeWithFractionNoTimezoneWithCSVTextInCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new DateTime("1", true, false)); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData("'2014-10-01 12:00:00.000'"); assertEquals("'2014-10-01 12:00:00.000'", dataFormat.getCSVTextData()); } @@ -489,7 +503,7 @@ public void testDateTimeWithFractionNoTimezoneWithCSVTextInCSVTextOut() { public void testDateTimeNoFractionNoTimezoneWithCSVTextInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new DateTime("1", false, false)); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData("'2014-10-01 12:00:00'"); // NOTE: string representation will have the T added, it is an // implementation quirk of using JODA @@ -500,7 +514,7 @@ public void testDateTimeNoFractionNoTimezoneWithCSVTextInObjectArrayOut() { public void 
testDateTimeWithFractionNoTimezoneWithCSVTextInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new DateTime("1", true, false)); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData("'2014-10-01 12:00:00.000'"); // NOTE: string representation will have the T added, it is an // implementation quirk of using JODA @@ -512,7 +526,7 @@ public void testDateTimeWithFractionNoTimezoneWithCSVTextInObjectArrayOut() { public void testDateTimeNoQuotesWithFractionTimezoneWithCSVTextInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new DateTime("1", true, true)); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); DateTimeZone zone = DateTimeZone.forID("America/New_York"); org.joda.time.DateTime dateTime = new org.joda.time.DateTime(zone); dataFormat.setCSVTextData(dateTime.toString()); @@ -524,7 +538,7 @@ public void testDateTimeNoQuotesWithFractionTimezoneWithCSVTextInObjectArrayOut( public void testDateTimeIncorrectFormatWithCSVTextInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new DateTime("1", true, true)); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData("'2014-3310-01 12:00:00.000'"); dataFormat.getObjectData()[0].toString(); } @@ -533,7 +547,7 @@ public void testDateTimeIncorrectFormatWithCSVTextInObjectArrayOut() { public void testCurrentDateTime2WithFractionNoTimezoneWithCSVTextInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new DateTime("1", true, false)); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); // current date time org.joda.time.DateTime dateTime = new org.joda.time.DateTime(); String dateTimeString = dtfWithFractionNoTimeZone.print(dateTime); @@ -545,7 +559,7 @@ public void testCurrentDateTime2WithFractionNoTimezoneWithCSVTextInObjectArrayOu public void 
testDateTimeWithFractionAndTimeZoneWithCSVTextInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new DateTime("1", true, true)); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData("'2014-10-01 12:00:00.000-0400'"); // NOTE: string representation will have the T added, it is an // implementation quirk of using JODA @@ -556,7 +570,7 @@ public void testDateTimeWithFractionAndTimeZoneWithCSVTextInObjectArrayOut() { public void testDateTimeWithFractionAndTimeZoneObjectInCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new DateTime("1", true, true)); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); DateTimeZone zone = DateTimeZone.forID("America/New_York"); org.joda.time.DateTime dateTime = new org.joda.time.DateTime(2014, 10, 01, 12, 0, 0, 1, zone); Object[] in = { dateTime }; @@ -569,7 +583,7 @@ public void testDateTimeWithFractionAndTimeZoneObjectInCSVTextOut() { public void testLocalDateTimeWithObjectInCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new DateTime("1", true, false)); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); org.joda.time.LocalDateTime dateTime = new org.joda.time.LocalDateTime(2014, 10, 01, 12, 0, 0, 2); Object[] in = { dateTime }; dataFormat.setObjectData(in); @@ -581,7 +595,7 @@ public void testLocalDateTimeWithObjectInCSVTextOut() { public void testDateTimeFractionAndTimezoneWithCSVTextInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new DateTime("1", true, true)); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData("'2014-10-01 12:00:00.000-04:00'"); DateTimeZone zone = DateTimeZone.forID("America/New_York"); org.joda.time.DateTime edateTime = new org.joda.time.DateTime(2014, 10, 01, 12, 0, 0, 0, zone); @@ -594,13 +608,11 @@ public void 
testDateTimeFractionAndTimezoneWithCSVTextInObjectArrayOut() { // **************test cases for BIT******************* - // **************test cases for BIT******************* - @Test public void testBitTrueFalseWithCSVTextInAndCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new Bit("1")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); for (String trueBit : new String[] { "true", "TRUE" }) { dataFormat.setCSVTextData(trueBit); @@ -617,7 +629,7 @@ public void testBitTrueFalseWithCSVTextInAndCSVTextOut() { public void testBitWithCSVTextInAndCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new Bit("1")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData("1"); assertEquals("1", dataFormat.getCSVTextData()); dataFormat.setCSVTextData("0"); @@ -628,7 +640,7 @@ public void testBitWithCSVTextInAndCSVTextOut() { public void testBitWithObjectArrayInAndCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new Bit("1")).addColumn(new Bit("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Object[] data = new Object[2]; data[0] = Boolean.TRUE; data[1] = Boolean.FALSE; @@ -640,7 +652,7 @@ public void testBitWithObjectArrayInAndCSVTextOut() { public void testUnsupportedBitWithObjectArrayInAndCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new Bit("1")).addColumn(new Bit("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Object[] data = new Object[2]; data[0] = "1"; data[1] = "2"; @@ -652,7 +664,7 @@ public void testUnsupportedBitWithObjectArrayInAndCSVTextOut() { public void testBitWithObjectArrayInAndObjectOut() { Schema schema = new Schema("test"); schema.addColumn(new Bit("1")).addColumn(new Bit("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Object[] data = new Object[2]; data[0] 
= Boolean.TRUE; data[1] = Boolean.FALSE; @@ -669,7 +681,7 @@ public void testBitWithObjectArrayInAndObjectOut() { public void testBitWithCSVTextInAndObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new Bit("1")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); for (String trueBit : new String[] { "true", "TRUE", "1" }) { dataFormat.setCSVTextData(trueBit); @@ -686,7 +698,7 @@ public void testBitWithCSVTextInAndObjectArrayOut() { public void testUnsupportedBitWithObjectArrayInAndObjectOut() { Schema schema = new Schema("test"); schema.addColumn(new Bit("1")).addColumn(new Bit("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Object[] data = new Object[2]; data[0] = "1"; data[1] = "2"; @@ -699,7 +711,7 @@ public void testUnsupportedBitWithObjectArrayInAndObjectOut() { public void testUnsupportedBitWithCSVTextInAndObjectOut() { Schema schema = new Schema("test"); schema.addColumn(new Bit("1")).addColumn(new Bit("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData("1,3"); assertEquals(true, dataFormat.getObjectData()[0]); assertEquals(false, dataFormat.getObjectData()[1]); @@ -711,7 +723,7 @@ public void testArrayOfStringWithObjectArrayInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text"))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Object[] givenArray = { "A", "B" }; // create an array inside the object array Object[] data = new Object[2]; @@ -728,7 +740,7 @@ public void testArrayOfStringWithCSVTextInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text"))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - 
dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Object[] givenArray = { "A", "B" }; // create an array inside the object array Object[] data = new Object[2]; @@ -746,7 +758,7 @@ public void testArrayOfStringWithObjectArrayInCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text"))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Object[] givenArray = { "A", "B" }; // create an array inside the object array Object[] data = new Object[2]; @@ -762,7 +774,7 @@ public void testArrayOfStringWithCSVTextInCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text"))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); String testData = "'[\"A\",\"B\"]','text'"; dataFormat.setCSVTextData(testData); assertEquals(testData, dataFormat.getCSVTextData()); @@ -773,7 +785,7 @@ public void testArrayOfComplexStrings() { Schema schema = new Schema("test"); schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text"))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Object[] givenArray = { "A''\"ssss", "Bss###''" }; // create an array inside the object array Object[] data = new Object[2]; @@ -790,7 +802,7 @@ public void testArrayOfIntegers() { Schema schema = new Schema("test"); schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new FixedPoint("fn"))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Object[] givenArray = { 1, 2 }; // create an array inside the object array Object[] data = new 
Object[2]; @@ -807,7 +819,7 @@ public void testListOfIntegers() { Schema schema = new Schema("test"); schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new FixedPoint("fn"))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); List givenList = new ArrayList(); givenList.add(1); givenList.add(1); @@ -825,7 +837,7 @@ public void testSetOfIntegers() { Schema schema = new Schema("test"); schema.addColumn(new org.apache.sqoop.schema.type.Set("1", new FixedPoint("fn"))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Set givenSet = new HashSet(); givenSet.add(1); givenSet.add(3); @@ -845,7 +857,7 @@ public void testArrayOfDecimals() { schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new org.apache.sqoop.schema.type.Decimal("deci"))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Object[] givenArray = { 1.22, 2.444 }; // create an array inside the object array Object[] data = new Object[2]; @@ -863,7 +875,7 @@ public void testArrayOfObjectsWithObjectArrayInObjectArrayOut() { schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft")))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Object[] givenArrayOne = { 11, 12 }; Object[] givenArrayTwo = { 14, 15 }; @@ -889,7 +901,7 @@ public void testArrayOfObjectsWithCSVTextInObjectArrayOut() { schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft")))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new 
CSVIntermediateDataFormat(schema); Object[] givenArrayOne = { 11, 12 }; Object[] givenArrayTwo = { 14, 15 }; @@ -914,7 +926,7 @@ public void testArrayOfObjectsWithCSVTextInCSVTextOut() { schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft")))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); String input = "'[[11, 12],[14, 15]]','text'"; dataFormat.setCSVTextData(input); Object[] expectedArray = (Object[]) dataFormat.getObjectData()[0]; @@ -928,7 +940,7 @@ public void testArrayOfObjectsWithObjectArrayInCSVTextOut() { schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft")))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Object[] givenArrayOne = { 11, 12 }; Object[] givenArrayTwo = { 14, 15 }; @@ -951,7 +963,7 @@ public void testMapWithSimpleValueWithObjectArrayInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value"))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Map map = new HashMap(); map.put("testKey", "testValue"); // create an array inside the object array @@ -971,7 +983,7 @@ public void testMapWithComplexIntegerListValueWithObjectArrayInObjectArrayOut() schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value", new FixedPoint("number")))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Map givenMap = new HashMap(); List intList = new ArrayList(); intList.add(11); @@ -994,7 +1006,7 @@ 
public void testMapWithComplexStringListValueWithObjectArrayInObjectArrayOut() { schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value", new Text("text")))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Map givenMap = new HashMap(); List stringList = new ArrayList(); stringList.add("A"); @@ -1017,7 +1029,7 @@ public void testMapWithComplexMapValueWithObjectArrayInObjectArrayOut() { schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value", new Text("text")))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Map givenMap = new HashMap(); List stringList = new ArrayList(); stringList.add("A"); @@ -1041,7 +1053,7 @@ public void testMapWithCSVTextInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value"))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Map givenMap = new HashMap(); givenMap.put("testKey", "testValue"); Object[] data = new Object[2]; @@ -1060,7 +1072,7 @@ public void testMapWithComplexValueWithCSVTextInObjectArrayOut() { Schema schema = new Schema("test"); schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value"))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Map givenMap = new HashMap(); givenMap.put("testKey", "testValue"); Object[] data = new Object[2]; @@ -1079,7 +1091,7 @@ public void testMapWithObjectArrayInCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new 
Text("value"))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); Map givenMap = new HashMap(); givenMap.put("testKey", "testValue"); Object[] data = new Object[2]; @@ -1095,7 +1107,7 @@ public void testMapWithCSVTextInCSVTextOut() { Schema schema = new Schema("test"); schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value"))); schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); String testData = "'{\"testKey\":\"testValue\"}','text'"; dataFormat.setCSVTextData(testData); assertEquals(testData, dataFormat.getCSVTextData()); @@ -1106,7 +1118,7 @@ public void testEmptySchema() { String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + ",'\\n'"; Schema schema = new Schema("Test"); - dataFormat.setSchema(schema); + dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData(testData); @SuppressWarnings("unused") @@ -1115,13 +1127,14 @@ public void testEmptySchema() { @Test(expectedExceptions = SqoopException.class) public void testNullSchema() { - dataFormat.setSchema(null); + dataFormat = new CSVIntermediateDataFormat(null); @SuppressWarnings("unused") Object[] out = dataFormat.getObjectData(); } @Test(expectedExceptions = SqoopException.class) public void testNotSettingSchema() { + dataFormat = new CSVIntermediateDataFormat(); @SuppressWarnings("unused") Object[] out = dataFormat.getObjectData(); } diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java index ab2178eb..4945584b 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java @@ -88,12 +88,12 @@ public 
static Object[][] data() { Schema from2 = new Schema("FROM-2"); Schema to2 = new Schema("TO-2"); - from1.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) + from1.addColumn(new FixedPoint("1").setByteSize(2L)).addColumn(new FloatingPoint("2")) .addColumn(new org.apache.sqoop.schema.type.Text("3")); - to1.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) + to1.addColumn(new FixedPoint("1").setByteSize(2L)).addColumn(new FloatingPoint("2")) .addColumn(new org.apache.sqoop.schema.type.Text("3")); - from2.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")); - to2.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")); + from2.addColumn(new FixedPoint("1").setByteSize(2L)).addColumn(new FloatingPoint("2")); + to2.addColumn(new FixedPoint("1").setByteSize(2L)).addColumn(new FloatingPoint("2")); parameters.add(new Object[]{ emptyFrom, @@ -160,7 +160,6 @@ public void testSchemaMatching() throws Exception { @Test public void testSchemalessFromAndTo() throws UnsupportedEncodingException { - CSVIntermediateDataFormat dataFormat = new CSVIntermediateDataFormat(); String testData = "\"This is the data you are looking for. 
It has no structure.\""; Object[] testObject = new Object[] {testData.getBytes(BYTE_FIELD_CHARSET)}; Object[] testObjectCopy = new Object[1]; @@ -169,8 +168,7 @@ public void testSchemalessFromAndTo() throws UnsupportedEncodingException { Matcher matcher = MatcherFactory.getMatcher(NullSchema.getInstance(), NullSchema.getInstance()); // Checking FROM side only because currently that is the only IDF that is used - dataFormat.setSchema(matcher.getFromSchema()); - + CSVIntermediateDataFormat dataFormat = new CSVIntermediateDataFormat(matcher.getFromSchema()); // Setting data as CSV and validating getting CSV and object dataFormat.setCSVTextData(testData); diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java index 087d7d3b..6a14201c 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java @@ -34,6 +34,8 @@ import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.connector.idf.IntermediateDataFormat; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Text; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -85,17 +87,18 @@ public void testReadFields() throws IOException { // it existed before. 
@Test public void testWriteAndReadFields() throws IOException { + Schema schema = new Schema("test").addColumn(new Text("t")); String testData = "You shall not pass"; ByteArrayOutputStream ostream = new ByteArrayOutputStream(); DataOutput out = new DataOutputStream(ostream); - SqoopWritable writableOne = new SqoopWritable(new CSVIntermediateDataFormat()); + SqoopWritable writableOne = new SqoopWritable(new CSVIntermediateDataFormat(schema)); writableOne.setString(testData); writableOne.write(out); byte[] written = ostream.toByteArray(); // Don't test what the data is, test that SqoopWritable can read it. InputStream instream = new ByteArrayInputStream(written); - SqoopWritable writableTwo = new SqoopWritable(new CSVIntermediateDataFormat()); + SqoopWritable writableTwo = new SqoopWritable(new CSVIntermediateDataFormat(schema)); DataInput in = new DataInputStream(instream); writableTwo.readFields(in); assertEquals(writableOne.toString(), writableTwo.toString()); diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java index 7ddaa10b..41ea24aa 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java @@ -39,6 +39,8 @@ import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.job.util.MRJobTestUtil; import org.apache.sqoop.schema.NullSchema; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Text; import org.apache.sqoop.submission.counter.SqoopCounters; import org.testng.Assert; import org.testng.annotations.BeforeMethod; @@ -145,8 +147,12 @@ private Matcher getMatcher(){ } // TODO:SQOOP-1873: Mock objects instead - private IntermediateDataFormat getIDF(){ - return new CSVIntermediateDataFormat(); + private 
IntermediateDataFormat getIDF() { + return new CSVIntermediateDataFormat(getSchema()); + } + + private Schema getSchema() { + return new Schema("test").addColumn(new Text("t")); } @BeforeMethod diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java index d4988509..ce39a786 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java @@ -70,7 +70,7 @@ public static boolean runJob(Configuration conf, public static Schema getTestSchema() { Schema schema = new Schema("Test"); - schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) + schema.addColumn(new FixedPoint("1").setByteSize(2L)).addColumn(new FloatingPoint("2")) .addColumn(new Text("3")); return schema; }