From 293e9ef63625538a7020ebc16f6bacbd27a2f6ea Mon Sep 17 00:00:00 2001 From: Abraham Elmahrek Date: Fri, 12 Dec 2014 20:26:26 -0600 Subject: [PATCH] SQOOP-1813: Sqoop2: Add SqoopIDFUtils class and unit tests (Veena Basavaraj via Abraham Elmahrek) --- .../sqoop/connector/common/SqoopIDFUtils.java | 396 ++++++++++++++++++ .../idf/CSVIntermediateDataFormat.java | 354 ++-------------- .../connector/idf/IntermediateDataFormat.java | 4 +- .../connector/common/TestSqoopIDFUtils.java | 162 +++++++ .../idf/TestCSVIntermediateDataFormat.java | 27 +- 5 files changed, 603 insertions(+), 340 deletions(-) create mode 100644 connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java create mode 100644 connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java new file mode 100644 index 00000000..48adae1c --- /dev/null +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.common; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormatError; +import org.apache.sqoop.schema.type.AbstractComplexListType; +import org.apache.sqoop.schema.type.Column; +import org.apache.sqoop.schema.type.ColumnType; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.FloatingPoint; +import org.joda.time.LocalDate; +import org.joda.time.LocalTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; + +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; + +/** + * Utility methods for connectors to encode data into the sqoop expected formats + * documented in + * https://cwiki.apache.org/confluence/display/SQOOP/Intermediate+Data + * +Format+API + * + */ + +public class SqoopIDFUtils { + + public static final String NULL_VALUE = "NULL"; + + // ISO-8859-1 is an 8-bit codec that is supported in every java + // implementation. + public static final String BYTE_FIELD_CHARSET = "ISO-8859-1"; + + public static final char[] originals = { 0x5C, 0x00, 0x0A, 0x0D, 0x1A, 0x22, 0x27 }; + + public static final char CSV_SEPARATOR_CHARACTER = ','; + public static final char ESCAPE_CHARACTER = '\\'; + public static final char QUOTE_CHARACTER = '\''; + + // string related replacements + private static final String[] replacements = { + new String(new char[] { ESCAPE_CHARACTER, '\\' }), + new String(new char[] { ESCAPE_CHARACTER, '0' }), + new String(new char[] { ESCAPE_CHARACTER, 'n' }), + new String(new char[] { ESCAPE_CHARACTER, 'r' }), + new String(new char[] { ESCAPE_CHARACTER, 'Z' }), + new String(new char[] { ESCAPE_CHARACTER, '\"' }), + new String(new char[] { ESCAPE_CHARACTER, '\'' }) + }; + + // http://www.joda.org/joda-time/key_format.html provides details on the + // formatter token + // can have fraction and or timezone + public static final DateTimeFormatter dtfWithFractionAndTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ"); + public static final DateTimeFormatter dtfWithNoFractionAndTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"); + public static final DateTimeFormatter dtfWithFractionNoTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS"); + public static final DateTimeFormatter dtfWithNoFractionWithTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssZ"); + + // only date, no time + public static final DateTimeFormatter df = DateTimeFormat.forPattern("yyyy-MM-dd"); + // time with fraction only, no timezone + public static final DateTimeFormatter tfWithFraction = DateTimeFormat.forPattern("HH:mm:ss.SSSSSS"); + public static final DateTimeFormatter tfWithNoFraction = DateTimeFormat.forPattern("HH:mm:ss"); + + static final String[] TRUE_BIT_VALUES = new String[] { "1", "true", "TRUE" }; + static final Set TRUE_BIT_SET = new HashSet(Arrays.asList(TRUE_BIT_VALUES)); + static final String[] FALSE_BIT_VALUES = new String[] { "0", "false", "FALSE" }; + static final Set FALSE_BIT_SET = new HashSet(Arrays.asList(FALSE_BIT_VALUES)); + + // ******** Number Column Type utils*********** + + public static Object toFixedPoint(String csvString, Column column) { + Object returnValue; + Long byteSize = ((FixedPoint) column).getByteSize(); + if (byteSize != null && byteSize <= Integer.SIZE) { + returnValue = Integer.valueOf(csvString); + } else { + returnValue = Long.valueOf(csvString); + } + return returnValue; + } + + public static Object toFloatingPoint(String csvString, Column column) { + Object returnValue; + Long byteSize = ((FloatingPoint) column).getByteSize(); + if (byteSize != null && byteSize <= Float.SIZE) { + returnValue = Float.valueOf(csvString); + } else { + returnValue = Double.valueOf(csvString); + } + return returnValue; + } + + public static Object toDecimal(String csvString, Column column) { + return new BigDecimal(csvString); + } + + // ********** BIT Column Type utils****************** + public static void encodeToCSVBit(Object[] objectArray, int i) { + String bitStringValue = objectArray[i].toString(); + if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) { + objectArray[i] = bitStringValue; + } else { + throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + + objectArray[i]); + } + } + + public static Object toBit(String csvString, Object returnValue) { + if ((TRUE_BIT_SET.contains(csvString)) || (FALSE_BIT_SET.contains(csvString))) { + returnValue = TRUE_BIT_SET.contains(csvString); + } else { + // throw an exception for any unsupported value for BITs + throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + csvString); + } + return returnValue; + } + + // *********** DATE and TIME Column Type utils ********** + + public static void encodeToCSVDate(Object[] objectArray, int i) { + org.joda.time.LocalDate date = (org.joda.time.LocalDate) objectArray[i]; + objectArray[i] = encloseWithQuote(df.print(date)); + } + + public static void encodeToCSVTime(Object[] objectArray, int i, Column col) { + if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) { + objectArray[i] = encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) objectArray[i])); + } else { + objectArray[i] = encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) objectArray[i])); + } + } + + public static Object toDate(String csvString, Column column) { + return LocalDate.parse(removeQuotes(csvString)); + } + + public static Object toTime(String csvString, Column column) { + return LocalTime.parse(removeQuotes(csvString)); + } + + // *********** DATE TIME Column Type utils ********** + + public static void encodeToCSVLocalDateTime(Object[] objectArray, int i, Column col, org.joda.time.LocalDateTime localDateTime) { + org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col; + if (column.hasFraction()) { + objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(localDateTime)); + } else { + objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(localDateTime)); + } + } + + public static void encodeToCSVDateTime(Object[] objectArray, int i, Column col, org.joda.time.DateTime dateTime) { + org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col; + if (column.hasFraction() && column.hasTimezone()) { + objectArray[i] = encloseWithQuote(dtfWithFractionAndTimeZone.print(dateTime)); + } else if (column.hasFraction() && !column.hasTimezone()) { + objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(dateTime)); + } else if (column.hasTimezone()) { + objectArray[i] = encloseWithQuote(dtfWithNoFractionWithTimeZone.print(dateTime)); + } else { + objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(dateTime)); + } + } + + public static Object toDateTime(String fieldString, Column column) { + Object returnValue; + String dateTime = removeQuotes(fieldString); + org.apache.sqoop.schema.type.DateTime col = ((org.apache.sqoop.schema.type.DateTime) column); + if (col.hasFraction() && col.hasTimezone()) { + // After calling withOffsetParsed method, a string + // '2004-06-09T10:20:30-08:00' will create a datetime with a zone of + // -08:00 (a fixed zone, with no daylight savings rules) + returnValue = dtfWithFractionAndTimeZone.withOffsetParsed().parseDateTime(dateTime); + } else if (col.hasFraction() && !col.hasTimezone()) { + // we use local date time explicitly to not include the timezone + returnValue = dtfWithFractionNoTimeZone.parseLocalDateTime(dateTime); + } else if (col.hasTimezone()) { + returnValue = dtfWithNoFractionWithTimeZone.withOffsetParsed().parseDateTime(dateTime); + } else { + // we use local date time explicitly to not include the timezone + returnValue = dtfWithNoFractionAndTimeZone.parseLocalDateTime(dateTime); + } + return returnValue; + } + + // ************ MAP Column Type utils********* + + @SuppressWarnings("unchecked") + public static String encodeToCSVMap(Map map, Column column) { + JSONObject object = new JSONObject(); + object.putAll(map); + return encloseWithQuote(object.toJSONString()); + } + + public static Map toMap(String csvString) { + + JSONObject object = null; + try { + object = (JSONObject) new JSONParser().parse(removeQuotes(csvString)); + } catch (org.json.simple.parser.ParseException e) { + throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e); + } + if (object != null) { + return toMap(object); + } + return null; + } + + private static List toList(JSONArray array) { + List list = new ArrayList(); + for (int i = 0; i < array.size(); i++) { + Object value = array.get(i); + if (value instanceof JSONArray) { + value = toList((JSONArray) value); + } + + else if (value instanceof JSONObject) { + value = toMap((JSONObject) value); + } + list.add(value); + } + return list; + } + + @SuppressWarnings("unchecked") + private static Map toMap(JSONObject object) { + Map elementMap = new HashMap(); + Set> entries = object.entrySet(); + for (Map.Entry entry : entries) { + Object value = entry.getValue(); + + if (value instanceof JSONArray) { + value = toList((JSONArray) value); + } + + else if (value instanceof JSONObject) { + value = toMap((JSONObject) value); + } + elementMap.put(entry.getKey(), value); + } + return elementMap; + } + + // ************ LIST Column Type utils********* + + @SuppressWarnings("unchecked") + public static String encodeToCSVList(Object[] list, AbstractComplexListType column) { + List elementList = new ArrayList(); + for (int n = 0; n < list.length; n++) { + Column listType = ((AbstractComplexListType) column).getListType(); + if (isColumnListType(listType)) { + Object[] listElements = (Object[]) list[n]; + elementList.add((Arrays.deepToString(listElements))); + } else { + elementList.add(list[n]); + } + } + JSONArray array = new JSONArray(); + array.addAll(elementList); + return encloseWithQuote(array.toJSONString()); + } + + public static Object[] toList(String csvString) { + + JSONArray array = null; + try { + array = (JSONArray) new JSONParser().parse(removeQuotes(csvString)); + } catch (org.json.simple.parser.ParseException e) { + throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e); + } + if (array != null) { + return array.toArray(); + } + return null; + } + + // ************ TEXT Column Type utils********* + + private static String getRegExp(char character) { + return getRegExp(String.valueOf(character)); + } + + private static String getRegExp(String string) { + return string.replaceAll("\\\\", Matcher.quoteReplacement("\\\\")); + } + + public static String encodeToCSVString(String string) { + int j = 0; + String replacement = string; + try { + for (j = 0; j < replacements.length; j++) { + replacement = replacement.replaceAll(getRegExp(originals[j]), Matcher.quoteReplacement(replacements[j])); + } + } catch (Exception e) { + throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + " " + replacement + + " " + String.valueOf(j) + " " + e.getMessage()); + } + return encloseWithQuote(replacement); + } + + public static String toText(String csvString) { + // Remove the trailing and starting quotes. + csvString = removeQuotes(csvString); + int j = 0; + try { + for (j = 0; j < replacements.length; j++) { + csvString = csvString.replaceAll(getRegExp(replacements[j]), Matcher.quoteReplacement(String.valueOf(originals[j]))); + } + } catch (Exception e) { + throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, csvString + " " + + String.valueOf(j) + e.getMessage()); + } + + return csvString; + } + + // ************ BINARY Column type utils********* + + public static String encodeToCSVByteArray(byte[] bytes) { + try { + return encodeToCSVString(new String(bytes, BYTE_FIELD_CHARSET)); + } catch (UnsupportedEncodingException e) { + // We should never hit this case. + // This character set should be distributed with Java. + throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0001, "The character set " + + BYTE_FIELD_CHARSET + " is not available."); + } + } + + public static byte[] toByteArray(String csvString) { + // Always encoded in BYTE_FIELD_CHARSET. + try { + return toText(csvString).getBytes(BYTE_FIELD_CHARSET); + } catch (UnsupportedEncodingException e) { + // Should never hit this case. + // This character set should be distributed with Java. + throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0001, "The character set " + + BYTE_FIELD_CHARSET + " is not available."); + } + } + + // *********** SQOOP CSV standard encoding utils******************** + + public static String encloseWithQuote(String string) { + StringBuilder builder = new StringBuilder(); + builder.append(QUOTE_CHARACTER).append(string).append(QUOTE_CHARACTER); + return builder.toString(); + } + + public static String removeQuotes(String string) { + // validate that the string has quotes + if (string.startsWith(String.valueOf(QUOTE_CHARACTER)) && string.endsWith(String.valueOf(QUOTE_CHARACTER))) { + return string.substring(1, string.length() - 1); + } + return string; + } + + // ********* utility methods for column type classification *********** + public static boolean isColumnListType(Column listType) { + return listType.getType().equals(ColumnType.ARRAY) || listType.getType().equals(ColumnType.SET); + } + + public static boolean isColumnStringType(Column stringType) { + return stringType.getType().equals(ColumnType.TEXT) || stringType.getType().equals(ColumnType.ENUM); + } + +} diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java index e4a83b1b..b377e2d5 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java @@ -18,6 +18,8 @@ */ package org.apache.sqoop.connector.idf; +import static org.apache.sqoop.connector.common.SqoopIDFUtils.*; + import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; @@ -25,30 +27,16 @@ import org.apache.sqoop.schema.type.AbstractComplexListType; import org.apache.sqoop.schema.type.Column; import org.apache.sqoop.schema.type.ColumnType; -import org.apache.sqoop.schema.type.FixedPoint; -import org.apache.sqoop.schema.type.FloatingPoint; -import org.joda.time.LocalDate; -import org.joda.time.LocalTime; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.math.BigDecimal; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.regex.Matcher; /** * A concrete implementation for the {@link #IntermediateDataFormat} that @@ -63,42 +51,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat { public static final Logger LOG = Logger.getLogger(CSVIntermediateDataFormat.class); - public static final char SEPARATOR_CHARACTER = ','; - public static final char ESCAPE_CHARACTER = '\\'; - public static final char QUOTE_CHARACTER = '\''; - - public static final String NULL_VALUE = "NULL"; - - private static final char[] originals = { - 0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27 - }; - - - private static final String[] replacements = { - new String(new char[] { ESCAPE_CHARACTER, '\\'}), - new String(new char[] { ESCAPE_CHARACTER, '0'}), - new String(new char[] { ESCAPE_CHARACTER, 'n'}), - new String(new char[] { ESCAPE_CHARACTER, 'r'}), - new String(new char[] { ESCAPE_CHARACTER, 'Z'}), - new String(new char[] { ESCAPE_CHARACTER, '\"'}), - new String(new char[] { ESCAPE_CHARACTER, '\''}) - }; - - // ISO-8859-1 is an 8-bit codec that is supported in every java implementation. - static final String BYTE_FIELD_CHARSET = "ISO-8859-1"; - // http://www.joda.org/joda-time/key_format.html provides details on the formatter token - // can have fraction and or timezone - static final DateTimeFormatter dtfWithFractionAndTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ"); - static final DateTimeFormatter dtfWithNoFractionAndTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"); - static final DateTimeFormatter dtfWithFractionNoTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS"); - static final DateTimeFormatter dtfWithNoFractionWithTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssZ"); - - // only date, no time - static final DateTimeFormatter df = DateTimeFormat.forPattern("yyyy-MM-dd"); - // time with fraction only, no timezone - static final DateTimeFormatter tfWithFraction = DateTimeFormat.forPattern("HH:mm:ss.SSSSSS"); - static final DateTimeFormatter tfWithNoFraction = DateTimeFormat.forPattern("HH:mm:ss"); - private final List stringTypeColumnIndices = new ArrayList(); private final List bitTypeColumnIndices = new ArrayList(); private final List byteTypeColumnIndices = new ArrayList(); @@ -108,10 +60,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat { private final List dateTypeColumnIndices = new ArrayList(); private final List timeColumnIndices = new ArrayList(); - static final String[] TRUE_BIT_VALUES = new String[] { "1", "true", "TRUE" }; - static final Set TRUE_BIT_SET = new HashSet(Arrays.asList(TRUE_BIT_VALUES)); - static final String[] FALSE_BIT_VALUES = new String[] { "0", "false", "FALSE" }; - static final Set FALSE_BIT_SET = new HashSet(Arrays.asList(FALSE_BIT_VALUES)); private Schema schema; @@ -202,7 +150,7 @@ private String[] getFieldStringArray() { builder.append(ESCAPE_CHARACTER); escaped = !escaped; break; - case SEPARATOR_CHARACTER: + case CSV_SEPARATOR_CHARACTER: if (quoted) { builder.append(c); } else { @@ -258,61 +206,46 @@ public Object[] getObjectData() { return objectArray; } - private Object parseCSVStringArrayElement(String fieldString, Column column) { + private Object parseCSVStringArrayElement(String csvString, Column column) { Object returnValue = null; switch (column.getType()) { case ENUM: case TEXT: - returnValue = unescapeString(fieldString); + returnValue = toText(csvString); break; case BINARY: // Unknown is treated as a binary type case UNKNOWN: - returnValue = unescapeByteArray(fieldString); + returnValue = toByteArray(csvString); break; case FIXED_POINT: - Long byteSize = ((FixedPoint) column).getByteSize(); - if (byteSize != null && byteSize <= Integer.SIZE) { - returnValue = Integer.valueOf(fieldString); - } else { - returnValue = Long.valueOf(fieldString); - } + returnValue = toFixedPoint(csvString, column); break; case FLOATING_POINT: - byteSize = ((FloatingPoint) column).getByteSize(); - if (byteSize != null && byteSize <= Float.SIZE) { - returnValue = Float.valueOf(fieldString); - } else { - returnValue = Double.valueOf(fieldString); - } + returnValue = toFloatingPoint(csvString, column); break; case DECIMAL: - returnValue = new BigDecimal(fieldString); + returnValue = toDecimal(csvString, column); break; case DATE: - returnValue = LocalDate.parse(removeQuotes(fieldString)); + returnValue = toDate(csvString, column); break; case TIME: - returnValue = LocalTime.parse(removeQuotes(fieldString)); + returnValue = toTime(csvString, column); break; case DATE_TIME: - returnValue = parseDateTime(fieldString, column); + returnValue = toDateTime(csvString, column); break; case BIT: - if ((TRUE_BIT_SET.contains(fieldString)) || (FALSE_BIT_SET.contains(fieldString))) { - returnValue = TRUE_BIT_SET.contains(fieldString); - } else { - // throw an exception for any unsupported value for BITs - throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + fieldString); - } + returnValue = toBit(csvString, returnValue); break; case ARRAY: case SET: - returnValue = parseListElementFromJSON(fieldString); + returnValue = toList(csvString); break; case MAP: - returnValue = parseMapElementFromJSON(fieldString); + returnValue = toMap(csvString); break; default: throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0004, @@ -321,89 +254,7 @@ private Object parseCSVStringArrayElement(String fieldString, Column column) { return returnValue; } - private Object parseDateTime(String fieldString, Column column) { - Object returnValue; - String dateTime = removeQuotes(fieldString); - org.apache.sqoop.schema.type.DateTime col = ((org.apache.sqoop.schema.type.DateTime) column); - if (col.hasFraction() && col.hasTimezone()) { - // After calling withOffsetParsed method, a string - // '2004-06-09T10:20:30-08:00' will create a datetime with a zone of - // -08:00 (a fixed zone, with no daylight savings rules) - returnValue = dtfWithFractionAndTimeZone.withOffsetParsed().parseDateTime(dateTime); - } else if (col.hasFraction() && !col.hasTimezone()) { - // we use local date time explicitly to not include the timezone - returnValue = dtfWithFractionNoTimeZone.parseLocalDateTime(dateTime); - } else if (col.hasTimezone()) { - returnValue = dtfWithNoFractionWithTimeZone.withOffsetParsed().parseDateTime(dateTime); - } else { - // we use local date time explicitly to not include the timezone - returnValue = dtfWithNoFractionAndTimeZone.parseLocalDateTime(dateTime); - } - return returnValue; - } - - private Object[] parseListElementFromJSON(String fieldString) { - - JSONArray array = null; - try { - array = (JSONArray) new JSONParser().parse(removeQuotes(fieldString)); - } catch (org.json.simple.parser.ParseException e) { - throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e); - } - if (array != null) { - return array.toArray(); - } - return null; - } - - private Map parseMapElementFromJSON(String fieldString) { - - JSONObject object = null; - try { - object = (JSONObject) new JSONParser().parse(removeQuotes(fieldString)); - } catch (org.json.simple.parser.ParseException e) { - throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e); - } - if (object != null) { - return toMap(object); - } - return null; - } - - private List toList(JSONArray array) { - List list = new ArrayList(); - for (int i = 0; i < array.size(); i++) { - Object value = array.get(i); - if (value instanceof JSONArray) { - value = toList((JSONArray) value); - } - - else if (value instanceof JSONObject) { - value = toMap((JSONObject) value); - } - list.add(value); - } - return list; - } - - @SuppressWarnings("unchecked") - private Map toMap(JSONObject object) { - Map elementMap = new HashMap(); - Set> entries = object.entrySet(); - for (Map.Entry entry : entries) { - Object value = entry.getValue(); - - if (value instanceof JSONArray) { - value = toList((JSONArray) value); - } - - else if (value instanceof JSONObject) { - value = toMap((JSONObject) value); - } - elementMap.put(entry.getKey(), value); - } - return elementMap; - } + /** * Appends the actual java objects into CSV string {@inheritDoc} @@ -420,8 +271,8 @@ public void setObjectData(Object[] data) { } } // ignore the null values while encoding the object array into csv string - encodeCSVStringElements(data, columnArray, nullValueIndices); - this.data = StringUtils.join(data, SEPARATOR_CHARACTER); + encodeToCSVText(data, columnArray, nullValueIndices); + this.data = StringUtils.join(data, CSV_SEPARATOR_CHARACTER); } /** @@ -468,27 +319,22 @@ public int compareTo(IntermediateDataFormat o) { return data.compareTo(o.getCSVTextData()); } - /** - * Sanitize every element of the CSV string based on the column type - * - * @param objectArray - */ + /** + * Encode to the sqoop prescribed CSV String for every element in the objet array + * @param objectArray + * @param columnArray + * @param nullValueIndices + */ @SuppressWarnings("unchecked") - private void encodeCSVStringElements(Object[] objectArray, Column[] columnArray, Set nullValueIndices) { + private void encodeToCSVText(Object[] objectArray, Column[] columnArray, Set nullValueIndices) { for (int i : bitTypeColumnIndices) { if (!nullValueIndices.contains(i)) { - String bitStringValue = objectArray[i].toString(); - if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) { - objectArray[i] = bitStringValue; - } else { - throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " - + objectArray[i]); - } + encodeToCSVBit(objectArray, i); } } for (int i : stringTypeColumnIndices) { if (!nullValueIndices.contains(i)) { - objectArray[i] = escapeString((String) objectArray[i]); + objectArray[i] = encodeToCSVString((String) objectArray[i]); } } for (int i : dateTimeTypeColumnIndices) { @@ -497,173 +343,41 @@ private void encodeCSVStringElements(Object[] objectArray, Column[] columnArray, if (objectArray[i] instanceof org.joda.time.DateTime) { org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i]; // check for fraction and time zone and then use the right formatter - formatDateTime(objectArray, i, col, dateTime); + encodeToCSVDateTime(objectArray, i, col, dateTime); } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) { org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i]; - formatLocalDateTime(objectArray, i, col, localDateTime); + encodeToCSVLocalDateTime(objectArray, i, col, localDateTime); } } } for (int i : dateTypeColumnIndices) { if (!nullValueIndices.contains(i)) { - org.joda.time.LocalDate date = (org.joda.time.LocalDate) objectArray[i]; - objectArray[i] = encloseWithQuote(df.print(date)); + encodeToCSVDate(objectArray, i); } } for (int i : timeColumnIndices) { Column col = columnArray[i]; if (!nullValueIndices.contains(i)) { - if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) { - objectArray[i] = encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) objectArray[i])); - } else { - objectArray[i] = encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) objectArray[i])); - } + encodeToCSVTime(objectArray, i, col); } } for (int i : byteTypeColumnIndices) { if (!nullValueIndices.contains(i)) { - objectArray[i] = escapeByteArrays((byte[]) objectArray[i]); + objectArray[i] = encodeToCSVByteArray((byte[]) objectArray[i]); } } for (int i : listTypeColumnIndices) { if (!nullValueIndices.contains(i)) { - objectArray[i] = encodeList((Object[]) objectArray[i], columnArray[i]); + objectArray[i] = encodeToCSVList((Object[]) objectArray[i], (AbstractComplexListType)columnArray[i]); } } for (int i : mapTypeColumnIndices) { if (!nullValueIndices.contains(i)) { - objectArray[i] = encodeMap((Map) objectArray[i], columnArray[i]); + objectArray[i] = encodeToCSVMap((Map) objectArray[i], columnArray[i]); } } } - private void formatLocalDateTime(Object[] objectArray, int i, Column col, org.joda.time.LocalDateTime localDateTime) { - org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col; - if (column.hasFraction()) { - objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(localDateTime)); - } else { - objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(localDateTime)); - } - } - - private void formatDateTime(Object[] objectArray, int i, Column col, org.joda.time.DateTime dateTime) { - org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col; - if (column.hasFraction() && column.hasTimezone()) { - objectArray[i] = encloseWithQuote(dtfWithFractionAndTimeZone.print(dateTime)); - } else if (column.hasFraction() && !column.hasTimezone()) { - objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(dateTime)); - } else if (column.hasTimezone()) { - objectArray[i] = encloseWithQuote(dtfWithNoFractionWithTimeZone.print(dateTime)); - } else { - objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(dateTime)); - } - } - - @SuppressWarnings("unchecked") - private String encodeMap(Map map, Column column) { - JSONObject object = new JSONObject(); - object.putAll(map); - return encloseWithQuote(object.toJSONString()); - } - - @SuppressWarnings("unchecked") - private String encodeList(Object[] list, Column column) { - List elementList = new ArrayList(); - for (int n = 0; n < list.length; n++) { - Column listType = ((AbstractComplexListType) column).getListType(); - if (isColumnListType(listType)) { - Object[] listElements = (Object[]) list[n]; - elementList.add((Arrays.deepToString(listElements))); - } else { - elementList.add(list[n]); - } - } - JSONArray array = new JSONArray(); - array.addAll(elementList); - return encloseWithQuote(array.toJSONString()); - } - - private boolean isColumnListType(Column listType) { - return listType.getType().equals(ColumnType.ARRAY) || listType.getType().equals(ColumnType.SET); - } - - private boolean isColumnStringType(Column stringType) { - return stringType.getType().equals(ColumnType.TEXT) || stringType.getType().equals(ColumnType.ENUM); - } - - private String escapeByteArrays(byte[] bytes) { - try { - return escapeString(new String(bytes, BYTE_FIELD_CHARSET)); - } catch (UnsupportedEncodingException e) { - // We should never hit this case. - // This character set should be distributed with Java. - throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0001, - "The character set " + BYTE_FIELD_CHARSET + " is not available."); - } - } - - private String getRegExp(char orig) { - return getRegExp(String.valueOf(orig)); - } - - private String getRegExp(String orig) { - return orig.replaceAll("\\\\", Matcher.quoteReplacement("\\\\")); - } - - private String escapeString(String orig) { - int j = 0; - String replacement = orig; - try { - for (j = 0; j < replacements.length; j++) { - replacement = replacement.replaceAll(getRegExp(originals[j]), - Matcher.quoteReplacement(replacements[j])); - } - } catch (Exception e) { - throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, orig - + " " + replacement + " " + String.valueOf(j) + " " + e.getMessage()); - } - return encloseWithQuote(replacement); - } - - private String encloseWithQuote(String string) { - StringBuilder builder = new StringBuilder(); - builder.append(QUOTE_CHARACTER).append(string).append(QUOTE_CHARACTER); - return builder.toString(); - } - - private String unescapeString(String orig) { - // Remove the trailing and starting quotes. - orig = removeQuotes(orig); - int j = 0; - try { - for (j = 0; j < replacements.length; j++) { - orig = orig.replaceAll(getRegExp(replacements[j]), - Matcher.quoteReplacement(String.valueOf(originals[j]))); - } - } catch (Exception e) { - throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, orig - + " " + String.valueOf(j) + e.getMessage()); - } - - return orig; - } - - private String removeQuotes(String string) { - return string.substring(1, string.length() - 1); - } - - private byte[] unescapeByteArray(String orig) { - // Always encoded in BYTE_FIELD_CHARSET. - try { - return unescapeString(orig).getBytes(BYTE_FIELD_CHARSET); - } catch (UnsupportedEncodingException e) { - // Should never hit this case. - // This character set should be distributed with Java. - throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0001, - "The character set " + BYTE_FIELD_CHARSET + " is not available."); - } - } - public String toString() { return data; } diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java index e2199487..eca7c588 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java @@ -67,7 +67,7 @@ public void setData(T data) { this.data = data; } /** - * Get one row of data as CSV text. Use SqoopDataUtils for reading and writing + * Get one row of data as CSV text. Use {@link #SqoopIDFUtils} for reading and writing * into the sqoop specified CSV text format for each {@link #ColumnType} field in the row * Why a "native" internal format and then return CSV text too? * Imagine a connector that moves data from a system that stores data as a @@ -93,7 +93,7 @@ public void setData(T data) { /** * Get one row of data as an Object array. Sqoop uses defined object representation - * for each column type. For instance org.joda.time to represent date.Use SqoopDataUtils + * for each column type. For instance org.joda.time to represent date.Use {@link #SqoopIDFUtils} * for reading and writing into the sqoop specified object format * for each {@link #ColumnType} field in the row *

diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java new file mode 100644 index 00000000..0dde2e7c --- /dev/null +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java @@ -0,0 +1,162 @@ +package org.apache.sqoop.connector.common; + +import static org.junit.Assert.*; +import static org.apache.sqoop.connector.common.SqoopIDFUtils.*; + +import org.apache.sqoop.schema.type.AbstractComplexListType; +import org.apache.sqoop.schema.type.Array; +import org.apache.sqoop.schema.type.Text; +import org.junit.Test; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestSqoopIDFUtils { + + public static String getByteFieldString(byte[] byteFieldData) { + try { + return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_CHARSET)).append("'").toString(); + } catch (UnsupportedEncodingException e) { + // Should never get to this point because ISO-8859-1 is a standard codec. + return null; + } + } + + @Test + public void testEncloseStringWithQuotes() { + String test = "test"; + String quotedText = encloseWithQuote(test); + assertEquals(quotedText, "'test'"); + + } + + @Test + public void testStringWithQuotesToEncloseStringWithQuotes() { + String test = "'test'"; + String quotedText = encloseWithQuote(test); + assertEquals(quotedText, "''test''"); + + } + + @Test + public void testRemoveQuotes() { + String test = "'test'"; + String quotedText = removeQuotes(test); + assertEquals(quotedText, "test"); + } + + @Test + public void testStringWithNoQuotesRemoveQuotes() { + String test = "test"; + String quotedText = removeQuotes(test); + assertEquals(quotedText, "test"); + } + + @Test + public void testStingWithNoQuotesRemoveQuotes() { + String test = "test"; + String quotedText = removeQuotes(test); + assertEquals(quotedText, "test"); + } + + @Test + public void testExample1EncodeToCSVString() { + String test = "test"; + String encodedText = encodeToCSVString(test); + assertEquals(encodedText, "'test'"); + } + + @Test + public void testExample2EncodeToCSVString() { + String test = "test,test1"; + String encodedText = encodeToCSVString(test); + assertEquals(encodedText, "'test,test1'"); + } + + @Test + public void testExample3EncodeToCSVString() { + String test = "test,'test1"; + String encodedText = encodeToCSVString(test); + assertEquals(encodedText, "'test,\\'test1'"); + } + + @Test + public void testExample4EncodeToCSVString() { + String test = "test,\"test1"; + String encodedText = encodeToCSVString(test); + assertEquals(encodedText, "'test,\\\"test1'"); + } + + @Test + public void testExample4ToString() { + String test = "'test,\\\"test1'"; + String expectedString = "test,\"test1"; + String toString = toText(test); + assertEquals(toString, expectedString); + } + + public void testExample5EncodeToCSVString() { + String test = new String(new char[] { 0x0A }); + String encodedText = encodeToCSVString(test); + assertEquals(encodedText, "'\\n'"); + } + + public void testExample5ToString() { + String test = "'\\n'"; + String expectedString = new String(new char[] { 0x0A }); + String toString = toText(test); + assertEquals(toString, expectedString); + } + + public void testExample6EncodeToCSVString() { + String test = new String(new char[] { 0x0D }); + String encodedText = encodeToCSVString(test); + assertEquals(encodedText, "'\\r'"); + } + + @Test + public void testEncodeByteToCSVString() { + // byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements + byte[] bytes = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54 }; + String encodedText = encodeToCSVByteArray(bytes); + String expectedText = getByteFieldString(bytes).replaceAll("\r", "\\\\r"); + assertEquals(encodedText, expectedText); + } + + @Test + public void testEncodeArrayIntegersToCSVString() { + List list = new ArrayList(); + list.add(1); + list.add(2); + AbstractComplexListType array = new Array("a", new Text("t")); + String encodedText = encodeToCSVList(list.toArray(), array); + assertEquals(encodedText, "'[1,2]'"); + } + + @Test + public void testEncodeArrayStringsToCSVString() { + List list = new ArrayList(); + list.add("A"); + list.add("B"); + AbstractComplexListType array = new Array("a", new Text("t")); + String encodedText = encodeToCSVList(list.toArray(), array); + assertEquals(encodedText, "'[\"A\",\"B\"]'"); + } + + @Test + public void testEncodeMapToCSVString() { + List list = new ArrayList(); + list.add("A"); + list.add("B"); + Map map = new HashMap(); + map.put("A", list); + org.apache.sqoop.schema.type.Map mapCol = new org.apache.sqoop.schema.type.Map("a", new Text("t"), new Array("r", new Text( + "tr"))); + String encodedText = encodeToCSVMap(map, mapCol); + assertEquals(encodedText, "'{\"A\":[\"A\",\"B\"]}'"); + } + +} \ No newline at end of file diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java index 1a2a96fb..83a95ec6 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java @@ -22,8 +22,9 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.apache.sqoop.connector.common.SqoopIDFUtils.*; +import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString; -import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -57,16 +58,6 @@ public void setUp() { dataFormat = new CSVIntermediateDataFormat(); } - private String getByteFieldString(byte[] byteFieldData) { - try { - return new StringBuilder("'") - .append(new String(byteFieldData, CSVIntermediateDataFormat.BYTE_FIELD_CHARSET)) - .append("'").toString(); - } catch (UnsupportedEncodingException e) { - // Should never get to this point because ISO-8859-1 is a standard codec. - return null; - } - } //**************test cases for null and empty input******************* @@ -114,7 +105,7 @@ public void testNullValueAsObjectArrayInAndCSVTextOut() { String csvText = dataFormat.getCSVTextData(); String[] textValues = csvText.split(","); for (String text : textValues) { - assertEquals(text, CSVIntermediateDataFormat.NULL_VALUE); + assertEquals(text, NULL_VALUE); } } @@ -183,7 +174,7 @@ public void testNullValueAsCSVTextInAndCSVTextOut() { String csvText = dataFormat.getCSVTextData(); String[] textValues = csvText.split(","); for (String text : textValues) { - assertEquals(text, CSVIntermediateDataFormat.NULL_VALUE); + assertEquals(text, NULL_VALUE); } } @@ -252,8 +243,8 @@ public void testInputAsObjectArayInCSVTextOut() { dataFormat.setObjectData(in); //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements - String testData = "10,34,'54','random data'," + - getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n','TEST_ENUM'"; + String testData = "10,34,'54','random data'," + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + + ",'\\n','TEST_ENUM'"; assertEquals(testData, dataFormat.getCSVTextData()); } @@ -315,8 +306,8 @@ public void testObjectArrayWithNullInCSVTextOut() { dataFormat.setObjectData(in); //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements - String testData = "10,34,NULL,'random data'," + - getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n','TEST_ENUM'"; + String testData = "10,34,NULL,'random data'," + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + + ",'\\n','TEST_ENUM'"; assertEquals(testData, dataFormat.getCSVTextData()); } @@ -521,7 +512,7 @@ public void testCurrentDateTime2WithFractionNoTimezoneWithCSVTextInObjectArrayOu dataFormat.setSchema(schema); // current date time org.joda.time.DateTime dateTime = new org.joda.time.DateTime(); - String dateTimeString = CSVIntermediateDataFormat.dtfWithFractionNoTimeZone.print(dateTime); + String dateTimeString = dtfWithFractionNoTimeZone.print(dateTime); dataFormat.setCSVTextData("'" + dateTimeString + "'"); assertEquals(dateTimeString.replace(" ", "T"), dataFormat.getObjectData()[0].toString()); }