From b04e796f01cb659efc55314029fa18cfd80bb16d Mon Sep 17 00:00:00 2001
From: Jarek Jarcec Cecho
Date: Sun, 21 Sep 2014 13:00:46 -0700
Subject: [PATCH] SQOOP-1488: Sqoop2: From/To: Run both destroyers (Abraham Elmahrek via Jarek Jarcec Cecho)
---
 .../mapreduce/MapreduceExecutionEngine.java   |  3 +-
 .../org/apache/sqoop/job/JobConstants.java    |  6 ++--
 .../sqoop/job/mr/SqoopDestroyerExecutor.java  | 33 ++++++++++++++-----
 .../sqoop/job/mr/SqoopFileOutputFormat.java   |  7 ++--
 .../sqoop/job/mr/SqoopNullOutputFormat.java   |  7 ++--
 .../org/apache/sqoop/job/TestMapReduce.java   | 29 ++++++++++++++++
 .../jdbc/generic/TableStagedRDBMSTest.java    | 15 ++++-----
 7 files changed, 75 insertions(+), 25 deletions(-)

diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index ef7ff4e7..049d183c 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -68,7 +68,8 @@ public void prepareJob(JobRequest jobRequest) {
     context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
     context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
     context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName());
-    context.setString(JobConstants.JOB_ETL_DESTROYER, from.getDestroyer().getName());
+    context.setString(JobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName());
+    context.setString(JobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName());
     context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
       mrJobRequest.getIntermediateDataFormat().getName());
 
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index 4cdb002a..542607af 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -37,9 +37,11 @@ public final class JobConstants extends Constants {
   public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG
       + "etl.loader";
 
-  public static final String JOB_ETL_DESTROYER = PREFIX_JOB_CONFIG
-      + "etl.destroyer";
+  public static final String JOB_ETL_FROM_DESTROYER = PREFIX_JOB_CONFIG
+      + "etl.from.destroyer";
 
+  public static final String JOB_ETL_TO_DESTROYER = PREFIX_JOB_CONFIG
+      + "etl.to.destroyer";
 
   public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG
       + "mr.output.file";
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index e3af6e1c..aecde400 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -40,10 +40,24 @@ public class SqoopDestroyerExecutor {
    * @param success True if the job execution was successful
    * @param configuration Configuration object to get destroyer class with context
    *                      and configuration objects.
-   * @param propertyName Name of property that holds destroyer class.
+   * @param direction The direction of the Destroyer to execute.
    */
-  public static void executeDestroyer(boolean success, Configuration configuration, String propertyName) {
-    Destroyer destroyer = (Destroyer) ClassUtils.instantiate(configuration.get(propertyName));
+  public static void executeDestroyer(boolean success, Configuration configuration, Direction direction) {
+    String destroyerPropertyName, prefixPropertyName;
+    switch (direction) {
+      default:
+      case FROM:
+        destroyerPropertyName = JobConstants.JOB_ETL_FROM_DESTROYER;
+        prefixPropertyName = JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT;
+        break;
+
+      case TO:
+        destroyerPropertyName = JobConstants.JOB_ETL_TO_DESTROYER;
+        prefixPropertyName = JobConstants.PREFIX_CONNECTOR_TO_CONTEXT;
+        break;
+    }
+
+    Destroyer destroyer = (Destroyer) ClassUtils.instantiate(configuration.get(destroyerPropertyName));
 
     if(destroyer == null) {
       LOG.info("Skipping running destroyer as none was defined.");
@@ -51,16 +65,17 @@ public static void executeDestroyer(boolean success, Configuration configuration
     }
 
     // Objects that should be passed to the Destroyer execution
-    PrefixContext subContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
-    Object fromConfigConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, configuration);
-    Object fromConfigJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, configuration);
+    PrefixContext subContext = new PrefixContext(configuration, prefixPropertyName);
+    Object configConnection = ConfigurationUtils.getConnectorConnectionConfig(direction, configuration);
+    Object configJob = ConfigurationUtils.getConnectorJobConfig(direction, configuration);
+
+    // Propagate connector schema in every case for now
+    Schema schema = ConfigurationUtils.getConnectorSchema(direction, configuration);
 
-    // TODO(Abe/Gwen): Change to conditional choosing between schemas.
-    Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, configuration);
     DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema);
 
     LOG.info("Executing destroyer class " + destroyer.getClass());
-    destroyer.destroy(destroyerContext, fromConfigConnection, fromConfigJob);
+    destroyer.destroy(destroyerContext, configConnection, configJob);
   }
 
   private SqoopDestroyerExecutor() {
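For reference, the hook that executeDestroyer() drives is the connector-supplied Destroyer. Below is a minimal sketch of a connector-side implementation, mirroring the raw (non-generic) form used by the TestMapReduce dummies later in this patch; the class name and the cleanup logic are invented for illustration, and the sketch assumes DestroyerContext exposes the success flag passed to its constructor via isSuccess():

    import org.apache.sqoop.job.etl.Destroyer;
    import org.apache.sqoop.job.etl.DestroyerContext;

    // Hypothetical connector-side destroyer: promote or discard staged data.
    public class StagingCleanupDestroyer extends Destroyer {
      @Override
      public void destroy(DestroyerContext context, Object connectionConfig, Object jobConfig) {
        if (context.isSuccess()) {
          // e.g. move rows from a staging table into the final table
        } else {
          // e.g. drop the staging table so a failed run leaves nothing behind
        }
      }
    }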
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
index 3e2b1c5b..ca77e160 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.io.SqoopWritable;
 
@@ -84,7 +85,8 @@ public void commitJob(JobContext context) throws IOException {
       super.commitJob(context);
 
       Configuration config = context.getConfiguration();
-      SqoopDestroyerExecutor.executeDestroyer(true, config, JobConstants.JOB_ETL_DESTROYER);
+      SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.FROM);
+      SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.TO);
     }
 
     @Override
@@ -92,7 +94,8 @@ public void abortJob(JobContext context, JobStatus.State state) throws IOException {
       super.abortJob(context, state);
 
       Configuration config = context.getConfiguration();
-      SqoopDestroyerExecutor.executeDestroyer(false, config, JobConstants.JOB_ETL_DESTROYER);
+      SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.FROM);
+      SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.TO);
     }
   }
 }
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
index b3461bba..594b5e90 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.io.SqoopWritable;
 
@@ -67,7 +68,8 @@ public void commitJob(JobContext jobContext) throws IOException {
     super.commitJob(jobContext);
 
     Configuration config = jobContext.getConfiguration();
-    SqoopDestroyerExecutor.executeDestroyer(true, config, JobConstants.JOB_ETL_DESTROYER);
+    SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.FROM);
+    SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.TO);
   }
 
   @Override
@@ -75,7 +77,8 @@ public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
     super.abortJob(jobContext, state);
 
     Configuration config = jobContext.getConfiguration();
-    SqoopDestroyerExecutor.executeDestroyer(false, config, JobConstants.JOB_ETL_DESTROYER);
+    SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.FROM);
+    SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.TO);
   }
 
   @Override
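With both output formats updated, commit and abort now tear down both sides of the transfer. A minimal usage sketch of the new entry point, using only the constants, enum, and signature introduced above (the two destroyer class names are placeholders, not real classes):

    Configuration conf = new Configuration();
    // Register one destroyer per direction, as MapreduceExecutionEngine.prepareJob() now does.
    conf.set(JobConstants.JOB_ETL_FROM_DESTROYER, MyFromDestroyer.class.getName()); // placeholder
    conf.set(JobConstants.JOB_ETL_TO_DESTROYER, MyToDestroyer.class.getName());     // placeholder
    // FROM side first, then TO side, matching the order used in commitJob()/abortJob().
    SqoopDestroyerExecutor.executeDestroyer(true, conf, Direction.FROM);
    SqoopDestroyerExecutor.executeDestroyer(true, conf, Direction.TO);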
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 2dfc487f..869c727b 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -23,6 +23,7 @@
 import java.util.LinkedList;
 import java.util.List;
 
+import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,8 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.etl.Loader;
@@ -100,6 +103,8 @@ public void testOutputFormat() throws Exception {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+    conf.set(JobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
+    conf.set(JobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
     conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
       CSVIntermediateDataFormat.class.getName());
     Schema schema = new Schema("Test");
@@ -110,6 +115,10 @@ public void testOutputFormat() throws Exception {
     ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
     JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
       SqoopNullOutputFormat.class);
+
+    // Make sure both destroyers get called.
+    Assert.assertEquals(1, DummyFromDestroyer.count);
+    Assert.assertEquals(1, DummyToDestroyer.count);
   }
 
   public static class DummyPartition extends Partition {
@@ -251,4 +260,24 @@ public void load(LoaderContext context, Object oc, Object oj) throws Exception {
       }
     }
   }
+
+  public static class DummyFromDestroyer extends Destroyer {
+
+    public static int count = 0;
+
+    @Override
+    public void destroy(DestroyerContext context, Object o, Object o2) {
+      count++;
+    }
+  }
+
+  public static class DummyToDestroyer extends Destroyer {
+
+    public static int count = 0;
+
+    @Override
+    public void destroy(DestroyerContext context, Object o, Object o2) {
+      count++;
+    }
+  }
 }
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
index cb782c7d..1af0cdc6 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
@@ -68,15 +68,12 @@ public void testStagedTransfer() throws Exception {
 
     runJob(job);
 
-    // @TODO(Abe): Change back after SQOOP-1488
-//    assertEquals(0L, provider.rowCount(stageTableName));
-//    assertEquals(4L, rowCount());
-//    assertRowInCities(1, "USA", "San Francisco");
-//    assertRowInCities(2, "USA", "Sunnyvale");
-//    assertRowInCities(3, "Czech Republic", "Brno");
-//    assertRowInCities(4, "USA", "Palo Alto");
-    assertEquals(4L, provider.rowCount(stageTableName));
-    assertEquals(0L, rowCount());
+    assertEquals(0L, provider.rowCount(stageTableName));
+    assertEquals(4L, rowCount());
+    assertRowInCities(1, "USA", "San Francisco");
+    assertRowInCities(2, "USA", "Sunnyvale");
+    assertRowInCities(3, "Czech Republic", "Brno");
+    assertRowInCities(4, "USA", "Palo Alto");
 
     // Clean up testing table
     provider.dropTable(stageTableName);
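The restored assertions expect the staging table drained and the target table populated, which holds only if the generic JDBC connector's TO-side destroyer, which now actually runs, migrates the staged rows on success. A rough sketch of that success-path behavior; the SQL and the executor helper are illustrative, not taken from this patch:

    // Illustrative only: typical success-path logic of a TO-side staging destroyer.
    public void destroy(DestroyerContext context, Object connectionConfig, Object jobConfig) {
      if (context.isSuccess()) {
        // Hypothetical helper; real table names come from the job configuration.
        executor.executeUpdate("INSERT INTO cities SELECT * FROM " + stageTableName);
        executor.executeUpdate("DELETE FROM " + stageTableName);
      }
    }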