mirror of
https://github.com/apache/sqoop.git
synced 2025-05-03 07:42:00 +08:00
SQOOP-128. Allow custom export writers to cleanup.
This change allows customized ExportRecordWriters the opportunity to execute code after the last commit is performed and before the database connection is closed. (Guy le Mar via Arvind Prabhakar) From: Arvind Prabhakar <arvind@cloudera.com> git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149991 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5dd36c62da
commit
bf0deb178e
36
src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java
Normal file → Executable file
36
src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java
Normal file → Executable file
@ -112,22 +112,36 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
|
||||
*/
|
||||
public static class AsyncDBOperation {
|
||||
private final PreparedStatement stmt;
|
||||
private final boolean commitAndClose;
|
||||
private final boolean isBatch;
|
||||
private final boolean commit;
|
||||
private final boolean stopThread;
|
||||
|
||||
@Deprecated
|
||||
/** Do not use AsyncDBOperation(PreparedStatement s, boolean
|
||||
* commitAndClose, boolean batch). Use AsyncDBOperation(PreparedStatement
|
||||
* s, boolean batch, boolean commit, boolean stopThread) instead.
|
||||
*/
|
||||
public AsyncDBOperation(PreparedStatement s, boolean commitAndClose,
|
||||
boolean batch) {
|
||||
this(s, batch, commitAndClose, commitAndClose);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an asynchronous database operation.
|
||||
* @param s the statement, if any, to execute.
|
||||
* @param commitAndClose if true, the current transaction should be
|
||||
* committed, and the executor thread should stop after this operation.
|
||||
* @param batch is true if this is a batch PreparedStatement, or false
|
||||
* if it's a normal singleton statement.
|
||||
* @param commit is true if this statement should be committed to the
|
||||
* database.
|
||||
* @param stopThread if true, the executor thread should stop after this
|
||||
* operation.
|
||||
*/
|
||||
public AsyncDBOperation(PreparedStatement s, boolean commitAndClose,
|
||||
boolean batch) {
|
||||
public AsyncDBOperation(PreparedStatement s, boolean batch,
|
||||
boolean commit, boolean stopThread) {
|
||||
this.stmt = s;
|
||||
this.commitAndClose = commitAndClose;
|
||||
this.isBatch = batch;
|
||||
this.commit = commit;
|
||||
this.stopThread = stopThread;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -142,14 +156,14 @@ public PreparedStatement getStatement() {
|
||||
* If getStatement() is non-null, the statement is run first.
|
||||
*/
|
||||
public boolean requiresCommit() {
|
||||
return this.commitAndClose;
|
||||
return this.commit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the executor should stop after this command.
|
||||
*/
|
||||
public boolean stop() {
|
||||
return this.commitAndClose;
|
||||
return this.stopThread;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -246,12 +260,6 @@ public void run() {
|
||||
// Always check whether we should end the loop, regardless
|
||||
// of the presence of an exception.
|
||||
if (op.stop()) {
|
||||
// Don't continue processing after this operation.
|
||||
try {
|
||||
conn.close();
|
||||
} catch (SQLException sqlE) {
|
||||
setLastError(sqlE);
|
||||
}
|
||||
return;
|
||||
}
|
||||
} // try .. catch .. finally.
|
||||
|
45
src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java
Normal file → Executable file
45
src/shims/common/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java
Normal file → Executable file
@ -131,7 +131,7 @@ protected abstract PreparedStatement getPreparedStatement(
|
||||
* @param closeConn if true, commits the transaction and closes the
|
||||
* connection.
|
||||
*/
|
||||
private void execUpdate(boolean closeConn)
|
||||
private void execUpdate(boolean commit, boolean stopThread)
|
||||
throws InterruptedException, SQLException {
|
||||
|
||||
if (!startedExecThread) {
|
||||
@ -150,8 +150,8 @@ private void execUpdate(boolean closeConn)
|
||||
// Pass this operation off to the update thread. This will block if
|
||||
// the update thread is already performing an update.
|
||||
AsyncSqlOutputFormat.AsyncDBOperation op =
|
||||
new AsyncSqlOutputFormat.AsyncDBOperation(stmt, closeConn,
|
||||
isBatchExec());
|
||||
new AsyncSqlOutputFormat.AsyncDBOperation(stmt, isBatchExec(),
|
||||
commit, stopThread);
|
||||
execThread.put(op);
|
||||
successfulPut = true; // op has been posted to the other thread.
|
||||
} finally {
|
||||
@ -173,22 +173,35 @@ private void execUpdate(boolean closeConn)
|
||||
public void close(TaskAttemptContext context)
|
||||
throws IOException, InterruptedException {
|
||||
try {
|
||||
execUpdate(true);
|
||||
} catch (SQLException sqle) {
|
||||
throw new IOException(sqle);
|
||||
} finally {
|
||||
execThread.join();
|
||||
}
|
||||
try {
|
||||
execUpdate(true, true);
|
||||
} catch (SQLException sqle) {
|
||||
throw new IOException(sqle);
|
||||
} finally {
|
||||
execThread.join();
|
||||
}
|
||||
|
||||
// If we're not leaving on an error return path already,
|
||||
// now that execThread is definitely stopped, check that the
|
||||
// error slot remains empty.
|
||||
SQLException lastErr = execThread.getLastError();
|
||||
if (null != lastErr) {
|
||||
throw new IOException(lastErr);
|
||||
// If we're not leaving on an error return path already,
|
||||
// now that execThread is definitely stopped, check that the
|
||||
// error slot remains empty.
|
||||
SQLException lastErr = execThread.getLastError();
|
||||
if (null != lastErr) {
|
||||
throw new IOException(lastErr);
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
closeConnection(context);
|
||||
} catch (SQLException sqle) {
|
||||
throw new IOException(sqle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void closeConnection(TaskAttemptContext context)
|
||||
throws SQLException {
|
||||
this.connection.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
/** {@inheritDoc} */
|
||||
public void write(K key, V value)
|
||||
@ -196,7 +209,7 @@ public void write(K key, V value)
|
||||
try {
|
||||
records.add((SqoopRecord) key.clone());
|
||||
if (records.size() >= this.rowsPerStmt) {
|
||||
execUpdate(false);
|
||||
execUpdate(false, false);
|
||||
}
|
||||
} catch (CloneNotSupportedException cnse) {
|
||||
throw new IOException("Could not buffer record", cnse);
|
||||
|
Loading…
Reference in New Issue
Block a user