From e06190b2f8d10f1e0cef2abd66b052b56c1e679f Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Mon, 29 Feb 2016 11:37:06 -0800 Subject: [PATCH] SQOOP-2849: Sqoop2: Job failure when writing parquet in hdfs with data coming from mysql (Abraham Fine via Jarek Jarcec Cecho) --- .../connector/common/SqoopAvroUtils.java | 19 +++++++-- .../idf/AVROIntermediateDataFormat.java | 39 +++++++++++-------- .../idf/TestAVROIntermediateDataFormat.java | 7 ++++ 3 files changed, 45 insertions(+), 20 deletions(-) diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java index 89bc0f2c..f34521c7 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java @@ -18,6 +18,7 @@ package org.apache.sqoop.connector.common; import org.apache.avro.Schema; +import org.apache.log4j.Logger; import org.apache.sqoop.classification.InterfaceAudience; import org.apache.sqoop.classification.InterfaceStability; import org.apache.sqoop.common.SqoopException; @@ -36,6 +37,8 @@ @InterfaceStability.Unstable public class SqoopAvroUtils { + private static final Logger LOG = Logger.getLogger(SqoopAvroUtils.class); + public static final String COLUMN_TYPE = "columnType"; public static final String SQOOP_SCHEMA_NAMESPACE = "org.apache.sqoop"; @@ -44,14 +47,14 @@ public class SqoopAvroUtils { */ public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema) { // avro schema names cannot start with quotes, lets just remove them - String name = sqoopSchema.getName().replace("\"", ""); + String name = createAvroName(sqoopSchema.getName()); String doc = sqoopSchema.getNote(); String namespace = SQOOP_SCHEMA_NAMESPACE; Schema schema = Schema.createRecord(name, doc, namespace, false); List fields = new ArrayList(); for (Column column : sqoopSchema.getColumnsArray()) { - Schema.Field field = new Schema.Field(column.getName(), createAvroFieldSchema(column), null, null); + Schema.Field field = new Schema.Field(createAvroName(column.getName()), createAvroFieldSchema(column), null, null); field.addProp(COLUMN_TYPE, column.getType().toString()); fields.add(field); } @@ -59,6 +62,16 @@ public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema return schema; } + // From the avro docs: + // The name portion of a fullname, record field names, and enum symbols must: + // start with [A-Za-z_] + // subsequently contain only [A-Za-z0-9_] + public static String createAvroName(String name) { + String avroName = name.replaceFirst("^[0-9]", "").replaceAll("[^a-zA-Z0-9_]", ""); + LOG.debug("Replacing name: " + name + " with Avro name: " + avroName); + return avroName; + } + public static Schema createAvroFieldSchema(Column column) { Schema schema = toAvroFieldType(column); if (!column.isNullable()) { @@ -123,7 +136,7 @@ public static Schema createEnumSchema(Column column) { assert column instanceof org.apache.sqoop.schema.type.Enum; Set options = ((org.apache.sqoop.schema.type.Enum) column).getOptions(); List listOptions = new ArrayList(options); - return Schema.createEnum(column.getName(), null, SQOOP_SCHEMA_NAMESPACE, listOptions); + return Schema.createEnum(createAvroName(column.getName()), null, SQOOP_SCHEMA_NAMESPACE, listOptions); } public static byte[] getBytesFromByteBuffer(Object obj) { diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java index b55f7a00..650e24cc 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java @@ -36,6 +36,7 @@ import org.apache.sqoop.classification.InterfaceAudience; import org.apache.sqoop.classification.InterfaceStability; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.common.SqoopAvroUtils; import org.apache.sqoop.error.code.IntermediateDataFormatError; import org.apache.sqoop.schema.type.Column; import org.apache.sqoop.utils.ClassUtils; @@ -166,11 +167,12 @@ public GenericRecord toAVRO(String csv) { throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, columns[i].getName() + " does not support null values"); } + String name = SqoopAvroUtils.createAvroName(columns[i].getName()); if (csvStringArray[i].equals(DEFAULT_NULL_VALUE)) { - avroObject.put(columns[i].getName(), null); + avroObject.put(name, null); continue; } - avroObject.put(columns[i].getName(), toAVRO(csvStringArray[i], columns[i])); + avroObject.put(name, toAVRO(csvStringArray[i], columns[i])); } return avroObject; } @@ -250,56 +252,59 @@ public GenericRecord toAVRO(Object[] objectArray) { throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, columns[i].getName() + " does not support null values"); } + + String name = SqoopAvroUtils.createAvroName(columns[i].getName()); + if (objectArray[i] == null) { - avroObject.put(columns[i].getName(), null); + avroObject.put(name, null); continue; } switch (columns[i].getType()) { case ARRAY: case SET: - avroObject.put(columns[i].getName(), toList((Object[]) objectArray[i])); + avroObject.put(name, toList((Object[]) objectArray[i])); break; case ENUM: GenericData.EnumSymbol enumValue = new GenericData.EnumSymbol(createEnumSchema(columns[i]), (String) objectArray[i]); - avroObject.put(columns[i].getName(), enumValue); + avroObject.put(name, enumValue); break; case TEXT: - avroObject.put(columns[i].getName(), new Utf8((String) objectArray[i])); + avroObject.put(name, new Utf8((String) objectArray[i])); break; case BINARY: case UNKNOWN: - avroObject.put(columns[i].getName(), ByteBuffer.wrap((byte[]) objectArray[i])); + avroObject.put(name, ByteBuffer.wrap((byte[]) objectArray[i])); break; case MAP: case FIXED_POINT: case FLOATING_POINT: - avroObject.put(columns[i].getName(), objectArray[i]); + avroObject.put(name, objectArray[i]); break; case DECIMAL: // TODO: store as FIXED in SQOOP-16161 - avroObject.put(columns[i].getName(), ((BigDecimal) objectArray[i]).toPlainString()); + avroObject.put(name, ((BigDecimal) objectArray[i]).toPlainString()); break; case DATE_TIME: if (objectArray[i] instanceof org.joda.time.DateTime) { - avroObject.put(columns[i].getName(), ((org.joda.time.DateTime) objectArray[i]).toDate() + avroObject.put(name, ((org.joda.time.DateTime) objectArray[i]).toDate() .getTime()); } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) { - avroObject.put(columns[i].getName(), ((org.joda.time.LocalDateTime) objectArray[i]) + avroObject.put(name, ((org.joda.time.LocalDateTime) objectArray[i]) .toDate().getTime()); } break; case TIME: - avroObject.put(columns[i].getName(), ((org.joda.time.LocalTime) objectArray[i]) + avroObject.put(name, ((org.joda.time.LocalTime) objectArray[i]) .toDateTimeToday().getMillis()); break; case DATE: - avroObject.put(columns[i].getName(), ((org.joda.time.LocalDate) objectArray[i]).toDate() + avroObject.put(name, ((org.joda.time.LocalDate) objectArray[i]).toDate() .getTime()); break; case BIT: - avroObject.put(columns[i].getName(), Boolean.valueOf(objectArray[i].toString())); + avroObject.put(name, Boolean.valueOf(objectArray[i].toString())); break; default: throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, @@ -317,7 +322,7 @@ public String toCSV(GenericRecord record) { StringBuilder csvString = new StringBuilder(); for (int i = 0; i < columns.length; i++) { - Object obj = record.get(columns[i].getName()); + Object obj = record.get(SqoopAvroUtils.createAvroName(columns[i].getName())); if (obj == null && !columns[i].isNullable()) { throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, columns[i].getName() + " does not support null values"); @@ -396,8 +401,8 @@ public Object[] toObject(GenericRecord record) { Object[] object = new Object[columns.length]; for (int i = 0; i < columns.length; i++) { - Object obj = record.get(columns[i].getName()); - Integer nameIndex = schema.getColumnNameIndex(columns[i].getName()); + Object obj = record.get(SqoopAvroUtils.createAvroName(columns[i].getName())); + Integer nameIndex = schema.getColumnNameIndex(SqoopAvroUtils.createAvroName(columns[i].getName())); Column column = columns[nameIndex]; // null is a possible value if (obj == null && !column.isNullable()) { diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java index 3c4d7ded..cd4445d8 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java @@ -39,6 +39,7 @@ import org.apache.sqoop.schema.type.FixedPoint; import org.apache.sqoop.schema.type.Text; import org.joda.time.LocalDateTime; +import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -545,4 +546,10 @@ public void testSchemaNotNullableWithAvro() { dataFormat.getData(); } + @Test + public void testSchemaWithBadCharacters() { + Schema schema = new Schema("9`\" blah`^&*(^&*(%$^&").addColumn(new Text("one").setNullable(false)); + AVROIntermediateDataFormat dataFormat = new AVROIntermediateDataFormat(schema); + Assert.assertEquals(dataFormat.getAvroSchema().getName(), "blah"); + } }