diff --git a/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java b/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java index d0e17117..15a62a60 100644 --- a/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java +++ b/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java @@ -61,6 +61,8 @@ public abstract class AsyncSqlRecordWriter private AsyncSqlOutputFormat.AsyncSqlExecThread execThread; private boolean startedExecThread; + private boolean closed; + public AsyncSqlRecordWriter(TaskAttemptContext context) throws ClassNotFoundException, SQLException { this.conf = context.getConfiguration(); @@ -82,6 +84,8 @@ public AsyncSqlRecordWriter(TaskAttemptContext context) connection, stmtsPerTx); this.execThread.setDaemon(true); this.startedExecThread = false; + + this.closed = false; } /** @@ -176,6 +180,15 @@ private void execUpdate(boolean commit, boolean stopThread) /** {@inheritDoc} */ public void close(TaskAttemptContext context) throws IOException, InterruptedException { + // If any exception is thrown out in this method, mapreduce framework catches the exception and + // calls this method again in case the recorder hasn't bee closed properly. Without the + // protection below, it can make the main thread stuck in execThread.put since there is no + // receiver for the synchronous queue any more. + if (closed) { + return; + } + closed = true; + try { try { execUpdate(true, true);