From 317562642f490bf3e2a0fe201c7503faba15fa6e Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Tue, 14 Aug 2012 09:32:50 +0000 Subject: [PATCH] Reverting back accidentally committed file. git-svn-id: https://svn.apache.org/repos/asf/sqoop/trunk@1372804 13f79535-47bb-0310-9956-ffa450edef68 --- .../sqoop/mapreduce/AutoProgressMapper.java | 122 +++++++++++++++++- 1 file changed, 120 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java b/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java index 95086417..4b61321e 100644 --- a/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; /** * Identity mapper that continuously reports progress via a background thread. @@ -31,6 +32,25 @@ public class AutoProgressMapper public static final Log LOG = LogFactory.getLog( AutoProgressMapper.class.getName()); + /** + * Total number of millis for which progress will be reported by the + * auto-progress thread. If this is zero, then the auto-progress thread will + * never voluntarily exit. + */ + private int maxProgressPeriod; + + /** + * Number of milliseconds to sleep for between loop iterations. Must be less + * than report interval. + */ + private int sleepInterval; + + /** + * Number of milliseconds between calls to Reporter.progress(). + * Should be a multiple of the sleepInterval. + */ + private int reportInterval; + public static final String MAX_PROGRESS_PERIOD_KEY = "sqoop.mapred.auto.progress.max"; public static final String SLEEP_INTERVAL_KEY = @@ -47,14 +67,112 @@ public class AutoProgressMapper // Disable max progress, by default. public static final int DEFAULT_MAX_PROGRESS = 0; + private class ProgressThread extends Thread { + + private volatile boolean keepGoing; // While this is true, thread runs. + + private Context context; + private long startTimeMillis; + private long lastReportMillis; + + public ProgressThread(final Context ctxt) { + this.context = ctxt; + this.keepGoing = true; + } + + public void signalShutdown() { + this.keepGoing = false; // volatile update. + this.interrupt(); + } + + public void run() { + this.lastReportMillis = System.currentTimeMillis(); + this.startTimeMillis = this.lastReportMillis; + + final long MAX_PROGRESS = AutoProgressMapper.this.maxProgressPeriod; + final long REPORT_INTERVAL = AutoProgressMapper.this.reportInterval; + final long SLEEP_INTERVAL = AutoProgressMapper.this.sleepInterval; + + // In a loop: + // * Check that we haven't run for too long (maxProgressPeriod). + // * If it's been a report interval since we last made progress, + // make more. + // * Sleep for a bit. + // * If the parent thread has signaled for exit, do so. + while (this.keepGoing) { + long curTimeMillis = System.currentTimeMillis(); + + if (MAX_PROGRESS != 0 + && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) { + this.keepGoing = false; + LOG.info("Auto-progress thread exiting after " + MAX_PROGRESS + + " ms."); + break; + } + + if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) { + // It's been a full report interval -- claim progress. + LOG.debug("Auto-progress thread reporting progress"); + this.context.progress(); + this.lastReportMillis = curTimeMillis; + } + + // Unless we got an interrupt while we were working, + // sleep a bit before doing more work. + if (!Thread.interrupted()) { + try { + Thread.sleep(SLEEP_INTERVAL); + } catch (InterruptedException ie) { + // we were notified on something; not necessarily an error. + } + } + } + + LOG.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing); + } + } + + /** + * Set configuration parameters for the auto-progress thread. + */ + private void configureAutoProgress(Configuration job) { + this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY, + DEFAULT_MAX_PROGRESS); + this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY, + DEFAULT_SLEEP_INTERVAL); + this.reportInterval = job.getInt(REPORT_INTERVAL_KEY, + DEFAULT_REPORT_INTERVAL); + + if (this.reportInterval < 1) { + LOG.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to " + + DEFAULT_REPORT_INTERVAL); + this.reportInterval = DEFAULT_REPORT_INTERVAL; + } + + if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) { + LOG.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to " + + DEFAULT_SLEEP_INTERVAL); + this.sleepInterval = DEFAULT_SLEEP_INTERVAL; + } + + if (this.maxProgressPeriod < 0) { + LOG.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to " + + DEFAULT_MAX_PROGRESS); + this.maxProgressPeriod = DEFAULT_MAX_PROGRESS; + } + } + + // map() method intentionally omitted; Mapper.map() is the identity mapper. + /** * Run the mapping process for this task, wrapped in an auto-progress system. */ @Override public void run(Context context) throws IOException, InterruptedException { - ProgressThread thread = new ProgressThread(context, LOG); + configureAutoProgress(context.getConfiguration()); + ProgressThread thread = this.new ProgressThread(context); try { thread.setDaemon(true); @@ -73,7 +191,7 @@ public void run(Context context) throws IOException, InterruptedException { LOG.debug("Progress thread shutdown detected."); } catch (InterruptedException ie) { LOG.warn("Interrupted when waiting on auto-progress thread: " - + ie.toString(), ie); + + ie.toString()); } } }