5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-06 01:59:09 +08:00

SQOOP-1956: Sqoop2: Cleanup IDF implementations

(Veena Basavaraj via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2015-01-09 11:16:05 -08:00
parent aeab9150b2
commit 89dcbe879e
13 changed files with 272 additions and 281 deletions

View File

@ -49,7 +49,7 @@ public static Column sqlTypeToSchemaType(int sqlType, String columnName) {
case Types.SMALLINT:
case Types.TINYINT:
case Types.INTEGER:
return new FixedPoint(columnName);
return new FixedPoint(columnName).setByteSize(2L);
case Types.CLOB:
case Types.VARCHAR:

View File

@ -102,7 +102,7 @@ public void setUp() {
*/
public Schema getSchema(String name) {
return new Schema(name)
.addColumn(new FixedPoint("ICOL"))
.addColumn(new FixedPoint("ICOL").setByteSize(2L))
.addColumn(new FloatingPoint("DCOL"))
.addColumn(new Text("VCOL"))
;

View File

@ -94,7 +94,7 @@ public class SqoopIDFUtils {
// ******** Number Column Type utils***********
public static String encodeToCSVFixedPoint(Object obj, Column column) {
public static String toCSVFixedPoint(Object obj, Column column) {
Long byteSize = ((FixedPoint) column).getByteSize();
if (byteSize != null && byteSize <= Integer.SIZE) {
return ((Integer) obj).toString();
@ -114,7 +114,7 @@ public static Object toFixedPoint(String csvString, Column column) {
return returnValue;
}
public static String encodeToCSVFloatingPoint(Object obj, Column column) {
public static String toCSVFloatingPoint(Object obj, Column column) {
Long byteSize = ((FloatingPoint) column).getByteSize();
if (byteSize != null && byteSize <= Float.SIZE) {
return ((Float) obj).toString();
@ -134,7 +134,7 @@ public static Object toFloatingPoint(String csvString, Column column) {
return returnValue;
}
public static String encodeToCSVDecimal(Object obj) {
public static String toCSVDecimal(Object obj) {
return ((BigDecimal) obj).toString();
}
@ -143,7 +143,7 @@ public static Object toDecimal(String csvString, Column column) {
}
// ********** BIT Column Type utils******************
public static String encodeToCSVBit(Object obj) {
public static String toCSVBit(Object obj) {
String bitStringValue = obj.toString();
if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) {
return bitStringValue;
@ -164,16 +164,16 @@ public static Object toBit(String csvString) {
// *********** DATE and TIME Column Type utils **********
public static String encodeToCSVDate(Object obj) {
public static String toCSVDate(Object obj) {
org.joda.time.LocalDate date = (org.joda.time.LocalDate) obj;
return encloseWithQuote(df.print(date));
return encloseWithQuotes(df.print(date));
}
public static String encodeToCSVTime(Object obj, Column col) {
public static String toCSVTime(Object obj, Column col) {
if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) {
return encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) obj));
return encloseWithQuotes(tfWithFraction.print((org.joda.time.LocalTime) obj));
} else {
return encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) obj));
return encloseWithQuotes(tfWithNoFraction.print((org.joda.time.LocalTime) obj));
}
}
@ -187,27 +187,27 @@ public static Object toTime(String csvString, Column column) {
// *********** DATE TIME Column Type utils **********
public static String encodeToCSVLocalDateTime(Object obj, Column col) {
public static String toCSVLocalDateTime(Object obj, Column col) {
org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) obj;
org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
if (column.hasFraction()) {
return encloseWithQuote(dtfWithFractionNoTimeZone.print(localDateTime));
return encloseWithQuotes(dtfWithFractionNoTimeZone.print(localDateTime));
} else {
return encloseWithQuote(dtfWithNoFractionAndTimeZone.print(localDateTime));
return encloseWithQuotes(dtfWithNoFractionAndTimeZone.print(localDateTime));
}
}
public static String encodeToCSVDateTime(Object obj, Column col) {
public static String toCSVDateTime(Object obj, Column col) {
org.joda.time.DateTime dateTime = (org.joda.time.DateTime) obj;
org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
if (column.hasFraction() && column.hasTimezone()) {
return encloseWithQuote(dtfWithFractionAndTimeZone.print(dateTime));
return encloseWithQuotes(dtfWithFractionAndTimeZone.print(dateTime));
} else if (column.hasFraction() && !column.hasTimezone()) {
return encloseWithQuote(dtfWithFractionNoTimeZone.print(dateTime));
return encloseWithQuotes(dtfWithFractionNoTimeZone.print(dateTime));
} else if (column.hasTimezone()) {
return encloseWithQuote(dtfWithNoFractionWithTimeZone.print(dateTime));
return encloseWithQuotes(dtfWithNoFractionWithTimeZone.print(dateTime));
} else {
return encloseWithQuote(dtfWithNoFractionAndTimeZone.print(dateTime));
return encloseWithQuotes(dtfWithNoFractionAndTimeZone.print(dateTime));
}
}
@ -256,10 +256,10 @@ public static Long toDateTimeInMillis(String csvString, Column column) {
// ************ MAP Column Type utils*********
@SuppressWarnings("unchecked")
public static String encodeToCSVMap(Map<Object, Object> map, Column column) {
public static String toCSVMap(Map<Object, Object> map, Column column) {
JSONObject object = new JSONObject();
object.putAll(map);
return encloseWithQuote(object.toJSONString());
return encloseWithQuotes(object.toJSONString());
}
public static Map<Object, Object> toMap(String csvString) {
@ -314,7 +314,7 @@ else if (value instanceof JSONObject) {
// ************ LIST Column Type utils*********
@SuppressWarnings("unchecked")
public static String encodeToCSVList(Object[] list, Column column) {
public static String toCSVList(Object[] list, Column column) {
List<Object> elementList = new ArrayList<Object>();
for (int n = 0; n < list.length; n++) {
Column listType = ((AbstractComplexListType) column).getListType();
@ -332,7 +332,7 @@ public static String encodeToCSVList(Object[] list, Column column) {
}
JSONArray array = new JSONArray();
array.addAll(elementList);
return encloseWithQuote(array.toJSONString());
return encloseWithQuotes(array.toJSONString());
}
public static Object[] toList(String csvString) {
@ -397,7 +397,7 @@ private static String getRegExp(String string) {
return string.replaceAll("\\\\", Matcher.quoteReplacement("\\\\"));
}
public static String encodeToCSVString(String string) {
public static String toCSVString(String string) {
int j = 0;
String replacement = string;
try {
@ -408,7 +408,7 @@ public static String encodeToCSVString(String string) {
throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + " " + replacement
+ " " + String.valueOf(j) + " " + e.getMessage());
}
return encloseWithQuote(replacement);
return encloseWithQuotes(replacement);
}
public static String toText(String string) {
@ -429,10 +429,10 @@ public static String toText(String string) {
// ************ BINARY Column type utils*********
public static String encodeToCSVByteArray(Object obj) {
public static String toCSVByteArray(Object obj) {
byte[] bytes = (byte[]) obj;
try {
return encodeToCSVString(new String(bytes, BYTE_FIELD_CHARSET));
return toCSVString(new String(bytes, BYTE_FIELD_CHARSET));
} catch (UnsupportedEncodingException e) {
// We should never hit this case.
// This character set should be distributed with Java.
@ -455,7 +455,7 @@ public static byte[] toByteArray(String csvString) {
// *********** SQOOP CSV standard encoding utils********************
public static String encloseWithQuote(String string) {
public static String encloseWithQuotes(String string) {
StringBuilder builder = new StringBuilder();
builder.append(QUOTE_CHARACTER).append(string).append(QUOTE_CHARACTER);
return builder.toString();

View File

@ -302,47 +302,47 @@ private String toCSV(GenericRecord record) {
case ARRAY:
case SET:
List<Object> objList = (List<Object>) obj;
csvString.append(encodeToCSVList(toObjectArray(objList), cols[i]));
csvString.append(toCSVList(toObjectArray(objList), cols[i]));
break;
case MAP:
Map<Object, Object> objMap = (Map<Object, Object>) obj;
csvString.append(encodeToCSVMap(objMap, cols[i]));
csvString.append(toCSVMap(objMap, cols[i]));
break;
case ENUM:
case TEXT:
csvString.append(encodeToCSVString(obj.toString()));
csvString.append(toCSVString(obj.toString()));
break;
case BINARY:
case UNKNOWN:
csvString.append(encodeToCSVByteArray(getBytesFromByteBuffer(obj)));
csvString.append(toCSVByteArray(getBytesFromByteBuffer(obj)));
break;
case FIXED_POINT:
csvString.append(encodeToCSVFixedPoint(obj, cols[i]));
csvString.append(toCSVFixedPoint(obj, cols[i]));
break;
case FLOATING_POINT:
csvString.append(encodeToCSVFloatingPoint(obj, cols[i]));
csvString.append(toCSVFloatingPoint(obj, cols[i]));
break;
case DECIMAL:
// stored as string
csvString.append(encodeToCSVDecimal(obj));
csvString.append(toCSVDecimal(obj));
break;
case DATE:
// stored as long
Long dateInMillis = (Long) obj;
csvString.append(encodeToCSVDate(new org.joda.time.LocalDate(dateInMillis)));
csvString.append(toCSVDate(new org.joda.time.LocalDate(dateInMillis)));
break;
case TIME:
// stored as long
Long timeInMillis = (Long) obj;
csvString.append(encodeToCSVTime(new org.joda.time.LocalTime(timeInMillis), cols[i]));
csvString.append(toCSVTime(new org.joda.time.LocalTime(timeInMillis), cols[i]));
break;
case DATE_TIME:
// stored as long
Long dateTimeInMillis = (Long) obj;
csvString.append(encodeToCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), cols[i]));
csvString.append(toCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), cols[i]));
break;
case BIT:
csvString.append(encodeToCSVBit(obj));
csvString.append(toCSVBit(obj));
break;
default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,

View File

@ -20,45 +20,39 @@
import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.AbstractComplexListType;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.utils.ClassUtils;
import org.joda.time.DateTime;
import org.joda.time.LocalDate;
import org.joda.time.LocalDateTime;
import org.joda.time.LocalTime;
import org.json.simple.JSONValue;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* A concrete implementation of the {@link IntermediateDataFormat} that
* represents each row of the data source as a comma-separated list. Each
* element in the CSV represents a specific column value encoded as string using the sqoop specified rules.
* The methods allow serializing to this string and deserializing the string to its
* corresponding java object based on the {@link #Schema} and its
* {@link #Column} types.
* element in the CSV represents a specific column value encoded as a string using
* the Sqoop-specified rules. The methods allow serializing to this string and
* deserializing the string to its corresponding Java object based on the
* {@link Schema} and its {@link Column} types.
*
*/
public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
public static final Logger LOG = Logger.getLogger(CSVIntermediateDataFormat.class);
// need this default constructor for reflection magic used in execution engine
public CSVIntermediateDataFormat() {
}
public CSVIntermediateDataFormat(Schema schema) {
setSchema(schema);
super.setSchema(schema);
}
/**
* {@inheritDoc}
*/
@ -80,11 +74,7 @@ public void setCSVTextData(String csvText) {
*/
@Override
public Object[] getObjectData() {
if (schema == null || schema.isEmpty()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002);
}
// fieldStringArray represents the csv fields parsed into string array
super.validateSchema(schema);
String[] csvStringArray = parseCSVString(this.data);
if (csvStringArray == null) {
@ -92,14 +82,13 @@ public Object[] getObjectData() {
}
if (csvStringArray.length != schema.getColumnsArray().length) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + getCSVTextData()
+ " has the wrong number of fields.");
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
"The data " + getCSVTextData() + " has the wrong number of fields.");
}
Object[] objectArray = new Object[csvStringArray.length];
Column[] columnArray = schema.getColumnsArray();
for (int i = 0; i < csvStringArray.length; i++) {
// check for NULL field and bail out immediately
if (csvStringArray[i].equals(NULL_VALUE)) {
objectArray[i] = null;
continue;
@ -109,7 +98,6 @@ public Object[] getObjectData() {
return objectArray;
}
private Object toObject(String csvString, Column column) {
Object returnValue = null;
@ -163,18 +151,9 @@ private Object toObject(String csvString, Column column) {
*/
@Override
public void setObjectData(Object[] data) {
Set<Integer> nullValueIndices = new HashSet<Integer>();
Column[] columnArray = schema.getColumnsArray();
// check for null
for (int i = 0; i < data.length; i++) {
if (data[i] == null) {
nullValueIndices.add(i);
data[i] = NULL_VALUE;
}
}
// ignore the null values while encoding the object array into csv string
encodeToCSVText(data, columnArray, nullValueIndices);
this.data = StringUtils.join(data, CSV_SEPARATOR_CHARACTER);
super.validateSchema(schema);
// convert object array to csv text
this.data = toCSV(data);
}
@ -200,60 +179,75 @@ public void read(DataInput in) throws IOException {
* array
*
* @param objectArray
* @param columnArray
* @param nullValueIndices
*/
@SuppressWarnings("unchecked")
private void encodeToCSVText(Object[] objectArray, Column[] columnArray, Set<Integer> nullValueIndices) {
for (int i : bitTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
objectArray[i] = encodeToCSVBit(objectArray[i]);
}
}
for (int i : stringTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
objectArray[i] = encodeToCSVString((String) objectArray[i]);
}
}
for (int i : dateTimeTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
Column col = columnArray[i];
if (objectArray[i] instanceof org.joda.time.DateTime) {
org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i];
// check for fraction and time zone and then use the right formatter
objectArray[i] = encodeToCSVDateTime(dateTime, col);
} else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i];
objectArray[i] = encodeToCSVLocalDateTime(localDateTime, col);
private String toCSV(Object[] objectArray) {
Column[] columnArray = schema.getColumnsArray();
StringBuilder csvString = new StringBuilder();
for (int i = 0; i < columnArray.length; i++) {
Object obj = objectArray[i];
if (obj == null) {
csvString.append(NULL_VALUE);
} else {
switch (columnArray[i].getType()) {
case ARRAY:
case SET:
csvString.append(toCSVList((Object[]) obj, (AbstractComplexListType) columnArray[i]));
break;
case MAP:
csvString.append(toCSVMap((Map<Object, Object>) obj, columnArray[i]));
break;
case ENUM:
case TEXT:
csvString.append(toCSVString(obj.toString()));
break;
case BINARY:
case UNKNOWN:
csvString.append(toCSVByteArray((byte[]) obj));
break;
case FIXED_POINT:
csvString.append(toCSVFixedPoint(obj, columnArray[i]));
break;
case FLOATING_POINT:
csvString.append(toCSVFloatingPoint(obj, columnArray[i]));
break;
case DECIMAL:
csvString.append(toCSVDecimal(obj));
break;
// encoded in CSV as quoted strings in the joda time format
case DATE:
csvString.append(toCSVDate(obj));
break;
case TIME:
csvString.append(toCSVTime(obj, columnArray[i]));
break;
case DATE_TIME:
if (objectArray[i] instanceof org.joda.time.DateTime) {
org.joda.time.DateTime dateTime = (org.joda.time.DateTime) obj;
// check for fraction and time zone and then use the right formatter
csvString.append(toCSVDateTime(dateTime, columnArray[i]));
} else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) obj;
csvString.append(toCSVLocalDateTime(localDateTime, columnArray[i]));
}
break;
case BIT:
csvString.append(toCSVBit(obj));
break;
default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
"Column type from schema was not recognized for " + columnArray[i].getType());
}
}
}
for (int i : dateTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
objectArray[i] = encodeToCSVDate(objectArray[i]);
}
}
for (int i : timeTypeColumnIndices) {
Column col = columnArray[i];
if (!nullValueIndices.contains(i)) {
objectArray[i] = encodeToCSVTime(objectArray[i], col);
}
}
for (int i : byteTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
objectArray[i] = encodeToCSVByteArray((byte[]) objectArray[i]);
}
}
for (int i : listTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
objectArray[i] = encodeToCSVList((Object[]) objectArray[i], (AbstractComplexListType) columnArray[i]);
}
}
for (int i : mapTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
objectArray[i] = encodeToCSVMap((Map<Object, Object>) objectArray[i], columnArray[i]);
if (i < columnArray.length - 1) {
csvString.append(CSV_SEPARATOR_CHARACTER);
}
}
return csvString.toString();
}
/**
@ -261,15 +255,6 @@ private void encodeToCSVText(Object[] objectArray, Column[] columnArray, Set<Int
*/
@Override
public Set<String> getJars() {
Set<String> jars = super.getJars();
// Add JODA classes for IDF date/time handling
jars.add(ClassUtils.jarForClass(LocalDate.class));
jars.add(ClassUtils.jarForClass(LocalDateTime.class));
jars.add(ClassUtils.jarForClass(DateTime.class));
jars.add(ClassUtils.jarForClass(LocalTime.class));
// Add JSON parsing jar
jars.add(ClassUtils.jarForClass(JSONValue.class));
return jars;
return super.getJars();
}
}
}

View File

@ -18,12 +18,14 @@
*/
package org.apache.sqoop.connector.idf;
import static org.apache.sqoop.connector.common.SqoopIDFUtils.isColumnListType;
import static org.apache.sqoop.connector.common.SqoopIDFUtils.isColumnStringType;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.ColumnType;
import org.apache.sqoop.utils.ClassUtils;
import org.joda.time.DateTime;
import org.joda.time.LocalDate;
import org.joda.time.LocalDateTime;
import org.joda.time.LocalTime;
import org.json.simple.JSONValue;
import java.io.DataInput;
import java.io.DataOutput;
@ -51,16 +53,6 @@ public abstract class IntermediateDataFormat<T> {
protected Schema schema;
protected final Set<Integer> stringTypeColumnIndices = new HashSet<Integer>();
protected final Set<Integer> bitTypeColumnIndices = new HashSet<Integer>();
protected final Set<Integer> byteTypeColumnIndices = new HashSet<Integer>();
protected final Set<Integer> listTypeColumnIndices = new HashSet<Integer>();
protected final Set<Integer> mapTypeColumnIndices = new HashSet<Integer>();
protected final Set<Integer> dateTimeTypeColumnIndices = new HashSet<Integer>();
protected final Set<Integer> dateTypeColumnIndices = new HashSet<Integer>();
protected final Set<Integer> timeTypeColumnIndices = new HashSet<Integer>();
/**
* Get one row of data.
*
@ -134,32 +126,13 @@ public void setData(T obj) {
* - the schema used for serializing/de-serializing data
*/
public void setSchema(Schema schema) {
if (schema == null) {
// TODO(SQOOP-1956): throw an exception since working without a schema is dangerous
return;
}
validateSchema(schema);
this.schema = schema;
Column[] columns = schema.getColumnsArray();
int i = 0;
for (Column col : columns) {
if (isColumnStringType(col)) {
stringTypeColumnIndices.add(i);
} else if (col.getType() == ColumnType.BIT) {
bitTypeColumnIndices.add(i);
} else if (col.getType() == ColumnType.DATE) {
dateTypeColumnIndices.add(i);
} else if (col.getType() == ColumnType.TIME) {
timeTypeColumnIndices.add(i);
} else if (col.getType() == ColumnType.DATE_TIME) {
dateTimeTypeColumnIndices.add(i);
} else if (col.getType() == ColumnType.BINARY) {
byteTypeColumnIndices.add(i);
} else if (isColumnListType(col)) {
listTypeColumnIndices.add(i);
} else if (col.getType() == ColumnType.MAP) {
mapTypeColumnIndices.add(i);
}
i++;
}
protected void validateSchema(Schema schema) {
if (schema == null) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002);
}
}
@ -186,7 +159,15 @@ public void setSchema(Schema schema) {
* @return set of jars
*/
public Set<String> getJars() {
return new HashSet<String>();
Set<String> jars = new HashSet<String>();
// Add JODA classes for IDF date/time handling
jars.add(ClassUtils.jarForClass(LocalDate.class));
jars.add(ClassUtils.jarForClass(LocalDateTime.class));
jars.add(ClassUtils.jarForClass(DateTime.class));
jars.add(ClassUtils.jarForClass(LocalTime.class));
// Add JSON parsing jar
jars.add(ClassUtils.jarForClass(JSONValue.class));
return jars;
}
@Override

View File

@ -41,6 +41,10 @@
*/
public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObject> {
// need this default constructor for reflection magic used in execution engine
public JSONIntermediateDataFormat() {
}
// We need schema at all times
public JSONIntermediateDataFormat(Schema schema) {
setSchema(schema);
@ -110,6 +114,7 @@ public Set<String> getJars() {
Set<String> jars = super.getJars();
jars.add(ClassUtils.jarForClass(JSONObject.class));
jars.add(ClassUtils.jarForClass(JSONArray.class));
return jars;
}
@ -241,16 +246,16 @@ private JSONObject toJSON(Object[] data) {
// stored in JSON as the same format as csv strings in the joda time
// format
case DATE_TIME:
object.put(cols[i].getName(), removeQuotes(encodeToCSVDateTime(data[i], cols[i])));
object.put(cols[i].getName(), removeQuotes(toCSVDateTime(data[i], cols[i])));
break;
case TIME:
object.put(cols[i].getName(), removeQuotes(encodeToCSVTime(data[i], cols[i])));
object.put(cols[i].getName(), removeQuotes(toCSVTime(data[i], cols[i])));
break;
case DATE:
object.put(cols[i].getName(), removeQuotes(encodeToCSVDate(data[i])));
object.put(cols[i].getName(), removeQuotes(toCSVDate(data[i])));
break;
case BIT:
object.put(cols[i].getName(), Boolean.valueOf(encodeToCSVBit(data[i])));
object.put(cols[i].getName(), Boolean.valueOf(toCSVBit(data[i])));
break;
default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
@ -278,40 +283,40 @@ private String toCSV(JSONObject json) {
case SET:
// stored as JSON array
JSONArray array = (JSONArray) obj;
csvString.append(encloseWithQuote(array.toJSONString()));
csvString.append(encloseWithQuotes(array.toJSONString()));
break;
case MAP:
// stored as JSON object
csvString.append(encloseWithQuote((((JSONObject) obj).toJSONString())));
csvString.append(encloseWithQuotes((((JSONObject) obj).toJSONString())));
break;
case ENUM:
case TEXT:
csvString.append(encodeToCSVString(obj.toString()));
csvString.append(toCSVString(obj.toString()));
break;
case BINARY:
case UNKNOWN:
csvString.append(encodeToCSVByteArray(Base64.decodeBase64(obj.toString())));
csvString.append(toCSVByteArray(Base64.decodeBase64(obj.toString())));
break;
case FIXED_POINT:
csvString.append(encodeToCSVFixedPoint(obj, cols[i]));
csvString.append(toCSVFixedPoint(obj, cols[i]));
break;
case FLOATING_POINT:
csvString.append(encodeToCSVFloatingPoint(obj, cols[i]));
csvString.append(toCSVFloatingPoint(obj, cols[i]));
break;
case DECIMAL:
csvString.append(encodeToCSVDecimal(obj));
csvString.append(toCSVDecimal(obj));
break;
// stored in JSON as strings in the joda time format
case DATE:
case TIME:
case DATE_TIME:
csvString.append(encloseWithQuote(obj.toString()));
csvString.append(encloseWithQuotes(obj.toString()));
break;
// 0/1 will be stored as they are in JSON, even though valid values in
// JSON
// are true/false
case BIT:
csvString.append(encodeToCSVBit(obj));
csvString.append(toCSVBit(obj));
break;
default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,

View File

@ -45,7 +45,7 @@ public static String getByteFieldString(byte[] byteFieldData) {
@Test
public void testEncloseStringWithQuotes() {
String test = "test";
String quotedText = encloseWithQuote(test);
String quotedText = encloseWithQuotes(test);
assertEquals(quotedText, "'test'");
}
@ -53,7 +53,7 @@ public void testEncloseStringWithQuotes() {
@Test
public void testStringWithQuotesToEncloseStringWithQuotes() {
String test = "'test'";
String quotedText = encloseWithQuote(test);
String quotedText = encloseWithQuotes(test);
assertEquals(quotedText, "''test''");
}
@ -82,28 +82,28 @@ public void testStingWithNoQuotesRemoveQuotes() {
@Test
public void testExample1EncodeToCSVString() {
String test = "test";
String encodedText = encodeToCSVString(test);
String encodedText = toCSVString(test);
assertEquals(encodedText, "'test'");
}
@Test
public void testExample2EncodeToCSVString() {
String test = "test,test1";
String encodedText = encodeToCSVString(test);
String encodedText = toCSVString(test);
assertEquals(encodedText, "'test,test1'");
}
@Test
public void testExample3EncodeToCSVString() {
String test = "test,'test1";
String encodedText = encodeToCSVString(test);
String encodedText = toCSVString(test);
assertEquals(encodedText, "'test,\\'test1'");
}
@Test
public void testExample4EncodeToCSVString() {
String test = "test,\"test1";
String encodedText = encodeToCSVString(test);
String encodedText = toCSVString(test);
assertEquals(encodedText, "'test,\\\"test1'");
}
@ -117,7 +117,7 @@ public void testExample4ToString() {
public void testExample5EncodeToCSVString() {
String test = new String(new char[] { 0x0A });
String encodedText = encodeToCSVString(test);
String encodedText = toCSVString(test);
assertEquals(encodedText, "'\\n'");
}
@ -130,7 +130,7 @@ public void testExample5ToString() {
public void testExample6EncodeToCSVString() {
String test = new String(new char[] { 0x0D });
String encodedText = encodeToCSVString(test);
String encodedText = toCSVString(test);
assertEquals(encodedText, "'\\r'");
}
@ -138,7 +138,7 @@ public void testExample6EncodeToCSVString() {
public void testEncodeByteToCSVString() {
// byte[0] = \r, byte[1] = -112, byte[2] = 54 - 2's complement
byte[] bytes = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54 };
String encodedText = encodeToCSVByteArray(bytes);
String encodedText = toCSVByteArray(bytes);
String expectedText = getByteFieldString(bytes).replaceAll("\r", "\\\\r");
assertEquals(encodedText, expectedText);
}
@ -149,7 +149,7 @@ public void testEncodeArrayIntegersToCSVString() {
list.add(1);
list.add(2);
AbstractComplexListType array = new Array("a", new Text("t"));
String encodedText = encodeToCSVList(list.toArray(), array);
String encodedText = toCSVList(list.toArray(), array);
assertEquals(encodedText, "'[1,2]'");
}
@ -159,7 +159,7 @@ public void testEncodeArrayStringsToCSVString() {
list.add("A");
list.add("B");
AbstractComplexListType array = new Array("a", new Text("t"));
String encodedText = encodeToCSVList(list.toArray(), array);
String encodedText = toCSVList(list.toArray(), array);
assertEquals(encodedText, "'[\"A\",\"B\"]'");
}
@ -172,7 +172,7 @@ public void testEncodeMapToCSVString() {
map.put("A", list);
org.apache.sqoop.schema.type.Map mapCol = new org.apache.sqoop.schema.type.Map("a", new Text("t"), new Array("r", new Text(
"tr")));
String encodedText = encodeToCSVMap(map, mapCol);
String encodedText = toCSVMap(map, mapCol);
assertEquals(encodedText, "'{\"A\":[\"A\",\"B\"]}'");
}

View File

@ -55,7 +55,6 @@ public class TestCSVIntermediateDataFormat {
@BeforeMethod
public void setUp() {
dataFormat = new CSVIntermediateDataFormat();
}
@ -70,7 +69,7 @@ public void testNullInputAsCSVTextInObjectArrayOut() {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData(null);
Object[] out = dataFormat.getObjectData();
assertNull(out);
@ -81,7 +80,7 @@ public void testEmptyInputAsCSVTextInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new FixedPoint("1")).addColumn(new FixedPoint("2")).addColumn(new Text("3")).addColumn(new Text("4"))
.addColumn(new Binary("5")).addColumn(new Text("6"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData("");
dataFormat.getObjectData();
}
@ -98,12 +97,13 @@ public void testNullValueAsObjectArrayInAndCSVTextOut() {
.addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
.addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null, null };
dataFormat.setObjectData(in);
String csvText = dataFormat.getCSVTextData();
String[] textValues = csvText.split(",");
assertEquals(14, textValues.length);
for (String text : textValues) {
assertEquals(text, NULL_VALUE);
}
@ -121,11 +121,12 @@ public void testNullValueAsObjectArrayInAndObjectArrayOut() {
.addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
.addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null, null };
dataFormat.setObjectData(in);
Object[] out = dataFormat.getObjectData();
assertEquals(14, out.length);
for (Object obj : out) {
assertEquals(obj, null);
}
@ -143,7 +144,7 @@ public void testNullValueAsCSVTextInAndObjectArrayOut() {
.addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
.addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL",
"NULL" };
dataFormat.setCSVTextData(StringUtils.join(test, ","));
@ -166,7 +167,7 @@ public void testNullValueAsCSVTextInAndCSVTextOut() {
.addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
.addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL",
"NULL" };
dataFormat.setCSVTextData(StringUtils.join(test, ","));
@ -182,17 +183,30 @@ public void testNullValueAsCSVTextInAndCSVTextOut() {
@Test
public void testInputAsCSVTextInCSVTextOut() {
String testData = "'ENUM',10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+ ",'" + String.valueOf(0x0A) + "'";
Schema schema = new Schema("test");
schema.addColumn(new org.apache.sqoop.schema.type.Enum("1")).addColumn(new FixedPoint("2"))
.addColumn(new FixedPoint("3")).addColumn(new Text("4")).addColumn(new Text("5"))
.addColumn(new Binary("6")).addColumn(new Text("7"));
String testData = "'ENUM',10,34,'54','random data',"
+ getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A)
+ "'";
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData(testData);
assertEquals(testData, dataFormat.getCSVTextData());
}
/**
 * Sets a seven-field CSV record on a CSV IDF built from a matching schema and
 * expects getData() to equal the same string.
 */
@Test
public void testInputAsCSVTextInAndDataOut() {
String testData = "'ENUM',10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+ ",'" + String.valueOf(0x0A) + "'";
// Column order mirrors the record: enum, two fixed-point, two text, binary, text.
Schema schema = new Schema("test");
schema.addColumn(new org.apache.sqoop.schema.type.Enum("1")).addColumn(new FixedPoint("2"))
.addColumn(new FixedPoint("3")).addColumn(new Text("4")).addColumn(new Text("5"))
.addColumn(new Binary("6")).addColumn(new Text("7"));
// NOTE(review): testData is declared a second time here with the same value and different
// formatting; this listing appears to show the before/after lines of the commit together - confirm.
// String.valueOf(0x0A) resolves to the int overload, so the last field is the text '10'.
String testData = "'ENUM',10,34,'54','random data',"
+ getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A)
+ "'";
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData(testData);
// Compared through the generic getData() accessor rather than getCSVTextData().
assertEquals(testData, dataFormat.getData());
}
@ -204,7 +218,7 @@ public void testInputAsCSVTextInObjectOutWithSingleColumn() {
Schema schema = new Schema("test");
schema.addColumn(new Text("text"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData(testData);
Object[] out = dataFormat.getObjectData();
@ -226,7 +240,7 @@ public void testInputAsCSVTextInObjectOut() {
.addColumn(new Text("6"))
.addColumn(new org.apache.sqoop.schema.type.Enum("7"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData(testData);
Object[] out = dataFormat.getObjectData();
@ -251,7 +265,7 @@ public void testInputAsObjectArayInCSVTextOrDataOut() {
.addColumn(new Binary("5"))
.addColumn(new Text("6"))
.addColumn(new org.apache.sqoop.schema.type.Enum("7"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
Object[] in = new Object[7];
@ -285,7 +299,7 @@ public void testObjectArrayInObjectArrayOut() {
.addColumn(new Text("6"))
.addColumn(new org.apache.sqoop.schema.type.Enum("7"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] in = new Object[7];
in[0] = new Long(10);
@ -315,7 +329,7 @@ public void testObjectArrayWithNullInCSVTextOut() {
.addColumn(new Text("6"))
.addColumn(new org.apache.sqoop.schema.type.Enum("7"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
Object[] in = new Object[7];
@ -340,7 +354,7 @@ public void testStringFullRangeOfCharacters() {
Schema schema = new Schema("test");
schema.addColumn(new Text("1"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
char[] allCharArr = new char[256];
for(int i = 0; i < allCharArr.length; ++i) {
@ -363,7 +377,7 @@ public void testStringFullRangeOfCharacters() {
public void testByteArrayFullRangeOfCharacters() {
Schema schema = new Schema("test");
schema.addColumn(new Binary("1"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
byte[] allCharByteArr = new byte[256];
for (int i = 0; i < allCharByteArr.length; ++i) {
@ -385,7 +399,7 @@ public void testByteArrayFullRangeOfCharacters() {
public void testTimeWithCSVTextInCSVTextOut() {
// Single Time column; the second constructor argument is false in this test.
Schema schema = new Schema("test");
schema.addColumn(new Time("1", false));
// NOTE(review): both the setter and the constructor form of supplying the schema are listed;
// presumably the constructor line replaces the setter line in this commit - confirm.
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
// A quoted time set as CSV text is expected back unchanged from getCSVTextData().
dataFormat.setCSVTextData("'12:00:00'");
assertEquals("'12:00:00'", dataFormat.getCSVTextData());
}
@ -394,7 +408,7 @@ public void testTimeWithCSVTextInCSVTextOut() {
public void testTimeWithCSVTextInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new Time("1", false));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData("'12:59:59'");
org.joda.time.LocalTime time = new org.joda.time.LocalTime(12, 59, 59);
assertEquals(time.toString(), dataFormat.getObjectData()[0].toString());
@ -404,7 +418,7 @@ public void testTimeWithCSVTextInObjectArrayOut() {
public void testTimeWithObjectArrayInCSVTextOut() {
Schema schema = new Schema("test");
schema.addColumn(new Time("1", true)).addColumn(new Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
org.joda.time.LocalTime time = new org.joda.time.LocalTime(15, 0, 0);
Object[] in = { time, "test" };
dataFormat.setObjectData(in);
@ -415,7 +429,7 @@ public void testTimeWithObjectArrayInCSVTextOut() {
public void testTimeWithObjectArrayInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new Time("1", true));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
org.joda.time.LocalTime time = new org.joda.time.LocalTime(2, 23, 33);
Object[] in = { time };
dataFormat.setObjectData(in);
@ -428,7 +442,7 @@ public void testTimeWithObjectArrayInObjectArrayOut() {
public void testDateWithCSVTextInCSVTextOut() {
// Single Date column named "1".
Schema schema = new Schema("test");
schema.addColumn(new Date("1"));
// NOTE(review): both the setter and the constructor form of supplying the schema are listed;
// presumably the constructor line replaces the setter line in this commit - confirm.
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
// A quoted date set as CSV text is expected back unchanged from getCSVTextData().
dataFormat.setCSVTextData("'2014-10-01'");
assertEquals("'2014-10-01'", dataFormat.getCSVTextData());
}
@ -437,7 +451,7 @@ public void testDateWithCSVTextInCSVTextOut() {
public void testDateWithCSVTextInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new Date("1"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData("'2014-10-01'");
org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
assertEquals(date.toString(), dataFormat.getObjectData()[0].toString());
@ -447,7 +461,7 @@ public void testDateWithCSVTextInObjectArrayOut() {
public void testDateWithObjectArrayInCSVTextOut() {
Schema schema = new Schema("test");
schema.addColumn(new Date("1")).addColumn(new Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
Object[] in = { date, "test" };
dataFormat.setObjectData(in);
@ -458,7 +472,7 @@ public void testDateWithObjectArrayInCSVTextOut() {
public void testDateWithObjectArrayInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new Date("1"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
Object[] in = { date };
dataFormat.setObjectData(in);
@ -471,7 +485,7 @@ public void testDateWithObjectArrayInObjectArrayOut() {
public void testDateTimeWithCSVTextInCSVTextOut() {
Schema schema = new Schema("test");
schema.addColumn(new DateTime("1", false, false));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData("'2014-10-01 12:00:00'");
assertEquals("'2014-10-01 12:00:00'", dataFormat.getCSVTextData());
@ -481,7 +495,7 @@ public void testDateTimeWithCSVTextInCSVTextOut() {
public void testDateTimeWithFractionNoTimezoneWithCSVTextInCSVTextOut() {
// Single DateTime column built with flags (true, false); going by the test name these
// presumably mean "with fraction" and "no timezone" - confirm against DateTime's constructor.
Schema schema = new Schema("test");
schema.addColumn(new DateTime("1", true, false));
// NOTE(review): both the setter and the constructor form of supplying the schema are listed;
// presumably the constructor line replaces the setter line in this commit - confirm.
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
// A quoted date-time with a millisecond part is expected back unchanged from getCSVTextData().
dataFormat.setCSVTextData("'2014-10-01 12:00:00.000'");
assertEquals("'2014-10-01 12:00:00.000'", dataFormat.getCSVTextData());
}
@ -489,7 +503,7 @@ public void testDateTimeWithFractionNoTimezoneWithCSVTextInCSVTextOut() {
public void testDateTimeNoFractionNoTimezoneWithCSVTextInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new DateTime("1", false, false));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData("'2014-10-01 12:00:00'");
// NOTE: string representation will have the T added, it is an
// implementation quirk of using JODA
@ -500,7 +514,7 @@ public void testDateTimeNoFractionNoTimezoneWithCSVTextInObjectArrayOut() {
public void testDateTimeWithFractionNoTimezoneWithCSVTextInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new DateTime("1", true, false));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData("'2014-10-01 12:00:00.000'");
// NOTE: string representation will have the T added, it is an
// implementation quirk of using JODA
@ -512,7 +526,7 @@ public void testDateTimeWithFractionNoTimezoneWithCSVTextInObjectArrayOut() {
public void testDateTimeNoQuotesWithFractionTimezoneWithCSVTextInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new DateTime("1", true, true));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
DateTimeZone zone = DateTimeZone.forID("America/New_York");
org.joda.time.DateTime dateTime = new org.joda.time.DateTime(zone);
dataFormat.setCSVTextData(dateTime.toString());
@ -524,7 +538,7 @@ public void testDateTimeNoQuotesWithFractionTimezoneWithCSVTextInObjectArrayOut(
public void testDateTimeIncorrectFormatWithCSVTextInObjectArrayOut() {
// Single DateTime column built with flags (true, true).
Schema schema = new Schema("test");
schema.addColumn(new DateTime("1", true, true));
// NOTE(review): both the setter and the constructor form of supplying the schema are listed;
// presumably the constructor line replaces the setter line in this commit - confirm.
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
// The month field "3310" is malformed on purpose.
dataFormat.setCSVTextData("'2014-3310-01 12:00:00.000'");
// The value is discarded and nothing is asserted here. NOTE(review): the @Test annotation is
// not visible in this listing; presumably it declares an expected exception - confirm.
dataFormat.getObjectData()[0].toString();
}
@ -533,7 +547,7 @@ public void testDateTimeIncorrectFormatWithCSVTextInObjectArrayOut() {
public void testCurrentDateTime2WithFractionNoTimezoneWithCSVTextInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new DateTime("1", true, false));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
// current date time
org.joda.time.DateTime dateTime = new org.joda.time.DateTime();
String dateTimeString = dtfWithFractionNoTimeZone.print(dateTime);
@ -545,7 +559,7 @@ public void testCurrentDateTime2WithFractionNoTimezoneWithCSVTextInObjectArrayOu
public void testDateTimeWithFractionAndTimeZoneWithCSVTextInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new DateTime("1", true, true));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData("'2014-10-01 12:00:00.000-0400'");
// NOTE: string representation will have the T added, it is an
// implementation quirk of using JODA
@ -556,7 +570,7 @@ public void testDateTimeWithFractionAndTimeZoneWithCSVTextInObjectArrayOut() {
public void testDateTimeWithFractionAndTimeZoneObjectInCSVTextOut() {
Schema schema = new Schema("test");
schema.addColumn(new DateTime("1", true, true));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
DateTimeZone zone = DateTimeZone.forID("America/New_York");
org.joda.time.DateTime dateTime = new org.joda.time.DateTime(2014, 10, 01, 12, 0, 0, 1, zone);
Object[] in = { dateTime };
@ -569,7 +583,7 @@ public void testDateTimeWithFractionAndTimeZoneObjectInCSVTextOut() {
public void testLocalDateTimeWithObjectInCSVTextOut() {
Schema schema = new Schema("test");
schema.addColumn(new DateTime("1", true, false));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
org.joda.time.LocalDateTime dateTime = new org.joda.time.LocalDateTime(2014, 10, 01, 12, 0, 0, 2);
Object[] in = { dateTime };
dataFormat.setObjectData(in);
@ -581,7 +595,7 @@ public void testLocalDateTimeWithObjectInCSVTextOut() {
public void testDateTimeFractionAndTimezoneWithCSVTextInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new DateTime("1", true, true));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData("'2014-10-01 12:00:00.000-04:00'");
DateTimeZone zone = DateTimeZone.forID("America/New_York");
org.joda.time.DateTime edateTime = new org.joda.time.DateTime(2014, 10, 01, 12, 0, 0, 0, zone);
@ -594,13 +608,11 @@ public void testDateTimeFractionAndTimezoneWithCSVTextInObjectArrayOut() {
// **************test cases for BIT*******************
// **************test cases for BIT*******************
@Test
public void testBitTrueFalseWithCSVTextInAndCSVTextOut() {
Schema schema = new Schema("test");
schema.addColumn(new Bit("1"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
for (String trueBit : new String[] { "true", "TRUE" }) {
dataFormat.setCSVTextData(trueBit);
@ -617,7 +629,7 @@ public void testBitTrueFalseWithCSVTextInAndCSVTextOut() {
public void testBitWithCSVTextInAndCSVTextOut() {
Schema schema = new Schema("test");
schema.addColumn(new Bit("1"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData("1");
assertEquals("1", dataFormat.getCSVTextData());
dataFormat.setCSVTextData("0");
@ -628,7 +640,7 @@ public void testBitWithCSVTextInAndCSVTextOut() {
public void testBitWithObjectArrayInAndCSVTextOut() {
Schema schema = new Schema("test");
schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] data = new Object[2];
data[0] = Boolean.TRUE;
data[1] = Boolean.FALSE;
@ -640,7 +652,7 @@ public void testBitWithObjectArrayInAndCSVTextOut() {
public void testUnsupportedBitWithObjectArrayInAndCSVTextOut() {
Schema schema = new Schema("test");
schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] data = new Object[2];
data[0] = "1";
data[1] = "2";
@ -652,7 +664,7 @@ public void testUnsupportedBitWithObjectArrayInAndCSVTextOut() {
public void testBitWithObjectArrayInAndObjectOut() {
Schema schema = new Schema("test");
schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] data = new Object[2];
data[0] = Boolean.TRUE;
data[1] = Boolean.FALSE;
@ -669,7 +681,7 @@ public void testBitWithObjectArrayInAndObjectOut() {
public void testBitWithCSVTextInAndObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new Bit("1"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
for (String trueBit : new String[] { "true", "TRUE", "1" }) {
dataFormat.setCSVTextData(trueBit);
@ -686,7 +698,7 @@ public void testBitWithCSVTextInAndObjectArrayOut() {
public void testUnsupportedBitWithObjectArrayInAndObjectOut() {
Schema schema = new Schema("test");
schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] data = new Object[2];
data[0] = "1";
data[1] = "2";
@ -699,7 +711,7 @@ public void testUnsupportedBitWithObjectArrayInAndObjectOut() {
public void testUnsupportedBitWithCSVTextInAndObjectOut() {
Schema schema = new Schema("test");
schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData("1,3");
assertEquals(true, dataFormat.getObjectData()[0]);
assertEquals(false, dataFormat.getObjectData()[1]);
@ -711,7 +723,7 @@ public void testArrayOfStringWithObjectArrayInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text")));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] givenArray = { "A", "B" };
// create an array inside the object array
Object[] data = new Object[2];
@ -728,7 +740,7 @@ public void testArrayOfStringWithCSVTextInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text")));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] givenArray = { "A", "B" };
// create an array inside the object array
Object[] data = new Object[2];
@ -746,7 +758,7 @@ public void testArrayOfStringWithObjectArrayInCSVTextOut() {
Schema schema = new Schema("test");
schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text")));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] givenArray = { "A", "B" };
// create an array inside the object array
Object[] data = new Object[2];
@ -762,7 +774,7 @@ public void testArrayOfStringWithCSVTextInCSVTextOut() {
Schema schema = new Schema("test");
schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text")));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
String testData = "'[\"A\",\"B\"]','text'";
dataFormat.setCSVTextData(testData);
assertEquals(testData, dataFormat.getCSVTextData());
@ -773,7 +785,7 @@ public void testArrayOfComplexStrings() {
Schema schema = new Schema("test");
schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text")));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] givenArray = { "A''\"ssss", "Bss###''" };
// create an array inside the object array
Object[] data = new Object[2];
@ -790,7 +802,7 @@ public void testArrayOfIntegers() {
Schema schema = new Schema("test");
schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new FixedPoint("fn")));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] givenArray = { 1, 2 };
// create an array inside the object array
Object[] data = new Object[2];
@ -807,7 +819,7 @@ public void testListOfIntegers() {
Schema schema = new Schema("test");
schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new FixedPoint("fn")));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
List<Integer> givenList = new ArrayList<Integer>();
givenList.add(1);
givenList.add(1);
@ -825,7 +837,7 @@ public void testSetOfIntegers() {
Schema schema = new Schema("test");
schema.addColumn(new org.apache.sqoop.schema.type.Set("1", new FixedPoint("fn")));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Set<Integer> givenSet = new HashSet<Integer>();
givenSet.add(1);
givenSet.add(3);
@ -845,7 +857,7 @@ public void testArrayOfDecimals() {
schema.addColumn(new org.apache.sqoop.schema.type.Array("1",
new org.apache.sqoop.schema.type.Decimal("deci")));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] givenArray = { 1.22, 2.444 };
// create an array inside the object array
Object[] data = new Object[2];
@ -863,7 +875,7 @@ public void testArrayOfObjectsWithObjectArrayInObjectArrayOut() {
schema.addColumn(new org.apache.sqoop.schema.type.Array("1",
new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft"))));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] givenArrayOne = { 11, 12 };
Object[] givenArrayTwo = { 14, 15 };
@ -889,7 +901,7 @@ public void testArrayOfObjectsWithCSVTextInObjectArrayOut() {
schema.addColumn(new org.apache.sqoop.schema.type.Array("1",
new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft"))));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] givenArrayOne = { 11, 12 };
Object[] givenArrayTwo = { 14, 15 };
@ -914,7 +926,7 @@ public void testArrayOfObjectsWithCSVTextInCSVTextOut() {
schema.addColumn(new org.apache.sqoop.schema.type.Array("1",
new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft"))));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
String input = "'[[11, 12],[14, 15]]','text'";
dataFormat.setCSVTextData(input);
Object[] expectedArray = (Object[]) dataFormat.getObjectData()[0];
@ -928,7 +940,7 @@ public void testArrayOfObjectsWithObjectArrayInCSVTextOut() {
schema.addColumn(new org.apache.sqoop.schema.type.Array("1",
new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft"))));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Object[] givenArrayOne = { 11, 12 };
Object[] givenArrayTwo = { 14, 15 };
@ -951,7 +963,7 @@ public void testMapWithSimpleValueWithObjectArrayInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Map<Object, Object> map = new HashMap<Object, Object>();
map.put("testKey", "testValue");
// create an array inside the object array
@ -971,7 +983,7 @@ public void testMapWithComplexIntegerListValueWithObjectArrayInObjectArrayOut()
schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value",
new FixedPoint("number"))));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Map<Object, Object> givenMap = new HashMap<Object, Object>();
List<Integer> intList = new ArrayList<Integer>();
intList.add(11);
@ -994,7 +1006,7 @@ public void testMapWithComplexStringListValueWithObjectArrayInObjectArrayOut() {
schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value",
new Text("text"))));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Map<Object, Object> givenMap = new HashMap<Object, Object>();
List<String> stringList = new ArrayList<String>();
stringList.add("A");
@ -1017,7 +1029,7 @@ public void testMapWithComplexMapValueWithObjectArrayInObjectArrayOut() {
schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value",
new Text("text"))));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Map<Object, Object> givenMap = new HashMap<Object, Object>();
List<String> stringList = new ArrayList<String>();
stringList.add("A");
@ -1041,7 +1053,7 @@ public void testMapWithCSVTextInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Map<Object, Object> givenMap = new HashMap<Object, Object>();
givenMap.put("testKey", "testValue");
Object[] data = new Object[2];
@ -1060,7 +1072,7 @@ public void testMapWithComplexValueWithCSVTextInObjectArrayOut() {
Schema schema = new Schema("test");
schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Map<Object, Object> givenMap = new HashMap<Object, Object>();
givenMap.put("testKey", "testValue");
Object[] data = new Object[2];
@ -1079,7 +1091,7 @@ public void testMapWithObjectArrayInCSVTextOut() {
Schema schema = new Schema("test");
schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
Map<Object, Object> givenMap = new HashMap<Object, Object>();
givenMap.put("testKey", "testValue");
Object[] data = new Object[2];
@ -1095,7 +1107,7 @@ public void testMapWithCSVTextInCSVTextOut() {
Schema schema = new Schema("test");
schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
String testData = "'{\"testKey\":\"testValue\"}','text'";
dataFormat.setCSVTextData(testData);
assertEquals(testData, dataFormat.getCSVTextData());
@ -1106,7 +1118,7 @@ public void testEmptySchema() {
String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+ ",'\\n'";
Schema schema = new Schema("Test");
dataFormat.setSchema(schema);
dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setCSVTextData(testData);
@SuppressWarnings("unused")
@ -1115,13 +1127,14 @@ public void testEmptySchema() {
/**
 * Expects a SqoopException when object data is requested from a CSV IDF whose
 * schema was supplied as null.
 */
@Test(expectedExceptions = SqoopException.class)
public void testNullSchema() {
// NOTE(review): both the setter and the constructor form of passing the null schema are listed;
// presumably the constructor line replaces the setter line in this commit - confirm.
dataFormat.setSchema(null);
dataFormat = new CSVIntermediateDataFormat(null);
// The result is never inspected; the test passes only if SqoopException is thrown in this method.
@SuppressWarnings("unused")
Object[] out = dataFormat.getObjectData();
}
/**
 * Expects a SqoopException when getObjectData() is called on a CSV IDF created
 * with the no-argument constructor, with no schema supplied by this test.
 */
@Test(expectedExceptions = SqoopException.class)
public void testNotSettingSchema() {
dataFormat = new CSVIntermediateDataFormat();
// The result is never inspected; the test passes only if SqoopException is thrown in this method.
@SuppressWarnings("unused")
Object[] out = dataFormat.getObjectData();
}

View File

@ -88,12 +88,12 @@ public static Object[][] data() {
Schema from2 = new Schema("FROM-2");
Schema to2 = new Schema("TO-2");
from1.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
from1.addColumn(new FixedPoint("1").setByteSize(2L)).addColumn(new FloatingPoint("2"))
.addColumn(new org.apache.sqoop.schema.type.Text("3"));
to1.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
to1.addColumn(new FixedPoint("1").setByteSize(2L)).addColumn(new FloatingPoint("2"))
.addColumn(new org.apache.sqoop.schema.type.Text("3"));
from2.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"));
to2.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"));
from2.addColumn(new FixedPoint("1").setByteSize(2L)).addColumn(new FloatingPoint("2"));
to2.addColumn(new FixedPoint("1").setByteSize(2L)).addColumn(new FloatingPoint("2"));
parameters.add(new Object[]{
emptyFrom,
@ -160,7 +160,6 @@ public void testSchemaMatching() throws Exception {
@Test
public void testSchemalessFromAndTo() throws UnsupportedEncodingException {
CSVIntermediateDataFormat dataFormat = new CSVIntermediateDataFormat();
String testData = "\"This is the data you are looking for. It has no structure.\"";
Object[] testObject = new Object[] {testData.getBytes(BYTE_FIELD_CHARSET)};
Object[] testObjectCopy = new Object[1];
@ -169,8 +168,7 @@ public void testSchemalessFromAndTo() throws UnsupportedEncodingException {
Matcher matcher = MatcherFactory.getMatcher(NullSchema.getInstance(),
NullSchema.getInstance());
// Checking FROM side only because currently that is the only IDF that is used
dataFormat.setSchema(matcher.getFromSchema());
CSVIntermediateDataFormat dataFormat = new CSVIntermediateDataFormat(matcher.getFromSchema());
// Setting data as CSV and validating getting CSV and object
dataFormat.setCSVTextData(testData);

View File

@ -34,6 +34,8 @@
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Text;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@ -85,17 +87,18 @@ public void testReadFields() throws IOException {
// it existed before.
@Test
public void testWriteAndReadFields() throws IOException {
Schema schema = new Schema("test").addColumn(new Text("t"));
String testData = "You shall not pass";
ByteArrayOutputStream ostream = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(ostream);
SqoopWritable writableOne = new SqoopWritable(new CSVIntermediateDataFormat());
SqoopWritable writableOne = new SqoopWritable(new CSVIntermediateDataFormat(schema));
writableOne.setString(testData);
writableOne.write(out);
byte[] written = ostream.toByteArray();
// Don't test what the data is, test that SqoopWritable can read it.
InputStream instream = new ByteArrayInputStream(written);
SqoopWritable writableTwo = new SqoopWritable(new CSVIntermediateDataFormat());
SqoopWritable writableTwo = new SqoopWritable(new CSVIntermediateDataFormat(schema));
DataInput in = new DataInputStream(instream);
writableTwo.readFields(in);
assertEquals(writableOne.toString(), writableTwo.toString());

View File

@ -39,6 +39,8 @@
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.util.MRJobTestUtil;
import org.apache.sqoop.schema.NullSchema;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Text;
import org.apache.sqoop.submission.counter.SqoopCounters;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
@ -145,8 +147,12 @@ private Matcher getMatcher(){
}
// TODO:SQOOP-1873: Mock objects instead
// NOTE(review): two versions of getIDF() are listed back to back (no-argument constructor, then
// the schema-taking constructor); presumably the second replaces the first in this commit - confirm.
private IntermediateDataFormat<?> getIDF(){
return new CSVIntermediateDataFormat();
private IntermediateDataFormat<?> getIDF() {
// Builds the CSV IDF with the schema returned by getSchema().
return new CSVIntermediateDataFormat(getSchema());
}
/**
 * Creates the schema used by getIDF().
 *
 * @return a new Schema named "test" containing a single Text column named "t"
 */
private Schema getSchema() {
return new Schema("test").addColumn(new Text("t"));
}
@BeforeMethod

View File

@ -70,7 +70,7 @@ public static boolean runJob(Configuration conf,
/**
 * Builds a three-column schema: FixedPoint "1", FloatingPoint "2" and Text "3".
 *
 * @return a new Schema named "Test"
 */
public static Schema getTestSchema() {
Schema schema = new Schema("Test");
// NOTE(review): the next two addColumn lines differ only in setByteSize(2L) on the FixedPoint
// column; presumably the first is the removed line and the second its replacement - confirm.
schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
schema.addColumn(new FixedPoint("1").setByteSize(2L)).addColumn(new FloatingPoint("2"))
.addColumn(new Text("3"));
return schema;
}