5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-09 20:42:12 +08:00

SQOOP-813: LoaderExecutor might get into deadlock when exception is raised outside Loader itself

(Jarek Jarcec Cecho via Cheolsoo Park)
This commit is contained in:
Cheolsoo Park 2013-01-22 16:06:29 -08:00
parent 612060139a
commit 5be8eb6808

View File

@ -175,44 +175,39 @@ private class ConsumerThread implements Runnable {
@Override
public void run() {
DataReader reader = new OutputFormatDataReader();
Configuration conf = null;
if (!isTest) {
conf = context.getConfiguration();
loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
}
Loader loader = (Loader) ClassUtils.instantiate(loaderName);
// Objects that should be pass to the Executor execution
PrefixContext subContext = null;
Object configConnection = null;
Object configJob = null;
if (!isTest) {
switch (ConfigurationUtils.getJobType(conf)) {
case EXPORT:
subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
configConnection = ConfigurationUtils.getConnectorConnection(conf);
configJob = ConfigurationUtils.getConnectorJob(conf);
break;
case IMPORT:
subContext = new PrefixContext(conf, "");
configConnection = ConfigurationUtils.getFrameworkConnection(conf);
configJob = ConfigurationUtils.getFrameworkJob(conf);
break;
default:
readerFinished = true;
// Release so that the writer can tell the framework something went
// wrong.
free.release();
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
}
}
LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");
try {
DataReader reader = new OutputFormatDataReader();
Configuration conf = null;
if (!isTest) {
conf = context.getConfiguration();
loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
}
Loader loader = (Loader) ClassUtils.instantiate(loaderName);
// Objects that should be pass to the Executor execution
PrefixContext subContext = null;
Object configConnection = null;
Object configJob = null;
if (!isTest) {
switch (ConfigurationUtils.getJobType(conf)) {
case EXPORT:
subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
configConnection = ConfigurationUtils.getConnectorConnection(conf);
configJob = ConfigurationUtils.getConnectorJob(conf);
break;
case IMPORT:
subContext = new PrefixContext(conf, "");
configConnection = ConfigurationUtils.getFrameworkConnection(conf);
configJob = ConfigurationUtils.getFrameworkJob(conf);
break;
default:
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
}
}
LOG.info("Running loader class " + loaderName);
loader.load(subContext, configConnection, configJob, reader);
LOG.info("Loader has finished");