
SQOOP-2952: Fixing bug (row key not added into column family using --hbase-bulkload)

(Szabolcs Vasas via Attila Szabo)
Attila Szabo 2016-10-13 14:38:21 +02:00
parent 0f13c474bf
commit b4afcf4179
8 changed files with 175 additions and 84 deletions
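
In short: the ToStringPutTransformer used for HBase imports was being configured in two different places. HBasePutProcessor.setConf() set all of its fields (column family, row key column, BigDecimal formatting, addRowKey, composite-key detection), while the bulk-load mapper copied over only the column family and row key column, so sqoop.hbase.add.row.key was silently ignored whenever --hbase-bulkload was used. The fix moves the whole configuration step into a new PutTransformer.init(Configuration) method that both code paths call. A sketch of an import that was affected (connection string, table, and column family names are placeholders):

sqoop import -D sqoop.hbase.add.row.key=true --connect jdbc:mysql://db.example.com/mydb --table employees --hbase-table employees --column-family info --hbase-row-key id --hbase-bulkload --target-dir /tmp/employees-bulk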

build.xml

@@ -185,6 +185,20 @@
<property name="avrohadoopprofile" value="2" />
</then>
</elseif>
<!-- hadoopversion 260 is created for testing purposes only. Do not use it in production! -->
<elseif>
<equals arg1="${hadoopversion}" arg2="260" />
<then>
<property name="hadoop.version" value="2.6.0" />
<property name="hbase95.version" value="1.2.0" />
<property name="zookeeper.version" value="3.4.5" />
<property name="hadoop.version.full" value="2.6.0" />
<property name="hcatalog.version" value="0.13.0" />
<property name="hbasecompatprofile" value="2" />
<property name="avrohadoopprofile" value="2" />
</then>
</elseif>
<else>
<fail message="Unrecognized hadoopversion. Can only be 20, 23, 100, 200 or 210." />
</else>

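As the in-line comment says, the 260 profile exists only so the new Hadoop 2.6.0 dependencies can be exercised in tests; it is selected like any other profile, e.g. ant test -Dhadoopversion=260. (Note that the fail message in the else branch still lists only 20, 23, 100, 200 and 210.)
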
ivy.xml

@@ -55,6 +55,8 @@ under the License.
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" />
<conf name="hadoop210" visibility="private"
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" />
<conf name="hadoop260" visibility="private"
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" />
<conf name="test" visibility="private" extends="common,runtime"/>
<conf name="hadoop23test" visibility="private" extends="test,hadoop23" />
@@ -62,6 +64,7 @@ under the License.
<conf name="hadoop100test" visibility="private" extends="test,hadoop100" />
<conf name="hadoop200test" visibility="private" extends="test,hadoop200" />
<conf name="hadoop210test" visibility="private" extends="test,hadoop210" />
<conf name="hadoop260test" visibility="private" extends="test,hadoop260" />
<!-- We don't redistribute everything we depend on (e.g., Hadoop itself);
anything which Hadoop itself also depends on, we do not ship.
@@ -105,6 +108,26 @@ under the License.
<dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}"
conf="hadoop210->default"/>
<!-- Dependencies for Hadoop 2.6.0 -->
<dependency org="org.apache.hadoop" name="hadoop-common"
rev="${hadoop.version}" conf="hadoop260->default">
<artifact name="hadoop-common" type="jar" />
<artifact name="hadoop-common" type="jar" m:classifier="tests"/>
</dependency>
<dependency org="org.apache.hadoop" name="hadoop-hdfs"
rev="${hadoop.version}" conf="hadoop260->default">
<artifact name="hadoop-hdfs" type="jar" />
<artifact name="hadoop-hdfs" type="jar" m:classifier="tests"/>
</dependency>
<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-common"
rev="${hadoop.version}" conf="hadoop260->default"/>
<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-core"
rev="${hadoop.version}" conf="hadoop260->default"/>
<dependency org="org.aspectj" name="aspectjtools" rev="${aspectj.version}"
conf="hadoop260->default"/>
<dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}"
conf="hadoop260->default"/>
<!-- Dependencies for Hadoop 2.0.0 -->
<dependency org="org.apache.hadoop" name="hadoop-common"
rev="${hadoop.version}" conf="hadoop200->default">

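The hadoop260 configuration mirrors hadoop200 and hadoop210: hadoop-common and hadoop-hdfs (each with an extra tests-classifier artifact), the two mapreduce client modules, and the AspectJ pair, all resolved at the ${hadoop.version} of 2.6.0 that build.xml sets for this profile.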
HBasePutProcessor.java

@@ -18,11 +18,9 @@
package org.apache.sqoop.hbase;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import com.cloudera.sqoop.lib.FieldMapProcessor;
import com.cloudera.sqoop.lib.FieldMappable;
import com.cloudera.sqoop.lib.ProcessingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
@@ -31,11 +29,11 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.sqoop.mapreduce.ImportJobBase;
import com.cloudera.sqoop.lib.FieldMappable;
import com.cloudera.sqoop.lib.FieldMapProcessor;
import com.cloudera.sqoop.lib.ProcessingException;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* SqoopRecordProcessor that performs an HBase "put" operation
@@ -105,21 +103,7 @@ public void setConf(Configuration config) {
if (null == putTransformer) {
throw new RuntimeException("Could not instantiate PutTransformer.");
}
this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
if (this.putTransformer instanceof ToStringPutTransformer) {
ToStringPutTransformer stringPutTransformer =
(ToStringPutTransformer) this.putTransformer;
stringPutTransformer.bigDecimalFormatString =
conf.getBoolean(ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
stringPutTransformer.addRowKey =
conf.getBoolean(HBasePutProcessor.ADD_ROW_KEY,
HBasePutProcessor.ADD_ROW_KEY_DEFAULT);
stringPutTransformer.detectCompositeKey();
}
putTransformer.init(conf);
this.tableName = conf.get(TABLE_NAME_KEY, null);
try {

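With this hunk HBasePutProcessor no longer special-cases ToStringPutTransformer: instead of casting and setting bigDecimalFormatString, addRowKey, and detectCompositeKey() from the outside, it calls putTransformer.init(conf) and lets the transformer read its own settings from the job configuration.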
PutTransformer.java

@@ -22,6 +22,8 @@
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
/**
@@ -33,6 +35,8 @@ public abstract class PutTransformer {
private String columnFamily;
private String rowKeyColumn;
public abstract void init(Configuration conf);
/**
* @return the default column family to insert into.
*/

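To make the new contract concrete, here is a minimal sketch of a custom transformer written against the init(Configuration) API. The SingleCellPutTransformer class is hypothetical, not part of this commit, and it assumes the existing getPutCommand(Map<String, Object>) signature and the getter/setter pairs shown above:

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.cloudera.sqoop.hbase.PutTransformer;

import static org.apache.sqoop.hbase.HBasePutProcessor.COL_FAMILY_KEY;
import static org.apache.sqoop.hbase.HBasePutProcessor.ROW_KEY_COLUMN_KEY;

/**
 * Hypothetical transformer illustrating the new contract: all configuration
 * is read in init(Configuration), so the transformer behaves identically on
 * the direct-put and --hbase-bulkload code paths.
 */
public class SingleCellPutTransformer extends PutTransformer {

  @Override
  public void init(Configuration conf) {
    // Self-configure from the job configuration, as ToStringPutTransformer
    // now does, instead of waiting for a caller to invoke the setters.
    setColumnFamily(conf.get(COL_FAMILY_KEY, null));
    setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
  }

  @Override
  public List<Put> getPutCommand(Map<String, Object> fields)
      throws IOException {
    Object rowKey = fields.get(getRowKeyColumn());
    if (null == rowKey) {
      return Collections.emptyList(); // nothing to insert without a key
    }
    Put put = new Put(Bytes.toBytes(rowKey.toString()));
    byte[] family = Bytes.toBytes(getColumnFamily());
    for (Map.Entry<String, Object> field : fields.entrySet()) {
      if (null != field.getValue()) {
        put.add(family, Bytes.toBytes(field.getKey()),
            Bytes.toBytes(field.getValue().toString()));
      }
    }
    return Collections.singletonList(put);
  }
}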
ToStringPutTransformer.java

@@ -18,6 +18,15 @@
package org.apache.sqoop.hbase;
import com.cloudera.sqoop.hbase.PutTransformer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import org.apache.sqoop.mapreduce.ImportJobBase;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
@@ -27,13 +36,10 @@
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import com.cloudera.sqoop.hbase.PutTransformer;
import static org.apache.sqoop.hbase.HBasePutProcessor.ADD_ROW_KEY;
import static org.apache.sqoop.hbase.HBasePutProcessor.ADD_ROW_KEY_DEFAULT;
import static org.apache.sqoop.hbase.HBasePutProcessor.COL_FAMILY_KEY;
import static org.apache.sqoop.hbase.HBasePutProcessor.ROW_KEY_COLUMN_KEY;
/**
* PutTransformer that calls toString on all non-null fields.
@@ -204,4 +210,14 @@ private String toHBaseString(Object val) {
return valString;
}
@Override
public void init(Configuration conf) {
setColumnFamily(conf.get(COL_FAMILY_KEY, null));
setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
this.bigDecimalFormatString = conf.getBoolean(ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
this.addRowKey = conf.getBoolean(ADD_ROW_KEY, ADD_ROW_KEY_DEFAULT);
detectCompositeKey();
}
}

HBaseBulkImportMapper.java

@@ -66,8 +66,7 @@ protected void setup(Context context)
if (null == putTransformer) {
throw new RuntimeException("Could not instantiate PutTransformer.");
}
this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
putTransformer.init(conf);
}
@Override
public void map(LongWritable key, SqoopRecord val, Context context)

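This two-line change in the bulk-load mapper is the heart of SQOOP-2952: its setup() previously propagated only the column family and row key column into the transformer, so sqoop.hbase.add.row.key (and the BigDecimal format flag) never reached it on the --hbase-bulkload path. Delegating to putTransformer.init(conf) applies the full configuration, matching what HBasePutProcessor now does.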
HBaseImportAddRowKeyTest.java

@@ -18,68 +18,126 @@
package com.cloudera.sqoop.hbase;
import java.io.IOException;
import junit.framework.JUnit4TestAdapter;
import org.apache.commons.lang.StringUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
*
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static java.util.Arrays.asList;
import static org.apache.commons.lang.StringUtils.join;
@RunWith(Parameterized.class)
public class HBaseImportAddRowKeyTest extends HBaseTestCase {
@Parameterized.Parameters(name = "bulkLoad = {0}")
public static Iterable<? extends Object> bulkLoadParameters() {
return Arrays.asList(new Boolean[] { false } , new Boolean[] { true } );
}
private String[] columnTypes;
private String[] columnValues;
private String hbaseTableName;
private String hbaseColumnFamily;
private String hbaseTmpDir;
private String hbaseBulkLoadDir;
private boolean bulkLoad;
public HBaseImportAddRowKeyTest(boolean bulkLoad) {
this.bulkLoad = bulkLoad;
}
@Before
public void setUp() {
super.setUp();
columnTypes = new String[] { "INT", "INT" };
columnValues = new String[] { "0", "1" };
hbaseTableName = "addRowKeyTable";
hbaseColumnFamily = "addRowKeyFamily";
hbaseTmpDir = TEMP_BASE_DIR + "hbaseTmpDir";
hbaseBulkLoadDir = TEMP_BASE_DIR + "hbaseBulkLoadDir";
createTableWithColTypes(columnTypes, columnValues);
}
@Test
public void testAddRowKey() throws IOException {
String[] types = { "INT", "INT" };
String[] vals = { "0", "1" };
createTableWithColTypes(types, vals);
String[] otherArg = getArgv(true, "addRowKeyT", "addRowKeyF", true, null);
String[] argv = new String[otherArg.length + 2];
argv[0] = "-D";
argv[1] = "sqoop.hbase.add.row.key=true";
System.arraycopy(otherArg, 0, argv, 2, otherArg.length);
String[] argv = getImportArguments(true, hbaseTableName, hbaseColumnFamily);
runImport(argv);
// Row key should have been added
verifyHBaseCell("addRowKeyT", "0", "addRowKeyF", getColName(0), "0");
verifyHBaseCell("addRowKeyT", "0", "addRowKeyF", getColName(1), "1");
verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(0), columnValues[0]);
verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(1), columnValues[1]);
}
@Test
public void testAddRowKeyDefault() throws IOException {
String[] types = { "INT", "INT" };
String[] vals = { "0", "1" };
createTableWithColTypes(types, vals);
String[] argv = getArgv(true, "addRowKeyDfT", "addRowKeyDfF", true, null);
String[] argv = getImportArguments(false, hbaseTableName, hbaseColumnFamily);
runImport(argv);
// Row key should not be added by default
verifyHBaseCell("addRowKeyDfT", "0", "addRowKeyDfF", getColName(0), null);
verifyHBaseCell("addRowKeyDfT", "0", "addRowKeyDfF", getColName(1), "1");
verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(0), null);
verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(1), columnValues[1]);
}
@Test
public void testAddCompositeKey() throws IOException {
String[] types = { "INT", "INT" };
String[] vals = { "0", "1" };
createTableWithColTypes(types, vals);
String rowKey = getColName(0)+","+getColName(1);
String[] otherArg = getArgv(true, "addRowKeyT", "addRowKeyF", true, null);
String[] argv = new String[otherArg.length + 4];
argv[0]="-D";
argv[1]="sqoop.hbase.add.row.key=true";
System.arraycopy(otherArg, 0, argv, 2, otherArg.length);
argv[argv.length - 2] = "--hbase-row-key";
argv[argv.length - 1] = getColName(0)+","+getColName(1);
String[] argv = getImportArguments(true, hbaseTableName, hbaseColumnFamily, rowKey);
runImport(argv);
// Row key should have been added
verifyHBaseCell("addRowKeyT", "0_1", "addRowKeyF", getColName(0), "0");
verifyHBaseCell("addRowKeyT", "0_1", "addRowKeyF", getColName(1), "1");
verifyHBaseCell(hbaseTableName, join(columnValues, '_'), hbaseColumnFamily, getColName(0), columnValues[0]);
verifyHBaseCell(hbaseTableName, join(columnValues, '_'), hbaseColumnFamily, getColName(1), columnValues[1]);
}
private String[] getImportArguments(boolean addRowKey, String hbaseTableName, String hbaseColumnFamily) {
return getImportArguments(addRowKey, hbaseTableName, hbaseColumnFamily, null);
}
private String[] getImportArguments(boolean addRowKey, String hbaseTableName, String hbaseColumnFamily, String rowKey) {
List<String> result = new ArrayList<>();
if (addRowKey) {
result.add("-D");
result.add("sqoop.hbase.add.row.key=true");
}
result.add("-D");
result.add("hbase.fs.tmp.dir=" + hbaseTmpDir);
result.addAll(asList(getArgv(true, hbaseTableName, hbaseColumnFamily, true, null)));
if(bulkLoad) {
result.add("--target-dir");
result.add(hbaseBulkLoadDir);
result.add("--hbase-bulkload");
}
if (!StringUtils.isBlank(rowKey)) {
result.add("--hbase-row-key");
result.add(rowKey);
}
return result.toArray(new String[result.size()]);
}
public static junit.framework.Test suite() {
return new JUnit4TestAdapter(HBaseImportAddRowKeyTest.class);
}
}

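The rewritten test is parameterized on bulkLoad, so each scenario now runs twice: once through the direct Put path and once with --hbase-bulkload. The bulk-load runs additionally pass hbase.fs.tmp.dir and a --target-dir for the staging HFiles, and the composite-key assertion joins the column values with '_' instead of hardcoding "0_1".
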
HBaseTestCase.java

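A supporting change: instead of forcing ZooKeeper onto port 21818 through reflection (working around MiniZooKeeperCluster API differences across HBase versions), the test harness now starts the cluster first, reads back the dynamically assigned port with getClientPort(), and passes it to both the HBase configuration and the Sqoop command line, which avoids port collisions when tests run concurrently.
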
@@ -88,7 +88,7 @@ private static void restoreTestBuidlDataProperty() {
if (includeHadoopFlags) {
CommonArgs.addHadoopFlags(args);
args.add("-D");
args.add("hbase.zookeeper.property.clientPort=21818");
args.add("hbase.zookeeper.property.clientPort=" + zookeeperPort);
}
if (null != queryStr) {
@@ -120,40 +120,33 @@ private static void restoreTestBuidlDataProperty() {
private String workDir = createTempDir().getAbsolutePath();
private MiniZooKeeperCluster zookeeperCluster;
private MiniHBaseCluster hbaseCluster;
private int zookeeperPort;
@Override
@Before
public void setUp() {
try {
String zookeeperDir = new File(workDir, "zk").getAbsolutePath();
zookeeperCluster = new MiniZooKeeperCluster();
zookeeperCluster.startup(new File(zookeeperDir));
zookeeperPort = zookeeperCluster.getClientPort();
HBaseTestCase.recordTestBuildDataProperty();
String hbaseDir = new File(workDir, "hbase").getAbsolutePath();
String hbaseRoot = "file://" + hbaseDir;
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set(HConstants.HBASE_DIR, hbaseRoot);
//Hbase 0.90 does not have HConstants.ZOOKEEPER_CLIENT_PORT
hbaseConf.setInt("hbase.zookeeper.property.clientPort", 21818);
hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "0.0.0.0");
hbaseConf.setInt("hbase.master.info.port", -1);
hbaseConf.setInt("hbase.zookeeper.property.maxClientCnxns", 500);
String zookeeperDir = new File(workDir, "zk").getAbsolutePath();
int zookeeperPort = 21818;
zookeeperCluster = new MiniZooKeeperCluster();
Method m;
Class<?> zkParam[] = {Integer.TYPE};
try {
m = MiniZooKeeperCluster.class.getDeclaredMethod("setDefaultClientPort",
zkParam);
} catch (NoSuchMethodException e) {
m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort",
zkParam);
}
m.invoke(zookeeperCluster, new Object[]{new Integer(zookeeperPort)});
zookeeperCluster.startup(new File(zookeeperDir));
hbaseCluster = new MiniHBaseCluster(hbaseConf, 1);
HMaster master = hbaseCluster.getMaster();
Object serverName = master.getServerName();
String hostAndPort;
Method m;
if (serverName instanceof String) {
System.out.println("Server name is string, using HServerAddress.");
m = HMaster.class.getDeclaredMethod("getMasterAddress",