Mirror of https://github.com/apache/sqoop.git
SQOOP-2998: Sqoop2: Strings with a ' (single quote) read from hdfs may be improperly escaped
(Abraham Fine via Jarek Jarcec Cecho)
Commit: d3955ab570
Parent: f972e2cb9a
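
The change below reworks the HDFS TO-side writers so that records reach them as object arrays plus an explicit null token, and each writer performs its own CSV (or Avro) serialization instead of receiving a pre-rendered string. The regression data added across the tests is a city name with an embedded single quote ("Martha's Vineyard"), which the Sqoop2 CSV intermediate format carries as 'Martha\'s Vineyard'. A minimal sketch of that escaping round trip, assuming the connector-sdk classes shown in the diff (SqoopIDFUtils, Schema, Text) are on the classpath; the schema and column name here are purely illustrative:

import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Text;

public class QuoteEscapingSketch {
  public static void main(String[] args) {
    // Illustrative one-column schema.
    Schema schema = new Schema("sketch");
    schema.addColumn(new Text("city"));

    Object[] record = new Object[] { "Martha's Vineyard" };

    // toCSV() single-quotes text fields and backslash-escapes embedded quotes,
    // producing 'Martha\'s Vineyard'; fromCSV() parses it back.
    String csv = SqoopIDFUtils.toCSV(record, schema, SqoopIDFUtils.DEFAULT_NULL_VALUE);
    Object[] roundTripped = SqoopIDFUtils.fromCSV(csv, schema);

    System.out.println(csv);
    System.out.println(roundTripped[0]); // Martha's Vineyard
  }
}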
@@ -92,24 +92,20 @@ public Void run() throws Exception {

filewriter.initialize(filepath, context.getSchema(), conf, codec);

if (!HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig) || (context.getSchema() instanceof ByteArraySchema)) {
String record;
while ((record = reader.readTextRecord()) != null) {
if (context.getSchema() instanceof ByteArraySchema) {
filewriter.write(SqoopIDFUtils.toText(record));

String nullValue;
if (!HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig) || (context.getSchema() instanceof ByteArraySchema)) {
nullValue = SqoopIDFUtils.DEFAULT_NULL_VALUE;
} else {
filewriter.write(record);
nullValue = toJobConfig.toJobConfig.nullValue;
}
rowsWritten++;
}
} else {
Object[] record;
while ((record = reader.readArrayRecord()) != null) {
filewriter.write(SqoopIDFUtils.toCSV(record, context.getSchema(), toJobConfig.toJobConfig.nullValue));
rowsWritten++;
}
}
filewriter.destroy();

Object[] record;
while ((record = reader.readArrayRecord()) != null) {
filewriter.write(record, nullValue);
rowsWritten++;
}
filewriter.destroy();

} catch (IOException e) {
throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0005, e);
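
The rendering above interleaves the removed and added loader lines; read together, the added lines appear to form the following body (a best-effort reconstruction from the lines shown, indentation assumed):

filewriter.initialize(filepath, context.getSchema(), conf, codec);

String nullValue;
if (!HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig) || (context.getSchema() instanceof ByteArraySchema)) {
  nullValue = SqoopIDFUtils.DEFAULT_NULL_VALUE;
} else {
  nullValue = toJobConfig.toJobConfig.nullValue;
}

Object[] record;
while ((record = reader.readArrayRecord()) != null) {
  // Serialization (and therefore quote escaping) now happens inside the writer.
  filewriter.write(record, nullValue);
  rowsWritten++;
}
filewriter.destroy();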
@@ -28,7 +28,7 @@ public abstract class GenericHdfsWriter {

public abstract void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException;

public abstract void write(String csv) throws IOException;
public abstract void write(Object[] record, String nullValue) throws IOException;

public abstract void destroy() throws IOException;
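
Under the new contract a writer receives the raw record and the null token and performs its own serialization. A minimal sketch of a conforming implementation (hypothetical class, for illustration only; the GenericHdfsWriter package is assumed from the HdfsParquetWriter import further down):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter; // assumed package
import org.apache.sqoop.schema.Schema;

// Hypothetical writer that just prints records; the real text, sequence and
// parquet writers follow the same shape and do their own CSV/Avro conversion.
public class LoggingHdfsWriter extends GenericHdfsWriter {

  private Schema schema;

  @Override
  public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException {
    this.schema = schema;
  }

  @Override
  public void write(Object[] record, String nullValue) throws IOException {
    // Escaping is applied here, in exactly one place, by SqoopIDFUtils.toCSV.
    System.out.println(SqoopIDFUtils.toCSV(record, schema, nullValue));
  }

  @Override
  public void destroy() throws IOException {
    // Nothing to release in this sketch.
  }
}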
@@ -55,8 +55,8 @@ public void initialize(Path filepath, Schema schema, Configuration conf, Compres
}

@Override
public void write(String csv) throws IOException {
avroParquetWriter.write(avroIntermediateDataFormat.toAVRO(csv));
public void write(Object[] record, String nullValue) throws IOException {
avroParquetWriter.write(avroIntermediateDataFormat.toAVRO(record));
}

@Override
@@ -23,6 +23,8 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.schema.ByteArraySchema;
import org.apache.sqoop.schema.Schema;

import java.io.IOException;
@@ -31,9 +33,11 @@ public class HdfsSequenceWriter extends GenericHdfsWriter {

private SequenceFile.Writer filewriter;
private Text text;
private Schema schema;

@SuppressWarnings("deprecation")
public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException {
this.schema = schema;
if (codec != null) {
filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
conf, filepath, Text.class, NullWritable.class,
@@ -47,9 +51,14 @@ public void initialize(Path filepath, Schema schema, Configuration conf, Compres
}

@Override
public void write(String csv) throws IOException {
text.set(csv);
filewriter.append(text, NullWritable.get());
public void write(Object[] record, String nullValue) throws IOException {
if (schema instanceof ByteArraySchema) {
text.set(new String(((byte[]) record[0]), SqoopIDFUtils.BYTE_FIELD_CHARSET));
} else {
text.set(SqoopIDFUtils.toCSV(record, schema, nullValue));
}

filewriter.append(text, NullWritable.get());
}

public void destroy() throws IOException {
@@ -22,7 +22,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.HdfsConstants;
import org.apache.sqoop.schema.ByteArraySchema;
import org.apache.sqoop.schema.Schema;

import java.io.BufferedWriter;
@@ -33,9 +35,11 @@
public class HdfsTextWriter extends GenericHdfsWriter {

private BufferedWriter filewriter;
private Schema schema;

@Override
public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException {
this.schema = schema;
FileSystem fs = filepath.getFileSystem(conf);

DataOutputStream filestream = fs.create(filepath, false);
@@ -50,8 +54,12 @@ public void initialize(Path filepath, Schema schema, Configuration conf, Compres
}

@Override
public void write(String csv) throws IOException {
filewriter.write(csv + HdfsConstants.DEFAULT_RECORD_DELIMITER);
public void write(Object[] record, String nullValue) throws IOException {
if (schema instanceof ByteArraySchema) {
filewriter.write(new String(((byte[]) record[0]), SqoopIDFUtils.BYTE_FIELD_CHARSET) + HdfsConstants.DEFAULT_RECORD_DELIMITER);
} else {
filewriter.write(SqoopIDFUtils.toCSV(record, schema, nullValue) + HdfsConstants.DEFAULT_RECORD_DELIMITER);
}

}

@@ -20,6 +20,7 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -53,6 +54,7 @@
import org.apache.sqoop.schema.type.Text;
import org.apache.sqoop.utils.ClassUtils;
import org.testng.Assert;
import org.testng.ITest;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
@@ -63,7 +65,7 @@
import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;

public class TestLoader extends TestHdfsBase {
public class TestLoader extends TestHdfsBase implements ITest {
private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
private static final int NUMBER_OF_ROWS_PER_FILE = 1000;

@@ -74,6 +76,8 @@ public class TestLoader extends TestHdfsBase {
private String user = "test_user";
private Schema schema;

private String methodName;

@Factory(dataProvider="test-hdfs-loader")
public TestLoader(ToFormat outputFormat,
ToCompression compression)
@@ -100,14 +104,21 @@ public static Object[][] data() {
return parameters.toArray(new Object[0][]);
}

@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {}
@BeforeMethod
public void findMethodName(Method method) {
methodName = method.getName();
}

@AfterMethod(alwaysRun = true)
public void tearDown() throws IOException {
FileUtils.delete(outputDirectory);
}

@Override
public String getTestName() {
return methodName + "[" + outputFormat.name() + ", " + compression + "]";
}

@Test
public void testLoader() throws Exception {
FileSystem fs = FileSystem.get(new Configuration());
@@ -123,7 +134,11 @@ public void testLoader() throws Exception {
@Override
public Object[] readArrayRecord() {
assertTestUser(user);
return null;
if (index++ < NUMBER_OF_ROWS_PER_FILE) {
return new Object[] {index, (float)index, String.valueOf(index)};
} else {
return null;
}
}

@Override
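
TestLoader now implements TestNG's ITest so that each @Factory-generated instance reports a distinct name. A stripped-down sketch of that pattern (standalone, assuming only TestNG on the classpath):

import java.lang.reflect.Method;

import org.testng.ITest;
import org.testng.annotations.BeforeMethod;

// Minimal illustration of the naming pattern used by TestLoader above.
public class NamedTestSketch implements ITest {

  private String methodName;

  @BeforeMethod
  public void findMethodName(Method method) {
    // Capture the currently running test method before each test.
    methodName = method.getName();
  }

  @Override
  public String getTestName() {
    // TestNG reports this value, so parameterized instances stay distinguishable.
    return methodName;
  }
}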
@@ -51,6 +51,7 @@ public DataSet loadBasicData() {
provider.insertRow(tableBaseName, 2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), "Sunnyvale");
provider.insertRow(tableBaseName, 3, "Czech Republic", Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
provider.insertRow(tableBaseName, 4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
provider.insertRow(tableBaseName, 5, "USA", Timestamp.valueOf("2004-10-27 00:00:00.000"), "Martha's Vineyard");

return this;
}
@@ -66,7 +66,8 @@ public void test() throws Exception {
"1,'USA','2004-10-23 00:00:00.000','San Francisco'",
"2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
"3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
"5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
);

// Second execution
@@ -76,10 +77,12 @@ public void test() throws Exception {
"2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
"3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
"5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'",
"1,'USA','2004-10-23 00:00:00.000','San Francisco'",
"2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
"3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
"5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
);

dropTable();
@@ -43,7 +43,8 @@ public void test() throws Exception {
"1,'USA','2004-10-23 00:00:00.000','San Francisco'",
"2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
"3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
"5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
};

createFromFile("input-0001", sampleData);
@@ -88,14 +88,16 @@ public void testBasic() throws Exception {

// And last execution
createFromFile("input-0003",
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
"5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
);
executeJob(job);
assertEquals(provider.rowCount(getTableName()), 4);
assertEquals(provider.rowCount(getTableName()), 5);
assertRowInCities(1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco");
assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), "Sunnyvale");
assertRowInCities(3, "Czech Republic", Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
assertRowInCities(5, "USA", Timestamp.valueOf("2004-10-27 00:00:00.000"), "Martha's Vineyard");
}

}
@@ -124,7 +124,8 @@ private String[] getCsv() {
"1,'USA','2004-10-23 00:00:00.000','San Francisco'",
"2,'USA','2004-10-24 00:00:00.000'," + nullValue,
"3," + nullValue + ",'2004-10-25 00:00:00.000','Brno'",
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
"5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
};
}

@@ -164,7 +165,7 @@ public void testFromHdfs() throws Exception {
sqoopSchema, conf, null);

for (String line : getCsv()) {
parquetWriter.write(line);
parquetWriter.write(SqoopIDFUtils.fromCSV(line, sqoopSchema), SqoopIDFUtils.DEFAULT_NULL_VALUE);
}

parquetWriter.destroy();
@@ -199,11 +200,12 @@ public void testFromHdfs() throws Exception {

executeJob(job);

Assert.assertEquals(4L, provider.rowCount(getTableName()));
Assert.assertEquals(5L, provider.rowCount(getTableName()));
assertRowInCities(1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco");
assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null);
assertRowInCities(3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
assertRowInCities(5, "USA", Timestamp.valueOf("2004-10-27 00:00:00.000"), "Martha's Vineyard");
}

@Test
@@ -217,6 +219,7 @@ public void testToHdfs() throws Exception {
provider.insertRow(getTableName(), 2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null);
provider.insertRow(getTableName(), 3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
provider.insertRow(getTableName(), 4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
provider.insertRow(getTableName(), 5, "USA", Timestamp.valueOf("2004-10-27 00:00:00.000"), "Martha's Vineyard");

MLink rdbmsLinkFrom = getClient().createLink("generic-jdbc-connector");
fillRdbmsLinkConfig(rdbmsLinkFrom);
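
The test fixtures above no longer hand raw CSV strings to the Parquet writer; they parse each line into an object array first, so the writer re-applies the quoting and escaping rules itself. The call pattern, sketched in isolation (the path literal and schema variable are placeholders for whatever the surrounding test sets up):

HdfsParquetWriter parquetWriter = new HdfsParquetWriter();
// "/tmp/input-0001.parquet" is a placeholder path for this sketch.
parquetWriter.initialize(new Path("/tmp/input-0001.parquet"), sqoopSchema, new Configuration(), null);

// fromCSV() turns the quoted/escaped line back into an Object[];
// write() then serializes it under the writer's own rules.
parquetWriter.write(
    SqoopIDFUtils.fromCSV("5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'", sqoopSchema),
    SqoopIDFUtils.DEFAULT_NULL_VALUE);

parquetWriter.destroy();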
@@ -145,7 +145,8 @@ public void testOutputDirectoryIsEmpty() throws Exception {
"1,'USA','2004-10-23 00:00:00.000','San Francisco'",
"2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
"3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
"5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
);

dropTable();
@@ -17,14 +17,16 @@
*/
package org.apache.sqoop.integration.connector.hdfs;

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.sqoop.connector.common.SqoopAvroUtils;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter;
import org.apache.sqoop.model.MJob;
@@ -40,20 +42,37 @@
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class ParquetTest extends SqoopTestCase {

private Schema sqoopSchema;
private org.apache.avro.Schema avroSchema;

@BeforeClass
public void setUp() {
sqoopSchema = new Schema("ParquetTest");
sqoopSchema.addColumn(new FixedPoint("id", Long.valueOf(Integer.SIZE / Byte.SIZE), true));
sqoopSchema.addColumn(new Text("country"));
sqoopSchema.addColumn(new DateTime("some_date", true, false));
sqoopSchema.addColumn(new Text("city"));

avroSchema = SqoopAvroUtils.createAvroSchema(sqoopSchema);
}

@AfterMethod
public void dropTable() {
super.dropTable();
@@ -94,30 +113,48 @@ public void toParquetTest() throws Exception {
saveJob(job);
executeJob(job);

String[] expectedOutput =
{"'1','USA','2004-10-23 00:00:00.000','San Francisco'",
"'2','USA','2004-10-24 00:00:00.000','Sunnyvale'",
"'3','Czech Republic','2004-10-25 00:00:00.000','Brno'",
"'4','USA','2004-10-26 00:00:00.000','Palo Alto'"};

Multiset<String> setLines = HashMultiset.create(Arrays.asList(expectedOutput));

List<String> notFound = new LinkedList<>();
List<GenericData.Record> expectedAvroRecords = new ArrayList<>();
expectedAvroRecords.addAll(Arrays.asList(
new GenericRecordBuilder(avroSchema)
.set(SqoopAvroUtils.createAvroName("id"), 1)
.set(SqoopAvroUtils.createAvroName("country"), "USA")
.set(SqoopAvroUtils.createAvroName("some_date"), new org.joda.time.DateTime(2004, 10, 23, 0, 0, 0, 0).toDate().getTime())
.set(SqoopAvroUtils.createAvroName("city"), "San Francisco").build(),
new GenericRecordBuilder(avroSchema)
.set(SqoopAvroUtils.createAvroName("id"), 2)
.set(SqoopAvroUtils.createAvroName("country"), "USA")
.set(SqoopAvroUtils.createAvroName("some_date"), new org.joda.time.DateTime(2004, 10, 24, 0, 0, 0, 0).toDate().getTime())
.set(SqoopAvroUtils.createAvroName("city"), "Sunnyvale").build(),
new GenericRecordBuilder(avroSchema)
.set(SqoopAvroUtils.createAvroName("id"), 3)
.set(SqoopAvroUtils.createAvroName("country"), "Czech Republic")
.set(SqoopAvroUtils.createAvroName("some_date"), new org.joda.time.DateTime(2004, 10, 25, 0, 0, 0, 0).toDate().getTime())
.set(SqoopAvroUtils.createAvroName("city"), "Brno").build(),
new GenericRecordBuilder(avroSchema)
.set(SqoopAvroUtils.createAvroName("id"), 4)
.set(SqoopAvroUtils.createAvroName("country"), "USA")
.set(SqoopAvroUtils.createAvroName("some_date"), new org.joda.time.DateTime(2004, 10, 26, 0, 0, 0, 0).toDate().getTime())
.set(SqoopAvroUtils.createAvroName("city"), "Palo Alto").build(),
new GenericRecordBuilder(avroSchema)
.set(SqoopAvroUtils.createAvroName("id"), 5)
.set(SqoopAvroUtils.createAvroName("country"), "USA")
.set(SqoopAvroUtils.createAvroName("some_date"), new org.joda.time.DateTime(2004, 10, 27, 0, 0, 0, 0).toDate().getTime())
.set(SqoopAvroUtils.createAvroName("city"), "Martha's Vineyard").build()
));
List<GenericRecord> notFound = new LinkedList<>();

Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
for (Path file : files) {
ParquetReader<GenericRecord> avroParquetReader = AvroParquetReader.builder(file).build();
GenericRecord record;
while ((record = avroParquetReader.read()) != null) {
String recordAsLine = recordToLine(record);
if (!setLines.remove(recordAsLine)) {
notFound.add(recordAsLine);
if (!expectedAvroRecords.remove(record)) {
notFound.add(record);
}
}
}

if (!setLines.isEmpty() || !notFound.isEmpty()) {
if (!expectedAvroRecords.isEmpty() || !notFound.isEmpty()) {
fail("Output do not match expectations.");
}
}
@@ -126,12 +163,6 @@ public void toParquetTest() throws Exception {
public void fromParquetTest() throws Exception {
createTableCities();

Schema sqoopSchema = new Schema("cities");
sqoopSchema.addColumn(new FixedPoint("id", Long.valueOf(Integer.SIZE), true));
sqoopSchema.addColumn(new Text("country"));
sqoopSchema.addColumn(new DateTime("some_date", true, false));
sqoopSchema.addColumn(new Text("city"));

HdfsParquetWriter parquetWriter = new HdfsParquetWriter();

Configuration conf = new Configuration();
@@ -141,8 +172,8 @@ public void fromParquetTest() throws Exception {
new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001.parquet")),
sqoopSchema, conf, null);

parquetWriter.write("1,'USA','2004-10-23 00:00:00.000','San Francisco'");
parquetWriter.write("2,'USA','2004-10-24 00:00:00.000','Sunnyvale'");
parquetWriter.write(SqoopIDFUtils.fromCSV("1,'USA','2004-10-23 00:00:00.000','San Francisco'", sqoopSchema), SqoopIDFUtils.DEFAULT_NULL_VALUE);
parquetWriter.write(SqoopIDFUtils.fromCSV("2,'USA','2004-10-24 00:00:00.000','Sunnyvale'", sqoopSchema), SqoopIDFUtils.DEFAULT_NULL_VALUE);

parquetWriter.destroy();

@@ -150,8 +181,9 @@ public void fromParquetTest() throws Exception {
new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0002.parquet")),
sqoopSchema, conf, null);

parquetWriter.write("3,'Czech Republic','2004-10-25 00:00:00.000','Brno'");
parquetWriter.write("4,'USA','2004-10-26 00:00:00.000','Palo Alto'");
parquetWriter.write(SqoopIDFUtils.fromCSV("3,'Czech Republic','2004-10-25 00:00:00.000','Brno'", sqoopSchema), SqoopIDFUtils.DEFAULT_NULL_VALUE);
parquetWriter.write(SqoopIDFUtils.fromCSV("4,'USA','2004-10-26 00:00:00.000','Palo Alto'", sqoopSchema), SqoopIDFUtils.DEFAULT_NULL_VALUE);
parquetWriter.write(SqoopIDFUtils.fromCSV("5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'", sqoopSchema), SqoopIDFUtils.DEFAULT_NULL_VALUE);

parquetWriter.destroy();

@@ -172,20 +204,12 @@ public void fromParquetTest() throws Exception {
saveJob(job);

executeJob(job);
assertEquals(provider.rowCount(getTableName()), 4);
Assert.assertEquals(provider.rowCount(getTableName()), 5);
assertRowInCities(1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco");
assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), "Sunnyvale");
assertRowInCities(3, "Czech Republic", Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
}

public String recordToLine(GenericRecord genericRecord) {
String line = "";
line += "\'" + String.valueOf(genericRecord.get(0)) + "\',";
line += "\'" + String.valueOf(genericRecord.get(1)) + "\',";
line += "\'" + new Timestamp((Long)genericRecord.get(2)) + "00\',";
line += "\'" + String.valueOf(genericRecord.get(3)) + "\'";
return line;
assertRowInCities(5, "USA", Timestamp.valueOf("2004-10-27 00:00:00.000"), "Martha's Vineyard");
}

}
@@ -108,7 +108,8 @@ public void testImportExport() throws Exception {
"1,'USA','2004-10-23','San Francisco'",
"2,'USA','2004-10-24','Sunnyvale'",
"3,'Czech Republic','2004-10-25','Brno'",
"4,'USA','2004-10-26','Palo Alto'"
"4,'USA','2004-10-26','Palo Alto'",
"5,'USA','2004-10-27','Martha\\'s Vineyard'"
);

// This re-creates the table completely
@@ -129,6 +130,7 @@ public void testImportExport() throws Exception {
assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
assertRowInCities(5, "USA", "2004-10-27", "Martha's Vineyard");
}

@Test
@@ -52,7 +52,8 @@ public void testBasic() throws Exception {
"1,'USA','2004-10-23 00:00:00.000','San Francisco'",
"2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
"3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
"5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
);
// RDBMS link
MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
@@ -80,10 +81,11 @@ public void testBasic() throws Exception {

executeJob(job);

assertEquals(4L, provider.rowCount(getTableName()));
assertEquals(5L, provider.rowCount(getTableName()));
assertRowInCities(1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco");
assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), "Sunnyvale");
assertRowInCities(3, "Czech Republic", Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
assertRowInCities(5, "USA", Timestamp.valueOf("2004-10-27 00:00:00.000"), "Martha's Vineyard");
}
}
@@ -73,7 +73,8 @@ public void testCities() throws Exception {
"1,'USA','2004-10-23 00:00:00.000','San Francisco'",
"2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
"3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
"5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
);

// Clean up testing table
@@ -157,7 +158,8 @@ public void testColumns() throws Exception {
"1,'USA'",
"2,'USA'",
"3,'Czech Republic'",
"4,'USA'"
"4,'USA'",
"5,'USA'"
);

// Clean up testing table
@@ -197,7 +199,8 @@ public void testSql() throws Exception {
"1",
"2",
"3",
"4"
"4",
"5"
);

// Clean up testing table
@@ -243,7 +246,8 @@ public void testDuplicateColumns() throws Exception {
"1,1",
"2,2",
"3,3",
"4,4"
"4,4",
"5,5"
);

// Clean up testing table
@@ -292,7 +296,8 @@ public void testAllowNullsWithOneExtractor() throws Exception {
"1,'USA','2004-10-23 00:00:00.000','San Francisco'",
"2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
"3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
"5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
);

// Clean up testing table
@@ -100,7 +100,8 @@ public void testCities() throws Exception {
"\"1\"",
"\"2\"",
"\"3\"",
"\"4\""
"\"4\"",
"\"5\""
);
}
}