
SQOOP-1929: Sqoop2: Track number of records written in Loader

(Veena Basavaraj via Abraham Elmahrek)
Abraham Elmahrek 2014-12-22 18:14:23 -08:00
parent 22289d236a
commit 7f53eb22e3
11 changed files with 202 additions and 88 deletions

View File

@ -21,5 +21,6 @@
*
*/
public enum SqoopCounters {
ROWS_READ;
ROWS_READ,
ROWS_WRITTEN
}
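
The executor later in this commit reports the loader's tally through this enum via the standard Hadoop counter API. A minimal sketch of that pattern, assuming a TaskAttemptContext is at hand (as it is in SqoopOutputFormatLoadExecutor's consumer thread):

// Sketch only: incrementing the new counter from a Hadoop task context,
// mirroring what the consumer thread does after Loader#load() returns.
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.submission.counter.SqoopCounters;

public class RowsWrittenCounterSketch {
  static void report(TaskAttemptContext context, long rowsWritten) {
    // rowsWritten would come from Loader#getRowsWritten() in the real code path
    context.getCounter(SqoopCounters.ROWS_WRITTEN).increment(rowsWritten);
  }
}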

View File

@ -28,6 +28,7 @@ public class GenericJdbcLoader extends Loader<LinkConfiguration, ToJobConfigurat
public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100;
private int rowsPerBatch = DEFAULT_ROWS_PER_BATCH;
private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
private long rowsWritten = 0;
@Override
public void load(LoaderContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception{
@ -41,27 +42,28 @@ public void load(LoaderContext context, LinkConfiguration linkConfig, ToJobConfi
String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL);
executor.beginBatch(sql);
try {
int numberOfRows = 0;
int numberOfBatches = 0;
int numberOfRowsPerBatch = 0;
int numberOfBatchesPerTransaction = 0;
Object[] array;
while ((array = context.getDataReader().readArrayRecord()) != null) {
numberOfRows++;
numberOfRowsPerBatch++;
executor.addBatch(array);
if (numberOfRows == rowsPerBatch) {
numberOfBatches++;
if (numberOfBatches == batchesPerTransaction) {
if (numberOfRowsPerBatch == rowsPerBatch) {
numberOfBatchesPerTransaction++;
if (numberOfBatchesPerTransaction == batchesPerTransaction) {
executor.executeBatch(true);
numberOfBatches = 0;
numberOfBatchesPerTransaction = 0;
} else {
executor.executeBatch(false);
}
numberOfRows = 0;
numberOfRowsPerBatch = 0;
}
rowsWritten ++;
}
if (numberOfRows != 0 || numberOfBatches != 0) {
if (numberOfRowsPerBatch != 0 || numberOfBatchesPerTransaction != 0) {
// execute and commit the remaining rows
executor.executeBatch(true);
}
@ -73,4 +75,12 @@ public void load(LoaderContext context, LinkConfiguration linkConfig, ToJobConfi
}
}
/* (non-Javadoc)
* @see org.apache.sqoop.job.etl.Loader#getRowsWritten()
*/
@Override
public long getRowsWritten() {
return rowsWritten;
}
}
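
To make the renamed counters concrete, here is a self-contained sketch of the flush/commit cadence the loop above implements, using hypothetical settings (the real values come from GenericJdbcLoader's defaults and the job configuration): executeBatch(false) flushes a full batch, executeBatch(true) also commits once batchesPerTransaction batches have accumulated, and rowsWritten counts every record handed to addBatch().

// Sketch only: the cadence of the loop above, with hypothetical settings.
public class BatchCadenceSketch {
  public static void main(String[] args) {
    int rowsPerBatch = 100;
    int batchesPerTransaction = 100;
    long totalRows = 25_000;

    int rowsInBatch = 0, batchesInTxn = 0;
    long rowsWritten = 0, flushes = 0, commits = 0;

    for (long row = 0; row < totalRows; row++) {
      rowsInBatch++;                           // executor.addBatch(array)
      if (rowsInBatch == rowsPerBatch) {
        batchesInTxn++;
        if (batchesInTxn == batchesPerTransaction) {
          commits++;                           // executeBatch(true): flush and commit
          batchesInTxn = 0;
        } else {
          flushes++;                           // executeBatch(false): flush only
        }
        rowsInBatch = 0;
      }
      rowsWritten++;
    }
    if (rowsInBatch != 0 || batchesInTxn != 0) {
      commits++;                               // trailing executeBatch(true) for the remainder
    }
    // Prints rowsWritten=25000 flushes=248 commits=3
    System.out.printf("rowsWritten=%d flushes=%d commits=%d%n", rowsWritten, flushes, commits);
  }
}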

View File

@ -48,7 +48,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
private Configuration conf;
private DataWriter dataWriter;
private long rowRead = 0;
private long rowsRead = 0;
@Override
public void extract(ExtractorContext context, LinkConfiguration linkConfiguration,
@ -109,7 +109,7 @@ private void extractSequenceFile(LinkConfiguration linkConfiguration,
Text line = new Text();
boolean hasNext = filereader.next(line);
while (hasNext) {
rowRead++;
rowsRead++;
if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, line.toString()));
} else {
@ -176,7 +176,7 @@ private void extractTextFile(LinkConfiguration linkConfiguration,
} else {
next = fileseeker.getPos();
}
rowRead++;
rowsRead++;
if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, line.toString()));
} else {
@ -189,7 +189,7 @@ private void extractTextFile(LinkConfiguration linkConfiguration,
@Override
public long getRowsRead() {
return rowRead;
return rowsRead;
}
/**

View File

@ -38,6 +38,9 @@
import org.apache.sqoop.utils.ClassUtils;
public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
private long rowsWritten = 0;
/**
* Load data to target.
*
@ -79,19 +82,21 @@ public void load(LoaderContext context, LinkConfiguration linkConfiguration,
GenericHdfsWriter filewriter = getWriter(toJobConfig);
filewriter.initialize(filepath,conf,codec);
filewriter.initialize(filepath, conf, codec);
if (HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig)) {
Object[] record;
while ((record = reader.readArrayRecord()) != null) {
filewriter.write(HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record));
rowsWritten++;
}
} else {
String record;
while ((record = reader.readTextRecord()) != null) {
filewriter.write(record);
rowsWritten++;
}
}
filewriter.destroy();
@ -142,4 +147,12 @@ private static String getExtension(ToJobConfiguration toJobConf, CompressionCode
return codec.getDefaultExtension();
}
/* (non-Javadoc)
* @see org.apache.sqoop.job.etl.Loader#getRowsWritten()
*/
@Override
public long getRowsWritten() {
return rowsWritten;
}
}

View File

@ -38,6 +38,7 @@ public class KafkaLoader extends Loader<LinkConfiguration,ToJobConfiguration> {
private List<KeyedMessage<String, String>> messageList =
new ArrayList<KeyedMessage<String, String>>(KafkaConstants.DEFAULT_BATCH_SIZE);
private Producer producer;
private long rowsWritten = 0;
@Override
public void load(LoaderContext context,LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration) throws
@ -58,6 +59,7 @@ public void load(LoaderContext context,LinkConfiguration linkConfiguration, ToJo
if (messageList.size() >= KafkaConstants.DEFAULT_BATCH_SIZE) {
sendToKafka(messageList);
}
rowsWritten ++;
}
if (messageList.size() > 0) {
@ -103,4 +105,12 @@ private Properties generateDefaultKafkaProps() {
props.put(KafkaConstants.PRODUCER_TYPE,KafkaConstants.DEFAULT_PRODUCER_TYPE);
return props;
}
/* (non-Javadoc)
* @see org.apache.sqoop.job.etl.Loader#getRowsWritten()
*/
@Override
public long getRowsWritten() {
return rowsWritten;
}
}

View File

@ -34,6 +34,7 @@ public class KiteLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
private static final Logger LOG = Logger.getLogger(KiteLoader.class);
private long rowsWritten = 0;
@VisibleForTesting
protected KiteDatasetExecutor getExecutor(String uri, Schema schema,
FileFormat format) {
@ -57,14 +58,13 @@ public void load(LoaderContext context, LinkConfiguration linkConfig,
DataReader reader = context.getDataReader();
Object[] array;
boolean success = false;
long count = 0L;
try {
while ((array = reader.readArrayRecord()) != null) {
executor.writeRecord(array);
count++;
rowsWritten++;
}
LOG.info(count + " data record(s) have been written into dataset.");
LOG.info(rowsWritten + " data record(s) have been written into dataset.");
success = true;
} finally {
executor.closeWriter();
@ -76,4 +76,12 @@ public void load(LoaderContext context, LinkConfiguration linkConfig,
}
}
/* (non-Javadoc)
* @see org.apache.sqoop.job.etl.Loader#getRowsWritten()
*/
@Override
public long getRowsWritten() {
return rowsWritten;
}
}

View File

@ -39,7 +39,11 @@ limitations under the License.
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-core</artifactId>

View File

@ -42,7 +42,7 @@
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.submission.counter.SqoopCounters;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.utils.ClassUtils;
@ -60,20 +60,20 @@ public class SqoopOutputFormatLoadExecutor {
private Future<?> consumerFuture;
private Semaphore filled = new Semaphore(0, true);
private Semaphore free = new Semaphore(1, true);
private volatile boolean isTest = false;
private String loaderName;
// NOTE: This method is only exposed for test cases
SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName, IntermediateDataFormat<?> idf) {
this.isTest = isTest;
SqoopOutputFormatLoadExecutor(JobContext jobctx, String loaderName, IntermediateDataFormat<?> toDataFormat, Matcher matcher) {
context = jobctx;
this.loaderName = loaderName;
toDataFormat = idf;
this.matcher = matcher;
this.toDataFormat = toDataFormat;
writer = new SqoopRecordWriter();
matcher = null;
}
public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
context = jobctx;
loaderName = context.getConfiguration().get(MRJobConstants.JOB_ETL_LOADER);
writer = new SqoopRecordWriter();
matcher = MatcherFactory.getMatcher(
MRConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()),
@ -87,12 +87,12 @@ public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat
("OutputFormatLoader-consumer").build()).submit(
new ConsumerThread());
new ConsumerThread(context));
return writer;
}
/*
* This is a producer-consumer problem and can be solved
* This is a reader-writer problem and can be solved
* with two semaphores.
*/
private class SqoopRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {
@ -215,40 +215,43 @@ private void releaseSema(){
private class ConsumerThread implements Runnable {
/**
* Context class that we should use for reporting counters.
*/
private final JobContext jobctx;
public ConsumerThread(final JobContext context) {
jobctx = context;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void run() {
LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");
try {
DataReader reader = new SqoopOutputFormatDataReader();
Configuration conf = null;
if (!isTest) {
conf = context.getConfiguration();
loaderName = conf.get(MRJobConstants.JOB_ETL_LOADER);
}
Configuration conf = context.getConfiguration();
Loader loader = (Loader) ClassUtils.instantiate(loaderName);
// Objects that should be pass to the Executor execution
PrefixContext subContext = null;
Object connectorLinkConfig = null;
Object connectorToJobConfig = null;
Schema schema = null;
if (!isTest) {
// Using the TO schema since the SqoopDataWriter in the SqoopMapper encapsulates the toDataFormat
schema = matcher.getToSchema();
subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
connectorLinkConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, conf);
connectorToJobConfig = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
}
// Objects that should be passed to the Loader
PrefixContext subContext = new PrefixContext(conf,
MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
Object connectorLinkConfig = MRConfigurationUtils
.getConnectorLinkConfig(Direction.TO, conf);
Object connectorToJobConfig = MRConfigurationUtils
.getConnectorJobConfig(Direction.TO, conf);
// Using the TO schema since the SqoopDataWriter in the SqoopMapper
// encapsulates the toDataFormat
// Create loader context
LoaderContext loaderContext = new LoaderContext(subContext, reader, schema);
LoaderContext loaderContext = new LoaderContext(subContext, reader, matcher.getToSchema());
LOG.info("Running loader class " + loaderName);
loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig);
LOG.info("Loader has finished");
((TaskAttemptContext) jobctx).getCounter(SqoopCounters.ROWS_WRITTEN).increment(
loader.getRowsWritten());
} catch (Throwable t) {
readerFinished = true;
LOG.error("Error while loading data out of MR job.", t);
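
With the consumer thread now incrementing SqoopCounters.ROWS_WRITTEN, the total becomes available through the normal MapReduce counter API once the job completes. A hedged sketch of reading it back, assuming a handle to the org.apache.hadoop.mapreduce.Job that ran the transfer:

// Sketch only: reading the aggregated ROWS_WRITTEN counter from a finished job.
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.submission.counter.SqoopCounters;

public class RowsWrittenReadSketch {
  static long rowsWritten(Job job) throws Exception {
    return job.getCounters().findCounter(SqoopCounters.ROWS_WRITTEN).getValue();
  }
}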

View File

@ -251,6 +251,7 @@ public boolean needsTaskCommit(TaskAttemptContext taskContext) {
public static class DummyLoader extends Loader<EmptyConfiguration, EmptyConfiguration> {
private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION;
private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
private long rowsWritten = 0;
@Override
public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj)
@ -260,9 +261,18 @@ public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguratio
String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'";
dataFormat.setCSVTextData(testData);
index++;
rowsWritten ++;
assertEquals(dataFormat.getCSVTextData().toString(), data);
}
}
/* (non-Javadoc)
* @see org.apache.sqoop.job.etl.Loader#getRowsWritten()
*/
@Override
public long getRowsWritten() {
return rowsWritten;
}
}
public static class DummyFromDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> {

View File

@ -18,50 +18,63 @@
*/
package org.apache.sqoop.job.mr;
import java.util.ConcurrentModificationException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.counters.GenericCounter;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.matcher.Matcher;
import org.apache.sqoop.connector.matcher.MatcherFactory;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.util.MRJobTestUtil;
import org.apache.sqoop.schema.NullSchema;
import org.apache.sqoop.submission.counter.SqoopCounters;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ConcurrentModificationException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.TimeUnit;
public class TestSqoopOutputFormatLoadExecutor {
private Configuration conf;
private TaskAttemptContext jobContextMock;
public static class ThrowingLoader extends Loader {
public ThrowingLoader() {
}
public static class ThrowingLoader extends Loader<Object, Object> {
@Override
public void load(LoaderContext context, Object cc, Object jc) throws Exception {
context.getDataReader().readTextRecord();
throw new BrokenBarrierException();
}
@Override
public long getRowsWritten() {
return 0;
}
}
public static class ThrowingContinuousLoader extends Loader {
public static class ThrowingContinuousLoader extends Loader<Object, Object> {
private long rowsWritten = 0;
public ThrowingContinuousLoader() {
}
@Override
public void load(LoaderContext context, Object cc, Object jc) throws Exception {
int runCount = 0;
Object o;
String[] arr;
while ((o = context.getDataReader().readTextRecord()) != null) {
@ -70,20 +83,20 @@ public void load(LoaderContext context, Object cc, Object jc) throws Exception {
for (int i = 0; i < arr.length; i++) {
Assert.assertEquals(i, Integer.parseInt(arr[i]));
}
runCount++;
if (runCount == 5) {
rowsWritten++;
if (rowsWritten == 5) {
throw new ConcurrentModificationException();
}
}
}
@Override
public long getRowsWritten() {
return rowsWritten;
}
}
public static class GoodLoader extends Loader {
public GoodLoader() {
}
public static class GoodLoader extends Loader<Object, Object> {
@Override
public void load(LoaderContext context, Object cc, Object jc) throws Exception {
String[] arr = context.getDataReader().readTextRecord().toString().split(",");
@ -92,17 +105,20 @@ public void load(LoaderContext context, Object cc, Object jc) throws Exception {
Assert.assertEquals(i, Integer.parseInt(arr[i]));
}
}
@Override
public long getRowsWritten() {
return 0;
}
}
public static class GoodContinuousLoader extends Loader {
public static class GoodContinuousLoader extends Loader<Object, Object> {
public GoodContinuousLoader() {
}
private long rowsWritten = 0;
@Override
public void load(LoaderContext context, Object cc, Object jc) throws Exception {
int runCount = 0;
int rowsWritten = 0;
Object o;
String[] arr;
while ((o = context.getDataReader().readTextRecord()) != null) {
@ -111,26 +127,47 @@ public void load(LoaderContext context, Object cc, Object jc) throws Exception {
for (int i = 0; i < arr.length; i++) {
Assert.assertEquals(i, Integer.parseInt(arr[i]));
}
runCount++;
rowsWritten++;
}
Assert.assertEquals(10, runCount);
Assert.assertEquals(10, rowsWritten);
}
@Override
public long getRowsWritten() {
return rowsWritten;
}
}
// TODO:SQOOP-1873: Mock objects instead
private Matcher getMatcher(){
return MatcherFactory.getMatcher(NullSchema.getInstance(),
NullSchema.getInstance());
}
// TODO:SQOOP-1873: Mock objects instead
private IntermediateDataFormat<?> getIDF(){
return new CSVIntermediateDataFormat();
}
@Before
public void setUp() {
conf = new Configuration();
conf.setIfUnset(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
conf.setIfUnset(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
jobContextMock = mock(TaskAttemptContext.class);
GenericCounter counter = new GenericCounter("test", "test-me");
when(((TaskAttemptContext) jobContextMock).getCounter(SqoopCounters.ROWS_WRITTEN)).thenReturn(counter);
org.apache.hadoop.mapred.JobConf testConf = new org.apache.hadoop.mapred.JobConf();
when(jobContextMock.getConfiguration()).thenReturn(testConf);
}
@Test(expected = BrokenBarrierException.class)
public void testWhenLoaderThrows() throws Throwable {
conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName(), new CSVIntermediateDataFormat());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(jobContextMock,
ThrowingLoader.class.getName(), getIDF(), getMatcher());
RecordWriter<SqoopWritable, NullWritable> writer = executor
.getRecordWriter();
IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
SqoopWritable writable = new SqoopWritable(dataFormat);
try {
@ -146,8 +183,9 @@ public void testWhenLoaderThrows() throws Throwable {
@Test
public void testSuccessfulContinuousLoader() throws Throwable {
conf.set(MRJobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName(), new CSVIntermediateDataFormat());
SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(jobContextMock,
GoodContinuousLoader.class.getName(), getIDF(), getMatcher());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
SqoopWritable writable = new SqoopWritable(dataFormat);
@ -163,13 +201,16 @@ public void testSuccessfulContinuousLoader() throws Throwable {
writer.write(writable, null);
}
writer.close(null);
verify(jobContextMock, times(1)).getConfiguration();
verify(jobContextMock, times(1)).getCounter(SqoopCounters.ROWS_WRITTEN);
}
@Test (expected = SqoopException.class)
@Test(expected = SqoopException.class)
public void testSuccessfulLoader() throws Throwable {
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName(), new CSVIntermediateDataFormat());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(jobContextMock,
GoodLoader.class.getName(), getIDF(), getMatcher());
RecordWriter<SqoopWritable, NullWritable> writer = executor
.getRecordWriter();
IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
SqoopWritable writable = new SqoopWritable(dataFormat);
StringBuilder builder = new StringBuilder();
@ -182,18 +223,19 @@ public void testSuccessfulLoader() throws Throwable {
dataFormat.setCSVTextData(builder.toString());
writer.write(writable, null);
//Allow writer to complete.
// Allow writer to complete.
TimeUnit.SECONDS.sleep(5);
writer.close(null);
verify(jobContextMock, times(1)).getConfiguration();
verify(jobContextMock, times(1)).getCounter(SqoopCounters.ROWS_WRITTEN);
}
@Test(expected = ConcurrentModificationException.class)
public void testThrowingContinuousLoader() throws Throwable {
conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName(), new CSVIntermediateDataFormat());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(jobContextMock,
ThrowingContinuousLoader.class.getName(), getIDF(), getMatcher());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
SqoopWritable writable = new SqoopWritable(dataFormat);
try {
@ -213,4 +255,5 @@ public void testThrowingContinuousLoader() throws Throwable {
throw ex.getCause();
}
}
}

View File

@ -33,4 +33,16 @@ public abstract class Loader<LinkConfiguration, ToJobConfiguration> {
public abstract void load(LoaderContext context, LinkConfiguration linkConfiguration,
ToJobConfiguration jobConfiguration) throws Exception;
/**
Return the number of rows written by the last call to the
* {@linkplain Loader#load(org.apache.sqoop.job.etl.LoaderContext, java.lang.Object) }
* method. This method returns only the number of rows written in the last call,
* and not a cumulative total of the number of rows written by this Loader
* since its creation.
*
* @return the number of rows written by the last call to
* {@linkplain Loader#load(org.apache.sqoop.job.etl.LoaderContext, java.lang.Object) }
*/
public abstract long getRowsWritten();
}
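
For connector authors, a minimal hypothetical Loader (not part of this commit) showing the intended pattern: keep a private tally in load() and return it from getRowsWritten().

// Sketch only: a hypothetical Loader honouring the new getRowsWritten() contract.
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;

public class ExampleLoader extends Loader<Object, Object> {
  private long rowsWritten = 0;

  @Override
  public void load(LoaderContext context, Object linkConfiguration,
      Object jobConfiguration) throws Exception {
    rowsWritten = 0; // report only the rows written by this call, per the Javadoc above
    String record;
    while ((record = context.getDataReader().readTextRecord()) != null) {
      // write 'record' to the target system here
      rowsWritten++;
    }
  }

  @Override
  public long getRowsWritten() {
    return rowsWritten;
  }
}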