mirror of
https://github.com/apache/sqoop.git
synced 2025-05-04 01:00:46 +08:00
SQOOP-1916: Sqoop2: Yarn child leaking in integration tests
(Scott Kuehn via Abraham Fine)
This commit is contained in:
parent
1bd26e7f31
commit
c74dda1b21
@ -59,6 +59,7 @@ public class SqoopOutputFormatLoadExecutor {
|
|||||||
private JobContext context;
|
private JobContext context;
|
||||||
private SqoopRecordWriter writer;
|
private SqoopRecordWriter writer;
|
||||||
private Future<?> consumerFuture;
|
private Future<?> consumerFuture;
|
||||||
|
private ExecutorService executorService;
|
||||||
private Semaphore filled = new Semaphore(0, true);
|
private Semaphore filled = new Semaphore(0, true);
|
||||||
private Semaphore free = new Semaphore(1, true);
|
private Semaphore free = new Semaphore(1, true);
|
||||||
private String loaderName;
|
private String loaderName;
|
||||||
@ -86,9 +87,9 @@ public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
|
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
|
||||||
consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat
|
executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat
|
||||||
("OutputFormatLoader-consumer").build()).submit(
|
("OutputFormatLoader-consumer").build());
|
||||||
new ConsumerThread(context));
|
consumerFuture = executorService.submit(new ConsumerThread(context));
|
||||||
return writer;
|
return writer;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -162,6 +163,8 @@ private void waitForConsumer() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex);
|
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex);
|
||||||
|
} finally {
|
||||||
|
executorService.shutdownNow();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user