
SQOOP-1348: Sqoop2: Remove Data class

(Veena Basavaraj via Abraham Elmahrek)
Abraham Elmahrek committed on 2014-11-10 13:18:38 -08:00
parent 49a7431ba3
commit ace222374b
11 changed files with 2032 additions and 820 deletions
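
The change visible across the diffs below is a migration from the type-tagged org.apache.sqoop.job.io.Data container (deleted by this commit) to the IntermediateDataFormat API: callers now bind a CSVIntermediateDataFormat to an explicit Schema and move records either as CSV text or as typed Object[] fields. The following minimal sketch summarizes that new usage as it appears in the updated tests; the class name, column names, and sample values are illustrative only and not part of the patch.

import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;

public class DataToIdfMigrationSketch {
  public static void main(String[] args) {
    // Old style, removed by this commit:
    //   Data data = new Data();
    //   data.setContent(new Object[] { 10, 10.0, "10" }, Data.ARRAY_RECORD);
    //   String csv = (String) data.getContent(Data.CSV_RECORD);

    // New style: an IntermediateDataFormat bound to an explicit Schema.
    Schema schema = new Schema("Test");
    schema.addColumn(new FixedPoint("1"))
          .addColumn(new FloatingPoint("2"))
          .addColumn(new Text("3"));

    IntermediateDataFormat<?> dataFormat = new CSVIntermediateDataFormat(schema);
    dataFormat.setObjectData(new Object[] { 10, 10.0, "10" });

    // CSV text representation, e.g. 10,10.0,'10'
    System.out.println(dataFormat.getTextData());

    // Typed fields; getObjectData() throws SqoopException
    // (INTERMEDIATE_DATA_FORMAT_0006) when the schema is null or empty.
    System.out.println(dataFormat.getObjectData().length);
  }
}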

SQOOP-1348.patch (Normal file, 1844 lines)

File diff suppressed because it is too large.

View File

@ -122,12 +122,7 @@ public List<Column> getColumns() {
}
public boolean isEmpty() {
if (columns.size()==0) {
return true;
} else {
return false;
}
return columns.size() == 0;
}
public String toString() {

View File

@ -67,9 +67,15 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
private final List<Integer> stringFieldIndices = new ArrayList<Integer>();
private final List<Integer> byteFieldIndices = new ArrayList<Integer>();
private Schema schema;
public CSVIntermediateDataFormat() {
}
public CSVIntermediateDataFormat(Schema schema) {
setSchema(schema);
}
/**
* {@inheritDoc}
*/
@ -166,7 +172,7 @@ private String[] getFields() {
*/
@Override
public Object[] getObjectData() {
if (schema.isEmpty()) {
if (schema == null || schema.isEmpty()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
}

View File

@ -41,11 +41,11 @@ public class TestCSVIntermediateDataFormat {
private final String BYTE_FIELD_ENCODING = "ISO-8859-1";
private IntermediateDataFormat<?> data;
private IntermediateDataFormat<?> dataFormat;
@Before
public void setUp() {
data = new CSVIntermediateDataFormat();
dataFormat = new CSVIntermediateDataFormat();
}
private String getByteFieldString(byte[] byteFieldData) {
@ -61,8 +61,8 @@ private String getByteFieldString(byte[] byteFieldData) {
public void testStringInStringOut() {
String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+ ",'" + String.valueOf(0x0A) + "'";
data.setTextData(testData);
assertEquals(testData, data.getTextData());
dataFormat.setTextData(testData);
assertEquals(testData, dataFormat.getTextData());
}
@Test
@ -74,10 +74,10 @@ public void testNullStringInObjectOut() {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
data.setTextData(null);
dataFormat.setSchema(schema);
dataFormat.setTextData(null);
Object[] out = data.getObjectData();
Object[] out = dataFormat.getObjectData();
assertNull(out);
}
@ -91,10 +91,10 @@ public void testEmptyStringInObjectOut() {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
data.setTextData("");
dataFormat.setSchema(schema);
dataFormat.setTextData("");
data.getObjectData();
dataFormat.getObjectData();
}
@Test
@ -111,10 +111,10 @@ public void testStringInObjectOut() {
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
data.setTextData(testData);
dataFormat.setSchema(schema);
dataFormat.setTextData(testData);
Object[] out = data.getObjectData();
Object[] out = dataFormat.getObjectData();
assertEquals(new Long(10),out[0]);
assertEquals(new Long(34),out[1]);
@ -134,7 +134,7 @@ public void testObjectInStringOut() {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
dataFormat.setSchema(schema);
byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
Object[] in = new Object[6];
@ -145,12 +145,12 @@ public void testObjectInStringOut() {
in[4] = byteFieldData;
in[5] = new String(new char[] { 0x0A });
data.setObjectData(in);
dataFormat.setObjectData(in);
//byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
String testData = "10,34,'54','random data'," +
getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
assertEquals(testData, data.getTextData());
assertEquals(testData, dataFormat.getTextData());
}
@Test
@ -164,7 +164,7 @@ public void testObjectInObjectOut() {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
dataFormat.setSchema(schema);
Object[] in = new Object[6];
in[0] = new Long(10);
@ -177,9 +177,9 @@ public void testObjectInObjectOut() {
System.arraycopy(in,0,inCopy,0,in.length);
// Modifies the input array, so we use the copy to confirm
data.setObjectData(in);
dataFormat.setObjectData(in);
assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
}
@Test
@ -191,7 +191,7 @@ public void testObjectWithNullInStringOut() {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
dataFormat.setSchema(schema);
byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
Object[] in = new Object[6];
@ -202,12 +202,12 @@ public void testObjectWithNullInStringOut() {
in[4] = byteFieldData;
in[5] = new String(new char[] { 0x0A });
data.setObjectData(in);
dataFormat.setObjectData(in);
//byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
String testData = "10,34,NULL,'random data'," +
getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
assertEquals(testData, data.getTextData());
assertEquals(testData, dataFormat.getTextData());
}
@Test
@ -215,7 +215,7 @@ public void testStringFullRangeOfCharacters() {
Schema schema = new Schema("test");
schema.addColumn(new Text("1"));
data.setSchema(schema);
dataFormat.setSchema(schema);
char[] allCharArr = new char[256];
for(int i = 0; i < allCharArr.length; ++i) {
@ -228,17 +228,17 @@ public void testStringFullRangeOfCharacters() {
System.arraycopy(in, 0, inCopy, 0, in.length);
// Modifies the input array, so we use the copy to confirm
data.setObjectData(in);
dataFormat.setObjectData(in);
assertEquals(strData, data.getObjectData()[0]);
assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
assertEquals(strData, dataFormat.getObjectData()[0]);
assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
}
@Test
public void testByteArrayFullRangeOfCharacters() {
Schema schema = new Schema("test");
schema.addColumn(new Binary("1"));
data.setSchema(schema);
dataFormat.setSchema(schema);
byte[] allCharByteArr = new byte[256];
for (int i = 0; i < allCharByteArr.length; ++i) {
@ -250,32 +250,32 @@ public void testByteArrayFullRangeOfCharacters() {
System.arraycopy(in, 0, inCopy, 0, in.length);
// Modifies the input array, so we use the copy to confirm
data.setObjectData(in);
assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
dataFormat.setObjectData(in);
assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
}
@Test
public void testDate() {
Schema schema = new Schema("test");
schema.addColumn(new Date("1"));
data.setSchema(schema);
dataFormat.setSchema(schema);
data.setTextData("2014-10-01");
assertEquals("2014-10-01", data.getObjectData()[0].toString());
dataFormat.setTextData("2014-10-01");
assertEquals("2014-10-01", dataFormat.getObjectData()[0].toString());
}
@Test
public void testDateTime() {
Schema schema = new Schema("test");
schema.addColumn(new DateTime("1"));
data.setSchema(schema);
dataFormat.setSchema(schema);
for (String dateTime : new String[]{
"2014-10-01T12:00:00",
"2014-10-01T12:00:00.000"
}) {
data.setTextData(dateTime);
assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString());
dataFormat.setTextData(dateTime);
assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString());
}
}
@ -289,14 +289,14 @@ public void testDateTime() {
public void testDateTimeISO8601Alternative() {
Schema schema = new Schema("test");
schema.addColumn(new DateTime("1"));
data.setSchema(schema);
dataFormat.setSchema(schema);
for (String dateTime : new String[]{
"2014-10-01 12:00:00",
"2014-10-01 12:00:00.000"
}) {
data.setTextData(dateTime);
assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString());
dataFormat.setTextData(dateTime);
assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString());
}
}
@ -304,20 +304,20 @@ public void testDateTimeISO8601Alternative() {
public void testBit() {
Schema schema = new Schema("test");
schema.addColumn(new Bit("1"));
data.setSchema(schema);
dataFormat.setSchema(schema);
for (String trueBit : new String[]{
"true", "TRUE", "1"
}) {
data.setTextData(trueBit);
assertTrue((Boolean) data.getObjectData()[0]);
dataFormat.setTextData(trueBit);
assertTrue((Boolean) dataFormat.getObjectData()[0]);
}
for (String falseBit : new String[]{
"false", "FALSE", "0"
}) {
data.setTextData(falseBit);
assertFalse((Boolean) data.getObjectData()[0]);
dataFormat.setTextData(falseBit);
assertFalse((Boolean) dataFormat.getObjectData()[0]);
}
}
@ -326,9 +326,23 @@ public void testEmptySchema() {
String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+ ",'\\n'";
Schema schema = new Schema("Test");
data.setSchema(schema);
data.setTextData(testData);
dataFormat.setSchema(schema);
dataFormat.setTextData(testData);
Object[] out = data.getObjectData();
@SuppressWarnings("unused")
Object[] out = dataFormat.getObjectData();
}
@Test(expected = SqoopException.class)
public void testNullSchema() {
dataFormat.setSchema(null);
@SuppressWarnings("unused")
Object[] out = dataFormat.getObjectData();
}
@Test(expected = SqoopException.class)
public void testNotSettingSchema() {
@SuppressWarnings("unused")
Object[] out = dataFormat.getObjectData();
}
}

View File

@ -1,529 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.regex.Matcher;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.MRExecutionError;
public class Data implements WritableComparable<Data> {
// The content is an Object to accommodate different kinds of data.
// For example, it can be:
// - Object[] for an array of object record
// - String for a text of CSV record
private volatile Object content = null;
public static final int EMPTY_DATA = 0;
public static final int CSV_RECORD = 1;
public static final int ARRAY_RECORD = 2;
private int type = EMPTY_DATA;
public static final String CHARSET_NAME = "UTF-8";
public static final char DEFAULT_RECORD_DELIMITER = '\n';
public static final char DEFAULT_FIELD_DELIMITER = ',';
public static final char DEFAULT_STRING_DELIMITER = '\'';
public static final char DEFAULT_STRING_ESCAPE = '\\';
private char fieldDelimiter = DEFAULT_FIELD_DELIMITER;
private char stringDelimiter = DEFAULT_STRING_DELIMITER;
private char stringEscape = DEFAULT_STRING_ESCAPE;
private String escapedStringDelimiter = String.valueOf(new char[] {
stringEscape, stringDelimiter
});
private int[] fieldTypes = null;
public void setFieldDelimiter(char fieldDelimiter) {
this.fieldDelimiter = fieldDelimiter;
}
public void setFieldTypes(int[] fieldTypes) {
this.fieldTypes = fieldTypes;
}
public void setContent(Object content, int type) {
switch (type) {
case EMPTY_DATA:
case CSV_RECORD:
case ARRAY_RECORD:
this.type = type;
this.content = content;
break;
default:
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
public Object getContent(int targetType) {
switch (targetType) {
case CSV_RECORD:
return format();
case ARRAY_RECORD:
return parse();
default:
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType));
}
}
public int getType() {
return type;
}
public boolean isEmpty() {
return (type == EMPTY_DATA);
}
@Override
public String toString() {
return (String)getContent(CSV_RECORD);
}
@Override
public int compareTo(Data other) {
byte[] myBytes = toString().getBytes(Charset.forName(CHARSET_NAME));
byte[] otherBytes = other.toString().getBytes(
Charset.forName(CHARSET_NAME));
return WritableComparator.compareBytes(
myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length);
}
@Override
public boolean equals(Object other) {
if (!(other instanceof Data)) {
return false;
}
Data data = (Data)other;
if (type != data.getType()) {
return false;
}
return toString().equals(data.toString());
}
@Override
public int hashCode() {
int result = super.hashCode();
switch (type) {
case CSV_RECORD:
result += 31 * content.hashCode();
return result;
case ARRAY_RECORD:
Object[] array = (Object[])content;
for (int i = 0; i < array.length; i++) {
result += 31 * array[i].hashCode();
}
return result;
default:
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
@Override
public void readFields(DataInput in) throws IOException {
type = readType(in);
switch (type) {
case CSV_RECORD:
readCsv(in);
break;
case ARRAY_RECORD:
readArray(in);
break;
default:
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
@Override
public void write(DataOutput out) throws IOException {
writeType(out, type);
switch (type) {
case CSV_RECORD:
writeCsv(out);
break;
case ARRAY_RECORD:
writeArray(out);
break;
default:
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
private int readType(DataInput in) throws IOException {
return WritableUtils.readVInt(in);
}
private void writeType(DataOutput out, int type) throws IOException {
WritableUtils.writeVInt(out, type);
}
private void readCsv(DataInput in) throws IOException {
content = in.readUTF();
}
private void writeCsv(DataOutput out) throws IOException {
out.writeUTF((String)content);
}
private void readArray(DataInput in) throws IOException {
// read number of columns
int columns = in.readInt();
content = new Object[columns];
Object[] array = (Object[])content;
// read each column
for (int i = 0; i < array.length; i++) {
int type = readType(in);
switch (type) {
case FieldTypes.UTF:
array[i] = in.readUTF();
break;
case FieldTypes.BIN:
int length = in.readInt();
byte[] bytes = new byte[length];
in.readFully(bytes);
array[i] = bytes;
break;
case FieldTypes.DOUBLE:
array[i] = in.readDouble();
break;
case FieldTypes.FLOAT:
array[i] = in.readFloat();
break;
case FieldTypes.LONG:
array[i] = in.readLong();
break;
case FieldTypes.INT:
array[i] = in.readInt();
break;
case FieldTypes.SHORT:
array[i] = in.readShort();
break;
case FieldTypes.CHAR:
array[i] = in.readChar();
break;
case FieldTypes.BYTE:
array[i] = in.readByte();
break;
case FieldTypes.BOOLEAN:
array[i] = in.readBoolean();
break;
case FieldTypes.NULL:
array[i] = null;
break;
default:
throw new IOException(
new SqoopException(MRExecutionError.MAPRED_EXEC_0012, Integer.toString(type))
);
}
}
}
private void writeArray(DataOutput out) throws IOException {
Object[] array = (Object[])content;
// write number of columns
out.writeInt(array.length);
// write each column
for (int i = 0; i < array.length; i++) {
if (array[i] instanceof String) {
writeType(out, FieldTypes.UTF);
out.writeUTF((String)array[i]);
} else if (array[i] instanceof byte[]) {
writeType(out, FieldTypes.BIN);
out.writeInt(((byte[])array[i]).length);
out.write((byte[])array[i]);
} else if (array[i] instanceof Double) {
writeType(out, FieldTypes.DOUBLE);
out.writeDouble((Double)array[i]);
} else if (array[i] instanceof Float) {
writeType(out, FieldTypes.FLOAT);
out.writeFloat((Float)array[i]);
} else if (array[i] instanceof Long) {
writeType(out, FieldTypes.LONG);
out.writeLong((Long)array[i]);
} else if (array[i] instanceof Integer) {
writeType(out, FieldTypes.INT);
out.writeInt((Integer)array[i]);
} else if (array[i] instanceof Short) {
writeType(out, FieldTypes.SHORT);
out.writeShort((Short)array[i]);
} else if (array[i] instanceof Character) {
writeType(out, FieldTypes.CHAR);
out.writeChar((Character)array[i]);
} else if (array[i] instanceof Byte) {
writeType(out, FieldTypes.BYTE);
out.writeByte((Byte)array[i]);
} else if (array[i] instanceof Boolean) {
writeType(out, FieldTypes.BOOLEAN);
out.writeBoolean((Boolean)array[i]);
} else if (array[i] == null) {
writeType(out, FieldTypes.NULL);
} else {
throw new IOException(
new SqoopException(MRExecutionError.MAPRED_EXEC_0012,
array[i].getClass().getName()
)
);
}
}
}
private String format() {
switch (type) {
case EMPTY_DATA:
return null;
case CSV_RECORD:
if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) {
return (String)content;
} else {
// TODO: need to exclude the case where comma is part of a string.
return ((String)content).replaceAll(
String.valueOf(DEFAULT_FIELD_DELIMITER),
String.valueOf(fieldDelimiter));
}
case ARRAY_RECORD:
StringBuilder sb = new StringBuilder();
Object[] array = (Object[])content;
for (int i = 0; i < array.length; i++) {
if (i != 0) {
sb.append(fieldDelimiter);
}
if (array[i] instanceof String) {
sb.append(stringDelimiter);
sb.append(escape((String)array[i]));
sb.append(stringDelimiter);
} else if (array[i] instanceof byte[]) {
sb.append(Arrays.toString((byte[])array[i]));
} else {
sb.append(String.valueOf(array[i]));
}
}
return sb.toString();
default:
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
private Object[] parse() {
switch (type) {
case EMPTY_DATA:
return null;
case CSV_RECORD:
ArrayList<Object> list = new ArrayList<Object>();
char[] record = ((String)content).toCharArray();
int start = 0;
int position = start;
boolean stringDelimited = false;
boolean arrayDelimited = false;
int index = 0;
while (position < record.length) {
if (record[position] == fieldDelimiter) {
if (!stringDelimited && !arrayDelimited) {
index = parseField(list, record, start, position, index);
start = position + 1;
}
} else if (record[position] == stringDelimiter) {
if (!stringDelimited) {
stringDelimited = true;
}
else if (position > 0 && record[position-1] != stringEscape) {
stringDelimited = false;
}
} else if (record[position] == '[') {
if (!stringDelimited) {
arrayDelimited = true;
}
} else if (record[position] == ']') {
if (!stringDelimited) {
arrayDelimited = false;
}
}
position++;
}
parseField(list, record, start, position, index);
return list.toArray();
case ARRAY_RECORD:
return (Object[])content;
default:
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
}
}
private int parseField(ArrayList<Object> list, char[] record,
int start, int end, int index) {
String field = String.valueOf(record, start, end-start).trim();
int fieldType;
if (fieldTypes == null) {
fieldType = guessType(field);
} else {
fieldType = fieldTypes[index];
}
switch (fieldType) {
case FieldTypes.UTF:
if (field.charAt(0) != stringDelimiter ||
field.charAt(field.length()-1) != stringDelimiter) {
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
}
list.add(index, unescape(field.substring(1, field.length()-1)));
break;
case FieldTypes.BIN:
if (field.charAt(0) != '[' ||
field.charAt(field.length()-1) != ']') {
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
}
String[] splits =
field.substring(1, field.length()-1).split(String.valueOf(','));
byte[] bytes = new byte[splits.length];
for (int i=0; i<bytes.length; i++) {
bytes[i] = Byte.parseByte(splits[i].trim());
}
list.add(index, bytes);
break;
case FieldTypes.DOUBLE:
list.add(index, Double.parseDouble(field));
break;
case FieldTypes.FLOAT:
list.add(index, Float.parseFloat(field));
break;
case FieldTypes.LONG:
list.add(index, Long.parseLong(field));
break;
case FieldTypes.INT:
list.add(index, Integer.parseInt(field));
break;
case FieldTypes.SHORT:
list.add(index, Short.parseShort(field));
break;
case FieldTypes.CHAR:
list.add(index, Character.valueOf(field.charAt(0)));
break;
case FieldTypes.BYTE:
list.add(index, Byte.parseByte(field));
break;
case FieldTypes.BOOLEAN:
list.add(index, Boolean.parseBoolean(field));
break;
case FieldTypes.NULL:
list.add(index, null);
break;
default:
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(fieldType));
}
return ++index;
}
private int guessType(String field) {
char[] value = field.toCharArray();
if (value[0] == stringDelimiter) {
return FieldTypes.UTF;
}
switch (value[0]) {
case 'n':
case 'N':
return FieldTypes.NULL;
case '[':
return FieldTypes.BIN;
case 't':
case 'f':
case 'T':
case 'F':
return FieldTypes.BOOLEAN;
}
int position = 1;
while (position < value.length) {
switch (value[position++]) {
case '.':
return FieldTypes.DOUBLE;
}
}
return FieldTypes.LONG;
}
private String escape(String string) {
// TODO: Also need to escape those special characters as documented in:
// https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
String regex = String.valueOf(stringDelimiter);
String replacement = Matcher.quoteReplacement(escapedStringDelimiter);
return string.replaceAll(regex, replacement);
}
private String unescape(String string) {
// TODO: Also need to unescape those special characters as documented in:
// https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
String regex = Matcher.quoteReplacement(escapedStringDelimiter);
String replacement = String.valueOf(stringDelimiter);
return string.replaceAll(regex, replacement);
}
}

View File

@ -37,6 +37,7 @@
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.common.EmptyConfiguration;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
import org.apache.sqoop.job.etl.Extractor;
@ -46,17 +47,13 @@
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.MRConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
import org.apache.sqoop.job.mr.SqoopSplit;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;
import org.apache.sqoop.job.util.MRJobTestUtil;
import org.junit.Assert;
import org.junit.Test;
@ -67,11 +64,10 @@ public class TestMapReduce {
private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
@Test
public void testInputFormat() throws Exception {
public void testSqoopInputFormat() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
Job job = new Job(conf);
SqoopInputFormat inputformat = new SqoopInputFormat();
@ -79,51 +75,47 @@ public void testInputFormat() throws Exception {
assertEquals(9, splits.size());
for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
SqoopSplit split = (SqoopSplit)splits.get(id-1);
DummyPartition partition = (DummyPartition)split.getPartition();
SqoopSplit split = (SqoopSplit) splits.get(id - 1);
DummyPartition partition = (DummyPartition) split.getPartition();
assertEquals(id, partition.getId());
}
}
@Test
public void testMapper() throws Exception {
public void testSqoopMapper() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Schema schema = new Schema("Test");
schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new org.apache.sqoop.schema.type.Text("3"));
conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
Job job = new Job(conf);
MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
boolean success = JobUtils.runJob(job.getConfiguration(),
SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class);
// from and to have the same schema in this test case
MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema());
MRConfigurationUtils.setConnectorSchema(Direction.TO, job, MRJobTestUtil.getTestSchema());
boolean success = MRJobTestUtil.runJob(job.getConfiguration(),
SqoopInputFormat.class,
SqoopMapper.class,
DummyOutputFormat.class);
Assert.assertEquals("Job failed!", true, success);
}
@Test
public void testOutputFormat() throws Exception {
public void testNullOutputFormat() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Schema schema = new Schema("Test");
schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new Text("3"));
conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
Job job = new Job(conf);
MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
boolean success = JobUtils.runJob(job.getConfiguration(),
SqoopInputFormat.class, SqoopMapper.class,
SqoopNullOutputFormat.class);
// from and to have the same schema in this test case
MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema());
MRConfigurationUtils.setConnectorSchema(Direction.TO, job, MRJobTestUtil.getTestSchema());
boolean success = MRJobTestUtil.runJob(job.getConfiguration(),
SqoopInputFormat.class,
SqoopMapper.class,
SqoopNullOutputFormat.class);
Assert.assertEquals("Job failed!", true, success);
// Make sure both destroyers get called.
@ -171,15 +163,17 @@ public List<Partition> getPartitions(PartitionerContext context, Object oc, Obje
}
}
public static class DummyExtractor extends Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> {
public static class DummyExtractor extends
Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> {
@Override
public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, DummyPartition partition) {
int id = ((DummyPartition)partition).getId();
public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj,
DummyPartition partition) {
int id = ((DummyPartition) partition).getId();
for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
context.getDataWriter().writeArrayRecord(new Object[] {
id * NUMBER_OF_ROWS_PER_PARTITION + row,
(double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
context.getDataWriter().writeArrayRecord(
new Object[] { id * NUMBER_OF_ROWS_PER_PARTITION + row,
(double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
String.valueOf(id * NUMBER_OF_ROWS_PER_PARTITION + row) });
}
}
@ -189,16 +183,14 @@ public long getRowsRead() {
}
}
public static class DummyOutputFormat
extends OutputFormat<SqoopWritable, NullWritable> {
public static class DummyOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {
@Override
public void checkOutputSpecs(JobContext context) {
// do nothing
}
@Override
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
TaskAttemptContext context) {
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(TaskAttemptContext context) {
return new DummyRecordWriter();
}
@ -207,22 +199,17 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
return new DummyOutputCommitter();
}
public static class DummyRecordWriter
extends RecordWriter<SqoopWritable, NullWritable> {
private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
private Data data = new Data();
public static class DummyRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {
private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION;
// should I use a dummy IDF for testing?
private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
@Override
public void write(SqoopWritable key, NullWritable value) {
data.setContent(new Object[] {
index,
(double) index,
String.valueOf(index)},
Data.ARRAY_RECORD);
String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'";
dataFormat.setTextData(testData);
index++;
assertEquals(data.toString(), key.toString());
assertEquals(dataFormat.getTextData().toString(), key.toString());
}
@Override
@ -233,16 +220,20 @@ public void close(TaskAttemptContext context) {
public static class DummyOutputCommitter extends OutputCommitter {
@Override
public void setupJob(JobContext jobContext) { }
public void setupJob(JobContext jobContext) {
}
@Override
public void setupTask(TaskAttemptContext taskContext) { }
public void setupTask(TaskAttemptContext taskContext) {
}
@Override
public void commitTask(TaskAttemptContext taskContext) { }
public void commitTask(TaskAttemptContext taskContext) {
}
@Override
public void abortTask(TaskAttemptContext taskContext) { }
public void abortTask(TaskAttemptContext taskContext) {
}
@Override
public boolean needsTaskCommit(TaskAttemptContext taskContext) {
@ -251,39 +242,34 @@ public boolean needsTaskCommit(TaskAttemptContext taskContext) {
}
}
// it is writing to the target.
public static class DummyLoader extends Loader<EmptyConfiguration, EmptyConfiguration> {
private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
private Data expected = new Data();
private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION;
private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
@Override
public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) throws Exception{
public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj)
throws Exception {
String data;
while ((data = context.getDataReader().readTextRecord()) != null) {
expected.setContent(new Object[] {
index,
(double) index,
String.valueOf(index)},
Data.ARRAY_RECORD);
String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'";
dataFormat.setTextData(testData);
index++;
assertEquals(expected.toString(), data);
assertEquals(dataFormat.getTextData().toString(), data);
}
}
}
public static class DummyFromDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> {
public static int count = 0;
@Override
public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) {
count++;
}
}
public static class DummyToDestroyer extends Destroyer<EmptyConfiguration,EmptyConfiguration> {
public static class DummyToDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> {
public static int count = 0;
@Override
public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) {
count++;

View File

@ -38,16 +38,17 @@
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.common.EmptyConfiguration;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.MRConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.job.util.MRJobTestUtil;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
@ -121,6 +122,7 @@ public static Collection<Object[]> data() {
return parameters;
}
@SuppressWarnings("deprecation")
@Test
public void testSchemaMatching() throws Exception {
Configuration conf = new Configuration();
@ -132,9 +134,9 @@ public void testSchemaMatching() throws Exception {
Job job = new Job(conf);
MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
MRConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
MRJobTestUtil.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
DummyOutputFormat.class);
boolean success = JobUtils.runJob(job.getConfiguration(),
boolean success = MRJobTestUtil.runJob(job.getConfiguration(),
SqoopInputFormat.class, SqoopMapper.class,
DummyOutputFormat.class);
if (from.getName().split("-")[1].equals("EMPTY")) {
@ -233,19 +235,14 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
public static class DummyRecordWriter
extends RecordWriter<SqoopWritable, NullWritable> {
private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
private Data data = new Data();
private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
@Override
public void write(SqoopWritable key, NullWritable value) {
data.setContent(new Object[] {
index,
(double) index,
String.valueOf(index)},
Data.ARRAY_RECORD);
String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'";
dataFormat.setTextData(testData);
index++;
assertEquals(data.toString(), key.toString());
assertEquals(dataFormat.getTextData().toString(), key.toString());
}
@Override

View File

@ -1,117 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job.io;
import java.util.Arrays;
import org.junit.Assert;
import org.junit.Test;
public class TestData {
private static final double TEST_NUMBER = Math.PI + 100;
@Test
public void testArrayToCsv() throws Exception {
Data data = new Data();
String expected;
String actual;
// with special characters:
expected =
Long.valueOf((long)TEST_NUMBER) + "," +
Double.valueOf(TEST_NUMBER) + "," +
"'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
Arrays.toString(new byte[] {1, 2, 3, 4, 5});
data.setContent(new Object[] {
Long.valueOf((long)TEST_NUMBER),
Double.valueOf(TEST_NUMBER),
String.valueOf(TEST_NUMBER) + "',s",
new byte[] {1, 2, 3, 4, 5} },
Data.ARRAY_RECORD);
actual = (String)data.getContent(Data.CSV_RECORD);
assertEquals(expected, actual);
// with null characters:
expected =
Long.valueOf((long)TEST_NUMBER) + "," +
Double.valueOf(TEST_NUMBER) + "," +
"null" + "," +
Arrays.toString(new byte[] {1, 2, 3, 4, 5});
data.setContent(new Object[] {
Long.valueOf((long)TEST_NUMBER),
Double.valueOf(TEST_NUMBER),
null,
new byte[] {1, 2, 3, 4, 5} },
Data.ARRAY_RECORD);
actual = (String)data.getContent(Data.CSV_RECORD);
assertEquals(expected, actual);
}
@Test
public void testCsvToArray() throws Exception {
Data data = new Data();
Object[] expected;
Object[] actual;
// with special characters:
expected = new Object[] {
Long.valueOf((long)TEST_NUMBER),
Double.valueOf(TEST_NUMBER),
String.valueOf(TEST_NUMBER) + "',s",
new byte[] {1, 2, 3, 4, 5} };
data.setContent(
Long.valueOf((long)TEST_NUMBER) + "," +
Double.valueOf(TEST_NUMBER) + "," +
"'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
Arrays.toString(new byte[] {1, 2, 3, 4, 5}),
Data.CSV_RECORD);
actual = (Object[])data.getContent(Data.ARRAY_RECORD);
assertEquals(expected.length, actual.length);
for (int c=0; c<expected.length; c++) {
assertEquals(expected[c], actual[c]);
}
// with null characters:
expected = new Object[] {
Long.valueOf((long)TEST_NUMBER),
Double.valueOf(TEST_NUMBER),
null,
new byte[] {1, 2, 3, 4, 5} };
data.setContent(
Long.valueOf((long)TEST_NUMBER) + "," +
Double.valueOf(TEST_NUMBER) + "," +
"null" + "," +
Arrays.toString(new byte[] {1, 2, 3, 4, 5}),
Data.CSV_RECORD);
actual = (Object[])data.getContent(Data.ARRAY_RECORD);
assertEquals(expected.length, actual.length);
for (int c=0; c<expected.length; c++) {
assertEquals(expected[c], actual[c]);
}
}
public static void assertEquals(Object expected, Object actual) {
if (expected instanceof byte[]) {
Assert.assertEquals(Arrays.toString((byte[])expected),
Arrays.toString((byte[])actual));
} else {
Assert.assertEquals(expected, actual);
}
}
}

View File

@ -18,8 +18,6 @@
*/
package org.apache.sqoop.job.io;
import com.google.common.base.Charsets;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
@ -29,13 +27,10 @@
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.MRJobConstants;
import org.junit.Assert;
import org.junit.Test;
public class SqoopWritableTest {
public class TestSqoopWritable {
private final SqoopWritable writable = new SqoopWritable();
@ -49,7 +44,6 @@ public void testStringInStringOut() {
@Test
public void testDataWritten() throws IOException {
String testData = "One ring to rule them all";
byte[] testDataBytes = testData.getBytes(Charsets.UTF_8);
writable.setString(testData);
ByteArrayOutputStream ostream = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(ostream);

View File

@ -18,6 +18,10 @@
*/
package org.apache.sqoop.job.mr;
import java.util.ConcurrentModificationException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
@ -28,14 +32,11 @@
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.util.MRJobTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ConcurrentModificationException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.TimeUnit;
public class TestSqoopOutputFormatLoadExecutor {
private Configuration conf;
@ -130,12 +131,12 @@ public void testWhenLoaderThrows() throws Throwable {
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
IntermediateDataFormat data = new CSVIntermediateDataFormat();
IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
SqoopWritable writable = new SqoopWritable();
try {
for (int count = 0; count < 100; count++) {
data.setTextData(String.valueOf(count));
writable.setString(data.getTextData());
dataFormat.setTextData(String.valueOf(count));
writable.setString(dataFormat.getTextData());
writer.write(writable, null);
}
} catch (SqoopException ex) {
@ -149,7 +150,7 @@ public void testSuccessfulContinuousLoader() throws Throwable {
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
IntermediateDataFormat data = new CSVIntermediateDataFormat();
IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
SqoopWritable writable = new SqoopWritable();
for (int i = 0; i < 10; i++) {
StringBuilder builder = new StringBuilder();
@ -159,8 +160,8 @@ public void testSuccessfulContinuousLoader() throws Throwable {
builder.append(",");
}
}
data.setTextData(builder.toString());
writable.setString(data.getTextData());
dataFormat.setTextData(builder.toString());
writable.setString(dataFormat.getTextData());
writer.write(writable, null);
}
writer.close(null);
@ -171,7 +172,7 @@ public void testSuccessfulLoader() throws Throwable {
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
IntermediateDataFormat data = new CSVIntermediateDataFormat();
IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
SqoopWritable writable = new SqoopWritable();
StringBuilder builder = new StringBuilder();
for (int count = 0; count < 100; count++) {
@ -180,8 +181,8 @@ public void testSuccessfulLoader() throws Throwable {
builder.append(",");
}
}
data.setTextData(builder.toString());
writable.setString(data.getTextData());
dataFormat.setTextData(builder.toString());
writable.setString(dataFormat.getTextData());
writer.write(writable, null);
//Allow writer to complete.
@ -196,7 +197,7 @@ public void testThrowingContinuousLoader() throws Throwable {
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
IntermediateDataFormat data = new CSVIntermediateDataFormat();
IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
SqoopWritable writable = new SqoopWritable();
try {
for (int i = 0; i < 10; i++) {
@ -207,8 +208,8 @@ public void testThrowingContinuousLoader() throws Throwable {
builder.append(",");
}
}
data.setTextData(builder.toString());
writable.setString(data.getTextData());
dataFormat.setTextData(builder.toString());
writable.setString(dataFormat.getTextData());
writer.write(writable, null);
}
writer.close(null);

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job;
package org.apache.sqoop.job.util;
import java.io.IOException;
@ -27,20 +27,27 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.SqoopSplit;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;
import org.apache.sqoop.utils.ClassUtils;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class JobUtils {
public class MRJobTestUtil {
@SuppressWarnings("deprecation")
public static boolean runJob(Configuration conf,
Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper,
Class<? extends OutputFormat<SqoopWritable, NullWritable>> output)
throws IOException, InterruptedException, ClassNotFoundException {
Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper,
Class<? extends OutputFormat<SqoopWritable, NullWritable>> output) throws IOException,
InterruptedException, ClassNotFoundException {
Job job = new Job(conf);
job.setInputFormatClass(input);
job.setMapperClass(mapper);
@ -52,7 +59,8 @@ public static boolean runJob(Configuration conf,
boolean ret = job.waitForCompletion(true);
// Hadoop 1.0 (and 0.20) have nasty bug when job committer is not called in LocalJobRuner
// Hadoop 1.0 (and 0.20) have nasty bug when job committer is not called in
// LocalJobRuner
if (isHadoop1()) {
callOutputCommitter(job, output);
}
@ -60,11 +68,25 @@ public static boolean runJob(Configuration conf,
return ret;
}
public static Schema getTestSchema() {
Schema schema = new Schema("Test");
schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new Text("3"));
return schema;
}
public static IntermediateDataFormat<?> getTestIDF() {
return new CSVIntermediateDataFormat(getTestSchema());
}
/**
* Call output format on given job manually.
*/
private static void callOutputCommitter(Job job, Class<? extends OutputFormat<SqoopWritable, NullWritable>> outputFormat) throws IOException, InterruptedException {
OutputCommitter committer = ((OutputFormat)ClassUtils.instantiate(outputFormat)).getOutputCommitter(null);
private static void callOutputCommitter(Job job,
Class<? extends OutputFormat<SqoopWritable, NullWritable>> outputFormat) throws IOException,
InterruptedException {
OutputCommitter committer = ((OutputFormat<?,?>) ClassUtils.instantiate(outputFormat))
.getOutputCommitter(null);
JobContext jobContext = mock(JobContext.class);
when(jobContext.getConfiguration()).thenReturn(job.getConfiguration());
@ -79,14 +101,13 @@ private static void callOutputCommitter(Job job, Class<? extends OutputFormat<Sq
*/
public static boolean isHadoop1() {
String version = org.apache.hadoop.util.VersionInfo.getVersion();
if (version.matches("\\b0\\.20\\..+\\b")
|| version.matches("\\b1\\.\\d\\.\\d")) {
if (version.matches("\\b0\\.20\\..+\\b") || version.matches("\\b1\\.\\d\\.\\d")) {
return true;
}
return false;
}
private JobUtils() {
private MRJobTestUtil() {
// Disable explicit object creation
}