SQOOP-129. Newlines in RDBMS fields break Hive

From: Jonathan Hsieh <jon@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1150028 13f79535-47bb-0310-9956-ffa450edef68

parent 79fd6b4d84
commit f38e40e760
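In short: Hive's default text storage is line-oriented, so a raw newline or carriage return inside an imported string field splits one record into several rows, and a stray \001 byte shifts columns. This commit adds a --hive-drop-import-delims option that strips \n, \r, and \001 from string fields during import. A minimal standalone sketch of the failure mode, with hypothetical values:

// Illustrative only, not Sqoop code: Hive's default text format uses '\001'
// as the field terminator and '\n' as the row terminator, so a raw newline
// inside a field value turns one logical record into two physical rows.
public class NewlineBreaksHiveDemo {
  public static void main(String[] args) {
    String field = "test with \n new lines";
    String row = field + '\001' + "42";             // one logical record...
    System.out.println(row.split("\n", -1).length); // ...but 2 physical lines
  }
}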
@@ -146,6 +146,7 @@ public enum IncrementalMode {
   @StoredAsProperty("hive.import") private boolean hiveImport;
   @StoredAsProperty("hive.overwrite.table") private boolean overwriteHiveTable;
   @StoredAsProperty("hive.table.name") private String hiveTableName;
+  @StoredAsProperty("hive.drop.delims") private boolean hiveDropDelims;

   // An ordered list of column names denoting what order columns are
   // serialized to a PreparedStatement from a generated record type.
@@ -994,6 +995,17 @@ public void setOverwriteHiveTable(boolean overwrite) {
     this.overwriteHiveTable = overwrite;
   }

+  /**
+   * @return the user-specified option to modify fields to drop hive delimiters
+   */
+  public boolean doHiveDropDelims() {
+    return hiveDropDelims;
+  }
+
+  public void setHiveDropDelims(boolean dropHiveDelims) {
+    this.hiveDropDelims = dropHiveDelims;
+  }
+
   /**
    * @return location where .java files go; guaranteed to end with '/'.
    */
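A small sketch of the new accessor pair in use; the no-argument SqoopOptions constructor is an assumption here, not something this hunk shows:

import com.cloudera.sqoop.SqoopOptions;

// Illustrative only: exercises the getter/setter pair added above.
public class HiveDropDelimsOptionDemo {
  public static void main(String[] args) {
    SqoopOptions options = new SqoopOptions(); // assumed no-arg constructor
    options.setHiveDropDelims(true); // what --hive-drop-import-delims sets
    System.out.println(options.doHiveDropDelims()); // prints: true
  }
}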
@@ -1696,5 +1708,6 @@ public void setInNullNonStringValue(String inNullNonString) {
   public String getInNullNonStringValue() {
     return inNullNonStringValue;
   }
+
 }

@@ -25,7 +25,19 @@ public final class FieldFormatter {

   private FieldFormatter() { }

   /**
+   * Only pass fields that are strings when the --hive-drop-delims option is on.
+   * @param str the string field value to format
+   * @param delimiters the delimiters to use for escaping and enclosing
+   * @return the field value with Hive delimiters dropped, escaped and enclosed
+   */
+  public static String hiveStringDropDelims(String str,
+      DelimiterSet delimiters) {
+    String droppedDelims = str.replaceAll("\\n|\\r|\01", "");
+    return escapeAndEnclose(droppedDelims, delimiters);
+  }
+
+  /**
    * Takes an input string representing the value of a field, encloses it in
    * enclosing chars, and escapes any occurrences of such characters in the
    * middle. The escape character itself is also escaped if it appears in the
@@ -767,8 +767,15 @@ private void generateToString(Map<String, Integer> columnTypes,
         continue;
       }

-      sb.append("    __sb.append(FieldFormatter.escapeAndEnclose(" + stringExpr
-          + ", delimiters));\n");
+      if (javaType.equals("String") && options.doHiveDropDelims()) {
+        sb.append("    // special case for strings hive, dropping delimiters "
+            + "\\n,\\r,\\01 from strings\n");
+        sb.append("    __sb.append(FieldFormatter.hiveStringDropDelims("
+            + stringExpr + ", delimiters));\n");
+      } else {
+        sb.append("    __sb.append(FieldFormatter.escapeAndEnclose("
+            + stringExpr + ", delimiters));\n");
+      }
     }

     sb.append("    if (useRecordDelim) {\n");
@@ -89,6 +89,7 @@ public abstract class BaseSqoopTool extends SqoopTool {
   public static final String HIVE_IMPORT_ARG = "hive-import";
   public static final String HIVE_TABLE_ARG = "hive-table";
   public static final String HIVE_OVERWRITE_ARG = "hive-overwrite";
+  public static final String HIVE_DROP_DELIMS_ARG = "hive-drop-import-delims";
   public static final String NUM_MAPPERS_ARG = "num-mappers";
   public static final String NUM_MAPPERS_SHORT_ARG = "m";
   public static final String COMPRESS_ARG = "compress";
@@ -399,7 +400,11 @@ protected RelatedOptions getHiveOptions(boolean explicitHiveImport) {
         .withDescription("Sets the table name to use when importing to hive")
         .withLongOpt(HIVE_TABLE_ARG)
         .create());
-
+    hiveOpts.addOption(OptionBuilder
+        .withDescription("Drop Hive record \\0x01 and row delimiters "
+            + "(\\n\\r) from imported string fields")
+        .withLongOpt(HIVE_DROP_DELIMS_ARG)
+        .create());
     return hiveOpts;
   }

@@ -653,6 +658,10 @@ protected void applyHiveOptions(CommandLine in, SqoopOptions out)
     if (in.hasOption(HIVE_TABLE_ARG)) {
       out.setHiveTableName(in.getOptionValue(HIVE_TABLE_ARG));
     }
+
+    if (in.hasOption(HIVE_DROP_DELIMS_ARG)) {
+      out.setHiveDropDelims(true);
+    }
   }

   protected void applyOutputFormatOptions(CommandLine in, SqoopOptions out)
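The flag itself is plain Commons CLI plumbing. A self-contained sketch of the same hasOption() pattern (the demo class and argument array are illustrative, not Sqoop code):

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

// Illustrative only: shows how a valueless long option such as
// --hive-drop-import-delims registers as a boolean via hasOption(),
// mirroring the applyHiveOptions() check above.
public class FlagParseDemo {
  public static void main(String[] args) throws ParseException {
    Options opts = new Options();
    opts.addOption(OptionBuilder
        .withDescription("Drop Hive delimiters from imported string fields")
        .withLongOpt("hive-drop-import-delims")
        .create());
    CommandLine in = new GnuParser().parse(opts,
        new String[] { "--hive-drop-import-delims" });
    System.out.println(in.hasOption("hive-drop-import-delims")); // true
  }
}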
@@ -18,20 +18,24 @@
 package com.cloudera.sqoop.hive;

+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
 import java.util.ArrayList;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.junit.Test;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.junit.Test;

 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.testutil.CommonArgs;
 import com.cloudera.sqoop.testutil.HsqldbTestServer;
 import com.cloudera.sqoop.testutil.ImportJobTestCase;
+import com.cloudera.sqoop.tool.BaseSqoopTool;
 import com.cloudera.sqoop.tool.CodeGenTool;
 import com.cloudera.sqoop.tool.CreateHiveTableTool;
 import com.cloudera.sqoop.tool.ImportTool;
@@ -306,5 +310,53 @@ public void testCustomDelimiters() throws IOException {
     runImportTest(TABLE_NAME, types, vals, "customDelimImport.q",
         getArgv(false, extraArgs), new ImportTool());
   }
-}

+  /**
+   * Test hive import with a row that has a newline in it.
+   */
+  @Test
+  public void testFieldWithHiveDelims() throws IOException,
+      InterruptedException {
+    final String TABLE_NAME = "FIELD_WITH_NL_HIVE_IMPORT";
+
+    LOG.info("Doing import of single row into FIELD_WITH_NL_HIVE_IMPORT table");
+    setCurTableName(TABLE_NAME);
+    setNumCols(3);
+    String[] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
+    String[] vals = { "'test with \n new lines \n'", "42",
+        "'oh no " + '\01' + " field delims " + '\01' + "'", };
+    String[] moreArgs = { "--" + BaseSqoopTool.HIVE_DROP_DELIMS_ARG };
+
+    runImportTest(TABLE_NAME, types, vals, "fieldWithNewlineImport.q",
+        getArgv(false, moreArgs), new ImportTool());
+
+    LOG.info("Validating data in single row is present in: "
+        + "FIELD_WITH_NL_HIVE_IMPORT table");
+
+    // Ideally, we would actually invoke hive code to verify that records with
+    // record and field delimiters have values replaced and that we have the
+    // proper number of hive records. Unfortunately, this is a non-trivial task
+    // and is better dealt with at an integration-test level.
+    //
+    // Instead, this assumes the path of the generated table and just validates
+    // the map job output.
+
+    // Get and read the raw output file
+    String whDir = getWarehouseDir();
+    File p = new File(new File(whDir, TABLE_NAME), "part-m-00000");
+    File f = new File(p.toString());
+    FileReader fr = new FileReader(f);
+    BufferedReader br = new BufferedReader(fr);
+    try {
+      // verify the output
+      assertEquals(br.readLine(), "test with new lines " + '\01' + "42"
+          + '\01' + "oh no field delims ");
+      assertEquals(br.readLine(), null); // should only be one line
+    } catch (IOException ioe) {
+      fail("Unable to read files generated from hive");
+    } finally {
+      br.close();
+    }
+  }
+
+}
testdata/hive/scripts/fieldWithNewlineImport.q (vendored, new file, 2 lines)
@@ -0,0 +1,2 @@
+CREATE TABLE IF NOT EXISTS `FIELD_WITH_NL_HIVE_IMPORT` ( `DATA_COL0` STRING, `DATA_COL1` INT, `DATA_COL2` STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS TEXTFILE;
+LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/FIELD_WITH_NL_HIVE_IMPORT' INTO TABLE `FIELD_WITH_NL_HIVE_IMPORT`;
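The script's delimiters line up with the Java side: '\001' (the byte the import now strips from field data) is Hive's default field terminator, and '\012' is LF, the row terminator. Given the test row above, the single physical line the map task writes to part-m-00000 should be the following (reconstructed from the assertEquals() calls in testFieldWithHiveDelims):

// Expected content of the one line in part-m-00000: \n, \r, and \001 are
// gone from the field data, and \001 appears only as the column separator.
String expected = "test with new lines " + '\01' + "42"
    + '\01' + "oh no field delims ";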