
MAPREDUCE-705. User-configurable quote and delimiter characters for Sqoop records and record reparsing. Contributed by Aaron Kimball.

From: Thomas White <tomwhite@apache.org>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149818 13f79535-47bb-0310-9956-ffa450edef68
Andrew Bayer 2011-07-22 20:03:22 +00:00
parent 3c322c9969
commit 3d39962dfa
33 changed files with 2670 additions and 130 deletions

View File

@ -29,6 +29,7 @@ to call at top-level: ant deploy-contrib compile-core-test
<!-- ================================================================== -->
<!-- Run unit tests -->
<!-- Override with our own version so we can set hadoop.alt.classpath -->
<!-- and Hadoop logger properties -->
<!-- ================================================================== -->
<target name="test" depends="compile-test, compile" if="test.available">
<echo message="contrib: ${name}"/>
@ -59,6 +60,10 @@ to call at top-level: ant deploy-contrib compile-core-test
<sysproperty key="hadoop.alt.classpath"
value="${hadoop.root}/build/classes" />
<!-- we want more log4j output when running unit tests -->
<sysproperty key="hadoop.root.logger"
value="DEBUG,console" />
<!-- requires fork=yes for:
relative File paths to use the specified user.dir
classpath to use build/contrib/*.jar

View File

@ -100,6 +100,20 @@ public enum FileLayout {
private String packageName; // package to prepend to auto-named classes.
private String className; // package+class to apply to individual table import.
private char inputFieldDelim;
private char inputRecordDelim;
private char inputEnclosedBy;
private char inputEscapedBy;
private boolean inputMustBeEnclosed;
private char outputFieldDelim;
private char outputRecordDelim;
private char outputEnclosedBy;
private char outputEscapedBy;
private boolean outputMustBeEnclosed;
private boolean areDelimsManuallySet;
private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
public ImportOptions() {
@ -199,6 +213,20 @@ private void initDefaults() {
this.jarOutputDir = tmpDir + "sqoop/compile";
this.layout = FileLayout.TextFile;
this.inputFieldDelim = '\000';
this.inputRecordDelim = '\000';
this.inputEnclosedBy = '\000';
this.inputEscapedBy = '\000';
this.inputMustBeEnclosed = false;
this.outputFieldDelim = ',';
this.outputRecordDelim = '\n';
this.outputEnclosedBy = '\000';
this.outputEscapedBy = '\000';
this.outputMustBeEnclosed = false;
this.areDelimsManuallySet = false;
loadFromProperties();
}
@ -236,7 +264,24 @@ public static void printUsage() {
System.out.println("--as-textfile Imports data as plain text (default)");
System.out.println("--all-tables Import all tables in database");
System.out.println(" (Ignores --table, --columns and --order-by)");
System.out.println("--hive-import If set, then import the table into Hive");
System.out.println("--hive-import If set, then import the table into Hive.");
System.out.println(" (Uses Hive's default delimiters if none are set.)");
System.out.println("");
System.out.println("Output line formatting options:");
System.out.println("--fields-terminated-by (char) Sets the field separator character");
System.out.println("--lines-terminated-by (char) Sets the end-of-line character");
System.out.println("--optionally-enclosed-by (char) Sets a field enclosing character");
System.out.println("--enclosed-by (char) Sets a required field enclosing char");
System.out.println("--escaped-by (char) Sets the escape character");
System.out.println("--mysql-delimiters Uses MySQL's default delimiter set");
System.out.println(" fields: , lines: \\n escaped-by: \\ optionally-enclosed-by: '");
System.out.println("");
System.out.println("Input parsing options:");
System.out.println("--input-fields-terminated-by (char) Sets the input field separator");
System.out.println("--input-lines-terminated-by (char) Sets the input end-of-line char");
System.out.println("--input-optionally-enclosed-by (char) Sets a field enclosing character");
System.out.println("--input-enclosed-by (char) Sets a required field encloser");
System.out.println("--input-escaped-by (char) Sets the input escape character");
System.out.println("");
System.out.println("Code generation options:");
System.out.println("--outdir (dir) Output directory for generated code");
@ -260,6 +305,85 @@ public static void printUsage() {
System.out.println("commands.");
}
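For example, an invocation requesting tab-separated, backslash-escaped output might add flags along these lines (illustrative; the connection and table arguments are elided):
--fields-terminated-by '\t' --escaped-by '\\' --optionally-enclosed-by '\"'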
/**
* Given a string containing a single character or an escape sequence representing
* a char, return that char itself.
*
* Normal literal characters return themselves: "x" -> 'x', etc.
* Strings containing a '\' followed by one of t, r, n, or b escape to the usual
* character as seen in Java: "\n" -> (newline), etc.
*
* Strings like "\0ooo" return the character specified by the octal sequence 'ooo'
* Strings like "\0xhhh" or "\0Xhhh" return the character specified by the hex sequence 'hhh'
*/
static char toChar(String charish) throws InvalidOptionsException {
if (null == charish) {
throw new InvalidOptionsException("Character argument expected."
+ "\nTry --help for usage instructions.");
} else if (charish.startsWith("\\0x") || charish.startsWith("\\0X")) {
if (charish.length() == 3) {
throw new InvalidOptionsException("Base-16 value expected for character argument."
+ "\nTry --help for usage instructions.");
} else {
String valStr = charish.substring(3);
int val = Integer.parseInt(valStr, 16);
return (char) val;
}
} else if (charish.startsWith("\\0")) {
if (charish.equals("\\0")) {
// it's just '\0', which we can take as shorthand for nul.
return '\000';
} else {
// it's an octal value.
String valStr = charish.substring(2);
int val = Integer.parseInt(valStr, 8);
return (char) val;
}
} else if (charish.startsWith("\\")) {
if (charish.length() == 1) {
// it's just a '\'. Keep it literal.
return '\\';
} else if (charish.length() > 2) {
// we don't have any 3+ char escape strings.
throw new InvalidOptionsException("Cannot understand character argument: " + charish
+ "\nTry --help for usage instructions.");
} else {
// this is some sort of normal 1-character escape sequence.
char escapeWhat = charish.charAt(1);
switch(escapeWhat) {
case 'b':
return '\b';
case 'n':
return '\n';
case 'r':
return '\r';
case 't':
return '\t';
case '\"':
return '\"';
case '\'':
return '\'';
case '\\':
return '\\';
default:
throw new InvalidOptionsException("Cannot understand character argument: " + charish
+ "\nTry --help for usage instructions.");
}
}
} else if (charish.length() == 0) {
throw new InvalidOptionsException("Character argument expected."
+ "\nTry --help for usage instructions.");
} else {
// it's a normal character.
if (charish.length() > 1) {
LOG.warn("Character argument " + charish + " has multiple characters; "
+ "only the first will be used.");
}
return charish.charAt(0);
}
}
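A few illustrative conversions under these rules (arguments shown as the raw command-line strings, so \t below is a backslash followed by 't'):
toChar("x")     returns 'x'
toChar("\t")    returns the tab character
toChar("\0")    returns NUL ('\000')
toChar("\0101") returns 'A' (octal 101 = 65)
toChar("\0x41") returns 'A' (hex 41 = 65)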
/**
* Read args from the command-line into member fields.
* @throws Exception if there's a problem parsing arguments.
@ -313,6 +437,42 @@ public void parse(String [] args) throws InvalidOptionsException {
this.hiveHome = args[++i];
} else if (args[i].equals("--hive-import")) {
this.hiveImport = true;
} else if (args[i].equals("--fields-terminated-by")) {
this.outputFieldDelim = ImportOptions.toChar(args[++i]);
this.areDelimsManuallySet = true;
} else if (args[i].equals("--lines-terminated-by")) {
this.outputRecordDelim = ImportOptions.toChar(args[++i]);
this.areDelimsManuallySet = true;
} else if (args[i].equals("--optionally-enclosed-by")) {
this.outputEnclosedBy = ImportOptions.toChar(args[++i]);
this.outputMustBeEnclosed = false;
this.areDelimsManuallySet = true;
} else if (args[i].equals("--enclosed-by")) {
this.outputEnclosedBy = ImportOptions.toChar(args[++i]);
this.outputMustBeEnclosed = true;
this.areDelimsManuallySet = true;
} else if (args[i].equals("--escaped-by")) {
this.outputEscapedBy = ImportOptions.toChar(args[++i]);
this.areDelimsManuallySet = true;
} else if (args[i].equals("--mysql-delimiters")) {
this.outputFieldDelim = ',';
this.outputRecordDelim = '\n';
this.outputEnclosedBy = '\'';
this.outputEscapedBy = '\\';
this.outputMustBeEnclosed = false;
this.areDelimsManuallySet = true;
} else if (args[i].equals("--input-fields-terminated-by")) {
this.inputFieldDelim = ImportOptions.toChar(args[++i]);
} else if (args[i].equals("--input-lines-terminated-by")) {
this.inputRecordDelim = ImportOptions.toChar(args[++i]);
} else if (args[i].equals("--input-optionally-enclosed-by")) {
this.inputEnclosedBy = ImportOptions.toChar(args[++i]);
this.inputMustBeEnclosed = false;
} else if (args[i].equals("--input-enclosed-by")) {
this.inputEnclosedBy = ImportOptions.toChar(args[++i]);
this.inputMustBeEnclosed = true;
} else if (args[i].equals("--input-escaped-by")) {
this.inputEscapedBy = ImportOptions.toChar(args[++i]);
} else if (args[i].equals("--outdir")) {
this.codeOutputDir = args[++i];
} else if (args[i].equals("--as-sequencefile")) {
@ -381,6 +541,30 @@ public void validate() throws InvalidOptionsException {
throw new InvalidOptionsException(
"--class-name overrides --package-name. You cannot use both." + HELP_STR);
}
if (this.hiveImport) {
if (!areDelimsManuallySet) {
// user hasn't manually specified delimiters, and wants to import straight to Hive.
// Use Hive-style delimiters.
LOG.info("Using Hive-specific delimiters for output. You can override");
LOG.info("delimiters with --fields-terminated-by, etc.");
this.outputFieldDelim = (char)0x1; // ^A
this.outputRecordDelim = '\n';
this.outputEnclosedBy = '\000'; // no enclosing in Hive.
this.outputEscapedBy = '\000'; // no escaping in Hive
this.outputMustBeEnclosed = false;
}
if (this.getOutputEscapedBy() != '\000') {
LOG.warn("Hive does not support escape characters in fields;");
LOG.warn("parse errors in Hive may result from using --escaped-by.");
}
if (this.getOutputEnclosedBy() != '\000') {
LOG.warn("Hive does not support quoted strings; parse errors");
LOG.warn("in Hive may result from using --enclosed-by.");
}
}
}
/** get the temporary directory; guaranteed to end in File.separator
@ -522,4 +706,101 @@ public void setUsername(String name) {
public void setPassword(String pass) {
this.password = pass;
}
/**
* @return the field delimiter to use when parsing lines. Defaults to the field delim
* to use when printing lines.
*/
public char getInputFieldDelim() {
if (inputFieldDelim == '\000') {
return this.outputFieldDelim;
} else {
return this.inputFieldDelim;
}
}
/**
* @return the record delimiter to use when parsing lines. Defaults to the record delim
* to use when printing lines.
*/
public char getInputRecordDelim() {
if (inputRecordDelim == '\000') {
return this.outputRecordDelim;
} else {
return this.inputRecordDelim;
}
}
/**
* @return the character that may enclose fields when parsing lines. Defaults to the
* enclosing-char to use when printing lines.
*/
public char getInputEnclosedBy() {
if (inputEnclosedBy == '\000') {
return this.outputEnclosedBy;
} else {
return this.inputEnclosedBy;
}
}
/**
* @return the escape character to use when parsing lines. Defaults to the escape
* character used when printing lines.
*/
public char getInputEscapedBy() {
if (inputEscapedBy == '\000') {
return this.outputEscapedBy;
} else {
return this.inputEscapedBy;
}
}
/**
* @return true if fields must be enclosed by the --enclosed-by character when parsing.
* Defaults to false. Set true when --input-enclosed-by is used.
*/
public boolean isInputEncloseRequired() {
if (inputEnclosedBy == '\000') {
return this.outputMustBeEnclosed;
} else {
return this.inputMustBeEnclosed;
}
}
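In practice, a user who passes only --fields-terminated-by '\t' therefore gets tab as both the output and the input field delimiter; adding --input-fields-terminated-by overrides the parsing side independently.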
/**
* @return the character to print between fields when importing them to text.
*/
public char getOutputFieldDelim() {
return this.outputFieldDelim;
}
/**
* @return the character to print between records when importing them to text.
*/
public char getOutputRecordDelim() {
return this.outputRecordDelim;
}
/**
* @return a character which may enclose the contents of fields when imported to text.
*/
public char getOutputEnclosedBy() {
return this.outputEnclosedBy;
}
/**
* @return a character which signifies an escape sequence when importing to text.
*/
public char getOutputEscapedBy() {
return this.outputEscapedBy;
}
/**
* @return true if fields imported to text must be enclosed by the EnclosedBy char.
* default is false; set to true if --enclosed-by is used instead of --optionally-enclosed-by.
*/
public boolean isOutputEncloseRequired() {
return this.outputMustBeEnclosed;
}
}

View File

@ -115,8 +115,11 @@ public String getCreateTableStmt() throws IOException {
sb.append("COMMENT 'Imported by sqoop on " + curDateStr + "' ");
}
sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ");
sb.append("LINES TERMINATED BY '\\n' STORED AS TEXTFILE");
sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\0");
sb.append(Integer.toOctalString((int) options.getOutputFieldDelim()));
sb.append("' LINES TERMINATED BY '\\0");
sb.append(Integer.toOctalString((int) options.getOutputRecordDelim()));
sb.append("' STORED AS TEXTFILE");
LOG.debug("Create statement: " + sb.toString());
return sb.toString();
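With the Hive defaults applied in ImportOptions.validate() (field delimiter 0x1, record delimiter '\n'), this emits, for example:
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\01' LINES TERMINATED BY '\012' STORED AS TEXTFILE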

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.lib;
/**
* Abstract base class for all DBWritable types generated by Sqoop.
* Contains methods required by all such types, to help with parsing,
* stringification, etc.
*/
public final class FieldFormatter {
private FieldFormatter() { }
/**
* Takes an input string representing the value of a field, encloses it in
* enclosing chars, and escapes any occurrences of such characters in the middle.
* The escape character itself is also escaped if it appears in the text of the
* field.
*
* The field is enclosed only if:
* enclose != '\000', and:
* encloseRequired is true, or
* one of the characters in the mustEncloseFor list is present in the string.
*
* Escaping is not performed if the escape char is '\000'.
*
* @param str - The user's string to escape and enclose
* @param escape - What string to use as the escape sequence. If "" or null, then don't escape.
* @param enclose - The string to use to enclose str e.g. "quoted". If "" or null, then don't
* enclose.
* @param mustEncloseFor - A list of characters; if one is present in 'str', then str must be
* enclosed
* @param encloseRequired - If true, then always enclose, regardless of mustEncloseFor
* @return the escaped, enclosed version of 'str'
*/
public static final String escapeAndEnclose(String str, String escape, String enclose,
char [] mustEncloseFor, boolean encloseRequired) {
// true if we can use an escape character.
boolean escapingLegal = (null != escape && escape.length() > 0 && !escape.equals("\000"));
String withEscapes;
if (escapingLegal) {
// escaping is legal. Escape any instances of the escape char itself
withEscapes = str.replace(escape, escape + escape);
} else {
// no need to double-escape
withEscapes = str;
}
if (null == enclose || enclose.length() == 0 || enclose.equals("\000")) {
// The enclose-with character was left unset, so we can't enclose items. We're done.
return withEscapes;
}
// if we have an enclosing character, and escaping is legal, then the encloser must
// always be escaped.
if (escapingLegal) {
withEscapes = withEscapes.replace(enclose, escape + enclose);
}
boolean actuallyDoEnclose = encloseRequired;
if (!actuallyDoEnclose && mustEncloseFor != null) {
// check if the string requires enclosing
for (char reason : mustEncloseFor) {
if (str.indexOf(reason) != -1) {
actuallyDoEnclose = true;
break;
}
}
}
if (actuallyDoEnclose) {
return enclose + withEscapes + enclose;
} else {
return withEscapes;
}
}
}
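A quick usage sketch (values illustrative): a field containing a delimiter is enclosed even when enclosing is optional.
String s = FieldFormatter.escapeAndEnclose("a,b", "\\", "\"",
    new char[] { ',', '\n' }, false);
// s == "\"a,b\"" (enclosed because the field contains a delimiter);
// a field like "abc" would come back unenclosed.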

View File

@ -0,0 +1,353 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.lib;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.List;
/**
* Parses a record containing one or more fields. Fields are separated
* by some FIELD_DELIMITER character, e.g. a comma or a ^A character.
* Records are terminated by a RECORD_DELIMITER character, e.g., a newline.
*
* Fields may be (optionally or mandatorily) enclosed by a quoting char
* e.g., '\"'
*
* Fields may contain escaped characters. An escape character may be, e.g.,
* the '\\' character. Any character following an escape character
* is treated literally. e.g., '\n' is recorded as an 'n' character, not a
* newline.
*
* Unexpected results may occur if the enclosing character escapes itself.
* e.g., this cannot parse SQL SELECT statements where the single character
* ['] escapes to [''].
*
* This class is not synchronized. Multiple threads must use separate
* instances of RecordParser.
*
* The fields parsed by RecordParser are backed by an internal buffer
* which is cleared when the next call to parseRecord() is made. If
* the buffer is required to be preserved, you must copy it yourself.
*/
public final class RecordParser {
public static final Log LOG = LogFactory.getLog(RecordParser.class.getName());
private enum ParseState {
FIELD_START,
ENCLOSED_FIELD,
UNENCLOSED_FIELD,
ENCLOSED_ESCAPE,
ENCLOSED_EXPECT_DELIMITER,
UNENCLOSED_ESCAPE
}
public static class ParseError extends Exception {
public ParseError() {
super("ParseError");
}
public ParseError(final String msg) {
super(msg);
}
public ParseError(final String msg, final Throwable cause) {
super(msg, cause);
}
public ParseError(final Throwable cause) {
super(cause);
}
}
private char fieldDelim;
private char recordDelim;
private char enclosingChar;
private char escapeChar;
private boolean enclosingRequired;
private ArrayList<String> outputs;
public RecordParser(final char field, final char record, final char enclose,
final char escape, final boolean mustEnclose) {
this.fieldDelim = field;
this.recordDelim = record;
this.enclosingChar = enclose;
this.escapeChar = escape;
this.enclosingRequired = mustEnclose;
this.outputs = new ArrayList<String>();
}
/**
* Return a list of strings representing the fields of the input line.
* This list is backed by an internal buffer which is cleared by the
* next call to parseRecord().
*/
public List<String> parseRecord(CharSequence input) throws ParseError {
if (null == input) {
throw new ParseError("null input string");
}
return parseRecord(CharBuffer.wrap(input));
}
/**
* Return a list of strings representing the fields of the input line.
* This list is backed by an internal buffer which is cleared by the
* next call to parseRecord().
*/
public List<String> parseRecord(Text input) throws ParseError {
if (null == input) {
throw new ParseError("null input string");
}
// TODO(aaron): The parser should be able to handle UTF-8 strings
// as well, to avoid this transcode operation.
return parseRecord(input.toString());
}
/**
* Return a list of strings representing the fields of the input line.
* This list is backed by an internal buffer which is cleared by the
* next call to parseRecord().
*/
public List<String> parseRecord(byte [] input) throws ParseError {
if (null == input) {
throw new ParseError("null input string");
}
return parseRecord(ByteBuffer.wrap(input).asCharBuffer());
}
/**
* Return a list of strings representing the fields of the input line.
* This list is backed by an internal buffer which is cleared by the
* next call to parseRecord().
*/
public List<String> parseRecord(char [] input) throws ParseError {
if (null == input) {
throw new ParseError("null input string");
}
return parseRecord(CharBuffer.wrap(input));
}
public List<String> parseRecord(ByteBuffer input) throws ParseError {
if (null == input) {
throw new ParseError("null input string");
}
return parseRecord(input.asCharBuffer());
}
/**
* Return a list of strings representing the fields of the input line.
* This list is backed by an internal buffer which is cleared by the
* next call to parseRecord().
*/
public List<String> parseRecord(CharBuffer input) throws ParseError {
if (null == input) {
throw new ParseError("null input string");
}
/*
This method implements the following state machine to perform
parsing.
Note that there are no restrictions on whether particular characters
(e.g., field-sep, record-sep, etc) are distinct or the same. The
state transitions are processed in the order seen in this comment.
Starting state is FIELD_START
encloser -> ENCLOSED_FIELD
escape char -> UNENCLOSED_ESCAPE
field delim -> FIELD_START (for a new field)
record delim -> stops processing
all other letters get added to current field, -> UNENCLOSED_FIELD
ENCLOSED_FIELD state:
escape char goes to ENCLOSED_ESCAPE
encloser goes to ENCLOSED_EXPECT_DELIMITER
field sep or record sep gets added to the current string
normal letters get added to the current string
ENCLOSED_ESCAPE state:
any character seen here is added literally, back to ENCLOSED_FIELD
ENCLOSED_EXPECT_DELIMITER state:
field sep goes to FIELD_START
record sep halts processing.
all other characters are errors.
UNENCLOSED_FIELD state:
ESCAPE char goes to UNENCLOSED_ESCAPE
FIELD_SEP char goes to FIELD_START
RECORD_SEP char halts processing
normal chars or the enclosing char get added to the current string
UNENCLOSED_ESCAPE:
add character literally to current string, return to UNENCLOSED_FIELD
*/
char curChar = '\000';
ParseState state = ParseState.FIELD_START;
int len = input.length();
StringBuilder sb = null;
outputs.clear();
for (int pos = 0; pos < len; pos++) {
curChar = input.get();
switch (state) {
case FIELD_START:
// ready to start processing a new field.
if (null != sb) {
// We finished processing a previous field. Add to the list.
outputs.add(sb.toString());
}
sb = new StringBuilder();
if (this.enclosingChar == curChar) {
// got an opening encloser.
state = ParseState.ENCLOSED_FIELD;
} else if (this.escapeChar == curChar) {
state = ParseState.UNENCLOSED_ESCAPE;
} else if (this.fieldDelim == curChar) {
// we have a zero-length field. This is a no-op.
} else if (this.recordDelim == curChar) {
// we have a zero-length field, that ends processing.
pos = len;
} else {
// current char is part of the field.
state = ParseState.UNENCLOSED_FIELD;
sb.append(curChar);
if (this.enclosingRequired) {
throw new ParseError("Opening field-encloser expected at position " + pos);
}
}
break;
case ENCLOSED_FIELD:
if (this.escapeChar == curChar) {
// the next character is escaped. Treat it literally.
state = ParseState.ENCLOSED_ESCAPE;
} else if (this.enclosingChar == curChar) {
// we're at the end of the enclosing field. Expect an EOF or EOR char.
state = ParseState.ENCLOSED_EXPECT_DELIMITER;
} else {
// this is a regular char, or an EOF / EOR inside an encloser. Add to
// the current field string, and remain in this state.
sb.append(curChar);
}
break;
case UNENCLOSED_FIELD:
if (this.escapeChar == curChar) {
// the next character is escaped. Treat it literally.
state = ParseState.UNENCLOSED_ESCAPE;
} else if (this.fieldDelim == curChar) {
// we're at the end of this field; may be the start of another one.
state = ParseState.FIELD_START;
} else if (this.recordDelim == curChar) {
pos = len; // terminate processing immediately.
} else {
// this is a regular char. Add to the current field string,
// and remain in this state.
sb.append(curChar);
}
break;
case ENCLOSED_ESCAPE:
// Treat this character literally, whatever it is, and return to enclosed
// field processing.
sb.append(curChar);
state = ParseState.ENCLOSED_FIELD;
break;
case ENCLOSED_EXPECT_DELIMITER:
// We were in an enclosed field, but got the final encloser. Now we expect
// either an end-of-field or an end-of-record.
if (this.fieldDelim == curChar) {
// end of one field is the beginning of the next.
state = ParseState.FIELD_START;
} else if (this.recordDelim == curChar) {
// stop processing.
pos = len;
} else {
// Don't know what to do with this character.
throw new ParseError("Expected delimiter at position " + pos);
}
break;
case UNENCLOSED_ESCAPE:
// Treat this character literally, whatever it is, and return to non-enclosed
// field processing.
sb.append(curChar);
state = ParseState.UNENCLOSED_FIELD;
break;
}
}
if (state == ParseState.FIELD_START && curChar == this.fieldDelim) {
// we hit an EOF/EOR as the last legal character and we need to mark
// that string as recorded. This if block is outside the for-loop since
// we don't have a physical 'epsilon' token in our string.
if (null != sb) {
outputs.add(sb.toString());
sb = new StringBuilder();
}
}
if (null != sb) {
// There was a field that terminated by running out of chars or an EOR
// character. Add to the list.
outputs.add(sb.toString());
}
return outputs;
}
public boolean isEnclosingRequired() {
return enclosingRequired;
}
@Override
public String toString() {
return "RecordParser[" + fieldDelim + ',' + recordDelim + ',' + enclosingChar + ','
+ escapeChar + ',' + enclosingRequired + "]";
}
@Override
public int hashCode() {
return this.toString().hashCode();
}
}
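A minimal usage sketch with mysqldump-style delimiters (input illustrative):
RecordParser parser = new RecordParser(',', '\n', '\'', '\\', false);
List<String> fields = parser.parseRecord("'a,b',c\n");
// fields is ["a,b", "c"]; the backing list is reused by the next parseRecord() call.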

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.lib;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
/**
* Interface implemented by the classes generated by sqoop's orm.ClassWriter.
*/
public interface SqoopRecord extends DBWritable, Writable {
public void parse(CharSequence s) throws RecordParser.ParseError;
public void parse(Text s) throws RecordParser.ParseError;
public void parse(byte [] s) throws RecordParser.ParseError;
public void parse(char [] s) throws RecordParser.ParseError;
public void parse(ByteBuffer s) throws RecordParser.ParseError;
public void parse(CharBuffer s) throws RecordParser.ParseError;
}

View File

@ -29,8 +29,11 @@
import java.io.OutputStreamWriter;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.CharBuffer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -38,7 +41,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.lib.FieldFormatter;
import org.apache.hadoop.sqoop.lib.RecordParser;
import org.apache.hadoop.sqoop.util.ImportError;
import org.apache.hadoop.sqoop.util.PerfCounters;
import org.apache.hadoop.sqoop.util.StreamHandlerFactory;
import org.apache.hadoop.util.Shell;
/**
@ -50,12 +57,290 @@ public class LocalMySQLManager extends MySQLManager {
public static final Log LOG = LogFactory.getLog(LocalMySQLManager.class.getName());
// StreamHandlers used to import data from mysqldump directly into HDFS.
/**
* Copies data directly from mysqldump into HDFS, after stripping some
* header and footer characters that are attached to each line in mysqldump.
*/
static class CopyingStreamHandlerFactory implements StreamHandlerFactory {
private final BufferedWriter writer;
private final PerfCounters counters;
CopyingStreamHandlerFactory(final BufferedWriter w, final PerfCounters ctrs) {
this.writer = w;
this.counters = ctrs;
}
private CopyingStreamThread child;
public void processStream(InputStream is) {
child = new CopyingStreamThread(is, writer, counters);
child.start();
}
public int join() throws InterruptedException {
child.join();
if (child.isErrored()) {
return 1;
} else {
return 0;
}
}
private static class CopyingStreamThread extends Thread {
public static final Log LOG = LogFactory.getLog(CopyingStreamThread.class.getName());
private final BufferedWriter writer;
private final InputStream stream;
private final PerfCounters counters;
private boolean error;
CopyingStreamThread(final InputStream is, final BufferedWriter w, final PerfCounters ctrs) {
this.writer = w;
this.stream = is;
this.counters = ctrs;
}
public boolean isErrored() {
return error;
}
public void run() {
BufferedReader r = null;
BufferedWriter w = this.writer;
try {
r = new BufferedReader(new InputStreamReader(this.stream));
// Actually do the read/write transfer loop here.
int preambleLen = -1; // set to this for "undefined"
while (true) {
String inLine = r.readLine();
if (null == inLine) {
break; // EOF.
}
// this line is of the form "INSERT .. VALUES ( actual value text );"
// strip the leading preamble up to the '(' and the trailing ');'.
if (preambleLen == -1) {
// we haven't determined how long the preamble is. It's constant
// across all lines, so just figure this out once.
String recordStartMark = "VALUES (";
preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
}
// chop off the leading and trailing text as we write the
// output to HDFS.
int len = inLine.length() - 2 - preambleLen;
w.write(inLine, preambleLen, len);
w.newLine();
counters.addBytes(1 + len);
}
} catch (IOException ioe) {
LOG.error("IOException reading from mysqldump: " + ioe.toString());
// flag this error so we get an error status back in the caller.
error = true;
} finally {
if (null != r) {
try {
r.close();
} catch (IOException ioe) {
LOG.info("Error closing FIFO stream: " + ioe.toString());
}
}
if (null != w) {
try {
w.close();
} catch (IOException ioe) {
LOG.info("Error closing HDFS stream: " + ioe.toString());
}
}
}
}
}
}
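So a dump line such as INSERT INTO `employees` VALUES (1,'Aaron'); (table and values illustrative) reaches HDFS as 1,'Aaron' once the preamble through "VALUES (" and the trailing ");" are stripped.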
/**
* The ReparsingStreamHandler will instantiate a RecordParser to read mysqldump's
* output, and re-emit the text in the user's specified output format.
*/
static class ReparsingStreamHandlerFactory implements StreamHandlerFactory {
private final BufferedWriter writer;
private final ImportOptions options;
private final PerfCounters counters;
ReparsingStreamHandlerFactory(final BufferedWriter w, final ImportOptions opts,
final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
this.counters = ctrs;
}
private ReparsingStreamThread child;
public void processStream(InputStream is) {
child = new ReparsingStreamThread(is, writer, options, counters);
child.start();
}
public int join() throws InterruptedException {
child.join();
if (child.isErrored()) {
return 1;
} else {
return 0;
}
}
private static class ReparsingStreamThread extends Thread {
public static final Log LOG = LogFactory.getLog(ReparsingStreamThread.class.getName());
private final BufferedWriter writer;
private final ImportOptions options;
private final InputStream stream;
private final PerfCounters counters;
private boolean error;
ReparsingStreamThread(final InputStream is, final BufferedWriter w,
final ImportOptions opts, final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
this.stream = is;
this.counters = ctrs;
}
private static final char MYSQL_FIELD_DELIM = ',';
private static final char MYSQL_RECORD_DELIM = '\n';
private static final char MYSQL_ENCLOSE_CHAR = '\'';
private static final char MYSQL_ESCAPE_CHAR = '\\';
private static final boolean MYSQL_ENCLOSE_REQUIRED = false;
private static final RecordParser MYSQLDUMP_PARSER;
static {
// build a record parser for mysqldump's format
MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM, MYSQL_RECORD_DELIM,
MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR, MYSQL_ENCLOSE_REQUIRED);
}
public boolean isErrored() {
return error;
}
public void run() {
BufferedReader r = null;
BufferedWriter w = this.writer;
try {
r = new BufferedReader(new InputStreamReader(this.stream));
char outputFieldDelim = options.getOutputFieldDelim();
char outputRecordDelim = options.getOutputRecordDelim();
String outputEnclose = "" + options.getOutputEnclosedBy();
String outputEscape = "" + options.getOutputEscapedBy();
boolean outputEncloseRequired = options.isOutputEncloseRequired();
char [] encloseFor = { outputFieldDelim, outputRecordDelim };
// Actually do the read/write transfer loop here.
int preambleLen = -1; // set to this for "undefined"
while (true) {
String inLine = r.readLine();
if (null == inLine) {
break; // EOF.
}
// this line is of the form "INSERT .. VALUES ( actual value text );"
// strip the leading preamble up to the '(' and the trailing ');'.
if (preambleLen == -1) {
// we haven't determined how long the preamble is. It's constant
// across all lines, so just figure this out once.
String recordStartMark = "VALUES (";
preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
}
// Wrap the input string in a char buffer that ignores the leading and trailing
// text.
CharBuffer charbuf = CharBuffer.wrap(inLine, preambleLen, inLine.length() - 2);
// Pass this along to the parser
List<String> fields = null;
try {
fields = MYSQLDUMP_PARSER.parseRecord(charbuf);
} catch (RecordParser.ParseError pe) {
LOG.warn("ParseError reading from mysqldump: " + pe.toString() + "; record skipped");
continue; // skip this record; iterating a null field list below would throw NPE
}
// For all of the output fields, emit them using the delimiters the user chooses.
boolean first = true;
int recordLen = 1; // for the delimiter.
for (String field : fields) {
if (!first) {
w.write(outputFieldDelim);
} else {
first = false;
}
String fieldStr = FieldFormatter.escapeAndEnclose(field, outputEscape, outputEnclose,
encloseFor, outputEncloseRequired);
w.write(fieldStr);
recordLen += fieldStr.length();
}
w.write(outputRecordDelim);
counters.addBytes(recordLen);
}
} catch (IOException ioe) {
LOG.error("IOException reading from mysqldump: " + ioe.toString());
// flag this error so the parent can handle it appropriately.
error = true;
} finally {
if (null != r) {
try {
r.close();
} catch (IOException ioe) {
LOG.info("Error closing FIFO stream: " + ioe.toString());
}
}
if (null != w) {
try {
w.close();
} catch (IOException ioe) {
LOG.info("Error closing HDFS stream: " + ioe.toString());
}
}
}
}
}
}
public LocalMySQLManager(final ImportOptions options) {
super(options, false);
}
private static final String MYSQL_DUMP_CMD = "mysqldump";
/**
* @return true if the user's output delimiters match those used by mysqldump.
* fields: ,
* lines: \n
* optional-enclose: \'
* escape: \\
*/
private boolean outputDelimsAreMySQL() {
return options.getOutputFieldDelim() == ','
&& options.getOutputRecordDelim() == '\n'
&& options.getOutputEnclosedBy() == '\''
&& options.getOutputEscapedBy() == '\\'
&& !options.isOutputEncloseRequired(); // encloser is optional
}
/**
* Writes the user's password to a tmp file with 0600 permissions.
* @return the filename used.
@ -145,12 +430,15 @@ public void importTable(String tableName, String jarFile, Configuration conf)
}
LOG.info("Performing import of table " + tableName + " from database " + databaseName);
args.add(MYSQL_DUMP_CMD); // requires that this is on the path.
String password = options.getPassword();
String passwordFile = null;
Process p = null;
StreamHandlerFactory streamHandler = null;
PerfCounters counters = new PerfCounters();
try {
// --defaults-file must be the first argument.
if (null != password && password.length() > 0) {
@ -188,82 +476,53 @@ public void importTable(String tableName, String jarFile, Configuration conf)
LOG.debug(" " + arg);
}
FileSystem fs = FileSystem.get(conf);
String warehouseDir = options.getWarehouseDir();
Path destDir = null;
if (null != warehouseDir) {
destDir = new Path(new Path(warehouseDir), tableName);
} else {
destDir = new Path(tableName);
}
LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
LOG.debug("Creating destination directory " + destDir);
fs.mkdirs(destDir);
Path destFile = new Path(destDir, "data-00000");
LOG.debug("Opening output file: " + destFile);
if (fs.exists(destFile)) {
Path canonicalDest = destFile.makeQualified(fs);
throw new IOException("Destination file " + canonicalDest + " already exists");
}
// This writer will be closed by StreamHandlerFactory.
OutputStream os = fs.create(destFile);
BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
// Actually start the mysqldump.
p = Runtime.getRuntime().exec(args.toArray(new String[0]));
// read from the pipe, into HDFS.
// read from the stdout pipe into the HDFS writer.
InputStream is = p.getInputStream();
OutputStream os = null;
BufferedReader r = null;
BufferedWriter w = null;
try {
r = new BufferedReader(new InputStreamReader(is));
// create the paths/files in HDFS
FileSystem fs = FileSystem.get(conf);
String warehouseDir = options.getWarehouseDir();
Path destDir = null;
if (null != warehouseDir) {
destDir = new Path(new Path(warehouseDir), tableName);
} else {
destDir = new Path(tableName);
}
LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
LOG.debug("Creating destination directory " + destDir);
fs.mkdirs(destDir);
Path destFile = new Path(destDir, "data-00000");
LOG.debug("Opening output file: " + destFile);
if (fs.exists(destFile)) {
Path canonicalDest = destFile.makeQualified(fs);
throw new IOException("Destination file " + canonicalDest + " already exists");
}
os = fs.create(destFile);
w = new BufferedWriter(new OutputStreamWriter(os));
// Actually do the read/write transfer loop here.
int preambleLen = -1; // set to this for "undefined"
while (true) {
String inLine = r.readLine();
if (null == inLine) {
break; // EOF.
}
// this line is of the form "INSERT .. VALUES ( actual value text );"
// strip the leading preamble up to the '(' and the trailing ');'.
if (preambleLen == -1) {
// we haven't determined how long the preamble is. It's constant
// across all lines, so just figure this out once.
String recordStartMark = "VALUES (";
preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
}
// chop off the leading and trailing text as we write the
// output to HDFS.
w.write(inLine, preambleLen, inLine.length() - 2 - preambleLen);
w.newLine();
}
} finally {
LOG.info("Transfer loop complete.");
if (null != r) {
try {
r.close();
} catch (IOException ioe) {
LOG.info("Error closing FIFO stream: " + ioe.toString());
}
}
if (null != w) {
try {
w.close();
} catch (IOException ioe) {
LOG.info("Error closing HDFS stream: " + ioe.toString());
}
}
if (outputDelimsAreMySQL()) {
LOG.debug("Output delimiters conform to mysqldump; using straight copy");
streamHandler = new CopyingStreamHandlerFactory(w, counters);
} else {
LOG.debug("User-specified delimiters; using reparsing import");
LOG.info("Converting data to use specified delimiters.");
LOG.info("(For the fastest possible import, use");
LOG.info("--mysql-delimiters to specify the same field");
LOG.info("delimiters as are used by mysqldump.)");
streamHandler = new ReparsingStreamHandlerFactory(w, options, counters);
}
// Start an async thread to read and upload the whole stream.
counters.startClock();
streamHandler.processStream(is);
} finally {
// block until the process is done.
int result = 0;
if (null != p) {
while (true) {
@ -286,10 +545,34 @@ public void importTable(String tableName, String jarFile, Configuration conf)
}
}
// block until the stream handler is done too.
int streamResult = 0;
if (null != streamHandler) {
while (true) {
try {
streamResult = streamHandler.join();
} catch (InterruptedException ie) {
// interrupted; loop around.
continue;
}
break;
}
}
LOG.info("Transfer loop complete.");
if (0 != result) {
throw new IOException("mysqldump terminated with status "
+ Integer.toString(result));
}
if (0 != streamResult) {
throw new IOException("Encountered exception in stream handler");
}
counters.stopClock();
LOG.info("Transferred " + counters.toString());
}
}
}

View File

@ -28,9 +28,11 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
@ -41,6 +43,7 @@
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.orm.TableClassName;
import org.apache.hadoop.sqoop.util.ClassLoaderStack;
import org.apache.hadoop.sqoop.util.PerfCounters;
/**
* Actually runs a jdbc import job using the ORM files generated by the sqoop.orm package.
@ -95,6 +98,7 @@ public void runImport(String tableName, String ormJarFile, String orderByCol,
}
if (options.getFileLayout() == ImportOptions.FileLayout.TextFile) {
job.setOutputFormat(RawKeyTextOutputFormat.class);
job.setMapperClass(TextImportMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
@ -137,7 +141,16 @@ public void runImport(String tableName, String ormJarFile, String orderByCol,
orderByCol, colNames);
job.set(DBConfiguration.INPUT_CLASS_PROPERTY, tableClassName);
JobClient.runJob(job);
PerfCounters counters = new PerfCounters();
counters.startClock();
RunningJob runningJob = JobClient.runJob(job);
counters.stopClock();
// TODO(aaron): Is this the correct way to determine how much data got written?
counters.addBytes(runningJob.getCounters().getGroup("FileSystemCounters")
.getCounterForName("FILE_BYTES_WRITTEN").getCounter());
LOG.info("Transferred " + counters.toString());
} finally {
if (isLocal && null != prevClassLoader) {
// unload the special classloader for this jar.

View File

@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.mapred;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.*;
/** An {@link OutputFormat} that writes plain text files.
* Only writes the key. Does not write any delimiter/newline after the key.
*/
public class RawKeyTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
protected static class RawKeyRecordWriter<K, V>
implements RecordWriter<K, V> {
private static final String utf8 = "UTF-8";
protected DataOutputStream out;
public RawKeyRecordWriter(DataOutputStream out) {
this.out = out;
}
/**
* Write the object to the byte stream, handling Text as a special
* case.
* @param o the object to print
* @throws IOException if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(utf8));
}
}
public synchronized void write(K key, V value)
throws IOException {
writeObject(key);
}
public synchronized void close(Reporter reporter) throws IOException {
out.close();
}
}
public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
JobConf job,
String name,
Progressable progress)
throws IOException {
boolean isCompressed = getCompressOutput(job);
if (!isCompressed) {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new RawKeyRecordWriter<K, V>(fileOut);
} else {
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
// create the named codec
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
// build the filename including the extension
Path file =
FileOutputFormat.getTaskOutputPath(job,
name + codec.getDefaultExtension());
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new RawKeyRecordWriter<K, V>(new DataOutputStream
(codec.createOutputStream(fileOut)));
}
}
}
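Note that only the key is written, with no trailing newline, because the generated toString() (see ClassWriter.generateToString() below) now terminates every record with __OUTPUT_RECORD_DELIM itself.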

View File

@ -23,6 +23,9 @@
import org.apache.hadoop.sqoop.manager.SqlManager;
import org.apache.hadoop.sqoop.lib.BigDecimalSerializer;
import org.apache.hadoop.sqoop.lib.JdbcWritableBridge;
import org.apache.hadoop.sqoop.lib.FieldFormatter;
import org.apache.hadoop.sqoop.lib.RecordParser;
import org.apache.hadoop.sqoop.lib.SqoopRecord;
import java.io.File;
import java.io.FileOutputStream;
@ -52,7 +55,7 @@ public class ClassWriter {
*
* If the way that we generate classes changes, bump this number.
*/
public static final int CLASS_WRITER_VERSION = 1;
public static final int CLASS_WRITER_VERSION = 2;
private ImportOptions options;
private ConnManager connManager;
@ -375,8 +378,31 @@ private void generateHadoopRead(Map<String, Integer> columnTypes, String [] colN
private void generateToString(Map<String, Integer> columnTypes, String [] colNames,
StringBuilder sb) {
// Embed the delimiters into the class, as characters...
sb.append(" private static final char __OUTPUT_FIELD_DELIM_CHAR = " +
+ (int)options.getOutputFieldDelim() + ";\n");
sb.append(" private static final char __OUTPUT_RECORD_DELIM_CHAR = "
+ (int)options.getOutputRecordDelim() + ";\n");
// as strings...
sb.append(" private static final String __OUTPUT_FIELD_DELIM = \"\" + (char) "
+ (int) options.getOutputFieldDelim() + ";\n");
sb.append(" private static final String __OUTPUT_RECORD_DELIM = \"\" + (char) "
+ (int) options.getOutputRecordDelim() + ";\n");
sb.append(" private static final String __OUTPUT_ENCLOSED_BY = \"\" + (char) "
+ (int) options.getOutputEnclosedBy() + ";\n");
sb.append(" private static final String __OUTPUT_ESCAPED_BY = \"\" + (char) "
+ (int) options.getOutputEscapedBy() + ";\n");
// and some more options.
sb.append(" private static final boolean __OUTPUT_ENCLOSE_REQUIRED = "
+ options.isOutputEncloseRequired() + ";\n");
sb.append(" private static final char [] __OUTPUT_DELIMITER_LIST = { "
+ "__OUTPUT_FIELD_DELIM_CHAR, __OUTPUT_RECORD_DELIM_CHAR };\n\n");
// The actual toString() method itself follows.
sb.append(" public String toString() {\n");
sb.append(" StringBuilder sb = new StringBuilder();\n");
sb.append(" StringBuilder __sb = new StringBuilder();\n");
boolean first = true;
for (String col : colNames) {
@ -388,8 +414,8 @@ private void generateToString(Map<String, Integer> columnTypes, String [] colNam
}
if (!first) {
// TODO(aaron): Support arbitrary record delimiters
sb.append(" sb.append(\",\");\n");
// print inter-field tokens.
sb.append(" __sb.append(__OUTPUT_FIELD_DELIM);\n");
}
first = false;
@ -400,14 +426,132 @@ private void generateToString(Map<String, Integer> columnTypes, String [] colNam
continue;
}
sb.append(" sb.append(" + stringExpr + ");\n");
sb.append(" __sb.append(FieldFormatter.escapeAndEnclose(" + stringExpr
+ ", __OUTPUT_ESCAPED_BY, __OUTPUT_ENCLOSED_BY, __OUTPUT_DELIMITER_LIST, "
+ "__OUTPUT_ENCLOSE_REQUIRED));\n");
}
sb.append(" return sb.toString();\n");
sb.append(" __sb.append(__OUTPUT_RECORD_DELIM);\n");
sb.append(" return __sb.toString();\n");
sb.append(" }\n");
}
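For a hypothetical table with an integer id and a string name, the generated method would look roughly like the following (the per-column string expression, stringExpr above, is type-dependent and elided from this hunk):
public String toString() {
  StringBuilder __sb = new StringBuilder();
  __sb.append(FieldFormatter.escapeAndEnclose(Integer.toString(this.id),
      __OUTPUT_ESCAPED_BY, __OUTPUT_ENCLOSED_BY, __OUTPUT_DELIMITER_LIST,
      __OUTPUT_ENCLOSE_REQUIRED));
  __sb.append(__OUTPUT_FIELD_DELIM);
  __sb.append(FieldFormatter.escapeAndEnclose(this.name,
      __OUTPUT_ESCAPED_BY, __OUTPUT_ENCLOSED_BY, __OUTPUT_DELIMITER_LIST,
      __OUTPUT_ENCLOSE_REQUIRED));
  __sb.append(__OUTPUT_RECORD_DELIM);
  return __sb.toString();
}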
/**
* Helper method for generateParser(). Writes out the parse() method for one particular
* type we support as an input string-ish type.
*/
private void generateParseMethod(String typ, StringBuilder sb) {
sb.append(" public void parse(" + typ + " __record) throws RecordParser.ParseError {\n");
sb.append(" if (null == this.__parser) {\n");
sb.append(" this.__parser = new RecordParser(__INPUT_FIELD_DELIM_CHAR, ");
sb.append("__INPUT_RECORD_DELIM_CHAR, __INPUT_ENCLOSED_BY_CHAR, __INPUT_ESCAPED_BY_CHAR, ");
sb.append("__INPUT_ENCLOSE_REQUIRED);\n");
sb.append(" }\n");
sb.append(" List<String> __fields = this.__parser.parseRecord(__record);\n");
sb.append(" __loadFromFields(__fields);\n");
sb.append(" }\n\n");
}
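For typ = "Text", for instance, the emitted wrapper reads:
public void parse(Text __record) throws RecordParser.ParseError {
  if (null == this.__parser) {
    this.__parser = new RecordParser(__INPUT_FIELD_DELIM_CHAR,
        __INPUT_RECORD_DELIM_CHAR, __INPUT_ENCLOSED_BY_CHAR,
        __INPUT_ESCAPED_BY_CHAR, __INPUT_ENCLOSE_REQUIRED);
  }
  List<String> __fields = this.__parser.parseRecord(__record);
  __loadFromFields(__fields);
}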
/**
* Helper method for parseColumn(). Interpret the string 'null' as a null
* for a particular column.
*/
private void parseNullVal(String colName, StringBuilder sb) {
sb.append(" if (__cur_str.equals(\"null\")) { this.");
sb.append(colName);
sb.append(" = null; } else {\n");
}
/**
* Helper method for generateParser(). Generates the code that loads one field of
* a specified name and type from the next element of the field strings list.
*/
private void parseColumn(String colName, int colType, StringBuilder sb) {
// assume that we have __it and __cur_str vars, based on __loadFromFields() code.
sb.append(" __cur_str = __it.next();\n");
String javaType = SqlManager.toJavaType(colType);
parseNullVal(colName, sb);
if (javaType.equals("String")) {
// TODO(aaron): Distinguish between 'null' and null. Currently they both set the
// actual object to null.
sb.append(" this." + colName + " = __cur_str;\n");
} else if (javaType.equals("Integer")) {
sb.append(" this." + colName + " = Integer.valueOf(__cur_str);\n");
} else if (javaType.equals("Long")) {
sb.append(" this." + colName + " = Long.valueOf(__cur_str);\n");
} else if (javaType.equals("Float")) {
sb.append(" this." + colName + " = Float.valueOf(__cur_str);\n");
} else if (javaType.equals("Double")) {
sb.append(" this." + colName + " = Double.valueOf(__cur_str);\n");
} else if (javaType.equals("Boolean")) {
sb.append(" this." + colName + " = Boolean.valueOf(__cur_str);\n");
} else if (javaType.equals("java.sql.Date")) {
sb.append(" this." + colName + " = java.sql.Date.valueOf(__cur_str);\n");
} else if (javaType.equals("java.sql.Time")) {
sb.append(" this." + colName + " = java.sql.Time.valueOf(__cur_str);\n");
} else if (javaType.equals("java.sql.Timestamp")) {
sb.append(" this." + colName + " = java.sql.Timestamp.valueOf(__cur_str);\n");
} else if (javaType.equals("java.math.BigDecimal")) {
sb.append(" this." + colName + " = new java.math.BigDecimal(__cur_str);\n");
} else {
LOG.error("No parser available for Java type " + javaType);
}
sb.append(" }\n\n"); // the closing '{' based on code in parseNullVal();
}
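For a hypothetical nullable Integer column named id, this emits:
__cur_str = __it.next();
if (__cur_str.equals("null")) { this.id = null; } else {
  this.id = Integer.valueOf(__cur_str);
}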
/**
* Generate the parse() method
* @param columnTypes - mapping from column names to sql types
* @param colNames - ordered list of column names for table.
* @param sb - StringBuilder to append code to
*/
private void generateParser(Map<String, Integer> columnTypes, String [] colNames,
StringBuilder sb) {
// Embed into the class the delimiter characters to use when parsing input records.
// Note that these can differ from the delims to use as output via toString(), if
// the user wants to use this class to convert one format to another.
sb.append(" private static final char __INPUT_FIELD_DELIM_CHAR = " +
+ (int)options.getInputFieldDelim() + ";\n");
sb.append(" private static final char __INPUT_RECORD_DELIM_CHAR = "
+ (int)options.getInputRecordDelim() + ";\n");
sb.append(" private static final char __INPUT_ENCLOSED_BY_CHAR = "
+ (int)options.getInputEnclosedBy() + ";\n");
sb.append(" private static final char __INPUT_ESCAPED_BY_CHAR = "
+ (int)options.getInputEscapedBy() + ";\n");
sb.append(" private static final boolean __INPUT_ENCLOSE_REQUIRED = "
+ options.isInputEncloseRequired() + ";\n");
// The parser object which will do the heavy lifting for field splitting.
sb.append(" private RecordParser __parser;\n");
// Generate wrapper methods which will invoke the parser.
generateParseMethod("Text", sb);
generateParseMethod("CharSequence", sb);
generateParseMethod("byte []", sb);
generateParseMethod("char []", sb);
generateParseMethod("ByteBuffer", sb);
generateParseMethod("CharBuffer", sb);
// The wrapper methods call __loadFromFields() to actually interpret the raw
// field data as string, int, boolean, etc. The generation of this method is
// type-dependent for the fields.
sb.append(" private void __loadFromFields(List<String> fields) {\n");
sb.append(" Iterator<String> __it = fields.listIterator();\n");
sb.append(" String __cur_str;\n");
for (String colName : colNames) {
int colType = columnTypes.get(colName);
parseColumn(colName, colType, sb);
}
sb.append(" }\n\n");
}
/**
* Generate the write() method used by the Hadoop RPC system
* @param columnTypes - mapping from column names to sql types
@ -534,18 +678,25 @@ public StringBuilder generateClassForColumns(Map<String, Integer> columnTypes,
sb.append("import org.apache.hadoop.io.Writable;\n");
sb.append("import org.apache.hadoop.mapred.lib.db.DBWritable;\n");
sb.append("import " + JdbcWritableBridge.class.getCanonicalName() + ";\n");
sb.append("import " + FieldFormatter.class.getCanonicalName() + ";\n");
sb.append("import " + RecordParser.class.getCanonicalName() + ";\n");
sb.append("import " + SqoopRecord.class.getCanonicalName() + ";\n");
sb.append("import java.sql.PreparedStatement;\n");
sb.append("import java.sql.ResultSet;\n");
sb.append("import java.sql.SQLException;\n");
sb.append("import java.io.DataInput;\n");
sb.append("import java.io.DataOutput;\n");
sb.append("import java.io.IOException;\n");
sb.append("import java.nio.ByteBuffer;\n");
sb.append("import java.nio.CharBuffer;\n");
sb.append("import java.sql.Date;\n");
sb.append("import java.sql.Time;\n");
sb.append("import java.sql.Timestamp;\n");
sb.append("import java.util.Iterator;\n");
sb.append("import java.util.List;\n");
String className = tableNameInfo.getShortClassForTable(tableName);
sb.append("public class " + className + " implements DBWritable, Writable {\n");
sb.append("public class " + className + " implements DBWritable, SqoopRecord, Writable {\n");
sb.append(" public static final int PROTOCOL_VERSION = " + CLASS_WRITER_VERSION + ";\n");
generateFields(columnTypes, colNames, sb);
generateDbRead(columnTypes, colNames, sb);
@ -553,6 +704,7 @@ public StringBuilder generateClassForColumns(Map<String, Integer> columnTypes,
generateHadoopRead(columnTypes, colNames, sb);
generateHadoopWrite(columnTypes, colNames, sb);
generateToString(columnTypes, colNames, sb);
generateParser(columnTypes, colNames, sb);
// TODO(aaron): Generate hashCode(), compareTo(), equals() so it can be a WritableComparable
sb.append("}\n");

View File

@ -24,6 +24,7 @@
import java.util.List;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
/**
* Recursive file listing under a specified directory.
@ -101,4 +102,27 @@ private static void validateDirectory(File aDirectory) throws FileNotFoundExcept
throw new IllegalArgumentException("Directory cannot be read: " + aDirectory);
}
}
/**
* Recursively delete a directory and all its children.
* @param dir a valid directory to delete.
* @throws FileNotFoundException if dir does not exist.
* @throws IOException if a file or directory could not be removed.
*/
public static void recursiveDeleteDir(File dir) throws IOException {
if (!dir.exists()) {
throw new FileNotFoundException(dir.toString() + " does not exist");
}
if (dir.isDirectory()) {
// recursively descend into all children and delete them.
File [] children = dir.listFiles();
for (File child : children) {
recursiveDeleteDir(child);
}
}
if (!dir.delete()) {
throw new IOException("Could not remove: " + dir);
}
}
}
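A short usage sketch: the LocalMySQLTest change later in this commit uses this helper to clear a previous run's output directory before re-importing. The path below is hypothetical and IOException handling is elided:

File tableDir = new File("/tmp/sqoop/warehouse/EMPLOYEES");  // hypothetical path
if (tableDir.exists() && tableDir.isDirectory()) {
    // Remove stale output so the new import starts from a clean directory.
    FileListing.recursiveDeleteDir(tableDir);
}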

View File

@ -45,8 +45,16 @@ public LoggingStreamHandlerFactory(final Log context) {
}
}
private Thread child;
public void processStream(InputStream is) {
new LoggingThread(is).start();
child = new LoggingThread(is);
child.start();
}
public int join() throws InterruptedException {
child.join();
return 0; // always successful.
}
/**

View File

@ -34,8 +34,16 @@ public class NullStreamHandlerFactory implements StreamHandlerFactory {
public static final Log LOG = LogFactory.getLog(NullStreamHandlerFactory.class.getName());
private Thread child;
public void processStream(InputStream is) {
new IgnoringThread(is).start();
child = new IgnoringThread(is);
child.start();
}
public int join() throws InterruptedException {
child.join();
return 0; // always successful.
}
/**

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.util;
import java.text.NumberFormat;
/**
* A quick set of performance counters for reporting import speed.
*/
public class PerfCounters {
private long bytes;
private long nanoseconds;
private long startTime;
public PerfCounters() {
}
public void addBytes(long more) {
bytes += more;
}
public void startClock() {
startTime = System.nanoTime();
}
public void stopClock() {
nanoseconds = System.nanoTime() - startTime;
}
private static final double ONE_BILLION = 1000.0 * 1000.0 * 1000.0;
/** maximum number of digits after the decimal place */
private static final int MAX_PLACES = 4;
/**
* @return the given nanosecond count converted to (fractional) seconds
*/
private Double inSeconds(long nanos) {
return (double) nanos / ONE_BILLION;
}
private static final long ONE_GB = 1024 * 1024 * 1024;
private static final long ONE_MB = 1024 * 1024;
private static final long ONE_KB = 1024;
/**
* @return a string of the form "xxxx bytes" or "xxxxx KB" or "xxxx GB", scaled
* as is appropriate for the current value.
*/
private String formatBytes() {
double val;
String scale;
if (bytes > ONE_GB) {
val = (double) bytes / (double) ONE_GB;
scale = "GB";
} else if (bytes > ONE_MB) {
val = (double) bytes / (double) ONE_MB;
scale = "MB";
} else if (bytes > ONE_KB) {
val = (double) bytes / (double) ONE_KB;
scale = "KB";
} else {
val = (double) bytes;
scale = "bytes";
}
NumberFormat fmt = NumberFormat.getInstance();
fmt.setMaximumFractionDigits(MAX_PLACES);
return fmt.format(val) + " " + scale;
}
private String formatTimeInSeconds() {
NumberFormat fmt = NumberFormat.getInstance();
fmt.setMaximumFractionDigits(MAX_PLACES);
return fmt.format(inSeconds(this.nanoseconds)) + " seconds";
}
/**
* @return a string of the form "xxx bytes/sec" or "xxx KB/sec" scaled as is
* appropriate for the current value.
*/
private String formatSpeed() {
NumberFormat fmt = NumberFormat.getInstance();
fmt.setMaximumFractionDigits(MAX_PLACES);
Double seconds = inSeconds(this.nanoseconds);
double speed = (double) bytes / seconds;
double val;
String scale;
if (speed > ONE_GB) {
val = speed / (double) ONE_GB;
scale = "GB";
} else if (speed > ONE_MB) {
val = speed / (double) ONE_MB;
scale = "MB";
} else if (speed > ONE_KB) {
val = speed / (double) ONE_KB;
scale = "KB";
} else {
val = speed;
scale = "bytes";
}
return fmt.format(val) + " " + scale + "/sec";
}
public String toString() {
return formatBytes() + " in " + formatTimeInSeconds() + " (" + formatSpeed() + ")";
}
}
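A minimal usage sketch for these counters, assuming a caller that tracks bytes moved during an import (copyData() is a hypothetical transfer step):

PerfCounters counters = new PerfCounters();
counters.startClock();
long bytesMoved = copyData();      // hypothetical transfer step
counters.addBytes(bytesMoved);
counters.stopClock();
// toString() combines all three formatters,
// e.g. "1,024 KB in 2.5 seconds (409.6 KB/sec)"
System.out.println("Transferred " + counters);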

View File

@ -35,5 +35,11 @@ public interface StreamHandlerFactory {
* continue to run until the InputStream is exhausted.
*/
void processStream(InputStream is);
/**
* Wait until the stream has been processed.
* @return a status code indicating success or failure. 0 is typical for success.
*/
int join() throws InterruptedException;
}
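A hedged usage sketch of the processStream()/join() contract, assuming NullStreamHandlerFactory has a no-arg constructor and a child process whose output must be fully drained before its exit status is read; exception handling is elided:

Process p = Runtime.getRuntime().exec(new String[] { "mysqldump", "--help" });
StreamHandlerFactory sink = new NullStreamHandlerFactory();
sink.processStream(p.getInputStream());  // consume stdout on a child thread
int streamStatus = sink.join();          // wait until the stream is exhausted
int exitStatus = p.waitFor();            // now safe to collect the exit code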

View File

@ -19,11 +19,14 @@
package org.apache.hadoop.sqoop;
import org.apache.hadoop.sqoop.hive.TestHiveImport;
import org.apache.hadoop.sqoop.lib.TestFieldFormatter;
import org.apache.hadoop.sqoop.lib.TestRecordParser;
import org.apache.hadoop.sqoop.manager.LocalMySQLTest;
import org.apache.hadoop.sqoop.manager.MySQLAuthTest;
import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
import org.apache.hadoop.sqoop.manager.TestSqlManager;
import org.apache.hadoop.sqoop.orm.TestClassWriter;
import org.apache.hadoop.sqoop.orm.TestParseMethods;
import junit.framework.Test;
import junit.framework.TestSuite;
@ -51,6 +54,10 @@ public static Test suite() {
suite.addTestSuite(LocalMySQLTest.class);
suite.addTestSuite(MySQLAuthTest.class);
suite.addTestSuite(TestHiveImport.class);
suite.addTestSuite(TestRecordParser.class);
suite.addTestSuite(TestFieldFormatter.class);
suite.addTestSuite(TestImportOptions.class);
suite.addTestSuite(TestParseMethods.class);
return suite;
}

View File

@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop;
import junit.framework.TestCase;
/**
* Test aspects of the ImportOptions class
*/
public class TestImportOptions extends TestCase {
// tests for the toChar() parser
public void testNormalChar() throws ImportOptions.InvalidOptionsException {
assertEquals('a', ImportOptions.toChar("a"));
}
public void testEmptyString() throws ImportOptions.InvalidOptionsException {
try {
ImportOptions.toChar("");
fail("Expected exception");
} catch (ImportOptions.InvalidOptionsException ioe) {
// expect this.
}
}
public void testNullString() throws ImportOptions.InvalidOptionsException {
try {
ImportOptions.toChar(null);
fail("Expected exception");
} catch (ImportOptions.InvalidOptionsException ioe) {
// expect this.
}
}
public void testTooLong() throws ImportOptions.InvalidOptionsException {
// Should just use the first character and log a warning.
assertEquals('x', ImportOptions.toChar("xyz"));
}
public void testHexChar1() throws ImportOptions.InvalidOptionsException {
assertEquals(0xF, ImportOptions.toChar("\\0xf"));
}
public void testHexChar2() throws ImportOptions.InvalidOptionsException {
assertEquals(0xF, ImportOptions.toChar("\\0xF"));
}
public void testHexChar3() throws ImportOptions.InvalidOptionsException {
assertEquals(0xF0, ImportOptions.toChar("\\0xf0"));
}
public void testHexChar4() throws ImportOptions.InvalidOptionsException {
assertEquals(0xF0, ImportOptions.toChar("\\0Xf0"));
}
public void testEscapeChar1() throws ImportOptions.InvalidOptionsException {
assertEquals('\n', ImportOptions.toChar("\\n"));
}
public void testEscapeChar2() throws ImportOptions.InvalidOptionsException {
assertEquals('\\', ImportOptions.toChar("\\\\"));
}
public void testEscapeChar3() throws ImportOptions.InvalidOptionsException {
assertEquals('\\', ImportOptions.toChar("\\"));
}
public void testUnknownEscape1() throws ImportOptions.InvalidOptionsException {
try {
ImportOptions.toChar("\\Q");
fail("Expected exception");
} catch (ImportOptions.InvalidOptionsException ioe) {
// expect this.
}
}
public void testUnknownEscape2() throws ImportOptions.InvalidOptionsException {
try {
ImportOptions.toChar("\\nn");
fail("Expected exception");
} catch (ImportOptions.InvalidOptionsException ioe) {
// expect this.
}
}
public void testEscapeNul1() throws ImportOptions.InvalidOptionsException {
assertEquals('\000', ImportOptions.toChar("\\0"));
}
public void testEscapeNul2() throws ImportOptions.InvalidOptionsException {
assertEquals('\000', ImportOptions.toChar("\\00"));
}
public void testEscapeNul3() throws ImportOptions.InvalidOptionsException {
assertEquals('\000', ImportOptions.toChar("\\0000"));
}
public void testEscapeNul4() throws ImportOptions.InvalidOptionsException {
assertEquals('\000', ImportOptions.toChar("\\0x0"));
}
public void testOctalChar1() throws ImportOptions.InvalidOptionsException {
assertEquals(04, ImportOptions.toChar("\\04"));
}
public void testOctalChar2() throws ImportOptions.InvalidOptionsException {
assertEquals(045, ImportOptions.toChar("\\045"));
}
public void testErrOctalChar() throws ImportOptions.InvalidOptionsException {
try {
ImportOptions.toChar("\\095");
fail("Expected exception");
} catch (NumberFormatException nfe) {
// expected.
}
}
public void testErrHexChar() throws ImportOptions.InvalidOptionsException {
try {
ImportOptions.toChar("\\0x9K5");
fail("Expected exception");
} catch (NumberFormatException nfe) {
// expected.
}
}
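Taken together, these cases pin down the escape grammar that toChar() accepts. The following is a minimal sketch of that grammar for illustration, not Sqoop's implementation; the real method also logs a warning for multi-character input and wraps failures in InvalidOptionsException:

static char toCharSketch(String s) {
    if (s == null || s.length() == 0) {
        throw new IllegalArgumentException("delimiter may not be null or empty");
    }
    if (s.charAt(0) != '\\') {
        return s.charAt(0);              // plain character; extras are ignored
    }
    if (s.length() == 1) {
        return '\\';                     // a bare backslash escapes itself
    }
    if (s.charAt(1) == '0') {
        if (s.length() == 2) {
            return '\000';               // "\0" is NUL
        }
        String digits = s.substring(2);
        if (digits.charAt(0) == 'x' || digits.charAt(0) == 'X') {
            return (char) Integer.parseInt(digits.substring(1), 16);  // "\0xF0"
        }
        return (char) Integer.parseInt(digits, 8);                    // "\045"
    }
    if (s.length() > 2) {
        throw new IllegalArgumentException("unrecognized escape: " + s);  // e.g. "\nn"
    }
    switch (s.charAt(1)) {
        case 'n': return '\n';
        case '\\': return '\\';
        default: throw new IllegalArgumentException("unrecognized escape: " + s);  // e.g. "\Q"
    }
}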
// test that setting output delimiters also sets input delimiters
public void testDelimitersInherit() throws ImportOptions.InvalidOptionsException {
String [] args = {
"--fields-terminated-by",
"|"
};
ImportOptions opts = new ImportOptions();
opts.parse(args);
assertEquals('|', opts.getInputFieldDelim());
assertEquals('|', opts.getOutputFieldDelim());
}
// test that setting output delimiters and setting input delims separately works
public void testDelimOverride1() throws ImportOptions.InvalidOptionsException {
String [] args = {
"--fields-terminated-by",
"|",
"--input-fields-terminated-by",
"*"
};
ImportOptions opts = new ImportOptions();
opts.parse(args);
assertEquals('*', opts.getInputFieldDelim());
assertEquals('|', opts.getOutputFieldDelim());
}
// test that the order in which delims are specified doesn't matter
public void testDelimOverride2() throws ImportOptions.InvalidOptionsException {
String [] args = {
"--input-fields-terminated-by",
"*",
"--fields-terminated-by",
"|"
};
ImportOptions opts = new ImportOptions();
opts.parse(args);
assertEquals('*', opts.getInputFieldDelim());
assertEquals('|', opts.getOutputFieldDelim());
}
}

View File

@ -28,8 +28,6 @@
* some of which may contain null values.
*
* Also test loading only selected columns from the db.
*
*
*/
public class TestMultiCols extends ImportJobTestCase {

View File

@ -35,8 +35,6 @@
/**
* Test that --order-by works
*
*
*/
public class TestOrderBy extends ImportJobTestCase {
@ -152,11 +150,11 @@ public void runOrderByTest(String orderByCol, String firstValStr, int expectedSu
public void testOrderByFirstCol() throws IOException {
String orderByCol = "INTFIELD1";
runOrderByTest(orderByCol, "1,8", HsqldbTestServer.getFirstColSum());
runOrderByTest(orderByCol, "1,8\n", HsqldbTestServer.getFirstColSum());
}
public void testOrderBySecondCol() throws IOException {
String orderByCol = "INTFIELD2";
runOrderByTest(orderByCol, "7,2", HsqldbTestServer.getFirstColSum());
runOrderByTest(orderByCol, "7,2\n", HsqldbTestServer.getFirstColSum());
}
}

View File

@ -38,8 +38,6 @@
* Methods essentially copied out of the other Test* classes.
* TODO(kevin or aaron): Factor out these common test methods
* so that every new Test* class doesn't need to copy the code.
*
*
*/
public class TestWhere extends ImportJobTestCase {
@ -97,8 +95,8 @@ private int getFirstInt(String str) {
return Integer.parseInt(parts[0]);
}
public void runWhereTest(String whereClause, String firstValStr, int numExpectedResults, int expectedSum)
throws IOException {
public void runWhereTest(String whereClause, String firstValStr, int numExpectedResults,
int expectedSum) throws IOException {
String [] columns = HsqldbTestServer.getFieldNames();
ClassLoader prevClassLoader = null;
@ -160,11 +158,11 @@ public void runWhereTest(String whereClause, String firstValStr, int numExpected
public void testSingleClauseWhere() throws IOException {
String whereClause = "INTFIELD2 > 4";
runWhereTest(whereClause, "1,8", 2, 4);
runWhereTest(whereClause, "1,8\n", 2, 4);
}
public void testMultiClauseWhere() throws IOException {
String whereClause = "INTFIELD1 > 4 AND INTFIELD2 < 3";
runWhereTest(whereClause, "7,2", 1, 7);
runWhereTest(whereClause, "7,2\n", 1, 7);
}
}

View File

@ -42,7 +42,7 @@ public class TestHiveImport extends ImportJobTestCase {
* Create the argv to pass to Sqoop
* @param includeHadoopFlags whether to prepend Hadoop generic options.
* @param moreArgs extra arguments to append, or null for none.
* @return the argv as an array of strings.
*/
private String [] getArgv(boolean includeHadoopFlags) {
private String [] getArgv(boolean includeHadoopFlags, String [] moreArgs) {
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
@ -64,13 +64,19 @@ public class TestHiveImport extends ImportJobTestCase {
args.add("--order-by");
args.add(getColNames()[0]);
if (null != moreArgs) {
for (String arg: moreArgs) {
args.add(arg);
}
}
return args.toArray(new String[0]);
}
private ImportOptions getImportOptions() {
private ImportOptions getImportOptions(String [] extraArgs) {
ImportOptions opts = new ImportOptions();
try {
opts.parse(getArgv(false));
opts.parse(getArgv(false, extraArgs));
} catch (ImportOptions.InvalidOptionsException ioe) {
fail("Invalid options: " + ioe.toString());
}
@ -79,7 +85,7 @@ private ImportOptions getImportOptions() {
}
private void runImportTest(String tableName, String [] types, String [] values,
String verificationScript) throws IOException {
String verificationScript, String [] extraArgs) throws IOException {
// create a table and populate it with a row...
setCurTableName(tableName);
@ -87,14 +93,14 @@ private void runImportTest(String tableName, String [] types, String [] values,
// set up our mock hive shell to compare our generated script
// against the correct expected one.
ImportOptions options = getImportOptions();
ImportOptions options = getImportOptions(extraArgs);
String hiveHome = options.getHiveHome();
assertNotNull("hive.home was not set", hiveHome);
Path testDataPath = new Path(new Path(hiveHome), "scripts/" + verificationScript);
System.setProperty("expected.script", testDataPath.toString());
// verify that we can import it correctly into hive.
runImport(getArgv(true));
runImport(getArgv(true, extraArgs));
}
/** Test that strings and ints are handled in the normal fashion */
@ -102,7 +108,7 @@ private void runImportTest(String tableName, String [] types, String [] values,
public void testNormalHiveImport() throws IOException {
String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
String [] vals = { "'test'", "42", "'somestring'" };
runImportTest("NORMAL_HIVE_IMPORT", types, vals, "normalImport.q");
runImportTest("NORMAL_HIVE_IMPORT", types, vals, "normalImport.q", null);
}
/** Test that dates are coerced properly to strings */
@ -110,7 +116,7 @@ public void testNormalHiveImport() throws IOException {
public void testDate() throws IOException {
String [] types = { "VARCHAR(32)", "DATE" };
String [] vals = { "'test'", "'2009-05-12'" };
runImportTest("DATE_HIVE_IMPORT", types, vals, "dateImport.q");
runImportTest("DATE_HIVE_IMPORT", types, vals, "dateImport.q", null);
}
/** Test that NUMERICs are coerced to doubles */
@ -118,7 +124,7 @@ public void testDate() throws IOException {
public void testNumeric() throws IOException {
String [] types = { "NUMERIC", "CHAR(64)" };
String [] vals = { "3.14159", "'foo'" };
runImportTest("NUMERIC_HIVE_IMPORT", types, vals, "numericImport.q");
runImportTest("NUMERIC_HIVE_IMPORT", types, vals, "numericImport.q", null);
}
/** If bin/hive returns an error exit status, we should get an IOException */
@ -129,7 +135,7 @@ public void testHiveExitFails() {
String [] types = { "NUMERIC", "CHAR(64)" };
String [] vals = { "3.14159", "'foo'" };
try {
runImportTest("FAILING_HIVE_IMPORT", types, vals, "failingImport.q");
runImportTest("FAILING_HIVE_IMPORT", types, vals, "failingImport.q", null);
// If we get here, then the run succeeded -- which is incorrect.
fail("FAILING_HIVE_IMPORT test should have thrown IOException");
} catch (IOException ioe) {
@ -137,5 +143,14 @@ public void testHiveExitFails() {
}
}
/** Test that we can set delimiters how we want them */
@Test
public void testCustomDelimiters() throws IOException {
String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
String [] vals = { "'test'", "42", "'somestring'" };
String [] extraArgs = { "--fields-terminated-by", ",", "--lines-terminated-by", "|" };
runImportTest("CUSTOM_DELIM_IMPORT", types, vals, "customDelimImport.q", extraArgs);
}
}

View File

@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.lib;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
/**
* Test that the field formatter works in a variety of configurations
*/
public class TestFieldFormatter extends TestCase {
public void testAllEmpty() {
char [] chars = new char[0];
String result = FieldFormatter.escapeAndEnclose("", "", "", chars, false);
assertEquals("", result);
}
public void testNullArgs() {
String result = FieldFormatter.escapeAndEnclose("", null, null, null, false);
assertEquals("", result);
}
public void testBasicStr() {
String result = FieldFormatter.escapeAndEnclose("foo", null, null, null, false);
assertEquals("foo", result);
}
public void testEscapeSlash() {
String result = FieldFormatter.escapeAndEnclose("foo\\bar", "\\", "\"", null, false);
assertEquals("foo\\\\bar", result);
}
public void testMustEnclose() {
String result = FieldFormatter.escapeAndEnclose("foo", null, "\"", null, true);
assertEquals("\"foo\"", result);
}
public void testEncloseComma1() {
char [] chars = { ',' };
String result = FieldFormatter.escapeAndEnclose("foo,bar", "\\", "\"", chars, false);
assertEquals("\"foo,bar\"", result);
}
public void testEncloseComma2() {
char [] chars = { '\n', ',' };
String result = FieldFormatter.escapeAndEnclose("foo,bar", "\\", "\"", chars, false);
assertEquals("\"foo,bar\"", result);
}
public void testEncloseComma3() {
char [] chars = { ',', '\n' };
String result = FieldFormatter.escapeAndEnclose("foo,bar", "\\", "\"", chars, false);
assertEquals("\"foo,bar\"", result);
}
public void testNoNeedToEnclose() {
char [] chars = { ',', '\n' };
String result = FieldFormatter.escapeAndEnclose(
"just another string", "\\", "\"", chars, false);
assertEquals("just another string", result);
}
public void testCannotEnclose1() {
char [] chars = { ',', '\n' };
// can't enclose because encloser is ""
String result = FieldFormatter.escapeAndEnclose("foo,bar", "\\", "", chars, false);
assertEquals("foo,bar", result);
}
public void testCannotEnclose2() {
char [] chars = { ',', '\n' };
// can't enclose because encloser is null
String result = FieldFormatter.escapeAndEnclose("foo,bar", "\\", null, chars, false);
assertEquals("foo,bar", result);
}
public void testEmptyCharToEscapeString() {
// when the escape char is NUL ('\000'), string concatenation should preserve it.
char nul = '\000';
String s = "" + nul;
assertEquals("\000", s);
}
public void testEscapeCentralQuote() {
String result = FieldFormatter.escapeAndEnclose("foo\"bar", "\\", "\"", null, false);
assertEquals("foo\\\"bar", result);
}
public void testEscapeMultiCentralQuote() {
String result = FieldFormatter.escapeAndEnclose("foo\"\"bar", "\\", "\"", null, false);
assertEquals("foo\\\"\\\"bar", result);
}
public void testDoubleEscape() {
String result = FieldFormatter.escapeAndEnclose("foo\\\"bar", "\\", "\"", null, false);
assertEquals("foo\\\\\\\"bar", result);
}
public void testReverseEscape() {
String result = FieldFormatter.escapeAndEnclose("foo\"\\bar", "\\", "\"", null, false);
assertEquals("foo\\\"\\\\bar", result);
}
public void testQuotedEncloser() {
char [] chars = { ',', '\n' };
String result = FieldFormatter.escapeAndEnclose("foo\",bar", "\\", "\"", chars, false);
assertEquals("\"foo\\\",bar\"", result);
}
public void testQuotedEscape() {
char [] chars = { ',', '\n' };
String result = FieldFormatter.escapeAndEnclose("foo\\,bar", "\\", "\"", chars, false);
assertEquals("\"foo\\\\,bar\"", result);
}
}
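For reference, a behavioral sketch consistent with every case above; this illustrates the contract, not Sqoop's FieldFormatter source. Escape the escape character and the encloser wherever they occur, then enclose if enclosing is required or the raw value contains any character in encloseFor (and an encloser is actually available):

static String escapeAndEncloseSketch(String str, String escape, String enclose,
        char [] encloseFor, boolean encloseRequired) {
    String out = str;
    if (escape != null && escape.length() > 0) {
        out = out.replace(escape, escape + escape);        // double embedded escapes
        if (enclose != null && enclose.length() > 0) {
            out = out.replace(enclose, escape + enclose);  // escape embedded enclosers
        }
    }
    boolean canEnclose = enclose != null && enclose.length() > 0;
    boolean mustEnclose = encloseRequired;
    if (!mustEnclose && encloseFor != null) {
        for (char c : encloseFor) {
            if (str.indexOf(c) != -1) {
                mustEnclose = true;                        // value contains a delimiter
                break;
            }
        }
    }
    return (mustEnclose && canEnclose) ? enclose + out + enclose : out;
}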

View File

@ -0,0 +1,356 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.lib;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
/**
* Test that the record parser works in a variety of configurations.
*/
public class TestRecordParser extends TestCase {
private void assertListsEqual(String msg, List<String> expected, List<String> actual) {
if (expected == null && actual != null) {
if (null == msg) {
msg = "expected null list";
}
fail(msg);
} else if (expected != null && actual == null) {
if (null == msg) {
msg = "expected non-null list";
}
fail(msg);
}
if (expected == null && actual == null) {
return; // ok. Both null; nothing to do.
}
int expectedLen = expected.size();
int actualLen = actual.size();
if (expectedLen != actualLen) {
if (null == msg) {
msg = "Expected list of length " + expectedLen + "; got " + actualLen;
}
fail(msg);
}
// Check the list contents.
for (int i = 0; i < expectedLen; i++) {
String expectedElem = expected.get(i);
String actualElem = actual.get(i);
if (expectedElem == null) {
if (actualElem != null) {
if (null == msg) {
msg = "Expected null element at position " + i + "; got [" + actualElem + "]";
}
fail(msg);
}
continue; // both null; this element matches.
}
if (!expectedElem.equals(actualElem)) {
if (null == msg) {
msg = "Expected [" + expectedElem + "] at position " + i + "; got [" + actualElem + "]";
}
fail(msg);
}
}
}
private List<String> list(String [] items) {
if (null == items) {
return null;
}
ArrayList<String> asList = new ArrayList<String>();
for (int i = 0; i < items.length; i++) {
asList.add(items[i]);
}
return asList;
}
public void testEmptyLine() throws RecordParser.ParseError {
// an empty line should return no fields.
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { };
assertListsEqual(null, list(strings), parser.parseRecord(""));
}
public void testJustEOR() throws RecordParser.ParseError {
// a line with just a newline char should return a single zero-length field.
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "" };
assertListsEqual(null, list(strings), parser.parseRecord("\n"));
}
public void testOneField() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "the field" };
assertListsEqual(null, list(strings), parser.parseRecord("the field"));
}
public void testOneField2() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "the field" };
assertListsEqual(null, list(strings), parser.parseRecord("the field\n"));
}
public void testQuotedField1() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "the field" };
assertListsEqual(null, list(strings), parser.parseRecord("\"the field\"\n"));
}
public void testQuotedField2() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "the field" };
assertListsEqual(null, list(strings), parser.parseRecord("\"the field\""));
}
public void testQuotedField3() throws RecordParser.ParseError {
// quoted field containing the field delimiter (EOF = end-of-field)
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "the ,field" };
assertListsEqual(null, list(strings), parser.parseRecord("\"the ,field\""));
}
public void testQuotedField4() throws RecordParser.ParseError {
// quoted containing multiple EOFs
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "the ,,field" };
assertListsEqual(null, list(strings), parser.parseRecord("\"the ,,field\""));
}
public void testQuotedField5() throws RecordParser.ParseError {
// quoted containing EOF and EOR
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "the ,\nfield" };
assertListsEqual(null, list(strings), parser.parseRecord("\"the ,\nfield\""));
}
public void testQuotedField6() throws RecordParser.ParseError {
// quoted containing EOR
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "the \nfield" };
assertListsEqual(null, list(strings), parser.parseRecord("\"the \nfield\""));
}
public void testQuotedField7() throws RecordParser.ParseError {
// quoted containing multiple EORs
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "the \n\nfield" };
assertListsEqual(null, list(strings), parser.parseRecord("\"the \n\nfield\""));
}
public void testQuotedField8() throws RecordParser.ParseError {
// quoted containing escaped quoted char
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "the \"field" };
assertListsEqual(null, list(strings), parser.parseRecord("\"the \\\"field\""));
}
public void testUnquotedEscape1() throws RecordParser.ParseError {
// field without quotes with an escaped EOF char.
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "the ,field" };
assertListsEqual(null, list(strings), parser.parseRecord("the \\,field"));
}
public void testUnquotedEscape2() throws RecordParser.ParseError {
// field without quotes with an escaped escape char.
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "the \\field" };
assertListsEqual(null, list(strings), parser.parseRecord("the \\\\field"));
}
public void testTwoFields1() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "field1", "field2" };
assertListsEqual(null, list(strings), parser.parseRecord("field1,field2"));
}
public void testTwoFields2() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "field1", "field2" };
assertListsEqual(null, list(strings), parser.parseRecord("field1,field2\n"));
}
public void testTwoFields3() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "field1", "field2" };
assertListsEqual(null, list(strings), parser.parseRecord("\"field1\",field2\n"));
}
public void testTwoFields4() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "field1", "field2" };
assertListsEqual(null, list(strings), parser.parseRecord("field1,\"field2\"\n"));
}
public void testTwoFields5() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "field1", "field2" };
assertListsEqual(null, list(strings), parser.parseRecord("field1,\"field2\""));
}
public void testRequiredQuotes0() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', true);
String [] strings = { "field1", "field2" };
assertListsEqual(null, list(strings), parser.parseRecord("\"field1\",\"field2\"\n"));
}
public void testRequiredQuotes1() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', true);
String [] strings = { "field1", "field2" };
assertListsEqual(null, list(strings), parser.parseRecord("\"field1\",\"field2\""));
}
public void testRequiredQuotes2() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', true);
try {
parser.parseRecord("\"field1\",field2");
fail("Expected parse error for required quotes");
} catch (RecordParser.ParseError pe) {
// ok. expected.
}
}
public void testRequiredQuotes3() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', true);
try {
parser.parseRecord("field1,\"field2\"");
fail("Expected parse error for required quotes");
} catch (RecordParser.ParseError pe) {
// ok. expected.
}
}
public void testRequiredQuotes4() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', true);
try {
parser.parseRecord("field1,\"field2\"\n");
fail("Expected parse error for required quotes");
} catch (RecordParser.ParseError pe) {
// ok. expected.
}
}
public void testNull() {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', true);
String input = null;
try {
parser.parseRecord(input);
fail("Expected parse error for null string");
} catch (RecordParser.ParseError pe) {
// ok. expected.
}
}
public void testEmptyFields1() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "", ""};
assertListsEqual(null, list(strings), parser.parseRecord(","));
}
public void testEmptyFields2() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "", "" };
assertListsEqual(null, list(strings), parser.parseRecord(",\n"));
}
public void testEmptyFields3() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "", "", "" };
assertListsEqual(null, list(strings), parser.parseRecord(",,\n"));
}
public void testEmptyFields4() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "", "foo", "" };
assertListsEqual(null, list(strings), parser.parseRecord(",foo,\n"));
}
public void testEmptyFields5() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "", "foo", "" };
assertListsEqual(null, list(strings), parser.parseRecord(",foo,"));
}
public void testEmptyFields6() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "foo", "" };
assertListsEqual(null, list(strings), parser.parseRecord("foo,"));
}
public void testTrailingText() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "foo", "bar" };
assertListsEqual(null, list(strings), parser.parseRecord("foo,bar\nbaz"));
}
public void testTrailingText2() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "" };
assertListsEqual(null, list(strings), parser.parseRecord("\nbaz"));
}
public void testLeadingEscape() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
String [] strings = { "\nbaz" };
assertListsEqual(null, list(strings), parser.parseRecord("\\\nbaz"));
}
public void testEofIsEor() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', ',', '\"', '\\', false);
String [] strings = { "three", "different", "fields" };
assertListsEqual(null, list(strings), parser.parseRecord("three,different,fields"));
}
public void testEofIsEor2() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', ',', '\"', '\\', false);
String [] strings = { "three", "different", "fields" };
assertListsEqual(null, list(strings), parser.parseRecord("three,\"different\",fields"));
}
public void testRepeatedParse() throws RecordParser.ParseError {
RecordParser parser = new RecordParser(',', ',', '\"', '\\', false);
String [] strings = { "three", "different", "fields" };
assertListsEqual(null, list(strings), parser.parseRecord("three,\"different\",fields"));
String [] strings2 = { "foo", "bar" };
assertListsEqual(null, list(strings2), parser.parseRecord("foo,\"bar\""));
}
}
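In normal use the parser is constructed once with the five delimiter settings and reused across records, as testRepeatedParse() above verifies. A small sketch grounded in the cases above:

// (',' = field delim, '\n' = record delim, '"' = encloser,
//  '\\' = escape, false = enclosing is optional)
RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
List<String> fields = parser.parseRecord("1,\"Smith, Bob\",sales\n");
// fields => ["1", "Smith, Bob", "sales"]
fields = parser.parseRecord("2,Fred,marketing\n");  // same parser, next record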

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
import org.apache.hadoop.sqoop.util.FileListing;
/**
* Test the LocalMySQLManager implementation.
@ -180,13 +181,11 @@ private String getCurrentUser() {
}
}
private String [] getArgv(boolean includeHadoopFlags) {
private String [] getArgv(boolean mysqlOutputDelims) {
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
args.add("-D");
args.add("fs.default.name=file:///");
}
args.add("-D");
args.add("fs.default.name=file:///");
args.add("--table");
args.add(TABLE_NAME);
@ -200,12 +199,27 @@ private String getCurrentUser() {
args.add("--where");
args.add("id > 1");
if (mysqlOutputDelims) {
args.add("--mysql-delimiters");
}
return args.toArray(new String[0]);
}
@Test
public void testLocalBulkImport() {
String [] argv = getArgv(true);
private void doLocalBulkImport(boolean mysqlOutputDelims, String [] expectedResults)
throws IOException {
Path warehousePath = new Path(this.getWarehouseDir());
Path tablePath = new Path(warehousePath, TABLE_NAME);
Path filePath = new Path(tablePath, "data-00000");
File tableFile = new File(tablePath.toString());
if (tableFile.exists() && tableFile.isDirectory()) {
// remove the directory before running the import.
FileListing.recursiveDeleteDir(tableFile);
}
String [] argv = getArgv(mysqlOutputDelims);
try {
runImport(argv);
} catch (IOException ioe) {
@ -214,18 +228,15 @@ public void testLocalBulkImport() {
fail(ioe.toString());
}
Path warehousePath = new Path(this.getWarehouseDir());
Path tablePath = new Path(warehousePath, TABLE_NAME);
Path filePath = new Path(tablePath, "data-00000");
File f = new File(filePath.toString());
assertTrue("Could not find imported data file", f.exists());
BufferedReader r = null;
try {
// Read through the file and make sure it's all there.
r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
assertEquals("2,'Bob','2009-04-20',400,'sales'", r.readLine());
assertEquals("3,'Fred','2009-01-23',15,'marketing'", r.readLine());
for (String expectedLine : expectedResults) {
assertEquals(expectedLine, r.readLine());
}
} catch (IOException ioe) {
LOG.error("Got IOException verifying results: " + ioe.toString());
ioe.printStackTrace();
@ -234,4 +245,26 @@ public void testLocalBulkImport() {
IOUtils.closeStream(r);
}
}
@Test
public void testLocalBulkImportWithDefaultDelims() throws IOException {
// no quoting of strings allowed.
String [] expectedResults = {
"2,Bob,2009-04-20,400,sales",
"3,Fred,2009-01-23,15,marketing"
};
doLocalBulkImport(false, expectedResults);
}
@Test
public void testLocalBulkImportWithMysqlQuotes() throws IOException {
// mysql quotes all string-based output.
String [] expectedResults = {
"2,'Bob','2009-04-20',400,'sales'",
"3,'Fred','2009-01-23',15,'marketing'"
};
doLocalBulkImport(true, expectedResults);
}
}

View File

@ -149,6 +149,7 @@ public void tearDown() {
args.add(AUTH_TEST_USER);
args.add("--password");
args.add(AUTH_TEST_PASS);
args.add("--mysql-delimiters");
return args.toArray(new String[0]);
}

View File

@ -0,0 +1,186 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.orm;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.ImportOptions.InvalidOptionsException;
import org.apache.hadoop.sqoop.mapred.RawKeyTextOutputFormat;
import org.apache.hadoop.sqoop.orm.CompilationManager;
import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
import org.apache.hadoop.sqoop.testutil.ReparseMapper;
import org.apache.hadoop.sqoop.util.ClassLoaderStack;
/**
* Test that the parse() methods generated in user SqoopRecord implementations
* work.
*/
public class TestParseMethods extends ImportJobTestCase {
/**
* Create the argv to pass to Sqoop
* @return the argv as an array of strings.
*/
private String [] getArgv(boolean includeHadoopFlags, String fieldTerminator,
String lineTerminator, String encloser, String escape, boolean encloserRequired) {
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
args.add("-D");
args.add("mapred.job.tracker=local");
args.add("-D");
args.add("mapred.map.tasks=1");
args.add("-D");
args.add("fs.default.name=file:///");
}
args.add("--table");
args.add(getTableName());
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--as-textfile");
args.add("--order-by");
args.add("DATA_COL0"); // always order by first column.
args.add("--fields-terminated-by");
args.add(fieldTerminator);
args.add("--lines-terminated-by");
args.add(lineTerminator);
args.add("--escaped-by");
args.add(escape);
if (encloserRequired) {
args.add("--enclosed-by");
} else {
args.add("--optionally-enclosed-by");
}
args.add(encloser);
return args.toArray(new String[0]);
}
public void runParseTest(String fieldTerminator, String lineTerminator, String encloser,
String escape, boolean encloseRequired) throws IOException {
ClassLoader prevClassLoader = null;
String [] argv = getArgv(true, fieldTerminator, lineTerminator, encloser, escape,
encloseRequired);
runImport(argv);
try {
ImportOptions opts = new ImportOptions();
String tableClassName = getTableName();
opts.parse(getArgv(false, fieldTerminator, lineTerminator, encloser, escape,
encloseRequired));
CompilationManager compileMgr = new CompilationManager(opts);
String jarFileName = compileMgr.getJarFilename();
// make sure the user's class is loaded into our address space.
prevClassLoader = ClassLoaderStack.addJarFile(jarFileName, tableClassName);
JobConf job = new JobConf();
job.setJar(jarFileName);
// Tell the job what class we're testing.
job.set(ReparseMapper.USER_TYPE_NAME_KEY, tableClassName);
// use local mode in the same JVM.
job.set("mapred.job.tracker", "local");
job.set("fs.default.name", "file:///");
String warehouseDir = getWarehouseDir();
Path warehousePath = new Path(warehouseDir);
Path inputPath = new Path(warehousePath, getTableName());
Path outputPath = new Path(warehousePath, getTableName() + "-out");
job.setMapperClass(ReparseMapper.class);
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormat(RawKeyTextOutputFormat.class);
JobClient.runJob(job);
} catch (InvalidOptionsException ioe) {
fail(ioe.toString());
} finally {
if (null != prevClassLoader) {
ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
}
}
}
public void testDefaults() throws IOException {
String [] types = { "INTEGER", "VARCHAR(32)", "INTEGER" };
String [] vals = { "64", "'foo'", "128" };
createTableWithColTypes(types, vals);
runParseTest(",", "\\n", "\\\"", "\\", false);
}
public void testRequiredEnclose() throws IOException {
String [] types = { "INTEGER", "VARCHAR(32)", "INTEGER" };
String [] vals = { "64", "'foo'", "128" };
createTableWithColTypes(types, vals);
runParseTest(",", "\\n", "\\\"", "\\", true);
}
public void testStringEscapes() throws IOException {
String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" };
String [] vals = { "'foo'", "'foo,bar'", "'foo''bar'", "'foo\\bar'", "'foo,bar''baz'" };
createTableWithColTypes(types, vals);
runParseTest(",", "\\n", "\\\'", "\\", false);
}
public void testNumericTypes() throws IOException {
String [] types = { "INTEGER", "REAL", "FLOAT", "DATE", "TIME",
"TIMESTAMP", "NUMERIC", "BOOLEAN" };
String [] vals = { "42", "36.0", "127.1", "'2009-07-02'", "'11:24:00'",
"'2009-08-13 20:32:00.1234567'", "92104916282869291837672829102857271948687.287475322",
"true" };
createTableWithColTypes(types, vals);
runParseTest(",", "\\n", "\\\'", "\\", false);
}
}

View File

@ -372,10 +372,12 @@ protected void verifyImport(String expectedVal, String [] importCols) {
assertTrue("Error: " + dataFilePath.toString() + " does not exist", f.exists());
Object readValue = SeqFileReader.getFirstValue(dataFilePath.toString());
// add trailing '\n' to expected value since SqoopRecord.toString() encodes the record delim
if (null == expectedVal) {
assertEquals("Error validating result from SeqFile", "null", readValue.toString());
assertEquals("Error validating result from SeqFile", "null\n", readValue.toString());
} else {
assertEquals("Error validating result from SeqFile", expectedVal, readValue.toString());
assertEquals("Error validating result from SeqFile", expectedVal + "\n",
readValue.toString());
}
} catch (IOException ioe) {
fail("IOException: " + ioe.toString());

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.testutil;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.sqoop.lib.RecordParser;
import org.apache.hadoop.sqoop.lib.SqoopRecord;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Test harness mapper. Instantiate the user's specific type, parse() the input
* line of text, and throw an IOException if the output toString() line of text
* differs.
*/
public class ReparseMapper extends MapReduceBase
implements Mapper<LongWritable, Text, Text, NullWritable> {
public static final Log LOG = LogFactory.getLog(ReparseMapper.class.getName());
public static final String USER_TYPE_NAME_KEY = "sqoop.user.class";
private SqoopRecord userRecord;
public void configure(JobConf job) {
String userTypeName = job.get(USER_TYPE_NAME_KEY);
if (null == userTypeName) {
throw new RuntimeException("Unconfigured parameter: " + USER_TYPE_NAME_KEY);
}
LOG.info("User type name set to " + userTypeName);
this.userRecord = null;
try {
Configuration conf = new Configuration();
Class userClass = Class.forName(userTypeName, true,
Thread.currentThread().getContextClassLoader());
this.userRecord =
(SqoopRecord) ReflectionUtils.newInstance(userClass, conf);
} catch (ClassNotFoundException cnfe) {
// handled by the next block.
LOG.error("ClassNotFound exception: " + cnfe.toString());
} catch (Exception e) {
LOG.error("Got an exception reflecting user class: " + e.toString());
}
if (null == this.userRecord) {
LOG.error("Could not instantiate user record of type " + userTypeName);
throw new RuntimeException("Could not instantiate user record of type " + userTypeName);
}
}
public void map(LongWritable key, Text val, OutputCollector<Text, NullWritable> out, Reporter r)
throws IOException {
LOG.info("Mapper input line: " + val.toString());
try {
// Use the user's record class to parse the line back in.
userRecord.parse(val);
} catch (RecordParser.ParseError pe) {
LOG.error("Got parse error: " + pe.toString());
throw new IOException(pe);
}
LOG.info("Mapper output line: " + userRecord.toString());
out.collect(new Text(userRecord.toString()), NullWritable.get());
if (!userRecord.toString().equals(val.toString() + "\n")) {
// misparsed.
throw new IOException("Returned string has value [" + userRecord.toString() + "] when ["
+ val.toString() + "\n] was expected.");
}
}
}

View File

@ -0,0 +1,2 @@
CREATE TABLE CUSTOM_DELIM_IMPORT ( DATA_COL0 STRING, DATA_COL1 INT, DATA_COL2 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054' LINES TERMINATED BY '\0174' STORED AS TEXTFILE;
LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/CUSTOM_DELIM_IMPORT' INTO TABLE CUSTOM_DELIM_IMPORT;

View File

@ -1,2 +1,2 @@
CREATE TABLE DATE_HIVE_IMPORT ( DATA_COL0 STRING, DATA_COL1 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
CREATE TABLE DATE_HIVE_IMPORT ( DATA_COL0 STRING, DATA_COL1 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\01' LINES TERMINATED BY '\012' STORED AS TEXTFILE;
LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/DATE_HIVE_IMPORT' INTO TABLE DATE_HIVE_IMPORT;

View File

@ -1,2 +1,2 @@
CREATE TABLE DATE_HIVE_IMPORT ( DATA_COL0 STRING, DATA_COL1 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
CREATE TABLE DATE_HIVE_IMPORT ( DATA_COL0 STRING, DATA_COL1 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\01' LINES TERMINATED BY '\012' STORED AS TEXTFILE;
LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/DATE_HIVE_IMPORT' INTO TABLE DATE_HIVE_IMPORT;

View File

@ -1,2 +1,2 @@
CREATE TABLE NORMAL_HIVE_IMPORT ( DATA_COL0 STRING, DATA_COL1 INT, DATA_COL2 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
CREATE TABLE NORMAL_HIVE_IMPORT ( DATA_COL0 STRING, DATA_COL1 INT, DATA_COL2 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\01' LINES TERMINATED BY '\012' STORED AS TEXTFILE;
LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/NORMAL_HIVE_IMPORT' INTO TABLE NORMAL_HIVE_IMPORT;

View File

@ -1,2 +1,2 @@
CREATE TABLE NUMERIC_HIVE_IMPORT ( DATA_COL0 DOUBLE, DATA_COL1 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
CREATE TABLE NUMERIC_HIVE_IMPORT ( DATA_COL0 DOUBLE, DATA_COL1 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\01' LINES TERMINATED BY '\012' STORED AS TEXTFILE;
LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/NUMERIC_HIVE_IMPORT' INTO TABLE NUMERIC_HIVE_IMPORT;