diff --git a/common/src/main/resources/org.apache.sqoop.connector-classloader.properties b/common/src/main/resources/org.apache.sqoop.connector-classloader.properties index c0082cc0..0311f880 100644 --- a/common/src/main/resources/org.apache.sqoop.connector-classloader.properties +++ b/common/src/main/resources/org.apache.sqoop.connector-classloader.properties @@ -52,6 +52,8 @@ system.classes.default=java.,\ org.apache.log4j.,\ org.apache.sqoop.,\ -org.apache.sqoop.connector.,\ + org.apache.avro.,\ + org.codehaus.jackson.,\ org.xerial.snappy.,\ sqoop.properties,\ sqoop_bootstrap.properties diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml index 59963141..37cf3fa1 100644 --- a/connector/connector-hdfs/pom.xml +++ b/connector/connector-hdfs/pom.xml @@ -73,6 +73,16 @@ limitations under the License. provided + + org.apache.parquet + parquet-hadoop + + + + org.apache.parquet + parquet-avro + + diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java index 9ef2a051..5973463e 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java @@ -19,10 +19,14 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.nio.charset.Charset; import java.security.PrivilegedExceptionAction; +import java.util.Arrays; +import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; @@ -33,13 +37,18 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.util.LineReader; import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.common.SqoopIDFUtils; import org.apache.sqoop.connector.hadoop.security.SecurityUtils; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; +import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat; import org.apache.sqoop.error.code.HdfsConnectorError; import org.apache.sqoop.etl.io.DataWriter; import org.apache.sqoop.job.etl.Extractor; @@ -55,6 +64,10 @@ public class HdfsExtractor extends Extractor recordReader = parquetInputFormat.createRecordReader(fileSplit, taskAttemptContext); + recordReader.initialize(fileSplit, taskAttemptContext); + + AVROIntermediateDataFormat idf = new AVROIntermediateDataFormat(schema); + while (recordReader.nextKeyValue() != false) { + GenericRecord record = recordReader.getCurrentValue(); + rowsRead++; + if (schema instanceof ByteArraySchema) { + dataWriter.writeArrayRecord(new Object[]{idf.toObject(record)}); + } else { + dataWriter.writeArrayRecord(idf.toObject(record)); + } + } + } + @Override public long getRowsRead() { return rowsRead; @@ -207,6 +251,41 @@ private boolean 
isSequenceFile(Path file) { return true; } + private boolean isParquetFile(Path file) { + try { + FileSystem fileSystem = file.getFileSystem(conf); + FileStatus fileStatus = fileSystem.getFileStatus(file); + FSDataInputStream fsDataInputStream = fileSystem.open(file); + + long fileLength = fileStatus.getLen(); + + byte[] fileStart = new byte[PARQUET_MAGIC.length]; + fsDataInputStream.readFully(fileStart); + + if (LOG.isDebugEnabled()) { + LOG.debug("file start: " + new String(fileStart, Charset.forName("ASCII"))); + } + + if (!Arrays.equals(fileStart, PARQUET_MAGIC)) { + return false; + } + + long fileEndIndex = fileLength - PARQUET_MAGIC.length; + fsDataInputStream.seek(fileEndIndex); + + byte[] fileEnd = new byte[PARQUET_MAGIC.length]; + fsDataInputStream.readFully(fileEnd); + + if (LOG.isDebugEnabled()) { + LOG.debug("file end: " + new String(fileEnd, Charset.forName("ASCII"))); + } + + return Arrays.equals(fileEnd, PARQUET_MAGIC); + } catch (IOException e) { + return false; + } + } + private void extractRow(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, Text line) throws UnsupportedEncodingException { if (schema instanceof ByteArraySchema) { dataWriter.writeArrayRecord(new Object[] {line.toString().getBytes(SqoopIDFUtils.BYTE_FIELD_CHARSET)}); diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java index 5de20c62..7cef93c3 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java @@ -32,6 +32,7 @@ import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter; +import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter; import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter; import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter; import org.apache.sqoop.error.code.HdfsConnectorError; @@ -89,7 +90,7 @@ public Void run() throws Exception { GenericHdfsWriter filewriter = getWriter(toJobConfig); - filewriter.initialize(filepath, conf, codec); + filewriter.initialize(filepath, context.getSchema(), conf, codec); if (!HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig) || (context.getSchema() instanceof ByteArraySchema)) { String record; @@ -119,8 +120,14 @@ public Void run() throws Exception { } private GenericHdfsWriter getWriter(ToJobConfiguration toJobConf) { - return (toJobConf.toJobConfig.outputFormat == ToFormat.SEQUENCE_FILE) ?
new HdfsSequenceWriter() - : new HdfsTextWriter(); + switch(toJobConf.toJobConfig.outputFormat) { + case SEQUENCE_FILE: + return new HdfsSequenceWriter(); + case PARQUET_FILE: + return new HdfsParquetWriter(); + default: + return new HdfsTextWriter(); + } } private String getCompressionCodecName(ToJobConfiguration toJobConf) { @@ -151,11 +158,16 @@ private String getCompressionCodecName(ToJobConfiguration toJobConf) { //TODO: We should probably support configurable extensions at some point private static String getExtension(ToJobConfiguration toJobConf, CompressionCodec codec) { - if (toJobConf.toJobConfig.outputFormat == ToFormat.SEQUENCE_FILE) - return ".seq"; - if (codec == null) - return ".txt"; - return codec.getDefaultExtension(); + switch(toJobConf.toJobConfig.outputFormat) { + case SEQUENCE_FILE: + return ".seq"; + case PARQUET_FILE: + return ".parquet"; + default: + if (codec == null) + return ".txt"; + return codec.getDefaultExtension(); + } } /* (non-Javadoc) diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java index 27d121f5..ffce583a 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java @@ -30,4 +30,9 @@ public enum ToFormat { * Sequence file */ SEQUENCE_FILE, + + /** + * Parquet file + */ + PARQUET_FILE, } diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java index 2ccccc4a..31023e73 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java @@ -20,12 +20,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.sqoop.schema.Schema; import java.io.IOException; public abstract class GenericHdfsWriter { - public abstract void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException; + public abstract void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException; public abstract void write(String csv) throws IOException; diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java new file mode 100644 index 00000000..4ec813b0 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs.hdfsWriter; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat; +import org.apache.sqoop.schema.Schema; + +import java.io.IOException; + +public class HdfsParquetWriter extends GenericHdfsWriter { + + private ParquetWriter avroParquetWriter; + private Schema sqoopSchema; + private AVROIntermediateDataFormat avroIntermediateDataFormat; + + @Override + public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec hadoopCodec) throws IOException { + sqoopSchema = schema; + avroIntermediateDataFormat = new AVROIntermediateDataFormat(sqoopSchema); + + CompressionCodecName parquetCodecName; + if (hadoopCodec == null) { + parquetCodecName = CompressionCodecName.UNCOMPRESSED; + } else { + parquetCodecName = CompressionCodecName.fromCompressionCodec(hadoopCodec.getClass()); + } + + avroParquetWriter = + AvroParquetWriter.builder(filepath) + .withSchema(avroIntermediateDataFormat.getAvroSchema()) + .withCompressionCodec(parquetCodecName) + .withConf(conf).build(); + + } + + @Override + public void write(String csv) throws IOException { + avroParquetWriter.write(avroIntermediateDataFormat.toAVRO(csv)); + } + + @Override + public void destroy() throws IOException { + avroParquetWriter.close(); + } +} diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java index 75c2e7ef..dcce8617 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java @@ -23,16 +23,17 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.sqoop.schema.Schema; import java.io.IOException; -public class HdfsSequenceWriter extends GenericHdfsWriter { +public class HdfsSequenceWriter extends GenericHdfsWriter { private SequenceFile.Writer filewriter; private Text text; @SuppressWarnings("deprecation") - public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException { + public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException { if (codec != null) { filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf), conf, filepath, Text.class, NullWritable.class, diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java index 78cf9732..384e3309 100644 
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.sqoop.connector.hdfs.HdfsConstants; +import org.apache.sqoop.schema.Schema; import java.io.BufferedWriter; import java.io.DataOutputStream; @@ -34,7 +35,7 @@ public class HdfsTextWriter extends GenericHdfsWriter { private BufferedWriter filewriter; @Override - public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException { + public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException { FileSystem fs = filepath.getFileSystem(conf); DataOutputStream filestream = fs.create(filepath, false); diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java index adede3a7..cbd555a8 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java @@ -17,9 +17,6 @@ */ package org.apache.sqoop.connector.hdfs; -import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE; -import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -27,6 +24,7 @@ import java.util.HashMap; import java.util.List; +import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -35,11 +33,17 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.connector.common.SqoopIDFUtils; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToCompression; import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat; import org.apache.sqoop.etl.io.DataReader; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; @@ -47,13 +51,18 @@ import org.apache.sqoop.schema.type.FixedPoint; import org.apache.sqoop.schema.type.FloatingPoint; import org.apache.sqoop.schema.type.Text; -import org.testng.annotations.AfterMethod; +import org.apache.sqoop.utils.ClassUtils; import org.testng.Assert; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Factory; import org.testng.annotations.Test; +import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.PARQUET_FILE; +import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE; +import static 
org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE; + public class TestLoader extends TestHdfsBase { private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/"; private static final int NUMBER_OF_ROWS_PER_FILE = 1000; @@ -63,6 +72,7 @@ public class TestLoader extends TestHdfsBase { private final String outputDirectory; private Loader loader; private String user = "test_user"; + private Schema schema; @Factory(dataProvider="test-hdfs-loader") public TestLoader(ToFormat outputFormat, @@ -80,9 +90,10 @@ public static Object[][] data() { for (ToCompression compression : new ToCompression[]{ ToCompression.DEFAULT, ToCompression.BZIP2, + ToCompression.GZIP, ToCompression.NONE }) { - for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) { + for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE, PARQUET_FILE}) { parameters.add(new Object[]{outputFileType, compression}); } } @@ -100,7 +111,7 @@ public void tearDown() throws IOException { @Test public void testLoader() throws Exception { FileSystem fs = FileSystem.get(new Configuration()); - Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true)) + schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true)) .addColumn(new FloatingPoint("col2", 4L)) .addColumn(new Text("col3")); @@ -130,14 +141,22 @@ public Object readContent() { assertTestUser(user); return null; } - }, null, user); + }, schema, user); LinkConfiguration linkConf = new LinkConfiguration(); ToJobConfiguration jobConf = new ToJobConfiguration(); jobConf.toJobConfig.compression = compression; jobConf.toJobConfig.outputFormat = outputFormat; Path outputPath = new Path(outputDirectory); - loader.load(context, linkConf, jobConf); + try { + loader.load(context, linkConf, jobConf); + } catch (Exception e) { + // we may want to fail if the compression format selected is not supported by the + // output format + Assert.assertTrue(compressionNotSupported()); + return; + } + Assert.assertEquals(1, fs.listStatus(outputPath).length); for (FileStatus status : fs.listStatus(outputPath)) { @@ -152,10 +171,26 @@ public Object readContent() { Assert.assertEquals(5, fs.listStatus(outputPath).length); } + private boolean compressionNotSupported() { + switch (outputFormat) { + case SEQUENCE_FILE: + return compression == ToCompression.GZIP; + case PARQUET_FILE: + return compression == ToCompression.BZIP2 || compression == ToCompression.DEFAULT; + } + return false; + } + @Test public void testOverrideNull() throws Exception { + // Parquet supports an actual "null" value so overriding null would not make + // sense here + if (outputFormat == PARQUET_FILE) { + return; + } + FileSystem fs = FileSystem.get(new Configuration()); - Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true)) + schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true)) .addColumn(new FloatingPoint("col2", 8L)) .addColumn(new Text("col3")) .addColumn(new Text("col4")); @@ -199,7 +234,15 @@ public Object readContent() { jobConf.toJobConfig.nullValue = "\\N"; Path outputPath = new Path(outputDirectory); - loader.load(context, linkConf, jobConf); + try { + loader.load(context, linkConf, jobConf); + } catch (Exception e) { + // we may want to fail if the compression format selected is not supported by the + // output format + Assert.assertTrue(compressionNotSupported()); + return; + } + Assert.assertEquals(1, fs.listStatus(outputPath).length); for (FileStatus status :
fs.listStatus(outputPath)) { @@ -214,7 +257,7 @@ public Object readContent() { Assert.assertEquals(5, fs.listStatus(outputPath).length); } - private void verifyOutput(FileSystem fs, Path file, String format) throws IOException { + private void verifyOutput(FileSystem fs, Path file, String format) throws Exception { Configuration conf = new Configuration(); FSDataInputStream fsin = fs.open(file); CompressionCodec codec; @@ -228,7 +271,9 @@ private void verifyOutput(FileSystem fs, Path file, String format) throws IOExce case BZIP2: Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("BZip2") != -1); break; - + case GZIP: + Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Gzip") != -1); + break; case DEFAULT: if(org.apache.hadoop.util.VersionInfo.getVersion().matches("\\b1\\.\\d\\.\\d")) { Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Default") != -1); @@ -282,11 +327,47 @@ private void verifyOutput(FileSystem fs, Path file, String format) throws IOExce Assert.assertEquals(line.toString(), formatRow(format, index++)); line = new org.apache.hadoop.io.Text(); } + break; + case PARQUET_FILE: + String compressionCodecClassName = ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER).getBlocks().get(0).getColumns().get(0).getCodec().getHadoopCompressionCodecClassName(); + + if (compressionCodecClassName == null) { + codec = null; + } else { + codec = (CompressionCodec) ClassUtils.loadClass(compressionCodecClassName).newInstance(); + } + + // Verify compression + switch(compression) { + case GZIP: + Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Gzip") != -1); + break; + + case NONE: + default: + Assert.assertNull(codec); + break; + } + + + ParquetReader avroParquetReader = AvroParquetReader.builder(file).build(); + AVROIntermediateDataFormat avroIntermediateDataFormat = new AVROIntermediateDataFormat(); + avroIntermediateDataFormat.setSchema(schema); + GenericRecord record; + index = 1; + while ((record = avroParquetReader.read()) != null) { + List objects = new ArrayList<>(); + for (int i = 0; i < record.getSchema().getFields().size(); i++) { + objects.add(record.get(i)); + } + Assert.assertEquals(SqoopIDFUtils.toText(avroIntermediateDataFormat.toCSV(record)), formatRow(format, index++)); + } + break; } } - private void verifyOutput(FileSystem fs, Path file) throws IOException { + private void verifyOutput(FileSystem fs, Path file) throws Exception { verifyOutput(fs, file, "%d,%f,%s"); } -} +} \ No newline at end of file diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java index 985149cb..89bc0f2c 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java @@ -43,7 +43,8 @@ public class SqoopAvroUtils { * Creates an Avro schema from a Sqoop schema. 
*/ public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema) { - String name = sqoopSchema.getName(); + // avro schema names cannot start with quotes, lets just remove them + String name = sqoopSchema.getName().replace("\"", ""); String doc = sqoopSchema.getNote(); String namespace = SQOOP_SCHEMA_NAMESPACE; Schema schema = Schema.createRecord(name, doc, namespace, false); diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java index ace1bdf5..e409fc12 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java @@ -148,7 +148,7 @@ public Set getJars() { return jars; } - private GenericRecord toAVRO(String csv) { + public GenericRecord toAVRO(String csv) { String[] csvStringArray = parseCSVString(csv); @@ -175,7 +175,7 @@ private GenericRecord toAVRO(String csv) { return avroObject; } - private Object toAVRO(String csvString, Column column) { + public Object toAVRO(String csvString, Column column) { Object returnValue = null; switch (column.getType()) { @@ -232,7 +232,7 @@ private Object toAVRO(String csvString, Column column) { return returnValue; } - private GenericRecord toAVRO(Object[] objectArray) { + public GenericRecord toAVRO(Object[] objectArray) { if (objectArray == null) { return null; @@ -311,7 +311,7 @@ private GenericRecord toAVRO(Object[] objectArray) { } @SuppressWarnings("unchecked") - private String toCSV(GenericRecord record) { + public String toCSV(GenericRecord record) { Column[] columns = this.schema.getColumnsArray(); StringBuilder csvString = new StringBuilder(); @@ -387,7 +387,7 @@ private String toCSV(GenericRecord record) { } @SuppressWarnings("unchecked") - private Object[] toObject(GenericRecord record) { + public Object[] toObject(GenericRecord record) { if (record == null) { return null; @@ -459,4 +459,8 @@ private Object[] toObject(GenericRecord record) { } return object; } + + public Schema getAvroSchema() { + return avroSchema; + } } diff --git a/pom.xml b/pom.xml index cb8a973a..ba0a2437 100644 --- a/pom.xml +++ b/pom.xml @@ -124,6 +124,7 @@ limitations under the License. 2.4.0 1.7 2.4.0 + 1.8.1 2.6 @@ -700,6 +701,16 @@ limitations under the License. jetty-servlet ${jetty.version} + + org.apache.parquet + parquet-hadoop + ${parquet.version} + + + org.apache.parquet + parquet-avro + ${parquet.version} + diff --git a/test/pom.xml b/test/pom.xml index 451352a8..134bca1d 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -175,6 +175,16 @@ limitations under the License. 
hadoop-common + + org.apache.parquet + parquet-hadoop + + + + org.apache.parquet + parquet-avro + + diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java index 3ec4f661..1e8c688c 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java @@ -20,17 +20,27 @@ import com.google.common.collect.HashMultiset; import com.google.common.collect.Iterables; import com.google.common.collect.Multiset; +import org.apache.avro.generic.GenericRecord; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.hadoop.ParquetReader; import org.apache.sqoop.connector.common.SqoopIDFUtils; import org.apache.sqoop.connector.hdfs.configuration.ToFormat; +import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter; +import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat; import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.DateTime; +import org.apache.sqoop.schema.type.FixedPoint; import org.apache.sqoop.test.asserts.HdfsAsserts; import org.apache.sqoop.test.infrastructure.Infrastructure; import org.apache.sqoop.test.infrastructure.SqoopTestCase; @@ -51,6 +61,7 @@ import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedList; import java.util.List; @Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) @@ -64,6 +75,9 @@ public class NullValueTest extends SqoopTestCase { // The custom nullValue to use (set to null if default) private String nullValue; + + private Schema sqoopSchema; + @DataProvider(name="nul-value-test") public static Object[][] data(ITestContext context) { String customNullValue = "^&*custom!@"; @@ -80,12 +94,19 @@ public NullValueTest(ToFormat format, String nullValue) { } @Override + public String getTestName() { return methodName + "[" + format.name() + ", " + nullValue + "]"; } @BeforeMethod public void setup() throws Exception { + sqoopSchema = new Schema("cities"); + sqoopSchema.addColumn(new FixedPoint("id", Long.valueOf(Integer.SIZE), true)); + sqoopSchema.addColumn(new org.apache.sqoop.schema.type.Text("country")); + sqoopSchema.addColumn(new DateTime("some_date", true, false)); + sqoopSchema.addColumn(new org.apache.sqoop.schema.type.Text("city")); + createTableCities(); } @@ -128,6 +149,27 @@ public void testFromHdfs() throws Exception { } sequenceFileWriter.close(); break; + case PARQUET_FILE: + // Parquet file format does not support using custom null values + if (usingCustomNullValue()) { + return; + } else { + HdfsParquetWriter parquetWriter = new HdfsParquetWriter(); + + Configuration conf = new Configuration(); + FileSystem.setDefaultUri(conf, hdfsClient.getUri()); + + parquetWriter.initialize( + new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001.parquet")), + 
sqoopSchema, conf, null); + + for (String line : getCsv()) { + parquetWriter.write(line); + } + + parquetWriter.destroy(); + break; + } default: Assert.fail(); } @@ -166,6 +208,11 @@ public void testFromHdfs() throws Exception { @Test public void testToHdfs() throws Exception { + // Parquet file format does not support using custom null values + if (usingCustomNullValue() && format == ToFormat.PARQUET_FILE) { + return; + } + provider.insertRow(getTableName(), 1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco"); provider.insertRow(getTableName(), 2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null); provider.insertRow(getTableName(), 3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno"); @@ -203,16 +250,16 @@ public void testToHdfs() throws Exception { executeJob(job); + + Multiset setLines = HashMultiset.create(Arrays.asList(getCsv())); + Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO")); + List notFound = new ArrayList<>(); switch (format) { case TEXT_FILE: HdfsAsserts.assertMapreduceOutput(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"), getCsv()); - break; + return; case SEQUENCE_FILE: - Multiset setLines = HashMultiset.create(Arrays.asList(getCsv())); - List notFound = new ArrayList<>(); - Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO")); - for(Path file : files) { SequenceFile.Reader.Option optPath = SequenceFile.Reader.file(file); SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(getHadoopConf(), optPath); @@ -224,17 +271,32 @@ public void testToHdfs() throws Exception { } } } - if(!setLines.isEmpty() || !notFound.isEmpty()) { - LOG.error("Output do not match expectations."); - LOG.error("Expected lines that weren't present in the files:"); - LOG.error("\t'" + StringUtils.join(setLines, "'\n\t'") + "'"); - LOG.error("Extra lines in files that weren't expected:"); - LOG.error("\t'" + StringUtils.join(notFound, "'\n\t'") + "'"); - Assert.fail("Output do not match expectations."); + break; + case PARQUET_FILE: + AVROIntermediateDataFormat avroIntermediateDataFormat = new AVROIntermediateDataFormat(sqoopSchema); + notFound = new LinkedList<>(); + for (Path file : files) { + ParquetReader avroParquetReader = AvroParquetReader.builder(file).build(); + GenericRecord record; + while ((record = avroParquetReader.read()) != null) { + String recordAsCsv = avroIntermediateDataFormat.toCSV(record); + if (!setLines.remove(recordAsCsv)) { + notFound.add(recordAsCsv); + } + } } break; default: Assert.fail(); } + + if(!setLines.isEmpty() || !notFound.isEmpty()) { + LOG.error("Output do not match expectations."); + LOG.error("Expected lines that weren't present in the files:"); + LOG.error("\t'" + StringUtils.join(setLines, "'\n\t'") + "'"); + LOG.error("Extra lines in files that weren't expected:"); + LOG.error("\t'" + StringUtils.join(notFound, "'\n\t'") + "'"); + Assert.fail("Output do not match expectations."); + } } } diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java new file mode 100644 index 00000000..222c493b --- /dev/null +++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.integration.connector.hdfs; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.sqoop.connector.hdfs.configuration.ToFormat; +import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter; +import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MLink; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.DateTime; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.Text; +import org.apache.sqoop.test.infrastructure.Infrastructure; +import org.apache.sqoop.test.infrastructure.SqoopTestCase; +import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider; +import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider; +import org.apache.sqoop.test.utils.HdfsUtils; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.Test; + +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) +public class ParquetTest extends SqoopTestCase { + + @AfterMethod + public void dropTable() { + super.dropTable(); + } + + @Test + public void toParquetTest() throws Exception { + createAndLoadTableCities(); + + // RDBMS link + MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector"); + fillRdbmsLinkConfig(rdbmsConnection); + saveLink(rdbmsConnection); + + // HDFS link + MLink hdfsConnection = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsConnection); + saveLink(hdfsConnection); + + // Job creation + MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName()); + + + // Set rdbms "FROM" config + fillRdbmsFromConfig(job, "id"); + + // Fill the hdfs "TO" config + fillHdfsToConfig(job, ToFormat.PARQUET_FILE); + + saveJob(job); + executeJob(job); + + String[] expectedOutput = + {"'1','USA','2004-10-23 00:00:00.000','San Francisco'", + "'2','USA','2004-10-24 00:00:00.000','Sunnyvale'", + "'3','Czech Republic','2004-10-25 00:00:00.000','Brno'", + "'4','USA','2004-10-26 00:00:00.000','Palo Alto'"}; + + + Multiset 
setLines = HashMultiset.create(Arrays.asList(expectedOutput)); + + List notFound = new LinkedList<>(); + + Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, getMapreduceDirectory()); + for (Path file : files) { + ParquetReader avroParquetReader = AvroParquetReader.builder(file).build(); + GenericRecord record; + while ((record = avroParquetReader.read()) != null) { + String recordAsLine = recordToLine(record); + if (!setLines.remove(recordAsLine)) { + notFound.add(recordAsLine); + } + } + } + + if (!setLines.isEmpty() || !notFound.isEmpty()) { + fail("Output do not match expectations."); + } + } + + @Test + public void fromParquetTest() throws Exception { + createTableCities(); + + Schema sqoopSchema = new Schema("cities"); + sqoopSchema.addColumn(new FixedPoint("id", Long.valueOf(Integer.SIZE), true)); + sqoopSchema.addColumn(new Text("country")); + sqoopSchema.addColumn(new DateTime("some_date", true, false)); + sqoopSchema.addColumn(new Text("city")); + + HdfsParquetWriter parquetWriter = new HdfsParquetWriter(); + + Configuration conf = new Configuration(); + FileSystem.setDefaultUri(conf, hdfsClient.getUri()); + + parquetWriter.initialize( + new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001.parquet")), + sqoopSchema, conf, null); + + parquetWriter.write("1,'USA','2004-10-23 00:00:00.000','San Francisco'"); + parquetWriter.write("2,'USA','2004-10-24 00:00:00.000','Sunnyvale'"); + + parquetWriter.destroy(); + + parquetWriter.initialize( + new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0002.parquet")), + sqoopSchema, conf, null); + + parquetWriter.write("3,'Czech Republic','2004-10-25 00:00:00.000','Brno'"); + parquetWriter.write("4,'USA','2004-10-26 00:00:00.000','Palo Alto'"); + + parquetWriter.destroy(); + + // RDBMS link + MLink rdbmsLink = getClient().createLink("generic-jdbc-connector"); + fillRdbmsLinkConfig(rdbmsLink); + saveLink(rdbmsLink); + + // HDFS link + MLink hdfsLink = getClient().createLink("hdfs-connector"); + fillHdfsLink(hdfsLink); + saveLink(hdfsLink); + + // Job creation + MJob job = getClient().createJob(hdfsLink.getName(), rdbmsLink.getName()); + fillHdfsFromConfig(job); + fillRdbmsToConfig(job); + saveJob(job); + + executeJob(job); + assertEquals(provider.rowCount(getTableName()), 4); + assertRowInCities(1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco"); + assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), "Sunnyvale"); + assertRowInCities(3, "Czech Republic", Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno"); + assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto"); + } + + public String recordToLine(GenericRecord genericRecord) { + String line = ""; + line += "\'" + String.valueOf(genericRecord.get(0)) + "\',"; + line += "\'" + String.valueOf(genericRecord.get(1)) + "\',"; + line += "\'" + new Timestamp((Long)genericRecord.get(2)) + "00\',"; + line += "\'" + String.valueOf(genericRecord.get(3)) + "\'"; + return line; + } + +}
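
The new HdfsExtractor.isParquetFile() recognizes Parquet input by looking for the 4-byte ASCII magic "PAR1" at both the start and the end of the file, and treats any I/O error as "not Parquet". A minimal standalone sketch of that check, assuming a local file and plain java.io instead of Hadoop's FileSystem (the class name and path argument are illustrative):

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ParquetMagicCheckSketch {

  // Parquet files begin and end with the ASCII marker "PAR1" (the footer length sits just before the trailing magic).
  private static final byte[] PARQUET_MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

  public static boolean looksLikeParquet(String path) {
    try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
      if (file.length() < 2L * PARQUET_MAGIC.length) {
        return false; // too small to carry both the header and footer magic
      }

      byte[] head = new byte[PARQUET_MAGIC.length];
      file.readFully(head);
      if (!Arrays.equals(head, PARQUET_MAGIC)) {
        return false;
      }

      file.seek(file.length() - PARQUET_MAGIC.length);
      byte[] tail = new byte[PARQUET_MAGIC.length];
      file.readFully(tail);
      return Arrays.equals(tail, PARQUET_MAGIC);
    } catch (IOException e) {
      return false; // unreadable files are treated as "not Parquet", as in the extractor
    }
  }

  public static void main(String[] args) {
    System.out.println(looksLikeParquet(args[0]));
  }
}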
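
extractParquetFile() drives the read through ParquetInputFormat with Avro read support and converts each GenericRecord back to a Sqoop object array via AVROIntermediateDataFormat.toObject(). A rough standalone sketch of that read path over a single whole-file split; the TaskAttemptContextImpl/TaskAttemptID scaffolding exists only to satisfy the MapReduce API in this example and is not how the connector obtains its context:

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetInputFormat;

public class ParquetSplitReadSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path(args[0]);
    long length = file.getFileSystem(conf).getFileStatus(file).getLen();

    // One split spanning the whole file; in the connector the split boundaries come from the partitioner.
    FileSplit split = new FileSplit(file, 0, length, null);

    // Minimal task context, required only because ParquetInputFormat is a MapReduce input format.
    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());

    // Avro read support materializes each Parquet row as a GenericRecord.
    ParquetInputFormat<GenericRecord> inputFormat = new ParquetInputFormat<GenericRecord>(AvroReadSupport.class);
    RecordReader<Void, GenericRecord> reader = inputFormat.createRecordReader(split, context);
    reader.initialize(split, context);

    while (reader.nextKeyValue()) {
      GenericRecord record = reader.getCurrentValue();
      // The extractor passes this through AVROIntermediateDataFormat.toObject(record)
      // and hands the resulting Object[] to the DataWriter.
      System.out.println(record);
    }
    reader.close();
  }
}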
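
On the load side, HdfsParquetWriter turns each CSV record into a GenericRecord with AVROIntermediateDataFormat and hands it to parquet-avro's AvroParquetWriter. A self-contained sketch of the same builder chain, assuming a hand-written Avro schema and a made-up output path (the connector derives its Avro schema from the Sqoop job schema instead):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class AvroParquetWriteSketch {

  public static void main(String[] args) throws Exception {
    // Illustrative Avro schema with a nullable column, comparable to what
    // AVROIntermediateDataFormat.getAvroSchema() would produce for a two-column Sqoop schema.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"cities\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    Configuration conf = new Configuration();
    Path output = new Path("/tmp/cities.parquet"); // illustrative location

    // The same builder chain HdfsParquetWriter.initialize() uses.
    ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(output)
        .withSchema(schema)
        .withCompressionCodec(CompressionCodecName.UNCOMPRESSED) // a null Hadoop codec maps to UNCOMPRESSED
        .withConf(conf)
        .build();

    GenericRecord record = new GenericData.Record(schema);
    record.put("id", 1L);
    record.put("city", "San Francisco");
    writer.write(record);

    writer.close(); // HdfsParquetWriter.destroy() closes the writer the same way
  }
}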
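
HdfsParquetWriter maps the job's Hadoop codec onto Parquet's CompressionCodecName, which is why TestLoader now expects GZIP to work for PARQUET_FILE while BZIP2 and DEFAULT are treated as unsupported: Parquet has no counterpart for those codecs, so CompressionCodecName.fromCompressionCodec() fails for them. A small sketch of that mapping (the failure is parquet's CompressionCodecNotSupportedException, caught here as a RuntimeException to keep the example short):

import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class CodecMappingSketch {

  public static void main(String[] args) {
    // GzipCodec has a Parquet equivalent, so a GZIP-compressed TO job can be honored.
    System.out.println(CompressionCodecName.fromCompressionCodec(GzipCodec.class)); // GZIP

    // BZip2Codec (and Hadoop's DefaultCodec) have no Parquet equivalent, which is what
    // TestLoader.compressionNotSupported() encodes for PARQUET_FILE.
    try {
      CompressionCodecName.fromCompressionCodec(BZip2Codec.class);
    } catch (RuntimeException e) {
      System.out.println("not supported by Parquet: " + e.getMessage());
    }
  }
}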
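
SqoopAvroUtils.createAvroSchema() now strips double quotes from the Sqoop schema name before building the Avro record schema, because Avro validates record names and a quoted identifier (for example a quoted table name reported by the FROM side) is rejected. A small sketch of the failure and of the workaround, using an illustrative namespace:

import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;

public class AvroSchemaNameSketch {

  public static void main(String[] args) {
    String sqoopSchemaName = "\"cities\""; // a quoted identifier, as some databases report table names

    try {
      // Avro rejects the quote characters in the record name.
      Schema.createRecord(sqoopSchemaName, null, "org.apache.sqoop", false);
    } catch (SchemaParseException e) {
      System.out.println("rejected: " + e.getMessage());
    }

    // Removing the quotes, as the patched createAvroSchema() does, yields a legal record name.
    Schema record = Schema.createRecord(sqoopSchemaName.replace("\"", ""), null, "org.apache.sqoop", false);
    System.out.println(record.getFullName()); // org.apache.sqoop.cities
  }
}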
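
Both the writer and the new tests rely on AVROIntermediateDataFormat, whose toAVRO()/toCSV()/toObject() conversions this patch makes public. A short round-trip sketch under an illustrative two-column Sqoop schema (the column names and CSV line are made up, following the single-quoted text style SqoopIDFUtils uses):

import org.apache.avro.generic.GenericRecord;
import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;

public class IdfRoundTripSketch {

  public static void main(String[] args) {
    // A Sqoop schema comparable to the ones built in the new tests.
    Schema sqoopSchema = new Schema("cities")
        .addColumn(new FixedPoint("id", 8L, true))
        .addColumn(new Text("city"));

    AVROIntermediateDataFormat idf = new AVROIntermediateDataFormat(sqoopSchema);

    // CSV text -> Avro record, the step HdfsParquetWriter.write() performs before
    // handing the record to AvroParquetWriter.
    GenericRecord record = idf.toAVRO("1,'San Francisco'");

    // Avro record -> CSV text, the step the tests perform when comparing Parquet
    // output against the expected rows.
    System.out.println(idf.toCSV(record));
  }
}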
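
TestLoader.verifyOutput() confirms which codec was actually written by reading the Parquet footer: every column chunk records its compression, and getHadoopCompressionCodecClassName() maps it back to a Hadoop codec class for comparison. A standalone sketch of that footer inspection (the path argument is illustrative; checking the first column of the first row group is enough for a file written with a single codec):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

public class ParquetFooterCodecSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path(args[0]);

    // Read only the footer metadata, without scanning the data pages.
    ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER);
    CompressionCodecName codec = footer.getBlocks().get(0).getColumns().get(0).getCodec();

    System.out.println("parquet codec: " + codec);
    System.out.println("hadoop codec class: " + codec.getHadoopCompressionCodecClassName());
  }
}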
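
Row-level verification in TestLoader, NullValueTest and the new ParquetTest streams the job output back through parquet-avro's AvroParquetReader. A minimal sketch of that read loop over one output file (the path argument is illustrative):

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class AvroParquetReadSketch {

  public static void main(String[] args) throws Exception {
    Path file = new Path(args[0]); // e.g. one of the job's *.parquet output files

    try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file).build()) {
      GenericRecord record;
      while ((record = reader.read()) != null) {
        // Field positions line up with the Sqoop schema columns, so the tests can
        // rebuild a CSV line (or use AVROIntermediateDataFormat.toCSV) and compare it.
        System.out.println(record);
      }
    }
  }
}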