diff --git a/ivy.xml b/ivy.xml index fa96d135..82fe1ac2 100644 --- a/ivy.xml +++ b/ivy.xml @@ -68,5 +68,17 @@ name="avro" rev="1.0.0" conf="common->default"/> + + + diff --git a/src/java/org/apache/hadoop/sqoop/mapred/AutoProgressMapRunner.java b/src/java/org/apache/hadoop/sqoop/mapred/AutoProgressMapRunner.java new file mode 100644 index 00000000..e1691700 --- /dev/null +++ b/src/java/org/apache/hadoop/sqoop/mapred/AutoProgressMapRunner.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.mapred; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapRunnable; +import org.apache.hadoop.mapred.MapRunner; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +/** + * MapRunnable implementation that spawns a background thread + * to periodically increment the progress of the operation. + * This is used because queries to the database can be expected + * to block for more than 10 minutes when performing the initial + * record input. + * + * The background thread can be configured to stop providing + * progress after a fixed period of time; after this time period, + * some other means (e.g., emitting records) must be used to sustain + * Mapper progress. + */ +public class AutoProgressMapRunner + extends MapRunner implements MapRunnable { + + public static final Log LOG = LogFactory.getLog(AutoProgressMapRunner.class.getName()); + + /** Total number of millis for which progress will be reported + by the auto-progress thread. If this is zero, then the auto-progress + thread will never voluntarily exit. + */ + private int maxProgressPeriod; + + /** Number of milliseconds to sleep for between loop iterations. Must be less + than report interval. + */ + private int sleepInterval; + + /** Number of milliseconds between calls to Reporter.progress(). Should be a multiple + of the sleepInterval. + */ + private int reportInterval; + + public static final String MAX_PROGRESS_PERIOD_KEY = "sqoop.mapred.auto.progress.max"; + public static final String SLEEP_INTERVAL_KEY = "sqoop.mapred.auto.progress.sleep"; + public static final String REPORT_INTERVAL_KEY = "sqoop.mapred.auto.progress.report"; + + // Sleep for 10 seconds at a time. + static final int DEFAULT_SLEEP_INTERVAL = 10000; + + // Report progress every 30 seconds. + static final int DEFAULT_REPORT_INTERVAL = 30000; + + // Disable max progress, by default. + static final int DEFAULT_MAX_PROGRESS = 0; + + private class ProgressThread extends Thread { + + private boolean keepGoing; // while this is true, thread runs. + private Reporter reporter; + private long startTimeMillis; + private long lastReportMillis; + + public ProgressThread(final Reporter r) { + this.reporter = r; + this.keepGoing = true; + } + + public void signalShutdown() { + synchronized(this) { + // Synchronize this to ensure a fence before interrupt. + this.keepGoing = false; + } + this.interrupt(); + } + + public void run() { + boolean doKeepGoing = true; + + this.lastReportMillis = System.currentTimeMillis(); + this.startTimeMillis = this.lastReportMillis; + + final long MAX_PROGRESS = AutoProgressMapRunner.this.maxProgressPeriod; + final long REPORT_INTERVAL = AutoProgressMapRunner.this.reportInterval; + final long SLEEP_INTERVAL = AutoProgressMapRunner.this.sleepInterval; + + // in a loop: + // * Check that we haven't run for too long (maxProgressPeriod) + // * If it's been a report interval since we last made progress, make more. + // * Sleep for a bit. + // * If the parent thread has signaled for exit, do so. + while (doKeepGoing) { + long curTimeMillis = System.currentTimeMillis(); + + if (MAX_PROGRESS != 0 && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) { + synchronized(this) { + this.keepGoing = false; + } + LOG.info("Auto-progress thread exiting after " + MAX_PROGRESS + " ms."); + break; + } + + if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) { + // It's been a full report interval -- claim progress. + LOG.debug("Auto-progress thread reporting progress"); + this.reporter.progress(); + this.lastReportMillis = curTimeMillis; + } + + // Unless we got an interrupt while we were working, + // sleep a bit before doing more work. + if (!this.interrupted()) { + try { + Thread.sleep(SLEEP_INTERVAL); + } catch (InterruptedException ie) { + // we were notified on something; not necessarily an error. + } + } + + synchronized(this) { + // Read shared field in a synchronized block. + doKeepGoing = this.keepGoing; + } + } + + LOG.info("Auto-progress thread is finished. keepGoing=" + doKeepGoing); + } + } + + public void configure(JobConf job) { + this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY, DEFAULT_MAX_PROGRESS); + this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL); + this.reportInterval = job.getInt(REPORT_INTERVAL_KEY, DEFAULT_REPORT_INTERVAL); + + if (this.reportInterval < 1) { + LOG.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to " + DEFAULT_REPORT_INTERVAL); + this.reportInterval = DEFAULT_REPORT_INTERVAL; + } + + if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) { + LOG.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to " + DEFAULT_SLEEP_INTERVAL); + this.sleepInterval = DEFAULT_SLEEP_INTERVAL; + } + + if (this.maxProgressPeriod < 0) { + LOG.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to " + DEFAULT_MAX_PROGRESS); + this.maxProgressPeriod = DEFAULT_MAX_PROGRESS; + } + + super.configure(job); + } + + public void run(RecordReader input, OutputCollector output, Reporter reporter) + throws IOException { + + ProgressThread thread = this.new ProgressThread(reporter); + + try { + thread.setDaemon(true); + thread.start(); + // Use default MapRunner to actually drive the mapping. + super.run(input, output, reporter); + } finally { + // Tell the progress thread to exit.. + LOG.debug("Instructing auto-progress thread to quit."); + thread.signalShutdown(); + try { + // And wait for that to happen. + LOG.debug("Waiting for progress thread shutdown..."); + thread.join(); + LOG.debug("Progress thread shutdown detected."); + } catch (InterruptedException ie) { + LOG.warn("Interrupted when waiting on auto-progress thread: " + ie.toString()); + } + } + } +} + diff --git a/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java b/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java index b13105b8..a8bd3e19 100644 --- a/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java +++ b/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java @@ -114,6 +114,7 @@ public void runImport(String tableName, String ormJarFile, String orderByCol, job.setNumReduceTasks(0); job.setNumMapTasks(1); job.setInputFormat(DBInputFormat.class); + job.setMapRunnerClass(AutoProgressMapRunner.class); FileOutputFormat.setOutputPath(job, outputPath); @@ -130,7 +131,7 @@ public void runImport(String tableName, String ormJarFile, String orderByCol, if (null == colNames) { colNames = mgr.getColumnNames(tableName); } - + // It's ok if the where clause is null in DBInputFormat.setInput. String whereClause = options.getWhereClause(); diff --git a/src/test/org/apache/hadoop/sqoop/AllTests.java b/src/test/org/apache/hadoop/sqoop/AllTests.java index 303bf316..213fb0ef 100644 --- a/src/test/org/apache/hadoop/sqoop/AllTests.java +++ b/src/test/org/apache/hadoop/sqoop/AllTests.java @@ -23,6 +23,7 @@ import org.apache.hadoop.sqoop.lib.TestRecordParser; import org.apache.hadoop.sqoop.manager.TestHsqldbManager; import org.apache.hadoop.sqoop.manager.TestSqlManager; +import org.apache.hadoop.sqoop.mapred.TestAutoProgressMapRunner; import org.apache.hadoop.sqoop.orm.TestClassWriter; import org.apache.hadoop.sqoop.orm.TestParseMethods; @@ -41,6 +42,7 @@ private AllTests() { } public static Test suite() { TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop"); + suite.addTestSuite(TestAutoProgressMapRunner.class); suite.addTestSuite(TestAllTables.class); suite.addTestSuite(TestHsqldbManager.class); suite.addTestSuite(TestSqlManager.class); diff --git a/src/test/org/apache/hadoop/sqoop/mapred/TestAutoProgressMapRunner.java b/src/test/org/apache/hadoop/sqoop/mapred/TestAutoProgressMapRunner.java new file mode 100644 index 00000000..7b236e68 --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/mapred/TestAutoProgressMapRunner.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobStatus; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.sqoop.manager.ConnManager; +import org.apache.hadoop.sqoop.manager.ManagerFactory; + +import junit.framework.TestCase; + +import java.io.DataOutputStream; +import java.io.IOException; + +/** + * Test the AutoProgressMapRunner implementation and prove that it makes + * progress updates when the mapper itself isn't. + */ +public class TestAutoProgressMapRunner extends TestCase { + + /** Parameter: how long should each map() call sleep for? */ + public static final String MAPPER_SLEEP_INTERVAL_KEY = "sqoop.test.mapper.sleep.ival"; + + + /** Mapper that just sleeps for a configurable amount of time. */ + public static class SleepingMapper extends MapReduceBase + implements Mapper { + + private int sleepInterval; + + public void configure(JobConf job) { + this.sleepInterval = job.getInt(MAPPER_SLEEP_INTERVAL_KEY, 100); + } + + public void map(K1 k, V1 v, OutputCollector out, Reporter r) throws IOException { + while (true) { + try { + Thread.sleep(this.sleepInterval); + break; + } catch (InterruptedException ie) { + } + } + } + } + + /** Mapper that sleeps for 1 second then fails. */ + public static class FailingMapper extends MapReduceBase + implements Mapper { + + public void map(K1 k, V1 v, OutputCollector out, Reporter r) throws IOException { + throw new IOException("Causing job failure."); + } + } + + private final Path inPath = new Path("./input"); + private final Path outPath = new Path("./output"); + + private MiniMRCluster mr = null; + private FileSystem fs = null; + + private final static int NUM_NODES = 1; + + public void setUp() throws IOException { + Configuration conf = new Configuration(); + conf.set("fs.default.name", "file:///"); + fs = FileSystem.get(conf); + mr = new MiniMRCluster(NUM_NODES, fs.getUri().toString(), 1, null, null, new JobConf(conf)); + + // Create a file to use as a dummy input + DataOutputStream os = fs.create(new Path(inPath, "part-0")); + os.writeBytes("This is a line of text."); + os.close(); + } + + public void tearDown() throws IOException { + if (null != fs) { + fs.delete(inPath, true); + fs.delete(outPath, true); + } + + if (null != mr) { + mr.shutdown(); + this.mr = null; + } + } + + + /** + * Test that even if the mapper just sleeps, the auto-progress thread keeps it all alive + */ + public void testBackgroundProgress() throws IOException { + // Report progress every 2.5 seconds. + final int REPORT_INTERVAL = 2500; + + // Tasks need to report progress once every ten seconds. + final int TASK_KILL_TIMEOUT = 4 * REPORT_INTERVAL; + + // Create and run the job. + JobConf job = mr.createJobConf(); + + // Set the task timeout to be pretty strict. + job.setInt("mapred.task.timeout", TASK_KILL_TIMEOUT); + + // Set the mapper itself to block for long enough that it should be killed on its own. + job.setInt(MAPPER_SLEEP_INTERVAL_KEY, 2 * TASK_KILL_TIMEOUT); + + // Report progress frequently.. + job.setInt(AutoProgressMapRunner.SLEEP_INTERVAL_KEY, REPORT_INTERVAL); + job.setInt(AutoProgressMapRunner.REPORT_INTERVAL_KEY, REPORT_INTERVAL); + + job.setMapRunnerClass(AutoProgressMapRunner.class); + job.setMapperClass(SleepingMapper.class); + + job.setNumReduceTasks(0); + job.setNumMapTasks(1); + + FileInputFormat.addInputPath(job, inPath); + FileOutputFormat.setOutputPath(job, outPath); + + RunningJob runningJob = JobClient.runJob(job); + runningJob.waitForCompletion(); + + assertEquals("Sleep job failed!", JobStatus.SUCCEEDED, runningJob.getJobState()); + } + + /** Test that if the mapper bails early, we shut down the progress thread + in a timely fashion. + */ + public void testEarlyExit() throws IOException { + JobConf job = mr.createJobConf(); + + final int REPORT_INTERVAL = 30000; + + job.setInt(AutoProgressMapRunner.SLEEP_INTERVAL_KEY, REPORT_INTERVAL); + job.setInt(AutoProgressMapRunner.REPORT_INTERVAL_KEY, REPORT_INTERVAL); + + job.setNumReduceTasks(0); + job.setNumMapTasks(1); + + job.setInt("mapred.map.max.attempts", 1); + + job.setMapRunnerClass(AutoProgressMapRunner.class); + job.setMapperClass(FailingMapper.class); + + FileInputFormat.addInputPath(job, inPath); + FileOutputFormat.setOutputPath(job, outPath); + + RunningJob runningJob = null; + long startTime = System.currentTimeMillis(); + try { + runningJob = JobClient.runJob(job); + runningJob.waitForCompletion(); + assertEquals("Failing job succeded!", JobStatus.FAILED, runningJob.getJobState()); + } catch(IOException ioe) { + // Expected + } + + long endTime = System.currentTimeMillis(); + long duration = endTime - startTime; + + assertTrue("Job took too long to clean up (" + duration + ")", + duration < (REPORT_INTERVAL * 2)); + } +}