
SQOOP-1349: Sqoop2: Use configurable writable to get Intermediate Data Format

(Veena Basavaraj via Abraham Elmahrek)
Abraham Elmahrek 2014-11-13 10:08:19 -08:00
parent a2e87bef01
commit 90ec25b2a8
7 changed files with 67 additions and 84 deletions
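Note: the gist of the change is that SqoopWritable no longer receives a pre-serialized String; the job configuration names the IntermediateDataFormat (IDF) implementation and the writable materializes it on demand. A minimal, hypothetical sketch of the driver-side wiring this assumes — only the MRJobConstants.INTERMEDIATE_DATA_FORMAT key and the CSVIntermediateDataFormat class are taken from the diff below; the class and method names here are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.MRJobConstants;

public class IdfConfigSketch {
  // Illustrative only: publish the IDF class name in the job configuration so any
  // Configurable consumer (such as SqoopWritable) can instantiate it later.
  public static Configuration newJobConf() {
    Configuration conf = new Configuration();
    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
        CSVIntermediateDataFormat.class.getName());
    return conf;
  }
}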

View File

@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.io;
-
-public final class FieldTypes {
-  public static final int NULL = 0;
-  public static final int BOOLEAN = 1;
-  public static final int BYTE = 10;
-  public static final int CHAR = 11;
-  public static final int SHORT = 20;
-  public static final int INT = 21;
-  public static final int LONG = 22;
-  public static final int FLOAT = 50;
-  public static final int DOUBLE = 51;
-  public static final int BIN = 100;
-  public static final int UTF = 101;
-
-  private FieldTypes() {
-    // Disable explicit object creation
-  }
-}

View File

@@ -18,42 +18,64 @@
  */
 package org.apache.sqoop.job.io;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.utils.ClassUtils;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-public class SqoopWritable implements WritableComparable<SqoopWritable> {
-  private String strData;
+public class SqoopWritable implements Configurable, WritableComparable<SqoopWritable> {
+  private IntermediateDataFormat<?> dataFormat;
+  private Configuration conf;
 
-  public SqoopWritable() {}
+  public SqoopWritable() {
+    this(null);
+  }
+
+  public SqoopWritable(IntermediateDataFormat<?> dataFormat) {
+    this.dataFormat = dataFormat;
+  }
 
   public void setString(String data) {
-    strData = data;
+    this.dataFormat.setTextData(data);
   }
 
-  public String getString() {
-    return strData;
-  }
+  public String getString() { return dataFormat.getTextData(); }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeUTF(strData);
+    out.writeUTF(dataFormat.getTextData());
   }
 
   @Override
-  public void readFields(DataInput in) throws IOException {
-    strData = in.readUTF();
-  }
+  public void readFields(DataInput in) throws IOException { dataFormat.setTextData(in.readUTF()); }
 
   @Override
-  public int compareTo(SqoopWritable o) {
-    return strData.compareTo(o.getString());
-  }
+  public int compareTo(SqoopWritable o) { return getString().compareTo(o.getString()); }
 
   @Override
   public String toString() {
     return getString();
   }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    if (dataFormat == null) {
+      String intermediateDataFormatName = conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT);
+      this.dataFormat = (IntermediateDataFormat<?>) ClassUtils.instantiate(intermediateDataFormatName);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
 }
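Note: with the Configurable plumbing in place, Hadoop's reflective object creation is enough to hand the writable its IDF. A small sketch relying only on standard Hadoop behavior (ReflectionUtils.newInstance calls setConf on Configurable instances); the wrapper class name below is made up:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.sqoop.job.io.SqoopWritable;

public class SqoopWritableFactorySketch {
  // ReflectionUtils.newInstance() invokes setConf() on Configurable objects, so a
  // no-arg SqoopWritable instantiates its IDF from MRJobConstants.INTERMEDIATE_DATA_FORMAT
  // instead of having a pre-serialized String pushed into it at every call site.
  public static SqoopWritable newKey(Configuration conf) {
    return ReflectionUtils.newInstance(SqoopWritable.class, conf);
  }
}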

View File

@@ -30,10 +30,10 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.MRJobConstants;
-import org.apache.sqoop.job.MRExecutionError;
 import org.apache.sqoop.common.PrefixContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.job.MRExecutionError;
+import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
@@ -63,14 +63,14 @@ public List<InputSplit> getSplits(JobContext context)
     Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
 
     PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
-    Object connectorConnection = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
-    Object connectorJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
+    Object connectorLinkConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
+    Object connectorFromJobConfig = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
     Schema schema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
 
     long maxPartitions = conf.getLong(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
     PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, schema);
 
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorConnection, connectorJob);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorLinkConfig, connectorFromJobConfig);
     List<InputSplit> splits = new LinkedList<InputSplit>();
     for (Partition partition : partitions) {
       LOG.debug("Partition: " + partition);

View File

@@ -107,13 +107,17 @@ public void run(Context context) throws IOException, InterruptedException {
     }
   }
 
+  // There are two IDF objects we carry around in memory during the sqoop job execution.
+  // The fromDataFormat has the fromSchema in it, the toDataFormat has the toSchema in it.
+  // Before we do the writing to the toDataFormat object we do the matching process to negotiate between
+  // the two schemas and their corresponding column types before we write the data to the toDataFormat object
   private class SqoopMapDataWriter extends DataWriter {
     private Context context;
     private SqoopWritable writable;
 
     public SqoopMapDataWriter(Context context) {
       this.context = context;
-      this.writable = new SqoopWritable();
+      this.writable = new SqoopWritable(toDataFormat);
     }
 
     @Override
@@ -139,10 +143,10 @@ private void writeContent() {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Extracted data: " + fromDataFormat.getTextData());
         }
-        toDataFormat.setObjectData( matcher.getMatchingData( fromDataFormat.getObjectData() ) );
-        writable.setString(toDataFormat.getTextData());
+        // NOTE: The fromDataFormat and the corresponding fromSchema is used only for the matching process
+        // The output of the mappers is finally written to the toDataFormat object after the matching process
+        // since the writable encapsulates the toDataFormat ==> new SqoopWritable(toDataFormat)
+        toDataFormat.setObjectData(matcher.getMatchingData(fromDataFormat.getObjectData()));
         context.write(writable, NullWritable.get());
       } catch (Exception e) {
         throw new SqoopException(MRExecutionError.MAPRED_EXEC_0013, e);
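Note: the reason the old writable.setString(toDataFormat.getTextData()) line disappears is that the writable now wraps the same TO-side IDF instance the mapper updates. A minimal standalone sketch of that sharing, using only constructors and methods visible in this diff (the literal record text is made up, and the matcher call is replaced by a plain setTextData for illustration):

import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.io.SqoopWritable;

public class MapperWritePathSketch {
  public static void main(String[] args) {
    // The writable and toDataFormat share one IDF instance.
    IntermediateDataFormat<?> toDataFormat = new CSVIntermediateDataFormat();
    SqoopWritable writable = new SqoopWritable(toDataFormat);

    // Stand-in for toDataFormat.setObjectData(matcher.getMatchingData(fromDataFormat.getObjectData())):
    // any update to the shared IDF is immediately visible through the writable.
    toDataFormat.setTextData("1,'already matched record'");
    System.out.println(writable.getString()); // prints 1,'already matched record'
  }
}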

View File

@@ -63,6 +63,7 @@ public class SqoopOutputFormatLoadExecutor {
   private volatile boolean isTest = false;
   private String loaderName;
 
+  // NOTE: This method is only exposed for test cases and hence assumes CSVIntermediateDataFormat
   SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){
     this.isTest = isTest;
     this.loaderName = loaderName;
@@ -79,6 +80,7 @@ public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
         MRConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()));
     dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context
         .getConfiguration().get(MRJobConstants.INTERMEDIATE_DATA_FORMAT));
+    // Using the TO schema since the SqoopDataWriter in the SqoopMapper encapsulates the toDataFormat
     dataFormat.setSchema(matcher.getToSchema());
   }
 
@@ -99,6 +101,7 @@ private class SqoopRecordWriter extends RecordWriter<SqoopWritable, NullWritable
     public void write(SqoopWritable key, NullWritable value) throws InterruptedException {
       free.acquire();
       checkIfConsumerThrew();
+      // NOTE: this is the place where data written from SqoopMapper writable is available to the SqoopOutputFormat
       dataFormat.setTextData(key.getString());
       filled.release();
     }
@@ -227,24 +230,23 @@ public void run() {
         // Objects that should be passed to the Executor execution
         PrefixContext subContext = null;
-        Object configConnection = null;
-        Object configJob = null;
+        Object connectorLinkConfig = null;
+        Object connectorToJobConfig = null;
         Schema schema = null;
 
         if (!isTest) {
-          // Using the TO schema since the IDF returns data in TO schema
+          // Using the TO schema since the SqoopDataWriter in the SqoopMapper encapsulates the toDataFormat
           schema = matcher.getToSchema();
           subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
-          configConnection = MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, conf);
-          configJob = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
+          connectorLinkConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, conf);
+          connectorToJobConfig = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
         }
 
         // Create loader context
         LoaderContext loaderContext = new LoaderContext(subContext, reader, schema);
 
         LOG.info("Running loader class " + loaderName);
-        loader.load(loaderContext, configConnection, configJob);
+        loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig);
         LOG.info("Loader has finished");
       } catch (Throwable t) {
         readerFinished = true;
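Note: the write() path above parks the mapper output in the shared IDF and hands it to the loader thread through a pair of semaphores (free.acquire / filled.release). A generic sketch of that single-slot handoff pattern, written independently of the actual Sqoop classes; the reader side shown here is an assumption about how free/filled pair up, inferred from the acquire/release calls in this diff:

import java.util.concurrent.Semaphore;

// Single-slot producer/consumer handoff, analogous to SqoopRecordWriter (producer)
// and the loader-side reader (consumer) sharing one IntermediateDataFormat slot.
public class HandoffSketch {
  private final Semaphore free = new Semaphore(1);   // slot is empty, producer may write
  private final Semaphore filled = new Semaphore(0); // slot is full, consumer may read
  private String slot;                               // stand-in for the shared IDF text data

  public void write(String record) throws InterruptedException {
    free.acquire();   // wait until the consumer has drained the previous record
    slot = record;    // dataFormat.setTextData(key.getString()) in the real code
    filled.release(); // hand the record to the consumer
  }

  public String read() throws InterruptedException {
    filled.acquire(); // wait for the producer to publish a record
    String record = slot;
    free.release();   // allow the producer to overwrite the slot
    return record;
  }
}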

View File

@@ -27,12 +27,13 @@
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestSqoopWritable {
 
-  private final SqoopWritable writable = new SqoopWritable();
+  private final SqoopWritable writable = new SqoopWritable(new CSVIntermediateDataFormat());
 
   @Test
   public void testStringInStringOut() {
@@ -78,7 +79,7 @@ public void testWriteReadUsingStream() throws IOException {
     //Don't test what the data is, test that SqoopWritable can read it.
     InputStream instream = new ByteArrayInputStream(written);
-    SqoopWritable newWritable = new SqoopWritable();
+    SqoopWritable newWritable = new SqoopWritable(new CSVIntermediateDataFormat());
     DataInput in = new DataInputStream(instream);
     newWritable.readFields(in);
     Assert.assertEquals(testData, newWritable.getString());
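Note: for completeness, a hedged sketch of the round trip this test exercises, rebuilt from the fragments in the hunk above; nothing beyond write/readFields/setString/getString and standard java.io streams is assumed:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.io.SqoopWritable;

public class SqoopWritableRoundTripSketch {
  public static void main(String[] args) throws IOException {
    // Serialize through one writable...
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    SqoopWritable out = new SqoopWritable(new CSVIntermediateDataFormat());
    out.setString("1,'hello'");
    out.write(new DataOutputStream(bytes));

    // ...and deserialize through another; each delegates to its own IDF.
    SqoopWritable in = new SqoopWritable(new CSVIntermediateDataFormat());
    in.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(in.getString()); // prints 1,'hello'
  }
}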

View File

@@ -132,11 +132,10 @@ public void testWhenLoaderThrows() throws Throwable {
         SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
-    SqoopWritable writable = new SqoopWritable();
+    SqoopWritable writable = new SqoopWritable(dataFormat);
     try {
       for (int count = 0; count < 100; count++) {
         dataFormat.setTextData(String.valueOf(count));
-        writable.setString(dataFormat.getTextData());
         writer.write(writable, null);
       }
     } catch (SqoopException ex) {
@@ -151,7 +150,7 @@ public void testSuccessfulContinuousLoader() throws Throwable {
         SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
-    SqoopWritable writable = new SqoopWritable();
+    SqoopWritable writable = new SqoopWritable(dataFormat);
     for (int i = 0; i < 10; i++) {
       StringBuilder builder = new StringBuilder();
       for (int count = 0; count < 100; count++) {
@@ -161,7 +160,6 @@ public void testSuccessfulContinuousLoader() throws Throwable {
         }
       }
       dataFormat.setTextData(builder.toString());
-      writable.setString(dataFormat.getTextData());
       writer.write(writable, null);
     }
     writer.close(null);
@@ -173,7 +171,7 @@ public void testSuccessfulLoader() throws Throwable {
         SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
-    SqoopWritable writable = new SqoopWritable();
+    SqoopWritable writable = new SqoopWritable(dataFormat);
     StringBuilder builder = new StringBuilder();
     for (int count = 0; count < 100; count++) {
       builder.append(String.valueOf(count));
@@ -182,7 +180,6 @@ public void testSuccessfulLoader() throws Throwable {
       }
     }
     dataFormat.setTextData(builder.toString());
-    writable.setString(dataFormat.getTextData());
     writer.write(writable, null);
 
     //Allow writer to complete.
@@ -198,7 +195,7 @@ public void testThrowingContinuousLoader() throws Throwable {
         SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
-    SqoopWritable writable = new SqoopWritable();
+    SqoopWritable writable = new SqoopWritable(dataFormat);
     try {
       for (int i = 0; i < 10; i++) {
         StringBuilder builder = new StringBuilder();
@@ -209,7 +206,6 @@ public void testThrowingContinuousLoader() throws Throwable {
         }
       }
       dataFormat.setTextData(builder.toString());
-      writable.setString(dataFormat.getTextData());
       writer.write(writable, null);
     }
     writer.close(null);