mirror of
https://github.com/apache/sqoop.git
synced 2025-05-12 23:11:43 +08:00
SQOOP-604: Easy throttling feature for MySQL exports
(Zoltan Toth-Czifra via Abhijeet Gaikwad)
This commit is contained in:
parent
5eb987a785
commit
c499f49097
@ -64,6 +64,18 @@ public class MySQLExportMapper<KEYIN, VALIN>
|
|||||||
// Configured value for MSYQL_CHECKPOINT_BYTES_KEY.
|
// Configured value for MSYQL_CHECKPOINT_BYTES_KEY.
|
||||||
protected long checkpointDistInBytes;
|
protected long checkpointDistInBytes;
|
||||||
|
|
||||||
|
/** Configuration key that specifies the number of milliseconds
|
||||||
|
* to sleep at the end of each checkpoint commit
|
||||||
|
* Default is 0, no sleep.
|
||||||
|
*/
|
||||||
|
public static final String MYSQL_CHECKPOINT_SLEEP_KEY =
|
||||||
|
"sqoop.mysql.export.sleep.ms";
|
||||||
|
|
||||||
|
public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0;
|
||||||
|
|
||||||
|
// Configured value for MYSQL_CHECKPOINT_SLEEP_KEY.
|
||||||
|
protected long checkpointSleepMs;
|
||||||
|
|
||||||
protected Configuration conf;
|
protected Configuration conf;
|
||||||
|
|
||||||
/** The FIFO being used to communicate with mysqlimport. */
|
/** The FIFO being used to communicate with mysqlimport. */
|
||||||
@ -314,6 +326,21 @@ protected void setup(Context context) {
|
|||||||
LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
|
LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
|
||||||
this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
|
this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.checkpointSleepMs = conf.getLong(
|
||||||
|
MYSQL_CHECKPOINT_SLEEP_KEY, DEFAULT_CHECKPOINT_SLEEP_MS);
|
||||||
|
|
||||||
|
if (this.checkpointSleepMs < 0) {
|
||||||
|
LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_SLEEP_KEY);
|
||||||
|
this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.checkpointSleepMs >= conf.getLong("mapred.task.timeout", 0)) {
|
||||||
|
LOG.warn("Value for "
|
||||||
|
+ MYSQL_CHECKPOINT_SLEEP_KEY
|
||||||
|
+ " has to be smaller than mapred.task.timeout");
|
||||||
|
this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -347,6 +374,12 @@ protected void writeRecord(String record, String terminator)
|
|||||||
if (this.checkpointDistInBytes != 0
|
if (this.checkpointDistInBytes != 0
|
||||||
&& this.bytesWritten > this.checkpointDistInBytes) {
|
&& this.bytesWritten > this.checkpointDistInBytes) {
|
||||||
LOG.info("Checkpointing current export.");
|
LOG.info("Checkpointing current export.");
|
||||||
|
|
||||||
|
if (this.checkpointSleepMs != 0) {
|
||||||
|
LOG.info("Pausing.");
|
||||||
|
Thread.sleep(this.checkpointSleepMs);
|
||||||
|
}
|
||||||
|
|
||||||
closeExportHandles();
|
closeExportHandles();
|
||||||
initMySQLImportProcess();
|
initMySQLImportProcess();
|
||||||
this.bytesWritten = 0;
|
this.bytesWritten = 0;
|
||||||
|
Loading…
Reference in New Issue
Block a user