Mirror of https://github.com/apache/sqoop.git
SQOOP-1579: Sqoop2: Data transfer to load into Hive does not work
(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
parent cb037c16a0, commit 418b9a70f8
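
The gist of the change: instead of splitting and joining raw lines on a field delimiter, the HDFS connector now moves records through Sqoop's CSV intermediate data format (SqoopIDFUtils), so quoting, escaping, and null handling survive transfers such as loading into Hive. The sketch below is illustrative only; it uses classes that appear in this diff, but the schema, the sample line, and the class name FromCsvSketch are assumptions, not part of the commit.

import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;

public class FromCsvSketch {
  public static void main(String[] args) {
    // Schema mirrors the column types used by the new tests; the sample line is made up.
    Schema schema = new Schema("example")
        .addColumn(new FixedPoint("id", 4L, true))
        .addColumn(new Text("name"));

    // The extractor now parses the Sqoop CSV form into typed fields before any
    // null-override formatting, instead of splitting the raw string on a delimiter.
    Object[] fields = SqoopIDFUtils.fromCSV("1,'Abe'", schema);
    System.out.println(fields[0] + " / " + fields[1]);
  }
}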
@@ -32,12 +32,14 @@
import org.apache.log4j.Logger;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.schema.Schema;

/**
* Extract from HDFS.
@@ -49,6 +51,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura

private Configuration conf;
private DataWriter dataWriter;
private Schema schema;
private long rowsRead = 0;

@Override
@@ -57,6 +60,7 @@ public void extract(ExtractorContext context, LinkConfiguration linkConfiguratio

conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration);
dataWriter = context.getDataWriter();
schema = context.getSchema();

try {
HdfsPartition p = partition;
@@ -112,7 +116,8 @@ private void extractSequenceFile(LinkConfiguration linkConfiguration,
while (hasNext) {
rowsRead++;
if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, line.toString()));
Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema);
dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, data));
} else {
dataWriter.writeStringRecord(line.toString());
}
@@ -179,7 +184,8 @@ private void extractTextFile(LinkConfiguration linkConfiguration,
}
rowsRead++;
if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, line.toString()));
Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema);
dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, data));
} else {
dataWriter.writeStringRecord(line.toString());
}
@@ -26,6 +26,7 @@
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
@@ -89,7 +90,10 @@ public void load(LoaderContext context, LinkConfiguration linkConfiguration,
Object[] record;

while ((record = reader.readArrayRecord()) != null) {
filewriter.write(HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record));
filewriter.write(
SqoopIDFUtils.toCSV(
HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record),
context.getSchema()));
rowsWritten++;
}
} else {
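
On the load side, the writer no longer receives a naively joined string; the record is first run through the null-override formatting and then CSV-encoded against the job schema. A minimal sketch of that write path, assuming an illustrative schema, record, and class name LoaderWriteSketch (only toCSV and formatRecord come from this diff):

import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.HdfsUtils;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;

public class LoaderWriteSketch {
  public static void main(String[] args) {
    Schema schema = new Schema("example")
        .addColumn(new FixedPoint("id", 8L, true))
        .addColumn(new Text("name"));

    LinkConfiguration linkConfiguration = new LinkConfiguration();
    ToJobConfiguration toJobConfig = new ToJobConfiguration();

    Object[] record = new Object[]{ 1, "Abe" };

    // What the loader now hands to filewriter.write(...): a schema-aware CSV line
    // with quoting and escaping applied, instead of a naive join on a delimiter.
    String line = SqoopIDFUtils.toCSV(
        HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record),
        schema);
    System.out.println(line);
  }
}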
@@ -17,7 +17,6 @@
*/
package org.apache.sqoop.connector.hdfs;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
@@ -53,8 +52,7 @@ public static Configuration configureURI(Configuration conf, LinkConfiguration l
* @return boolean
*/
public static boolean hasCustomFormat(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration) {
return fromJobConfiguration.fromJobConfig.overrideNullValue != null
&& fromJobConfiguration.fromJobConfig.overrideNullValue;
return Boolean.TRUE.equals(fromJobConfiguration.fromJobConfig.overrideNullValue);
}

/**
@@ -64,8 +62,7 @@ public static boolean hasCustomFormat(LinkConfiguration linkConfiguration, FromJ
* @return boolean
*/
public static boolean hasCustomFormat(LinkConfiguration linkConfiguration, ToJobConfiguration toJobConfiguration) {
return toJobConfiguration.toJobConfig.overrideNullValue != null
&& toJobConfiguration.toJobConfig.overrideNullValue;
return Boolean.TRUE.equals(toJobConfiguration.toJobConfig.overrideNullValue);
}

/**
@@ -73,24 +70,22 @@ public static boolean hasCustomFormat(LinkConfiguration linkConfiguration, ToJob
* format the record according to configuration.
* @param linkConfiguration Link configuration
* @param fromJobConfiguration Job configuration
* @param record String record
* @param record Object[] record
* @return Object[]
*/
public static Object[] formatRecord(LinkConfiguration linkConfiguration,
FromJobConfiguration fromJobConfiguration,
String record) {
Object[] arrayRecord = StringUtils.split(record, HdfsConstants.DEFAULT_FIELD_DELIMITER);

Object[] record) {
if (fromJobConfiguration.fromJobConfig.overrideNullValue != null
&& fromJobConfiguration.fromJobConfig.overrideNullValue) {
for (int i = 0; i < arrayRecord.length; ++i) {
if (arrayRecord[i].equals(fromJobConfiguration.fromJobConfig.nullValue)) {
arrayRecord[i] = null;
for (int i = 0; i < record.length; ++i) {
if (record[i] != null && record[i].equals(fromJobConfiguration.fromJobConfig.nullValue)) {
record[i] = null;
}
}
}

return arrayRecord;
return record;
}

/**
@@ -99,9 +94,9 @@ public static Object[] formatRecord(LinkConfiguration linkConfiguration,
* @param linkConfiguration Link configuration
* @param toJobConfiguration Job configuration
* @param record Record array
* @return String
* @return Object[]
*/
public static String formatRecord(LinkConfiguration linkConfiguration,
public static Object[] formatRecord(LinkConfiguration linkConfiguration,
ToJobConfiguration toJobConfiguration,
Object[] record) {
if (toJobConfiguration.toJobConfig.overrideNullValue != null
@@ -113,6 +108,6 @@ public static String formatRecord(LinkConfiguration linkConfiguration,
}
}

return StringUtils.join(record, HdfsConstants.DEFAULT_FIELD_DELIMITER);
return record;
}
}
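
The two formatRecord overloads are now symmetric over Object[]: the from-side rewrites the configured null token to a real null, and the to-side substitutes nulls with the token before the loader CSV-encodes the record (the TestLoader expectations later in this diff reflect that). A hedged sketch of the round trip; the "\\N" token, the sample records, and the class name NullOverrideSketch are assumptions used only for illustration.

import org.apache.sqoop.connector.hdfs.HdfsUtils;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;

public class NullOverrideSketch {
  public static void main(String[] args) {
    LinkConfiguration link = new LinkConfiguration();

    FromJobConfiguration from = new FromJobConfiguration();
    from.fromJobConfig.overrideNullValue = true;
    from.fromJobConfig.nullValue = "\\N";

    ToJobConfiguration to = new ToJobConfiguration();
    to.toJobConfig.overrideNullValue = true;
    to.toJobConfig.nullValue = "\\N";

    // From side: the configured token in an already-parsed record becomes a real null.
    Object[] parsed = HdfsUtils.formatRecord(link, from, new Object[]{ "1", "\\N" });
    System.out.println(parsed[1] == null);   // expected: true

    // To side: nulls are replaced with the configured token before the loader
    // CSV-encodes the record (behavior inferred from the TestLoader assertions).
    Object[] emitted = HdfsUtils.formatRecord(link, to, new Object[]{ "1", null });
    System.out.println(emitted[1]);
  }
}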
@@ -25,13 +25,10 @@
import org.apache.sqoop.validation.validators.NotEmpty;

/**
*
*/
@ConfigClass(validators = { @Validator(ToJobConfig.ToJobConfigValidator.class)})
public class ToJobConfig {

public static String DEFAULT_NULL_VALUE = "NULL";

@Input(size = 255) public Boolean overrideNullValue;

@Input(size = 255) public String nullValue;
@@ -72,4 +72,4 @@ fromJobConfig.overrideNullValue.help = If set to true, then the null value will

fromJobConfig.nullValue.label = Null value
fromJobConfig.nullValue.help = Use this particular character or sequence of characters \
as a value representing null when outputting to a file.
as a value representing null when outputting to a file.
@@ -23,7 +23,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
@@ -37,6 +36,11 @@
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;
import org.testng.ITest;
import org.testng.annotations.AfterMethod;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
@@ -80,11 +84,11 @@ public void setUp() throws Exception {
FileUtils.mkdirs(inputDirectory);
switch (this.outputFileType) {
case TEXT_FILE:
createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N");
createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\\\N");
break;

case SEQUENCE_FILE:
createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N");
createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\\\N");
break;
}
}
@@ -99,6 +103,11 @@ public void testExtractor() throws Exception {
Configuration conf = new Configuration();
PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
final boolean[] visited = new boolean[NUMBER_OF_FILES * NUMBER_OF_ROWS_PER_FILE];
Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 4L, true))
.addColumn(new FloatingPoint("col2", 4L))
.addColumn(new Text("col3"))
.addColumn(new Text("col4"))
.addColumn(new Text("col5"));
ExtractorContext context = new ExtractorContext(prefixContext, new DataWriter() {
@Override
public void writeArrayRecord(Object[] array) {
@@ -123,7 +132,7 @@ public void writeStringRecord(String text) {
Assert.assertEquals(String.valueOf((double) index), components[1]);
Assert.assertEquals("NULL", components[2]);
Assert.assertEquals("'" + index + "'", components[3]);
Assert.assertEquals("\\N", components[4]);
Assert.assertEquals("\\\\N", components[4]);

visited[index - 1] = true;
}
@@ -132,7 +141,7 @@ public void writeStringRecord(String text) {
public void writeRecord(Object obj) {
throw new AssertionError("Should not be writing object.");
}
}, null);
}, schema);

LinkConfiguration emptyLinkConfig = new LinkConfiguration();
FromJobConfiguration emptyJobConfig = new FromJobConfiguration();
@@ -150,6 +159,11 @@ public void testOverrideNull() throws Exception {
Configuration conf = new Configuration();
PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
final boolean[] visited = new boolean[NUMBER_OF_FILES * NUMBER_OF_ROWS_PER_FILE];
Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 4L, true))
.addColumn(new FloatingPoint("col2", 4L))
.addColumn(new Text("col3"))
.addColumn(new Text("col4"))
.addColumn(new Text("col5"));
ExtractorContext context = new ExtractorContext(prefixContext, new DataWriter() {
@Override
public void writeArrayRecord(Object[] array) {
@@ -165,9 +179,9 @@ public void writeArrayRecord(Object[] array) {
}

Assert.assertFalse(visited[index - 1]);
Assert.assertEquals(String.valueOf((double) index), array[1]);
Assert.assertEquals("NULL", array[2]);
Assert.assertEquals("'" + index + "'", array[3]);
Assert.assertEquals(String.valueOf((double) index), array[1].toString());
Assert.assertEquals(null, array[2]);
Assert.assertEquals(String.valueOf(index), array[3]);
Assert.assertNull(array[4]);

visited[index - 1] = true;
@@ -182,7 +196,7 @@ public void writeStringRecord(String text) {
public void writeRecord(Object obj) {
throw new AssertionError("Should not be writing object.");
}
}, null);
}, schema);

LinkConfiguration emptyLinkConfig = new LinkConfiguration();
FromJobConfiguration fromJobConfiguration = new FromJobConfiguration();
@@ -75,15 +75,19 @@ public void testTransformRecord() throws Exception {
LinkConfiguration linkConfiguration = new LinkConfiguration();
FromJobConfiguration fromJobConfiguration = new FromJobConfiguration();
ToJobConfiguration toJobConfiguration = new ToJobConfiguration();
final String record = "'Abe',\0,'test'";
final Object[] arrayRecord = new Object[]{
final Object[] fromRecord = new Object[]{
"'Abe'",
"\0",
"'test'"
};
final Object[] toRecord = new Object[]{
"'Abe'",
"\0",
"'test'"
};

// No transformations
assertArrayEquals(arrayRecord, HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, record));
assertEquals(record, HdfsUtils.formatRecord(linkConfiguration, toJobConfiguration, arrayRecord));
assertArrayEquals(toRecord, HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, fromRecord));
assertArrayEquals(fromRecord, HdfsUtils.formatRecord(linkConfiguration, toJobConfiguration, toRecord));
}
}
@@ -24,7 +24,6 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
@@ -33,7 +32,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.sqoop.common.PrefixContext;
@@ -44,6 +42,10 @@
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;
import org.testng.annotations.AfterMethod;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
@@ -96,6 +98,9 @@ public void tearDown() throws IOException {
@Test
public void testLoader() throws Exception {
FileSystem fs = FileSystem.get(new Configuration());
Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
.addColumn(new FloatingPoint("col2", 4L))
.addColumn(new Text("col3"));

Configuration conf = new Configuration();
PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
@@ -146,6 +151,10 @@ public Object readContent() {
@Test
public void testOverrideNull() throws Exception {
FileSystem fs = FileSystem.get(new Configuration());
Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
.addColumn(new FloatingPoint("col2", 8L))
.addColumn(new Text("col3"))
.addColumn(new Text("col4"));

Configuration conf = new Configuration();
PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
@@ -159,7 +168,7 @@ public Object[] readArrayRecord() {
index,
(double)index,
null,
"'" + index + "'"
String.valueOf(index)
};
} else {
return null;
@@ -175,7 +184,7 @@ public String readTextRecord() {
public Object readContent() {
throw new AssertionError("should not be at readContent");
}
}, null);
}, schema);
LinkConfiguration linkConf = new LinkConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
jobConf.toJobConfig.outputDirectory = outputDirectory;
@@ -189,7 +198,7 @@ public Object readContent() {
Assert.assertEquals(1, fs.listStatus(outputPath).length);

for (FileStatus status : fs.listStatus(outputPath)) {
verifyOutput(fs, status.getPath(), "%d,%f,\\N,%s");
verifyOutput(fs, status.getPath(), "%d,%f,'\\\\N',%s");
}

loader.load(context, linkConf, jobConf);
@@ -238,7 +247,7 @@ private void verifyOutput(FileSystem fs, Path file, String format) throws IOExce
BufferedReader textReader = new BufferedReader(in);

for (int i = 1; i <= NUMBER_OF_ROWS_PER_FILE; ++i) {
Assert.assertEquals(formatRow(format, i), textReader.readLine());
Assert.assertEquals(textReader.readLine(), formatRow(format, i));
}
break;

@@ -262,11 +271,11 @@ private void verifyOutput(FileSystem fs, Path file, String format) throws IOExce
break;
}

Text line = new Text();
org.apache.hadoop.io.Text line = new org.apache.hadoop.io.Text();
int index = 1;
while (sequenceReader.next(line)) {
Assert.assertEquals(formatRow(format, index++), line.toString());
line = new Text();
Assert.assertEquals(line.toString(), formatRow(format, index++));
line = new org.apache.hadoop.io.Text();
}
break;
}
@@ -22,6 +22,7 @@
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.error.code.CSVIntermediateDataFormatError;
import org.apache.sqoop.error.code.IntermediateDataFormatError;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.AbstractComplexListType;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.ColumnType;
@@ -47,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;

/**
@@ -66,17 +68,29 @@ public class SqoopIDFUtils {
// implementation.
public static final String BYTE_FIELD_CHARSET = "ISO-8859-1";

public static final char[] originals = { 0x5C, 0x00, 0x0A, 0x0D, 0x1A, 0x22, 0x27 };
public static final Map<Character, String> ORIGINALS = new TreeMap<Character, String>();

public static final char CSV_SEPARATOR_CHARACTER = ',';
public static final char ESCAPE_CHARACTER = '\\';
public static final char QUOTE_CHARACTER = '\'';

// string related replacements
private static final String[] replacements = { new String(new char[] { ESCAPE_CHARACTER, '\\' }),
new String(new char[] { ESCAPE_CHARACTER, '0' }), new String(new char[] { ESCAPE_CHARACTER, 'n' }),
new String(new char[] { ESCAPE_CHARACTER, 'r' }), new String(new char[] { ESCAPE_CHARACTER, 'Z' }),
new String(new char[] { ESCAPE_CHARACTER, '\"' }), new String(new char[] { ESCAPE_CHARACTER, '\'' }) };
private static final Map<Character, Character> REPLACEMENTS = new TreeMap<Character, Character>();

static {
ORIGINALS.put(new Character((char)0x00), new String(new char[] { ESCAPE_CHARACTER, '0' }));
ORIGINALS.put(new Character((char)0x0A), new String(new char[] { ESCAPE_CHARACTER, 'n' }));
ORIGINALS.put(new Character((char)0x0D), new String(new char[] { ESCAPE_CHARACTER, 'r' }));
ORIGINALS.put(new Character((char)0x1A), new String(new char[] { ESCAPE_CHARACTER, 'Z' }));
ORIGINALS.put(new Character((char)0x22), new String(new char[] { ESCAPE_CHARACTER, '"' }));
ORIGINALS.put(new Character((char)0x27), new String(new char[] { ESCAPE_CHARACTER, '\'' }));

REPLACEMENTS.put('0', new Character((char)0x00));
REPLACEMENTS.put('n', new Character((char)0x0A));
REPLACEMENTS.put('r', new Character((char)0x0D));
REPLACEMENTS.put('Z', new Character((char)0x1A));
REPLACEMENTS.put('"', new Character((char)0x22));
REPLACEMENTS.put('\'', new Character((char)0x27));
}

// http://www.joda.org/joda-time/key_format.html provides details on the
// formatter token
@@ -420,42 +434,66 @@ public static Object[] toObjectArray(List<Object> list) {

// ************ TEXT Column Type utils*********

private static String getRegExp(char character) {
return getRegExp(String.valueOf(character));
}

private static String getRegExp(String string) {
return string.replaceAll("\\\\", Matcher.quoteReplacement("\\\\"));
}

public static String toCSVString(String string) {
int j = 0;
String replacement = string;
try {
for (j = 0; j < replacements.length; j++) {
replacement = replacement.replaceAll(getRegExp(originals[j]), Matcher.quoteReplacement(replacements[j]));
StringBuilder sb1 = new StringBuilder();
StringBuilder sb2 = new StringBuilder();

// Escape the escape character
for (int i = 0; i < string.length(); ++i) {
char c = string.charAt(i);
if (c == ESCAPE_CHARACTER) {
sb1.append(ESCAPE_CHARACTER);
}
} catch (Exception e) {
throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + " " + replacement
+ " " + String.valueOf(j) + " " + e.getMessage());

sb1.append(c);
}
return encloseWithQuotes(replacement);

// Encode characters
for (char c : sb1.toString().toCharArray()) {
if (ORIGINALS.containsKey(c)) {
sb2.append(ORIGINALS.get(c));
} else {
sb2.append(c);
}
}

return encloseWithQuotes(sb2.toString());
}

public static String toText(String string) {
boolean escaped = false;
StringBuilder sb = new StringBuilder();
int i;

// Remove the trailing and starting quotes.
string = removeQuotes(string);
int j = 0;
try {
for (j = 0; j < replacements.length; j++) {
string = string.replaceAll(getRegExp(replacements[j]), Matcher.quoteReplacement(String.valueOf(originals[j])));

// Decode
for (i = 0; i < string.length(); ++i) {
char c = string.charAt(i);

if (escaped) {
escaped = false;

if (REPLACEMENTS.containsKey(c)) {
c = REPLACEMENTS.get(c);
}

sb.append(c);
} else {
switch(c) {
case ESCAPE_CHARACTER:
escaped = true;
break;

default:
sb.append(c);
break;
}
}
} catch (Exception e) {
throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, string + " "
+ String.valueOf(j) + e.getMessage());
}

return string;
return sb.toString();
}

// ************ BINARY Column type utils*********
@@ -510,6 +548,85 @@ public static boolean isColumnStringType(Column stringType) {
}

// ******* parse sqoop CSV ********

/**
* Encode to the sqoop prescribed CSV String for every element in the object
* array
*
* @param objectArray
*/
@SuppressWarnings("unchecked")
public static String toCSV(Object[] objectArray, Schema schema) {
Column[] columns = schema.getColumnsArray();

StringBuilder csvString = new StringBuilder();
for (int i = 0; i < columns.length; i++) {
if (objectArray[i] == null && !columns[i].isNullable()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
columns[i].getName() + " does not support null values");
}
if (objectArray[i] == null) {
csvString.append(NULL_VALUE);
} else {
switch (columns[i].getType()) {
case ARRAY:
case SET:
csvString.append(toCSVList((Object[]) objectArray[i], (AbstractComplexListType) columns[i]));
break;
case MAP:
csvString.append(toCSVMap((Map<Object, Object>) objectArray[i], columns[i]));
break;
case ENUM:
case TEXT:
csvString.append(toCSVString(objectArray[i].toString()));
break;
case BINARY:
case UNKNOWN:
csvString.append(toCSVByteArray((byte[]) objectArray[i]));
break;
case FIXED_POINT:
csvString.append(toCSVFixedPoint(objectArray[i], columns[i]));
break;
case FLOATING_POINT:
csvString.append(toCSVFloatingPoint(objectArray[i], columns[i]));
break;
case DECIMAL:
csvString.append(toCSVDecimal(objectArray[i]));
break;
// stored in JSON as strings in the joda time format
case DATE:
csvString.append(toCSVDate(objectArray[i]));
break;
case TIME:
csvString.append(toCSVTime(objectArray[i], columns[i]));
break;
case DATE_TIME:
if (objectArray[i] instanceof org.joda.time.DateTime) {
org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i];
// check for fraction and time zone and then use the right formatter
csvString.append(toCSVDateTime(dateTime, columns[i]));
} else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i];
csvString.append(toCSVLocalDateTime(localDateTime, columns[i]));
}
break;
case BIT:
csvString.append(toCSVBit(objectArray[i]));
break;
default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
"Column type from schema was not recognized for " + columns[i].getType());
}
}
if (i < columns.length - 1) {
csvString.append(CSV_SEPARATOR_CHARACTER);
}

}

return csvString.toString();
}

/**
* Custom CSV Text parser that honors quoting and escaped quotes.
*
@@ -561,4 +678,87 @@ public static String[] parseCSVString(String csvText) {
return parsedData.toArray(new String[parsedData.size()]);
}

private static Object toObject(String csvString, Column column) {
Object returnValue = null;

switch (column.getType()) {
case ENUM:
case TEXT:
returnValue = toText(csvString);
break;
case BINARY:
// Unknown is treated as a binary type
case UNKNOWN:
returnValue = toByteArray(csvString);
break;
case FIXED_POINT:
returnValue = toFixedPoint(csvString, column);
break;
case FLOATING_POINT:
returnValue = toFloatingPoint(csvString, column);
break;
case DECIMAL:
returnValue = toDecimal(csvString, column);
break;
case DATE:
returnValue = toDate(csvString, column);
break;
case TIME:
returnValue = toTime(csvString, column);
break;
case DATE_TIME:
returnValue = toDateTime(csvString, column);
break;
case BIT:
returnValue = toBit(csvString);
break;
case ARRAY:
case SET:
returnValue = toList(csvString);
break;
case MAP:
returnValue = toMap(csvString);
break;
default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
"Column type from schema was not recognized for " + column.getType());
}
return returnValue;
}

/**
* Parse CSV text data
* @param csvText csv text to parse
* @param schema schema to understand data
* @return Object[]
*/
public static Object[] fromCSV(String csvText, Schema schema) {
String[] csvArray = parseCSVString(csvText);

if (csvArray == null) {
return null;
}

Column[] columns = schema.getColumnsArray();

if (csvArray.length != columns.length) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
"The data " + csvArray + " has the wrong number of fields.");
}

Object[] objectArray = new Object[csvArray.length];
for (int i = 0; i < csvArray.length; i++) {
if (csvArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
columns[i].getName() + " does not support null values");
}
if (csvArray[i].equals(NULL_VALUE)) {
objectArray[i] = null;
continue;
}
objectArray[i] = toObject(csvArray[i], columns[i]);
}

return objectArray;
}
}
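
toCSVString and toText now walk the string character by character: the escape character is doubled first, the special characters (NUL, newline, carriage return, ctrl-Z, quotes) are encoded via the ORIGINALS map, and toText reverses the process through REPLACEMENTS. A small round-trip sketch; the sample string and the class name EscapingSketch are assumptions, and the expected literals follow the testEscaping cases added later in this commit.

import org.apache.sqoop.connector.common.SqoopIDFUtils;

public class EscapingSketch {
  public static void main(String[] args) {
    // A newline inside the value is encoded as the two characters backslash-n
    // and the whole field is enclosed in single quotes.
    String csv = SqoopIDFUtils.toCSVString("line1\nline2");
    String back = SqoopIDFUtils.toText(csv);
    System.out.println(csv);                          // 'line1\nline2'
    System.out.println(back.equals("line1\nline2"));  // expected: true
  }
}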
@@ -18,21 +18,15 @@
*/
package org.apache.sqoop.connector.idf;

import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;

import org.apache.sqoop.classification.InterfaceAudience;
import org.apache.sqoop.classification.InterfaceStability;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.error.code.IntermediateDataFormatError;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.AbstractComplexListType;
import org.apache.sqoop.schema.type.Column;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.Set;

/**
@@ -80,79 +74,7 @@ public void setCSVTextData(String csvText) {
@Override
public Object[] getObjectData() {
super.validateSchema(schema);
String[] csvStringArray = parseCSVString(this.data);

if (csvStringArray == null) {
return null;
}
Column[] columns = schema.getColumnsArray();

if (csvStringArray.length != columns.length) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
"The data " + getCSVTextData() + " has the wrong number of fields.");
}

Object[] objectArray = new Object[csvStringArray.length];
for (int i = 0; i < csvStringArray.length; i++) {
if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
columns[i].getName() + " does not support null values");
}
if (csvStringArray[i].equals(NULL_VALUE)) {
objectArray[i] = null;
continue;
}
objectArray[i] = toObject(csvStringArray[i], columns[i]);
}
return objectArray;
}

private Object toObject(String csvString, Column column) {
Object returnValue = null;

switch (column.getType()) {
case ENUM:
case TEXT:
returnValue = toText(csvString);
break;
case BINARY:
// Unknown is treated as a binary type
case UNKNOWN:
returnValue = toByteArray(csvString);
break;
case FIXED_POINT:
returnValue = toFixedPoint(csvString, column);
break;
case FLOATING_POINT:
returnValue = toFloatingPoint(csvString, column);
break;
case DECIMAL:
returnValue = toDecimal(csvString, column);
break;
case DATE:
returnValue = toDate(csvString, column);
break;
case TIME:
returnValue = toTime(csvString, column);
break;
case DATE_TIME:
returnValue = toDateTime(csvString, column);
break;
case BIT:
returnValue = toBit(csvString);
break;
case ARRAY:
case SET:
returnValue = toList(csvString);
break;
case MAP:
returnValue = toMap(csvString);
break;
default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
"Column type from schema was not recognized for " + column.getType());
}
return returnValue;
return SqoopIDFUtils.fromCSV(data, schema);
}

/**
@@ -191,75 +113,7 @@ public void read(DataInput in) throws IOException {
*/
@SuppressWarnings("unchecked")
private String toCSV(Object[] objectArray) {

Column[] columns = schema.getColumnsArray();

StringBuilder csvString = new StringBuilder();
for (int i = 0; i < columns.length; i++) {
if (objectArray[i] == null && !columns[i].isNullable()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
columns[i].getName() + " does not support null values");
}
if (objectArray[i] == null) {
csvString.append(NULL_VALUE);
} else {
switch (columns[i].getType()) {
case ARRAY:
case SET:
csvString.append(toCSVList((Object[]) objectArray[i], (AbstractComplexListType) columns[i]));
break;
case MAP:
csvString.append(toCSVMap((Map<Object, Object>) objectArray[i], columns[i]));
break;
case ENUM:
case TEXT:
csvString.append(toCSVString(objectArray[i].toString()));
break;
case BINARY:
case UNKNOWN:
csvString.append(toCSVByteArray((byte[]) objectArray[i]));
break;
case FIXED_POINT:
csvString.append(toCSVFixedPoint(objectArray[i], columns[i]));
break;
case FLOATING_POINT:
csvString.append(toCSVFloatingPoint(objectArray[i], columns[i]));
break;
case DECIMAL:
csvString.append(toCSVDecimal(objectArray[i]));
break;
// stored in JSON as strings in the joda time format
case DATE:
csvString.append(toCSVDate(objectArray[i]));
break;
case TIME:
csvString.append(toCSVTime(objectArray[i], columns[i]));
break;
case DATE_TIME:
if (objectArray[i] instanceof org.joda.time.DateTime) {
org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i];
// check for fraction and time zone and then use the right formatter
csvString.append(toCSVDateTime(dateTime, columns[i]));
} else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i];
csvString.append(toCSVLocalDateTime(localDateTime, columns[i]));
}
break;
case BIT:
csvString.append(toCSVBit(objectArray[i]));
break;
default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
"Column type from schema was not recognized for " + columns[i].getType());
}
}
if (i < columns.length - 1) {
csvString.append(CSV_SEPARATOR_CHARACTER);
}

}

return csvString.toString();
return SqoopIDFUtils.toCSV(objectArray, schema);
}

/**
@@ -320,4 +320,41 @@ public void testToDecimaPointNoScaleNoPrecisionReturnsDecimal() {
assertEquals("23.44444444", toCSVDecimal(bd));
}

@Test
public void testEscaping() {
String[][] testData = new String[][]{
{"\0", "'\\0'"},
{"\\0", "'\\\\0'"},
{"\\\0", "'\\\\\\0'"},
{"\\\\0", "'\\\\\\\\0'"},
{"\n", "'\\n'"},
{"\\n", "'\\\\n'"},
{"\\\n", "'\\\\\\n'"},
{"\\\\n", "'\\\\\\\\n'"},
{"\r", "'\\r'"},
{"\\r", "'\\\\r'"},
{"\\\r", "'\\\\\\r'"},
{"\\\\r", "'\\\\\\\\r'"},
{Character.toString((char)0x1A), "'\\Z'"},
{"\\Z", "'\\\\Z'"},
{"\\" + Character.toString((char)0x1A), "'\\\\\\Z'"},
{"\\\\Z", "'\\\\\\\\Z'"},
{"\"", "'\\\"'"},
{"\\\"", "'\\\\\\\"'"},
{"\\\\\"", "'\\\\\\\\\\\"'"},
{"\\\\\\\"", "'\\\\\\\\\\\\\\\"'"},
{"'", "'\\''"},
{"\\'", "'\\\\\\''"},
{"\\\\'", "'\\\\\\\\\\''"},
{"\\\\\\'", "'\\\\\\\\\\\\\\''"}
};

for (String[] testDatum : testData) {
String csvData = SqoopIDFUtils.toCSVString(testDatum[0]);

assertEquals(csvData, testDatum[1]);

assertEquals(SqoopIDFUtils.toText(csvData), testDatum[0]);
}
}
}
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.test.data;

import org.apache.sqoop.common.test.db.DatabaseProvider;

/**
* Releases of Ubuntu Linux.
*
* Purpose of this set is to cover most common data types (varchar, int, numeric, date, boolean).
*/
public class ShortStories extends DataSet {

public ShortStories(DatabaseProvider provider, String tableBaseName) {
super(provider, tableBaseName);
}

@Override
public DataSet createTables() {
provider.createTable(
tableBaseName,
"id",
"id", "int",
"name", "varchar(64)",
"story", "varchar(10000)"
);

return this;
}

@Override
public DataSet loadBasicData() {
provider.insertRow(tableBaseName, 1, "The Gift of the Magi",
"ONE DOLLAR AND EIGHTY-SEVEN CENTS. THAT WAS ALL. AND SIXTY CENTS of it was in pennies. Pennies saved one and two at a time by bulldozing the grocer and the vegetable man and the butcher until ones cheeks burned with the silent imputation of parsimony that such close dealing implied. Three times Della counted it. One dollar and eighty-seven cents. And the next day would be Christmas.\n" +
"\n" +
"There was clearly nothing left to do but flop down on the shabby little couch and howl. So Della did it. Which instigates the moral reflection that life is made up of sobs, sniffles, and smiles, with sniffles predominating.");
provider.insertRow(tableBaseName, 2, "The Little Match Girl",
"Most terribly cold it was; it snowed, and was nearly quite dark, and evening-- the last evening of the year. In this cold and darkness there went along the street a poor little girl, bareheaded, and with naked feet. When she left home she had slippers on, it is true; but what was the good of that? They were very large slippers, which her mother had hitherto worn; so large were they; and the poor little thing lost them as she scuffled away across the street, because of two carriages that rolled by dreadfully fast.");
provider.insertRow(tableBaseName, 3, "To Build a Fire",
"Day had broken cold and grey, exceedingly cold and grey, when the man turned aside from the main Yukon trail and climbed the high earth- bank, where a dim and little-travelled trail led eastward through the fat spruce timberland. It was a steep bank, and he paused for breath at the top, excusing the act to himself by looking at his watch. It was nine oclock. There was no sun nor hint of sun, though there was not a cloud in the sky. It was a clear day, and yet there seemed an intangible pall over the face of things, a subtle gloom that made the day dark, and that was due to the absence of sun. This fact did not worry the man. He was used to the lack of sun. It had been days since he had seen the sun, and he knew that a few more days must pass before that cheerful orb, due south, would just peep above the sky- line and dip immediately from view.");

return this;
}
}
@@ -37,6 +37,7 @@
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.test.data.Cities;
import org.apache.sqoop.test.data.ShortStories;
import org.apache.sqoop.test.data.UbuntuReleases;
import org.apache.sqoop.test.hadoop.HadoopMiniClusterRunner;
import org.apache.sqoop.test.hadoop.HadoopRunnerFactory;
@@ -197,6 +198,20 @@ protected void createAndLoadTableUbuntuReleases() {
new UbuntuReleases(provider, getTableName()).createTables().loadBasicData();
}

/**
* Create table for short stories.
*/
protected void createTableShortStories() {
new ShortStories(provider, getTableName()).createTables();
}

/**
* Create table for short stories.
*/
protected void createAndLoadTableShortStories() {
new ShortStories(provider, getTableName()).createTables().loadBasicData();
}

/**
* Assert row in testing table.
*
@@ -23,6 +23,9 @@
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MConfigList;
import org.apache.sqoop.model.MJob;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.testng.AssertJUnit.assertEquals;
@@ -31,15 +34,23 @@
*
*/
public class FromHDFSToRDBMSTest extends ConnectorTestCase {
@BeforeMethod(alwaysRun = true)
public void createTable() {
createTableCities();
}

@AfterMethod(alwaysRun = true)
public void dropTable() {
super.dropTable();
}

@Test
public void testBasic() throws Exception {
createTableCities();
createFromFile("input-0001",
"1,'USA','2004-10-23','San Francisco'",
"2,'USA','2004-10-24','Sunnyvale'",
"3,'Czech Republic','2004-10-25','Brno'",
"4,'USA','2004-10-26','Palo Alto'"
"1,'USA','2004-10-23','San Francisco'",
"2,'USA','2004-10-24','Sunnyvale'",
"3,'Czech Republic','2004-10-25','Brno'",
"4,'USA','2004-10-26','Palo Alto'"
);

// RDBMS link
@@ -68,14 +79,10 @@ public void testBasic() throws Exception {

executeJob(job);

assertEquals(4L, provider.rowCount(null, getTableName()));
assertEquals(4L, provider.rowCount(getTableName()));
assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");

// Clean up testing table
dropTable();
}

}
@@ -35,7 +35,7 @@
public class FromRDBMSToHDFSTest extends ConnectorTestCase {

@Test
public void testBasic() throws Exception {
public void testCities() throws Exception {
createAndLoadTableCities();

// RDBMS link
@@ -77,6 +77,51 @@ public void testBasic() throws Exception {
dropTable();
}

@Test
public void testStories() throws Exception {
createAndLoadTableShortStories();

// RDBMS link
MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
fillRdbmsLinkConfig(rdbmsLink);
saveLink(rdbmsLink);

// HDFS link
MLink hdfsLink = getClient().createLink("hdfs-connector");
saveLink(hdfsLink);

// Job creation
MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());

// Connector values
MConfigList configs = job.getJobConfig(Direction.FROM);
configs.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName()));
configs.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
configs.getStringInput("fromJobConfig.columns").setValue(provider.escapeColumnName("id") + "," + provider.escapeColumnName("name") + "," + provider.escapeColumnName("story"));
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
saveJob(job);

MSubmission submission = getClient().startJob(job.getPersistenceId());
assertTrue(submission.getStatus().isRunning());

// Wait until the job finish - this active waiting will be removed once
// Sqoop client API will get blocking support.
do {
Thread.sleep(5000);
submission = getClient().getJobStatus(job.getPersistenceId());
} while(submission.getStatus().isRunning());

// Assert correct output
assertTo(
"1,'The Gift of the Magi','ONE DOLLAR AND EIGHTY-SEVEN CENTS. THAT WAS ALL. AND SIXTY CENTS of it was in pennies. Pennies saved one and two at a time by bulldozing the grocer and the vegetable man and the butcher until ones cheeks burned with the silent imputation of parsimony that such close dealing implied. Three times Della counted it. One dollar and eighty-seven cents. And the next day would be Christmas.\\n\\nThere was clearly nothing left to do but flop down on the shabby little couch and howl. So Della did it. Which instigates the moral reflection that life is made up of sobs, sniffles, and smiles, with sniffles predominating.'",
"2,'The Little Match Girl','Most terribly cold it was; it snowed, and was nearly quite dark, and evening-- the last evening of the year. In this cold and darkness there went along the street a poor little girl, bareheaded, and with naked feet. When she left home she had slippers on, it is true; but what was the good of that? They were very large slippers, which her mother had hitherto worn; so large were they; and the poor little thing lost them as she scuffled away across the street, because of two carriages that rolled by dreadfully fast.'",
"3,'To Build a Fire','Day had broken cold and grey, exceedingly cold and grey, when the man turned aside from the main Yukon trail and climbed the high earth- bank, where a dim and little-travelled trail led eastward through the fat spruce timberland. It was a steep bank, and he paused for breath at the top, excusing the act to himself by looking at his watch. It was nine oclock. There was no sun nor hint of sun, though there was not a cloud in the sky. It was a clear day, and yet there seemed an intangible pall over the face of things, a subtle gloom that made the day dark, and that was due to the absence of sun. This fact did not worry the man. He was used to the lack of sun. It had been days since he had seen the sun, and he knew that a few more days must pass before that cheerful orb, due south, would just peep above the sky- line and dip immediately from view.'"
);

// Clean up testing table
dropTable();
}

@Test
public void testColumns() throws Exception {
createAndLoadTableCities();