
SQOOP-1378: Sqoop2: From/To: Refactor schema

This patch also changes the tools documentation.
Gwen Shapira 2014-09-25 15:39:43 -07:00 committed by Abraham Elmahrek
parent d0153621d3
commit 2c20d920f4
29 changed files with 584 additions and 138 deletions

View File

@ -30,12 +30,10 @@ public class ExtractorContext extends TransferableContext {
private DataWriter writer;
private Schema schema;
public ExtractorContext(ImmutableContext context, DataWriter writer, Schema schema) {
public ExtractorContext(ImmutableContext context, DataWriter writer) {
super(context);
this.writer = writer;
this.schema = schema;
}
/**
@ -47,12 +45,5 @@ public DataWriter getDataWriter() {
return writer;
}
/**
* Return schema associated with this step.
*
* @return
*/
public Schema getSchema() {
return schema;
}
}

View File

@ -48,7 +48,7 @@ public JSONObject extract(boolean skipSensitive) {
@Override
public void restore(JSONObject jsonObject) {
schema = SchemaSerialization.restoreSchemna(jsonObject);
schema = SchemaSerialization.restoreSchema(jsonObject);
}
}

View File

@ -32,7 +32,7 @@
import java.util.Set;
import static org.apache.sqoop.json.util.SchemaSerialization.extractSchema;
import static org.apache.sqoop.json.util.SchemaSerialization.restoreSchemna;
import static org.apache.sqoop.json.util.SchemaSerialization.restoreSchema;
/**
*
@ -188,11 +188,12 @@ public void restore(JSONObject json) {
if(object.containsKey(COUNTERS)) {
submission.setCounters(restoreCounters((JSONObject) object.get(COUNTERS)));
}
if(object.containsKey(FROM_SCHEMA)) {
submission.setFromSchema(restoreSchemna((JSONObject) object.get(FROM_SCHEMA)));
submission.setFromSchema(restoreSchema((JSONObject) object.get(FROM_SCHEMA)));
}
if(object.containsKey(TO_SCHEMA)) {
submission.setToSchema(restoreSchemna((JSONObject) object.get(TO_SCHEMA)));
submission.setToSchema(restoreSchema((JSONObject) object.get(TO_SCHEMA)));
}
this.submissions.add(submission);

View File

@ -79,7 +79,7 @@ public static JSONObject extractSchema(Schema schema) {
return object;
}
public static Schema restoreSchemna(JSONObject jsonObject) {
public static Schema restoreSchema(JSONObject jsonObject) {
String name = (String)jsonObject.get(NAME);
String note = (String)jsonObject.get(NOTE);
java.util.Date date = new java.util.Date((Long)jsonObject.get(CREATION_DATE));

View File

@ -102,17 +102,14 @@ public class MSubmission extends MAccountableEntity {
/**
* Schema for the FROM part of the job submission
*
* This property is required.
* This property is required, but can be empty.
*/
Schema fromSchema;
/**
* Schema for the TO part of the job submission
* Optional schema that reported by the underlying I/O implementation. Please
* note that this property might be empty and in such case use the FROM schema
* on the TO side.
*
* This property is optional.
* This property is required, but can be empty.
*/
Schema toSchema;
@ -224,16 +221,16 @@ public Schema getFromSchema() {
return fromSchema;
}
public void setFromSchema(Schema connectorSchema) {
this.fromSchema = connectorSchema;
public void setFromSchema(Schema fromSchema) {
this.fromSchema = fromSchema;
}
public Schema getToSchema() {
return toSchema;
}
public void setToSchema(Schema hioSchema) {
this.toSchema = hioSchema;
public void setToSchema(Schema toSchema) {
this.toSchema = toSchema;
}
@Override

View File

@ -77,7 +77,7 @@ public Schema(String name) {
* same name will lead to an exception being thrown.
*
* @param column Column that should be added to the schema at the end.
* @return
* @return a reference to this object
*/
public Schema addColumn(Column column) {
if(column.getName() == null) {
@ -121,6 +121,15 @@ public List<Column> getColumns() {
return columns;
}
public boolean isEmpty() {
if (columns.size()==0) {
return true;
} else {
return false;
}
}
public String toString() {
return new StringBuilder("Schema{")
.append("name=").append(name).append("")

View File

@ -30,6 +30,12 @@ public enum SchemaError implements ErrorCode {
SCHEMA_0002("Duplicate column name"),
SCHEMA_0003("Source and Target schemas don't match"),
SCHEMA_0004("Non-null target column has no matching source column"),
SCHEMA_0005("No matching method available for source and target schemas")
;
private final String message;

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.schema;
/**
* The order of the matching options here indicates an order of preference:
* if it is possible to use both NAME and LOCATION matching options, we will prefer NAME.
*
* NAME - match columns in FROM and TO schemas by column name. Data from column "hello"
* will be written to a column named "hello". If the TO schema doesn't have a column with an
* identical name, the column will be skipped.
*
* LOCATION - match columns in FROM and TO schemas by the column location.
* Data from the first column in FROM goes into the first column in TO, and so on.
* If the FROM schema has more columns than TO, the extra columns will be skipped.
*
* USER_DEFINED - not implemented yet.
*/
public enum SchemaMatchOption {
NAME,
LOCATION,
//TODO: SQOOP-1546 - SQOOP2: Allow users to define their own schema matching
USER_DEFINED
}
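To make the stated preference concrete, a minimal sketch of how a caller might choose between these options is shown below; it mirrors the getMatcher() helper that this patch adds to CSVIntermediateDataFormat further down, and the MatcherSelection/pickMatcher names are purely illustrative:

import org.apache.sqoop.connector.idf.matcher.AbstractMatcher;
import org.apache.sqoop.connector.idf.matcher.LocationMatcher;
import org.apache.sqoop.connector.idf.matcher.NameMatcher;
import org.apache.sqoop.schema.Schema;

public class MatcherSelection {
  // Prefer NAME matching when both schemas carry named columns;
  // fall back to LOCATION matching when either side is empty.
  static AbstractMatcher pickMatcher(Schema fromSchema, Schema toSchema) {
    if (fromSchema.isEmpty() || toSchema.isEmpty()) {
      return new LocationMatcher();
    }
    return new NameMatcher();
  }
}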

View File

@ -32,11 +32,15 @@ public abstract class Column {
*/
Boolean nullable;
/**
* By default columns have an empty name and are nullable
*/
public Column() {
this("", true);
}
public Column(String name) {
setName(name);
this(name, true);
}
public Column(String name, Boolean nullable) {

View File

@ -172,6 +172,6 @@ protected Schema transfer(Schema schema) {
String transferredString = extractJson.toJSONString();
JSONObject restoreJson = (JSONObject) JSONValue.parse(transferredString);
return SchemaSerialization.restoreSchemna(restoreJson);
return SchemaSerialization.restoreSchema(restoreJson);
}
}

View File

@ -81,7 +81,7 @@ public void testQuery() throws Exception {
Extractor extractor = new GenericJdbcExtractor();
DummyWriter writer = new DummyWriter();
ExtractorContext extractorContext = new ExtractorContext(context, writer, null);
ExtractorContext extractorContext = new ExtractorContext(context, writer);
partition = new GenericJdbcPartition();
partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665");
@ -115,7 +115,7 @@ public void testSubquery() throws Exception {
Extractor extractor = new GenericJdbcExtractor();
DummyWriter writer = new DummyWriter();
ExtractorContext extractorContext = new ExtractorContext(context, writer, null);
ExtractorContext extractorContext = new ExtractorContext(context, writer);
partition = new GenericJdbcPartition();
partition.setConditions("-50 <= ICOL AND ICOL < -16");

View File

@ -19,6 +19,7 @@
package org.apache.sqoop.connector.hdfs;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.common.VersionInfo;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
@ -87,7 +88,9 @@ public Class getJobConfigurationClass(Direction jobType) {
case TO:
return ToJobConfiguration.class;
default:
return null;
throw new SqoopException(
HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0006,
String.valueOf(jobType));
}
}

View File

@ -31,7 +31,8 @@ public enum HdfsConnectorError implements ErrorCode{
/** The system was unable to instantiate the specified class. */
GENERIC_HDFS_CONNECTOR_0004("Unable to instantiate the specified class"),
/** Error occurs during loader run */
GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run")
GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run"),
GENERIC_HDFS_CONNECTOR_0006("Unknown job type")
;

View File

@ -39,6 +39,6 @@ public void initialize(InitializerContext context, Object linkConf, Object jobCo
@Override
public Schema getSchema(InitializerContext context, Object linkConf, Object jobConf) {
return null;
return new Schema("HDFS file");
}
}

View File

@ -129,8 +129,10 @@ public void writeStringRecord(String text) {
public void writeRecord(Object obj) {
throw new AssertionError("Should not be writing object.");
}
}, null);
});
LinkConfiguration connConf = new LinkConfiguration();
FromJobConfiguration jobConf = new FromJobConfiguration();
HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory));

View File

@ -22,7 +22,12 @@
import org.apache.commons.lang.StringUtils;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.matcher.AbstractMatcher;
import org.apache.sqoop.connector.idf.matcher.LocationMatcher;
import org.apache.sqoop.connector.idf.matcher.NameMatcher;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.SchemaError;
import org.apache.sqoop.schema.SchemaMatchOption;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
@ -36,6 +41,7 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
@ -65,7 +71,8 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
private final List<Integer> stringFieldIndices = new ArrayList<Integer>();
private final List<Integer> byteFieldIndices = new ArrayList<Integer>();
private Schema schema;
private Schema fromSchema;
private Schema toSchema;
/**
* {@inheritDoc}
@ -87,19 +94,11 @@ public void setTextData(String text) {
* {@inheritDoc}
*/
@Override
public Schema getSchema() {
return schema;
}
/**
* {@inheritDoc}
*/
@Override
public void setSchema(Schema schema) {
public void setFromSchema(Schema schema) {
if(schema == null) {
return;
}
this.schema = schema;
this.fromSchema = schema;
List<Column> columns = schema.getColumns();
int i = 0;
for(Column col : columns) {
@ -112,6 +111,19 @@ public void setSchema(Schema schema) {
}
}
/**
* {@inheritDoc}
*/
@Override
public void setToSchema(Schema schema) {
if(schema == null) {
return;
}
this.toSchema = schema;
}
/**
* Custom CSV parser that honors quoting and escaped quotes.
* All other escaping is handled elsewhere.
@ -168,6 +180,19 @@ private String[] getFields() {
/**
* {@inheritDoc}
*
* The CSV data is ordered according to the fromSchema. We "translate" it to the TO schema.
* We currently have 3 methods of matching fields in one schema to another:
* - by location
* - by name
* - user-defined matching
*
* If only one schema exists (either TO or FROM) and the other is empty,
* we'll match fields based on location.
* If both schemas exist, we'll match fields by name.
*
* In the future, we may want to let users choose the method.
* Currently nothing is implemented for user-defined matching.
*/
@Override
public Object[] getObjectData() {
@ -176,51 +201,53 @@ public Object[] getObjectData() {
return null;
}
if (schema == null) {
if (fromSchema == null || toSchema == null || (toSchema.isEmpty() && fromSchema.isEmpty())) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
}
if (fields.length != schema.getColumns().size()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
"The data " + getTextData() + " has the wrong number of fields.");
}
AbstractMatcher matcher = getMatcher(fromSchema,toSchema);
String[] outFields = matcher.getMatchingData(fields, fromSchema, toSchema);
Object[] out = new Object[outFields.length];
Object[] out = new Object[fields.length];
Column[] cols = schema.getColumns().toArray(new Column[fields.length]);
for (int i = 0; i < fields.length; i++) {
Type colType = cols[i].getType();
//TODO: Replace with proper isNull method. Actually the entire content of the loop should be a parse method
if (fields[i].equals("NULL") || fields[i].equals("null") || fields[i].equals("'null'") || fields[i].isEmpty()) {
int i = 0;
// After getting the data back in an order that matches the output schema,
// we need to undo the CSV escaping
for (Column col: matcher.getMatchingSchema(fromSchema,toSchema).getColumns()) {
Type colType = col.getType();
if (outFields[i] == null) {
out[i] = null;
continue;
}
if (colType == Type.TEXT) {
out[i] = unescapeStrings(fields[i]);
out[i] = unescapeStrings(outFields[i]);
} else if (colType == Type.BINARY) {
out[i] = unescapeByteArray(fields[i]);
out[i] = unescapeByteArray(outFields[i]);
} else if (colType == Type.FIXED_POINT) {
Long byteSize = ((FixedPoint) cols[i]).getByteSize();
Long byteSize = ((FixedPoint) col).getByteSize();
if (byteSize != null && byteSize <= Integer.SIZE) {
out[i] = Integer.valueOf(fields[i]);
out[i] = Integer.valueOf(outFields[i]);
} else {
out[i] = Long.valueOf(fields[i]);
out[i] = Long.valueOf(outFields[i]);
}
} else if (colType == Type.FLOATING_POINT) {
Long byteSize = ((FloatingPoint) cols[i]).getByteSize();
Long byteSize = ((FloatingPoint) col).getByteSize();
if (byteSize != null && byteSize <= Float.SIZE) {
out[i] = Float.valueOf(fields[i]);
out[i] = Float.valueOf(outFields[i]);
} else {
out[i] = Double.valueOf(fields[i]);
out[i] = Double.valueOf(outFields[i]);
}
} else if (colType == Type.DECIMAL) {
out[i] = new BigDecimal(fields[i]);
out[i] = new BigDecimal(outFields[i]);
} else {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + colType);
}
i++;
}
return out;
}
/**
* {@inheritDoc}
*/
@ -353,4 +380,15 @@ private byte[] unescapeByteArray(String orig) {
public String toString() {
return data;
}
private AbstractMatcher getMatcher(Schema fromSchema, Schema toSchema) {
if (toSchema.isEmpty() || fromSchema.isEmpty()) {
return new LocationMatcher();
} else {
return new NameMatcher();
}
}
}
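A short usage sketch of the new FROM/TO API, written in the style of the unit tests added later in this patch (the column names and the sample row are made up, and the expected values assume the usual single-quote escaping of Text fields):

// Same imports and setup as the new TestCSVIntermediateDataFormat below.
Schema from = new Schema("From")
    .addColumn(new FixedPoint("id"))
    .addColumn(new Text("name"));
Schema to = new Schema("To")
    .addColumn(new Text("name"))
    .addColumn(new FixedPoint("id"));

CSVIntermediateDataFormat data = new CSVIntermediateDataFormat();
data.setFromSchema(from);        // CSV text arrives ordered by the FROM schema
data.setToSchema(to);            // objects are handed back ordered by the TO schema
data.setTextData("10,'alice'");

// Both schemas are non-empty, so the NameMatcher re-orders fields by column name:
// out[0] should be the unescaped string "alice" and out[1] the Long 10.
Object[] out = data.getObjectData();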

View File

@ -81,7 +81,8 @@ public T getData() {
/**
* Get one row of data as CSV.
*
* @return - String representing the data in CSV
* @return - String representing the data in CSV, according to the "FROM" schema.
* No schema conversion is done on textData, to keep it as a "high performance" option.
*/
public abstract String getTextData();
@ -95,6 +96,7 @@ public T getData() {
* Get one row of data as an Object array.
*
* @return - String representing the data as an Object array
* If FROM and TO schemas exist, we will use a SchemaMatcher to get the data according to the "TO" schema
*/
public abstract Object[] getObjectData();
@ -105,18 +107,18 @@ public T getData() {
public abstract void setObjectData(Object[] data);
/**
* Set the schema to be used.
* Set the schema for reading data.
*
* @param schema - the schema to be used
* @param schema - the schema used for reading data
*/
public abstract void setSchema(Schema schema);
public abstract void setFromSchema(Schema schema);
/**
* Get the schema of the data.
* Set the schema for writing data.
*
* @return - The schema of the data.
* @param schema - the schema used for writing data
*/
public abstract Schema getSchema();
public abstract void setToSchema(Schema schema);
/**
* Serialize the fields of this object to <code>out</code>.

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.idf.matcher;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Column;
public abstract class AbstractMatcher {
//NOTE: This is currently tightly coupled to the CSV idf. We'll need refactoring after adding additional formats
//NOTE: There is a very blatant special case of empty schemas that seems to apply only to HDFS.
/**
*
* @param fields
* @param fromSchema
* @param toSchema
* @return the data in "fields" converted from matching the fromSchema to matching the toSchema.
* Right now, "converted" means re-ordering if needed and handling nulls.
*/
abstract public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema);
/***
*
* @param fromSchema
* @param toSchema
* @return a schema with which to read the output data.
* This always returns the toSchema (since this is used when getting output data), unless it is empty.
*/
public Schema getMatchingSchema(Schema fromSchema, Schema toSchema) {
if (toSchema.isEmpty()) {
return fromSchema;
} else {
return toSchema;
}
}
protected boolean isNull(String value) {
if (value.equals("NULL") || value.equals("null") || value.equals("'null'") || value.isEmpty()) {
return true;
}
return false;
}
}
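For reference, a hypothetical matcher following this contract could look like the sketch below. PassThroughMatcher is not part of this patch; it simply forwards fields unchanged, which is only reasonable when the FROM and TO schemas are already known to line up:

package org.apache.sqoop.connector.idf.matcher;

import org.apache.sqoop.schema.Schema;

public class PassThroughMatcher extends AbstractMatcher {
  @Override
  public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) {
    String[] out = new String[fields.length];
    for (int i = 0; i < fields.length; i++) {
      // Re-use the shared null handling, but keep the original field order.
      out[i] = isNull(fields[i]) ? null : fields[i];
    }
    return out;
  }
}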

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.idf.matcher;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.IntermediateDataFormatError;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.SchemaError;
import org.apache.sqoop.schema.SchemaMatchOption;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Type;
import java.math.BigDecimal;
import java.util.Iterator;
/**
* Convert data according to the FROM schema to data according to the TO schema.
* This is done based on column location,
* so data in the first column in FROM goes into the first column in TO, etc.
* If the TO schema has more fields and they are "nullable", the values will be set to null.
* If the TO schema has extra non-nullable fields, we'll throw an exception.
*/
public class LocationMatcher extends AbstractMatcher {
public static final Logger LOG = Logger.getLogger(LocationMatcher.class);
@Override
public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) {
String[] out = new String[toSchema.getColumns().size()];
int i = 0;
if (toSchema.isEmpty()) {
// If there's no destination schema, no need to convert anything
// Just use the original data
return fields;
}
for (Column col: toSchema.getColumns())
{
if (i < fields.length) {
if (isNull(fields[i])) {
out[i] = null;
} else {
out[i] = fields[i];
}
}
// We ran out of fields before we ran out of schema
else {
if (!col.getNullable()) {
throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + col + " didn't match with any source column and cannot be null");
} else {
LOG.warn("Column " + col + " has no matching source column. Will be ignored. ");
out[i] = null;
}
}
i++;
}
return out;
}
}
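As a quick illustration of the padding behaviour described above (the schemas and field values are made up, and the snippet assumes the usual org.apache.sqoop.schema imports):

LocationMatcher matcher = new LocationMatcher();
Schema from = new Schema("from");              // empty FROM schema, HDFS-style
Schema to = new Schema("to")
    .addColumn(new FixedPoint("a"))            // nullable by default
    .addColumn(new Text("b"));                 // nullable by default

// One source field, two target columns: the extra nullable column is padded with null.
String[] out = matcher.getMatchingData(new String[]{"7"}, from, to);
// out[0] == "7", out[1] == null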

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.idf.matcher;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.SchemaError;
import org.apache.sqoop.schema.type.Column;
import java.util.HashMap;
public class NameMatcher extends AbstractMatcher {
public static final Logger LOG = Logger.getLogger(NameMatcher.class);
@Override
public String[] getMatchingData(String[] fields, Schema fromSchema, Schema toSchema) {
String[] out = new String[toSchema.getColumns().size()];
HashMap<String,Column> colNames = new HashMap<String, Column>();
for (Column fromCol: fromSchema.getColumns()) {
colNames.put(fromCol.getName(), fromCol);
}
int toIndex = 0;
for (Column toCol: toSchema.getColumns()) {
Column fromCol = colNames.get(toCol.getName());
if (fromCol != null) {
int fromIndex = fromSchema.getColumns().indexOf(fromCol);
if (isNull(fields[fromIndex])) {
out[toIndex] = null;
} else {
out[toIndex] = fields[fromIndex];
}
} else {
//column exists in TO schema but not in FROM schema
if (toCol.getNullable() == false) {
throw new SqoopException(SchemaError.SCHEMA_0004,"target column " + toCol + " didn't match with any source column and cannot be null");
} else {
LOG.warn("Column " + toCol + " has no matching source column. Will be ignored. ");
out[toIndex] = null;
}
}
toIndex++;
}
return out;
}
}

View File

@ -20,6 +20,7 @@
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.SchemaMatchOption;
import org.apache.sqoop.schema.type.Binary;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;
@ -39,6 +40,8 @@ public class TestCSVIntermediateDataFormat {
private IntermediateDataFormat<?> data;
private Schema emptySchema = new Schema("empty");
@Before
public void setUp() {
data = new CSVIntermediateDataFormat();
@ -70,7 +73,7 @@ public void testNullStringInObjectOut() {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
data.setFromSchema(schema);
data.setTextData(null);
Object[] out = data.getObjectData();
@ -87,7 +90,7 @@ public void testEmptyStringInObjectOut() {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
data.setFromSchema(schema);
data.setTextData("");
data.getObjectData();
@ -106,7 +109,9 @@ public void testStringInObjectOut() {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
data.setFromSchema(schema);
data.setToSchema(emptySchema);
data.setTextData(testData);
Object[] out = data.getObjectData();
@ -129,7 +134,7 @@ public void testObjectInStringOut() {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
data.setFromSchema(schema);
byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
Object[] in = new Object[6];
@ -159,7 +164,8 @@ public void testObjectInObjectOut() {
.addColumn(new Text("4"))
.addColumn(new Binary("5"))
.addColumn(new Text("6"));
data.setSchema(schema);
data.setFromSchema(schema);
data.setToSchema(emptySchema);
Object[] in = new Object[6];
in[0] = new Long(10);
@ -181,7 +187,9 @@ public void testObjectInObjectOut() {
public void testStringFullRangeOfCharacters() {
Schema schema = new Schema("test");
schema.addColumn(new Text("1"));
data.setSchema(schema);
data.setFromSchema(schema);
data.setToSchema(emptySchema);
char[] allCharArr = new char[256];
for(int i = 0; i < allCharArr.length; ++i) {
@ -204,7 +212,8 @@ public void testStringFullRangeOfCharacters() {
public void testByteArrayFullRangeOfCharacters() {
Schema schema = new Schema("test");
schema.addColumn(new Binary("1"));
data.setSchema(schema);
data.setFromSchema(schema);
data.setToSchema(emptySchema);
byte[] allCharByteArr = new byte[256];
for(int i = 0; i < allCharByteArr.length; ++i) {
@ -219,4 +228,132 @@ public void testByteArrayFullRangeOfCharacters() {
data.setObjectData(in);
assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
}
/**
* Note that we don't have an EmptyTo matching test,
* because most of the tests above use an empty "to" schema.
*/
@Test
public void testMatchingEmptyFrom() {
data.setFromSchema(emptySchema);
Schema toSchema = new Schema("To");
toSchema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"));
data.setToSchema(toSchema);
Object[] in = new Object[2];
in[0] = new Long(10);
in[1] = new Long(34);
Object[] out = new Object[2];
out[0] = new Long(10);
out[1] = new Long(34);
data.setObjectData(in);
assertTrue(Arrays.deepEquals(out, data.getObjectData()));
}
@Test(expected=SqoopException.class)
public void testMatchingTwoEmptySchema() {
data.setFromSchema(emptySchema);
data.setToSchema(emptySchema);
Object[] in = new Object[2];
in[0] = new Long(10);
in[1] = new Long(34);
data.setObjectData(in);
data.getObjectData();
}
@Test
public void testMatchingFewerFromColumns(){
Schema fromSchema = new Schema("From");
fromSchema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"));
data.setFromSchema(fromSchema);
Schema toSchema = new Schema("To");
toSchema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"))
.addColumn(new Text("3"));
data.setToSchema(toSchema);
Object[] in = new Object[2];
in[0] = new Long(10);
in[1] = new Long(34);
Object[] out = new Object[3];
out[0] = new Long(10);
out[1] = new Long(34);
out[2] = null;
data.setObjectData(in);
assertTrue(Arrays.deepEquals(out, data.getObjectData()));
}
@Test
public void testMatchingFewerToColumns(){
Schema fromSchema = new Schema("From");
fromSchema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"))
.addColumn(new FixedPoint("3"));
data.setFromSchema(fromSchema);
Schema toSchema = new Schema("To");
toSchema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"));
data.setToSchema(toSchema);
Object[] in = new Object[3];
in[0] = new Long(10);
in[1] = new Long(34);
in[2] = new Long(50);
Object[] out = new Object[2];
out[0] = new Long(10);
out[1] = new Long(34);
data.setObjectData(in);
assertTrue(Arrays.deepEquals(out, data.getObjectData()));
}
@Test
public void testWithSomeNonMatchingFields(){
Schema fromSchema = new Schema("From");
fromSchema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"))
.addColumn(new FixedPoint("3"));
data.setFromSchema(fromSchema);
Schema toSchema = new Schema("From");
toSchema.addColumn(new FixedPoint("2"))
.addColumn(new FixedPoint("3"))
.addColumn(new FixedPoint("4"));
data.setToSchema(toSchema);
Object[] in = new Object[3];
in[0] = new Long(10);
in[1] = new Long(34);
in[2] = new Long(50);
Object[] out = new Object[3];
out[0] = new Long(34);
out[1] = new Long(50);
out[2] = null;
data.setObjectData(in);
assertTrue(Arrays.deepEquals(out, data.getObjectData()));
}
}

View File

@ -374,13 +374,10 @@ private JobRequest createJobRequest(long jobId, MSubmission submission) {
Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM);
Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO);
// TODO(Gwen): Need better logic here once the Schema refactor: SQOOP-1378
if (fromSchema != null) {
jobRequest.getSummary().setFromSchema(fromSchema);
}
else {
jobRequest.getSummary().setFromSchema(toSchema);
}
jobRequest.getSummary().setFromSchema(fromSchema);
jobRequest.getSummary().setToSchema(toSchema);
LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo());
return jobRequest;
}
@ -458,7 +455,7 @@ private Schema getSchemaFromConnector(JobRequest jobRequest, Direction direction
initializer.initialize(initializerContext, jobRequest.getConnectorLinkConfig(direction),
jobRequest.getConnectorJobConfig(direction));
// TODO(Abe): Alter behavior of Schema here.
return initializer.getSchema(initializerContext,
jobRequest.getConnectorLinkConfig(direction),
jobRequest.getConnectorJobConfig(direction));
@ -709,4 +706,4 @@ public void run() {
LOG.info("Ending submission manager update thread");
}
}
}
}

View File

@ -23,8 +23,7 @@ Tools are server commands that administrators can execute on the Sqoop server ma
In order to perform the maintenance task each tool is supposed to do, they need to be executed in exactly the same environment as the main Sqoop server. The tool binary will take care of setting up the ``CLASSPATH`` and other environment variables that might be required. However, it's up to the administrator to run the tool under the same user as is used for the server. This is usually configured automatically for various Hadoop distributions (such as Apache Bigtop).
.. note:: Running tools under a different user such as ``root`` might prevent Sqoop Server from running correctly.
.. note:: Running tools while the Sqoop Server is also running is not recommended as it might lead to data corruption and service disruption.
List of available tools:
* verify

View File

@ -82,13 +82,13 @@ public final class ConfigurationUtils {
private static final Text JOB_CONFIG_FRAMEWORK_JOB_KEY = new Text(JOB_CONFIG_FRAMEWORK_JOB);
private static final String SCHEMA_FROM_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
private static final String SCHEMA_FROM = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
private static final Text SCHEMA_FROM_CONNECTOR_KEY = new Text(SCHEMA_FROM_CONNECTOR);
private static final Text SCHEMA_FROM_KEY = new Text(SCHEMA_FROM);
private static final String SCHEMA_TO_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
private static final String SCHEMA_TO = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
private static final Text SCHEMA_TO_CONNECTOR_KEY = new Text(SCHEMA_TO_CONNECTOR);
private static final Text SCHEMA_TO_KEY = new Text(SCHEMA_TO);
/**
@ -162,6 +162,27 @@ public static void setFrameworkJobConfig(Job job, Object obj) {
job.getCredentials().addSecretKey(JOB_CONFIG_FRAMEWORK_JOB_KEY, FormUtils.toJson(obj).getBytes());
}
/**
* Persist Connector generated schema.
*
* @param type Direction of schema we are persisting
* @param job MapReduce Job object
* @param schema Schema
*/
public static void setConnectorSchema(Direction type, Job job, Schema schema) {
if(schema != null) {
String jsonSchema = SchemaSerialization.extractSchema(schema).toJSONString();
switch (type) {
case FROM:
job.getCredentials().addSecretKey(SCHEMA_FROM_KEY,jsonSchema.getBytes());
return;
case TO:
job.getCredentials().addSecretKey(SCHEMA_TO_KEY, jsonSchema.getBytes());
return;
}
}
}
/**
* Retrieve Connector configuration object for connection.
*
@ -226,23 +247,7 @@ public static Object getFrameworkJobConfig(Configuration configuration) {
return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FRAMEWORK_JOB, JOB_CONFIG_FRAMEWORK_JOB_KEY);
}
/**
* Persist From Connector generated schema.
*
* @param job MapReduce Job object
* @param schema Schema
*/
public static void setConnectorSchema(Direction type, Job job, Schema schema) {
if(schema != null) {
switch (type) {
case FROM:
job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
case TO:
job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
}
}
}
/**
* Retrieve Connector generated schema.
@ -253,10 +258,10 @@ public static void setConnectorSchema(Direction type, Job job, Schema schema) {
public static Schema getConnectorSchema(Direction type, Configuration configuration) {
switch (type) {
case FROM:
return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_FROM_CONNECTOR_KEY));
return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_FROM_KEY));
case TO:
return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_TO_CONNECTOR_KEY));
return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_TO_KEY));
}
return null;
@ -274,7 +279,9 @@ private static Schema getSchemaFromBytes(byte[] bytes) {
if(bytes == null) {
return null;
}
return SchemaSerialization.restoreSchemna((JSONObject) JSONValue.parse(new String(bytes)));
JSONObject jsonSchema = (JSONObject) JSONValue.parse(new String(bytes));
return SchemaSerialization.restoreSchema(jsonSchema);
}
/**

View File

@ -64,20 +64,18 @@ public void run(Context context) throws IOException, InterruptedException {
String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
// TODO(Abe/Gwen): Change to conditional choosing between Connector schemas.
Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
if (schema == null) {
schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
}
if (schema == null) {
LOG.info("setting an empty schema");
}
Schema fromSchema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
Schema toSchema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
String intermediateDataFormatName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT);
dataFormat = (IntermediateDataFormat<String>) ClassUtils
.instantiate(intermediateDataFormatName);
dataFormat.setSchema(schema);
dataFormat.setFromSchema(fromSchema);
dataFormat.setToSchema(toSchema);
dataOut = new SqoopWritable();
// Objects that should be passed to the Executor execution
@ -86,7 +84,7 @@ public void run(Context context) throws IOException, InterruptedException {
Object fromJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
SqoopSplit split = context.getCurrentKey();
ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), schema);
ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context));
try {
LOG.info("Starting progress service");

View File

@ -73,12 +73,11 @@ public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context
.getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration());
if (schema==null) {
schema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration());
}
Schema fromSchema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration());
dataFormat.setFromSchema(fromSchema);
dataFormat.setSchema(schema);
Schema toSchema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration());
dataFormat.setToSchema(toSchema);
}
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
@ -231,10 +230,8 @@ public void run() {
Schema schema = null;
if (!isTest) {
// Propagate connector schema in every case for now
// TODO: Change to coditional choosing between Connector schemas.
// @TODO(Abe): Maybe use TO schema?
schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
// Using the TO schema since the IDF returns data in TO schema
schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
configConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf);

View File

@ -68,12 +68,12 @@ public static void displayHeader(MSubmission submission) {
}
}
if(isVerbose() && submission.getFromSchema() != null) {
if(isVerbose() && submission.getFromSchema() != null && !submission.getFromSchema().isEmpty() ) {
print(resourceString(Constants.RES_FROM_SCHEMA)+": ");
println(submission.getFromSchema());
}
if(isVerbose() && submission.getToSchema() != null) {
if(isVerbose() && submission.getToSchema() != null && !submission.getToSchema().isEmpty() ) {
print(resourceString(Constants.RES_TO_SCHEMA)+": ");
println(submission.getToSchema());
}

View File

@ -227,5 +227,6 @@ submission.progress_not_available = Progress is not available
submission.counters = Counters
submission.executed_success = Job executed successfully
submission.server_url = Server URL
submission.from_schema = From schema
submission.to_schema = To schema
submission.from_schema = Source Connector schema
submission.to_schema = Target Connector schema

View File

@ -206,11 +206,13 @@ public boolean submit(JobRequest mrJobRequest) {
ConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, request.getConnectorJobConfig(Direction.FROM));
ConfigurationUtils.setConnectorConnectionConfig(Direction.TO, job, request.getConnectorLinkConfig(Direction.TO));
ConfigurationUtils.setConnectorJobConfig(Direction.TO, job, request.getConnectorJobConfig(Direction.TO));
ConfigurationUtils.setFrameworkConnectionConfig(Direction.FROM, job, request.getFrameworkLinkConfig(Direction.FROM));
ConfigurationUtils.setFrameworkConnectionConfig(Direction.TO, job, request.getFrameworkLinkConfig(Direction.TO));
ConfigurationUtils.setFrameworkJobConfig(job, request.getFrameworkJobConfig());
// @TODO(Abe): Persist TO schema.
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema());
ConfigurationUtils.setConnectorSchema(Direction.TO, job, request.getSummary().getToSchema());
if(request.getJobName() != null) {
job.setJobName("Sqoop: " + request.getJobName());
@ -413,4 +415,5 @@ private boolean isLocal() {
return "local".equals(globalConfiguration.get("mapreduce.jobtracker.address"))
|| "local".equals(globalConfiguration.get("mapred.job.tracker"));
}
}
}