mirror of
https://github.com/apache/sqoop.git
synced 2025-05-08 07:11:01 +08:00
SQOOP-2849: Sqoop2: Job failure when writing parquet in hdfs with data coming from mysql
(Abraham Fine via Jarek Jarcec Cecho)
This commit is contained in:
parent
f9d7c3a8e5
commit
e06190b2f8
@ -18,6 +18,7 @@
|
||||
package org.apache.sqoop.connector.common;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.sqoop.classification.InterfaceAudience;
|
||||
import org.apache.sqoop.classification.InterfaceStability;
|
||||
import org.apache.sqoop.common.SqoopException;
|
||||
@ -36,6 +37,8 @@
|
||||
@InterfaceStability.Unstable
|
||||
public class SqoopAvroUtils {
|
||||
|
||||
private static final Logger LOG = Logger.getLogger(SqoopAvroUtils.class);
|
||||
|
||||
public static final String COLUMN_TYPE = "columnType";
|
||||
public static final String SQOOP_SCHEMA_NAMESPACE = "org.apache.sqoop";
|
||||
|
||||
@ -44,14 +47,14 @@ public class SqoopAvroUtils {
|
||||
*/
|
||||
public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema) {
|
||||
// avro schema names cannot start with quotes, lets just remove them
|
||||
String name = sqoopSchema.getName().replace("\"", "");
|
||||
String name = createAvroName(sqoopSchema.getName());
|
||||
String doc = sqoopSchema.getNote();
|
||||
String namespace = SQOOP_SCHEMA_NAMESPACE;
|
||||
Schema schema = Schema.createRecord(name, doc, namespace, false);
|
||||
|
||||
List<Schema.Field> fields = new ArrayList<Schema.Field>();
|
||||
for (Column column : sqoopSchema.getColumnsArray()) {
|
||||
Schema.Field field = new Schema.Field(column.getName(), createAvroFieldSchema(column), null, null);
|
||||
Schema.Field field = new Schema.Field(createAvroName(column.getName()), createAvroFieldSchema(column), null, null);
|
||||
field.addProp(COLUMN_TYPE, column.getType().toString());
|
||||
fields.add(field);
|
||||
}
|
||||
@ -59,6 +62,16 @@ public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema
|
||||
return schema;
|
||||
}
|
||||
|
||||
// From the avro docs:
|
||||
// The name portion of a fullname, record field names, and enum symbols must:
|
||||
// start with [A-Za-z_]
|
||||
// subsequently contain only [A-Za-z0-9_]
|
||||
public static String createAvroName(String name) {
|
||||
String avroName = name.replaceFirst("^[0-9]", "").replaceAll("[^a-zA-Z0-9_]", "");
|
||||
LOG.debug("Replacing name: " + name + " with Avro name: " + avroName);
|
||||
return avroName;
|
||||
}
|
||||
|
||||
public static Schema createAvroFieldSchema(Column column) {
|
||||
Schema schema = toAvroFieldType(column);
|
||||
if (!column.isNullable()) {
|
||||
@ -123,7 +136,7 @@ public static Schema createEnumSchema(Column column) {
|
||||
assert column instanceof org.apache.sqoop.schema.type.Enum;
|
||||
Set<String> options = ((org.apache.sqoop.schema.type.Enum) column).getOptions();
|
||||
List<String> listOptions = new ArrayList<String>(options);
|
||||
return Schema.createEnum(column.getName(), null, SQOOP_SCHEMA_NAMESPACE, listOptions);
|
||||
return Schema.createEnum(createAvroName(column.getName()), null, SQOOP_SCHEMA_NAMESPACE, listOptions);
|
||||
}
|
||||
|
||||
public static byte[] getBytesFromByteBuffer(Object obj) {
|
||||
|
@ -36,6 +36,7 @@
|
||||
import org.apache.sqoop.classification.InterfaceAudience;
|
||||
import org.apache.sqoop.classification.InterfaceStability;
|
||||
import org.apache.sqoop.common.SqoopException;
|
||||
import org.apache.sqoop.connector.common.SqoopAvroUtils;
|
||||
import org.apache.sqoop.error.code.IntermediateDataFormatError;
|
||||
import org.apache.sqoop.schema.type.Column;
|
||||
import org.apache.sqoop.utils.ClassUtils;
|
||||
@ -166,11 +167,12 @@ public GenericRecord toAVRO(String csv) {
|
||||
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
|
||||
columns[i].getName() + " does not support null values");
|
||||
}
|
||||
String name = SqoopAvroUtils.createAvroName(columns[i].getName());
|
||||
if (csvStringArray[i].equals(DEFAULT_NULL_VALUE)) {
|
||||
avroObject.put(columns[i].getName(), null);
|
||||
avroObject.put(name, null);
|
||||
continue;
|
||||
}
|
||||
avroObject.put(columns[i].getName(), toAVRO(csvStringArray[i], columns[i]));
|
||||
avroObject.put(name, toAVRO(csvStringArray[i], columns[i]));
|
||||
}
|
||||
return avroObject;
|
||||
}
|
||||
@ -250,56 +252,59 @@ public GenericRecord toAVRO(Object[] objectArray) {
|
||||
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
|
||||
columns[i].getName() + " does not support null values");
|
||||
}
|
||||
|
||||
String name = SqoopAvroUtils.createAvroName(columns[i].getName());
|
||||
|
||||
if (objectArray[i] == null) {
|
||||
avroObject.put(columns[i].getName(), null);
|
||||
avroObject.put(name, null);
|
||||
continue;
|
||||
}
|
||||
|
||||
switch (columns[i].getType()) {
|
||||
case ARRAY:
|
||||
case SET:
|
||||
avroObject.put(columns[i].getName(), toList((Object[]) objectArray[i]));
|
||||
avroObject.put(name, toList((Object[]) objectArray[i]));
|
||||
break;
|
||||
case ENUM:
|
||||
GenericData.EnumSymbol enumValue = new GenericData.EnumSymbol(createEnumSchema(columns[i]),
|
||||
(String) objectArray[i]);
|
||||
avroObject.put(columns[i].getName(), enumValue);
|
||||
avroObject.put(name, enumValue);
|
||||
break;
|
||||
case TEXT:
|
||||
avroObject.put(columns[i].getName(), new Utf8((String) objectArray[i]));
|
||||
avroObject.put(name, new Utf8((String) objectArray[i]));
|
||||
break;
|
||||
case BINARY:
|
||||
case UNKNOWN:
|
||||
avroObject.put(columns[i].getName(), ByteBuffer.wrap((byte[]) objectArray[i]));
|
||||
avroObject.put(name, ByteBuffer.wrap((byte[]) objectArray[i]));
|
||||
break;
|
||||
case MAP:
|
||||
case FIXED_POINT:
|
||||
case FLOATING_POINT:
|
||||
avroObject.put(columns[i].getName(), objectArray[i]);
|
||||
avroObject.put(name, objectArray[i]);
|
||||
break;
|
||||
case DECIMAL:
|
||||
// TODO: store as FIXED in SQOOP-16161
|
||||
avroObject.put(columns[i].getName(), ((BigDecimal) objectArray[i]).toPlainString());
|
||||
avroObject.put(name, ((BigDecimal) objectArray[i]).toPlainString());
|
||||
break;
|
||||
case DATE_TIME:
|
||||
if (objectArray[i] instanceof org.joda.time.DateTime) {
|
||||
avroObject.put(columns[i].getName(), ((org.joda.time.DateTime) objectArray[i]).toDate()
|
||||
avroObject.put(name, ((org.joda.time.DateTime) objectArray[i]).toDate()
|
||||
.getTime());
|
||||
} else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
|
||||
avroObject.put(columns[i].getName(), ((org.joda.time.LocalDateTime) objectArray[i])
|
||||
avroObject.put(name, ((org.joda.time.LocalDateTime) objectArray[i])
|
||||
.toDate().getTime());
|
||||
}
|
||||
break;
|
||||
case TIME:
|
||||
avroObject.put(columns[i].getName(), ((org.joda.time.LocalTime) objectArray[i])
|
||||
avroObject.put(name, ((org.joda.time.LocalTime) objectArray[i])
|
||||
.toDateTimeToday().getMillis());
|
||||
break;
|
||||
case DATE:
|
||||
avroObject.put(columns[i].getName(), ((org.joda.time.LocalDate) objectArray[i]).toDate()
|
||||
avroObject.put(name, ((org.joda.time.LocalDate) objectArray[i]).toDate()
|
||||
.getTime());
|
||||
break;
|
||||
case BIT:
|
||||
avroObject.put(columns[i].getName(), Boolean.valueOf(objectArray[i].toString()));
|
||||
avroObject.put(name, Boolean.valueOf(objectArray[i].toString()));
|
||||
break;
|
||||
default:
|
||||
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
|
||||
@ -317,7 +322,7 @@ public String toCSV(GenericRecord record) {
|
||||
StringBuilder csvString = new StringBuilder();
|
||||
for (int i = 0; i < columns.length; i++) {
|
||||
|
||||
Object obj = record.get(columns[i].getName());
|
||||
Object obj = record.get(SqoopAvroUtils.createAvroName(columns[i].getName()));
|
||||
if (obj == null && !columns[i].isNullable()) {
|
||||
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
|
||||
columns[i].getName() + " does not support null values");
|
||||
@ -396,8 +401,8 @@ public Object[] toObject(GenericRecord record) {
|
||||
Object[] object = new Object[columns.length];
|
||||
|
||||
for (int i = 0; i < columns.length; i++) {
|
||||
Object obj = record.get(columns[i].getName());
|
||||
Integer nameIndex = schema.getColumnNameIndex(columns[i].getName());
|
||||
Object obj = record.get(SqoopAvroUtils.createAvroName(columns[i].getName()));
|
||||
Integer nameIndex = schema.getColumnNameIndex(SqoopAvroUtils.createAvroName(columns[i].getName()));
|
||||
Column column = columns[nameIndex];
|
||||
// null is a possible value
|
||||
if (obj == null && !column.isNullable()) {
|
||||
|
@ -39,6 +39,7 @@
|
||||
import org.apache.sqoop.schema.type.FixedPoint;
|
||||
import org.apache.sqoop.schema.type.Text;
|
||||
import org.joda.time.LocalDateTime;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
@ -545,4 +546,10 @@ public void testSchemaNotNullableWithAvro() {
|
||||
dataFormat.getData();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchemaWithBadCharacters() {
|
||||
Schema schema = new Schema("9`\" blah`^&*(^&*(%$^&").addColumn(new Text("one").setNullable(false));
|
||||
AVROIntermediateDataFormat dataFormat = new AVROIntermediateDataFormat(schema);
|
||||
Assert.assertEquals(dataFormat.getAvroSchema().getName(), "blah");
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user