5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-02 03:39:29 +08:00

SQOOP-767: Add support for Accumulo

(Philip A Grim II via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2013-11-20 13:42:23 -08:00
parent 07cc3b3fad
commit 5f7013eae1
22 changed files with 1881 additions and 8 deletions

View File

@ -86,6 +86,20 @@ if [ -z "${HCAT_HOME}" ]; then
HCAT_HOME=${SQOOP_HOME}/../hive-hcatalog
fi
fi
# Locate Accumulo: honor a pre-set ACCUMULO_HOME, then the packaged
# install location, and finally a directory next to SQOOP_HOME.
if [ -z "${ACCUMULO_HOME}" ]; then
  if [ -d /usr/lib/accumulo ]; then
    ACCUMULO_HOME=/usr/lib/accumulo
  else
    ACCUMULO_HOME="${SQOOP_HOME}/../accumulo"
  fi
fi
# Locate ZooKeeper the same way; Accumulo imports need its jars.
if [ -z "${ZOOKEEPER_HOME}" ]; then
  if [ -d /usr/lib/zookeeper ]; then
    ZOOKEEPER_HOME=/usr/lib/zookeeper
  else
    ZOOKEEPER_HOME="${SQOOP_HOME}/../zookeeper"
  fi
fi
# Check: If we can't find our dependencies, give up here.
if [ ! -d "${HADOOP_COMMON_HOME}" ]; then
@ -111,6 +125,15 @@ if [ ! -d "${HCAT_HOME}" ]; then
echo 'Please set $HCAT_HOME to the root of your HCatalog installation.'
fi
# Warn (but do not abort) when the resolved Accumulo/ZooKeeper directories
# are missing; only Accumulo imports depend on them.
if ! [ -d "${ACCUMULO_HOME}" ]; then
  echo "Warning: $ACCUMULO_HOME does not exist! Accumulo imports will fail."
  echo 'Please set $ACCUMULO_HOME to the root of your Accumulo installation.'
fi
if ! [ -d "${ZOOKEEPER_HOME}" ]; then
  echo "Warning: $ZOOKEEPER_HOME does not exist! Accumulo imports will fail."
  echo 'Please set $ZOOKEEPER_HOME to the root of your Zookeeper installation.'
fi
# Where to find the main Sqoop jar
SQOOP_JAR_DIR=$SQOOP_HOME
@ -150,6 +173,16 @@ if [ -e "${HCAT_HOME}/bin/hcat" ]; then
SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}
fi
# Add Accumulo to dependency list.
# Ask the accumulo launcher for its classpath ONCE (it starts a JVM, so it
# is slow), then append the Accumulo jars followed by the ZooKeeper jars,
# preserving the original ordering of the resulting SQOOP_CLASSPATH.
if [ -e "$ACCUMULO_HOME/bin/accumulo" ]; then
  ACCUMULO_CLASSPATH_OUT=$("$ACCUMULO_HOME/bin/accumulo" classpath)
  for jn in $(echo "$ACCUMULO_CLASSPATH_OUT" | grep 'file:.*accumulo.*jar' | cut -d':' -f2); do
    SQOOP_CLASSPATH=$SQOOP_CLASSPATH:$jn
  done
  for jn in $(echo "$ACCUMULO_CLASSPATH_OUT" | grep 'file:.*zookeeper.*jar' | cut -d':' -f2); do
    SQOOP_CLASSPATH=$SQOOP_CLASSPATH:$jn
  done
fi
ZOOCFGDIR=${ZOOCFGDIR:-/etc/zookeeper}
if [ -d "${ZOOCFGDIR}" ]; then
SQOOP_CLASSPATH=$ZOOCFGDIR:$SQOOP_CLASSPATH
@ -182,4 +215,4 @@ export HADOOP_MAPRED_HOME
export HBASE_HOME
export HCAT_HOME
export HIVE_CONF_DIR
export ACCUMULO_HOME

View File

@ -72,6 +72,14 @@ if not defined HBASE_HOME (
echo Warning: HBASE_HOME and HBASE_VERSION not set.
)
)
::
:: Check for Accumulo dependency; ZooKeeper is checked alongside it since
:: Accumulo imports require both. These are warnings only — the directories
:: are validated again further below.
if not defined ACCUMULO_HOME (
echo Warning: ACCUMULO_HOME not set.
)
if not defined ZOOKEEPER_HOME (
echo Warning: ZOOKEEPER_HOME not set.
)
:: Check: If we can't find our dependencies, give up here.
@ -91,6 +99,14 @@ if not exist "%HBASE_HOME%" (
echo Warning: HBASE_HOME does not exist! HBase imports will fail.
echo Please set HBASE_HOME to the root of your HBase installation.
)
:: Warn when the configured Accumulo/ZooKeeper directories are missing;
:: only Accumulo imports depend on these installations.
if not exist "%ACCUMULO_HOME%" (
echo Warning: ACCUMULO_HOME does not exist! Accumulo imports will fail.
echo Please set ACCUMULO_HOME to the root of your Accumulo installation.
)
if not exist "%ZOOKEEPER_HOME%" (
echo Warning: ZOOKEEPER_HOME does not exist! Accumulo imports will fail.
echo Please set ZOOKEEPER_HOME to the root of your Zookeeper installation.
)
:: Add sqoop dependencies to classpath
set SQOOP_CLASSPATH=
@ -114,6 +130,12 @@ if exist "%HBASE_HOME%" (
call :add_dir_to_classpath %HBASE_HOME%
call :add_dir_to_classpath %HBASE_HOME%\lib
)
::
:: Add Accumulo to dependency list
:: NOTE(review): %ACCUMULO_HOME% is passed unquoted, matching the HBase
:: handling above; a path containing spaces may break here — confirm.
if exist "%ACCUMULO_HOME%" (
call :add_dir_to_classpath %ACCUMULO_HOME%
call :add_dir_to_classpath %ACCUMULO_HOME%\lib
)
if not defined ZOOCFGDIR (
if defined ZOOKEEPER_CONF_DIR (

19
ivy.xml
View File

@ -36,21 +36,22 @@ under the License.
<conf name="common" visibility="private"
extends="runtime"
description="artifacts needed to compile/test the application"/>
<conf name="accumulo" visibility="private" />
<conf name="hbase94" visibility="private" />
<conf name="hbase95" visibility="private" extends="hbasecompat${hbasecompatprofile}" />
<conf name="hbasecompat1" visibility="private" />
<conf name="hbasecompat2" visibility="private" />
<conf name="hcatalog" visibility="private" />
<conf name="hadoop23" visibility="private"
extends="common,runtime,hbase${hbaseprofile},hcatalog" />
extends="common,runtime,hbase${hbaseprofile},hcatalog,accumulo" />
<conf name="hadoop20" visibility="private"
extends="common,runtime,hbase${hbaseprofile},hcatalog" />
extends="common,runtime,hbase${hbaseprofile},hcatalog,accumulo" />
<conf name="hadoop100" visibility="private"
extends="common,runtime,hbase${hbaseprofile},hcatalog" />
extends="common,runtime,hbase${hbaseprofile},hcatalog,accumulo" />
<conf name="hadoop200" visibility="private"
extends="common,runtime,hbase${hbaseprofile},hcatalog" />
extends="common,runtime,hbase${hbaseprofile},hcatalog,accumulo" />
<conf name="hadoop210" visibility="private"
extends="common,runtime,hbase${hbaseprofile},hcatalog" />
extends="common,runtime,hbase${hbaseprofile},hcatalog,accumulo" />
<conf name="test" visibility="private" extends="common,runtime"/>
<conf name="hadoop23test" visibility="private" extends="test,hadoop23" />
@ -185,6 +186,14 @@ under the License.
<dependency org="commons-collections" name="commons-collections"
rev="${commons-collections.version}" conf="releaseaudit->default"/>
<!-- Accumulo 1.5.0 -->
<dependency org="org.apache.accumulo" name="accumulo-core" rev="1.5.0"
conf="accumulo->default">
</dependency>
<dependency org="org.apache.accumulo" name="accumulo-minicluster" rev="1.5.0"
conf="accumulo->default">
</dependency>
<!-- HBase 0.94 -->
<dependency org="org.apache.hbase" name="hbase" rev="${hbase94.version}" conf="hbase94->default">
<artifact name="hbase" type="jar"/>

View File

@ -0,0 +1,50 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
.Accumulo arguments:
[grid="all"]
`-------------------------------------`-----------------------------------
Argument Description
--------------------------------------------------------------------------
+\--accumulo-table <table-name>+     Specifies an Accumulo table to use\
as the target instead of HDFS
+\--accumulo-column-family <family>+ Sets the target column family for\
the import
+\--accumulo-create-table+ If specified, create missing\
Accumulo tables
+\--accumulo-row-key <col>+ Specifies which input column to use\
as the row key
+\--accumulo-visibility <vis>+ (Optional) Specifies a visibility\
token to apply to all rows inserted\
into Accumulo. Default is the\
empty string.
+\--accumulo-batch-size <size>+ (Optional) Sets the size in bytes\
of Accumulo's write buffer. Default\
is 4MB.
+\--accumulo-max-latency <ms>+ (Optional) Sets the max latency in\
milliseconds for the Accumulo\
batch writer. Default is 0.
+\--accumulo-zookeepers <host:port>+ Comma-separated list of Zookeeper\
servers used by the Accumulo instance
+\--accumulo-instance <instance-name>+ Name of the target Accumulo instance
+\--accumulo-user <username>+ Name of the Accumulo user to import as
+\--accumulo-password <password>+ Password for the Accumulo user
--------------------------------------------------------------------------

View File

@ -0,0 +1,65 @@
////
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
Importing Data Into Accumulo
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Sqoop supports importing records into a table in Accumulo.
By specifying +\--accumulo-table+, you instruct Sqoop to import
to a table in Accumulo rather than a directory in HDFS. Sqoop will
import data to the table specified as the argument to +\--accumulo-table+.
Each row of the input table will be transformed into an Accumulo
+Mutation+ operation to a row of the output table. The key for each row is
taken from a column of the input. By default Sqoop will use the split-by
column as the row key column. If that is not specified, it will try to
identify the primary key column, if any, of the source table. You can
manually specify the row key column with +\--accumulo-row-key+. Each output
column will be placed in the same column family, which must be specified
with +\--accumulo-column-family+.
NOTE: This function is incompatible with direct import (parameter
+\--direct+), and cannot be used in the same operation as an HBase import.
If the target table does not exist, the Sqoop job will
exit with an error, unless the +--accumulo-create-table+ parameter is
specified. Otherwise, you should create the target table before running
an import.
Sqoop currently serializes all values to Accumulo by converting each field
to its string representation (as if you were importing to HDFS in text
mode), and then inserts the UTF-8 bytes of this string in the target
cell.
By default, no visibility is applied to the resulting cells in Accumulo,
so the data will be visible to any Accumulo user. Use the
+\--accumulo-visibility+ parameter to specify a visibility token to
apply to all rows in the import job.
For performance tuning, use the optional +\--accumulo-batch-size+ and
+\--accumulo-max-latency+ parameters. See Accumulo's documentation for
an explanation of the effects of these parameters.
In order to connect to an Accumulo instance, you must specify the location
of a Zookeeper ensemble using the +\--accumulo-zookeepers+ parameter,
the name of the Accumulo instance (+\--accumulo-instance+), and the
username and password to connect with (+\--accumulo-user+ and
+\--accumulo-password+ respectively).

View File

@ -554,6 +554,9 @@ include::hive.txt[]
include::hbase-args.txt[]
include::hbase.txt[]
include::accumulo-args.txt[]
include::accumulo.txt[]
include::codegen-args.txt[]
As mentioned earlier, a byproduct of importing a table to HDFS is a

View File

@ -101,7 +101,7 @@ The following are the limitations in the current implementation:
* all-tables option
* free-form query option
* Data imported into Hive or HBase
* Data imported into Hive, HBase or Accumulo
* table import with --where argument
* incremental imports

View File

@ -39,6 +39,7 @@
import com.cloudera.sqoop.tool.SqoopTool;
import com.cloudera.sqoop.util.RandomHash;
import com.cloudera.sqoop.util.StoredAsProperty;
import org.apache.sqoop.accumulo.AccumuloConstants;
import org.apache.sqoop.util.CredentialsUtil;
import org.apache.sqoop.util.LoggingUtils;
import org.apache.sqoop.validation.AbortOnFailureHandler;
@ -257,6 +258,46 @@ public String toString() {
// "key" column for the merge operation.
@StoredAsProperty("merge.key.col") private String mergeKeyCol;
// Accumulo home directory.
private String accumuloHome; // not serialized to metastore.

// Zookeeper home directory.
private String zookeeperHome; // not serialized to metastore.

// Accumulo table to import into.
@StoredAsProperty("accumulo.table") private String accumuloTable;

// Column family to prepend to inserted cols.
@StoredAsProperty("accumulo.col.family") private String accumuloColFamily;

// Column of the input to use as the row key.
@StoredAsProperty("accumulo.row.key.col") private String accumuloRowKeyCol;

// Visibility token to be applied to each row imported.
@StoredAsProperty("accumulo.visibility") private String accumuloVisibility;

// Size of the Accumulo write buffer, in bytes.
@StoredAsProperty("accumulo.batch.size")
private long accumuloBatchSize;

// Maximum latency for the Accumulo batch writer, in milliseconds.
@StoredAsProperty("accumulo.max.latency")
private long accumuloMaxLatency;

// If true, create the Accumulo table when it is missing.
@StoredAsProperty("accumulo.create.table")
private boolean accumuloCreateTable;

// Accumulo user name to connect as.
@StoredAsProperty("accumulo.user") private String accumuloUser;

// Password for the Accumulo user.
@StoredAsProperty("accumulo.password") private String accumuloPassword;

// Name of the Accumulo instance to connect to.
@StoredAsProperty("accumulo.instance") private String accumuloInstance;

// Comma-separated list of ZooKeeper servers used by the Accumulo instance.
@StoredAsProperty("accumulo.zookeepers") private String accumuloZookeepers;
// These next two fields are not serialized to the metastore.
// If this SqoopOptions is created by reading a saved job, these will
@ -846,6 +887,8 @@ private void initDefaults(Configuration baseConfiguration) {
// default action is to run the full pipeline.
this.hadoopMapRedHome = System.getenv("HADOOP_MAPRED_HOME");
this.accumuloHome = getAccumuloHomeDefault();
this.zookeeperHome = getZookeeperHomeDefault();
this.hiveHome = getHiveHomeDefault();
this.hCatHome = getHCatHomeDefault();
@ -902,6 +945,11 @@ private void initDefaults(Configuration baseConfiguration) {
this.mapColumnHive = new Properties();
this.mapColumnJava = new Properties();
// Set Accumulo batch size defaults, since 0 is not the same
// as "not set"
this.accumuloBatchSize = AccumuloConstants.DEFAULT_BATCH_SIZE;
this.accumuloMaxLatency = AccumuloConstants.DEFAULT_LATENCY;
// We do not want to be verbose too much if not explicitly needed
this.verbose = false;
this.isValidationEnabled = false; // validation is disabled by default
@ -2100,6 +2148,195 @@ public String getMergeKeyCol() {
return this.mergeKeyCol;
}
/**
 * Returns the default Accumulo home directory: $ACCUMULO_HOME from the
 * environment, overridable with the -Daccumulo.home system property.
 */
public static String getAccumuloHomeDefault() {
  // Set this with $ACCUMULO_HOME, but -Daccumulo.home can override.
  String accumuloHome = System.getenv("ACCUMULO_HOME");
  accumuloHome = System.getProperty("accumulo.home", accumuloHome);
  return accumuloHome;
}

/**
 * Returns the default Zookeeper home directory: $ZOOKEEPER_HOME from the
 * environment, overridable with the -Dzookeeper.home system property.
 */
public static String getZookeeperHomeDefault() {
  // Set this with $ZOOKEEPER_HOME, but -Dzookeeper.home can override.
  String zookeeperHome = System.getenv("ZOOKEEPER_HOME");
  zookeeperHome = System.getProperty("zookeeper.home", zookeeperHome);
  return zookeeperHome;
}

/** Gets the Accumulo home directory (not serialized to the metastore). */
public String getAccumuloHome() {
  return accumuloHome;
}

/** Sets the Accumulo home directory. */
public void setAccumuloHome(String home) {
  this.accumuloHome = home;
}

/** Gets the Zookeeper home directory (not serialized to the metastore). */
public String getZookeeperHome() {
  return zookeeperHome;
}

/** Sets the Zookeeper home directory. */
public void setZookeeperHome(String home) {
  this.zookeeperHome = home;
}

/**
 * Set whether we should create missing Accumulo tables.
 */
public void setCreateAccumuloTable(boolean create) {
  this.accumuloCreateTable = create;
}

/**
 * Returns true if we should create Accumulo tables/column families
 * that are missing.
 */
public boolean getCreateAccumuloTable() {
  return this.accumuloCreateTable;
}

/**
 * Sets the Accumulo batch size (in bytes).
 */
public void setAccumuloBatchSize(long batchSize) {
  this.accumuloBatchSize = batchSize;
}

/**
 * Gets the Accumulo batch size (in bytes).
 */
public long getAccumuloBatchSize() {
  return this.accumuloBatchSize;
}

/**
 * Sets the Accumulo target column family.
 */
public void setAccumuloColFamily(String colFamily) {
  this.accumuloColFamily = colFamily;
}

/**
 * Gets the Accumulo import target column family.
 */
public String getAccumuloColFamily() {
  return this.accumuloColFamily;
}

/**
 * Sets the Accumulo max latency (in milliseconds).
 */
public void setAccumuloMaxLatency(long maxLatency) {
  this.accumuloMaxLatency = maxLatency;
}

/**
 * Gets the Accumulo max latency (in milliseconds).
 */
public long getAccumuloMaxLatency() {
  return this.accumuloMaxLatency;
}

/**
 * Gets the column to use as the row id in an Accumulo import.
 * If null, use the primary key column.
 */
public String getAccumuloRowKeyColumn() {
  return this.accumuloRowKeyCol;
}

/**
 * Sets the column to use as the row id in an Accumulo import.
 */
public void setAccumuloRowKeyColumn(String col) {
  this.accumuloRowKeyCol = col;
}

/**
 * Gets the visibility token to use.
 * If null, don't assign a visibility.
 */
public String getAccumuloVisibility() {
  return this.accumuloVisibility;
}

/**
 * Sets the visibility token to use.
 */
public void setAccumuloVisibility(String vis) {
  this.accumuloVisibility = vis;
}

/**
 * Gets the target Accumulo table name, if any.
 */
public String getAccumuloTable() {
  return this.accumuloTable;
}

/**
 * Sets the target Accumulo table name.
 */
public void setAccumuloTable(String table) {
  this.accumuloTable = table;
}

/**
 * Gets the target Accumulo user name, if any.
 */
public String getAccumuloUser() {
  return this.accumuloUser;
}

/**
 * Sets the target Accumulo user name for an import.
 */
public void setAccumuloUser(String user) {
  this.accumuloUser = user;
}

/**
 * Gets the target Accumulo password, if any.
 */
public String getAccumuloPassword() {
  return this.accumuloPassword;
}

/**
 * Sets the target Accumulo password for an import.
 */
public void setAccumuloPassword(String passwd) {
  this.accumuloPassword = passwd;
}

/**
 * Gets the target Accumulo instance, if any.
 */
public String getAccumuloInstance() {
  return this.accumuloInstance;
}

/**
 * Sets the target Accumulo instance for an import.
 */
public void setAccumuloInstance(String instance) {
  this.accumuloInstance = instance;
}

/**
 * Gets the target Accumulo zookeeper instance, if any.
 */
public String getAccumuloZookeepers() {
  return this.accumuloZookeepers;
}

/**
 * Sets the target Accumulo zookeeper instance for an import.
 */
public void setAccumuloZookeepers(String zookeepers) {
  this.accumuloZookeepers = zookeepers;
}

/** Sets the connection manager class name to use for this job. */
public void setConnManagerClassName(String connManagerClass) {
  this.connManagerClassName = connManagerClass;
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.accumulo;
/**
* This class provides constants used to define properties relating to
* Accumulo imports.
*/
public final class AccumuloConstants {

  // Some defaults to use if these aren't specified.

  /** Default buffer size (in bytes) for the Accumulo BatchWriter. */
  public static final long DEFAULT_BATCH_SIZE = 10240000L;

  /** Default max latency (in milliseconds) for the Accumulo BatchWriter. */
  public static final long DEFAULT_LATENCY = 5000L;

  /** Configuration key specifying the table to insert into. */
  public static final String TABLE_NAME_KEY = "sqoop.accumulo.insert.table";

  /** Configuration key specifying the column family to insert into. */
  public static final String COL_FAMILY_KEY =
      "sqoop.accumulo.insert.column.family";

  /** Configuration key specifying the column of the input whose value
   * should be used as the row id.
   */
  public static final String ROW_KEY_COLUMN_KEY =
      "sqoop.accumulo.insert.row.key.column";

  /** Configuration key specifying the visibility token to apply to
   * inserted cells.
   */
  public static final String VISIBILITY_KEY =
      "sqoop.accumulo.insert.visibility";

  /**
   * Configuration key specifying the Transformer implementation to use.
   */
  public static final String TRANSFORMER_CLASS_KEY =
      "sqoop.accumulo.insert.put.transformer.class";

  /** Configuration key for the BatchWriter's max latency (milliseconds). */
  public static final String MAX_LATENCY =
      "sqoop.accumulo.max.latency";

  /** Configuration key for the BatchWriter's buffer size (bytes). */
  public static final String BATCH_SIZE =
      "sqoop.accumulo.batch.size";

  /** Configuration key listing the ZooKeeper hosts of the instance. */
  public static final String ZOOKEEPERS =
      "sqoop.accumulo.zookeeper.hostnames";

  /** Configuration key naming the Accumulo instance to connect to. */
  public static final String ACCUMULO_INSTANCE =
      "sqoop.accumulo.instance.name";

  /** Configuration key for the Accumulo user name. */
  public static final String ACCUMULO_USER_NAME =
      "sqoop.accumulo.user.name";

  /** Configuration key for the Accumulo user's password. */
  public static final String ACCUMULO_PASSWORD =
      "sqoop.accumulo.password";

  /** Location of accumulo-site.xml, relative to the Accumulo home dir. */
  public static final String ACCUMULO_SITE_XML_PATH =
      "/conf/accumulo-site.xml";

  // Prevent instantiation.
  private AccumuloConstants(){
  }
}

View File

@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.accumulo;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Mutation;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import com.cloudera.sqoop.lib.FieldMapProcessor;
import com.cloudera.sqoop.lib.FieldMappable;
import com.cloudera.sqoop.lib.ProcessingException;
/**
* SqoopRecordProcessor that performs an Accumulo mutation operation
* that contains all the fields of the record.
*/
public class AccumuloMutationProcessor implements Closeable, Configurable,
    FieldMapProcessor {

  private Configuration conf;

  // An object that can transform a map of fieldName->object
  // into a Mutation.
  private MutationTransformer mutationTransformer;

  // Name of the target Accumulo table, read from the configuration.
  private String tableName;

  // BatchWriter for the target table; created in setConf().
  private BatchWriter table;

  public AccumuloMutationProcessor() {
  }

  /**
   * Reads all connection and table settings from the configuration,
   * instantiates the MutationTransformer, connects to the Accumulo
   * instance and opens a BatchWriter on the target table.
   * Throws RuntimeException when a required setting is missing or when
   * connecting to Accumulo fails.
   */
  @Override
  @SuppressWarnings("unchecked")
  public void setConf(Configuration config) {
    this.conf = config;

    // Get the implementation of MutationTransformer to use.
    // By default, we call toString() on every non-null field.
    Class<? extends MutationTransformer> xformerClass =
        (Class<? extends MutationTransformer>)
        this.conf.getClass(AccumuloConstants.TRANSFORMER_CLASS_KEY,
        ToStringMutationTransformer.class);
    this.mutationTransformer = (MutationTransformer)
        ReflectionUtils.newInstance(xformerClass, this.conf);
    if (null == mutationTransformer) {
      throw new RuntimeException("Could not instantiate MutationTransformer.");
    }

    // Column family and row-key column are mandatory for the transformer.
    String colFam = conf.get(AccumuloConstants.COL_FAMILY_KEY, null);
    if (null == colFam) {
      throw new RuntimeException("Accumulo column family not set.");
    }
    this.mutationTransformer.setColumnFamily(colFam);

    String rowKey = conf.get(AccumuloConstants.ROW_KEY_COLUMN_KEY, null);
    if (null == rowKey) {
      throw new RuntimeException("Row key column not set.");
    }
    this.mutationTransformer.setRowKeyColumn(rowKey);

    // Visibility is optional; null means no visibility token is applied.
    String vis = conf.get(AccumuloConstants.VISIBILITY_KEY, null);
    this.mutationTransformer.setVisibility(vis);

    this.tableName = conf.get(AccumuloConstants.TABLE_NAME_KEY, null);
    String zookeeper = conf.get(AccumuloConstants.ZOOKEEPERS);
    String instance = conf.get(AccumuloConstants.ACCUMULO_INSTANCE);

    Instance inst = new ZooKeeperInstance(instance, zookeeper);
    String username = conf.get(AccumuloConstants.ACCUMULO_USER_NAME);
    String pw = conf.get(AccumuloConstants.ACCUMULO_PASSWORD);
    if (null == pw) {
      // A missing password is treated as the empty password.
      pw = "";
    }
    // NOTE(review): getBytes() uses the platform default charset here;
    // presumably UTF-8 is intended for PasswordToken — confirm.
    byte[] password = pw.getBytes();

    // Tune the BatchWriter from the configured buffer size and latency.
    BatchWriterConfig bwc = new BatchWriterConfig();

    long bs = conf.getLong(AccumuloConstants.BATCH_SIZE,
        AccumuloConstants.DEFAULT_BATCH_SIZE);
    bwc.setMaxMemory(bs);

    long la = conf.getLong(AccumuloConstants.MAX_LATENCY,
        AccumuloConstants.DEFAULT_LATENCY);
    bwc.setMaxLatency(la, TimeUnit.MILLISECONDS);

    try {
      Connector conn = inst.getConnector(username, new PasswordToken(password));
      this.table = conn.createBatchWriter(tableName, bwc);
    } catch (AccumuloException ex) {
      throw new RuntimeException("Error accessing Accumulo", ex);
    } catch (AccumuloSecurityException aex){
      throw new RuntimeException("Security exception accessing Accumulo", aex);
    } catch(TableNotFoundException tex){
      throw new RuntimeException("Accumulo table " + tableName
          + " not found", tex);
    }
  }

  @Override
  public Configuration getConf() {
    return this.conf;
  }

  @Override
  /**
   * Processes a record by extracting its field map and converting
   * it into a list of Mutations into Accumulo.
   */
  public void accept(FieldMappable record)
      throws IOException, ProcessingException {
    Map<String, Object> fields = record.getFieldMap();

    // The transformer may return null to indicate nothing should be written.
    Iterable<Mutation> putList = mutationTransformer.getMutations(fields);
    if (null != putList) {
      for (Mutation m : putList) {
        try {
          this.table.addMutation(m);
        } catch (MutationsRejectedException ex) {
          throw new IOException("Mutation rejected" , ex);
        }
      }
    }
  }

  @Override
  /**
   * Closes the Accumulo table and commits all pending operations.
   */
  public void close() throws IOException {
    try {
      this.table.close();
    } catch (MutationsRejectedException ex) {
      throw new IOException("Mutations rejected", ex);
    }
  }
}

View File

@ -0,0 +1,182 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.accumulo;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.config.ConfigurationConstants;
/**
* This class provides a method that checks if Accumulo jars are present in the
* current classpath. It also provides a setAlwaysNoAccumuloJarMode mechanism
* for testing and simulating the condition where there is no Accumulo jar
* (since Accumulo is pulled automatically by ivy).
*/
public final class AccumuloUtil {
  private static final Log LOG = LogFactory.getLog(AccumuloUtil.class);

  // When true, isAccumuloJarPresent() always reports "absent" (test hook).
  private static boolean testingMode = false;

  // Marker class whose presence proves the Accumulo client jar is on the
  // classpath.
  private static final String INSTANCE_CLASS
      = "org.apache.accumulo.core.client.Instance";

  // Prevent instantiation
  private AccumuloUtil() {
  }

  /**
   * This is a way to make this always return false for testing.
   */
  public static void setAlwaysNoAccumuloJarMode(boolean mode) {
    testingMode = mode;
  }

  /**
   * Checks whether the Accumulo client classes are available on the
   * current classpath.
   *
   * @return true if the Accumulo jar is present and testing mode is off.
   */
  public static boolean isAccumuloJarPresent() {
    if (testingMode) {
      return false;
    }
    try {
      Class.forName(INSTANCE_CLASS);
    } catch (ClassNotFoundException cnfe) {
      return false;
    }
    return true;
  }

  /**
   * Add the Accumulo jar files to local classpath and dist cache.
   * Skipped for local-mode jobs and when the user asked to skip the
   * distributed cache.
   * @throws IOException
   */
  public static void addJars(Job job, SqoopOptions options) throws IOException {
    if (isLocalJobTracker(job)) {
      LOG.info("Not adding Accumulo jars to distributed cache in local mode");
    } else if (options.isSkipDistCache()) {
      LOG.info("Not adding Accumulo jars to distributed cache as requested");
    } else {
      Configuration conf = job.getConfiguration();
      String accumuloHome = null;
      String zookeeperHome = null;
      FileSystem fs = FileSystem.getLocal(conf);
      // Resolve the Accumulo/Zookeeper homes: the options object wins,
      // falling back to the environment/system-property defaults.
      if (options != null) {
        accumuloHome = options.getAccumuloHome();
      }
      if (accumuloHome == null) {
        accumuloHome = SqoopOptions.getAccumuloHomeDefault();
      }
      LOG.info("Accumulo job : Accumulo Home = " + accumuloHome);
      if (options != null) {
        zookeeperHome = options.getZookeeperHome();
      }
      if (zookeeperHome == null) {
        zookeeperHome = SqoopOptions.getZookeeperHomeDefault();
      }
      LOG.info("Accumulo job : Zookeeper Home = " + zookeeperHome);

      // Pick up Accumulo's own site configuration.
      conf.addResource(accumuloHome + AccumuloConstants.ACCUMULO_SITE_XML_PATH);

      // Add any libjars already specified
      Set<String> localUrls = new HashSet<String>();
      localUrls.addAll(conf.getStringCollection(
          ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM));

      String dir = accumuloHome + File.separator + "lib";
      LOG.info("Adding jar files under " + dir + " to distributed cache");
      addDirToCache(new File(dir), fs, localUrls, false);

      dir = zookeeperHome;
      LOG.info("Adding jar files under " + dir + " to distributed cache");
      addDirToCache(new File(dir), fs, localUrls, false);

      // Merge the collected jars with whatever was already in tmpjars.
      String tmpjars = conf
          .get(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM);
      StringBuilder sb = new StringBuilder(1024);
      if (null != tmpjars) {
        sb.append(tmpjars);
        sb.append(",");
      }
      sb.append(StringUtils.arrayToString(localUrls.toArray(new String[0])));
      conf.set(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM,
          sb.toString());
    }
  }

  /**
   * Add the .jar elements of a directory to the DCache classpath, optionally
   * recursively.
   */
  private static void addDirToCache(File dir, FileSystem fs,
      Set<String> localUrls, boolean recursive) {
    if (dir == null) {
      return;
    }
    // listFiles() returns null on I/O error or when dir is not a directory.
    File[] fileList = dir.listFiles();
    if (fileList == null) {
      LOG.warn("No files under " + dir
          + " to add to distributed cache for Accumulo job");
      return;
    }
    // Iterate over the listing captured above. (The original code called
    // dir.listFiles() a second time here, doing redundant I/O and racing
    // against concurrent changes to the directory.)
    for (File libFile : fileList) {
      if (libFile.exists() && !libFile.isDirectory()
          && libFile.getName().endsWith("jar")) {
        Path p = new Path(libFile.toString());
        if (libFile.canRead()) {
          String qualified = p.makeQualified(fs).toString();
          LOG.info("Adding to job classpath: " + qualified);
          localUrls.add(qualified);
        } else {
          LOG.warn("Ignoring unreadable file " + libFile);
        }
      }
      if (recursive && libFile.isDirectory()) {
        addDirToCache(libFile, fs, localUrls, recursive);
      }
    }
  }

  /**
   * Check to see if the job is running in local mode.
   */
  public static boolean isLocalJobTracker(Job job) {
    boolean retval = false;
    Configuration conf = job.getConfiguration();
    // If framework is set to YARN, then we can't be running in local mode
    if ("yarn".equalsIgnoreCase(conf
        .get(ConfigurationConstants.PROP_MAPREDUCE_FRAMEWORK_NAME))) {
      retval = false;
    } else {
      // Pre-YARN: a job tracker address of "local" (in either the old or
      // the new property name) means local mode.
      String jtAddr = conf
          .get(ConfigurationConstants.PROP_MAPRED_JOB_TRACKER_ADDRESS);
      String jtAddr2 = conf
          .get(ConfigurationConstants.PROP_MAPREDUCE_JOB_TRACKER_ADDRESS);
      retval = (jtAddr != null && jtAddr.equals("local"))
          || (jtAddr2 != null && jtAddr2.equals("local"));
    }
    return retval;
  }
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.accumulo;
import java.io.IOException;
import java.util.Map;
import org.apache.accumulo.core.data.Mutation;
/**
 * Abstract class that takes a map of jdbc field names to values
 * and converts them to Mutations for Accumulo.
 */
public abstract class MutationTransformer {

  // Default column family written for every non-row-key field.
  private String family;
  // Name of the input field whose value becomes the Mutation's row id.
  private String rowKeyField;
  // Visibility expression applied to each generated cell; may be null.
  private String visExpression;

  /**
   * @return the default column family to insert into.
   */
  public String getColumnFamily() {
    return family;
  }

  /**
   * Set the default column family to insert into.
   */
  public void setColumnFamily(String colFamily) {
    family = colFamily;
  }

  /**
   * @return the field name identifying the value to use as the row id.
   */
  public String getRowKeyColumn() {
    return rowKeyField;
  }

  /**
   * Set the column of the input fields which should be used to calculate
   * the row id.
   */
  public void setRowKeyColumn(String rowKeyCol) {
    rowKeyField = rowKeyCol;
  }

  /**
   * @return the field name identifying the visibility token.
   */
  public String getVisibility() {
    return visExpression;
  }

  /**
   * Set the visibility token to set for each cell.
   */
  public void setVisibility(String vis) {
    visExpression = vis;
  }

  /**
   * Returns a list of Mutations that inserts the fields into a row in
   * Accumulo.
   *
   * @param fields a map of field names to values to insert.
   * @return A list of Mutations that inserts these into Accumulo.
   */
  public abstract Iterable<Mutation> getMutations(Map<String, Object> fields)
      throws IOException;
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.accumulo;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
/**
 * MutationTransformer that calls toString on all non-null fields.
 */
public class ToStringMutationTransformer extends MutationTransformer {

  public static final Log LOG = LogFactory.getLog(
      ToStringMutationTransformer.class.getName());

  public ToStringMutationTransformer() {
  }

  /**
   * Convert the field map into a single Mutation keyed by the configured
   * row-key column; every other non-null field becomes one cell in the
   * configured column family.
   *
   * @param fields map of column names to column values for one row.
   * @return a singleton list with the Mutation, or null when the row-key
   *     value is missing (the row is skipped).
   */
  @Override
  public Iterable<Mutation> getMutations(Map<String, Object> fields)
      throws IOException {
    String rowKeyCol = getRowKeyColumn();
    String colFamily = getColumnFamily();
    Object rowKey = fields.get(rowKeyCol);
    String vis = getVisibility();
    if (null == rowKey) {
      // If the row-key column is null, we don't insert this row.
      LOG.warn("Could not insert row with null value for row-key column: "
          + rowKeyCol);
      // NOTE(review): callers must tolerate this null; returning an empty
      // list would be safer but would change the existing contract.
      return null;
    }
    ColumnVisibility colVis = null;
    if (null != vis && vis.length() > 0) {
      colVis = new ColumnVisibility(vis);
    }
    // The column family is identical for every cell: build the Text once
    // instead of once per field (it was loop-invariant).
    Text famText = new Text(colFamily);
    Mutation mut = new Mutation(rowKey.toString());
    for (Map.Entry<String, Object> fieldEntry : fields.entrySet()) {
      String colName = fieldEntry.getKey();
      if (colName.equals(rowKeyCol)) {
        // The row key itself is not stored as a cell.
        continue;
      }
      // This is a regular field; add it if it's not null.
      Object val = fieldEntry.getValue();
      if (null != val) {
        // Serialize once; only the presence of a visibility differs.
        Value cellValue = new Value(val.toString().getBytes("UTF8"));
        if (null == colVis) {
          mut.put(famText, new Text(colName), cellValue);
        } else {
          mut.put(famText, new Text(colName), colVis, cellValue);
        }
      }
    }
    return Collections.singletonList(mut);
  }
}

View File

@ -40,6 +40,8 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.accumulo.AccumuloUtil;
import org.apache.sqoop.mapreduce.AccumuloImportJob;
import org.apache.sqoop.mapreduce.JdbcCallExportJob;
import org.apache.sqoop.tool.BaseSqoopTool;
import org.apache.sqoop.util.LoggingUtils;
@ -594,6 +596,13 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
} else {
importer = new HBaseBulkImportJob(opts, context);
}
} else if (opts.getAccumuloTable() != null) {
// Import to Accumulo.
if (!AccumuloUtil.isAccumuloJarPresent()) {
throw new ImportException("Accumulo jars are not present in "
+ "classpath, cannot import to Accumulo!");
}
importer = new AccumuloImportJob(opts, context);
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),
@ -630,6 +639,13 @@ public void importQuery(com.cloudera.sqoop.manager.ImportJobContext context)
} else {
importer = new HBaseBulkImportJob(opts, context);
}
} else if (opts.getAccumuloTable() != null) {
// Import to Accumulo.
if (!AccumuloUtil.isAccumuloJarPresent()) {
throw new ImportException("Accumulo jars are not present in classpath,"
+ " cannot import to Accumulo!");
}
importer = new AccumuloImportJob(opts, context);
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),

View File

@ -0,0 +1,195 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.sqoop.accumulo.AccumuloConstants;
import org.apache.sqoop.accumulo.AccumuloMutationProcessor;
import org.apache.sqoop.accumulo.AccumuloUtil;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.lib.FieldMapProcessor;
import com.cloudera.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ImportJobContext;
import com.cloudera.sqoop.mapreduce.DataDrivenImportJob;
import com.cloudera.sqoop.util.ImportException;
/**
 * Runs an Accumulo import via DataDrivenDBInputFormat to the
 * AccumuloMutationProcessor in the DelegatingOutputFormat.
 */
public class AccumuloImportJob extends DataDrivenImportJob {

  public static final Log LOG
      = LogFactory.getLog(AccumuloImportJob.class.getName());

  // NOTE(review): this field is *static* but assigned from the constructor,
  // so the last AccumuloImportJob constructed wins. The class also mixes it
  // with the inherited 'options' member (configureOutputFormat uses
  // 'options', jobSetup uses 'opts'). Looks unintentional -- confirm before
  // relying on it.
  protected static SqoopOptions opts;

  public AccumuloImportJob(final SqoopOptions opts,
      final ImportJobContext importContext) {
    super(opts, importContext.getInputFormat(), importContext);
    this.opts = opts;
  }

  /**
   * Wire up the mapper: SqoopRecord keys, NullWritable values. The actual
   * Accumulo writes happen in the output format, not in the mapper.
   */
  @Override
  protected void configureMapper(Job job, String tableName,
      String tableClassName) throws IOException {
    job.setOutputKeyClass(SqoopRecord.class);
    job.setOutputValueClass(NullWritable.class);
    job.setMapperClass(getMapperClass());
  }

  /** @return the pass-through mapper used for Accumulo imports. */
  @Override
  protected Class<? extends Mapper> getMapperClass() {
    return AccumuloImportMapper.class;
  }

  /** @return DelegatingOutputFormat; the field-map processor does the I/O. */
  @Override
  protected Class<? extends OutputFormat> getOutputFormatClass()
      throws ClassNotFoundException {
    return DelegatingOutputFormat.class;
  }

  /**
   * Configure the DelegatingOutputFormat/AccumuloMutationProcessor pair and
   * copy all Accumulo connection and table settings into the job conf.
   * The row-key column is resolved in order from: --accumulo-row-key, the
   * --split-by column, then the table's primary key; if all are absent the
   * job fails here with an IOException.
   */
  @Override
  protected void configureOutputFormat(Job job, String tableName,
      String tableClassName) throws ClassNotFoundException, IOException {
    // Use the DelegatingOutputFormat with the AccumuloMutationProcessor.
    job.setOutputFormatClass(getOutputFormatClass());

    Configuration conf = job.getConfiguration();
    conf.setClass("sqoop.output.delegate.field.map.processor.class",
        AccumuloMutationProcessor.class, FieldMapProcessor.class);

    // Set the Accumulo parameters (table, column family, row key):
    conf.set(AccumuloConstants.ZOOKEEPERS,
        options.getAccumuloZookeepers());
    conf.set(AccumuloConstants.ACCUMULO_INSTANCE,
        options.getAccumuloInstance());
    conf.set(AccumuloConstants.ACCUMULO_USER_NAME,
        options.getAccumuloUser());
    String pw = options.getAccumuloPassword();
    if (null == pw) {
      // An absent password is stored as the empty string, never null.
      pw = "";
    }
    conf.set(AccumuloConstants.ACCUMULO_PASSWORD, pw);
    conf.set(AccumuloConstants.TABLE_NAME_KEY,
        options.getAccumuloTable());
    conf.set(AccumuloConstants.COL_FAMILY_KEY,
        options.getAccumuloColFamily());
    conf.setLong(AccumuloConstants.BATCH_SIZE,
        options.getAccumuloBatchSize());
    conf.setLong(AccumuloConstants.MAX_LATENCY,
        options.getAccumuloMaxLatency());

    // What column of the input becomes the row key?
    String rowKeyCol = options.getAccumuloRowKeyColumn();
    if (null == rowKeyCol) {
      // User didn't explicitly set one. If there's a split-by column set,
      // use that.
      rowKeyCol = options.getSplitByCol();
    }
    if (null == rowKeyCol) {
      // No split-by column is explicitly set.
      // If the table has a primary key, use that.
      ConnManager manager = getContext().getConnManager();
      rowKeyCol = manager.getPrimaryKey(tableName);
    }
    if (null == rowKeyCol) {
      // Give up here if this is still unset.
      throw new IOException(
          "Could not determine the row-key column. "
          + "Use --accumulo-row-key to specify the input column that "
          + "names each row.");
    }
    conf.set(AccumuloConstants.ROW_KEY_COLUMN_KEY, rowKeyCol);
  }

  @Override
  /**
   * Create the target Accumulo table before running the job, if appropriate.
   * Verifies table/family settings, ships the Accumulo jars via the
   * distributed cache, and either creates the missing table
   * (--accumulo-create-table) or fails with an ImportException.
   */
  protected void jobSetup(Job job) throws IOException, ImportException {
    Configuration conf = job.getConfiguration();
    String tableName = conf.get(AccumuloConstants.TABLE_NAME_KEY);
    String familyName = conf.get(AccumuloConstants.COL_FAMILY_KEY);
    String zookeepers = conf.get(AccumuloConstants.ZOOKEEPERS);
    String instance = conf.get(AccumuloConstants.ACCUMULO_INSTANCE);
    String user = conf.get(AccumuloConstants.ACCUMULO_USER_NAME);
    if (null == tableName) {
      throw new ImportException(
          "Import to Accumulo error: Table name not specified");
    }
    if (null == familyName) {
      throw new ImportException(
          "Import to Accumulo error: Column family not specified");
    }
    try {
      // Set up the libjars (note: reads the static 'opts' field -- see the
      // review note on its declaration above).
      AccumuloUtil.addJars(job, opts);
      Instance inst = new ZooKeeperInstance(instance, zookeepers);
      String password = conf.get(AccumuloConstants.ACCUMULO_PASSWORD);
      Connector conn = inst.getConnector(user, new PasswordToken(password));
      if (!conn.tableOperations().exists(tableName)) {
        if (options.getCreateAccumuloTable()) {
          LOG.info("Table " + tableName + " doesn't exist, creating.");
          try {
            conn.tableOperations().create(tableName);
          } catch (TableExistsException e) {
            // Should only happen if the table was created
            // by another process between the existence check
            // and the create command
            LOG.info("Table " + tableName + " created by another process.");
          }
        } else {
          throw new ImportException(
              "Table "
                  + tableName
                  + " does not exist, and --accumulo-create-table "
                  + "not specified.");
        }
      }
    } catch (AccumuloException e) {
      throw new ImportException(e);
    } catch (AccumuloSecurityException e) {
      throw new ImportException(e);
    }
    super.jobSetup(job);
  }
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import com.cloudera.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
/**
 * Imports records by writing them to Accumulo via the DelegatingOutputFormat
 * and the AccumuloMutationProcessor.
 */
public class AccumuloImportMapper
    extends AutoProgressMapper
    <LongWritable, SqoopRecord, SqoopRecord, NullWritable> {

  /** Pass each record straight through; the output format does the writes. */
  @Override
  public void map(LongWritable key, SqoopRecord val, Context context)
      throws IOException, InterruptedException {
    NullWritable nothing = NullWritable.get();
    context.write(val, nothing);
  }
}

View File

@ -180,6 +180,20 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
"hbase-bulkload";
public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table";
// Accumulo arguments. These are the long option names registered by
// getAccumuloOptions() and consumed by applyAccumuloOptions().
public static final String ACCUMULO_TABLE_ARG = "accumulo-table";
public static final String ACCUMULO_COL_FAM_ARG = "accumulo-column-family";
public static final String ACCUMULO_ROW_KEY_ARG = "accumulo-row-key";
public static final String ACCUMULO_VISIBILITY_ARG = "accumulo-visibility";
public static final String ACCUMULO_CREATE_TABLE_ARG
    = "accumulo-create-table";
public static final String ACCUMULO_BATCH_SIZE_ARG = "accumulo-batch-size";
public static final String ACCUMULO_MAX_LATENCY_ARG = "accumulo-max-latency";
public static final String ACCUMULO_ZOOKEEPERS_ARG = "accumulo-zookeepers";
public static final String ACCUMULO_INSTANCE_ARG = "accumulo-instance";
public static final String ACCUMULO_USER_ARG = "accumulo-user";
public static final String ACCUMULO_PASSWORD_ARG = "accumulo-password";
// Arguments for the saved job management system.
public static final String STORAGE_METASTORE_ARG = "meta-connect";
@ -728,6 +742,116 @@ protected RelatedOptions getHBaseOptions() {
return hbaseOpts;
}
/**
 * @return RelatedOptions describing all Accumulo-specific command-line
 *     arguments (table, column family, row key, connection settings, ...).
 */
protected RelatedOptions getAccumuloOptions() {
  RelatedOptions opts = new RelatedOptions("Accumulo arguments");
  // Target table / layout options.
  opts.addOption(OptionBuilder
      .withLongOpt(ACCUMULO_TABLE_ARG)
      .withArgName("table")
      .hasArg()
      .withDescription("Import to <table> in Accumulo")
      .create());
  opts.addOption(OptionBuilder
      .withLongOpt(ACCUMULO_COL_FAM_ARG)
      .withArgName("family")
      .hasArg()
      .withDescription("Sets the target column family for the import")
      .create());
  opts.addOption(OptionBuilder
      .withLongOpt(ACCUMULO_ROW_KEY_ARG)
      .withArgName("col")
      .hasArg()
      .withDescription("Specifies which input column to use as the row key")
      .create());
  opts.addOption(OptionBuilder
      .withLongOpt(ACCUMULO_VISIBILITY_ARG)
      .withArgName("vis")
      .hasArg()
      .withDescription("Visibility token to be applied to all rows imported")
      .create());
  // Boolean flag: takes no argument.
  opts.addOption(OptionBuilder
      .withLongOpt(ACCUMULO_CREATE_TABLE_ARG)
      .withDescription("If specified, create missing Accumulo tables")
      .create());
  // BatchWriter tuning options.
  opts.addOption(OptionBuilder
      .withLongOpt(ACCUMULO_BATCH_SIZE_ARG)
      .withArgName("size")
      .hasArg()
      .withDescription("Batch size in bytes")
      .create());
  opts.addOption(OptionBuilder
      .withLongOpt(ACCUMULO_MAX_LATENCY_ARG)
      .withArgName("latency")
      .hasArg()
      .withDescription("Max write latency in milliseconds")
      .create());
  // Connection options.
  opts.addOption(OptionBuilder
      .withLongOpt(ACCUMULO_ZOOKEEPERS_ARG)
      .withArgName("zookeepers")
      .hasArg()
      .withDescription("Comma-separated list of zookeepers (host:port)")
      .create());
  opts.addOption(OptionBuilder
      .withLongOpt(ACCUMULO_INSTANCE_ARG)
      .withArgName("instance")
      .hasArg()
      .withDescription("Accumulo instance name.")
      .create());
  opts.addOption(OptionBuilder
      .withLongOpt(ACCUMULO_USER_ARG)
      .withArgName("user")
      .hasArg()
      .withDescription("Accumulo user name.")
      .create());
  opts.addOption(OptionBuilder
      .withLongOpt(ACCUMULO_PASSWORD_ARG)
      .withArgName("password")
      .hasArg()
      .withDescription("Accumulo password.")
      .create());
  return opts;
}
/**
 * Copy the Accumulo-related arguments from the parsed command line into
 * the SqoopOptions. Absent options leave the corresponding setting
 * untouched.
 */
protected void applyAccumuloOptions(CommandLine in, SqoopOptions out) {
  // All value-carrying options below were declared with hasArg(), so a
  // non-null getOptionValue() is equivalent to hasOption().
  String table = in.getOptionValue(ACCUMULO_TABLE_ARG);
  if (table != null) {
    out.setAccumuloTable(table);
  }
  String colFam = in.getOptionValue(ACCUMULO_COL_FAM_ARG);
  if (colFam != null) {
    out.setAccumuloColFamily(colFam);
  }
  String rowKey = in.getOptionValue(ACCUMULO_ROW_KEY_ARG);
  if (rowKey != null) {
    out.setAccumuloRowKeyColumn(rowKey);
  }
  String visibility = in.getOptionValue(ACCUMULO_VISIBILITY_ARG);
  if (visibility != null) {
    out.setAccumuloVisibility(visibility);
  }
  // Boolean flag carries no value; presence alone enables it.
  if (in.hasOption(ACCUMULO_CREATE_TABLE_ARG)) {
    out.setCreateAccumuloTable(true);
  }
  String batchSize = in.getOptionValue(ACCUMULO_BATCH_SIZE_ARG);
  if (batchSize != null) {
    out.setAccumuloBatchSize(Long.parseLong(batchSize));
  }
  String maxLatency = in.getOptionValue(ACCUMULO_MAX_LATENCY_ARG);
  if (maxLatency != null) {
    out.setAccumuloMaxLatency(Long.parseLong(maxLatency));
  }
  String zookeepers = in.getOptionValue(ACCUMULO_ZOOKEEPERS_ARG);
  if (zookeepers != null) {
    out.setAccumuloZookeepers(zookeepers);
  }
  String instance = in.getOptionValue(ACCUMULO_INSTANCE_ARG);
  if (instance != null) {
    out.setAccumuloInstance(instance);
  }
  String user = in.getOptionValue(ACCUMULO_USER_ARG);
  if (user != null) {
    out.setAccumuloUser(user);
  }
  String password = in.getOptionValue(ACCUMULO_PASSWORD_ARG);
  if (password != null) {
    out.setAccumuloPassword(password);
  }
}
@SuppressWarnings("static-access")
protected void addValidationOpts(RelatedOptions validationOptions) {
validationOptions.addOption(OptionBuilder
@ -1256,6 +1380,52 @@ protected void validateHiveOptions(SqoopOptions options)
}
}
/**
 * Validate the Accumulo-related options, rejecting option combinations
 * that are incomplete or conflict with other import modes.
 *
 * @throws InvalidOptionsException on the first violated constraint.
 */
protected void validateAccumuloOptions(SqoopOptions options)
    throws InvalidOptionsException {
  boolean tableSet = options.getAccumuloTable() != null;
  boolean familySet = options.getAccumuloColFamily() != null;
  // Table and column family must be given together (or not at all).
  if (tableSet != familySet) {
    throw new InvalidOptionsException(
        "Both --accumulo-table and --accumulo-column-family must be set."
        + HELP_STR);
  }
  if (tableSet && options.isDirect()) {
    throw new InvalidOptionsException("Direct import is incompatible with "
        + "Accumulo. Please remove parameter --direct");
  }
  if (tableSet && options.getHBaseTable() != null) {
    throw new InvalidOptionsException("HBase import is incompatible with "
        + "Accumulo import.");
  }
  if (tableSet
      && options.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
    throw new InvalidOptionsException("Accumulo import is not compatible "
        + "with importing into file format.");
  }
  if (tableSet && options.getHBaseColFamily() != null) {
    throw new InvalidOptionsException("Use --accumulo-column-family with "
        + "Accumulo import.");
  }
  // Connection parameters are mandatory once a table is requested.
  if (tableSet && options.getAccumuloUser() == null) {
    throw new InvalidOptionsException("Must specify Accumulo user.");
  }
  if (tableSet && options.getAccumuloInstance() == null) {
    throw new InvalidOptionsException("Must specify Accumulo instance.");
  }
  if (tableSet && options.getAccumuloZookeepers() == null) {
    throw new InvalidOptionsException("Must specify Zookeeper server(s).");
  }
}
protected void validateHCatalogOptions(SqoopOptions options)
throws InvalidOptionsException {
// Make sure that one of hCatalog or hive jobs are used

View File

@ -685,6 +685,7 @@ public void configureOptions(ToolOptions toolOptions) {
toolOptions.addUniqueOptions(getHBaseOptions());
toolOptions.addUniqueOptions(getHCatalogOptions());
toolOptions.addUniqueOptions(getHCatImportOnlyOptions());
toolOptions.addUniqueOptions(getAccumuloOptions());
// get common codegen opts.
RelatedOptions codeGenOpts = getCodeGenOpts(allTables);
@ -856,6 +857,7 @@ public void applyOptions(CommandLine in, SqoopOptions out)
applyCodeGenOptions(in, out, allTables);
applyHBaseOptions(in, out);
applyHCatOptions(in, out);
applyAccumuloOptions(in, out);
} catch (NumberFormatException nfe) {
throw new InvalidOptionsException("Error: expected numeric argument.\n"
@ -890,7 +892,9 @@ protected void validateImportOptions(SqoopOptions options)
"Cannot specify --" + SQL_QUERY_ARG + " and --table together."
+ HELP_STR);
} else if (options.getSqlQuery() != null
&& options.getTargetDir() == null && options.getHBaseTable() == null) {
&& options.getTargetDir() == null
&& options.getHBaseTable() == null
&& options.getAccumuloTable() == null) {
throw new InvalidOptionsException(
"Must specify destination with --target-dir."
+ HELP_STR);
@ -987,6 +991,7 @@ public void validateOptions(SqoopOptions options)
validateHBaseOptions(options);
validateHiveOptions(options);
validateHCatalogOptions(options);
validateAccumuloOptions(options);
}
}

View File

@ -0,0 +1,209 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.accumulo;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import com.cloudera.sqoop.testutil.HsqldbTestServer;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
/**
 * Utility methods that facilitate Accumulo import tests.
 * These test use the MiniAccumuloCluster. They are
 * absolutely not thread safe.
 */
public abstract class AccumuloTestCase extends ImportJobTestCase {

  // Credentials for the mini-cluster; the password is also the root
  // password used to initialize the cluster in setUpCluster().
  private static final String ACCUMULO_USER="root";
  private static final String ACCUMULO_PASSWORD="rootroot";

  /*
   * This is to restore test.build.data system property which gets reset
   * when Accumulo tests are run. Since other tests in Sqoop also depend upon
   * this property, they can fail if are run subsequently in the same VM.
   */
  private static String testBuildDataProperty = "";

  private static void recordTestBuildDataProperty() {
    testBuildDataProperty = System.getProperty("test.build.data", "");
  }

  // NOTE(review): method name has a typo ("Buidl") and it is never invoked
  // anywhere in this class -- tearDown() presumably should call it. Confirm
  // before renaming or removing.
  private static void restoreTestBuidlDataProperty() {
    System.setProperty("test.build.data", testBuildDataProperty);
  }

  public static final Log LOG = LogFactory.getLog(
      AccumuloTestCase.class.getName());

  // Shared (static) cluster state -- one reason these tests are not
  // thread safe.
  protected static MiniAccumuloCluster accumuloCluster;
  protected static File tempDir;

  /**
   * Create the argv to pass to Sqoop.
   * @return the argv as an array of strings.
   */
  protected String [] getArgv(String accumuloTable,
      String accumuloColFam, boolean accumuloCreate,
      String queryStr) {
    ArrayList<String> args = new ArrayList<String>();
    if (null != queryStr) {
      args.add("--query");
      args.add(queryStr);
    } else {
      args.add("--table");
      args.add(getTableName());
    }
    args.add("--split-by");
    args.add(getColName(0));
    args.add("--connect");
    args.add(HsqldbTestServer.getUrl());
    args.add("--num-mappers");
    args.add("1");
    args.add("--accumulo-column-family");
    args.add(accumuloColFam);
    args.add("--accumulo-table");
    args.add(accumuloTable);
    if (accumuloCreate) {
      args.add("--accumulo-create-table");
    }
    // Connection parameters come from the running mini-cluster, so the
    // cluster must be started before this is called.
    args.add("--accumulo-instance");
    args.add(accumuloCluster.getInstanceName());
    args.add("--accumulo-zookeepers");
    args.add(accumuloCluster.getZooKeepers());
    args.add("--accumulo-user");
    args.add(ACCUMULO_USER);
    args.add("--accumulo-password");
    args.add(ACCUMULO_PASSWORD);
    return args.toArray(new String[0]);
  }

  /**
   * Start a fresh MiniAccumuloCluster in a unique temp directory.
   */
  protected static void setUpCluster() throws Exception {
    // createTempFile is used only to locate the system temp dir; a unique
    // sibling directory is then created for the cluster itself.
    File temp = File.createTempFile("test", "tmp");
    tempDir = new File(temp.getParent(), "accumulo"
        + System.currentTimeMillis());
    tempDir.mkdir();
    tempDir.deleteOnExit();
    temp.delete();
    accumuloCluster = new MiniAccumuloCluster(tempDir, ACCUMULO_PASSWORD);
    accumuloCluster.start();
  }

  /** Stop the cluster and remove its working directory. */
  protected static void cleanUpCluster() throws Exception {
    accumuloCluster.stop();
    delete(tempDir);
  }

  /** Recursively delete a directory tree (best effort; errors ignored). */
  protected static void delete(File dir) {
    if (dir.isDirectory()) {
      File[] kids = dir.listFiles();
      for (File f : kids) {
        if (f.isDirectory()) {
          delete(f);
        } else {
          f.delete();
        }
      }
    }
    dir.delete();
  }

  @Override
  @Before
  public void setUp() {
    try {
      setUpCluster();
    } catch (Exception e) {
      // NOTE(review): a failed cluster start is only logged; the test then
      // fails later with a less obvious NPE. Consider rethrowing.
      LOG.error("Error setting up MiniAccumuloCluster.", e);
    }
    AccumuloTestCase.recordTestBuildDataProperty();
    super.setUp();
  }

  @Override
  @After
  public void tearDown() {
    super.tearDown();
    try {
      cleanUpCluster();
    } catch (Exception e) {
      LOG.error("Error stopping MiniAccumuloCluster.", e);
    }
  }

  /**
   * Scan tableName at rowKey and assert the cell at (colFamily, colName)
   * equals val; val == null asserts the cell carries no value.
   * NOTE(review): if no matching cell exists at all, the loop body never
   * runs and the check passes vacuously even when val is non-null.
   */
  protected void verifyAccumuloCell(String tableName, String rowKey,
      String colFamily, String colName, String val) throws IOException {
    try {
      Instance inst = new ZooKeeperInstance(accumuloCluster.getInstanceName(),
          accumuloCluster.getZooKeepers());
      Connector conn = inst.getConnector(ACCUMULO_USER,
          new PasswordToken(ACCUMULO_PASSWORD));
      Scanner scanner = conn.createScanner(tableName, Constants.NO_AUTHS);
      scanner.setRange(new Range(rowKey));
      Iterator<Entry<Key, Value>> iter = scanner.iterator();
      while (iter.hasNext()) {
        Entry<Key, Value> entry = iter.next();
        String columnFamily = entry.getKey().getColumnFamily().toString();
        String qual = entry.getKey().getColumnQualifier().toString();
        if (columnFamily.equals(colFamily)
            && qual.equals(colName)) {
          String value = entry.getValue().toString();
          if (null == val) {
            assertNull("Got a result when expected null", value);
          } else {
            assertNotNull("No result, but we expected one", value);
            assertEquals(val, value);
          }
        }
      }
    } catch (AccumuloException e) {
      throw new IOException("AccumuloException in verifyAccumuloCell", e);
    } catch (AccumuloSecurityException e) {
      throw new IOException("AccumuloSecurityException in verifyAccumuloCell",
          e);
    } catch (TableNotFoundException e) {
      throw new IOException("TableNotFoundException in verifyAccumuloCell", e);
    }
  }
}

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.accumulo;
import java.io.IOException;
import org.junit.Test;
/**
 * Test imports of tables into Accumulo.
 */
public class TestAccumuloImport extends AccumuloTestCase {

  @Test
  public void testBasicUsage() throws IOException {
    // Two INT columns; column 0 is the row key, column 1 becomes a cell.
    String[] args = getArgv("BasicUsage", "BasicColFam", true, null);
    String[] colTypes = { "INT", "INT" };
    String[] colVals = { "0", "1" };
    createTableWithColTypes(colTypes, colVals);
    runImport(args);
    verifyAccumuloCell("BasicUsage", "0", "BasicColFam", getColName(1), "1");
  }

  @Test
  public void testMissingTableFails() throws IOException {
    // Test that if the table doesn't exist, we fail unless we
    // explicitly create the table.
    String[] args = getArgv("MissingTable", "MissingFam", false, null);
    String[] colTypes = { "INT", "INT" };
    String[] colVals = { "0", "1" };
    createTableWithColTypes(colTypes, colVals);
    try {
      runImport(args);
      fail("Expected Exception");
    } catch (IOException e) {
      LOG.info("Got exception -- ok; we expected that job to fail.");
    }
  }

  @Test
  public void testOverwriteSucceeds() throws IOException {
    // Test that we can create a table and then import immediately
    // back on top of it without problem.
    String[] args = getArgv("OverwriteT", "OverwriteF", true, null);
    String[] colTypes = { "INT", "INT" };
    String[] colVals = { "0", "1" };
    createTableWithColTypes(colTypes, colVals);
    runImport(args);
    verifyAccumuloCell("OverwriteT", "0", "OverwriteF", getColName(1), "1");
    // Run a second time.
    runImport(args);
    verifyAccumuloCell("OverwriteT", "0", "OverwriteF", getColName(1), "1");
  }

  @Test
  public void testStrings() throws IOException {
    // A VARCHAR value should arrive in Accumulo as its plain string form.
    String[] args = getArgv("stringT", "stringF", true, null);
    String[] colTypes = { "INT", "VARCHAR(32)" };
    String[] colVals = { "0", "'abc'" };
    createTableWithColTypes(colTypes, colVals);
    runImport(args);
    verifyAccumuloCell("stringT", "0", "stringF", getColName(1), "abc");
  }

  @Test
  public void testNulls() throws IOException {
    String[] args = getArgv("nullT", "nullF", true, null);
    String[] colTypes = { "INT", "INT", "INT" };
    String[] colVals = { "0", "42", "null" };
    createTableWithColTypes(colTypes, colVals);
    runImport(args);
    // This cell should import correctly.
    verifyAccumuloCell("nullT", "0", "nullF", getColName(1), "42");
    // This cell should not be placed in the results..
    verifyAccumuloCell("nullT", "0", "nullF", getColName(2), null);
  }

  @Test
  public void testExitFailure() throws IOException {
    // With the Accumulo jars forcibly "missing", the import must fail fast.
    String[] args = getArgv("NoAccumuloT", "NoAccumuloF", true, null);
    String[] colTypes = { "INT", "INT", "INT" };
    String[] colVals = { "0", "42", "43" };
    createTableWithColTypes(colTypes, colVals);
    try {
      AccumuloUtil.setAlwaysNoAccumuloJarMode(true);
      runImport(args);
      fail("should have gotten exception");
    } catch (IOException e) {
      // Got the exception, so we're happy
      LOG.info("Got exception -- ok; we expected that to fail.");
    } finally {
      AccumuloUtil.setAlwaysNoAccumuloJarMode(false);
    }
  }
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.accumulo;
import java.io.IOException;
import org.junit.Test;
/**
 * Verifies that the result of a free-form SQL query (rather than a whole
 * table) can be imported into Accumulo.
 */
public class TestAccumuloQueryImport extends AccumuloTestCase {

  @Test
  public void testImportFromQuery() throws IOException {
    // Source table has three columns; the query selects only the first two.
    createTableWithColTypes(new String[] { "INT", "INT", "INT" },
        new String[] { "0", "42", "43" });
    String query = "SELECT " + getColName(0) + ", " + getColName(1)
        + " FROM " + getTableName() + " WHERE $CONDITIONS";
    runImport(getArgv("queryT", "queryF", true, query));
    // The selected column imports normally.
    verifyAccumuloCell("queryT", "0", "queryF", getColName(1), "42");
    // The column omitted from the query must be absent from the results.
    verifyAccumuloCell("queryT", "0", "queryF", getColName(2), null);
  }

  @Test
  public void testExitFailure() throws IOException {
    // When the Accumulo jars are artificially reported as missing,
    // the import must abort with an IOException.
    createTableWithColTypes(new String[] { "INT", "INT", "INT" },
        new String[] { "0", "42", "43" });
    String[] args = getArgv("NoAccumuloT", "NoAccumuloF", true, null);
    try {
      AccumuloUtil.setAlwaysNoAccumuloJarMode(true);
      runImport(args);
      fail("should have gotten exception");
    } catch (IOException e) {
      // Got the exception, so we're happy
      LOG.info("Got exception -- ok; we expected that to fail.");
    } finally {
      // Restore normal jar detection for the other tests.
      AccumuloUtil.setAlwaysNoAccumuloJarMode(false);
    }
  }
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.accumulo;
import org.junit.Test;
import junit.framework.TestCase;
/**
 * This tests to verify that Accumulo is present (default when running
 * test cases) and that when in fake not present mode, the method returns
 * false.
 */
public class TestAccumuloUtil extends TestCase {

  @Test
  public void testAccumuloPresent() {
    // Under the normal test classpath the Accumulo jars are detectable.
    assertTrue(AccumuloUtil.isAccumuloJarPresent());
  }

  @Test
  public void testAccumuloNotPresent() {
    // Force the "jar missing" mode and confirm detection reports false.
    AccumuloUtil.setAlwaysNoAccumuloJarMode(true);
    try {
      assertFalse(AccumuloUtil.isAccumuloJarPresent());
    } finally {
      // Reset the process-wide flag in a finally block: the original code
      // restored it with a plain statement, which would leave the flag set
      // (and break later tests) if isAccumuloJarPresent() ever threw.
      AccumuloUtil.setAlwaysNoAccumuloJarMode(false);
    }
  }
}