diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java index 4f2baf9b..bdab7a44 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java @@ -30,6 +30,7 @@ import org.joda.time.LocalDate; import org.joda.time.LocalDateTime; import org.json.simple.JSONArray; +import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import java.io.DataInput; @@ -39,8 +40,11 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.regex.Matcher; @@ -85,6 +89,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat { private final List stringTypeColumnIndices = new ArrayList(); private final List byteTypeColumnIndices = new ArrayList(); private final List listTypeColumnIndices = new ArrayList(); + private final List mapTypeColumnIndices = new ArrayList(); private Schema schema; @@ -129,6 +134,8 @@ public void setSchema(Schema schema) { byteTypeColumnIndices.add(i); } else if (isColumnListType(col)) { listTypeColumnIndices.add(i); + } else if (col.getType() == ColumnType.MAP) { + mapTypeColumnIndices.add(i); } i++; } @@ -147,7 +154,6 @@ private String[] getFieldStringArray() { boolean quoted = false; boolean escaped = false; - boolean insideJSON = false; List parsedData = new LinkedList(); StringBuilder builder = new StringBuilder(); @@ -167,7 +173,7 @@ private String[] getFieldStringArray() { escaped = !escaped; break; case SEPARATOR_CHARACTER: - if (quoted || insideJSON) { + if (quoted) { builder.append(c); } else { parsedData.add(builder.toString()); @@ -217,12 +223,12 @@ public Object[] getObjectData() { objectArray[i] = null; continue; } - objectArray[i] = parseStringArrayElement(fieldStringArray[i], columnArray[i]); + objectArray[i] = parseCSVStringArrayElement(fieldStringArray[i], columnArray[i]); } return objectArray; } - private Object parseStringArrayElement(String fieldString, Column column) { + private Object parseCSVStringArrayElement(String fieldString, Column column) { Object returnValue = null; switch (column.getType()) { @@ -271,6 +277,9 @@ private Object parseStringArrayElement(String fieldString, Column column) { case SET: returnValue = parseListElementFromJSON(fieldString); break; + case MAP: + returnValue = parseMapElementFromJSON(fieldString); + break; default: throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + column.getType()); @@ -287,11 +296,60 @@ private Object[] parseListElementFromJSON(String fieldString) { throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0008, e); } if (array != null) { - return array.toArray(); + return array.toArray(); } return null; } + private Map parseMapElementFromJSON(String fieldString) { + + JSONObject object = null; + try { + object = (JSONObject) new JSONParser().parse(removeQuotes(fieldString)); + } catch (org.json.simple.parser.ParseException e) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0008, e); + } + if (object != null) { + return toMap(object); + } + return null; + } + + private List toList(JSONArray array) { + List list = new ArrayList(); + for (int i = 0; i < array.size(); i++) { + Object value = array.get(i); + if (value instanceof JSONArray) { + value = toList((JSONArray) value); + } + + else if (value instanceof JSONObject) { + value = toMap((JSONObject) value); + } + list.add(value); + } + return list; + } + + @SuppressWarnings("unchecked") + private Map toMap(JSONObject object) { + Map elementMap = new HashMap(); + Set> entries = object.entrySet(); + for (Map.Entry entry : entries) { + Object value = entry.getValue(); + + if (value instanceof JSONArray) { + value = toList((JSONArray) value); + } + + else if (value instanceof JSONObject) { + value = toMap((JSONObject) value); + } + elementMap.put(entry.getKey(), value); + } + return elementMap; + } + /** * Appends the actual java objects into CSV string {@inheritDoc} */ @@ -351,6 +409,7 @@ public int compareTo(IntermediateDataFormat o) { * * @param stringArray */ + @SuppressWarnings("unchecked") private void encodeCSVStringElements(Object[] stringArray, Column[] columnArray) { for (int i : stringTypeColumnIndices) { stringArray[i] = escapeString((String) stringArray[i]); @@ -361,6 +420,16 @@ private void encodeCSVStringElements(Object[] stringArray, Column[] columnArray) for (int i : listTypeColumnIndices) { stringArray[i] = encodeList((Object[]) stringArray[i], columnArray[i]); } + for (int i : mapTypeColumnIndices) { + stringArray[i] = encodeMap((Map) stringArray[i], columnArray[i]); + } + } + + @SuppressWarnings("unchecked") + private String encodeMap(Map map, Column column) { + JSONObject object = new JSONObject(); + object.putAll(map); + return encloseWithQuote(object.toJSONString()); } @SuppressWarnings("unchecked") diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java index b6298978..bd082aab 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java @@ -26,12 +26,15 @@ import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Array; import org.apache.sqoop.schema.type.Binary; import org.apache.sqoop.schema.type.Bit; import org.apache.sqoop.schema.type.Date; @@ -43,8 +46,6 @@ public class TestCSVIntermediateDataFormat { - private final String BYTE_FIELD_ENCODING = "ISO-8859-1"; - private IntermediateDataFormat dataFormat; @Before @@ -54,8 +55,10 @@ public void setUp() { private String getByteFieldString(byte[] byteFieldData) { try { - return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_ENCODING)).append("'").toString(); - } catch(UnsupportedEncodingException e) { + return new StringBuilder("'") + .append(new String(byteFieldData, CSVIntermediateDataFormat.BYTE_FIELD_CHARSET)) + .append("'").toString(); + } catch (UnsupportedEncodingException e) { // Should never get to this point because ISO-8859-1 is a standard codec. return null; } @@ -566,7 +569,162 @@ public void testArrayOfObjectsWithObjectArrayInCSVTextOut() { String expected = "'[\"[11, 12]\",\"[14, 15]\"]','text'"; assertEquals(expected, dataFormat.getTextData()); } + //**************test cases for map********************** + @Test + public void testMapWithSimpleValueWithObjectArrayInObjectArrayOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value"))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + Map map = new HashMap(); + map.put("testKey", "testValue"); + // create an array inside the object array + Object[] data = new Object[2]; + data[0] = map; + data[1] = "text"; + dataFormat.setObjectData(data); + @SuppressWarnings("unchecked") + Map expectedMap = (Map) dataFormat.getObjectData()[0]; + assertEquals(map, expectedMap); + assertEquals("text", dataFormat.getObjectData()[1]); + } + + @Test + public void testMapWithComplexIntegerListValueWithObjectArrayInObjectArrayOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value", + new FixedPoint("number")))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + Map givenMap = new HashMap(); + List intList = new ArrayList(); + intList.add(11); + intList.add(12); + givenMap.put("testKey", intList); + // create an array inside the object array + Object[] data = new Object[2]; + data[0] = givenMap; + data[1] = "text"; + dataFormat.setObjectData(data); + @SuppressWarnings("unchecked") + Map expectedMap = (Map) dataFormat.getObjectData()[0]; + assertEquals(givenMap.toString(), expectedMap.toString()); + assertEquals("text", dataFormat.getObjectData()[1]); + } + + @Test + public void testMapWithComplexStringListValueWithObjectArrayInObjectArrayOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value", + new Text("text")))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + Map givenMap = new HashMap(); + List stringList = new ArrayList(); + stringList.add("A"); + stringList.add("A"); + givenMap.put("testKey", stringList); + // create an array inside the object array + Object[] data = new Object[2]; + data[0] = givenMap; + data[1] = "text"; + dataFormat.setObjectData(data); + @SuppressWarnings("unchecked") + Map expectedMap = (Map) dataFormat.getObjectData()[0]; + assertEquals(givenMap.toString(), expectedMap.toString()); + assertEquals("text", dataFormat.getObjectData()[1]); + } + + @Test + public void testMapWithComplexMapValueWithObjectArrayInObjectArrayOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value", + new Text("text")))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + Map givenMap = new HashMap(); + List stringList = new ArrayList(); + stringList.add("A"); + stringList.add("A"); + Map> anotherMap = new HashMap>(); + anotherMap.put("anotherKey", stringList); + givenMap.put("testKey", anotherMap); + // create an array inside the object array + Object[] data = new Object[2]; + data[0] = givenMap; + data[1] = "text"; + dataFormat.setObjectData(data); + @SuppressWarnings("unchecked") + Map expectedMap = (Map) dataFormat.getObjectData()[0]; + assertEquals(givenMap.toString(), expectedMap.toString()); + assertEquals("text", dataFormat.getObjectData()[1]); + } + + @Test + public void testMapWithCSVTextInObjectArrayOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value"))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + Map givenMap = new HashMap(); + givenMap.put("testKey", "testValue"); + Object[] data = new Object[2]; + data[0] = givenMap; + data[1] = "text"; + String testData = "'{\"testKey\":\"testValue\"}','text'"; + dataFormat.setTextData(testData); + @SuppressWarnings("unchecked") + Map expectedMap = (Map) dataFormat.getObjectData()[0]; + assertEquals(givenMap, expectedMap); + assertEquals("text", dataFormat.getObjectData()[1]); + } + + @Test + public void testMapWithComplexValueWithCSVTextInObjectArrayOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value"))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + Map givenMap = new HashMap(); + givenMap.put("testKey", "testValue"); + Object[] data = new Object[2]; + data[0] = givenMap; + data[1] = "text"; + String testData = "'{\"testKey\":\"testValue\"}','text'"; + dataFormat.setTextData(testData); + @SuppressWarnings("unchecked") + Map expectedMap = (Map) dataFormat.getObjectData()[0]; + assertEquals(givenMap, expectedMap); + assertEquals("text", dataFormat.getObjectData()[1]); + } + + @Test + public void testMapWithObjectArrayInCSVTextOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value"))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + Map givenMap = new HashMap(); + givenMap.put("testKey", "testValue"); + Object[] data = new Object[2]; + data[0] = givenMap; + data[1] = "text"; + String testData = "'{\"testKey\":\"testValue\"}','text'"; + dataFormat.setObjectData(data); + assertEquals(testData, dataFormat.getTextData()); + } + + @Test + public void testMapWithCSVTextInCSVTextOut() { + Schema schema = new Schema("test"); + schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value"))); + schema.addColumn(new org.apache.sqoop.schema.type.Text("2")); + dataFormat.setSchema(schema); + String testData = "'{\"testKey\":\"testValue\"}','text'"; + dataFormat.setTextData(testData); + assertEquals(testData, dataFormat.getTextData()); + } //**************test cases for schema******************* @Test(expected=SqoopException.class) public void testEmptySchema() {