
SQOOP-1560: Sqoop2: Move matcher out of Schema

(Abraham Elmahrek via Jarek Jarcec Cecho)
Jarek Jarcec Cecho 2014-10-02 16:55:29 -07:00 committed by Abraham Elmahrek
parent 97da12aa50
commit c1e53e5d54
13 changed files with 515 additions and 319 deletions

View File

@@ -22,12 +22,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.matcher.AbstractMatcher;
import org.apache.sqoop.connector.idf.matcher.LocationMatcher;
import org.apache.sqoop.connector.idf.matcher.NameMatcher;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.SchemaError;
import org.apache.sqoop.schema.SchemaMatchOption;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
@@ -41,7 +36,6 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
@@ -71,8 +65,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
private final List<Integer> stringFieldIndices = new ArrayList<Integer>();
private final List<Integer> byteFieldIndices = new ArrayList<Integer>();
private Schema fromSchema;
private Schema toSchema;
private Schema schema;
/**
* {@inheritDoc}
@@ -94,11 +87,11 @@ public void setTextData(String text) {
* {@inheritDoc}
*/
@Override
public void setFromSchema(Schema schema) {
public void setSchema(Schema schema) {
if(schema == null) {
return;
}
this.fromSchema = schema;
this.schema = schema;
List<Column> columns = schema.getColumns();
int i = 0;
for(Column col : columns) {
@@ -111,19 +104,6 @@ public void setFromSchema(Schema schema) {
}
}
/**
* {@inheritDoc}
*/
@Override
public void setToSchema(Schema schema) {
if(schema == null) {
return;
}
this.toSchema = schema;
}
/**
* Custom CSV parser that honors quoting and escaped quotes.
* All other escaping is handled elsewhere.
@@ -180,69 +160,68 @@ private String[] getFields() {
/**
* {@inheritDoc}
*
* The CSV data is ordered according to the fromSchema. We "translate" it to the TO schema.
* We currently have 3 methods of matching fields in one schema to another:
* - by location
* - by name
* - user-defined matching
*
* If one schema exists (either TO or FROM) and the other is empty,
* we'll match fields based on location.
* If both schemas exist, we'll match names of fields.
*
* In the future, we may want to let users choose the method.
* Currently nothing is implemented for user-defined matching.
*/
@Override
public Object[] getObjectData() {
if (schema.isEmpty()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
}
String[] fields = getFields();
if (fields == null) {
return null;
}
if (fromSchema == null || toSchema == null || (toSchema.isEmpty() && fromSchema.isEmpty())) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
if (fields.length != schema.getColumns().size()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
"The data " + getTextData() + " has the wrong number of fields.");
}
AbstractMatcher matcher = getMatcher(fromSchema,toSchema);
String[] outFields = matcher.getMatchingData(fields, fromSchema, toSchema);
Object[] out = new Object[outFields.length];
int i = 0;
// After getting back the data in an order that matches the output schema,
// we need to undo the CSV escaping
for (Column col: matcher.getMatchingSchema(fromSchema,toSchema).getColumns()) {
Type colType = col.getType();
if (outFields[i] == null) {
Object[] out = new Object[fields.length];
Column[] cols = schema.getColumns().toArray(new Column[fields.length]);
for (int i = 0; i < fields.length; i++) {
Type colType = cols[i].getType();
if (fields[i].equals("NULL")) {
out[i] = null;
continue;
}
if (colType == Type.TEXT) {
out[i] = unescapeStrings(outFields[i]);
} else if (colType == Type.BINARY) {
out[i] = unescapeByteArray(outFields[i]);
} else if (colType == Type.FIXED_POINT) {
Long byteSize = ((FixedPoint) col).getByteSize();
Long byteSize;
switch(colType) {
case TEXT:
out[i] = unescapeStrings(fields[i]);
break;
case BINARY:
out[i] = unescapeByteArray(fields[i]);
break;
case FIXED_POINT:
byteSize = ((FixedPoint) cols[i]).getByteSize();
if (byteSize != null && byteSize <= Integer.SIZE) {
out[i] = Integer.valueOf(outFields[i]);
out[i] = Integer.valueOf(fields[i]);
} else {
out[i] = Long.valueOf(outFields[i]);
out[i] = Long.valueOf(fields[i]);
}
} else if (colType == Type.FLOATING_POINT) {
Long byteSize = ((FloatingPoint) col).getByteSize();
break;
case FLOATING_POINT:
byteSize = ((FloatingPoint) cols[i]).getByteSize();
if (byteSize != null && byteSize <= Float.SIZE) {
out[i] = Float.valueOf(outFields[i]);
out[i] = Float.valueOf(fields[i]);
} else {
out[i] = Double.valueOf(outFields[i]);
out[i] = Double.valueOf(fields[i]);
}
} else if (colType == Type.DECIMAL) {
out[i] = new BigDecimal(outFields[i]);
} else {
break;
case DECIMAL:
out[i] = new BigDecimal(fields[i]);
break;
case DATE:
case DATE_TIME:
case BIT:
out[i] = fields[i];
break;
default:
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + colType);
}
i++;
}
return out;
}
@@ -380,15 +359,4 @@ private byte[] unescapeByteArray(String orig) {
public String toString() {
return data;
}
private AbstractMatcher getMatcher(Schema fromSchema, Schema toSchema) {
if (toSchema.isEmpty() || fromSchema.isEmpty()) {
return new LocationMatcher();
} else {
return new NameMatcher();
}
}
}
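For reference, a minimal usage sketch of the single-schema CSVIntermediateDataFormat after this change (the schema and column names are illustrative, not taken from the patch; the calls mirror the updated tests further down):

import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;

public class SingleSchemaSketch {
  public static void main(String[] args) {
    // One schema now drives both parsing and writing; the FROM/TO split is gone.
    Schema schema = new Schema("example")
        .addColumn(new FixedPoint("id"))
        .addColumn(new Text("name"));

    IntermediateDataFormat<String> data = new CSVIntermediateDataFormat();
    data.setSchema(schema);             // replaces the old setFromSchema()/setToSchema() pair
    data.setTextData("10,'jarcec'");

    // Fields are parsed strictly against the single schema; a field-count
    // mismatch now raises INTERMEDIATE_DATA_FORMAT_0005 instead of being re-ordered.
    Object[] fields = data.getObjectData();
    System.out.println(fields[0] + " / " + fields[1]);
  }
}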

View File

@@ -111,14 +111,7 @@ public T getData() {
*
* @param schema - the schema used for reading data
*/
public abstract void setFromSchema(Schema schema);
/**
* Set the schema for writing data.
*
* @param schema - the schema used for writing data
*/
public abstract void setToSchema(Schema schema);
public abstract void setSchema(Schema schema);
/**
* Serialize the fields of this object to <code>out</code>.

View File

@@ -15,21 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.idf.matcher;
package org.apache.sqoop.connector.matcher;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.IntermediateDataFormatError;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.SchemaError;
import org.apache.sqoop.schema.SchemaMatchOption;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Type;
import java.math.BigDecimal;
import java.util.Iterator;
/**
@@ -39,24 +31,28 @@
* If the TO schema has more fields and they are "nullable", the value will be set to null.
* If the TO schema has extra non-null fields, we'll throw an exception.
*/
public class LocationMatcher extends AbstractMatcher {
public class LocationMatcher extends Matcher {
public static final Logger LOG = Logger.getLogger(LocationMatcher.class);
@Override
public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) {
String[] out = new String[toSchema.getColumns().size()];
public LocationMatcher(Schema from, Schema to) {
super(from, to);
}
@Override
public Object[] getMatchingData(Object[] fields) {
Object[] out = new Object[getToSchema().getColumns().size()];
int i = 0;
if (toSchema.isEmpty()) {
if (getToSchema().isEmpty()) {
// If there's no destination schema, no need to convert anything
// Just use the original data
return fields;
}
for (Column col: toSchema.getColumns())
{
for (Column col: getToSchema().getColumns()) {
if (i < fields.length) {
if (isNull(fields[i])) {
out[i] = null;

View File

@@ -15,44 +15,51 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.idf.matcher;
package org.apache.sqoop.connector.matcher;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Column;
public abstract class AbstractMatcher {
public abstract class Matcher {
//NOTE: This is currently tightly coupled to the CSV IDF. We'll need refactoring after adding additional formats.
//NOTE: There is a very blatant special case of empty schemas that seems to apply only to HDFS.
private final Schema fromSchema;
private final Schema toSchema;
public Matcher(Schema fromSchema, Schema toSchema) {
if (fromSchema.isEmpty() && toSchema.isEmpty()) {
throw new SqoopException(MatcherError.MATCHER_0000, "Neither a FROM nor a TO schema has been provided.");
} else if (toSchema.isEmpty()) {
this.fromSchema = fromSchema;
this.toSchema = fromSchema;
} else if (fromSchema.isEmpty()) {
this.fromSchema = toSchema;
this.toSchema = toSchema;
} else {
this.fromSchema = fromSchema;
this.toSchema = toSchema;
}
}
/**
*
* @param fields
* @param fromSchema
* @param toSchema
* @return Return the data in "fields" converted from matching the fromSchema to matching the toSchema.
* Right now, "converted" means re-ordering if needed and handling nulls.
*/
abstract public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema);
abstract public Object[] getMatchingData(Object[] fields);
/***
*
* @param fromSchema
* @param toSchema
* @return a schema with which to read the output data.
* This always returns the toSchema (since this is used when getting output data), unless it's empty.
*/
public Schema getMatchingSchema(Schema fromSchema, Schema toSchema) {
if (toSchema.isEmpty()) {
public Schema getFromSchema() {
return fromSchema;
} else {
}
public Schema getToSchema() {
return toSchema;
}
}
protected boolean isNull(String value) {
if (value.equals("NULL") || value.equals("null") || value.equals("'null'") || value.isEmpty()) {
protected boolean isNull(Object value) {
if (value == null || value.equals("NULL")
|| value.equals("null") || value.equals("'null'")
|| value.equals("")) {
return true;
}
return false;

View File

@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.matcher;
import org.apache.sqoop.common.ErrorCode;
public enum MatcherError implements ErrorCode {
MATCHER_0000("Too few schemas provided."),
;
private final String message;
private MatcherError(String message) {
this.message = message;
}
public String getCode() {
return name();
}
public String getMessage() {
return message;
}
}
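A hedged sketch of when MATCHER_0000 surfaces: if both the FROM and TO schemas are empty, the Matcher constructor (and therefore MatcherFactory.getMatcher, shown in the next file) throws. The class and schema names here are illustrative:

import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.schema.Schema;

public class EmptySchemasSketch {
  public static void main(String[] args) {
    Schema emptyFrom = new Schema("FROM-EMPTY");
    Schema emptyTo = new Schema("TO-EMPTY");
    try {
      // Neither side has columns, so there is nothing to match against.
      MatcherFactory.getMatcher(emptyFrom, emptyTo);
    } catch (SqoopException e) {
      System.out.println(e.getMessage());   // reports MATCHER_0000
    }
  }
}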

View File

@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.matcher;
import org.apache.sqoop.schema.Schema;
public class MatcherFactory {
public static Matcher getMatcher(Schema fromSchema, Schema toSchema) {
if (toSchema.isEmpty() || fromSchema.isEmpty()) {
return new LocationMatcher(fromSchema, toSchema);
} else {
return new NameMatcher(fromSchema, toSchema);
}
}
}
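And a sketch of the intended wiring, mirroring what SqoopMapper does further down: one IntermediateDataFormat per side, each fed the matcher's (possibly normalized) schema, with the matcher translating records between them. Schema and column names are again illustrative:

import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;

public class MatcherWiringSketch {
  public static void main(String[] args) {
    Schema from = new Schema("from")
        .addColumn(new FixedPoint("id"))
        .addColumn(new Text("name"));
    Schema to = new Schema("to");   // empty TO schema -> LocationMatcher, TO normalized to FROM

    Matcher matcher = MatcherFactory.getMatcher(from, to);

    IntermediateDataFormat<String> fromData = new CSVIntermediateDataFormat();
    fromData.setSchema(matcher.getFromSchema());
    IntermediateDataFormat<String> toData = new CSVIntermediateDataFormat();
    toData.setSchema(matcher.getToSchema());

    // Extractor output goes into the FROM side, the matcher re-orders/pads it,
    // and the TO side renders the CSV text that gets written out.
    fromData.setObjectData(new Object[]{10L, "jarcec"});
    toData.setObjectData(matcher.getMatchingData(fromData.getObjectData()));
    System.out.println(toData.getTextData());
  }
}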

View File

@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.idf.matcher;
package org.apache.sqoop.connector.matcher;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
@@ -25,26 +25,31 @@
import java.util.HashMap;
public class NameMatcher extends AbstractMatcher {
public class NameMatcher extends Matcher {
public static final Logger LOG = Logger.getLogger(NameMatcher.class);
public NameMatcher(Schema from, Schema to) {
super(from, to);
}
@Override
public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) {
String[] out = new String[toSchema.getColumns().size()];
public Object[] getMatchingData(Object[] fields) {
Object[] out = new Object[getToSchema().getColumns().size()];
HashMap<String,Column> colNames = new HashMap<String, Column>();
for (Column fromCol: fromSchema.getColumns()) {
for (Column fromCol: getFromSchema().getColumns()) {
colNames.put(fromCol.getName(), fromCol);
}
int toIndex = 0;
for (Column toCol: toSchema.getColumns()) {
for (Column toCol: getToSchema().getColumns()) {
Column fromCol = colNames.get(toCol.getName());
if (fromCol != null) {
int fromIndex = fromSchema.getColumns().indexOf(fromCol);
int fromIndex = getFromSchema().getColumns().indexOf(fromCol);
if (isNull(fields[fromIndex])) {
out[toIndex] = null;
} else {

View File

@@ -20,7 +20,6 @@
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.SchemaMatchOption;
import org.apache.sqoop.schema.type.Binary;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;
@@ -40,8 +39,6 @@ public class TestCSVIntermediateDataFormat {
private IntermediateDataFormat<?> data;
private Schema emptySchema = new Schema("empty");
@Before
public void setUp() {
data = new CSVIntermediateDataFormat();
@@ -73,7 +70,7 @@ public void testNullStringInObjectOut() {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setFromSchema(schema);
data.setSchema(schema);
data.setTextData(null);
Object[] out = data.getObjectData();
@@ -90,7 +87,7 @@ public void testEmptyStringInObjectOut() {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setFromSchema(schema);
data.setSchema(schema);
data.setTextData("");
data.getObjectData();
@@ -110,8 +107,7 @@ public void testStringInObjectOut() {
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setFromSchema(schema);
data.setToSchema(emptySchema);
data.setSchema(schema);
data.setTextData(testData);
Object[] out = data.getObjectData();
@@ -120,7 +116,7 @@ public void testStringInObjectOut() {
assertEquals(new Long(34),out[1]);
assertEquals("54",out[2]);
assertEquals("random data",out[3]);
assertEquals(-112, ((byte[])out[4])[0]);
assertEquals(-112, ((byte[]) out[4])[0]);
assertEquals(54, ((byte[])out[4])[1]);
assertEquals("\n", out[5].toString());
}
@@ -134,7 +130,7 @@ public void testObjectInStringOut() {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setFromSchema(schema);
data.setSchema(schema);
byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
Object[] in = new Object[6];
@@ -164,8 +160,7 @@ public void testObjectInObjectOut() {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setFromSchema(schema);
data.setToSchema(emptySchema);
data.setSchema(schema);
Object[] in = new Object[6];
in[0] = new Long(10);
@@ -188,8 +183,7 @@ public void testStringFullRangeOfCharacters() {
Schema schema = new Schema("test");
schema.addColumn(new Text("1"));
data.setFromSchema(schema);
data.setToSchema(emptySchema);
data.setSchema(schema);
char[] allCharArr = new char[256];
for(int i = 0; i < allCharArr.length; ++i) {
@@ -212,148 +206,30 @@ public void testStringFullRangeOfCharacters() {
public void testByteArrayFullRangeOfCharacters() {
Schema schema = new Schema("test");
schema.addColumn(new Binary("1"));
data.setFromSchema(schema);
data.setToSchema(emptySchema);
data.setSchema(schema);
byte[] allCharByteArr = new byte[256];
for(int i = 0; i < allCharByteArr.length; ++i) {
allCharByteArr[i] = (byte)i;
for (int i = 0; i < allCharByteArr.length; ++i) {
allCharByteArr[i] = (byte) i;
}
Object[] in = {allCharByteArr};
Object[] inCopy = new Object[1];
System.arraycopy(in,0,inCopy,0,in.length);
System.arraycopy(in, 0, inCopy, 0, in.length);
// Modifies the input array, so we use the copy to confirm
data.setObjectData(in);
assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
}
/**
* Note that we don't have an EmptyTo matching test,
* because most tests above have an empty "to" schema.
*/
@Test
public void testMatchingEmptyFrom() {
data.setFromSchema(emptySchema);
Schema toSchema = new Schema("To");
toSchema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"));
data.setToSchema(toSchema);
Object[] in = new Object[2];
in[0] = new Long(10);
in[1] = new Long(34);
Object[] out = new Object[2];
out[0] = new Long(10);
out[1] = new Long(34);
data.setObjectData(in);
assertTrue(Arrays.deepEquals(out, data.getObjectData()));
}
@Test(expected=SqoopException.class)
public void testMatchingTwoEmptySchema() {
data.setFromSchema(emptySchema);
data.setToSchema(emptySchema);
public void testEmptySchema() {
String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+ ",'\\n'";
Schema schema = new Schema("Test");
data.setSchema(schema);
data.setTextData(testData);
Object[] in = new Object[2];
in[0] = new Long(10);
in[1] = new Long(34);
data.setObjectData(in);
data.getObjectData();
Object[] out = data.getObjectData();
}
@Test
public void testMatchingFewerFromColumns(){
Schema fromSchema = new Schema("From");
fromSchema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"));
data.setFromSchema(fromSchema);
Schema toSchema = new Schema("To");
toSchema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"))
.addColumn(new Text("3"));
data.setToSchema(toSchema);
Object[] in = new Object[2];
in[0] = new Long(10);
in[1] = new Long(34);
Object[] out = new Object[3];
out[0] = new Long(10);
out[1] = new Long(34);
out[2] = null;
data.setObjectData(in);
assertTrue(Arrays.deepEquals(out, data.getObjectData()));
}
@Test
public void testMatchingFewerToColumns(){
Schema fromSchema = new Schema("From");
fromSchema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"))
.addColumn(new FixedPoint("3"));
data.setFromSchema(fromSchema);
Schema toSchema = new Schema("To");
toSchema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"));
data.setToSchema(toSchema);
Object[] in = new Object[3];
in[0] = new Long(10);
in[1] = new Long(34);
in[2] = new Long(50);
Object[] out = new Object[2];
out[0] = new Long(10);
out[1] = new Long(34);
data.setObjectData(in);
assertTrue(Arrays.deepEquals(out, data.getObjectData()));
}
@Test
public void testWithSomeNonMatchingFields(){
Schema fromSchema = new Schema("From");
fromSchema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"))
.addColumn(new FixedPoint("3"));
data.setFromSchema(fromSchema);
Schema toSchema = new Schema("From");
toSchema.addColumn(new FixedPoint("2"))
.addColumn(new FixedPoint("3"))
.addColumn(new FixedPoint("4"));
data.setToSchema(toSchema);
Object[] in = new Object[3];
in[0] = new Long(10);
in[1] = new Long(34);
in[2] = new Long(50);
Object[] out = new Object[3];
out[0] = new Long(34);
out[1] = new Long(50);
out[2] = null;
data.setObjectData(in);
assertTrue(Arrays.deepEquals(out, data.getObjectData()));
}
}

View File

@@ -29,13 +29,14 @@
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.common.PrefixContext;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.submission.counter.SqoopCounters;
import org.apache.sqoop.utils.ClassUtils;
@@ -54,8 +55,9 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
* Service for reporting progress to mapreduce.
*/
private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor();
private IntermediateDataFormat<String> dataFormat = null;
private SqoopWritable dataOut = null;
private IntermediateDataFormat<String> fromDataFormat = null;
private IntermediateDataFormat<String> toDataFormat = null;
private Matcher matcher;
@Override
public void run(Context context) throws IOException, InterruptedException {
@@ -64,19 +66,17 @@ public void run(Context context) throws IOException, InterruptedException {
String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
Schema fromSchema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
Schema toSchema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
matcher = MatcherFactory.getMatcher(
ConfigurationUtils.getConnectorSchema(Direction.FROM, conf),
ConfigurationUtils.getConnectorSchema(Direction.TO, conf));
String intermediateDataFormatName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT);
dataFormat = (IntermediateDataFormat<String>) ClassUtils
fromDataFormat = (IntermediateDataFormat<String>) ClassUtils
.instantiate(intermediateDataFormatName);
dataFormat.setFromSchema(fromSchema);
dataFormat.setToSchema(toSchema);
dataOut = new SqoopWritable();
fromDataFormat.setSchema(matcher.getFromSchema());
toDataFormat = (IntermediateDataFormat<String>) ClassUtils
.instantiate(intermediateDataFormatName);
toDataFormat.setSchema(matcher.getToSchema());
// Objects that should be passed to the Executor execution
PrefixContext subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
@@ -109,36 +109,41 @@ public void run(Context context) throws IOException, InterruptedException {
private class SqoopMapDataWriter extends DataWriter {
private Context context;
private SqoopWritable writable;
public SqoopMapDataWriter(Context context) {
this.context = context;
this.writable = new SqoopWritable();
}
@Override
public void writeArrayRecord(Object[] array) {
dataFormat.setObjectData(array);
fromDataFormat.setObjectData(array);
writeContent();
}
@Override
public void writeStringRecord(String text) {
dataFormat.setTextData(text);
fromDataFormat.setTextData(text);
writeContent();
}
@Override
public void writeRecord(Object obj) {
dataFormat.setData(obj.toString());
fromDataFormat.setData(obj.toString());
writeContent();
}
private void writeContent() {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Extracted data: " + dataFormat.getTextData());
LOG.debug("Extracted data: " + fromDataFormat.getTextData());
}
dataOut.setString(dataFormat.getTextData());
context.write(dataOut, NullWritable.get());
toDataFormat.setObjectData(matcher.getMatchingData(fromDataFormat.getObjectData()));
writable.setString(toDataFormat.getTextData());
context.write(writable, NullWritable.get());
} catch (Exception e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e);
}

View File

@@ -34,6 +34,8 @@
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.common.PrefixContext;
@@ -52,6 +54,7 @@ public class SqoopOutputFormatLoadExecutor {
private volatile boolean readerFinished = false;
private volatile boolean writerFinished = false;
private volatile IntermediateDataFormat<String> dataFormat;
private Matcher matcher;
private JobContext context;
private SqoopRecordWriter writer;
private Future<?> consumerFuture;
@@ -65,19 +68,18 @@ public class SqoopOutputFormatLoadExecutor {
this.loaderName = loaderName;
dataFormat = new CSVIntermediateDataFormat();
writer = new SqoopRecordWriter();
matcher = null;
}
public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
context = jobctx;
writer = new SqoopRecordWriter();
matcher = MatcherFactory.getMatcher(
ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()),
ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()));
dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context
.getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
Schema fromSchema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration());
dataFormat.setFromSchema(fromSchema);
Schema toSchema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration());
dataFormat.setToSchema(toSchema);
dataFormat.setSchema(matcher.getToSchema());
}
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {

View File

@@ -36,14 +36,7 @@
public class JobUtils {
public static void runJob(Configuration conf)
throws IOException, InterruptedException, ClassNotFoundException {
runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
(conf.get(JobConstants.HADOOP_OUTDIR) != null) ?
SqoopFileOutputFormat.class : SqoopNullOutputFormat.class);
}
public static void runJob(Configuration conf,
public static boolean runJob(Configuration conf,
Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper,
Class<? extends OutputFormat<SqoopWritable, NullWritable>> output)
@@ -57,8 +50,7 @@ public static void runJob(Configuration conf,
job.setOutputKeyClass(SqoopWritable.class);
job.setOutputValueClass(NullWritable.class);
boolean success = job.waitForCompletion(true);
Assert.assertEquals("Job failed!", true, success);
return job.waitForCompletion(true);
}
private JobUtils() {

View File

@@ -54,6 +54,7 @@
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -96,8 +97,10 @@ public void testMapper() throws Exception {
Job job = new Job(conf);
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
DummyOutputFormat.class);
ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
boolean success = JobUtils.runJob(job.getConfiguration(),
SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class);
Assert.assertEquals("Job failed!", true, success);
}
@Test
@@ -116,8 +119,11 @@ public void testOutputFormat() throws Exception {
Job job = new Job(conf);
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
boolean success = JobUtils.runJob(job.getConfiguration(),
SqoopInputFormat.class, SqoopMapper.class,
SqoopNullOutputFormat.class);
Assert.assertEquals("Job failed!", true, success);
// Make sure both destroyers get called.
assertEquals(1, DummyFromDestroyer.count);

View File

@@ -0,0 +1,275 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.job;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.junit.Assert.assertEquals;
@RunWith(Parameterized.class)
public class TestMatching {
private static final int START_PARTITION = 1;
private static final int NUMBER_OF_PARTITIONS = 1;
private static final int NUMBER_OF_ROWS_PER_PARTITION = 1;
private Schema from;
private Schema to;
public TestMatching(Schema from,
Schema to)
throws Exception {
this.from = from;
this.to = to;
System.out.println("Testing with Schemas\n\tFROM: " + this.from + "\n\tTO: " + this.to);
}
@Parameterized.Parameters
public static Collection<Object[]> data() {
List<Object[]> parameters = new ArrayList<Object[]>();
Schema emptyFrom = new Schema("FROM-EMPTY");
Schema emptyTo = new Schema("TO-EMPTY");
Schema from1 = new Schema("FROM-1");
Schema to1 = new Schema("TO-1");
Schema from2 = new Schema("FROM-2");
Schema to2 = new Schema("TO-2");
from1.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new org.apache.sqoop.schema.type.Text("3"));
to1.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
.addColumn(new org.apache.sqoop.schema.type.Text("3"));
from2.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"));
to2.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"));
parameters.add(new Object[]{
emptyFrom,
emptyTo
});
parameters.add(new Object[]{
from1,
emptyTo
});
parameters.add(new Object[]{
emptyTo,
to1
});
parameters.add(new Object[]{
from1,
to1
});
parameters.add(new Object[]{
from2,
to1
});
parameters.add(new Object[]{
from1,
to2
});
return parameters;
}
@Test
public void testSchemaMatching() throws Exception {
Configuration conf = new Configuration();
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Job job = new Job(conf);
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
ConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
DummyOutputFormat.class);
boolean success = JobUtils.runJob(job.getConfiguration(),
SqoopInputFormat.class, SqoopMapper.class,
DummyOutputFormat.class);
if (from.getName().split("-")[1].equals("EMPTY")) {
if (to.getName().split("-")[1].equals("EMPTY")) {
Assert.assertEquals("Job succeeded!", false, success);
} else {
Assert.assertEquals("Job failed!", true, success);
}
} else {
if (to.getName().split("-")[1].equals("EMPTY")) {
Assert.assertEquals("Job failed!", true, success);
} else if (from.getName().split("-")[1].equals(to.getName().split("-")[1])) {
Assert.assertEquals("Job failed!", true, success);
} else {
Assert.assertEquals("Job succeeded!", false, success);
}
}
}
public static class DummyPartition extends Partition {
private int id;
public void setId(int id) {
this.id = id;
}
public int getId() {
return id;
}
@Override
public void readFields(DataInput in) throws IOException {
id = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(id);
}
@Override
public String toString() {
return Integer.toString(id);
}
}
public static class DummyPartitioner extends Partitioner {
@Override
public List<Partition> getPartitions(PartitionerContext context, Object oc, Object oj) {
List<Partition> partitions = new LinkedList<Partition>();
for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
DummyPartition partition = new DummyPartition();
partition.setId(id);
partitions.add(partition);
}
return partitions;
}
}
public static class DummyExtractor extends Extractor {
@Override
public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
int id = ((DummyPartition)partition).getId();
for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
context.getDataWriter().writeArrayRecord(new Object[] {
id * NUMBER_OF_ROWS_PER_PARTITION + row,
(double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
}
}
@Override
public long getRowsRead() {
return NUMBER_OF_ROWS_PER_PARTITION;
}
}
public static class DummyOutputFormat
extends OutputFormat<SqoopWritable, NullWritable> {
@Override
public void checkOutputSpecs(JobContext context) {
// do nothing
}
@Override
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
TaskAttemptContext context) {
return new DummyRecordWriter();
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
return new DummyOutputCommitter();
}
public static class DummyRecordWriter
extends RecordWriter<SqoopWritable, NullWritable> {
private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
private Data data = new Data();
@Override
public void write(SqoopWritable key, NullWritable value) {
data.setContent(new Object[] {
index,
(double) index,
String.valueOf(index)},
Data.ARRAY_RECORD);
index++;
assertEquals(data.toString(), key.toString());
}
@Override
public void close(TaskAttemptContext context) {
// do nothing
}
}
public static class DummyOutputCommitter extends OutputCommitter {
@Override
public void setupJob(JobContext jobContext) { }
@Override
public void setupTask(TaskAttemptContext taskContext) { }
@Override
public void commitTask(TaskAttemptContext taskContext) { }
@Override
public void abortTask(TaskAttemptContext taskContext) { }
@Override
public boolean needsTaskCommit(TaskAttemptContext taskContext) {
return false;
}
}
}
}