From 2c20d920f4ab31ae97ba57952d17677146069b5c Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Thu, 25 Sep 2014 15:39:43 -0700 Subject: [PATCH] SQOOP-1378: Sqoop2: From/To: Refactor schema This patch also changes the tools documentation. --- .../sqoop/job/etl/ExtractorContext.java | 13 +- .../org/apache/sqoop/json/SchemaBean.java | 2 +- .../org/apache/sqoop/json/SubmissionBean.java | 7 +- .../sqoop/json/util/SchemaSerialization.java | 2 +- .../org/apache/sqoop/model/MSubmission.java | 15 +- .../java/org/apache/sqoop/schema/Schema.java | 11 +- .../org/apache/sqoop/schema/SchemaError.java | 6 + .../sqoop/schema/SchemaMatchOption.java | 40 +++++ .../org/apache/sqoop/schema/type/Column.java | 6 +- .../json/util/TestSchemaSerialization.java | 2 +- .../sqoop/connector/jdbc/TestExtractor.java | 4 +- .../sqoop/connector/hdfs/HdfsConnector.java | 5 +- .../connector/hdfs/HdfsConnectorError.java | 3 +- .../sqoop/connector/hdfs/HdfsInitializer.java | 2 +- .../sqoop/connector/hdfs/TestExtractor.java | 4 +- .../idf/CSVIntermediateDataFormat.java | 100 ++++++++---- .../connector/idf/IntermediateDataFormat.java | 16 +- .../idf/matcher/AbstractMatcher.java | 62 +++++++ .../idf/matcher/LocationMatcher.java | 82 ++++++++++ .../connector/idf/matcher/NameMatcher.java | 69 ++++++++ .../idf/TestCSVIntermediateDataFormat.java | 151 +++++++++++++++++- .../org/apache/sqoop/driver/JobManager.java | 15 +- docs/src/site/sphinx/Tools.rst | 3 +- .../sqoop/job/mr/ConfigurationUtils.java | 53 +++--- .../org/apache/sqoop/job/mr/SqoopMapper.java | 18 +-- .../job/mr/SqoopOutputFormatLoadExecutor.java | 15 +- .../shell/utils/SubmissionDisplayer.java | 4 +- .../main/resources/shell-resource.properties | 5 +- .../mapreduce/MapreduceSubmissionEngine.java | 7 +- 29 files changed, 584 insertions(+), 138 deletions(-) create mode 100644 common/src/main/java/org/apache/sqoop/schema/SchemaMatchOption.java create mode 100644 connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java create mode 100644 connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java create mode 100644 connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java diff --git a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java index fd73890d..3272b56d 100644 --- a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java +++ b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java @@ -30,12 +30,10 @@ public class ExtractorContext extends TransferableContext { private DataWriter writer; - private Schema schema; - public ExtractorContext(ImmutableContext context, DataWriter writer, Schema schema) { + public ExtractorContext(ImmutableContext context, DataWriter writer) { super(context); this.writer = writer; - this.schema = schema; } /** @@ -47,12 +45,5 @@ public DataWriter getDataWriter() { return writer; } - /** - * Return schema associated with this step. 
- * - * @return - */ - public Schema getSchema() { - return schema; - } + } diff --git a/common/src/main/java/org/apache/sqoop/json/SchemaBean.java b/common/src/main/java/org/apache/sqoop/json/SchemaBean.java index 468f7eef..f51fec85 100644 --- a/common/src/main/java/org/apache/sqoop/json/SchemaBean.java +++ b/common/src/main/java/org/apache/sqoop/json/SchemaBean.java @@ -48,7 +48,7 @@ public JSONObject extract(boolean skipSensitive) { @Override public void restore(JSONObject jsonObject) { - schema = SchemaSerialization.restoreSchemna(jsonObject); + schema = SchemaSerialization.restoreSchema(jsonObject); } } diff --git a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java index 9b1ae742..4b803380 100644 --- a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java +++ b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java @@ -32,7 +32,7 @@ import java.util.Set; import static org.apache.sqoop.json.util.SchemaSerialization.extractSchema; -import static org.apache.sqoop.json.util.SchemaSerialization.restoreSchemna; +import static org.apache.sqoop.json.util.SchemaSerialization.restoreSchema; /** * @@ -188,11 +188,12 @@ public void restore(JSONObject json) { if(object.containsKey(COUNTERS)) { submission.setCounters(restoreCounters((JSONObject) object.get(COUNTERS))); } + if(object.containsKey(FROM_SCHEMA)) { - submission.setFromSchema(restoreSchemna((JSONObject) object.get(FROM_SCHEMA))); + submission.setFromSchema(restoreSchema((JSONObject) object.get(FROM_SCHEMA))); } if(object.containsKey(TO_SCHEMA)) { - submission.setToSchema(restoreSchemna((JSONObject) object.get(TO_SCHEMA))); + submission.setToSchema(restoreSchema((JSONObject) object.get(TO_SCHEMA))); } this.submissions.add(submission); diff --git a/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java b/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java index f6a9bbf4..1e6da6d6 100644 --- a/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java +++ b/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java @@ -79,7 +79,7 @@ public static JSONObject extractSchema(Schema schema) { return object; } - public static Schema restoreSchemna(JSONObject jsonObject) { + public static Schema restoreSchema(JSONObject jsonObject) { String name = (String)jsonObject.get(NAME); String note = (String)jsonObject.get(NOTE); java.util.Date date = new java.util.Date((Long)jsonObject.get(CREATION_DATE)); diff --git a/common/src/main/java/org/apache/sqoop/model/MSubmission.java b/common/src/main/java/org/apache/sqoop/model/MSubmission.java index ca211359..7290df50 100644 --- a/common/src/main/java/org/apache/sqoop/model/MSubmission.java +++ b/common/src/main/java/org/apache/sqoop/model/MSubmission.java @@ -102,17 +102,14 @@ public class MSubmission extends MAccountableEntity { /** * Schema for the FROM part of the job submission * - * This property is required. + * This property is required, but can be empty. */ Schema fromSchema; /** * Schema for the TO part of the job submission - * Optional schema that reported by the underlying I/O implementation. Please - * note that this property might be empty and in such case use the FROM schema - * on the TO side. * - * This property is optional. + * This property is required, but can be empty. 
   */
  Schema toSchema;
 
@@ -224,16 +221,16 @@ public Schema getFromSchema() {
     return fromSchema;
   }
 
-  public void setFromSchema(Schema connectorSchema) {
-    this.fromSchema = connectorSchema;
+  public void setFromSchema(Schema fromSchema) {
+    this.fromSchema = fromSchema;
   }
 
   public Schema getToSchema() {
     return toSchema;
   }
 
-  public void setToSchema(Schema hioSchema) {
-    this.toSchema = hioSchema;
+  public void setToSchema(Schema toSchema) {
+    this.toSchema = toSchema;
   }
 
   @Override
diff --git a/common/src/main/java/org/apache/sqoop/schema/Schema.java b/common/src/main/java/org/apache/sqoop/schema/Schema.java
index bbebab84..40c362c8 100644
--- a/common/src/main/java/org/apache/sqoop/schema/Schema.java
+++ b/common/src/main/java/org/apache/sqoop/schema/Schema.java
@@ -77,7 +77,7 @@ public Schema(String name) {
    * same name will lead to an exception being thrown.
    *
    * @param column Column that should be added to the schema at the end.
-   * @return
+   * @return a reference to this object, to allow chaining
    */
   public Schema addColumn(Column column) {
     if(column.getName() == null) {
@@ -121,6 +121,10 @@ public List getColumns() {
     return columns;
   }
 
+  public boolean isEmpty() {
+    return columns.size() == 0;
+  }
+
   public String toString() {
     return new StringBuilder("Schema{")
       .append("name=").append(name).append("")
diff --git a/common/src/main/java/org/apache/sqoop/schema/SchemaError.java b/common/src/main/java/org/apache/sqoop/schema/SchemaError.java
index 7c8c61e4..d430a643 100644
--- a/common/src/main/java/org/apache/sqoop/schema/SchemaError.java
+++ b/common/src/main/java/org/apache/sqoop/schema/SchemaError.java
@@ -30,6 +30,12 @@ public enum SchemaError implements ErrorCode {
 
   SCHEMA_0002("Duplicate column name"),
 
+  SCHEMA_0003("Source and Target schemas don't match"),
+
+  SCHEMA_0004("Non-null target column has no matching source column"),
+
+  SCHEMA_0005("No matching method available for source and target schemas")
+
   ;
 
   private final String message;
diff --git a/common/src/main/java/org/apache/sqoop/schema/SchemaMatchOption.java b/common/src/main/java/org/apache/sqoop/schema/SchemaMatchOption.java
new file mode 100644
index 00000000..e3ab026d
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/schema/SchemaMatchOption.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.schema;
+
+/**
+ * The order of the matching options here indicates an order of preference:
+ * when it is possible to use both the NAME and LOCATION matching options,
+ * we will prefer NAME.
+ *
+ * NAME - match columns in the FROM and TO schemas by column name. Data from
+ * column "hello" will be written to a column named "hello". If the TO schema
+ * doesn't have a column with an identical name, the column will be skipped.
+ *
+ * LOCATION - match columns in the FROM and TO schemas by column location.
+ * Data from the first FROM column goes into the first TO column, and so on.
+ * If the FROM schema has more columns than the TO schema, the extra columns
+ * will be skipped.
+ *
+ * USER_DEFINED - not implemented yet.
+ */
+public enum SchemaMatchOption {
+  NAME,
+  LOCATION,
+  //TODO: SQOOP-1546 - SQOOP2: Allow users to define their own schema matching
+  USER_DEFINED
+}
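For reviewers, a quick illustration of the two implemented options. The schemas here are invented for the example and are not part of this patch:

    // Hypothetical FROM and TO schemas -- note the differing column order.
    Schema from = new Schema("employees")
        .addColumn(new Text("name"))
        .addColumn(new FixedPoint("id"));
    Schema to = new Schema("staff")
        .addColumn(new FixedPoint("id"))
        .addColumn(new Text("name"));

    // NAME matching sends "name" data to "name" and "id" data to "id",
    // regardless of position. LOCATION matching would send "name" data
    // into "id" (column 0 to column 0), which is why NAME is preferred
    // whenever both schemas actually carry column names.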
diff --git a/common/src/main/java/org/apache/sqoop/schema/type/Column.java b/common/src/main/java/org/apache/sqoop/schema/type/Column.java
index 30c26a3c..97bd303c 100644
--- a/common/src/main/java/org/apache/sqoop/schema/type/Column.java
+++ b/common/src/main/java/org/apache/sqoop/schema/type/Column.java
@@ -32,11 +32,15 @@ public abstract class Column {
    */
   Boolean nullable;
 
+  /**
+   * By default columns have an empty name and are nullable
+   */
   public Column() {
+    this("", true);
   }
 
   public Column(String name) {
-    setName(name);
+    this(name, true);
   }
 
   public Column(String name, Boolean nullable) {
diff --git a/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java b/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java
index ab5bbd46..b652b323 100644
--- a/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java
+++ b/common/src/test/java/org/apache/sqoop/json/util/TestSchemaSerialization.java
@@ -172,6 +172,6 @@ protected Schema transfer(Schema schema) {
     String transferredString = extractJson.toJSONString();
     JSONObject restoreJson = (JSONObject) JSONValue.parse(transferredString);
 
-    return SchemaSerialization.restoreSchemna(restoreJson);
+    return SchemaSerialization.restoreSchema(restoreJson);
   }
 }
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
index 776359a0..5f091deb 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
@@ -81,7 +81,7 @@ public void testQuery() throws Exception {
 
     Extractor extractor = new GenericJdbcExtractor();
     DummyWriter writer = new DummyWriter();
-    ExtractorContext extractorContext = new ExtractorContext(context, writer, null);
+    ExtractorContext extractorContext = new ExtractorContext(context, writer);
 
     partition = new GenericJdbcPartition();
     partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665");
@@ -115,7 +115,7 @@ public void testSubquery() throws Exception {
 
     Extractor extractor = new GenericJdbcExtractor();
     DummyWriter writer = new DummyWriter();
-    ExtractorContext extractorContext = new ExtractorContext(context, writer, null);
+    ExtractorContext extractorContext = new ExtractorContext(context, writer);
 
     partition = new GenericJdbcPartition();
     partition.setConditions("-50 <= ICOL AND ICOL < -16");
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
index 70833a0f..cd5350ec 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
@@ -19,6 +19,7 @@
 package org.apache.sqoop.connector.hdfs;
 
 import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.common.VersionInfo;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
@@ -87,7 +88,9 @@ public Class getJobConfigurationClass(Direction jobType) {
       case TO:
         return ToJobConfiguration.class;
       default:
-        return null;
+        throw new SqoopException(
+            HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0006,
+            String.valueOf(jobType));
     }
   }
 
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java
index 8a095d26..71f0a032 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java
@@ -31,7 +31,9 @@ public enum HdfsConnectorError implements ErrorCode{
   /** The system was unable to instantiate the specified class. */
   GENERIC_HDFS_CONNECTOR_0004("Unable to instantiate the specified class"),
   /** Error occurs during loader run */
-  GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run")
+  GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run"),
+  /** The system was given an unknown job type. */
+  GENERIC_HDFS_CONNECTOR_0006("Unknown job type")
 
   ;
 
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
index 923f9048..c2dc1a5c 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
@@ -39,6 +39,6 @@ public void initialize(InitializerContext context, Object linkConf, Object jobCo
 
   @Override
   public Schema getSchema(InitializerContext context, Object linkConf, Object jobConf) {
-    return null;
+    return new Schema("HDFS file");
   }
 }
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
index 7942d590..c6d2f901 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -129,8 +129,10 @@ public void writeStringRecord(String text) {
       public void writeRecord(Object obj) {
         throw new AssertionError("Should not be writing object.");
       }
-    }, null);
+    });
+
     LinkConfiguration connConf = new LinkConfiguration();
+
     FromJobConfiguration jobConf = new FromJobConfiguration();
 
     HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory));
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index df5cb9c1..2a492217 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -22,7 +22,12 @@
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.matcher.AbstractMatcher;
+import org.apache.sqoop.connector.idf.matcher.LocationMatcher;
+import org.apache.sqoop.connector.idf.matcher.NameMatcher;
 import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.SchemaError;
+import org.apache.sqoop.schema.SchemaMatchOption;
 import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.FloatingPoint;
@@ -36,6 +41,7 @@
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.regex.Matcher;
 
 public class CSVIntermediateDataFormat extends IntermediateDataFormat {
@@ -65,7 +71,8 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat {
   private final List stringFieldIndices = new ArrayList();
   private final List byteFieldIndices = new ArrayList();
 
-  private Schema schema;
+  private Schema fromSchema;
+  private Schema toSchema;
 
   /**
    * {@inheritDoc}
@@ -87,19 +94,11 @@ public void setTextData(String text) {
    * {@inheritDoc}
    */
   @Override
-  public Schema getSchema() {
-    return schema;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void setSchema(Schema schema) {
+  public void setFromSchema(Schema schema) {
     if(schema == null) {
       return;
     }
-    this.schema = schema;
+    this.fromSchema = schema;
     List columns = schema.getColumns();
     int i = 0;
     for(Column col : columns) {
@@ -112,6 +111,17 @@
     }
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void setToSchema(Schema schema) {
+    if(schema == null) {
+      return;
+    }
+    this.toSchema = schema;
+  }
+
   /**
    * Custom CSV parser that honors quoting and escaped quotes.
    * All other escaping is handled elsewhere.
@@ -168,6 +180,19 @@ private String[] getFields() {
 
   /**
    * {@inheritDoc}
+   *
+   * The CSV data is ordered according to the FROM schema; we "translate" it
+   * to the TO schema. We currently have three methods of matching fields in
+   * one schema to another:
+   * - by location
+   * - by name
+   * - user-defined matching
+   *
+   * If only one schema is non-empty (either TO or FROM), we'll match fields
+   * based on location. If both schemas exist, we'll match fields by name.
+   *
+   * In the future, we may want to let users choose the method. Currently
+   * nothing is implemented for user-defined matching.
    */
   @Override
   public Object[] getObjectData() {
@@ -176,51 +201,53 @@ public Object[] getObjectData() {
     String[] fields = getFields();
     if (fields == null) {
       return null;
     }
 
-    if (schema == null) {
+    if (fromSchema == null || toSchema == null || (toSchema.isEmpty() && fromSchema.isEmpty())) {
       throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
     }
 
-    if (fields.length != schema.getColumns().size()) {
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
-        "The data " + getTextData() + " has the wrong number of fields.");
-    }
+    AbstractMatcher matcher = getMatcher(fromSchema, toSchema);
+    String[] outFields = matcher.getMatchingData(fields, fromSchema, toSchema);
+    Object[] out = new Object[outFields.length];
 
-    Object[] out = new Object[fields.length];
-    Column[] cols = schema.getColumns().toArray(new Column[fields.length]);
-    for (int i = 0; i < fields.length; i++) {
-      Type colType = cols[i].getType();
-      //TODO: Replace with proper isNull method. Actually the entire content of the loop should be a parse method
-      if (fields[i].equals("NULL") || fields[i].equals("null") || fields[i].equals("'null'") || fields[i].isEmpty()) {
+    int i = 0;
+
+    // Now that the data is in the order matching the output schema,
+    // undo the CSV escaping.
+    for (Column col : matcher.getMatchingSchema(fromSchema, toSchema).getColumns()) {
+      Type colType = col.getType();
+      if (outFields[i] == null) {
         out[i] = null;
         continue;
       }
       if (colType == Type.TEXT) {
-        out[i] = unescapeStrings(fields[i]);
+        out[i] = unescapeStrings(outFields[i]);
       } else if (colType == Type.BINARY) {
-        out[i] = unescapeByteArray(fields[i]);
+        out[i] = unescapeByteArray(outFields[i]);
       } else if (colType == Type.FIXED_POINT) {
-        Long byteSize = ((FixedPoint) cols[i]).getByteSize();
+        Long byteSize = ((FixedPoint) col).getByteSize();
         if (byteSize != null && byteSize <= Integer.SIZE) {
-          out[i] = Integer.valueOf(fields[i]);
+          out[i] = Integer.valueOf(outFields[i]);
         } else {
-          out[i] = Long.valueOf(fields[i]);
+          out[i] = Long.valueOf(outFields[i]);
         }
       } else if (colType == Type.FLOATING_POINT) {
-        Long byteSize = ((FloatingPoint) cols[i]).getByteSize();
+        Long byteSize = ((FloatingPoint) col).getByteSize();
         if (byteSize != null && byteSize <= Float.SIZE) {
-          out[i] = Float.valueOf(fields[i]);
+          out[i] = Float.valueOf(outFields[i]);
         } else {
-          out[i] = Double.valueOf(fields[i]);
+          out[i] = Double.valueOf(outFields[i]);
         }
       } else if (colType == Type.DECIMAL) {
-        out[i] = new BigDecimal(fields[i]);
+        out[i] = new BigDecimal(outFields[i]);
       } else {
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
           "Column type from schema was not recognized for " + colType);
       }
+      i++;
     }
 
     return out;
   }
 
+
   /**
    * {@inheritDoc}
    */
@@ -353,4 +380,13 @@ private byte[] unescapeByteArray(String orig) {
   public String toString() {
     return data;
   }
+
+  private AbstractMatcher getMatcher(Schema fromSchema, Schema toSchema) {
+    if (toSchema.isEmpty() || fromSchema.isEmpty()) {
+      return new LocationMatcher();
+    } else {
+      return new NameMatcher();
+    }
+  }
+
 }
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index 74b95184..d98b779d 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -81,7 +81,8 @@ public T getData() {
   /**
    * Get one row of data as CSV.
    *
-   * @return - String representing the data in CSV
+   * @return - String representing the data in CSV, according to the "FROM" schema.
+   * No schema conversion is done on textData, keeping it the high-performance option.
    */
   public abstract String getTextData();
 
@@ -95,6 +96,7 @@ public T getData() {
    * Get one row of data as an Object array.
    *
-   * @return - String representing the data as an Object array
+   * @return - Object array representing the data.
+   * If both the FROM and TO schemas exist, a schema matcher lays the data out according to the "TO" schema.
    */
   public abstract Object[] getObjectData();
 
@@ -105,18 +107,18 @@ public T getData() {
   /**
-   * Set the schema to be used.
+   * Set the schema for reading data.
    *
-   * @param schema - the schema to be used
+   * @param schema - the schema used for reading data
    */
-  public abstract void setSchema(Schema schema);
+  public abstract void setFromSchema(Schema schema);
 
   /**
-   * Get the schema of the data.
+   * Set the schema for writing data.
    *
-   * @return - The schema of the data.
+   * @param schema - the schema used for writing data
    */
-  public abstract Schema getSchema();
+  public abstract void setToSchema(Schema schema);
 
   /**
    * Serialize the fields of this object to out.
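To make the new contract concrete, a rough caller's-eye sketch (the schemas and data here are invented, and generic parameters are abbreviated):

    // The IDF now takes both schemas up front instead of a single schema.
    CSVIntermediateDataFormat idf = new CSVIntermediateDataFormat();
    idf.setFromSchema(fromSchema);      // order of the incoming CSV fields
    idf.setToSchema(toSchema);          // order the loader expects
    idf.setTextData("10,'gwen'");       // kept verbatim in FROM order -- no conversion
    Object[] row = idf.getObjectData(); // re-ordered to the TO schema by a matcher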
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java
new file mode 100644
index 00000000..e6b23169
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/AbstractMatcher.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.idf.matcher;
+
+import org.apache.sqoop.schema.Schema;
+
+public abstract class AbstractMatcher {
+
+  //NOTE: This is currently tightly coupled to the CSV IDF. We'll need refactoring after adding additional formats.
+  //NOTE: There is a very blatant special case of empty schemas that seems to apply only to HDFS.
+
+  /**
+   * @param fields data fields, ordered according to the FROM schema
+   * @param fromSchema schema of the data source
+   * @param toSchema schema of the data destination
+   * @return the data in "fields", converted from matching the FROM schema to
+   * matching the TO schema. Right now "converted" means re-ordering if needed
+   * and handling nulls.
+   */
+  abstract public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema);
+
+  /**
+   * @param fromSchema schema of the data source
+   * @param toSchema schema of the data destination
+   * @return a schema with which to read the output data. This is always the
+   * TO schema (since it is used when getting output data), unless the TO
+   * schema is empty, in which case we fall back to the FROM schema.
+   */
+  public Schema getMatchingSchema(Schema fromSchema, Schema toSchema) {
+    if (toSchema.isEmpty()) {
+      return fromSchema;
+    } else {
+      return toSchema;
+    }
+  }
+
+  protected boolean isNull(String value) {
+    return value.equals("NULL") || value.equals("null") || value.equals("'null'") || value.isEmpty();
+  }
+
+}
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java
new file mode 100644
index 00000000..938a5df0
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/LocationMatcher.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.idf.matcher;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.SchemaError;
+import org.apache.sqoop.schema.type.Column;
+
+/**
+ * Convert data ordered according to the FROM schema into data ordered
+ * according to the TO schema, based on column location: data in the first
+ * FROM column goes into the first TO column, and so on.
+ * If the TO schema has extra nullable columns, they are set to null.
+ * If the TO schema has extra non-null columns, an exception is thrown.
+ */
+public class LocationMatcher extends AbstractMatcher {
+
+  public static final Logger LOG = Logger.getLogger(LocationMatcher.class);
+
+  @Override
+  public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) {
+    if (toSchema.isEmpty()) {
+      // If there's no destination schema, no need to convert anything.
+      // Just use the original data.
+      return fields;
+    }
+
+    String[] out = new String[toSchema.getColumns().size()];
+    int i = 0;
+
+    for (Column col : toSchema.getColumns()) {
+      if (i < fields.length) {
+        if (isNull(fields[i])) {
+          out[i] = null;
+        } else {
+          out[i] = fields[i];
+        }
+      } else {
+        // We ran out of fields before we ran out of schema columns
+        if (!col.getNullable()) {
+          throw new SqoopException(SchemaError.SCHEMA_0004,
+              "target column " + col + " did not match any source column and cannot be null");
+        } else {
+          LOG.warn("Column " + col + " has no matching source column. Will be ignored.");
+          out[i] = null;
+        }
+      }
+      i++;
+    }
+
+    return out;
+  }
+
+}
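A quick sketch of the location-matching behavior (schemas and data invented for the example):

    // FROM has two columns; TO has those two plus a nullable third column.
    String[] fields = {"10", "34"};
    String[] out = new LocationMatcher().getMatchingData(fields, fromSchema, toSchema);
    // out is {"10", "34", null}. If the third TO column were non-nullable,
    // getMatchingData would throw SchemaError.SCHEMA_0004 instead.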
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java
new file mode 100644
index 00000000..417c85b0
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/matcher/NameMatcher.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.idf.matcher;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.SchemaError;
+import org.apache.sqoop.schema.type.Column;
+
+import java.util.HashMap;
+
+/**
+ * Convert data ordered according to the FROM schema into data ordered
+ * according to the TO schema, by matching column names.
+ */
+public class NameMatcher extends AbstractMatcher {
+
+  public static final Logger LOG = Logger.getLogger(NameMatcher.class);
+
+  @Override
+  public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) {
+    String[] out = new String[toSchema.getColumns().size()];
+
+    HashMap<String, Column> colNames = new HashMap<String, Column>();
+
+    for (Column fromCol : fromSchema.getColumns()) {
+      colNames.put(fromCol.getName(), fromCol);
+    }
+
+    int toIndex = 0;
+
+    for (Column toCol : toSchema.getColumns()) {
+      Column fromCol = colNames.get(toCol.getName());
+
+      if (fromCol != null) {
+        int fromIndex = fromSchema.getColumns().indexOf(fromCol);
+        if (isNull(fields[fromIndex])) {
+          out[toIndex] = null;
+        } else {
+          out[toIndex] = fields[fromIndex];
+        }
+      } else {
+        // Column exists in the TO schema but not in the FROM schema
+        if (!toCol.getNullable()) {
+          throw new SqoopException(SchemaError.SCHEMA_0004,
+              "target column " + toCol + " did not match any source column and cannot be null");
+        } else {
+          LOG.warn("Column " + toCol + " has no matching source column. Will be ignored.");
+          out[toIndex] = null;
+        }
+      }
+
+      toIndex++;
+    }
+
+    return out;
+  }
+
+}
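And the corresponding name-based behavior, mirroring the new testWithSomeNonMatchingFields unit test below (schemas invented):

    // FROM columns "1", "2", "3" carry {"10", "34", "50"}.
    // TO columns are "2", "3", "4", all nullable.
    String[] out = new NameMatcher().getMatchingData(
        new String[]{"10", "34", "50"}, fromSchema, toSchema);
    // out is {"34", "50", null}: values follow their column names, and
    // column "4", which has no source counterpart, is padded with null.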
"); + out[toIndex] = null; + } + } + + toIndex++; + } + + return out; + } + +} diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java index 8c83a71e..3954039b 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java @@ -20,6 +20,7 @@ import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.SchemaMatchOption; import org.apache.sqoop.schema.type.Binary; import org.apache.sqoop.schema.type.FixedPoint; import org.apache.sqoop.schema.type.Text; @@ -39,6 +40,8 @@ public class TestCSVIntermediateDataFormat { private IntermediateDataFormat data; + private Schema emptySchema = new Schema("empty"); + @Before public void setUp() { data = new CSVIntermediateDataFormat(); @@ -70,7 +73,7 @@ public void testNullStringInObjectOut() { .addColumn(new Text("4")) .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setSchema(schema); + data.setFromSchema(schema); data.setTextData(null); Object[] out = data.getObjectData(); @@ -87,7 +90,7 @@ public void testEmptyStringInObjectOut() { .addColumn(new Text("4")) .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setSchema(schema); + data.setFromSchema(schema); data.setTextData(""); data.getObjectData(); @@ -106,7 +109,9 @@ public void testStringInObjectOut() { .addColumn(new Text("4")) .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setSchema(schema); + + data.setFromSchema(schema); + data.setToSchema(emptySchema); data.setTextData(testData); Object[] out = data.getObjectData(); @@ -129,7 +134,7 @@ public void testObjectInStringOut() { .addColumn(new Text("4")) .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setSchema(schema); + data.setFromSchema(schema); byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; Object[] in = new Object[6]; @@ -159,7 +164,8 @@ public void testObjectInObjectOut() { .addColumn(new Text("4")) .addColumn(new Binary("5")) .addColumn(new Text("6")); - data.setSchema(schema); + data.setFromSchema(schema); + data.setToSchema(emptySchema); Object[] in = new Object[6]; in[0] = new Long(10); @@ -181,7 +187,9 @@ public void testObjectInObjectOut() { public void testStringFullRangeOfCharacters() { Schema schema = new Schema("test"); schema.addColumn(new Text("1")); - data.setSchema(schema); + + data.setFromSchema(schema); + data.setToSchema(emptySchema); char[] allCharArr = new char[256]; for(int i = 0; i < allCharArr.length; ++i) { @@ -204,7 +212,8 @@ public void testStringFullRangeOfCharacters() { public void testByteArrayFullRangeOfCharacters() { Schema schema = new Schema("test"); schema.addColumn(new Binary("1")); - data.setSchema(schema); + data.setFromSchema(schema); + data.setToSchema(emptySchema); byte[] allCharByteArr = new byte[256]; for(int i = 0; i < allCharByteArr.length; ++i) { @@ -219,4 +228,132 @@ public void testByteArrayFullRangeOfCharacters() { data.setObjectData(in); assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); } + + /** + * Note that we don't have an EmptyTo matching test + * Because most tests above have empty "to" schema + */ + @Test + public void testMatchingEmptyFrom() { + + data.setFromSchema(emptySchema); + + Schema toSchema = new Schema("To"); 
+    toSchema.addColumn(new FixedPoint("1"))
+            .addColumn(new FixedPoint("2"));
+    data.setToSchema(toSchema);
+
+    Object[] in = new Object[2];
+    in[0] = new Long(10);
+    in[1] = new Long(34);
+
+    Object[] out = new Object[2];
+    out[0] = new Long(10);
+    out[1] = new Long(34);
+
+    data.setObjectData(in);
+
+    assertTrue(Arrays.deepEquals(out, data.getObjectData()));
+  }
+
+  @Test(expected=SqoopException.class)
+  public void testMatchingTwoEmptySchema() {
+    data.setFromSchema(emptySchema);
+    data.setToSchema(emptySchema);
+
+    Object[] in = new Object[2];
+    in[0] = new Long(10);
+    in[1] = new Long(34);
+
+    data.setObjectData(in);
+
+    data.getObjectData();
+  }
+
+  @Test
+  public void testMatchingFewerFromColumns(){
+    Schema fromSchema = new Schema("From");
+    fromSchema.addColumn(new FixedPoint("1"))
+              .addColumn(new FixedPoint("2"));
+    data.setFromSchema(fromSchema);
+
+    Schema toSchema = new Schema("To");
+    toSchema.addColumn(new FixedPoint("1"))
+            .addColumn(new FixedPoint("2"))
+            .addColumn(new Text("3"));
+    data.setToSchema(toSchema);
+
+    Object[] in = new Object[2];
+    in[0] = new Long(10);
+    in[1] = new Long(34);
+
+    Object[] out = new Object[3];
+    out[0] = new Long(10);
+    out[1] = new Long(34);
+    out[2] = null;
+
+    data.setObjectData(in);
+
+    assertTrue(Arrays.deepEquals(out, data.getObjectData()));
+  }
+
+  @Test
+  public void testMatchingFewerToColumns(){
+    Schema fromSchema = new Schema("From");
+    fromSchema.addColumn(new FixedPoint("1"))
+              .addColumn(new FixedPoint("2"))
+              .addColumn(new FixedPoint("3"));
+    data.setFromSchema(fromSchema);
+
+    Schema toSchema = new Schema("To");
+    toSchema.addColumn(new FixedPoint("1"))
+            .addColumn(new FixedPoint("2"));
+    data.setToSchema(toSchema);
+
+    Object[] in = new Object[3];
+    in[0] = new Long(10);
+    in[1] = new Long(34);
+    in[2] = new Long(50);
+
+    Object[] out = new Object[2];
+    out[0] = new Long(10);
+    out[1] = new Long(34);
+
+    data.setObjectData(in);
+
+    assertTrue(Arrays.deepEquals(out, data.getObjectData()));
+  }
+
+  @Test
+  public void testWithSomeNonMatchingFields(){
+
+    Schema fromSchema = new Schema("From");
+    fromSchema.addColumn(new FixedPoint("1"))
+              .addColumn(new FixedPoint("2"))
+              .addColumn(new FixedPoint("3"));
+    data.setFromSchema(fromSchema);
+
+    Schema toSchema = new Schema("To");
+    toSchema.addColumn(new FixedPoint("2"))
+            .addColumn(new FixedPoint("3"))
+            .addColumn(new FixedPoint("4"));
+    data.setToSchema(toSchema);
+
+    Object[] in = new Object[3];
+    in[0] = new Long(10);
+    in[1] = new Long(34);
+    in[2] = new Long(50);
+
+    Object[] out = new Object[3];
+    out[0] = new Long(34);
+    out[1] = new Long(50);
+    out[2] = null;
+
+    data.setObjectData(in);
+
+    assertTrue(Arrays.deepEquals(out, data.getObjectData()));
+  }
+
 }
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index 277c6be7..e91c4369 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -374,13 +374,10 @@ private JobRequest createJobRequest(long jobId, MSubmission submission) {
     Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM);
     Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO);
 
-    // TODO(Gwen): Need better logic here once the Schema refactor: SQOOP-1378
-    if (fromSchema != null) {
-      jobRequest.getSummary().setFromSchema(fromSchema);
-    }
-    else {
-      jobRequest.getSummary().setFromSchema(toSchema);
-    }
+
+    jobRequest.getSummary().setFromSchema(fromSchema);
+    jobRequest.getSummary().setToSchema(toSchema);
+
     LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo());
     return jobRequest;
   }
@@ -458,7 +455,7 @@ private Schema getSchemaFromConnector(JobRequest jobRequest, Direction direction
     initializer.initialize(initializerContext, jobRequest.getConnectorLinkConfig(direction),
         jobRequest.getConnectorJobConfig(direction));
 
-    // TODO(Abe): Alter behavior of Schema here.
+
     return initializer.getSchema(initializerContext,
         jobRequest.getConnectorLinkConfig(direction),
         jobRequest.getConnectorJobConfig(direction));
@@ -709,4 +706,4 @@ public void run() {
     LOG.info("Ending submission manager update thread");
   }
 }
-}
\ No newline at end of file
+}
diff --git a/docs/src/site/sphinx/Tools.rst b/docs/src/site/sphinx/Tools.rst
index 84cbd5f4..6d36b274 100644
--- a/docs/src/site/sphinx/Tools.rst
+++ b/docs/src/site/sphinx/Tools.rst
@@ -23,8 +23,7 @@ Tools are server commands that administrators can execute on the Sqoop server ma
 
 In order to perform the maintenance task each tool is suppose to do, they need to be executed in exactly the same environment as the main Sqoop server. The tool binary will take care of setting up the ``CLASSPATH`` and other environmental variables that might be required. However it's up to the administrator himself to run the tool under the same user as is used for the server. This is usually configured automatically for various Hadoop distributions (such as Apache Bigtop).
 
-.. note:: Running tools under a different user such as ``root`` might prevent Sqoop Server from running correctly.
-
+.. note:: Running tools while the Sqoop Server is also running is not recommended, as it might lead to data corruption and service disruption.
 
 List of available tools:
 
 * verify
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
index 2ed06a8c..b5338372 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
@@ -82,13 +82,13 @@ public final class ConfigurationUtils {
 
   private static final Text JOB_CONFIG_FRAMEWORK_JOB_KEY = new Text(JOB_CONFIG_FRAMEWORK_JOB);
 
-  private static final String SCHEMA_FROM_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
+  private static final String SCHEMA_FROM = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
 
-  private static final Text SCHEMA_FROM_CONNECTOR_KEY = new Text(SCHEMA_FROM_CONNECTOR);
+  private static final Text SCHEMA_FROM_KEY = new Text(SCHEMA_FROM);
 
-  private static final String SCHEMA_TO_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
+  private static final String SCHEMA_TO = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
 
-  private static final Text SCHEMA_TO_CONNECTOR_KEY = new Text(SCHEMA_TO_CONNECTOR);
+  private static final Text SCHEMA_TO_KEY = new Text(SCHEMA_TO);
 
 
   /**
@@ -162,6 +162,27 @@ public static void setFrameworkJobConfig(Job job, Object obj) {
     job.getCredentials().addSecretKey(JOB_CONFIG_FRAMEWORK_JOB_KEY, FormUtils.toJson(obj).getBytes());
   }
 
+  /**
+   * Persist the Connector-generated schema.
+ * + * @param type Direction of schema we are persisting + * @param job MapReduce Job object + * @param schema Schema + */ + public static void setConnectorSchema(Direction type, Job job, Schema schema) { + if(schema != null) { + String jsonSchema = SchemaSerialization.extractSchema(schema).toJSONString(); + switch (type) { + case FROM: + job.getCredentials().addSecretKey(SCHEMA_FROM_KEY,jsonSchema.getBytes()); + return; + case TO: + job.getCredentials().addSecretKey(SCHEMA_TO_KEY, jsonSchema.getBytes()); + return; + } + } + } + /** * Retrieve Connector configuration object for connection. * @@ -226,23 +247,7 @@ public static Object getFrameworkJobConfig(Configuration configuration) { return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FRAMEWORK_JOB, JOB_CONFIG_FRAMEWORK_JOB_KEY); } - /** - * Persist From Connector generated schema. - * - * @param job MapReduce Job object - * @param schema Schema - */ - public static void setConnectorSchema(Direction type, Job job, Schema schema) { - if(schema != null) { - switch (type) { - case FROM: - job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); - case TO: - job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); - } - } - } /** * Retrieve Connector generated schema. @@ -253,10 +258,10 @@ public static void setConnectorSchema(Direction type, Job job, Schema schema) { public static Schema getConnectorSchema(Direction type, Configuration configuration) { switch (type) { case FROM: - return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_FROM_CONNECTOR_KEY)); + return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_FROM_KEY)); case TO: - return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_TO_CONNECTOR_KEY)); + return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_TO_KEY)); } return null; @@ -274,7 +279,9 @@ private static Schema getSchemaFromBytes(byte[] bytes) { if(bytes == null) { return null; } - return SchemaSerialization.restoreSchemna((JSONObject) JSONValue.parse(new String(bytes))); + + JSONObject jsonSchema = (JSONObject) JSONValue.parse(new String(bytes)); + return SchemaSerialization.restoreSchema(jsonSchema); } /** diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 6680f607..8c88d527 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -64,20 +64,18 @@ public void run(Context context) throws IOException, InterruptedException { String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR); Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName); - // TODO(Abe/Gwen): Change to conditional choosing between Connector schemas. 
- Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf); - if (schema == null) { - schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf); - } - if (schema == null) { - LOG.info("setting an empty schema"); - } + + Schema fromSchema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf); + Schema toSchema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf); String intermediateDataFormatName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT); dataFormat = (IntermediateDataFormat) ClassUtils .instantiate(intermediateDataFormatName); - dataFormat.setSchema(schema); + + dataFormat.setFromSchema(fromSchema); + dataFormat.setToSchema(toSchema); + dataOut = new SqoopWritable(); // Objects that should be passed to the Executor execution @@ -86,7 +84,7 @@ public void run(Context context) throws IOException, InterruptedException { Object fromJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); SqoopSplit split = context.getCurrentKey(); - ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), schema); + ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context)); try { LOG.info("Starting progress service"); diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index eea0623a..941b31d7 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -73,12 +73,11 @@ public SqoopOutputFormatLoadExecutor(JobContext jobctx) { dataFormat = (IntermediateDataFormat) ClassUtils.instantiate(context .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT)); - Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()); - if (schema==null) { - schema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()); - } + Schema fromSchema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()); + dataFormat.setFromSchema(fromSchema); - dataFormat.setSchema(schema); + Schema toSchema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()); + dataFormat.setToSchema(toSchema); } public RecordWriter getRecordWriter() { @@ -231,10 +230,8 @@ public void run() { Schema schema = null; if (!isTest) { - // Propagate connector schema in every case for now - // TODO: Change to coditional choosing between Connector schemas. - // @TODO(Abe): Maybe use TO schema? 
-      schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
+      // Using the TO schema, since the IDF returns data in the TO schema
+      schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
 
       subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
       configConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf);
diff --git a/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java b/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java
index 60acfb6e..0e2a38d4 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java
@@ -68,12 +68,12 @@ public static void displayHeader(MSubmission submission) {
     }
   }
 
-    if(isVerbose() && submission.getFromSchema() != null) {
+    if(isVerbose() && submission.getFromSchema() != null && !submission.getFromSchema().isEmpty()) {
       print(resourceString(Constants.RES_FROM_SCHEMA)+": ");
       println(submission.getFromSchema());
     }
 
-    if(isVerbose() && submission.getToSchema() != null) {
+    if(isVerbose() && submission.getToSchema() != null && !submission.getToSchema().isEmpty()) {
       print(resourceString(Constants.RES_TO_SCHEMA)+": ");
       println(submission.getToSchema());
     }
diff --git a/shell/src/main/resources/shell-resource.properties b/shell/src/main/resources/shell-resource.properties
index b59bd814..c0f86f77 100644
--- a/shell/src/main/resources/shell-resource.properties
+++ b/shell/src/main/resources/shell-resource.properties
@@ -227,5 +227,6 @@ submission.progress_not_available = Progress is not available
 submission.counters = Counters
 submission.executed_success = Job executed successfully
 submission.server_url = Server URL
-submission.from_schema = From schema
-submission.to_schema = To schema
\ No newline at end of file
+submission.from_schema = Source Connector schema
+submission.to_schema = Target Connector schema
+
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index fe92ac4f..25255ae7 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -206,11 +206,13 @@ public boolean submit(JobRequest mrJobRequest) {
       ConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, request.getConnectorJobConfig(Direction.FROM));
       ConfigurationUtils.setConnectorConnectionConfig(Direction.TO, job, request.getConnectorLinkConfig(Direction.TO));
       ConfigurationUtils.setConnectorJobConfig(Direction.TO, job, request.getConnectorJobConfig(Direction.TO));
+
       ConfigurationUtils.setFrameworkConnectionConfig(Direction.FROM, job, request.getFrameworkLinkConfig(Direction.FROM));
       ConfigurationUtils.setFrameworkConnectionConfig(Direction.TO, job, request.getFrameworkLinkConfig(Direction.TO));
       ConfigurationUtils.setFrameworkJobConfig(job, request.getFrameworkJobConfig());
-      // @TODO(Abe): Persist TO schema.
+ ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema()); + ConfigurationUtils.setConnectorSchema(Direction.TO, job, request.getSummary().getToSchema()); if(request.getJobName() != null) { job.setJobName("Sqoop: " + request.getJobName()); @@ -413,4 +415,5 @@ private boolean isLocal() { return "local".equals(globalConfiguration.get("mapreduce.jobtracker.address")) || "local".equals(globalConfiguration.get("mapred.job.tracker")); } -} \ No newline at end of file + +}
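For anyone tracing the new schema flow end to end, the pieces above compose roughly as follows (condensed from this patch; not a compilable unit on its own):

    // JobManager: ask both connectors for their schemas and store them on the summary.
    jobRequest.getSummary().setFromSchema(getSchemaFromConnector(jobRequest, Direction.FROM));
    jobRequest.getSummary().setToSchema(getSchemaFromConnector(jobRequest, Direction.TO));

    // MapreduceSubmissionEngine: persist both schemas into the job credentials.
    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema());
    ConfigurationUtils.setConnectorSchema(Direction.TO, job, request.getSummary().getToSchema());

    // SqoopMapper / SqoopOutputFormatLoadExecutor: restore them and hand both to the
    // IDF, which delegates the column matching to LocationMatcher or NameMatcher.
    dataFormat.setFromSchema(ConfigurationUtils.getConnectorSchema(Direction.FROM, conf));
    dataFormat.setToSchema(ConfigurationUtils.getConnectorSchema(Direction.TO, conf));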