diff --git a/bin/configure-sqoop b/bin/configure-sqoop
index e6041975..c042e7c6 100755
--- a/bin/configure-sqoop
+++ b/bin/configure-sqoop
@@ -86,6 +86,20 @@ if [ -z "${HCAT_HOME}" ]; then
HCAT_HOME=${SQOOP_HOME}/../hive-hcatalog
fi
fi
+if [ -z "${ACCUMULO_HOME}" ]; then
+ if [ -d "/usr/lib/accumulo" ]; then
+ ACCUMULO_HOME=/usr/lib/accumulo
+ else
+ ACCUMULO_HOME=${SQOOP_HOME}/../accumulo
+ fi
+fi
+if [ -z "${ZOOKEEPER_HOME}" ]; then
+ if [ -d "/usr/lib/zookeeper" ]; then
+ ZOOKEEPER_HOME=/usr/lib/zookeeper
+ else
+ ZOOKEEPER_HOME=${SQOOP_HOME}/../zookeeper
+ fi
+fi
# Check: If we can't find our dependencies, give up here.
if [ ! -d "${HADOOP_COMMON_HOME}" ]; then
@@ -111,6 +125,15 @@ if [ ! -d "${HCAT_HOME}" ]; then
echo 'Please set $HCAT_HOME to the root of your HCatalog installation.'
fi
+if [ ! -d "${ACCUMULO_HOME}" ]; then
+ echo "Warning: $ACCUMULO_HOME does not exist! Accumulo imports will fail."
+ echo 'Please set $ACCUMULO_HOME to the root of your Accumulo installation.'
+fi
+if [ ! -d "${ZOOKEEPER_HOME}" ]; then
+ echo "Warning: $ZOOKEEPER_HOME does not exist! Accumulo imports will fail."
+ echo 'Please set $ZOOKEEPER_HOME to the root of your Zookeeper installation.'
+fi
+
# Where to find the main Sqoop jar
SQOOP_JAR_DIR=$SQOOP_HOME
@@ -150,6 +173,16 @@ if [ -e "${HCAT_HOME}/bin/hcat" ]; then
SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}
fi
+# Add Accumulo to dependency list
+if [ -e "$ACCUMULO_HOME/bin/accumulo" ]; then
+ for jn in `$ACCUMULO_HOME/bin/accumulo classpath | grep file:.*accumulo.*jar | cut -d':' -f2`; do
+ SQOOP_CLASSPATH=$SQOOP_CLASSPATH:$jn
+ done
+ for jn in `$ACCUMULO_HOME/bin/accumulo classpath | grep file:.*zookeeper.*jar | cut -d':' -f2`; do
+ SQOOP_CLASSPATH=$SQOOP_CLASSPATH:$jn
+ done
+fi
+
ZOOCFGDIR=${ZOOCFGDIR:-/etc/zookeeper}
if [ -d "${ZOOCFGDIR}" ]; then
SQOOP_CLASSPATH=$ZOOCFGDIR:$SQOOP_CLASSPATH
@@ -182,4 +215,4 @@ export HADOOP_MAPRED_HOME
export HBASE_HOME
export HCAT_HOME
export HIVE_CONF_DIR
-
+export ACCUMULO_HOME
diff --git a/bin/configure-sqoop.cmd b/bin/configure-sqoop.cmd
index ec57e375..4598bc8c 100644
--- a/bin/configure-sqoop.cmd
+++ b/bin/configure-sqoop.cmd
@@ -72,6 +72,14 @@ if not defined HBASE_HOME (
echo Warning: HBASE_HOME and HBASE_VERSION not set.
)
)
+::
+:: Check for Accumulo dependency
+if not defined ACCUMULO_HOME (
+ echo Warning: ACCUMULO_HOME not set.
+)
+if not defined ZOOKEEPER_HOME (
+ echo Warning: ZOOKEEPER_HOME not set.
+)
:: Check: If we can't find our dependencies, give up here.
@@ -91,6 +99,14 @@ if not exist "%HBASE_HOME%" (
echo Warning: HBASE_HOME does not exist! HBase imports will fail.
echo Please set HBASE_HOME to the root of your HBase installation.
)
+if not exist "%ACCUMULO_HOME%" (
+ echo Warning: ACCUMULO_HOME does not exist! Accumulo imports will fail.
+ echo Please set ACCUMULO_HOME to the root of your Accumulo installation.
+)
+if not exist "%ZOOKEEPER_HOME%" (
+ echo Warning: ZOOKEEPER_HOME does not exist! Accumulo imports will fail.
+ echo Please set ZOOKEEPER_HOME to the root of your Zookeeper installation.
+)
:: Add sqoop dependencies to classpath
set SQOOP_CLASSPATH=
@@ -114,6 +130,12 @@ if exist "%HBASE_HOME%" (
call :add_dir_to_classpath %HBASE_HOME%
call :add_dir_to_classpath %HBASE_HOME%\lib
)
+::
+:: Add Accumulo to dependency list
+if exist "%ACCUMULO_HOME%" (
+ call :add_dir_to_classpath %ACCUMULO_HOME%
+ call :add_dir_to_classpath %ACCUMULO_HOME%\lib
+)
if not defined ZOOCFGDIR (
if defined ZOOKEEPER_CONF_DIR (
diff --git a/ivy.xml b/ivy.xml
index c5130ae7..c00d30c6 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -36,21 +36,22 @@ under the License.
+
+ extends="common,runtime,hbase${hbaseprofile},hcatalog,accumulo" />
+ extends="common,runtime,hbase${hbaseprofile},hcatalog,accumulo" />
+ extends="common,runtime,hbase${hbaseprofile},hcatalog,accumulo" />
+ extends="common,runtime,hbase${hbaseprofile},hcatalog,accumulo" />
+ extends="common,runtime,hbase${hbaseprofile},hcatalog,accumulo" />
@@ -185,6 +186,14 @@ under the License.
+
+
+
+
+
+
diff --git a/src/docs/user/accumulo-args.txt b/src/docs/user/accumulo-args.txt
new file mode 100644
index 00000000..d553a677
--- /dev/null
+++ b/src/docs/user/accumulo-args.txt
@@ -0,0 +1,50 @@
+
+////
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+////
+
+
+.Accumulo arguments:
+[grid="all"]
+`-------------------------------------`-----------------------------------
+Argument Description
+--------------------------------------------------------------------------
++\--accumulo-table + Specifies an Accumulo table to use\
+ as the target instead of HDFS
++\--accumulo-column-family + Sets the target column family for\
+ the import
++\--accumulo-create-table+ If specified, create missing\
+ Accumulo tables
++\--accumulo-row-key + Specifies which input column to use\
+ as the row key
++\--accumulo-visibility + (Optional) Specifies a visibility\
+ token to apply to all rows inserted\
+ into Accumulo. Default is the\
+ empty string.
++\--accumulo-batch-size + (Optional) Sets the size in bytes\
+ of Accumulo's write buffer. Default\
+ is 4MB.
++\--accumulo-max-latency + (Optional) Sets the max latency in\
+ milliseconds for the Accumulo\
+ batch writer. Default is 0.
++\--accumulo-zookeepers + Comma-separated list of Zookeeper\
+ servers used by the Accumulo instance
++\--accumulo-instance + Name of the target Accumulo instance
++\--accumulo-user + Name of the Accumulo user to import as
++\--accumulo-password + Password for the Accumulo user
+--------------------------------------------------------------------------
+
diff --git a/src/docs/user/accumulo.txt b/src/docs/user/accumulo.txt
new file mode 100644
index 00000000..6869eb87
--- /dev/null
+++ b/src/docs/user/accumulo.txt
@@ -0,0 +1,65 @@
+
+////
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+////
+
+
+Importing Data Into Accumulo
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Sqoop supports importing records into a table in Accumulo
+
+By specifying +\--accumulo-table+, you instruct Sqoop to import
+to a table in Accumulo rather than a directory in HDFS. Sqoop will
+import data to the table specified as the argument to +\--accumulo-table+.
+Each row of the input table will be transformed into an Accumulo
++Mutation+ operation to a row of the output table. The key for each row is
+taken from a column of the input. By default Sqoop will use the split-by
+column as the row key column. If that is not specified, it will try to
+identify the primary key column, if any, of the source table. You can
+manually specify the row key column with +\--accumulo-row-key+. Each output
+column will be placed in the same column family, which must be specified
+with +\--accumulo-column-family+.
+
+NOTE: This function is incompatible with direct import (parameter
++\--direct+), and cannot be used in the same operation as an HBase import.
+
+If the target table does not exist, the Sqoop job will
+exit with an error, unless the +\--accumulo-create-table+ parameter is
+specified. Otherwise, you should create the target table before running
+an import.
+
+Sqoop currently serializes all values to Accumulo by converting each field
+to its string representation (as if you were importing to HDFS in text
+mode), and then inserts the UTF-8 bytes of this string in the target
+cell.
+
+By default, no visibility is applied to the resulting cells in Accumulo,
+so the data will be visible to any Accumulo user. Use the
++\--accumulo-visibility+ parameter to specify a visibility token to
+apply to all rows in the import job.
+
+For performance tuning, use the optional +\--accumulo-batch-size+ and
++\--accumulo-max-latency+ parameters. See Accumulo's documentation for
+an explanation of the effects of these parameters.
+
+In order to connect to an Accumulo instance, you must specify the location
+of a Zookeeper ensemble using the +\--accumulo-zookeepers+ parameter,
+the name of the Accumulo instance (+\--accumulo-instance+), and the
+username and password to connect with (+\--accumulo-user+ and
++\--accumulo-password+ respectively).
+
diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt
index dfc9b392..0db6d977 100644
--- a/src/docs/user/import.txt
+++ b/src/docs/user/import.txt
@@ -554,6 +554,9 @@ include::hive.txt[]
include::hbase-args.txt[]
include::hbase.txt[]
+include::accumulo-args.txt[]
+include::accumulo.txt[]
+
include::codegen-args.txt[]
As mentioned earlier, a byproduct of importing a table to HDFS is a
diff --git a/src/docs/user/validation.txt b/src/docs/user/validation.txt
index 282cfd6b..27a78e22 100644
--- a/src/docs/user/validation.txt
+++ b/src/docs/user/validation.txt
@@ -101,7 +101,7 @@ The following are the limitations in the current implementation:
* all-tables option
* free-form query option
-* Data imported into Hive or HBase
+* Data imported into Hive, HBase or Accumulo
* table import with --where argument
* incremental imports
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index 5c7a56a0..19546cb5 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -39,6 +39,7 @@
import com.cloudera.sqoop.tool.SqoopTool;
import com.cloudera.sqoop.util.RandomHash;
import com.cloudera.sqoop.util.StoredAsProperty;
+import org.apache.sqoop.accumulo.AccumuloConstants;
import org.apache.sqoop.util.CredentialsUtil;
import org.apache.sqoop.util.LoggingUtils;
import org.apache.sqoop.validation.AbortOnFailureHandler;
@@ -257,6 +258,46 @@ public String toString() {
// "key" column for the merge operation.
@StoredAsProperty("merge.key.col") private String mergeKeyCol;
+ // Accumulo home directory
+ private String accumuloHome; // not serialized to metastore.
+ // Zookeeper home directory
+ private String zookeeperHome; // not serialized to metastore.
+
+ // Accumulo table to import into.
+ @StoredAsProperty("accumulo.table") private String accumuloTable;
+
+ // Column family to prepend to inserted cols.
+ @StoredAsProperty("accumulo.col.family") private String accumuloColFamily;
+
+ // Column of the input to use as the row key.
+ @StoredAsProperty("accumulo.row.key.col") private String accumuloRowKeyCol;
+ //
+ // Visibility token to be applied to each row imported.
+ @StoredAsProperty("accumulo.visibility") private String accumuloVisibility;
+
+ // Size of the write buffer.
+ @StoredAsProperty("accumulo.batch.size")
+ private long accumuloBatchSize;
+
+ // Maximum latency for batch writer.
+ @StoredAsProperty("accumulo.max.latency")
+ private long accumuloMaxLatency;
+
+ // if true, create table.
+ @StoredAsProperty("accumulo.create.table")
+ private boolean accumuloCreateTable;
+
+ // Accumulo user name
+ @StoredAsProperty("accumulo.user") private String accumuloUser;
+
+ // Accumulo password
+ @StoredAsProperty("accumulo.password") private String accumuloPassword;
+
+ // Accumulo instance
+ @StoredAsProperty("accumulo.instance") private String accumuloInstance;
+
+ // Accumulo zookeeper
+ @StoredAsProperty("accumulo.zookeepers") private String accumuloZookeepers;
// These next two fields are not serialized to the metastore.
// If this SqoopOptions is created by reading a saved job, these will
@@ -846,6 +887,8 @@ private void initDefaults(Configuration baseConfiguration) {
// default action is to run the full pipeline.
this.hadoopMapRedHome = System.getenv("HADOOP_MAPRED_HOME");
+ this.accumuloHome = getAccumuloHomeDefault();
+ this.zookeeperHome = getZookeeperHomeDefault();
this.hiveHome = getHiveHomeDefault();
this.hCatHome = getHCatHomeDefault();
@@ -902,6 +945,11 @@ private void initDefaults(Configuration baseConfiguration) {
this.mapColumnHive = new Properties();
this.mapColumnJava = new Properties();
+ // Set Accumulo batch size defaults, since 0 is not the same
+ // as "not set"
+ this.accumuloBatchSize = AccumuloConstants.DEFAULT_BATCH_SIZE;
+ this.accumuloMaxLatency = AccumuloConstants.DEFAULT_LATENCY;
+
// We do not want to be verbose too much if not explicitly needed
this.verbose = false;
this.isValidationEnabled = false; // validation is disabled by default
@@ -2100,6 +2148,195 @@ public String getMergeKeyCol() {
return this.mergeKeyCol;
}
+ public static String getAccumuloHomeDefault() {
+ // Set this with $ACCUMULO_HOME, but -Daccumulo.home can override.
+ String accumuloHome = System.getenv("ACCUMULO_HOME");
+ accumuloHome = System.getProperty("accumulo.home", accumuloHome);
+ return accumuloHome;
+ }
+
+ public static String getZookeeperHomeDefault() {
+ // Set this with $ZOOKEEPER_HOME, but -Dzookeeper.home can override.
+ String zookeeperHome = System.getenv("ZOOKEEPER_HOME");
+ zookeeperHome = System.getProperty("zookeeper.home", zookeeperHome);
+ return zookeeperHome;
+ }
+
+ public String getAccumuloHome() {
+ return accumuloHome;
+ }
+
+ public void setAccumuloHome(String home) {
+ this.accumuloHome = home;
+ }
+
+ public String getZookeeperHome() {
+ return zookeeperHome;
+ }
+
+ public void setZookeeperHome(String home) {
+ this.zookeeperHome = home;
+ }
+
+ /**
+ * Set whether we should create missing Accumulo tables.
+ */
+ public void setCreateAccumuloTable(boolean create) {
+ this.accumuloCreateTable = create;
+ }
+
+ /**
+ * Returns true if we should create Accumulo tables/column families
+ * that are missing.
+ */
+ public boolean getCreateAccumuloTable() {
+ return this.accumuloCreateTable;
+ }
+
+ /**
+ * Sets the Accumulo batch size (in bytes).
+ */
+ public void setAccumuloBatchSize(long batchSize) {
+ this.accumuloBatchSize = batchSize;
+ }
+
+ /**
+ * Gets the Accumulo batch size (in bytes).
+ */
+ public long getAccumuloBatchSize() {
+ return this.accumuloBatchSize;
+ }
+
+ /**
+ * Sets the Accumulo target column family.
+ */
+ public void setAccumuloColFamily(String colFamily) {
+ this.accumuloColFamily = colFamily;
+ }
+
+ /**
+ * Gets the Accumulo import target column family.
+ */
+ public String getAccumuloColFamily() {
+ return this.accumuloColFamily;
+ }
+
+ /**
+ * Sets the Accumulo max latency.
+ */
+ public void setAccumuloMaxLatency(long maxLatency) {
+ this.accumuloMaxLatency = maxLatency;
+ }
+
+ /**
+ * Gets the Accumulo max latency.
+ */
+ public long getAccumuloMaxLatency() {
+ return this.accumuloMaxLatency;
+ }
+
+ /**
+ * Gets the column to use as the row id in an Accumulo import.
+ * If null, use the primary key column.
+ */
+ public String getAccumuloRowKeyColumn() {
+ return this.accumuloRowKeyCol;
+ }
+
+ /**
+ * Sets the column to use as the row id in an Accumulo import.
+ *
+ */
+ public void setAccumuloRowKeyColumn(String col) {
+ this.accumuloRowKeyCol = col;
+ }
+
+ /**
+ * Gets the visibility token to use.
+ * If null, don't assign a visibility.
+ */
+ public String getAccumuloVisibility() {
+ return this.accumuloVisibility;
+ }
+
+ /**
+ * Sets the visibility token to use.
+ *
+ */
+ public void setAccumuloVisibility(String vis) {
+ this.accumuloVisibility = vis;
+ }
+
+ /**
+ * Gets the target Accumulo table name, if any.
+ */
+ public String getAccumuloTable() {
+ return this.accumuloTable;
+ }
+
+ /**
+ * Sets the target Accumulo table name.
+ */
+ public void setAccumuloTable(String table) {
+ this.accumuloTable = table;
+ }
+
+ /**
+ * Gets the target Accumulo user name, if any.
+ */
+ public String getAccumuloUser() {
+ return this.accumuloUser;
+ }
+
+ /**
+ * Sets the target Accumulo user name for an import.
+ */
+ public void setAccumuloUser(String user) {
+ this.accumuloUser = user;
+ }
+
+ /**
+ * Gets the target Accumulo password, if any.
+ */
+ public String getAccumuloPassword() {
+ return this.accumuloPassword;
+ }
+
+ /**
+ * Sets the target Accumulo password for an import.
+ */
+ public void setAccumuloPassword(String passwd) {
+ this.accumuloPassword = passwd;
+ }
+
+ /**
+ * Gets the target Accumulo instance, if any.
+ */
+ public String getAccumuloInstance() {
+ return this.accumuloInstance;
+ }
+
+ /**
+ * Sets the target Accumulo instance for an import.
+ */
+ public void setAccumuloInstance(String instance) {
+ this.accumuloInstance = instance;
+ }
+
+ /**
+ * Gets the target Accumulo zookeeper instance, if any.
+ */
+ public String getAccumuloZookeepers() {
+ return this.accumuloZookeepers;
+ }
+
+ /**
+   * Sets the target Accumulo zookeeper instance for an import.
+   */
+ public void setAccumuloZookeepers(String zookeepers) {
+ this.accumuloZookeepers = zookeepers;
+ }
+
public void setConnManagerClassName(String connManagerClass) {
this.connManagerClassName = connManagerClass;
}
diff --git a/src/java/org/apache/sqoop/accumulo/AccumuloConstants.java b/src/java/org/apache/sqoop/accumulo/AccumuloConstants.java
new file mode 100644
index 00000000..0511a103
--- /dev/null
+++ b/src/java/org/apache/sqoop/accumulo/AccumuloConstants.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.accumulo;
+
+/**
+ * This class provides constants used to define properties relating to
+ * Accumulo imports.
+ */
+public final class AccumuloConstants {
+ // Some defaults to use if these aren't specified.
+ // Default buffer size for BatchWriter
+ public static final long DEFAULT_BATCH_SIZE = 10240000L;
+ // Default latency for BatchWriter
+ public static final long DEFAULT_LATENCY = 5000L;
+
+ /** Configuration key specifying the table to insert into. */
+ public static final String TABLE_NAME_KEY = "sqoop.accumulo.insert.table";
+
+ /** Configuration key specifying the column family to insert into. */
+ public static final String COL_FAMILY_KEY =
+ "sqoop.accumulo.insert.column.family";
+
+ /** Configuration key specifying the column of the input whose value
+ * should be used as the row id.
+ */
+ public static final String ROW_KEY_COLUMN_KEY =
+ "sqoop.accumulo.insert.row.key.column";
+
+ /** Configuration key specifying the column of the input whose value
+ * should be used as the row id.
+ */
+ public static final String VISIBILITY_KEY =
+ "sqoop.accumulo.insert.visibility";
+ /**
+ * Configuration key specifying the Transformer implementation to use.
+ */
+ public static final String TRANSFORMER_CLASS_KEY =
+ "sqoop.accumulo.insert.put.transformer.class";
+
+ public static final String MAX_LATENCY =
+ "sqoop.accumulo.max.latency";
+
+ public static final String BATCH_SIZE =
+ "sqoop.accumulo.batch.size";
+
+ public static final String ZOOKEEPERS =
+ "sqoop.accumulo.zookeeper.hostnames";
+
+ public static final String ACCUMULO_INSTANCE =
+ "sqoop.accumulo.instance.name";
+
+ public static final String ACCUMULO_USER_NAME =
+ "sqoop.accumulo.user.name";
+
+ public static final String ACCUMULO_PASSWORD =
+ "sqoop.accumulo.password";
+
+ public static final String ACCUMULO_SITE_XML_PATH =
+ "/conf/accumulo-site.xml";
+
+ // Prevent instantiation.
+ private AccumuloConstants(){
+ }
+}
diff --git a/src/java/org/apache/sqoop/accumulo/AccumuloMutationProcessor.java b/src/java/org/apache/sqoop/accumulo/AccumuloMutationProcessor.java
new file mode 100644
index 00000000..123688cd
--- /dev/null
+++ b/src/java/org/apache/sqoop/accumulo/AccumuloMutationProcessor.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.accumulo;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.cloudera.sqoop.lib.FieldMapProcessor;
+import com.cloudera.sqoop.lib.FieldMappable;
+import com.cloudera.sqoop.lib.ProcessingException;
+
+/**
+ * SqoopRecordProcessor that performs an Accumulo mutation operation
+ * that contains all the fields of the record.
+ */
+public class AccumuloMutationProcessor implements Closeable, Configurable,
+ FieldMapProcessor {
+
+ private Configuration conf;
+
+ // An object that can transform a map of fieldName->object
+ // into a Mutation.
+ private MutationTransformer mutationTransformer;
+
+ private String tableName;
+ private BatchWriter table;
+
+ public AccumuloMutationProcessor() {
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void setConf(Configuration config) {
+ this.conf = config;
+
+ // Get the implementation of MutationTransformer to use.
+ // By default, we call toString() on every non-null field.
+    Class<? extends MutationTransformer> xformerClass =
+        (Class<? extends MutationTransformer>)
+ this.conf.getClass(AccumuloConstants.TRANSFORMER_CLASS_KEY,
+ ToStringMutationTransformer.class);
+ this.mutationTransformer = (MutationTransformer)
+ ReflectionUtils.newInstance(xformerClass, this.conf);
+ if (null == mutationTransformer) {
+ throw new RuntimeException("Could not instantiate MutationTransformer.");
+ }
+
+ String colFam = conf.get(AccumuloConstants.COL_FAMILY_KEY, null);
+ if (null == colFam) {
+ throw new RuntimeException("Accumulo column family not set.");
+ }
+ this.mutationTransformer.setColumnFamily(colFam);
+
+ String rowKey = conf.get(AccumuloConstants.ROW_KEY_COLUMN_KEY, null);
+ if (null == rowKey) {
+ throw new RuntimeException("Row key column not set.");
+ }
+ this.mutationTransformer.setRowKeyColumn(rowKey);
+
+ String vis = conf.get(AccumuloConstants.VISIBILITY_KEY, null);
+ this.mutationTransformer.setVisibility(vis);
+
+ this.tableName = conf.get(AccumuloConstants.TABLE_NAME_KEY, null);
+ String zookeeper = conf.get(AccumuloConstants.ZOOKEEPERS);
+ String instance = conf.get(AccumuloConstants.ACCUMULO_INSTANCE);
+
+ Instance inst = new ZooKeeperInstance(instance, zookeeper);
+ String username = conf.get(AccumuloConstants.ACCUMULO_USER_NAME);
+ String pw = conf.get(AccumuloConstants.ACCUMULO_PASSWORD);
+ if (null == pw) {
+ pw = "";
+ }
+ byte[] password = pw.getBytes();
+
+ BatchWriterConfig bwc = new BatchWriterConfig();
+
+ long bs = conf.getLong(AccumuloConstants.BATCH_SIZE,
+ AccumuloConstants.DEFAULT_BATCH_SIZE);
+ bwc.setMaxMemory(bs);
+
+ long la = conf.getLong(AccumuloConstants.MAX_LATENCY,
+ AccumuloConstants.DEFAULT_LATENCY);
+ bwc.setMaxLatency(la, TimeUnit.MILLISECONDS);
+
+ try {
+ Connector conn = inst.getConnector(username, new PasswordToken(password));
+
+ this.table = conn.createBatchWriter(tableName, bwc);
+ } catch (AccumuloException ex) {
+ throw new RuntimeException("Error accessing Accumulo", ex);
+ } catch (AccumuloSecurityException aex){
+ throw new RuntimeException("Security exception accessing Accumulo", aex);
+ } catch(TableNotFoundException tex){
+ throw new RuntimeException("Accumulo table " + tableName
+ + " not found", tex);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ @Override
+ /**
+ * Processes a record by extracting its field map and converting
+ * it into a list of Mutations into Accumulo.
+ */
+ public void accept(FieldMappable record)
+ throws IOException, ProcessingException {
+    Map<String, Object> fields = record.getFieldMap();
+
+    Iterable<Mutation> putList = mutationTransformer.getMutations(fields);
+ if (null != putList) {
+ for (Mutation m : putList) {
+ try {
+ this.table.addMutation(m);
+ } catch (MutationsRejectedException ex) {
+ throw new IOException("Mutation rejected" , ex);
+ }
+ }
+ }
+ }
+
+ @Override
+ /**
+ * Closes the Accumulo table and commits all pending operations.
+ */
+ public void close() throws IOException {
+ try {
+ this.table.close();
+ } catch (MutationsRejectedException ex) {
+ throw new IOException("Mutations rejected", ex);
+ }
+ }
+}
diff --git a/src/java/org/apache/sqoop/accumulo/AccumuloUtil.java b/src/java/org/apache/sqoop/accumulo/AccumuloUtil.java
new file mode 100644
index 00000000..1cbb8594
--- /dev/null
+++ b/src/java/org/apache/sqoop/accumulo/AccumuloUtil.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.accumulo;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.config.ConfigurationConstants;
+
+/**
+ * This class provides a method that checks if Accumulo jars are present in the
+ * current classpath. It also provides a setAlwaysNoAccumuloJarMode mechanism
+ * for testing and simulating the condition where there is no Accumulo jar
+ * (since Accumulo is pulled automatically by ivy)
+ */
+public final class AccumuloUtil {
+ private static final Log LOG = LogFactory.getLog(AccumuloUtil.class);
+ private static boolean testingMode = false;
+
+ private static final String INSTANCE_CLASS
+ = "org.apache.accumulo.core.client.Instance";
+
+ // Prevent instantiation
+ private AccumuloUtil() {
+ }
+
+ /**
+ * This is a way to make this always return false for testing.
+ */
+ public static void setAlwaysNoAccumuloJarMode(boolean mode) {
+ testingMode = mode;
+ }
+
+ public static boolean isAccumuloJarPresent() {
+ if (testingMode) {
+ return false;
+ }
+ try {
+ Class.forName(INSTANCE_CLASS);
+ } catch (ClassNotFoundException cnfe) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Add the Accumulo jar files to local classpath and dist cache.
+ * @throws IOException
+ */
+ public static void addJars(Job job, SqoopOptions options) throws IOException {
+
+ if (isLocalJobTracker(job)) {
+ LOG.info("Not adding Accumulo jars to distributed cache in local mode");
+ } else if (options.isSkipDistCache()) {
+ LOG.info("Not adding Accumulo jars to distributed cache as requested");
+ } else {
+ Configuration conf = job.getConfiguration();
+ String accumuloHome = null;
+ String zookeeperHome = null;
+ FileSystem fs = FileSystem.getLocal(conf);
+ if (options != null) {
+ accumuloHome = options.getAccumuloHome();
+ }
+ if (accumuloHome == null) {
+ accumuloHome = SqoopOptions.getAccumuloHomeDefault();
+ }
+ LOG.info("Accumulo job : Accumulo Home = " + accumuloHome);
+ if (options != null) {
+ zookeeperHome = options.getZookeeperHome();
+ }
+ if (zookeeperHome == null) {
+ zookeeperHome = SqoopOptions.getZookeeperHomeDefault();
+ }
+ LOG.info("Accumulo job : Zookeeper Home = " + zookeeperHome);
+
+ conf.addResource(accumuloHome + AccumuloConstants.ACCUMULO_SITE_XML_PATH);
+
+ // Add any libjars already specified
+      Set<String> localUrls = new HashSet<String>();
+ localUrls
+ .addAll(conf.getStringCollection(
+ ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM));
+
+ String dir = accumuloHome + File.separator + "lib";
+ LOG.info("Adding jar files under " + dir + " to distributed cache");
+ addDirToCache(new File(dir), fs, localUrls, false);
+
+ dir = zookeeperHome;
+ LOG.info("Adding jar files under " + dir + " to distributed cache");
+ addDirToCache(new File(dir), fs, localUrls, false);
+
+ String tmpjars = conf
+ .get(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM);
+ StringBuilder sb = new StringBuilder(1024);
+ if (null != tmpjars) {
+ sb.append(tmpjars);
+ sb.append(",");
+ }
+ sb.append(StringUtils.arrayToString(localUrls.toArray(new String[0])));
+ conf.set(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM,
+ sb.toString());
+ }
+ }
+
+ /**
+ * Add the .jar elements of a directory to the DCache classpath, optionally
+ * recursively.
+ */
+ private static void addDirToCache(File dir, FileSystem fs,
+      Set<String> localUrls, boolean recursive) {
+ if (dir != null) {
+ File[] fileList = dir.listFiles();
+
+ if (fileList != null) {
+ for (File libFile : dir.listFiles()) {
+ if (libFile.exists() && !libFile.isDirectory()
+ && libFile.getName().endsWith("jar")) {
+ Path p = new Path(libFile.toString());
+ if (libFile.canRead()) {
+ String qualified = p.makeQualified(fs).toString();
+ LOG.info("Adding to job classpath: " + qualified);
+ localUrls.add(qualified);
+ } else {
+ LOG.warn("Ignoring unreadable file " + libFile);
+ }
+ }
+ if (recursive && libFile.isDirectory()) {
+ addDirToCache(libFile, fs, localUrls, recursive);
+ }
+ }
+ } else {
+ LOG.warn("No files under " + dir
+ + " to add to distributed cache for Accumulo job");
+ }
+ }
+ }
+
+ /**
+ * Check to see if the job is running in local mode.
+ */
+ public static boolean isLocalJobTracker(Job job) {
+ boolean retval = false;
+ Configuration conf = job.getConfiguration();
+ // If framework is set to YARN, then we can't be running in local mode
+ if ("yarn".equalsIgnoreCase(conf
+ .get(ConfigurationConstants.PROP_MAPREDUCE_FRAMEWORK_NAME))) {
+ retval = false;
+ } else {
+ String jtAddr = conf
+ .get(ConfigurationConstants.PROP_MAPRED_JOB_TRACKER_ADDRESS);
+ String jtAddr2 = conf
+ .get(ConfigurationConstants.PROP_MAPREDUCE_JOB_TRACKER_ADDRESS);
+ retval = (jtAddr != null && jtAddr.equals("local"))
+ || (jtAddr2 != null && jtAddr2.equals("local"));
+ }
+ return retval;
+ }
+}
diff --git a/src/java/org/apache/sqoop/accumulo/MutationTransformer.java b/src/java/org/apache/sqoop/accumulo/MutationTransformer.java
new file mode 100644
index 00000000..106c9497
--- /dev/null
+++ b/src/java/org/apache/sqoop/accumulo/MutationTransformer.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.accumulo;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.accumulo.core.data.Mutation;
+
+/**
+ * Abstract class that takes a map of jdbc field names to values
+ * and converts them to Mutations for Accumulo.
+ */
+public abstract class MutationTransformer {
+
+ private String columnFamily;
+ private String rowKeyColumn;
+ private String visibility;
+
+ /**
+ * @return the default column family to insert into.
+ */
+ public String getColumnFamily() {
+ return this.columnFamily;
+ }
+
+ /**
+ * Set the default column family to insert into.
+ */
+ public void setColumnFamily(String colFamily) {
+ this.columnFamily = colFamily;
+ }
+
+ /**
+ * @return the field name identifying the value to use as the row id.
+ */
+ public String getRowKeyColumn() {
+ return this.rowKeyColumn;
+ }
+
+ /**
+ * Set the column of the input fields which should be used to calculate
+ * the row id.
+ */
+ public void setRowKeyColumn(String rowKeyCol) {
+ this.rowKeyColumn = rowKeyCol;
+ }
+
+ /**
+ * @return the field name identifying the visibility token.
+ */
+ public String getVisibility() {
+ return this.visibility;
+ }
+
+ /**
+ * Set the visibility token to set for each cell.
+ */
+ public void setVisibility(String vis) {
+ this.visibility = vis;
+ }
+
+ /**
+ * Returns a list of Mutations that inserts the fields into a row in Accumulo.
+ * @param fields a map of field names to values to insert.
+ * @return A list of Mutations that inserts these into Accumulo.
+ */
+ public abstract Iterable<Mutation> getMutations(Map<String, Object> fields)
+ throws IOException;
+}
diff --git a/src/java/org/apache/sqoop/accumulo/ToStringMutationTransformer.java b/src/java/org/apache/sqoop/accumulo/ToStringMutationTransformer.java
new file mode 100644
index 00000000..c300855c
--- /dev/null
+++ b/src/java/org/apache/sqoop/accumulo/ToStringMutationTransformer.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.accumulo;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+
+/**
+ * MutationTransformer that calls toString on all non-null fields.
+ */
+public class ToStringMutationTransformer extends MutationTransformer {
+
+ public static final Log LOG = LogFactory.getLog(
+ ToStringMutationTransformer.class.getName());
+
+ public ToStringMutationTransformer() {
+ }
+
+ /**
+ * Create the Mutations to insert the given field map into a row,
+ * applying the configured visibility token if one is set.
+ */
+ @Override
+ public Iterable<Mutation> getMutations(Map<String, Object> fields)
+ throws IOException {
+ String rowKeyCol = getRowKeyColumn();
+ String colFamily = getColumnFamily();
+ Object rowKey = fields.get(rowKeyCol);
+ String vis = getVisibility();
+ if (null == rowKey) {
+ // If the row-key column is null, we don't insert this row.
+ LOG.warn("Could not insert row with null value for row-key column: "
+ + rowKeyCol);
+ return null;
+ }
+ ColumnVisibility colVis = null;
+ if (null != vis && vis.length() > 0) {
+ colVis = new ColumnVisibility(vis);
+ }
+ Mutation mut = new Mutation(rowKey.toString());
+ for (Map.Entry<String, Object> fieldEntry : fields.entrySet()) {
+ String colName = fieldEntry.getKey();
+ if (!colName.equals(rowKeyCol)) {
+ // This is a regular field, not the row key.
+ // Add it if it's not null.
+ Object val = fieldEntry.getValue();
+ if (null != val) {
+ if (null == colVis) {
+ mut.put(new Text(colFamily), new Text(colName),
+ new Value(val.toString().getBytes("UTF8")));
+ } else {
+ mut.put(new Text(colFamily), new Text(colName),
+ colVis, new Value(val.toString().getBytes("UTF8")));
+ }
+ }
+ }
+ }
+ return Collections.singletonList(mut);
+ }
+}
diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java
index 1ffa40f4..82482636 100644
--- a/src/java/org/apache/sqoop/manager/SqlManager.java
+++ b/src/java/org/apache/sqoop/manager/SqlManager.java
@@ -40,6 +40,8 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.accumulo.AccumuloUtil;
+import org.apache.sqoop.mapreduce.AccumuloImportJob;
import org.apache.sqoop.mapreduce.JdbcCallExportJob;
import org.apache.sqoop.tool.BaseSqoopTool;
import org.apache.sqoop.util.LoggingUtils;
@@ -594,6 +596,13 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
} else {
importer = new HBaseBulkImportJob(opts, context);
}
+ } else if (opts.getAccumuloTable() != null) {
+ // Import to Accumulo.
+ if (!AccumuloUtil.isAccumuloJarPresent()) {
+ throw new ImportException("Accumulo jars are not present in "
+ + "classpath, cannot import to Accumulo!");
+ }
+ importer = new AccumuloImportJob(opts, context);
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),
@@ -630,6 +639,13 @@ public void importQuery(com.cloudera.sqoop.manager.ImportJobContext context)
} else {
importer = new HBaseBulkImportJob(opts, context);
}
+ } else if (opts.getAccumuloTable() != null) {
+ // Import to Accumulo.
+ if (!AccumuloUtil.isAccumuloJarPresent()) {
+ throw new ImportException("Accumulo jars are not present in classpath,"
+ + " cannot import to Accumulo!");
+ }
+ importer = new AccumuloImportJob(opts, context);
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),
diff --git a/src/java/org/apache/sqoop/mapreduce/AccumuloImportJob.java b/src/java/org/apache/sqoop/mapreduce/AccumuloImportJob.java
new file mode 100644
index 00000000..cb2145f5
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/AccumuloImportJob.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.accumulo.AccumuloConstants;
+import org.apache.sqoop.accumulo.AccumuloMutationProcessor;
+import org.apache.sqoop.accumulo.AccumuloUtil;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.lib.FieldMapProcessor;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.DataDrivenImportJob;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Runs an Accumulo import via DataDrivenDBInputFormat to the
+ * AccumuloMutationProcessor in the DelegatingOutputFormat.
+ */
+public class AccumuloImportJob extends DataDrivenImportJob {
+
+ public static final Log LOG
+ = LogFactory.getLog(AccumuloImportJob.class.getName());
+ protected static SqoopOptions opts;
+
+ public AccumuloImportJob(final SqoopOptions opts,
+ final ImportJobContext importContext) {
+ super(opts, importContext.getInputFormat(), importContext);
+ this.opts = opts;
+ }
+
+ @Override
+ protected void configureMapper(Job job, String tableName,
+ String tableClassName) throws IOException {
+ job.setOutputKeyClass(SqoopRecord.class);
+ job.setOutputValueClass(NullWritable.class);
+ job.setMapperClass(getMapperClass());
+ }
+
+ @Override
+ protected Class<? extends Mapper> getMapperClass() {
+ return AccumuloImportMapper.class;
+ }
+
+ @Override
+ protected Class<? extends OutputFormat> getOutputFormatClass()
+ throws ClassNotFoundException {
+ return DelegatingOutputFormat.class;
+ }
+
+ @Override
+ protected void configureOutputFormat(Job job, String tableName,
+ String tableClassName) throws ClassNotFoundException, IOException {
+
+ // Use the DelegatingOutputFormat with the AccumuloMutationProcessor.
+ job.setOutputFormatClass(getOutputFormatClass());
+
+ Configuration conf = job.getConfiguration();
+ conf.setClass("sqoop.output.delegate.field.map.processor.class",
+ AccumuloMutationProcessor.class, FieldMapProcessor.class);
+
+ // Set the Accumulo parameters (table, column family, row key):
+ conf.set(AccumuloConstants.ZOOKEEPERS,
+ options.getAccumuloZookeepers());
+ conf.set(AccumuloConstants.ACCUMULO_INSTANCE,
+ options.getAccumuloInstance());
+ conf.set(AccumuloConstants.ACCUMULO_USER_NAME,
+ options.getAccumuloUser());
+ String pw = options.getAccumuloPassword();
+ if (null == pw) {
+ pw = "";
+ }
+ conf.set(AccumuloConstants.ACCUMULO_PASSWORD, pw);
+ conf.set(AccumuloConstants.TABLE_NAME_KEY,
+ options.getAccumuloTable());
+ conf.set(AccumuloConstants.COL_FAMILY_KEY,
+ options.getAccumuloColFamily());
+ conf.setLong(AccumuloConstants.BATCH_SIZE,
+ options.getAccumuloBatchSize());
+ conf.setLong(AccumuloConstants.MAX_LATENCY,
+ options.getAccumuloMaxLatency());
+
+ // What column of the input becomes the row key?
+ String rowKeyCol = options.getAccumuloRowKeyColumn();
+ if (null == rowKeyCol) {
+ // User didn't explicitly set one. If there's a split-by column set,
+ // use that.
+ rowKeyCol = options.getSplitByCol();
+ }
+
+ if (null == rowKeyCol) {
+ // No split-by column is explicitly set.
+ // If the table has a primary key, use that.
+ ConnManager manager = getContext().getConnManager();
+ rowKeyCol = manager.getPrimaryKey(tableName);
+ }
+
+ if (null == rowKeyCol) {
+ // Give up here if this is still unset.
+ throw new IOException(
+ "Could not determine the row-key column. "
+ + "Use --accumulo-row-key to specify the input column that "
+ + "names each row.");
+ }
+
+ conf.set(AccumuloConstants.ROW_KEY_COLUMN_KEY, rowKeyCol);
+ }
+
+ @Override
+ /** Create the target Accumulo table before running the job, if appropriate.*/
+ protected void jobSetup(Job job) throws IOException, ImportException {
+ Configuration conf = job.getConfiguration();
+ String tableName = conf.get(AccumuloConstants.TABLE_NAME_KEY);
+ String familyName = conf.get(AccumuloConstants.COL_FAMILY_KEY);
+ String zookeepers = conf.get(AccumuloConstants.ZOOKEEPERS);
+ String instance = conf.get(AccumuloConstants.ACCUMULO_INSTANCE);
+ String user = conf.get(AccumuloConstants.ACCUMULO_USER_NAME);
+
+ if (null == tableName) {
+ throw new ImportException(
+ "Import to Accumulo error: Table name not specified");
+ }
+
+ if (null == familyName) {
+ throw new ImportException(
+ "Import to Accumulo error: Column family not specified");
+ }
+
+ try {
+ // Set up the libjars
+ AccumuloUtil.addJars(job, opts);
+
+ Instance inst = new ZooKeeperInstance(instance, zookeepers);
+ String password = conf.get(AccumuloConstants.ACCUMULO_PASSWORD);
+ Connector conn = inst.getConnector(user, new PasswordToken(password));
+ if (!conn.tableOperations().exists(tableName)) {
+ if (options.getCreateAccumuloTable()) {
+ LOG.info("Table " + tableName + " doesn't exist, creating.");
+ try {
+ conn.tableOperations().create(tableName);
+ } catch (TableExistsException e) {
+ // Should only happen if the table was created
+ // by another process between the existence check
+ // and the create command
+ LOG.info("Table " + tableName + " created by another process.");
+ }
+ } else {
+ throw new ImportException(
+ "Table "
+ + tableName
+ + " does not exist, and --accumulo-create-table "
+ + "not specified.");
+ }
+ }
+ } catch (AccumuloException e) {
+ throw new ImportException(e);
+ } catch (AccumuloSecurityException e) {
+ throw new ImportException(e);
+ }
+ super.jobSetup(job);
+ }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/AccumuloImportMapper.java b/src/java/org/apache/sqoop/mapreduce/AccumuloImportMapper.java
new file mode 100644
index 00000000..e196099d
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/AccumuloImportMapper.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+
+/**
+ * Imports records by writing them to Accumulo via the DelegatingOutputFormat
+ * and the AccumuloMutationProcessor.
+ */
+public class AccumuloImportMapper
+ extends AutoProgressMapper<LongWritable, SqoopRecord,
+ SqoopRecord, NullWritable> {
+
+ @Override
+ public void map(LongWritable key, SqoopRecord val, Context context)
+ throws IOException, InterruptedException {
+ context.write(val, NullWritable.get());
+ }
+}
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index 018d11fa..6d6f1ea5 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -180,6 +180,20 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
"hbase-bulkload";
public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table";
+ //Accumulo arguments.
+ public static final String ACCUMULO_TABLE_ARG = "accumulo-table";
+ public static final String ACCUMULO_COL_FAM_ARG = "accumulo-column-family";
+ public static final String ACCUMULO_ROW_KEY_ARG = "accumulo-row-key";
+ public static final String ACCUMULO_VISIBILITY_ARG = "accumulo-visibility";
+ public static final String ACCUMULO_CREATE_TABLE_ARG
+ = "accumulo-create-table";
+ public static final String ACCUMULO_BATCH_SIZE_ARG = "accumulo-batch-size";
+ public static final String ACCUMULO_MAX_LATENCY_ARG = "accumulo-max-latency";
+ public static final String ACCUMULO_ZOOKEEPERS_ARG = "accumulo-zookeepers";
+ public static final String ACCUMULO_INSTANCE_ARG = "accumulo-instance";
+ public static final String ACCUMULO_USER_ARG = "accumulo-user";
+ public static final String ACCUMULO_PASSWORD_ARG = "accumulo-password";
+
// Arguments for the saved job management system.
public static final String STORAGE_METASTORE_ARG = "meta-connect";
@@ -728,6 +742,116 @@ protected RelatedOptions getHBaseOptions() {
return hbaseOpts;
}
+ protected RelatedOptions getAccumuloOptions() {
+ RelatedOptions accumuloOpts =
+ new RelatedOptions("Accumulo arguments");
+ accumuloOpts.addOption(OptionBuilder.withArgName("table")
+ .hasArg()
+ .withDescription("Import to <table> in Accumulo")
+ .withLongOpt(ACCUMULO_TABLE_ARG)
+ .create());
+ accumuloOpts.addOption(OptionBuilder.withArgName("family")
+ .hasArg()
+ .withDescription("Sets the target column family for the import")
+ .withLongOpt(ACCUMULO_COL_FAM_ARG)
+ .create());
+ accumuloOpts.addOption(OptionBuilder.withArgName("col")
+ .hasArg()
+ .withDescription("Specifies which input column to use as the row key")
+ .withLongOpt(ACCUMULO_ROW_KEY_ARG)
+ .create());
+ accumuloOpts.addOption(OptionBuilder.withArgName("vis")
+ .hasArg()
+ .withDescription("Visibility token to be applied to all rows imported")
+ .withLongOpt(ACCUMULO_VISIBILITY_ARG)
+ .create());
+ accumuloOpts.addOption(OptionBuilder
+ .withDescription("If specified, create missing Accumulo tables")
+ .withLongOpt(ACCUMULO_CREATE_TABLE_ARG)
+ .create());
+ accumuloOpts.addOption(OptionBuilder.withArgName("size")
+ .hasArg()
+ .withDescription("Batch size in bytes")
+ .withLongOpt(ACCUMULO_BATCH_SIZE_ARG)
+ .create());
+ accumuloOpts.addOption(OptionBuilder.withArgName("latency")
+ .hasArg()
+ .withDescription("Max write latency in milliseconds")
+ .withLongOpt(ACCUMULO_MAX_LATENCY_ARG)
+ .create());
+ accumuloOpts.addOption(OptionBuilder.withArgName("zookeepers")
+ .hasArg()
+ .withDescription("Comma-separated list of zookeepers (host:port)")
+ .withLongOpt(ACCUMULO_ZOOKEEPERS_ARG)
+ .create());
+ accumuloOpts.addOption(OptionBuilder.withArgName("instance")
+ .hasArg()
+ .withDescription("Accumulo instance name.")
+ .withLongOpt(ACCUMULO_INSTANCE_ARG)
+ .create());
+ accumuloOpts.addOption(OptionBuilder.withArgName("user")
+ .hasArg()
+ .withDescription("Accumulo user name.")
+ .withLongOpt(ACCUMULO_USER_ARG)
+ .create());
+ accumuloOpts.addOption(OptionBuilder.withArgName("password")
+ .hasArg()
+ .withDescription("Accumulo password.")
+ .withLongOpt(ACCUMULO_PASSWORD_ARG)
+ .create());
+
+ return accumuloOpts;
+ }
+
+ protected void applyAccumuloOptions(CommandLine in, SqoopOptions out) {
+ if (in.hasOption(ACCUMULO_TABLE_ARG)) {
+ out.setAccumuloTable(in.getOptionValue(ACCUMULO_TABLE_ARG));
+ }
+
+ if (in.hasOption(ACCUMULO_COL_FAM_ARG)) {
+ out.setAccumuloColFamily(in.getOptionValue(ACCUMULO_COL_FAM_ARG));
+ }
+
+ if (in.hasOption(ACCUMULO_ROW_KEY_ARG)) {
+ out.setAccumuloRowKeyColumn(in.getOptionValue(ACCUMULO_ROW_KEY_ARG));
+ }
+
+ if (in.hasOption(ACCUMULO_VISIBILITY_ARG)) {
+ out.setAccumuloVisibility(in.getOptionValue(ACCUMULO_VISIBILITY_ARG));
+ }
+
+ if (in.hasOption(ACCUMULO_CREATE_TABLE_ARG)) {
+ out.setCreateAccumuloTable(true);
+ }
+
+ if (in.hasOption(ACCUMULO_BATCH_SIZE_ARG)) {
+ out.setAccumuloBatchSize(Long.parseLong(
+ in.getOptionValue(ACCUMULO_BATCH_SIZE_ARG)));
+ }
+
+ if (in.hasOption(ACCUMULO_MAX_LATENCY_ARG)) {
+ out.setAccumuloMaxLatency(Long.parseLong(
+ in.getOptionValue(ACCUMULO_MAX_LATENCY_ARG)));
+ }
+
+ if (in.hasOption(ACCUMULO_ZOOKEEPERS_ARG)) {
+ out.setAccumuloZookeepers(in.getOptionValue(ACCUMULO_ZOOKEEPERS_ARG));
+ }
+
+ if (in.hasOption(ACCUMULO_INSTANCE_ARG)) {
+ out.setAccumuloInstance(in.getOptionValue(ACCUMULO_INSTANCE_ARG));
+ }
+
+ if (in.hasOption(ACCUMULO_USER_ARG)) {
+ out.setAccumuloUser(in.getOptionValue(ACCUMULO_USER_ARG));
+ }
+
+ if (in.hasOption(ACCUMULO_PASSWORD_ARG)) {
+ out.setAccumuloPassword(in.getOptionValue(ACCUMULO_PASSWORD_ARG));
+ }
+ }
+
+
@SuppressWarnings("static-access")
protected void addValidationOpts(RelatedOptions validationOptions) {
validationOptions.addOption(OptionBuilder
@@ -1256,6 +1380,52 @@ protected void validateHiveOptions(SqoopOptions options)
}
}
+ protected void validateAccumuloOptions(SqoopOptions options)
+ throws InvalidOptionsException {
+ if ((options.getAccumuloColFamily() != null
+ && options.getAccumuloTable() == null)
+ || (options.getAccumuloColFamily() == null
+ && options.getAccumuloTable() != null)) {
+ throw new InvalidOptionsException(
+ "Both --accumulo-table and --accumulo-column-family must be set."
+ + HELP_STR);
+ }
+ if (options.getAccumuloTable() != null && options.isDirect()) {
+ throw new InvalidOptionsException("Direct import is incompatible with "
+ + "Accumulo. Please remove parameter --direct");
+ }
+ if (options.getAccumuloTable() != null
+ && options.getHBaseTable() != null) {
+ throw new InvalidOptionsException("HBase import is incompatible with "
+ + "Accumulo import.");
+ }
+ if (options.getAccumuloTable() != null
+ && options.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
+ throw new InvalidOptionsException("Accumulo import is not compatible "
+ + "with importing into file format.");
+ }
+ if (options.getAccumuloTable() != null
+ && options.getHBaseColFamily() != null) {
+ throw new InvalidOptionsException("Use --accumulo-column-family with "
+ + "Accumulo import.");
+ }
+ if (options.getAccumuloTable() != null
+ && options.getAccumuloUser() == null) {
+ throw
+ new InvalidOptionsException("Must specify Accumulo user.");
+ }
+ if (options.getAccumuloTable() != null
+ && options.getAccumuloInstance() == null) {
+ throw new
+ InvalidOptionsException("Must specify Accumulo instance.");
+ }
+ if (options.getAccumuloTable() != null
+ && options.getAccumuloZookeepers() == null) {
+ throw new
+ InvalidOptionsException("Must specify Zookeeper server(s).");
+ }
+ }
+
protected void validateHCatalogOptions(SqoopOptions options)
throws InvalidOptionsException {
// Make sure that one of hCatalog or hive jobs are used
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index fbbde1d1..50826e9b 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -685,6 +685,7 @@ public void configureOptions(ToolOptions toolOptions) {
toolOptions.addUniqueOptions(getHBaseOptions());
toolOptions.addUniqueOptions(getHCatalogOptions());
toolOptions.addUniqueOptions(getHCatImportOnlyOptions());
+ toolOptions.addUniqueOptions(getAccumuloOptions());
// get common codegen opts.
RelatedOptions codeGenOpts = getCodeGenOpts(allTables);
@@ -856,6 +857,7 @@ public void applyOptions(CommandLine in, SqoopOptions out)
applyCodeGenOptions(in, out, allTables);
applyHBaseOptions(in, out);
applyHCatOptions(in, out);
+ applyAccumuloOptions(in, out);
} catch (NumberFormatException nfe) {
throw new InvalidOptionsException("Error: expected numeric argument.\n"
@@ -890,7 +892,9 @@ protected void validateImportOptions(SqoopOptions options)
"Cannot specify --" + SQL_QUERY_ARG + " and --table together."
+ HELP_STR);
} else if (options.getSqlQuery() != null
- && options.getTargetDir() == null && options.getHBaseTable() == null) {
+ && options.getTargetDir() == null
+ && options.getHBaseTable() == null
+ && options.getAccumuloTable() == null) {
throw new InvalidOptionsException(
"Must specify destination with --target-dir."
+ HELP_STR);
@@ -987,6 +991,7 @@ public void validateOptions(SqoopOptions options)
validateHBaseOptions(options);
validateHiveOptions(options);
validateHCatalogOptions(options);
+ validateAccumuloOptions(options);
}
}
diff --git a/src/test/org/apache/sqoop/accumulo/AccumuloTestCase.java b/src/test/org/apache/sqoop/accumulo/AccumuloTestCase.java
new file mode 100644
index 00000000..bc773693
--- /dev/null
+++ b/src/test/org/apache/sqoop/accumulo/AccumuloTestCase.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.accumulo;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.junit.After;
+import org.junit.Before;
+
+import com.cloudera.sqoop.testutil.HsqldbTestServer;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+
+/**
+ * Utility methods that facilitate Accumulo import tests.
+ * These test use the MiniAccumuloCluster. They are
+ * absolutely not thread safe.
+ */
+public abstract class AccumuloTestCase extends ImportJobTestCase {
+ private static final String ACCUMULO_USER="root";
+ private static final String ACCUMULO_PASSWORD="rootroot";
+
+ /*
+ * This is to restore test.build.data system property which gets reset
+ * when Accumulo tests are run. Since other tests in Sqoop also depend upon
+ * this property, they can fail if are run subsequently in the same VM.
+ */
+ private static String testBuildDataProperty = "";
+
+ private static void recordTestBuildDataProperty() {
+ testBuildDataProperty = System.getProperty("test.build.data", "");
+ }
+
+ private static void restoreTestBuidlDataProperty() {
+ System.setProperty("test.build.data", testBuildDataProperty);
+ }
+
+ public static final Log LOG = LogFactory.getLog(
+ AccumuloTestCase.class.getName());
+
+ protected static MiniAccumuloCluster accumuloCluster;
+ protected static File tempDir;
+
+ /**
+ * Create the argv to pass to Sqoop.
+ * @return the argv as an array of strings.
+ */
+ protected String [] getArgv(String accumuloTable,
+ String accumuloColFam, boolean accumuloCreate,
+ String queryStr) {
+
+ ArrayList<String> args = new ArrayList<String>();
+
+ if (null != queryStr) {
+ args.add("--query");
+ args.add(queryStr);
+ } else {
+ args.add("--table");
+ args.add(getTableName());
+ }
+ args.add("--split-by");
+ args.add(getColName(0));
+ args.add("--connect");
+ args.add(HsqldbTestServer.getUrl());
+ args.add("--num-mappers");
+ args.add("1");
+ args.add("--accumulo-column-family");
+ args.add(accumuloColFam);
+ args.add("--accumulo-table");
+ args.add(accumuloTable);
+ if (accumuloCreate) {
+ args.add("--accumulo-create-table");
+ }
+ args.add("--accumulo-instance");
+ args.add(accumuloCluster.getInstanceName());
+ args.add("--accumulo-zookeepers");
+ args.add(accumuloCluster.getZooKeepers());
+ args.add("--accumulo-user");
+ args.add(ACCUMULO_USER);
+ args.add("--accumulo-password");
+ args.add(ACCUMULO_PASSWORD);
+
+ return args.toArray(new String[0]);
+ }
+
+ protected static void setUpCluster() throws Exception {
+ File temp = File.createTempFile("test", "tmp");
+ tempDir = new File(temp.getParent(), "accumulo"
+ + System.currentTimeMillis());
+ tempDir.mkdir();
+ tempDir.deleteOnExit();
+ temp.delete();
+ accumuloCluster = new MiniAccumuloCluster(tempDir, ACCUMULO_PASSWORD);
+ accumuloCluster.start();
+ }
+
+ protected static void cleanUpCluster() throws Exception {
+ accumuloCluster.stop();
+ delete(tempDir);
+ }
+
+ protected static void delete(File dir) {
+ if (dir.isDirectory()) {
+ File[] kids = dir.listFiles();
+ for (File f : kids) {
+ if (f.isDirectory()) {
+ delete(f);
+ } else {
+ f.delete();
+ }
+ }
+ }
+ dir.delete();
+ }
+
+ @Override
+ @Before
+ public void setUp() {
+ try {
+ setUpCluster();
+ } catch (Exception e) {
+ LOG.error("Error setting up MiniAccumuloCluster.", e);
+ }
+ AccumuloTestCase.recordTestBuildDataProperty();
+ super.setUp();
+ }
+
+ @Override
+ @After
+ public void tearDown() {
+ super.tearDown();
+ try {
+ cleanUpCluster();
+ } catch (Exception e) {
+ LOG.error("Error stopping MiniAccumuloCluster.", e);
+ }
+ }
+
+ protected void verifyAccumuloCell(String tableName, String rowKey,
+ String colFamily, String colName, String val) throws IOException {
+ try {
+ Instance inst = new ZooKeeperInstance(accumuloCluster.getInstanceName(),
+ accumuloCluster.getZooKeepers());
+ Connector conn = inst.getConnector(ACCUMULO_USER,
+ new PasswordToken(ACCUMULO_PASSWORD));
+ Scanner scanner = conn.createScanner(tableName, Constants.NO_AUTHS);
+ scanner.setRange(new Range(rowKey));
+ Iterator<Entry<Key, Value>> iter = scanner.iterator();
+ while (iter.hasNext()) {
+ Entry<Key, Value> entry = iter.next();
+ String columnFamily = entry.getKey().getColumnFamily().toString();
+ String qual = entry.getKey().getColumnQualifier().toString();
+ if (columnFamily.equals(colFamily)
+ && qual.equals(colName)) {
+ String value = entry.getValue().toString();
+ if (null == val) {
+ assertNull("Got a result when expected null", value);
+ } else {
+ assertNotNull("No result, but we expected one", value);
+ assertEquals(val, value);
+ }
+ }
+ }
+ } catch (AccumuloException e) {
+ throw new IOException("AccumuloException in verifyAccumuloCell", e);
+ } catch (AccumuloSecurityException e) {
+ throw new IOException("AccumuloSecurityException in verifyAccumuloCell",
+ e);
+ } catch (TableNotFoundException e) {
+ throw new IOException("TableNotFoundException in verifyAccumuloCell", e);
+ }
+ }
+}
diff --git a/src/test/org/apache/sqoop/accumulo/TestAccumuloImport.java b/src/test/org/apache/sqoop/accumulo/TestAccumuloImport.java
new file mode 100644
index 00000000..d52f0f08
--- /dev/null
+++ b/src/test/org/apache/sqoop/accumulo/TestAccumuloImport.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.accumulo;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+/**
+ * Test imports of tables into Accumulo.
+ */
+public class TestAccumuloImport extends AccumuloTestCase {
+
+ @Test
+ public void testBasicUsage() throws IOException {
+ String [] argv = getArgv("BasicUsage", "BasicColFam", true, null);
+ String [] types = { "INT", "INT" };
+ String [] vals = { "0", "1" };
+ createTableWithColTypes(types, vals);
+ runImport(argv);
+ verifyAccumuloCell("BasicUsage", "0", "BasicColFam", getColName(1), "1");
+ }
+
+ @Test
+ public void testMissingTableFails() throws IOException {
+ // Test that if the table doesn't exist, we fail unless we
+ // explicitly create the table.
+ String [] argv = getArgv("MissingTable", "MissingFam", false, null);
+ String [] types = { "INT", "INT" };
+ String [] vals = { "0", "1" };
+ createTableWithColTypes(types, vals);
+ try {
+ runImport(argv);
+ fail("Expected Exception");
+ } catch (IOException e) {
+ LOG.info("Got exception -- ok; we expected that job to fail.");
+ }
+ }
+
+ @Test
+ public void testOverwriteSucceeds() throws IOException {
+ // Test that we can create a table and then import immediately
+ // back on top of it without problem.
+ String [] argv = getArgv("OverwriteT", "OverwriteF", true, null);
+ String [] types = { "INT", "INT" };
+ String [] vals = { "0", "1" };
+ createTableWithColTypes(types, vals);
+ runImport(argv);
+ verifyAccumuloCell("OverwriteT", "0", "OverwriteF", getColName(1), "1");
+ // Run a second time.
+ runImport(argv);
+ verifyAccumuloCell("OverwriteT", "0", "OverwriteF", getColName(1), "1");
+ }
+
+ @Test
+ public void testStrings() throws IOException {
+ String [] argv = getArgv("stringT", "stringF", true, null);
+ String [] types = { "INT", "VARCHAR(32)" };
+ String [] vals = { "0", "'abc'" };
+ createTableWithColTypes(types, vals);
+ runImport(argv);
+ verifyAccumuloCell("stringT", "0", "stringF", getColName(1), "abc");
+ }
+
+ @Test
+ public void testNulls() throws IOException {
+ String [] argv = getArgv("nullT", "nullF", true, null);
+ String [] types = { "INT", "INT", "INT" };
+ String [] vals = { "0", "42", "null" };
+ createTableWithColTypes(types, vals);
+ runImport(argv);
+ // This cell should import correctly.
+ verifyAccumuloCell("nullT", "0", "nullF", getColName(1), "42");
+
+ // This cell should not be placed in the results.
+ verifyAccumuloCell("nullT", "0", "nullF", getColName(2), null);
+ }
+
+ @Test
+ public void testExitFailure() throws IOException {
+ String [] argv = getArgv("NoAccumuloT", "NoAccumuloF", true, null);
+ String [] types = { "INT", "INT", "INT" };
+ String [] vals = { "0", "42", "43" };
+ createTableWithColTypes(types, vals);
+ try {
+ AccumuloUtil.setAlwaysNoAccumuloJarMode(true);
+ runImport(argv);
+ fail("should have gotten exception");
+ } catch (IOException e) {
+ // Got the exception, so we're happy
+ LOG.info("Got exception -- ok; we expected that to fail.");
+ } finally {
+ AccumuloUtil.setAlwaysNoAccumuloJarMode(false);
+ }
+ }
+}
diff --git a/src/test/org/apache/sqoop/accumulo/TestAccumuloQueryImport.java b/src/test/org/apache/sqoop/accumulo/TestAccumuloQueryImport.java
new file mode 100644
index 00000000..be735940
--- /dev/null
+++ b/src/test/org/apache/sqoop/accumulo/TestAccumuloQueryImport.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.accumulo;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+/**
+ * Test import of free-form query into Accumulo.
+ */
+public class TestAccumuloQueryImport extends AccumuloTestCase {
+
+ @Test
+ public void testImportFromQuery() throws IOException {
+ String [] types = { "INT", "INT", "INT" };
+ String [] vals = { "0", "42", "43" };
+ createTableWithColTypes(types, vals);
+
+ String [] argv = getArgv("queryT", "queryF", true,
+ "SELECT " + getColName(0) + ", " + getColName(1) + " FROM "
+ + getTableName() + " WHERE $CONDITIONS");
+ runImport(argv);
+
+ // This cell should import correctly.
+ verifyAccumuloCell("queryT", "0", "queryF", getColName(1), "42");
+
+ // This cell should not be placed in the results.
+ verifyAccumuloCell("queryT", "0", "queryF", getColName(2), null);
+ }
+
+ @Test
+ public void testExitFailure() throws IOException {
+ String [] argv = getArgv("NoAccumuloT", "NoAccumuloF", true, null);
+ String [] types = { "INT", "INT", "INT" };
+ String [] vals = { "0", "42", "43" };
+ createTableWithColTypes(types, vals);
+ try {
+ AccumuloUtil.setAlwaysNoAccumuloJarMode(true);
+ runImport(argv);
+ fail("should have gotten exception");
+ } catch (IOException e) {
+ // Got the exception, so we're happy
+ LOG.info("Got exception -- ok; we expected that to fail.");
+ } finally {
+ AccumuloUtil.setAlwaysNoAccumuloJarMode(false);
+ }
+ }
+}
diff --git a/src/test/org/apache/sqoop/accumulo/TestAccumuloUtil.java b/src/test/org/apache/sqoop/accumulo/TestAccumuloUtil.java
new file mode 100644
index 00000000..c236b8af
--- /dev/null
+++ b/src/test/org/apache/sqoop/accumulo/TestAccumuloUtil.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.accumulo;
+
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * This tests to verify that Accumulo is present (default when running
+ * test cases) and that when in fake not present mode, the method returns
+ * false.
+ */
+public class TestAccumuloUtil extends TestCase {
+
+ @Test
+ public void testAccumuloPresent() {
+ assertTrue(AccumuloUtil.isAccumuloJarPresent());
+ }
+
+ @Test
+ public void testAccumuloNotPresent() {
+ AccumuloUtil.setAlwaysNoAccumuloJarMode(true);
+ boolean present = AccumuloUtil.isAccumuloJarPresent();
+ AccumuloUtil.setAlwaysNoAccumuloJarMode(false);
+ assertFalse(present);
+ }
+}