diff --git a/connector/connector-sdk/pom.xml b/connector/connector-sdk/pom.xml
index 46cb9f8d..27950848 100644
--- a/connector/connector-sdk/pom.xml
+++ b/connector/connector-sdk/pom.xml
@@ -50,6 +50,10 @@ limitations under the License.
org.apache.sqoop
sqoop-common
+
+ org.apache.avro
+ avro
+
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
new file mode 100644
index 00000000..e47d8fe2
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.common;
+
+import org.apache.avro.Schema;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.IntermediateDataFormatError;
+import org.apache.sqoop.schema.type.AbstractComplexListType;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class SqoopAvroUtils {
+
+ public static final String COLUMN_TYPE = "columnType";
+ public static final String SQOOP_SCHEMA_NAMESPACE = "org.apache.sqoop";
+
+ /**
+ * Creates an Avro schema from a Sqoop schema.
+ */
+ public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema) {
+ String name = sqoopSchema.getName();
+ String doc = sqoopSchema.getNote();
+ String namespace = SQOOP_SCHEMA_NAMESPACE;
+ Schema schema = Schema.createRecord(name, doc, namespace, false);
+
+ List fields = new ArrayList();
+ for (Column column : sqoopSchema.getColumnsArray()) {
+ Schema.Field field = new Schema.Field(column.getName(), createAvroFieldSchema(column), null, null);
+ field.addProp(COLUMN_TYPE, column.getType().toString());
+ fields.add(field);
+ }
+ schema.setFields(fields);
+ return schema;
+ }
+
+ public static Schema createAvroFieldSchema(Column column) {
+ Schema schema = toAvroFieldType(column);
+ if (!column.getNullable()) {
+ return schema;
+ } else {
+ List union = new ArrayList();
+ union.add(schema);
+ union.add(Schema.create(Schema.Type.NULL));
+ return Schema.createUnion(union);
+ }
+ }
+
+ public static Schema toAvroFieldType(Column column) throws IllegalArgumentException {
+ switch (column.getType()) {
+ case ARRAY:
+ case SET:
+ AbstractComplexListType listColumn = (AbstractComplexListType) column;
+ return Schema.createArray(toAvroFieldType(listColumn.getListType()));
+ case UNKNOWN:
+ case BINARY:
+ return Schema.create(Schema.Type.BYTES);
+ case BIT:
+ return Schema.create(Schema.Type.BOOLEAN);
+ case DATE:
+ case DATE_TIME:
+ case TIME:
+ // avro 1.8 will have date type
+ // https://issues.apache.org/jira/browse/AVRO-739
+ return Schema.create(Schema.Type.LONG);
+ case DECIMAL:
+ // TODO: is string ok, used it since kite code seems to use it
+ return Schema.create(Schema.Type.STRING);
+ case ENUM:
+ return createEnumSchema(column);
+ case FIXED_POINT:
+ Long byteSize = ((FixedPoint) column).getByteSize();
+ if (byteSize != null && byteSize <= Integer.SIZE) {
+ return Schema.create(Schema.Type.INT);
+ } else {
+ return Schema.create(Schema.Type.LONG);
+ }
+ case FLOATING_POINT:
+ byteSize = ((FloatingPoint) column).getByteSize();
+ if (byteSize != null && byteSize <= Float.SIZE) {
+ return Schema.create(Schema.Type.FLOAT);
+ } else {
+ return Schema.create(Schema.Type.DOUBLE);
+ }
+ case MAP:
+ org.apache.sqoop.schema.type.Map mapColumn = (org.apache.sqoop.schema.type.Map) column;
+ return Schema.createArray(toAvroFieldType(mapColumn.getValue()));
+ case TEXT:
+ return Schema.create(Schema.Type.STRING);
+ default:
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, column.getType().name());
+ }
+ }
+
+ public static Schema createEnumSchema(Column column) {
+ Set options = ((org.apache.sqoop.schema.type.Enum) column).getOptions();
+ List listOptions = new ArrayList(options);
+ return Schema.createEnum(column.getName(), null, SQOOP_SCHEMA_NAMESPACE, listOptions);
+ }
+
+ public static byte[] getBytesFromByteBuffer(Object obj) {
+ ByteBuffer buffer = (ByteBuffer) obj;
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.duplicate().get(bytes);
+ return bytes;
+ }
+
+}
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
index 979aa4f8..26ff6297 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
@@ -68,15 +68,10 @@ public class SqoopIDFUtils {
public static final char QUOTE_CHARACTER = '\'';
// string related replacements
- private static final String[] replacements = {
- new String(new char[] { ESCAPE_CHARACTER, '\\' }),
- new String(new char[] { ESCAPE_CHARACTER, '0' }),
- new String(new char[] { ESCAPE_CHARACTER, 'n' }),
- new String(new char[] { ESCAPE_CHARACTER, 'r' }),
- new String(new char[] { ESCAPE_CHARACTER, 'Z' }),
- new String(new char[] { ESCAPE_CHARACTER, '\"' }),
- new String(new char[] { ESCAPE_CHARACTER, '\'' })
- };
+ private static final String[] replacements = { new String(new char[] { ESCAPE_CHARACTER, '\\' }),
+ new String(new char[] { ESCAPE_CHARACTER, '0' }), new String(new char[] { ESCAPE_CHARACTER, 'n' }),
+ new String(new char[] { ESCAPE_CHARACTER, 'r' }), new String(new char[] { ESCAPE_CHARACTER, 'Z' }),
+ new String(new char[] { ESCAPE_CHARACTER, '\"' }), new String(new char[] { ESCAPE_CHARACTER, '\'' }) };
// http://www.joda.org/joda-time/key_format.html provides details on the
// formatter token
@@ -140,8 +135,9 @@ public static Object toFloatingPoint(String csvString, Column column) {
}
public static String encodeToCSVDecimal(Object obj) {
- return ((BigDecimal)obj).toString();
+ return ((BigDecimal) obj).toString();
}
+
public static Object toDecimal(String csvString, Column column) {
return new BigDecimal(csvString);
}
@@ -152,7 +148,8 @@ public static String encodeToCSVBit(Object obj) {
if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) {
return bitStringValue;
} else {
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006, " given bit value: " + bitStringValue);
+ throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0005, " given bit value: "
+ + bitStringValue);
}
}
@@ -161,7 +158,7 @@ public static Object toBit(String csvString) {
return TRUE_BIT_SET.contains(csvString);
} else {
// throw an exception for any unsupported value for BITs
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006, " given bit value: " + csvString);
+ throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0005, " given bit value: " + csvString);
}
}
@@ -200,7 +197,6 @@ public static String encodeToCSVLocalDateTime(Object obj, Column col) {
}
}
-
public static String encodeToCSVDateTime(Object obj, Column col) {
org.joda.time.DateTime dateTime = (org.joda.time.DateTime) obj;
org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
@@ -236,6 +232,27 @@ public static Object toDateTime(String csvString, Column column) {
return returnValue;
}
+ public static Long toDateTimeInMillis(String csvString, Column column) {
+ long returnValue;
+ String dateTime = removeQuotes(csvString);
+ org.apache.sqoop.schema.type.DateTime col = ((org.apache.sqoop.schema.type.DateTime) column);
+ if (col.hasFraction() && col.hasTimezone()) {
+ // After calling withOffsetParsed method, a string
+ // '2004-06-09T10:20:30-08:00' will create a datetime with a zone of
+ // -08:00 (a fixed zone, with no daylight savings rules)
+ returnValue = dtfWithFractionAndTimeZone.withOffsetParsed().parseDateTime(dateTime).toDate().getTime();
+ } else if (col.hasFraction() && !col.hasTimezone()) {
+ // we use local date time explicitly to not include the timezone
+ returnValue = dtfWithFractionNoTimeZone.parseLocalDateTime(dateTime).toDate().getTime();
+ } else if (col.hasTimezone()) {
+ returnValue = dtfWithNoFractionWithTimeZone.withOffsetParsed().parseDateTime(dateTime).toDate().getTime();
+ } else {
+ // we use local date time explicitly to not include the timezone
+ returnValue = dtfWithNoFractionAndTimeZone.parseLocalDateTime(dateTime).toDate().getTime();
+ }
+ return returnValue;
+ }
+
// ************ MAP Column Type utils*********
@SuppressWarnings("unchecked")
@@ -301,7 +318,7 @@ public static String encodeToCSVList(Object[] list, Column column) {
List