
SQOOP-1488: Sqoop2: From/To: Run both destroyers

(Abraham Elmahrek via Jarek Jarcec Cecho)
Jarek Jarcec Cecho 2014-09-21 13:00:46 -07:00 committed by Abraham Elmahrek
parent 27fb31d42e
commit b04e796f01
7 changed files with 75 additions and 25 deletions

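Before this change the job registered only the FROM connector's destroyer (see the removed JOB_ETL_DESTROYER line in the first hunk below); with this commit one destroyer is registered and executed per direction. A minimal sketch of what a connector-side Destroyer looks like, assuming the isSuccess() accessor on DestroyerContext (the destroy() signature itself appears in the test changes further down); the class name is a hypothetical example:

public class TempStateCleanupDestroyer extends Destroyer {  // hypothetical example class
  @Override
  public void destroy(DestroyerContext context, Object connectionConfig, Object jobConfig) {
    // After this change, the framework runs one destroyer for the FROM connector and
    // one for the TO connector once the MapReduce job finishes, on success or failure.
    if (context.isSuccess()) {  // isSuccess() is assumed here
      // success-side cleanup, e.g. remove temporary export state
    } else {
      // failure-side cleanup, e.g. discard partial output
    }
  }
}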

@@ -68,7 +68,8 @@ public void prepareJob(JobRequest jobRequest) {
context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName());
context.setString(JobConstants.JOB_ETL_DESTROYER, from.getDestroyer().getName());
context.setString(JobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName());
context.setString(JobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName());
context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
mrJobRequest.getIntermediateDataFormat().getName());


@@ -37,9 +37,11 @@ public final class JobConstants extends Constants {
public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG
+ "etl.loader";
public static final String JOB_ETL_DESTROYER = PREFIX_JOB_CONFIG
+ "etl.destroyer";
public static final String JOB_ETL_FROM_DESTROYER = PREFIX_JOB_CONFIG
+ "etl.from.destroyer";
public static final String JOB_ETL_TO_DESTROYER = PREFIX_JOB_CONFIG
+ "etl.to.destroyer";
public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG
+ "mr.output.file";

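The two new keys are written during job preparation (the prepareJob hunk above, and the MapReduce test below) and read back when the destroyers are executed. A minimal sketch of that round trip against a plain Hadoop Configuration, borrowing the DummyFromDestroyer/DummyToDestroyer classes that the test below defines (imports as in the changed files above):

Configuration conf = new Configuration();
conf.set(JobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
conf.set(JobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());

// Per direction, SqoopDestroyerExecutor instantiates whichever class was registered:
Destroyer fromDestroyer =
    (Destroyer) ClassUtils.instantiate(conf.get(JobConstants.JOB_ETL_FROM_DESTROYER));
Destroyer toDestroyer =
    (Destroyer) ClassUtils.instantiate(conf.get(JobConstants.JOB_ETL_TO_DESTROYER));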

@@ -40,10 +40,24 @@ public class SqoopDestroyerExecutor {
* @param success True if the job execution was successful
* @param configuration Configuration object to get destroyer class with context
* and configuration objects.
* @param propertyName Name of property that holds destroyer class.
* @param direction The direction of the Destroyer to execute.
*/
public static void executeDestroyer(boolean success, Configuration configuration, String propertyName) {
Destroyer destroyer = (Destroyer) ClassUtils.instantiate(configuration.get(propertyName));
public static void executeDestroyer(boolean success, Configuration configuration, Direction direction) {
String destroyerPropertyName, prefixPropertyName;
switch (direction) {
default:
case FROM:
destroyerPropertyName = JobConstants.JOB_ETL_FROM_DESTROYER;
prefixPropertyName = JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT;
break;
case TO:
destroyerPropertyName = JobConstants.JOB_ETL_TO_DESTROYER;
prefixPropertyName = JobConstants.PREFIX_CONNECTOR_TO_CONTEXT;
break;
}
Destroyer destroyer = (Destroyer) ClassUtils.instantiate(configuration.get(destroyerPropertyName));
if(destroyer == null) {
LOG.info("Skipping running destroyer as non was defined.");
@@ -51,16 +65,17 @@ public static void executeDestroyer(boolean success, Configuration configuration
}
// Objects that should be passed to the Destroyer execution
PrefixContext subContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
Object fromConfigConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, configuration);
Object fromConfigJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, configuration);
PrefixContext subContext = new PrefixContext(configuration, prefixPropertyName);
Object configConnection = ConfigurationUtils.getConnectorConnectionConfig(direction, configuration);
Object configJob = ConfigurationUtils.getConnectorJobConfig(direction, configuration);
// Propagate connector schema in every case for now
Schema schema = ConfigurationUtils.getConnectorSchema(direction, configuration);
// TODO(Abe/Gwen): Change to conditional choosing between schemas.
Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, configuration);
DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema);
LOG.info("Executing destroyer class " + destroyer.getClass());
destroyer.destroy(destroyerContext, fromConfigConnection, fromConfigJob);
destroyer.destroy(destroyerContext, configConnection, configJob);
}
private SqoopDestroyerExecutor() {

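The per-direction prefix matters because both connectors' contexts live side by side in the same Hadoop Configuration, distinguished only by their key prefixes; PrefixContext then exposes one side's properties to the destroyer. A small sketch of that scoping, assuming PrefixContext offers a getString(key) accessor and using an illustrative "table" property that is not part of this commit:

Configuration configuration = new Configuration();
configuration.set(JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT + "table", "cities_stage");  // example key
configuration.set(JobConstants.PREFIX_CONNECTOR_TO_CONTEXT + "table", "cities");          // example key

PrefixContext fromContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
PrefixContext toContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_TO_CONTEXT);

fromContext.getString("table");  // "cities_stage" - only FROM-prefixed keys are visible here
toContext.getString("table");    // "cities"       - only TO-prefixed keys are visible here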

@@ -33,6 +33,7 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.io.SqoopWritable;
@@ -84,7 +85,8 @@ public void commitJob(JobContext context) throws IOException {
super.commitJob(context);
Configuration config = context.getConfiguration();
SqoopDestroyerExecutor.executeDestroyer(true, config, JobConstants.JOB_ETL_DESTROYER);
SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.FROM);
SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.TO);
}
@Override
@@ -92,7 +94,8 @@ public void abortJob(JobContext context, JobStatus.State state) throws IOException {
super.abortJob(context, state);
Configuration config = context.getConfiguration();
SqoopDestroyerExecutor.executeDestroyer(false, config, JobConstants.JOB_ETL_DESTROYER);
SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.FROM);
SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.TO);
}
}
}


@@ -27,6 +27,7 @@
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.io.SqoopWritable;
@@ -67,7 +68,8 @@ public void commitJob(JobContext jobContext) throws IOException {
super.commitJob(jobContext);
Configuration config = jobContext.getConfiguration();
SqoopDestroyerExecutor.executeDestroyer(true, config, JobConstants.JOB_ETL_DESTROYER);
SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.FROM);
SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.TO);
}
@Override
@@ -75,7 +77,8 @@ public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
super.abortJob(jobContext, state);
Configuration config = jobContext.getConfiguration();
SqoopDestroyerExecutor.executeDestroyer(false, config, JobConstants.JOB_ETL_DESTROYER);
SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.FROM);
SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.TO);
}
@Override


@@ -23,6 +23,7 @@
import java.util.LinkedList;
import java.util.List;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,8 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.etl.Loader;
@@ -100,6 +103,8 @@ public void testOutputFormat() throws Exception {
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(JobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
conf.set(JobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
CSVIntermediateDataFormat.class.getName());
Schema schema = new Schema("Test");
@@ -110,6 +115,10 @@ public void testOutputFormat() throws Exception {
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
SqoopNullOutputFormat.class);
// Make sure both destroyers get called.
Assert.assertEquals(1, DummyFromDestroyer.count);
Assert.assertEquals(1, DummyToDestroyer.count);
}
public static class DummyPartition extends Partition {
@@ -251,4 +260,24 @@ public void load(LoaderContext context, Object oc, Object oj) throws Exception{
}
}
}
public static class DummyFromDestroyer extends Destroyer {
public static int count = 0;
@Override
public void destroy(DestroyerContext context, Object o, Object o2) {
count++;
}
}
public static class DummyToDestroyer extends Destroyer {
public static int count = 0;
@Override
public void destroy(DestroyerContext context, Object o, Object o2) {
count++;
}
}
}


@@ -68,15 +68,12 @@ public void testStagedTransfer() throws Exception {
runJob(job);
// @TODO(Abe): Change back after SQOOP-1488
// assertEquals(0L, provider.rowCount(stageTableName));
// assertEquals(4L, rowCount());
// assertRowInCities(1, "USA", "San Francisco");
// assertRowInCities(2, "USA", "Sunnyvale");
// assertRowInCities(3, "Czech Republic", "Brno");
// assertRowInCities(4, "USA", "Palo Alto");
assertEquals(4L, provider.rowCount(stageTableName));
assertEquals(0L, rowCount());
assertEquals(0L, provider.rowCount(stageTableName));
assertEquals(4L, rowCount());
assertRowInCities(1, "USA", "San Francisco");
assertRowInCities(2, "USA", "Sunnyvale");
assertRowInCities(3, "Czech Republic", "Brno");
assertRowInCities(4, "USA", "Palo Alto");
// Clean up testing table
provider.dropTable(stageTableName);
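These assertions could be re-enabled because the TO-side destroyer now actually runs: in a staged transfer it is the destroyer that migrates rows from the staging table into the target table and clears the stage, so the stage ends up empty (0 rows) and the target table holds the 4 cities. A rough, connector-agnostic sketch of that idea in plain JDBC (the class name, table names, JDBC URL, and the isSuccess() accessor are illustrative assumptions, not the GenericJdbcConnector implementation; java.sql imports elided):

public class StagedTransferDestroyer extends Destroyer {  // hypothetical example class
  @Override
  public void destroy(DestroyerContext context, Object connectionConfig, Object jobConfig) {
    try (Connection conn = DriverManager.getConnection("jdbc:derby:memory:example");  // assumed URL
         Statement stmt = conn.createStatement()) {
      if (context.isSuccess()) {  // isSuccess() is assumed here
        // Move staged rows into the final table, then clear the stage.
        stmt.executeUpdate("INSERT INTO cities SELECT * FROM cities_stage");
      }
      stmt.executeUpdate("DELETE FROM cities_stage");
    } catch (SQLException e) {
      throw new RuntimeException("Staged transfer cleanup failed", e);
    }
  }
}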