diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 7cef93c3..05c132eb 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -92,24 +92,20 @@ public Void run() throws Exception {
           filewriter.initialize(filepath, context.getSchema(), conf, codec);
-          if (!HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig) || (context.getSchema() instanceof ByteArraySchema)) {
-            String record;
-            while ((record = reader.readTextRecord()) != null) {
-              if (context.getSchema() instanceof ByteArraySchema) {
-                filewriter.write(SqoopIDFUtils.toText(record));
+
+          String nullValue;
+          if (!HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig) || (context.getSchema() instanceof ByteArraySchema)) {
+            nullValue = SqoopIDFUtils.DEFAULT_NULL_VALUE;
           } else {
-                filewriter.write(record);
+            nullValue = toJobConfig.toJobConfig.nullValue;
           }
-              rowsWritten++;
-            }
-          } else {
-            Object[] record;
-            while ((record = reader.readArrayRecord()) != null) {
-              filewriter.write(SqoopIDFUtils.toCSV(record, context.getSchema(), toJobConfig.toJobConfig.nullValue));
-              rowsWritten++;
-            }
-          }
-          filewriter.destroy();
+
+          Object[] record;
+          while ((record = reader.readArrayRecord()) != null) {
+            filewriter.write(record, nullValue);
+            rowsWritten++;
+          }
+          filewriter.destroy();
         } catch (IOException e) {
           throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0005, e);
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
index 31023e73..63c5e0de 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
@@ -28,7 +28,7 @@ public abstract class GenericHdfsWriter {
 
   public abstract void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException;
 
-  public abstract void write(String csv) throws IOException;
+  public abstract void write(Object[] record, String nullValue) throws IOException;
 
   public abstract void destroy() throws IOException;
 
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java
index 4ec813b0..a478b8aa 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java
@@ -55,8 +55,8 @@ public void initialize(Path filepath, Schema schema, Configuration conf, Compres
   }
 
   @Override
-  public void write(String csv) throws IOException {
-    avroParquetWriter.write(avroIntermediateDataFormat.toAVRO(csv));
+  public void write(Object[] record, String nullValue) throws IOException {
+    avroParquetWriter.write(avroIntermediateDataFormat.toAVRO(record));
   }
 
   @Override
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
index dcce8617..482321ad 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
@@ -23,6 +23,8 @@
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.connector.common.SqoopIDFUtils;
+import org.apache.sqoop.schema.ByteArraySchema;
 import org.apache.sqoop.schema.Schema;
 
 import java.io.IOException;
@@ -31,9 +33,11 @@ public class HdfsSequenceWriter extends GenericHdfsWriter {
 
   private SequenceFile.Writer filewriter;
   private Text text;
+  private Schema schema;
 
   @SuppressWarnings("deprecation")
   public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException {
+    this.schema = schema;
     if (codec != null) {
       filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf), conf, filepath,
           Text.class, NullWritable.class,
@@ -47,9 +51,14 @@ public void initialize(Path filepath, Schema schema, Configuration conf, Compres
   }
 
   @Override
-  public void write(String csv) throws IOException {
-    text.set(csv);
-    filewriter.append(text, NullWritable.get());
+  public void write(Object[] record, String nullValue) throws IOException {
+    if (schema instanceof ByteArraySchema) {
+      text.set(new String(((byte[]) record[0]), SqoopIDFUtils.BYTE_FIELD_CHARSET));
+    } else {
+      text.set(SqoopIDFUtils.toCSV(record, schema, nullValue));
+    }
+
+    filewriter.append(text, NullWritable.get());
   }
 
   public void destroy() throws IOException {
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
index 384e3309..c1bea3ca 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
@@ -22,7 +22,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.connector.common.SqoopIDFUtils;
 import org.apache.sqoop.connector.hdfs.HdfsConstants;
+import org.apache.sqoop.schema.ByteArraySchema;
 import org.apache.sqoop.schema.Schema;
 
 import java.io.BufferedWriter;
@@ -33,9 +35,11 @@ public class HdfsTextWriter extends GenericHdfsWriter {
 
   private BufferedWriter filewriter;
+  private Schema schema;
 
   @Override
   public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException {
+    this.schema = schema;
     FileSystem fs = filepath.getFileSystem(conf);
 
     DataOutputStream filestream = fs.create(filepath, false);
@@ -50,8 +54,12 @@ public void initialize(Path filepath, Schema schema, Configuration conf, Compres
   }
 
   @Override
-  public void write(String csv) throws IOException {
-    filewriter.write(csv + HdfsConstants.DEFAULT_RECORD_DELIMITER);
+  public void write(Object[] record, String nullValue) throws IOException {
+    if (schema instanceof ByteArraySchema) {
+      filewriter.write(new String(((byte[]) record[0]), SqoopIDFUtils.BYTE_FIELD_CHARSET) + HdfsConstants.DEFAULT_RECORD_DELIMITER);
+    } else {
+      filewriter.write(SqoopIDFUtils.toCSV(record, schema, nullValue) + HdfsConstants.DEFAULT_RECORD_DELIMITER);
+    }
   }
 
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
index cbd555a8..602f4a9a 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -20,6 +20,7 @@
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -53,6 +54,7 @@
 import org.apache.sqoop.schema.type.Text;
 import org.apache.sqoop.utils.ClassUtils;
 import org.testng.Assert;
+import org.testng.ITest;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -63,7 +65,7 @@
 import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
 import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
 
-public class TestLoader extends TestHdfsBase {
+public class TestLoader extends TestHdfsBase implements ITest {
 
   private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
   private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
@@ -74,6 +76,8 @@ public class TestLoader extends TestHdfsBase {
   private String user = "test_user";
   private Schema schema;
 
+  private String methodName;
+
   @Factory(dataProvider="test-hdfs-loader")
   public TestLoader(ToFormat outputFormat,
                     ToCompression compression)
@@ -100,14 +104,21 @@ public static Object[][] data() {
     return parameters.toArray(new Object[0][]);
   }
 
-  @BeforeMethod(alwaysRun = true)
-  public void setUp() throws Exception {}
+  @BeforeMethod
+  public void findMethodName(Method method) {
+    methodName = method.getName();
+  }
 
   @AfterMethod(alwaysRun = true)
   public void tearDown() throws IOException {
     FileUtils.delete(outputDirectory);
   }
 
+  @Override
+  public String getTestName() {
+    return methodName + "[" + outputFormat.name() + ", " + compression + "]";
+  }
+
   @Test
   public void testLoader() throws Exception {
     FileSystem fs = FileSystem.get(new Configuration());
@@ -123,7 +134,11 @@ public void testLoader() throws Exception {
       @Override
       public Object[] readArrayRecord() {
         assertTestUser(user);
-        return null;
+        if (index++ < NUMBER_OF_ROWS_PER_FILE) {
+          return new Object[] {index, (float)index, String.valueOf(index)};
+        } else {
+          return null;
+        }
       }
 
       @Override
diff --git a/test/src/main/java/org/apache/sqoop/test/data/Cities.java b/test/src/main/java/org/apache/sqoop/test/data/Cities.java
index f2c69bb4..a3c6e0dc 100644
--- a/test/src/main/java/org/apache/sqoop/test/data/Cities.java
+++ b/test/src/main/java/org/apache/sqoop/test/data/Cities.java
@@ -51,6 +51,7 @@ public DataSet loadBasicData() {
     provider.insertRow(tableBaseName, 2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), "Sunnyvale");
     provider.insertRow(tableBaseName, 3, "Czech Republic", Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
     provider.insertRow(tableBaseName, 4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
+    provider.insertRow(tableBaseName, 5, "USA", Timestamp.valueOf("2004-10-27 00:00:00.000"), "Martha's Vineyard");
 
     return this;
   }
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java
index 68855257..8ff52dd8 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java
@@ -66,7 +66,8 @@ public void test() throws Exception {
       "1,'USA','2004-10-23 00:00:00.000','San Francisco'",
       "2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
       "3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
-      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
+      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
+      "5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
     );
 
     // Second execution
@@ -76,10 +77,12 @@ public void test() throws Exception {
       "2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
       "3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
       "4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
+      "5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'",
       "1,'USA','2004-10-23 00:00:00.000','San Francisco'",
       "2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
       "3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
-      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
+      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
+      "5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
     );
 
     dropTable();
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java
index c6ce1e87..c2803de4 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java
@@ -43,7 +43,8 @@ public void test() throws Exception {
       "1,'USA','2004-10-23 00:00:00.000','San Francisco'",
       "2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
       "3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
-      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
+      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
+      "5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
     };
 
     createFromFile("input-0001", sampleData);
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java
index 37306e2e..592e8cc0 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/HdfsIncrementalReadTest.java
@@ -88,14 +88,16 @@ public void testBasic() throws Exception {
 
     // And last execution
     createFromFile("input-0003",
-      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
+      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
+      "5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
     );
     executeJob(job);
 
-    assertEquals(provider.rowCount(getTableName()), 4);
+    assertEquals(provider.rowCount(getTableName()), 5);
     assertRowInCities(1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco");
     assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), "Sunnyvale");
     assertRowInCities(3, "Czech Republic", Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
     assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
+    assertRowInCities(5, "USA", Timestamp.valueOf("2004-10-27 00:00:00.000"), "Martha's Vineyard");
   }
 }
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java
index 1e8c688c..a196034a 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java
@@ -124,7 +124,8 @@ private String[] getCsv() {
       "1,'USA','2004-10-23 00:00:00.000','San Francisco'",
       "2,'USA','2004-10-24 00:00:00.000'," + nullValue,
       "3," + nullValue + ",'2004-10-25 00:00:00.000','Brno'",
-      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
+      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
+      "5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
     };
   }
 
@@ -164,7 +165,7 @@ public void testFromHdfs() throws Exception {
         sqoopSchema, conf, null);
 
     for (String line : getCsv()) {
-      parquetWriter.write(line);
+      parquetWriter.write(SqoopIDFUtils.fromCSV(line, sqoopSchema), SqoopIDFUtils.DEFAULT_NULL_VALUE);
     }
 
     parquetWriter.destroy();
@@ -199,11 +200,12 @@ public void testFromHdfs() throws Exception {
 
     executeJob(job);
 
-    Assert.assertEquals(4L, provider.rowCount(getTableName()));
+    Assert.assertEquals(5L, provider.rowCount(getTableName()));
     assertRowInCities(1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco");
     assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null);
     assertRowInCities(3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
     assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
+    assertRowInCities(5, "USA", Timestamp.valueOf("2004-10-27 00:00:00.000"), "Martha's Vineyard");
   }
 
   @Test
@@ -217,6 +219,7 @@ public void testToHdfs() throws Exception {
     provider.insertRow(getTableName(), 2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null);
     provider.insertRow(getTableName(), 3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
     provider.insertRow(getTableName(), 4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
+    provider.insertRow(getTableName(), 5, "USA", Timestamp.valueOf("2004-10-27 00:00:00.000"), "Martha's Vineyard");
 
     MLink rdbmsLinkFrom = getClient().createLink("generic-jdbc-connector");
     fillRdbmsLinkConfig(rdbmsLinkFrom);
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
index 330da56f..722c126a 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
@@ -145,7 +145,8 @@ public void testOutputDirectoryIsEmpty() throws Exception {
       "1,'USA','2004-10-23 00:00:00.000','San Francisco'",
       "2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
       "3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
-      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
+      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
+      "5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
     );
 
     dropTable();
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java
index d55563dd..e8f2b1ae 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java
@@ -17,14 +17,16 @@
  */
 package org.apache.sqoop.integration.connector.hdfs;
 
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.sqoop.connector.common.SqoopAvroUtils;
+import org.apache.sqoop.connector.common.SqoopIDFUtils;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter;
 import org.apache.sqoop.model.MJob;
@@ -40,20 +42,37 @@
 import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
 import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
 import org.apache.sqoop.test.utils.HdfsUtils;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 
-import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 
 @Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
 public class ParquetTest extends SqoopTestCase {
+  private Schema sqoopSchema;
+  private org.apache.avro.Schema avroSchema;
+
+  @BeforeClass
+  public void setUp() {
+    sqoopSchema = new Schema("ParquetTest");
+    sqoopSchema.addColumn(new FixedPoint("id", Long.valueOf(Integer.SIZE / Byte.SIZE), true));
+    sqoopSchema.addColumn(new Text("country"));
+    sqoopSchema.addColumn(new DateTime("some_date", true, false));
+    sqoopSchema.addColumn(new Text("city"));
+
+    avroSchema = SqoopAvroUtils.createAvroSchema(sqoopSchema);
+  }
+
 
   @AfterMethod
   public void dropTable() {
     super.dropTable();
@@ -94,30 +113,48 @@ public void toParquetTest() throws Exception {
     saveJob(job);
     executeJob(job);
 
-    String[] expectedOutput =
-        {"'1','USA','2004-10-23 00:00:00.000','San Francisco'",
-            "'2','USA','2004-10-24 00:00:00.000','Sunnyvale'",
-            "'3','Czech Republic','2004-10-25 00:00:00.000','Brno'",
-            "'4','USA','2004-10-26 00:00:00.000','Palo Alto'"};
-
-
-    Multiset<String> setLines = HashMultiset.create(Arrays.asList(expectedOutput));
-
-    List<String> notFound = new LinkedList<>();
+    List<GenericRecord> expectedAvroRecords = new ArrayList<>();
+    expectedAvroRecords.addAll(Arrays.asList(
+        new GenericRecordBuilder(avroSchema)
+            .set(SqoopAvroUtils.createAvroName("id"), 1)
+            .set(SqoopAvroUtils.createAvroName("country"), "USA")
+            .set(SqoopAvroUtils.createAvroName("some_date"), new org.joda.time.DateTime(2004, 10, 23, 0, 0, 0, 0).toDate().getTime())
+            .set(SqoopAvroUtils.createAvroName("city"), "San Francisco").build(),
+        new GenericRecordBuilder(avroSchema)
+            .set(SqoopAvroUtils.createAvroName("id"), 2)
+            .set(SqoopAvroUtils.createAvroName("country"), "USA")
+            .set(SqoopAvroUtils.createAvroName("some_date"), new org.joda.time.DateTime(2004, 10, 24, 0, 0, 0, 0).toDate().getTime())
+            .set(SqoopAvroUtils.createAvroName("city"), "Sunnyvale").build(),
+        new GenericRecordBuilder(avroSchema)
+            .set(SqoopAvroUtils.createAvroName("id"), 3)
+            .set(SqoopAvroUtils.createAvroName("country"), "Czech Republic")
+            .set(SqoopAvroUtils.createAvroName("some_date"), new org.joda.time.DateTime(2004, 10, 25, 0, 0, 0, 0).toDate().getTime())
+            .set(SqoopAvroUtils.createAvroName("city"), "Brno").build(),
+        new GenericRecordBuilder(avroSchema)
+            .set(SqoopAvroUtils.createAvroName("id"), 4)
+            .set(SqoopAvroUtils.createAvroName("country"), "USA")
+            .set(SqoopAvroUtils.createAvroName("some_date"), new org.joda.time.DateTime(2004, 10, 26, 0, 0, 0, 0).toDate().getTime())
+            .set(SqoopAvroUtils.createAvroName("city"), "Palo Alto").build(),
+        new GenericRecordBuilder(avroSchema)
+            .set(SqoopAvroUtils.createAvroName("id"), 5)
+            .set(SqoopAvroUtils.createAvroName("country"), "USA")
+            .set(SqoopAvroUtils.createAvroName("some_date"), new org.joda.time.DateTime(2004, 10, 27, 0, 0, 0, 0).toDate().getTime())
+            .set(SqoopAvroUtils.createAvroName("city"), "Martha's Vineyard").build()
+    ));
+    List<GenericRecord> notFound = new LinkedList<>();
 
     Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
     for (Path file : files) {
       ParquetReader<GenericRecord> avroParquetReader = AvroParquetReader.builder(file).build();
       GenericRecord record;
       while ((record = avroParquetReader.read()) != null) {
-        String recordAsLine = recordToLine(record);
-        if (!setLines.remove(recordAsLine)) {
-          notFound.add(recordAsLine);
+        if (!expectedAvroRecords.remove(record)) {
+          notFound.add(record);
         }
       }
     }
-    if (!setLines.isEmpty() || !notFound.isEmpty()) {
+    if (!expectedAvroRecords.isEmpty() || !notFound.isEmpty()) {
       fail("Output do not match expectations.");
     }
   }
@@ -126,12 +163,6 @@ public void toParquetTest() throws Exception {
   public void fromParquetTest() throws Exception {
     createTableCities();
 
-    Schema sqoopSchema = new Schema("cities");
-    sqoopSchema.addColumn(new FixedPoint("id", Long.valueOf(Integer.SIZE), true));
-    sqoopSchema.addColumn(new Text("country"));
-    sqoopSchema.addColumn(new DateTime("some_date", true, false));
-    sqoopSchema.addColumn(new Text("city"));
-
     HdfsParquetWriter parquetWriter = new HdfsParquetWriter();
 
     Configuration conf = new Configuration();
@@ -141,8 +172,8 @@ public void fromParquetTest() throws Exception {
         new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001.parquet")),
         sqoopSchema, conf, null);
 
-    parquetWriter.write("1,'USA','2004-10-23 00:00:00.000','San Francisco'");
-    parquetWriter.write("2,'USA','2004-10-24 00:00:00.000','Sunnyvale'");
+    parquetWriter.write(SqoopIDFUtils.fromCSV("1,'USA','2004-10-23 00:00:00.000','San Francisco'", sqoopSchema), SqoopIDFUtils.DEFAULT_NULL_VALUE);
+    parquetWriter.write(SqoopIDFUtils.fromCSV("2,'USA','2004-10-24 00:00:00.000','Sunnyvale'", sqoopSchema), SqoopIDFUtils.DEFAULT_NULL_VALUE);
 
     parquetWriter.destroy();
 
@@ -150,8 +181,9 @@ public void fromParquetTest() throws Exception {
         new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0002.parquet")),
         sqoopSchema, conf, null);
 
-    parquetWriter.write("3,'Czech Republic','2004-10-25 00:00:00.000','Brno'");
-    parquetWriter.write("4,'USA','2004-10-26 00:00:00.000','Palo Alto'");
+    parquetWriter.write(SqoopIDFUtils.fromCSV("3,'Czech Republic','2004-10-25 00:00:00.000','Brno'", sqoopSchema), SqoopIDFUtils.DEFAULT_NULL_VALUE);
+    parquetWriter.write(SqoopIDFUtils.fromCSV("4,'USA','2004-10-26 00:00:00.000','Palo Alto'", sqoopSchema), SqoopIDFUtils.DEFAULT_NULL_VALUE);
+    parquetWriter.write(SqoopIDFUtils.fromCSV("5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'", sqoopSchema), SqoopIDFUtils.DEFAULT_NULL_VALUE);
 
     parquetWriter.destroy();
 
@@ -172,20 +204,12 @@ public void fromParquetTest() throws Exception {
     saveJob(job);
     executeJob(job);
 
-    assertEquals(provider.rowCount(getTableName()), 4);
+    Assert.assertEquals(provider.rowCount(getTableName()), 5);
     assertRowInCities(1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco");
     assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), "Sunnyvale");
     assertRowInCities(3, "Czech Republic", Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
     assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
-  }
-
-  public String recordToLine(GenericRecord genericRecord) {
-    String line = "";
-    line += "\'" + String.valueOf(genericRecord.get(0)) + "\',";
-    line += "\'" + String.valueOf(genericRecord.get(1)) + "\',";
-    line += "\'" + new Timestamp((Long)genericRecord.get(2)) + "00\',";
-    line += "\'" + String.valueOf(genericRecord.get(3)) + "\'";
-    return line;
+    assertRowInCities(5, "USA", Timestamp.valueOf("2004-10-27 00:00:00.000"), "Martha's Vineyard");
   }
 }
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
index c8576999..e2e74179 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
@@ -108,7 +108,8 @@ public void testImportExport() throws Exception {
       "1,'USA','2004-10-23','San Francisco'",
       "2,'USA','2004-10-24','Sunnyvale'",
       "3,'Czech Republic','2004-10-25','Brno'",
-      "4,'USA','2004-10-26','Palo Alto'"
+      "4,'USA','2004-10-26','Palo Alto'",
+      "5,'USA','2004-10-27','Martha\\'s Vineyard'"
     );
 
     // This re-creates the table completely
@@ -129,6 +130,7 @@ public void testImportExport() throws Exception {
     assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
     assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
     assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
+    assertRowInCities(5, "USA", "2004-10-27", "Martha's Vineyard");
   }
 
   @Test
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
index 933bc08d..e05cbdf2 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
@@ -52,7 +52,8 @@ public void testBasic() throws Exception {
       "1,'USA','2004-10-23 00:00:00.000','San Francisco'",
       "2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
       "3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
-      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
+      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
+      "5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
     );
     // RDBMS link
     MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
@@ -80,10 +81,11 @@ public void testBasic() throws Exception {
 
     executeJob(job);
 
-    assertEquals(4L, provider.rowCount(getTableName()));
+    assertEquals(5L, provider.rowCount(getTableName()));
     assertRowInCities(1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco");
     assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), "Sunnyvale");
     assertRowInCities(3, "Czech Republic", Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
     assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
+    assertRowInCities(5, "USA", Timestamp.valueOf("2004-10-27 00:00:00.000"), "Martha's Vineyard");
   }
 }
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
index 7e660918..6eb7ed8f 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
@@ -73,7 +73,8 @@ public void testCities() throws Exception {
       "1,'USA','2004-10-23 00:00:00.000','San Francisco'",
       "2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
       "3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
-      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
+      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
+      "5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
     );
 
     // Clean up testing table
@@ -157,7 +158,8 @@ public void testColumns() throws Exception {
       "1,'USA'",
       "2,'USA'",
       "3,'Czech Republic'",
-      "4,'USA'"
+      "4,'USA'",
+      "5,'USA'"
     );
 
     // Clean up testing table
@@ -197,7 +199,8 @@ public void testSql() throws Exception {
       "1",
       "2",
       "3",
-      "4"
+      "4",
+      "5"
     );
 
     // Clean up testing table
@@ -243,7 +246,8 @@ public void testDuplicateColumns() throws Exception {
       "1,1",
       "2,2",
       "3,3",
-      "4,4"
+      "4,4",
+      "5,5"
     );
 
     // Clean up testing table
@@ -292,7 +296,8 @@ public void testAllowNullsWithOneExtractor() throws Exception {
       "1,'USA','2004-10-23 00:00:00.000','San Francisco'",
       "2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
       "3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
-      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
+      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
+      "5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
     );
 
     // Clean up testing table
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
index 10f36144..1ffbe6f7 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
@@ -100,7 +100,8 @@ public void testCities() throws Exception {
       "\"1\"",
       "\"2\"",
       "\"3\"",
-      "\"4\""
+      "\"4\"",
+      "\"5\""
     );
   }
 }