SQOOP-100. Sqoop to support populating Hive table partitions.
Initial patch by Frank Maritato.
From: Jonathan Hsieh <jon@cloudera.com>
git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1150031 13f79535-47bb-0310-9956-ffa450edef68

parent 0a795d32e4
commit 0efc5a4d55
@@ -30,5 +30,9 @@ Argument                          Description
                                   to Hive.
 +\--hive-drop-import-delims+      Drops '\n', '\r', and '\01' from string\
                                   fields when importing to Hive.
++\--hive-partition-key+           Name of a hive field to partition are \
+                                  sharded on
++\--hive-partition-value <v>+     String-value that serves as partition key\
+                                  for this imported into hive in this job.
 --------------------------------------------------------------------------
 
@@ -68,3 +68,10 @@ with Hive's defaults.
 The table name used in Hive is, by default, the same as that of the
 source table. You can control the output table name with the +\--hive-table+
 option.
+
+Hive can put data into partitions for more efficient query
+performance. You can tell a Sqoop job to import data for Hive into a
+particular partition by specifying the +\--hive-partition-key+ and
++\--hive-partition-value+ arguments. The partition value must be a
+string. Please see the Hive documentation for more details on
+partitioning.
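For a concrete picture of what these two arguments do, here is a minimal, illustrative sketch (not part of the patch itself); it is modeled on the testHivePartitionParams and testPartitions cases added further down, the wrapper class name is made up, and ImportTool.parseArguments plus the SqoopOptions getters are the pieces this change introduces or exercises:

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.tool.ImportTool;

// Hypothetical driver class; every call it makes is covered by the new tests below.
public class HivePartitionArgsSketch {
  public static void main(String[] argv) throws Exception {
    String[] args = {
      "--hive-partition-key", "ds",
      "--hive-partition-value", "20110413",
    };
    // Same parsing entry point the new TestTableDefWriter case uses.
    SqoopOptions opts = new ImportTool().parseArguments(args, null, null, false);

    // The parsed values land on the new SqoopOptions fields.
    System.out.println(opts.getHivePartitionKey());   // ds
    System.out.println(opts.getHivePartitionValue()); // 20110413
  }
}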
@@ -147,6 +147,8 @@ public enum IncrementalMode {
   @StoredAsProperty("hive.overwrite.table") private boolean overwriteHiveTable;
   @StoredAsProperty("hive.table.name") private String hiveTableName;
   @StoredAsProperty("hive.drop.delims") private boolean hiveDropDelims;
+  @StoredAsProperty("hive.partition.key") private String hivePartitionKey;
+  @StoredAsProperty("hive.partition.value") private String hivePartitionValue;
 
   // An ordered list of column names denoting what order columns are
   // serialized to a PreparedStatement from a generated record type.
@@ -1343,6 +1345,22 @@ public void setHiveTableName(String name) {
     this.hiveTableName = name;
   }
 
+  public String getHivePartitionKey() {
+    return hivePartitionKey;
+  }
+
+  public void setHivePartitionKey(String hpk) {
+    this.hivePartitionKey = hpk;
+  }
+
+  public String getHivePartitionValue() {
+    return hivePartitionValue;
+  }
+
+  public void setHivePartitionValue(String hpv) {
+    this.hivePartitionValue = hpv;
+  }
+
   /**
    * @return the file size to split by when using --direct mode.
    */
@@ -167,6 +167,12 @@ public String getCreateTableStmt() throws IOException {
       sb.append("COMMENT 'Imported by sqoop on " + curDateStr + "' ");
     }
 
+    if (options.getHivePartitionKey() != null) {
+      sb.append("PARTITIONED BY (")
+        .append(options.getHivePartitionKey())
+        .append(" STRING) ");
+    }
+
     sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '");
     sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
     sb.append("' LINES TERMINATED BY '");
@@ -208,6 +214,13 @@ public String getLoadDataStmt() throws IOException {
     sb.append(outputTableName);
     sb.append('`');
 
+    if (options.getHivePartitionKey() != null) {
+      sb.append(" PARTITION (")
+        .append(options.getHivePartitionKey())
+        .append("='").append(options.getHivePartitionValue())
+        .append("')");
+    }
+
     LOG.debug("Load statement: " + sb.toString());
     return sb.toString();
   }
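Taken together, the two TableDefWriter hunks above make a partitioned import emit a PARTITIONED BY clause in the generated CREATE TABLE statement and a PARTITION clause in the LOAD DATA statement. A rough sketch of that behavior, lifted almost verbatim from the testPartitions case near the end of this diff; the com.cloudera.sqoop.hive package for TableDefWriter, the empty column map, and the placeholder table names are assumptions made for brevity:

import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.hive.TableDefWriter; // package assumed from where the test lives
import com.cloudera.sqoop.tool.ImportTool;

public class PartitionedDdlSketch {
  public static void main(String[] argv) throws Exception {
    SqoopOptions options = new ImportTool().parseArguments(new String[] {
        "--hive-partition-key", "ds",
        "--hive-partition-value", "20110413",
    }, null, null, false);

    // Placeholder input/output table names, as in the test.
    TableDefWriter writer = new TableDefWriter(options, null,
        "inputTable", "outputTable", new Configuration(), false);
    writer.setColumnTypes(new HashMap<String, Integer>());

    // CREATE TABLE IF NOT EXISTS `outputTable` ( ) PARTITIONED BY (ds STRING) ...
    System.out.println(writer.getCreateTableStmt());
    // LOAD DATA INPATH '...' INTO TABLE `outputTable` PARTITION (ds='20110413')
    System.out.println(writer.getLoadDataStmt());
  }
}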
@@ -90,6 +90,8 @@ public abstract class BaseSqoopTool extends SqoopTool {
   public static final String HIVE_TABLE_ARG = "hive-table";
   public static final String HIVE_OVERWRITE_ARG = "hive-overwrite";
   public static final String HIVE_DROP_DELIMS_ARG = "hive-drop-import-delims";
+  public static final String HIVE_PARTITION_KEY_ARG = "hive-partition-key";
+  public static final String HIVE_PARTITION_VALUE_ARG = "hive-partition-value";
   public static final String NUM_MAPPERS_ARG = "num-mappers";
   public static final String NUM_MAPPERS_SHORT_ARG = "m";
   public static final String COMPRESS_ARG = "compress";
@@ -405,6 +407,17 @@ protected RelatedOptions getHiveOptions(boolean explicitHiveImport) {
         + "(\\n\\r) from imported string fields")
         .withLongOpt(HIVE_DROP_DELIMS_ARG)
         .create());
+    hiveOpts.addOption(OptionBuilder.withArgName("partition-key")
+        .hasArg()
+        .withDescription("Sets the partition key to use when importing to hive")
+        .withLongOpt(HIVE_PARTITION_KEY_ARG)
+        .create());
+    hiveOpts.addOption(OptionBuilder.withArgName("partition-value")
+        .hasArg()
+        .withDescription("Sets the partition value to use when importing "
+            + "to hive")
+        .withLongOpt(HIVE_PARTITION_VALUE_ARG)
+        .create());
     return hiveOpts;
   }
 
@@ -662,6 +675,14 @@ protected void applyHiveOptions(CommandLine in, SqoopOptions out)
     if (in.hasOption(HIVE_DROP_DELIMS_ARG)) {
       out.setHiveDropDelims(true);
     }
+
+    if (in.hasOption(HIVE_PARTITION_KEY_ARG)) {
+      out.setHivePartitionKey(in.getOptionValue(HIVE_PARTITION_KEY_ARG));
+    }
+
+    if (in.hasOption(HIVE_PARTITION_VALUE_ARG)) {
+      out.setHivePartitionValue(in.getOptionValue(HIVE_PARTITION_VALUE_ARG));
+    }
   }
 
   protected void applyOutputFormatOptions(CommandLine in, SqoopOptions out)
@@ -236,6 +236,16 @@ public void testGoodNumMappers() throws Exception {
     assertEquals(4, opts.getNumMappers());
   }
 
+  public void testHivePartitionParams() throws Exception {
+    String[] args = {
+      "--hive-partition-key", "ds",
+      "--hive-partition-value", "20110413",
+    };
+    SqoopOptions opts = parse(args);
+    assertEquals("ds", opts.getHivePartitionKey());
+    assertEquals("20110413", opts.getHivePartitionValue());
+  }
+
   public void testPropertySerialization1() {
     // Test that if we write a SqoopOptions out to a Properties,
     // and then read it back in, we get all the same results.
|
@ -359,4 +359,24 @@ public void testFieldWithHiveDelims() throws IOException,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test hive import with row that has new line in it.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testImportHiveWithPartitions() throws IOException,
|
||||||
|
InterruptedException {
|
||||||
|
final String TABLE_NAME = "PARTITION_HIVE_IMPORT";
|
||||||
|
|
||||||
|
LOG.info("Doing import of single row into PARTITION_HIVE_IMPORT table");
|
||||||
|
setCurTableName(TABLE_NAME);
|
||||||
|
setNumCols(3);
|
||||||
|
String[] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)", };
|
||||||
|
String[] vals = { "'whoop'", "42", "'I am a row in a partition'", };
|
||||||
|
String[] moreArgs = { "--" + BaseSqoopTool.HIVE_PARTITION_KEY_ARG, "ds",
|
||||||
|
"--" + BaseSqoopTool.HIVE_PARTITION_VALUE_ARG, "20110413", };
|
||||||
|
|
||||||
|
runImportTest(TABLE_NAME, types, vals, "partitionImport.q",
|
||||||
|
getArgv(false, moreArgs), new ImportTool());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@@ -21,12 +21,14 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import junit.framework.TestCase;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import com.cloudera.sqoop.SqoopOptions;
 
-import junit.framework.TestCase;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.tool.ImportTool;
 
 /**
  * Test Hive DDL statement generation.
@@ -78,4 +80,30 @@ public void testDifferentTableNames() throws Exception {
     assertTrue(loadData.indexOf("INTO TABLE `outputTable`") != -1);
     assertTrue(loadData.indexOf("/inputTable'") != -1);
   }
+
+  public void testPartitions() throws Exception {
+    String[] args = {
+        "--hive-partition-key", "ds",
+        "--hive-partition-value", "20110413",
+    };
+    Configuration conf = new Configuration();
+    SqoopOptions options =
+        new ImportTool().parseArguments(args, null, null, false);
+    TableDefWriter writer = new TableDefWriter(options,
+        null, "inputTable", "outputTable", conf, false);
+
+    Map<String, Integer> colTypes = new HashMap<String, Integer>();
+    writer.setColumnTypes(colTypes);
+
+    String createTable = writer.getCreateTableStmt();
+    String loadData = writer.getLoadDataStmt();
+
+    assertNotNull(createTable);
+    assertNotNull(loadData);
+    assertEquals("CREATE TABLE IF NOT EXISTS `outputTable` ( ) "
+        + "PARTITIONED BY (ds STRING) "
+        + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\054' "
+        + "LINES TERMINATED BY '\\012' STORED AS TEXTFILE", createTable);
+    assertTrue(loadData.endsWith(" PARTITION (ds='20110413')"));
+  }
 }
testdata/hive/scripts/partitionImport.q (vendored, new file, 2 additions)
@@ -0,0 +1,2 @@
+CREATE TABLE IF NOT EXISTS `PARTITION_HIVE_IMPORT` ( `DATA_COL0` STRING, `DATA_COL1` INT, `DATA_COL2` STRING) PARTITIONED BY (ds STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS TEXTFILE;
+LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/PARTITION_HIVE_IMPORT' INTO TABLE `PARTITION_HIVE_IMPORT` PARTITION (ds='20110413');