diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java index fe4cdd79..e4a83b1b 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java @@ -67,7 +67,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat { public static final char ESCAPE_CHARACTER = '\\'; public static final char QUOTE_CHARACTER = '\''; - public static final String NULL_STRING = "NULL"; + public static final String NULL_VALUE = "NULL"; private static final char[] originals = { 0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27 @@ -249,7 +249,7 @@ public Object[] getObjectData() { Column[] columnArray = schema.getColumns().toArray(new Column[fieldStringArray.length]); for (int i = 0; i < fieldStringArray.length; i++) { // check for NULL field and bail out immediately - if (fieldStringArray[i].equals("NULL")) { + if (fieldStringArray[i].equals(NULL_VALUE)) { objectArray[i] = null; continue; } @@ -410,8 +410,17 @@ else if (value instanceof JSONObject) { */ @Override public void setObjectData(Object[] data) { + Set nullValueIndices = new HashSet(); Column[] columnArray = schema.getColumns().toArray(new Column[data.length]); - encodeCSVStringElements(data, columnArray); + // check for null + for (int i = 0; i < data.length; i++) { + if (data[i] == null) { + nullValueIndices.add(i); + data[i] = NULL_VALUE; + } + } + // ignore the null values while encoding the object array into csv string + encodeCSVStringElements(data, columnArray, nullValueIndices); this.data = StringUtils.join(data, SEPARATOR_CHARACTER); } @@ -465,49 +474,66 @@ public int compareTo(IntermediateDataFormat o) { * @param objectArray */ @SuppressWarnings("unchecked") - private void encodeCSVStringElements(Object[] objectArray, Column[] columnArray) { + private void encodeCSVStringElements(Object[] objectArray, Column[] columnArray, Set nullValueIndices) { for (int i : bitTypeColumnIndices) { - String bitStringValue = objectArray[i].toString(); - if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) { - objectArray[i] = bitStringValue; - } else { - throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + objectArray[i]); + if (!nullValueIndices.contains(i)) { + String bitStringValue = objectArray[i].toString(); + if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) { + objectArray[i] = bitStringValue; + } else { + throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + + objectArray[i]); + } } } for (int i : stringTypeColumnIndices) { - objectArray[i] = escapeString((String) objectArray[i]); + if (!nullValueIndices.contains(i)) { + objectArray[i] = escapeString((String) objectArray[i]); + } } for (int i : dateTimeTypeColumnIndices) { - Column col = columnArray[i]; - if (objectArray[i] instanceof org.joda.time.DateTime) { - org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i]; - // check for fraction and time zone and then use the right formatter - formatDateTime(objectArray, i, col, dateTime); - } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) { - org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i]; - formatLocalDateTime(objectArray, i, col, localDateTime); + if (!nullValueIndices.contains(i)) { + Column col = columnArray[i]; + if (objectArray[i] instanceof org.joda.time.DateTime) { + org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i]; + // check for fraction and time zone and then use the right formatter + formatDateTime(objectArray, i, col, dateTime); + } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) { + org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i]; + formatLocalDateTime(objectArray, i, col, localDateTime); + } } } for (int i : dateTypeColumnIndices) { - org.joda.time.LocalDate date = (org.joda.time.LocalDate) objectArray[i]; - objectArray[i] = encloseWithQuote(df.print(date)); + if (!nullValueIndices.contains(i)) { + org.joda.time.LocalDate date = (org.joda.time.LocalDate) objectArray[i]; + objectArray[i] = encloseWithQuote(df.print(date)); + } } for (int i : timeColumnIndices) { Column col = columnArray[i]; - if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) { - objectArray[i] = encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) objectArray[i])); - } else { - objectArray[i] = encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) objectArray[i])); + if (!nullValueIndices.contains(i)) { + if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) { + objectArray[i] = encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) objectArray[i])); + } else { + objectArray[i] = encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) objectArray[i])); + } } } for (int i : byteTypeColumnIndices) { - objectArray[i] = escapeByteArrays((byte[]) objectArray[i]); + if (!nullValueIndices.contains(i)) { + objectArray[i] = escapeByteArrays((byte[]) objectArray[i]); + } } for (int i : listTypeColumnIndices) { - objectArray[i] = encodeList((Object[]) objectArray[i], columnArray[i]); + if (!nullValueIndices.contains(i)) { + objectArray[i] = encodeList((Object[]) objectArray[i], columnArray[i]); + } } for (int i : mapTypeColumnIndices) { - objectArray[i] = encodeMap((Map) objectArray[i], columnArray[i]); + if (!nullValueIndices.contains(i)) { + objectArray[i] = encodeMap((Map) objectArray[i], columnArray[i]); + } } } @@ -562,8 +588,7 @@ private boolean isColumnListType(Column listType) { } private boolean isColumnStringType(Column stringType) { - return stringType.getType().equals(ColumnType.TEXT) - || stringType.getType().equals(ColumnType.ENUM); + return stringType.getType().equals(ColumnType.TEXT) || stringType.getType().equals(ColumnType.ENUM); } private String escapeByteArrays(byte[] bytes) { @@ -586,10 +611,6 @@ private String getRegExp(String orig) { } private String escapeString(String orig) { - if (orig == null) { - return NULL_STRING; - } - int j = 0; String replacement = orig; try { diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java index 2376d4a5..1a2a96fb 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Set; +import org.apache.commons.lang.StringUtils; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.type.Array; @@ -39,6 +40,7 @@ import org.apache.sqoop.schema.type.Bit; import org.apache.sqoop.schema.type.Date; import org.apache.sqoop.schema.type.DateTime; +import org.apache.sqoop.schema.type.Decimal; import org.apache.sqoop.schema.type.FixedPoint; import org.apache.sqoop.schema.type.Text; import org.apache.sqoop.schema.type.Time; @@ -83,20 +85,108 @@ public void testNullInputAsCSVTextInObjectArrayOut() { assertNull(out); } - @Test(expected=SqoopException.class) + @Test(expected = SqoopException.class) public void testEmptyInputAsCSVTextInObjectArrayOut() { Schema schema = new Schema("test"); - schema.addColumn(new FixedPoint("1")) - .addColumn(new FixedPoint("2")) - .addColumn(new Text("3")) - .addColumn(new Text("4")) - .addColumn(new Binary("5")) - .addColumn(new Text("6")); + schema.addColumn(new FixedPoint("1")).addColumn(new FixedPoint("2")).addColumn(new Text("3")).addColumn(new Text("4")) + .addColumn(new Binary("5")).addColumn(new Text("6")); dataFormat.setSchema(schema); dataFormat.setCSVTextData(""); dataFormat.getObjectData(); } + @Test + public void testNullValueAsObjectArrayInAndCSVTextOut() { + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")).addColumn(new Decimal("2")).addColumn(new Text("3")) + .addColumn(new Array("4", new Text("t"))).addColumn(new Binary("5")) + .addColumn(new org.apache.sqoop.schema.type.Map("6", new Text("t1"), new Text("t2"))).addColumn(new Bit("7")) + .addColumn(new org.apache.sqoop.schema.type.DateTime("8", false, false)) + .addColumn(new org.apache.sqoop.schema.type.Time("9", false)).addColumn(new org.apache.sqoop.schema.type.Date("10")) + .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("11")) + .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4"))) + .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14")); + + dataFormat.setSchema(schema); + Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null, null }; + dataFormat.setObjectData(in); + + String csvText = dataFormat.getCSVTextData(); + String[] textValues = csvText.split(","); + for (String text : textValues) { + assertEquals(text, CSVIntermediateDataFormat.NULL_VALUE); + } + } + + @Test + public void testNullValueAsObjectArrayInAndObjectArrayOut() { + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")).addColumn(new Decimal("2")).addColumn(new Text("3")) + .addColumn(new Array("4", new Text("t"))).addColumn(new Binary("5")) + .addColumn(new org.apache.sqoop.schema.type.Map("6", new Text("t1"), new Text("t2"))).addColumn(new Bit("7")) + .addColumn(new org.apache.sqoop.schema.type.DateTime("8", false, false)) + .addColumn(new org.apache.sqoop.schema.type.Time("9", false)).addColumn(new org.apache.sqoop.schema.type.Date("10")) + .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("11")) + .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4"))) + .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14")); + + dataFormat.setSchema(schema); + Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null, null }; + dataFormat.setObjectData(in); + + Object[] out = dataFormat.getObjectData(); + for (Object obj : out) { + assertEquals(obj, null); + } + } + + @Test + public void testNullValueAsCSVTextInAndObjectArrayOut() { + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")).addColumn(new Decimal("2")).addColumn(new Text("3")) + .addColumn(new Array("4", new Text("t"))).addColumn(new Binary("5")) + .addColumn(new org.apache.sqoop.schema.type.Map("6", new Text("t1"), new Text("t2"))).addColumn(new Bit("7")) + .addColumn(new org.apache.sqoop.schema.type.DateTime("8", false, false)) + .addColumn(new org.apache.sqoop.schema.type.Time("9", false)).addColumn(new org.apache.sqoop.schema.type.Date("10")) + .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("11")) + .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4"))) + .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14")); + + dataFormat.setSchema(schema); + String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", + "NULL" }; + dataFormat.setCSVTextData(StringUtils.join(test, ",")); + + Object[] out = dataFormat.getObjectData(); + for (Object obj : out) { + assertEquals(obj, null); + } + } + + @Test + public void testNullValueAsCSVTextInAndCSVTextOut() { + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")).addColumn(new Decimal("2")).addColumn(new Text("3")) + .addColumn(new Array("4", new Text("t"))).addColumn(new Binary("5")) + .addColumn(new org.apache.sqoop.schema.type.Map("6", new Text("t1"), new Text("t2"))).addColumn(new Bit("7")) + .addColumn(new org.apache.sqoop.schema.type.DateTime("8", false, false)) + .addColumn(new org.apache.sqoop.schema.type.Time("9", false)).addColumn(new org.apache.sqoop.schema.type.Date("10")) + .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("11")) + .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4"))) + .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14")); + + dataFormat.setSchema(schema); + String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", + "NULL" }; + dataFormat.setCSVTextData(StringUtils.join(test, ",")); + + String csvText = dataFormat.getCSVTextData(); + String[] textValues = csvText.split(","); + for (String text : textValues) { + assertEquals(text, CSVIntermediateDataFormat.NULL_VALUE); + } + } + //**************test cases for primitive types( text, number, bytearray)******************* @Test