diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index d1583275..c5f3abdd 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -175,44 +175,39 @@ private class ConsumerThread implements Runnable { @Override public void run() { - DataReader reader = new OutputFormatDataReader(); - - Configuration conf = null; - if (!isTest) { - conf = context.getConfiguration(); - - - loaderName = conf.get(JobConstants.JOB_ETL_LOADER); - } - Loader loader = (Loader) ClassUtils.instantiate(loaderName); - - // Objects that should be pass to the Executor execution - PrefixContext subContext = null; - Object configConnection = null; - Object configJob = null; - - if (!isTest) { - switch (ConfigurationUtils.getJobType(conf)) { - case EXPORT: - subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT); - configConnection = ConfigurationUtils.getConnectorConnection(conf); - configJob = ConfigurationUtils.getConnectorJob(conf); - break; - case IMPORT: - subContext = new PrefixContext(conf, ""); - configConnection = ConfigurationUtils.getFrameworkConnection(conf); - configJob = ConfigurationUtils.getFrameworkJob(conf); - break; - default: - readerFinished = true; - // Release so that the writer can tell the framework something went - // wrong. - free.release(); - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023); - } - } - + LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting"); try { + DataReader reader = new OutputFormatDataReader(); + + Configuration conf = null; + if (!isTest) { + conf = context.getConfiguration(); + loaderName = conf.get(JobConstants.JOB_ETL_LOADER); + } + Loader loader = (Loader) ClassUtils.instantiate(loaderName); + + // Objects that should be pass to the Executor execution + PrefixContext subContext = null; + Object configConnection = null; + Object configJob = null; + + if (!isTest) { + switch (ConfigurationUtils.getJobType(conf)) { + case EXPORT: + subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT); + configConnection = ConfigurationUtils.getConnectorConnection(conf); + configJob = ConfigurationUtils.getConnectorJob(conf); + break; + case IMPORT: + subContext = new PrefixContext(conf, ""); + configConnection = ConfigurationUtils.getFrameworkConnection(conf); + configJob = ConfigurationUtils.getFrameworkJob(conf); + break; + default: + throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023); + } + } + LOG.info("Running loader class " + loaderName); loader.load(subContext, configConnection, configJob, reader); LOG.info("Loader has finished");