SQOOP-862: Hbase import fails if there is a row where all columns are null
(David Robson via Jarek Jarcec Cecho)
parent a220ae4694
commit 20b16fd4bd
@@ -48,6 +48,7 @@ using the default parameters from your HBase configuration.
 Sqoop currently serializes all values to HBase by converting each field
 to its string representation (as if you were importing to HDFS in text
 mode), and then inserts the UTF-8 bytes of this string in the target
-cell.
+cell. Sqoop will skip all rows containing null values in all columns
+except the row key column.
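To make the serialization rule concrete, here is a minimal sketch (not Sqoop's actual transformer code; the row key, column family "cf", and column "col1" are made-up names) showing how a non-null field becomes the UTF-8 bytes of its string form, while an all-null row would leave the Put empty:

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class SerializationSketch {
  public static void main(String[] args) {
    Object fieldValue = 42; // e.g. an INT column fetched over JDBC
    Put put = new Put(Bytes.toBytes("row-key-0")); // keyed by the row key column
    if (fieldValue != null) {
      // Bytes.toBytes(String) yields the UTF-8 bytes of the string form.
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("col1"),
          Bytes.toBytes(fieldValue.toString()));
    }
    // Had every non-key column been NULL, nothing would be added above and
    // put.isEmpty() would be true -- exactly the rows Sqoop now skips.
    System.out.println("empty put? " + put.isEmpty());
  }
}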
@@ -23,10 +23,13 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.sqoop.mapreduce.ImportJobBase;
 
@@ -41,6 +44,9 @@
 public class HBasePutProcessor implements Closeable, Configurable,
     FieldMapProcessor {
 
+  public static final Log LOG = LogFactory.getLog(
+      HBasePutProcessor.class.getName());
+
   /** Configuration key specifying the table to insert into. */
   public static final String TABLE_NAME_KEY = "sqoop.hbase.insert.table";
 
@@ -124,7 +130,14 @@ public void accept(FieldMappable record)
     List<Put> putList = putTransformer.getPutCommand(fields);
     if (null != putList) {
       for (Put put : putList) {
-        this.table.put(put);
+        if (put != null) {
+          if (put.isEmpty()) {
+            LOG.warn("Could not insert row with no columns "
+                + "for row-key column: " + Bytes.toString(put.getRow()));
+          } else {
+            this.table.put(put);
+          }
+        }
       }
     }
   }
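For context on why the new guard matters: the HBase client of this era validates each Put and rejects one that carries no cells (an IllegalArgumentException from HTable.put()), so before this patch a single all-null row aborted the whole import. A minimal sketch of that failure mode, assuming a running cluster and a hypothetical existing table named "demo":

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class EmptyPutFailureDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "demo"); // hypothetical table name
    try {
      Put put = new Put(Bytes.toBytes("row-1")); // row key only, no cells
      // Without the isEmpty() check above, this is the call that failed:
      // the client refuses an empty Put instead of writing an empty row.
      table.put(put);
    } finally {
      table.close();
    }
  }
}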
@@ -112,4 +112,19 @@ public void testExitFailure() throws IOException {
 
     fail("should have gotten exception");
   }
+
+  @Test
+  public void testNullRow() throws IOException {
+    String[] argv = getArgv(true, "nullRowT", "nullRowF", true, null);
+    String[] types = { "INT", "INT" };
+    String[] vals = { "0", "null" };
+    createTableWithColTypes(types, vals);
+    runImport(argv);
+
+    // This cell should not be placed in the results.
+    verifyHBaseCell("nullRowT", "0", "nullRowF", getColName(1), null);
+
+    int rowCount = countHBaseTable("nullRowT", "nullRowF");
+    assertEquals(0, rowCount);
+  }
 }
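The new test exercises the fix end to end: it imports a two-column table whose only row has a NULL in the non-key column, verifies that no cell was written for that column, and then uses the countHBaseTable() helper introduced below to assert that the HBase table ends up with zero rows.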
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import org.apache.hadoop.util.StringUtils;
@@ -234,4 +235,22 @@ public static File createTempDir() {
     }
     throw new IllegalStateException("Failed to create directory");
   }
+
+  protected int countHBaseTable(String tableName, String colFamily)
+      throws IOException {
+    int count = 0;
+    HTable table = new HTable(new Configuration(
+        hbaseTestUtil.getConfiguration()), Bytes.toBytes(tableName));
+    try {
+      ResultScanner scanner = table.getScanner(Bytes.toBytes(colFamily));
+      for (Result result = scanner.next();
+          result != null;
+          result = scanner.next()) {
+        count++;
+      }
+    } finally {
+      table.close();
+    }
+    return count;
+  }
 }
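A note on the helper's design: the scan is restricted to colFamily, so only rows holding at least one cell in that family are counted, and the HTable is closed in a finally block so a failed scan cannot leak the connection. For testNullRow above, a count of zero confirms the empty Put was never written.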