diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java index 2a492217..02d1a519 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java @@ -22,12 +22,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.idf.matcher.AbstractMatcher; -import org.apache.sqoop.connector.idf.matcher.LocationMatcher; -import org.apache.sqoop.connector.idf.matcher.NameMatcher; import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.SchemaError; -import org.apache.sqoop.schema.SchemaMatchOption; import org.apache.sqoop.schema.type.Column; import org.apache.sqoop.schema.type.FixedPoint; import org.apache.sqoop.schema.type.FloatingPoint; @@ -41,7 +36,6 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; -import java.util.Set; import java.util.regex.Matcher; public class CSVIntermediateDataFormat extends IntermediateDataFormat { @@ -71,8 +65,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat { private final List stringFieldIndices = new ArrayList(); private final List byteFieldIndices = new ArrayList(); - private Schema fromSchema; - private Schema toSchema; + private Schema schema; /** * {@inheritDoc} @@ -94,11 +87,11 @@ public void setTextData(String text) { * {@inheritDoc} */ @Override - public void setFromSchema(Schema schema) { + public void setSchema(Schema schema) { if(schema == null) { return; } - this.fromSchema = schema; + this.schema = schema; List columns = schema.getColumns(); int i = 0; for(Column col : columns) { @@ -111,19 +104,6 @@ public void setFromSchema(Schema schema) { } } - /** - * {@inheritDoc} - */ - @Override - public void setToSchema(Schema schema) { - if(schema == null) { - return; - } - this.toSchema = schema; - } - - - /** * Custom CSV parser that honors quoting and escaped quotes. * All other escaping is handled elsewhere. @@ -180,69 +160,68 @@ private String[] getFields() { /** * {@inheritDoc} - * - * The CSV data is ordered according to the fromSchema. We "translate" it to the TO schema. - * We currently have 3 methods of matching fields in one schema to another: - * - by location - * - by name - * - user-defined matching - * - * If one schema exists (either to or from) and the other is empty - * We'll match fields based on location. - * If both schemas exist, we'll match names of fields. 
- * - * In the future, we may want to let users choose the method - * Currently nothing is implemented for user-defined matching */ @Override public Object[] getObjectData() { + if (schema.isEmpty()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006); + } + String[] fields = getFields(); + if (fields == null) { return null; } - if (fromSchema == null || toSchema == null || (toSchema.isEmpty() && fromSchema.isEmpty())) { - throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006); + if (fields.length != schema.getColumns().size()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, + "The data " + getTextData() + " has the wrong number of fields."); } - AbstractMatcher matcher = getMatcher(fromSchema,toSchema); - String[] outFields = matcher.getMatchingData(fields, fromSchema, toSchema); - Object[] out = new Object[outFields.length]; - - int i = 0; - - // After getting back the data in order that matches the output schema - // We need to un-do the CSV escaping - for (Column col: matcher.getMatchingSchema(fromSchema,toSchema).getColumns()) { - Type colType = col.getType(); - if (outFields[i] == null) { + Object[] out = new Object[fields.length]; + Column[] cols = schema.getColumns().toArray(new Column[fields.length]); + for (int i = 0; i < fields.length; i++) { + Type colType = cols[i].getType(); + if (fields[i].equals("NULL")) { out[i] = null; continue; } - if (colType == Type.TEXT) { - out[i] = unescapeStrings(outFields[i]); - } else if (colType == Type.BINARY) { - out[i] = unescapeByteArray(outFields[i]); - } else if (colType == Type.FIXED_POINT) { - Long byteSize = ((FixedPoint) col).getByteSize(); - if (byteSize != null && byteSize <= Integer.SIZE) { - out[i] = Integer.valueOf(outFields[i]); - } else { - out[i] = Long.valueOf(outFields[i]); - } - } else if (colType == Type.FLOATING_POINT) { - Long byteSize = ((FloatingPoint) col).getByteSize(); - if (byteSize != null && byteSize <= Float.SIZE) { - out[i] = Float.valueOf(outFields[i]); - } else { - out[i] = Double.valueOf(outFields[i]); - } - } else if (colType == Type.DECIMAL) { - out[i] = new BigDecimal(outFields[i]); - } else { - throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + colType); + + Long byteSize; + switch(colType) { + case TEXT: + out[i] = unescapeStrings(fields[i]); + break; + case BINARY: + out[i] = unescapeByteArray(fields[i]); + break; + case FIXED_POINT: + byteSize = ((FixedPoint) cols[i]).getByteSize(); + if (byteSize != null && byteSize <= Integer.SIZE) { + out[i] = Integer.valueOf(fields[i]); + } else { + out[i] = Long.valueOf(fields[i]); + } + break; + case FLOATING_POINT: + byteSize = ((FloatingPoint) cols[i]).getByteSize(); + if (byteSize != null && byteSize <= Float.SIZE) { + out[i] = Float.valueOf(fields[i]); + } else { + out[i] = Double.valueOf(fields[i]); + } + break; + case DECIMAL: + out[i] = new BigDecimal(fields[i]); + break; + case DATE: + case DATE_TIME: + case BIT: + out[i] = fields[i]; + break; + default: + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + colType); } - i++; } return out; } @@ -380,15 +359,4 @@ private byte[] unescapeByteArray(String orig) { public String toString() { return data; } - - private AbstractMatcher getMatcher(Schema fromSchema, Schema toSchema) { - if (toSchema.isEmpty() || fromSchema.isEmpty()) { - return 
new LocationMatcher(); - } else { - return new NameMatcher(); - } - - - } - } diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java index d98b779d..5ef6fc64 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java @@ -111,14 +111,7 @@ public T getData() { * * @param schema - the schema used for reading data */ - public abstract void setFromSchema(Schema schema); - - /** - * Set the schema for writing data. - * - * @param schema - the schema used for writing data - */ - public abstract void setToSchema(Schema schema); + public abstract void setSchema(Schema schema); /** * Serialize the fields of this object to out. diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java similarity index 77% rename from connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java rename to connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java index 938a5df0..58b709e4 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/LocationMatcher.java @@ -15,21 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.sqoop.connector.idf.matcher; +package org.apache.sqoop.connector.matcher; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.idf.IntermediateDataFormatError; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.SchemaError; -import org.apache.sqoop.schema.SchemaMatchOption; import org.apache.sqoop.schema.type.Column; -import org.apache.sqoop.schema.type.FixedPoint; -import org.apache.sqoop.schema.type.FloatingPoint; -import org.apache.sqoop.schema.type.Type; - -import java.math.BigDecimal; -import java.util.Iterator; /** @@ -39,24 +31,28 @@ * If TO schema has more fields and they are "nullable", the value will be set to null * If TO schema has extra non-null fields, we'll throw an exception */ -public class LocationMatcher extends AbstractMatcher { +public class LocationMatcher extends Matcher { public static final Logger LOG = Logger.getLogger(LocationMatcher.class); - @Override - public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) { - String[] out = new String[toSchema.getColumns().size()]; + public LocationMatcher(Schema from, Schema to) { + super(from, to); + } + + @Override + public Object[] getMatchingData(Object[] fields) { + + Object[] out = new Object[getToSchema().getColumns().size()]; int i = 0; - if (toSchema.isEmpty()) { + if (getToSchema().isEmpty()) { // If there's no destination schema, no need to convert anything // Just use the original data return fields; } - for (Column col: toSchema.getColumns()) - { + for (Column col: getToSchema().getColumns()) { if (i < fields.length) { if (isNull(fields[i])) { out[i] = null; diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java similarity index 52% rename from connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java rename to connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java index e6b23169..8ab13181 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/Matcher.java @@ -15,44 +15,51 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.sqoop.connector.idf.matcher; +package org.apache.sqoop.connector.matcher; +import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.type.Column; -public abstract class AbstractMatcher { +public abstract class Matcher { - //NOTE: This is currently tightly coupled to the CSV idf. We'll need refactoring after adding additional formats - //NOTE: There's is a very blatant special case of empty schemas that seem to apply only to HDFS. + private final Schema fromSchema; + private final Schema toSchema; + + public Matcher(Schema fromSchema, Schema toSchema) { + if (fromSchema.isEmpty() && toSchema.isEmpty()) { + throw new SqoopException(MatcherError.MATCHER_0000, "Neither a FROM nor a TO schema was provided."); + } else if (toSchema.isEmpty()) { + this.fromSchema = fromSchema; + this.toSchema = fromSchema; + } else if (fromSchema.isEmpty()) { + this.fromSchema = toSchema; + this.toSchema = toSchema; + } else { + this.fromSchema = fromSchema; + this.toSchema = toSchema; + } + } /** * * @param fields - * @param fromSchema - * @param toSchema * @return Return the data in "fields" converted from matching the fromSchema to matching the toSchema. * Right now "converted" means re-ordering if needed and handling nulls. */ - abstract public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema); - - /*** - * - * @param fromSchema - * @param toSchema - * @return return a schema with which to read the output data - * This always returns the toSchema (since this is used when getting output data), unless its empty - */ - public Schema getMatchingSchema(Schema fromSchema, Schema toSchema) { - if (toSchema.isEmpty()) { - return fromSchema; - } else { - return toSchema; - } + abstract public Object[] getMatchingData(Object[] fields); + public Schema getFromSchema() { + return fromSchema; } - protected boolean isNull(String value) { - if (value.equals("NULL") || value.equals("null") || value.equals("'null'") || value.isEmpty()) { + public Schema getToSchema() { + return toSchema; + } + + protected boolean isNull(Object value) { + if (value == null || value.equals("NULL") + || value.equals("null") || value.equals("'null'") + || value.equals("")) { + return true; } return false; diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherError.java new file mode 100644 index 00000000..577b0916 --- /dev/null +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherError.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.matcher; + +import org.apache.sqoop.common.ErrorCode; + +public enum MatcherError implements ErrorCode { + MATCHER_0000("Too few schemas provided."), + + ; + + private final String message; + + private MatcherError(String message) { + this.message = message; + } + + public String getCode() { + return name(); + } + + public String getMessage() { + return message; + } +} diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherFactory.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherFactory.java new file mode 100644 index 00000000..ae89e6ce --- /dev/null +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/MatcherFactory.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.matcher; + +import org.apache.sqoop.schema.Schema; + +public class MatcherFactory { + public static Matcher getMatcher(Schema fromSchema, Schema toSchema) { + if (toSchema.isEmpty() || fromSchema.isEmpty()) { + return new LocationMatcher(fromSchema, toSchema); + } else { + return new NameMatcher(fromSchema, toSchema); + } + } +} diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java similarity index 81% rename from connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java rename to connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java index 417c85b0..69d5ebd7 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/matcher/NameMatcher.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.sqoop.connector.idf.matcher; +package org.apache.sqoop.connector.matcher; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; @@ -25,26 +25,31 @@ import java.util.HashMap; -public class NameMatcher extends AbstractMatcher { +public class NameMatcher extends Matcher { + public static final Logger LOG = Logger.getLogger(NameMatcher.class); + public NameMatcher(Schema from, Schema to) { + super(from, to); + } + @Override - public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) { - String[] out = new String[toSchema.getColumns().size()]; + public Object[] getMatchingData(Object[] fields) { + Object[] out = new Object[getToSchema().getColumns().size()]; HashMap colNames = new HashMap(); - for (Column fromCol: fromSchema.getColumns()) { + for (Column fromCol: getFromSchema().getColumns()) { colNames.put(fromCol.getName(), fromCol); } int toIndex = 0; - for (Column toCol: toSchema.getColumns()) { + for (Column toCol: getToSchema().getColumns()) { Column fromCol = colNames.get(toCol.getName()); if (fromCol != null) { - int fromIndex = fromSchema.getColumns().indexOf(fromCol); + int fromIndex = getFromSchema().getColumns().indexOf(fromCol); if (isNull(fields[fromIndex])) { out[toIndex] = null; } else { diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java index 3954039b..765bedd2 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java @@ -20,7 +20,6 @@ import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.SchemaMatchOption; import org.apache.sqoop.schema.type.Binary; import org.apache.sqoop.schema.type.FixedPoint; import org.apache.sqoop.schema.type.Text; @@ -40,8 +39,6 @@ public class TestCSVIntermediateDataFormat { private IntermediateDataFormat data; - private Schema emptySchema = new Schema("empty"); - @Before public void setUp() { data = new CSVIntermediateDataFormat(); @@ -73,7 +70,7 @@ public void testNullStringInObjectOut() { .addColumn(new Text("4")) .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setFromSchema(schema); + data.setSchema(schema); data.setTextData(null); Object[] out = data.getObjectData(); @@ -90,7 +87,7 @@ public void testEmptyStringInObjectOut() { .addColumn(new Text("4")) .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setFromSchema(schema); + data.setSchema(schema); data.setTextData(""); data.getObjectData(); @@ -110,8 +107,7 @@ public void testStringInObjectOut() { .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setFromSchema(schema); - data.setToSchema(emptySchema); + data.setSchema(schema); data.setTextData(testData); Object[] out = data.getObjectData(); @@ -120,7 +116,7 @@ public void testStringInObjectOut() { assertEquals(new Long(34),out[1]); assertEquals("54",out[2]); assertEquals("random data",out[3]); - assertEquals(-112, ((byte[])out[4])[0]); + assertEquals(-112, ((byte[]) out[4])[0]); assertEquals(54, ((byte[])out[4])[1]); assertEquals("\n", out[5].toString()); } @@ -134,7 +130,7 @@ public void testObjectInStringOut() { .addColumn(new Text("4")) .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setFromSchema(schema); + 
data.setSchema(schema); byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; Object[] in = new Object[6]; @@ -164,8 +160,7 @@ public void testObjectInObjectOut() { .addColumn(new Text("4")) .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setFromSchema(schema); - data.setToSchema(emptySchema); + data.setSchema(schema); Object[] in = new Object[6]; in[0] = new Long(10); @@ -188,8 +183,7 @@ public void testStringFullRangeOfCharacters() { Schema schema = new Schema("test"); schema.addColumn(new Text("1")); - data.setFromSchema(schema); - data.setToSchema(emptySchema); + data.setSchema(schema); char[] allCharArr = new char[256]; for(int i = 0; i < allCharArr.length; ++i) { @@ -212,148 +206,30 @@ public void testStringFullRangeOfCharacters() { public void testByteArrayFullRangeOfCharacters() { Schema schema = new Schema("test"); schema.addColumn(new Binary("1")); - data.setFromSchema(schema); - data.setToSchema(emptySchema); + data.setSchema(schema); byte[] allCharByteArr = new byte[256]; - for(int i = 0; i < allCharByteArr.length; ++i) { - allCharByteArr[i] = (byte)i; + for (int i = 0; i < allCharByteArr.length; ++i) { + allCharByteArr[i] = (byte) i; } Object[] in = {allCharByteArr}; Object[] inCopy = new Object[1]; - System.arraycopy(in,0,inCopy,0,in.length); + System.arraycopy(in, 0, inCopy, 0, in.length); // Modifies the input array, so we use the copy to confirm data.setObjectData(in); assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); } - /** - * Note that we don't have an EmptyTo matching test - * Because most tests above have empty "to" schema - */ - @Test - public void testMatchingEmptyFrom() { - - data.setFromSchema(emptySchema); - - Schema toSchema = new Schema("To"); - toSchema.addColumn(new FixedPoint("1")) - .addColumn(new FixedPoint("2")); - data.setToSchema(toSchema); - - Object[] in = new Object[2]; - in[0] = new Long(10); - in[1] = new Long(34); - - Object[] out = new Object[2]; - out[0] = new Long(10); - out[1] = new Long(34); - - data.setObjectData(in); - - assertTrue(Arrays.deepEquals(out, data.getObjectData())); - } - @Test(expected=SqoopException.class) - public void testMatchingTwoEmptySchema() { - data.setFromSchema(emptySchema); - data.setToSchema(emptySchema); + public void testEmptySchema() { + String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + + ",'\\n'"; + Schema schema = new Schema("Test"); + data.setSchema(schema); + data.setTextData(testData); - Object[] in = new Object[2]; - in[0] = new Long(10); - in[1] = new Long(34); - - data.setObjectData(in); - - data.getObjectData(); + Object[] out = data.getObjectData(); } - - @Test - public void testMatchingFewerFromColumns(){ - Schema fromSchema = new Schema("From"); - fromSchema.addColumn(new FixedPoint("1")) - .addColumn(new FixedPoint("2")); - data.setFromSchema(fromSchema); - - Schema toSchema = new Schema("To"); - toSchema.addColumn(new FixedPoint("1")) - .addColumn(new FixedPoint("2")) - .addColumn(new Text("3")); - data.setToSchema(toSchema); - - Object[] in = new Object[2]; - in[0] = new Long(10); - in[1] = new Long(34); - - Object[] out = new Object[3]; - out[0] = new Long(10); - out[1] = new Long(34); - out[2] = null; - - data.setObjectData(in); - - assertTrue(Arrays.deepEquals(out, data.getObjectData())); - } - - @Test - public void testMatchingFewerToColumns(){ - Schema fromSchema = new Schema("From"); - fromSchema.addColumn(new FixedPoint("1")) - .addColumn(new FixedPoint("2")) - .addColumn(new 
FixedPoint("3")); - data.setFromSchema(fromSchema); - - Schema toSchema = new Schema("To"); - toSchema.addColumn(new FixedPoint("1")) - .addColumn(new FixedPoint("2")); - data.setToSchema(toSchema); - - Object[] in = new Object[3]; - in[0] = new Long(10); - in[1] = new Long(34); - in[2] = new Long(50); - - Object[] out = new Object[2]; - out[0] = new Long(10); - out[1] = new Long(34); - - - data.setObjectData(in); - - assertTrue(Arrays.deepEquals(out, data.getObjectData())); - } - - - @Test - public void testWithSomeNonMatchingFields(){ - - Schema fromSchema = new Schema("From"); - fromSchema.addColumn(new FixedPoint("1")) - .addColumn(new FixedPoint("2")) - .addColumn(new FixedPoint("3")); - data.setFromSchema(fromSchema); - - Schema toSchema = new Schema("From"); - toSchema.addColumn(new FixedPoint("2")) - .addColumn(new FixedPoint("3")) - .addColumn(new FixedPoint("4")); - data.setToSchema(toSchema); - - Object[] in = new Object[3]; - in[0] = new Long(10); - in[1] = new Long(34); - in[2] = new Long(50); - - Object[] out = new Object[3]; - out[0] = new Long(34); - out[1] = new Long(50); - out[2] = null; - - data.setObjectData(in); - - assertTrue(Arrays.deepEquals(out, data.getObjectData())); - } - } diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 8c88d527..03d84d4b 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -29,13 +29,14 @@ import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.idf.IntermediateDataFormat; +import org.apache.sqoop.connector.matcher.Matcher; +import org.apache.sqoop.connector.matcher.MatcherFactory; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; import org.apache.sqoop.etl.io.DataWriter; -import org.apache.sqoop.schema.Schema; import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.submission.counter.SqoopCounters; import org.apache.sqoop.utils.ClassUtils; @@ -54,8 +55,9 @@ public class SqoopMapper extends Mapper dataFormat = null; - private SqoopWritable dataOut = null; + private IntermediateDataFormat fromDataFormat = null; + private IntermediateDataFormat toDataFormat = null; + private Matcher matcher; @Override public void run(Context context) throws IOException, InterruptedException { @@ -64,19 +66,17 @@ public void run(Context context) throws IOException, InterruptedException { String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR); Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName); - - - Schema fromSchema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf); - Schema toSchema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf); + matcher = MatcherFactory.getMatcher( + ConfigurationUtils.getConnectorSchema(Direction.FROM, conf), + ConfigurationUtils.getConnectorSchema(Direction.TO, conf)); String intermediateDataFormatName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT); - dataFormat = (IntermediateDataFormat) ClassUtils + fromDataFormat = (IntermediateDataFormat) ClassUtils .instantiate(intermediateDataFormatName); - - dataFormat.setFromSchema(fromSchema); - dataFormat.setToSchema(toSchema); - - dataOut = 
new SqoopWritable(); + fromDataFormat.setSchema(matcher.getFromSchema()); + toDataFormat = (IntermediateDataFormat) ClassUtils + .instantiate(intermediateDataFormatName); + toDataFormat.setSchema(matcher.getToSchema()); // Objects that should be passed to the Executor execution PrefixContext subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); @@ -109,36 +109,41 @@ public void run(Context context) throws IOException, InterruptedException { private class SqoopMapDataWriter extends DataWriter { private Context context; + private SqoopWritable writable; public SqoopMapDataWriter(Context context) { this.context = context; + this.writable = new SqoopWritable(); } @Override public void writeArrayRecord(Object[] array) { - dataFormat.setObjectData(array); + fromDataFormat.setObjectData(array); writeContent(); } @Override public void writeStringRecord(String text) { - dataFormat.setTextData(text); + fromDataFormat.setTextData(text); writeContent(); } @Override public void writeRecord(Object obj) { - dataFormat.setData(obj.toString()); + fromDataFormat.setData(obj.toString()); writeContent(); } private void writeContent() { try { if (LOG.isDebugEnabled()) { - LOG.debug("Extracted data: " + dataFormat.getTextData()); + LOG.debug("Extracted data: " + fromDataFormat.getTextData()); } - dataOut.setString(dataFormat.getTextData()); - context.write(dataOut, NullWritable.get()); + + toDataFormat.setObjectData( matcher.getMatchingData( fromDataFormat.getObjectData() ) ); + + writable.setString(toDataFormat.getTextData()); + context.write(writable, NullWritable.get()); } catch (Exception e) { throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e); } diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 941b31d7..1ebd3e43 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -34,6 +34,8 @@ import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.connector.idf.IntermediateDataFormat; +import org.apache.sqoop.connector.matcher.Matcher; +import org.apache.sqoop.connector.matcher.MatcherFactory; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.common.PrefixContext; @@ -52,6 +54,7 @@ public class SqoopOutputFormatLoadExecutor { private volatile boolean readerFinished = false; private volatile boolean writerFinished = false; private volatile IntermediateDataFormat dataFormat; + private Matcher matcher; private JobContext context; private SqoopRecordWriter writer; private Future consumerFuture; @@ -65,19 +68,18 @@ public class SqoopOutputFormatLoadExecutor { this.loaderName = loaderName; dataFormat = new CSVIntermediateDataFormat(); writer = new SqoopRecordWriter(); + matcher = null; } public SqoopOutputFormatLoadExecutor(JobContext jobctx) { context = jobctx; writer = new SqoopRecordWriter(); + matcher = MatcherFactory.getMatcher( + ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()), + ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration())); dataFormat = (IntermediateDataFormat) ClassUtils.instantiate(context - 
.getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT)); - - Schema fromSchema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()); - dataFormat.setFromSchema(fromSchema); - - Schema toSchema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()); - dataFormat.setToSchema(toSchema); + .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT)); + dataFormat.setSchema(matcher.getToSchema()); } public RecordWriter getRecordWriter() { diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java index b5435ff8..1952cbbd 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java @@ -36,14 +36,7 @@ public class JobUtils { - public static void runJob(Configuration conf) - throws IOException, InterruptedException, ClassNotFoundException { - runJob(conf, SqoopInputFormat.class, SqoopMapper.class, - (conf.get(JobConstants.HADOOP_OUTDIR) != null) ? - SqoopFileOutputFormat.class : SqoopNullOutputFormat.class); - } - - public static void runJob(Configuration conf, + public static boolean runJob(Configuration conf, Class> input, Class> mapper, Class> output) @@ -57,8 +50,7 @@ public static void runJob(Configuration conf, job.setOutputKeyClass(SqoopWritable.class); job.setOutputValueClass(NullWritable.class); - boolean success = job.waitForCompletion(true); - Assert.assertEquals("Job failed!", true, success); + return job.waitForCompletion(true); } private JobUtils() { diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java index 5662120f..032cc11d 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java @@ -54,6 +54,7 @@ import org.apache.sqoop.schema.type.FixedPoint; import org.apache.sqoop.schema.type.FloatingPoint; import org.apache.sqoop.schema.type.Text; +import org.junit.Assert; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -96,8 +97,10 @@ public void testMapper() throws Exception { Job job = new Job(conf); ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); - JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, - DummyOutputFormat.class); + ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema); + boolean success = JobUtils.runJob(job.getConfiguration(), + SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class); + Assert.assertEquals("Job failed!", true, success); } @Test @@ -116,8 +119,11 @@ public void testOutputFormat() throws Exception { Job job = new Job(conf); ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); - JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, + ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema); + boolean success = JobUtils.runJob(job.getConfiguration(), + SqoopInputFormat.class, SqoopMapper.class, SqoopNullOutputFormat.class); + Assert.assertEquals("Job failed!", true, success); // Make sure both destroyers get called. 
assertEquals(1, DummyFromDestroyer.count); diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java new file mode 100644 index 00000000..7f9a1473 --- /dev/null +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; +import org.apache.sqoop.job.etl.Extractor; +import org.apache.sqoop.job.etl.ExtractorContext; +import org.apache.sqoop.job.etl.Partition; +import org.apache.sqoop.job.etl.Partitioner; +import org.apache.sqoop.job.etl.PartitionerContext; +import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.SqoopWritable; +import org.apache.sqoop.job.mr.ConfigurationUtils; +import org.apache.sqoop.job.mr.SqoopInputFormat; +import org.apache.sqoop.job.mr.SqoopMapper; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.FloatingPoint; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.junit.Assert.assertEquals; + + +@RunWith(Parameterized.class) +public class TestMatching { + private static final int START_PARTITION = 1; + private static final int NUMBER_OF_PARTITIONS = 1; + private static final int NUMBER_OF_ROWS_PER_PARTITION = 1; + + private Schema from; + private Schema to; + + public TestMatching(Schema from, + Schema to) + throws Exception { + this.from = from; + this.to = to; + + System.out.println("Testing with Schemas\n\tFROM: " + this.from + "\n\tTO: " + this.to); + } + + @Parameterized.Parameters + public static Collection data() { + List parameters = new ArrayList(); + + Schema emptyFrom = new Schema("FROM-EMPTY"); + Schema emptyTo = new Schema("TO-EMPTY"); + Schema from1 = new Schema("FROM-1"); + Schema to1 = new Schema("TO-1"); + Schema from2 = new Schema("FROM-2"); + Schema to2 = new Schema("TO-2"); + + from1.addColumn(new 
FixedPoint("1")).addColumn(new FloatingPoint("2")) + .addColumn(new org.apache.sqoop.schema.type.Text("3")); + to1.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) + .addColumn(new org.apache.sqoop.schema.type.Text("3")); + from2.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")); + to2.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")); + + parameters.add(new Object[]{ + emptyFrom, + emptyTo + }); + parameters.add(new Object[]{ + from1, + emptyTo + }); + parameters.add(new Object[]{ + emptyTo, + to1 + }); + parameters.add(new Object[]{ + from1, + to1 + }); + parameters.add(new Object[]{ + from2, + to1 + }); + parameters.add(new Object[]{ + from1, + to2 + }); + + return parameters; + } + + @Test + public void testSchemaMatching() throws Exception { + Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); + conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + CSVIntermediateDataFormat.class.getName()); + + Job job = new Job(conf); + ConfigurationUtils.setConnectorSchema(Direction.FROM, job, from); + ConfigurationUtils.setConnectorSchema(Direction.TO, job, to); + JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, + DummyOutputFormat.class); + boolean success = JobUtils.runJob(job.getConfiguration(), + SqoopInputFormat.class, SqoopMapper.class, + DummyOutputFormat.class); + if (from.getName().split("-")[1].equals("EMPTY")) { + if (to.getName().split("-")[1].equals("EMPTY")) { + Assert.assertEquals("Job succeeded!", false, success); + } else { + Assert.assertEquals("Job failed!", true, success); + } + } else { + if (to.getName().split("-")[1].equals("EMPTY")) { + Assert.assertEquals("Job failed!", true, success); + } else if (from.getName().split("-")[1].equals(to.getName().split("-")[1])) { + Assert.assertEquals("Job failed!", true, success); + } else { + Assert.assertEquals("Job succeeded!", false, success); + } + } + } + + public static class DummyPartition extends Partition { + private int id; + + public void setId(int id) { + this.id = id; + } + + public int getId() { + return id; + } + + @Override + public void readFields(DataInput in) throws IOException { + id = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(id); + } + + @Override + public String toString() { + return Integer.toString(id); + } + } + + public static class DummyPartitioner extends Partitioner { + @Override + public List getPartitions(PartitionerContext context, Object oc, Object oj) { + List partitions = new LinkedList(); + for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { + DummyPartition partition = new DummyPartition(); + partition.setId(id); + partitions.add(partition); + } + return partitions; + } + } + + public static class DummyExtractor extends Extractor { + @Override + public void extract(ExtractorContext context, Object oc, Object oj, Object partition) { + int id = ((DummyPartition)partition).getId(); + for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) { + context.getDataWriter().writeArrayRecord(new Object[] { + id * NUMBER_OF_ROWS_PER_PARTITION + row, + (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row), + String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)}); + } + } + + @Override + public long getRowsRead() { + return NUMBER_OF_ROWS_PER_PARTITION; + } + } + + public static class DummyOutputFormat + extends 
OutputFormat { + @Override + public void checkOutputSpecs(JobContext context) { + // do nothing + } + + @Override + public RecordWriter getRecordWriter( + TaskAttemptContext context) { + return new DummyRecordWriter(); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) { + return new DummyOutputCommitter(); + } + + public static class DummyRecordWriter + extends RecordWriter { + private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; + private Data data = new Data(); + + @Override + public void write(SqoopWritable key, NullWritable value) { + + data.setContent(new Object[] { + index, + (double) index, + String.valueOf(index)}, + Data.ARRAY_RECORD); + index++; + + assertEquals(data.toString(), key.toString()); + } + + @Override + public void close(TaskAttemptContext context) { + // do nothing + } + } + + public static class DummyOutputCommitter extends OutputCommitter { + @Override + public void setupJob(JobContext jobContext) { } + + @Override + public void setupTask(TaskAttemptContext taskContext) { } + + @Override + public void commitTask(TaskAttemptContext taskContext) { } + + @Override + public void abortTask(TaskAttemptContext taskContext) { } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) { + return false; + } + } + } +}
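
Reviewer note: a minimal sketch of the matcher selection semantics introduced by this patch, assuming only the classes added above (MatcherFactory, NameMatcher, LocationMatcher) and the fluent Schema.addColumn style used in the tests. The class name, column names, and sample values are hypothetical, for illustration only.

```java
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;

public class MatcherSelectionSketch {
  public static void main(String[] args) {
    // Both schemas non-empty -> MatcherFactory returns a NameMatcher,
    // which matches fields by column name rather than by position.
    Schema from = new Schema("from")
        .addColumn(new FixedPoint("id"))
        .addColumn(new Text("name"));
    Schema to = new Schema("to")
        .addColumn(new Text("name"))
        .addColumn(new FixedPoint("id"));

    Matcher byName = MatcherFactory.getMatcher(from, to);
    Object[] reordered = byName.getMatchingData(new Object[]{10L, "abe"});
    // reordered == {"abe", 10L}: FROM order (id, name) projected onto TO order

    // One empty schema -> a LocationMatcher; the Matcher constructor
    // substitutes the non-empty schema for both sides, so fields pass
    // through positionally.
    Matcher byLocation = MatcherFactory.getMatcher(from, new Schema("empty"));
    Object[] passedThrough = byLocation.getMatchingData(new Object[]{10L, "abe"});
    // passedThrough == {10L, "abe"}

    // Two empty schemas -> the Matcher constructor throws MATCHER_0000.
  }
}
```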
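A companion sketch of the simplified CSVIntermediateDataFormat contract after this change: a single schema, set once via setSchema, with getObjectData decoding positionally against it. The sample rows follow the quoting style used in the tests above; the column names are made up.

```java
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;

public class CsvIdfSketch {
  public static void main(String[] args) {
    Schema schema = new Schema("sketch")
        .addColumn(new FixedPoint("id"))  // byteSize unset -> decoded as Long
        .addColumn(new Text("name"));

    CSVIntermediateDataFormat data = new CSVIntermediateDataFormat();
    data.setSchema(schema);

    // The literal token NULL decodes to a Java null; quoted text is unescaped.
    data.setTextData("10,NULL");
    Object[] decoded = data.getObjectData();  // {10L, null}

    // A row whose arity disagrees with the schema now fails fast with
    // INTERMEDIATE_DATA_FORMAT_0005 instead of being matched field-by-field.
    data.setTextData("10");
    try {
      data.getObjectData();
    } catch (SqoopException expected) {
      // "The data 10 has the wrong number of fields."
    }
  }
}
```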
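Finally, the new map-side data path in SqoopMapper.writeContent(), reduced to its essence: two IDF instances bridged by the matcher, with the TO-side CSV text becoming the SqoopWritable payload. The Hadoop plumbing is omitted, so this is a sketch of the flow under that simplification, not the mapper itself.

```java
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;

public class MapSideFlowSketch {
  public static void main(String[] args) {
    Schema from = new Schema("from")
        .addColumn(new FixedPoint("id"))
        .addColumn(new Text("name"));
    Schema to = new Schema("to")
        .addColumn(new Text("name"));  // TO keeps only "name"

    Matcher matcher = MatcherFactory.getMatcher(from, to);

    // The mapper now instantiates the IDF twice, once per side,
    // each wired to the schema the matcher settled on.
    CSVIntermediateDataFormat fromFormat = new CSVIntermediateDataFormat();
    CSVIntermediateDataFormat toFormat = new CSVIntermediateDataFormat();
    fromFormat.setSchema(matcher.getFromSchema());
    toFormat.setSchema(matcher.getToSchema());

    // Extractor output enters in FROM order ...
    fromFormat.setObjectData(new Object[]{1L, "abe"});
    // ... the matcher projects it onto the TO schema (drops "id" here) ...
    toFormat.setObjectData(matcher.getMatchingData(fromFormat.getObjectData()));
    // ... and the TO-side text is what writeContent() puts on the wire.
    System.out.println(toFormat.getTextData());  // e.g. 'abe'
  }
}
```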