
SQOOP-1032: Add the --bulk-load-dir option to support the HBase doBulkLoad function

(Alexandre Normand via Jarek Jarcec Cecho)
Jarek Jarcec Cecho 2013-10-10 16:57:37 -07:00
parent 66af31d13c
commit ddb81e185b
11 changed files with 344 additions and 5 deletions

View File

@@ -33,5 +33,6 @@ Argument Description
attributes
+\--hbase-table <table-name>+ Specifies an HBase table to use as the \
target instead of HDFS
+\--hbase-bulkload+ Enables bulk loading
--------------------------------------------------------------------------

View File

@@ -58,4 +58,5 @@ mode), and then inserts the UTF-8 bytes of this string in the target
cell. Sqoop will skip all rows containing null values in all columns
except the row key column.
To decrease the load on HBase, Sqoop can do bulk loading as opposed to
direct writes. To use bulk loading, enable it using +\--hbase-bulkload+.
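For example, a bulk-load import might be invoked as follows (the connect string, table names, and column family are placeholders, not part of this change):

----
$ sqoop import \
    --connect jdbc:mysql://db.example.com/corp \
    --table EMPLOYEES \
    --hbase-table employees \
    --column-family info \
    --hbase-bulkload
----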

View File

@@ -230,6 +230,9 @@ public String toString() {
// Column of the input to use as the row key.
@StoredAsProperty("hbase.row.key.col") private String hbaseRowKeyCol;
// if true, bulk loading will be used.
@StoredAsProperty("hbase.bulk.load.enabled") private boolean hbaseBulkLoadEnabled;
// if true, create tables/col families.
@StoredAsProperty("hbase.create.table") private boolean hbaseCreateTable;
@@ -1923,6 +1926,20 @@ public void setHBaseRowKeyColumn(String col) {
this.hbaseRowKeyCol = col;
}
/**
* @return true if bulk load is enabled and false otherwise.
*/
public boolean isBulkLoadEnabled() {
return this.hbaseBulkLoadEnabled;
}
/**
* Sets whether bulk load is enabled for an HBase import.
*/
public void setHBaseBulkLoadEnabled(boolean hbaseBulkLoadEnabled) {
this.hbaseBulkLoadEnabled = hbaseBulkLoadEnabled;
}
/**
* Gets the target HBase table name, if any.
*/
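A minimal sketch of how the new accessor pair is meant to be used (setHBaseTable is assumed from the surrounding SqoopOptions API rather than shown in this hunk):

import org.apache.sqoop.SqoopOptions;

public class BulkLoadOptionSketch {
  public static void main(String[] args) {
    SqoopOptions opts = new SqoopOptions();
    // What --hbase-bulkload sets during argument parsing.
    opts.setHBaseBulkLoadEnabled(true);
    // Bulk load only makes sense together with a target HBase table.
    opts.setHBaseTable("employees");
    System.out.println("bulk load enabled: " + opts.isBulkLoadEnabled());
  }
}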

View File

@@ -66,6 +66,12 @@ public class HBasePutProcessor implements Closeable, Configurable,
public static final String TRANSFORMER_CLASS_KEY =
"sqoop.hbase.insert.put.transformer.class";
/**
* Configuration key to enable/disable HBase bulk loading.
*/
public static final String BULK_LOAD_ENABLED_KEY =
"sqoop.hbase.bulk.load.enabled";
/** Configuration key to specify whether to add the row key column into
* HBase. Set to false by default.
*/
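Only the constant is defined in this hunk; how Sqoop itself propagates the flag is not shown, so the set/read pattern below is an assumption about typical Hadoop Configuration usage:

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.hbase.HBasePutProcessor;

public class BulkLoadFlagSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Presumably set from SqoopOptions.isBulkLoadEnabled() when the job is configured (assumption).
    conf.setBoolean(HBasePutProcessor.BULK_LOAD_ENABLED_KEY, true);
    // Read back with a default of false when the flag is absent.
    boolean bulkLoad = conf.getBoolean(HBasePutProcessor.BULK_LOAD_ENABLED_KEY, false);
    System.out.println("bulk load enabled: " + bulkLoad);
  }
}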

View File

@@ -181,8 +181,13 @@ private List<Put> putRecordInHBase(Map<String, Object> record,
// check addRowKey flag before including rowKey field.
Object val = fieldEntry.getValue();
if (null != val) {
put.add(colFamilyBytes, getFieldNameBytes(colName),
Bytes.toBytes(toHBaseString(val)));
if (val instanceof byte[]) {
// Raw byte[] values are written to the cell as-is.
put.add(colFamilyBytes, getFieldNameBytes(colName),
(byte[]) val);
} else {
// All other types are first converted to their UTF-8 string form.
put.add(colFamilyBytes, getFieldNameBytes(colName),
Bytes.toBytes(toHBaseString(val)));
}
}
}
}
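To make the distinction concrete, here is a standalone sketch against the HBase client Put API of the same vintage (the column family "d" and the qualifiers are placeholders):

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutValueSketch {
  public static void main(String[] args) {
    Put put = new Put(Bytes.toBytes("row-1"));
    byte[] raw = new byte[] { 0x01, 0x02, 0x03 };
    // A byte[] field value is stored in the cell verbatim...
    put.add(Bytes.toBytes("d"), Bytes.toBytes("blob_col"), raw);
    // ...while any other type goes through its string form, as toHBaseString() does in Sqoop.
    put.add(Bytes.toBytes("d"), Bytes.toBytes("num_col"), Bytes.toBytes(Integer.toString(42)));
    System.out.println(put);
  }
}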

View File

@@ -41,7 +41,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.mapreduce.JdbcCallExportJob;
import org.apache.sqoop.tool.BaseSqoopTool;
import org.apache.sqoop.util.LoggingUtils;
import org.apache.sqoop.mapreduce.HBaseBulkImportJob;
import org.apache.sqoop.util.SqlTypeMap;
import com.cloudera.sqoop.SqoopOptions;
@@ -587,7 +589,11 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
throw new ImportException("HBase jars are not present in "
+ "classpath, cannot import to HBase!");
}
importer = new HBaseImportJob(opts, context);
if (!opts.isBulkLoadEnabled()) {
importer = new HBaseImportJob(opts, context);
} else {
importer = new HBaseBulkImportJob(opts, context);
}
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),
@@ -619,7 +625,11 @@ public void importQuery(com.cloudera.sqoop.manager.ImportJobContext context)
throw new ImportException("HBase jars are not present in classpath,"
+ " cannot import to HBase!");
}
importer = new HBaseImportJob(opts, context);
if (!opts.isBulkLoadEnabled()) {
importer = new HBaseImportJob(opts, context);
} else {
importer = new HBaseBulkImportJob(opts, context);
}
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),

View File

@@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.cloudera.sqoop.manager.ImportJobContext;
import com.cloudera.sqoop.util.ImportException;
import com.cloudera.sqoop.SqoopOptions;
import com.google.common.base.Preconditions;
/**
* Runs an HBase bulk import: rows read via DataDrivenDBInputFormat are
* mapped to Puts, written out as HFiles through HFileOutputFormat, and
* then bulk loaded into the target table with LoadIncrementalHFiles.
*/
public class HBaseBulkImportJob extends HBaseImportJob {
public static final Log LOG = LogFactory.getLog(
HBaseBulkImportJob.class.getName());
public HBaseBulkImportJob(final SqoopOptions opts,
final ImportJobContext importContext) {
super(opts, importContext);
}
@Override
protected void configureMapper(Job job, String tableName,
String tableClassName) throws IOException {
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setMapperClass(getMapperClass());
}
@Override
protected Class<? extends Mapper> getMapperClass() {
return HBaseBulkImportMapper.class;
}
@Override
protected void jobSetup(Job job) throws IOException, ImportException {
super.jobSetup(job);
// We shouldn't have gotten here if the bulk load dir is not set,
// so throw an ImportException.
if (getContext().getDestination() == null) {
throw new ImportException("Can't run HBaseBulkImportJob without a " +
"valid destination directory.");
}
TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Preconditions.class);
FileOutputFormat.setOutputPath(job, getContext().getDestination());
HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
HFileOutputFormat.configureIncrementalLoad(job, hTable);
}
/**
* Perform the loading of Hfiles.
*/
@Override
protected void completeImport(Job job) throws IOException, ImportException {
super.completeImport(job);
FileSystem fileSystem = FileSystem.get(job.getConfiguration());
// Make the bulk load files source directory accessible to the world
// so that the HBase user can access and move the HFiles.
Path bulkLoadDir = getContext().getDestination();
setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
FsPermission.createImmutable((short) 00777));
HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
// Load generated HFiles into table
try {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
job.getConfiguration());
loader.doBulkLoad(bulkLoadDir, hTable);
}
catch (Exception e) {
String errorMessage = String.format("Unrecoverable error while " +
"performing the bulk load of files in [%s]",
bulkLoadDir.toString());
throw new ImportException(errorMessage, e);
}
}
@Override
protected void jobTeardown(Job job) throws IOException, ImportException {
super.jobTeardown(job);
// Delete the hfiles directory after we are finished.
FileSystem fileSystem = FileSystem.get(job.getConfiguration());
fileSystem.delete(getContext().getDestination(), true);
}
/**
* Set the file permission of the path of the given fileStatus. If the path
* is a directory, apply permission recursively to all subdirectories and
* files.
*
* @param fs the filesystem
* @param fileStatus containing the path
* @param permission the permission
* @throws java.io.IOException
*/
private void setPermission(FileSystem fs, FileStatus fileStatus,
FsPermission permission) throws IOException {
if (fileStatus.isDir()) {
for (FileStatus file : fs.listStatus(fileStatus.getPath())) {
setPermission(fs, file, permission);
}
}
fs.setPermission(fileStatus.getPath(), permission);
}
}

View File

@@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.sqoop.hbase.PutTransformer;
import org.apache.sqoop.hbase.ToStringPutTransformer;
import com.cloudera.sqoop.lib.LargeObjectLoader;
import com.cloudera.sqoop.lib.SqoopRecord;
import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
import static org.apache.sqoop.hbase.HBasePutProcessor.*;
/**
* Imports records for HBase bulk loading: each SqoopRecord is turned into
* Put objects via the PutTransformer and emitted as (row key, Put) pairs
* for HFileOutputFormat.
*/
public class HBaseBulkImportMapper
extends AutoProgressMapper
<LongWritable, SqoopRecord, ImmutableBytesWritable, Put> {
private LargeObjectLoader lobLoader;
// An object that can transform a map of fieldName -> object
// into a Put command.
private PutTransformer putTransformer;
private Configuration conf;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
this.conf = context.getConfiguration();
Path largeFilePath = new Path(this.conf.get("sqoop.hbase.lob.extern.dir",
"/tmp/sqoop-hbase-" + context.getTaskAttemptID()));
this.lobLoader = new LargeObjectLoader(context.getConfiguration(),
largeFilePath);
// Get the implementation of PutTransformer to use.
// By default, we call toString() on every non-null field.
Class<? extends PutTransformer> xformerClass =
(Class<? extends PutTransformer>)
this.conf.getClass(TRANSFORMER_CLASS_KEY, ToStringPutTransformer.class);
this.putTransformer = (PutTransformer)
ReflectionUtils.newInstance(xformerClass, this.conf);
if (null == putTransformer) {
throw new RuntimeException("Could not instantiate PutTransformer.");
}
this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
}
@Override
public void map(LongWritable key, SqoopRecord val, Context context)
throws IOException, InterruptedException {
try {
// Loading of LOBs was delayed until we have a Context.
val.loadLargeObjects(lobLoader);
} catch (SQLException sqlE) {
throw new IOException(sqlE);
}
Map<String, Object> fields = val.getFieldMap();
List<Put> putList = putTransformer.getPutCommand(fields);
for (Put put : putList) {
context.write(new ImmutableBytesWritable(put.getRow()), put);
}
}
@Override
protected void cleanup(Context context) throws IOException {
if (null != lobLoader) {
lobLoader.close();
}
}
}

View File

@@ -249,6 +249,8 @@ public void runImport(String tableName, String ormJarFile, String splitByCol,
throw new ImportException("Import job failed!");
}
completeImport(job);
if (options.isValidationEnabled()) {
validateImport(tableName, conf, job);
}
@@ -262,6 +264,13 @@ public void runImport(String tableName, String ormJarFile, String splitByCol,
}
}
/**
* Perform any operation that needs to be done post map/reduce job to
* complete the import.
*/
protected void completeImport(Job job) throws IOException, ImportException {
}
protected void validateImport(String tableName, Configuration conf, Job job)
throws ImportException {
LOG.debug("Validating imported data.");
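The empty completeImport body is a template-method hook: runImport calls it once the MapReduce job has succeeded, so a subclass can append post-processing (as HBaseBulkImportJob does above) without changing runImport. A hypothetical subclass for illustration only; the class name and log message are not part of the commit:

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.mapreduce.HBaseImportJob;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ImportJobContext;
import com.cloudera.sqoop.util.ImportException;

// Illustrative only: logs a marker once the map/reduce phase finishes.
public class AuditingHBaseImportJob extends HBaseImportJob {

  private static final Log LOG =
      LogFactory.getLog(AuditingHBaseImportJob.class.getName());

  public AuditingHBaseImportJob(SqoopOptions opts, ImportJobContext context) {
    super(opts, context);
  }

  @Override
  protected void completeImport(Job job) throws IOException, ImportException {
    super.completeImport(job);
    // Post-MapReduce work goes here; the base implementation is a no-op.
    LOG.info("Map/reduce phase finished; running post-import steps.");
  }
}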

View File

@@ -26,6 +26,7 @@
import java.util.Arrays;
import java.util.Properties;
import com.cloudera.sqoop.util.ImportException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
@@ -175,6 +176,8 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
public static final String HBASE_TABLE_ARG = "hbase-table";
public static final String HBASE_COL_FAM_ARG = "column-family";
public static final String HBASE_ROW_KEY_ARG = "hbase-row-key";
public static final String HBASE_BULK_LOAD_ENABLED_ARG =
"hbase-bulkload";
public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table";
@@ -709,6 +712,10 @@ protected RelatedOptions getHBaseOptions() {
.withDescription("Specifies which input column to use as the row key")
.withLongOpt(HBASE_ROW_KEY_ARG)
.create());
hbaseOpts.addOption(OptionBuilder
.withDescription("Enables HBase bulk loading")
.withLongOpt(HBASE_BULK_LOAD_ENABLED_ARG)
.create());
hbaseOpts.addOption(OptionBuilder
.withDescription("If specified, create missing HBase tables")
.withLongOpt(HBASE_CREATE_TABLE_ARG)
@@ -1076,6 +1083,8 @@ protected void applyHBaseOptions(CommandLine in, SqoopOptions out) {
out.setHBaseRowKeyColumn(in.getOptionValue(HBASE_ROW_KEY_ARG));
}
out.setHBaseBulkLoadEnabled(in.hasOption(HBASE_BULK_LOAD_ENABLED_ARG));
if (in.hasOption(HBASE_CREATE_TABLE_ARG)) {
out.setCreateHBaseTable(true);
}
@@ -1326,6 +1335,14 @@ protected void validateHBaseOptions(SqoopOptions options)
throw new InvalidOptionsException("Direct import is incompatible with "
+ "HBase. Please remove parameter --direct");
}
if (options.isBulkLoadEnabled() && options.getHBaseTable() == null) {
String validationMessage = String.format("Can't run import with %s " +
"without %s",
BaseSqoopTool.HBASE_BULK_LOAD_ENABLED_ARG,
BaseSqoopTool.HBASE_TABLE_ARG);
throw new InvalidOptionsException(validationMessage);
}
}
/**

View File

@@ -20,6 +20,7 @@
import java.util.Properties;
import com.cloudera.sqoop.tool.BaseSqoopTool;
import junit.framework.TestCase;
import org.apache.commons.lang.ArrayUtils;
@@ -433,4 +434,32 @@ public void testDeleteWithIncrementalImport() throws Exception {
}
}
// Test that an HBase bulk load import with a table name and target dir
// passes validation.
public void testHBaseBulkLoad() throws Exception {
String [] extraArgs = {
longArgument(BaseSqoopTool.HBASE_BULK_LOAD_ENABLED_ARG),
longArgument(BaseSqoopTool.TARGET_DIR_ARG), "./test",
longArgument(BaseSqoopTool.HBASE_TABLE_ARG), "test_table",
longArgument(BaseSqoopTool.HBASE_COL_FAM_ARG), "d"};
validateImportOptions(extraArgs);
}
// Test that an HBase bulk load import missing --hbase-table fails validation.
public void testHBaseBulkLoadMissingHbaseTable() throws Exception {
String [] extraArgs = {
longArgument(BaseSqoopTool.HBASE_BULK_LOAD_ENABLED_ARG),
longArgument(BaseSqoopTool.TARGET_DIR_ARG), "./test"};
try {
validateImportOptions(extraArgs);
fail("Expected InvalidOptionsException");
} catch (SqoopOptions.InvalidOptionsException ioe) {
// Expected
}
}
private static String longArgument(String argument) {
return String.format("--%s", argument);
}
}