5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-10 02:21:34 +08:00

SQOOP-1665: Sqoop2: Misc Cleanup / rename lingering connection to link

(Veena Basavaraj via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2014-11-04 08:10:22 -08:00
parent f43878bc75
commit 21d887636d
8 changed files with 32 additions and 32 deletions

View File

@ -153,11 +153,11 @@ public static void setConnectorSchema(Direction type, Job job, Schema schema) {
} }
/** /**
* Retrieve Connector configuration object for connection. * Retrieve Connector configuration object for link.
* @param configuration MapReduce configuration object * @param configuration MapReduce configuration object
* @return Configuration object * @return Configuration object
*/ */
public static Object getConnectorConnectionConfig(Direction type, Configuration configuration) { public static Object getConnectorLinkConfig(Direction type, Configuration configuration) {
switch (type) { switch (type) {
case FROM: case FROM:
return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY); return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY);

View File

@ -68,7 +68,7 @@ public static void executeDestroyer(boolean success, Configuration configuration
// Objects that should be pass to the Destroyer execution // Objects that should be pass to the Destroyer execution
PrefixContext subContext = new PrefixContext(configuration, prefixPropertyName); PrefixContext subContext = new PrefixContext(configuration, prefixPropertyName);
Object configConnection = MRConfigurationUtils.getConnectorConnectionConfig(direction, configuration); Object configConnection = MRConfigurationUtils.getConnectorLinkConfig(direction, configuration);
Object configJob = MRConfigurationUtils.getConnectorJobConfig(direction, configuration); Object configJob = MRConfigurationUtils.getConnectorJobConfig(direction, configuration);
// Propagate connector schema in every case for now // Propagate connector schema in every case for now

View File

@ -63,7 +63,7 @@ public List<InputSplit> getSplits(JobContext context)
Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName); Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
Object connectorConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf); Object connectorConnection = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
Object connectorJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); Object connectorJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
Schema schema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf); Schema schema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);

View File

@ -80,7 +80,7 @@ public void run(Context context) throws IOException, InterruptedException {
// Objects that should be passed to the Executor execution // Objects that should be passed to the Executor execution
PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
Object fromConfig = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf); Object fromConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
SqoopSplit split = context.getCurrentKey(); SqoopSplit split = context.getCurrentKey();

View File

@ -18,6 +18,8 @@
package org.apache.sqoop.job.mr; package org.apache.sqoop.job.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
@ -28,18 +30,14 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.job.io.SqoopWritable;
import java.io.IOException;
/** /**
* An output format for MapReduce job. * An output format for MapReduce job.
*/ */
public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWritable> { public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {
public static final Logger LOG = public static final Logger LOG = Logger.getLogger(SqoopNullOutputFormat.class);
Logger.getLogger(SqoopNullOutputFormat.class);
@Override @Override
public void checkOutputSpecs(JobContext context) { public void checkOutputSpecs(JobContext context) {
@ -47,48 +45,50 @@ public void checkOutputSpecs(JobContext context) {
} }
@Override @Override
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter( public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(TaskAttemptContext context) {
TaskAttemptContext context) { SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(context);
SqoopOutputFormatLoadExecutor executor =
new SqoopOutputFormatLoadExecutor(context);
return executor.getRecordWriter(); return executor.getRecordWriter();
} }
@Override @Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) { public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
return new DestroyerOutputCommitter(); return new SqoopDestroyerOutputCommitter();
} }
class DestroyerOutputCommitter extends OutputCommitter { class SqoopDestroyerOutputCommitter extends OutputCommitter {
@Override @Override
public void setupJob(JobContext jobContext) { } public void setupJob(JobContext jobContext) {
}
@Override @Override
public void commitJob(JobContext jobContext) throws IOException { public void commitJob(JobContext jobContext) throws IOException {
super.commitJob(jobContext); super.commitJob(jobContext);
invokeDestroyerExecutor(jobContext, true);
Configuration config = jobContext.getConfiguration();
SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.FROM);
SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.TO);
} }
@Override @Override
public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
super.abortJob(jobContext, state); super.abortJob(jobContext, state);
invokeDestroyerExecutor(jobContext, false);
}
private void invokeDestroyerExecutor(JobContext jobContext, boolean success) {
Configuration config = jobContext.getConfiguration(); Configuration config = jobContext.getConfiguration();
SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.FROM); SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.FROM);
SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.TO); SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.TO);
} }
@Override @Override
public void setupTask(TaskAttemptContext taskContext) { } public void setupTask(TaskAttemptContext taskContext) {
}
@Override @Override
public void commitTask(TaskAttemptContext taskContext) { } public void commitTask(TaskAttemptContext taskContext) {
}
@Override @Override
public void abortTask(TaskAttemptContext taskContext) { } public void abortTask(TaskAttemptContext taskContext) {
}
@Override @Override
public boolean needsTaskCommit(TaskAttemptContext taskContext) { public boolean needsTaskCommit(TaskAttemptContext taskContext) {

View File

@ -236,7 +236,7 @@ public void run() {
schema = matcher.getToSchema(); schema = matcher.getToSchema();
subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT); subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
configConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf); configConnection = MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, conf);
configJob = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf); configJob = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
} }

View File

@ -63,11 +63,11 @@ public void setUpHadoopJobConf() throws Exception {
public void testLinkConfiguration() throws Exception { public void testLinkConfiguration() throws Exception {
MRConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, getConfig()); MRConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, getConfig());
setUpHadoopJobConf(); setUpHadoopJobConf();
assertEquals(getConfig(), MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, jobConfSpy)); assertEquals(getConfig(), MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, jobConfSpy));
MRConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, getConfig()); MRConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, getConfig());
setUpHadoopJobConf(); setUpHadoopJobConf();
assertEquals(getConfig(), MRConfigurationUtils.getConnectorConnectionConfig(Direction.TO, jobConfSpy)); assertEquals(getConfig(), MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, jobConfSpy));
} }
@Test @Test

View File

@ -24,13 +24,13 @@
public abstract class Destroyer<LinkConfiguration, JobConfiguration> { public abstract class Destroyer<LinkConfiguration, JobConfiguration> {
/** /**
* Callback to clean up after job execution. * Callback to clean up after job execution
* *
* @param context Destroyer context * @param context Destroyer context
* @param linkConfiguration link configuration object * @param linkConfiguration link configuration object
* @param jobConfiguration job configuration object for the FROM and TO * @param jobConfiguration job configuration object for the FROM and TO
* In case of the FROM initializer this will represent the FROM job configuration * In case of the FROM destroyer this will represent the FROM job configuration
* In case of the TO initializer this will represent the TO job configuration * In case of the TO destroyer this will represent the TO job configuration
*/ */
public abstract void destroy(DestroyerContext context, public abstract void destroy(DestroyerContext context,
LinkConfiguration linkConfiguration, LinkConfiguration linkConfiguration,