5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-06 06:12:26 +08:00

SQOOP-723: Add possibility to have synchronous submissions in Sqoop 2

(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2013-04-13 12:44:50 -07:00
parent 135f00cf12
commit 6ceb22497c
3 changed files with 41 additions and 1 deletions

View File

@ -43,6 +43,8 @@ public class Constants {
public static final String OPT_SERVER = "server";
public static final String OPT_CLIENT = "client";
public static final String OPT_PROTOCOL = "protocol";
public static final String OPT_SYNCHRONOUS = "synchronous";
public static final String OPT_POLL_TIMEOUT = "poll-timeout";
public static final char OPT_XID_CHAR = 'x';
public static final char OPT_ALL_CHAR = 'a';
@ -295,6 +297,10 @@ public class Constants {
public static final String RES_SUBMISSION_USAGE =
"submission.usage";
public static final String RES_PROMPT_SYNCHRONOUS =
"submission.prompt_synchronous";
public static final String RES_PROMPT_POLL_TIMEOUT =
"submission.prompt_poll_timeout";
public static final String RES_UPDATE_USAGE =
"update.usage";

View File

@ -19,6 +19,7 @@
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
import org.apache.log4j.Logger;
import org.apache.sqoop.client.core.Constants;
import org.apache.sqoop.client.utils.SubmissionDisplayer;
import org.apache.sqoop.model.MSubmission;
@ -29,6 +30,9 @@
*
*/
public class SubmissionStartFunction extends SqoopFunction {
public static final Logger LOG = Logger.getLogger(SubmissionStartFunction.class);
public static final long POLL_TIMEOUT = 10000;
@SuppressWarnings("static-access")
public SubmissionStartFunction() {
this.addOption(OptionBuilder
@ -36,6 +40,15 @@ public SubmissionStartFunction() {
.withLongOpt(Constants.OPT_JID)
.hasArg()
.create(Constants.OPT_JID_CHAR));
this.addOption(OptionBuilder
.withDescription(resourceString(Constants.RES_PROMPT_SYNCHRONOUS))
.withLongOpt(Constants.OPT_SYNCHRONOUS)
.create());
this.addOption(OptionBuilder
.withDescription(resourceString(Constants.RES_PROMPT_POLL_TIMEOUT))
.withLongOpt(Constants.OPT_POLL_TIMEOUT)
.hasArg()
.create());
}
public Object executeFunction(CommandLine line) {
@ -46,6 +59,25 @@ public Object executeFunction(CommandLine line) {
MSubmission submission = client.startSubmission(getLong(line, Constants.OPT_JID));
SubmissionDisplayer.display(submission);
// Poll until finished
if (line.hasOption(Constants.OPT_SYNCHRONOUS)) {
long pollTimeout = POLL_TIMEOUT;
if (line.hasOption(Constants.OPT_POLL_TIMEOUT)) {
pollTimeout = Long.getLong(line.getOptionValue(Constants.OPT_POLL_TIMEOUT)).longValue();
}
while (submission.getStatus().isRunning()) {
submission = client.getSubmissionStatus(getLong(line, Constants.OPT_JID));
SubmissionDisplayer.display(submission);
// Wait some time
try {
Thread.sleep(pollTimeout);
} catch (InterruptedException e) {
LOG.error("Could not sleep");
}
}
}
return null;
}
}

View File

@ -164,6 +164,8 @@ sqoop.prompt_shell_loadrc = Loading resource file {0}
sqoop.prompt_shell_loadedrc = Resource file loaded.
submission.usage = Usage: submission {0}
submission.prompt_synchronous = Wait for submission to finish
submission.prompt_poll_timeout = How often the client should communicate with the server in milliseconds (Default: 10000)
# Various Table headers
table.header.id = Id
@ -190,4 +192,4 @@ formdisplayer.possible_values = Possible values
formdisplayer.unsupported_datatype = Unsupported data type
formdisplayer.input_sensitive = This input is sensitive
formdisplayer.warning_message = There were warnings while create or update, but saved successfully.
formdisplayer.warning_message = There were warnings while create or update, but saved successfully.