5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-05 03:30:14 +08:00

SQOOP-1750: Support Map Type in CSV IDF

(Veena Basavaraj via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2014-12-01 15:24:02 -08:00
parent 34d7066c38
commit d68c05d3ad
2 changed files with 236 additions and 9 deletions

View File

@ -30,6 +30,7 @@
import org.joda.time.LocalDate; import org.joda.time.LocalDate;
import org.joda.time.LocalDateTime; import org.joda.time.LocalDateTime;
import org.json.simple.JSONArray; import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser; import org.json.simple.parser.JSONParser;
import java.io.DataInput; import java.io.DataInput;
@ -39,8 +40,11 @@
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher; import java.util.regex.Matcher;
@ -85,6 +89,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
private final List<Integer> stringTypeColumnIndices = new ArrayList<Integer>(); private final List<Integer> stringTypeColumnIndices = new ArrayList<Integer>();
private final List<Integer> byteTypeColumnIndices = new ArrayList<Integer>(); private final List<Integer> byteTypeColumnIndices = new ArrayList<Integer>();
private final List<Integer> listTypeColumnIndices = new ArrayList<Integer>(); private final List<Integer> listTypeColumnIndices = new ArrayList<Integer>();
private final List<Integer> mapTypeColumnIndices = new ArrayList<Integer>();
private Schema schema; private Schema schema;
@ -129,6 +134,8 @@ public void setSchema(Schema schema) {
byteTypeColumnIndices.add(i); byteTypeColumnIndices.add(i);
} else if (isColumnListType(col)) { } else if (isColumnListType(col)) {
listTypeColumnIndices.add(i); listTypeColumnIndices.add(i);
} else if (col.getType() == ColumnType.MAP) {
mapTypeColumnIndices.add(i);
} }
i++; i++;
} }
@ -147,7 +154,6 @@ private String[] getFieldStringArray() {
boolean quoted = false; boolean quoted = false;
boolean escaped = false; boolean escaped = false;
boolean insideJSON = false;
List<String> parsedData = new LinkedList<String>(); List<String> parsedData = new LinkedList<String>();
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
@ -167,7 +173,7 @@ private String[] getFieldStringArray() {
escaped = !escaped; escaped = !escaped;
break; break;
case SEPARATOR_CHARACTER: case SEPARATOR_CHARACTER:
if (quoted || insideJSON) { if (quoted) {
builder.append(c); builder.append(c);
} else { } else {
parsedData.add(builder.toString()); parsedData.add(builder.toString());
@ -217,12 +223,12 @@ public Object[] getObjectData() {
objectArray[i] = null; objectArray[i] = null;
continue; continue;
} }
objectArray[i] = parseStringArrayElement(fieldStringArray[i], columnArray[i]); objectArray[i] = parseCSVStringArrayElement(fieldStringArray[i], columnArray[i]);
} }
return objectArray; return objectArray;
} }
private Object parseStringArrayElement(String fieldString, Column column) { private Object parseCSVStringArrayElement(String fieldString, Column column) {
Object returnValue = null; Object returnValue = null;
switch (column.getType()) { switch (column.getType()) {
@ -271,6 +277,9 @@ private Object parseStringArrayElement(String fieldString, Column column) {
case SET: case SET:
returnValue = parseListElementFromJSON(fieldString); returnValue = parseListElementFromJSON(fieldString);
break; break;
case MAP:
returnValue = parseMapElementFromJSON(fieldString);
break;
default: default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
"Column type from schema was not recognized for " + column.getType()); "Column type from schema was not recognized for " + column.getType());
@ -287,11 +296,60 @@ private Object[] parseListElementFromJSON(String fieldString) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0008, e); throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0008, e);
} }
if (array != null) { if (array != null) {
return array.toArray(); return array.toArray();
} }
return null; return null;
} }
/**
 * Parses a CSV field holding a JSON object into a plain java map.
 *
 * @param fieldString the CSV field text; it is passed through removeQuotes before parsing
 * @return the decoded map, or null when the JSON parser returns null
 * @throws SqoopException with INTERMEDIATE_DATA_FORMAT_0008 when the text is not valid JSON,
 *         or when it is valid JSON but not a JSON object
 */
private Map<Object, Object> parseMapElementFromJSON(String fieldString) {
  Object parsed;
  try {
    parsed = new JSONParser().parse(removeQuotes(fieldString));
  } catch (org.json.simple.parser.ParseException e) {
    throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0008, e);
  }
  if (parsed == null) {
    return null;
  }
  // A map column must carry a JSON object. Report any other JSON value (array, string,
  // number, ...) as a format error instead of letting a ClassCastException escape.
  if (!(parsed instanceof JSONObject)) {
    throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0008,
        "Expected a JSON object for a map column but found " + parsed.getClass().getSimpleName());
  }
  return toMap((JSONObject) parsed);
}
/**
 * Copies a parsed JSON array into a plain list. Nested JSON arrays and JSON objects are
 * converted recursively; every other element is added unchanged.
 *
 * @param array the parsed JSON array to convert
 * @return a new list with the converted elements in the same order
 */
private List<Object> toList(JSONArray array) {
  List<Object> converted = new ArrayList<Object>();
  for (Object element : array) {
    if (element instanceof JSONArray) {
      converted.add(toList((JSONArray) element));
    } else if (element instanceof JSONObject) {
      converted.add(toMap((JSONObject) element));
    } else {
      converted.add(element);
    }
  }
  return converted;
}
/**
 * Copies a parsed JSON object into a plain hash map. Values that are JSON arrays or JSON
 * objects are converted recursively; keys and all other values are stored unchanged.
 *
 * @param object the parsed JSON object to convert
 * @return a new map holding the converted entries
 */
@SuppressWarnings("unchecked")
private Map<Object, Object> toMap(JSONObject object) {
  Map<Object, Object> converted = new HashMap<Object, Object>();
  // JSONObject exposes a raw entry set; the unchecked assignment is covered by the annotation.
  Set<Map.Entry<Object, Object>> pairs = object.entrySet();
  for (Map.Entry<Object, Object> pair : pairs) {
    Object raw = pair.getValue();
    Object plain;
    if (raw instanceof JSONObject) {
      plain = toMap((JSONObject) raw);
    } else if (raw instanceof JSONArray) {
      plain = toList((JSONArray) raw);
    } else {
      plain = raw;
    }
    converted.put(pair.getKey(), plain);
  }
  return converted;
}
/** /**
* Appends the actual java objects into CSV string {@inheritDoc} * Appends the actual java objects into CSV string {@inheritDoc}
*/ */
@ -351,6 +409,7 @@ public int compareTo(IntermediateDataFormat<?> o) {
* *
* @param stringArray * @param stringArray
*/ */
@SuppressWarnings("unchecked")
private void encodeCSVStringElements(Object[] stringArray, Column[] columnArray) { private void encodeCSVStringElements(Object[] stringArray, Column[] columnArray) {
for (int i : stringTypeColumnIndices) { for (int i : stringTypeColumnIndices) {
stringArray[i] = escapeString((String) stringArray[i]); stringArray[i] = escapeString((String) stringArray[i]);
@ -361,6 +420,16 @@ private void encodeCSVStringElements(Object[] stringArray, Column[] columnArray)
for (int i : listTypeColumnIndices) { for (int i : listTypeColumnIndices) {
stringArray[i] = encodeList((Object[]) stringArray[i], columnArray[i]); stringArray[i] = encodeList((Object[]) stringArray[i], columnArray[i]);
} }
for (int i : mapTypeColumnIndices) {
stringArray[i] = encodeMap((Map<Object, Object>) stringArray[i], columnArray[i]);
}
}
/**
 * Encodes a map column value as JSON text for the CSV representation.
 *
 * @param map the map whose entries are copied into a JSON object
 * @param column the schema column for this value; it is not read in this method body
 * @return the JSON string of the map after being passed through encloseWithQuote
 */
@SuppressWarnings("unchecked")
private String encodeMap(Map<Object, Object> map, Column column) {
JSONObject object = new JSONObject();
// copy all entries so that toJSONString can serialize them as one JSON object
object.putAll(map);
return encloseWithQuote(object.toJSONString());
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -26,12 +26,15 @@
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Array;
import org.apache.sqoop.schema.type.Binary; import org.apache.sqoop.schema.type.Binary;
import org.apache.sqoop.schema.type.Bit; import org.apache.sqoop.schema.type.Bit;
import org.apache.sqoop.schema.type.Date; import org.apache.sqoop.schema.type.Date;
@ -43,8 +46,6 @@
public class TestCSVIntermediateDataFormat { public class TestCSVIntermediateDataFormat {
private final String BYTE_FIELD_ENCODING = "ISO-8859-1";
private IntermediateDataFormat<?> dataFormat; private IntermediateDataFormat<?> dataFormat;
@Before @Before
@ -54,8 +55,10 @@ public void setUp() {
private String getByteFieldString(byte[] byteFieldData) { private String getByteFieldString(byte[] byteFieldData) {
try { try {
return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_ENCODING)).append("'").toString(); return new StringBuilder("'")
} catch(UnsupportedEncodingException e) { .append(new String(byteFieldData, CSVIntermediateDataFormat.BYTE_FIELD_CHARSET))
.append("'").toString();
} catch (UnsupportedEncodingException e) {
// Should never get to this point because ISO-8859-1 is a standard codec. // Should never get to this point because ISO-8859-1 is a standard codec.
return null; return null;
} }
@ -566,7 +569,162 @@ public void testArrayOfObjectsWithObjectArrayInCSVTextOut() {
String expected = "'[\"[11, 12]\",\"[14, 15]\"]','text'"; String expected = "'[\"[11, 12]\",\"[14, 15]\"]','text'";
assertEquals(expected, dataFormat.getTextData()); assertEquals(expected, dataFormat.getTextData());
} }
//**************test cases for map**********************
/**
 * Sets a row whose first column is a map with one string entry as object data and checks
 * that reading the object data back yields an equal map and the untouched text column.
 */
@Test
public void testMapWithSimpleValueWithObjectArrayInObjectArrayOut() {
  // first column: map declared with Text("key") and Text("value"); second column: text
  Schema mapSchema = new Schema("test");
  mapSchema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
  mapSchema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
  dataFormat.setSchema(mapSchema);

  Map<Object, Object> givenMap = new HashMap<Object, Object>();
  givenMap.put("testKey", "testValue");

  // put the map into the first slot of the object array
  Object[] row = new Object[] { givenMap, "text" };
  dataFormat.setObjectData(row);

  @SuppressWarnings("unchecked")
  Map<Object, Object> actualMap = (Map<Object, Object>) dataFormat.getObjectData()[0];
  assertEquals(givenMap, actualMap);
  assertEquals("text", dataFormat.getObjectData()[1]);
}
/**
 * Sets a row whose map column holds a list of integers as object data and checks that the
 * map read back has the same string form, and that the text column is unchanged.
 */
@Test
public void testMapWithComplexIntegerListValueWithObjectArrayInObjectArrayOut() {
  Schema mapSchema = new Schema("test");
  mapSchema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value",
      new FixedPoint("number"))));
  mapSchema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
  dataFormat.setSchema(mapSchema);

  List<Integer> numbers = new ArrayList<Integer>(Arrays.asList(11, 12));
  Map<Object, Object> givenMap = new HashMap<Object, Object>();
  givenMap.put("testKey", numbers);

  // put the map into the first slot of the object array
  Object[] row = new Object[] { givenMap, "text" };
  dataFormat.setObjectData(row);

  @SuppressWarnings("unchecked")
  Map<Object, Object> actualMap = (Map<Object, Object>) dataFormat.getObjectData()[0];
  // Only the string forms are compared here.
  // NOTE(review): presumably because element types may differ after the JSON round trip - confirm.
  assertEquals(givenMap.toString(), actualMap.toString());
  assertEquals("text", dataFormat.getObjectData()[1]);
}
/**
 * Sets a row whose map column holds a list of strings as object data and checks that the
 * map read back has the same string form, and that the text column is unchanged.
 */
@Test
public void testMapWithComplexStringListValueWithObjectArrayInObjectArrayOut() {
  Schema mapSchema = new Schema("test");
  mapSchema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value",
      new Text("text"))));
  mapSchema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
  dataFormat.setSchema(mapSchema);

  List<String> letters = new ArrayList<String>(Arrays.asList("A", "A"));
  Map<Object, Object> givenMap = new HashMap<Object, Object>();
  givenMap.put("testKey", letters);

  // put the map into the first slot of the object array
  Object[] row = new Object[] { givenMap, "text" };
  dataFormat.setObjectData(row);

  @SuppressWarnings("unchecked")
  Map<Object, Object> actualMap = (Map<Object, Object>) dataFormat.getObjectData()[0];
  assertEquals(givenMap.toString(), actualMap.toString());
  assertEquals("text", dataFormat.getObjectData()[1]);
}
/**
 * Sets a row whose map column holds a nested map (string key to list of strings) as object
 * data and checks that the map read back has the same string form.
 */
@Test
public void testMapWithComplexMapValueWithObjectArrayInObjectArrayOut() {
  // NOTE(review): the schema declares the map value as an Array of Text, while the data below
  // stores a nested map under "testKey" - confirm whether a nested Map type was intended.
  Schema mapSchema = new Schema("test");
  mapSchema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value",
      new Text("text"))));
  mapSchema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
  dataFormat.setSchema(mapSchema);

  List<String> letters = new ArrayList<String>(Arrays.asList("A", "A"));
  Map<String, List<String>> innerMap = new HashMap<String, List<String>>();
  innerMap.put("anotherKey", letters);
  Map<Object, Object> givenMap = new HashMap<Object, Object>();
  givenMap.put("testKey", innerMap);

  // put the outer map into the first slot of the object array
  Object[] row = new Object[] { givenMap, "text" };
  dataFormat.setObjectData(row);

  @SuppressWarnings("unchecked")
  Map<Object, Object> actualMap = (Map<Object, Object>) dataFormat.getObjectData()[0];
  assertEquals(givenMap.toString(), actualMap.toString());
  assertEquals("text", dataFormat.getObjectData()[1]);
}
/**
 * Sets a CSV text row whose first field is a quoted JSON object with one string entry and
 * checks that the object data contains an equal map followed by the text column value.
 */
@Test
public void testMapWithCSVTextInObjectArrayOut() {
  Schema schema = new Schema("test");
  schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
  schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
  dataFormat.setSchema(schema);

  Map<Object, Object> expectedMap = new HashMap<Object, Object>();
  expectedMap.put("testKey", "testValue");

  // The input is CSV text only; no object array is needed for this direction.
  String testData = "'{\"testKey\":\"testValue\"}','text'";
  dataFormat.setTextData(testData);

  @SuppressWarnings("unchecked")
  Map<Object, Object> actualMap = (Map<Object, Object>) dataFormat.getObjectData()[0];
  assertEquals(expectedMap, actualMap);
  assertEquals("text", dataFormat.getObjectData()[1]);
}
/**
 * Sets a CSV text row whose first field is a quoted JSON object with a list value and checks
 * that the object data contains a map holding an equal list, followed by the text column value.
 */
@Test
public void testMapWithComplexValueWithCSVTextInObjectArrayOut() {
  Schema schema = new Schema("test");
  schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value",
      new Text("text"))));
  schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
  dataFormat.setSchema(schema);

  List<String> stringList = new ArrayList<String>();
  stringList.add("A");
  stringList.add("B");
  Map<Object, Object> expectedMap = new HashMap<Object, Object>();
  expectedMap.put("testKey", stringList);

  // The separator inside the quoted JSON list must not split the field.
  String testData = "'{\"testKey\":[\"A\",\"B\"]}','text'";
  dataFormat.setTextData(testData);

  Object[] actualRow = dataFormat.getObjectData();
  @SuppressWarnings("unchecked")
  Map<Object, Object> actualMap = (Map<Object, Object>) actualRow[0];
  assertEquals(expectedMap, actualMap);
  assertEquals("text", actualRow[1]);
}
/**
 * Sets a row with a one-entry map and a text value as object data and checks the CSV text
 * produced for it.
 */
@Test
public void testMapWithObjectArrayInCSVTextOut() {
  Schema mapSchema = new Schema("test");
  mapSchema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
  mapSchema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
  dataFormat.setSchema(mapSchema);

  Map<Object, Object> givenMap = new HashMap<Object, Object>();
  givenMap.put("testKey", "testValue");
  Object[] row = new Object[] { givenMap, "text" };

  String expectedCsv = "'{\"testKey\":\"testValue\"}','text'";
  dataFormat.setObjectData(row);
  assertEquals(expectedCsv, dataFormat.getTextData());
}
/**
 * Sets a CSV text row containing a quoted JSON object and a text field and checks that the
 * same text is returned by getTextData.
 */
@Test
public void testMapWithCSVTextInCSVTextOut() {
  Schema mapSchema = new Schema("test");
  mapSchema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
  mapSchema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
  dataFormat.setSchema(mapSchema);

  String csvRow = "'{\"testKey\":\"testValue\"}','text'";
  dataFormat.setTextData(csvRow);
  assertEquals(csvRow, dataFormat.getTextData());
}
//**************test cases for schema******************* //**************test cases for schema*******************
@Test(expected=SqoopException.class) @Test(expected=SqoopException.class)
public void testEmptySchema() { public void testEmptySchema() {