diff --git a/bin/configure-sqoop b/bin/configure-sqoop
index 61ff3f24..178720d8 100755
--- a/bin/configure-sqoop
+++ b/bin/configure-sqoop
@@ -54,9 +54,22 @@ if [ -z "${HADOOP_MAPRED_HOME}" ]; then
HADOOP_MAPRED_HOME=/usr/lib/hadoop-mapreduce
fi
fi
+
+# Set HADOOP_HOME to HADOOP_COMMON_HOME if it is not already set,
+# so that the hcat script works correctly on BigTop.
+if [ -z "${HADOOP_HOME}" ]; then
+ if [ -n "${HADOOP_COMMON_HOME}" ]; then
+ HADOOP_HOME=${HADOOP_COMMON_HOME}
+ export HADOOP_HOME
+ fi
+fi
+
if [ -z "${HBASE_HOME}" ]; then
HBASE_HOME=/usr/lib/hbase
fi
+if [ -z "${HCAT_HOME}" ]; then
+ HCAT_HOME=/usr/lib/hcatalog
+fi
# Check: If we can't find our dependencies, give up here.
if [ ! -d "${HADOOP_COMMON_HOME}" ]; then
@@ -76,6 +89,12 @@ if [ ! -d "${HBASE_HOME}" ]; then
echo 'Please set $HBASE_HOME to the root of your HBase installation.'
fi
+## Moved to be a runtime check in sqoop.
+if [ ! -d "${HCAT_HOME}" ]; then
+ echo "Warning: $HCAT_HOME does not exist! HCatalog jobs will fail."
+ echo 'Please set $HCAT_HOME to the root of your HCatalog installation.'
+fi
+
# Where to find the main Sqoop jar
SQOOP_JAR_DIR=$SQOOP_HOME
@@ -106,6 +125,15 @@ if [ -e "$HBASE_HOME/bin/hbase" ]; then
SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}
fi
+# Add HCatalog to dependency list
+if [ -e "${HCAT_HOME}/bin/hcat" ]; then
+ TMP_SQOOP_CLASSPATH=${SQOOP_CLASSPATH}:`${HCAT_HOME}/bin/hcat -classpath`
+ if [ -n "${HIVE_CONF_DIR}" ]; then
+ TMP_SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}:${HIVE_CONF_DIR}
+ fi
+ SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}
+fi
+
ZOOCFGDIR=${ZOOCFGDIR:-/etc/zookeeper}
if [ -d "${ZOOCFGDIR}" ]; then
SQOOP_CLASSPATH=$ZOOCFGDIR:$SQOOP_CLASSPATH
@@ -136,4 +164,6 @@ export HADOOP_CLASSPATH
export HADOOP_COMMON_HOME
export HADOOP_MAPRED_HOME
export HBASE_HOME
+export HCAT_HOME
+export HIVE_CONF_DIR
diff --git a/bin/configure-sqoop.cmd b/bin/configure-sqoop.cmd
index f5fd6082..ec57e375 100644
--- a/bin/configure-sqoop.cmd
+++ b/bin/configure-sqoop.cmd
@@ -55,6 +55,15 @@ if not defined HADOOP_MAPRED_HOME (
exit /b 1
)
)
+
+:: Set HADOOP_HOME to HADOOP_COMMON_HOME if it is not already set,
+:: so that the hcat script works correctly on BigTop.
+if not defined HADOOP_HOME (
+ if defined HADOOP_COMMON_HOME (
+ set HADOOP_HOME=%HADOOP_COMMON_HOME%
+ )
+)
+
:: Check for HBase dependency
if not defined HBASE_HOME (
if defined HBASE_VERSION (
diff --git a/build.xml b/build.xml
index 636c1035..b4b08e5c 100644
--- a/build.xml
+++ b/build.xml
@@ -51,6 +51,7 @@
+
@@ -60,6 +61,7 @@
+
@@ -70,6 +72,7 @@
+
@@ -80,6 +83,7 @@
+
@@ -600,6 +604,7 @@
+
@@ -643,12 +648,14 @@
+
+
@@ -658,6 +665,9 @@
+
+
+
@@ -667,6 +677,8 @@
+
+
@@ -712,7 +724,7 @@
-
+
+
+
+
+
+
+
diff --git a/ivy.xml b/ivy.xml
index 1fa4dd13..750adfc3 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -37,10 +37,15 @@ under the License.
extends="runtime"
description="artifacts needed to compile/test the application"/>
-
-
-
-
+
+
+
+
+
@@ -172,6 +177,11 @@ under the License.
+
+
+
+
diff --git a/ivy/ivysettings.xml b/ivy/ivysettings.xml
index c4cc5613..2920c892 100644
--- a/ivy/ivysettings.xml
+++ b/ivy/ivysettings.xml
@@ -42,6 +42,9 @@ under the License.
+
@@ -52,6 +55,8 @@ under the License.
+
-
+
+
-
+
-
-
+
+
+
diff --git a/src/docs/user/SqoopUserGuide.txt b/src/docs/user/SqoopUserGuide.txt
index 01ac1cf6..2e888879 100644
--- a/src/docs/user/SqoopUserGuide.txt
+++ b/src/docs/user/SqoopUserGuide.txt
@@ -72,6 +72,8 @@ include::help.txt[]
include::version.txt[]
+include::hcatalog.txt[]
+
include::compatibility.txt[]
include::connectors.txt[]
diff --git a/src/docs/user/hcatalog.txt b/src/docs/user/hcatalog.txt
new file mode 100644
index 00000000..b8e495e9
--- /dev/null
+++ b/src/docs/user/hcatalog.txt
@@ -0,0 +1,313 @@
+
+////
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+////
+
+Sqoop-HCatalog Integration
+--------------------------
+
+HCatalog Background
+~~~~~~~~~~~~~~~~~~~
+
+HCatalog is a table and storage management service for Hadoop that enables
+users with different data processing tools – Pig, MapReduce, and Hive –
+to more easily read and write data on the grid. HCatalog’s table abstraction
+presents users with a relational view of data in the Hadoop distributed
+file system (HDFS) and ensures that users need not worry about where or
+in what format their data is stored: RCFile format, text files, or
+SequenceFiles.
+
+HCatalog supports reading and writing files in any format for which a Hive
+SerDe (serializer-deserializer) has been written. By default, HCatalog
+supports RCFile, CSV, JSON, and SequenceFile formats. To use a custom
+format, you must provide the InputFormat and OutputFormat as well as the SerDe.
+
+Sqoop relies on this ability of HCatalog to abstract storage formats in
+order to provide RCFile (and, in the future, other file format) support.
+
+Exposing HCatalog Tables to Sqoop
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+HCatalog integration with Sqoop is patterned on an existing feature set that
+supports Avro and Hive tables. Five new command line options are introduced,
+and some command line options defined for Hive are reused.
+
+New Command Line Options
+^^^^^^^^^^^^^^^^^^^^^^^^
+
++--hcatalog-database+::
+Specifies the database name for the HCatalog table. If not specified,
+the default database name +default+ is used. Providing the
++--hcatalog-database+ option without +--hcatalog-table+ is an error.
+This is not a required option.
+
++--hcatalog-table+::
+The argument value for this option is the HCatalog tablename.
+The presence of the +--hcatalog-table+ option signifies that the import
+or export job is done using HCatalog tables, and it is a required option for
+HCatalog jobs.
+
++--hcatalog-home+::
+The home directory for the HCatalog installation. The directory is
+expected to have a +lib+ subdirectory and a +share/hcatalog+ subdirectory
+with the necessary HCatalog libraries. If not specified, the system property
++hcatalog.home+ is checked; failing that, the environment variable
++HCAT_HOME+ is checked. If neither is set, the default value
++/usr/lib/hcatalog+ is used.
+This is not a required option.
+
++--create-hcatalog-table+::
+
+This option specifies whether an HCatalog table should be created
+automatically when importing data. By default, HCatalog tables are assumed
+to exist. The table name will be the same as the database table name
+translated to lower case. Further described in +Automatic Table Creation+
+below.
+
++--hcatalog-storage-stanza+::
+
+This option specifies the storage stanza to be appended to the table.
+Further described in +Automatic Table Creation+ below.
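+
+For example, a hypothetical import that names the HCatalog database and
+table explicitly (the JDBC URL and source table name are placeholders)
+might be invoked as:
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> --table <table-name>
+--hcatalog-database default --hcatalog-table txn+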
+
+Supported Sqoop Hive Options
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The following Sqoop options are also used along with the +--hcatalog-table+
+option to provide additional input to the HCatalog jobs. Some of the existing
+Hive import job options are reused with HCatalog jobs instead of creating
+HCatalog-specific options for the same purpose.
+
++--map-column-hive+::
+This option maps a database column to HCatalog with a specific HCatalog
+type.
+
++--hive-home+::
+The Hive home location.
+
++--hive-partition-key+::
+Used for static partitioning filter. The partitioning key should be of
+type STRING. There can be only one static partitioning key.
+
++--hive-partition-value+::
+The value associated with the partition.
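+
+For example, a hypothetical import into a statically partitioned table
+(placeholder JDBC URL and table names, with +cust_id+ as the static
+partitioning key) might look like:
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> --table <table-name>
+--hcatalog-table txn --hive-partition-key cust_id
+--hive-partition-value "AB123"+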
+
+Unsupported Sqoop Options
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Unsupported Sqoop Hive Import Options
++++++++++++++++++++++++++++++++++++++
+
+The following Sqoop Hive import options are not supported with HCatalog jobs.
+
+* +--hive-import+
+* +--hive-overwrite+
+
+Unsupported Sqoop Export and Import Options
++++++++++++++++++++++++++++++++++++++++++++
+
+The following Sqoop export and import options are not supported with HCatalog jobs.
+
+* +--direct+
+* +--export-dir+
+* +--target-dir+
+* +--warehouse-dir+
+* +--append+
+* +--as-sequencefile+
+* +--as-avrodatafile+
+
+Ignored Sqoop Options
+^^^^^^^^^^^^^^^^^^^^^
+
+The following options are ignored with HCatalog jobs.
+
+* All input delimiter options are ignored.
+
+* Output delimiters are generally ignored unless either
++--hive-drop-import-delims+ or +--hive-delims-replacement+ is used. When the
++--hive-drop-import-delims+ or +--hive-delims-replacement+ option is
+specified, all +CHAR+ type database table columns will be post-processed
+to either remove or replace the delimiters, respectively. See +Delimited Text
+Formats and Field and Line Delimiter Characters+ below. This is only needed
+if the HCatalog table uses text formats.
+
+Automatic Table Creation
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+One of the key features of Sqoop is the ability to create and manage table
+metadata when importing into Hadoop. HCatalog import jobs provide this
+capability through the option +--create-hcatalog-table+. Furthermore, one of
+the important benefits of the HCatalog integration is storage agnosticism for
+Sqoop data movement jobs. To support that, HCatalog import jobs also provide
+an option that lets the user specify the storage format for the created
+table.
+
+The option +--create-hcatalog-table+ is used as an indicator that a table
+has to be created as part of the HCatalog import job. If the option
++--create-hcatalog-table+ is specified and the table exists, then the
+table creation will fail and the job will be aborted.
+
+The option +--hcatalog-storage-stanza+ can be used to specify the storage
+format of the newly created table. The default value for this option is
++stored as rcfile+. The value specified for this option is assumed to be a
+valid Hive storage format expression. It will be appended to the +create table+
+command generated by the HCatalog import job as part of automatic table
+creation. Any error in the storage stanza will cause the table creation to
+fail and the import job will be aborted.
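+
+For example, a hypothetical import that creates the target table stored as
+SequenceFiles (placeholder JDBC URL and table name) might specify:
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> --table <table-name>
+--hcatalog-table txn --create-hcatalog-table
+--hcatalog-storage-stanza "stored as sequencefile"+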
+
+Any additional resources needed to support the storage format referenced in
+the option +--hcatalog-storage-stanza+ should be provided to the job either
+by placing them in +$HIVE_HOME/lib+ or by providing them via
++HADOOP_CLASSPATH+ and the +-libjars+ option.
+
+If the option +--hive-partition-key+ is specified, then the value of this
+option is used as the partitioning key for the newly created table. Only
+one partitioning key can be specified with this option.
+
+When mapped to an HCatalog table, object names are converted to their
+lowercase equivalents, as specified below. This includes the table name
+(the same as the external store table name, converted to lower case) and
+the field names.
+
+Delimited Text Formats and Field and Line Delimiter Characters
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+HCatalog supports delimited text as one of its table storage formats.
+However, when delimited text is used and the imported data has fields that
+contain the delimiter characters, Hive may parse the data into a different
+number of fields and records, thereby losing data fidelity.
+
+For this case, one of these existing Sqoop import options can be used:
+
+* +--hive-delims-replacement+
+
+* +--hive-drop-import-delims+
+
+If either of these options is provided for import, then any column of type
+STRING will be formatted with the Hive delimiter processing and then written
+to the HCatalog table.
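+
+For example, a hypothetical import into a text-format HCatalog table that
+removes the Hive delimiter characters from string columns (placeholder JDBC
+URL and table name) might be run as:
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> --table <table-name>
+--hcatalog-table txn --hive-drop-import-delims+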
+
+HCatalog Table Requirements
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The HCatalog table should be created before it is used as part of a Sqoop job
+if the default table creation options (with the optional storage stanza) are
+not sufficient. Any storage format supported by HCatalog can be used when
+creating the HCatalog table, which allows this feature to readily adopt new
+storage formats that come into the Hive project, such as ORC files.
+
+Support for Partitioning
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+The Sqoop HCatalog feature supports the following table types:
+
+* Unpartitioned tables
+
+* Partitioned tables with a static partitioning key specified
+
+* Partitioned tables with dynamic partition keys from the database
+result set
+
+* Partitioned tables with a combination of a static key and additional
+dynamic partitioning keys
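+
+As a sketch, a table with one static and one dynamic partitioning key could
+be defined (hypothetical table and column names) as:
+
++hcat -e "create table txn_multi(amount float, store_id int)
+partitioned by (cust_id string, txn_date string) stored as rcfile;"+
+
+An import that supplies +--hive-partition-key cust_id+ and a
++--hive-partition-value+ would then fill +txn_date+ dynamically from the
+database result set.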
+
+Schema Mapping
+~~~~~~~~~~~~~~
+
+Sqoop currently does not support column name mapping. However, the user
+is allowed to override the type mapping. Type mapping loosely follows
+the Hive type mapping already present in Sqoop, except that the SQL types
+"FLOAT" and "REAL" are mapped to the HCatalog type "float", whereas the
+Sqoop type mapping for Hive maps these two to "double". Type mapping is
+primarily used for checking column definition correctness and can be
+overridden with the +--map-column-hive+ option.
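+
+For example, a hypothetical override that maps a database column +amount+ to
+the HCatalog type +double+ instead of the default mapping could be given as:
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> --table <table-name>
+--hcatalog-table txn --map-column-hive amount=double+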
+
+All types except binary are assignable to a String type.
+
+Any field of number type (tinyint, smallint, int, bigint, float, double,
+and bigdecimal) is assignable to another field of any number type during
+exports and imports. Depending on the precision and scale of the target type
+of assignment, truncations can occur.
+
+Furthermore, date/time/timestamps are mapped to string (the full
+date/time/timestamp representation) or bigint (the number of milliseconds
+since epoch) during imports and exports.
+
+BLOBs and CLOBs are only supported for imports. When imported, BLOB/CLOB
+objects are stored in a Sqoop-specific format, and knowledge of this format
+is needed for processing these objects in a Pig/Hive job or another
+MapReduce job.
+
+Database column names are mapped to their lowercase equivalents when mapped
+to the HCatalog fields. Currently, case-sensitive database object names are
+not supported.
+
+Projecting a subset of columns from a database table into an HCatalog table,
+or loading into a column projection, is allowed, subject to table
+constraints. The dynamic partitioning columns, if any, must be part of the
+projection when importing data into HCatalog tables.
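+
+For example, a hypothetical projection that imports only three columns,
+including the partitioning column +cust_id+ (placeholder JDBC URL and table
+name), might be specified as:
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> --table <table-name>
+--columns "txn_date,cust_id,amount" --hcatalog-table txn+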
+
+Dynamic partitioning fields should be mapped to database columns that are
+defined with the NOT NULL attribute (although this is not validated). A null
+value during import for a dynamic partitioning column will abort the Sqoop
+job.
+
+Support for HCatalog Data Types
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+All the primitive HCatalog types are supported. Currently all the complex
+HCatalog types are unsupported.
+
+BLOB/CLOB database types are only supported for imports.
+
+Providing Hive and HCatalog Libraries for the Sqoop Job
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+With the support for HCatalog added to Sqoop, any HCatalog job depends on a
+set of jar files being available both on the Sqoop client host and where the
+Map/Reduce tasks run. To run HCatalog jobs, the environment variable
++HADOOP_CLASSPATH+ must be set up as shown below before launching the Sqoop
+HCatalog jobs.
+
++HADOOP_CLASSPATH=$(hcat -classpath)+
++export HADOOP_CLASSPATH+
+
+
+The necessary HCatalog dependencies will be copied to the distributed cache
+automatically by the Sqoop job.
+
+Examples
+~~~~~~~~
+
+Create an HCatalog table, such as:
+
++hcat -e "create table txn(txn_date string, amount float, store_id int)
+partitioned by (cust_id string) stored as rcfile;"+
+
+
+Then Sqoop import and export of the "txn" HCatalog table can be invoked as
+follows:
+
+Import
+^^^^^^
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> --table <table-name>
+--hcatalog-table txn+
+
+Export
+^^^^^^
+
++$SQOOP_HOME/bin/sqoop export --connect <jdbc-url> --table <table-name>
+--hcatalog-table txn+
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index f18d43e2..4be6a6a7 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -59,6 +59,10 @@ public class SqoopOptions implements Cloneable {
public static final String METASTORE_PASSWORD_KEY =
"sqoop.metastore.client.record.password";
+ // Default hive and hcat locations.
+ public static final String DEF_HIVE_HOME = "/usr/lib/hive";
+ public static final String DEF_HCAT_HOME = "/usr/lib/hcatalog";
+
public static final boolean METASTORE_PASSWORD_DEFAULT = false;
/**
@@ -151,6 +155,15 @@ public String toString() {
private String hiveDelimsReplacement;
@StoredAsProperty("hive.partition.key") private String hivePartitionKey;
@StoredAsProperty("hive.partition.value") private String hivePartitionValue;
+ @StoredAsProperty("hcatalog.table.name")
+ private String hCatTableName;
+ @StoredAsProperty("hcatalog.database.name")
+ private String hCatDatabaseName;
+ @StoredAsProperty("hcatalog.create.table")
+ private boolean hCatCreateTable;
+ @StoredAsProperty("hcatalog.storage.stanza")
+ private String hCatStorageStanza;
+ private String hCatHome; // not serialized to metastore.
// User explicit mapping of types
private Properties mapColumnJava; // stored as map.colum.java
@@ -197,7 +210,9 @@ public String toString() {
private DelimiterSet inputDelimiters; // codegen.input.delimiters.
private DelimiterSet outputDelimiters; // codegen.output.delimiters.
- private boolean areDelimsManuallySet;
+
+ private boolean areOutputDelimsManuallySet;
+ private boolean areInputDelimsManuallySet;
private Configuration conf;
@@ -580,7 +595,8 @@ public void loadProperties(Properties props) {
// Delimiters were previously memoized; don't let the tool override
// them with defaults.
- this.areDelimsManuallySet = true;
+ this.areOutputDelimsManuallySet = true;
+ this.areInputDelimsManuallySet = true;
// If we loaded true verbose flag, we need to apply it
if (this.verbose) {
@@ -804,7 +820,21 @@ public static void clearNonceDir() {
public static String getHiveHomeDefault() {
// Set this with $HIVE_HOME, but -Dhive.home can override.
String hiveHome = System.getenv("HIVE_HOME");
- return System.getProperty("hive.home", hiveHome);
+ hiveHome = System.getProperty("hive.home", hiveHome);
+ if (hiveHome == null) {
+ hiveHome = DEF_HIVE_HOME;
+ }
+ return hiveHome;
+ }
+
+ public static String getHCatHomeDefault() {
+ // Set this with $HCAT_HOME, but -Dhcat.home can override.
+ String hcatHome = System.getenv("HCAT_HOME");
+ hcatHome = System.getProperty("hcat.home", hcatHome);
+ if (hcatHome == null) {
+ hcatHome = DEF_HCAT_HOME;
+ }
+ return hcatHome;
}
private void initDefaults(Configuration baseConfiguration) {
@@ -813,6 +843,7 @@ private void initDefaults(Configuration baseConfiguration) {
this.hadoopMapRedHome = System.getenv("HADOOP_MAPRED_HOME");
this.hiveHome = getHiveHomeDefault();
+ this.hCatHome = getHCatHomeDefault();
this.inputDelimiters = new DelimiterSet(
DelimiterSet.NULL_CHAR, DelimiterSet.NULL_CHAR,
@@ -834,7 +865,8 @@ private void initDefaults(Configuration baseConfiguration) {
this.jarDirIsAuto = true;
this.layout = FileLayout.TextFile;
- this.areDelimsManuallySet = false;
+ this.areOutputDelimsManuallySet = false;
+ this.areInputDelimsManuallySet = false;
this.numMappers = DEFAULT_NUM_MAPPERS;
this.useCompression = false;
@@ -1263,6 +1295,47 @@ public void setFailIfHiveTableExists(boolean fail) {
this.failIfHiveTableExists = fail;
}
+ // HCatalog support
+ public void setHCatTableName(String ht) {
+ this.hCatTableName = ht;
+ }
+
+ public String getHCatTableName() {
+ return this.hCatTableName;
+ }
+
+ public void setHCatDatabaseName(String hd) {
+ this.hCatDatabaseName = hd;
+ }
+
+ public String getHCatDatabaseName() {
+ return this.hCatDatabaseName;
+ }
+
+ public String getHCatHome() {
+ return hCatHome;
+ }
+
+ public void setHCatHome(String home) {
+ this.hCatHome = home;
+ }
+
+ public boolean doCreateHCatalogTable() {
+ return hCatCreateTable;
+ }
+
+ public void setCreateHCatalogTable(boolean create) {
+ this.hCatCreateTable = create;
+ }
+
+ public void setHCatStorageStanza(String stanza) {
+ this.hCatStorageStanza = stanza;
+ }
+
+ public String getHCatStorageStanza() {
+ return this.hCatStorageStanza;
+ }
/**
* @return location where .java files go; guaranteed to end with '/'.
*/
@@ -1673,18 +1746,32 @@ public void setFetchSize(Integer size) {
this.fetchSize = size;
}
- /**
- * @return true if the delimiters have been explicitly set by the user.
+ /**
+ * @return true if the output delimiters have been explicitly set by the user.
*/
- public boolean explicitDelims() {
- return areDelimsManuallySet;
+ public boolean explicitOutputDelims() {
+ return areOutputDelimsManuallySet;
}
/**
- * Flag the delimiter settings as explicit user settings, or implicit.
+ * Flag the output delimiter settings as explicit user settings, or implicit.
*/
- public void setExplicitDelims(boolean explicit) {
- this.areDelimsManuallySet = explicit;
+ public void setExplicitOutputDelims(boolean explicit) {
+ this.areOutputDelimsManuallySet = explicit;
+ }
+
+ /**
+ * @return true if the input delimiters have been explicitly set by the user.
+ */
+ public boolean explicitInputDelims() {
+ return areInputDelimsManuallySet;
+ }
+
+ /**
+ * Flag the input delimiter settings as explicit user settings, or implicit.
+ */
+ public void setExplicitInputDelims(boolean explicit) {
+ this.areInputDelimsManuallySet = explicit;
}
public Configuration getConf() {
diff --git a/src/java/org/apache/sqoop/config/ConfigurationConstants.java b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
index 53540635..2070b639 100644
--- a/src/java/org/apache/sqoop/config/ConfigurationConstants.java
+++ b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
@@ -60,6 +60,18 @@ public final class ConfigurationConstants {
public static final String PROP_MAPRED_JOB_TRACKER_ADDRESS =
"mapred.job.tracker";
+ /**
+ * The Configuration property identifying the job tracker address (new).
+ */
+ public static final String PROP_MAPREDUCE_JOB_TRACKER_ADDRESS =
+ "mapreduce.jobtracker.address";
+
+ /**
+ * The Configuration property identifying the framework name. If set to YARN
+ * then we will not be in local mode.
+ */
+ public static final String PROP_MAPREDUCE_FRAMEWORK_NAME =
+ "mapreduce.framework.name";
/**
* The group name of task counters.
*/
@@ -78,6 +90,11 @@ public final class ConfigurationConstants {
public static final String COUNTER_MAP_INPUT_RECORDS =
"MAP_INPUT_RECORDS";
+ /**
+ * The name of the parameter for ToolRunner to set jars to add to distcache.
+ */
+ public static final String MAPRED_DISTCACHE_CONF_PARAM = "tmpjars";
+
private ConfigurationConstants() {
// Disable Explicit Object Creation
}
diff --git a/src/java/org/apache/sqoop/hive/HiveImport.java b/src/java/org/apache/sqoop/hive/HiveImport.java
index 838f0838..02596a67 100644
--- a/src/java/org/apache/sqoop/hive/HiveImport.java
+++ b/src/java/org/apache/sqoop/hive/HiveImport.java
@@ -60,6 +60,15 @@ public class HiveImport {
private ConnManager connManager;
private Configuration configuration;
private boolean generateOnly;
+ private static boolean testMode = false;
+
+ public static boolean getTestMode() {
+ return testMode;
+ }
+
+ public static void setTestMode(boolean mode) {
+ testMode = mode;
+ }
/** Entry point through which Hive invocation should be attempted. */
private static final String HIVE_MAIN_CLASS =
@@ -285,6 +294,14 @@ private void executeScript(String filename, List<String> env)
throws IOException {
SubprocessSecurityManager subprocessSM = null;
+ if (testMode) {
+ // In test mode, we use an external mock Hive process, because the
+ // HCatalog dependency would have brought in the real Hive classes.
+ LOG.debug("Using external Hive process in test mode.");
+ executeExternalHiveScript(filename, env);
+ return;
+ }
+
try {
Class cliDriverClass = Class.forName(HIVE_MAIN_CLASS);
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java
index a1ac38e7..3549bda8 100644
--- a/src/java/org/apache/sqoop/manager/ConnManager.java
+++ b/src/java/org/apache/sqoop/manager/ConnManager.java
@@ -164,6 +164,70 @@ public String toHiveType(int sqlType) {
return HiveTypes.toHiveType(sqlType);
}
+ /**
+ * Resolve a database-specific type to the HCatalog data type. Largely
+ * follows Sqoop's Hive type mapping.
+ * @param sqlType the SQL type
+ * @return the HCatalog type
+ */
+ public String toHCatType(int sqlType) {
+ switch (sqlType) {
+
+ // Ideally TINYINT and SMALLINT should be mapped to their
+ // HCatalog equivalents tinyint and smallint respectively.
+ // But the Sqoop Java type conversion maps them to Integer,
+ // even though the referenced Javadoc clearly recommends otherwise.
+ // Changing this now could break many sequence file usages, because
+ // the value class implementations would change. So we just keep
+ // the same behavior here.
+ case Types.SMALLINT:
+ case Types.TINYINT:
+ case Types.INTEGER:
+ return "int";
+
+ case Types.VARCHAR:
+ case Types.CHAR:
+ case Types.LONGVARCHAR:
+ case Types.NVARCHAR:
+ case Types.NCHAR:
+ case Types.LONGNVARCHAR:
+ case Types.DATE:
+ case Types.TIME:
+ case Types.TIMESTAMP:
+ case Types.CLOB:
+ return "string";
+
+ case Types.FLOAT:
+ case Types.REAL:
+ return "float";
+
+ case Types.NUMERIC:
+ case Types.DECIMAL:
+ return "string";
+
+ case Types.DOUBLE:
+ return "double";
+
+ case Types.BIT:
+ case Types.BOOLEAN:
+ return "boolean";
+
+ case Types.BIGINT:
+ return "bigint";
+
+ case Types.BINARY:
+ case Types.VARBINARY:
+ case Types.BLOB:
+ case Types.LONGVARBINARY:
+ return "binary";
+
+ default:
+ throw new IllegalArgumentException(
+ "Cannot convert SQL type to HCatalog type " + sqlType);
+ }
+ }
+
/**
* Resolve a database-specific type to Avro data type.
* @param sqlType sql type
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index ef1d363c..5afd90c1 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -23,6 +23,7 @@
import org.apache.avro.Schema;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -30,6 +31,7 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.lib.LargeObjectLoader;
@@ -63,6 +65,13 @@ public DataDrivenImportJob(final SqoopOptions opts,
@Override
protected void configureMapper(Job job, String tableName,
String tableClassName) throws IOException {
+ if (isHCatJob) {
+ LOG.info("Configuring mapper for HCatalog import job");
+ job.setOutputKeyClass(LongWritable.class);
+ job.setOutputValueClass(SqoopHCatUtilities.getImportValueClass());
+ job.setMapperClass(SqoopHCatUtilities.getImportMapperClass());
+ return;
+ }
if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
// For text files, specify these as the output types; for
// other types, we just use the defaults.
@@ -82,6 +91,9 @@ protected void configureMapper(Job job, String tableName,
@Override
protected Class<? extends Mapper> getMapperClass() {
+ if (options.getHCatTableName() != null) {
+ return SqoopHCatUtilities.getImportMapperClass();
+ }
if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
return TextImportMapper.class;
} else if (options.getFileLayout()
@@ -98,6 +110,10 @@ protected Class<? extends Mapper> getMapperClass() {
@Override
protected Class<? extends OutputFormat> getOutputFormatClass()
throws ClassNotFoundException {
+ if (isHCatJob) {
+ LOG.debug("Returning HCatOutputFormat for output format");
+ return SqoopHCatUtilities.getOutputFormatClass();
+ }
if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
return RawKeyTextOutputFormat.class;
} else if (options.getFileLayout()
diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
index 1065d0ba..d0be5705 100644
--- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.apache.sqoop.util.LoggingUtils;
import org.apache.sqoop.util.PerfCounters;
import com.cloudera.sqoop.SqoopOptions;
@@ -57,7 +58,7 @@ public class ExportJobBase extends JobBase {
* The (inferred) type of a file or group of files.
*/
public enum FileType {
- SEQUENCE_FILE, AVRO_DATA_FILE, UNKNOWN
+ SEQUENCE_FILE, AVRO_DATA_FILE, HCATALOG_MANAGED_FILE, UNKNOWN
}
public static final Log LOG = LogFactory.getLog(
@@ -80,6 +81,7 @@ public enum FileType {
protected ExportJobContext context;
+
public ExportJobBase(final ExportJobContext ctxt) {
this(ctxt, null, null, null);
}
@@ -195,6 +197,9 @@ private static FileType fromMagicNumber(Path file, Configuration conf) {
* @return the Path to the files we are going to export to the db.
*/
protected Path getInputPath() throws IOException {
+ if (isHCatJob) {
+ return null;
+ }
Path inputPath = new Path(context.getOptions().getExportDir());
Configuration conf = options.getConf();
inputPath = inputPath.makeQualified(FileSystem.get(conf));
@@ -207,7 +212,9 @@ protected void configureInputFormat(Job job, String tableName,
throws ClassNotFoundException, IOException {
super.configureInputFormat(job, tableName, tableClassName, splitByCol);
- FileInputFormat.addInputPath(job, getInputPath());
+ if (!isHCatJob) {
+ FileInputFormat.addInputPath(job, getInputPath());
+ }
}
@Override
@@ -371,6 +378,12 @@ public void runExport() throws ExportException, IOException {
}
propagateOptionsToJob(job);
+ if (isHCatJob) {
+ LOG.info("Configuring HCatalog for export job");
+ SqoopHCatUtilities hCatUtils = SqoopHCatUtilities.instance();
+ hCatUtils.configureHCat(options, job, cmgr, tableName,
+ job.getConfiguration());
+ }
configureInputFormat(job, tableName, tableClassName, null);
configureOutputFormat(job, tableName, tableClassName);
configureMapper(job, tableName, tableClassName);
@@ -448,6 +461,9 @@ protected boolean inputIsSequenceFiles() {
}
protected FileType getInputFileType() {
+ if (isHCatJob) {
+ return FileType.HCATALOG_MANAGED_FILE;
+ }
try {
return getFileType(context.getOptions().getConf(), getInputPath());
} catch (IOException ioe) {
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index 2465f3f4..ab7f21e7 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.apache.sqoop.util.PerfCounters;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.config.ConfigurationHelper;
@@ -92,6 +93,13 @@ protected void configureOutputFormat(Job job, String tableName,
job.setOutputFormatClass(getOutputFormatClass());
+ if (isHCatJob) {
+ LOG.debug("Configuring output format for HCatalog import job");
+ SqoopHCatUtilities.configureImportOutputFormat(options, job,
+ getContext().getConnManager(), tableName, job.getConfiguration());
+ return;
+ }
+
if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
job.getConfiguration().set("mapred.output.value.class", tableClassName);
}
@@ -149,6 +157,11 @@ protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
perfCounters.startClock();
boolean success = doSubmitJob(job);
+
+ if (isHCatJob) {
+ SqoopHCatUtilities.instance().invokeOutputCommitterForLocalMode(job);
+ }
+
perfCounters.stopClock();
Counters jobCounters = job.getCounters();
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index 20636a0b..fee78e02 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ExportJobContext;
import com.cloudera.sqoop.mapreduce.ExportJobBase;
@@ -65,7 +66,11 @@ protected void configureInputFormat(Job job, String tableName,
super.configureInputFormat(job, tableName, tableClassName, splitByCol);
- if (fileType == FileType.AVRO_DATA_FILE) {
+ if (isHCatJob) {
+ SqoopHCatUtilities.configureExportInputFormat(options, job,
+ context.getConnManager(), tableName, job.getConfiguration());
+ return;
+ } else if (fileType == FileType.AVRO_DATA_FILE) {
LOG.debug("Configuring for Avro export");
ConnManager connManager = context.getConnManager();
Map<String, Integer> columnTypeInts;
@@ -93,6 +98,9 @@ protected void configureInputFormat(Job job, String tableName,
@Override
protected Class<? extends InputFormat> getInputFormatClass()
throws ClassNotFoundException {
+ if (isHCatJob) {
+ return SqoopHCatUtilities.getInputFormatClass();
+ }
if (fileType == FileType.AVRO_DATA_FILE) {
return AvroInputFormat.class;
}
@@ -101,6 +109,9 @@ protected Class<? extends InputFormat> getInputFormatClass()
@Override
protected Class<? extends Mapper> getMapperClass() {
+ if (isHCatJob) {
+ return SqoopHCatUtilities.getExportMapperClass();
+ }
switch (fileType) {
case SEQUENCE_FILE:
return SequenceFileExportMapper.class;
diff --git a/src/java/org/apache/sqoop/mapreduce/JobBase.java b/src/java/org/apache/sqoop/mapreduce/JobBase.java
index 0df11562..322df1c9 100644
--- a/src/java/org/apache/sqoop/mapreduce/JobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/JobBase.java
@@ -56,6 +56,7 @@ public class JobBase {
private Job mrJob;
private ClassLoader prevClassLoader = null;
+ protected final boolean isHCatJob;
public static final String PROPERTY_VERBOSE = "sqoop.verbose";
@@ -76,6 +77,7 @@ public JobBase(final SqoopOptions opts,
this.mapperClass = mapperClass;
this.inputFormatClass = inputFormatClass;
this.outputFormatClass = outputFormatClass;
+ isHCatJob = options.getHCatTableName() != null;
}
/**
@@ -220,7 +222,7 @@ private void addDirToCache(File dir, FileSystem fs, Set<String> localUrls) {
*/
protected void loadJars(Configuration conf, String ormJarFile,
String tableClassName) throws IOException {
-
+
boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
|| "local".equals(conf.get("mapred.job.tracker"));
if (isLocal) {
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java
new file mode 100644
index 00000000..47febf76
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.sqoop.mapreduce.ExportInputFormat;
+
+/**
+ * A combined HCatInputFormat equivalent that allows us to match the number
+ * of splits to the number of map tasks.
+ *
+ * The logic is simple. We get the list of splits from HCatInputFormat. If it
+ * is no larger than the number of mappers, we use it as-is. Otherwise, we
+ * sort the splits by size and assign them to the mappers in rounds; each new
+ * round starts with the mapper that received the last split of the previous
+ * round. That way, the total split size is distributed more uniformly than
+ * with a simple round-robin assignment.
+ */
+public class SqoopHCatExportFormat extends HCatInputFormat {
+ public static final Log LOG = LogFactory
+ .getLog(SqoopHCatExportFormat.class.getName());
+
+ @Override
+ public List<InputSplit> getSplits(JobContext job)
+ throws IOException, InterruptedException {
+ List<InputSplit> hCatSplits = super.getSplits(job);
+ int hCatSplitCount = hCatSplits.size();
+ int expectedSplitCount = ExportInputFormat.getNumMapTasks(job);
+ if (expectedSplitCount == 0) {
+ expectedSplitCount = hCatSplitCount;
+ }
+ LOG.debug("Expected split count " + expectedSplitCount);
+ LOG.debug("HCatInputFormat provided split count " + hCatSplitCount);
+ // Sort the splits by length descending.
+
+ Collections.sort(hCatSplits, new Comparator<InputSplit>() {
+ @Override
+ public int compare(InputSplit is1, InputSplit is2) {
+ try {
+ long len1 = is1.getLength();
+ long len2 = is2.getLength();
+ // Compare the lengths directly to avoid int overflow on large splits.
+ return (len2 < len1) ? -1 : ((len2 == len1) ? 0 : 1);
+ } catch (Exception e) {
+ LOG.warn("Exception caught while sorting Input splits " + e);
+ }
+ return 0;
+ }
+ });
+ List<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+
+ // The number of splits generated by HCatInputFormat is within
+ // our limits
+
+ if (hCatSplitCount <= expectedSplitCount) {
+ for (InputSplit split : hCatSplits) {
+ List<InputSplit> hcSplitList = new ArrayList<InputSplit>();
+ hcSplitList.add(split);
+ combinedSplits.add(new SqoopHCatInputSplit(hcSplitList));
+ }
+ return combinedSplits;
+ }
+ List<List<InputSplit>> combinedSplitList =
+ new ArrayList<List<InputSplit>>();
+ for (int i = 0; i < expectedSplitCount; i++) {
+ combinedSplitList.add(new ArrayList<InputSplit>());
+ }
+ boolean ascendingAssignment = true;
+
+ int lastSet = 0;
+ for (int i = 0; i < hCatSplitCount; ++i) {
+ int splitNum = i % expectedSplitCount;
+ int currentSet = i / expectedSplitCount;
+ if (currentSet != lastSet) {
+ ascendingAssignment = !ascendingAssignment;
+ }
+ if (ascendingAssignment) {
+ combinedSplitList.get(splitNum).add(hCatSplits.get(i));
+ } else {
+ combinedSplitList.
+ get(expectedSplitCount - 1 - splitNum).add(hCatSplits.get(i));
+ }
+ lastSet = currentSet;
+ }
+ for (int i = 0; i < expectedSplitCount; i++) {
+ SqoopHCatInputSplit sqoopSplit =
+ new SqoopHCatInputSplit(combinedSplitList.get(i));
+ combinedSplits.add(sqoopSplit);
+ }
+
+ return combinedSplits;
+
+ }
+
+ @Override
+ public RecordReader<WritableComparable, HCatRecord>
+ createRecordReader(InputSplit split,
+ TaskAttemptContext taskContext)
+ throws IOException, InterruptedException {
+ LOG.debug("Creating a SqoopHCatRecordReader");
+ return new SqoopHCatRecordReader(split, taskContext, this);
+ }
+
+ public RecordReader<WritableComparable, HCatRecord>
+ createHCatRecordReader(InputSplit split,
+ TaskAttemptContext taskContext)
+ throws IOException, InterruptedException {
+ LOG.debug("Creating a base HCatRecordReader");
+ return super.createRecordReader(split, taskContext);
+ }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
new file mode 100644
index 00000000..539cedf0
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.AutoProgressMapper;
+import org.apache.sqoop.mapreduce.ExportJobBase;
+
+/**
+ * A mapper that works on combined hcat splits.
+ */
+public class SqoopHCatExportMapper
+ extends
+ AutoProgressMapper<WritableComparable, HCatRecord,
+ SqoopRecord, WritableComparable> {
+ public static final Log LOG = LogFactory
+ .getLog(SqoopHCatExportMapper.class.getName());
+ private InputJobInfo jobInfo;
+ private HCatSchema hCatFullTableSchema;
+ private List<HCatFieldSchema> hCatSchemaFields;
+
+ private SqoopRecord sqoopRecord;
+ private static final String TIMESTAMP_TYPE = "java.sql.Timestamp";
+ private static final String TIME_TYPE = "java.sql.Time";
+ private static final String DATE_TYPE = "java.sql.Date";
+ private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal";
+ private static final String FLOAT_TYPE = "Float";
+ private static final String DOUBLE_TYPE = "Double";
+ private static final String BYTE_TYPE = "Byte";
+ private static final String SHORT_TYPE = "Short";
+ private static final String INTEGER_TYPE = "Integer";
+ private static final String LONG_TYPE = "Long";
+ private static final String BOOLEAN_TYPE = "Boolean";
+ private static final String STRING_TYPE = "String";
+ private static final String BYTESWRITABLE =
+ "org.apache.hadoop.io.BytesWritable";
+ private static boolean debugHCatExportMapper = false;
+ private MapWritable colTypesJava;
+ private MapWritable colTypesSql;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ super.setup(context);
+
+ Configuration conf = context.getConfiguration();
+
+ colTypesJava = DefaultStringifier.load(conf,
+ SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_JAVA, MapWritable.class);
+ colTypesSql = DefaultStringifier.load(conf,
+ SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_SQL, MapWritable.class);
+ // Instantiate a copy of the user's class to hold and parse the record.
+
+ String recordClassName = conf.get(
+ ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
+ if (null == recordClassName) {
+ throw new IOException("Export table class name ("
+ + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+ + ") is not set!");
+ }
+ debugHCatExportMapper = conf.getBoolean(
+ SqoopHCatUtilities.DEBUG_HCAT_EXPORT_MAPPER_PROP, false);
+ try {
+ Class cls = Class.forName(recordClassName, true,
+ Thread.currentThread().getContextClassLoader());
+ sqoopRecord = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ }
+
+ if (null == sqoopRecord) {
+ throw new IOException("Could not instantiate object of type "
+ + recordClassName);
+ }
+
+ String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ jobInfo =
+ (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
+ HCatSchema tableSchema = jobInfo.getTableInfo().getDataColumns();
+ HCatSchema partitionSchema =
+ jobInfo.getTableInfo().getPartitionColumns();
+ hCatFullTableSchema = new HCatSchema(tableSchema.getFields());
+ for (HCatFieldSchema hfs : partitionSchema.getFields()) {
+ hCatFullTableSchema.append(hfs);
+ }
+ hCatSchemaFields = hCatFullTableSchema.getFields();
+
+ }
+
+ @Override
+ public void map(WritableComparable key, HCatRecord value,
+ Context context)
+ throws IOException, InterruptedException {
+ context.write(convertToSqoopRecord(value), NullWritable.get());
+ }
+
+ private SqoopRecord convertToSqoopRecord(HCatRecord hcr)
+ throws IOException {
+ Text key = new Text();
+ for (Map.Entry<String, Object> e : sqoopRecord.getFieldMap().entrySet()) {
+ String colName = e.getKey();
+ String hfn = colName.toLowerCase();
+ key.set(hfn);
+ String javaColType = colTypesJava.get(key).toString();
+ int sqlType = ((IntWritable) colTypesSql.get(key)).get();
+ HCatFieldSchema field =
+ hCatFullTableSchema.get(hfn);
+ HCatFieldSchema.Type fieldType = field.getType();
+ Object hCatVal =
+ hcr.get(hfn, hCatFullTableSchema);
+ String hCatTypeString = field.getTypeString();
+ Object sqlVal = convertToSqoop(hCatVal, fieldType,
+ javaColType, hCatTypeString);
+ if (debugHCatExportMapper) {
+ LOG.debug("hCatVal " + hCatVal + " of type "
+ + (hCatVal == null ? null : hCatVal.getClass().getName())
+ + ",sqlVal " + sqlVal + " of type "
+ + (sqlVal == null ? null : sqlVal.getClass().getName())
+ + ",java type " + javaColType + ", sql type = "
+ + SqoopHCatUtilities.sqlTypeString(sqlType));
+ }
+ sqoopRecord.setField(colName, sqlVal);
+ }
+ return sqoopRecord;
+ }
+
+ private Object convertToSqoop(Object val,
+ HCatFieldSchema.Type fieldType, String javaColType,
+ String hCatTypeString) throws IOException {
+
+ if (val == null) {
+ return null;
+ }
+
+ switch (fieldType) {
+ case INT:
+ case TINYINT:
+ case SMALLINT:
+ case FLOAT:
+ case DOUBLE:
+ val = convertNumberTypes(val, javaColType);
+ if (val != null) {
+ return val;
+ }
+ break;
+ case BOOLEAN:
+ val = convertBooleanTypes(val, javaColType);
+ if (val != null) {
+ return val;
+ }
+ break;
+ case BIGINT:
+ if (javaColType.equals(DATE_TYPE)) {
+ return new Date((Long) val);
+ } else if (javaColType.equals(TIME_TYPE)) {
+ return new Time((Long) val);
+ } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+ return new Timestamp((Long) val);
+ } else {
+ val = convertNumberTypes(val, javaColType);
+ if (val != null) {
+ return val;
+ }
+ }
+ break;
+ case STRING:
+ val = convertStringTypes(val, javaColType);
+ if (val != null) {
+ return val;
+ }
+ break;
+ case BINARY:
+ val = convertBinaryTypes(val, javaColType);
+ if (val != null) {
+ return val;
+ }
+ break;
+ case ARRAY:
+ case MAP:
+ case STRUCT:
+ default:
+ throw new IOException("Cannot convert HCatalog type "
+ + fieldType);
+ }
+ LOG.error("Cannot convert HCatalog object of "
+ + " type " + hCatTypeString + " to java object type "
+ + javaColType);
+ return null;
+ }
+
+ private Object convertBinaryTypes(Object val, String javaColType) {
+ byte[] bb = (byte[]) val;
+ if (javaColType.equals(BYTESWRITABLE)) {
+ BytesWritable bw = new BytesWritable();
+ bw.set(bb, 0, bb.length);
+ return bw;
+ }
+ return null;
+ }
+
+ private Object convertStringTypes(Object val, String javaColType) {
+ String valStr = val.toString();
+ if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+ return new BigDecimal(valStr);
+ } else if (javaColType.equals(DATE_TYPE)
+ || javaColType.equals(TIME_TYPE)
+ || javaColType.equals(TIMESTAMP_TYPE)) {
+ // Oracle expects timestamps for Date also by default based on version
+ // Just allow all date types to be assignment compatible
+ if (valStr.length() == 10) { // Date in yyyy-mm-dd format
+ Date d = Date.valueOf(valStr);
+ if (javaColType.equals(DATE_TYPE)) {
+ return d;
+ } else if (javaColType.equals(TIME_TYPE)) {
+ return new Time(d.getTime());
+ } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+ return new Timestamp(d.getTime());
+ }
+ } else if (valStr.length() == 8) { // time in hh:mm:ss
+ Time t = Time.valueOf(valStr);
+ if (javaColType.equals(DATE_TYPE)) {
+ return new Date(t.getTime());
+ } else if (javaColType.equals(TIME_TYPE)) {
+ return t;
+ } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+ return new Timestamp(t.getTime());
+ }
+ } else if (valStr.length() == 19) { // timestamp in yyyy-mm-dd hh:ss:mm
+ Timestamp ts = Timestamp.valueOf(valStr);
+ if (javaColType.equals(DATE_TYPE)) {
+ return new Date(ts.getTime());
+ } else if (javaColType.equals(TIME_TYPE)) {
+ return new Time(ts.getTime());
+ } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+ return ts;
+ }
+ } else {
+ return null;
+ }
+ } else if (javaColType.equals(STRING_TYPE)) {
+ return valStr;
+ } else if (javaColType.equals(BOOLEAN_TYPE)) {
+ return Boolean.valueOf(valStr);
+ } else if (javaColType.equals(BYTE_TYPE)) {
+ return Byte.parseByte(valStr);
+ } else if (javaColType.equals(SHORT_TYPE)) {
+ return Short.parseShort(valStr);
+ } else if (javaColType.equals(INTEGER_TYPE)) {
+ return Integer.parseInt(valStr);
+ } else if (javaColType.equals(LONG_TYPE)) {
+ return Long.parseLong(valStr);
+ } else if (javaColType.equals(FLOAT_TYPE)) {
+ return Float.parseFloat(valStr);
+ } else if (javaColType.equals(DOUBLE_TYPE)) {
+ return Double.parseDouble(valStr);
+ }
+ return null;
+ }
+
+ private Object convertBooleanTypes(Object val, String javaColType) {
+ Boolean b = (Boolean) val;
+ if (javaColType.equals(BOOLEAN_TYPE)) {
+ return b;
+ } else if (javaColType.equals(BYTE_TYPE)) {
+ return (byte) (b ? 1 : 0);
+ } else if (javaColType.equals(SHORT_TYPE)) {
+ return (short) (b ? 1 : 0);
+ } else if (javaColType.equals(INTEGER_TYPE)) {
+ return (int) (b ? 1 : 0);
+ } else if (javaColType.equals(LONG_TYPE)) {
+ return (long) (b ? 1 : 0);
+ } else if (javaColType.equals(FLOAT_TYPE)) {
+ return (float) (b ? 1 : 0);
+ } else if (javaColType.equals(DOUBLE_TYPE)) {
+ return (double) (b ? 1 : 0);
+ } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+ return new BigDecimal(b ? 1 : 0);
+ } else if (javaColType.equals(STRING_TYPE)) {
+ return val.toString();
+ }
+ return null;
+ }
+
+ private Object convertNumberTypes(Object val, String javaColType) {
+ Number n = (Number) val;
+ if (javaColType.equals(BYTE_TYPE)) {
+ return n.byteValue();
+ } else if (javaColType.equals(SHORT_TYPE)) {
+ return n.shortValue();
+ } else if (javaColType.equals(INTEGER_TYPE)) {
+ return n.intValue();
+ } else if (javaColType.equals(LONG_TYPE)) {
+ return n.longValue();
+ } else if (javaColType.equals(FLOAT_TYPE)) {
+ return n.floatValue();
+ } else if (javaColType.equals(DOUBLE_TYPE)) {
+ return n.doubleValue();
+ } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+ return new BigDecimal(n.doubleValue());
+ } else if (javaColType.equals(BOOLEAN_TYPE)) {
+ return n.byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
+ } else if (javaColType.equals(STRING_TYPE)) {
+ return n.toString();
+ }
+ return null;
+ }
+
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
new file mode 100644
index 00000000..4f0ff1ba
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.StorerInfo;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.ImportJobBase;
+import org.apache.sqoop.mapreduce.SqoopMapper;
+
+import com.cloudera.sqoop.lib.BlobRef;
+import com.cloudera.sqoop.lib.ClobRef;
+import com.cloudera.sqoop.lib.DelimiterSet;
+import com.cloudera.sqoop.lib.FieldFormatter;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+
+/**
+ * A mapper for HCatalog import.
+ */
+public class SqoopHCatImportMapper extends
+ SqoopMapper<WritableComparable, SqoopRecord,
+ WritableComparable, HCatRecord> {
+ public static final Log LOG = LogFactory
+ .getLog(SqoopHCatImportMapper.class.getName());
+
+ private static boolean debugHCatImportMapper = false;
+
+ private InputJobInfo jobInfo;
+ private HCatSchema hCatFullTableSchema;
+ private int fieldCount;
+ private boolean bigDecimalFormatString;
+ private LargeObjectLoader lobLoader;
+ private HCatSchema partitionSchema = null;
+ private HCatSchema dataColsSchema = null;
+ private String stringDelimiterReplacements = null;
+ private ArrayWritable delimCharsArray;
+ private String hiveDelimsReplacement;
+ private boolean doHiveDelimsReplacement = false;
+ private DelimiterSet hiveDelimiters;
+ private String staticPartitionKey;
+ private int[] hCatFieldPositions;
+ private int colCount;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ jobInfo =
+ (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
+ dataColsSchema = jobInfo.getTableInfo().getDataColumns();
+ partitionSchema =
+ jobInfo.getTableInfo().getPartitionColumns();
+ StringBuilder storerInfoStr = new StringBuilder(1024);
+ StorerInfo storerInfo = jobInfo.getTableInfo().getStorerInfo();
+ storerInfoStr.append("HCatalog Storer Info : ")
+ .append("\n\tHandler = ").append(storerInfo.getStorageHandlerClass())
+ .append("\n\tInput format class = ").append(storerInfo.getIfClass())
+ .append("\n\tOutput format class = ").append(storerInfo.getOfClass())
+ .append("\n\tSerde class = ").append(storerInfo.getSerdeClass());
+ Properties storerProperties = storerInfo.getProperties();
+ if (!storerProperties.isEmpty()) {
+ storerInfoStr.append("\nStorer properties ");
+ for (Map.Entry