SQOOP-3318: Remove Kite dependency from test cases
(Szabolcs Vasas via Boglarka Egyed)
parent 72c5cd717e
commit 11c83f6838
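
The recurring pattern in the hunks below: test assertions that previously went through Kite's Datasets/DatasetReader API now read the Parquet output directly through the new ParquetReader test utility added at the end of this commit. A simplified before/after sketch of that pattern, with tablePath standing in for a test output directory (identifiers taken from the diff itself):

    // Before: Kite dataset API (removed by this commit)
    Dataset<GenericRecord> dataset = Datasets.load("dataset:file:" + tablePath, GenericRecord.class);
    DatasetReader<GenericRecord> reader = dataset.newReader();
    try {
      GenericRecord record = reader.next();
      // assertions on record ...
    } finally {
      reader.close();
    }

    // After: plain Parquet reading via the new test utility
    List<GenericRecord> records = new ParquetReader(tablePath).readAll();
    // assertions on records ...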
@@ -19,8 +19,14 @@
package org.apache.sqoop.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public final class FileSystemUtil {
  private FileSystemUtil() {
@@ -42,4 +48,18 @@ public static Path makeQualified(Path path, Configuration conf)
    return path.getFileSystem(conf).makeQualified(path);
  }

  public static boolean isFile(Path path, Configuration conf) throws IOException {
    return path.getFileSystem(conf).isFile(path);
  }

  public static List<Path> listFiles(Path path, Configuration conf) throws IOException {
    List<Path> result = new ArrayList<>();
    FileSystem fileSystem = path.getFileSystem(conf);
    RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(path, false);
    while (files.hasNext()) {
      result.add(files.next().getPath());
    }
    return result;
  }
}
@@ -23,12 +23,12 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.sqoop.util.ParquetReader;
import org.junit.Before;
import org.junit.After;

@@ -36,10 +36,8 @@
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.tool.ImportAllTablesTool;
import org.junit.Test;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetReader;
import org.kitesdk.data.Datasets;

import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
@@ -180,7 +178,6 @@ public void testMultiTableImportAsParquetFormat() throws IOException {
    int i = 0;
    for (String tableName : this.tableNames) {
      Path tablePath = new Path(warehousePath, tableName);
      Dataset dataset = Datasets.load("dataset:file:" + tablePath);

      // dequeue the expected value for this table. This
      // list has the same order as the tableNames list.
@@ -188,16 +185,9 @@ public void testMultiTableImportAsParquetFormat() throws IOException {
          + this.expectedStrings.get(0);
      this.expectedStrings.remove(0);

      DatasetReader<GenericRecord> reader = dataset.newReader();
      try {
        GenericRecord record = reader.next();
        String line = record.get(0) + "," + record.get(1);
        assertEquals("Table " + tableName + " expected a different string",
            expectedVal, line);
        assertFalse(reader.hasNext());
      } finally {
        reader.close();
      }
      List<String> result = new ParquetReader(tablePath).readAllInCsv();
      assertEquals("Table " + tableName + " expected a different string",
          singletonList(expectedVal), result);
    }
  }
@@ -48,13 +48,10 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.util.ParquetReader;
import org.junit.Before;
import org.junit.Test;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetReader;
import org.kitesdk.data.Datasets;

import static org.apache.avro.generic.GenericData.Record;
import static org.junit.Assert.fail;

/**
@@ -298,21 +295,11 @@ private boolean checkAvroFileForLine(FileSystem fs, Path p, List<Integer> record
    return false;
  }

  private boolean checkParquetFileForLine(FileSystem fileSystem, Path path, List<Integer> record) throws IOException
  {
    Dataset<Record> parquetRecords = Datasets.load("dataset:" + path.getParent(), Record.class);
    DatasetReader<Record> datasetReader = null;
    try {
      datasetReader = parquetRecords.newReader();
      for (GenericRecord genericRecord : datasetReader) {
        if (valueMatches(genericRecord, record)) {
          return true;
        }
      }
    }
    finally {
      if (datasetReader != null) {
        datasetReader.close();
  private boolean checkParquetFileForLine(Path path, List<Integer> record) throws IOException {
    List<GenericRecord> resultRecords = new ParquetReader(path.getParent()).readAll();
    for (GenericRecord resultRecord : resultRecords) {
      if (valueMatches(resultRecord, record)) {
        return true;
      }
    }

@@ -330,7 +317,7 @@ protected boolean checkFileForLine(FileSystem fs, Path p, SqoopOptions.FileLayou
      result = checkAvroFileForLine(fs, p, record);
      break;
    case ParquetFile:
      result = checkParquetFileForLine(fs, p, record);
      result = checkParquetFileForLine(p, record);
      break;
    }
    return result;
@@ -18,6 +18,10 @@

package org.apache.sqoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.testutil.ExportJobTestCase;
import com.google.common.collect.Lists;
import org.apache.avro.Schema;
@@ -28,7 +32,7 @@

import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.kitesdk.data.*;
import parquet.avro.AvroParquetWriter;

import java.io.IOException;
import java.nio.ByteBuffer;
@@ -39,9 +43,13 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
import static parquet.hadoop.metadata.CompressionCodecName.SNAPPY;


/**
@@ -121,30 +129,45 @@ public Schema getColumnParquetSchema() {

  /**
   * Create a data file that gets exported to the db.
   * @param fileNum the number of the file (for multi-file export)
   * Sqoop uses Kite to export Parquet files so it requires a Kite metadata directory to be present next to the files
   * but since we do not use Kite in our test cases anymore we generate the .metadata directory here.
   * @param numRecords how many records to write to the file.
   */
  protected void createParquetFile(int fileNum, int numRecords,
  protected void createParquetFile(int numRecords,
      ColumnGenerator... extraCols) throws IOException {

    String uri = "dataset:file:" + getTablePath();
    Schema schema = buildSchema(extraCols);
    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
        .schema(schema)
        .format(Formats.PARQUET)
        .build();
    Dataset dataset = Datasets.create(uri, descriptor);
    DatasetWriter writer = dataset.newWriter();
    try {

    createMetadataDir(schema);
    String fileName = UUID.randomUUID().toString() + ".parquet";
    Path filePath = new Path(getTablePath(), fileName);
    try (AvroParquetWriter parquetWriter = new AvroParquetWriter(filePath, schema, SNAPPY, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE)) {
      for (int i = 0; i < numRecords; i++) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("id", i);
        record.put("msg", getMsgPrefix() + i);
        addExtraColumns(record, i, extraCols);
        writer.write(record);
        parquetWriter.write(record);
      }
    } finally {
      writer.close();
    }
  }

  private void createMetadataDir(Schema schema) throws IOException {
    final String descriptorFileTemplate = "location=file\\:%s\n" +
        " version=1\n" +
        " compressionType=snappy\n" +
        " format=parquet\n";
    Path metadataDirPath = new Path(getTablePath(), ".metadata");
    Path schemaFile = new Path(metadataDirPath, "schema.avsc");
    Path descriptorFile = new Path(metadataDirPath, "descriptor.properties");
    FileSystem fileSystem = getTablePath().getFileSystem(new Configuration());
    fileSystem.mkdirs(metadataDirPath);

    try (FSDataOutputStream fileOs = fileSystem.create(schemaFile)) {
      fileOs.write(schema.toString().getBytes());
    }
    try (FSDataOutputStream fileOs = fileSystem.create(descriptorFile)) {
      fileOs.write(String.format(descriptorFileTemplate, getTablePath()).getBytes());
    }
  }

@@ -352,7 +375,7 @@ public void testSupportedParquetTypes() throws IOException, SQLException {
        colGenerator(new GenericData.EnumSymbol(enumeration, "a"), enumeration,
            "a", "VARCHAR(8)"),
    };
    createParquetFile(0, TOTAL_RECORDS, gens);
    createParquetFile(TOTAL_RECORDS, gens);
    createTable(gens);
    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
    verifyExport(TOTAL_RECORDS);
@@ -372,7 +395,7 @@ public void testNullableField() throws IOException, SQLException {
    Schema schema = Schema.createUnion(childSchemas);
    ColumnGenerator gen0 = colGenerator(null, schema, null, "VARCHAR(64)");
    ColumnGenerator gen1 = colGenerator("s", schema, "s", "VARCHAR(64)");
    createParquetFile(0, TOTAL_RECORDS, gen0, gen1);
    createParquetFile(TOTAL_RECORDS, gen0, gen1);
    createTable(gen0, gen1);
    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
    verifyExport(TOTAL_RECORDS);
@@ -392,7 +415,7 @@ public void testParquetRecordsNotSupported() throws IOException, SQLException {
    record.put("myint", 100);
    // DB type is not used so can be anything:
    ColumnGenerator gen = colGenerator(record, schema, null, "VARCHAR(64)");
    createParquetFile(0, TOTAL_RECORDS, gen);
    createParquetFile(TOTAL_RECORDS, gen);
    createTable(gen);

    thrown.expect(Exception.class);
@@ -409,7 +432,7 @@ public void testMissingDatabaseFields() throws IOException, SQLException {
    // the Parquet value will not be exported
    ColumnGenerator gen = colGenerator(100, Schema.create(Schema.Type.INT),
        null, null);
    createParquetFile(0, TOTAL_RECORDS, gen);
    createParquetFile(TOTAL_RECORDS, gen);
    createTable(gen);
    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
    verifyExport(TOTAL_RECORDS);
@@ -419,7 +442,7 @@ public void testMissingDatabaseFields() throws IOException, SQLException {
  public void testParquetWithUpdateKey() throws IOException, SQLException {
    String[] argv = { "--update-key", "ID" };
    final int TOTAL_RECORDS = 1;
    createParquetFile(0, TOTAL_RECORDS);
    createParquetFile(TOTAL_RECORDS);
    createTableWithInsert();
    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
    verifyExport(getMsgPrefix() + "0");
@@ -432,7 +455,7 @@ public void testParquetWithUpsert() throws IOException, SQLException {
    final int TOTAL_RECORDS = 2;
    // ColumnGenerator gen = colGenerator("100",
    // Schema.create(Schema.Type.STRING), null, "VARCHAR(64)");
    createParquetFile(0, TOTAL_RECORDS);
    createParquetFile(TOTAL_RECORDS);
    createTableWithInsert();

    thrown.expect(Exception.class);
@@ -447,7 +470,7 @@ public void testMissingParquetFields() throws IOException, SQLException {

    // null Parquet schema means don't create an Parquet field
    ColumnGenerator gen = colGenerator(null, null, null, "VARCHAR(64)");
    createParquetFile(0, TOTAL_RECORDS, gen);
    createParquetFile(TOTAL_RECORDS, gen);
    createTable(gen);

    thrown.expect(Exception.class);
@@ -18,6 +18,8 @@

package org.apache.sqoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.sqoop.testutil.CommonArgs;
import org.apache.sqoop.testutil.HsqldbTestServer;
import org.apache.sqoop.testutil.ImportJobTestCase;
@@ -28,11 +30,12 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.util.ParquetReader;
import org.junit.Test;
import org.kitesdk.data.CompressionType;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetReader;
import org.kitesdk.data.Datasets;
import parquet.format.CompressionCodec;
import parquet.hadoop.Footer;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;

import java.io.IOException;
import java.nio.ByteBuffer;
@@ -42,7 +45,6 @@
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -119,10 +121,16 @@ public void testSnappyCompression() throws IOException {

  @Test
  public void testDeflateCompression() throws IOException {
    runParquetImportTest("deflate");
    // The current Kite-based Parquet writing implementation uses GZIP compression codec when Deflate is specified.
    // See: org.kitesdk.data.spi.filesystem.ParquetAppender.getCompressionCodecName()
    runParquetImportTest("deflate", "gzip");
  }

  private void runParquetImportTest(String codec) throws IOException {
    runParquetImportTest(codec, codec);
  }

  private void runParquetImportTest(String codec, String expectedCodec) throws IOException {
    String[] types = {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
        "VARBINARY(2)",};
    String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
@@ -131,7 +139,7 @@ private void runParquetImportTest(String codec) throws IOException {
    String [] extraArgs = { "--compression-codec", codec};
    runImport(getOutputArgv(true, extraArgs));

    assertEquals(CompressionType.forName(codec), getCompressionType());
    assertEquals(expectedCodec.toUpperCase(), getCompressionType());

    Schema schema = getSchema();
    assertEquals(Type.RECORD, schema.getType());
@@ -145,25 +153,21 @@ private void runParquetImportTest(String codec) throws IOException {
    checkField(fields.get(5), "DATA_COL5", Type.STRING);
    checkField(fields.get(6), "DATA_COL6", Type.BYTES);

    DatasetReader<GenericRecord> reader = getReader();
    try {
      GenericRecord record1 = reader.next();
      assertNotNull(record1);
      assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
      assertEquals("DATA_COL1", 100, record1.get("DATA_COL1"));
      assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2"));
      assertEquals("DATA_COL3", 1.0f, record1.get("DATA_COL3"));
      assertEquals("DATA_COL4", 2.0, record1.get("DATA_COL4"));
      assertEquals("DATA_COL5", "s", record1.get("DATA_COL5"));
      Object object = record1.get("DATA_COL6");
      assertTrue(object instanceof ByteBuffer);
      ByteBuffer b = ((ByteBuffer) object);
      assertEquals((byte) 1, b.get(0));
      assertEquals((byte) 2, b.get(1));
      assertFalse(reader.hasNext());
    } finally {
      reader.close();
    }
    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
    GenericRecord record1 = genericRecords.get(0);
    assertNotNull(record1);
    assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
    assertEquals("DATA_COL1", 100, record1.get("DATA_COL1"));
    assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2"));
    assertEquals("DATA_COL3", 1.0f, record1.get("DATA_COL3"));
    assertEquals("DATA_COL4", 2.0, record1.get("DATA_COL4"));
    assertEquals("DATA_COL5", "s", record1.get("DATA_COL5"));
    Object object = record1.get("DATA_COL6");
    assertTrue(object instanceof ByteBuffer);
    ByteBuffer b = ((ByteBuffer) object);
    assertEquals((byte) 1, b.get(0));
    assertEquals((byte) 2, b.get(1));
    assertEquals(1, genericRecords.size());
  }

  @Test
@@ -181,15 +185,10 @@ public void testOverrideTypeMapping() throws IOException {
    assertEquals(types.length, fields.size());
    checkField(fields.get(0), "DATA_COL0", Type.STRING);

    DatasetReader<GenericRecord> reader = getReader();
    try {
      assertTrue(reader.hasNext());
      GenericRecord record1 = reader.next();
      assertEquals("DATA_COL0", "10", record1.get("DATA_COL0"));
      assertFalse(reader.hasNext());
    } finally {
      reader.close();
    }
    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
    GenericRecord record1 = genericRecords.get(0);
    assertEquals("DATA_COL0", "10", record1.get("DATA_COL0"));
    assertEquals(1, genericRecords.size());
  }

  @Test
@@ -207,15 +206,10 @@ public void testFirstUnderscoreInColumnName() throws IOException {
    assertEquals(types.length, fields.size());
    checkField(fields.get(0), "__NAME", Type.INT);

    DatasetReader<GenericRecord> reader = getReader();
    try {
      assertTrue(reader.hasNext());
      GenericRecord record1 = reader.next();
      assertEquals("__NAME", 1987, record1.get("__NAME"));
      assertFalse(reader.hasNext());
    } finally {
      reader.close();
    }
    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
    GenericRecord record1 = genericRecords.get(0);
    assertEquals("__NAME", 1987, record1.get("__NAME"));
    assertEquals(1, genericRecords.size());
  }

  @Test
@@ -233,15 +227,10 @@ public void testNonIdentCharactersInColumnName() throws IOException {
    assertEquals(types.length, fields.size());
    checkField(fields.get(0), "TEST_P_A_R_QUET", Type.INT);

    DatasetReader<GenericRecord> reader = getReader();
    try {
      assertTrue(reader.hasNext());
      GenericRecord record1 = reader.next();
      assertEquals("TEST_P_A_R_QUET", 2015, record1.get("TEST_P_A_R_QUET"));
      assertFalse(reader.hasNext());
    } finally {
      reader.close();
    }
    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
    GenericRecord record1 = genericRecords.get(0);
    assertEquals("TEST_P_A_R_QUET", 2015, record1.get("TEST_P_A_R_QUET"));
    assertEquals(1, genericRecords.size());
  }

  @Test
@@ -252,15 +241,10 @@ public void testNullableParquetImport() throws IOException, SQLException {

    runImport(getOutputArgv(true, null));

    DatasetReader<GenericRecord> reader = getReader();
    try {
      assertTrue(reader.hasNext());
      GenericRecord record1 = reader.next();
      assertNull(record1.get("DATA_COL0"));
      assertFalse(reader.hasNext());
    } finally {
      reader.close();
    }
    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
    GenericRecord record1 = genericRecords.get(0);
    assertNull(record1.get("DATA_COL0"));
    assertEquals(1, genericRecords.size());
  }

  @Test
@@ -271,15 +255,10 @@ public void testQueryImport() throws IOException, SQLException {

    runImport(getOutputQueryArgv(true, null));

    DatasetReader<GenericRecord> reader = getReader();
    try {
      assertTrue(reader.hasNext());
      GenericRecord record1 = reader.next();
      assertEquals(1, record1.get("DATA_COL0"));
      assertFalse(reader.hasNext());
    } finally {
      reader.close();
    }
    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
    GenericRecord record1 = genericRecords.get(0);
    assertEquals(1, record1.get("DATA_COL0"));
    assertEquals(1, genericRecords.size());
  }

  @Test
@@ -291,17 +270,12 @@ public void testIncrementalParquetImport() throws IOException, SQLException {
    runImport(getOutputArgv(true, null));
    runImport(getOutputArgv(true, new String[]{"--append"}));

    DatasetReader<GenericRecord> reader = getReader();
    try {
      assertTrue(reader.hasNext());
      GenericRecord record1 = reader.next();
      assertEquals(1, record1.get("DATA_COL0"));
      record1 = reader.next();
      assertEquals(1, record1.get("DATA_COL0"));
      assertFalse(reader.hasNext());
    } finally {
      reader.close();
    }
    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
    GenericRecord record1 = genericRecords.get(0);
    assertEquals(1, record1.get("DATA_COL0"));
    record1 = genericRecords.get(1);
    assertEquals(1, record1.get("DATA_COL0"));
    assertEquals(2, genericRecords.size());
  }

  @Test
@@ -319,30 +293,26 @@ public void testOverwriteParquetDatasetFail() throws IOException, SQLException {
    }
  }

  private CompressionType getCompressionType() {
    return getDataset().getDescriptor().getCompressionType();
  private String getCompressionType() {
    ParquetMetadata parquetMetadata = getOutputMetadata();
    CompressionCodec parquetCompressionCodec = parquetMetadata.getBlocks().get(0).getColumns().get(0).getCodec().getParquetCompressionCodec();
    return parquetCompressionCodec.name();
  }

  private ParquetMetadata getOutputMetadata() {
    try {
      Configuration config = new Configuration();
      FileStatus fileStatus = getTablePath().getFileSystem(config).getFileStatus(getTablePath());
      List<Footer> footers = ParquetFileReader.readFooters(config, fileStatus, false);
      return footers.get(0).getParquetMetadata();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  private Schema getSchema() {
    return getDataset().getDescriptor().getSchema();
  }

  private DatasetReader<GenericRecord> getReader() {
    return getDataset().newReader();
  }

  private Dataset<GenericRecord> getDataset() {
    String uri = "dataset:file:" + getTablePath();
    return Datasets.load(uri, GenericRecord.class);
  }

  @Override
  public void tearDown() {
    super.tearDown();
    String uri = "dataset:file:" + getTablePath();
    if (Datasets.exists(uri)) {
      Datasets.delete(uri);
    }
    String schemaString = getOutputMetadata().getFileMetaData().getKeyValueMetaData().get("parquet.avro.schema");
    return new Schema.Parser().parse(schemaString);
  }

  private void checkField(Field field, String name, Type type) {
@@ -23,14 +23,12 @@
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.sqoop.Sqoop;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -38,6 +36,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.avro.AvroSchemaMismatchException;
import org.apache.sqoop.mapreduce.ParquetJob;
import org.apache.sqoop.util.ParquetReader;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -54,11 +53,8 @@
import org.apache.sqoop.tool.SqoopTool;
import org.apache.commons.cli.ParseException;
import org.junit.rules.ExpectedException;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetReader;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Formats;

import static java.util.Collections.sort;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -301,43 +297,39 @@ public void testNormalHiveImportAsParquet() throws IOException {

    runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs),
        new ImportTool());
    verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
    verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});
  }

  private void verifyHiveDataset(String tableName, Object[][] valsArray) {
    String datasetUri = String.format("dataset:hive:default/%s",
        tableName.toLowerCase());
    assertTrue(Datasets.exists(datasetUri));
    Dataset dataset = Datasets.load(datasetUri);
    assertFalse(dataset.isEmpty());
  private void verifyHiveDataset(Object[][] valsArray) {
    List<String> expected = getExpectedLines(valsArray);
    List<String> result = new ParquetReader(getTablePath()).readAllInCsv();

    DatasetReader<GenericRecord> reader = dataset.newReader();
    try {
      List<String> expectations = new ArrayList<String>();
      if (valsArray != null) {
        for (Object[] vals : valsArray) {
          expectations.add(Arrays.toString(vals));
        }
      }
    sort(expected);
    sort(result);

      while (reader.hasNext() && expectations.size() > 0) {
        String actual = Arrays.toString(
            convertGenericRecordToArray(reader.next()));
        assertTrue("Expect record: " + actual, expectations.remove(actual));
      }
      assertFalse(reader.hasNext());
      assertEquals(0, expectations.size());
    } finally {
      reader.close();
    }
    assertEquals(expected, result);
  }

  private static Object[] convertGenericRecordToArray(GenericRecord record) {
    Object[] result = new Object[record.getSchema().getFields().size()];
    for (int i = 0; i < result.length; i++) {
      result[i] = record.get(i);
  private List<String> getExpectedLines(Object[][] valsArray) {
    List<String> expectations = new ArrayList<>();
    if (valsArray != null) {
      for (Object[] vals : valsArray) {
        expectations.add(toCsv(vals));
      }
    }
    return result;
    return expectations;
  }

  private String toCsv(Object[] vals) {
    StringBuilder result = new StringBuilder();

    for (Object val : vals) {
      result.append(val).append(",");
    }

    result.deleteCharAt(result.length() - 1);

    return result.toString();
  }

  /** Test that table is created in hive with no data import. */
@@ -388,13 +380,13 @@ public void testCreateOverwriteHiveImportAsParquet() throws IOException {
    ImportTool tool = new ImportTool();

    runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs), tool);
    verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
    verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});

    String [] valsToOverwrite = { "'test2'", "24", "'somestring2'" };
    String [] extraArgsForOverwrite = {"--as-parquetfile", "--hive-overwrite"};
    runImportTest(TABLE_NAME, types, valsToOverwrite, "",
        getArgv(false, extraArgsForOverwrite), tool);
    verifyHiveDataset(TABLE_NAME, new Object[][] {{"test2", 24, "somestring2"}});
    verifyHiveDataset(new Object[][] {{"test2", 24, "somestring2"}});
  }

  @Test
@@ -430,7 +422,7 @@ private void createHiveDataSet(String tableName) {
        .name(getColName(2)).type().nullable().stringType().noDefault()
        .endRecord();
    String dataSetUri = "dataset:hive:/default/" + tableName;
    ParquetJob.createDataset(dataSetSchema, Formats.PARQUET.getDefaultCompressionType(), dataSetUri);
    ParquetJob.createDataset(dataSetSchema, ParquetJob.getCompressionType(new Configuration()), dataSetUri);
  }

  /**
@@ -448,11 +440,11 @@ public void testAppendHiveImportAsParquet() throws IOException {
    ImportTool tool = new ImportTool();

    runImportTest(TABLE_NAME, types, vals, "", args, tool);
    verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
    verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});

    String [] valsToAppend = { "'test2'", "4242", "'somestring2'" };
    runImportTest(TABLE_NAME, types, valsToAppend, "", args, tool);
    verifyHiveDataset(TABLE_NAME, new Object[][] {
    verifyHiveDataset(new Object[][] {
        {"test2", 4242, "somestring2"}, {"test", 42, "somestring"}});
  }
src/test/org/apache/sqoop/util/ParquetReader.java (new file, 141 lines)
@@ -0,0 +1,141 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.util;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import parquet.avro.AvroParquetReader;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

import static org.apache.sqoop.util.FileSystemUtil.isFile;
import static org.apache.sqoop.util.FileSystemUtil.listFiles;

public class ParquetReader implements AutoCloseable {

  private final Path pathToRead;

  private final Configuration configuration;

  private final Deque<Path> filesToRead;

  private parquet.hadoop.ParquetReader<GenericRecord> reader;

  public ParquetReader(Path pathToRead, Configuration configuration) {
    this.pathToRead = pathToRead;
    this.configuration = configuration;
    this.filesToRead = new ArrayDeque<>(determineFilesToRead());
    initReader(filesToRead.removeFirst());
  }

  public ParquetReader(Path pathToRead) {
    this(pathToRead, new Configuration());
  }

  public GenericRecord next() throws IOException {
    GenericRecord result = reader.read();
    if (result != null) {
      return result;
    }
    if (!filesToRead.isEmpty()) {
      initReader(filesToRead.removeFirst());
      return next();
    }

    return null;
  }

  public List<GenericRecord> readAll() {
    List<GenericRecord> result = new ArrayList<>();

    GenericRecord record;
    try {
      while ((record = next()) != null) {
        result.add(record);
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    } finally {
      close();
    }

    return result;
  }

  public List<String> readAllInCsv() {
    List<String> result = new ArrayList<>();

    for (GenericRecord record : readAll()) {
      result.add(convertToCsv(record));
    }

    return result;
  }

  private String convertToCsv(GenericRecord record) {
    StringBuilder result = new StringBuilder();
    for (int i = 0; i < record.getSchema().getFields().size(); i++) {
      result.append(record.get(i));
      result.append(",");
    }
    result.deleteCharAt(result.length() - 1);
    return result.toString();
  }

  private void initReader(Path file) {
    try {
      if (reader != null) {
        reader.close();
      }
      this.reader = AvroParquetReader.<GenericRecord>builder(file).build();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  private Collection<Path> determineFilesToRead() {
    try {
      if (isFile(pathToRead, configuration)) {
        return Collections.singletonList(pathToRead);
      }

      return listFiles(pathToRead, configuration);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void close() {
    if (reader != null) {
      try {
        reader.close();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }
}
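
For reference, a minimal usage sketch of this utility as the tests above apply it; warehouseDir is an illustrative Path pointing at an import output directory:

    Path warehouseDir = new Path("/tmp/sqoop-test/output");                    // illustrative path
    List<GenericRecord> records = new ParquetReader(warehouseDir).readAll();   // all records as Avro GenericRecords
    List<String> csvLines = new ParquetReader(warehouseDir).readAllInCsv();    // same records rendered as CSV lines

Note that readAll() closes the underlying reader in its finally block (and readAllInCsv() delegates to readAll()), so no explicit close() is needed when using these helpers.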