From 190b78fcb83c4870d7dcb7f8af289047f44fefb8 Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Wed, 18 Nov 2015 08:06:49 -0800 Subject: [PATCH] SQOOP-2396: Sqoop2: Race condition in purge/update threads on Server shutdown (Dian Fu via Jarek Jarcec Cecho) --- .../org/apache/sqoop/driver/JobManager.java | 65 ++++++++++++------- 1 file changed, 43 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index 15ca7968..90ee541a 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -139,6 +139,16 @@ public static void setInstance(JobManager newInstance) { */ private UpdateThread updateThread = null; + /** + * Lock for purge thread. + */ + private Object purgeThreadLock = new Object(); + + /** + * Lock for update thread. + */ + private Object updateThreadLock = new Object(); + /** * Synchronization variable between threads. */ @@ -196,20 +206,24 @@ public synchronized void destroy() { running = false; - try { - purgeThread.interrupt(); - purgeThread.join(); - } catch (InterruptedException e) { - // TODO(jarcec): Do I want to wait until it actually finish here? - LOG.error("Interrupted joining purgeThread"); + synchronized(purgeThreadLock) { + try { + purgeThread.interrupt(); + purgeThread.join(); + } catch (InterruptedException e) { + // TODO(jarcec): Do I want to wait until it actually finish here? + LOG.error("Interrupted joining purgeThread"); + } } - try { - updateThread.interrupt(); - updateThread.join(); - } catch (InterruptedException e) { - // TODO(jarcec): Do I want to wait until it actually finish here? - LOG.error("Interrupted joining updateThread"); + synchronized(updateThreadLock) { + try { + updateThread.interrupt(); + updateThread.join(); + } catch (InterruptedException e) { + // TODO(jarcec): Do I want to wait until it actually finish here? + LOG.error("Interrupted joining updateThread"); + } } if (submissionEngine != null) { @@ -763,11 +777,15 @@ public void run() { try { LOG.info("Purging old submissions"); Date threshold = new Date((new Date()).getTime() - purgeThreshold); - RepositoryManager.getInstance().getRepository() - .purgeSubmissions(threshold); + synchronized(purgeThreadLock) { + RepositoryManager.getInstance().getRepository() + .purgeSubmissions(threshold); + } Thread.sleep(purgeSleep); } catch (InterruptedException e) { LOG.debug("Purge thread interrupted", e); + } catch (SqoopException ex) { + LOG.error("Purge thread encountered exception", ex); } } @@ -787,18 +805,21 @@ public void run() { try { LOG.debug("Updating running submissions"); - // Let's get all running submissions from repository to check them out - List unfinishedSubmissions = - RepositoryManager.getInstance().getRepository() - .findUnfinishedSubmissions(); + synchronized(updateThreadLock) { + // Let's get all running submissions from repository to check them out + List unfinishedSubmissions = + RepositoryManager.getInstance().getRepository() + .findUnfinishedSubmissions(); - for (MSubmission submission : unfinishedSubmissions) { - updateSubmission(submission); + for (MSubmission submission : unfinishedSubmissions) { + updateSubmission(submission); + } } - Thread.sleep(updateSleep); } catch (InterruptedException e) { - LOG.debug("Purge thread interrupted", e); + LOG.debug("Update thread interrupted", e); + } catch (SqoopException ex) { + LOG.error("Update thread encountered exception", ex); } }