5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-07 00:52:47 +08:00

SQOOP-1813: Sqoop2: Add SqoopIDFUtils class and unit tests

(Veena Basavaraj via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2014-12-12 20:26:26 -06:00
parent dda5e1edd3
commit 293e9ef636
5 changed files with 603 additions and 340 deletions

View File

@ -0,0 +1,396 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.common;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormatError;
import org.apache.sqoop.schema.type.AbstractComplexListType;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.ColumnType;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.joda.time.LocalDate;
import org.joda.time.LocalTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
/**
* Utility methods for connectors to encode data into the sqoop expected formats
* documented in
* https://cwiki.apache.org/confluence/display/SQOOP/Intermediate+Data+Format+API
*
*/
public class SqoopIDFUtils {
// Literal text "NULL"; presumably what callers write for a null field in the
// CSV text -- it is not referenced inside this class, confirm against callers.
public static final String NULL_VALUE = "NULL";
// ISO-8859-1 is an 8-bit codec that is supported in every java
// implementation. It is used to turn byte[] fields into strings and back.
public static final String BYTE_FIELD_CHARSET = "ISO-8859-1";
// Characters that get escaped in text fields, in order: backslash, NUL,
// line feed, carriage return, 0x1A, double quote, single quote.
// This array is index-aligned with "replacements" below: originals[j] is
// written as replacements[j] when encoding and restored when decoding.
public static final char[] originals = { 0x5C, 0x00, 0x0A, 0x0D, 0x1A, 0x22, 0x27 };
public static final char CSV_SEPARATOR_CHARACTER = ',';
public static final char ESCAPE_CHARACTER = '\\';
public static final char QUOTE_CHARACTER = '\'';
// Escape sequences for text fields: each entry is the escape character
// followed by a one-character code, index-aligned with "originals".
private static final String[] replacements = {
new String(new char[] { ESCAPE_CHARACTER, '\\' }),
new String(new char[] { ESCAPE_CHARACTER, '0' }),
new String(new char[] { ESCAPE_CHARACTER, 'n' }),
new String(new char[] { ESCAPE_CHARACTER, 'r' }),
new String(new char[] { ESCAPE_CHARACTER, 'Z' }),
new String(new char[] { ESCAPE_CHARACTER, '\"' }),
new String(new char[] { ESCAPE_CHARACTER, '\'' })
};
// http://www.joda.org/joda-time/key_format.html provides details on the
// formatter tokens.
// Date-time formatters: one for each combination of "has a fraction of a
// second" and "has a time zone offset".
public static final DateTimeFormatter dtfWithFractionAndTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ");
public static final DateTimeFormatter dtfWithNoFractionAndTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
public static final DateTimeFormatter dtfWithFractionNoTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS");
public static final DateTimeFormatter dtfWithNoFractionWithTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssZ");
// only date, no time
public static final DateTimeFormatter df = DateTimeFormat.forPattern("yyyy-MM-dd");
// time only, no timezone: with a six-digit fraction, or without a fraction
public static final DateTimeFormatter tfWithFraction = DateTimeFormat.forPattern("HH:mm:ss.SSSSSS");
public static final DateTimeFormatter tfWithNoFraction = DateTimeFormat.forPattern("HH:mm:ss");
// The only textual values accepted for BIT columns; the sets mirror the
// arrays for constant-time membership checks.
static final String[] TRUE_BIT_VALUES = new String[] { "1", "true", "TRUE" };
static final Set<String> TRUE_BIT_SET = new HashSet<String>(Arrays.asList(TRUE_BIT_VALUES));
static final String[] FALSE_BIT_VALUES = new String[] { "0", "false", "FALSE" };
static final Set<String> FALSE_BIT_SET = new HashSet<String>(Arrays.asList(FALSE_BIT_VALUES));
// ******** Number Column Type utils***********
/**
 * Parses the CSV text of a FIXED_POINT column.
 *
 * @param csvString the decimal integer text to parse
 * @param column the column definition; must be a {@link FixedPoint}
 * @return an {@link Integer} when the column declares a byte size that fits
 *         in an int, otherwise a {@link Long} (also when no size is declared)
 * @throws NumberFormatException if the text is not a valid number of the
 *         selected width
 */
public static Object toFixedPoint(String csvString, Column column) {
  Long byteSize = ((FixedPoint) column).getByteSize();
  // Integer.SIZE is expressed in bits while the column size is in bytes
  // (per the getter's name), so convert before comparing. Comparing against
  // Integer.SIZE directly treated every size up to 32 bytes as an int.
  if (byteSize != null && byteSize <= Integer.SIZE / Byte.SIZE) {
    return Integer.valueOf(csvString);
  }
  return Long.valueOf(csvString);
}
/**
 * Parses the CSV text of a FLOATING_POINT column.
 *
 * @param csvString the floating point text to parse
 * @param column the column definition; must be a {@link FloatingPoint}
 * @return a {@link Float} when the column declares a byte size that fits in
 *         a float, otherwise a {@link Double} (also when no size is declared)
 * @throws NumberFormatException if the text is not a valid number
 */
public static Object toFloatingPoint(String csvString, Column column) {
  Long byteSize = ((FloatingPoint) column).getByteSize();
  // Float.SIZE is expressed in bits while the column size is in bytes
  // (per the getter's name), so convert before comparing. Comparing against
  // Float.SIZE directly parsed 8-byte columns as float and lost precision.
  if (byteSize != null && byteSize <= Float.SIZE / Byte.SIZE) {
    return Float.valueOf(csvString);
  }
  return Double.valueOf(csvString);
}
/**
 * Parses the CSV text of a DECIMAL column into a {@link BigDecimal}.
 *
 * @param csvString the decimal text to parse
 * @param column the column definition (not consulted)
 * @return the parsed value
 */
public static Object toDecimal(String csvString, Column column) {
  BigDecimal decimal = new BigDecimal(csvString);
  return decimal;
}
// ********** BIT Column Type utils******************
/**
 * Replaces the element at index {@code i} with its string form, provided that
 * string is one of the accepted true/false bit values.
 *
 * @param objectArray the row being encoded; modified in place
 * @param i index of the BIT element
 * @throws SqoopException if the value is not an accepted bit value
 */
public static void encodeToCSVBit(Object[] objectArray, int i) {
  String bitText = objectArray[i].toString();
  boolean recognized = TRUE_BIT_SET.contains(bitText) || FALSE_BIT_SET.contains(bitText);
  if (!recognized) {
    throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: "
        + objectArray[i]);
  }
  objectArray[i] = bitText;
}
/**
 * Parses the CSV text of a BIT column into a Boolean.
 *
 * @param csvString the bit text
 * @param returnValue ignored on input; kept for signature compatibility
 * @return {@code Boolean.TRUE} for an accepted true value, {@code Boolean.FALSE}
 *         for an accepted false value
 * @throws SqoopException for any other text
 */
public static Object toBit(String csvString, Object returnValue) {
  if (TRUE_BIT_SET.contains(csvString)) {
    return Boolean.TRUE;
  }
  if (FALSE_BIT_SET.contains(csvString)) {
    return Boolean.FALSE;
  }
  // anything outside the two accepted sets is rejected
  throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + csvString);
}
// *********** DATE and TIME Column Type utils **********
/**
 * Replaces the Joda {@code LocalDate} at index {@code i} with its quoted
 * {@code yyyy-MM-dd} text.
 *
 * @param objectArray the row being encoded; modified in place
 * @param i index of the DATE element
 */
public static void encodeToCSVDate(Object[] objectArray, int i) {
  objectArray[i] = encloseWithQuote(df.print((org.joda.time.LocalDate) objectArray[i]));
}
/**
 * Replaces the Joda {@code LocalTime} at index {@code i} with its quoted text,
 * printed with or without a fraction as the column dictates.
 *
 * @param objectArray the row being encoded; modified in place
 * @param i index of the TIME element
 * @param col the column definition; must be a schema {@code Time}
 */
public static void encodeToCSVTime(Object[] objectArray, int i, Column col) {
  boolean withFraction = ((org.apache.sqoop.schema.type.Time) col).hasFraction();
  DateTimeFormatter formatter = withFraction ? tfWithFraction : tfWithNoFraction;
  org.joda.time.LocalTime time = (org.joda.time.LocalTime) objectArray[i];
  objectArray[i] = encloseWithQuote(formatter.print(time));
}
/**
 * Parses the CSV text of a DATE column into a Joda {@link LocalDate}.
 *
 * @param csvString the date text, with or without surrounding quotes
 * @param column the column definition (not consulted)
 * @return the parsed date
 */
public static Object toDate(String csvString, Column column) {
  String unquoted = removeQuotes(csvString);
  return LocalDate.parse(unquoted);
}
/**
 * Parses the CSV text of a TIME column into a Joda {@link LocalTime}.
 *
 * @param csvString the time text, with or without surrounding quotes
 * @param column the column definition (not consulted)
 * @return the parsed time
 */
public static Object toTime(String csvString, Column column) {
  String unquoted = removeQuotes(csvString);
  return LocalTime.parse(unquoted);
}
// *********** DATE TIME Column Type utils **********
/**
 * Stores the quoted text of a zone-less date-time at index {@code i}, printed
 * with or without a fraction as the column dictates.
 *
 * @param objectArray the row being encoded; modified in place
 * @param i index of the DATE_TIME element
 * @param col the column definition; must be a schema {@code DateTime}
 * @param localDateTime the value to print
 */
public static void encodeToCSVLocalDateTime(Object[] objectArray, int i, Column col, org.joda.time.LocalDateTime localDateTime) {
  boolean withFraction = ((org.apache.sqoop.schema.type.DateTime) col).hasFraction();
  DateTimeFormatter formatter = withFraction ? dtfWithFractionNoTimeZone : dtfWithNoFractionAndTimeZone;
  objectArray[i] = encloseWithQuote(formatter.print(localDateTime));
}
/**
 * Stores the quoted text of a date-time at index {@code i}, choosing the
 * formatter from the column's fraction and timezone flags.
 *
 * @param objectArray the row being encoded; modified in place
 * @param i index of the DATE_TIME element
 * @param col the column definition; must be a schema {@code DateTime}
 * @param dateTime the value to print
 */
public static void encodeToCSVDateTime(Object[] objectArray, int i, Column col, org.joda.time.DateTime dateTime) {
  org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
  boolean withFraction = column.hasFraction();
  boolean withTimezone = column.hasTimezone();
  DateTimeFormatter formatter;
  if (withFraction) {
    formatter = withTimezone ? dtfWithFractionAndTimeZone : dtfWithFractionNoTimeZone;
  } else {
    formatter = withTimezone ? dtfWithNoFractionWithTimeZone : dtfWithNoFractionAndTimeZone;
  }
  objectArray[i] = encloseWithQuote(formatter.print(dateTime));
}
/**
 * Parses the CSV text of a DATE_TIME column.
 *
 * @param fieldString the date-time text, with or without surrounding quotes
 * @param column the column definition; must be a schema {@code DateTime}
 * @return a Joda {@code DateTime} carrying the parsed offset when the column
 *         has a timezone, otherwise a Joda {@code LocalDateTime}
 */
public static Object toDateTime(String fieldString, Column column) {
  String text = removeQuotes(fieldString);
  org.apache.sqoop.schema.type.DateTime col = (org.apache.sqoop.schema.type.DateTime) column;
  boolean withFraction = col.hasFraction();
  boolean withTimezone = col.hasTimezone();
  if (withTimezone) {
    // withOffsetParsed() keeps the offset found in the text as a fixed zone
    // (no daylight savings rules) instead of converting to the default zone
    DateTimeFormatter zoned = withFraction ? dtfWithFractionAndTimeZone : dtfWithNoFractionWithTimeZone;
    return zoned.withOffsetParsed().parseDateTime(text);
  }
  // a local date time is used explicitly so that no timezone is attached
  DateTimeFormatter local = withFraction ? dtfWithFractionNoTimeZone : dtfWithNoFractionAndTimeZone;
  return local.parseLocalDateTime(text);
}
// ************ MAP Column Type utils*********
/**
 * Encodes a map as a quoted JSON object string.
 *
 * @param map the entries to encode
 * @param column the column definition (not consulted)
 * @return the JSON text wrapped in quote characters
 */
@SuppressWarnings("unchecked")
public static String encodeToCSVMap(Map<Object, Object> map, Column column) {
  JSONObject json = new JSONObject();
  json.putAll(map);
  String jsonText = json.toJSONString();
  return encloseWithQuote(jsonText);
}
/**
 * Parses the CSV text of a MAP column (a JSON object, optionally quoted).
 *
 * @param csvString the JSON object text
 * @return the decoded map with nested arrays/objects converted to lists/maps,
 *         or {@code null} when the parser yields no object
 * @throws SqoopException if the text is not valid JSON
 */
public static Map<Object, Object> toMap(String csvString) {
  final JSONObject parsed;
  try {
    parsed = (JSONObject) new JSONParser().parse(removeQuotes(csvString));
  } catch (org.json.simple.parser.ParseException e) {
    throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e);
  }
  return parsed == null ? null : toMap(parsed);
}
/**
 * Converts a JSON array into a plain list, recursively converting nested
 * arrays to lists and nested objects to maps.
 */
private static List<Object> toList(JSONArray array) {
  List<Object> converted = new ArrayList<Object>();
  for (Object element : array) {
    if (element instanceof JSONArray) {
      converted.add(toList((JSONArray) element));
    } else if (element instanceof JSONObject) {
      converted.add(toMap((JSONObject) element));
    } else {
      converted.add(element);
    }
  }
  return converted;
}
/**
 * Converts a JSON object into a plain map, recursively converting nested
 * arrays to lists and nested objects to maps.
 */
@SuppressWarnings("unchecked")
private static Map<Object, Object> toMap(JSONObject object) {
  Map<Object, Object> converted = new HashMap<Object, Object>();
  for (Object rawEntry : object.entrySet()) {
    Map.Entry<?, ?> entry = (Map.Entry<?, ?>) rawEntry;
    Object value = entry.getValue();
    if (value instanceof JSONArray) {
      converted.put(entry.getKey(), toList((JSONArray) value));
    } else if (value instanceof JSONObject) {
      converted.put(entry.getKey(), toMap((JSONObject) value));
    } else {
      converted.put(entry.getKey(), value);
    }
  }
  return converted;
}
// ************ LIST Column Type utils*********
/**
 * Encodes an array as a quoted JSON array string. When the column's element
 * type is itself a list type, each element is first flattened to its
 * {@link Arrays#deepToString(Object[])} text.
 *
 * @param list the elements to encode
 * @param column the list column definition
 * @return the JSON text wrapped in quote characters
 */
@SuppressWarnings("unchecked")
public static String encodeToCSVList(Object[] list, AbstractComplexListType column) {
  JSONArray json = new JSONArray();
  for (Object element : list) {
    Column elementType = column.getListType();
    if (isColumnListType(elementType)) {
      json.add(Arrays.deepToString((Object[]) element));
    } else {
      json.add(element);
    }
  }
  return encloseWithQuote(json.toJSONString());
}
/**
 * Parses the CSV text of an ARRAY/SET column (a JSON array, optionally quoted).
 *
 * @param csvString the JSON array text
 * @return the top-level elements as an array, or {@code null} when the parser
 *         yields no array
 * @throws SqoopException if the text is not valid JSON
 */
public static Object[] toList(String csvString) {
  final JSONArray parsed;
  try {
    parsed = (JSONArray) new JSONParser().parse(removeQuotes(csvString));
  } catch (org.json.simple.parser.ParseException e) {
    throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e);
  }
  return parsed == null ? null : parsed.toArray();
}
// ************ TEXT Column Type utils*********
/** Builds the regular expression that matches the given single character. */
private static String getRegExp(char character) {
  String asText = Character.toString(character);
  return getRegExp(asText);
}
/**
 * Builds a regular expression for the given text by doubling every backslash,
 * so that a backslash in the text matches a literal backslash.
 */
private static String getRegExp(String string) {
  // literal (non-regex) replacement: one backslash becomes two
  return string.replace("\\", "\\\\");
}
/**
 * Escapes the special characters of a text value and wraps it in quotes.
 * The escape table is applied one entry at a time, backslash first.
 *
 * @param string the raw text
 * @return the escaped, quoted CSV text
 * @throws SqoopException if any replacement step fails
 */
public static String encodeToCSVString(String string) {
  String escaped = string;
  int step = 0;
  try {
    while (step < replacements.length) {
      String pattern = getRegExp(originals[step]);
      escaped = escaped.replaceAll(pattern, Matcher.quoteReplacement(replacements[step]));
      step++;
    }
  } catch (Exception e) {
    throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + " " + escaped
        + " " + String.valueOf(step) + " " + e.getMessage());
  }
  return encloseWithQuote(escaped);
}
/**
 * Decodes the CSV text of a TEXT/ENUM column: strips the surrounding quotes
 * and turns every escape sequence back into the character it stands for.
 *
 * <p>The text is decoded in a single left-to-right pass. Applying the
 * replacements one after another over the whole string is not safe: an
 * escaped backslash followed by {@code n} would first collapse to
 * backslash + {@code n} and then be mistaken for an escaped line feed.
 *
 * @param csvString the escaped text, with or without surrounding quotes
 * @return the decoded text; a backslash that does not start a known escape
 *         sequence is kept as is
 */
public static String toText(String csvString) {
  // Remove the trailing and starting quotes.
  String text = removeQuotes(csvString);
  int length = text.length();
  StringBuilder decoded = new StringBuilder(length);
  int i = 0;
  while (i < length) {
    char current = text.charAt(i);
    int match = -1;
    if (current == ESCAPE_CHARACTER && i + 1 < length) {
      char code = text.charAt(i + 1);
      // replacements[j] is the escape character followed by a one-char code
      for (int j = 0; j < replacements.length; j++) {
        if (replacements[j].charAt(1) == code) {
          match = j;
          break;
        }
      }
    }
    if (match >= 0) {
      // consume both characters of the escape sequence
      decoded.append(originals[match]);
      i += 2;
    } else {
      decoded.append(current);
      i++;
    }
  }
  return decoded.toString();
}
// ************ BINARY Column type utils*********
/**
 * Encodes a byte array as escaped, quoted CSV text by first mapping the bytes
 * to characters with {@code BYTE_FIELD_CHARSET}.
 *
 * @param bytes the binary value
 * @return the escaped, quoted CSV text
 * @throws SqoopException if the charset is unavailable
 */
public static String encodeToCSVByteArray(byte[] bytes) {
  String asText;
  try {
    asText = new String(bytes, BYTE_FIELD_CHARSET);
  } catch (UnsupportedEncodingException e) {
    // We should never hit this case.
    // This character set should be distributed with Java.
    throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0001, "The character set "
        + BYTE_FIELD_CHARSET + " is not available.");
  }
  return encodeToCSVString(asText);
}
/**
 * Decodes the CSV text of a BINARY column back into bytes. The text is
 * unescaped first and then mapped to bytes with {@code BYTE_FIELD_CHARSET}.
 *
 * @param csvString the escaped text, with or without surrounding quotes
 * @return the decoded bytes
 * @throws SqoopException if the charset is unavailable
 */
public static byte[] toByteArray(String csvString) {
  String decoded = toText(csvString);
  try {
    // Always encoded in BYTE_FIELD_CHARSET.
    return decoded.getBytes(BYTE_FIELD_CHARSET);
  } catch (UnsupportedEncodingException e) {
    // Should never hit this case.
    // This character set should be distributed with Java.
    throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0001, "The character set "
        + BYTE_FIELD_CHARSET + " is not available.");
  }
}
// *********** SQOOP CSV standard encoding utils********************
/**
 * Wraps the given text in the CSV quote character.
 *
 * @param string the text to wrap
 * @return the text with one quote character before and one after
 */
public static String encloseWithQuote(String string) {
  return QUOTE_CHARACTER + string + QUOTE_CHARACTER;
}
/**
 * Strips one leading and one trailing quote character, when both are present.
 *
 * @param string the text to strip; must not be {@code null}
 * @return the text without its surrounding quotes, or the text unchanged when
 *         it is not enclosed in a pair of quotes
 */
public static String removeQuotes(String string) {
  int length = string.length();
  // At least two characters are required: a lone quote is both the first and
  // the last character, and stripping it would ask for substring(1, 0).
  if (length >= 2 && string.charAt(0) == QUOTE_CHARACTER && string.charAt(length - 1) == QUOTE_CHARACTER) {
    return string.substring(1, length - 1);
  }
  return string;
}
// ********* utility methods for column type classification ***********
/**
 * Tells whether the column holds a list-like value.
 *
 * @param listType the column to classify
 * @return {@code true} for ARRAY and SET columns
 */
public static boolean isColumnListType(Column listType) {
  if (listType.getType().equals(ColumnType.ARRAY)) {
    return true;
  }
  return listType.getType().equals(ColumnType.SET);
}
/**
 * Tells whether the column holds a string-like value.
 *
 * @param stringType the column to classify
 * @return {@code true} for TEXT and ENUM columns
 */
public static boolean isColumnStringType(Column stringType) {
  if (stringType.getType().equals(ColumnType.TEXT)) {
    return true;
  }
  return stringType.getType().equals(ColumnType.ENUM);
}
}

View File

@ -18,6 +18,8 @@
*/
package org.apache.sqoop.connector.idf;
import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
@ -25,30 +27,16 @@
import org.apache.sqoop.schema.type.AbstractComplexListType;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.ColumnType;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.joda.time.LocalDate;
import org.joda.time.LocalTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
/**
* A concrete implementation for the {@link IntermediateDataFormat} that
@ -63,42 +51,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
public static final Logger LOG = Logger.getLogger(CSVIntermediateDataFormat.class);
public static final char SEPARATOR_CHARACTER = ',';
public static final char ESCAPE_CHARACTER = '\\';
public static final char QUOTE_CHARACTER = '\'';
public static final String NULL_VALUE = "NULL";
private static final char[] originals = {
0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27
};
private static final String[] replacements = {
new String(new char[] { ESCAPE_CHARACTER, '\\'}),
new String(new char[] { ESCAPE_CHARACTER, '0'}),
new String(new char[] { ESCAPE_CHARACTER, 'n'}),
new String(new char[] { ESCAPE_CHARACTER, 'r'}),
new String(new char[] { ESCAPE_CHARACTER, 'Z'}),
new String(new char[] { ESCAPE_CHARACTER, '\"'}),
new String(new char[] { ESCAPE_CHARACTER, '\''})
};
// ISO-8859-1 is an 8-bit codec that is supported in every java implementation.
static final String BYTE_FIELD_CHARSET = "ISO-8859-1";
// http://www.joda.org/joda-time/key_format.html provides details on the formatter token
// can have fraction and or timezone
static final DateTimeFormatter dtfWithFractionAndTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ");
static final DateTimeFormatter dtfWithNoFractionAndTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
static final DateTimeFormatter dtfWithFractionNoTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS");
static final DateTimeFormatter dtfWithNoFractionWithTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssZ");
// only date, no time
static final DateTimeFormatter df = DateTimeFormat.forPattern("yyyy-MM-dd");
// time with fraction only, no timezone
static final DateTimeFormatter tfWithFraction = DateTimeFormat.forPattern("HH:mm:ss.SSSSSS");
static final DateTimeFormatter tfWithNoFraction = DateTimeFormat.forPattern("HH:mm:ss");
private final List<Integer> stringTypeColumnIndices = new ArrayList<Integer>();
private final List<Integer> bitTypeColumnIndices = new ArrayList<Integer>();
private final List<Integer> byteTypeColumnIndices = new ArrayList<Integer>();
@ -108,10 +60,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
private final List<Integer> dateTypeColumnIndices = new ArrayList<Integer>();
private final List<Integer> timeColumnIndices = new ArrayList<Integer>();
static final String[] TRUE_BIT_VALUES = new String[] { "1", "true", "TRUE" };
static final Set<String> TRUE_BIT_SET = new HashSet<String>(Arrays.asList(TRUE_BIT_VALUES));
static final String[] FALSE_BIT_VALUES = new String[] { "0", "false", "FALSE" };
static final Set<String> FALSE_BIT_SET = new HashSet<String>(Arrays.asList(FALSE_BIT_VALUES));
private Schema schema;
@ -202,7 +150,7 @@ private String[] getFieldStringArray() {
builder.append(ESCAPE_CHARACTER);
escaped = !escaped;
break;
case SEPARATOR_CHARACTER:
case CSV_SEPARATOR_CHARACTER:
if (quoted) {
builder.append(c);
} else {
@ -258,61 +206,46 @@ public Object[] getObjectData() {
return objectArray;
}
private Object parseCSVStringArrayElement(String fieldString, Column column) {
private Object parseCSVStringArrayElement(String csvString, Column column) {
Object returnValue = null;
switch (column.getType()) {
case ENUM:
case TEXT:
returnValue = unescapeString(fieldString);
returnValue = toText(csvString);
break;
case BINARY:
// Unknown is treated as a binary type
case UNKNOWN:
returnValue = unescapeByteArray(fieldString);
returnValue = toByteArray(csvString);
break;
case FIXED_POINT:
Long byteSize = ((FixedPoint) column).getByteSize();
if (byteSize != null && byteSize <= Integer.SIZE) {
returnValue = Integer.valueOf(fieldString);
} else {
returnValue = Long.valueOf(fieldString);
}
returnValue = toFixedPoint(csvString, column);
break;
case FLOATING_POINT:
byteSize = ((FloatingPoint) column).getByteSize();
if (byteSize != null && byteSize <= Float.SIZE) {
returnValue = Float.valueOf(fieldString);
} else {
returnValue = Double.valueOf(fieldString);
}
returnValue = toFloatingPoint(csvString, column);
break;
case DECIMAL:
returnValue = new BigDecimal(fieldString);
returnValue = toDecimal(csvString, column);
break;
case DATE:
returnValue = LocalDate.parse(removeQuotes(fieldString));
returnValue = toDate(csvString, column);
break;
case TIME:
returnValue = LocalTime.parse(removeQuotes(fieldString));
returnValue = toTime(csvString, column);
break;
case DATE_TIME:
returnValue = parseDateTime(fieldString, column);
returnValue = toDateTime(csvString, column);
break;
case BIT:
if ((TRUE_BIT_SET.contains(fieldString)) || (FALSE_BIT_SET.contains(fieldString))) {
returnValue = TRUE_BIT_SET.contains(fieldString);
} else {
// throw an exception for any unsupported value for BITs
throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + fieldString);
}
returnValue = toBit(csvString, returnValue);
break;
case ARRAY:
case SET:
returnValue = parseListElementFromJSON(fieldString);
returnValue = toList(csvString);
break;
case MAP:
returnValue = parseMapElementFromJSON(fieldString);
returnValue = toMap(csvString);
break;
default:
throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0004,
@ -321,89 +254,7 @@ private Object parseCSVStringArrayElement(String fieldString, Column column) {
return returnValue;
}
private Object parseDateTime(String fieldString, Column column) {
Object returnValue;
String dateTime = removeQuotes(fieldString);
org.apache.sqoop.schema.type.DateTime col = ((org.apache.sqoop.schema.type.DateTime) column);
if (col.hasFraction() && col.hasTimezone()) {
// After calling withOffsetParsed method, a string
// '2004-06-09T10:20:30-08:00' will create a datetime with a zone of
// -08:00 (a fixed zone, with no daylight savings rules)
returnValue = dtfWithFractionAndTimeZone.withOffsetParsed().parseDateTime(dateTime);
} else if (col.hasFraction() && !col.hasTimezone()) {
// we use local date time explicitly to not include the timezone
returnValue = dtfWithFractionNoTimeZone.parseLocalDateTime(dateTime);
} else if (col.hasTimezone()) {
returnValue = dtfWithNoFractionWithTimeZone.withOffsetParsed().parseDateTime(dateTime);
} else {
// we use local date time explicitly to not include the timezone
returnValue = dtfWithNoFractionAndTimeZone.parseLocalDateTime(dateTime);
}
return returnValue;
}
private Object[] parseListElementFromJSON(String fieldString) {
JSONArray array = null;
try {
array = (JSONArray) new JSONParser().parse(removeQuotes(fieldString));
} catch (org.json.simple.parser.ParseException e) {
throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e);
}
if (array != null) {
return array.toArray();
}
return null;
}
private Map<Object, Object> parseMapElementFromJSON(String fieldString) {
JSONObject object = null;
try {
object = (JSONObject) new JSONParser().parse(removeQuotes(fieldString));
} catch (org.json.simple.parser.ParseException e) {
throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e);
}
if (object != null) {
return toMap(object);
}
return null;
}
private List<Object> toList(JSONArray array) {
List<Object> list = new ArrayList<Object>();
for (int i = 0; i < array.size(); i++) {
Object value = array.get(i);
if (value instanceof JSONArray) {
value = toList((JSONArray) value);
}
else if (value instanceof JSONObject) {
value = toMap((JSONObject) value);
}
list.add(value);
}
return list;
}
@SuppressWarnings("unchecked")
private Map<Object, Object> toMap(JSONObject object) {
Map<Object, Object> elementMap = new HashMap<Object, Object>();
Set<Map.Entry<Object, Object>> entries = object.entrySet();
for (Map.Entry<Object, Object> entry : entries) {
Object value = entry.getValue();
if (value instanceof JSONArray) {
value = toList((JSONArray) value);
}
else if (value instanceof JSONObject) {
value = toMap((JSONObject) value);
}
elementMap.put(entry.getKey(), value);
}
return elementMap;
}
/**
* Appends the actual java objects into CSV string {@inheritDoc}
@ -420,8 +271,8 @@ public void setObjectData(Object[] data) {
}
}
// ignore the null values while encoding the object array into csv string
encodeCSVStringElements(data, columnArray, nullValueIndices);
this.data = StringUtils.join(data, SEPARATOR_CHARACTER);
encodeToCSVText(data, columnArray, nullValueIndices);
this.data = StringUtils.join(data, CSV_SEPARATOR_CHARACTER);
}
/**
@ -468,27 +319,22 @@ public int compareTo(IntermediateDataFormat<?> o) {
return data.compareTo(o.getCSVTextData());
}
/**
* Sanitize every element of the CSV string based on the column type
*
* @param objectArray
*/
/**
* Encode to the sqoop prescribed CSV String for every element in the object array
* @param objectArray
* @param columnArray
* @param nullValueIndices
*/
@SuppressWarnings("unchecked")
private void encodeCSVStringElements(Object[] objectArray, Column[] columnArray, Set<Integer> nullValueIndices) {
private void encodeToCSVText(Object[] objectArray, Column[] columnArray, Set<Integer> nullValueIndices) {
for (int i : bitTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
String bitStringValue = objectArray[i].toString();
if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) {
objectArray[i] = bitStringValue;
} else {
throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: "
+ objectArray[i]);
}
encodeToCSVBit(objectArray, i);
}
}
for (int i : stringTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
objectArray[i] = escapeString((String) objectArray[i]);
objectArray[i] = encodeToCSVString((String) objectArray[i]);
}
}
for (int i : dateTimeTypeColumnIndices) {
@ -497,173 +343,41 @@ private void encodeCSVStringElements(Object[] objectArray, Column[] columnArray,
if (objectArray[i] instanceof org.joda.time.DateTime) {
org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i];
// check for fraction and time zone and then use the right formatter
formatDateTime(objectArray, i, col, dateTime);
encodeToCSVDateTime(objectArray, i, col, dateTime);
} else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i];
formatLocalDateTime(objectArray, i, col, localDateTime);
encodeToCSVLocalDateTime(objectArray, i, col, localDateTime);
}
}
}
for (int i : dateTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
org.joda.time.LocalDate date = (org.joda.time.LocalDate) objectArray[i];
objectArray[i] = encloseWithQuote(df.print(date));
encodeToCSVDate(objectArray, i);
}
}
for (int i : timeColumnIndices) {
Column col = columnArray[i];
if (!nullValueIndices.contains(i)) {
if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) {
objectArray[i] = encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) objectArray[i]));
} else {
objectArray[i] = encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) objectArray[i]));
}
encodeToCSVTime(objectArray, i, col);
}
}
for (int i : byteTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
objectArray[i] = escapeByteArrays((byte[]) objectArray[i]);
objectArray[i] = encodeToCSVByteArray((byte[]) objectArray[i]);
}
}
for (int i : listTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
objectArray[i] = encodeList((Object[]) objectArray[i], columnArray[i]);
objectArray[i] = encodeToCSVList((Object[]) objectArray[i], (AbstractComplexListType)columnArray[i]);
}
}
for (int i : mapTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
objectArray[i] = encodeMap((Map<Object, Object>) objectArray[i], columnArray[i]);
objectArray[i] = encodeToCSVMap((Map<Object, Object>) objectArray[i], columnArray[i]);
}
}
}
private void formatLocalDateTime(Object[] objectArray, int i, Column col, org.joda.time.LocalDateTime localDateTime) {
org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
if (column.hasFraction()) {
objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(localDateTime));
} else {
objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(localDateTime));
}
}
private void formatDateTime(Object[] objectArray, int i, Column col, org.joda.time.DateTime dateTime) {
org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
if (column.hasFraction() && column.hasTimezone()) {
objectArray[i] = encloseWithQuote(dtfWithFractionAndTimeZone.print(dateTime));
} else if (column.hasFraction() && !column.hasTimezone()) {
objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(dateTime));
} else if (column.hasTimezone()) {
objectArray[i] = encloseWithQuote(dtfWithNoFractionWithTimeZone.print(dateTime));
} else {
objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(dateTime));
}
}
@SuppressWarnings("unchecked")
private String encodeMap(Map<Object, Object> map, Column column) {
JSONObject object = new JSONObject();
object.putAll(map);
return encloseWithQuote(object.toJSONString());
}
@SuppressWarnings("unchecked")
private String encodeList(Object[] list, Column column) {
List<Object> elementList = new ArrayList<Object>();
for (int n = 0; n < list.length; n++) {
Column listType = ((AbstractComplexListType) column).getListType();
if (isColumnListType(listType)) {
Object[] listElements = (Object[]) list[n];
elementList.add((Arrays.deepToString(listElements)));
} else {
elementList.add(list[n]);
}
}
JSONArray array = new JSONArray();
array.addAll(elementList);
return encloseWithQuote(array.toJSONString());
}
private boolean isColumnListType(Column listType) {
return listType.getType().equals(ColumnType.ARRAY) || listType.getType().equals(ColumnType.SET);
}
private boolean isColumnStringType(Column stringType) {
return stringType.getType().equals(ColumnType.TEXT) || stringType.getType().equals(ColumnType.ENUM);
}
private String escapeByteArrays(byte[] bytes) {
try {
return escapeString(new String(bytes, BYTE_FIELD_CHARSET));
} catch (UnsupportedEncodingException e) {
// We should never hit this case.
// This character set should be distributed with Java.
throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0001,
"The character set " + BYTE_FIELD_CHARSET + " is not available.");
}
}
private String getRegExp(char orig) {
return getRegExp(String.valueOf(orig));
}
private String getRegExp(String orig) {
return orig.replaceAll("\\\\", Matcher.quoteReplacement("\\\\"));
}
private String escapeString(String orig) {
int j = 0;
String replacement = orig;
try {
for (j = 0; j < replacements.length; j++) {
replacement = replacement.replaceAll(getRegExp(originals[j]),
Matcher.quoteReplacement(replacements[j]));
}
} catch (Exception e) {
throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, orig
+ " " + replacement + " " + String.valueOf(j) + " " + e.getMessage());
}
return encloseWithQuote(replacement);
}
private String encloseWithQuote(String string) {
StringBuilder builder = new StringBuilder();
builder.append(QUOTE_CHARACTER).append(string).append(QUOTE_CHARACTER);
return builder.toString();
}
private String unescapeString(String orig) {
// Remove the trailing and starting quotes.
orig = removeQuotes(orig);
int j = 0;
try {
for (j = 0; j < replacements.length; j++) {
orig = orig.replaceAll(getRegExp(replacements[j]),
Matcher.quoteReplacement(String.valueOf(originals[j])));
}
} catch (Exception e) {
throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, orig
+ " " + String.valueOf(j) + e.getMessage());
}
return orig;
}
private String removeQuotes(String string) {
return string.substring(1, string.length() - 1);
}
/**
 * Unescapes the given text with {@link #unescapeString(String)} and converts
 * the result to bytes using {@code BYTE_FIELD_CHARSET}.
 *
 * @param orig the quoted, escaped text representation of a byte array
 * @return the bytes of the unescaped text in {@code BYTE_FIELD_CHARSET}
 * @throws SqoopException with code CSV_INTERMEDIATE_DATA_FORMAT_0001 if the
 *         character set is not available
 */
private byte[] unescapeByteArray(String orig) {
  final String unescaped = unescapeString(orig);
  try {
    // Byte fields are always encoded in BYTE_FIELD_CHARSET.
    return unescaped.getBytes(BYTE_FIELD_CHARSET);
  } catch (UnsupportedEncodingException unavailable) {
    // Not expected in practice: this character set should ship with Java.
    final String detail = "The character set " + BYTE_FIELD_CHARSET + " is not available.";
    throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0001, detail);
  }
}
/**
 * Returns the current value of the {@code data} field as this object's
 * string form.
 *
 * @return the value of {@code data}
 */
@Override
public String toString() {
  return data;
}

View File

@ -67,7 +67,7 @@ public void setData(T data) {
this.data = data;
}
/**
* Get one row of data as CSV text. Use SqoopDataUtils for reading and writing
* Get one row of data as CSV text. Use {@link org.apache.sqoop.connector.common.SqoopIDFUtils} for reading and writing
* into the sqoop specified CSV text format for each {@link org.apache.sqoop.schema.type.ColumnType} field in the row.
* Why a "native" internal format and then return CSV text too?
* Imagine a connector that moves data from a system that stores data as a
@ -93,7 +93,7 @@ public void setData(T data) {
/**
* Get one row of data as an Object array. Sqoop uses defined object representation
* for each column type. For instance org.joda.time to represent date.Use SqoopDataUtils
* for each column type. For instance org.joda.time to represent date. Use {@link org.apache.sqoop.connector.common.SqoopIDFUtils}
* for reading and writing into the sqoop specified object format
* for each {@link org.apache.sqoop.schema.type.ColumnType} field in the row
* </p>

View File

@ -0,0 +1,162 @@
package org.apache.sqoop.connector.common;
import static org.junit.Assert.*;
import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
import org.apache.sqoop.schema.type.AbstractComplexListType;
import org.apache.sqoop.schema.type.Array;
import org.apache.sqoop.schema.type.Text;
import org.junit.Test;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Unit tests for the static CSV encoding and decoding helpers that are
 * statically imported from {@link SqoopIDFUtils}.
 */
public class TestSqoopIDFUtils {

  /**
   * Decodes the bytes with {@code BYTE_FIELD_CHARSET} and wraps the resulting
   * text in single quotes. Shared with other test classes.
   *
   * @param byteFieldData the raw bytes of a binary field
   * @return the decoded text enclosed in single quotes
   * @throws IllegalStateException if {@code BYTE_FIELD_CHARSET} is not available
   */
  public static String getByteFieldString(byte[] byteFieldData) {
    try {
      return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_CHARSET)).append("'").toString();
    } catch (UnsupportedEncodingException e) {
      // Should never get to this point because ISO-8859-1 is a standard codec.
      // Fail loudly instead of returning null, which would only resurface later
      // as an unrelated NullPointerException in the calling test.
      throw new IllegalStateException("The character set " + BYTE_FIELD_CHARSET + " is not available.", e);
    }
  }

  @Test
  public void testEncloseStringWithQuotes() {
    String test = "test";
    String quotedText = encloseWithQuote(test);
    assertEquals("'test'", quotedText);
  }

  @Test
  public void testStringWithQuotesToEncloseStringWithQuotes() {
    String test = "'test'";
    String quotedText = encloseWithQuote(test);
    assertEquals("''test''", quotedText);
  }

  @Test
  public void testRemoveQuotes() {
    String test = "'test'";
    String quotedText = removeQuotes(test);
    assertEquals("test", quotedText);
  }

  @Test
  public void testStringWithNoQuotesRemoveQuotes() {
    String test = "test";
    String quotedText = removeQuotes(test);
    assertEquals("test", quotedText);
  }

  // NOTE(review): exact duplicate of testStringWithNoQuotesRemoveQuotes (the
  // method name also misspells "String"); candidate for removal.
  @Test
  public void testStingWithNoQuotesRemoveQuotes() {
    String test = "test";
    String quotedText = removeQuotes(test);
    assertEquals("test", quotedText);
  }

  @Test
  public void testExample1EncodeToCSVString() {
    String test = "test";
    String encodedText = encodeToCSVString(test);
    assertEquals("'test'", encodedText);
  }

  @Test
  public void testExample2EncodeToCSVString() {
    String test = "test,test1";
    String encodedText = encodeToCSVString(test);
    assertEquals("'test,test1'", encodedText);
  }

  @Test
  public void testExample3EncodeToCSVString() {
    String test = "test,'test1";
    String encodedText = encodeToCSVString(test);
    assertEquals("'test,\\'test1'", encodedText);
  }

  @Test
  public void testExample4EncodeToCSVString() {
    String test = "test,\"test1";
    String encodedText = encodeToCSVString(test);
    assertEquals("'test,\\\"test1'", encodedText);
  }

  @Test
  public void testExample4ToString() {
    String test = "'test,\\\"test1'";
    String expectedString = "test,\"test1";
    String toString = toText(test);
    assertEquals(expectedString, toString);
  }

  // The @Test annotation was missing here and on the next two methods, so
  // JUnit 4 never executed them.
  @Test
  public void testExample5EncodeToCSVString() {
    String test = new String(new char[] { 0x0A });
    String encodedText = encodeToCSVString(test);
    assertEquals("'\\n'", encodedText);
  }

  @Test
  public void testExample5ToString() {
    String test = "'\\n'";
    String expectedString = new String(new char[] { 0x0A });
    String toString = toText(test);
    assertEquals(expectedString, toString);
  }

  @Test
  public void testExample6EncodeToCSVString() {
    String test = new String(new char[] { 0x0D });
    String encodedText = encodeToCSVString(test);
    assertEquals("'\\r'", encodedText);
  }

  @Test
  public void testEncodeByteToCSVString() {
    // byte[0] = \r, byte[1] = -112, byte[2] = 54 - 2's complement
    byte[] bytes = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54 };
    String encodedText = encodeToCSVByteArray(bytes);
    String expectedText = getByteFieldString(bytes).replaceAll("\r", "\\\\r");
    assertEquals(expectedText, encodedText);
  }

  @Test
  public void testEncodeArrayIntegersToCSVString() {
    List<Integer> list = new ArrayList<Integer>();
    list.add(1);
    list.add(2);
    AbstractComplexListType array = new Array("a", new Text("t"));
    String encodedText = encodeToCSVList(list.toArray(), array);
    assertEquals("'[1,2]'", encodedText);
  }

  @Test
  public void testEncodeArrayStringsToCSVString() {
    List<String> list = new ArrayList<String>();
    list.add("A");
    list.add("B");
    AbstractComplexListType array = new Array("a", new Text("t"));
    String encodedText = encodeToCSVList(list.toArray(), array);
    assertEquals("'[\"A\",\"B\"]'", encodedText);
  }

  @Test
  public void testEncodeMapToCSVString() {
    List<String> list = new ArrayList<String>();
    list.add("A");
    list.add("B");
    Map<Object, Object> map = new HashMap<Object, Object>();
    map.put("A", list);
    org.apache.sqoop.schema.type.Map mapCol = new org.apache.sqoop.schema.type.Map("a", new Text("t"), new Array("r", new Text(
        "tr")));
    String encodedText = encodeToCSVMap(map, mapCol);
    assertEquals("'{\"A\":[\"A\",\"B\"]}'", encodedText);
  }
}

View File

@ -22,8 +22,9 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -57,16 +58,6 @@ public void setUp() {
dataFormat = new CSVIntermediateDataFormat();
}
/**
 * Decodes the bytes with {@code CSVIntermediateDataFormat.BYTE_FIELD_CHARSET}
 * and wraps the resulting text in single quotes.
 *
 * @param byteFieldData the raw bytes of a binary field
 * @return the decoded text enclosed in single quotes, or {@code null} if the
 *         character set is not supported
 */
private String getByteFieldString(byte[] byteFieldData) {
  final String decoded;
  try {
    decoded = new String(byteFieldData, CSVIntermediateDataFormat.BYTE_FIELD_CHARSET);
  } catch (UnsupportedEncodingException e) {
    // Should never get to this point because ISO-8859-1 is a standard codec.
    return null;
  }
  return "'" + decoded + "'";
}
//**************test cases for null and empty input*******************
@ -114,7 +105,7 @@ public void testNullValueAsObjectArrayInAndCSVTextOut() {
String csvText = dataFormat.getCSVTextData();
String[] textValues = csvText.split(",");
for (String text : textValues) {
assertEquals(text, CSVIntermediateDataFormat.NULL_VALUE);
assertEquals(text, NULL_VALUE);
}
}
@ -183,7 +174,7 @@ public void testNullValueAsCSVTextInAndCSVTextOut() {
String csvText = dataFormat.getCSVTextData();
String[] textValues = csvText.split(",");
for (String text : textValues) {
assertEquals(text, CSVIntermediateDataFormat.NULL_VALUE);
assertEquals(text, NULL_VALUE);
}
}
@ -252,8 +243,8 @@ public void testInputAsObjectArayInCSVTextOut() {
dataFormat.setObjectData(in);
//byte[0] = \r, byte[1] = -112, byte[2] = 54 - 2's complement
String testData = "10,34,'54','random data'," +
getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n','TEST_ENUM'";
String testData = "10,34,'54','random data'," + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r")
+ ",'\\n','TEST_ENUM'";
assertEquals(testData, dataFormat.getCSVTextData());
}
@ -315,8 +306,8 @@ public void testObjectArrayWithNullInCSVTextOut() {
dataFormat.setObjectData(in);
//byte[0] = \r, byte[1] = -112, byte[2] = 54 - 2's complement
String testData = "10,34,NULL,'random data'," +
getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n','TEST_ENUM'";
String testData = "10,34,NULL,'random data'," + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r")
+ ",'\\n','TEST_ENUM'";
assertEquals(testData, dataFormat.getCSVTextData());
}
@ -521,7 +512,7 @@ public void testCurrentDateTime2WithFractionNoTimezoneWithCSVTextInObjectArrayOu
dataFormat.setSchema(schema);
// current date time
org.joda.time.DateTime dateTime = new org.joda.time.DateTime();
String dateTimeString = CSVIntermediateDataFormat.dtfWithFractionNoTimeZone.print(dateTime);
String dateTimeString = dtfWithFractionNoTimeZone.print(dateTime);
dataFormat.setCSVTextData("'" + dateTimeString + "'");
assertEquals(dateTimeString.replace(" ", "T"), dataFormat.getObjectData()[0].toString());
}