SQOOP-1811: Sqoop2: Change IDF *TextData to *CSVTextData
(Veena Basavaraj via Jarek Jarcec Cecho)
commit 486af80f45 (parent fdc0469505)
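In short, the patch renames the CSV accessors on the intermediate data format (IDF) API: getTextData()/setTextData(String) become getCSVTextData()/setCSVTextData(String), and the error codes move from IntermediateDataFormatError to a CSV-prefixed CSVIntermediateDataFormatError enum. A before/after sketch of the caller-side change; the schema variable and the CSV literal are illustrative, not taken from the patch:

// Before SQOOP-1811: rows moved through the generic "text" accessors.
CSVIntermediateDataFormat dataFormat = new CSVIntermediateDataFormat(schema);
dataFormat.setTextData("10,34,'random data'");
String row = dataFormat.getTextData();

// After SQOOP-1811: the same calls name the CSV contract explicitly.
dataFormat.setCSVTextData("10,34,'random data'");
String csvRow = dataFormat.getCSVTextData();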
@@ -126,7 +126,7 @@ public CSVIntermediateDataFormat(Schema schema) {
    * {@inheritDoc}
    */
   @Override
-  public String getTextData() {
+  public String getCSVTextData() {
     return data;
   }

@@ -134,7 +134,7 @@ public String getTextData() {
    * {@inheritDoc}
    */
   @Override
-  public void setTextData(String text) {
+  public void setCSVTextData(String text) {
     this.data = text;
   }

@@ -230,7 +230,7 @@ private String[] getFieldStringArray() {
   @Override
   public Object[] getObjectData() {
     if (schema == null || schema.isEmpty()) {
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006);
     }

     // fieldStringArray represents the csv fields parsed into string array

@@ -241,8 +241,8 @@ public Object[] getObjectData() {
     }

     if (fieldStringArray.length != schema.getColumns().size()) {
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
-          "The data " + getTextData() + " has the wrong number of fields.");
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0005,
+          "The data " + getCSVTextData() + " has the wrong number of fields.");
     }

     Object[] objectArray = new Object[fieldStringArray.length];

@@ -304,7 +304,7 @@ private Object parseCSVStringArrayElement(String fieldString, Column column) {
         returnValue = TRUE_BIT_SET.contains(fieldString);
       } else {
         // throw an exception for any unsupported value for BITs
-        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + fieldString);
+        throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + fieldString);
       }
       break;
     case ARRAY:

@@ -315,7 +315,7 @@ private Object parseCSVStringArrayElement(String fieldString, Column column) {
       returnValue = parseMapElementFromJSON(fieldString);
       break;
     default:
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0004,
           "Column type from schema was not recognized for " + column.getType());
     }
     return returnValue;

@@ -348,7 +348,7 @@ private Object[] parseListElementFromJSON(String fieldString) {
     try {
       array = (JSONArray) new JSONParser().parse(removeQuotes(fieldString));
     } catch (org.json.simple.parser.ParseException e) {
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0008, e);
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e);
     }
     if (array != null) {
       return array.toArray();

@@ -362,7 +362,7 @@ private Map<Object, Object> parseMapElementFromJSON(String fieldString) {
     try {
       object = (JSONObject) new JSONParser().parse(removeQuotes(fieldString));
     } catch (org.json.simple.parser.ParseException e) {
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0008, e);
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e);
     }
     if (object != null) {
       return toMap(object);

@@ -456,7 +456,7 @@ public int compareTo(IntermediateDataFormat<?> o) {
       throw new IllegalStateException("Expected Data to be instance of "
           + "CSVIntermediateFormat, but was an instance of " + o.getClass().getName());
     }
-    return data.compareTo(o.getTextData());
+    return data.compareTo(o.getCSVTextData());
   }

   /**

@@ -471,7 +471,7 @@ private void encodeCSVStringElements(Object[] objectArray, Column[] columnArray)
       if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) {
         objectArray[i] = bitStringValue;
       } else {
-        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + objectArray[i]);
+        throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + objectArray[i]);
       }
     }
     for (int i : stringTypeColumnIndices) {

@@ -572,7 +572,7 @@ private String escapeByteArrays(byte[] bytes) {
     } catch (UnsupportedEncodingException e) {
       // We should never hit this case.
       // This character set should be distributed with Java.
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0001,
           "The character set " + BYTE_FIELD_CHARSET + " is not available.");
     }
   }

@@ -598,7 +598,7 @@ private String escapeString(String orig) {
             Matcher.quoteReplacement(replacements[j]));
       }
     } catch (Exception e) {
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002, orig
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, orig
           + " " + replacement + " " + String.valueOf(j) + " " + e.getMessage());
     }
     return encloseWithQuote(replacement);

@@ -620,7 +620,7 @@ private String unescapeString(String orig) {
             Matcher.quoteReplacement(String.valueOf(originals[j])));
       }
     } catch (Exception e) {
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0003, orig
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, orig
          + " " + String.valueOf(j) + e.getMessage());
     }

@@ -638,7 +638,7 @@ private byte[] unescapeByteArray(String orig) {
     } catch (UnsupportedEncodingException e) {
       // Should never hit this case.
       // This character set should be distributed with Java.
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0001,
           "The character set " + BYTE_FIELD_CHARSET + " is not available.");
     }
   }
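The getObjectData() hunks above are where CSV text is parsed back into typed objects against the schema. A hedged sketch of that round trip, using the same column types the tests exercise; the schema name, column names, and values are illustrative only:

org.apache.sqoop.schema.Schema schema = new org.apache.sqoop.schema.Schema("example");
schema.addColumn(new org.apache.sqoop.schema.type.Bit("1"));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));

CSVIntermediateDataFormat idf = new CSVIntermediateDataFormat(schema);
idf.setCSVTextData("1,'some text'");    // raw CSV text in
Object[] fields = idf.getObjectData();  // typed fields out, one entry per column
// fields[0] is Boolean.TRUE (bit value "1"), fields[1] is the unquoted string "some text"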
@@ -20,39 +20,39 @@

 import org.apache.sqoop.common.ErrorCode;

-public enum IntermediateDataFormatError implements ErrorCode {
+public enum CSVIntermediateDataFormatError implements ErrorCode {
   /** An unknown error has occurred. */
   INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),

   /** An encoding is missing in the Java native libraries. */
-  INTERMEDIATE_DATA_FORMAT_0001("Native character set error."),
+  CSV_INTERMEDIATE_DATA_FORMAT_0001("Native character set error."),

   /** Error while escaping a row. */
-  INTERMEDIATE_DATA_FORMAT_0002("An error has occurred while escaping a row."),
+  CSV_INTERMEDIATE_DATA_FORMAT_0002("An error has occurred while escaping a row."),

   /** Error while escaping a row. */
-  INTERMEDIATE_DATA_FORMAT_0003("An error has occurred while unescaping a row."),
+  CSV_INTERMEDIATE_DATA_FORMAT_0003("An error has occurred while unescaping a row."),

   /** Column type isn't known by Intermediate Data Format. */
-  INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
+  CSV_INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),

   /** Number of columns in schema does not match the data set. */
-  INTERMEDIATE_DATA_FORMAT_0005("Wrong number of columns."),
+  CSV_INTERMEDIATE_DATA_FORMAT_0005("Wrong number of columns."),

   /** Schema is missing in the IDF. */
-  INTERMEDIATE_DATA_FORMAT_0006("Schema missing."),
+  CSV_INTERMEDIATE_DATA_FORMAT_0006("Schema missing."),

   /** For arrays and maps we use JSON representation and incorrect representation results in parse exception */
-  INTERMEDIATE_DATA_FORMAT_0008("JSON parse internal error."),
+  CSV_INTERMEDIATE_DATA_FORMAT_0008("JSON parse internal error."),

   /** Unsupported bit values */
-  INTERMEDIATE_DATA_FORMAT_0009("Unsupported bit value."),
+  CSV_INTERMEDIATE_DATA_FORMAT_0009("Unsupported bit value."),

   ;

   private final String message;

-  private IntermediateDataFormatError(String message) {
+  private CSVIntermediateDataFormatError(String message) {
     this.message = message;
   }

|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Abstract class representing a pluggable intermediate data format the Sqoop
|
||||
* driver will use to move data to/from the connector. All intermediate
|
||||
* Abstract class representing a pluggable intermediate data format Sqoop
|
||||
* will use to move data between the FROM and TO connectors. All intermediate
|
||||
* data formats are expected to have an internal/native implementation,
|
||||
* but also should minimally be able to return a text (CSV) version of the
|
||||
* data. The data format should also be able to return the data as an object
|
||||
* array - each array representing one row.
|
||||
* but also should minimally be able to return CSV text version as specified by
|
||||
* Sqoop spec. The data format in addition should also be able to return the data
|
||||
* as an object array as represented by the object model - each array represents one row.
|
||||
* <p/>
|
||||
* Why a "native" internal format and then return text too?
|
||||
* Imagine a connector that moves data from a system that stores data as a
|
||||
* serialization format called FooFormat. If I also need the data to be
|
||||
* written into HDFS as FooFormat, the additional cycles burnt in converting
|
||||
* the FooFormat to text and back is useless - so plugging in an intermediate
|
||||
* format that can store the data as FooFormat saves those cycles!
|
||||
* <p/>
|
||||
* Most fast access mechanisms, like mysqldump or pgsqldump write the data
|
||||
* out as CSV, and most often the destination data is also represented as CSV
|
||||
* - so having a minimal CSV support is important, so we can easily pull the
|
||||
* data out as text.
|
||||
* <p/>
|
||||
* Any conversion to the final format from the native or text format is to be
|
||||
* done by the connector or OutputFormat classes.
|
||||
* Any conversion to the format dictated by the corresponding data source from the native or CSV text format
|
||||
* has to be done by the connector themselves both in FROM and TO
|
||||
*
|
||||
* @param <T> - Each data format may have a native representation of the
|
||||
* data, represented by the parameter.
|
||||
@ -58,15 +46,6 @@ public int hashCode() {
|
||||
return data.hashCode();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set one row of data. If validate is set to true, the data is validated
|
||||
* against the schema.
|
||||
*
|
||||
* @param data - A single row of data to be moved.
|
||||
*/
|
||||
public void setData(T data) {
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get one row of data.
|
||||
@ -79,22 +58,45 @@ public T getData() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get one row of data as CSV.
|
||||
* Set one row of data. If validate is set to true, the data is validated
|
||||
* against the schema.
|
||||
*
|
||||
* @return - String representing the data in CSV, according to the "FROM" schema.
|
||||
* No schema conversion is done on textData, to keep it as "high performance" option.
|
||||
* @param data - A single row of data to be moved.
|
||||
*/
|
||||
public abstract String getTextData();
|
||||
public void setData(T data) {
|
||||
this.data = data;
|
||||
}
|
||||
/**
|
||||
* Get one row of data as CSV text. Use SqoopDataUtils for reading and writing
|
||||
* into the sqoop specified CSV text format for each {@link #ColumnType} field in the row
|
||||
* Why a "native" internal format and then return CSV text too?
|
||||
* Imagine a connector that moves data from a system that stores data as a
|
||||
* serialization format called FooFormat. If I also need the data to be
|
||||
* written into HDFS as FooFormat, the additional cycles burnt in converting
|
||||
* the FooFormat to text and back is useless - so using the sqoop specified
|
||||
* CSV text format saves those extra cycles
|
||||
* <p/>
|
||||
* Most fast access mechanisms, like mysqldump or pgsqldump write the data
|
||||
* out as CSV, and most often the source data is also represented as CSV
|
||||
* - so having a minimal CSV support is mandated for all IDF, so we can easily read the
|
||||
* data out as text and write as text.
|
||||
* <p/>
|
||||
* @return - String representing the data in CSV text format.
|
||||
*/
|
||||
public abstract String getCSVTextData();
|
||||
|
||||
/**
|
||||
* Set one row of data as CSV.
|
||||
*
|
||||
*/
|
||||
public abstract void setTextData(String text);
|
||||
public abstract void setCSVTextData(String csvText);
|
||||
|
||||
/**
|
||||
* Get one row of data as an Object array.
|
||||
*
|
||||
* Get one row of data as an Object array. Sqoop uses defined object representation
|
||||
* for each column type. For instance org.joda.time to represent date.Use SqoopDataUtils
|
||||
* for reading and writing into the sqoop specified object format
|
||||
* for each {@link #ColumnType} field in the row
|
||||
* </p>
|
||||
* @return - String representing the data as an Object array
|
||||
* If FROM and TO schema exist, we will use SchemaMatcher to get the data according to "TO" schema
|
||||
*/
|
||||
@ -131,4 +133,4 @@ public T getData() {
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract void read(DataInput in) throws IOException;
|
||||
}
|
||||
}
|
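After this patch, the abstract contract a data format must satisfy is the pair getCSVTextData()/setCSVTextData(String) plus the object-array and DataInput/DataOutput accessors shown in the hunks above. A minimal sketch of a custom implementation under that contract; the class name, the pass-through behaviour, and direct access to the inherited data field are assumptions for illustration, and the real abstract class may declare further members not visible in this diff:

// Hypothetical IDF whose native representation is already the Sqoop CSV text.
public class PassThroughCSVDataFormat extends IntermediateDataFormat<String> {

  @Override
  public String getCSVTextData() {
    return data;                      // assumed: 'data' is the inherited row field
  }

  @Override
  public void setCSVTextData(String csvText) {
    this.data = csvText;
  }

  @Override
  public Object[] getObjectData() {
    // would parse this.data against the schema, one Object per column (omitted)
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public void setObjectData(Object[] fields) {
    // would encode the object array back into CSV text (omitted)
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public void write(java.io.DataOutput out) throws java.io.IOException {
    out.writeUTF(getCSVTextData());
  }

  @Override
  public void read(java.io.DataInput in) throws java.io.IOException {
    setCSVTextData(in.readUTF());
  }
}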
@@ -78,7 +78,7 @@ public void testNullInputAsCSVTextInObjectArrayOut() {
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
     dataFormat.setSchema(schema);
-    dataFormat.setTextData(null);
+    dataFormat.setCSVTextData(null);
     Object[] out = dataFormat.getObjectData();
     assertNull(out);
   }

@@ -93,7 +93,7 @@ public void testEmptyInputAsCSVTextInObjectArrayOut() {
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
     dataFormat.setSchema(schema);
-    dataFormat.setTextData("");
+    dataFormat.setCSVTextData("");
     dataFormat.getObjectData();
   }

@@ -103,8 +103,8 @@ public void testEmptyInputAsCSVTextInObjectArrayOut() {
   public void testInputAsCSVTextInCSVTextOut() {
     String testData = "'ENUM',10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
         + ",'" + String.valueOf(0x0A) + "'";
-    dataFormat.setTextData(testData);
-    assertEquals(testData, dataFormat.getTextData());
+    dataFormat.setCSVTextData(testData);
+    assertEquals(testData, dataFormat.getCSVTextData());
   }

   @Test

@@ -123,7 +123,7 @@ public void testInputAsCSVTextInObjectOut() {
         .addColumn(new org.apache.sqoop.schema.type.Enum("7"));

     dataFormat.setSchema(schema);
-    dataFormat.setTextData(testData);
+    dataFormat.setCSVTextData(testData);

     Object[] out = dataFormat.getObjectData();

@@ -164,7 +164,7 @@ public void testInputAsObjectArayInCSVTextOut() {
     //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
     String testData = "10,34,'54','random data'," +
         getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n','TEST_ENUM'";
-    assertEquals(testData, dataFormat.getTextData());
+    assertEquals(testData, dataFormat.getCSVTextData());
   }

   @Test

@@ -227,7 +227,7 @@ public void testObjectArrayWithNullInCSVTextOut() {
     //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
     String testData = "10,34,NULL,'random data'," +
         getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n','TEST_ENUM'";
-    assertEquals(testData, dataFormat.getTextData());
+    assertEquals(testData, dataFormat.getCSVTextData());
   }

   @Test

@@ -281,8 +281,8 @@ public void testTimeWithCSVTextInCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Time("1", false));
     dataFormat.setSchema(schema);
-    dataFormat.setTextData("'12:00:00'");
-    assertEquals("'12:00:00'", dataFormat.getTextData());
+    dataFormat.setCSVTextData("'12:00:00'");
+    assertEquals("'12:00:00'", dataFormat.getCSVTextData());
   }

   @Test

@@ -290,7 +290,7 @@ public void testTimeWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Time("1", false));
     dataFormat.setSchema(schema);
-    dataFormat.setTextData("'12:59:59'");
+    dataFormat.setCSVTextData("'12:59:59'");
     org.joda.time.LocalTime time = new org.joda.time.LocalTime(12, 59, 59);
     assertEquals(time.toString(), dataFormat.getObjectData()[0].toString());
   }

@@ -303,7 +303,7 @@ public void testTimeWithObjectArrayInCSVTextOut() {
     org.joda.time.LocalTime time = new org.joda.time.LocalTime(15, 0, 0);
     Object[] in = { time, "test" };
     dataFormat.setObjectData(in);
-    assertEquals("'15:00:00.000000','test'", dataFormat.getTextData());
+    assertEquals("'15:00:00.000000','test'", dataFormat.getCSVTextData());
   }

   @Test

@@ -324,8 +324,8 @@ public void testDateWithCSVTextInCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Date("1"));
     dataFormat.setSchema(schema);
-    dataFormat.setTextData("'2014-10-01'");
-    assertEquals("'2014-10-01'", dataFormat.getTextData());
+    dataFormat.setCSVTextData("'2014-10-01'");
+    assertEquals("'2014-10-01'", dataFormat.getCSVTextData());
   }

   @Test

@@ -333,7 +333,7 @@ public void testDateWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Date("1"));
     dataFormat.setSchema(schema);
-    dataFormat.setTextData("'2014-10-01'");
+    dataFormat.setCSVTextData("'2014-10-01'");
     org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
     assertEquals(date.toString(), dataFormat.getObjectData()[0].toString());
   }

@@ -346,7 +346,7 @@ public void testDateWithObjectArrayInCSVTextOut() {
     org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
     Object[] in = { date, "test" };
     dataFormat.setObjectData(in);
-    assertEquals("'2014-10-01','test'", dataFormat.getTextData());
+    assertEquals("'2014-10-01','test'", dataFormat.getCSVTextData());
   }

   @Test

@@ -368,8 +368,8 @@ public void testDateTimeWithCSVTextInCSVTextOut() {
     schema.addColumn(new DateTime("1", false, false));
     dataFormat.setSchema(schema);

-    dataFormat.setTextData("'2014-10-01 12:00:00'");
-    assertEquals("'2014-10-01 12:00:00'", dataFormat.getTextData());
+    dataFormat.setCSVTextData("'2014-10-01 12:00:00'");
+    assertEquals("'2014-10-01 12:00:00'", dataFormat.getCSVTextData());
   }

   @Test

@@ -377,15 +377,15 @@ public void testDateTimeWithFractionNoTimezoneWithCSVTextInCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", true, false));
     dataFormat.setSchema(schema);
-    dataFormat.setTextData("'2014-10-01 12:00:00.000'");
-    assertEquals("'2014-10-01 12:00:00.000'", dataFormat.getTextData());
+    dataFormat.setCSVTextData("'2014-10-01 12:00:00.000'");
+    assertEquals("'2014-10-01 12:00:00.000'", dataFormat.getCSVTextData());
   }

   public void testDateTimeNoFractionNoTimezoneWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", false, false));
     dataFormat.setSchema(schema);
-    dataFormat.setTextData("'2014-10-01 12:00:00'");
+    dataFormat.setCSVTextData("'2014-10-01 12:00:00'");
     // NOTE: string representation will have the T added, it is an
     // implementation quirk of using JODA
     assertEquals("2014-10-01T12:00:00", dataFormat.getObjectData()[0].toString());

@@ -396,7 +396,7 @@ public void testDateTimeWithFractionNoTimezoneWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", true, false));
     dataFormat.setSchema(schema);
-    dataFormat.setTextData("'2014-10-01 12:00:00.000'");
+    dataFormat.setCSVTextData("'2014-10-01 12:00:00.000'");
     // NOTE: string representation will have the T added, it is an
     // implementation quirk of using JODA
     assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString());

@@ -410,7 +410,7 @@ public void testDateTimeNoQuotesWithFractionTimezoneWithCSVTextInObjectArrayOut() {
     dataFormat.setSchema(schema);
     DateTimeZone zone = DateTimeZone.forID("America/New_York");
     org.joda.time.DateTime dateTime = new org.joda.time.DateTime(zone);
-    dataFormat.setTextData(dateTime.toString());
+    dataFormat.setCSVTextData(dateTime.toString());
     dataFormat.getObjectData()[0].toString();
   }

@@ -420,7 +420,7 @@ public void testDateTimeIncorrectFormatWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", true, true));
     dataFormat.setSchema(schema);
-    dataFormat.setTextData("'2014-3310-01 12:00:00.000'");
+    dataFormat.setCSVTextData("'2014-3310-01 12:00:00.000'");
     dataFormat.getObjectData()[0].toString();
   }

@@ -432,7 +432,7 @@ public void testCurrentDateTime2WithFractionNoTimezoneWithCSVTextInObjectArrayOut() {
     // current date time
     org.joda.time.DateTime dateTime = new org.joda.time.DateTime();
     String dateTimeString = CSVIntermediateDataFormat.dtfWithFractionNoTimeZone.print(dateTime);
-    dataFormat.setTextData("'" + dateTimeString + "'");
+    dataFormat.setCSVTextData("'" + dateTimeString + "'");
     assertEquals(dateTimeString.replace(" ", "T"), dataFormat.getObjectData()[0].toString());
   }

@@ -441,7 +441,7 @@ public void testDateTimeWithFractionAndTimeZoneWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", true, true));
     dataFormat.setSchema(schema);
-    dataFormat.setTextData("'2014-10-01 12:00:00.000-0400'");
+    dataFormat.setCSVTextData("'2014-10-01 12:00:00.000-0400'");
     // NOTE: string representation will have the T added, it is an
     // implementation quirk of using JODA
     assertEquals("2014-10-01T12:00:00.000-04:00", dataFormat.getObjectData()[0].toString());

@@ -457,7 +457,7 @@ public void testDateTimeWithFractionAndTimeZoneObjectInCSVTextOut() {
     Object[] in = { dateTime };
     dataFormat.setObjectData(in);
     // Note: DateTime has the timezone info
-    assertEquals("'2014-10-01 12:00:00.001-0400'", dataFormat.getTextData());
+    assertEquals("'2014-10-01 12:00:00.001-0400'", dataFormat.getCSVTextData());
   }

   @Test

@@ -469,7 +469,7 @@ public void testLocalDateTimeWithObjectInCSVTextOut() {
     Object[] in = { dateTime };
     dataFormat.setObjectData(in);
     // Note: LocalDateTime will not have the timezone info
-    assertEquals("'2014-10-01 12:00:00.002'", dataFormat.getTextData());
+    assertEquals("'2014-10-01 12:00:00.002'", dataFormat.getCSVTextData());
   }

   @Test

@@ -477,7 +477,7 @@ public void testDateTimeFractionAndTimezoneWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", true, true));
     dataFormat.setSchema(schema);
-    dataFormat.setTextData("'2014-10-01 12:00:00.000-04:00'");
+    dataFormat.setCSVTextData("'2014-10-01 12:00:00.000-04:00'");
     DateTimeZone zone = DateTimeZone.forID("America/New_York");
     org.joda.time.DateTime edateTime = new org.joda.time.DateTime(2014, 10, 01, 12, 0, 0, 0, zone);
     org.joda.time.DateTime dateTime = (org.joda.time.DateTime) dataFormat.getObjectData()[0];

@@ -498,13 +498,13 @@ public void testBitTrueFalseWithCSVTextInAndCSVTextOut() {
     dataFormat.setSchema(schema);

     for (String trueBit : new String[] { "true", "TRUE" }) {
-      dataFormat.setTextData(trueBit);
-      assertTrue(Boolean.valueOf(dataFormat.getTextData()));
+      dataFormat.setCSVTextData(trueBit);
+      assertTrue(Boolean.valueOf(dataFormat.getCSVTextData()));
     }

     for (String falseBit : new String[] { "false", "FALSE" }) {
-      dataFormat.setTextData(falseBit);
-      assertFalse(Boolean.valueOf(dataFormat.getTextData()));
+      dataFormat.setCSVTextData(falseBit);
+      assertFalse(Boolean.valueOf(dataFormat.getCSVTextData()));
     }
   }

@@ -513,10 +513,10 @@ public void testBitWithCSVTextInAndCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Bit("1"));
     dataFormat.setSchema(schema);
-    dataFormat.setTextData("1");
-    assertEquals("1", dataFormat.getTextData());
-    dataFormat.setTextData("0");
-    assertEquals("0", dataFormat.getTextData());
+    dataFormat.setCSVTextData("1");
+    assertEquals("1", dataFormat.getCSVTextData());
+    dataFormat.setCSVTextData("0");
+    assertEquals("0", dataFormat.getCSVTextData());
   }

   @Test

@@ -528,7 +528,7 @@ public void testBitWithObjectArrayInAndCSVTextOut() {
     data[0] = Boolean.TRUE;
     data[1] = Boolean.FALSE;
     dataFormat.setObjectData(data);
-    assertEquals("true,false", dataFormat.getTextData());
+    assertEquals("true,false", dataFormat.getCSVTextData());
   }

   @Test(expected = SqoopException.class)

@@ -540,7 +540,7 @@ public void testUnsupportedBitWithObjectArrayInAndCSVTextOut() {
     data[0] = "1";
     data[1] = "2";
     dataFormat.setObjectData(data);
-    assertEquals("1,2", dataFormat.getTextData());
+    assertEquals("1,2", dataFormat.getCSVTextData());
   }

   @Test

@@ -567,12 +567,12 @@ public void testBitWithCSVTextInAndObjectArrayOut() {
     dataFormat.setSchema(schema);

     for (String trueBit : new String[] { "true", "TRUE", "1" }) {
-      dataFormat.setTextData(trueBit);
+      dataFormat.setCSVTextData(trueBit);
       assertTrue((Boolean) dataFormat.getObjectData()[0]);
     }

     for (String falseBit : new String[] { "false", "FALSE", "0" }) {
-      dataFormat.setTextData(falseBit);
+      dataFormat.setCSVTextData(falseBit);
       assertFalse((Boolean) dataFormat.getObjectData()[0]);
     }
   }

@@ -595,7 +595,7 @@ public void testUnsupportedBitWithCSVTextInAndObjectOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
     dataFormat.setSchema(schema);
-    dataFormat.setTextData("1,3");
+    dataFormat.setCSVTextData("1,3");
     assertEquals(true, dataFormat.getObjectData()[0]);
     assertEquals(false, dataFormat.getObjectData()[1]);
   }

@@ -630,7 +630,7 @@ public void testArrayOfStringWithCSVTextInObjectArrayOut() {
     data[0] = givenArray;
     data[1] = "text";
     String testData = "'[\"A\",\"B\"]','text'";
-    dataFormat.setTextData(testData);
+    dataFormat.setCSVTextData(testData);
     Object[] expectedArray = (Object[]) dataFormat.getObjectData()[0];
     assertEquals(Arrays.toString(givenArray), Arrays.toString(expectedArray));
     assertEquals("text", dataFormat.getObjectData()[1]);

@@ -649,7 +649,7 @@ public void testArrayOfStringWithObjectArrayInCSVTextOut() {
     data[1] = "text";
     String testData = "'[\"A\",\"B\"]','text'";
     dataFormat.setObjectData(data);
-    assertEquals(testData, dataFormat.getTextData());
+    assertEquals(testData, dataFormat.getCSVTextData());
   }

   @Test

@@ -659,8 +659,8 @@ public void testArrayOfStringWithCSVTextInCSVTextOut() {
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
     dataFormat.setSchema(schema);
     String testData = "'[\"A\",\"B\"]','text'";
-    dataFormat.setTextData(testData);
-    assertEquals(testData, dataFormat.getTextData());
+    dataFormat.setCSVTextData(testData);
+    assertEquals(testData, dataFormat.getCSVTextData());
   }

   @Test

@@ -795,7 +795,7 @@ public void testArrayOfObjectsWithCSVTextInObjectArrayOut() {
     Object[] data = new Object[2];
     data[0] = arrayOfArrays;
     data[1] = "text";
-    dataFormat.setTextData("'[\"[11, 12]\",\"[14, 15]\"]','text'");
+    dataFormat.setCSVTextData("'[\"[11, 12]\",\"[14, 15]\"]','text'");
     Object[] expectedArray = (Object[]) dataFormat.getObjectData()[0];
     assertEquals(2, expectedArray.length);
     assertEquals(Arrays.deepToString(arrayOfArrays), Arrays.deepToString(expectedArray));

@@ -810,10 +810,10 @@ public void testArrayOfObjectsWithCSVTextInCSVTextOut() {
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
     dataFormat.setSchema(schema);
     String input = "'[\"[11, 12]\",\"[14, 15]\"]','text'";
-    dataFormat.setTextData(input);
+    dataFormat.setCSVTextData(input);
     Object[] expectedArray = (Object[]) dataFormat.getObjectData()[0];
     assertEquals(2, expectedArray.length);
-    assertEquals(input, dataFormat.getTextData());
+    assertEquals(input, dataFormat.getCSVTextData());
   }

   @Test

@@ -836,7 +836,7 @@ public void testArrayOfObjectsWithObjectArrayInCSVTextOut() {
     data[1] = "text";
     dataFormat.setObjectData(data);
     String expected = "'[\"[11, 12]\",\"[14, 15]\"]','text'";
-    assertEquals(expected, dataFormat.getTextData());
+    assertEquals(expected, dataFormat.getCSVTextData());
   }
   //**************test cases for map**********************

@@ -942,7 +942,7 @@ public void testMapWithCSVTextInObjectArrayOut() {
     data[0] = givenMap;
     data[1] = "text";
     String testData = "'{\"testKey\":\"testValue\"}','text'";
-    dataFormat.setTextData(testData);
+    dataFormat.setCSVTextData(testData);
     @SuppressWarnings("unchecked")
     Map<Object, Object> expectedMap = (Map<Object, Object>) dataFormat.getObjectData()[0];
     assertEquals(givenMap, expectedMap);

@@ -961,7 +961,7 @@ public void testMapWithComplexValueWithCSVTextInObjectArrayOut() {
     data[0] = givenMap;
     data[1] = "text";
     String testData = "'{\"testKey\":\"testValue\"}','text'";
-    dataFormat.setTextData(testData);
+    dataFormat.setCSVTextData(testData);
     @SuppressWarnings("unchecked")
     Map<Object, Object> expectedMap = (Map<Object, Object>) dataFormat.getObjectData()[0];
     assertEquals(givenMap, expectedMap);

@@ -981,7 +981,7 @@ public void testMapWithObjectArrayInCSVTextOut() {
     data[1] = "text";
     String testData = "'{\"testKey\":\"testValue\"}','text'";
     dataFormat.setObjectData(data);
-    assertEquals(testData, dataFormat.getTextData());
+    assertEquals(testData, dataFormat.getCSVTextData());
   }

   @Test

@@ -991,8 +991,8 @@ public void testMapWithCSVTextInCSVTextOut() {
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
     dataFormat.setSchema(schema);
     String testData = "'{\"testKey\":\"testValue\"}','text'";
-    dataFormat.setTextData(testData);
-    assertEquals(testData, dataFormat.getTextData());
+    dataFormat.setCSVTextData(testData);
+    assertEquals(testData, dataFormat.getCSVTextData());
   }
   //**************test cases for schema*******************
   @Test(expected=SqoopException.class)

@@ -1001,7 +1001,7 @@ public void testEmptySchema() {
         + ",'\\n'";
     Schema schema = new Schema("Test");
     dataFormat.setSchema(schema);
-    dataFormat.setTextData(testData);
+    dataFormat.setCSVTextData(testData);

     @SuppressWarnings("unused")
     Object[] out = dataFormat.getObjectData();
@@ -43,18 +43,18 @@ public SqoopWritable(IntermediateDataFormat<?> dataFormat) {
   }

   public void setString(String data) {
-    this.dataFormat.setTextData(data);
+    this.dataFormat.setCSVTextData(data);
   }

-  public String getString() { return dataFormat.getTextData(); }
+  public String getString() { return dataFormat.getCSVTextData(); }

   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeUTF(dataFormat.getTextData());
+    out.writeUTF(dataFormat.getCSVTextData());
   }

   @Override
-  public void readFields(DataInput in) throws IOException { dataFormat.setTextData(in.readUTF()); }
+  public void readFields(DataInput in) throws IOException { dataFormat.setCSVTextData(in.readUTF()); }

   @Override
   public int compareTo(SqoopWritable o) { return getString().compareTo(o.getString()); }

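SqoopWritable simply delegates Hadoop Writable serialization to the IDF's CSV text. A hedged round-trip sketch using plain java.io streams; the writable and anotherDataFormat instances are assumed to have been constructed elsewhere, and in a real job Hadoop supplies the streams:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

// Serialize: write() emits whatever the data format currently holds as CSV text.
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
writable.write(new DataOutputStream(buffer));

// Deserialize into a second writable: readFields() pushes the UTF string
// straight back through setCSVTextData() on its own data format.
SqoopWritable restored = new SqoopWritable(anotherDataFormat);
restored.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
// restored.getString() now equals writable.getString()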
@@ -127,7 +127,7 @@ public void writeArrayRecord(Object[] array) {

     @Override
     public void writeStringRecord(String text) {
-      fromDataFormat.setTextData(text);
+      fromDataFormat.setCSVTextData(text);
       writeContent();
     }

@@ -140,7 +140,7 @@ public void writeRecord(Object obj) {
     private void writeContent() {
       try {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Extracted data: " + fromDataFormat.getTextData());
+          LOG.debug("Extracted data: " + fromDataFormat.getCSVTextData());
         }
         // NOTE: The fromDataFormat and the corresponding fromSchema is used only for the matching process
         // The output of the mappers is finally written to the toDataFormat object after the matching process
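For context, connector extractors are the callers of this writer: a string record handed to writeStringRecord() lands in the FROM data format as CSV text before the FROM/TO matching step. A hedged sketch of the extractor-side call; the context.getDataWriter() accessor is an assumption, mirroring the LoaderContext.getDataReader() call that appears later in this patch:

// Inside a connector Extractor.extract(...) implementation (illustrative):
String csvRow = "10,34,'random data'";
context.getDataWriter().writeStringRecord(csvRow);
// SqoopMapper's writer then runs fromDataFormat.setCSVTextData(csvRow)
// before matching the row onto the TO schema.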
@@ -102,7 +102,7 @@ public void write(SqoopWritable key, NullWritable value) throws InterruptedException {
       free.acquire();
       checkIfConsumerThrew();
       // NOTE: this is the place where data written from SqoopMapper writable is available to the SqoopOutputFormat
-      dataFormat.setTextData(key.getString());
+      dataFormat.setCSVTextData(key.getString());
       filled.release();
     }

@@ -172,7 +172,7 @@ public String readTextRecord() throws InterruptedException {
         return null;
       }
       try {
-        return dataFormat.getTextData();
+        return dataFormat.getCSVTextData();
       } finally {
         releaseSema();
       }
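The write()/readTextRecord() pair above hands one row at a time between the Hadoop writer thread and the loader (consumer) thread using two semaphores, which is why the rename has to touch both sides. A hedged, self-contained sketch of that handoff pattern; this is not Sqoop's actual class, just the same one-slot, two-semaphore idea:

import java.util.concurrent.Semaphore;

// Minimal one-slot handoff: the producer may write only into an empty slot,
// the consumer may read only from a filled slot.
class SingleSlotExchange {
  private final Semaphore free = new Semaphore(1);   // slot starts empty
  private final Semaphore filled = new Semaphore(0); // nothing to read yet
  private String slot;

  void write(String csvRow) throws InterruptedException {
    free.acquire();      // wait until the previous row was consumed
    slot = csvRow;       // analogous to dataFormat.setCSVTextData(key.getString())
    filled.release();    // let the consumer read it
  }

  String read() throws InterruptedException {
    filled.acquire();    // wait for a row
    try {
      return slot;       // analogous to dataFormat.getCSVTextData()
    } finally {
      free.release();    // slot is empty again
    }
  }
}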
@@ -207,9 +207,9 @@ public static class DummyRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {
     @Override
     public void write(SqoopWritable key, NullWritable value) {
       String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'";
-      dataFormat.setTextData(testData);
+      dataFormat.setCSVTextData(testData);
       index++;
-      assertEquals(dataFormat.getTextData().toString(), key.toString());
+      assertEquals(dataFormat.getCSVTextData().toString(), key.toString());
     }

     @Override

@@ -253,9 +253,9 @@ public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration
       String data;
       while ((data = context.getDataReader().readTextRecord()) != null) {
         String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'";
-        dataFormat.setTextData(testData);
+        dataFormat.setCSVTextData(testData);
         index++;
-        assertEquals(dataFormat.getTextData().toString(), data);
+        assertEquals(dataFormat.getCSVTextData().toString(), data);
       }
     }
   }
@@ -240,9 +240,9 @@ public static class DummyRecordWriter
     @Override
     public void write(SqoopWritable key, NullWritable value) {
       String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'";
-      dataFormat.setTextData(testData);
+      dataFormat.setCSVTextData(testData);
       index++;
-      assertEquals(dataFormat.getTextData().toString(), key.toString());
+      assertEquals(dataFormat.getCSVTextData().toString(), key.toString());
     }

     @Override
@@ -135,7 +135,7 @@ public void testWhenLoaderThrows() throws Throwable {
     SqoopWritable writable = new SqoopWritable(dataFormat);
     try {
       for (int count = 0; count < 100; count++) {
-        dataFormat.setTextData(String.valueOf(count));
+        dataFormat.setCSVTextData(String.valueOf(count));
         writer.write(writable, null);
       }
     } catch (SqoopException ex) {

@@ -159,7 +159,7 @@ public void testSuccessfulContinuousLoader() throws Throwable {
           builder.append(",");
         }
       }
-      dataFormat.setTextData(builder.toString());
+      dataFormat.setCSVTextData(builder.toString());
       writer.write(writable, null);
     }
     writer.close(null);

@@ -179,7 +179,7 @@ public void testSuccessfulLoader() throws Throwable {
           builder.append(",");
         }
       }
-      dataFormat.setTextData(builder.toString());
+      dataFormat.setCSVTextData(builder.toString());
       writer.write(writable, null);

       //Allow writer to complete.

@@ -205,7 +205,7 @@ public void testThrowingContinuousLoader() throws Throwable {
           builder.append(",");
         }
       }
-      dataFormat.setTextData(builder.toString());
+      dataFormat.setCSVTextData(builder.toString());
       writer.write(writable, null);
     }
     writer.close(null);