
SQOOP-2788: Sqoop2: Parquet support for HdfsConnector

(Abraham Fine via Jarek Jarcec Cecho)
Committed by Jarek Jarcec Cecho on 2016-02-05 12:51:08 -08:00
parent 2f4da466ef
commit 55d1db2ba3
16 changed files with 577 additions and 48 deletions
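
For context before the per-file diffs: the Parquet support in this change is built on the parquet-avro bindings. Rows travel as Avro GenericRecords (via AVROIntermediateDataFormat), files are written with AvroParquetWriter and read back with AvroParquetReader or ParquetInputFormat. A minimal, self-contained sketch of that write/read round trip, assuming the parquet-avro 1.8.x builder API this commit depends on; the schema, path, and class name below are illustrative, not taken from the commit:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetAvroRoundTripSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative Avro schema; the connector derives its Avro schema from the Sqoop schema instead.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"city\",\"namespace\":\"example\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}");
    Path path = new Path("/tmp/cities.parquet");

    // Write GenericRecords through the Avro binding (same builder style as HdfsParquetWriter below).
    ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(path)
        .withSchema(schema)
        .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
        .build();
    GenericRecord record = new GenericData.Record(schema);
    record.put("id", 1L);
    record.put("name", "Brno");
    writer.write(record);
    writer.close();

    // Read the records back (the same reader the tests below use to verify output).
    ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(path).build();
    GenericRecord read;
    while ((read = reader.read()) != null) {
      System.out.println(read);
    }
    reader.close();
  }
}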


@ -52,6 +52,8 @@ system.classes.default=java.,\
org.apache.log4j.,\
org.apache.sqoop.,\
-org.apache.sqoop.connector.,\
org.apache.avro.,\
org.codehaus.jackson.,\
org.xerial.snappy.,\
sqoop.properties,\
sqoop_bootstrap.properties


@ -73,6 +73,16 @@ limitations under the License.
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
</dependencies>
<build>


@ -19,10 +19,14 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
@ -33,13 +37,18 @@
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.util.LineReader;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.Extractor;
@ -55,6 +64,10 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);
// the sequence of bytes that appears at the beginning and end of every
// parquet file
private static final byte[] PARQUET_MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
private Configuration conf = new Configuration();
private DataWriter dataWriter;
private Schema schema;
@ -85,7 +98,7 @@ public Void run() throws Exception {
private void extractFile(LinkConfiguration linkConfiguration,
FromJobConfiguration fromJobConfiguration,
Path file, long start, long length, String[] locations)
throws IOException {
throws IOException, InterruptedException {
long end = start + length;
LOG.info("Extracting file " + file);
LOG.info("\t from offset " + start);
@ -93,8 +106,10 @@ private void extractFile(LinkConfiguration linkConfiguration,
LOG.info("\t of length " + length);
if(isSequenceFile(file)) {
extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
} else {
extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
} else if(isParquetFile(file)) {
extractParquetFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
} else {
extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length);
}
}
@ -136,7 +151,7 @@ private void extractSequenceFile(LinkConfiguration linkConfiguration,
@SuppressWarnings("resource")
private void extractTextFile(LinkConfiguration linkConfiguration,
FromJobConfiguration fromJobConfiguration,
Path file, long start, long length, String[] locations)
Path file, long start, long length)
throws IOException {
LOG.info("Extracting text file");
long end = start + length;
@ -185,6 +200,35 @@ private void extractTextFile(LinkConfiguration linkConfiguration,
filestream.close();
}
private void extractParquetFile(LinkConfiguration linkConfiguration,
FromJobConfiguration fromJobConfiguration,
Path file, long start, long length,
String[] locations) throws IOException, InterruptedException {
// Parquet does not expose a way to directly deal with file splits
// except through the ParquetInputFormat (ParquetInputSplit is @private)
FileSplit fileSplit = new FileSplit(file, start, length, locations);
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, AvroReadSupport.class.getName());
ParquetInputFormat parquetInputFormat = new ParquetInputFormat();
// ParquetReader needs a TaskAttemptContext to pass through the
// configuration object.
TaskAttemptContext taskAttemptContext = new SqoopTaskAttemptContext(conf);
RecordReader<Void, GenericRecord> recordReader = parquetInputFormat.createRecordReader(fileSplit, taskAttemptContext);
recordReader.initialize(fileSplit, taskAttemptContext);
AVROIntermediateDataFormat idf = new AVROIntermediateDataFormat(schema);
while (recordReader.nextKeyValue() != false) {
GenericRecord record = recordReader.getCurrentValue();
rowsRead++;
if (schema instanceof ByteArraySchema) {
dataWriter.writeArrayRecord(new Object[]{idf.toObject(record)});
} else {
dataWriter.writeArrayRecord(idf.toObject(record));
}
}
}
@Override
public long getRowsRead() {
return rowsRead;
@ -207,6 +251,41 @@ private boolean isSequenceFile(Path file) {
return true;
}
private boolean isParquetFile(Path file) {
try {
FileSystem fileSystem = file.getFileSystem(conf);
FileStatus fileStatus = fileSystem.getFileStatus(file);
FSDataInputStream fsDataInputStream = fileSystem.open(file);
long fileLength = fileStatus.getLen();
byte[] fileStart = new byte[PARQUET_MAGIC.length];
fsDataInputStream.readFully(fileStart);
if (LOG.isDebugEnabled()) {
LOG.error("file start: " + new String(fileStart, Charset.forName("ASCII")));
}
if (!Arrays.equals(fileStart, PARQUET_MAGIC)) {
return false;
}
long fileEndIndex = fileLength - PARQUET_MAGIC.length;
fsDataInputStream.seek(fileEndIndex);
byte[] fileEnd = new byte[PARQUET_MAGIC.length];
fsDataInputStream.readFully(fileEnd);
if (LOG.isDebugEnabled()) {
LOG.error("file end: " + new String(fileEnd, Charset.forName("ASCII")));
}
return Arrays.equals(fileEnd, PARQUET_MAGIC);
} catch (IOException e) {
return false;
}
}
private void extractRow(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, Text line) throws UnsupportedEncodingException {
if (schema instanceof ByteArraySchema) {
dataWriter.writeArrayRecord(new Object[] {line.toString().getBytes(SqoopIDFUtils.BYTE_FIELD_CHARSET)});


@ -32,6 +32,7 @@
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;
import org.apache.sqoop.error.code.HdfsConnectorError;
@ -89,7 +90,7 @@ public Void run() throws Exception {
GenericHdfsWriter filewriter = getWriter(toJobConfig);
filewriter.initialize(filepath, conf, codec);
filewriter.initialize(filepath, context.getSchema(), conf, codec);
if (!HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig) || (context.getSchema() instanceof ByteArraySchema)) {
String record;
@ -119,8 +120,14 @@ public Void run() throws Exception {
}
private GenericHdfsWriter getWriter(ToJobConfiguration toJobConf) {
return (toJobConf.toJobConfig.outputFormat == ToFormat.SEQUENCE_FILE) ? new HdfsSequenceWriter()
: new HdfsTextWriter();
switch(toJobConf.toJobConfig.outputFormat) {
case SEQUENCE_FILE:
return new HdfsSequenceWriter();
case PARQUET_FILE:
return new HdfsParquetWriter();
default:
return new HdfsTextWriter();
}
}
private String getCompressionCodecName(ToJobConfiguration toJobConf) {
@ -151,11 +158,16 @@ private String getCompressionCodecName(ToJobConfiguration toJobConf) {
//TODO: We should probably support configurable extensions at some point
private static String getExtension(ToJobConfiguration toJobConf, CompressionCodec codec) {
if (toJobConf.toJobConfig.outputFormat == ToFormat.SEQUENCE_FILE)
return ".seq";
if (codec == null)
return ".txt";
return codec.getDefaultExtension();
switch(toJobConf.toJobConfig.outputFormat) {
case SEQUENCE_FILE:
return ".seq";
case PARQUET_FILE:
return ".parquet";
default:
if (codec == null)
return ".txt";
return codec.getDefaultExtension();
}
}
/* (non-Javadoc)


@ -30,4 +30,9 @@ public enum ToFormat {
* Sequence file
*/
SEQUENCE_FILE,
/**
* Parquet file
*/
PARQUET_FILE,
}


@ -20,12 +20,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.schema.Schema;
import java.io.IOException;
public abstract class GenericHdfsWriter {
public abstract void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException;
public abstract void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException;
public abstract void write(String csv) throws IOException;


@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs.hdfsWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
import org.apache.sqoop.schema.Schema;
import java.io.IOException;
public class HdfsParquetWriter extends GenericHdfsWriter {
private ParquetWriter avroParquetWriter;
private Schema sqoopSchema;
private AVROIntermediateDataFormat avroIntermediateDataFormat;
@Override
public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec hadoopCodec) throws IOException {
sqoopSchema = schema;
avroIntermediateDataFormat = new AVROIntermediateDataFormat(sqoopSchema);
CompressionCodecName parquetCodecName;
if (hadoopCodec == null) {
parquetCodecName = CompressionCodecName.UNCOMPRESSED;
} else {
parquetCodecName = CompressionCodecName.fromCompressionCodec(hadoopCodec.getClass());
}
avroParquetWriter =
AvroParquetWriter.builder(filepath)
.withSchema(avroIntermediateDataFormat.getAvroSchema())
.withCompressionCodec(parquetCodecName)
.withConf(conf).build();
}
@Override
public void write(String csv) throws IOException {
avroParquetWriter.write(avroIntermediateDataFormat.toAVRO(csv));
}
@Override
public void destroy() throws IOException {
avroParquetWriter.close();
}
}
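
For reference, HdfsParquetWriter is driven through the same initialize/write/destroy lifecycle as the other GenericHdfsWriter implementations: it takes CSV rows in Sqoop's intermediate format and converts them to Avro records before handing them to the ParquetWriter. A short usage sketch modeled on the fromParquetTest integration test further down; the path and schema are placeholders, and a null codec falls back to UNCOMPRESSED as initialize() above shows:

// Hypothetical two-column Sqoop schema matching the CSV rows passed to write().
org.apache.sqoop.schema.Schema schema = new org.apache.sqoop.schema.Schema("cities");
schema.addColumn(new org.apache.sqoop.schema.type.FixedPoint("id", Long.valueOf(Integer.SIZE), true));
schema.addColumn(new org.apache.sqoop.schema.type.Text("city"));

org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
HdfsParquetWriter writer = new HdfsParquetWriter();
// null codec => CompressionCodecName.UNCOMPRESSED per the branch above
writer.initialize(new org.apache.hadoop.fs.Path("/tmp/cities-0001.parquet"), schema, conf, null);
writer.write("1,'Brno'");
writer.write("2,'Palo Alto'");
writer.destroy();

When a Hadoop codec is supplied, CompressionCodecName.fromCompressionCodec has no mapping for codecs Parquet does not support (for example BZip2), which appears to be what the compressionNotSupported() expectations in TestLoader above account for.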


@ -23,16 +23,17 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.schema.Schema;
import java.io.IOException;
public class HdfsSequenceWriter extends GenericHdfsWriter {
public class HdfsSequenceWriter extends GenericHdfsWriter {
private SequenceFile.Writer filewriter;
private Text text;
@SuppressWarnings("deprecation")
public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException {
if (codec != null) {
filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
conf, filepath, Text.class, NullWritable.class,


@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.connector.hdfs.HdfsConstants;
import org.apache.sqoop.schema.Schema;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
@ -34,7 +35,7 @@ public class HdfsTextWriter extends GenericHdfsWriter {
private BufferedWriter filewriter;
@Override
public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException {
FileSystem fs = filepath.getFileSystem(conf);
DataOutputStream filestream = fs.create(filepath, false);


@ -17,9 +17,6 @@
*/
package org.apache.sqoop.connector.hdfs;
import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
@ -27,6 +24,7 @@
import java.util.HashMap;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@ -35,11 +33,17 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToCompression;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
@ -47,13 +51,18 @@
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;
import org.testng.annotations.AfterMethod;
import org.apache.sqoop.utils.ClassUtils;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.PARQUET_FILE;
import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
public class TestLoader extends TestHdfsBase {
private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
@ -63,6 +72,7 @@ public class TestLoader extends TestHdfsBase {
private final String outputDirectory;
private Loader loader;
private String user = "test_user";
private Schema schema;
@Factory(dataProvider="test-hdfs-loader")
public TestLoader(ToFormat outputFormat,
@ -80,9 +90,10 @@ public static Object[][] data() {
for (ToCompression compression : new ToCompression[]{
ToCompression.DEFAULT,
ToCompression.BZIP2,
ToCompression.GZIP,
ToCompression.NONE
}) {
for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE, PARQUET_FILE}) {
parameters.add(new Object[]{outputFileType, compression});
}
}
@ -100,7 +111,7 @@ public void tearDown() throws IOException {
@Test
public void testLoader() throws Exception {
FileSystem fs = FileSystem.get(new Configuration());
Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
.addColumn(new FloatingPoint("col2", 4L))
.addColumn(new Text("col3"));
@ -130,14 +141,22 @@ public Object readContent() {
assertTestUser(user);
return null;
}
}, null, user);
}, schema, user);
LinkConfiguration linkConf = new LinkConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
jobConf.toJobConfig.compression = compression;
jobConf.toJobConfig.outputFormat = outputFormat;
Path outputPath = new Path(outputDirectory);
loader.load(context, linkConf, jobConf);
try {
loader.load(context, linkConf, jobConf);
} catch (Exception e) {
// we may want to fail if the compression format selected is not supported by the
// output format
Assert.assertTrue(compressionNotSupported());
return;
}
Assert.assertEquals(1, fs.listStatus(outputPath).length);
for (FileStatus status : fs.listStatus(outputPath)) {
@ -152,10 +171,26 @@ public Object readContent() {
Assert.assertEquals(5, fs.listStatus(outputPath).length);
}
private boolean compressionNotSupported() {
switch (outputFormat) {
case SEQUENCE_FILE:
return compression == ToCompression.GZIP;
case PARQUET_FILE:
return compression == ToCompression.BZIP2 || compression == ToCompression.DEFAULT;
}
return false;
}
@Test
public void testOverrideNull() throws Exception {
// Parquet supports an actual "null" value so overriding null would not make
// sense here
if (outputFormat == PARQUET_FILE) {
return;
}
FileSystem fs = FileSystem.get(new Configuration());
Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
.addColumn(new FloatingPoint("col2", 8L))
.addColumn(new Text("col3"))
.addColumn(new Text("col4"));
@ -199,7 +234,15 @@ public Object readContent() {
jobConf.toJobConfig.nullValue = "\\N";
Path outputPath = new Path(outputDirectory);
loader.load(context, linkConf, jobConf);
try {
loader.load(context, linkConf, jobConf);
} catch (Exception e) {
// we may want to fail if the compression format selected is not supported by the
// output format
assert(compressionNotSupported());
return;
}
Assert.assertEquals(1, fs.listStatus(outputPath).length);
for (FileStatus status : fs.listStatus(outputPath)) {
@ -214,7 +257,7 @@ public Object readContent() {
Assert.assertEquals(5, fs.listStatus(outputPath).length);
}
private void verifyOutput(FileSystem fs, Path file, String format) throws IOException {
private void verifyOutput(FileSystem fs, Path file, String format) throws Exception {
Configuration conf = new Configuration();
FSDataInputStream fsin = fs.open(file);
CompressionCodec codec;
@ -228,7 +271,9 @@ private void verifyOutput(FileSystem fs, Path file, String format) throws IOExce
case BZIP2:
Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("BZip2") != -1);
break;
case GZIP:
Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Gzip") != -1);
break;
case DEFAULT:
if(org.apache.hadoop.util.VersionInfo.getVersion().matches("\\b1\\.\\d\\.\\d")) {
Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Default") != -1);
@ -282,11 +327,47 @@ private void verifyOutput(FileSystem fs, Path file, String format) throws IOExce
Assert.assertEquals(line.toString(), formatRow(format, index++));
line = new org.apache.hadoop.io.Text();
}
break;
case PARQUET_FILE:
String compressionCodecClassName = ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER).getBlocks().get(0).getColumns().get(0).getCodec().getHadoopCompressionCodecClassName();
if (compressionCodecClassName == null) {
codec = null;
} else {
codec = (CompressionCodec) ClassUtils.loadClass(compressionCodecClassName).newInstance();
}
// Verify compression
switch(compression) {
case GZIP:
Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Gzip") != -1);
break;
case NONE:
default:
Assert.assertNull(codec);
break;
}
ParquetReader<GenericRecord> avroParquetReader = AvroParquetReader.builder(file).build();
AVROIntermediateDataFormat avroIntermediateDataFormat = new AVROIntermediateDataFormat();
avroIntermediateDataFormat.setSchema(schema);
GenericRecord record;
index = 1;
while ((record = avroParquetReader.read()) != null) {
List<Object> objects = new ArrayList<>();
for (int i = 0; i < record.getSchema().getFields().size(); i++) {
objects.add(record.get(i));
}
Assert.assertEquals(SqoopIDFUtils.toText(avroIntermediateDataFormat.toCSV(record)), formatRow(format, index++));
}
break;
}
}
private void verifyOutput(FileSystem fs, Path file) throws IOException {
private void verifyOutput(FileSystem fs, Path file) throws Exception {
verifyOutput(fs, file, "%d,%f,%s");
}
}
}


@ -43,7 +43,8 @@ public class SqoopAvroUtils {
* Creates an Avro schema from a Sqoop schema.
*/
public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema) {
String name = sqoopSchema.getName();
// avro schema names cannot start with quotes, let's just remove them
String name = sqoopSchema.getName().replace("\"", "");
String doc = sqoopSchema.getNote();
String namespace = SQOOP_SCHEMA_NAMESPACE;
Schema schema = Schema.createRecord(name, doc, namespace, false);
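
The quote stripping above matters because Avro validates record names when the schema is built: a leading quote (or any character outside [A-Za-z0-9_], with a letter or underscore first) makes Schema.createRecord throw a SchemaParseException, and quoted SQL identifiers carry exactly such characters. A small hedged illustration of the failure and the sanitization; the schema name here is made up:

import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;

public class AvroNameSanitizationSketch {
  public static void main(String[] args) {
    String sqoopSchemaName = "\"CITIES\"";  // hypothetical quoted SQL identifier
    try {
      Schema.createRecord(sqoopSchemaName, null, "org.apache.sqoop", false);
    } catch (SchemaParseException e) {
      // Avro rejects the leading quote character in the record name.
      System.out.println("rejected: " + e.getMessage());
    }
    // The fix in SqoopAvroUtils: drop the quotes before creating the record schema.
    String sanitized = sqoopSchemaName.replace("\"", "");
    Schema schema = Schema.createRecord(sanitized, null, "org.apache.sqoop", false);
    System.out.println("created: " + schema.getFullName());
  }
}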


@ -148,7 +148,7 @@ public Set<String> getJars() {
return jars;
}
private GenericRecord toAVRO(String csv) {
public GenericRecord toAVRO(String csv) {
String[] csvStringArray = parseCSVString(csv);
@ -175,7 +175,7 @@ private GenericRecord toAVRO(String csv) {
return avroObject;
}
private Object toAVRO(String csvString, Column column) {
public Object toAVRO(String csvString, Column column) {
Object returnValue = null;
switch (column.getType()) {
@ -232,7 +232,7 @@ private Object toAVRO(String csvString, Column column) {
return returnValue;
}
private GenericRecord toAVRO(Object[] objectArray) {
public GenericRecord toAVRO(Object[] objectArray) {
if (objectArray == null) {
return null;
@ -311,7 +311,7 @@ private GenericRecord toAVRO(Object[] objectArray) {
}
@SuppressWarnings("unchecked")
private String toCSV(GenericRecord record) {
public String toCSV(GenericRecord record) {
Column[] columns = this.schema.getColumnsArray();
StringBuilder csvString = new StringBuilder();
@ -387,7 +387,7 @@ private String toCSV(GenericRecord record) {
}
@SuppressWarnings("unchecked")
private Object[] toObject(GenericRecord record) {
public Object[] toObject(GenericRecord record) {
if (record == null) {
return null;
@ -459,4 +459,8 @@ private Object[] toObject(GenericRecord record) {
}
return object;
}
public Schema getAvroSchema() {
return avroSchema;
}
}

pom.xml

@ -124,6 +124,7 @@ limitations under the License.
<groovy.version>2.4.0</groovy.version>
<jansi.version>1.7</jansi.version>
<felix.version>2.4.0</felix.version>
<parquet.version>1.8.1</parquet.version>
<!-- maven plugin versions -->
<maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
</properties>
@ -700,6 +701,16 @@ limitations under the License.
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${parquet.version}</version>
</dependency>
</dependencies>
</dependencyManagement>


@ -175,6 +175,16 @@ limitations under the License.
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
</dependencies>
<!-- Add classifier name to the JAR name -->


@ -20,17 +20,27 @@
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multiset;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter;
import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.DateTime;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.test.asserts.HdfsAsserts;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
@ -51,6 +61,7 @@
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
@ -64,6 +75,9 @@ public class NullValueTest extends SqoopTestCase {
// The custom nullValue to use (set to null if default)
private String nullValue;
private Schema sqoopSchema;
@DataProvider(name="nul-value-test")
public static Object[][] data(ITestContext context) {
String customNullValue = "^&*custom!@";
@ -80,12 +94,19 @@ public NullValueTest(ToFormat format, String nullValue) {
}
@Override
public String getTestName() {
return methodName + "[" + format.name() + ", " + nullValue + "]";
}
@BeforeMethod
public void setup() throws Exception {
sqoopSchema = new Schema("cities");
sqoopSchema.addColumn(new FixedPoint("id", Long.valueOf(Integer.SIZE), true));
sqoopSchema.addColumn(new org.apache.sqoop.schema.type.Text("country"));
sqoopSchema.addColumn(new DateTime("some_date", true, false));
sqoopSchema.addColumn(new org.apache.sqoop.schema.type.Text("city"));
createTableCities();
}
@ -128,6 +149,27 @@ public void testFromHdfs() throws Exception {
}
sequenceFileWriter.close();
break;
case PARQUET_FILE:
// Parquet file format does not support using custom null values
if (usingCustomNullValue()) {
return;
} else {
HdfsParquetWriter parquetWriter = new HdfsParquetWriter();
Configuration conf = new Configuration();
FileSystem.setDefaultUri(conf, hdfsClient.getUri());
parquetWriter.initialize(
new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001.parquet")),
sqoopSchema, conf, null);
for (String line : getCsv()) {
parquetWriter.write(line);
}
parquetWriter.destroy();
break;
}
default:
Assert.fail();
}
@ -166,6 +208,11 @@ public void testFromHdfs() throws Exception {
@Test
public void testToHdfs() throws Exception {
// Parquet file format does not support using custom null values
if (usingCustomNullValue() && format == ToFormat.PARQUET_FILE) {
return;
}
provider.insertRow(getTableName(), 1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco");
provider.insertRow(getTableName(), 2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null);
provider.insertRow(getTableName(), 3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
@ -203,16 +250,16 @@ public void testToHdfs() throws Exception {
executeJob(job);
Multiset<String> setLines = HashMultiset.create(Arrays.asList(getCsv()));
Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
List<String> notFound = new ArrayList<>();
switch (format) {
case TEXT_FILE:
HdfsAsserts.assertMapreduceOutput(hdfsClient,
HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"), getCsv());
break;
return;
case SEQUENCE_FILE:
Multiset<String> setLines = HashMultiset.create(Arrays.asList(getCsv()));
List<String> notFound = new ArrayList<>();
Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
for(Path file : files) {
SequenceFile.Reader.Option optPath = SequenceFile.Reader.file(file);
SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(getHadoopConf(), optPath);
@ -224,17 +271,32 @@ public void testToHdfs() throws Exception {
}
}
}
if(!setLines.isEmpty() || !notFound.isEmpty()) {
LOG.error("Output do not match expectations.");
LOG.error("Expected lines that weren't present in the files:");
LOG.error("\t'" + StringUtils.join(setLines, "'\n\t'") + "'");
LOG.error("Extra lines in files that weren't expected:");
LOG.error("\t'" + StringUtils.join(notFound, "'\n\t'") + "'");
Assert.fail("Output do not match expectations.");
break;
case PARQUET_FILE:
AVROIntermediateDataFormat avroIntermediateDataFormat = new AVROIntermediateDataFormat(sqoopSchema);
notFound = new LinkedList<>();
for (Path file : files) {
ParquetReader<GenericRecord> avroParquetReader = AvroParquetReader.builder(file).build();
GenericRecord record;
while ((record = avroParquetReader.read()) != null) {
String recordAsCsv = avroIntermediateDataFormat.toCSV(record);
if (!setLines.remove(recordAsCsv)) {
notFound.add(recordAsCsv);
}
}
}
break;
default:
Assert.fail();
}
if(!setLines.isEmpty() || !notFound.isEmpty()) {
LOG.error("Output do not match expectations.");
LOG.error("Expected lines that weren't present in the files:");
LOG.error("\t'" + StringUtils.join(setLines, "'\n\t'") + "'");
LOG.error("Extra lines in files that weren't expected:");
LOG.error("\t'" + StringUtils.join(notFound, "'\n\t'") + "'");
Assert.fail("Output do not match expectations.");
}
}
}


@ -0,0 +1,183 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.connector.hdfs;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.DateTime;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class ParquetTest extends SqoopTestCase {
@AfterMethod
public void dropTable() {
super.dropTable();
}
@Test
public void toParquetTest() throws Exception {
createAndLoadTableCities();
// RDBMS link
MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
fillRdbmsLinkConfig(rdbmsConnection);
saveLink(rdbmsConnection);
// HDFS link
MLink hdfsConnection = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsConnection);
saveLink(hdfsConnection);
// Job creation
MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
// Set rdbms "FROM" config
fillRdbmsFromConfig(job, "id");
// Fill the hdfs "TO" config
fillHdfsToConfig(job, ToFormat.PARQUET_FILE);
saveJob(job);
executeJob(job);
String[] expectedOutput =
{"'1','USA','2004-10-23 00:00:00.000','San Francisco'",
"'2','USA','2004-10-24 00:00:00.000','Sunnyvale'",
"'3','Czech Republic','2004-10-25 00:00:00.000','Brno'",
"'4','USA','2004-10-26 00:00:00.000','Palo Alto'"};
Multiset<String> setLines = HashMultiset.create(Arrays.asList(expectedOutput));
List<String> notFound = new LinkedList<>();
Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, getMapreduceDirectory());
for (Path file : files) {
ParquetReader<GenericRecord> avroParquetReader = AvroParquetReader.builder(file).build();
GenericRecord record;
while ((record = avroParquetReader.read()) != null) {
String recordAsLine = recordToLine(record);
if (!setLines.remove(recordAsLine)) {
notFound.add(recordAsLine);
}
}
}
if (!setLines.isEmpty() || !notFound.isEmpty()) {
fail("Output do not match expectations.");
}
}
@Test
public void fromParquetTest() throws Exception {
createTableCities();
Schema sqoopSchema = new Schema("cities");
sqoopSchema.addColumn(new FixedPoint("id", Long.valueOf(Integer.SIZE), true));
sqoopSchema.addColumn(new Text("country"));
sqoopSchema.addColumn(new DateTime("some_date", true, false));
sqoopSchema.addColumn(new Text("city"));
HdfsParquetWriter parquetWriter = new HdfsParquetWriter();
Configuration conf = new Configuration();
FileSystem.setDefaultUri(conf, hdfsClient.getUri());
parquetWriter.initialize(
new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001.parquet")),
sqoopSchema, conf, null);
parquetWriter.write("1,'USA','2004-10-23 00:00:00.000','San Francisco'");
parquetWriter.write("2,'USA','2004-10-24 00:00:00.000','Sunnyvale'");
parquetWriter.destroy();
parquetWriter.initialize(
new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0002.parquet")),
sqoopSchema, conf, null);
parquetWriter.write("3,'Czech Republic','2004-10-25 00:00:00.000','Brno'");
parquetWriter.write("4,'USA','2004-10-26 00:00:00.000','Palo Alto'");
parquetWriter.destroy();
// RDBMS link
MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
fillRdbmsLinkConfig(rdbmsLink);
saveLink(rdbmsLink);
// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
fillHdfsLink(hdfsLink);
saveLink(hdfsLink);
// Job creation
MJob job = getClient().createJob(hdfsLink.getName(), rdbmsLink.getName());
fillHdfsFromConfig(job);
fillRdbmsToConfig(job);
saveJob(job);
executeJob(job);
assertEquals(provider.rowCount(getTableName()), 4);
assertRowInCities(1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco");
assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), "Sunnyvale");
assertRowInCities(3, "Czech Republic", Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
}
public String recordToLine(GenericRecord genericRecord) {
String line = "";
line += "\'" + String.valueOf(genericRecord.get(0)) + "\',";
line += "\'" + String.valueOf(genericRecord.get(1)) + "\',";
line += "\'" + new Timestamp((Long)genericRecord.get(2)) + "00\',";
line += "\'" + String.valueOf(genericRecord.get(3)) + "\'";
return line;
}
}