diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java
index 03a1dec0..d5f74f01 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java
@@ -153,11 +153,11 @@ public static void setConnectorSchema(Direction type, Job job, Schema schema) {
   }
 
   /**
-   * Retrieve Connector configuration object for connection.
+   * Retrieve Connector configuration object for link.
    * @param configuration MapReduce configuration object
    * @return Configuration object
    */
-  public static Object getConnectorConnectionConfig(Direction type, Configuration configuration) {
+  public static Object getConnectorLinkConfig(Direction type, Configuration configuration) {
     switch (type) {
       case FROM:
         return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY);
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index 32b5b1d7..c6ba749a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -68,7 +68,7 @@ public static void executeDestroyer(boolean success, Configuration configuration
 
     // Objects that should be pass to the Destroyer execution
     PrefixContext subContext = new PrefixContext(configuration, prefixPropertyName);
-    Object configConnection = MRConfigurationUtils.getConnectorConnectionConfig(direction, configuration);
+    Object configConnection = MRConfigurationUtils.getConnectorLinkConfig(direction, configuration);
     Object configJob = MRConfigurationUtils.getConnectorJobConfig(direction, configuration);
 
     // Propagate connector schema in every case for now
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index d2cf5e4e..887b4bb7 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -63,7 +63,7 @@ public List getSplits(JobContext context)
     Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
 
     PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
-    Object connectorConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
+    Object connectorConnection = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
     Object connectorJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
     Schema schema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
 
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index d31aa209..e25f404e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -80,7 +80,7 @@ public void run(Context context) throws IOException, InterruptedException {
 
     // Objects that should be passed to the Executor execution
     PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
-    Object fromConfig = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
+    Object fromConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
     Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
 
     SqoopSplit split = context.getCurrentKey();
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
index 1148c4a6..6134106d 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
@@ -18,6 +18,8 @@
 
 package org.apache.sqoop.job.mr;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -28,18 +30,14 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 
 import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.io.SqoopWritable;
 
-import java.io.IOException;
-
 /**
  * An output format for MapReduce job.
  */
 public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {
 
-  public static final Logger LOG =
-      Logger.getLogger(SqoopNullOutputFormat.class);
+  public static final Logger LOG = Logger.getLogger(SqoopNullOutputFormat.class);
 
   @Override
   public void checkOutputSpecs(JobContext context) {
@@ -47,48 +45,50 @@ public void checkOutputSpecs(JobContext context) {
   }
 
   @Override
-  public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
-      TaskAttemptContext context) {
-    SqoopOutputFormatLoadExecutor executor =
-        new SqoopOutputFormatLoadExecutor(context);
+  public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(TaskAttemptContext context) {
+    SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(context);
     return executor.getRecordWriter();
   }
 
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
-    return new DestroyerOutputCommitter();
+    return new SqoopDestroyerOutputCommitter();
  }
 
-  class DestroyerOutputCommitter extends OutputCommitter {
+  class SqoopDestroyerOutputCommitter extends OutputCommitter {
     @Override
-    public void setupJob(JobContext jobContext) { }
+    public void setupJob(JobContext jobContext) {
+    }
 
     @Override
     public void commitJob(JobContext jobContext) throws IOException {
       super.commitJob(jobContext);
-
-      Configuration config = jobContext.getConfiguration();
-      SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.FROM);
-      SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.TO);
+      invokeDestroyerExecutor(jobContext, true);
     }
 
     @Override
     public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
       super.abortJob(jobContext, state);
+      invokeDestroyerExecutor(jobContext, false);
+    }
 
+    private void invokeDestroyerExecutor(JobContext jobContext, boolean success) {
       Configuration config = jobContext.getConfiguration();
-      SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.FROM);
-      SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.TO);
+      SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.FROM);
+      SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.TO);
     }
 
     @Override
-    public void setupTask(TaskAttemptContext taskContext) { }
+    public void setupTask(TaskAttemptContext taskContext) {
+    }
 
     @Override
-    public void commitTask(TaskAttemptContext taskContext) { }
+    public void commitTask(TaskAttemptContext taskContext) {
+    }
 
     @Override
-    public void abortTask(TaskAttemptContext taskContext) { }
+    public void abortTask(TaskAttemptContext taskContext) {
+    }
 
     @Override
     public boolean needsTaskCommit(TaskAttemptContext taskContext) {
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 8aad936c..579101ee 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -236,7 +236,7 @@ public void run() {
 
         schema = matcher.getToSchema();
 
         subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
-        configConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf);
+        configConnection = MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, conf);
         configJob = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
       }
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java
index 972b5555..fbe3e7b1 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java
@@ -63,11 +63,11 @@ public void setUpHadoopJobConf() throws Exception {
   public void testLinkConfiguration() throws Exception {
     MRConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, getConfig());
     setUpHadoopJobConf();
-    assertEquals(getConfig(), MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, jobConfSpy));
+    assertEquals(getConfig(), MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, jobConfSpy));
 
     MRConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, getConfig());
     setUpHadoopJobConf();
-    assertEquals(getConfig(), MRConfigurationUtils.getConnectorConnectionConfig(Direction.TO, jobConfSpy));
+    assertEquals(getConfig(), MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, jobConfSpy));
   }
 
   @Test
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
index e2d98cac..84861544 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
@@ -24,13 +24,13 @@ public abstract class Destroyer {
 
   /**
-   * Callback to clean up after job execution.
+   * Callback to clean up after job execution
    *
    * @param context Destroyer context
   * @param linkConfiguration link configuration object
   * @param jobConfiguration job configuration object for the FROM and TO
-   *        In case of the FROM initializer this will represent the FROM job configuration
-   *        In case of the TO initializer this will represent the TO job configuration
+   *        In case of the FROM destroyer this will represent the FROM job configuration
+   *        In case of the TO destroyer this will represent the TO job configuration
    */
   public abstract void destroy(DestroyerContext context, LinkConfiguration linkConfiguration,