From 7f53eb22e348ef1b9171eedb45063e552ce0281d Mon Sep 17 00:00:00 2001 From: Abraham Elmahrek Date: Mon, 22 Dec 2014 18:14:23 -0800 Subject: [PATCH] SQOOP-1929: Sqoop2: Track number of records written in Loader (Veena Basavaraj via Abraham Elmahrek) --- .../submission/counter/SqoopCounters.java | 3 +- .../connector/jdbc/GenericJdbcLoader.java | 28 ++-- .../sqoop/connector/hdfs/HdfsExtractor.java | 8 +- .../sqoop/connector/hdfs/HdfsLoader.java | 15 ++- .../sqoop/connector/kafka/KafkaLoader.java | 10 ++ .../sqoop/connector/kite/KiteLoader.java | 14 +- execution/mapreduce/pom.xml | 6 +- .../job/mr/SqoopOutputFormatLoadExecutor.java | 59 +++++---- .../org/apache/sqoop/job/TestMapReduce.java | 10 ++ .../mr/TestSqoopOutputFormatLoadExecutor.java | 125 ++++++++++++------ .../java/org/apache/sqoop/job/etl/Loader.java | 12 ++ 11 files changed, 202 insertions(+), 88 deletions(-) diff --git a/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java b/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java index 75f39802..dd9dd68e 100644 --- a/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java +++ b/common/src/main/java/org/apache/sqoop/submission/counter/SqoopCounters.java @@ -21,5 +21,6 @@ * */ public enum SqoopCounters { - ROWS_READ; + ROWS_READ, + ROWS_WRITTEN } diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java index 6340a70c..31fd8764 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java @@ -28,6 +28,7 @@ public class GenericJdbcLoader extends Loader { + + private long rowsWritten = 0; + /** * Load data to target. 
* @@ -79,19 +82,21 @@ public void load(LoaderContext context, LinkConfiguration linkConfiguration, GenericHdfsWriter filewriter = getWriter(toJobConfig); - filewriter.initialize(filepath,conf,codec); + filewriter.initialize(filepath, conf, codec); if (HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig)) { Object[] record; while ((record = reader.readArrayRecord()) != null) { filewriter.write(HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record)); + rowsWritten++; } } else { String record; while ((record = reader.readTextRecord()) != null) { filewriter.write(record); + rowsWritten++; } } filewriter.destroy(); @@ -142,4 +147,12 @@ private static String getExtension(ToJobConfiguration toJobConf, CompressionCode return codec.getDefaultExtension(); } + /* (non-Javadoc) + * @see org.apache.sqoop.job.etl.Loader#getRowsWritten() + */ + @Override + public long getRowsWritten() { + return rowsWritten; + } + } diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java index 5d795164..1c08f602 100644 --- a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java @@ -38,6 +38,7 @@ public class KafkaLoader extends Loader { private List> messageList = new ArrayList>(KafkaConstants.DEFAULT_BATCH_SIZE); private Producer producer; + private long rowsWritten = 0; @Override public void load(LoaderContext context,LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration) throws @@ -58,6 +59,7 @@ public void load(LoaderContext context,LinkConfiguration linkConfiguration, ToJo if (messageList.size() >= KafkaConstants.DEFAULT_BATCH_SIZE) { sendToKafka(messageList); } + rowsWritten ++; } if (messageList.size() > 0) { @@ -103,4 +105,12 @@ private Properties generateDefaultKafkaProps() { props.put(KafkaConstants.PRODUCER_TYPE,KafkaConstants.DEFAULT_PRODUCER_TYPE); return props; } + + /* (non-Javadoc) + * @see org.apache.sqoop.job.etl.Loader#getRowsWritten() + */ + @Override + public long getRowsWritten() { + return rowsWritten; + } } diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java index 709fd945..0a46f4a2 100644 --- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java @@ -34,6 +34,7 @@ public class KiteLoader extends Loader { private static final Logger LOG = Logger.getLogger(KiteLoader.class); + private long rowsWritten = 0; @VisibleForTesting protected KiteDatasetExecutor getExecutor(String uri, Schema schema, FileFormat format) { @@ -57,14 +58,13 @@ public void load(LoaderContext context, LinkConfiguration linkConfig, DataReader reader = context.getDataReader(); Object[] array; boolean success = false; - long count = 0L; try { while ((array = reader.readArrayRecord()) != null) { executor.writeRecord(array); - count++; + rowsWritten++; } - LOG.info(count + " data record(s) have been written into dataset."); + LOG.info(rowsWritten + " data record(s) have been written into dataset."); success = true; } finally { executor.closeWriter(); @@ -76,4 +76,12 @@ public void load(LoaderContext context, LinkConfiguration linkConfig, } } + /* (non-Javadoc) + * @see 
org.apache.sqoop.job.etl.Loader#getRowsWritten() + */ + @Override + public long getRowsWritten() { + return rowsWritten; + } + } \ No newline at end of file diff --git a/execution/mapreduce/pom.xml b/execution/mapreduce/pom.xml index b23b9055..ad7f489e 100644 --- a/execution/mapreduce/pom.xml +++ b/execution/mapreduce/pom.xml @@ -39,7 +39,11 @@ limitations under the License. junit test - + + org.mockito + mockito-all + test + org.apache.sqoop sqoop-core diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index aaf771c2..7835e38f 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -42,7 +42,7 @@ import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; import org.apache.sqoop.etl.io.DataReader; -import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.submission.counter.SqoopCounters; import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.utils.ClassUtils; @@ -60,20 +60,20 @@ public class SqoopOutputFormatLoadExecutor { private Future consumerFuture; private Semaphore filled = new Semaphore(0, true); private Semaphore free = new Semaphore(1, true); - private volatile boolean isTest = false; private String loaderName; // NOTE: This method is only exposed for test cases - SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName, IntermediateDataFormat idf) { - this.isTest = isTest; + SqoopOutputFormatLoadExecutor(JobContext jobctx, String loaderName, IntermediateDataFormat toDataFormat, Matcher matcher) { + context = jobctx; this.loaderName = loaderName; - toDataFormat = idf; + this.matcher = matcher; + this.toDataFormat = toDataFormat; writer = new SqoopRecordWriter(); - matcher = null; } public SqoopOutputFormatLoadExecutor(JobContext jobctx) { context = jobctx; + loaderName = context.getConfiguration().get(MRJobConstants.JOB_ETL_LOADER); writer = new SqoopRecordWriter(); matcher = MatcherFactory.getMatcher( MRConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()), @@ -87,12 +87,12 @@ public SqoopOutputFormatLoadExecutor(JobContext jobctx) { public RecordWriter getRecordWriter() { consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat ("OutputFormatLoader-consumer").build()).submit( - new ConsumerThread()); + new ConsumerThread(context)); return writer; } /* - * This is a producer-consumer problem and can be solved + * This is a reader-writer problem and can be solved * with two semaphores. */ private class SqoopRecordWriter extends RecordWriter { @@ -215,40 +215,43 @@ private void releaseSema(){ private class ConsumerThread implements Runnable { + /** + * Context class that we should use for reporting counters. 
+ */ + private final JobContext jobctx; + + public ConsumerThread(final JobContext context) { + jobctx = context; + } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Override public void run() { LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting"); try { DataReader reader = new SqoopOutputFormatDataReader(); - - Configuration conf = null; - if (!isTest) { - conf = context.getConfiguration(); - loaderName = conf.get(MRJobConstants.JOB_ETL_LOADER); - } + Configuration conf = context.getConfiguration(); Loader loader = (Loader) ClassUtils.instantiate(loaderName); - // Objects that should be pass to the Executor execution - PrefixContext subContext = null; - Object connectorLinkConfig = null; - Object connectorToJobConfig = null; - Schema schema = null; - - if (!isTest) { - // Using the TO schema since the SqoopDataWriter in the SqoopMapper encapsulates the toDataFormat - schema = matcher.getToSchema(); - subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT); - connectorLinkConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, conf); - connectorToJobConfig = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf); - } + // Objects that should be passed to the Loader + PrefixContext subContext = new PrefixContext(conf, + MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT); + Object connectorLinkConfig = MRConfigurationUtils + .getConnectorLinkConfig(Direction.TO, conf); + Object connectorToJobConfig = MRConfigurationUtils + .getConnectorJobConfig(Direction.TO, conf); + // Using the TO schema since the SqoopDataWriter in the SqoopMapper + // encapsulates the toDataFormat // Create loader context - LoaderContext loaderContext = new LoaderContext(subContext, reader, schema); + LoaderContext loaderContext = new LoaderContext(subContext, reader, matcher.getToSchema()); LOG.info("Running loader class " + loaderName); loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig); LOG.info("Loader has finished"); + ((TaskAttemptContext) jobctx).getCounter(SqoopCounters.ROWS_WRITTEN).increment( + loader.getRowsWritten()); + } catch (Throwable t) { readerFinished = true; LOG.error("Error while loading data out of MR job.", t); diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java index 47696cc9..cc0a3cc3 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java @@ -251,6 +251,7 @@ public boolean needsTaskCommit(TaskAttemptContext taskContext) { public static class DummyLoader extends Loader { private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION; private IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); + private long rowsWritten = 0; @Override public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) @@ -260,9 +261,18 @@ public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguratio String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'"; dataFormat.setCSVTextData(testData); index++; + rowsWritten ++; assertEquals(dataFormat.getCSVTextData().toString(), data); } } + + /* (non-Javadoc) + * @see org.apache.sqoop.job.etl.Loader#getRowsWritten() + */ + @Override + public long getRowsWritten() { + return rowsWritten; + } } public static class DummyFromDestroyer extends Destroyer { diff --git 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java index ec0e8861..f5f627d6 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java @@ -18,50 +18,63 @@ */ package org.apache.sqoop.job.mr; -import java.util.ConcurrentModificationException; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.TimeUnit; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.counters.GenericCounter; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.connector.idf.IntermediateDataFormat; +import org.apache.sqoop.connector.matcher.Matcher; +import org.apache.sqoop.connector.matcher.MatcherFactory; import org.apache.sqoop.job.MRJobConstants; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.job.util.MRJobTestUtil; +import org.apache.sqoop.schema.NullSchema; +import org.apache.sqoop.submission.counter.SqoopCounters; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.ConcurrentModificationException; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.TimeUnit; + public class TestSqoopOutputFormatLoadExecutor { private Configuration conf; + private TaskAttemptContext jobContextMock; - public static class ThrowingLoader extends Loader { - - public ThrowingLoader() { - - } + public static class ThrowingLoader extends Loader { @Override public void load(LoaderContext context, Object cc, Object jc) throws Exception { context.getDataReader().readTextRecord(); throw new BrokenBarrierException(); } + + @Override + public long getRowsWritten() { + return 0; + } } - public static class ThrowingContinuousLoader extends Loader { + public static class ThrowingContinuousLoader extends Loader { + private long rowsWritten = 0; public ThrowingContinuousLoader() { } @Override public void load(LoaderContext context, Object cc, Object jc) throws Exception { - int runCount = 0; Object o; String[] arr; while ((o = context.getDataReader().readTextRecord()) != null) { @@ -70,20 +83,20 @@ public void load(LoaderContext context, Object cc, Object jc) throws Exception { for (int i = 0; i < arr.length; i++) { Assert.assertEquals(i, Integer.parseInt(arr[i])); } - runCount++; - if (runCount == 5) { + rowsWritten++; + if (rowsWritten == 5) { throw new ConcurrentModificationException(); } } } + + @Override + public long getRowsWritten() { + return rowsWritten; + } } - public static class GoodLoader extends Loader { - - public GoodLoader() { - - } - + public static class GoodLoader extends Loader { @Override public void load(LoaderContext context, Object cc, Object jc) throws Exception { String[] arr = context.getDataReader().readTextRecord().toString().split(","); @@ -92,17 +105,20 @@ public void load(LoaderContext context, 
Object cc, Object jc) throws Exception { Assert.assertEquals(i, Integer.parseInt(arr[i])); } } + + @Override + public long getRowsWritten() { + return 0; + } } - public static class GoodContinuousLoader extends Loader { + public static class GoodContinuousLoader extends Loader { - public GoodContinuousLoader() { - - } + private long rowsWritten = 0; @Override public void load(LoaderContext context, Object cc, Object jc) throws Exception { - int runCount = 0; + int rowsWritten = 0; Object o; String[] arr; while ((o = context.getDataReader().readTextRecord()) != null) { @@ -111,26 +127,47 @@ public void load(LoaderContext context, Object cc, Object jc) throws Exception { for (int i = 0; i < arr.length; i++) { Assert.assertEquals(i, Integer.parseInt(arr[i])); } - runCount++; + rowsWritten++; } - Assert.assertEquals(10, runCount); + Assert.assertEquals(10, rowsWritten); + } + + @Override + public long getRowsWritten() { + return rowsWritten; } } + // TODO:SQOOP-1873: Mock objects instead + private Matcher getMatcher(){ + return MatcherFactory.getMatcher(NullSchema.getInstance(), + NullSchema.getInstance()); + + } + // TODO:SQOOP-1873: Mock objects instead + private IntermediateDataFormat getIDF(){ + return new CSVIntermediateDataFormat(); + } @Before public void setUp() { conf = new Configuration(); - conf.setIfUnset(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); - + conf.setIfUnset(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, + CSVIntermediateDataFormat.class.getName()); + jobContextMock = mock(TaskAttemptContext.class); + GenericCounter counter = new GenericCounter("test", "test-me"); + when(((TaskAttemptContext) jobContextMock).getCounter(SqoopCounters.ROWS_WRITTEN)).thenReturn(counter); + org.apache.hadoop.mapred.JobConf testConf = new org.apache.hadoop.mapred.JobConf(); + when(jobContextMock.getConfiguration()).thenReturn(testConf); } @Test(expected = BrokenBarrierException.class) public void testWhenLoaderThrows() throws Throwable { conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName()); - SqoopOutputFormatLoadExecutor executor = new - SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName(), new CSVIntermediateDataFormat()); - RecordWriter writer = executor.getRecordWriter(); + SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(jobContextMock, + ThrowingLoader.class.getName(), getIDF(), getMatcher()); + RecordWriter writer = executor + .getRecordWriter(); IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); SqoopWritable writable = new SqoopWritable(dataFormat); try { @@ -146,8 +183,9 @@ public void testWhenLoaderThrows() throws Throwable { @Test public void testSuccessfulContinuousLoader() throws Throwable { conf.set(MRJobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName()); - SqoopOutputFormatLoadExecutor executor = new - SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName(), new CSVIntermediateDataFormat()); + + SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(jobContextMock, + GoodContinuousLoader.class.getName(), getIDF(), getMatcher()); RecordWriter writer = executor.getRecordWriter(); IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); SqoopWritable writable = new SqoopWritable(dataFormat); @@ -163,13 +201,16 @@ public void testSuccessfulContinuousLoader() throws Throwable { writer.write(writable, null); } writer.close(null); + verify(jobContextMock, times(1)).getConfiguration(); + verify(jobContextMock, 
times(1)).getCounter(SqoopCounters.ROWS_WRITTEN); } - @Test (expected = SqoopException.class) + @Test(expected = SqoopException.class) public void testSuccessfulLoader() throws Throwable { - SqoopOutputFormatLoadExecutor executor = new - SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName(), new CSVIntermediateDataFormat()); - RecordWriter writer = executor.getRecordWriter(); + SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(jobContextMock, + GoodLoader.class.getName(), getIDF(), getMatcher()); + RecordWriter writer = executor + .getRecordWriter(); IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); SqoopWritable writable = new SqoopWritable(dataFormat); StringBuilder builder = new StringBuilder(); @@ -182,18 +223,19 @@ public void testSuccessfulLoader() throws Throwable { dataFormat.setCSVTextData(builder.toString()); writer.write(writable, null); - //Allow writer to complete. + // Allow writer to complete. TimeUnit.SECONDS.sleep(5); writer.close(null); + verify(jobContextMock, times(1)).getConfiguration(); + verify(jobContextMock, times(1)).getCounter(SqoopCounters.ROWS_WRITTEN); } - @Test(expected = ConcurrentModificationException.class) public void testThrowingContinuousLoader() throws Throwable { conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName()); - SqoopOutputFormatLoadExecutor executor = new - SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName(), new CSVIntermediateDataFormat()); - RecordWriter writer = executor.getRecordWriter(); + SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(jobContextMock, + ThrowingContinuousLoader.class.getName(), getIDF(), getMatcher()); + RecordWriter writer = executor.getRecordWriter(); IntermediateDataFormat dataFormat = MRJobTestUtil.getTestIDF(); SqoopWritable writable = new SqoopWritable(dataFormat); try { @@ -213,4 +255,5 @@ public void testThrowingContinuousLoader() throws Throwable { throw ex.getCause(); } } + } diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java index 3b6bd718..e47b244b 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java @@ -33,4 +33,16 @@ public abstract class Loader { public abstract void load(LoaderContext context, LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration) throws Exception; + /** + * Return the number of rows written by the last call to + * {@linkplain Loader#load(org.apache.sqoop.job.etl.LoaderContext, java.lang.Object) } + * method. This method returns only the number of rows written in the last call, + * and not a cumulative total of the number of rows written by this Loader + * since its creation. + * + * @return the number of rows written by the last call to + * {@linkplain Loader#load(org.apache.sqoop.job.etl.LoaderContext, java.lang.Object) } + */ + public abstract long getRowsWritten(); + }
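For connector authors, the practical effect of this patch is the new Loader#getRowsWritten() contract: a loader keeps its own count while load() runs, and the OutputFormatLoader-consumer thread adds that count to the ROWS_WRITTEN counter once load() returns. The sketch below is not part of the patch; it is a minimal hypothetical loader written against the SPI shown above. The class name is made up, and it uses Object for the link and job configuration type parameters the way the test loaders in this patch do; a real connector would use its concrete LinkConfiguration/ToJobConfiguration classes and actually write each record to the target system.

import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;

// Hypothetical example, not part of this patch: a minimal loader that
// honors the getRowsWritten() contract introduced here.
public class ExampleCountingLoader extends Loader<Object, Object> {

  private long rowsWritten = 0;

  @Override
  public void load(LoaderContext context, Object linkConfiguration,
      Object jobConfiguration) throws Exception {
    DataReader reader = context.getDataReader();
    String record;
    while ((record = reader.readTextRecord()) != null) {
      // write 'record' to the target system here, then count it
      rowsWritten++;
    }
  }

  @Override
  public long getRowsWritten() {
    // Rows written by the last load() call only, not a lifetime total.
    // The MR consumer thread adds this value to ROWS_WRITTEN after load()
    // returns.
    return rowsWritten;
  }
}

Once the job finishes, the aggregated per-loader totals surface through the ROWS_WRITTEN entry added to SqoopCounters, alongside the existing ROWS_READ counter.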