5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-21 11:21:39 +08:00

SQOOP-985: Sqoop2: Introduce synchronous job submission to Client API

(Vasanth kumar RJ via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2013-05-20 06:58:55 -07:00
parent 61335e6e82
commit cd4a822ab7
6 changed files with 144 additions and 17 deletions

View File

@ -17,7 +17,9 @@
*/
package org.apache.sqoop.client;
import org.apache.sqoop.client.core.ClientError;
import org.apache.sqoop.client.request.SqoopRequests;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.json.ConnectorBean;
import org.apache.sqoop.json.FrameworkBean;
import org.apache.sqoop.json.ValidationBean;
@ -78,6 +80,15 @@ public class SqoopClient {
*/
private MFramework framework;
/**
* Status flags used when updating the submission callback status
*/
private enum SubmissionStatus {
SUBMITTED,
UPDATED,
FINISHED
};
public SqoopClient(String serverUrl) {
requests = new SqoopRequests();
setServerUrl(serverUrl);
@ -353,6 +364,60 @@ public MSubmission startSubmission(long jid) {
return requests.createSubmission(jid).getSubmission();
}
/**
* Method used for synchronous job submission.
* Pass null to callback parameter if submission status is not required and after completion
* job execution returns MSubmission which contains final status of submission.
* @param jid - Job ID
* @param callback - User may set null if submission status is not required, else callback methods invoked
* @param pollTime - Server poll time
* @return MSubmission - Final status of job submission
* @throws InterruptedException
*/
public MSubmission startSubmission(long jid, SubmissionCallback callback, long pollTime) throws InterruptedException {
if(pollTime <= 0) {
throw new SqoopException(ClientError.CLIENT_0008);
}
boolean first = true;
MSubmission submission = requests.createSubmission(jid).getSubmission();
while(submission.getStatus().isRunning()) {
if(first) {
submissionCallback(callback, submission, SubmissionStatus.SUBMITTED);
first = false;
} else {
submissionCallback(callback, submission, SubmissionStatus.UPDATED);
}
Thread.sleep(pollTime);
submission = getSubmissionStatus(jid);
}
submissionCallback(callback, submission, SubmissionStatus.FINISHED);
return submission;
}
/**
* Invokes the callback's methods with MSubmission object
* based on SubmissionStatus. If callback is null, no operation performed.
* @param callback
* @param submission
* @param status
*/
private void submissionCallback(SubmissionCallback callback,
MSubmission submission, SubmissionStatus status) {
if(callback == null) {
return;
}
switch (status) {
case SUBMITTED:
callback.submitted(submission);
break;
case UPDATED:
callback.updated(submission);
break;
case FINISHED:
callback.finished(submission);
}
}
/**
* Stop job with given id.
*

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.client;
import org.apache.sqoop.model.MSubmission;
/**
* Callback interface for Synchronous job submission
*/
public interface SubmissionCallback {
/**
* Invoked when job successfully submitted
* @param submission
*/
public void submitted(MSubmission submission);
/**
* Invoked with updated submission status when sqoop server is polled for update
* @param submission
*/
public void updated(MSubmission submission);
/**
* Invoked when job execution is finished
* @param submission
*/
public void finished(MSubmission submission);
}

View File

@ -45,6 +45,8 @@ public enum ClientError implements ErrorCode {
/** Command not compatible with batch mode */
CLIENT_0007("Command not compatible with batch mode"),
/** Polling time of submission status cannot be negative */
CLIENT_0008("Polling time of submission status cannot be negative"),
;
private final String message;

View File

@ -64,6 +64,8 @@ public class Constants {
public static final char OPT_SERVER_CHAR = 's';
public static final char OPT_CLIENT_CHAR = 'c';
public static final char OPT_PROTOCOL_CHAR = 'p';
public static final char OPT_SYNCHRONOUS_CHAR = 's';
public static final char OPT_POLL_TIMEOUT_CHAR = 'p';
// Resource keys for various commands, command options,
// functions and descriptions

View File

@ -20,6 +20,7 @@
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
import org.apache.log4j.Logger;
import org.apache.sqoop.client.SubmissionCallback;
import org.apache.sqoop.client.core.Constants;
import org.apache.sqoop.client.utils.SubmissionDisplayer;
import org.apache.sqoop.model.MSubmission;
@ -43,12 +44,12 @@ public SubmissionStartFunction() {
this.addOption(OptionBuilder
.withDescription(resourceString(Constants.RES_PROMPT_SYNCHRONOUS))
.withLongOpt(Constants.OPT_SYNCHRONOUS)
.create());
.create(Constants.OPT_SYNCHRONOUS_CHAR));
this.addOption(OptionBuilder
.withDescription(resourceString(Constants.RES_PROMPT_POLL_TIMEOUT))
.withLongOpt(Constants.OPT_POLL_TIMEOUT)
.hasArg()
.create());
.create(Constants.OPT_POLL_TIMEOUT_CHAR));
}
public Object executeFunction(CommandLine line) {
@ -57,26 +58,36 @@ public Object executeFunction(CommandLine line) {
return null;
}
MSubmission submission = client.startSubmission(getLong(line, Constants.OPT_JID));
SubmissionDisplayer.display(submission);
// Poll until finished
if (line.hasOption(Constants.OPT_SYNCHRONOUS)) {
long pollTimeout = POLL_TIMEOUT;
if (line.hasOption(Constants.OPT_POLL_TIMEOUT)) {
pollTimeout = Long.getLong(line.getOptionValue(Constants.OPT_POLL_TIMEOUT)).longValue();
}
while (submission.getStatus().isRunning()) {
submission = client.getSubmissionStatus(getLong(line, Constants.OPT_JID));
SubmissionDisplayer.display(submission);
// Wait some time
try {
Thread.sleep(pollTimeout);
} catch (InterruptedException e) {
LOG.error("Could not sleep");
SubmissionCallback callback = new SubmissionCallback() {
@Override
public void submitted(MSubmission submission) {
SubmissionDisplayer.display(submission);
}
@Override
public void updated(MSubmission submission) {
SubmissionDisplayer.display(submission);
}
@Override
public void finished(MSubmission submission) {
SubmissionDisplayer.display(submission);
}
};
if (line.hasOption(Constants.OPT_POLL_TIMEOUT)) {
pollTimeout = getLong(line,Constants.OPT_POLL_TIMEOUT);
}
try {
client.startSubmission(getLong(line, Constants.OPT_JID), callback, pollTimeout);
} catch (InterruptedException e) {
LOG.error("Could not sleep");
}
} else {
MSubmission submission = client.startSubmission(getLong(line, Constants.OPT_JID));
SubmissionDisplayer.display(submission);
}
return null;
}

View File

@ -290,6 +290,9 @@ Job submission requires a job id. On successful submission, getStatus() method r
//Stop a running job
submission.stopSubmission(jid);
Above code block, job submission is asynchronous. For synchronous job submission, use startSubmission(jid, callback, pollTime) method. If user is not interested in getting submission status, then invoke method with null for callback parameter and returns final submission status. Polltime is request interval for getting submission status from sqoop server and value should be greater than zero. Frequently hit the sqoop server if the low value is set to pollTime.
When a synchronous job is submission started with callback, first invokes the callback's submitted(MSubmission) method on successful submission, after every poll time interval invokes updated(MSubmission) and finally on finished executing the job invokes finished(MSubmission) method.
Describe Forms
==========================