diff --git a/src/docs/user/hbase-args.txt b/src/docs/user/hbase-args.txt
index 8ba23eb5..53040f50 100644
--- a/src/docs/user/hbase-args.txt
+++ b/src/docs/user/hbase-args.txt
@@ -33,5 +33,6 @@ Argument                          Description
                                   attributes
 +\--hbase-table <table-name>+     Specifies an HBase table to use as the \
                                   target instead of HDFS
++\--hbase-bulkload+               Enables bulk loading
 --------------------------------------------------------------------------
diff --git a/src/docs/user/hbase.txt b/src/docs/user/hbase.txt
index 34f9875b..ab4aedc0 100644
--- a/src/docs/user/hbase.txt
+++ b/src/docs/user/hbase.txt
@@ -58,4 +58,5 @@ mode), and then inserts the UTF-8 bytes of this string in the target cell.
 Sqoop will skip all rows containing null values in all columns except the
 row key column.
-
+To decrease the load on HBase, Sqoop can do bulk loading as opposed to
+direct writes. To use bulk loading, enable it using +\--hbase-bulkload+.
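For orientation, the new flag rides on the existing HBase import options. A minimal, hypothetical invocation is sketched below; the JDBC URL, source table, HBase table, and column family are placeholders, and the call goes through Sqoop.runTool() just as the equivalent command line would.

    import org.apache.sqoop.Sqoop;

    public class HBaseBulkLoadExample {
      public static void main(String[] args) {
        // Placeholder connection and table names, for illustration only.
        String[] sqoopArgs = {
            "import",
            "--connect", "jdbc:mysql://db.example.com/corp",
            "--table", "EMPLOYEES",
            "--hbase-table", "employees",
            "--column-family", "d",
            "--hbase-create-table",
            "--hbase-bulkload"
        };
        System.exit(Sqoop.runTool(sqoopArgs));
      }
    }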
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index 01805f98..836f5889 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -230,6 +230,9 @@ public String toString() {
   // Column of the input to use as the row key.
   @StoredAsProperty("hbase.row.key.col") private String hbaseRowKeyCol;
 
+  // if true, bulk loading will be used.
+  @StoredAsProperty("hbase.bulk.load.enabled") private boolean hbaseBulkLoadEnabled;
+
   // if true, create tables/col families.
   @StoredAsProperty("hbase.create.table") private boolean hbaseCreateTable;
 
@@ -1923,6 +1926,20 @@ public void setHBaseRowKeyColumn(String col) {
     this.hbaseRowKeyCol = col;
   }
 
+  /**
+   * @return true if bulk load is enabled and false otherwise.
+   */
+  public boolean isBulkLoadEnabled() {
+    return this.hbaseBulkLoadEnabled;
+  }
+
+  /**
+   * Enables or disables bulk loading for an HBase import.
+   */
+  public void setHBaseBulkLoadEnabled(boolean hbaseBulkLoadEnabled) {
+    this.hbaseBulkLoadEnabled = hbaseBulkLoadEnabled;
+  }
+
   /**
    * Gets the target HBase table name, if any.
    */
diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
index 9ceb5bdd..b2431ac0 100644
--- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
@@ -66,6 +66,12 @@ public class HBasePutProcessor implements Closeable, Configurable,
   public static final String TRANSFORMER_CLASS_KEY =
       "sqoop.hbase.insert.put.transformer.class";
 
+  /**
+   * Configuration key to enable/disable HBase bulk loading.
+   */
+  public static final String BULK_LOAD_ENABLED_KEY =
+      "sqoop.hbase.bulk.load.enabled";
+
   /** Configuration key to specify whether to add the row key column into
    *  HBase. Set to false by default.
    */
diff --git a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
index 5ccf311a..b5cad1db 100644
--- a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
+++ b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
@@ -181,8 +181,13 @@ private List<Put> putRecordInHBase(Map<String, Object> record,
       // check addRowKey flag before including rowKey field.
       Object val = fieldEntry.getValue();
       if (null != val) {
-        put.add(colFamilyBytes, getFieldNameBytes(colName),
-            Bytes.toBytes(toHBaseString(val)));
+        if (val instanceof byte[]) {
+          put.add(colFamilyBytes, getFieldNameBytes(colName),
+              (byte[]) val);
+        } else {
+          put.add(colFamilyBytes, getFieldNameBytes(colName),
+              Bytes.toBytes(toHBaseString(val)));
+        }
       }
     }
   }
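The branch added above means that a field whose value arrives as a byte[] (for example a BLOB or VARBINARY column) is written to the cell unchanged, while every other type still goes through toString(). A minimal sketch of the difference, using invented column names and values; the column family and row key column are assumptions:

    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.sqoop.hbase.ToStringPutTransformer;

    public class ByteFieldExample {
      public static void main(String[] args) throws Exception {
        ToStringPutTransformer xformer = new ToStringPutTransformer();
        xformer.setColumnFamily("d");    // assumed column family
        xformer.setRowKeyColumn("id");   // assumed row key column

        Map<String, Object> record = new TreeMap<String, Object>();
        record.put("id", 42);                           // stored as the bytes of "42"
        record.put("photo", new byte[] {0x01, 0x02});   // stored as the raw two bytes

        List<Put> puts = xformer.getPutCommand(record);
        System.out.println(puts.size() + " Put(s) generated");
      }
    }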
diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java
index 2a4992d1..1ffa40f4 100644
--- a/src/java/org/apache/sqoop/manager/SqlManager.java
+++ b/src/java/org/apache/sqoop/manager/SqlManager.java
@@ -41,7 +41,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sqoop.mapreduce.JdbcCallExportJob;
+import org.apache.sqoop.tool.BaseSqoopTool;
 import org.apache.sqoop.util.LoggingUtils;
+import org.apache.sqoop.mapreduce.HBaseBulkImportJob;
 import org.apache.sqoop.util.SqlTypeMap;
 
 import com.cloudera.sqoop.SqoopOptions;
@@ -587,7 +589,11 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
         throw new ImportException("HBase jars are not present in "
             + "classpath, cannot import to HBase!");
       }
-      importer = new HBaseImportJob(opts, context);
+      if (!opts.isBulkLoadEnabled()) {
+        importer = new HBaseImportJob(opts, context);
+      } else {
+        importer = new HBaseBulkImportJob(opts, context);
+      }
     } else {
       // Import to HDFS.
       importer = new DataDrivenImportJob(opts, context.getInputFormat(),
@@ -619,7 +625,11 @@ public void importQuery(com.cloudera.sqoop.manager.ImportJobContext context)
         throw new ImportException("HBase jars are not present in classpath,"
             + " cannot import to HBase!");
       }
-      importer = new HBaseImportJob(opts, context);
+      if (!opts.isBulkLoadEnabled()) {
+        importer = new HBaseImportJob(opts, context);
+      } else {
+        importer = new HBaseBulkImportJob(opts, context);
+      }
     } else {
       // Import to HDFS.
       importer = new DataDrivenImportJob(opts, context.getInputFormat(),
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
new file mode 100644
index 00000000..b32cdd1c
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.util.ImportException;
+import com.cloudera.sqoop.SqoopOptions;
+import com.google.common.base.Preconditions;
+
+/**
+ * Runs an HBase bulk import via DataDrivenDBInputFormat. The map tasks
+ * emit Put commands that HFileOutputFormat writes out as HFiles, which
+ * are then bulk loaded into the target table with LoadIncrementalHFiles.
+ */
+public class HBaseBulkImportJob extends HBaseImportJob {
+
+  public static final Log LOG = LogFactory.getLog(
+      HBaseBulkImportJob.class.getName());
+
+  public HBaseBulkImportJob(final SqoopOptions opts,
+      final ImportJobContext importContext) {
+    super(opts, importContext);
+  }
+
+  @Override
+  protected void configureMapper(Job job, String tableName,
+      String tableClassName) throws IOException {
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    job.setMapOutputValueClass(Put.class);
+    job.setMapperClass(getMapperClass());
+  }
+
+  @Override
+  protected Class<? extends Mapper> getMapperClass() {
+    return HBaseBulkImportMapper.class;
+  }
+
+  @Override
+  protected void jobSetup(Job job) throws IOException, ImportException {
+    super.jobSetup(job);
+
+    // We shouldn't have gotten here if the bulk load dir is not set,
+    // so let's throw an ImportException.
+    if (getContext().getDestination() == null) {
+      throw new ImportException("Can't run HBaseBulkImportJob without a "
+          + "valid destination directory.");
+    }
+
+    TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Preconditions.class);
+    FileOutputFormat.setOutputPath(job, getContext().getDestination());
+    HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
+    HFileOutputFormat.configureIncrementalLoad(job, hTable);
+  }
+
+  /**
+   * Perform the loading of HFiles.
+   */
+  @Override
+  protected void completeImport(Job job) throws IOException, ImportException {
+    super.completeImport(job);
+
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+
+    // Make the bulk load files source directory accessible to the world
+    // so that the hbase user can deal with it.
+    Path bulkLoadDir = getContext().getDestination();
+    setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
+        FsPermission.createImmutable((short) 00777));
+
+    HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
+
+    // Load generated HFiles into the table.
+    try {
+      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
+          job.getConfiguration());
+      loader.doBulkLoad(bulkLoadDir, hTable);
+    } catch (Exception e) {
+      String errorMessage = String.format("Unrecoverable error while "
+          + "performing the bulk load of files in [%s]",
+          bulkLoadDir.toString());
+      throw new ImportException(errorMessage, e);
+    }
+  }
+
+  @Override
+  protected void jobTeardown(Job job) throws IOException, ImportException {
+    super.jobTeardown(job);
+    // Delete the HFiles directory after we are finished.
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+    fileSystem.delete(getContext().getDestination(), true);
+  }
+
+  /**
+   * Set the file permission of the path of the given fileStatus. If the path
+   * is a directory, apply permission recursively to all subdirectories and
+   * files.
+   *
+   * @param fs the filesystem
+   * @param fileStatus containing the path
+   * @param permission the permission
+   * @throws java.io.IOException
+   */
+  private void setPermission(FileSystem fs, FileStatus fileStatus,
+      FsPermission permission) throws IOException {
+    if (fileStatus.isDir()) {
+      for (FileStatus file : fs.listStatus(fileStatus.getPath())) {
+        setPermission(fs, file, permission);
+      }
+    }
+    fs.setPermission(fileStatus.getPath(), permission);
+  }
+}
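For reference, the hand-off performed in completeImport() above is the same one that can be run standalone against any directory of generated HFiles, for example when experimenting with the feature. A rough sketch; the output directory and table name are assumptions:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class ManualBulkLoad {
      public static void main(String[] args) throws Exception {
        // Directory and table name below are placeholders.
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "employees");
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path("/user/sqoop/employees"), table);
        table.close();
      }
    }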
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
new file mode 100644
index 00000000..9c9d6cd3
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.sqoop.hbase.PutTransformer;
+import org.apache.sqoop.hbase.ToStringPutTransformer;
+
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+import static org.apache.sqoop.hbase.HBasePutProcessor.*;
+
+/**
+ * Imports records for an HBase bulk load by transforming each record into
+ * Put commands and emitting them as (ImmutableBytesWritable, Put) pairs
+ * for HFileOutputFormat.
+ */
+public class HBaseBulkImportMapper
+    extends AutoProgressMapper<LongWritable, SqoopRecord,
+    ImmutableBytesWritable, Put> {
+
+  private LargeObjectLoader lobLoader;
+
+  // An object that can transform a map of fieldName->object
+  // into a Put command.
+  private PutTransformer putTransformer;
+
+  private Configuration conf;
+
+  @Override
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    this.conf = context.getConfiguration();
+    Path largeFilePath = new Path(this.conf.get("sqoop.hbase.lob.extern.dir",
+        "/tmp/sqoop-hbase-" + context.getTaskAttemptID()));
+    this.lobLoader = new LargeObjectLoader(context.getConfiguration(),
+        largeFilePath);
+
+    // Get the implementation of PutTransformer to use.
+    // By default, we call toString() on every non-null field.
+    Class<? extends PutTransformer> xformerClass =
+        (Class<? extends PutTransformer>)
+        this.conf.getClass(TRANSFORMER_CLASS_KEY, ToStringPutTransformer.class);
+    this.putTransformer = (PutTransformer)
+        ReflectionUtils.newInstance(xformerClass, this.conf);
+    if (null == putTransformer) {
+      throw new RuntimeException("Could not instantiate PutTransformer.");
+    }
+    this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
+    this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
+  }
+
+  @Override
+  public void map(LongWritable key, SqoopRecord val, Context context)
+      throws IOException, InterruptedException {
+    try {
+      // Loading of LOBs was delayed until we have a Context.
+      val.loadLargeObjects(lobLoader);
+    } catch (SQLException sqlE) {
+      throw new IOException(sqlE);
+    }
+    Map<String, Object> fields = val.getFieldMap();
+
+    List<Put> putList = putTransformer.getPutCommand(fields);
+    for (Put put : putList) {
+      context.write(new ImmutableBytesWritable(put.getRow()), put);
+    }
+  }
+
+  @Override
+  protected void cleanup(Context context) throws IOException {
+    if (null != lobLoader) {
+      lobLoader.close();
+    }
+  }
+}
+
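Because the bulk-load mapper above resolves its transformer through the same TRANSFORMER_CLASS_KEY setting as the direct-write path, a custom PutTransformer can still be plugged in when --hbase-bulkload is used. A hypothetical sketch follows; the class, family, and key-column names are invented, and only the getPutCommand() contract exercised by the mapper above is relied on:

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.sqoop.hbase.PutTransformer;

    /** Hypothetical transformer that upper-cases the row key before writing. */
    public class UpperCaseKeyPutTransformer extends PutTransformer {

      private static final String FAMILY = "d";      // assumed column family
      private static final String KEY_COLUMN = "id"; // assumed row key column

      @Override
      public List<Put> getPutCommand(Map<String, Object> fields)
          throws IOException {
        Object keyVal = fields.get(KEY_COLUMN);
        if (keyVal == null) {
          return Collections.<Put>emptyList();
        }
        Put put = new Put(Bytes.toBytes(keyVal.toString().toUpperCase()));
        for (Map.Entry<String, Object> entry : fields.entrySet()) {
          Object value = entry.getValue();
          if (value != null) {
            put.add(Bytes.toBytes(FAMILY), Bytes.toBytes(entry.getKey()),
                Bytes.toBytes(value.toString()));
          }
        }
        return Collections.singletonList(put);
      }
    }

Such a class would be selected with -D sqoop.hbase.insert.put.transformer.class=UpperCaseKeyPutTransformer on the import command line, assuming it is available on Sqoop's classpath.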
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index 36959e13..8b1493d0 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -249,6 +249,8 @@ public void runImport(String tableName, String ormJarFile, String splitByCol,
       throw new ImportException("Import job failed!");
     }
 
+    completeImport(job);
+
     if (options.isValidationEnabled()) {
       validateImport(tableName, conf, job);
     }
@@ -262,6 +264,13 @@ public void runImport(String tableName, String ormJarFile, String splitByCol,
     }
   }
 
+  /**
+   * Perform any operation that needs to be done post map/reduce job to
+   * complete the import.
+   */
+  protected void completeImport(Job job) throws IOException, ImportException {
+  }
+
   protected void validateImport(String tableName, Configuration conf, Job job)
       throws ImportException {
     LOG.debug("Validating imported data.");
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index ebb18573..a1080d35 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -26,6 +26,7 @@
 import java.util.Arrays;
 import java.util.Properties;
 
+import com.cloudera.sqoop.util.ImportException;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
@@ -175,6 +176,8 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
   public static final String HBASE_TABLE_ARG = "hbase-table";
   public static final String HBASE_COL_FAM_ARG = "column-family";
   public static final String HBASE_ROW_KEY_ARG = "hbase-row-key";
+  public static final String HBASE_BULK_LOAD_ENABLED_ARG =
+      "hbase-bulkload";
   public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table";
 
@@ -709,6 +712,10 @@ protected RelatedOptions getHBaseOptions() {
         .withDescription("Specifies which input column to use as the row key")
         .withLongOpt(HBASE_ROW_KEY_ARG)
         .create());
+    hbaseOpts.addOption(OptionBuilder
+        .withDescription("Enables HBase bulk loading")
+        .withLongOpt(HBASE_BULK_LOAD_ENABLED_ARG)
+        .create());
     hbaseOpts.addOption(OptionBuilder
         .withDescription("If specified, create missing HBase tables")
         .withLongOpt(HBASE_CREATE_TABLE_ARG)
         .create());
@@ -1076,6 +1083,8 @@ protected void applyHBaseOptions(CommandLine in, SqoopOptions out) {
       out.setHBaseRowKeyColumn(in.getOptionValue(HBASE_ROW_KEY_ARG));
     }
 
+    out.setHBaseBulkLoadEnabled(in.hasOption(HBASE_BULK_LOAD_ENABLED_ARG));
+
     if (in.hasOption(HBASE_CREATE_TABLE_ARG)) {
       out.setCreateHBaseTable(true);
     }
@@ -1326,6 +1335,14 @@ protected void validateHBaseOptions(SqoopOptions options)
       throw new InvalidOptionsException("Direct import is incompatible with "
         + "HBase. Please remove parameter --direct");
Please remove parameter --direct"); } + + if (options.isBulkLoadEnabled() && options.getHBaseTable() == null) { + String validationMessage = String.format("Can't run import with %s " + + "without %s", + BaseSqoopTool.HBASE_BULK_LOAD_ENABLED_ARG, + BaseSqoopTool.HBASE_TABLE_ARG); + throw new InvalidOptionsException(validationMessage); + } } /** diff --git a/src/test/com/cloudera/sqoop/TestSqoopOptions.java b/src/test/com/cloudera/sqoop/TestSqoopOptions.java index 03e2504b..90bc08e7 100644 --- a/src/test/com/cloudera/sqoop/TestSqoopOptions.java +++ b/src/test/com/cloudera/sqoop/TestSqoopOptions.java @@ -20,6 +20,7 @@ import java.util.Properties; +import com.cloudera.sqoop.tool.BaseSqoopTool; import junit.framework.TestCase; import org.apache.commons.lang.ArrayUtils; @@ -433,4 +434,32 @@ public void testDeleteWithIncrementalImport() throws Exception { } } + // test that hbase bulk load import with table name and target dir + // passes validation + public void testHBaseBulkLoad() throws Exception { + String [] extraArgs = { + longArgument(BaseSqoopTool.HBASE_BULK_LOAD_ENABLED_ARG), + longArgument(BaseSqoopTool.TARGET_DIR_ARG), "./test", + longArgument(BaseSqoopTool.HBASE_TABLE_ARG), "test_table", + longArgument(BaseSqoopTool.HBASE_COL_FAM_ARG), "d"}; + + validateImportOptions(extraArgs); + } + + // test that hbase bulk load import with a missing --hbase-table fails + public void testHBaseBulkLoadMissingHbaseTable() throws Exception { + String [] extraArgs = { + longArgument(BaseSqoopTool.HBASE_BULK_LOAD_ENABLED_ARG), + longArgument(BaseSqoopTool.TARGET_DIR_ARG), "./test"}; + try { + validateImportOptions(extraArgs); + fail("Expected InvalidOptionsException"); + } catch (SqoopOptions.InvalidOptionsException ioe) { + // Expected + } + } + + private static String longArgument(String argument) { + return String.format("--%s", argument); + } }