diff --git a/ivy.xml b/ivy.xml
index abc12a10..e5334f15 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -183,6 +183,8 @@ under the License.
conf="common->default;redist->default"/>
diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt
index 192e97e3..c5ffa50f 100644
--- a/src/docs/user/import.txt
+++ b/src/docs/user/import.txt
@@ -59,6 +59,7 @@ Argument Description
+\--as-avrodatafile+ Imports data to Avro Data Files
+\--as-sequencefile+ Imports data to SequenceFiles
+\--as-textfile+ Imports data as plain text (default)
++\--as-parquetfile+ Imports data to Parquet Files
+\--boundary-query + Boundary query to use for creating splits
+\--columns + Columns to import from table
+\--delete-target-dir+ Delete the import target directory\
diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java
index ffec2dc4..f4ababea 100644
--- a/src/java/com/cloudera/sqoop/SqoopOptions.java
+++ b/src/java/com/cloudera/sqoop/SqoopOptions.java
@@ -39,7 +39,8 @@ public class SqoopOptions
public enum FileLayout {
TextFile,
SequenceFile,
- AvroDataFile
+ AvroDataFile,
+ ParquetFile
}
/**
diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java
new file mode 100644
index 00000000..4b37d589
--- /dev/null
+++ b/src/java/org/apache/sqoop/avro/AvroUtil.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.avro;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.sqoop.lib.BlobRef;
+import org.apache.sqoop.lib.ClobRef;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * The service class provides methods for creating and converting Avro objects.
+ */
+public final class AvroUtil {
+
+ /**
+ * Convert a Java value (already converted from its SQL equivalent) to its
+ * Avro representation. This method was moved here from
+ * {@link org.apache.sqoop.mapreduce.AvroImportMapper}.
+ */
+ public static Object toAvro(Object o, boolean bigDecimalFormatString) {
+ if (o instanceof BigDecimal) {
+ if (bigDecimalFormatString) {
+ return ((BigDecimal)o).toPlainString();
+ } else {
+ return o.toString();
+ }
+ } else if (o instanceof Date) {
+ return ((Date) o).getTime();
+ } else if (o instanceof Time) {
+ return ((Time) o).getTime();
+ } else if (o instanceof Timestamp) {
+ return ((Timestamp) o).getTime();
+ } else if (o instanceof BytesWritable) {
+ BytesWritable bw = (BytesWritable) o;
+ return ByteBuffer.wrap(bw.getBytes(), 0, bw.getLength());
+ } else if (o instanceof BlobRef) {
+ BlobRef br = (BlobRef) o;
+ // If blob data is stored in an external .lob file, save the ref file
+ // as Avro bytes. If materialized inline, save blob data as Avro bytes.
+ byte[] bytes = br.isExternal() ? br.toString().getBytes() : br.getData();
+ return ByteBuffer.wrap(bytes);
+ } else if (o instanceof ClobRef) {
+ throw new UnsupportedOperationException("ClobRef not supported");
+ }
+ // primitive types (Integer, etc) are left unchanged
+ return o;
+ }
+
+}
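A minimal usage sketch (not part of the patch; the class name and sample values are illustrative) of how the relocated conversion helper behaves:

import java.math.BigDecimal;
import java.sql.Timestamp;

import org.apache.sqoop.avro.AvroUtil;

public class AvroUtilExample {
  public static void main(String[] args) {
    // BigDecimal becomes a String; toPlainString() avoids scientific notation.
    Object price = AvroUtil.toAvro(new BigDecimal("1.5E+3"), true);  // "1500"
    // Date/Time/Timestamp values become epoch milliseconds.
    Object ts = AvroUtil.toAvro(new Timestamp(0L), false);           // 0L
    // Primitive wrappers (Integer, etc.) pass through unchanged.
    Object id = AvroUtil.toAvro(Integer.valueOf(42), false);         // 42
    System.out.println(price + " " + ts + " " + id);
  }
}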
diff --git a/src/java/org/apache/sqoop/lib/SqoopAvroRecord.java b/src/java/org/apache/sqoop/lib/SqoopAvroRecord.java
new file mode 100644
index 00000000..80875d23
--- /dev/null
+++ b/src/java/org/apache/sqoop/lib/SqoopAvroRecord.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.sqoop.avro.AvroUtil;
+
+/**
+ * Abstract base class that extends {@link org.apache.sqoop.lib.SqoopRecord} and
+ * implements Avro's GenericRecord interface, i.e. a generic instance of an Avro
+ * record schema whose fields are accessible by name as well as by index.
+ */
+public abstract class SqoopAvroRecord extends SqoopRecord implements GenericRecord {
+
+ public abstract boolean getBigDecimalFormatString();
+
+ @Override
+ public void put(String key, Object v) {
+ getFieldMap().put(key, v);
+ }
+
+ @Override
+ public Object get(String key) {
+ Object o = getFieldMap().get(key);
+ return AvroUtil.toAvro(o, getBigDecimalFormatString());
+ }
+
+ @Override
+ public void put(int i, Object v) {
+ put(getFieldNameByIndex(i), v);
+ }
+
+ @Override
+ public Object get(int i) {
+ return get(getFieldNameByIndex(i));
+ }
+
+ private String getFieldNameByIndex(int i) {
+ return getSchema().getFields().get(i).name();
+ }
+
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java b/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
index 289eb28c..6fc656f5 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
@@ -19,27 +19,20 @@
package org.apache.sqoop.mapreduce;
import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.sql.Date;
import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import com.cloudera.sqoop.lib.BlobRef;
-import com.cloudera.sqoop.lib.ClobRef;
import com.cloudera.sqoop.lib.LargeObjectLoader;
import com.cloudera.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+import org.apache.sqoop.avro.AvroUtil;
/**
* Imports records by transforming them to Avro records in an Avro data file.
@@ -92,45 +85,10 @@ private GenericRecord toGenericRecord(SqoopRecord val) {
Map<String, Object> fieldMap = val.getFieldMap();
GenericRecord record = new GenericData.Record(schema);
for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
- record.put(entry.getKey(), toAvro(entry.getValue()));
+ Object avro = AvroUtil.toAvro(entry.getValue(), bigDecimalFormatString);
+ record.put(entry.getKey(), avro);
}
return record;
}
- /**
- * Convert the Avro representation of a Java type (that has already been
- * converted from the SQL equivalent).
- * @param o
- * @return
- */
- private Object toAvro(Object o) {
- if (o instanceof BigDecimal) {
- if (bigDecimalFormatString) {
- return ((BigDecimal)o).toPlainString();
- } else {
- return o.toString();
- }
- } else if (o instanceof Date) {
- return ((Date) o).getTime();
- } else if (o instanceof Time) {
- return ((Time) o).getTime();
- } else if (o instanceof Timestamp) {
- return ((Timestamp) o).getTime();
- } else if (o instanceof BytesWritable) {
- BytesWritable bw = (BytesWritable) o;
- return ByteBuffer.wrap(bw.getBytes(), 0, bw.getLength());
- } else if (o instanceof BlobRef) {
- BlobRef br = (BlobRef) o;
- // If blob data is stored in an external .lob file, save the ref file
- // as Avro bytes. If materialized inline, save blob data as Avro bytes.
- byte[] bytes = br.isExternal() ? br.toString().getBytes() : br.getData();
- return ByteBuffer.wrap(bytes);
- } else if (o instanceof ClobRef) {
- throw new UnsupportedOperationException("ClobRef not suported");
- }
- // primitive types (Integer, etc) are left unchanged
- return o;
- }
-
-
}
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index 6dcfebb9..300406ab 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -26,6 +26,8 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -45,6 +47,7 @@
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import com.cloudera.sqoop.orm.AvroSchemaGenerator;
+import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
/**
* Actually runs a jdbc import job using the ORM files generated by the
@@ -95,6 +98,20 @@ protected void configureMapper(Job job, String tableName,
}
AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
+ } else if (options.getFileLayout()
+ == SqoopOptions.FileLayout.ParquetFile) {
+ Configuration conf = job.getConfiguration();
+ // An Avro schema is required for creating a dataset that manages
+ // Parquet data records. The import will fail if the schema is invalid.
+ Schema schema = new Schema.Parser().parse(conf.get("avro.schema"));
+ String uri = "";
+ if (options.doHiveImport()) {
+ // TODO: SQOOP-1393
+ } else {
+ FileSystem fs = FileSystem.get(conf);
+ uri = "dataset:" + fs.makeQualified(getContext().getDestination());
+ }
+ ParquetJob.configureImportJob(conf, schema, uri, options.isAppendMode());
}
job.setMapperClass(getMapperClass());
@@ -129,6 +146,9 @@ protected Class<? extends Mapper> getMapperClass() {
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.AvroDataFile) {
return AvroImportMapper.class;
+ } else if (options.getFileLayout()
+ == SqoopOptions.FileLayout.ParquetFile) {
+ return ParquetImportMapper.class;
}
return null;
@@ -149,6 +169,9 @@ protected Class<? extends OutputFormat> getOutputFormatClass()
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.AvroDataFile) {
return AvroOutputFormat.class;
+ } else if (options.getFileLayout()
+ == SqoopOptions.FileLayout.ParquetFile) {
+ return DatasetKeyOutputFormat.class;
}
return null;
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
new file mode 100644
index 00000000..cc2982c3
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.sqoop.lib.SqoopAvroRecord;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+/**
+ * Imports records by writing them to a Parquet file.
+ */
+public class ParquetImportMapper
+ extends AutoProgressMapper<LongWritable, SqoopAvroRecord, GenericRecord, NullWritable> {
+
+ private LargeObjectLoader lobLoader = null;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ Path workPath = new Path("/tmp/sqoop-parquet-" + context.getTaskAttemptID());
+ lobLoader = new LargeObjectLoader(conf, workPath);
+ }
+
+ @Override
+ protected void map(LongWritable key, SqoopAvroRecord val, Context context)
+ throws IOException, InterruptedException {
+ try {
+ // Loading of LOBs was delayed until we have a Context.
+ val.loadLargeObjects(lobLoader);
+ } catch (SQLException sqlE) {
+ throw new IOException(sqlE);
+ }
+
+ context.write(val, null);
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException {
+ if (null != lobLoader) {
+ lobLoader.close();
+ }
+ }
+
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
new file mode 100644
index 00000000..a74432a0
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.DatasetNotFoundException;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Formats;
+import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
+import org.kitesdk.data.spi.SchemaValidationUtil;
+
+import java.io.IOException;
+
+/**
+ * Helper class for setting up a Parquet MapReduce job.
+ */
+public final class ParquetJob {
+
+ private ParquetJob() {
+ }
+
+ /**
+ * Configure the import job. The import process will use a Kite dataset to
+ * write data records into Parquet format internally. The input key class is
+ * {@link org.apache.sqoop.lib.SqoopAvroRecord}. The output key is
+ * {@link org.apache.avro.generic.GenericRecord}.
+ */
+ public static void configureImportJob(Configuration conf, Schema schema,
+ String uri, boolean doAppend) throws IOException {
+ Dataset dataset;
+ if (doAppend) {
+ try {
+ dataset = Datasets.load(uri);
+ } catch (DatasetNotFoundException ex) {
+ dataset = createDataset(schema, uri);
+ }
+ Schema writtenWith = dataset.getDescriptor().getSchema();
+ if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
+ throw new IOException(
+ String.format("Expected schema: %s%nActual schema: %s",
+ writtenWith, schema));
+ }
+ } else {
+ dataset = createDataset(schema, uri);
+ }
+ DatasetKeyOutputFormat.configure(conf).writeTo(dataset);
+ }
+
+ private static Dataset createDataset(Schema schema, String uri) {
+ DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+ .schema(schema)
+ .format(Formats.PARQUET)
+ .build();
+ return Datasets.create(uri, descriptor, GenericRecord.class);
+ }
+
+}
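A standalone sketch of driving the new helper outside of DataDrivenImportJob (assumptions: the local dataset URI under /tmp and the one-field schema are illustrative, not from the patch):

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.mapreduce.ParquetJob;

public class ParquetJobExample {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"widget\",\"fields\":"
        + "[{\"name\":\"id\",\"type\":[\"null\",\"int\"]}]}");
    Configuration conf = new Configuration();
    // Creates (or, with doAppend=true, loads) a Kite Parquet dataset at the
    // given URI and points DatasetKeyOutputFormat at it via the configuration.
    ParquetJob.configureImportJob(conf, schema, "dataset:file:/tmp/widgets", false);
  }
}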
diff --git a/src/java/org/apache/sqoop/orm/ClassWriter.java b/src/java/org/apache/sqoop/orm/ClassWriter.java
index 94ff5765..4f9dedd9 100644
--- a/src/java/org/apache/sqoop/orm/ClassWriter.java
+++ b/src/java/org/apache/sqoop/orm/ClassWriter.java
@@ -30,9 +30,11 @@
import java.util.Properties;
import java.util.Set;
+import org.apache.avro.Schema;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.sqoop.lib.SqoopAvroRecord;
import org.apache.sqoop.mapreduce.ImportJobBase;
import com.cloudera.sqoop.SqoopOptions;
@@ -1108,6 +1110,26 @@ private void generateSetField(Map<String, Integer> columnTypes,
}
}
+ private void generateSqoopAvroRecordMethods(String className, Schema schema, StringBuilder sb) {
+ // Define shared immutable attributes as static
+ sb.append(" private final static boolean bigDecimalFormatString;\n");
+ sb.append(" private final static Schema schema;\n");
+ sb.append(" static {\n");
+ sb.append(" bigDecimalFormatString = " + bigDecimalFormatString + ";\n");
+ sb.append(" schema = new Schema.Parser().parse(\"");
+ sb.append(schema.toString().replaceAll("\"", "\\\\\""));
+ sb.append("\");\n");
+ sb.append(" }\n");
+ sb.append(" @Override\n");
+ sb.append(" public boolean getBigDecimalFormatString() {\n");
+ sb.append(" return bigDecimalFormatString;\n");
+ sb.append(" }\n");
+ sb.append(" @Override\n");
+ sb.append(" public Schema getSchema() {\n");
+ sb.append(" return schema;\n");
+ sb.append(" }\n");
+ }
+
/**
* Generate the setField() method.
* @param columnTypes - mapping from column names to sql types
@@ -1728,9 +1750,15 @@ public void generate() throws IOException {
}
}
+ Schema schema = null;
+ if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
+ schema = generateAvroSchemaForTable(tableName);
+ options.getConf().set("avro.schema", schema.toString());
+ }
+
// Generate the Java code.
StringBuilder sb = generateClassForColumns(columnTypes,
- cleanedColNames, cleanedDbWriteColNames);
+ cleanedColNames, cleanedDbWriteColNames, schema);
// Write this out to a file in the jar output directory.
// We'll move it to the user-visible CodeOutputDir after compiling.
String codeOutDir = options.getJarOutputDir();
@@ -1788,6 +1816,12 @@ public void generate() throws IOException {
}
}
+ private Schema generateAvroSchemaForTable(String tableName) throws IOException {
+ AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
+ connManager, tableName);
+ return generator.generate();
+ }
+
protected String[] getColumnNames(Map<String, Integer> columnTypes) {
String [] colNames = options.getColumns();
if (null == colNames) {
@@ -1838,15 +1872,18 @@ protected Map<String, Integer> getColumnTypes() throws IOException {
* @param colNames - ordered list of column names for table.
* @param dbWriteColNames - ordered list of column names for the db
* write() method of the class.
+ * @param schema - If a valid Avro schema is specified, the base class will
+ * be SqoopAvroRecord
* @return - A StringBuilder that contains the text of the class code.
*/
private StringBuilder generateClassForColumns(
Map<String, Integer> columnTypes,
- String [] colNames, String [] dbWriteColNames) {
+ String [] colNames, String [] dbWriteColNames, Schema schema) {
if (colNames.length ==0) {
throw new IllegalArgumentException("Attempted to generate class with "
+ "no columns!");
}
+
StringBuilder sb = new StringBuilder();
sb.append("// ORM class for table '" + tableName + "'\n");
sb.append("// WARNING: This class is AUTO-GENERATED. "
@@ -1878,7 +1915,13 @@ private StringBuilder generateClassForColumns(
sb.append("import " + BlobRef.class.getCanonicalName() + ";\n");
sb.append("import " + ClobRef.class.getCanonicalName() + ";\n");
sb.append("import " + LargeObjectLoader.class.getCanonicalName() + ";\n");
- sb.append("import " + SqoopRecord.class.getCanonicalName() + ";\n");
+
+ Class baseClass = SqoopRecord.class;
+ if (null != schema) {
+ sb.append("import org.apache.avro.Schema;\n");
+ baseClass = SqoopAvroRecord.class;
+ }
+ sb.append("import " + baseClass.getCanonicalName() + ";\n");
sb.append("import java.sql.PreparedStatement;\n");
sb.append("import java.sql.ResultSet;\n");
sb.append("import java.sql.SQLException;\n");
@@ -1898,8 +1941,8 @@ private StringBuilder generateClassForColumns(
sb.append("\n");
String className = tableNameInfo.getShortClassForTable(tableName);
- sb.append("public class " + className + " extends SqoopRecord "
- + " implements DBWritable, Writable {\n");
+ sb.append("public class " + className + " extends " + baseClass.getSimpleName()
+ + " implements DBWritable, Writable {\n");
sb.append(" private final int PROTOCOL_VERSION = "
+ CLASS_WRITER_VERSION + ";\n");
sb.append(
@@ -1918,6 +1961,10 @@ private StringBuilder generateClassForColumns(
generateGetFieldMap(columnTypes, colNames, sb);
generateSetField(columnTypes, colNames, sb);
+ if (baseClass == SqoopAvroRecord.class) {
+ generateSqoopAvroRecordMethods(className, schema, sb);
+ }
+
// TODO(aaron): Generate hashCode(), compareTo(), equals() so it can be a
// WritableComparable
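For orientation, roughly what the fragment emitted by generateSqoopAvroRecordMethods() looks like inside a generated record class (a sketch only; the trivial schema literal stands in for the full, quote-escaped table schema the writer embeds, and "true" stands in for the configured bigDecimalFormatString value):

  private final static boolean bigDecimalFormatString;
  private final static Schema schema;
  static {
    bigDecimalFormatString = true;
    schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"foo\",\"fields\":[]}");
  }
  @Override
  public boolean getBigDecimalFormatString() {
    return bigDecimalFormatString;
  }
  @Override
  public Schema getSchema() {
    return schema;
  }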
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index b77b1ea3..26950ccc 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -98,6 +98,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
public static final String FMT_SEQUENCEFILE_ARG = "as-sequencefile";
public static final String FMT_TEXTFILE_ARG = "as-textfile";
public static final String FMT_AVRODATAFILE_ARG = "as-avrodatafile";
+ public static final String FMT_PARQUETFILE_ARG = "as-parquetfile";
public static final String HIVE_IMPORT_ARG = "hive-import";
public static final String HIVE_TABLE_ARG = "hive-table";
public static final String HIVE_DATABASE_ARG = "hive-database";
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index a3a2d0dd..54e618e5 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -708,6 +708,10 @@ protected RelatedOptions getImportOptions() {
.withDescription("Imports data to Avro data files")
.withLongOpt(FMT_AVRODATAFILE_ARG)
.create());
+ importOpts.addOption(OptionBuilder
+ .withDescription("Imports data to Parquet files")
+ .withLongOpt(BaseSqoopTool.FMT_PARQUETFILE_ARG)
+ .create());
importOpts.addOption(OptionBuilder.withArgName("n")
.hasArg().withDescription("Use 'n' map tasks to import in parallel")
.withLongOpt(NUM_MAPPERS_ARG)
@@ -923,6 +927,10 @@ public void applyOptions(CommandLine in, SqoopOptions out)
out.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
}
+ if (in.hasOption(FMT_PARQUETFILE_ARG)) {
+ out.setFileLayout(SqoopOptions.FileLayout.ParquetFile);
+ }
+
if (in.hasOption(NUM_MAPPERS_ARG)) {
out.setNumMappers(Integer.parseInt(in.getOptionValue(NUM_MAPPERS_ARG)));
}
@@ -1020,8 +1028,8 @@ protected void validateImportOptions(SqoopOptions options)
&& options.getFileLayout() != SqoopOptions.FileLayout.TextFile
&& options.getConnectString().contains("jdbc:mysql://")) {
throw new InvalidOptionsException(
- "MySQL direct import currently supports only text output format."
- + "Parameters --as-sequencefile and --as-avrodatafile are not "
+ "MySQL direct import currently supports only text output format. "
+ + "Parameters --as-sequencefile --as-avrodatafile and --as-parquetfile are not "
+ "supported with --direct params in MySQL case.");
} else if (options.isDirect()
&& options.doHiveDropDelims()) {
diff --git a/src/java/org/apache/sqoop/util/AppendUtils.java b/src/java/org/apache/sqoop/util/AppendUtils.java
index 5eaaa958..b6bbc184 100644
--- a/src/java/org/apache/sqoop/util/AppendUtils.java
+++ b/src/java/org/apache/sqoop/util/AppendUtils.java
@@ -228,8 +228,17 @@ private void moveFiles(FileSystem fs, Path sourceDir, Path targetDir,
} while (!fs.rename(fileStatus.getPath(), new Path(targetDir, destFilename.toString())));
LOG.debug("Filename: " + sourceFilename + " repartitioned to: " + destFilename.toString());
- } else { // ignore everything else
- LOG.debug("Filename: " + sourceFilename + " ignored");
+ } else {
+ // Generated Parquet files do not follow the pattern "part-m-([0-9]{5}).ext", so the logic
+ // above cannot move them to the target directory. Instead, we simply check the file extension.
+ boolean fileMoved = false;
+ if (sourceFilename.endsWith(".parquet")) {
+ Path targetFilename = new Path(targetDir, sourceFilename.toString());
+ fileMoved = fs.rename(fileStatus.getPath(), targetFilename);
+ }
+ if (!fileMoved) { // ignore everything else
+ LOG.debug("Filename: " + sourceFilename + " ignored");
+ }
}
}
}
diff --git a/src/licenses/LICENSE-BIN.txt b/src/licenses/LICENSE-BIN.txt
index 4215d26b..8cec1ba0 100644
--- a/src/licenses/LICENSE-BIN.txt
+++ b/src/licenses/LICENSE-BIN.txt
@@ -372,6 +372,18 @@ For lib/avro-.jar:
The Apache License, Version 2.0
+For lib/kite-data-core-.jar:
+
+ The Apache License, Version 2.0
+
+For lib/kite-data-mapreduce-.jar:
+
+ The Apache License, Version 2.0
+
+For lib/kite-hadoop-compatibility-.jar:
+
+ The Apache License, Version 2.0
+
For lib/avro-ipc-.jar:
The Apache License, Version 2.0
diff --git a/src/test/com/cloudera/sqoop/TestParquetImport.java b/src/test/com/cloudera/sqoop/TestParquetImport.java
new file mode 100644
index 00000000..2224719e
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/TestParquetImport.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop;
+
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.HsqldbTestServer;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetReader;
+import org.kitesdk.data.Datasets;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests --as-parquetfile.
+ */
+public class TestParquetImport extends ImportJobTestCase {
+
+ public static final Log LOG = LogFactory
+ .getLog(TestParquetImport.class.getName());
+
+ /**
+ * Create the argv to pass to Sqoop.
+ *
+ * @return the argv as an array of strings.
+ */
+ protected String[] getOutputArgv(boolean includeHadoopFlags,
+ String[] extraArgs) {
+ ArrayList<String> args = new ArrayList<String>();
+
+ if (includeHadoopFlags) {
+ CommonArgs.addHadoopFlags(args);
+ }
+
+ args.add("--table");
+ args.add(getTableName());
+ args.add("--connect");
+ args.add(HsqldbTestServer.getUrl());
+ args.add("--warehouse-dir");
+ args.add(getWarehouseDir());
+ args.add("--split-by");
+ args.add("INTFIELD1");
+ args.add("--as-parquetfile");
+ if (extraArgs != null) {
+ args.addAll(Arrays.asList(extraArgs));
+ }
+
+ return args.toArray(new String[args.size()]);
+ }
+
+ public void testParquetImport() throws IOException {
+ String[] types = {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
+ "VARBINARY(2)",};
+ String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
+ createTableWithColTypes(types, vals);
+
+ runImport(getOutputArgv(true, null));
+
+ Schema schema = getSchema();
+ assertEquals(Type.RECORD, schema.getType());
+ List<Field> fields = schema.getFields();
+ assertEquals(types.length, fields.size());
+ checkField(fields.get(0), "DATA_COL0", Type.BOOLEAN);
+ checkField(fields.get(1), "DATA_COL1", Type.INT);
+ checkField(fields.get(2), "DATA_COL2", Type.LONG);
+ checkField(fields.get(3), "DATA_COL3", Type.FLOAT);
+ checkField(fields.get(4), "DATA_COL4", Type.DOUBLE);
+ checkField(fields.get(5), "DATA_COL5", Type.STRING);
+ checkField(fields.get(6), "DATA_COL6", Type.BYTES);
+
+ DatasetReader<GenericRecord> reader = getReader();
+ try {
+ GenericRecord record1 = reader.next();
+ assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
+ assertEquals("DATA_COL1", 100, record1.get("DATA_COL1"));
+ assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2"));
+ assertEquals("DATA_COL3", 1.0f, record1.get("DATA_COL3"));
+ assertEquals("DATA_COL4", 2.0, record1.get("DATA_COL4"));
+ assertEquals("DATA_COL5", "s", record1.get("DATA_COL5"));
+ Object object = record1.get("DATA_COL6");
+ assertTrue(object instanceof ByteBuffer);
+ ByteBuffer b = ((ByteBuffer) object);
+ assertEquals((byte) 1, b.get(0));
+ assertEquals((byte) 2, b.get(1));
+ } finally {
+ reader.close();
+ }
+ }
+
+ public void testOverrideTypeMapping() throws IOException {
+ String [] types = { "INT" };
+ String [] vals = { "10" };
+ createTableWithColTypes(types, vals);
+
+ String [] extraArgs = { "--map-column-java", "DATA_COL0=String"};
+ runImport(getOutputArgv(true, extraArgs));
+
+ Schema schema = getSchema();
+ assertEquals(Type.RECORD, schema.getType());
+ List<Field> fields = schema.getFields();
+ assertEquals(types.length, fields.size());
+ checkField(fields.get(0), "DATA_COL0", Type.STRING);
+
+ DatasetReader<GenericRecord> reader = getReader();
+ try {
+ GenericRecord record1 = reader.next();
+ assertEquals("DATA_COL0", "10", record1.get("DATA_COL0"));
+ } finally {
+ reader.close();
+ }
+ }
+
+ public void testFirstUnderscoreInColumnName() throws IOException {
+ String [] names = { "_NAME" };
+ String [] types = { "INT" };
+ String [] vals = { "1987" };
+ createTableWithColTypesAndNames(names, types, vals);
+
+ runImport(getOutputArgv(true, null));
+
+ Schema schema = getSchema();
+ assertEquals(Type.RECORD, schema.getType());
+ List<Field> fields = schema.getFields();
+ assertEquals(types.length, fields.size());
+ checkField(fields.get(0), "__NAME", Type.INT);
+
+ DatasetReader<GenericRecord> reader = getReader();
+ try {
+ GenericRecord record1 = reader.next();
+ assertEquals("__NAME", 1987, record1.get("__NAME"));
+ } finally {
+ reader.close();
+ }
+ }
+
+ public void testNullableParquetImport() throws IOException, SQLException {
+ String [] types = { "INT" };
+ String [] vals = { null };
+ createTableWithColTypes(types, vals);
+
+ runImport(getOutputArgv(true, null));
+
+ DatasetReader<GenericRecord> reader = getReader();
+ try {
+ GenericRecord record1 = reader.next();
+ assertNull(record1.get("DATA_COL0"));
+ } finally {
+ reader.close();
+ }
+ }
+
+ private Schema getSchema() {
+ return getDataset().getDescriptor().getSchema();
+ }
+
+ private DatasetReader<GenericRecord> getReader() {
+ return getDataset().newReader();
+ }
+
+ private Dataset<GenericRecord> getDataset() {
+ String uri = "dataset:file:" + getTablePath();
+ return Datasets.load(uri, GenericRecord.class);
+ }
+
+ private void checkField(Field field, String name, Type type) {
+ assertEquals(name, field.name());
+ assertEquals(Type.UNION, field.schema().getType());
+ assertEquals(type, field.schema().getTypes().get(0).getType());
+ assertEquals(Type.NULL, field.schema().getTypes().get(1).getType());
+ }
+
+}
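As a usage note, the same Kite calls the test relies on can read an import back outside the test harness; a sketch follows (the dataset location is an assumed example, not a path from the patch):

import org.apache.avro.generic.GenericRecord;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetReader;
import org.kitesdk.data.Datasets;

public class ReadParquetImport {
  public static void main(String[] args) {
    // Assumed local location; an HDFS import would use a dataset:hdfs://... URI.
    Dataset<GenericRecord> dataset =
        Datasets.load("dataset:file:/tmp/sqoop-import/widgets", GenericRecord.class);
    DatasetReader<GenericRecord> reader = dataset.newReader();
    try {
      while (reader.hasNext()) {
        System.out.println(reader.next());
      }
    } finally {
      reader.close();
    }
  }
}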