mirror of
https://github.com/apache/sqoop.git
synced 2025-05-04 02:52:19 +08:00
SQOOP-2343: AsyncSqlRecordWriter stucks if any exception is thrown out in its close method
(Yibing Shi via Jarek Jarcec Cecho)
This commit is contained in:
parent
fa4e903650
commit
89366b49b3
@ -61,6 +61,8 @@ public abstract class AsyncSqlRecordWriter<K extends SqoopRecord, V>
|
|||||||
private AsyncSqlOutputFormat.AsyncSqlExecThread execThread;
|
private AsyncSqlOutputFormat.AsyncSqlExecThread execThread;
|
||||||
private boolean startedExecThread;
|
private boolean startedExecThread;
|
||||||
|
|
||||||
|
private boolean closed;
|
||||||
|
|
||||||
public AsyncSqlRecordWriter(TaskAttemptContext context)
|
public AsyncSqlRecordWriter(TaskAttemptContext context)
|
||||||
throws ClassNotFoundException, SQLException {
|
throws ClassNotFoundException, SQLException {
|
||||||
this.conf = context.getConfiguration();
|
this.conf = context.getConfiguration();
|
||||||
@ -82,6 +84,8 @@ public AsyncSqlRecordWriter(TaskAttemptContext context)
|
|||||||
connection, stmtsPerTx);
|
connection, stmtsPerTx);
|
||||||
this.execThread.setDaemon(true);
|
this.execThread.setDaemon(true);
|
||||||
this.startedExecThread = false;
|
this.startedExecThread = false;
|
||||||
|
|
||||||
|
this.closed = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -176,6 +180,15 @@ private void execUpdate(boolean commit, boolean stopThread)
|
|||||||
/** {@inheritDoc} */
|
/** {@inheritDoc} */
|
||||||
public void close(TaskAttemptContext context)
|
public void close(TaskAttemptContext context)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
|
// If any exception is thrown out in this method, mapreduce framework catches the exception and
|
||||||
|
// calls this method again in case the recorder hasn't bee closed properly. Without the
|
||||||
|
// protection below, it can make the main thread stuck in execThread.put since there is no
|
||||||
|
// receiver for the synchronous queue any more.
|
||||||
|
if (closed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
closed = true;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
try {
|
try {
|
||||||
execUpdate(true, true);
|
execUpdate(true, true);
|
||||||
|
Loading…
Reference in New Issue
Block a user