mirror of
https://github.com/apache/sqoop.git
synced 2025-05-11 22:41:50 +08:00
SQOOP-1936: Sqoop2: Sort by comparing IDF data in shuffle phase
(Veena Basavaraj via Abraham Elmahrek)
This commit is contained in:
parent
2d54e26a08
commit
7631d29333
@ -58,8 +58,7 @@ public CSVIntermediateDataFormat(Schema schema) {
|
||||
*/
|
||||
@Override
|
||||
public String getCSVTextData() {
|
||||
// TODO:SQOOP-1936 to enable schema validation after we use compareTo
|
||||
return this.data;
|
||||
return super.getData();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -81,14 +80,14 @@ public Object[] getObjectData() {
|
||||
if (csvStringArray == null) {
|
||||
return null;
|
||||
}
|
||||
Column[] columns = schema.getColumnsArray();
|
||||
|
||||
if (csvStringArray.length != schema.getColumnsArray().length) {
|
||||
if (csvStringArray.length != columns.length) {
|
||||
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
|
||||
"The data " + getCSVTextData() + " has the wrong number of fields.");
|
||||
}
|
||||
|
||||
Object[] objectArray = new Object[csvStringArray.length];
|
||||
Column[] columns = schema.getColumnsArray();
|
||||
for (int i = 0; i < csvStringArray.length; i++) {
|
||||
if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
|
||||
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
|
||||
|
@ -44,10 +44,13 @@
|
||||
* Any conversion to the format dictated by the corresponding data source from the native or CSV text format
|
||||
* has to be done by the connector themselves both in FROM and TO
|
||||
*
|
||||
* NOTE: we cannot use the generic for comparable, since the comparison can be arbitrary for instance,
|
||||
* purely based on text format
|
||||
* @param <T> - Each data format may have a native representation of the
|
||||
* data, represented by the parameter.
|
||||
*/
|
||||
public abstract class IntermediateDataFormat<T> {
|
||||
@SuppressWarnings("rawtypes")
|
||||
public abstract class IntermediateDataFormat<T> implements Comparable {
|
||||
|
||||
protected volatile T data;
|
||||
|
||||
@ -203,4 +206,15 @@ public boolean equals(Object obj) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.data.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Object o) {
|
||||
IntermediateDataFormat<?> idf = (IntermediateDataFormat<?>) o;
|
||||
return toString().compareTo(idf.toString());
|
||||
}
|
||||
|
||||
}
|
@ -419,4 +419,8 @@ private Object[] toObject(JSONObject json) {
|
||||
return object;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.data.toJSONString();
|
||||
}
|
||||
}
|
||||
|
@ -1143,8 +1143,7 @@ public void testNotSettingSchemaAndGetData() {
|
||||
dataFormat.getData();
|
||||
}
|
||||
|
||||
//SQOOP-1936 to enable schema validation after we use compareTo
|
||||
@Test
|
||||
@Test(expectedExceptions = SqoopException.class)
|
||||
public void testNotSettingSchemaAndGetCSVData() {
|
||||
dataFormat = new CSVIntermediateDataFormat();
|
||||
dataFormat.getCSVTextData();
|
||||
|
@ -68,12 +68,12 @@ public void readFields(DataInput in) throws IOException {
|
||||
|
||||
@Override
|
||||
public int compareTo(SqoopWritable o) {
|
||||
return toString().compareTo(o.toString());
|
||||
return toIDF.compareTo(o.toIDF);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return toIDF.getCSVTextData();
|
||||
return toIDF.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -50,15 +50,6 @@ public void setUp() {
|
||||
writable = new SqoopWritable(idfMock);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringInStringOut() {
|
||||
String testData = "Live Long and prosper";
|
||||
writable.setString(testData);
|
||||
verify(idfMock, times(1)).setCSVTextData(testData);
|
||||
writable.toString();
|
||||
verify(idfMock, times(1)).getCSVTextData();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWrite() throws IOException {
|
||||
String testData = "One ring to rule them all";
|
||||
|
Loading…
Reference in New Issue
Block a user