diff --git a/build.xml b/build.xml
index 97e55028..7f948b3c 100644
--- a/build.xml
+++ b/build.xml
@@ -185,6 +185,20 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/ivy.xml b/ivy.xml
index a5025305..ee1dafa3 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -55,6 +55,8 @@ under the License.
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" />
+
@@ -62,6 +64,7 @@ under the License.
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
index b2431ac0..fdbe1276 100644
--- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
@@ -18,11 +18,9 @@
package org.apache.sqoop.hbase;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
+import com.cloudera.sqoop.lib.FieldMapProcessor;
+import com.cloudera.sqoop.lib.FieldMappable;
+import com.cloudera.sqoop.lib.ProcessingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
@@ -31,11 +29,11 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.sqoop.mapreduce.ImportJobBase;
-import com.cloudera.sqoop.lib.FieldMappable;
-import com.cloudera.sqoop.lib.FieldMapProcessor;
-import com.cloudera.sqoop.lib.ProcessingException;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
/**
* SqoopRecordProcessor that performs an HBase "put" operation
@@ -105,21 +103,7 @@ public void setConf(Configuration config) {
if (null == putTransformer) {
throw new RuntimeException("Could not instantiate PutTransformer.");
}
-
- this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
- this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
-
- if (this.putTransformer instanceof ToStringPutTransformer) {
- ToStringPutTransformer stringPutTransformer =
- (ToStringPutTransformer) this.putTransformer;
- stringPutTransformer.bigDecimalFormatString =
- conf.getBoolean(ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
- ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
- stringPutTransformer.addRowKey =
- conf.getBoolean(HBasePutProcessor.ADD_ROW_KEY,
- HBasePutProcessor.ADD_ROW_KEY_DEFAULT);
- stringPutTransformer.detectCompositeKey();
- }
+ putTransformer.init(conf);
this.tableName = conf.get(TABLE_NAME_KEY, null);
try {
diff --git a/src/java/org/apache/sqoop/hbase/PutTransformer.java b/src/java/org/apache/sqoop/hbase/PutTransformer.java
index 8d6bcac2..533467e5 100644
--- a/src/java/org/apache/sqoop/hbase/PutTransformer.java
+++ b/src/java/org/apache/sqoop/hbase/PutTransformer.java
@@ -22,6 +22,8 @@
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
/**
@@ -33,6 +35,8 @@ public abstract class PutTransformer {
private String columnFamily;
private String rowKeyColumn;
+ public abstract void init(Configuration conf);
+
/**
* @return the default column family to insert into.
*/
diff --git a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
index b5cad1db..363e1456 100644
--- a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
+++ b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
@@ -18,6 +18,15 @@
package org.apache.sqoop.hbase;
+import com.cloudera.sqoop.hbase.PutTransformer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.mapreduce.ImportJobBase;
+
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
@@ -27,13 +36,10 @@
import java.util.Map;
import java.util.TreeMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.StringUtils;
-
-import com.cloudera.sqoop.hbase.PutTransformer;
+import static org.apache.sqoop.hbase.HBasePutProcessor.ADD_ROW_KEY;
+import static org.apache.sqoop.hbase.HBasePutProcessor.ADD_ROW_KEY_DEFAULT;
+import static org.apache.sqoop.hbase.HBasePutProcessor.COL_FAMILY_KEY;
+import static org.apache.sqoop.hbase.HBasePutProcessor.ROW_KEY_COLUMN_KEY;
/**
* PutTransformer that calls toString on all non-null fields.
@@ -204,4 +210,14 @@ private String toHBaseString(Object val) {
return valString;
}
+ @Override
+ public void init(Configuration conf) {
+ setColumnFamily(conf.get(COL_FAMILY_KEY, null));
+ setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
+
+ this.bigDecimalFormatString = conf.getBoolean(ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
+ ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+ this.addRowKey = conf.getBoolean(ADD_ROW_KEY, ADD_ROW_KEY_DEFAULT);
+ detectCompositeKey();
+ }
}
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
index 363b5d73..58ccee7b 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
@@ -66,8 +66,7 @@ protected void setup(Context context)
if (null == putTransformer) {
throw new RuntimeException("Could not instantiate PutTransformer.");
}
- this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
- this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
+ putTransformer.init(conf);
}
@Override
public void map(LongWritable key, SqoopRecord val, Context context)
diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java b/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java
index cfbb1d37..abf9f1cc 100644
--- a/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java
+++ b/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java
@@ -18,68 +18,126 @@
package com.cloudera.sqoop.hbase;
-import java.io.IOException;
-
+import junit.framework.JUnit4TestAdapter;
+import org.apache.commons.lang.StringUtils;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
-/**
- *
- */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.apache.commons.lang.StringUtils.join;
+
+@RunWith(Parameterized.class)
public class HBaseImportAddRowKeyTest extends HBaseTestCase {
+ @Parameterized.Parameters(name = "bulkLoad = {0}")
+ public static Iterable extends Object> bulkLoadParameters() {
+ return Arrays.asList(new Boolean[] { false } , new Boolean[] { true } );
+ }
+
+ private String[] columnTypes;
+
+ private String[] columnValues;
+
+ private String hbaseTableName;
+
+ private String hbaseColumnFamily;
+
+ private String hbaseTmpDir;
+
+ private String hbaseBulkLoadDir;
+
+ private boolean bulkLoad;
+
+ public HBaseImportAddRowKeyTest(boolean bulkLoad) {
+ this.bulkLoad = bulkLoad;
+ }
+
+ @Before
+ public void setUp() {
+ super.setUp();
+ columnTypes = new String[] { "INT", "INT" };
+ columnValues = new String[] { "0", "1" };
+ hbaseTableName = "addRowKeyTable";
+ hbaseColumnFamily = "addRowKeyFamily";
+ hbaseTmpDir = TEMP_BASE_DIR + "hbaseTmpDir";
+ hbaseBulkLoadDir = TEMP_BASE_DIR + "hbaseBulkLoadDir";
+ createTableWithColTypes(columnTypes, columnValues);
+ }
+
@Test
public void testAddRowKey() throws IOException {
- String[] types = { "INT", "INT" };
- String[] vals = { "0", "1" };
- createTableWithColTypes(types, vals);
-
- String[] otherArg = getArgv(true, "addRowKeyT", "addRowKeyF", true, null);
- String[] argv = new String[otherArg.length + 2];
- argv[0] = "-D";
- argv[1] = "sqoop.hbase.add.row.key=true";
- System.arraycopy(otherArg, 0, argv, 2, otherArg.length);
+ String[] argv = getImportArguments(true, hbaseTableName, hbaseColumnFamily);
runImport(argv);
// Row key should have been added
- verifyHBaseCell("addRowKeyT", "0", "addRowKeyF", getColName(0), "0");
- verifyHBaseCell("addRowKeyT", "0", "addRowKeyF", getColName(1), "1");
+ verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(0), columnValues[0]);
+ verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(1), columnValues[1]);
}
@Test
public void testAddRowKeyDefault() throws IOException {
- String[] types = { "INT", "INT" };
- String[] vals = { "0", "1" };
- createTableWithColTypes(types, vals);
-
- String[] argv = getArgv(true, "addRowKeyDfT", "addRowKeyDfF", true, null);
+ String[] argv = getImportArguments(false, hbaseTableName, hbaseColumnFamily);
runImport(argv);
// Row key should not be added by default
- verifyHBaseCell("addRowKeyDfT", "0", "addRowKeyDfF", getColName(0), null);
- verifyHBaseCell("addRowKeyDfT", "0", "addRowKeyDfF", getColName(1), "1");
+ verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(0), null);
+ verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(1), columnValues[1]);
}
@Test
public void testAddCompositeKey() throws IOException {
- String[] types = { "INT", "INT" };
- String[] vals = { "0", "1" };
- createTableWithColTypes(types, vals);
+ String rowKey = getColName(0)+","+getColName(1);
- String[] otherArg = getArgv(true, "addRowKeyT", "addRowKeyF", true, null);
- String[] argv = new String[otherArg.length + 4];
- argv[0]="-D";
- argv[1]="sqoop.hbase.add.row.key=true";
- System.arraycopy(otherArg, 0, argv, 2, otherArg.length);
- argv[argv.length - 2] = "--hbase-row-key";
- argv[argv.length - 1] = getColName(0)+","+getColName(1);
+ String[] argv = getImportArguments(true, hbaseTableName, hbaseColumnFamily, rowKey);
runImport(argv);
// Row key should have been added
- verifyHBaseCell("addRowKeyT", "0_1", "addRowKeyF", getColName(0), "0");
- verifyHBaseCell("addRowKeyT", "0_1", "addRowKeyF", getColName(1), "1");
+ verifyHBaseCell(hbaseTableName, join(columnValues, '_'), hbaseColumnFamily, getColName(0), columnValues[0]);
+ verifyHBaseCell(hbaseTableName, join(columnValues, '_'), hbaseColumnFamily, getColName(1), columnValues[1]);
+ }
+
+ private String[] getImportArguments(boolean addRowKey, String hbaseTableName, String hbaseColumnFamily) {
+ return getImportArguments(addRowKey, hbaseTableName, hbaseColumnFamily, null);
+ }
+
+ private String[] getImportArguments(boolean addRowKey, String hbaseTableName, String hbaseColumnFamily, String rowKey) {
+ List result = new ArrayList<>();
+
+ if (addRowKey) {
+ result.add("-D");
+ result.add("sqoop.hbase.add.row.key=true");
+ }
+ result.add("-D");
+ result.add("hbase.fs.tmp.dir=" + hbaseTmpDir);
+
+ result.addAll(asList(getArgv(true, hbaseTableName, hbaseColumnFamily, true, null)));
+
+ if(bulkLoad) {
+ result.add("--target-dir");
+ result.add(hbaseBulkLoadDir);
+ result.add("--hbase-bulkload");
+ }
+
+ if (!StringUtils.isBlank(rowKey)) {
+ result.add("--hbase-row-key");
+ result.add(rowKey);
+ }
+
+ return result.toArray(new String[result.size()]);
+ }
+
+ public static junit.framework.Test suite() {
+ return new JUnit4TestAdapter(HBaseImportAddRowKeyTest.class);
}
}
diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
index 37dc004d..ad92a07c 100644
--- a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
+++ b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
@@ -88,7 +88,7 @@ private static void restoreTestBuidlDataProperty() {
if (includeHadoopFlags) {
CommonArgs.addHadoopFlags(args);
args.add("-D");
- args.add("hbase.zookeeper.property.clientPort=21818");
+ args.add("hbase.zookeeper.property.clientPort=" + zookeeperPort);
}
if (null != queryStr) {
@@ -120,40 +120,33 @@ private static void restoreTestBuidlDataProperty() {
private String workDir = createTempDir().getAbsolutePath();
private MiniZooKeeperCluster zookeeperCluster;
private MiniHBaseCluster hbaseCluster;
+ private int zookeeperPort;
@Override
@Before
public void setUp() {
try {
+ String zookeeperDir = new File(workDir, "zk").getAbsolutePath();
+ zookeeperCluster = new MiniZooKeeperCluster();
+ zookeeperCluster.startup(new File(zookeeperDir));
+ zookeeperPort = zookeeperCluster.getClientPort();
+
HBaseTestCase.recordTestBuildDataProperty();
String hbaseDir = new File(workDir, "hbase").getAbsolutePath();
String hbaseRoot = "file://" + hbaseDir;
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set(HConstants.HBASE_DIR, hbaseRoot);
//Hbase 0.90 does not have HConstants.ZOOKEEPER_CLIENT_PORT
- hbaseConf.setInt("hbase.zookeeper.property.clientPort", 21818);
+ hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "0.0.0.0");
hbaseConf.setInt("hbase.master.info.port", -1);
hbaseConf.setInt("hbase.zookeeper.property.maxClientCnxns", 500);
- String zookeeperDir = new File(workDir, "zk").getAbsolutePath();
- int zookeeperPort = 21818;
- zookeeperCluster = new MiniZooKeeperCluster();
- Method m;
- Class> zkParam[] = {Integer.TYPE};
- try {
- m = MiniZooKeeperCluster.class.getDeclaredMethod("setDefaultClientPort",
- zkParam);
- } catch (NoSuchMethodException e) {
- m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort",
- zkParam);
- }
- m.invoke(zookeeperCluster, new Object[]{new Integer(zookeeperPort)});
- zookeeperCluster.startup(new File(zookeeperDir));
hbaseCluster = new MiniHBaseCluster(hbaseConf, 1);
HMaster master = hbaseCluster.getMaster();
Object serverName = master.getServerName();
String hostAndPort;
+ Method m;
if (serverName instanceof String) {
System.out.println("Server name is string, using HServerAddress.");
m = HMaster.class.getDeclaredMethod("getMasterAddress",