diff --git a/bin/configure-sqoop b/bin/configure-sqoop index bb022547..6bb300d6 100755 --- a/bin/configure-sqoop +++ b/bin/configure-sqoop @@ -29,10 +29,35 @@ if [ -z "$SQOOP_HOME" ]; then export SQOOP_HOME=${bin}/.. fi +# Find paths to our dependency systems. If they are unset, use CDH defaults. + if [ -z "${HADOOP_HOME}" ]; then - # Try CDH default if the user hasn't set this. HADOOP_HOME=/usr/lib/hadoop fi +if [ -z "${HBASE_HOME}" ]; then + HBASE_HOME=/usr/lib/hbase +fi +if [ -z "${ZOOKEEPER_HOME}" ]; then + ZOOKEEPER_HOME=/usr/lib/zookeeper +fi + +# Check: If we can't find our dependencies, give up here. +if [ ! -d "${HADOOP_HOME}" ]; then + echo "Error: $HADOOP_HOME does not exist!" + echo "Please set $$HADOOP_HOME to the root of your Hadoop installation." + exit 1 +fi +if [ ! -d "${HBASE_HOME}" ]; then + echo "Error: $HBASE_HOME does not exist!" + echo "Please set $$HBASE_HOME to the root of your HBase installation." + exit 1 +fi +if [ ! -d "${ZOOKEEPER_HOME}" ]; then + echo "Error: $ZOOKEEPER_HOME does not exist!" + echo "Please set $$ZOOKEEPER_HOME to the root of your ZooKeeper installation." + exit 1 +fi + # Where to find the main Sqoop jar SQOOP_JAR_DIR=$SQOOP_HOME @@ -50,14 +75,31 @@ if [ -d "$SQOOP_JAR_DIR/build" ]; then fi fi +function add_to_classpath() { + dir=$1 + for f in $dir/*.jar; do + SQOOP_CLASSPATH=${SQOOP_CLASSPATH}:$f; + done + + export SQOOP_CLASSPATH +} + # Add sqoop dependencies to classpath. SQOOP_CLASSPATH="" if [ -d "$SQOOP_HOME/lib" ]; then - for f in $SQOOP_HOME/lib/*.jar; do - SQOOP_CLASSPATH=${SQOOP_CLASSPATH}:$f; - done + add_to_classpath $SQOOP_HOME/lib fi +# Add HBase to dependency list +add_to_classpath $HBASE_HOME +add_to_classpath $HBASE_HOME/lib + +HBASE_CONF_DIR=${HBASE_CONF_DIR:-${HBASE_HOME}/conf} +SQOOP_CLASSPATH=${HBASE_CONF_DIR}:${SQOOP_CLASSPATH} + +add_to_classpath $ZOOKEEPER_HOME +add_to_classpath $ZOOKEEPER_HOME/lib + SQOOP_CONF_DIR=${SQOOP_CONF_DIR:-${SQOOP_HOME}/conf} SQOOP_CLASSPATH=${SQOOP_CONF_DIR}:${SQOOP_CLASSPATH} @@ -74,5 +116,6 @@ export SQOOP_SHIM_DIR export SQOOP_JAR=`ls -1 ${SQOOP_JAR_DIR}/sqoop-*.jar | head -n 1` export HADOOP_CLASSPATH="${SQOOP_CLASSPATH}:${HADOOP_CLASSPATH}" export HADOOP_HOME +export HBASE_HOME export HADOOP_OPTS="-Dsqoop.shim.jar.dir=${SQOOP_SHIM_DIR} ${HADOOP_OPTS}" diff --git a/build.xml b/build.xml index dd0f4372..eb82ff75 100644 --- a/build.xml +++ b/build.xml @@ -149,6 +149,18 @@ + + + + + + + + + + + + @@ -167,6 +179,7 @@ + @@ -174,6 +187,7 @@ + diff --git a/ivy.xml b/ivy.xml index c826501a..a7ab7f93 100644 --- a/ivy.xml +++ b/ivy.xml @@ -118,6 +118,10 @@ conf="common->master" /> + + representing the fields of the dataset. +It returns a +List+ describing how to insert the cells into HBase. +The default +PutTransformer+ implementation is the +ToStringPutTransformer+ +that uses the string-based representation of each field to serialize the +fields to HBase. + +You can override this implementation by implementing your own +PutTransformer+ +and adding it to the classpath for the map tasks (e.g., with the +-libjars+ +option). To tell Sqoop to use your implementation, set the ++sqoop.hbase.insert.put.transformer.class+ property to identify your class +with +-D+. + +Within your PutTransformer implementation, the specified row key +column and column family are +available via the +getRowKeyColumn()+ and +getColumnFamily()+ methods. +You are free to make additional Put operations outside these constraints; +for example, to inject additional rows representing a secondary index. 
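For example, a minimal custom transformer might look like the following
sketch. The class name, package, and the +email+ column used for the
secondary index are hypothetical; the sketch only assumes the
+PutTransformer+ and HBase +Put+ APIs shown elsewhere in this patch.

----
// Hypothetical example; not part of this patch.
package com.example.sqoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.cloudera.sqoop.hbase.PutTransformer;

/**
 * Writes each record to one data row, plus a second row that indexes
 * the (hypothetical) "email" field back to the primary row key.
 */
public class IndexingPutTransformer extends PutTransformer {

  @Override
  public List<Put> getPutCommand(Map<String, Object> fields)
      throws IOException {
    byte[] family = Bytes.toBytes(getColumnFamily());
    Object rowKey = fields.get(getRowKeyColumn());
    if (null == rowKey) {
      return null; // Skip records with a null row key, as the default does.
    }

    List<Put> puts = new ArrayList<Put>();

    // Primary row: one cell per non-null, non-key field.
    Put dataPut = new Put(Bytes.toBytes(rowKey.toString()));
    for (Map.Entry<String, Object> entry : fields.entrySet()) {
      if (!entry.getKey().equals(getRowKeyColumn())
          && null != entry.getValue()) {
        dataPut.add(family, Bytes.toBytes(entry.getKey()),
            Bytes.toBytes(entry.getValue().toString()));
      }
    }
    puts.add(dataPut);

    // Secondary-index row: maps the email value back to the row key.
    Object email = fields.get("email");
    if (null != email) {
      Put indexPut = new Put(Bytes.toBytes("idx-" + email.toString()));
      indexPut.add(family, Bytes.toBytes("key"),
          Bytes.toBytes(rowKey.toString()));
      puts.add(indexPut);
    }

    return puts;
  }
}
----

Such a class could then be enabled with +-libjars your-transformer.jar+ and
+-D sqoop.hbase.insert.put.transformer.class=com.example.sqoop.IndexingPutTransformer+,
as described above.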
+However, Sqoop will execute all +Put+ operations against the table +specified with +\--hbase-table+. Sqoop Internals ~~~~~~~~~~~~~~~ diff --git a/src/docs/man/hbase-args.txt b/src/docs/man/hbase-args.txt new file mode 100644 index 00000000..b1bb36ea --- /dev/null +++ b/src/docs/man/hbase-args.txt @@ -0,0 +1,35 @@ + +//// + Licensed to Cloudera, Inc. under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + + +HBase options +~~~~~~~~~~~~~ + +--column-family (family):: + Sets the target column family for the import + +--hbase-create-table:: + If specified, create missing HBase tables + +--hbase-row-key (col):: + Specifies which input column to use as the row key + +--hbase-table (table-name):: + Specifies an HBase table to use as the target instead of HDFS + + diff --git a/src/docs/man/sqoop-import.txt b/src/docs/man/sqoop-import.txt index 23ac5d73..28255567 100644 --- a/src/docs/man/sqoop-import.txt +++ b/src/docs/man/sqoop-import.txt @@ -82,6 +82,8 @@ include::input-args.txt[] include::hive-args.txt[] +include::hbase-args.txt[] + include::codegen-args.txt[] Database-specific options diff --git a/src/docs/user/hbase-args.txt b/src/docs/user/hbase-args.txt new file mode 100644 index 00000000..954173b3 --- /dev/null +++ b/src/docs/user/hbase-args.txt @@ -0,0 +1,32 @@ + +//// + Licensed to Cloudera, Inc. under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + + +.HBase arguments: +[grid="all"] +`-----------------------------`------------------------------------------- +Argument Description +-------------------------------------------------------------------------- ++\--column-family + Sets the target column family for the import ++\--hbase-create-table+ If specified, create missing HBase tables ++\--hbase-row-key + Specifies which input column to use as the\ + row key ++\--hbase-table + Specifies an HBase table to use as the \ + target instead of HDFS +-------------------------------------------------------------------------- + diff --git a/src/docs/user/hbase.txt b/src/docs/user/hbase.txt new file mode 100644 index 00000000..9e4d29da --- /dev/null +++ b/src/docs/user/hbase.txt @@ -0,0 +1,49 @@ + +//// + Licensed to Cloudera, Inc. 
under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + Cloudera, Inc. licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + + +Importing Data Into HBase +^^^^^^^^^^^^^^^^^^^^^^^^^ + +Sqoop supports additional import targets beyond HDFS and Hive. Sqoop +can also import records into a table in HBase. + +By specifying +\--hbase-table+, you instruct Sqoop to import +to a table in HBase rather than a directory in HDFS. Sqoop will +import data to the table specified as the argument to +\--hbase-table+. +Each row of the input table will be transformed into an HBase ++Put+ operation to a row of the output table. The key for each row is +taken from a column of the input. By default Sqoop will use the split-by +column as the row key column. If that is not specified, it will try to +identify the primary key column, if any, of the source table. You can +manually specify the row key column with +\--hbase-row-key+. Each output +column will be placed in the same column family, which must be specified +with +\--column-family+. + +If the target table and column family do not exist, the Sqoop job will +exit with an error. You should create the target table and column family +before running an import. If you specify +\--hbase-create-table+, Sqoop +will create the target table and column family if they do not exist, +using the default parameters from your HBase configuration. + +Sqoop currently serializes all values to HBase by converting each field +to its string representation (as if you were importing to HDFS in text +mode), and then inserts the UTF-8 bytes of this string in the target +cell. + + diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt index 39f0436a..4a6519a4 100644 --- a/src/docs/user/import.txt +++ b/src/docs/user/import.txt @@ -412,6 +412,9 @@ include::hive-args.txt[] include::hive.txt[] +include::hbase-args.txt[] +include::hbase.txt[] + include::codegen-args.txt[] As mentioned earlier, a byproduct of importing a table to HDFS is a diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java index 8b37e979..961b007f 100644 --- a/src/java/com/cloudera/sqoop/SqoopOptions.java +++ b/src/java/com/cloudera/sqoop/SqoopOptions.java @@ -137,6 +137,11 @@ public enum FileLayout { private String [] extraArgs; + private String hbaseTable; // HBase table to import into. + private String hbaseColFamily; // Column family to prepend to inserted cols. + private String hbaseRowKeyCol; // Column of the input to use as the row key. + private boolean hbaseCreateTable; // if true, create tables/col families. + public SqoopOptions() { initDefaults(null); } @@ -1006,5 +1011,63 @@ public void setDbOutputColumns(String [] outCols) { this.dbOutColumns = Arrays.copyOf(outCols, outCols.length); } } + + /** + * Set whether we should create missing HBase tables. 
+ */ + public void setCreateHBaseTable(boolean create) { + this.hbaseCreateTable = create; + } + + /** + * Returns true if we should create HBase tables/column families + * that are missing. + */ + public boolean getCreateHBaseTable() { + return this.hbaseCreateTable; + } + + /** + * Sets the HBase target column family. + */ + public void setHBaseColFamily(String colFamily) { + this.hbaseColFamily = colFamily; + } + + /** + * Gets the HBase import target column family. + */ + public String getHBaseColFamily() { + return this.hbaseColFamily; + } + + /** + * Gets the column to use as the row id in an hbase import. + * If null, use the primary key column. + */ + public String getHBaseRowKeyColumn() { + return this.hbaseRowKeyCol; + } + + /** + * Sets the column to use as the row id in an hbase import. + */ + public void setHBaseRowKeyColumn(String col) { + this.hbaseRowKeyCol = col; + } + + /** + * Gets the target HBase table name, if any. + */ + public String getHBaseTable() { + return this.hbaseTable; + } + + /** + * Sets the target HBase table name for an import. + */ + public void setHBaseTable(String table) { + this.hbaseTable = table; + } } diff --git a/src/java/com/cloudera/sqoop/hbase/HBasePutProcessor.java b/src/java/com/cloudera/sqoop/hbase/HBasePutProcessor.java new file mode 100644 index 00000000..aeecba70 --- /dev/null +++ b/src/java/com/cloudera/sqoop/hbase/HBasePutProcessor.java @@ -0,0 +1,133 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.hbase; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.util.ReflectionUtils; + +import com.cloudera.sqoop.lib.FieldMappable; +import com.cloudera.sqoop.lib.FieldMapProcessor; +import com.cloudera.sqoop.lib.ProcessingException; + +/** + * SqoopRecordProcessor that performs an HBase "put" operation + * that contains all the fields of the record. + */ +public class HBasePutProcessor implements Closeable, Configurable, + FieldMapProcessor { + + /** Configuration key specifying the table to insert into. */ + public static final String TABLE_NAME_KEY = "sqoop.hbase.insert.table"; + + /** Configuration key specifying the column family to insert into. */ + public static final String COL_FAMILY_KEY = + "sqoop.hbase.insert.column.family"; + + /** Configuration key specifying the column of the input whose value + * should be used as the row id. + */ + public static final String ROW_KEY_COLUMN_KEY = + "sqoop.hbase.insert.row.key.column"; + + /** + * Configuration key specifying the PutTransformer implementation to use. 
+ */ + public static final String TRANSFORMER_CLASS_KEY = + "sqoop.hbase.insert.put.transformer.class"; + + private Configuration conf; + + // An object that can transform a map of fieldName->object + // into a Put command. + private PutTransformer putTransformer; + + private String tableName; + private HTable table; + + public HBasePutProcessor() { + } + + @Override + @SuppressWarnings("unchecked") + public void setConf(Configuration config) { + this.conf = config; + + // Get the implementation of PutTransformer to use. + // By default, we call toString() on every non-null field. + Class xformerClass = + (Class) + this.conf.getClass(TRANSFORMER_CLASS_KEY, ToStringPutTransformer.class); + this.putTransformer = (PutTransformer) + ReflectionUtils.newInstance(xformerClass, this.conf); + if (null == putTransformer) { + throw new RuntimeException("Could not instantiate PutTransformer."); + } + + this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null)); + this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null)); + + this.tableName = conf.get(TABLE_NAME_KEY, null); + try { + this.table = new HTable(conf, this.tableName); + } catch (IOException ioe) { + throw new RuntimeException("Could not access HBase table " + tableName, + ioe); + } + this.table.setAutoFlush(false); + } + + @Override + public Configuration getConf() { + return this.conf; + } + + @Override + /** + * Processes a record by extracting its field map and converting + * it into a list of Put commands into HBase. + */ + public void accept(FieldMappable record) + throws IOException, ProcessingException { + Map fields = record.getFieldMap(); + + List putList = putTransformer.getPutCommand(fields); + if (null != putList) { + for (Put put : putList) { + this.table.put(put); + } + } + } + + @Override + /** + * Closes the HBase table and commits all pending operations. + */ + public void close() throws IOException { + this.table.flushCommits(); + this.table.close(); + } +} diff --git a/src/java/com/cloudera/sqoop/hbase/PutTransformer.java b/src/java/com/cloudera/sqoop/hbase/PutTransformer.java new file mode 100644 index 00000000..ee216f36 --- /dev/null +++ b/src/java/com/cloudera/sqoop/hbase/PutTransformer.java @@ -0,0 +1,75 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.hbase; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.client.Put; + +/** + * Interface that takes a map of jdbc field names to values + * and converts them to a Put command for HBase. + */ +public abstract class PutTransformer { + + public PutTransformer() { + } + + private String columnFamily; + private String rowKeyColumn; + + /** + * @return the default column family to insert into. 
+ */ + public String getColumnFamily() { + return this.columnFamily; + } + + /** + * Set the default column family to insert into. + */ + public void setColumnFamily(String colFamily) { + this.columnFamily = colFamily; + } + + /** + * @return the field name identifying the value to use as the row id. + */ + public String getRowKeyColumn() { + return this.rowKeyColumn; + } + + /** + * Set the column of the input fields which should be used to calculate + * the row id. + */ + public void setRowKeyColumn(String rowKeyCol) { + this.rowKeyColumn = rowKeyCol; + } + + /** + * Returns a list of Put commands that inserts the fields into a row in HBase. + * @param fields a map of field names to values to insert. + * @return A list of Put commands that inserts these into HBase. + */ + public abstract List getPutCommand(Map fields) + throws IOException; +} diff --git a/src/java/com/cloudera/sqoop/hbase/ToStringPutTransformer.java b/src/java/com/cloudera/sqoop/hbase/ToStringPutTransformer.java new file mode 100644 index 00000000..6ed3386a --- /dev/null +++ b/src/java/com/cloudera/sqoop/hbase/ToStringPutTransformer.java @@ -0,0 +1,100 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.hbase; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * PutTransformer that calls toString on all non-null fields. + */ +public class ToStringPutTransformer extends PutTransformer { + + public static final Log LOG = LogFactory.getLog( + ToStringPutTransformer.class.getName()); + + // A mapping from field name -> bytes for that field name. + // Used to cache serialization work done for fields names. + private Map serializedFieldNames; + + public ToStringPutTransformer() { + serializedFieldNames = new TreeMap(); + } + + /** + * Return the serialized bytes for a field name, using + * the cache if it's already in there. + */ + private byte [] getFieldNameBytes(String fieldName) { + byte [] cachedName = serializedFieldNames.get(fieldName); + if (null != cachedName) { + // Cache hit. We're done. + return cachedName; + } + + // Do the serialization and memoize the result. 
+ byte [] nameBytes = Bytes.toBytes(fieldName); + serializedFieldNames.put(fieldName, nameBytes); + return nameBytes; + } + + @Override + /** {@inheritDoc} */ + public List getPutCommand(Map fields) + throws IOException { + + String rowKeyCol = getRowKeyColumn(); + String colFamily = getColumnFamily(); + byte [] colFamilyBytes = Bytes.toBytes(colFamily); + + Object rowKey = fields.get(rowKeyCol); + if (null == rowKey) { + // If the row-key column is null, we don't insert this row. + LOG.warn("Could not insert row with null value for row-key column: " + + rowKeyCol); + return null; + } + + Put put = new Put(Bytes.toBytes(rowKey.toString())); + + for (Map.Entry fieldEntry : fields.entrySet()) { + String colName = fieldEntry.getKey(); + if (!colName.equals(rowKeyCol)) { + // This is a regular field, not the row key. + // Add it if it's not null. + Object val = fieldEntry.getValue(); + if (null != val) { + put.add(colFamilyBytes, getFieldNameBytes(colName), + Bytes.toBytes(val.toString())); + } + } + } + + return Collections.singletonList(put); + } +} diff --git a/src/java/com/cloudera/sqoop/lib/FieldMapProcessor.java b/src/java/com/cloudera/sqoop/lib/FieldMapProcessor.java new file mode 100644 index 00000000..0a9897eb --- /dev/null +++ b/src/java/com/cloudera/sqoop/lib/FieldMapProcessor.java @@ -0,0 +1,38 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.lib; + +import java.io.IOException; + +/** + * Interface implemented by classes that process FieldMappable objects. + */ +public interface FieldMapProcessor { + + /** + * Allow arbitrary processing of a FieldMappable object. + * @param record an object which can emit a map of its field names to values. + * @throws IOException if the processor encounters an IO error when + * operating on this object. + * @throws ProcessingException if the FieldMapProcessor encounters + * a general processing error when operating on this object. + */ + void accept(FieldMappable record) throws IOException, ProcessingException; +} + diff --git a/src/java/com/cloudera/sqoop/lib/FieldMappable.java b/src/java/com/cloudera/sqoop/lib/FieldMappable.java new file mode 100644 index 00000000..cd170c8f --- /dev/null +++ b/src/java/com/cloudera/sqoop/lib/FieldMappable.java @@ -0,0 +1,36 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.lib; + +import java.util.Map; + +/** + * Interface describing a class capable of returning a map of the fields + * of the object to their values. + */ +public interface FieldMappable { + + /** + * Returns a map containing all fields of this record. + * @return a map from column names to the object-based values for + * this record. The map may not be null, though it may be empty. + */ + Map getFieldMap(); +} + diff --git a/src/java/com/cloudera/sqoop/lib/ProcessingException.java b/src/java/com/cloudera/sqoop/lib/ProcessingException.java new file mode 100644 index 00000000..4817b255 --- /dev/null +++ b/src/java/com/cloudera/sqoop/lib/ProcessingException.java @@ -0,0 +1,48 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.lib; + +/** + * General error during processing of a SqoopRecord. + */ +@SuppressWarnings("serial") +public class ProcessingException extends Exception { + + public ProcessingException() { + super("ProcessingException"); + } + + public ProcessingException(final String message) { + super(message); + } + + public ProcessingException(final Throwable cause) { + super(cause); + } + + public ProcessingException(final String message, final Throwable cause) { + super(message, cause); + } + + @Override + public String toString() { + String msg = getMessage(); + return (null == msg) ? "ProcessingException" : msg; + } +} diff --git a/src/java/com/cloudera/sqoop/lib/SqoopRecord.java b/src/java/com/cloudera/sqoop/lib/SqoopRecord.java index 1b8be9bd..90d075aa 100644 --- a/src/java/com/cloudera/sqoop/lib/SqoopRecord.java +++ b/src/java/com/cloudera/sqoop/lib/SqoopRecord.java @@ -23,6 +23,7 @@ import java.nio.CharBuffer; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.Map; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -31,7 +32,8 @@ /** * Interface implemented by the classes generated by sqoop's orm.ClassWriter. */ -public abstract class SqoopRecord implements Cloneable, DBWritable, Writable { +public abstract class SqoopRecord implements Cloneable, DBWritable, + FieldMappable, Writable { public SqoopRecord() { } @@ -67,5 +69,32 @@ public Object clone() throws CloneNotSupportedException { * @return the API version this class was generated against. 
*/ public abstract int getClassFormatVersion(); + + /** + * Use the delegate pattern to allow arbitrary processing of the + * fields of this record. + * @param processor A delegate that operates on this object. + * @throws IOException if the processor encounters an IO error when + * operating on this object. + * @throws ProcessingException if the FieldMapProcessor encounters + * a general processing error when operating on this object. + */ + public void delegate(FieldMapProcessor processor) + throws IOException, ProcessingException { + processor.accept(this); + } + + @Override + /** + * {@inheriDoc} + * @throws RuntimeException if used with a record that was generated + * before this capability was added (1.1.0). + */ + public Map getFieldMap() { + // Default implementation does not support field iteration. + // ClassWriter should provide an overriding version. + throw new RuntimeException( + "Got null field map from record. Regenerate your record class."); + } } diff --git a/src/java/com/cloudera/sqoop/manager/ImportJobContext.java b/src/java/com/cloudera/sqoop/manager/ImportJobContext.java index 56b7169e..50472b15 100644 --- a/src/java/com/cloudera/sqoop/manager/ImportJobContext.java +++ b/src/java/com/cloudera/sqoop/manager/ImportJobContext.java @@ -34,6 +34,7 @@ public class ImportJobContext { private SqoopOptions options; private Class inputFormatClass; private Path destination; + private ConnManager manager; public ImportJobContext(final String table, final String jar, final SqoopOptions opts, final Path destination) { @@ -79,5 +80,21 @@ public Path getDestination() { return this.destination; } + /** + * Set the ConnManager instance to be used during the import's + * configuration. + */ + public void setConnManager(ConnManager mgr) { + this.manager = mgr; + } + + /** + * Get the ConnManager instance to use during an import's + * configuration stage. + */ + public ConnManager getConnManager() { + return this.manager; + } + } diff --git a/src/java/com/cloudera/sqoop/manager/OracleManager.java b/src/java/com/cloudera/sqoop/manager/OracleManager.java index 0b93f06a..8f061199 100644 --- a/src/java/com/cloudera/sqoop/manager/OracleManager.java +++ b/src/java/com/cloudera/sqoop/manager/OracleManager.java @@ -291,6 +291,7 @@ private void setSessionTimeZone(Connection conn) throws SQLException { @Override public void importTable(ImportJobContext context) throws IOException, ImportException { + context.setConnManager(this); // Specify the Oracle-specific DBInputFormat for import. 
context.setInputFormat(OracleDataDrivenDBInputFormat.class); super.importTable(context); diff --git a/src/java/com/cloudera/sqoop/manager/SqlManager.java b/src/java/com/cloudera/sqoop/manager/SqlManager.java index 9e204346..b96b4777 100644 --- a/src/java/com/cloudera/sqoop/manager/SqlManager.java +++ b/src/java/com/cloudera/sqoop/manager/SqlManager.java @@ -23,6 +23,8 @@ import com.cloudera.sqoop.lib.BlobRef; import com.cloudera.sqoop.lib.ClobRef; import com.cloudera.sqoop.mapreduce.DataDrivenImportJob; +import com.cloudera.sqoop.mapreduce.HBaseImportJob; +import com.cloudera.sqoop.mapreduce.ImportJobBase; import com.cloudera.sqoop.mapreduce.JdbcExportJob; import com.cloudera.sqoop.mapreduce.JdbcUpdateExportJob; import com.cloudera.sqoop.util.ExportException; @@ -341,8 +343,17 @@ public void importTable(ImportJobContext context) String jarFile = context.getJarFile(); SqoopOptions opts = context.getOptions(); - DataDrivenImportJob importer = - new DataDrivenImportJob(opts, context.getInputFormat(), context); + context.setConnManager(this); + + ImportJobBase importer; + if (opts.getHBaseTable() != null) { + // Import to HBase. + importer = new HBaseImportJob(opts, context); + } else { + // Import to HDFS. + importer = new DataDrivenImportJob(opts, context.getInputFormat(), + context); + } String splitCol = getSplitColumn(opts, tableName); if (null == splitCol && opts.getNumMappers() > 1) { @@ -365,8 +376,17 @@ public void importQuery(ImportJobContext context) String jarFile = context.getJarFile(); SqoopOptions opts = context.getOptions(); - DataDrivenImportJob importer = - new DataDrivenImportJob(opts, context.getInputFormat(), context); + context.setConnManager(this); + + ImportJobBase importer; + if (opts.getHBaseTable() != null) { + // Import to HBase. + importer = new HBaseImportJob(opts, context); + } else { + // Import to HDFS. + importer = new DataDrivenImportJob(opts, context.getInputFormat(), + context); + } String splitCol = getSplitColumn(opts, null); if (null == splitCol && opts.getNumMappers() > 1) { diff --git a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java index f561be1d..95e36a90 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java +++ b/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java @@ -35,7 +35,6 @@ import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; -import com.cloudera.sqoop.ConnFactory; import com.cloudera.sqoop.SqoopOptions; import com.cloudera.sqoop.manager.ConnManager; import com.cloudera.sqoop.lib.LargeObjectLoader; @@ -105,7 +104,7 @@ protected Class getOutputFormatClass() @Override protected void configureInputFormat(Job job, String tableName, String tableClassName, String splitByCol) throws IOException { - ConnManager mgr = new ConnFactory(options.getConf()).getManager(options); + ConnManager mgr = getContext().getConnManager(); try { String username = options.getUsername(); if (null == username || username.length() == 0) { diff --git a/src/java/com/cloudera/sqoop/mapreduce/HBaseImportJob.java b/src/java/com/cloudera/sqoop/mapreduce/HBaseImportJob.java new file mode 100644 index 00000000..99eb8127 --- /dev/null +++ b/src/java/com/cloudera/sqoop/mapreduce/HBaseImportJob.java @@ -0,0 +1,183 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.hbase.HBasePutProcessor; +import com.cloudera.sqoop.lib.FieldMapProcessor; +import com.cloudera.sqoop.lib.SqoopRecord; +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.ImportJobContext; +import com.cloudera.sqoop.shims.ShimLoader; +import com.cloudera.sqoop.util.ImportException; + +/** + * Runs an HBase import via DataDrivenDBInputFormat to the HBasePutProcessor + * in the DelegatingOutputFormat. + */ +public class HBaseImportJob extends DataDrivenImportJob { + + public static final Log LOG = LogFactory.getLog( + HBaseImportJob.class.getName()); + + public HBaseImportJob(final SqoopOptions opts, + final ImportJobContext importContext) { + super(opts, DataDrivenDBInputFormat.class, importContext); + } + + @Override + protected void configureMapper(Job job, String tableName, + String tableClassName) throws IOException { + job.setOutputKeyClass(SqoopRecord.class); + job.setOutputValueClass(NullWritable.class); + job.setMapperClass(getMapperClass()); + } + + @Override + protected Class getMapperClass() { + return HBaseImportMapper.class; + } + + @Override + protected Class getOutputFormatClass() + throws ClassNotFoundException { + return (Class) ShimLoader.getShimClass( + "com.cloudera.sqoop.mapreduce.DelegatingOutputFormat"); + } + + @Override + protected void configureOutputFormat(Job job, String tableName, + String tableClassName) throws ClassNotFoundException, IOException { + + // Use the DelegatingOutputFormat with the HBasePutProcessor. 
+ job.setOutputFormatClass(getOutputFormatClass()); + + Configuration conf = job.getConfiguration(); + conf.setClass("sqoop.output.delegate.field.map.processor.class", + HBasePutProcessor.class, + FieldMapProcessor.class); + + // Set the HBase parameters (table, column family, row key): + conf.set(HBasePutProcessor.TABLE_NAME_KEY, options.getHBaseTable()); + conf.set(HBasePutProcessor.COL_FAMILY_KEY, options.getHBaseColFamily()); + + // What column of the input becomes the row key? + String rowKeyCol = options.getHBaseRowKeyColumn(); + if (null == rowKeyCol) { + // User didn't explicitly set one. If there's a split-by column set, + // use that. + rowKeyCol = options.getSplitByCol(); + } + + if (null == rowKeyCol) { + // No split-by column is explicitly set. + // If the table has a primary key, use that. + ConnManager manager = getContext().getConnManager(); + rowKeyCol = manager.getPrimaryKey(tableName); + } + + if (null == rowKeyCol) { + // Give up here if this is still unset. + throw new IOException("Could not determine the row-key column. " + + "Use --hbase-row-key to specify the input column that " + + "names each row."); + } + + conf.set(HBasePutProcessor.ROW_KEY_COLUMN_KEY, rowKeyCol); + } + + @Override + /** Create the target HBase table before running the job. */ + protected void jobSetup(Job job) throws IOException, ImportException { + Configuration conf = job.getConfiguration(); + String tableName = conf.get(HBasePutProcessor.TABLE_NAME_KEY); + String familyName = conf.get(HBasePutProcessor.COL_FAMILY_KEY); + + if (null == tableName) { + throw new ImportException( + "Import to HBase error: Table name not specified"); + } + + if (null == familyName) { + throw new ImportException( + "Import to HBase error: Column family not specified"); + } + + // Add HBase configuration files to this conf object. + HBaseConfiguration.addHbaseResources(conf); + + HBaseAdmin admin = new HBaseAdmin(conf); + + // Check to see if the table exists. + HTableDescriptor tableDesc = new HTableDescriptor(tableName); + byte [] familyBytes = Bytes.toBytes(familyName); + HColumnDescriptor colDesc = new HColumnDescriptor(familyBytes); + if (!admin.tableExists(tableName)) { + if (options.getCreateHBaseTable()) { + // Create the table. + LOG.info("Creating missing HBase table " + tableName); + tableDesc.addFamily(colDesc); + admin.createTable(tableDesc); + } else { + LOG.warn("Could not find HBase table " + tableName); + LOG.warn("This job may fail. Either explicitly create the table,"); + LOG.warn("or re-run with --hbase-create-table."); + } + } else if (!tableDesc.hasFamily(familyBytes)) { + if (options.getCreateHBaseTable()) { + // Create the column family. + LOG.info("Creating missing column family " + familyName); + admin.disableTable(tableName); + admin.addColumn(tableName, colDesc); + admin.enableTable(tableName); + } else { + LOG.warn("Could not find column family " + familyName + " in table " + + tableName); + LOG.warn("This job may fail. Either create the column family,"); + LOG.warn("or re-run with --hbase-create-table."); + } + } + + // Make sure HBase libraries are shipped as part of the job. 
+ TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.addDependencyJars(conf, HTable.class); + + super.jobSetup(job); + } +} + diff --git a/src/java/com/cloudera/sqoop/mapreduce/HBaseImportMapper.java b/src/java/com/cloudera/sqoop/mapreduce/HBaseImportMapper.java new file mode 100644 index 00000000..b5823eef --- /dev/null +++ b/src/java/com/cloudera/sqoop/mapreduce/HBaseImportMapper.java @@ -0,0 +1,41 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import com.cloudera.sqoop.lib.SqoopRecord; + +/** + * Imports records by writing them to HBase via the DelegatingOutputFormat + * and the HBasePutProcessor. + */ +public class HBaseImportMapper + extends AutoProgressMapper { + + @Override + public void map(LongWritable key, SqoopRecord val, Context context) + throws IOException, InterruptedException { + context.write(val, NullWritable.get()); + } +} + diff --git a/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java b/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java index 28cf6136..069c5119 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java +++ b/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java @@ -153,6 +153,7 @@ public void runImport(String tableName, String ormJarFile, String splitByCol, configureMapper(job, tableName, tableClassName); configureNumTasks(job); + jobSetup(job); boolean success = runJob(job); if (!success) { throw new ImportException("Import job failed!"); @@ -165,4 +166,16 @@ public void runImport(String tableName, String ormJarFile, String splitByCol, unloadJars(); } } + + /** + * Open-ended "setup" routine that is called after the job is configured + * but just before it is submitted to MapReduce. Subclasses may override + * if necessary. + */ + protected void jobSetup(Job job) throws IOException, ImportException { + } + + protected ImportJobContext getContext() { + return context; + } } diff --git a/src/java/com/cloudera/sqoop/mapreduce/NullOutputCommitter.java b/src/java/com/cloudera/sqoop/mapreduce/NullOutputCommitter.java new file mode 100644 index 00000000..8cd2247f --- /dev/null +++ b/src/java/com/cloudera/sqoop/mapreduce/NullOutputCommitter.java @@ -0,0 +1,44 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.mapreduce; + +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + + +/** + * OutputCommitter instance that does nothing. + */ +public class NullOutputCommitter extends OutputCommitter { + public void abortTask(TaskAttemptContext taskContext) { } + + public void cleanupJob(JobContext jobContext) { } + + public void commitTask(TaskAttemptContext taskContext) { } + + public boolean needsTaskCommit(TaskAttemptContext taskContext) { + return false; + } + + public void setupJob(JobContext jobContext) { } + + public void setupTask(TaskAttemptContext taskContext) { } +} + diff --git a/src/java/com/cloudera/sqoop/orm/ClassWriter.java b/src/java/com/cloudera/sqoop/orm/ClassWriter.java index 4f447b80..a9d4ced1 100644 --- a/src/java/com/cloudera/sqoop/orm/ClassWriter.java +++ b/src/java/com/cloudera/sqoop/orm/ClassWriter.java @@ -634,9 +634,26 @@ private void generateCloneMethod(Map columnTypes, } sb.append(" return o;\n"); - sb.append(" }\n"); - + sb.append(" }\n\n"); + } + /** + * Generate the getFieldMap() method. + * @param columnTypes - mapping from column names to sql types + * @param colNames - ordered list of column names for table. + * @param sb - StringBuilder to append code to + */ + private void generateGetFieldMap(Map columnTypes, + String [] colNames, StringBuilder sb) { + sb.append(" public Map getFieldMap() {\n"); + sb.append(" Map __sqoop$field_map = " + + "new TreeMap();\n"); + for (String colName : colNames) { + sb.append(" __sqoop$field_map.put(\"" + colName + "\", this." + + colName + ");\n"); + } + sb.append(" return __sqoop$field_map;\n"); + sb.append(" }\n\n"); } /** @@ -875,7 +892,7 @@ public void generate() throws IOException { // This is based on an arbitrary query. 
String query = this.options.getSqlQuery(); if (query.indexOf(SqlManager.SUBSTITUTE_TOKEN) == -1) { - throw new IOException("Query must contain '" + throw new IOException("Query [" + query + "] must contain '" + SqlManager.SUBSTITUTE_TOKEN + "' in WHERE clause."); } @@ -1049,6 +1066,8 @@ private StringBuilder generateClassForColumns( sb.append("import java.util.Arrays;\n"); sb.append("import java.util.Iterator;\n"); sb.append("import java.util.List;\n"); + sb.append("import java.util.Map;\n"); + sb.append("import java.util.TreeMap;\n"); sb.append("\n"); String className = tableNameInfo.getShortClassForTable(tableName); @@ -1068,6 +1087,7 @@ private StringBuilder generateClassForColumns( generateToString(columnTypes, colNames, sb); generateParser(columnTypes, colNames, sb); generateCloneMethod(columnTypes, colNames, sb); + generateGetFieldMap(columnTypes, colNames, sb); // TODO(aaron): Generate hashCode(), compareTo(), equals() so it can be a // WritableComparable diff --git a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java index 852bb0f9..6ed2114f 100644 --- a/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java +++ b/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java @@ -112,6 +112,12 @@ public abstract class BaseSqoopTool extends SqoopTool { public static final String HELP_ARG = "help"; public static final String UPDATE_KEY_ARG = "update-key"; + // HBase arguments. + public static final String HBASE_TABLE_ARG = "hbase-table"; + public static final String HBASE_COL_FAM_ARG = "column-family"; + public static final String HBASE_ROW_KEY_ARG = "hbase-row-key"; + public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table"; + public BaseSqoopTool() { } @@ -413,6 +419,33 @@ protected RelatedOptions getCodeGenOpts(boolean multiTable) { return codeGenOpts; } + protected RelatedOptions getHBaseOptions() { + RelatedOptions hbaseOpts = + new RelatedOptions("HBase arguments"); + hbaseOpts.addOption(OptionBuilder.withArgName("table") + .hasArg() + .withDescription("Import to in HBase") + .withLongOpt(HBASE_TABLE_ARG) + .create()); + hbaseOpts.addOption(OptionBuilder.withArgName("family") + .hasArg() + .withDescription("Sets the target column family for the import") + .withLongOpt(HBASE_COL_FAM_ARG) + .create()); + hbaseOpts.addOption(OptionBuilder.withArgName("col") + .hasArg() + .withDescription("Specifies which input column to use as the row key") + .withLongOpt(HBASE_ROW_KEY_ARG) + .create()); + hbaseOpts.addOption(OptionBuilder + .withDescription("If specified, create missing HBase tables") + .withLongOpt(HBASE_CREATE_TABLE_ARG) + .create()); + + return hbaseOpts; + } + + /** * Apply common command-line to the state. 
@@ -582,6 +615,24 @@ protected void applyCodeGenOptions(CommandLine in, SqoopOptions out, } } + protected void applyHBaseOptions(CommandLine in, SqoopOptions out) { + if (in.hasOption(HBASE_TABLE_ARG)) { + out.setHBaseTable(in.getOptionValue(HBASE_TABLE_ARG)); + } + + if (in.hasOption(HBASE_COL_FAM_ARG)) { + out.setHBaseColFamily(in.getOptionValue(HBASE_COL_FAM_ARG)); + } + + if (in.hasOption(HBASE_ROW_KEY_ARG)) { + out.setHBaseRowKeyColumn(in.getOptionValue(HBASE_ROW_KEY_ARG)); + } + + if (in.hasOption(HBASE_CREATE_TABLE_ARG)) { + out.setCreateHBaseTable(true); + } + } + protected void validateCommonOptions(SqoopOptions options) throws InvalidOptionsException { if (options.getConnectString() == null) { @@ -623,10 +674,21 @@ protected void validateOutputFormatOptions(SqoopOptions options) } } - protected void validateHiveOptions(SqoopOptions options) { + protected void validateHiveOptions(SqoopOptions options) + throws InvalidOptionsException { // Empty; this method is present to maintain API consistency, and // is reserved for future constraints on Hive options. } + protected void validateHBaseOptions(SqoopOptions options) + throws InvalidOptionsException { + if ((options.getHBaseColFamily() != null && options.getHBaseTable() == null) + || (options.getHBaseColFamily() == null + && options.getHBaseTable() != null)) { + throw new InvalidOptionsException( + "Both --hbase-table and --column-family must be set together." + + HELP_STR); + } + } } diff --git a/src/java/com/cloudera/sqoop/tool/ImportTool.java b/src/java/com/cloudera/sqoop/tool/ImportTool.java index 460fcbc1..b69c17d8 100644 --- a/src/java/com/cloudera/sqoop/tool/ImportTool.java +++ b/src/java/com/cloudera/sqoop/tool/ImportTool.java @@ -108,6 +108,7 @@ protected void importTable(SqoopOptions options, String tableName, /** * @return the output path for the imported files; * in append mode this will point to a temporary folder. + * if importing to hbase, this may return null. */ private Path getOutputPath(SqoopOptions options, String tableName) { // Get output directory @@ -124,7 +125,7 @@ private Path getOutputPath(SqoopOptions options, String tableName) { outputPath = new Path(hdfsTargetDir); } else if (hdfsWarehouseDir != null) { outputPath = new Path(hdfsWarehouseDir, tableName); - } else { + } else if (null != tableName) { outputPath = new Path(tableName); } } @@ -272,6 +273,7 @@ public void configureOptions(ToolOptions toolOptions) { toolOptions.addUniqueOptions(getOutputFormatOptions()); toolOptions.addUniqueOptions(getInputFormatOptions()); toolOptions.addUniqueOptions(getHiveOptions(true)); + toolOptions.addUniqueOptions(getHBaseOptions()); // get common codegen opts. RelatedOptions codeGenOpts = getCodeGenOpts(allTables); @@ -384,6 +386,7 @@ public void applyOptions(CommandLine in, SqoopOptions out) applyOutputFormatOptions(in, out); applyInputFormatOptions(in, out); applyCodeGenOptions(in, out, allTables); + applyHBaseOptions(in, out); } catch (NumberFormatException nfe) { throw new InvalidOptionsException("Error: expected numeric argument.\n" + "Try --help for usage."); @@ -417,7 +420,7 @@ protected void validateImportOptions(SqoopOptions options) "Cannot specify --" + SQL_QUERY_ARG + " and --table together." + HELP_STR); } else if (options.getSqlQuery() != null - && options.getTargetDir() == null) { + && options.getTargetDir() == null && options.getHBaseTable() == null) { throw new InvalidOptionsException( "Must specify destination with --target-dir." 
+ HELP_STR); @@ -458,6 +461,7 @@ public void validateOptions(SqoopOptions options) validateCommonOptions(options); validateCodeGenOptions(options); validateOutputFormatOptions(options); + validateHBaseOptions(options); } } diff --git a/src/scripts/hudson/run-code-quality.sh b/src/scripts/hudson/run-code-quality.sh index 708c5386..2f569275 100755 --- a/src/scripts/hudson/run-code-quality.sh +++ b/src/scripts/hudson/run-code-quality.sh @@ -43,7 +43,8 @@ ${ANT} clean jar-all-shims findbugs javadoc cobertura checkstyle \ -Divy.home=$IVY_HOME -Dhadoop.dist=${COMPILE_HADOOP_DIST} \ -Dcobertura.home=${COBERTURA_HOME} -Dcobertura.format=xml \ -Dfindbugs.home=${FINDBUGS_HOME} \ - -Dtest.junit.output.format=xml + -Dhbase.home=${HBASE_HOME} -Dzookeeper.home=${ZOOKEEPER_HOME} \ + -Dtest.junit.output.format=xml ${ANT_ARGUMENTS} if [ "$?" != "0" ]; then echo "Error during compilation phase. Aborting!" @@ -56,7 +57,8 @@ ${ANT} cobertura \ -Dhadoop.dist=${COMPILE_HADOOP_DIST} \ -Dcobertura.home=${COBERTURA_HOME} -Dcobertura.format=xml \ -Dsqoop.thirdparty.lib.dir=${THIRDPARTY_LIBS} \ - -Dtestcase=ThirdPartyTests + -Dhbase.home=${HBASE_HOME} -Dzookeeper.home=${ZOOKEEPER_HOME} \ + -Dtestcase=ThirdPartyTests ${ANT_ARGUMENTS} if [ "$?" != "0" ]; then echo "Unit tests failed!" diff --git a/src/scripts/hudson/run-tests.sh b/src/scripts/hudson/run-tests.sh index 8f574196..03b687f6 100755 --- a/src/scripts/hudson/run-tests.sh +++ b/src/scripts/hudson/run-tests.sh @@ -29,7 +29,9 @@ source ${bin}/test-config.sh # Run compilation step. -${ANT} clean jar -Divy.home=$IVY_HOME -Dhadoop.dist=${COMPILE_HADOOP_DIST} +${ANT} clean jar -Divy.home=$IVY_HOME -Dhadoop.dist=${COMPILE_HADOOP_DIST} \ + -Dhbase.home=${HBASE_HOME} -Dzookeeper.home=${ZOOKEEPER_HOME} \ + ${ANT_ARGUMENTS} if [ "$?" != "0" ]; then echo "Error during compilation phase. Aborting!" exit 1 @@ -40,7 +42,8 @@ testfailed=0 # Run basic unit tests. ${ANT} clean-cache test -Divy.home=$IVY_HOME -Dtest.junit.output.format=xml \ - -Dhadoop.dist=${TEST_HADOOP_DIST} + -Dhbase.home=${HBASE_HOME} -Dzookeeper.home=${ZOOKEEPER_HOME} \ + -Dhadoop.dist=${TEST_HADOOP_DIST} ${ANT_ARGUMENTS} if [ "$?" != "0" ]; then testfailed=1 fi @@ -53,7 +56,8 @@ fi ${ANT} test -Dthirdparty=true -Dsqoop.thirdparty.lib.dir=${THIRDPARTY_LIBS} \ -Dtest.junit.output.format=xml -Divy.home=$IVY_HOME \ - -Dhadoop.dist=${TEST_HADOOP_DIST} + -Dhbase.home=${HBASE_HOME} -Dzookeeper.home=${ZOOKEEPER_HOME} \ + -Dhadoop.dist=${TEST_HADOOP_DIST} ${ANT_ARGUMENTS} if [ "$?" 
!= "0" ]; then testfailed=1 fi diff --git a/src/scripts/hudson/test-config.sh b/src/scripts/hudson/test-config.sh index 97650649..c30cd723 100755 --- a/src/scripts/hudson/test-config.sh +++ b/src/scripts/hudson/test-config.sh @@ -45,3 +45,10 @@ export TEST_HADOOP_DIST=${TEST_HADOOP_DIST:-apache} export WORKSPACE=${WORKSPACE:-$projroot} export IVY_HOME=${IVY_HOME:-$WORKSPACE/.ivy2} +export HBASE_HOME=${HBASE_HOME:-/usr/lib/hbase} +export ZOOKEEPER_HOME=${ZOOKEEPER_HOME:-/usr/lib/zookeeper} + +if [ -z "${ANT_ARGUMENTS}" ]; then + export ANT_ARGUMENTS="" +fi + diff --git a/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java b/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java index 9013c55e..c5353091 100644 --- a/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java +++ b/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java @@ -95,16 +95,7 @@ public void checkOutputSpecs(JobContext context) /** {@inheritDoc} */ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { - return new OutputCommitter() { - public void abortTask(TaskAttemptContext taskContext) { } - public void cleanupJob(JobContext jobContext) { } - public void commitTask(TaskAttemptContext taskContext) { } - public boolean needsTaskCommit(TaskAttemptContext taskContext) { - return false; - } - public void setupJob(JobContext jobContext) { } - public void setupTask(TaskAttemptContext taskContext) { } - }; + return new NullOutputCommitter(); } /** diff --git a/src/shims/common/com/cloudera/sqoop/mapreduce/DelegatingOutputFormat.java b/src/shims/common/com/cloudera/sqoop/mapreduce/DelegatingOutputFormat.java new file mode 100644 index 00000000..d9ddd861 --- /dev/null +++ b/src/shims/common/com/cloudera/sqoop/mapreduce/DelegatingOutputFormat.java @@ -0,0 +1,136 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.mapreduce; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.ReflectionUtils; + +import com.cloudera.sqoop.lib.FieldMappable; +import com.cloudera.sqoop.lib.FieldMapProcessor; +import com.cloudera.sqoop.lib.ProcessingException; + +/** + * OutputFormat that produces a RecordReader which instantiates + * a FieldMapProcessor which will process FieldMappable + * output keys. + * + *

<p>The output value is ignored.</p> + * + * <p>The FieldMapProcessor implementation may do any arbitrary + * processing on the object. For example, it may write an object + * to HBase, etc.</p> + * + * <p>If the FieldMapProcessor implementation also implements + * Closeable, it will be close()'d in the RecordWriter's close() + * method.</p> + * + * <p>If the FMP implements Configurable, it will be configured + * correctly via ReflectionUtils.</p>

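As an illustration of this contract, a minimal processor might look like the sketch below. It is not part of the patch: the package and class names are hypothetical, and it assumes +FieldMapProcessor+ declares +accept(FieldMappable)+ throwing +IOException+ and +ProcessingException+, and that +FieldMappable+ exposes a +getFieldMap()+ accessor for the record's column-to-value mapping.

----
package com.example.sqoop; // hypothetical package, for illustration only

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;

import com.cloudera.sqoop.lib.FieldMapProcessor;
import com.cloudera.sqoop.lib.FieldMappable;
import com.cloudera.sqoop.lib.ProcessingException;

/**
 * Example processor that simply logs each record's field map.
 * Extending Configured means ReflectionUtils will inject the job
 * Configuration; implementing Closeable means the RecordWriter
 * will call close() when the task finishes.
 */
public class LoggingFieldMapProcessor extends Configured
    implements FieldMapProcessor, Closeable {

  private static final Log LOG =
      LogFactory.getLog(LoggingFieldMapProcessor.class.getName());

  @Override
  public void accept(FieldMappable record)
      throws IOException, ProcessingException {
    // Called once per output key emitted by the map tasks.
    Map<String, Object> fields = record.getFieldMap();
    LOG.info("Imported record: " + fields);
  }

  @Override
  public void close() throws IOException {
    // Release any resources held by the processor here.
  }
}
----

A job would then select this processor by setting the output format to +DelegatingOutputFormat+ and pointing the +sqoop.output.delegate.field.map.processor.class+ property (the +DELEGATE_CLASS_KEY+ constant above) at the class name; +checkOutputSpecs()+ rejects the job if that property is unset.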
+ */ +public class DelegatingOutputFormat + extends OutputFormat { + + /** conf key: the FieldMapProcessor class to instantiate. */ + public static final String DELEGATE_CLASS_KEY = + "sqoop.output.delegate.field.map.processor.class"; + + @Override + /** {@inheritDoc} */ + public void checkOutputSpecs(JobContext context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + + if (null == conf.get(DELEGATE_CLASS_KEY)) { + throw new IOException("Delegate FieldMapProcessor class is not set."); + } + } + + @Override + /** {@inheritDoc} */ + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new NullOutputCommitter(); + } + + @Override + /** {@inheritDoc} */ + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException { + try { + return new DelegatingRecordWriter(context); + } catch (ClassNotFoundException cnfe) { + throw new IOException(cnfe); + } + } + + /** + * RecordWriter to write the output to a row in a database table. + * The actual database updates are executed in a second thread. + */ + public class DelegatingRecordWriter extends RecordWriter { + + private Configuration conf; + + private FieldMapProcessor mapProcessor; + + public DelegatingRecordWriter(TaskAttemptContext context) + throws ClassNotFoundException { + + this.conf = context.getConfiguration(); + + @SuppressWarnings("unchecked") + Class procClass = + (Class) + conf.getClass(DELEGATE_CLASS_KEY, null); + this.mapProcessor = ReflectionUtils.newInstance(procClass, this.conf); + } + + protected Configuration getConf() { + return this.conf; + } + + @Override + /** {@inheritDoc} */ + public void close(TaskAttemptContext context) + throws IOException, InterruptedException { + if (mapProcessor instanceof Closeable) { + ((Closeable) mapProcessor).close(); + } + } + + @Override + /** {@inheritDoc} */ + public void write(K key, V value) + throws InterruptedException, IOException { + try { + mapProcessor.accept(key); + } catch (ProcessingException pe) { + throw new IOException(pe); + } + } + } +} diff --git a/src/test/com/cloudera/sqoop/AllTests.java b/src/test/com/cloudera/sqoop/AllTests.java index e79f7451..3d82a242 100644 --- a/src/test/com/cloudera/sqoop/AllTests.java +++ b/src/test/com/cloudera/sqoop/AllTests.java @@ -18,6 +18,9 @@ package com.cloudera.sqoop; +import com.cloudera.sqoop.hbase.TestHBaseImport; +import com.cloudera.sqoop.hbase.TestHBaseQueryImport; + import junit.framework.Test; import junit.framework.TestSuite; @@ -33,6 +36,8 @@ public static Test suite() { suite.addTest(SmokeTests.suite()); suite.addTest(ThirdPartyTests.suite()); + suite.addTestSuite(TestHBaseImport.class); + suite.addTestSuite(TestHBaseQueryImport.class); return suite; } diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java new file mode 100644 index 00000000..52689b12 --- /dev/null +++ b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java @@ -0,0 +1,136 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.hbase; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; + +import org.junit.AfterClass; +import org.junit.Before; + +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.testutil.HsqldbTestServer; +import com.cloudera.sqoop.testutil.ImportJobTestCase; + +/** + * Utility methods that facilitate HBase import tests. + */ +public class HBaseTestCase extends ImportJobTestCase { + + /** + * Create the argv to pass to Sqoop. + * @return the argv as an array of strings. + */ + protected String [] getArgv(boolean includeHadoopFlags, + String hbaseTable, String hbaseColFam, boolean hbaseCreate, + String queryStr) { + + ArrayList args = new ArrayList(); + + if (includeHadoopFlags) { + CommonArgs.addHadoopFlags(args); + } + + if (null != queryStr) { + args.add("--query"); + args.add(queryStr); + } else { + args.add("--table"); + args.add(getTableName()); + } + args.add("--split-by"); + args.add(getColName(0)); + args.add("--connect"); + args.add(HsqldbTestServer.getUrl()); + args.add("--num-mappers"); + args.add("1"); + args.add("--column-family"); + args.add(hbaseColFam); + args.add("--hbase-table"); + args.add(hbaseTable); + if (hbaseCreate) { + args.add("--hbase-create-table"); + } + + return args.toArray(new String[0]); + } + + // Starts a mini hbase cluster in this process. 
+ private HBaseTestingUtility hbaseTestUtil; + + private void startMaster() throws Exception { + if (null == hbaseTestUtil) { + Configuration conf = new Configuration(); + conf = HBaseConfiguration.addHbaseResources(conf); + hbaseTestUtil = new HBaseTestingUtility(conf); + hbaseTestUtil.startMiniCluster(1); + } + } + + @Override + @Before + public void setUp() { + try { + startMaster(); + } catch (Exception e) { + fail(e.toString()); + } + super.setUp(); + } + + + @AfterClass + public void shutdown() throws Exception { + LOG.info("In shutdown() method"); + if (null != hbaseTestUtil) { + LOG.info("Shutting down HBase cluster"); + hbaseTestUtil.shutdownMiniCluster(); + this.hbaseTestUtil = null; + } + } + + protected void verifyHBaseCell(String tableName, String rowKey, + String colFamily, String colName, String val) throws IOException { + Get get = new Get(Bytes.toBytes(rowKey)); + get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(colName)); + HTable table = new HTable(Bytes.toBytes(tableName)); + try { + Result r = table.get(get); + byte [] actualVal = r.getValue(Bytes.toBytes(colFamily), + Bytes.toBytes(colName)); + if (null == val) { + assertNull("Got a result when expected null", actualVal); + } else { + assertNotNull("No result, but we expected one", actualVal); + assertEquals(val, Bytes.toString(actualVal)); + } + } finally { + table.close(); + } + } +} diff --git a/src/test/com/cloudera/sqoop/hbase/TestHBaseImport.java b/src/test/com/cloudera/sqoop/hbase/TestHBaseImport.java new file mode 100644 index 00000000..a4466215 --- /dev/null +++ b/src/test/com/cloudera/sqoop/hbase/TestHBaseImport.java @@ -0,0 +1,96 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.hbase; + +import java.io.IOException; + +import org.junit.Test; + +/** + * Test imports of tables into HBase. + */ +public class TestHBaseImport extends HBaseTestCase { + + @Test + public void testBasicUsage() throws IOException { + // Create the HBase table in Sqoop as we run the job. + String [] argv = getArgv(true, "BasicUsage", "BasicColFam", true, null); + String [] types = { "INT", "INT" }; + String [] vals = { "0", "1" }; + createTableWithColTypes(types, vals); + runImport(argv); + verifyHBaseCell("BasicUsage", "0", "BasicColFam", getColName(1), "1"); + } + + @Test + public void testMissingTableFails() throws IOException { + // Test that if the table doesn't exist, we fail unless we + // explicitly create the table. 
+ String [] argv = getArgv(true, "MissingTable", "MissingFam", false, null); + String [] types = { "INT", "INT" }; + String [] vals = { "0", "1" }; + createTableWithColTypes(types, vals); + try { + runImport(argv); + fail("Expected IOException"); + } catch (IOException ioe) { + LOG.info("Got exception -- ok; we expected that job to fail."); + } + } + + @Test + public void testOverwriteSucceeds() throws IOException { + // Test that we can create a table and then import immediately + // back on top of it without problem. + String [] argv = getArgv(true, "OverwriteT", "OverwriteF", true, null); + String [] types = { "INT", "INT" }; + String [] vals = { "0", "1" }; + createTableWithColTypes(types, vals); + runImport(argv); + verifyHBaseCell("OverwriteT", "0", "OverwriteF", getColName(1), "1"); + // Run a second time. + runImport(argv); + verifyHBaseCell("OverwriteT", "0", "OverwriteF", getColName(1), "1"); + } + + @Test + public void testStrings() throws IOException { + String [] argv = getArgv(true, "stringT", "stringF", true, null); + String [] types = { "INT", "VARCHAR(32)" }; + String [] vals = { "0", "'abc'" }; + createTableWithColTypes(types, vals); + runImport(argv); + verifyHBaseCell("stringT", "0", "stringF", getColName(1), "abc"); + } + + @Test + public void testNulls() throws IOException { + String [] argv = getArgv(true, "nullT", "nullF", true, null); + String [] types = { "INT", "INT", "INT" }; + String [] vals = { "0", "42", "null" }; + createTableWithColTypes(types, vals); + runImport(argv); + + // This cell should import correctly. + verifyHBaseCell("nullT", "0", "nullF", getColName(1), "42"); + + // This cell should not be placed in the results.. + verifyHBaseCell("nullT", "0", "nullF", getColName(2), null); + } +} diff --git a/src/test/com/cloudera/sqoop/hbase/TestHBaseQueryImport.java b/src/test/com/cloudera/sqoop/hbase/TestHBaseQueryImport.java new file mode 100644 index 00000000..b40d7aba --- /dev/null +++ b/src/test/com/cloudera/sqoop/hbase/TestHBaseQueryImport.java @@ -0,0 +1,47 @@ +/** + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.hbase; + +import java.io.IOException; + +import org.junit.Test; + +/** + * Test import of free-form query into HBase. + */ +public class TestHBaseQueryImport extends HBaseTestCase { + + @Test + public void testImportFromQuery() throws IOException { + String [] types = { "INT", "INT", "INT" }; + String [] vals = { "0", "42", "43" }; + createTableWithColTypes(types, vals); + + String [] argv = getArgv(true, "queryT", "queryF", true, + "SELECT " + getColName(0) + ", " + getColName(1) + " FROM " + + getTableName() + " WHERE $CONDITIONS"); + runImport(argv); + + // This cell should import correctly. 
+ verifyHBaseCell("queryT", "0", "queryF", getColName(1), "42"); + + // This cell should not be placed in the results.. + verifyHBaseCell("queryT", "0", "queryF", getColName(2), null); + } +} diff --git a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java index 9efab286..e675df92 100644 --- a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java +++ b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java @@ -222,6 +222,10 @@ public void tearDown() { static final String BASE_COL_NAME = "DATA_COL"; + protected String getColName(int i) { + return BASE_COL_NAME + i; + } + /** * Drop a table if it already exists in the database. * @param table the name of the table to drop.