mirror of
https://github.com/apache/sqoop.git
synced 2025-05-21 03:10:49 +08:00
SQOOP-308. Generated Avro Schema cannot handle nullable fields.
git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1154059 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a0d34feeaa
commit
1014f4b54a
@ -18,9 +18,6 @@
|
|||||||
|
|
||||||
package com.cloudera.sqoop.orm;
|
package com.cloudera.sqoop.orm;
|
||||||
|
|
||||||
import com.cloudera.sqoop.SqoopOptions;
|
|
||||||
import com.cloudera.sqoop.manager.ConnManager;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.sql.Types;
|
import java.sql.Types;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -31,6 +28,9 @@
|
|||||||
import org.apache.avro.Schema.Field;
|
import org.apache.avro.Schema.Field;
|
||||||
import org.apache.avro.Schema.Type;
|
import org.apache.avro.Schema.Type;
|
||||||
|
|
||||||
|
import com.cloudera.sqoop.SqoopOptions;
|
||||||
|
import com.cloudera.sqoop.manager.ConnManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an Avro schema to represent a table from a database.
|
* Creates an Avro schema to represent a table from a database.
|
||||||
*/
|
*/
|
||||||
@ -106,7 +106,12 @@ private Type toAvroType(int sqlType) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Schema toAvroSchema(int sqlType) {
|
public Schema toAvroSchema(int sqlType) {
|
||||||
return Schema.create(toAvroType(sqlType));
|
// All types are assumed nullabl;e make a union of the "true" type for
|
||||||
|
// a column and NULL.
|
||||||
|
List<Schema> childSchemas = new ArrayList<Schema>();
|
||||||
|
childSchemas.add(Schema.create(toAvroType(sqlType)));
|
||||||
|
childSchemas.add(Schema.create(Schema.Type.NULL));
|
||||||
|
return Schema.createUnion(childSchemas);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -18,12 +18,10 @@
|
|||||||
|
|
||||||
package com.cloudera.sqoop;
|
package com.cloudera.sqoop;
|
||||||
|
|
||||||
import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
|
|
||||||
import com.cloudera.sqoop.testutil.CommonArgs;
|
|
||||||
import com.cloudera.sqoop.testutil.HsqldbTestServer;
|
|
||||||
import com.cloudera.sqoop.testutil.ImportJobTestCase;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.sql.Statement;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ -39,6 +37,11 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
|
||||||
|
import com.cloudera.sqoop.testutil.CommonArgs;
|
||||||
|
import com.cloudera.sqoop.testutil.HsqldbTestServer;
|
||||||
|
import com.cloudera.sqoop.testutil.ImportJobTestCase;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests --as-avrodatafile.
|
* Tests --as-avrodatafile.
|
||||||
*/
|
*/
|
||||||
@ -89,16 +92,67 @@ public void testAvroImport() throws IOException {
|
|||||||
assertEquals(2, fields.size());
|
assertEquals(2, fields.size());
|
||||||
|
|
||||||
assertEquals("INTFIELD1", fields.get(0).name());
|
assertEquals("INTFIELD1", fields.get(0).name());
|
||||||
assertEquals(Schema.Type.INT, fields.get(0).schema().getType());
|
assertEquals(Schema.Type.UNION, fields.get(0).schema().getType());
|
||||||
|
assertEquals(Schema.Type.INT, fields.get(0).schema().getTypes().get(0).getType());
|
||||||
|
assertEquals(Schema.Type.NULL, fields.get(0).schema().getTypes().get(1).getType());
|
||||||
|
|
||||||
assertEquals("INTFIELD2", fields.get(1).name());
|
assertEquals("INTFIELD2", fields.get(1).name());
|
||||||
assertEquals(Schema.Type.INT, fields.get(1).schema().getType());
|
assertEquals(Schema.Type.UNION, fields.get(1).schema().getType());
|
||||||
|
assertEquals(Schema.Type.INT, fields.get(1).schema().getTypes().get(0).getType());
|
||||||
|
assertEquals(Schema.Type.NULL, fields.get(1).schema().getTypes().get(1).getType());
|
||||||
|
|
||||||
GenericRecord record1 = reader.next();
|
GenericRecord record1 = reader.next();
|
||||||
assertEquals(1, record1.get("INTFIELD1"));
|
assertEquals(1, record1.get("INTFIELD1"));
|
||||||
assertEquals(8, record1.get("INTFIELD2"));
|
assertEquals(8, record1.get("INTFIELD2"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testNullableAvroImport() throws IOException, SQLException {
|
||||||
|
addNullRecord(); // Add a pair of NULL values to twointtable.
|
||||||
|
runImport(getOutputArgv(true));
|
||||||
|
|
||||||
|
Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
|
||||||
|
DataFileReader<GenericRecord> reader = read(outputFile);
|
||||||
|
boolean foundNullRecord = false;
|
||||||
|
|
||||||
|
// Iterate thru the records in the output file til we find one that
|
||||||
|
// matches (NULL, NULL).
|
||||||
|
for (GenericRecord record : reader) {
|
||||||
|
LOG.debug("Input record: " + record);
|
||||||
|
if (record.get("INTFIELD1") == null && record.get("INTFIELD2") == null) {
|
||||||
|
LOG.debug("Got null record");
|
||||||
|
foundNullRecord = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(foundNullRecord);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a record to the TWOINTTABLE that contains (NULL, NULL).
|
||||||
|
*
|
||||||
|
* @throws SQLException if there's a problem doing the INSERT statement.
|
||||||
|
*/
|
||||||
|
private void addNullRecord() throws SQLException {
|
||||||
|
Connection connection = null;
|
||||||
|
Statement st = null;
|
||||||
|
try {
|
||||||
|
connection = this.getManager().getConnection();
|
||||||
|
st = connection.createStatement();
|
||||||
|
st.executeUpdate("INSERT INTO " + getTableName()
|
||||||
|
+ " VALUES(NULL, NULL)");
|
||||||
|
|
||||||
|
connection.commit();
|
||||||
|
} finally {
|
||||||
|
if (null != st) {
|
||||||
|
st.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (null != connection) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private DataFileReader<GenericRecord> read(Path filename) throws IOException {
|
private DataFileReader<GenericRecord> read(Path filename) throws IOException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
|
if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
|
||||||
|
Loading…
Reference in New Issue
Block a user