diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
deleted file mode 100644
index e96dc6e2..00000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.io;
-
-public final class FieldTypes {
-
-  public static final int NULL = 0;
-
-  public static final int BOOLEAN = 1;
-
-  public static final int BYTE = 10;
-  public static final int CHAR = 11;
-
-  public static final int SHORT = 20;
-  public static final int INT = 21;
-  public static final int LONG = 22;
-
-  public static final int FLOAT = 50;
-  public static final int DOUBLE = 51;
-
-  public static final int BIN = 100;
-  public static final int UTF = 101;
-
-  private FieldTypes() {
-    // Disable explicit object creation
-  }
-}
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
index ed118d2c..05b731ae 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
@@ -18,42 +18,64 @@
  */
 package org.apache.sqoop.job.io;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.utils.ClassUtils;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-public class SqoopWritable implements WritableComparable {
-  private String strData;
+public class SqoopWritable implements Configurable, WritableComparable {
+  private IntermediateDataFormat dataFormat;
+  private Configuration conf;
 
-  public SqoopWritable() {}
+  public SqoopWritable() {
+    this(null);
+  }
+
+  public SqoopWritable(IntermediateDataFormat dataFormat) {
+    this.dataFormat = dataFormat;
+  }
 
   public void setString(String data) {
-    strData = data;
+    this.dataFormat.setTextData(data);
  }
 
-  public String getString() {
-    return strData;
-  }
+  public String getString() { return dataFormat.getTextData(); }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeUTF(strData);
+    out.writeUTF(dataFormat.getTextData());
   }
 
   @Override
-  public void readFields(DataInput in) throws IOException {
-    strData = in.readUTF();
-  }
+  public void readFields(DataInput in) throws IOException { dataFormat.setTextData(in.readUTF()); }
 
   @Override
-  public int compareTo(SqoopWritable o) {
-    return strData.compareTo(o.getString());
-  }
+  public int compareTo(SqoopWritable o) { return getString().compareTo(o.getString()); }
 
   @Override
   public String toString() {
     return getString();
   }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+
+    if (dataFormat == null) {
+      String intermediateDataFormatName = conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT);
+      this.dataFormat = (IntermediateDataFormat) ClassUtils.instantiate(intermediateDataFormatName);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
 }
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index 887b4bb7..d20c9034 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -30,10 +30,10 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.MRJobConstants;
-import org.apache.sqoop.job.MRExecutionError;
 import org.apache.sqoop.common.PrefixContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.job.MRExecutionError;
+import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
@@ -63,14 +63,14 @@ public List getSplits(JobContext context)
     Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
 
     PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
-    Object connectorConnection = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
-    Object connectorJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
+    Object connectorLinkConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
+    Object connectorFromJobConfig = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
     Schema schema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
 
     long maxPartitions = conf.getLong(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
     PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, schema);
 
-    List partitions = partitioner.getPartitions(partitionerContext, connectorConnection, connectorJob);
+    List partitions = partitioner.getPartitions(partitionerContext, connectorLinkConfig, connectorFromJobConfig);
     List splits = new LinkedList();
     for (Partition partition : partitions) {
       LOG.debug("Partition: " + partition);
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index e25f404e..664692a2 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -107,13 +107,17 @@ public void run(Context context) throws IOException, InterruptedException {
     }
   }
 
+  // There are two IDF objects we carry around in memory during the sqoop job execution.
+  // The fromDataFormat has the fromSchema in it, the toDataFormat has the toSchema in it.
+  // Before we do the writing to the toDataFormat object we do the matching process to negotiate between
+  // the two schemas and their corresponding column types before we write the data to the toDataFormat object
   private class SqoopMapDataWriter extends DataWriter {
     private Context context;
     private SqoopWritable writable;
 
     public SqoopMapDataWriter(Context context) {
       this.context = context;
-      this.writable = new SqoopWritable();
+      this.writable = new SqoopWritable(toDataFormat);
     }
 
     @Override
@@ -139,10 +143,10 @@ private void writeContent() {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Extracted data: " + fromDataFormat.getTextData());
       }
-
-      toDataFormat.setObjectData( matcher.getMatchingData( fromDataFormat.getObjectData() ) );
-
-      writable.setString(toDataFormat.getTextData());
+      // NOTE: The fromDataFormat and the corresponding fromSchema is used only for the matching process
+      // The output of the mappers is finally written to the toDataFormat object after the matching process
+      // since the writable encapsulates the toDataFormat ==> new SqoopWritable(toDataFormat)
+      toDataFormat.setObjectData(matcher.getMatchingData(fromDataFormat.getObjectData()));
       context.write(writable, NullWritable.get());
     } catch (Exception e) {
       throw new SqoopException(MRExecutionError.MAPRED_EXEC_0013, e);
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 579101ee..49a66b96 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -63,6 +63,7 @@ public class SqoopOutputFormatLoadExecutor {
   private volatile boolean isTest = false;
   private String loaderName;
 
+  // NOTE: This method is only exposed for test cases and hence assumes CSVIntermediateDataFormat
   SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){
     this.isTest = isTest;
     this.loaderName = loaderName;
@@ -79,6 +80,7 @@ public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
         MRConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()));
     dataFormat = (IntermediateDataFormat) ClassUtils.instantiate(context
         .getConfiguration().get(MRJobConstants.INTERMEDIATE_DATA_FORMAT));
+    // Using the TO schema since the SqoopDataWriter in the SqoopMapper encapsulates the toDataFormat
     dataFormat.setSchema(matcher.getToSchema());
   }
 
@@ -99,6 +101,7 @@ private class SqoopRecordWriter extends RecordWriter writer = executor.getRecordWriter();
     IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF();
-    SqoopWritable writable = new SqoopWritable();
+    SqoopWritable writable = new SqoopWritable(dataFormat);
     try {
       for (int count = 0; count < 100; count++) {
         dataFormat.setTextData(String.valueOf(count));
-        writable.setString(dataFormat.getTextData());
         writer.write(writable, null);
       }
     } catch (SqoopException ex) {
@@ -151,7 +150,7 @@ public void testSuccessfulContinuousLoader() throws Throwable {
         SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
     RecordWriter writer = executor.getRecordWriter();
     IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF();
-    SqoopWritable writable = new SqoopWritable();
+    SqoopWritable writable = new SqoopWritable(dataFormat);
     for (int i = 0; i < 10; i++) {
       StringBuilder builder = new StringBuilder();
       for (int count = 0; count < 100; count++) {
@@ -161,7 +160,6 @@ public void testSuccessfulContinuousLoader() throws Throwable {
         }
       }
       dataFormat.setTextData(builder.toString());
-      writable.setString(dataFormat.getTextData());
       writer.write(writable, null);
     }
     writer.close(null);
@@ -173,7 +171,7 @@ public void testSuccessfulLoader() throws Throwable {
         SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName());
     RecordWriter writer = executor.getRecordWriter();
     IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF();
-    SqoopWritable writable = new SqoopWritable();
+    SqoopWritable writable = new SqoopWritable(dataFormat);
     StringBuilder builder = new StringBuilder();
     for (int count = 0; count < 100; count++) {
       builder.append(String.valueOf(count));
@@ -182,7 +180,6 @@ public void testSuccessfulLoader() throws Throwable {
       }
     }
     dataFormat.setTextData(builder.toString());
-    writable.setString(dataFormat.getTextData());
     writer.write(writable, null);
 
     //Allow writer to complete.
@@ -198,7 +195,7 @@ public void testThrowingContinuousLoader() throws Throwable {
         SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
     RecordWriter writer = executor.getRecordWriter();
     IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF();
-    SqoopWritable writable = new SqoopWritable();
+    SqoopWritable writable = new SqoopWritable(dataFormat);
     try {
       for (int i = 0; i < 10; i++) {
         StringBuilder builder = new StringBuilder();
@@ -209,7 +206,6 @@ public void testThrowingContinuousLoader() throws Throwable {
         }
      }
      dataFormat.setTextData(builder.toString());
-      writable.setString(dataFormat.getTextData());
      writer.write(writable, null);
    }
    writer.close(null);
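
The patch above turns SqoopWritable into a thin wrapper around an IntermediateDataFormat: setString/getString, write and readFields all delegate to the wrapped IDF, and setConf lazily instantiates one from MRJobConstants.INTERMEDIATE_DATA_FORMAT when the MR framework creates the writable reflectively. The following is a minimal sketch, not part of the patch, of that round-trip contract; the class name SqoopWritableRoundTripSketch and the roundTrip helper are illustrative only, and the code relies solely on the constructor and methods shown in SqoopWritable.java above.

package org.apache.sqoop.job.io;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.sqoop.connector.idf.IntermediateDataFormat;

public final class SqoopWritableRoundTripSketch {

  // Pushes one text record through a SqoopWritable backed by outFormat and reads it
  // back into a SqoopWritable backed by inFormat, mirroring what the MR framework
  // does with writeUTF/readUTF when the writable is serialized.
  public static String roundTrip(IntermediateDataFormat outFormat,
                                 IntermediateDataFormat inFormat,
                                 String record) throws IOException {
    SqoopWritable source = new SqoopWritable(outFormat);
    source.setString(record);                      // delegates to outFormat.setTextData()

    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    source.write(new DataOutputStream(buffer));    // writeUTF of outFormat.getTextData()

    SqoopWritable target = new SqoopWritable(inFormat);
    target.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
    return target.getString();                     // delegates to inFormat.getTextData()
  }

  private SqoopWritableRoundTripSketch() {
    // Disable explicit object creation
  }
}

The tests above exercise the same idea, with MRJobTestUtil.getTestIDF() supplying the concrete IntermediateDataFormat that backs the writable.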