diff --git a/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java b/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java old mode 100644 new mode 100755 index c5353091..8fcef9d1 --- a/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java +++ b/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java @@ -112,22 +112,36 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) */ public static class AsyncDBOperation { private final PreparedStatement stmt; - private final boolean commitAndClose; private final boolean isBatch; + private final boolean commit; + private final boolean stopThread; + + @Deprecated + /** Do not use AsyncDBOperation(PreparedStatement s, boolean + * commitAndClose, boolean batch). Use AsyncDBOperation(PreparedStatement + * s, boolean batch, boolean commit, boolean stopThread) instead. + */ + public AsyncDBOperation(PreparedStatement s, boolean commitAndClose, + boolean batch) { + this(s, batch, commitAndClose, commitAndClose); + } /** * Create an asynchronous database operation. * @param s the statement, if any, to execute. - * @param commitAndClose if true, the current transaction should be - * committed, and the executor thread should stop after this operation. * @param batch is true if this is a batch PreparedStatement, or false * if it's a normal singleton statement. + * @param commit is true if this statement should be committed to the + * database. + * @param stopThread if true, the executor thread should stop after this + * operation. */ - public AsyncDBOperation(PreparedStatement s, boolean commitAndClose, - boolean batch) { + public AsyncDBOperation(PreparedStatement s, boolean batch, + boolean commit, boolean stopThread) { this.stmt = s; - this.commitAndClose = commitAndClose; this.isBatch = batch; + this.commit = commit; + this.stopThread = stopThread; } /** @@ -142,14 +156,14 @@ public PreparedStatement getStatement() { * If getStatement() is non-null, the statement is run first. */ public boolean requiresCommit() { - return this.commitAndClose; + return this.commit; } /** * @return true if the executor should stop after this command. */ public boolean stop() { - return this.commitAndClose; + return this.stopThread; } /** @@ -246,12 +260,6 @@ public void run() { // Always check whether we should end the loop, regardless // of the presence of an exception. if (op.stop()) { - // Don't continue processing after this operation. - try { - conn.close(); - } catch (SQLException sqlE) { - setLastError(sqlE); - } return; } } // try .. catch .. finally. diff --git a/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java b/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java old mode 100644 new mode 100755 index baf68929..da23bbc4 --- a/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java +++ b/src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java @@ -131,7 +131,7 @@ protected abstract PreparedStatement getPreparedStatement( * @param closeConn if true, commits the transaction and closes the * connection. */ - private void execUpdate(boolean closeConn) + private void execUpdate(boolean commit, boolean stopThread) throws InterruptedException, SQLException { if (!startedExecThread) { @@ -150,8 +150,8 @@ private void execUpdate(boolean closeConn) // Pass this operation off to the update thread. This will block if // the update thread is already performing an update. AsyncSqlOutputFormat.AsyncDBOperation op = - new AsyncSqlOutputFormat.AsyncDBOperation(stmt, closeConn, - isBatchExec()); + new AsyncSqlOutputFormat.AsyncDBOperation(stmt, isBatchExec(), + commit, stopThread); execThread.put(op); successfulPut = true; // op has been posted to the other thread. } finally { @@ -173,22 +173,35 @@ private void execUpdate(boolean closeConn) public void close(TaskAttemptContext context) throws IOException, InterruptedException { try { - execUpdate(true); - } catch (SQLException sqle) { - throw new IOException(sqle); - } finally { - execThread.join(); - } + try { + execUpdate(true, true); + } catch (SQLException sqle) { + throw new IOException(sqle); + } finally { + execThread.join(); + } - // If we're not leaving on an error return path already, - // now that execThread is definitely stopped, check that the - // error slot remains empty. - SQLException lastErr = execThread.getLastError(); - if (null != lastErr) { - throw new IOException(lastErr); + // If we're not leaving on an error return path already, + // now that execThread is definitely stopped, check that the + // error slot remains empty. + SQLException lastErr = execThread.getLastError(); + if (null != lastErr) { + throw new IOException(lastErr); + } + } finally { + try { + closeConnection(context); + } catch (SQLException sqle) { + throw new IOException(sqle); + } } } + public void closeConnection(TaskAttemptContext context) + throws SQLException { + this.connection.close(); + } + @Override /** {@inheritDoc} */ public void write(K key, V value) @@ -196,7 +209,7 @@ public void write(K key, V value) try { records.add((SqoopRecord) key.clone()); if (records.size() >= this.rowsPerStmt) { - execUpdate(false); + execUpdate(false, false); } } catch (CloneNotSupportedException cnse) { throw new IOException("Could not buffer record", cnse);