From 3bb9c595d9d6df99b758c664604e5eccd971ef5c Mon Sep 17 00:00:00 2001 From: Abraham Elmahrek Date: Wed, 7 Jan 2015 18:28:39 -0800 Subject: [PATCH] SQOOP-1902: Sqoop2: Avro IDF class and unit tests (Veena Basavaraj via Abraham Elmahrek) --- connector/connector-sdk/pom.xml | 4 + .../connector/common/SqoopAvroUtils.java | 128 ++++++ .../sqoop/connector/common/SqoopIDFUtils.java | 91 +++- .../idf/AVROIntermediateDataFormat.java | 431 ++++++++++++++++++ .../idf/AVROIntermediateDataFormatError.java | 45 ++ .../idf/CSVIntermediateDataFormat.java | 2 +- .../idf/CSVIntermediateDataFormatError.java | 7 +- .../connector/idf/IntermediateDataFormat.java | 8 +- .../idf/IntermediateDataFormatError.java | 3 + .../idf/JSONIntermediateDataFormat.java | 17 +- .../idf/TestAVROIntermediateDataFormat.java | 319 +++++++++++++ pom.xml | 6 + 12 files changed, 1016 insertions(+), 45 deletions(-) create mode 100644 connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java create mode 100644 connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java create mode 100644 connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java create mode 100644 connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java diff --git a/connector/connector-sdk/pom.xml b/connector/connector-sdk/pom.xml index 46cb9f8d..27950848 100644 --- a/connector/connector-sdk/pom.xml +++ b/connector/connector-sdk/pom.xml @@ -50,6 +50,10 @@ limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sqoop.connector.common;

import org.apache.avro.Schema;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.IntermediateDataFormatError;
import org.apache.sqoop.schema.type.AbstractComplexListType;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * Utilities for deriving an Avro schema from a Sqoop schema and for handling
 * Avro-native value representations.
 */
public class SqoopAvroUtils {

  /** Avro field property recording the original Sqoop column type. */
  public static final String COLUMN_TYPE = "columnType";

  /** Namespace used for every generated Avro record/enum schema. */
  public static final String SQOOP_SCHEMA_NAMESPACE = "org.apache.sqoop";

  /**
   * Creates an Avro record schema mirroring the given Sqoop schema. Each
   * column becomes one field; the original Sqoop column type is preserved in
   * the {@link #COLUMN_TYPE} field property.
   */
  public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema) {
    String name = sqoopSchema.getName();
    String doc = sqoopSchema.getNote();
    Schema schema = Schema.createRecord(name, doc, SQOOP_SCHEMA_NAMESPACE, false);

    List<Schema.Field> fields = new ArrayList<Schema.Field>();
    for (Column column : sqoopSchema.getColumnsArray()) {
      Schema.Field field = new Schema.Field(column.getName(), createAvroFieldSchema(column), null, null);
      field.addProp(COLUMN_TYPE, column.getType().toString());
      fields.add(field);
    }
    schema.setFields(fields);
    return schema;
  }

  /**
   * Returns the Avro schema for one column. Nullable columns are represented
   * as a union of the value type with NULL.
   */
  public static Schema createAvroFieldSchema(Column column) {
    Schema schema = toAvroFieldType(column);
    if (!column.getNullable()) {
      return schema;
    }
    List<Schema> union = new ArrayList<Schema>(2);
    union.add(schema);
    union.add(Schema.create(Schema.Type.NULL));
    return Schema.createUnion(union);
  }

  /**
   * Maps a Sqoop column type to the corresponding Avro type.
   *
   * @throws SqoopException for column types with no Avro mapping
   */
  public static Schema toAvroFieldType(Column column) throws IllegalArgumentException {
    switch (column.getType()) {
    case ARRAY:
    case SET:
      AbstractComplexListType listColumn = (AbstractComplexListType) column;
      return Schema.createArray(toAvroFieldType(listColumn.getListType()));
    case UNKNOWN:
    case BINARY:
      return Schema.create(Schema.Type.BYTES);
    case BIT:
      return Schema.create(Schema.Type.BOOLEAN);
    case DATE:
    case DATE_TIME:
    case TIME:
      // avro 1.8 will have a date type; until then store millis as a long
      // https://issues.apache.org/jira/browse/AVRO-739
      return Schema.create(Schema.Type.LONG);
    case DECIMAL:
      // TODO: is string ok, used it since kite code seems to use it
      return Schema.create(Schema.Type.STRING);
    case ENUM:
      return createEnumSchema(column);
    case FIXED_POINT:
      // byteSize is in bytes while Integer.SIZE is in bits: divide by
      // Byte.SIZE so an 8-byte integer maps to LONG instead of overflowing
      // an Avro INT.
      Long byteSize = ((FixedPoint) column).getByteSize();
      if (byteSize != null && byteSize <= (Integer.SIZE / Byte.SIZE)) {
        return Schema.create(Schema.Type.INT);
      }
      return Schema.create(Schema.Type.LONG);
    case FLOATING_POINT:
      // same unit fix as FIXED_POINT: compare bytes with bytes so an 8-byte
      // floating point maps to DOUBLE and keeps its precision
      byteSize = ((FloatingPoint) column).getByteSize();
      if (byteSize != null && byteSize <= (Float.SIZE / Byte.SIZE)) {
        return Schema.create(Schema.Type.FLOAT);
      }
      return Schema.create(Schema.Type.DOUBLE);
    case MAP:
      // NOTE(review): maps are represented as an array of the value type
      // (Avro maps require string keys) -- confirm this representation is
      // intended before relying on it
      org.apache.sqoop.schema.type.Map mapColumn = (org.apache.sqoop.schema.type.Map) column;
      return Schema.createArray(toAvroFieldType(mapColumn.getValue()));
    case TEXT:
      return Schema.create(Schema.Type.STRING);
    default:
      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, column.getType().name());
    }
  }

  /** Builds an Avro enum schema from the options of a Sqoop enum column. */
  public static Schema createEnumSchema(Column column) {
    Set<String> options = ((org.apache.sqoop.schema.type.Enum) column).getOptions();
    List<String> listOptions = new ArrayList<String>(options);
    return Schema.createEnum(column.getName(), null, SQOOP_SCHEMA_NAMESPACE, listOptions);
  }

  /**
   * Copies the remaining bytes out of an Avro byte buffer without consuming
   * the caller's buffer position.
   */
  public static byte[] getBytesFromByteBuffer(Object obj) {
    ByteBuffer buffer = (ByteBuffer) obj;
    byte[] bytes = new byte[buffer.remaining()];
    // duplicate() so the original buffer's position is left untouched
    buffer.duplicate().get(bytes);
    return bytes;
  }

}
replacements = { new String(new char[] { ESCAPE_CHARACTER, '\\' }), + new String(new char[] { ESCAPE_CHARACTER, '0' }), new String(new char[] { ESCAPE_CHARACTER, 'n' }), + new String(new char[] { ESCAPE_CHARACTER, 'r' }), new String(new char[] { ESCAPE_CHARACTER, 'Z' }), + new String(new char[] { ESCAPE_CHARACTER, '\"' }), new String(new char[] { ESCAPE_CHARACTER, '\'' }) }; // http://www.joda.org/joda-time/key_format.html provides details on the // formatter token @@ -140,8 +135,9 @@ public static Object toFloatingPoint(String csvString, Column column) { } public static String encodeToCSVDecimal(Object obj) { - return ((BigDecimal)obj).toString(); + return ((BigDecimal) obj).toString(); } + public static Object toDecimal(String csvString, Column column) { return new BigDecimal(csvString); } @@ -152,7 +148,8 @@ public static String encodeToCSVBit(Object obj) { if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) { return bitStringValue; } else { - throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006, " given bit value: " + bitStringValue); + throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0005, " given bit value: " + + bitStringValue); } } @@ -161,7 +158,7 @@ public static Object toBit(String csvString) { return TRUE_BIT_SET.contains(csvString); } else { // throw an exception for any unsupported value for BITs - throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006, " given bit value: " + csvString); + throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0005, " given bit value: " + csvString); } } @@ -200,7 +197,6 @@ public static String encodeToCSVLocalDateTime(Object obj, Column col) { } } - public static String encodeToCSVDateTime(Object obj, Column col) { org.joda.time.DateTime dateTime = (org.joda.time.DateTime) obj; org.apache.sqoop.schema.type.DateTime column = 
(org.apache.sqoop.schema.type.DateTime) col; @@ -236,6 +232,27 @@ public static Object toDateTime(String csvString, Column column) { return returnValue; } + public static Long toDateTimeInMillis(String csvString, Column column) { + long returnValue; + String dateTime = removeQuotes(csvString); + org.apache.sqoop.schema.type.DateTime col = ((org.apache.sqoop.schema.type.DateTime) column); + if (col.hasFraction() && col.hasTimezone()) { + // After calling withOffsetParsed method, a string + // '2004-06-09T10:20:30-08:00' will create a datetime with a zone of + // -08:00 (a fixed zone, with no daylight savings rules) + returnValue = dtfWithFractionAndTimeZone.withOffsetParsed().parseDateTime(dateTime).toDate().getTime(); + } else if (col.hasFraction() && !col.hasTimezone()) { + // we use local date time explicitly to not include the timezone + returnValue = dtfWithFractionNoTimeZone.parseLocalDateTime(dateTime).toDate().getTime(); + } else if (col.hasTimezone()) { + returnValue = dtfWithNoFractionWithTimeZone.withOffsetParsed().parseDateTime(dateTime).toDate().getTime(); + } else { + // we use local date time explicitly to not include the timezone + returnValue = dtfWithNoFractionAndTimeZone.parseLocalDateTime(dateTime).toDate().getTime(); + } + return returnValue; + } + // ************ MAP Column Type utils********* @SuppressWarnings("unchecked") @@ -301,7 +318,7 @@ public static String encodeToCSVList(Object[] list, Column column) { List elementList = new ArrayList(); for (int n = 0; n < list.length; n++) { Column listType = ((AbstractComplexListType) column).getListType(); - //2 level nesting supported + // 2 level nesting supported if (isColumnListType(listType)) { Object[] listElements = (Object[]) list[n]; JSONArray subArray = new JSONArray(); @@ -332,6 +349,44 @@ public static Object[] toList(String csvString) { return null; } + @SuppressWarnings("unchecked") + public static JSONArray toJSONArray(Object[] objectArray) { + JSONArray jsonArray = new JSONArray(); 
+ for (int i = 0; i < objectArray.length; i++) { + Object value = objectArray[i]; + if (value instanceof Object[]) { + value = toJSONArray((Object[]) value); + } + jsonArray.add(value); + } + return jsonArray; + } + + public static List toList(Object[] objectArray) { + List objList = new ArrayList(); + for (int i = 0; i < objectArray.length; i++) { + Object value = objectArray[i]; + if (value instanceof Object[]) { + value = toList((Object[]) value); + } + objList.add(value); + } + return objList; + } + + @SuppressWarnings("unchecked") + public static Object[] toObjectArray(List list) { + Object[] array = new Object[list.size()]; + for (int i = 0; i < list.size(); i++) { + Object value = list.get(i); + if (value instanceof List) { + value = toObjectArray((List) value); + } + array[i] = value; + } + return array; + } + // ************ TEXT Column Type utils********* private static String getRegExp(char character) { @@ -350,8 +405,8 @@ public static String encodeToCSVString(String string) { replacement = replacement.replaceAll(getRegExp(originals[j]), Matcher.quoteReplacement(replacements[j])); } } catch (Exception e) { - throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + " " + replacement + " " - + String.valueOf(j) + " " + e.getMessage()); + throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + " " + replacement + + " " + String.valueOf(j) + " " + e.getMessage()); } return encloseWithQuote(replacement); } @@ -365,8 +420,8 @@ public static String toText(String string) { string = string.replaceAll(getRegExp(replacements[j]), Matcher.quoteReplacement(String.valueOf(originals[j]))); } } catch (Exception e) { - throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, string + " " + String.valueOf(j) - + e.getMessage()); + throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, string + " " + + 
String.valueOf(j) + e.getMessage()); } return string; diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java new file mode 100644 index 00000000..b12b59ad --- /dev/null +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.sqoop.connector.idf;

import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
import static org.apache.sqoop.connector.common.SqoopAvroUtils.*;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.Utf8;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.utils.ClassUtils;
import org.joda.time.LocalDate;
import org.joda.time.LocalTime;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * IDF representing the intermediate format as an Avro {@link GenericRecord}.
 * The Avro schema is derived from the Sqoop schema, so the Sqoop schema must
 * be set before any data conversion happens.
 */
public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRecord> {

  private Schema avroSchema;

  // need this default constructor for the reflection magic used in the
  // execution engine
  public AVROIntermediateDataFormat() {
  }

  // we need the sqoop schema at all times to derive the matching avro schema
  public AVROIntermediateDataFormat(org.apache.sqoop.schema.Schema schema) {
    setSchema(schema);
    avroSchema = createAvroSchema(schema);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void setCSVTextData(String text) {
    // convert the CSV text to avro
    this.data = toAVRO(text);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public String getCSVTextData() {
    // convert avro to sqoop CSV
    return toCSV(data);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void setObjectData(Object[] data) {
    // convert the object array to avro
    this.data = toAVRO(data);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public Object[] getObjectData() {
    // convert avro to object array
    return toObject(data);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void write(DataOutput out) throws IOException {
    // the avro schema is rebuilt from the sqoop schema on both sides, so only
    // the record payload is serialized here
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroSchema);
    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder((DataOutputStream) out, null);
    writer.write(data, encoder);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void read(DataInput in) throws IOException {
    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(avroSchema);
    Decoder decoder = DecoderFactory.get().binaryDecoder((InputStream) in, null);
    data = reader.read(null, decoder);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public Set<String> getJars() {
    Set<String> jars = super.getJars();
    jars.add(ClassUtils.jarForClass(GenericRecord.class));
    return jars;
  }

  /**
   * Parses one sqoop CSV row into an avro record. Returns null for a null
   * row. A NULL token is only legal for nullable columns.
   */
  private GenericRecord toAVRO(String csv) {
    String[] csvStringArray = parseCSVString(csv);
    if (csvStringArray == null) {
      return null;
    }

    Column[] columnArray = schema.getColumnsArray();
    if (csvStringArray.length != columnArray.length) {
      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + csv
          + " has the wrong number of fields.");
    }

    GenericRecord avroObject = new GenericData.Record(avroSchema);
    for (int i = 0; i < csvStringArray.length; i++) {
      if (csvStringArray[i].equals(NULL_VALUE) && columnArray[i].getNullable()) {
        avroObject.put(columnArray[i].getName(), null);
        continue;
      }
      avroObject.put(columnArray[i].getName(), toAVRO(csvStringArray[i], columnArray[i]));
    }
    return avroObject;
  }

  /** Converts one CSV field into the avro-compatible java value for its column. */
  private Object toAVRO(String csvString, Column column) {
    Object returnValue = null;

    switch (column.getType()) {
    case ARRAY:
    case SET:
      // store as a java collection
      returnValue = Arrays.asList(toList(csvString));
      break;
    case MAP:
      // store as a map
      returnValue = toMap(csvString);
      break;
    case ENUM:
      returnValue = new GenericData.EnumSymbol(createEnumSchema(column), removeQuotes(csvString));
      break;
    case TEXT:
      returnValue = new Utf8(removeQuotes(csvString));
      break;
    case BINARY:
    case UNKNOWN:
      // avro accepts a byte buffer for binary data
      returnValue = ByteBuffer.wrap(toByteArray(csvString));
      break;
    case FIXED_POINT:
      returnValue = toFixedPoint(csvString, column);
      break;
    case FLOATING_POINT:
      returnValue = toFloatingPoint(csvString, column);
      break;
    case DECIMAL:
      // TODO: store as FIXED in SQOOP-1616
      returnValue = removeQuotes(csvString);
      break;
    case DATE:
      // until avro 1.8, dates are stored as long millis
      returnValue = ((LocalDate) toDate(csvString, column)).toDate().getTime();
      break;
    case TIME:
      // until avro 1.8, times are stored as long millis
      returnValue = ((LocalTime) toTime(csvString, column)).toDateTimeToday().getMillis();
      break;
    case DATE_TIME:
      // until avro 1.8, date-times are stored as long millis
      returnValue = toDateTimeInMillis(csvString, column);
      break;
    case BIT:
      returnValue = Boolean.valueOf(removeQuotes(csvString));
      break;
    default:
      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
          "Column type from schema was not recognized for " + column.getType());
    }
    return returnValue;
  }

  /**
   * Converts an object array row into an avro record. A null element is only
   * legal for a nullable column and is stored as an avro null.
   */
  private GenericRecord toAVRO(Object[] objectArray) {
    if (objectArray == null) {
      return null;
    }

    Column[] cols = schema.getColumnsArray();
    if (objectArray.length != cols.length) {
      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data "
          + Arrays.toString(objectArray) + " has the wrong number of fields.");
    }

    GenericRecord avroObject = new GenericData.Record(avroSchema);
    for (int i = 0; i < objectArray.length; i++) {
      Object value = objectArray[i];
      if (value == null) {
        if (!cols[i].getNullable()) {
          throw new SqoopException(AVROIntermediateDataFormatError.AVRO_INTERMEDIATE_DATA_FORMAT_0001, " for "
              + cols[i].getName());
        }
        avroObject.put(cols[i].getName(), null);
        continue;
      }
      switch (cols[i].getType()) {
      case ARRAY:
      case SET:
        avroObject.put(cols[i].getName(), toList((Object[]) value));
        break;
      case MAP:
        avroObject.put(cols[i].getName(), value);
        break;
      case ENUM:
        GenericData.EnumSymbol enumValue = new GenericData.EnumSymbol(createEnumSchema(cols[i]), (String) value);
        avroObject.put(cols[i].getName(), enumValue);
        break;
      case TEXT:
        avroObject.put(cols[i].getName(), new Utf8((String) value));
        break;
      case BINARY:
      case UNKNOWN:
        avroObject.put(cols[i].getName(), ByteBuffer.wrap((byte[]) value));
        break;
      case FIXED_POINT:
      case FLOATING_POINT:
        // numbers are stored as their java objects in avro as well
        avroObject.put(cols[i].getName(), value);
        break;
      case DECIMAL:
        // TODO: store as FIXED in SQOOP-1616
        avroObject.put(cols[i].getName(), ((BigDecimal) value).toPlainString());
        break;
      case DATE_TIME:
        // both joda DateTime and LocalDateTime are accepted on input
        if (value instanceof org.joda.time.DateTime) {
          avroObject.put(cols[i].getName(), ((org.joda.time.DateTime) value).toDate().getTime());
        } else if (value instanceof org.joda.time.LocalDateTime) {
          avroObject.put(cols[i].getName(), ((org.joda.time.LocalDateTime) value).toDate().getTime());
        }
        break;
      case TIME:
        avroObject.put(cols[i].getName(), ((org.joda.time.LocalTime) value).toDateTimeToday().getMillis());
        break;
      case DATE:
        avroObject.put(cols[i].getName(), ((org.joda.time.LocalDate) value).toDate().getTime());
        break;
      case BIT:
        avroObject.put(cols[i].getName(), Boolean.valueOf((Boolean) value));
        break;
      default:
        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
            "Column type from schema was not recognized for " + cols[i].getType());
      }
    }

    return avroObject;
  }

  /**
   * Encodes an avro record as one sqoop CSV row. A null field in a nullable
   * column is emitted as the CSV NULL token; a null in a non-nullable column
   * is an error.
   */
  @SuppressWarnings("unchecked")
  private String toCSV(GenericRecord record) {
    if (record == null) {
      return null;
    }
    Column[] cols = this.schema.getColumnsArray();

    StringBuilder csvString = new StringBuilder();
    for (int i = 0; i < cols.length; i++) {
      if (i > 0) {
        csvString.append(CSV_SEPARATOR_CHARACTER);
      }

      Object obj = record.get(cols[i].getName());
      if (obj == null) {
        // nullable columns legitimately hold avro nulls; only a missing value
        // in a non-nullable column is an error
        if (!cols[i].getNullable()) {
          throw new SqoopException(AVROIntermediateDataFormatError.AVRO_INTERMEDIATE_DATA_FORMAT_0001, " for "
              + cols[i].getName());
        }
        csvString.append(NULL_VALUE);
        continue;
      }

      switch (cols[i].getType()) {
      case ARRAY:
      case SET:
        List<Object> objList = (List<Object>) obj;
        csvString.append(encodeToCSVList(toObjectArray(objList), cols[i]));
        break;
      case MAP:
        Map<Object, Object> objMap = (Map<Object, Object>) obj;
        csvString.append(encodeToCSVMap(objMap, cols[i]));
        break;
      case ENUM:
      case TEXT:
        // enum symbols and Utf8 text both stringify to the plain value
        csvString.append(encodeToCSVString(obj.toString()));
        break;
      case BINARY:
      case UNKNOWN:
        csvString.append(encodeToCSVByteArray(getBytesFromByteBuffer(obj)));
        break;
      case FIXED_POINT:
        csvString.append(encodeToCSVFixedPoint(obj, cols[i]));
        break;
      case FLOATING_POINT:
        csvString.append(encodeToCSVFloatingPoint(obj, cols[i]));
        break;
      case DECIMAL:
        // stored in avro as a string (or Utf8), not a BigDecimal, so a cast
        // via encodeToCSVDecimal would fail; append the textual value
        csvString.append(obj.toString());
        break;
      case DATE:
        // stored as long millis
        Long dateInMillis = (Long) obj;
        csvString.append(encodeToCSVDate(new org.joda.time.LocalDate(dateInMillis)));
        break;
      case TIME:
        // stored as long millis
        Long timeInMillis = (Long) obj;
        csvString.append(encodeToCSVTime(new org.joda.time.LocalTime(timeInMillis), cols[i]));
        break;
      case DATE_TIME:
        // stored as long millis
        Long dateTimeInMillis = (Long) obj;
        csvString.append(encodeToCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), cols[i]));
        break;
      case BIT:
        csvString.append(encodeToCSVBit(obj));
        break;
      default:
        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
            "Column type from schema was not recognized for " + cols[i].getType());
      }
    }

    return csvString.toString();
  }

  /**
   * Converts an avro record back into the object array representation. Null
   * fields map to null elements for nullable columns.
   */
  @SuppressWarnings("unchecked")
  private Object[] toObject(GenericRecord record) {
    if (record == null) {
      return null;
    }
    Column[] cols = schema.getColumnsArray();
    Object[] object = new Object[cols.length];

    for (int i = 0; i < cols.length; i++) {
      Object obj = record.get(cols[i].getName());
      Integer nameIndex = schema.getColumnNameIndex(cols[i].getName());
      Column column = cols[nameIndex];

      if (obj == null) {
        if (!column.getNullable()) {
          throw new SqoopException(AVROIntermediateDataFormatError.AVRO_INTERMEDIATE_DATA_FORMAT_0001, " for "
              + cols[i].getName());
        }
        object[nameIndex] = null;
        continue;
      }

      switch (column.getType()) {
      case ARRAY:
      case SET:
        object[nameIndex] = toObjectArray((List<Object>) obj);
        break;
      case MAP:
        object[nameIndex] = obj;
        break;
      case ENUM:
        // stored as an avro enum symbol
      case TEXT:
        // stored as UTF8
        object[nameIndex] = obj.toString();
        break;
      case BINARY:
      case UNKNOWN:
        // stored as a byte buffer
        object[nameIndex] = getBytesFromByteBuffer(obj);
        break;
      case FIXED_POINT:
      case FLOATING_POINT:
        // stored as java number objects in avro as well
        object[nameIndex] = obj;
        break;
      case DECIMAL:
        // stored as a string in avro; hand back the BigDecimal that
        // setObjectData accepted so the conversion round-trips
        object[nameIndex] = new BigDecimal(obj.toString());
        break;
      case DATE:
        Long dateInMillis = (Long) obj;
        object[nameIndex] = new org.joda.time.LocalDate(dateInMillis);
        break;
      case TIME:
        // NOTE(review): millis are interpreted in the default time zone,
        // mirroring how they were encoded -- confirm for cross-zone transfers
        Long timeInMillis = (Long) obj;
        object[nameIndex] = new org.joda.time.LocalTime(timeInMillis);
        break;
      case DATE_TIME:
        Long dateTimeInMillis = (Long) obj;
        if (((org.apache.sqoop.schema.type.DateTime) column).hasTimezone()) {
          object[nameIndex] = new org.joda.time.DateTime(dateTimeInMillis);
        } else {
          object[nameIndex] = new org.joda.time.LocalDateTime(dateTimeInMillis);
        }
        break;
      case BIT:
        object[nameIndex] = toBit(obj.toString());
        break;
      default:
        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
            "Column type from schema was not recognized for " + cols[i].getType());
      }
    }
    return object;
  }
}
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java new file mode 100644 index 00000000..6af21a35 --- /dev/null +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sqoop.connector.idf; + +import org.apache.sqoop.common.ErrorCode; + +public enum AVROIntermediateDataFormatError implements ErrorCode { + /** An unknown error has occurred. 
*/ + AVRO_INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."), + + AVRO_INTERMEDIATE_DATA_FORMAT_0001("Missing key in the AVRO object.") + + ; + + private final String message; + + private AVROIntermediateDataFormatError(String message) { + this.message = message; + } + + public String getCode() { + return name(); + } + + public String getMessage() { + return message; + } +} diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java index be1147d6..33b5d0a6 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java @@ -152,7 +152,7 @@ private Object toObject(String csvString, Column column) { returnValue = toMap(csvString); break; default: - throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0004, + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + column.getType()); } return returnValue; diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java index a88db455..9aae2512 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java @@ -33,17 +33,14 @@ public enum CSVIntermediateDataFormatError implements ErrorCode { /** Error while escaping a row. */ CSV_INTERMEDIATE_DATA_FORMAT_0003("An error has occurred while unescaping a row."), - /** Column type isn't known by Intermediate Data Format. 
*/ - CSV_INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."), - /** * For arrays and maps we use JSON representation and incorrect representation * results in parse exception */ - CSV_INTERMEDIATE_DATA_FORMAT_0005("JSON parse internal error."), + CSV_INTERMEDIATE_DATA_FORMAT_0004("JSON parse internal error."), /** Unsupported bit values */ - CSV_INTERMEDIATE_DATA_FORMAT_0006("Unsupported bit value."), + CSV_INTERMEDIATE_DATA_FORMAT_0005("Unsupported bit value."), ; diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java index adeb2ec0..055b41ce 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java @@ -18,11 +18,10 @@ */ package org.apache.sqoop.connector.idf; -import static org.apache.sqoop.connector.common.SqoopIDFUtils.*; +import static org.apache.sqoop.connector.common.SqoopIDFUtils.isColumnListType; +import static org.apache.sqoop.connector.common.SqoopIDFUtils.isColumnStringType; -import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.type.AbstractComplexListType; import org.apache.sqoop.schema.type.Column; import org.apache.sqoop.schema.type.ColumnType; @@ -30,9 +29,6 @@ import java.io.DataOutput; import java.io.IOException; import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; import java.util.Set; /** diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java index bda75fc1..1f583b2b 100644 --- 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java @@ -32,6 +32,9 @@ public enum IntermediateDataFormatError implements ErrorCode { INTERMEDIATE_DATA_FORMAT_0003("JSON parse error"), + /** Column type isn't known by Intermediate Data Format. */ + INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."), + ; private final String message; diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java index 9329cf8a..90294f0a 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java @@ -189,8 +189,8 @@ private Object toJSON(String csvString, Column column) { returnValue = Boolean.valueOf(removeQuotes(csvString)); break; default: - throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0004, - "Column type from schema was not recognized for " + column.getType()); + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, + "Column type from schema was not recognized for " + column.getType()); } return returnValue; } @@ -261,19 +261,6 @@ private JSONObject toJSON(Object[] data) { return object; } - @SuppressWarnings("unchecked") - public static JSONArray toJSONArray(Object[] objectArray) { - JSONArray jsonArray = new JSONArray(); - for (int i = 0; i < objectArray.length; i++) { - Object value = objectArray[i]; - if (value instanceof Object[]) { - value = toJSONArray((Object[]) value); - } - jsonArray.add(value); - } - return jsonArray; - } - private String toCSV(JSONObject json) { Column[] cols = this.schema.getColumnsArray(); diff --git 
a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java new file mode 100644 index 00000000..b00b3b98 --- /dev/null +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.sqoop.connector.idf; + +import static org.apache.sqoop.connector.common.SqoopAvroUtils.createEnumSchema; +import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString; +import static org.junit.Assert.assertEquals; + +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; +import org.apache.sqoop.connector.common.SqoopAvroUtils; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Array; +import org.apache.sqoop.schema.type.Binary; +import org.apache.sqoop.schema.type.Bit; +import org.apache.sqoop.schema.type.Column; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.Text; +import org.joda.time.LocalDateTime; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class TestAVROIntermediateDataFormat { + + private AVROIntermediateDataFormat dataFormat; + private org.apache.avro.Schema avroSchema; + private final static String csvArray = "'[[11,11],[14,15]]'"; + private final static String map = "'{\"testKey\":\"testValue\"}'"; + private final static String csvSet = "'[[11,12],[14,15]]'"; + private final static String csvDate = "'2014-10-01'"; + private final static String csvDateTime = "'2014-10-01 12:00:00.000'"; + private final static String csvTime = "'12:59:59'"; + private Column enumCol; + // no time zone + private final static LocalDateTime dateTime = new org.joda.time.LocalDateTime(2014, 10, 01, 12, 0, 0); + private final static org.joda.time.LocalTime time = new org.joda.time.LocalTime(12, 59, 59); + private final static org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01); + + @Before + public void setUp() { + createAvroIDF(); + } + + private void 
createAvroIDF() { + Schema sqoopSchema = new Schema("test"); + Set options = new HashSet(); + options.add("ENUM"); + options.add("NUME"); + enumCol = new org.apache.sqoop.schema.type.Enum("seven").setOptions(options); + sqoopSchema.addColumn(new FixedPoint("one")).addColumn(new FixedPoint("two", 2L, false)).addColumn(new Text("three")) + .addColumn(new Text("four")).addColumn(new Binary("five")).addColumn(new Text("six")).addColumn(enumCol) + .addColumn(new Array("eight", new Array("array", new FixedPoint("ft")))) + .addColumn(new org.apache.sqoop.schema.type.Map("nine", new Text("t1"), new Text("t2"))).addColumn(new Bit("ten")) + .addColumn(new org.apache.sqoop.schema.type.DateTime("eleven", true, false)) + .addColumn(new org.apache.sqoop.schema.type.Time("twelve", false)) + .addColumn(new org.apache.sqoop.schema.type.Date("thirteen")) + .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("fourteen")) + .addColumn(new org.apache.sqoop.schema.type.Set("fifteen", new Array("set", new FixedPoint("ftw")))); + dataFormat = new AVROIntermediateDataFormat(sqoopSchema); + avroSchema = SqoopAvroUtils.createAvroSchema(sqoopSchema); + } + + /** + * setCSVGetData setCSVGetObjectArray setCSVGetCSV + */ + @Test + public void testInputAsCSVTextInAndDataOut() { + + String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate + + ",13.44," + csvSet; + dataFormat.setCSVTextData(csvText); + GenericRecord avroObject = createAvroGenericRecord(); + assertEquals(avroObject.toString(), dataFormat.getData().toString()); + } + + @Test + public void testInputAsCSVTextInAndObjectArrayOut() { + String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + 
csvDate + + ",13.44," + csvSet; + dataFormat.setCSVTextData(csvText); + assertEquals(dataFormat.getObjectData().length, 15); + assertObjectArray(); + + } + + private void assertObjectArray() { + Object[] out = dataFormat.getObjectData(); + assertEquals(10L, out[0]); + assertEquals(34, out[1]); + assertEquals("54", out[2]); + assertEquals("random data", out[3]); + assertEquals(-112, ((byte[]) out[4])[0]); + assertEquals(54, ((byte[]) out[4])[1]); + assertEquals("10", out[5]); + assertEquals("ENUM", out[6]); + + Object[] givenArrayOne = new Object[2]; + givenArrayOne[0] = 11; + givenArrayOne[1] = 11; + Object[] givenArrayTwo = new Object[2]; + givenArrayTwo[0] = 14; + givenArrayTwo[1] = 15; + Object[] arrayOfArrays = new Object[2]; + arrayOfArrays[0] = givenArrayOne; + arrayOfArrays[1] = givenArrayTwo; + Map map = new HashMap(); + map.put("testKey", "testValue"); + Object[] set0 = new Object[2]; + set0[0] = 11; + set0[1] = 12; + Object[] set1 = new Object[2]; + set1[0] = 14; + set1[1] = 15; + Object[] set = new Object[2]; + set[0] = set0; + set[1] = set1; + out[14] = set; + assertEquals(arrayOfArrays.length, 2); + assertEquals(Arrays.deepToString(arrayOfArrays), Arrays.deepToString((Object[]) out[7])); + assertEquals(map, out[8]); + assertEquals(true, out[9]); + assertEquals(dateTime, out[10]); + assertEquals(time, out[11]); + assertEquals(date, out[12]); + assertEquals(13.44, out[13]); + assertEquals(set.length, 2); + assertEquals(Arrays.deepToString(set), Arrays.deepToString((Object[]) out[14])); + + } + + @Test + public void testInputAsCSVTextInCSVTextOut() { + String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate + + ",13.44," + csvSet; + dataFormat.setCSVTextData(csvText); + assertEquals(csvText, dataFormat.getCSVTextData()); + } + + private GenericRecord createAvroGenericRecord() { + 
GenericRecord avroObject = new GenericData.Record(avroSchema); + avroObject.put("one", 10L); + avroObject.put("two", 34); + avroObject.put("three", new Utf8("54")); + avroObject.put("four", new Utf8("random data")); + // store byte array in byte buffer + byte[] b = new byte[] { (byte) -112, (byte) 54 }; + avroObject.put("five", ByteBuffer.wrap(b)); + avroObject.put("six", new Utf8(String.valueOf(0x0A))); + avroObject.put("seven", new GenericData.EnumSymbol(createEnumSchema(enumCol), "ENUM")); + + List givenArrayOne = new ArrayList(); + givenArrayOne.add(11); + givenArrayOne.add(11); + List givenArrayTwo = new ArrayList(); + givenArrayTwo.add(14); + givenArrayTwo.add(15); + List arrayOfArrays = new ArrayList(); + + arrayOfArrays.add(givenArrayOne); + arrayOfArrays.add(givenArrayTwo); + + Map map = new HashMap(); + map.put("testKey", "testValue"); + + avroObject.put("eight", arrayOfArrays); + avroObject.put("nine", map); + avroObject.put("ten", true); + + // expect dates as strings + avroObject.put("eleven", dateTime.toDate().getTime()); + avroObject.put("twelve", time.toDateTimeToday().getMillis()); + avroObject.put("thirteen", date.toDate().getTime()); + avroObject.put("fourteen", 13.44); + List givenSetOne = new ArrayList(); + givenSetOne.add(11); + givenSetOne.add(12); + List givenSetTwo = new ArrayList(); + givenSetTwo.add(14); + givenSetTwo.add(15); + List set = new ArrayList(); + set.add(givenSetOne); + set.add(givenSetTwo); + avroObject.put("fifteen", set); + return avroObject; + } + + /** + * setDataGetCSV setDataGetObjectArray setDataGetData + */ + @Test + public void testInputAsDataInAndCSVOut() { + + String csvExpected = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate + + ",13.44," + csvSet; + dataFormat.setData(createAvroGenericRecord()); + assertEquals(csvExpected, 
dataFormat.getCSVTextData()); + } + + @Test + public void testInputAsDataInAndObjectArrayOut() { + GenericRecord avroObject = createAvroGenericRecord(); + dataFormat.setData(avroObject); + assertObjectArray(); + } + + @Test + public void testInputAsDataInAndDataOut() { + GenericRecord avroObject = createAvroGenericRecord(); + dataFormat.setData(avroObject); + assertEquals(avroObject, dataFormat.getData()); + } + + private Object[] createObjectArray() { + Object[] out = new Object[15]; + out[0] = 10L; + out[1] = 34; + out[2] = "54"; + out[3] = "random data"; + out[4] = new byte[] { (byte) -112, (byte) 54 }; + out[5] = String.valueOf(0x0A); + out[6] = "ENUM"; + + Object[] givenArrayOne = new Object[2]; + givenArrayOne[0] = 11; + givenArrayOne[1] = 11; + Object[] givenArrayTwo = new Object[2]; + givenArrayTwo[0] = 14; + givenArrayTwo[1] = 15; + + Object[] arrayOfArrays = new Object[2]; + arrayOfArrays[0] = givenArrayOne; + arrayOfArrays[1] = givenArrayTwo; + + Map map = new HashMap(); + map.put("testKey", "testValue"); + + out[7] = arrayOfArrays; + out[8] = map; + out[9] = true; + out[10] = dateTime; + out[11] = time; + out[12] = date; + + out[13] = 13.44; + Object[] set0 = new Object[2]; + set0[0] = 11; + set0[1] = 12; + Object[] set1 = new Object[2]; + set1[0] = 14; + set1[1] = 15; + + Object[] set = new Object[2]; + set[0] = set0; + set[1] = set1; + out[14] = set; + return out; + } + + /** + * setObjectArrayGetData setObjectArrayGetCSV setObjectArrayGetObjectArray + */ + @Test + public void testInputAsObjectArrayInAndDataOut() { + + Object[] out = createObjectArray(); + dataFormat.setObjectData(out); + GenericRecord avroObject = createAvroGenericRecord(); + // SQOOP-1975: direct object compare will fail unless we use the Avro complex types + assertEquals(avroObject.toString(), dataFormat.getData().toString()); + + } + + @Test + public void testInputAsObjectArrayInAndCSVOut() { + Object[] out = createObjectArray(); + dataFormat.setObjectData(out); + String 
csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate + + ",13.44," + csvSet; + assertEquals(csvText, dataFormat.getCSVTextData()); + } + + @Test + public void testInputAsObjectArrayInAndObjectArrayOut() { + Object[] out = createObjectArray(); + dataFormat.setObjectData(out); + assertObjectArray(); + } +} diff --git a/pom.xml b/pom.xml index 4dbc48f7..d1f0a45c 100644 --- a/pom.xml +++ b/pom.xml @@ -123,6 +123,7 @@ limitations under the License. 2.2.2 2.2.2 2.2.2 + 1.7.7 @@ -599,6 +600,11 @@ limitations under the License. hadoop-common ${hadoop.2.version} + + org.apache.avro + avro + ${avro.version} +