SQOOP-1390: Import data to HDFS as a set of Parquet files
(Qian Xu via Jarek Jarcec Cecho)

parent 74ec89452b
commit 2e1e09422c

ivy.xml (2 changes)
@@ -183,6 +183,8 @@ under the License.
                conf="common->default;redist->default"/>
    <dependency org="commons-io" name="commons-io" rev="${commons-io.version}"
                conf="common->default;redist->default"/>
    <dependency org="org.kitesdk" name="kite-data-mapreduce" rev="${kite-data-mapreduce.version}"
                conf="avro->default;redist->default"/>

    <!-- dependencies for static analysis -->
    <dependency org="checkstyle" name="checkstyle" rev="${checkstyle.version}"
@@ -20,6 +20,8 @@

avro.version=1.7.5

kite-data-mapreduce.version=0.15.0

checkstyle.version=5.0

commons-cli.version=1.2
@@ -33,6 +33,9 @@ Import control options
--as-textfile::
  Imports data as plain text (default)

--as-parquetfile::
  Imports data to Parquet Files

--boundary-query (query)::
  Using following query to select minimal and maximal value of '--split-by' column for creating splits

@@ -36,6 +36,9 @@ Import control options
--as-textfile::
  Imports data as plain text (default)

--as-parquetfile::
  Imports data to Parquet Files

--direct::
  Use direct import fast path (mysql only)

@@ -169,6 +169,7 @@ The following Sqoop export and import options are not supported with HCatalog jo
* +--append+
* +--as-sequencefile+
* +--as-avrofile+
* +--as-parquetfile+

Ignored Sqoop Options
^^^^^^^^^^^^^^^^^^^^^
@@ -81,6 +81,7 @@ Import control arguments:
   --as-avrodatafile                Imports data to Avro Data Files
   --as-sequencefile                Imports data to SequenceFiles
   --as-textfile                    Imports data as plain text (default)
   --as-parquetfile                 Imports data to Parquet Data Files
...
----

@@ -47,6 +47,7 @@ Argument Description
+\--as-avrodatafile+              Imports data to Avro Data Files
+\--as-sequencefile+              Imports data to SequenceFiles
+\--as-textfile+                  Imports data as plain text (default)
+\--as-parquetfile+               Imports data to Parquet Files
+\--direct+                       Use direct import fast path
+\--inline-lob-limit <n>+         Set the maximum size for an inline LOB
+-m,\--num-mappers <n>+           Use 'n' map tasks to import in parallel

@@ -59,6 +59,7 @@ Argument Description
+\--as-avrodatafile+              Imports data to Avro Data Files
+\--as-sequencefile+              Imports data to SequenceFiles
+\--as-textfile+                  Imports data as plain text (default)
+\--as-parquetfile+               Imports data to Parquet Files
+\--boundary-query <statement>+   Boundary query to use for creating splits
+\--columns <col,col,col...>+     Columns to import from table
+\--delete-target-dir+            Delete the import target directory\

@@ -39,7 +39,8 @@ public class SqoopOptions
  public enum FileLayout {
    TextFile,
    SequenceFile,
    AvroDataFile
    AvroDataFile,
    ParquetFile
  }

  /**
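The new enum constant is what the rest of the import path switches on. The following standalone sketch (the enum is copied locally so the file compiles on its own; the mapper names are plain labels, not Sqoop's real dispatch code) illustrates the kind of check that DataDrivenImportJob performs further down in this commit.

public class FileLayoutDispatchSketch {
  // Local copy of the enum for a self-contained example; the real one
  // lives in org.apache.sqoop.SqoopOptions.
  enum FileLayout { TextFile, SequenceFile, AvroDataFile, ParquetFile }

  static String mapperFor(FileLayout layout) {
    switch (layout) {
      case AvroDataFile: return "AvroImportMapper";
      case ParquetFile:  return "ParquetImportMapper";   // new in this commit
      default:           return "TextImportMapper";
    }
  }

  public static void main(String[] args) {
    System.out.println(mapperFor(FileLayout.ParquetFile));
  }
}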
src/java/org/apache/sqoop/avro/AvroUtil.java (new file, 69 lines)
@@ -0,0 +1,69 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sqoop.avro;

import org.apache.hadoop.io.BytesWritable;
import org.apache.sqoop.lib.BlobRef;
import org.apache.sqoop.lib.ClobRef;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;

/**
 * The service class provides methods for creating and converting Avro objects.
 */
public final class AvroUtil {

  /**
   * Convert the Avro representation of a Java type (that has already been
   * converted from the SQL equivalent). Note that the method is taken from
   * {@link org.apache.sqoop.mapreduce.AvroImportMapper}
   */
  public static Object toAvro(Object o, boolean bigDecimalFormatString) {
    if (o instanceof BigDecimal) {
      if (bigDecimalFormatString) {
        return ((BigDecimal)o).toPlainString();
      } else {
        return o.toString();
      }
    } else if (o instanceof Date) {
      return ((Date) o).getTime();
    } else if (o instanceof Time) {
      return ((Time) o).getTime();
    } else if (o instanceof Timestamp) {
      return ((Timestamp) o).getTime();
    } else if (o instanceof BytesWritable) {
      BytesWritable bw = (BytesWritable) o;
      return ByteBuffer.wrap(bw.getBytes(), 0, bw.getLength());
    } else if (o instanceof BlobRef) {
      BlobRef br = (BlobRef) o;
      // If blob data is stored in an external .lob file, save the ref file
      // as Avro bytes. If materialized inline, save blob data as Avro bytes.
      byte[] bytes = br.isExternal() ? br.toString().getBytes() : br.getData();
      return ByteBuffer.wrap(bytes);
    } else if (o instanceof ClobRef) {
      throw new UnsupportedOperationException("ClobRef not supported");
    }
    // primitive types (Integer, etc) are left unchanged
    return o;
  }

}
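To make the conversion rules above concrete, here is a small standalone driver (a minimal sketch assuming the new org.apache.sqoop.avro.AvroUtil class is on the classpath; the input values are arbitrary examples).

import java.math.BigDecimal;
import java.sql.Timestamp;

import org.apache.sqoop.avro.AvroUtil;

public class AvroUtilDemo {
  public static void main(String[] args) {
    // BigDecimal becomes a String; the flag picks plain notation over toString().
    Object dec = AvroUtil.toAvro(new BigDecimal("1E+3"), true);   // "1000"
    // java.sql.Timestamp (and Date/Time) collapse to epoch milliseconds.
    Object ts = AvroUtil.toAvro(new Timestamp(0L), false);        // 0L
    // Primitive wrappers pass through unchanged.
    Object i = AvroUtil.toAvro(42, false);                        // 42
    System.out.println(dec + " " + ts + " " + i);
  }
}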
src/java/org/apache/sqoop/lib/SqoopAvroRecord.java (new file, 57 lines)
@@ -0,0 +1,57 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sqoop.lib;

import org.apache.avro.generic.GenericRecord;
import org.apache.sqoop.avro.AvroUtil;

/**
 * The abstract class extends {@link org.apache.sqoop.lib.SqoopRecord}. It also
 * implements the interface GenericRecord which is a generic instance of an Avro
 * record schema. Fields are accessible by name as well as by index.
 */
public abstract class SqoopAvroRecord extends SqoopRecord implements GenericRecord {

  public abstract boolean getBigDecimalFormatString();

  @Override
  public void put(String key, Object v) {
    getFieldMap().put(key, v);
  }

  @Override
  public Object get(String key) {
    Object o = getFieldMap().get(key);
    return AvroUtil.toAvro(o, getBigDecimalFormatString());
  }

  @Override
  public void put(int i, Object v) {
    put(getFieldNameByIndex(i), v);
  }

  @Override
  public Object get(int i) {
    return get(getFieldNameByIndex(i));
  }

  private String getFieldNameByIndex(int i) {
    return getSchema().getFields().get(i).name();
  }

}
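The index-based accessors above simply delegate to name-based access through the Avro schema's field order. The short sketch below shows the same equivalence using plain Avro classes only (the record schema is invented for illustration), so it can run without the rest of Sqoop.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;

public class FieldIndexDemo {
  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("Row").fields()
        .requiredInt("ID")
        .requiredString("NAME")
        .endRecord();
    GenericData.Record record = new GenericData.Record(schema);
    record.put("ID", 7);
    record.put("NAME", "alice");
    // Positional access resolves through the schema's field list,
    // the same lookup SqoopAvroRecord.getFieldNameByIndex performs.
    String nameByIndex = schema.getFields().get(1).name();  // "NAME"
    System.out.println(nameByIndex + " -> " + record.get(1) + " == " + record.get("NAME"));
  }
}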
@@ -19,27 +19,20 @@
package org.apache.sqoop.mapreduce;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.cloudera.sqoop.lib.BlobRef;
import com.cloudera.sqoop.lib.ClobRef;
import com.cloudera.sqoop.lib.LargeObjectLoader;
import com.cloudera.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
import org.apache.sqoop.avro.AvroUtil;

/**
 * Imports records by transforming them to Avro records in an Avro data file.

@@ -92,45 +85,10 @@ private GenericRecord toGenericRecord(SqoopRecord val) {
    Map<String, Object> fieldMap = val.getFieldMap();
    GenericRecord record = new GenericData.Record(schema);
    for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
      record.put(entry.getKey(), toAvro(entry.getValue()));
      Object avro = AvroUtil.toAvro(entry.getValue(), bigDecimalFormatString);
      record.put(entry.getKey(), avro);
    }
    return record;
  }

  /**
   * Convert the Avro representation of a Java type (that has already been
   * converted from the SQL equivalent).
   * @param o
   * @return
   */
  private Object toAvro(Object o) {
    if (o instanceof BigDecimal) {
      if (bigDecimalFormatString) {
        return ((BigDecimal)o).toPlainString();
      } else {
        return o.toString();
      }
    } else if (o instanceof Date) {
      return ((Date) o).getTime();
    } else if (o instanceof Time) {
      return ((Time) o).getTime();
    } else if (o instanceof Timestamp) {
      return ((Timestamp) o).getTime();
    } else if (o instanceof BytesWritable) {
      BytesWritable bw = (BytesWritable) o;
      return ByteBuffer.wrap(bw.getBytes(), 0, bw.getLength());
    } else if (o instanceof BlobRef) {
      BlobRef br = (BlobRef) o;
      // If blob data is stored in an external .lob file, save the ref file
      // as Avro bytes. If materialized inline, save blob data as Avro bytes.
      byte[] bytes = br.isExternal() ? br.toString().getBytes() : br.getData();
      return ByteBuffer.wrap(bytes);
    } else if (o instanceof ClobRef) {
      throw new UnsupportedOperationException("ClobRef not suported");
    }
    // primitive types (Integer, etc) are left unchanged
    return o;
  }

}
@@ -26,6 +26,8 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

@@ -45,6 +47,7 @@
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import com.cloudera.sqoop.orm.AvroSchemaGenerator;
import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;

/**
 * Actually runs a jdbc import job using the ORM files generated by the

@@ -95,6 +98,20 @@ protected void configureMapper(Job job, String tableName,
      }

      AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
    } else if (options.getFileLayout()
        == SqoopOptions.FileLayout.ParquetFile) {
      Configuration conf = job.getConfiguration();
      // An Avro schema is required for creating a dataset that manages
      // Parquet data records. The import will fail, if schema is invalid.
      Schema schema = new Schema.Parser().parse(conf.get("avro.schema"));
      String uri = "";
      if (options.doHiveImport()) {
        // TODO: SQOOP-1393
      } else {
        FileSystem fs = FileSystem.get(conf);
        uri = "dataset:" + fs.makeQualified(getContext().getDestination());
      }
      ParquetJob.configureImportJob(conf, schema, uri, options.isAppendMode());
    }

    job.setMapperClass(getMapperClass());

@@ -129,6 +146,9 @@ protected Class<? extends Mapper> getMapperClass() {
    } else if (options.getFileLayout()
        == SqoopOptions.FileLayout.AvroDataFile) {
      return AvroImportMapper.class;
    } else if (options.getFileLayout()
        == SqoopOptions.FileLayout.ParquetFile) {
      return ParquetImportMapper.class;
    }

    return null;

@@ -149,6 +169,9 @@ protected Class<? extends OutputFormat> getOutputFormatClass()
    } else if (options.getFileLayout()
        == SqoopOptions.FileLayout.AvroDataFile) {
      return AvroOutputFormat.class;
    } else if (options.getFileLayout()
        == SqoopOptions.FileLayout.ParquetFile) {
      return DatasetKeyOutputFormat.class;
    }

    return null;
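For reference, the "dataset:" URI handed to ParquetJob above is just the fully qualified destination path with a scheme prefix. A minimal sketch of that construction, using an illustrative local path instead of the job's real destination:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DatasetUriDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // With an unconfigured Configuration this resolves against the local FS;
    // on a cluster it would be qualified with the hdfs:// authority instead.
    FileSystem fs = FileSystem.get(conf);
    Path destination = new Path("/user/example/warehouse/mytable"); // illustrative path
    String uri = "dataset:" + fs.makeQualified(destination);
    System.out.println(uri); // e.g. dataset:file:/user/example/warehouse/mytable
  }
}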
src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java (new file, 70 lines)
@@ -0,0 +1,70 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.mapreduce;

import com.cloudera.sqoop.lib.LargeObjectLoader;
import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.lib.SqoopAvroRecord;

import java.io.IOException;
import java.sql.SQLException;

/**
 * Imports records by writing them to a Parquet File.
 */
public class ParquetImportMapper
    extends AutoProgressMapper<LongWritable, SqoopAvroRecord,
    GenericRecord, NullWritable> {

  private LargeObjectLoader lobLoader = null;

  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    Path workPath = new Path("/tmp/sqoop-parquet-" + context.getTaskAttemptID());
    lobLoader = new LargeObjectLoader(conf, workPath);
  }

  @Override
  protected void map(LongWritable key, SqoopAvroRecord val, Context context)
      throws IOException, InterruptedException {
    try {
      // Loading of LOBs was delayed until we have a Context.
      val.loadLargeObjects(lobLoader);
    } catch (SQLException sqlE) {
      throw new IOException(sqlE);
    }

    context.write(val, null);
  }

  @Override
  protected void cleanup(Context context) throws IOException {
    if (null != lobLoader) {
      lobLoader.close();
    }
  }

}
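The mapper emits its records through Kite's DatasetKeyOutputFormat; outside of MapReduce, the same write path can be exercised directly with Kite's writer API. A hedged sketch follows: the schema, values, and local dataset URI are invented, and only the Datasets/DatasetDescriptor calls mirror those used by ParquetJob.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Formats;

public class KiteParquetWriteSketch {
  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("row").fields()
        .optionalInt("ID").optionalString("NAME").endRecord();
    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
        .schema(schema).format(Formats.PARQUET).build();
    // Illustrative local URI; Sqoop builds a qualified HDFS "dataset:" URI instead.
    Dataset<GenericRecord> dataset =
        Datasets.create("dataset:file:/tmp/kite-sketch/row", descriptor, GenericRecord.class);

    DatasetWriter<GenericRecord> writer = dataset.newWriter();
    try {
      GenericRecord record = new GenericData.Record(schema);
      record.put("ID", 1);
      record.put("NAME", "alice");
      writer.write(record);   // ends up in a .parquet file under the dataset directory
    } finally {
      writer.close();
    }
  }
}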
src/java/org/apache/sqoop/mapreduce/ParquetJob.java (new file, 77 lines)
@@ -0,0 +1,77 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.mapreduce;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetNotFoundException;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Formats;
import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
import org.kitesdk.data.spi.SchemaValidationUtil;

import java.io.IOException;

/**
 * Helper class for setting up a Parquet MapReduce job.
 */
public final class ParquetJob {

  private ParquetJob() {
  }

  /**
   * Configure the import job. The import process will use a Kite dataset to
   * write data records into Parquet format internally. The input key class is
   * {@link org.apache.sqoop.lib.SqoopAvroRecord}. The output key is
   * {@link org.apache.avro.generic.GenericRecord}.
   */
  public static void configureImportJob(Configuration conf, Schema schema,
      String uri, boolean doAppend) throws IOException {
    Dataset dataset;
    if (doAppend) {
      try {
        dataset = Datasets.load(uri);
      } catch (DatasetNotFoundException ex) {
        dataset = createDataset(schema, uri);
      }
      Schema writtenWith = dataset.getDescriptor().getSchema();
      if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
        throw new IOException(
            String.format("Expected schema: %s%nActual schema: %s",
                writtenWith, schema));
      }
    } else {
      dataset = createDataset(schema, uri);
    }
    DatasetKeyOutputFormat.configure(conf).writeTo(dataset);
  }

  private static Dataset createDataset(Schema schema, String uri) {
    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
        .schema(schema)
        .format(Formats.PARQUET)
        .build();
    return Datasets.create(uri, descriptor, GenericRecord.class);
  }

}
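Putting the helper to work looks roughly like the following sketch. The schema and dataset URI are illustrative; only the configureImportJob signature comes from the diff, and the output-format and mapper wiring is left to DataDrivenImportJob as shown earlier.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.mapreduce.ParquetJob;

public class ParquetJobSetupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative Avro schema standing in for the one Sqoop generates from the table.
    Schema schema = SchemaBuilder.record("mytable").fields()
        .optionalInt("ID")
        .optionalString("NAME")
        .endRecord();
    // Creates the Kite dataset (or, with doAppend=true, loads it and checks that the
    // existing schema can read the new one) and registers it as the job's output target.
    ParquetJob.configureImportJob(conf, schema,
        "dataset:file:/tmp/sqoop-demo/mytable", false);
    // DataDrivenImportJob then pairs this configuration with DatasetKeyOutputFormat
    // and ParquetImportMapper.
    System.out.println("dataset configured for schema: " + schema);
  }
}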
@@ -30,9 +30,11 @@
import java.util.Properties;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.sqoop.lib.SqoopAvroRecord;
import org.apache.sqoop.mapreduce.ImportJobBase;

import com.cloudera.sqoop.SqoopOptions;

@@ -1108,6 +1110,26 @@ private void generateSetField(Map<String, Integer> columnTypes,
    }
  }

  private void generateSqoopAvroRecordMethods(String className, Schema schema, StringBuilder sb) {
    // Define shared immutable attributes as static
    sb.append(" private final static boolean bigDecimalFormatString;\n");
    sb.append(" private final static Schema schema;\n");
    sb.append(" static {\n");
    sb.append(" bigDecimalFormatString = " + bigDecimalFormatString + ";\n");
    sb.append(" schema = new Schema.Parser().parse(\"");
    sb.append(schema.toString().replaceAll("\"", "\\\\\""));
    sb.append("\");\n");
    sb.append(" }\n");
    sb.append(" @Override\n");
    sb.append(" public boolean getBigDecimalFormatString() {\n");
    sb.append(" return bigDecimalFormatString;\n");
    sb.append(" }\n");
    sb.append(" @Override\n");
    sb.append(" public Schema getSchema() {\n");
    sb.append(" return schema;\n");
    sb.append(" }\n");
  }

  /**
   * Generate the setField() method.
   * @param columnTypes - mapping from column names to sql types
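The only subtle part of the generated code above is embedding the Avro schema's JSON into Java source, which is why the writer escapes every double quote. A self-contained sketch of that step (the record schema is invented; the escaping expression is the one used in generateSqoopAvroRecordMethods):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class SchemaEmbedDemo {
  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("FOO").fields().optionalInt("ID").endRecord();
    // Same escaping ClassWriter applies before pasting the JSON into generated source:
    // every " becomes \" so the schema survives as a Java string literal.
    String escaped = schema.toString().replaceAll("\"", "\\\\\"");
    String generated =
        "  private final static Schema schema;\n"
      + "  static {\n"
      + "    schema = new Schema.Parser().parse(\"" + escaped + "\");\n"
      + "  }\n";
    System.out.println(generated);
  }
}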
@@ -1728,9 +1750,15 @@ public void generate() throws IOException {
      }
    }

    Schema schema = null;
    if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
      schema = generateAvroSchemaForTable(tableName);
      options.getConf().set("avro.schema", schema.toString());
    }

    // Generate the Java code.
    StringBuilder sb = generateClassForColumns(columnTypes,
        cleanedColNames, cleanedDbWriteColNames);
        cleanedColNames, cleanedDbWriteColNames, schema);
    // Write this out to a file in the jar output directory.
    // We'll move it to the user-visible CodeOutputDir after compiling.
    String codeOutDir = options.getJarOutputDir();

@@ -1788,6 +1816,12 @@ public void generate() throws IOException {
    }
  }

  private Schema generateAvroSchemaForTable(String tableName) throws IOException {
    AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
        connManager, tableName);
    return generator.generate();
  }

  protected String[] getColumnNames(Map<String, Integer> columnTypes) {
    String [] colNames = options.getColumns();
    if (null == colNames) {

@@ -1838,15 +1872,18 @@ protected Map<String, Integer> getColumnTypes() throws IOException {
   * @param colNames - ordered list of column names for table.
   * @param dbWriteColNames - ordered list of column names for the db
   * write() method of the class.
   * @param schema - If a valid Avro schema is specified, the base class will
   * be SqoopAvroRecord
   * @return - A StringBuilder that contains the text of the class code.
   */
  private StringBuilder generateClassForColumns(
      Map<String, Integer> columnTypes,
      String [] colNames, String [] dbWriteColNames) {
      String [] colNames, String [] dbWriteColNames, Schema schema) {
    if (colNames.length ==0) {
      throw new IllegalArgumentException("Attempted to generate class with "
          + "no columns!");
    }

    StringBuilder sb = new StringBuilder();
    sb.append("// ORM class for table '" + tableName + "'\n");
    sb.append("// WARNING: This class is AUTO-GENERATED. "

@@ -1878,7 +1915,13 @@ private StringBuilder generateClassForColumns(
    sb.append("import " + BlobRef.class.getCanonicalName() + ";\n");
    sb.append("import " + ClobRef.class.getCanonicalName() + ";\n");
    sb.append("import " + LargeObjectLoader.class.getCanonicalName() + ";\n");
    sb.append("import " + SqoopRecord.class.getCanonicalName() + ";\n");

    Class baseClass = SqoopRecord.class;
    if (null != schema) {
      sb.append("import org.apache.avro.Schema;\n");
      baseClass = SqoopAvroRecord.class;
    }
    sb.append("import " + baseClass.getCanonicalName() + ";\n");
    sb.append("import java.sql.PreparedStatement;\n");
    sb.append("import java.sql.ResultSet;\n");
    sb.append("import java.sql.SQLException;\n");

@@ -1898,8 +1941,8 @@ private StringBuilder generateClassForColumns(
    sb.append("\n");

    String className = tableNameInfo.getShortClassForTable(tableName);
    sb.append("public class " + className + " extends SqoopRecord "
        + " implements DBWritable, Writable {\n");
    sb.append("public class " + className + " extends " + baseClass.getSimpleName()
        + " implements DBWritable, Writable {\n");
    sb.append(" private final int PROTOCOL_VERSION = "
        + CLASS_WRITER_VERSION + ";\n");
    sb.append(

@@ -1918,6 +1961,10 @@ private StringBuilder generateClassForColumns(
    generateGetFieldMap(columnTypes, colNames, sb);
    generateSetField(columnTypes, colNames, sb);

    if (baseClass == SqoopAvroRecord.class) {
      generateSqoopAvroRecordMethods(className, schema, sb);
    }

    // TODO(aaron): Generate hashCode(), compareTo(), equals() so it can be a
    // WritableComparable

@@ -98,6 +98,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
  public static final String FMT_SEQUENCEFILE_ARG = "as-sequencefile";
  public static final String FMT_TEXTFILE_ARG = "as-textfile";
  public static final String FMT_AVRODATAFILE_ARG = "as-avrodatafile";
  public static final String FMT_PARQUETFILE_ARG = "as-parquetfile";
  public static final String HIVE_IMPORT_ARG = "hive-import";
  public static final String HIVE_TABLE_ARG = "hive-table";
  public static final String HIVE_DATABASE_ARG = "hive-database";

@@ -708,6 +708,10 @@ protected RelatedOptions getImportOptions() {
        .withDescription("Imports data to Avro data files")
        .withLongOpt(FMT_AVRODATAFILE_ARG)
        .create());
    importOpts.addOption(OptionBuilder
        .withDescription("Imports data to Parquet files")
        .withLongOpt(BaseSqoopTool.FMT_PARQUETFILE_ARG)
        .create());
    importOpts.addOption(OptionBuilder.withArgName("n")
        .hasArg().withDescription("Use 'n' map tasks to import in parallel")
        .withLongOpt(NUM_MAPPERS_ARG)

@@ -923,6 +927,10 @@ public void applyOptions(CommandLine in, SqoopOptions out)
      out.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
    }

    if (in.hasOption(FMT_PARQUETFILE_ARG)) {
      out.setFileLayout(SqoopOptions.FileLayout.ParquetFile);
    }

    if (in.hasOption(NUM_MAPPERS_ARG)) {
      out.setNumMappers(Integer.parseInt(in.getOptionValue(NUM_MAPPERS_ARG)));
    }

@@ -1020,8 +1028,8 @@ protected void validateImportOptions(SqoopOptions options)
        && options.getFileLayout() != SqoopOptions.FileLayout.TextFile
        && options.getConnectString().contains("jdbc:mysql://")) {
      throw new InvalidOptionsException(
          "MySQL direct import currently supports only text output format."
          + "Parameters --as-sequencefile and --as-avrodatafile are not "
          "MySQL direct import currently supports only text output format. "
          + "Parameters --as-sequencefile --as-avrodatafile and --as-parquetfile are not "
          + "supported with --direct params in MySQL case.");
    } else if (options.isDirect()
        && options.doHiveDropDelims()) {
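The option plumbing above follows the usual commons-cli pattern. A trimmed, self-contained version of it is sketched below: a local enum and a plain GnuParser stand in for SqoopOptions and Sqoop's ToolOptions machinery, while the long option name and description are taken from the diff.

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;

public class ParquetFlagDemo {
  enum FileLayout { TextFile, SequenceFile, AvroDataFile, ParquetFile }

  public static void main(String[] args) throws Exception {
    Options opts = new Options();
    opts.addOption(OptionBuilder
        .withDescription("Imports data to Parquet files")
        .withLongOpt("as-parquetfile")
        .create());

    CommandLine in = new GnuParser().parse(opts, new String[]{"--as-parquetfile"});
    FileLayout layout = FileLayout.TextFile;           // default
    if (in.hasOption("as-parquetfile")) {
      layout = FileLayout.ParquetFile;                 // same switch applyOptions performs
    }
    System.out.println(layout);                        // ParquetFile
  }
}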
@@ -228,8 +228,17 @@ private void moveFiles(FileSystem fs, Path sourceDir, Path targetDir,
        } while (!fs.rename(fileStatus.getPath(), new Path(targetDir, destFilename.toString())));

        LOG.debug("Filename: " + sourceFilename + " repartitioned to: " + destFilename.toString());
      } else { // ignore everything else
        LOG.debug("Filename: " + sourceFilename + " ignored");
      } else {
        // Generated Parquet files do not follow the pattern "part-m-([0-9]{5}).ext", so that these
        // files cannot be moved to target directory expectedly. We simply check file extension.
        boolean fileMoved = false;
        if (sourceFilename.endsWith(".parquet")) {
          Path targetFilename = new Path(targetDir, sourceFilename.toString());
          fileMoved = fs.rename(fileStatus.getPath(), targetFilename);
        }
        if (!fileMoved) { // ignore everything else
          LOG.debug("Filename: " + sourceFilename + " ignored");
        }
      }
    }
  }
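A stripped-down version of the new branch, runnable against the local filesystem with invented paths, isolates the extension-based fallback:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ParquetMoveSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path sourceDir = new Path("/tmp/sqoop-append-src");   // illustrative
    Path targetDir = new Path("/tmp/sqoop-append-dst");   // illustrative
    fs.mkdirs(sourceDir);
    fs.mkdirs(targetDir);
    Path parquetFile = new Path(sourceDir, "f1.parquet");
    fs.create(parquetFile).close();

    String sourceFilename = parquetFile.getName();
    boolean fileMoved = false;
    // Same idea as the new AppendUtils branch: Kite/Parquet output does not
    // match part-m-NNNNN, so fall back to checking the file extension.
    if (sourceFilename.endsWith(".parquet")) {
      fileMoved = fs.rename(parquetFile, new Path(targetDir, sourceFilename));
    }
    System.out.println(fileMoved ? "moved" : "ignored");
  }
}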
@@ -372,6 +372,18 @@ For lib/avro-<version>.jar:

The Apache License, Version 2.0

For lib/kite-data-core-<version>.jar:

The Apache License, Version 2.0

For lib/kite-data-mapreduce-<version>.jar:

The Apache License, Version 2.0

For lib/kite-hadoop-compatibility-<version>.jar:

The Apache License, Version 2.0

For lib/avro-ipc-<version>.jar:

The Apache License, Version 2.0

src/test/com/cloudera/sqoop/TestParquetImport.java (new file, 200 lines)
@@ -0,0 +1,200 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.cloudera.sqoop;

import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.testutil.HsqldbTestServer;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetReader;
import org.kitesdk.data.Datasets;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Tests --as-parquetfile.
 */
public class TestParquetImport extends ImportJobTestCase {

  public static final Log LOG = LogFactory
      .getLog(TestParquetImport.class.getName());

  /**
   * Create the argv to pass to Sqoop.
   *
   * @return the argv as an array of strings.
   */
  protected String[] getOutputArgv(boolean includeHadoopFlags,
      String[] extraArgs) {
    ArrayList<String> args = new ArrayList<String>();

    if (includeHadoopFlags) {
      CommonArgs.addHadoopFlags(args);
    }

    args.add("--table");
    args.add(getTableName());
    args.add("--connect");
    args.add(HsqldbTestServer.getUrl());
    args.add("--warehouse-dir");
    args.add(getWarehouseDir());
    args.add("--split-by");
    args.add("INTFIELD1");
    args.add("--as-parquetfile");
    if (extraArgs != null) {
      args.addAll(Arrays.asList(extraArgs));
    }

    return args.toArray(new String[args.size()]);
  }

  public void testParquetImport() throws IOException {
    String[] types = {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
        "VARBINARY(2)",};
    String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
    createTableWithColTypes(types, vals);

    runImport(getOutputArgv(true, null));

    Schema schema = getSchema();
    assertEquals(Type.RECORD, schema.getType());
    List<Field> fields = schema.getFields();
    assertEquals(types.length, fields.size());
    checkField(fields.get(0), "DATA_COL0", Type.BOOLEAN);
    checkField(fields.get(1), "DATA_COL1", Type.INT);
    checkField(fields.get(2), "DATA_COL2", Type.LONG);
    checkField(fields.get(3), "DATA_COL3", Type.FLOAT);
    checkField(fields.get(4), "DATA_COL4", Type.DOUBLE);
    checkField(fields.get(5), "DATA_COL5", Type.STRING);
    checkField(fields.get(6), "DATA_COL6", Type.BYTES);

    DatasetReader<GenericRecord> reader = getReader();
    try {
      GenericRecord record1 = reader.next();
      //assertNull(record1);
      assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
      assertEquals("DATA_COL1", 100, record1.get("DATA_COL1"));
      assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2"));
      assertEquals("DATA_COL3", 1.0f, record1.get("DATA_COL3"));
      assertEquals("DATA_COL4", 2.0, record1.get("DATA_COL4"));
      assertEquals("DATA_COL5", "s", record1.get("DATA_COL5"));
      Object object = record1.get("DATA_COL6");
      assertTrue(object instanceof ByteBuffer);
      ByteBuffer b = ((ByteBuffer) object);
      assertEquals((byte) 1, b.get(0));
      assertEquals((byte) 2, b.get(1));
    } finally {
      reader.close();
    }
  }

  public void testOverrideTypeMapping() throws IOException {
    String [] types = { "INT" };
    String [] vals = { "10" };
    createTableWithColTypes(types, vals);

    String [] extraArgs = { "--map-column-java", "DATA_COL0=String"};
    runImport(getOutputArgv(true, extraArgs));

    Schema schema = getSchema();
    assertEquals(Type.RECORD, schema.getType());
    List<Field> fields = schema.getFields();
    assertEquals(types.length, fields.size());
    checkField(fields.get(0), "DATA_COL0", Type.STRING);

    DatasetReader<GenericRecord> reader = getReader();
    try {
      GenericRecord record1 = reader.next();
      assertEquals("DATA_COL0", "10", record1.get("DATA_COL0"));
    } finally {
      reader.close();
    }
  }

  public void testFirstUnderscoreInColumnName() throws IOException {
    String [] names = { "_NAME" };
    String [] types = { "INT" };
    String [] vals = { "1987" };
    createTableWithColTypesAndNames(names, types, vals);

    runImport(getOutputArgv(true, null));

    Schema schema = getSchema();
    assertEquals(Type.RECORD, schema.getType());
    List<Field> fields = schema.getFields();
    assertEquals(types.length, fields.size());
    checkField(fields.get(0), "__NAME", Type.INT);

    DatasetReader<GenericRecord> reader = getReader();
    try {
      GenericRecord record1 = reader.next();
      assertEquals("__NAME", 1987, record1.get("__NAME"));
    } finally {
      reader.close();
    }
  }

  public void testNullableParquetImport() throws IOException, SQLException {
    String [] types = { "INT" };
    String [] vals = { null };
    createTableWithColTypes(types, vals);

    runImport(getOutputArgv(true, null));

    DatasetReader<GenericRecord> reader = getReader();
    try {
      GenericRecord record1 = reader.next();
      assertNull(record1.get("DATA_COL0"));
    } finally {
      reader.close();
    }
  }

  private Schema getSchema() {
    return getDataset().getDescriptor().getSchema();
  }

  private DatasetReader<GenericRecord> getReader() {
    return getDataset().newReader();
  }

  private Dataset<GenericRecord> getDataset() {
    String uri = "dataset:file:" + getTablePath();
    return Datasets.load(uri, GenericRecord.class);
  }

  private void checkField(Field field, String name, Type type) {
    assertEquals(name, field.name());
    assertEquals(Type.UNION, field.schema().getType());
    assertEquals(type, field.schema().getTypes().get(0).getType());
    assertEquals(Type.NULL, field.schema().getTypes().get(1).getType());
  }

}