diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index b2839822..48bfb321 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -59,6 +59,7 @@ public class SqoopOutputFormatLoadExecutor { private JobContext context; private SqoopRecordWriter writer; private Future consumerFuture; + private ExecutorService executorService; private Semaphore filled = new Semaphore(0, true); private Semaphore free = new Semaphore(1, true); private String loaderName; @@ -86,9 +87,9 @@ public SqoopOutputFormatLoadExecutor(JobContext jobctx) { } public RecordWriter getRecordWriter() { - consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat - ("OutputFormatLoader-consumer").build()).submit( - new ConsumerThread(context)); + executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat + ("OutputFormatLoader-consumer").build()); + consumerFuture = executorService.submit(new ConsumerThread(context)); return writer; } @@ -162,6 +163,8 @@ private void waitForConsumer() { } throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex); + } finally { + executorService.shutdownNow(); } }