diff --git a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java index a4e8b881..dc1c1263 100644 --- a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java @@ -64,6 +64,18 @@ public class MySQLExportMapper // Configured value for MSYQL_CHECKPOINT_BYTES_KEY. protected long checkpointDistInBytes; + /** Configuration key that specifies the number of milliseconds + * to sleep at the end of each checkpoint commit + * Default is 0, no sleep. + */ + public static final String MYSQL_CHECKPOINT_SLEEP_KEY = + "sqoop.mysql.export.sleep.ms"; + + public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0; + + // Configured value for MYSQL_CHECKPOINT_SLEEP_KEY. + protected long checkpointSleepMs; + protected Configuration conf; /** The FIFO being used to communicate with mysqlimport. */ @@ -314,6 +326,21 @@ protected void setup(Context context) { LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY); this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES; } + + this.checkpointSleepMs = conf.getLong( + MYSQL_CHECKPOINT_SLEEP_KEY, DEFAULT_CHECKPOINT_SLEEP_MS); + + if (this.checkpointSleepMs < 0) { + LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_SLEEP_KEY); + this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS; + } + + if (this.checkpointSleepMs >= conf.getLong("mapred.task.timeout", 0)) { + LOG.warn("Value for " + + MYSQL_CHECKPOINT_SLEEP_KEY + + " has to be smaller than mapred.task.timeout"); + this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS; + } } /** @@ -347,6 +374,12 @@ protected void writeRecord(String record, String terminator) if (this.checkpointDistInBytes != 0 && this.bytesWritten > this.checkpointDistInBytes) { LOG.info("Checkpointing current export."); + + if (this.checkpointSleepMs != 0) { + LOG.info("Pausing."); + Thread.sleep(this.checkpointSleepMs); + } + closeExportHandles(); initMySQLImportProcess(); this.bytesWritten = 0;