
SQOOP-1902: Sqoop2: Avro IDF class and unit tests

(Veena Basavaraj via Abraham Elmahrek)
Abraham Elmahrek 2015-01-07 18:28:39 -08:00
parent 6d009c6c06
commit 3bb9c595d9
12 changed files with 1016 additions and 45 deletions

View File

@ -50,6 +50,10 @@ limitations under the License.
       <groupId>org.apache.sqoop</groupId>
       <artifactId>sqoop-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
   </dependencies>

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.common;
import org.apache.avro.Schema;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.IntermediateDataFormatError;
import org.apache.sqoop.schema.type.AbstractComplexListType;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class SqoopAvroUtils {
public static final String COLUMN_TYPE = "columnType";
public static final String SQOOP_SCHEMA_NAMESPACE = "org.apache.sqoop";
/**
* Creates an Avro schema from a Sqoop schema.
*/
public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema) {
String name = sqoopSchema.getName();
String doc = sqoopSchema.getNote();
String namespace = SQOOP_SCHEMA_NAMESPACE;
Schema schema = Schema.createRecord(name, doc, namespace, false);
List<Schema.Field> fields = new ArrayList<Schema.Field>();
for (Column column : sqoopSchema.getColumnsArray()) {
Schema.Field field = new Schema.Field(column.getName(), createAvroFieldSchema(column), null, null);
field.addProp(COLUMN_TYPE, column.getType().toString());
fields.add(field);
}
schema.setFields(fields);
return schema;
}
public static Schema createAvroFieldSchema(Column column) {
Schema schema = toAvroFieldType(column);
if (!column.getNullable()) {
return schema;
} else {
List<Schema> union = new ArrayList<Schema>();
union.add(schema);
union.add(Schema.create(Schema.Type.NULL));
return Schema.createUnion(union);
}
}
public static Schema toAvroFieldType(Column column) throws IllegalArgumentException {
switch (column.getType()) {
case ARRAY:
case SET:
AbstractComplexListType listColumn = (AbstractComplexListType) column;
return Schema.createArray(toAvroFieldType(listColumn.getListType()));
case UNKNOWN:
case BINARY:
return Schema.create(Schema.Type.BYTES);
case BIT:
return Schema.create(Schema.Type.BOOLEAN);
case DATE:
case DATE_TIME:
case TIME:
      // Avro 1.8 will have a date type
// https://issues.apache.org/jira/browse/AVRO-739
return Schema.create(Schema.Type.LONG);
case DECIMAL:
      // TODO: is string OK? Using it since the Kite code appears to use it
return Schema.create(Schema.Type.STRING);
case ENUM:
return createEnumSchema(column);
case FIXED_POINT:
Long byteSize = ((FixedPoint) column).getByteSize();
if (byteSize != null && byteSize <= Integer.SIZE) {
return Schema.create(Schema.Type.INT);
} else {
return Schema.create(Schema.Type.LONG);
}
case FLOATING_POINT:
byteSize = ((FloatingPoint) column).getByteSize();
if (byteSize != null && byteSize <= Float.SIZE) {
return Schema.create(Schema.Type.FLOAT);
} else {
return Schema.create(Schema.Type.DOUBLE);
}
case MAP:
org.apache.sqoop.schema.type.Map mapColumn = (org.apache.sqoop.schema.type.Map) column;
return Schema.createArray(toAvroFieldType(mapColumn.getValue()));
case TEXT:
return Schema.create(Schema.Type.STRING);
default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, column.getType().name());
}
}
public static Schema createEnumSchema(Column column) {
Set<String> options = ((org.apache.sqoop.schema.type.Enum) column).getOptions();
List<String> listOptions = new ArrayList<String>(options);
return Schema.createEnum(column.getName(), null, SQOOP_SCHEMA_NAMESPACE, listOptions);
}
public static byte[] getBytesFromByteBuffer(Object obj) {
ByteBuffer buffer = (ByteBuffer) obj;
byte[] bytes = new byte[buffer.remaining()];
buffer.duplicate().get(bytes);
return bytes;
}
}
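
For illustration, a minimal sketch (not part of this commit) of how createAvroSchema might be exercised, assuming the fluent org.apache.sqoop.schema.Schema API used by the test class later in this change; the column names here are hypothetical:

import org.apache.avro.Schema;
import org.apache.sqoop.connector.common.SqoopAvroUtils;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;

public class AvroSchemaSketch {
  public static void main(String[] args) {
    org.apache.sqoop.schema.Schema sqoopSchema = new org.apache.sqoop.schema.Schema("demo");
    // FixedPoint with no byte size maps to LONG; nullable columns become a union with null
    sqoopSchema.addColumn(new FixedPoint("id")).addColumn(new Text("name"));
    Schema avroSchema = SqoopAvroUtils.createAvroSchema(sqoopSchema);
    // prints a record schema in the org.apache.sqoop namespace whose fields carry
    // the original Sqoop column type in the "columnType" property
    System.out.println(avroSchema.toString(true));
  }
}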

View File

@ -68,15 +68,10 @@ public class SqoopIDFUtils {
   public static final char QUOTE_CHARACTER = '\'';

   // string related replacements
-  private static final String[] replacements = {
-      new String(new char[] { ESCAPE_CHARACTER, '\\' }),
-      new String(new char[] { ESCAPE_CHARACTER, '0' }),
-      new String(new char[] { ESCAPE_CHARACTER, 'n' }),
-      new String(new char[] { ESCAPE_CHARACTER, 'r' }),
-      new String(new char[] { ESCAPE_CHARACTER, 'Z' }),
-      new String(new char[] { ESCAPE_CHARACTER, '\"' }),
-      new String(new char[] { ESCAPE_CHARACTER, '\'' })
-  };
+  private static final String[] replacements = { new String(new char[] { ESCAPE_CHARACTER, '\\' }),
+      new String(new char[] { ESCAPE_CHARACTER, '0' }), new String(new char[] { ESCAPE_CHARACTER, 'n' }),
+      new String(new char[] { ESCAPE_CHARACTER, 'r' }), new String(new char[] { ESCAPE_CHARACTER, 'Z' }),
+      new String(new char[] { ESCAPE_CHARACTER, '\"' }), new String(new char[] { ESCAPE_CHARACTER, '\'' }) };

   // http://www.joda.org/joda-time/key_format.html provides details on the
   // formatter token
@ -140,8 +135,9 @@ public static Object toFloatingPoint(String csvString, Column column) {
   }

   public static String encodeToCSVDecimal(Object obj) {
-    return ((BigDecimal)obj).toString();
+    return ((BigDecimal) obj).toString();
   }

   public static Object toDecimal(String csvString, Column column) {
     return new BigDecimal(csvString);
   }
@ -152,7 +148,8 @@ public static String encodeToCSVBit(Object obj) {
     if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) {
       return bitStringValue;
     } else {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006, " given bit value: " + bitStringValue);
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0005, " given bit value: "
+          + bitStringValue);
     }
   }

@ -161,7 +158,7 @@ public static Object toBit(String csvString) {
     return TRUE_BIT_SET.contains(csvString);
   } else {
     // throw an exception for any unsupported value for BITs
-    throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006, " given bit value: " + csvString);
+    throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0005, " given bit value: " + csvString);
   }
 }
@ -200,7 +197,6 @@ public static String encodeToCSVLocalDateTime(Object obj, Column col) {
     }
   }

   public static String encodeToCSVDateTime(Object obj, Column col) {
     org.joda.time.DateTime dateTime = (org.joda.time.DateTime) obj;
     org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;

@ -236,6 +232,27 @@ public static Object toDateTime(String csvString, Column column) {
     return returnValue;
   }

+  public static Long toDateTimeInMillis(String csvString, Column column) {
+    long returnValue;
+    String dateTime = removeQuotes(csvString);
+    org.apache.sqoop.schema.type.DateTime col = ((org.apache.sqoop.schema.type.DateTime) column);
+    if (col.hasFraction() && col.hasTimezone()) {
+      // After calling the withOffsetParsed method, a string
+      // '2004-06-09T10:20:30-08:00' will create a datetime with a zone of
+      // -08:00 (a fixed zone, with no daylight savings rules)
+      returnValue = dtfWithFractionAndTimeZone.withOffsetParsed().parseDateTime(dateTime).toDate().getTime();
+    } else if (col.hasFraction() && !col.hasTimezone()) {
+      // we use local date time explicitly to not include the timezone
+      returnValue = dtfWithFractionNoTimeZone.parseLocalDateTime(dateTime).toDate().getTime();
+    } else if (col.hasTimezone()) {
+      returnValue = dtfWithNoFractionWithTimeZone.withOffsetParsed().parseDateTime(dateTime).toDate().getTime();
+    } else {
+      // we use local date time explicitly to not include the timezone
+      returnValue = dtfWithNoFractionAndTimeZone.parseLocalDateTime(dateTime).toDate().getTime();
+    }
+    return returnValue;
+  }

   // ************ MAP Column Type utils*********

   @SuppressWarnings("unchecked")
@ -301,7 +318,7 @@ public static String encodeToCSVList(Object[] list, Column column) {
     List<Object> elementList = new ArrayList<Object>();
     for (int n = 0; n < list.length; n++) {
       Column listType = ((AbstractComplexListType) column).getListType();
-      //2 level nesting supported
+      // 2 level nesting supported
       if (isColumnListType(listType)) {
         Object[] listElements = (Object[]) list[n];
         JSONArray subArray = new JSONArray();

@ -332,6 +349,44 @@ public static Object[] toList(String csvString) {
     return null;
   }

+  @SuppressWarnings("unchecked")
+  public static JSONArray toJSONArray(Object[] objectArray) {
+    JSONArray jsonArray = new JSONArray();
+    for (int i = 0; i < objectArray.length; i++) {
+      Object value = objectArray[i];
+      if (value instanceof Object[]) {
+        value = toJSONArray((Object[]) value);
+      }
+      jsonArray.add(value);
+    }
+    return jsonArray;
+  }
+
+  public static List<Object> toList(Object[] objectArray) {
+    List<Object> objList = new ArrayList<Object>();
+    for (int i = 0; i < objectArray.length; i++) {
+      Object value = objectArray[i];
+      if (value instanceof Object[]) {
+        value = toList((Object[]) value);
+      }
+      objList.add(value);
+    }
+    return objList;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static Object[] toObjectArray(List<Object> list) {
+    Object[] array = new Object[list.size()];
+    for (int i = 0; i < list.size(); i++) {
+      Object value = list.get(i);
+      if (value instanceof List) {
+        value = toObjectArray((List<Object>) value);
+      }
+      array[i] = value;
+    }
+    return array;
+  }

   // ************ TEXT Column Type utils*********

   private static String getRegExp(char character) {
@ -350,8 +405,8 @@ public static String encodeToCSVString(String string) {
         replacement = replacement.replaceAll(getRegExp(originals[j]), Matcher.quoteReplacement(replacements[j]));
       }
     } catch (Exception e) {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + " " + replacement + " "
-          + String.valueOf(j) + " " + e.getMessage());
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + " " + replacement
+          + " " + String.valueOf(j) + " " + e.getMessage());
     }
     return encloseWithQuote(replacement);
   }

@ -365,8 +420,8 @@ public static String toText(String string) {
       string = string.replaceAll(getRegExp(replacements[j]), Matcher.quoteReplacement(String.valueOf(originals[j])));
     }
   } catch (Exception e) {
-    throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, string + " " + String.valueOf(j)
-        + e.getMessage());
+    throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, string + " "
+        + String.valueOf(j) + e.getMessage());
   }
   return string;
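
A short sketch (not from the commit) of the round-trip behavior of the nested-list helpers added above: toList(Object[]) converts Sqoop's nested Object[] representation into the java.util.List form Avro works with, and toObjectArray reverses it.

import static org.apache.sqoop.connector.common.SqoopIDFUtils.toList;
import static org.apache.sqoop.connector.common.SqoopIDFUtils.toObjectArray;

import java.util.Arrays;
import java.util.List;

public class ListHelperSketch {
  public static void main(String[] args) {
    // two-level nesting, matching the "2 level nesting supported" limit above
    Object[] nested = new Object[] { new Object[] { 11, 12 }, new Object[] { 14, 15 } };
    List<Object> asList = toList(nested);               // nested Object[] -> nested List
    Object[] roundTrip = toObjectArray(asList);         // nested List -> nested Object[]
    System.out.println(Arrays.deepToString(roundTrip)); // [[11, 12], [14, 15]]
  }
}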

View File

@ -0,0 +1,431 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.idf;
import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
import static org.apache.sqoop.connector.common.SqoopAvroUtils.*;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.Utf8;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.utils.ClassUtils;
import org.joda.time.LocalDate;
import org.joda.time.LocalTime;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * IDF representing the intermediate data format as an Avro GenericRecord
*/
public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRecord> {
private Schema avroSchema;
// need this default constructor for reflection magic used in execution engine
public AVROIntermediateDataFormat() {
}
// We need schema at all times
public AVROIntermediateDataFormat(org.apache.sqoop.schema.Schema schema) {
setSchema(schema);
avroSchema = createAvroSchema(schema);
}
/**
* {@inheritDoc}
*/
@Override
public void setCSVTextData(String text) {
// convert the CSV text to avro
this.data = toAVRO(text);
}
/**
* {@inheritDoc}
*/
@Override
public String getCSVTextData() {
// convert avro to sqoop CSV
return toCSV(data);
}
/**
* {@inheritDoc}
*/
@Override
public void setObjectData(Object[] data) {
// convert the object array to avro
this.data = toAVRO(data);
}
/**
* {@inheritDoc}
*/
@Override
public Object[] getObjectData() {
// convert avro to object array
return toObject(data);
}
/**
* {@inheritDoc}
*/
@Override
public void write(DataOutput out) throws IOException {
// do we need to write the schema?
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroSchema);
BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder((DataOutputStream) out, null);
writer.write(data, encoder);
}
/**
* {@inheritDoc}
*/
@Override
public void read(DataInput in) throws IOException {
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(avroSchema);
Decoder decoder = DecoderFactory.get().binaryDecoder((InputStream) in, null);
data = reader.read(null, decoder);
}
/**
* {@inheritDoc}
*/
@Override
public Set<String> getJars() {
Set<String> jars = super.getJars();
jars.add(ClassUtils.jarForClass(GenericRecord.class));
return jars;
}
private GenericRecord toAVRO(String csv) {
String[] csvStringArray = parseCSVString(csv);
if (csvStringArray == null) {
return null;
}
if (csvStringArray.length != schema.getColumnsArray().length) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + csv
+ " has the wrong number of fields.");
}
GenericRecord avroObject = new GenericData.Record(avroSchema);
Column[] columnArray = schema.getColumnsArray();
for (int i = 0; i < csvStringArray.length; i++) {
// check for NULL field and assume this field is nullable as per the sqoop
// schema
if (csvStringArray[i].equals(NULL_VALUE) && columnArray[i].getNullable()) {
avroObject.put(columnArray[i].getName(), null);
continue;
}
avroObject.put(columnArray[i].getName(), toAVRO(csvStringArray[i], columnArray[i]));
}
return avroObject;
}
private Object toAVRO(String csvString, Column column) {
Object returnValue = null;
switch (column.getType()) {
case ARRAY:
case SET:
Object[] list = toList(csvString);
// store as a java collection
returnValue = Arrays.asList(list);
break;
case MAP:
// store as a map
returnValue = toMap(csvString);
break;
case ENUM:
returnValue = new GenericData.EnumSymbol(createEnumSchema(column), (removeQuotes(csvString)));
break;
case TEXT:
returnValue = new Utf8(removeQuotes(csvString));
break;
case BINARY:
case UNKNOWN:
// avro accepts byte buffer for binary data
returnValue = ByteBuffer.wrap(toByteArray(csvString));
break;
case FIXED_POINT:
returnValue = toFixedPoint(csvString, column);
break;
case FLOATING_POINT:
returnValue = toFloatingPoint(csvString, column);
break;
case DECIMAL:
// TODO: store as FIXED in SQOOP-16161
returnValue = removeQuotes(csvString);
break;
case DATE:
      // until Avro 1.8, store as long
returnValue = ((LocalDate) toDate(csvString, column)).toDate().getTime();
break;
case TIME:
      // until Avro 1.8, store as long
returnValue = ((LocalTime) toTime(csvString, column)).toDateTimeToday().getMillis();
break;
case DATE_TIME:
      // until Avro 1.8, store as long
returnValue = toDateTimeInMillis(csvString, column);
break;
case BIT:
returnValue = Boolean.valueOf(removeQuotes(csvString));
break;
default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
"Column type from schema was not recognized for " + column.getType());
}
return returnValue;
}
private GenericRecord toAVRO(Object[] data) {
if (data == null) {
return null;
}
if (data.length != schema.getColumnsArray().length) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + data.toString()
+ " has the wrong number of fields.");
}
// get avro schema from sqoop schema
GenericRecord avroObject = new GenericData.Record(avroSchema);
Column[] cols = schema.getColumnsArray();
for (int i = 0; i < data.length; i++) {
switch (cols[i].getType()) {
case ARRAY:
case SET:
avroObject.put(cols[i].getName(), toList((Object[]) data[i]));
break;
case MAP:
avroObject.put(cols[i].getName(), data[i]);
break;
case ENUM:
GenericData.EnumSymbol enumValue = new GenericData.EnumSymbol(createEnumSchema(cols[i]), (String) data[i]);
avroObject.put(cols[i].getName(), enumValue);
break;
case TEXT:
avroObject.put(cols[i].getName(), new Utf8((String) data[i]));
break;
case BINARY:
case UNKNOWN:
avroObject.put(cols[i].getName(), ByteBuffer.wrap((byte[]) data[i]));
break;
case FIXED_POINT:
case FLOATING_POINT:
avroObject.put(cols[i].getName(), data[i]);
break;
case DECIMAL:
// TODO: store as FIXED in SQOOP-16161
avroObject.put(cols[i].getName(), ((BigDecimal) data[i]).toPlainString());
break;
case DATE_TIME:
if (data[i] instanceof org.joda.time.DateTime) {
avroObject.put(cols[i].getName(), ((org.joda.time.DateTime) data[i]).toDate().getTime());
} else if (data[i] instanceof org.joda.time.LocalDateTime) {
avroObject.put(cols[i].getName(), ((org.joda.time.LocalDateTime) data[i]).toDate().getTime());
}
break;
case TIME:
avroObject.put(cols[i].getName(), ((org.joda.time.LocalTime) data[i]).toDateTimeToday().getMillis());
break;
case DATE:
avroObject.put(cols[i].getName(), ((org.joda.time.LocalDate) data[i]).toDate().getTime());
break;
case BIT:
avroObject.put(cols[i].getName(), Boolean.valueOf((Boolean) data[i]));
break;
default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
"Column type from schema was not recognized for " + cols[i].getType());
}
}
return avroObject;
}
@SuppressWarnings("unchecked")
private String toCSV(GenericRecord record) {
Column[] cols = this.schema.getColumnsArray();
StringBuilder csvString = new StringBuilder();
for (int i = 0; i < cols.length; i++) {
Object obj = record.get(cols[i].getName());
if (obj == null) {
throw new SqoopException(AVROIntermediateDataFormatError.AVRO_INTERMEDIATE_DATA_FORMAT_0001, " for " + cols[i].getName());
}
switch (cols[i].getType()) {
case ARRAY:
case SET:
List<Object> objList = (List<Object>) obj;
csvString.append(encodeToCSVList(toObjectArray(objList), cols[i]));
break;
case MAP:
Map<Object, Object> objMap = (Map<Object, Object>) obj;
csvString.append(encodeToCSVMap(objMap, cols[i]));
break;
case ENUM:
case TEXT:
csvString.append(encodeToCSVString(obj.toString()));
break;
case BINARY:
case UNKNOWN:
csvString.append(encodeToCSVByteArray(getBytesFromByteBuffer(obj)));
break;
case FIXED_POINT:
csvString.append(encodeToCSVFixedPoint(obj, cols[i]));
break;
case FLOATING_POINT:
csvString.append(encodeToCSVFloatingPoint(obj, cols[i]));
break;
case DECIMAL:
// stored as string
csvString.append(encodeToCSVDecimal(obj));
break;
case DATE:
// stored as long
Long dateInMillis = (Long) obj;
csvString.append(encodeToCSVDate(new org.joda.time.LocalDate(dateInMillis)));
break;
case TIME:
// stored as long
Long timeInMillis = (Long) obj;
csvString.append(encodeToCSVTime(new org.joda.time.LocalTime(timeInMillis), cols[i]));
break;
case DATE_TIME:
// stored as long
Long dateTimeInMillis = (Long) obj;
csvString.append(encodeToCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), cols[i]));
break;
case BIT:
csvString.append(encodeToCSVBit(obj));
break;
default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
"Column type from schema was not recognized for " + cols[i].getType());
}
if (i < cols.length - 1) {
csvString.append(CSV_SEPARATOR_CHARACTER);
}
}
return csvString.toString();
}
@SuppressWarnings("unchecked")
private Object[] toObject(GenericRecord record) {
if (data == null) {
return null;
}
Column[] cols = schema.getColumnsArray();
Object[] object = new Object[cols.length];
for (int i = 0; i < cols.length; i++) {
Object obj = record.get(cols[i].getName());
if (obj == null) {
throw new SqoopException(AVROIntermediateDataFormatError.AVRO_INTERMEDIATE_DATA_FORMAT_0001, " for " + cols[i].getName());
}
Integer nameIndex = schema.getColumnNameIndex(cols[i].getName());
Column column = cols[nameIndex];
switch (column.getType()) {
case ARRAY:
case SET:
object[nameIndex] = toObjectArray((List<Object>) obj);
break;
case MAP:
object[nameIndex] = obj;
break;
case ENUM:
// stored as enum symbol
case TEXT:
// stored as UTF8
object[nameIndex] = obj.toString();
break;
case BINARY:
case UNKNOWN:
// stored as byte buffer
object[nameIndex] = getBytesFromByteBuffer(obj);
break;
case FIXED_POINT:
case FLOATING_POINT:
// stored as java objects in avro as well
object[nameIndex] = obj;
break;
case DECIMAL:
// stored as string
object[nameIndex] = obj.toString();
break;
case DATE:
Long dateInMillis = (Long) obj;
object[nameIndex] = new org.joda.time.LocalDate(dateInMillis);
break;
case TIME:
Long timeInMillis = (Long) obj;
object[nameIndex] = new org.joda.time.LocalTime(timeInMillis);
break;
case DATE_TIME:
Long dateTimeInMillis = (Long) obj;
if (((org.apache.sqoop.schema.type.DateTime) column).hasTimezone()) {
object[nameIndex] = new org.joda.time.DateTime(dateTimeInMillis);
} else {
object[nameIndex] = new org.joda.time.LocalDateTime(dateTimeInMillis);
}
break;
case BIT:
object[nameIndex] = toBit(obj.toString());
break;
default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
"Column type from schema was not recognized for " + cols[i].getType());
}
}
return object;
}
}
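
To make the serialization path concrete, a hypothetical round trip through the new IDF (not part of the commit); it uses only the constructor and methods shown above, and relies on write() taking a DataOutputStream and read() taking an InputStream-backed DataInput:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;

public class AvroIdfRoundTrip {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema("demo");
    schema.addColumn(new FixedPoint("id")).addColumn(new Text("name"));

    AVROIntermediateDataFormat idf = new AVROIntermediateDataFormat(schema);
    idf.setCSVTextData("10,'jane'");

    // write() hands the stream to a direct binary encoder
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    idf.write(new DataOutputStream(bytes));

    AVROIntermediateDataFormat readBack = new AVROIntermediateDataFormat(schema);
    readBack.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(readBack.getCSVTextData()); // expected: 10,'jane'
  }
}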

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.idf;
import org.apache.sqoop.common.ErrorCode;
public enum AVROIntermediateDataFormatError implements ErrorCode {
/** An unknown error has occurred. */
AVRO_INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),
AVRO_INTERMEDIATE_DATA_FORMAT_0001("Missing key in the AVRO object.")
;
private final String message;
private AVROIntermediateDataFormatError(String message) {
this.message = message;
}
public String getCode() {
return name();
}
public String getMessage() {
return message;
}
}

View File

@ -152,7 +152,7 @@ private Object toObject(String csvString, Column column) {
     returnValue = toMap(csvString);
     break;
   default:
-    throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0004,
+    throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
         "Column type from schema was not recognized for " + column.getType());
   }
   return returnValue;

View File

@ -33,17 +33,14 @@ public enum CSVIntermediateDataFormatError implements ErrorCode {
   /** Error while escaping a row. */
   CSV_INTERMEDIATE_DATA_FORMAT_0003("An error has occurred while unescaping a row."),

-  /** Column type isn't known by Intermediate Data Format. */
-  CSV_INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),

   /**
    * For arrays and maps we use JSON representation and incorrect representation
    * results in parse exception
    */
-  CSV_INTERMEDIATE_DATA_FORMAT_0005("JSON parse internal error."),
+  CSV_INTERMEDIATE_DATA_FORMAT_0004("JSON parse internal error."),

   /** Unsupported bit values */
-  CSV_INTERMEDIATE_DATA_FORMAT_0006("Unsupported bit value."),
+  CSV_INTERMEDIATE_DATA_FORMAT_0005("Unsupported bit value."),
   ;

View File

@ -18,11 +18,10 @@
  */
 package org.apache.sqoop.connector.idf;

-import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.isColumnListType;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.isColumnStringType;

-import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.AbstractComplexListType;
 import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.schema.type.ColumnType;

@ -30,9 +29,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.Set;

 /**
/** /**

View File

@ -32,6 +32,9 @@ public enum IntermediateDataFormatError implements ErrorCode {
   INTERMEDIATE_DATA_FORMAT_0003("JSON parse error"),

+  /** Column type isn't known by Intermediate Data Format. */
+  INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
   ;

   private final String message;

View File

@ -189,7 +189,7 @@ private Object toJSON(String csvString, Column column) {
     returnValue = Boolean.valueOf(removeQuotes(csvString));
     break;
   default:
-    throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0004,
+    throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
         "Column type from schema was not recognized for " + column.getType());
   }
   return returnValue;

@ -261,19 +261,6 @@ private JSONObject toJSON(Object[] data) {
     return object;
   }

-  @SuppressWarnings("unchecked")
-  public static JSONArray toJSONArray(Object[] objectArray) {
-    JSONArray jsonArray = new JSONArray();
-    for (int i = 0; i < objectArray.length; i++) {
-      Object value = objectArray[i];
-      if (value instanceof Object[]) {
-        value = toJSONArray((Object[]) value);
-      }
-      jsonArray.add(value);
-    }
-    return jsonArray;
-  }

   private String toCSV(JSONObject json) {
     Column[] cols = this.schema.getColumnsArray();
View File

@ -0,0 +1,319 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.idf;
import static org.apache.sqoop.connector.common.SqoopAvroUtils.createEnumSchema;
import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString;
import static org.junit.Assert.assertEquals;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.sqoop.connector.common.SqoopAvroUtils;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Array;
import org.apache.sqoop.schema.type.Binary;
import org.apache.sqoop.schema.type.Bit;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;
import org.joda.time.LocalDateTime;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class TestAVROIntermediateDataFormat {
private AVROIntermediateDataFormat dataFormat;
private org.apache.avro.Schema avroSchema;
private final static String csvArray = "'[[11,11],[14,15]]'";
private final static String map = "'{\"testKey\":\"testValue\"}'";
private final static String csvSet = "'[[11,12],[14,15]]'";
private final static String csvDate = "'2014-10-01'";
private final static String csvDateTime = "'2014-10-01 12:00:00.000'";
private final static String csvTime = "'12:59:59'";
private Column enumCol;
// no time zone
private final static LocalDateTime dateTime = new org.joda.time.LocalDateTime(2014, 10, 01, 12, 0, 0);
private final static org.joda.time.LocalTime time = new org.joda.time.LocalTime(12, 59, 59);
private final static org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
@Before
public void setUp() {
createAvroIDF();
}
private void createAvroIDF() {
Schema sqoopSchema = new Schema("test");
Set<String> options = new HashSet<String>();
options.add("ENUM");
options.add("NUME");
enumCol = new org.apache.sqoop.schema.type.Enum("seven").setOptions(options);
sqoopSchema.addColumn(new FixedPoint("one")).addColumn(new FixedPoint("two", 2L, false)).addColumn(new Text("three"))
.addColumn(new Text("four")).addColumn(new Binary("five")).addColumn(new Text("six")).addColumn(enumCol)
.addColumn(new Array("eight", new Array("array", new FixedPoint("ft"))))
.addColumn(new org.apache.sqoop.schema.type.Map("nine", new Text("t1"), new Text("t2"))).addColumn(new Bit("ten"))
.addColumn(new org.apache.sqoop.schema.type.DateTime("eleven", true, false))
.addColumn(new org.apache.sqoop.schema.type.Time("twelve", false))
.addColumn(new org.apache.sqoop.schema.type.Date("thirteen"))
.addColumn(new org.apache.sqoop.schema.type.FloatingPoint("fourteen"))
.addColumn(new org.apache.sqoop.schema.type.Set("fifteen", new Array("set", new FixedPoint("ftw"))));
dataFormat = new AVROIntermediateDataFormat(sqoopSchema);
avroSchema = SqoopAvroUtils.createAvroSchema(sqoopSchema);
}
/**
* setCSVGetData setCSVGetObjectArray setCSVGetCSV
*/
@Test
public void testInputAsCSVTextInAndDataOut() {
String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+ String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
+ ",13.44," + csvSet;
dataFormat.setCSVTextData(csvText);
GenericRecord avroObject = createAvroGenericRecord();
assertEquals(avroObject.toString(), dataFormat.getData().toString());
}
@Test
public void testInputAsCSVTextInAndObjectArrayOut() {
String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+ String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
+ ",13.44," + csvSet;
dataFormat.setCSVTextData(csvText);
assertEquals(dataFormat.getObjectData().length, 15);
assertObjectArray();
}
private void assertObjectArray() {
Object[] out = dataFormat.getObjectData();
assertEquals(10L, out[0]);
assertEquals(34, out[1]);
assertEquals("54", out[2]);
assertEquals("random data", out[3]);
assertEquals(-112, ((byte[]) out[4])[0]);
assertEquals(54, ((byte[]) out[4])[1]);
assertEquals("10", out[5]);
assertEquals("ENUM", out[6]);
Object[] givenArrayOne = new Object[2];
givenArrayOne[0] = 11;
givenArrayOne[1] = 11;
Object[] givenArrayTwo = new Object[2];
givenArrayTwo[0] = 14;
givenArrayTwo[1] = 15;
Object[] arrayOfArrays = new Object[2];
arrayOfArrays[0] = givenArrayOne;
arrayOfArrays[1] = givenArrayTwo;
Map<Object, Object> map = new HashMap<Object, Object>();
map.put("testKey", "testValue");
Object[] set0 = new Object[2];
set0[0] = 11;
set0[1] = 12;
Object[] set1 = new Object[2];
set1[0] = 14;
set1[1] = 15;
Object[] set = new Object[2];
set[0] = set0;
set[1] = set1;
out[14] = set;
assertEquals(arrayOfArrays.length, 2);
assertEquals(Arrays.deepToString(arrayOfArrays), Arrays.deepToString((Object[]) out[7]));
assertEquals(map, out[8]);
assertEquals(true, out[9]);
assertEquals(dateTime, out[10]);
assertEquals(time, out[11]);
assertEquals(date, out[12]);
assertEquals(13.44, out[13]);
assertEquals(set.length, 2);
assertEquals(Arrays.deepToString(set), Arrays.deepToString((Object[]) out[14]));
}
@Test
public void testInputAsCSVTextInCSVTextOut() {
String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+ String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
+ ",13.44," + csvSet;
dataFormat.setCSVTextData(csvText);
assertEquals(csvText, dataFormat.getCSVTextData());
}
private GenericRecord createAvroGenericRecord() {
GenericRecord avroObject = new GenericData.Record(avroSchema);
avroObject.put("one", 10L);
avroObject.put("two", 34);
avroObject.put("three", new Utf8("54"));
avroObject.put("four", new Utf8("random data"));
// store byte array in byte buffer
byte[] b = new byte[] { (byte) -112, (byte) 54 };
avroObject.put("five", ByteBuffer.wrap(b));
avroObject.put("six", new Utf8(String.valueOf(0x0A)));
avroObject.put("seven", new GenericData.EnumSymbol(createEnumSchema(enumCol), "ENUM"));
List<Object> givenArrayOne = new ArrayList<Object>();
givenArrayOne.add(11);
givenArrayOne.add(11);
List<Object> givenArrayTwo = new ArrayList<Object>();
givenArrayTwo.add(14);
givenArrayTwo.add(15);
List<Object> arrayOfArrays = new ArrayList<Object>();
arrayOfArrays.add(givenArrayOne);
arrayOfArrays.add(givenArrayTwo);
Map<Object, Object> map = new HashMap<Object, Object>();
map.put("testKey", "testValue");
avroObject.put("eight", arrayOfArrays);
avroObject.put("nine", map);
avroObject.put("ten", true);
// expect dates as strings
avroObject.put("eleven", dateTime.toDate().getTime());
avroObject.put("twelve", time.toDateTimeToday().getMillis());
avroObject.put("thirteen", date.toDate().getTime());
avroObject.put("fourteen", 13.44);
List<Object> givenSetOne = new ArrayList<Object>();
givenSetOne.add(11);
givenSetOne.add(12);
List<Object> givenSetTwo = new ArrayList<Object>();
givenSetTwo.add(14);
givenSetTwo.add(15);
List<Object> set = new ArrayList<Object>();
set.add(givenSetOne);
set.add(givenSetTwo);
avroObject.put("fifteen", set);
return avroObject;
}
/**
* setDataGetCSV setDataGetObjectArray setDataGetData
*/
@Test
public void testInputAsDataInAndCSVOut() {
String csvExpected = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+ String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
+ ",13.44," + csvSet;
dataFormat.setData(createAvroGenericRecord());
assertEquals(csvExpected, dataFormat.getCSVTextData());
}
@Test
public void testInputAsDataInAndObjectArrayOut() {
GenericRecord avroObject = createAvroGenericRecord();
dataFormat.setData(avroObject);
assertObjectArray();
}
@Test
public void testInputAsDataInAndDataOut() {
GenericRecord avroObject = createAvroGenericRecord();
dataFormat.setData(avroObject);
assertEquals(avroObject, dataFormat.getData());
}
private Object[] createObjectArray() {
Object[] out = new Object[15];
out[0] = 10L;
out[1] = 34;
out[2] = "54";
out[3] = "random data";
out[4] = new byte[] { (byte) -112, (byte) 54 };
out[5] = String.valueOf(0x0A);
out[6] = "ENUM";
Object[] givenArrayOne = new Object[2];
givenArrayOne[0] = 11;
givenArrayOne[1] = 11;
Object[] givenArrayTwo = new Object[2];
givenArrayTwo[0] = 14;
givenArrayTwo[1] = 15;
Object[] arrayOfArrays = new Object[2];
arrayOfArrays[0] = givenArrayOne;
arrayOfArrays[1] = givenArrayTwo;
Map<Object, Object> map = new HashMap<Object, Object>();
map.put("testKey", "testValue");
out[7] = arrayOfArrays;
out[8] = map;
out[9] = true;
out[10] = dateTime;
out[11] = time;
out[12] = date;
out[13] = 13.44;
Object[] set0 = new Object[2];
set0[0] = 11;
set0[1] = 12;
Object[] set1 = new Object[2];
set1[0] = 14;
set1[1] = 15;
Object[] set = new Object[2];
set[0] = set0;
set[1] = set1;
out[14] = set;
return out;
}
/**
* setObjectArrayGetData setObjectArrayGetCSV setObjectArrayGetObjectArray
*/
@Test
public void testInputAsObjectArrayInAndDataOut() {
Object[] out = createObjectArray();
dataFormat.setObjectData(out);
GenericRecord avroObject = createAvroGenericRecord();
    // SQOOP-1975: direct object compare will fail unless we use the Avro complex types
assertEquals(avroObject.toString(), dataFormat.getData().toString());
}
@Test
public void testInputAsObjectArrayInAndCSVOut() {
Object[] out = createObjectArray();
dataFormat.setObjectData(out);
String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+ String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
+ ",13.44," + csvSet;
assertEquals(csvText, dataFormat.getCSVTextData());
}
@Test
public void testInputAsObjectArrayInAndObjectArrayOut() {
Object[] out = createObjectArray();
dataFormat.setObjectData(out);
assertObjectArray();
}
}
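
A side note on why these tests compare toString() output instead of the records themselves (the SQOOP-1975 comment above): Avro wraps strings in Utf8, and a Utf8 value never equals a plain java.lang.String, so field-by-field equality fails even when the contents match. A tiny sketch:

import org.apache.avro.util.Utf8;

public class Utf8EqualitySketch {
  public static void main(String[] args) {
    System.out.println(new Utf8("54").equals("54"));             // false: "54" is not a Utf8
    System.out.println(new Utf8("54").toString().equals("54"));  // true
  }
}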

View File

@ -123,6 +123,7 @@ limitations under the License.
     <jackson.core.version>2.2.2</jackson.core.version>
     <jackson.databind.version>2.2.2</jackson.databind.version>
     <jackson.annotations.version>2.2.2</jackson.annotations.version>
+    <avro.version>1.7.7</avro.version>
   </properties>

   <dependencies>

@ -599,6 +600,11 @@ limitations under the License.
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop.2.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>${avro.version}</version>
+    </dependency>
   </dependencies>
 </dependencyManagement>