5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-07 04:22:49 +08:00

SQOOP-2396: Sqoop2: Race condition in purge/update threads on Server shutdown

(Dian Fu via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-11-18 08:06:49 -08:00
parent 4f0e28625a
commit 190b78fcb8

View File

@ -139,6 +139,16 @@ public static void setInstance(JobManager newInstance) {
*/ */
private UpdateThread updateThread = null; private UpdateThread updateThread = null;
/**
* Lock for purge thread.
*/
private Object purgeThreadLock = new Object();
/**
* Lock for update thread.
*/
private Object updateThreadLock = new Object();
/** /**
* Synchronization variable between threads. * Synchronization variable between threads.
*/ */
@ -196,20 +206,24 @@ public synchronized void destroy() {
running = false; running = false;
try { synchronized(purgeThreadLock) {
purgeThread.interrupt(); try {
purgeThread.join(); purgeThread.interrupt();
} catch (InterruptedException e) { purgeThread.join();
// TODO(jarcec): Do I want to wait until it actually finish here? } catch (InterruptedException e) {
LOG.error("Interrupted joining purgeThread"); // TODO(jarcec): Do I want to wait until it actually finish here?
LOG.error("Interrupted joining purgeThread");
}
} }
try { synchronized(updateThreadLock) {
updateThread.interrupt(); try {
updateThread.join(); updateThread.interrupt();
} catch (InterruptedException e) { updateThread.join();
// TODO(jarcec): Do I want to wait until it actually finish here? } catch (InterruptedException e) {
LOG.error("Interrupted joining updateThread"); // TODO(jarcec): Do I want to wait until it actually finish here?
LOG.error("Interrupted joining updateThread");
}
} }
if (submissionEngine != null) { if (submissionEngine != null) {
@ -763,11 +777,15 @@ public void run() {
try { try {
LOG.info("Purging old submissions"); LOG.info("Purging old submissions");
Date threshold = new Date((new Date()).getTime() - purgeThreshold); Date threshold = new Date((new Date()).getTime() - purgeThreshold);
RepositoryManager.getInstance().getRepository() synchronized(purgeThreadLock) {
.purgeSubmissions(threshold); RepositoryManager.getInstance().getRepository()
.purgeSubmissions(threshold);
}
Thread.sleep(purgeSleep); Thread.sleep(purgeSleep);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.debug("Purge thread interrupted", e); LOG.debug("Purge thread interrupted", e);
} catch (SqoopException ex) {
LOG.error("Purge thread encountered exception", ex);
} }
} }
@ -787,18 +805,21 @@ public void run() {
try { try {
LOG.debug("Updating running submissions"); LOG.debug("Updating running submissions");
// Let's get all running submissions from repository to check them out synchronized(updateThreadLock) {
List<MSubmission> unfinishedSubmissions = // Let's get all running submissions from repository to check them out
RepositoryManager.getInstance().getRepository() List<MSubmission> unfinishedSubmissions =
.findUnfinishedSubmissions(); RepositoryManager.getInstance().getRepository()
.findUnfinishedSubmissions();
for (MSubmission submission : unfinishedSubmissions) { for (MSubmission submission : unfinishedSubmissions) {
updateSubmission(submission); updateSubmission(submission);
}
} }
Thread.sleep(updateSleep); Thread.sleep(updateSleep);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.debug("Purge thread interrupted", e); LOG.debug("Update thread interrupted", e);
} catch (SqoopException ex) {
LOG.error("Update thread encountered exception", ex);
} }
} }