5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-10 22:13:07 +08:00

SQOOP-1825: Sqoop2: Handle NULLs for all types in CSV Intermediate Data Format

(Veena Basavaraj via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2014-12-12 11:51:00 -06:00
parent 546f861527
commit 1fc6589221
2 changed files with 151 additions and 40 deletions

View File

@ -67,7 +67,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
public static final char ESCAPE_CHARACTER = '\\'; public static final char ESCAPE_CHARACTER = '\\';
public static final char QUOTE_CHARACTER = '\''; public static final char QUOTE_CHARACTER = '\'';
public static final String NULL_STRING = "NULL"; public static final String NULL_VALUE = "NULL";
private static final char[] originals = { private static final char[] originals = {
0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27 0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27
@ -249,7 +249,7 @@ public Object[] getObjectData() {
Column[] columnArray = schema.getColumns().toArray(new Column[fieldStringArray.length]); Column[] columnArray = schema.getColumns().toArray(new Column[fieldStringArray.length]);
for (int i = 0; i < fieldStringArray.length; i++) { for (int i = 0; i < fieldStringArray.length; i++) {
// check for NULL field and bail out immediately // check for NULL field and bail out immediately
if (fieldStringArray[i].equals("NULL")) { if (fieldStringArray[i].equals(NULL_VALUE)) {
objectArray[i] = null; objectArray[i] = null;
continue; continue;
} }
@ -410,8 +410,17 @@ else if (value instanceof JSONObject) {
*/ */
@Override @Override
public void setObjectData(Object[] data) { public void setObjectData(Object[] data) {
Set<Integer> nullValueIndices = new HashSet<Integer>();
Column[] columnArray = schema.getColumns().toArray(new Column[data.length]); Column[] columnArray = schema.getColumns().toArray(new Column[data.length]);
encodeCSVStringElements(data, columnArray); // check for null
for (int i = 0; i < data.length; i++) {
if (data[i] == null) {
nullValueIndices.add(i);
data[i] = NULL_VALUE;
}
}
// ignore the null values while encoding the object array into csv string
encodeCSVStringElements(data, columnArray, nullValueIndices);
this.data = StringUtils.join(data, SEPARATOR_CHARACTER); this.data = StringUtils.join(data, SEPARATOR_CHARACTER);
} }
@ -465,49 +474,66 @@ public int compareTo(IntermediateDataFormat<?> o) {
* @param objectArray * @param objectArray
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void encodeCSVStringElements(Object[] objectArray, Column[] columnArray) { private void encodeCSVStringElements(Object[] objectArray, Column[] columnArray, Set<Integer> nullValueIndices) {
for (int i : bitTypeColumnIndices) { for (int i : bitTypeColumnIndices) {
String bitStringValue = objectArray[i].toString(); if (!nullValueIndices.contains(i)) {
if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) { String bitStringValue = objectArray[i].toString();
objectArray[i] = bitStringValue; if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) {
} else { objectArray[i] = bitStringValue;
throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + objectArray[i]); } else {
throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: "
+ objectArray[i]);
}
} }
} }
for (int i : stringTypeColumnIndices) { for (int i : stringTypeColumnIndices) {
objectArray[i] = escapeString((String) objectArray[i]); if (!nullValueIndices.contains(i)) {
objectArray[i] = escapeString((String) objectArray[i]);
}
} }
for (int i : dateTimeTypeColumnIndices) { for (int i : dateTimeTypeColumnIndices) {
Column col = columnArray[i]; if (!nullValueIndices.contains(i)) {
if (objectArray[i] instanceof org.joda.time.DateTime) { Column col = columnArray[i];
org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i]; if (objectArray[i] instanceof org.joda.time.DateTime) {
// check for fraction and time zone and then use the right formatter org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i];
formatDateTime(objectArray, i, col, dateTime); // check for fraction and time zone and then use the right formatter
} else if (objectArray[i] instanceof org.joda.time.LocalDateTime) { formatDateTime(objectArray, i, col, dateTime);
org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i]; } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
formatLocalDateTime(objectArray, i, col, localDateTime); org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i];
formatLocalDateTime(objectArray, i, col, localDateTime);
}
} }
} }
for (int i : dateTypeColumnIndices) { for (int i : dateTypeColumnIndices) {
org.joda.time.LocalDate date = (org.joda.time.LocalDate) objectArray[i]; if (!nullValueIndices.contains(i)) {
objectArray[i] = encloseWithQuote(df.print(date)); org.joda.time.LocalDate date = (org.joda.time.LocalDate) objectArray[i];
objectArray[i] = encloseWithQuote(df.print(date));
}
} }
for (int i : timeColumnIndices) { for (int i : timeColumnIndices) {
Column col = columnArray[i]; Column col = columnArray[i];
if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) { if (!nullValueIndices.contains(i)) {
objectArray[i] = encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) objectArray[i])); if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) {
} else { objectArray[i] = encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) objectArray[i]));
objectArray[i] = encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) objectArray[i])); } else {
objectArray[i] = encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) objectArray[i]));
}
} }
} }
for (int i : byteTypeColumnIndices) { for (int i : byteTypeColumnIndices) {
objectArray[i] = escapeByteArrays((byte[]) objectArray[i]); if (!nullValueIndices.contains(i)) {
objectArray[i] = escapeByteArrays((byte[]) objectArray[i]);
}
} }
for (int i : listTypeColumnIndices) { for (int i : listTypeColumnIndices) {
objectArray[i] = encodeList((Object[]) objectArray[i], columnArray[i]); if (!nullValueIndices.contains(i)) {
objectArray[i] = encodeList((Object[]) objectArray[i], columnArray[i]);
}
} }
for (int i : mapTypeColumnIndices) { for (int i : mapTypeColumnIndices) {
objectArray[i] = encodeMap((Map<Object, Object>) objectArray[i], columnArray[i]); if (!nullValueIndices.contains(i)) {
objectArray[i] = encodeMap((Map<Object, Object>) objectArray[i], columnArray[i]);
}
} }
} }
@ -562,8 +588,7 @@ private boolean isColumnListType(Column listType) {
} }
private boolean isColumnStringType(Column stringType) { private boolean isColumnStringType(Column stringType) {
return stringType.getType().equals(ColumnType.TEXT) return stringType.getType().equals(ColumnType.TEXT) || stringType.getType().equals(ColumnType.ENUM);
|| stringType.getType().equals(ColumnType.ENUM);
} }
private String escapeByteArrays(byte[] bytes) { private String escapeByteArrays(byte[] bytes) {
@ -586,10 +611,6 @@ private String getRegExp(String orig) {
} }
private String escapeString(String orig) { private String escapeString(String orig) {
if (orig == null) {
return NULL_STRING;
}
int j = 0; int j = 0;
String replacement = orig; String replacement = orig;
try { try {

View File

@ -32,6 +32,7 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Array; import org.apache.sqoop.schema.type.Array;
@ -39,6 +40,7 @@
import org.apache.sqoop.schema.type.Bit; import org.apache.sqoop.schema.type.Bit;
import org.apache.sqoop.schema.type.Date; import org.apache.sqoop.schema.type.Date;
import org.apache.sqoop.schema.type.DateTime; import org.apache.sqoop.schema.type.DateTime;
import org.apache.sqoop.schema.type.Decimal;
import org.apache.sqoop.schema.type.FixedPoint; import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text; import org.apache.sqoop.schema.type.Text;
import org.apache.sqoop.schema.type.Time; import org.apache.sqoop.schema.type.Time;
@ -83,20 +85,108 @@ public void testNullInputAsCSVTextInObjectArrayOut() {
assertNull(out); assertNull(out);
} }
@Test(expected=SqoopException.class) @Test(expected = SqoopException.class)
public void testEmptyInputAsCSVTextInObjectArrayOut() { public void testEmptyInputAsCSVTextInObjectArrayOut() {
Schema schema = new Schema("test"); Schema schema = new Schema("test");
schema.addColumn(new FixedPoint("1")) schema.addColumn(new FixedPoint("1")).addColumn(new FixedPoint("2")).addColumn(new Text("3")).addColumn(new Text("4"))
.addColumn(new FixedPoint("2")) .addColumn(new Binary("5")).addColumn(new Text("6"));
.addColumn(new Text("3"))
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
dataFormat.setSchema(schema); dataFormat.setSchema(schema);
dataFormat.setCSVTextData(""); dataFormat.setCSVTextData("");
dataFormat.getObjectData(); dataFormat.getObjectData();
} }
/**
 * Feeds an object array made up solely of nulls (one per column type in the schema) and
 * verifies that every field of the resulting CSV text is the NULL marker.
 */
@Test
public void testNullValueAsObjectArrayInAndCSVTextOut() {
  Schema schema = new Schema("test");
  schema.addColumn(new FixedPoint("1")).addColumn(new Decimal("2")).addColumn(new Text("3"))
      .addColumn(new Array("4", new Text("t"))).addColumn(new Binary("5"))
      .addColumn(new org.apache.sqoop.schema.type.Map("6", new Text("t1"), new Text("t2"))).addColumn(new Bit("7"))
      .addColumn(new org.apache.sqoop.schema.type.DateTime("8", false, false))
      .addColumn(new org.apache.sqoop.schema.type.Time("9", false)).addColumn(new org.apache.sqoop.schema.type.Date("10"))
      .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("11"))
      .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
      .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14"));
  dataFormat.setSchema(schema);
  Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null, null };
  dataFormat.setObjectData(in);
  String csvText = dataFormat.getCSVTextData();
  String[] textValues = csvText.split(",");
  // Without this check the loop below would pass vacuously if fields were dropped.
  assertEquals(in.length, textValues.length);
  for (String text : textValues) {
    // JUnit's contract is assertEquals(expected, actual).
    assertEquals(CSVIntermediateDataFormat.NULL_VALUE, text);
  }
}
/**
 * Feeds an object array made up solely of nulls and verifies that reading the data back as
 * an object array yields a null for every column.
 */
@Test
public void testNullValueAsObjectArrayInAndObjectArrayOut() {
  Schema schema = new Schema("test");
  schema.addColumn(new FixedPoint("1")).addColumn(new Decimal("2")).addColumn(new Text("3"))
      .addColumn(new Array("4", new Text("t"))).addColumn(new Binary("5"))
      .addColumn(new org.apache.sqoop.schema.type.Map("6", new Text("t1"), new Text("t2"))).addColumn(new Bit("7"))
      .addColumn(new org.apache.sqoop.schema.type.DateTime("8", false, false))
      .addColumn(new org.apache.sqoop.schema.type.Time("9", false)).addColumn(new org.apache.sqoop.schema.type.Date("10"))
      .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("11"))
      .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
      .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14"));
  dataFormat.setSchema(schema);
  Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null, null };
  dataFormat.setObjectData(in);
  Object[] out = dataFormat.getObjectData();
  // Without this check the loop below would pass vacuously if elements were dropped.
  assertEquals(in.length, out.length);
  for (Object obj : out) {
    assertNull(obj);
  }
}
/**
 * Sets CSV text in which every field is the literal NULL marker and verifies that the
 * decoded object array holds a null for every column.
 */
@Test
public void testNullValueAsCSVTextInAndObjectArrayOut() {
  Schema schema = new Schema("test");
  schema.addColumn(new FixedPoint("1")).addColumn(new Decimal("2")).addColumn(new Text("3"))
      .addColumn(new Array("4", new Text("t"))).addColumn(new Binary("5"))
      .addColumn(new org.apache.sqoop.schema.type.Map("6", new Text("t1"), new Text("t2"))).addColumn(new Bit("7"))
      .addColumn(new org.apache.sqoop.schema.type.DateTime("8", false, false))
      .addColumn(new org.apache.sqoop.schema.type.Time("9", false)).addColumn(new org.apache.sqoop.schema.type.Date("10"))
      .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("11"))
      .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
      .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14"));
  dataFormat.setSchema(schema);
  String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL",
      "NULL" };
  dataFormat.setCSVTextData(StringUtils.join(test, ","));
  Object[] out = dataFormat.getObjectData();
  // Without this check the loop below would pass vacuously if elements were dropped.
  assertEquals(test.length, out.length);
  for (Object obj : out) {
    assertNull(obj);
  }
}
/**
 * Sets CSV text in which every field is the literal NULL marker and verifies that the CSV
 * text read back still carries the NULL marker in every field.
 */
@Test
public void testNullValueAsCSVTextInAndCSVTextOut() {
  Schema schema = new Schema("test");
  schema.addColumn(new FixedPoint("1")).addColumn(new Decimal("2")).addColumn(new Text("3"))
      .addColumn(new Array("4", new Text("t"))).addColumn(new Binary("5"))
      .addColumn(new org.apache.sqoop.schema.type.Map("6", new Text("t1"), new Text("t2"))).addColumn(new Bit("7"))
      .addColumn(new org.apache.sqoop.schema.type.DateTime("8", false, false))
      .addColumn(new org.apache.sqoop.schema.type.Time("9", false)).addColumn(new org.apache.sqoop.schema.type.Date("10"))
      .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("11"))
      .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
      .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14"));
  dataFormat.setSchema(schema);
  String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL",
      "NULL" };
  dataFormat.setCSVTextData(StringUtils.join(test, ","));
  String csvText = dataFormat.getCSVTextData();
  String[] textValues = csvText.split(",");
  // Without this check the loop below would pass vacuously if fields were dropped.
  assertEquals(test.length, textValues.length);
  for (String text : textValues) {
    // JUnit's contract is assertEquals(expected, actual).
    assertEquals(CSVIntermediateDataFormat.NULL_VALUE, text);
  }
}
//**************test cases for primitive types( text, number, bytearray)******************* //**************test cases for primitive types( text, number, bytearray)*******************
@Test @Test