
SQOOP-14. Allow Sqoop to import data into HBase.

Added FieldMappable and FieldMapProcessor interfaces.
Added ProcessingException class.
Added NullOutputCommitter class.
SqoopRecord now has delegate() method which calls a FieldMapProcessor.
ClassWriter now generates getFieldMap() method for SqoopRecords.
Added HBasePutProcessor to transform SqoopRecords into Put commands,
implementing FieldMapProcessor.
Added PutTransformer interface class and ToStringPutTransformer implementation.
Added DelegatingOutputFormat that uses a FieldMapProcessor.
Added HBase deps to build.xml via hbase.home property.
Added HBase, ZooKeeper to the dependency net added by configure-sqoop.
Added HBaseImportJob, HBaseImportMapper.
ImportJobBase now has jobSetup() step executed just before job submission.
ImportJobContext now holds a reference to the ConnManager.
DataDrivenImportJob retrieves the ConnManager from ImportJobContext; it no
longer creates a new one.
Added HBase table import configuration parameters to SqoopOptions, ImportTool.
SqlManager.importQuery() needs to set ConnManager in ImportJobContext.
Added HBase import user documentation.
Described PutTransformer API in developer docs.
Added HBase unit tests.
Added ANT_ARGUMENTS env variable to Hudson test scripts to allow freeform parameters.
Added HBASE_HOME and ZOOKEEPER_HOME variables to hudson scripts.

From: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149935 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Bayer 2011-07-22 20:04:03 +00:00
parent 7e473cafa0
commit 7e0ccb4008
39 changed files with 1605 additions and 32 deletions

View File

@ -29,10 +29,35 @@ if [ -z "$SQOOP_HOME" ]; then
export SQOOP_HOME=${bin}/..
fi
# Find paths to our dependency systems. If they are unset, use CDH defaults.
if [ -z "${HADOOP_HOME}" ]; then
# Try CDH default if the user hasn't set this.
HADOOP_HOME=/usr/lib/hadoop
fi
if [ -z "${HBASE_HOME}" ]; then
HBASE_HOME=/usr/lib/hbase
fi
if [ -z "${ZOOKEEPER_HOME}" ]; then
ZOOKEEPER_HOME=/usr/lib/zookeeper
fi
# Check: If we can't find our dependencies, give up here.
if [ ! -d "${HADOOP_HOME}" ]; then
echo "Error: $HADOOP_HOME does not exist!"
echo "Please set $$HADOOP_HOME to the root of your Hadoop installation."
exit 1
fi
if [ ! -d "${HBASE_HOME}" ]; then
echo "Error: $HBASE_HOME does not exist!"
echo "Please set $$HBASE_HOME to the root of your HBase installation."
exit 1
fi
if [ ! -d "${ZOOKEEPER_HOME}" ]; then
echo "Error: $ZOOKEEPER_HOME does not exist!"
echo "Please set $$ZOOKEEPER_HOME to the root of your ZooKeeper installation."
exit 1
fi
# Where to find the main Sqoop jar
SQOOP_JAR_DIR=$SQOOP_HOME
@ -50,14 +75,31 @@ if [ -d "$SQOOP_JAR_DIR/build" ]; then
fi
fi
function add_to_classpath() {
dir=$1
for f in $dir/*.jar; do
SQOOP_CLASSPATH=${SQOOP_CLASSPATH}:$f;
done
export SQOOP_CLASSPATH
}
# Add sqoop dependencies to classpath.
SQOOP_CLASSPATH=""
if [ -d "$SQOOP_HOME/lib" ]; then
for f in $SQOOP_HOME/lib/*.jar; do
SQOOP_CLASSPATH=${SQOOP_CLASSPATH}:$f;
done
add_to_classpath $SQOOP_HOME/lib
fi
# Add HBase to dependency list
add_to_classpath $HBASE_HOME
add_to_classpath $HBASE_HOME/lib
HBASE_CONF_DIR=${HBASE_CONF_DIR:-${HBASE_HOME}/conf}
SQOOP_CLASSPATH=${HBASE_CONF_DIR}:${SQOOP_CLASSPATH}
add_to_classpath $ZOOKEEPER_HOME
add_to_classpath $ZOOKEEPER_HOME/lib
SQOOP_CONF_DIR=${SQOOP_CONF_DIR:-${SQOOP_HOME}/conf}
SQOOP_CLASSPATH=${SQOOP_CONF_DIR}:${SQOOP_CLASSPATH}
@ -74,5 +116,6 @@ export SQOOP_SHIM_DIR
export SQOOP_JAR=`ls -1 ${SQOOP_JAR_DIR}/sqoop-*.jar | head -n 1`
export HADOOP_CLASSPATH="${SQOOP_CLASSPATH}:${HADOOP_CLASSPATH}"
export HADOOP_HOME
export HBASE_HOME
export HADOOP_OPTS="-Dsqoop.shim.jar.dir=${SQOOP_SHIM_DIR} ${HADOOP_OPTS}"

View File

@ -149,6 +149,18 @@
</classpath>
</taskdef>
<!-- manually-specified HBase classpath til it works with Ivy. -->
<path id="hbase.classpath">
<fileset dir="${hbase.home}">
<include name="*.jar" />
<include name="lib/*.jar" />
</fileset>
<fileset dir="${zookeeper.home}">
<include name="*.jar" />
<include name="lib/*.jar" />
</fileset>
</path>
<!-- The classpath for compiling and running Sqoop -->
<if>
<isset property="hadoop.home" />
@ -167,6 +179,7 @@
<include name="*.jar" />
</fileset>
<path refid="${name}.hadoop.classpath"/>
<path refid="hbase.classpath"/>
</path>
</then>
<else>
@ -174,6 +187,7 @@
<pathelement location="${build.classes}"/>
<path refid="lib.path"/>
<path refid="${name}.hadoop.classpath"/>
<path refid="hbase.classpath"/>
</path>
</else>
</if>

View File

@ -118,6 +118,10 @@
conf="common->master" />
<dependency org="junit" name="junit" rev="${junit.version}"
conf="common->default"/>
<!--
<dependency org="org.apache.hbase" name="hbase" rev="${hbase.version}"
conf="common->default" />
-->
<dependency org="hsqldb" name="hsqldb" rev="${hsqldb.version}"
conf="common->default"/>
<dependency org="commons-io" name="commons-io" rev="${commons-io.version}"

View File

@ -37,6 +37,8 @@ hadoop-mapred.apache21.version=0.21.0-SNAPSHOT
# Cloudera Distribution dependency version
hadoop-core.cloudera.version=0.20.2-314
hbase.version=0.89.20100621
hsqldb.version=1.8.0.10
ivy.version=2.0.0-rc2

View File

@ -137,6 +137,41 @@ Extension authors may make use of classes in the +com.cloudera.sqoop.io+,
These packages and classes are described in more detail in the following
section.
HBase Serialization Extensions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Sqoop supports imports from databases to HBase. When copying data into
HBase, it must be transformed into a format HBase can accept. Specifically:
* Data must be placed into one (or more) tables in HBase.
* Columns of input data must be placed into a column family.
* Values must be serialized to byte arrays to put into cells.
All of this is done via +Put+ statements in the HBase client API.
Sqoop's interaction with HBase is performed in the +com.cloudera.sqoop.hbase+
package. Records are deserialized from the database and emitted from the mapper.
The OutputFormat is responsible for inserting the results into HBase. This is
done through an interface called +PutTransformer+. The +PutTransformer+
has a method called +getPutCommand()+ that
takes as input a +Map<String, Object>+ representing the fields of the dataset.
It returns a +List<Put>+ describing how to insert the cells into HBase.
The default +PutTransformer+ implementation is the +ToStringPutTransformer+
that uses the string-based representation of each field to serialize the
fields to HBase.
You can override this implementation by implementing your own +PutTransformer+
and adding it to the classpath for the map tasks (e.g., with the +-libjars+
option). To tell Sqoop to use your implementation, set the
+sqoop.hbase.insert.put.transformer.class+ property to identify your class
with +-D+.
Within your PutTransformer implementation, the specified row key
column and column family are
available via the +getRowKeyColumn()+ and +getColumnFamily()+ methods.
You are free to make additional Put operations outside these constraints;
for example, to inject additional rows representing a secondary index.
However, Sqoop will execute all +Put+ operations against the table
specified with +\--hbase-table+.
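As a rough illustration (not part of Sqoop itself), a custom +PutTransformer+
that stores every non-null, non-key field as an upper-cased string might look
like the following sketch; the package and class names are hypothetical:
----
package com.example.sqoop; // hypothetical package

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.cloudera.sqoop.hbase.PutTransformer;

/** Example transformer: upper-cases each non-null, non-key field value. */
public class UpperCasePutTransformer extends PutTransformer {
  @Override
  public List<Put> getPutCommand(Map<String, Object> fields)
      throws IOException {
    Object rowKey = fields.get(getRowKeyColumn());
    if (null == rowKey) {
      return null; // Skip rows whose row-key column is null.
    }
    Put put = new Put(Bytes.toBytes(rowKey.toString()));
    byte [] family = Bytes.toBytes(getColumnFamily());
    for (Map.Entry<String, Object> entry : fields.entrySet()) {
      Object val = entry.getValue();
      if (null != val && !entry.getKey().equals(getRowKeyColumn())) {
        put.add(family, Bytes.toBytes(entry.getKey()),
            Bytes.toBytes(val.toString().toUpperCase()));
      }
    }
    return Collections.singletonList(put);
  }
}
----
Such a class would be packaged into a jar, shipped with +-libjars+, and
selected with
+-D sqoop.hbase.insert.put.transformer.class=com.example.sqoop.UpperCasePutTransformer+.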
Sqoop Internals
~~~~~~~~~~~~~~~

View File

@ -0,0 +1,35 @@
////
Licensed to Cloudera, Inc. under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
HBase options
~~~~~~~~~~~~~
--column-family (family)::
Sets the target column family for the import
--hbase-create-table::
If specified, create missing HBase tables
--hbase-row-key (col)::
Specifies which input column to use as the row key
--hbase-table (table-name)::
Specifies an HBase table to use as the target instead of HDFS

View File

@ -82,6 +82,8 @@ include::input-args.txt[]
include::hive-args.txt[]
include::hbase-args.txt[]
include::codegen-args.txt[]
Database-specific options

View File

@ -0,0 +1,32 @@
////
Licensed to Cloudera, Inc. under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
.HBase arguments:
[grid="all"]
`-----------------------------`-------------------------------------------
Argument Description
--------------------------------------------------------------------------
+\--column-family <family>+ Sets the target column family for the import
+\--hbase-create-table+ If specified, create missing HBase tables
+\--hbase-row-key <col>+ Specifies which input column to use as the\
row key
+\--hbase-table <table-name>+ Specifies an HBase table to use as the \
target instead of HDFS
--------------------------------------------------------------------------

src/docs/user/hbase.txt (new file, 49 lines)
View File

@ -0,0 +1,49 @@
////
Licensed to Cloudera, Inc. under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
Importing Data Into HBase
^^^^^^^^^^^^^^^^^^^^^^^^^
Sqoop supports additional import targets beyond HDFS and Hive. Sqoop
can also import records into a table in HBase.
By specifying +\--hbase-table+, you instruct Sqoop to import
to a table in HBase rather than a directory in HDFS. Sqoop will
import data to the table specified as the argument to +\--hbase-table+.
Each row of the input table will be transformed into an HBase
+Put+ operation to a row of the output table. The key for each row is
taken from a column of the input. By default Sqoop will use the split-by
column as the row key column. If that is not specified, it will try to
identify the primary key column, if any, of the source table. You can
manually specify the row key column with +\--hbase-row-key+. Each output
column will be placed in the same column family, which must be specified
with +\--column-family+.
If the target table and column family do not exist, the Sqoop job will
exit with an error. You should create the target table and column family
before running an import. If you specify +\--hbase-create-table+, Sqoop
will create the target table and column family if they do not exist,
using the default parameters from your HBase configuration.
Sqoop currently serializes all values to HBase by converting each field
to its string representation (as if you were importing to HDFS in text
mode), and then inserting the UTF-8 bytes of this string into the target
cell.

View File

@ -412,6 +412,9 @@ include::hive-args.txt[]
include::hive.txt[]
include::hbase-args.txt[]
include::hbase.txt[]
include::codegen-args.txt[]
As mentioned earlier, a byproduct of importing a table to HDFS is a

View File

@ -137,6 +137,11 @@ public enum FileLayout {
private String [] extraArgs;
private String hbaseTable; // HBase table to import into.
private String hbaseColFamily; // Column family to prepend to inserted cols.
private String hbaseRowKeyCol; // Column of the input to use as the row key.
private boolean hbaseCreateTable; // if true, create tables/col families.
public SqoopOptions() {
initDefaults(null);
}
@ -1006,5 +1011,63 @@ public void setDbOutputColumns(String [] outCols) {
this.dbOutColumns = Arrays.copyOf(outCols, outCols.length);
}
}
/**
* Set whether we should create missing HBase tables.
*/
public void setCreateHBaseTable(boolean create) {
this.hbaseCreateTable = create;
}
/**
* Returns true if we should create HBase tables/column families
* that are missing.
*/
public boolean getCreateHBaseTable() {
return this.hbaseCreateTable;
}
/**
* Sets the HBase target column family.
*/
public void setHBaseColFamily(String colFamily) {
this.hbaseColFamily = colFamily;
}
/**
* Gets the HBase import target column family.
*/
public String getHBaseColFamily() {
return this.hbaseColFamily;
}
/**
* Gets the column to use as the row id in an hbase import.
* If null, use the primary key column.
*/
public String getHBaseRowKeyColumn() {
return this.hbaseRowKeyCol;
}
/**
* Sets the column to use as the row id in an hbase import.
*/
public void setHBaseRowKeyColumn(String col) {
this.hbaseRowKeyCol = col;
}
/**
* Gets the target HBase table name, if any.
*/
public String getHBaseTable() {
return this.hbaseTable;
}
/**
* Sets the target HBase table name for an import.
*/
public void setHBaseTable(String table) {
this.hbaseTable = table;
}
}

View File

@ -0,0 +1,133 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.hbase;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.util.ReflectionUtils;
import com.cloudera.sqoop.lib.FieldMappable;
import com.cloudera.sqoop.lib.FieldMapProcessor;
import com.cloudera.sqoop.lib.ProcessingException;
/**
* FieldMapProcessor implementation that performs an HBase "put" operation
* containing all the fields of the record.
*/
public class HBasePutProcessor implements Closeable, Configurable,
FieldMapProcessor {
/** Configuration key specifying the table to insert into. */
public static final String TABLE_NAME_KEY = "sqoop.hbase.insert.table";
/** Configuration key specifying the column family to insert into. */
public static final String COL_FAMILY_KEY =
"sqoop.hbase.insert.column.family";
/** Configuration key specifying the column of the input whose value
* should be used as the row id.
*/
public static final String ROW_KEY_COLUMN_KEY =
"sqoop.hbase.insert.row.key.column";
/**
* Configuration key specifying the PutTransformer implementation to use.
*/
public static final String TRANSFORMER_CLASS_KEY =
"sqoop.hbase.insert.put.transformer.class";
private Configuration conf;
// An object that can transform a map of fieldName->object
// into a Put command.
private PutTransformer putTransformer;
private String tableName;
private HTable table;
public HBasePutProcessor() {
}
@Override
@SuppressWarnings("unchecked")
public void setConf(Configuration config) {
this.conf = config;
// Get the implementation of PutTransformer to use.
// By default, we call toString() on every non-null field.
Class<? extends PutTransformer> xformerClass =
(Class<? extends PutTransformer>)
this.conf.getClass(TRANSFORMER_CLASS_KEY, ToStringPutTransformer.class);
this.putTransformer = (PutTransformer)
ReflectionUtils.newInstance(xformerClass, this.conf);
if (null == putTransformer) {
throw new RuntimeException("Could not instantiate PutTransformer.");
}
this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
this.tableName = conf.get(TABLE_NAME_KEY, null);
try {
this.table = new HTable(conf, this.tableName);
} catch (IOException ioe) {
throw new RuntimeException("Could not access HBase table " + tableName,
ioe);
}
this.table.setAutoFlush(false);
}
@Override
public Configuration getConf() {
return this.conf;
}
@Override
/**
* Processes a record by extracting its field map and converting
* it into a list of Put commands into HBase.
*/
public void accept(FieldMappable record)
throws IOException, ProcessingException {
Map<String, Object> fields = record.getFieldMap();
List<Put> putList = putTransformer.getPutCommand(fields);
if (null != putList) {
for (Put put : putList) {
this.table.put(put);
}
}
}
@Override
/**
* Closes the HBase table and commits all pending operations.
*/
public void close() throws IOException {
this.table.flushCommits();
this.table.close();
}
}
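As a hedged sketch of this configuration contract (HBaseImportJob, later in
this change, performs the equivalent wiring inside the MapReduce job), the
processor is driven entirely through Configuration keys. The helper class,
table, family, and column names below are made up, and the snippet assumes a
reachable HBase cluster:

package com.example.sqoop; // hypothetical, for illustration only

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.ReflectionUtils;

import com.cloudera.sqoop.hbase.HBasePutProcessor;

/** Example of wiring an HBasePutProcessor by hand. */
public final class PutProcessorWiringSketch {
  private PutProcessorWiringSketch() { }

  public static HBasePutProcessor configuredProcessor() {
    Configuration conf =
        HBaseConfiguration.addHbaseResources(new Configuration());
    conf.set(HBasePutProcessor.TABLE_NAME_KEY, "orders");       // made-up table
    conf.set(HBasePutProcessor.COL_FAMILY_KEY, "d");            // made-up family
    conf.set(HBasePutProcessor.ROW_KEY_COLUMN_KEY, "order_id"); // made-up column
    // TRANSFORMER_CLASS_KEY is left unset, so ToStringPutTransformer is used.

    // ReflectionUtils calls setConf(), which opens the HTable and
    // instantiates the PutTransformer; accept() may then be called per record,
    // and close() flushes the buffered Puts.
    return ReflectionUtils.newInstance(HBasePutProcessor.class, conf);
  }
}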

View File

@ -0,0 +1,75 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.hbase;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.client.Put;
/**
* Abstract class that takes a map of JDBC field names to values
* and converts them to Put commands for HBase.
*/
public abstract class PutTransformer {
public PutTransformer() {
}
private String columnFamily;
private String rowKeyColumn;
/**
* @return the default column family to insert into.
*/
public String getColumnFamily() {
return this.columnFamily;
}
/**
* Set the default column family to insert into.
*/
public void setColumnFamily(String colFamily) {
this.columnFamily = colFamily;
}
/**
* @return the field name identifying the value to use as the row id.
*/
public String getRowKeyColumn() {
return this.rowKeyColumn;
}
/**
* Set the column of the input fields which should be used to calculate
* the row id.
*/
public void setRowKeyColumn(String rowKeyCol) {
this.rowKeyColumn = rowKeyCol;
}
/**
* Returns a list of Put commands that insert the fields into a row in HBase.
* @param fields a map of field names to values to insert.
* @return a list of Put commands that insert these fields into HBase.
*/
public abstract List<Put> getPutCommand(Map<String, Object> fields)
throws IOException;
}

View File

@ -0,0 +1,100 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.hbase;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
/**
* PutTransformer that calls toString on all non-null fields.
*/
public class ToStringPutTransformer extends PutTransformer {
public static final Log LOG = LogFactory.getLog(
ToStringPutTransformer.class.getName());
// A mapping from field name -> bytes for that field name.
// Used to cache serialization work done for field names.
private Map<String, byte[]> serializedFieldNames;
public ToStringPutTransformer() {
serializedFieldNames = new TreeMap<String, byte[]>();
}
/**
* Return the serialized bytes for a field name, using
* the cache if it's already in there.
*/
private byte [] getFieldNameBytes(String fieldName) {
byte [] cachedName = serializedFieldNames.get(fieldName);
if (null != cachedName) {
// Cache hit. We're done.
return cachedName;
}
// Do the serialization and memoize the result.
byte [] nameBytes = Bytes.toBytes(fieldName);
serializedFieldNames.put(fieldName, nameBytes);
return nameBytes;
}
@Override
/** {@inheritDoc} */
public List<Put> getPutCommand(Map<String, Object> fields)
throws IOException {
String rowKeyCol = getRowKeyColumn();
String colFamily = getColumnFamily();
byte [] colFamilyBytes = Bytes.toBytes(colFamily);
Object rowKey = fields.get(rowKeyCol);
if (null == rowKey) {
// If the row-key column is null, we don't insert this row.
LOG.warn("Could not insert row with null value for row-key column: "
+ rowKeyCol);
return null;
}
Put put = new Put(Bytes.toBytes(rowKey.toString()));
for (Map.Entry<String, Object> fieldEntry : fields.entrySet()) {
String colName = fieldEntry.getKey();
if (!colName.equals(rowKeyCol)) {
// This is a regular field, not the row key.
// Add it if it's not null.
Object val = fieldEntry.getValue();
if (null != val) {
put.add(colFamilyBytes, getFieldNameBytes(colName),
Bytes.toBytes(val.toString()));
}
}
}
return Collections.singletonList(put);
}
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.lib;
import java.io.IOException;
/**
* Interface implemented by classes that process FieldMappable objects.
*/
public interface FieldMapProcessor {
/**
* Allow arbitrary processing of a FieldMappable object.
* @param record an object which can emit a map of its field names to values.
* @throws IOException if the processor encounters an IO error when
* operating on this object.
* @throws ProcessingException if the FieldMapProcessor encounters
* a general processing error when operating on this object.
*/
void accept(FieldMappable record) throws IOException, ProcessingException;
}
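For illustration only (not part of this change), a minimal FieldMapProcessor
could simply log every field of each record it receives; the class and
package names below are hypothetical:

package com.example.sqoop; // hypothetical package, for illustration only

import java.io.IOException;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.cloudera.sqoop.lib.FieldMapProcessor;
import com.cloudera.sqoop.lib.FieldMappable;
import com.cloudera.sqoop.lib.ProcessingException;

/** Example processor that just logs every field of each record. */
public class LoggingFieldMapProcessor implements FieldMapProcessor {
  private static final Log LOG =
      LogFactory.getLog(LoggingFieldMapProcessor.class.getName());

  @Override
  public void accept(FieldMappable record)
      throws IOException, ProcessingException {
    for (Map.Entry<String, Object> entry : record.getFieldMap().entrySet()) {
      LOG.info(entry.getKey() + " = " + entry.getValue());
    }
  }
}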

View File

@ -0,0 +1,36 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.lib;
import java.util.Map;
/**
* Interface describing a class capable of returning a map of the fields
* of the object to their values.
*/
public interface FieldMappable {
/**
* Returns a map containing all fields of this record.
* @return a map from column names to the object-based values for
* this record. The map may not be null, though it may be empty.
*/
Map<String, Object> getFieldMap();
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.lib;
/**
* General error during processing of a SqoopRecord.
*/
@SuppressWarnings("serial")
public class ProcessingException extends Exception {
public ProcessingException() {
super("ProcessingException");
}
public ProcessingException(final String message) {
super(message);
}
public ProcessingException(final Throwable cause) {
super(cause);
}
public ProcessingException(final String message, final Throwable cause) {
super(message, cause);
}
@Override
public String toString() {
String msg = getMessage();
return (null == msg) ? "ProcessingException" : msg;
}
}

View File

@ -23,6 +23,7 @@
import java.nio.CharBuffer;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@ -31,7 +32,8 @@
/**
* Interface implemented by the classes generated by sqoop's orm.ClassWriter.
*/
public abstract class SqoopRecord implements Cloneable, DBWritable, Writable {
public abstract class SqoopRecord implements Cloneable, DBWritable,
FieldMappable, Writable {
public SqoopRecord() {
}
@ -67,5 +69,32 @@ public Object clone() throws CloneNotSupportedException {
* @return the API version this class was generated against.
*/
public abstract int getClassFormatVersion();
/**
* Use the delegate pattern to allow arbitrary processing of the
* fields of this record.
* @param processor A delegate that operates on this object.
* @throws IOException if the processor encounters an IO error when
* operating on this object.
* @throws ProcessingException if the FieldMapProcessor encounters
* a general processing error when operating on this object.
*/
public void delegate(FieldMapProcessor processor)
throws IOException, ProcessingException {
processor.accept(this);
}
@Override
/**
* {@inheritDoc}
* @throws RuntimeException if used with a record that was generated
* before this capability was added (1.1.0).
*/
public Map<String, Object> getFieldMap() {
// Default implementation does not support field iteration.
// ClassWriter should provide an overriding version.
throw new RuntimeException(
"Got null field map from record. Regenerate your record class.");
}
}
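As a rough usage sketch (a hypothetical helper, not part of Sqoop), the
delegate pattern described above boils down to handing a record to a
processor and translating ProcessingException into IOException, which is the
same translation DelegatingOutputFormat performs later in this change:

package com.example.sqoop; // hypothetical helper, for illustration only

import java.io.IOException;

import com.cloudera.sqoop.lib.FieldMapProcessor;
import com.cloudera.sqoop.lib.ProcessingException;
import com.cloudera.sqoop.lib.SqoopRecord;

/** Example showing how a record is handed to a processor. */
public final class DelegateExample {
  private DelegateExample() { }

  /**
   * Hands one record to the given processor (e.g. an HBasePutProcessor),
   * wrapping processing failures in an IOException.
   */
  public static void process(SqoopRecord record, FieldMapProcessor processor)
      throws IOException {
    try {
      record.delegate(processor); // invokes processor.accept(record)
    } catch (ProcessingException pe) {
      throw new IOException(pe);
    }
  }
}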

View File

@ -34,6 +34,7 @@ public class ImportJobContext {
private SqoopOptions options;
private Class<? extends InputFormat> inputFormatClass;
private Path destination;
private ConnManager manager;
public ImportJobContext(final String table, final String jar,
final SqoopOptions opts, final Path destination) {
@ -79,5 +80,21 @@ public Path getDestination() {
return this.destination;
}
/**
* Set the ConnManager instance to be used during the import's
* configuration.
*/
public void setConnManager(ConnManager mgr) {
this.manager = mgr;
}
/**
* Get the ConnManager instance to use during an import's
* configuration stage.
*/
public ConnManager getConnManager() {
return this.manager;
}
}

View File

@ -291,6 +291,7 @@ private void setSessionTimeZone(Connection conn) throws SQLException {
@Override
public void importTable(ImportJobContext context)
throws IOException, ImportException {
context.setConnManager(this);
// Specify the Oracle-specific DBInputFormat for import.
context.setInputFormat(OracleDataDrivenDBInputFormat.class);
super.importTable(context);

View File

@ -23,6 +23,8 @@
import com.cloudera.sqoop.lib.BlobRef;
import com.cloudera.sqoop.lib.ClobRef;
import com.cloudera.sqoop.mapreduce.DataDrivenImportJob;
import com.cloudera.sqoop.mapreduce.HBaseImportJob;
import com.cloudera.sqoop.mapreduce.ImportJobBase;
import com.cloudera.sqoop.mapreduce.JdbcExportJob;
import com.cloudera.sqoop.mapreduce.JdbcUpdateExportJob;
import com.cloudera.sqoop.util.ExportException;
@ -341,8 +343,17 @@ public void importTable(ImportJobContext context)
String jarFile = context.getJarFile();
SqoopOptions opts = context.getOptions();
DataDrivenImportJob importer =
new DataDrivenImportJob(opts, context.getInputFormat(), context);
context.setConnManager(this);
ImportJobBase importer;
if (opts.getHBaseTable() != null) {
// Import to HBase.
importer = new HBaseImportJob(opts, context);
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),
context);
}
String splitCol = getSplitColumn(opts, tableName);
if (null == splitCol && opts.getNumMappers() > 1) {
@ -365,8 +376,17 @@ public void importQuery(ImportJobContext context)
String jarFile = context.getJarFile();
SqoopOptions opts = context.getOptions();
DataDrivenImportJob importer =
new DataDrivenImportJob(opts, context.getInputFormat(), context);
context.setConnManager(this);
ImportJobBase importer;
if (opts.getHBaseTable() != null) {
// Import to HBase.
importer = new HBaseImportJob(opts, context);
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),
context);
}
String splitCol = getSplitColumn(opts, null);
if (null == splitCol && opts.getNumMappers() > 1) {

View File

@ -35,7 +35,6 @@
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import com.cloudera.sqoop.ConnFactory;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.lib.LargeObjectLoader;
@ -105,7 +104,7 @@ protected Class<? extends OutputFormat> getOutputFormatClass()
@Override
protected void configureInputFormat(Job job, String tableName,
String tableClassName, String splitByCol) throws IOException {
ConnManager mgr = new ConnFactory(options.getConf()).getManager(options);
ConnManager mgr = getContext().getConnManager();
try {
String username = options.getUsername();
if (null == username || username.length() == 0) {

View File

@ -0,0 +1,183 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.mapreduce;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.hbase.HBasePutProcessor;
import com.cloudera.sqoop.lib.FieldMapProcessor;
import com.cloudera.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ImportJobContext;
import com.cloudera.sqoop.shims.ShimLoader;
import com.cloudera.sqoop.util.ImportException;
/**
* Runs an HBase import via DataDrivenDBInputFormat to the HBasePutProcessor
* in the DelegatingOutputFormat.
*/
public class HBaseImportJob extends DataDrivenImportJob {
public static final Log LOG = LogFactory.getLog(
HBaseImportJob.class.getName());
public HBaseImportJob(final SqoopOptions opts,
final ImportJobContext importContext) {
super(opts, DataDrivenDBInputFormat.class, importContext);
}
@Override
protected void configureMapper(Job job, String tableName,
String tableClassName) throws IOException {
job.setOutputKeyClass(SqoopRecord.class);
job.setOutputValueClass(NullWritable.class);
job.setMapperClass(getMapperClass());
}
@Override
protected Class<? extends Mapper> getMapperClass() {
return HBaseImportMapper.class;
}
@Override
protected Class<? extends OutputFormat> getOutputFormatClass()
throws ClassNotFoundException {
return (Class<? extends OutputFormat>) ShimLoader.getShimClass(
"com.cloudera.sqoop.mapreduce.DelegatingOutputFormat");
}
@Override
protected void configureOutputFormat(Job job, String tableName,
String tableClassName) throws ClassNotFoundException, IOException {
// Use the DelegatingOutputFormat with the HBasePutProcessor.
job.setOutputFormatClass(getOutputFormatClass());
Configuration conf = job.getConfiguration();
conf.setClass("sqoop.output.delegate.field.map.processor.class",
HBasePutProcessor.class,
FieldMapProcessor.class);
// Set the HBase parameters (table, column family, row key):
conf.set(HBasePutProcessor.TABLE_NAME_KEY, options.getHBaseTable());
conf.set(HBasePutProcessor.COL_FAMILY_KEY, options.getHBaseColFamily());
// What column of the input becomes the row key?
String rowKeyCol = options.getHBaseRowKeyColumn();
if (null == rowKeyCol) {
// User didn't explicitly set one. If there's a split-by column set,
// use that.
rowKeyCol = options.getSplitByCol();
}
if (null == rowKeyCol) {
// No split-by column is explicitly set.
// If the table has a primary key, use that.
ConnManager manager = getContext().getConnManager();
rowKeyCol = manager.getPrimaryKey(tableName);
}
if (null == rowKeyCol) {
// Give up here if this is still unset.
throw new IOException("Could not determine the row-key column. "
+ "Use --hbase-row-key to specify the input column that "
+ "names each row.");
}
conf.set(HBasePutProcessor.ROW_KEY_COLUMN_KEY, rowKeyCol);
}
@Override
/** Create the target HBase table before running the job. */
protected void jobSetup(Job job) throws IOException, ImportException {
Configuration conf = job.getConfiguration();
String tableName = conf.get(HBasePutProcessor.TABLE_NAME_KEY);
String familyName = conf.get(HBasePutProcessor.COL_FAMILY_KEY);
if (null == tableName) {
throw new ImportException(
"Import to HBase error: Table name not specified");
}
if (null == familyName) {
throw new ImportException(
"Import to HBase error: Column family not specified");
}
// Add HBase configuration files to this conf object.
HBaseConfiguration.addHbaseResources(conf);
HBaseAdmin admin = new HBaseAdmin(conf);
// Check to see if the table exists.
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
byte [] familyBytes = Bytes.toBytes(familyName);
HColumnDescriptor colDesc = new HColumnDescriptor(familyBytes);
if (!admin.tableExists(tableName)) {
if (options.getCreateHBaseTable()) {
// Create the table.
LOG.info("Creating missing HBase table " + tableName);
tableDesc.addFamily(colDesc);
admin.createTable(tableDesc);
} else {
LOG.warn("Could not find HBase table " + tableName);
LOG.warn("This job may fail. Either explicitly create the table,");
LOG.warn("or re-run with --hbase-create-table.");
}
} else if (!tableDesc.hasFamily(familyBytes)) {
if (options.getCreateHBaseTable()) {
// Create the column family.
LOG.info("Creating missing column family " + familyName);
admin.disableTable(tableName);
admin.addColumn(tableName, colDesc);
admin.enableTable(tableName);
} else {
LOG.warn("Could not find column family " + familyName + " in table "
+ tableName);
LOG.warn("This job may fail. Either create the column family,");
LOG.warn("or re-run with --hbase-create-table.");
}
}
// Make sure HBase libraries are shipped as part of the job.
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.addDependencyJars(conf, HTable.class);
super.jobSetup(job);
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import com.cloudera.sqoop.lib.SqoopRecord;
/**
* Imports records by writing them to HBase via the DelegatingOutputFormat
* and the HBasePutProcessor.
*/
public class HBaseImportMapper
extends AutoProgressMapper<LongWritable, SqoopRecord, SqoopRecord,
NullWritable> {
@Override
public void map(LongWritable key, SqoopRecord val, Context context)
throws IOException, InterruptedException {
context.write(val, NullWritable.get());
}
}

View File

@ -153,6 +153,7 @@ public void runImport(String tableName, String ormJarFile, String splitByCol,
configureMapper(job, tableName, tableClassName);
configureNumTasks(job);
jobSetup(job);
boolean success = runJob(job);
if (!success) {
throw new ImportException("Import job failed!");
@ -165,4 +166,16 @@ public void runImport(String tableName, String ormJarFile, String splitByCol,
unloadJars();
}
}
/**
* Open-ended "setup" routine that is called after the job is configured
* but just before it is submitted to MapReduce. Subclasses may override
* if necessary.
*/
protected void jobSetup(Job job) throws IOException, ImportException {
}
protected ImportJobContext getContext() {
return context;
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.mapreduce;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
* OutputCommitter instance that does nothing.
*/
public class NullOutputCommitter extends OutputCommitter {
public void abortTask(TaskAttemptContext taskContext) { }
public void cleanupJob(JobContext jobContext) { }
public void commitTask(TaskAttemptContext taskContext) { }
public boolean needsTaskCommit(TaskAttemptContext taskContext) {
return false;
}
public void setupJob(JobContext jobContext) { }
public void setupTask(TaskAttemptContext taskContext) { }
}

View File

@ -634,9 +634,26 @@ private void generateCloneMethod(Map<String, Integer> columnTypes,
}
sb.append(" return o;\n");
sb.append(" }\n");
sb.append(" }\n\n");
}
/**
* Generate the getFieldMap() method.
* @param columnTypes - mapping from column names to sql types
* @param colNames - ordered list of column names for table.
* @param sb - StringBuilder to append code to
*/
private void generateGetFieldMap(Map<String, Integer> columnTypes,
String [] colNames, StringBuilder sb) {
sb.append(" public Map<String, Object> getFieldMap() {\n");
sb.append(" Map<String, Object> __sqoop$field_map = "
+ "new TreeMap<String, Object>();\n");
for (String colName : colNames) {
sb.append(" __sqoop$field_map.put(\"" + colName + "\", this."
+ colName + ");\n");
}
sb.append(" return __sqoop$field_map;\n");
sb.append(" }\n\n");
}
/**
@ -875,7 +892,7 @@ public void generate() throws IOException {
// This is based on an arbitrary query.
String query = this.options.getSqlQuery();
if (query.indexOf(SqlManager.SUBSTITUTE_TOKEN) == -1) {
throw new IOException("Query must contain '"
throw new IOException("Query [" + query + "] must contain '"
+ SqlManager.SUBSTITUTE_TOKEN + "' in WHERE clause.");
}
@ -1049,6 +1066,8 @@ private StringBuilder generateClassForColumns(
sb.append("import java.util.Arrays;\n");
sb.append("import java.util.Iterator;\n");
sb.append("import java.util.List;\n");
sb.append("import java.util.Map;\n");
sb.append("import java.util.TreeMap;\n");
sb.append("\n");
String className = tableNameInfo.getShortClassForTable(tableName);
@ -1068,6 +1087,7 @@ private StringBuilder generateClassForColumns(
generateToString(columnTypes, colNames, sb);
generateParser(columnTypes, colNames, sb);
generateCloneMethod(columnTypes, colNames, sb);
generateGetFieldMap(columnTypes, colNames, sb);
// TODO(aaron): Generate hashCode(), compareTo(), equals() so it can be a
// WritableComparable
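For reference, a hedged sketch of what the generateGetFieldMap() output above
produces for a hypothetical two-column table; the class name, field names,
and field types are illustrative only, and a real generated record class has
many more members:

// Illustrative sketch of a generated record class, not part of ClassWriter.
import java.util.Map;
import java.util.TreeMap;

public class FooRecordSketch {
  private Integer ID;  // hypothetical INTEGER column
  private String MSG;  // hypothetical VARCHAR column

  public Map<String, Object> getFieldMap() {
    Map<String, Object> __sqoop$field_map = new TreeMap<String, Object>();
    __sqoop$field_map.put("ID", this.ID);
    __sqoop$field_map.put("MSG", this.MSG);
    return __sqoop$field_map;
  }
}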

View File

@ -112,6 +112,12 @@ public abstract class BaseSqoopTool extends SqoopTool {
public static final String HELP_ARG = "help";
public static final String UPDATE_KEY_ARG = "update-key";
// HBase arguments.
public static final String HBASE_TABLE_ARG = "hbase-table";
public static final String HBASE_COL_FAM_ARG = "column-family";
public static final String HBASE_ROW_KEY_ARG = "hbase-row-key";
public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table";
public BaseSqoopTool() {
}
@ -413,6 +419,33 @@ protected RelatedOptions getCodeGenOpts(boolean multiTable) {
return codeGenOpts;
}
protected RelatedOptions getHBaseOptions() {
RelatedOptions hbaseOpts =
new RelatedOptions("HBase arguments");
hbaseOpts.addOption(OptionBuilder.withArgName("table")
.hasArg()
.withDescription("Import to <table> in HBase")
.withLongOpt(HBASE_TABLE_ARG)
.create());
hbaseOpts.addOption(OptionBuilder.withArgName("family")
.hasArg()
.withDescription("Sets the target column family for the import")
.withLongOpt(HBASE_COL_FAM_ARG)
.create());
hbaseOpts.addOption(OptionBuilder.withArgName("col")
.hasArg()
.withDescription("Specifies which input column to use as the row key")
.withLongOpt(HBASE_ROW_KEY_ARG)
.create());
hbaseOpts.addOption(OptionBuilder
.withDescription("If specified, create missing HBase tables")
.withLongOpt(HBASE_CREATE_TABLE_ARG)
.create());
return hbaseOpts;
}
/**
* Apply common command-line options to the state.
@ -582,6 +615,24 @@ protected void applyCodeGenOptions(CommandLine in, SqoopOptions out,
}
}
protected void applyHBaseOptions(CommandLine in, SqoopOptions out) {
if (in.hasOption(HBASE_TABLE_ARG)) {
out.setHBaseTable(in.getOptionValue(HBASE_TABLE_ARG));
}
if (in.hasOption(HBASE_COL_FAM_ARG)) {
out.setHBaseColFamily(in.getOptionValue(HBASE_COL_FAM_ARG));
}
if (in.hasOption(HBASE_ROW_KEY_ARG)) {
out.setHBaseRowKeyColumn(in.getOptionValue(HBASE_ROW_KEY_ARG));
}
if (in.hasOption(HBASE_CREATE_TABLE_ARG)) {
out.setCreateHBaseTable(true);
}
}
protected void validateCommonOptions(SqoopOptions options)
throws InvalidOptionsException {
if (options.getConnectString() == null) {
@ -623,10 +674,21 @@ protected void validateOutputFormatOptions(SqoopOptions options)
}
}
protected void validateHiveOptions(SqoopOptions options) {
protected void validateHiveOptions(SqoopOptions options)
throws InvalidOptionsException {
// Empty; this method is present to maintain API consistency, and
// is reserved for future constraints on Hive options.
}
protected void validateHBaseOptions(SqoopOptions options)
throws InvalidOptionsException {
if ((options.getHBaseColFamily() != null && options.getHBaseTable() == null)
|| (options.getHBaseColFamily() == null
&& options.getHBaseTable() != null)) {
throw new InvalidOptionsException(
"Both --hbase-table and --column-family must be set together."
+ HELP_STR);
}
}
}

View File

@ -108,6 +108,7 @@ protected void importTable(SqoopOptions options, String tableName,
/**
* @return the output path for the imported files;
* in append mode this will point to a temporary folder.
If importing to HBase, this may return null.
*/
private Path getOutputPath(SqoopOptions options, String tableName) {
// Get output directory
@ -124,7 +125,7 @@ private Path getOutputPath(SqoopOptions options, String tableName) {
outputPath = new Path(hdfsTargetDir);
} else if (hdfsWarehouseDir != null) {
outputPath = new Path(hdfsWarehouseDir, tableName);
} else {
} else if (null != tableName) {
outputPath = new Path(tableName);
}
}
@ -272,6 +273,7 @@ public void configureOptions(ToolOptions toolOptions) {
toolOptions.addUniqueOptions(getOutputFormatOptions());
toolOptions.addUniqueOptions(getInputFormatOptions());
toolOptions.addUniqueOptions(getHiveOptions(true));
toolOptions.addUniqueOptions(getHBaseOptions());
// get common codegen opts.
RelatedOptions codeGenOpts = getCodeGenOpts(allTables);
@ -384,6 +386,7 @@ public void applyOptions(CommandLine in, SqoopOptions out)
applyOutputFormatOptions(in, out);
applyInputFormatOptions(in, out);
applyCodeGenOptions(in, out, allTables);
applyHBaseOptions(in, out);
} catch (NumberFormatException nfe) {
throw new InvalidOptionsException("Error: expected numeric argument.\n"
+ "Try --help for usage.");
@ -417,7 +420,7 @@ protected void validateImportOptions(SqoopOptions options)
"Cannot specify --" + SQL_QUERY_ARG + " and --table together."
+ HELP_STR);
} else if (options.getSqlQuery() != null
&& options.getTargetDir() == null) {
&& options.getTargetDir() == null && options.getHBaseTable() == null) {
throw new InvalidOptionsException(
"Must specify destination with --target-dir."
+ HELP_STR);
@ -458,6 +461,7 @@ public void validateOptions(SqoopOptions options)
validateCommonOptions(options);
validateCodeGenOptions(options);
validateOutputFormatOptions(options);
validateHBaseOptions(options);
}
}

View File

@ -43,7 +43,8 @@ ${ANT} clean jar-all-shims findbugs javadoc cobertura checkstyle \
-Divy.home=$IVY_HOME -Dhadoop.dist=${COMPILE_HADOOP_DIST} \
-Dcobertura.home=${COBERTURA_HOME} -Dcobertura.format=xml \
-Dfindbugs.home=${FINDBUGS_HOME} \
-Dtest.junit.output.format=xml
-Dhbase.home=${HBASE_HOME} -Dzookeeper.home=${ZOOKEEPER_HOME} \
-Dtest.junit.output.format=xml ${ANT_ARGUMENTS}
if [ "$?" != "0" ]; then
echo "Error during compilation phase. Aborting!"
@ -56,7 +57,8 @@ ${ANT} cobertura \
-Dhadoop.dist=${COMPILE_HADOOP_DIST} \
-Dcobertura.home=${COBERTURA_HOME} -Dcobertura.format=xml \
-Dsqoop.thirdparty.lib.dir=${THIRDPARTY_LIBS} \
-Dtestcase=ThirdPartyTests
-Dhbase.home=${HBASE_HOME} -Dzookeeper.home=${ZOOKEEPER_HOME} \
-Dtestcase=ThirdPartyTests ${ANT_ARGUMENTS}
if [ "$?" != "0" ]; then
echo "Unit tests failed!"

View File

@ -29,7 +29,9 @@ source ${bin}/test-config.sh
# Run compilation step.
${ANT} clean jar -Divy.home=$IVY_HOME -Dhadoop.dist=${COMPILE_HADOOP_DIST}
${ANT} clean jar -Divy.home=$IVY_HOME -Dhadoop.dist=${COMPILE_HADOOP_DIST} \
-Dhbase.home=${HBASE_HOME} -Dzookeeper.home=${ZOOKEEPER_HOME} \
${ANT_ARGUMENTS}
if [ "$?" != "0" ]; then
echo "Error during compilation phase. Aborting!"
exit 1
@ -40,7 +42,8 @@ testfailed=0
# Run basic unit tests.
${ANT} clean-cache test -Divy.home=$IVY_HOME -Dtest.junit.output.format=xml \
-Dhadoop.dist=${TEST_HADOOP_DIST}
-Dhbase.home=${HBASE_HOME} -Dzookeeper.home=${ZOOKEEPER_HOME} \
-Dhadoop.dist=${TEST_HADOOP_DIST} ${ANT_ARGUMENTS}
if [ "$?" != "0" ]; then
testfailed=1
fi
@ -53,7 +56,8 @@ fi
${ANT} test -Dthirdparty=true -Dsqoop.thirdparty.lib.dir=${THIRDPARTY_LIBS} \
-Dtest.junit.output.format=xml -Divy.home=$IVY_HOME \
-Dhadoop.dist=${TEST_HADOOP_DIST}
-Dhbase.home=${HBASE_HOME} -Dzookeeper.home=${ZOOKEEPER_HOME} \
-Dhadoop.dist=${TEST_HADOOP_DIST} ${ANT_ARGUMENTS}
if [ "$?" != "0" ]; then
testfailed=1
fi

View File

@ -45,3 +45,10 @@ export TEST_HADOOP_DIST=${TEST_HADOOP_DIST:-apache}
export WORKSPACE=${WORKSPACE:-$projroot}
export IVY_HOME=${IVY_HOME:-$WORKSPACE/.ivy2}
export HBASE_HOME=${HBASE_HOME:-/usr/lib/hbase}
export ZOOKEEPER_HOME=${ZOOKEEPER_HOME:-/usr/lib/zookeeper}
if [ -z "${ANT_ARGUMENTS}" ]; then
export ANT_ARGUMENTS=""
fi

View File

@ -95,16 +95,7 @@ public void checkOutputSpecs(JobContext context)
/** {@inheritDoc} */
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
return new OutputCommitter() {
public void abortTask(TaskAttemptContext taskContext) { }
public void cleanupJob(JobContext jobContext) { }
public void commitTask(TaskAttemptContext taskContext) { }
public boolean needsTaskCommit(TaskAttemptContext taskContext) {
return false;
}
public void setupJob(JobContext jobContext) { }
public void setupTask(TaskAttemptContext taskContext) { }
};
return new NullOutputCommitter();
}
/**

View File

@ -0,0 +1,136 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.mapreduce;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import com.cloudera.sqoop.lib.FieldMappable;
import com.cloudera.sqoop.lib.FieldMapProcessor;
import com.cloudera.sqoop.lib.ProcessingException;
/**
* OutputFormat that produces a RecordWriter which instantiates
* a FieldMapProcessor which will process FieldMappable
* output keys.
*
* <p>The output value is ignored.</p>
*
* <p>The FieldMapProcessor implementation may do any arbitrary
* processing on the object. For example, it may write an object
* to HBase, etc.</p>
*
* <p>If the FieldMapProcessor implementation also implements
* Closeable, it will be close()'d in the RecordReader's close()
* method.</p>
*
* <p>If the FMP implements Configurable, it will be configured
* correctly via ReflectionUtils.</p>
*/
public class DelegatingOutputFormat<K extends FieldMappable, V>
extends OutputFormat<K, V> {
/** conf key: the FieldMapProcessor class to instantiate. */
public static final String DELEGATE_CLASS_KEY =
"sqoop.output.delegate.field.map.processor.class";
@Override
/** {@inheritDoc} */
public void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
if (null == conf.get(DELEGATE_CLASS_KEY)) {
throw new IOException("Delegate FieldMapProcessor class is not set.");
}
}
@Override
/** {@inheritDoc} */
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
return new NullOutputCommitter();
}
@Override
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
throws IOException {
try {
return new DelegatingRecordWriter(context);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
}
/**
* RecordWriter that hands each output key to the configured
* FieldMapProcessor for processing.
*/
public class DelegatingRecordWriter extends RecordWriter<K, V> {
private Configuration conf;
private FieldMapProcessor mapProcessor;
public DelegatingRecordWriter(TaskAttemptContext context)
throws ClassNotFoundException {
this.conf = context.getConfiguration();
@SuppressWarnings("unchecked")
Class<? extends FieldMapProcessor> procClass =
(Class<? extends FieldMapProcessor>)
conf.getClass(DELEGATE_CLASS_KEY, null);
this.mapProcessor = ReflectionUtils.newInstance(procClass, this.conf);
}
protected Configuration getConf() {
return this.conf;
}
@Override
/** {@inheritDoc} */
public void close(TaskAttemptContext context)
throws IOException, InterruptedException {
if (mapProcessor instanceof Closeable) {
((Closeable) mapProcessor).close();
}
}
@Override
/** {@inheritDoc} */
public void write(K key, V value)
throws InterruptedException, IOException {
try {
mapProcessor.accept(key);
} catch (ProcessingException pe) {
throw new IOException(pe);
}
}
}
}

View File

@ -18,6 +18,9 @@
package com.cloudera.sqoop;
import com.cloudera.sqoop.hbase.TestHBaseImport;
import com.cloudera.sqoop.hbase.TestHBaseQueryImport;
import junit.framework.Test;
import junit.framework.TestSuite;
@ -33,6 +36,8 @@ public static Test suite() {
suite.addTest(SmokeTests.suite());
suite.addTest(ThirdPartyTests.suite());
suite.addTestSuite(TestHBaseImport.class);
suite.addTestSuite(TestHBaseQueryImport.class);
return suite;
}

View File

@ -0,0 +1,136 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Before;
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.testutil.HsqldbTestServer;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
/**
* Utility methods that facilitate HBase import tests.
*/
public class HBaseTestCase extends ImportJobTestCase {
/**
 * Create the argv to pass to Sqoop.
 * @param includeHadoopFlags if true, prepend the common Hadoop test flags.
 * @param hbaseTable the target HBase table name.
 * @param hbaseColFam the target HBase column family.
 * @param hbaseCreate if true, ask Sqoop to create the HBase table.
 * @param queryStr a free-form query to import, or null to import the test table.
 * @return the argv as an array of strings.
 */
protected String [] getArgv(boolean includeHadoopFlags,
String hbaseTable, String hbaseColFam, boolean hbaseCreate,
String queryStr) {
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
CommonArgs.addHadoopFlags(args);
}
if (null != queryStr) {
args.add("--query");
args.add(queryStr);
} else {
args.add("--table");
args.add(getTableName());
}
args.add("--split-by");
args.add(getColName(0));
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--num-mappers");
args.add("1");
args.add("--column-family");
args.add(hbaseColFam);
args.add("--hbase-table");
args.add(hbaseTable);
if (hbaseCreate) {
args.add("--hbase-create-table");
}
return args.toArray(new String[0]);
}
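  // Illustrative only: getArgv(true, "MyTable", "MyColFam", true, null) produces
  // arguments roughly equivalent to the command line below (plus the generic
  // Hadoop flags added by CommonArgs):
  //
  //   sqoop import --connect <hsqldb-url> --table <generated-table> \
  //     --split-by DATA_COL0 --num-mappers 1 \
  //     --hbase-table MyTable --column-family MyColFam --hbase-create-table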
// Utility used to start a mini HBase cluster in this process.
private HBaseTestingUtility hbaseTestUtil;
private void startMaster() throws Exception {
if (null == hbaseTestUtil) {
Configuration conf = new Configuration();
conf = HBaseConfiguration.addHbaseResources(conf);
hbaseTestUtil = new HBaseTestingUtility(conf);
hbaseTestUtil.startMiniCluster(1);
}
}
@Override
@Before
public void setUp() {
try {
startMaster();
} catch (Exception e) {
fail(e.toString());
}
super.setUp();
}
@AfterClass
public void shutdown() throws Exception {
LOG.info("In shutdown() method");
if (null != hbaseTestUtil) {
LOG.info("Shutting down HBase cluster");
hbaseTestUtil.shutdownMiniCluster();
this.hbaseTestUtil = null;
}
}
/**
 * Assert that a single HBase cell has the expected value.
 * @param tableName the HBase table to read from.
 * @param rowKey the row to fetch.
 * @param colFamily the column family containing the cell.
 * @param colName the column qualifier of the cell.
 * @param val the expected value, or null if the cell should be absent.
 */
protected void verifyHBaseCell(String tableName, String rowKey,
    String colFamily, String colName, String val) throws IOException {
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(colName));
HTable table = new HTable(Bytes.toBytes(tableName));
try {
Result r = table.get(get);
byte [] actualVal = r.getValue(Bytes.toBytes(colFamily),
Bytes.toBytes(colName));
if (null == val) {
assertNull("Got a result when expected null", actualVal);
} else {
assertNotNull("No result, but we expected one", actualVal);
assertEquals(val, Bytes.toString(actualVal));
}
} finally {
table.close();
}
}
}

View File

@ -0,0 +1,96 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.hbase;
import java.io.IOException;
import org.junit.Test;
/**
* Test imports of tables into HBase.
*/
public class TestHBaseImport extends HBaseTestCase {
@Test
public void testBasicUsage() throws IOException {
// Have Sqoop create the HBase table as part of the import job.
String [] argv = getArgv(true, "BasicUsage", "BasicColFam", true, null);
String [] types = { "INT", "INT" };
String [] vals = { "0", "1" };
createTableWithColTypes(types, vals);
runImport(argv);
verifyHBaseCell("BasicUsage", "0", "BasicColFam", getColName(1), "1");
}
@Test
public void testMissingTableFails() throws IOException {
// Test that the import fails if the target HBase table does not exist
// and we did not ask Sqoop to create it.
String [] argv = getArgv(true, "MissingTable", "MissingFam", false, null);
String [] types = { "INT", "INT" };
String [] vals = { "0", "1" };
createTableWithColTypes(types, vals);
try {
runImport(argv);
fail("Expected IOException");
} catch (IOException ioe) {
LOG.info("Got exception -- ok; we expected that job to fail.");
}
}
@Test
public void testOverwriteSucceeds() throws IOException {
// Test that we can create a table with one import and then immediately
// import into it again without error.
String [] argv = getArgv(true, "OverwriteT", "OverwriteF", true, null);
String [] types = { "INT", "INT" };
String [] vals = { "0", "1" };
createTableWithColTypes(types, vals);
runImport(argv);
verifyHBaseCell("OverwriteT", "0", "OverwriteF", getColName(1), "1");
// Run a second time.
runImport(argv);
verifyHBaseCell("OverwriteT", "0", "OverwriteF", getColName(1), "1");
}
@Test
public void testStrings() throws IOException {
String [] argv = getArgv(true, "stringT", "stringF", true, null);
String [] types = { "INT", "VARCHAR(32)" };
String [] vals = { "0", "'abc'" };
createTableWithColTypes(types, vals);
runImport(argv);
verifyHBaseCell("stringT", "0", "stringF", getColName(1), "abc");
}
@Test
public void testNulls() throws IOException {
String [] argv = getArgv(true, "nullT", "nullF", true, null);
String [] types = { "INT", "INT", "INT" };
String [] vals = { "0", "42", "null" };
createTableWithColTypes(types, vals);
runImport(argv);
// This cell should import correctly.
verifyHBaseCell("nullT", "0", "nullF", getColName(1), "42");
// This cell should not be placed in the results.
verifyHBaseCell("nullT", "0", "nullF", getColName(2), null);
}
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.hbase;
import java.io.IOException;
import org.junit.Test;
/**
 * Test import of a free-form query into HBase.
*/
public class TestHBaseQueryImport extends HBaseTestCase {
@Test
public void testImportFromQuery() throws IOException {
String [] types = { "INT", "INT", "INT" };
String [] vals = { "0", "42", "43" };
createTableWithColTypes(types, vals);
String [] argv = getArgv(true, "queryT", "queryF", true,
"SELECT " + getColName(0) + ", " + getColName(1) + " FROM "
+ getTableName() + " WHERE $CONDITIONS");
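    // With the default test column names this expands to roughly:
    //   SELECT DATA_COL0, DATA_COL1 FROM <generated-table> WHERE $CONDITIONS
    // Sqoop replaces $CONDITIONS with the per-split predicate at run time.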
runImport(argv);
// This cell should import correctly.
verifyHBaseCell("queryT", "0", "queryF", getColName(1), "42");
// This cell should not be placed in the results.
verifyHBaseCell("queryT", "0", "queryF", getColName(2), null);
}
}

View File

@ -222,6 +222,10 @@ public void tearDown() {
static final String BASE_COL_NAME = "DATA_COL";
protected String getColName(int i) {
return BASE_COL_NAME + i;
}
/**
* Drop a table if it already exists in the database.
* @param table the name of the table to drop.