From b4afcf4179b13c25b5e9bd182d75cab5d2e6c8d1 Mon Sep 17 00:00:00 2001 From: Attila Szabo Date: Thu, 13 Oct 2016 14:38:21 +0200 Subject: [PATCH] SQOOP-2952: Fixing bug (row key not added into column family using --hbase-bulkload) (Szabolcs Vasas via Attila Szabo) --- build.xml | 14 ++ ivy.xml | 23 ++++ .../apache/sqoop/hbase/HBasePutProcessor.java | 32 ++--- .../apache/sqoop/hbase/PutTransformer.java | 4 + .../sqoop/hbase/ToStringPutTransformer.java | 30 +++- .../mapreduce/HBaseBulkImportMapper.java | 3 +- .../sqoop/hbase/HBaseImportAddRowKeyTest.java | 128 +++++++++++++----- .../cloudera/sqoop/hbase/HBaseTestCase.java | 25 ++-- 8 files changed, 175 insertions(+), 84 deletions(-) diff --git a/build.xml b/build.xml index 97e55028..7f948b3c 100644 --- a/build.xml +++ b/build.xml @@ -185,6 +185,20 @@ + + + + + + + + + + + + + + diff --git a/ivy.xml b/ivy.xml index a5025305..ee1dafa3 100644 --- a/ivy.xml +++ b/ivy.xml @@ -55,6 +55,8 @@ under the License. extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" /> + @@ -62,6 +64,7 @@ under the License. + + + + + + + + + + + + + + diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java index b2431ac0..fdbe1276 100644 --- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java +++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java @@ -18,11 +18,9 @@ package org.apache.sqoop.hbase; -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.Map; - +import com.cloudera.sqoop.lib.FieldMapProcessor; +import com.cloudera.sqoop.lib.FieldMappable; +import com.cloudera.sqoop.lib.ProcessingException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; @@ -31,11 +29,11 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.sqoop.mapreduce.ImportJobBase; -import com.cloudera.sqoop.lib.FieldMappable; -import com.cloudera.sqoop.lib.FieldMapProcessor; -import com.cloudera.sqoop.lib.ProcessingException; +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; /** * SqoopRecordProcessor that performs an HBase "put" operation @@ -105,21 +103,7 @@ public void setConf(Configuration config) { if (null == putTransformer) { throw new RuntimeException("Could not instantiate PutTransformer."); } - - this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null)); - this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null)); - - if (this.putTransformer instanceof ToStringPutTransformer) { - ToStringPutTransformer stringPutTransformer = - (ToStringPutTransformer) this.putTransformer; - stringPutTransformer.bigDecimalFormatString = - conf.getBoolean(ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, - ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT); - stringPutTransformer.addRowKey = - conf.getBoolean(HBasePutProcessor.ADD_ROW_KEY, - HBasePutProcessor.ADD_ROW_KEY_DEFAULT); - stringPutTransformer.detectCompositeKey(); - } + putTransformer.init(conf); this.tableName = conf.get(TABLE_NAME_KEY, null); try { diff --git a/src/java/org/apache/sqoop/hbase/PutTransformer.java b/src/java/org/apache/sqoop/hbase/PutTransformer.java index 8d6bcac2..533467e5 100644 --- a/src/java/org/apache/sqoop/hbase/PutTransformer.java +++ b/src/java/org/apache/sqoop/hbase/PutTransformer.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; /** @@ -33,6 +35,8 @@ public abstract class PutTransformer { private String columnFamily; private String rowKeyColumn; + public abstract void init(Configuration conf); + /** * @return the default column family to insert into. */ diff --git a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java index b5cad1db..363e1456 100644 --- a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java +++ b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java @@ -18,6 +18,15 @@ package org.apache.sqoop.hbase; +import com.cloudera.sqoop.hbase.PutTransformer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.StringUtils; +import org.apache.sqoop.mapreduce.ImportJobBase; + import java.io.IOException; import java.math.BigDecimal; import java.util.ArrayList; @@ -27,13 +36,10 @@ import java.util.Map; import java.util.TreeMap; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.StringUtils; - -import com.cloudera.sqoop.hbase.PutTransformer; +import static org.apache.sqoop.hbase.HBasePutProcessor.ADD_ROW_KEY; +import static org.apache.sqoop.hbase.HBasePutProcessor.ADD_ROW_KEY_DEFAULT; +import static org.apache.sqoop.hbase.HBasePutProcessor.COL_FAMILY_KEY; +import static org.apache.sqoop.hbase.HBasePutProcessor.ROW_KEY_COLUMN_KEY; /** * PutTransformer that calls toString on all non-null fields. @@ -204,4 +210,14 @@ private String toHBaseString(Object val) { return valString; } + @Override + public void init(Configuration conf) { + setColumnFamily(conf.get(COL_FAMILY_KEY, null)); + setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null)); + + this.bigDecimalFormatString = conf.getBoolean(ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, + ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT); + this.addRowKey = conf.getBoolean(ADD_ROW_KEY, ADD_ROW_KEY_DEFAULT); + detectCompositeKey(); + } } diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java index 363b5d73..58ccee7b 100644 --- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java @@ -66,8 +66,7 @@ protected void setup(Context context) if (null == putTransformer) { throw new RuntimeException("Could not instantiate PutTransformer."); } - this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null)); - this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null)); + putTransformer.init(conf); } @Override public void map(LongWritable key, SqoopRecord val, Context context) diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java b/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java index cfbb1d37..abf9f1cc 100644 --- a/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java +++ b/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java @@ -18,68 +18,126 @@ package com.cloudera.sqoop.hbase; -import java.io.IOException; - +import junit.framework.JUnit4TestAdapter; +import org.apache.commons.lang.StringUtils; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; -/** - * - */ +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static java.util.Arrays.asList; +import static org.apache.commons.lang.StringUtils.join; + +@RunWith(Parameterized.class) public class HBaseImportAddRowKeyTest extends HBaseTestCase { + @Parameterized.Parameters(name = "bulkLoad = {0}") + public static Iterable bulkLoadParameters() { + return Arrays.asList(new Boolean[] { false } , new Boolean[] { true } ); + } + + private String[] columnTypes; + + private String[] columnValues; + + private String hbaseTableName; + + private String hbaseColumnFamily; + + private String hbaseTmpDir; + + private String hbaseBulkLoadDir; + + private boolean bulkLoad; + + public HBaseImportAddRowKeyTest(boolean bulkLoad) { + this.bulkLoad = bulkLoad; + } + + @Before + public void setUp() { + super.setUp(); + columnTypes = new String[] { "INT", "INT" }; + columnValues = new String[] { "0", "1" }; + hbaseTableName = "addRowKeyTable"; + hbaseColumnFamily = "addRowKeyFamily"; + hbaseTmpDir = TEMP_BASE_DIR + "hbaseTmpDir"; + hbaseBulkLoadDir = TEMP_BASE_DIR + "hbaseBulkLoadDir"; + createTableWithColTypes(columnTypes, columnValues); + } + @Test public void testAddRowKey() throws IOException { - String[] types = { "INT", "INT" }; - String[] vals = { "0", "1" }; - createTableWithColTypes(types, vals); - - String[] otherArg = getArgv(true, "addRowKeyT", "addRowKeyF", true, null); - String[] argv = new String[otherArg.length + 2]; - argv[0] = "-D"; - argv[1] = "sqoop.hbase.add.row.key=true"; - System.arraycopy(otherArg, 0, argv, 2, otherArg.length); + String[] argv = getImportArguments(true, hbaseTableName, hbaseColumnFamily); runImport(argv); // Row key should have been added - verifyHBaseCell("addRowKeyT", "0", "addRowKeyF", getColName(0), "0"); - verifyHBaseCell("addRowKeyT", "0", "addRowKeyF", getColName(1), "1"); + verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(0), columnValues[0]); + verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(1), columnValues[1]); } @Test public void testAddRowKeyDefault() throws IOException { - String[] types = { "INT", "INT" }; - String[] vals = { "0", "1" }; - createTableWithColTypes(types, vals); - - String[] argv = getArgv(true, "addRowKeyDfT", "addRowKeyDfF", true, null); + String[] argv = getImportArguments(false, hbaseTableName, hbaseColumnFamily); runImport(argv); // Row key should not be added by default - verifyHBaseCell("addRowKeyDfT", "0", "addRowKeyDfF", getColName(0), null); - verifyHBaseCell("addRowKeyDfT", "0", "addRowKeyDfF", getColName(1), "1"); + verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(0), null); + verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(1), columnValues[1]); } @Test public void testAddCompositeKey() throws IOException { - String[] types = { "INT", "INT" }; - String[] vals = { "0", "1" }; - createTableWithColTypes(types, vals); + String rowKey = getColName(0)+","+getColName(1); - String[] otherArg = getArgv(true, "addRowKeyT", "addRowKeyF", true, null); - String[] argv = new String[otherArg.length + 4]; - argv[0]="-D"; - argv[1]="sqoop.hbase.add.row.key=true"; - System.arraycopy(otherArg, 0, argv, 2, otherArg.length); - argv[argv.length - 2] = "--hbase-row-key"; - argv[argv.length - 1] = getColName(0)+","+getColName(1); + String[] argv = getImportArguments(true, hbaseTableName, hbaseColumnFamily, rowKey); runImport(argv); // Row key should have been added - verifyHBaseCell("addRowKeyT", "0_1", "addRowKeyF", getColName(0), "0"); - verifyHBaseCell("addRowKeyT", "0_1", "addRowKeyF", getColName(1), "1"); + verifyHBaseCell(hbaseTableName, join(columnValues, '_'), hbaseColumnFamily, getColName(0), columnValues[0]); + verifyHBaseCell(hbaseTableName, join(columnValues, '_'), hbaseColumnFamily, getColName(1), columnValues[1]); + } + + private String[] getImportArguments(boolean addRowKey, String hbaseTableName, String hbaseColumnFamily) { + return getImportArguments(addRowKey, hbaseTableName, hbaseColumnFamily, null); + } + + private String[] getImportArguments(boolean addRowKey, String hbaseTableName, String hbaseColumnFamily, String rowKey) { + List result = new ArrayList<>(); + + if (addRowKey) { + result.add("-D"); + result.add("sqoop.hbase.add.row.key=true"); + } + result.add("-D"); + result.add("hbase.fs.tmp.dir=" + hbaseTmpDir); + + result.addAll(asList(getArgv(true, hbaseTableName, hbaseColumnFamily, true, null))); + + if(bulkLoad) { + result.add("--target-dir"); + result.add(hbaseBulkLoadDir); + result.add("--hbase-bulkload"); + } + + if (!StringUtils.isBlank(rowKey)) { + result.add("--hbase-row-key"); + result.add(rowKey); + } + + return result.toArray(new String[result.size()]); + } + + public static junit.framework.Test suite() { + return new JUnit4TestAdapter(HBaseImportAddRowKeyTest.class); } } diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java index 37dc004d..ad92a07c 100644 --- a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java +++ b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java @@ -88,7 +88,7 @@ private static void restoreTestBuidlDataProperty() { if (includeHadoopFlags) { CommonArgs.addHadoopFlags(args); args.add("-D"); - args.add("hbase.zookeeper.property.clientPort=21818"); + args.add("hbase.zookeeper.property.clientPort=" + zookeeperPort); } if (null != queryStr) { @@ -120,40 +120,33 @@ private static void restoreTestBuidlDataProperty() { private String workDir = createTempDir().getAbsolutePath(); private MiniZooKeeperCluster zookeeperCluster; private MiniHBaseCluster hbaseCluster; + private int zookeeperPort; @Override @Before public void setUp() { try { + String zookeeperDir = new File(workDir, "zk").getAbsolutePath(); + zookeeperCluster = new MiniZooKeeperCluster(); + zookeeperCluster.startup(new File(zookeeperDir)); + zookeeperPort = zookeeperCluster.getClientPort(); + HBaseTestCase.recordTestBuildDataProperty(); String hbaseDir = new File(workDir, "hbase").getAbsolutePath(); String hbaseRoot = "file://" + hbaseDir; Configuration hbaseConf = HBaseConfiguration.create(); hbaseConf.set(HConstants.HBASE_DIR, hbaseRoot); //Hbase 0.90 does not have HConstants.ZOOKEEPER_CLIENT_PORT - hbaseConf.setInt("hbase.zookeeper.property.clientPort", 21818); + hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort); hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "0.0.0.0"); hbaseConf.setInt("hbase.master.info.port", -1); hbaseConf.setInt("hbase.zookeeper.property.maxClientCnxns", 500); - String zookeeperDir = new File(workDir, "zk").getAbsolutePath(); - int zookeeperPort = 21818; - zookeeperCluster = new MiniZooKeeperCluster(); - Method m; - Class zkParam[] = {Integer.TYPE}; - try { - m = MiniZooKeeperCluster.class.getDeclaredMethod("setDefaultClientPort", - zkParam); - } catch (NoSuchMethodException e) { - m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort", - zkParam); - } - m.invoke(zookeeperCluster, new Object[]{new Integer(zookeeperPort)}); - zookeeperCluster.startup(new File(zookeeperDir)); hbaseCluster = new MiniHBaseCluster(hbaseConf, 1); HMaster master = hbaseCluster.getMaster(); Object serverName = master.getServerName(); String hostAndPort; + Method m; if (serverName instanceof String) { System.out.println("Server name is string, using HServerAddress."); m = HMaster.class.getDeclaredMethod("getMasterAddress",