Mirror of https://github.com/apache/sqoop.git
SQOOP-319. Support for replacing Hive delimiters.
(Joey Echeverria via Arvind Prabhakar)

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1161382 13f79535-47bb-0310-9956-ffa450edef68
commit ce5d285397
parent 5258c2855f
@@ -32,6 +32,8 @@ Argument                       Description
                                to Hive.
 +\--hive-drop-import-delims+  Drops '\n', '\r', and '\01' from string\
                               fields when importing to Hive.
++\--hive-delims-replacement+  Replace '\n', '\r', and '\01' from string\
+                              fields with user defined string when importing to Hive.
 +\--hive-partition-key+       Name of a hive field to partition are \
                               sharded on
 +\--hive-partition-value <v>+ String-value that serves as partition key\
@@ -58,8 +58,11 @@ rows contain string fields that have Hive's default row delimiters
 (+\n+ and +\r+ characters) or column delimiters (+\01+ characters)
 present in them. You can use the +\--hive-drop-import-delims+ option
 to drop those characters on import to give Hive-compatible text data.
-This option should only be used if you use Hive's default delimiters
-and should not be used if different delimiters are specified.
+Alternatively, you can use the +\--hive-delims-replacement+ option
+to replace those characters with a user-defined string on import to give
+Hive-compatible text data. These options should only be used if you use
+Hive's default delimiters and should not be used if different delimiters
+are specified.
 
 Sqoop will pass the field and record delimiters through to Hive. If you do
 not set any delimiters and do use +\--hive-import+, the field delimiter will
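
For illustration (not part of this commit), a typical use of the new option
replaces each stray delimiter with a single space; the connect string and
table name below are placeholders:

    $ sqoop import --connect jdbc:mysql://db.example.com/corp --table employees \
        --hive-import --hive-delims-replacement " "
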
@@ -152,6 +152,8 @@ public enum IncrementalMode {
   private boolean failIfHiveTableExists;
   @StoredAsProperty("hive.table.name") private String hiveTableName;
   @StoredAsProperty("hive.drop.delims") private boolean hiveDropDelims;
+  @StoredAsProperty("hive.delims.replacement")
+  private String hiveDelimsReplacement;
   @StoredAsProperty("hive.partition.key") private String hivePartitionKey;
   @StoredAsProperty("hive.partition.value") private String hivePartitionValue;
@@ -1100,6 +1102,18 @@ public void setHiveDropDelims(boolean dropHiveDelims) {
     this.hiveDropDelims = dropHiveDelims;
   }
 
+  /**
+   * @return the user-specified replacement string for Hive delimiters.
+   */
+  public String getHiveDelimsReplacement() {
+    return hiveDelimsReplacement;
+  }
+
+  public void setHiveDelimsReplacement(String replacement) {
+    this.hiveDelimsReplacement = replacement;
+  }
+
   /**
    * @return the user-specified option to specify sqoop's behavior during
    * target table creation if the table exists.
@@ -32,8 +32,20 @@ private FieldFormatter() { }
    * @return
    */
   public static String hiveStringDropDelims(String str,
       DelimiterSet delimiters) {
+    return hiveStringReplaceDelims(str, "", delimiters);
+  }
+
+  /**
+   * Replace Hive delimiters with a user-defined string passed to the
+   * --hive-delims-replacement option.
+   * @param str
+   * @param delimiters
+   * @return
+   */
+  public static String hiveStringReplaceDelims(String str, String replacement,
+      DelimiterSet delimiters) {
-    String droppedDelims = str.replaceAll("\\n|\\r|\01", "");
+    String droppedDelims = str.replaceAll("\\n|\\r|\01", replacement);
     return escapeAndEnclose(droppedDelims, delimiters);
   }
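
As a standalone sketch of what the regex above does (plain JDK code; the class
name is hypothetical and this is not part of Sqoop):

    // "\\n|\\r|\01" matches a newline, a carriage return, or the \001
    // character; replaceAll substitutes every match with the replacement.
    public class DelimReplaceDemo {
      public static void main(String[] args) {
        String raw = "test with\nnew lines\n\01field delims";
        String cleaned = raw.replaceAll("\\n|\\r|\01", " ");
        System.out.println(cleaned); // prints: test with new lines  field delims
      }
    }
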
@@ -826,10 +826,18 @@ private void generateToString(Map<String, Integer> columnTypes,
     }
 
     if (javaType.equals("String") && options.doHiveDropDelims()) {
-      sb.append("    // special case for strings hive, dropping delimiters "
-          + "\\n,\\r,\\01 from strings\n");
-      sb.append("    __sb.append(FieldFormatter.hiveStringDropDelims("
-          + stringExpr + ", delimiters));\n");
+      sb.append("    // special case for strings hive, dropping"
+          + "delimiters \\n,\\r,\\01 from strings\n");
+      sb.append("    __sb.append(FieldFormatter.hiveStringDropDelims("
+          + stringExpr + ", delimiters));\n");
+    } else if (javaType.equals("String")
+        && options.getHiveDelimsReplacement() != null) {
+      sb.append("    // special case for strings hive, replacing "
+          + "delimiters \\n,\\r,\\01 with '"
+          + options.getHiveDelimsReplacement() + "' from strings\n");
+      sb.append("    __sb.append(FieldFormatter.hiveStringReplaceDelims("
+          + stringExpr + ", \"" + options.getHiveDelimsReplacement() + "\", "
+          + "delimiters));\n");
     } else {
       sb.append("    __sb.append(FieldFormatter.escapeAndEnclose("
           + stringExpr + ", delimiters));\n");
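
To make the codegen concrete: for a hypothetical string column msg with
--hive-delims-replacement " ", the sb.append calls above would emit roughly
the following into the record's toString() body (a reconstruction, not
captured from actual generated code):

    // special case for strings hive, replacing delimiters \n,\r,\01 with ' ' from strings
    __sb.append(FieldFormatter.hiveStringReplaceDelims(msg, " ", delimiters));
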
@@ -98,6 +98,8 @@ public abstract class BaseSqoopTool extends SqoopTool {
   public static final String HIVE_TABLE_ARG = "hive-table";
   public static final String HIVE_OVERWRITE_ARG = "hive-overwrite";
   public static final String HIVE_DROP_DELIMS_ARG = "hive-drop-import-delims";
+  public static final String HIVE_DELIMS_REPLACEMENT_ARG =
+      "hive-delims-replacement";
   public static final String HIVE_PARTITION_KEY_ARG = "hive-partition-key";
   public static final String HIVE_PARTITION_VALUE_ARG = "hive-partition-value";
   public static final String CREATE_HIVE_TABLE_ARG =
@@ -426,6 +428,12 @@ protected RelatedOptions getHiveOptions(boolean explicitHiveImport) {
         + "(\\n\\r) from imported string fields")
         .withLongOpt(HIVE_DROP_DELIMS_ARG)
         .create());
+    hiveOpts.addOption(OptionBuilder
+        .hasArg()
+        .withDescription("Replace Hive record \\0x01 and row delimiters "
+            + "(\\n\\r) from imported string fields with user-defined string")
+        .withLongOpt(HIVE_DELIMS_REPLACEMENT_ARG)
+        .create());
     hiveOpts.addOption(OptionBuilder.withArgName("partition-key")
         .hasArg()
         .withDescription("Sets the partition key to use when importing to hive")
@@ -729,6 +737,11 @@ protected void applyHiveOptions(CommandLine in, SqoopOptions out)
       out.setHiveDropDelims(true);
     }
 
+    if (in.hasOption(HIVE_DELIMS_REPLACEMENT_ARG)) {
+      out.setHiveDelimsReplacement(
+          in.getOptionValue(HIVE_DELIMS_REPLACEMENT_ARG));
+    }
+
     if (in.hasOption(HIVE_PARTITION_KEY_ARG)) {
       out.setHivePartitionKey(in.getOptionValue(HIVE_PARTITION_KEY_ARG));
     }
@@ -894,6 +907,12 @@ protected void validateHiveOptions(SqoopOptions options)
       throws InvalidOptionsException {
-    // Empty; this method is present to maintain API consistency, and
-    // is reserved for future constraints on Hive options.
+    if (options.getHiveDelimsReplacement() != null
+        && options.doHiveDropDelims()) {
+      throw new InvalidOptionsException("The " + HIVE_DROP_DELIMS_ARG
+          + " option conflicts with the " + HIVE_DELIMS_REPLACEMENT_ARG
+          + " option." + HELP_STR);
+    }
   }
 
   protected void validateHBaseOptions(SqoopOptions options)
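
With this check in place, combining the two options fails fast at validation
time. A hypothetical invocation that would now trigger the error (placeholder
connect string and table):

    $ sqoop import --connect jdbc:mysql://db.example.com/corp --table employees \
        --hive-import --hive-drop-import-delims --hive-delims-replacement " "

This aborts with "The hive-drop-import-delims option conflicts with the
hive-delims-replacement option." followed by the usual help text, before any
import work begins.
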
@@ -839,6 +839,7 @@ public void validateOptions(SqoopOptions options)
     validateCodeGenOptions(options);
     validateOutputFormatOptions(options);
     validateHBaseOptions(options);
+    validateHiveOptions(options);
   }
 }
@@ -32,6 +32,7 @@
 import org.junit.Test;
 
 import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
 import com.cloudera.sqoop.testutil.CommonArgs;
 import com.cloudera.sqoop.testutil.HsqldbTestServer;
 import com.cloudera.sqoop.testutil.ImportJobTestCase;
@@ -40,6 +41,7 @@
 import com.cloudera.sqoop.tool.CreateHiveTableTool;
 import com.cloudera.sqoop.tool.ImportTool;
 import com.cloudera.sqoop.tool.SqoopTool;
+import org.apache.commons.cli.ParseException;
 
 /**
  * Test HiveImport capability after an import to HDFS.
@@ -359,6 +361,77 @@ public void testFieldWithHiveDelims() throws IOException,
     }
   }
 
+  /**
+   * Test hive import with row that has new line in it.
+   */
+  @Test
+  public void testFieldWithHiveDelimsReplacement() throws IOException,
+      InterruptedException {
+    final String TABLE_NAME = "FIELD_WITH_NL_REPLACEMENT_HIVE_IMPORT";
+
+    LOG.info("Doing import of single row into "
+        + "FIELD_WITH_NL_REPLACEMENT_HIVE_IMPORT table");
+    setCurTableName(TABLE_NAME);
+    setNumCols(3);
+    String[] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
+    String[] vals = { "'test with\nnew lines\n'", "42",
+        "'oh no " + '\01' + " field delims " + '\01' + "'", };
+    String[] moreArgs = { "--" + BaseSqoopTool.HIVE_DELIMS_REPLACEMENT_ARG, " " };
+
+    runImportTest(TABLE_NAME, types, vals,
+        "fieldWithNewlineReplacementImport.q", getArgv(false, moreArgs),
+        new ImportTool());
+
+    LOG.info("Validating data in single row is present in: "
+        + "FIELD_WITH_NL_REPLACEMENT_HIVE_IMPORT table");
+
+    // Ideally, we would actually invoke hive code to verify that records with
+    // record and field delimiters have their values replaced and that we have
+    // the proper number of hive records. Unfortunately, this is a non-trivial
+    // task, and better dealt with at an integration test level.
+    //
+    // Instead, this assumes the path of the generated table and just validates
+    // the map job output.
+
+    // Get and read the raw output file.
+    String whDir = getWarehouseDir();
+    File p = new File(new File(whDir, TABLE_NAME), "part-m-00000");
+    File f = new File(p.toString());
+    FileReader fr = new FileReader(f);
+    BufferedReader br = new BufferedReader(fr);
+    try {
+      // verify the output
+      assertEquals(br.readLine(), "test with new lines " + '\01' + "42"
+          + '\01' + "oh no field delims ");
+      assertEquals(br.readLine(), null); // should only be one line
+    } catch (IOException ioe) {
+      fail("Unable to read files generated from hive");
+    } finally {
+      br.close();
+    }
+  }
+
+  /**
+   * Test hive drop and replace option validation.
+   */
+  @Test
+  public void testHiveDropAndReplaceOptionValidation() throws ParseException {
+    LOG.info("Testing conflicting Hive delimiter drop/replace options");
+
+    setNumCols(3);
+    String[] moreArgs = { "--" + BaseSqoopTool.HIVE_DELIMS_REPLACEMENT_ARG, " ",
+        "--" + BaseSqoopTool.HIVE_DROP_DELIMS_ARG, };
+
+    ImportTool tool = new ImportTool();
+    try {
+      tool.validateOptions(tool.parseArguments(getArgv(false, moreArgs), null,
+          null, true));
+      fail("Expected InvalidOptionsException");
+    } catch (InvalidOptionsException ex) {
+      /* success */
+    }
+  }
+
 /**
  * Test hive import with row that has new line in it.
  */
testdata/hive/scripts/fieldWithNewlineReplacementImport.q (new file, 2 lines)
@@ -0,0 +1,2 @@
+CREATE TABLE IF NOT EXISTS `FIELD_WITH_NL_REPLACEMENT_HIVE_IMPORT` ( `DATA_COL0` STRING, `DATA_COL1` INT, `DATA_COL2` STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS TEXTFILE;
+LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/FIELD_WITH_NL_REPLACEMENT_HIVE_IMPORT' INTO TABLE `FIELD_WITH_NL_REPLACEMENT_HIVE_IMPORT`;