mirror of
https://github.com/apache/sqoop.git
synced 2025-05-05 03:30:14 +08:00
SQOOP-1817: Sqoop2: Update CSVIntermediate BIT data type
(Veena Basavaraj via Abraham Elmahrek)
This commit is contained in:
parent
c19f9c9460
commit
c865aefea6
@ -44,6 +44,7 @@
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -89,9 +90,10 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
|
|||||||
// http://www.joda.org/joda-time/key_format.html provides details on the formatter token
|
// http://www.joda.org/joda-time/key_format.html provides details on the formatter token
|
||||||
static final DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSZ");
|
static final DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSZ");
|
||||||
static final DateTimeFormatter df = DateTimeFormat.forPattern("yyyy-MM-dd");
|
static final DateTimeFormatter df = DateTimeFormat.forPattern("yyyy-MM-dd");
|
||||||
static final DateTimeFormatter tf = DateTimeFormat.forPattern("HH:mm:ss.SSSSSSZ");
|
static final DateTimeFormatter tf = DateTimeFormat.forPattern("HH:mm:ss.SSSSSS");
|
||||||
|
|
||||||
private final List<Integer> stringTypeColumnIndices = new ArrayList<Integer>();
|
private final List<Integer> stringTypeColumnIndices = new ArrayList<Integer>();
|
||||||
|
private final List<Integer> bitTypeColumnIndices = new ArrayList<Integer>();
|
||||||
private final List<Integer> byteTypeColumnIndices = new ArrayList<Integer>();
|
private final List<Integer> byteTypeColumnIndices = new ArrayList<Integer>();
|
||||||
private final List<Integer> listTypeColumnIndices = new ArrayList<Integer>();
|
private final List<Integer> listTypeColumnIndices = new ArrayList<Integer>();
|
||||||
private final List<Integer> mapTypeColumnIndices = new ArrayList<Integer>();
|
private final List<Integer> mapTypeColumnIndices = new ArrayList<Integer>();
|
||||||
@ -99,6 +101,11 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
|
|||||||
private final List<Integer> dateTypeColumnIndices = new ArrayList<Integer>();
|
private final List<Integer> dateTypeColumnIndices = new ArrayList<Integer>();
|
||||||
private final List<Integer> timeColumnIndices = new ArrayList<Integer>();
|
private final List<Integer> timeColumnIndices = new ArrayList<Integer>();
|
||||||
|
|
||||||
|
static final String[] TRUE_BIT_VALUES = new String[] { "1", "true", "TRUE" };
|
||||||
|
static final Set<String> TRUE_BIT_SET = new HashSet<String>(Arrays.asList(TRUE_BIT_VALUES));
|
||||||
|
static final String[] FALSE_BIT_VALUES = new String[] { "0", "false", "FALSE" };
|
||||||
|
static final Set<String> FALSE_BIT_SET = new HashSet<String>(Arrays.asList(FALSE_BIT_VALUES));
|
||||||
|
|
||||||
private Schema schema;
|
private Schema schema;
|
||||||
|
|
||||||
public CSVIntermediateDataFormat() {
|
public CSVIntermediateDataFormat() {
|
||||||
@ -138,6 +145,8 @@ public void setSchema(Schema schema) {
|
|||||||
for (Column col : columns) {
|
for (Column col : columns) {
|
||||||
if (isColumnStringType(col)) {
|
if (isColumnStringType(col)) {
|
||||||
stringTypeColumnIndices.add(i);
|
stringTypeColumnIndices.add(i);
|
||||||
|
} else if (col.getType() == ColumnType.BIT) {
|
||||||
|
bitTypeColumnIndices.add(i);
|
||||||
} else if (col.getType() == ColumnType.DATE) {
|
} else if (col.getType() == ColumnType.DATE) {
|
||||||
dateTypeColumnIndices.add(i);
|
dateTypeColumnIndices.add(i);
|
||||||
} else if (col.getType() == ColumnType.TIME) {
|
} else if (col.getType() == ColumnType.TIME) {
|
||||||
@ -288,8 +297,12 @@ private Object parseCSVStringArrayElement(String fieldString, Column column) {
|
|||||||
returnValue = DateTime.parse(dateTime);
|
returnValue = DateTime.parse(dateTime);
|
||||||
break;
|
break;
|
||||||
case BIT:
|
case BIT:
|
||||||
returnValue = Boolean.valueOf(fieldString.equals("1")
|
if ((TRUE_BIT_SET.contains(fieldString)) || (FALSE_BIT_SET.contains(fieldString))) {
|
||||||
|| fieldString.toLowerCase().equals("true"));
|
returnValue = TRUE_BIT_SET.contains(fieldString);
|
||||||
|
} else {
|
||||||
|
// throw an exception for any unsupported value for BITs
|
||||||
|
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + fieldString);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case ARRAY:
|
case ARRAY:
|
||||||
case SET:
|
case SET:
|
||||||
@ -425,36 +438,44 @@ public int compareTo(IntermediateDataFormat<?> o) {
|
|||||||
/**
|
/**
|
||||||
* Sanitize every element of the CSV string based on the column type
|
* Sanitize every element of the CSV string based on the column type
|
||||||
*
|
*
|
||||||
* @param stringArray
|
* @param objectArray
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private void encodeCSVStringElements(Object[] stringArray, Column[] columnArray) {
|
private void encodeCSVStringElements(Object[] objectArray, Column[] columnArray) {
|
||||||
|
for (int i : bitTypeColumnIndices) {
|
||||||
|
String bitStringValue = objectArray[i].toString();
|
||||||
|
if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) {
|
||||||
|
objectArray[i] = bitStringValue;
|
||||||
|
} else {
|
||||||
|
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + objectArray[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
for (int i : stringTypeColumnIndices) {
|
for (int i : stringTypeColumnIndices) {
|
||||||
stringArray[i] = escapeString((String) stringArray[i]);
|
objectArray[i] = escapeString((String) objectArray[i]);
|
||||||
}
|
}
|
||||||
for (int i : dateTimeTypeColumnIndices) {
|
for (int i : dateTimeTypeColumnIndices) {
|
||||||
if (stringArray[i] instanceof org.joda.time.DateTime) {
|
if (objectArray[i] instanceof org.joda.time.DateTime) {
|
||||||
stringArray[i] = encloseWithQuote(dtf.print((org.joda.time.DateTime) stringArray[i]));
|
objectArray[i] = encloseWithQuote(dtf.print((org.joda.time.DateTime) objectArray[i]));
|
||||||
} else if (stringArray[i] instanceof org.joda.time.LocalDateTime) {
|
} else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
|
||||||
stringArray[i] = encloseWithQuote(dtf.print((org.joda.time.LocalDateTime) stringArray[i]));
|
objectArray[i] = encloseWithQuote(dtf.print((org.joda.time.LocalDateTime) objectArray[i]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (int i : dateTypeColumnIndices) {
|
for (int i : dateTypeColumnIndices) {
|
||||||
org.joda.time.LocalDate date = (org.joda.time.LocalDate) stringArray[i];
|
org.joda.time.LocalDate date = (org.joda.time.LocalDate) objectArray[i];
|
||||||
stringArray[i] = encloseWithQuote(df.print(date));
|
objectArray[i] = encloseWithQuote(df.print(date));
|
||||||
}
|
}
|
||||||
for (int i : timeColumnIndices) {
|
for (int i : timeColumnIndices) {
|
||||||
org.joda.time.LocalTime date = (org.joda.time.LocalTime) stringArray[i];
|
org.joda.time.LocalTime date = (org.joda.time.LocalTime) objectArray[i];
|
||||||
stringArray[i] = encloseWithQuote(tf.print(date));
|
objectArray[i] = encloseWithQuote(tf.print(date));
|
||||||
}
|
}
|
||||||
for (int i : byteTypeColumnIndices) {
|
for (int i : byteTypeColumnIndices) {
|
||||||
stringArray[i] = escapeByteArrays((byte[]) stringArray[i]);
|
objectArray[i] = escapeByteArrays((byte[]) objectArray[i]);
|
||||||
}
|
}
|
||||||
for (int i : listTypeColumnIndices) {
|
for (int i : listTypeColumnIndices) {
|
||||||
stringArray[i] = encodeList((Object[]) stringArray[i], columnArray[i]);
|
objectArray[i] = encodeList((Object[]) objectArray[i], columnArray[i]);
|
||||||
}
|
}
|
||||||
for (int i : mapTypeColumnIndices) {
|
for (int i : mapTypeColumnIndices) {
|
||||||
stringArray[i] = encodeMap((Map<Object, Object>) stringArray[i], columnArray[i]);
|
objectArray[i] = encodeMap((Map<Object, Object>) objectArray[i], columnArray[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -571,4 +592,4 @@ private byte[] unescapeByteArray(String orig) {
|
|||||||
public String toString() {
|
public String toString() {
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,13 +36,18 @@ public enum IntermediateDataFormatError implements ErrorCode {
|
|||||||
/** Column type isn't known by Intermediate Data Format. */
|
/** Column type isn't known by Intermediate Data Format. */
|
||||||
INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
|
INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
|
||||||
|
|
||||||
/** Number of fields. */
|
/** Number of columns in schema does not match the data set. */
|
||||||
INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields."),
|
INTERMEDIATE_DATA_FORMAT_0005("Wrong number of columns."),
|
||||||
|
|
||||||
|
/** Schema is missing in the IDF. */
|
||||||
INTERMEDIATE_DATA_FORMAT_0006("Schema missing."),
|
INTERMEDIATE_DATA_FORMAT_0006("Schema missing."),
|
||||||
|
|
||||||
|
/** For arrays and maps we use JSON representation and incorrect representation results in parse exception*/
|
||||||
INTERMEDIATE_DATA_FORMAT_0008("JSON parse internal error."),
|
INTERMEDIATE_DATA_FORMAT_0008("JSON parse internal error."),
|
||||||
|
|
||||||
|
/** Unsupported bit values */
|
||||||
|
INTERMEDIATE_DATA_FORMAT_0009("Unsupported bit value."),
|
||||||
|
|
||||||
;
|
;
|
||||||
|
|
||||||
private final String message;
|
private final String message;
|
||||||
|
@ -442,28 +442,118 @@ public void testDateTimeISO8601Alternative() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// **************test cases for BIT*******************
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBit() {
|
public void testBitTrueFalseWithCSVTextInAndCSVTextOut() {
|
||||||
Schema schema = new Schema("test");
|
Schema schema = new Schema("test");
|
||||||
schema.addColumn(new Bit("1"));
|
schema.addColumn(new Bit("1"));
|
||||||
dataFormat.setSchema(schema);
|
dataFormat.setSchema(schema);
|
||||||
|
|
||||||
for (String trueBit : new String[]{
|
for (String trueBit : new String[] { "true", "TRUE" }) {
|
||||||
"true", "TRUE", "1"
|
dataFormat.setTextData(trueBit);
|
||||||
}) {
|
assertTrue(Boolean.valueOf(dataFormat.getTextData()));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (String falseBit : new String[] { "false", "FALSE" }) {
|
||||||
|
dataFormat.setTextData(falseBit);
|
||||||
|
assertFalse(Boolean.valueOf(dataFormat.getTextData()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBitWithCSVTextInAndCSVTextOut() {
|
||||||
|
Schema schema = new Schema("test");
|
||||||
|
schema.addColumn(new Bit("1"));
|
||||||
|
dataFormat.setSchema(schema);
|
||||||
|
dataFormat.setTextData("1");
|
||||||
|
assertEquals("1", dataFormat.getTextData());
|
||||||
|
dataFormat.setTextData("0");
|
||||||
|
assertEquals("0", dataFormat.getTextData());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBitWithObjectArrayInAndCSVTextOut() {
|
||||||
|
Schema schema = new Schema("test");
|
||||||
|
schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
|
||||||
|
dataFormat.setSchema(schema);
|
||||||
|
Object[] data = new Object[2];
|
||||||
|
data[0] = Boolean.TRUE;
|
||||||
|
data[1] = Boolean.FALSE;
|
||||||
|
dataFormat.setObjectData(data);
|
||||||
|
assertEquals("true,false", dataFormat.getTextData());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = SqoopException.class)
|
||||||
|
public void testUnsupportedBitWithObjectArrayInAndCSVTextOut() {
|
||||||
|
Schema schema = new Schema("test");
|
||||||
|
schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
|
||||||
|
dataFormat.setSchema(schema);
|
||||||
|
Object[] data = new Object[2];
|
||||||
|
data[0] = "1";
|
||||||
|
data[1] = "2";
|
||||||
|
dataFormat.setObjectData(data);
|
||||||
|
assertEquals("1,2", dataFormat.getTextData());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBitWithObjectArrayInAndObjectOut() {
|
||||||
|
Schema schema = new Schema("test");
|
||||||
|
schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
|
||||||
|
dataFormat.setSchema(schema);
|
||||||
|
Object[] data = new Object[2];
|
||||||
|
data[0] = Boolean.TRUE;
|
||||||
|
data[1] = Boolean.FALSE;
|
||||||
|
dataFormat.setObjectData(data);
|
||||||
|
assertEquals(true, dataFormat.getObjectData()[0]);
|
||||||
|
assertEquals(false, dataFormat.getObjectData()[1]);
|
||||||
|
data[0] = "1";
|
||||||
|
data[1] = "0";
|
||||||
|
dataFormat.setObjectData(data);
|
||||||
|
assertEquals(true, dataFormat.getObjectData()[0]);
|
||||||
|
assertEquals(false, dataFormat.getObjectData()[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testBitWithCSVTextInAndObjectArrayOut() {
|
||||||
|
Schema schema = new Schema("test");
|
||||||
|
schema.addColumn(new Bit("1"));
|
||||||
|
dataFormat.setSchema(schema);
|
||||||
|
|
||||||
|
for (String trueBit : new String[] { "true", "TRUE", "1" }) {
|
||||||
dataFormat.setTextData(trueBit);
|
dataFormat.setTextData(trueBit);
|
||||||
assertTrue((Boolean) dataFormat.getObjectData()[0]);
|
assertTrue((Boolean) dataFormat.getObjectData()[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (String falseBit : new String[]{
|
for (String falseBit : new String[] { "false", "FALSE", "0" }) {
|
||||||
"false", "FALSE", "0"
|
|
||||||
}) {
|
|
||||||
dataFormat.setTextData(falseBit);
|
dataFormat.setTextData(falseBit);
|
||||||
assertFalse((Boolean) dataFormat.getObjectData()[0]);
|
assertFalse((Boolean) dataFormat.getObjectData()[0]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//**************test cases for arrays*******************
|
@Test(expected = SqoopException.class)
|
||||||
|
public void testUnsupportedBitWithObjectArrayInAndObjectOut() {
|
||||||
|
Schema schema = new Schema("test");
|
||||||
|
schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
|
||||||
|
dataFormat.setSchema(schema);
|
||||||
|
Object[] data = new Object[2];
|
||||||
|
data[0] = "1";
|
||||||
|
data[1] = "2";
|
||||||
|
dataFormat.setObjectData(data);
|
||||||
|
assertEquals(true, dataFormat.getObjectData()[0]);
|
||||||
|
assertEquals(false, dataFormat.getObjectData()[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = SqoopException.class)
|
||||||
|
public void testUnsupportedBitWithCSVTextInAndObjectOut() {
|
||||||
|
Schema schema = new Schema("test");
|
||||||
|
schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
|
||||||
|
dataFormat.setSchema(schema);
|
||||||
|
dataFormat.setTextData("1,3");
|
||||||
|
assertEquals(true, dataFormat.getObjectData()[0]);
|
||||||
|
assertEquals(false, dataFormat.getObjectData()[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// **************test cases for arrays*******************
|
||||||
@Test
|
@Test
|
||||||
public void testArrayOfStringWithObjectArrayInObjectArrayOut() {
|
public void testArrayOfStringWithObjectArrayInObjectArrayOut() {
|
||||||
Schema schema = new Schema("test");
|
Schema schema = new Schema("test");
|
||||||
|
Loading…
Reference in New Issue
Block a user