mirror of
https://github.com/apache/sqoop.git
synced 2025-05-16 17:00:53 +08:00
SQOOP-716 Create server notification REST callback
(Jarek Jarcec Cecho)
This commit is contained in:
parent
ef12bf508f
commit
0e7451f94f
@ -139,6 +139,32 @@ public final class FrameworkManager {
|
||||
*/
|
||||
private static final Object submissionMutex = new Object();
|
||||
|
||||
/**
|
||||
* Base notification URL.
|
||||
*
|
||||
* Framework manager will always add job id.
|
||||
*/
|
||||
private static String notificationBaseUrl;
|
||||
|
||||
/**
|
||||
* Set notification base URL.
|
||||
*
|
||||
* @param url Base URL
|
||||
*/
|
||||
public static void setNotificationBaseUrl(String url) {
|
||||
LOG.debug("Setting notification base URL to " + url);
|
||||
notificationBaseUrl = url;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get base notification url.
|
||||
*
|
||||
* @return String representation of the URL
|
||||
*/
|
||||
public static String getNotificationBaseUrl() {
|
||||
return notificationBaseUrl;
|
||||
}
|
||||
|
||||
static {
|
||||
MConnectionForms connectionForms = new MConnectionForms(
|
||||
FormUtils.toForms(getConnectionConfigurationClass())
|
||||
@ -319,6 +345,7 @@ public static MSubmission submit(long jobId) {
|
||||
request.setJobType(job.getType());
|
||||
request.setJobName(job.getName());
|
||||
request.setJobId(job.getPersistenceId());
|
||||
request.setNotificationUrl(notificationBaseUrl + jobId);
|
||||
|
||||
// Let's register all important jars
|
||||
// sqoop-common
|
||||
|
@ -92,6 +92,10 @@ public class SubmissionRequest {
|
||||
*/
|
||||
String outputDirectory;
|
||||
|
||||
/**
|
||||
* Optional notification URL for job progress
|
||||
*/
|
||||
String notificationUrl;
|
||||
|
||||
public SubmissionRequest() {
|
||||
this.jars = new LinkedList<String>();
|
||||
@ -210,4 +214,12 @@ public String getOutputDirectory() {
|
||||
public void setOutputDirectory(String outputDirectory) {
|
||||
this.outputDirectory = outputDirectory;
|
||||
}
|
||||
|
||||
public String getNotificationUrl() {
|
||||
return notificationUrl;
|
||||
}
|
||||
|
||||
public void setNotificationUrl(String url) {
|
||||
this.notificationUrl = url;
|
||||
}
|
||||
}
|
||||
|
@ -39,6 +39,9 @@
|
||||
* DELETE /v1/submission/action/:jid
|
||||
* Stop last submission for job with id :jid
|
||||
*
|
||||
* GET /v1/submission/notification/:jid
|
||||
* Notification endpoint to get job status outside normal interval
|
||||
*
|
||||
* Possible additions in the future: /v1/submission/history/* for history.
|
||||
*/
|
||||
public class SubmissionRequestHandler implements RequestHandler {
|
||||
@ -65,10 +68,20 @@ public JsonBean handleEvent(RequestContext ctx) {
|
||||
return handleActionEvent(ctx, urlElements[length - 1]);
|
||||
}
|
||||
|
||||
if(action.equals("notification")) {
|
||||
return handleNotification(ctx, urlElements[length - 1]);
|
||||
}
|
||||
|
||||
throw new SqoopException(ServerError.SERVER_0003,
|
||||
"Do not know what to do.");
|
||||
}
|
||||
|
||||
private JsonBean handleNotification(RequestContext ctx, String sjid) {
|
||||
logger.debug("Received notification request for job " + sjid);
|
||||
FrameworkManager.status(Long.parseLong(sjid));
|
||||
return JsonBean.EMPTY_BEAN;
|
||||
}
|
||||
|
||||
private JsonBean handleActionEvent(RequestContext ctx, String sjid) {
|
||||
long jid = Long.parseLong(sjid);
|
||||
|
||||
@ -76,6 +89,12 @@ private JsonBean handleActionEvent(RequestContext ctx, String sjid) {
|
||||
case GET:
|
||||
return submissionStatus(jid);
|
||||
case POST:
|
||||
// TODO: This should be outsourced somewhere more suitable than here
|
||||
if(FrameworkManager.getNotificationBaseUrl() == null) {
|
||||
String url = ctx.getRequest().getRequestURL().toString();
|
||||
FrameworkManager.setNotificationBaseUrl(
|
||||
url.split("v1")[0] + "/v1/submission/notification/");
|
||||
}
|
||||
return submissionSubmit(jid);
|
||||
case DELETE:
|
||||
return submissionStop(jid);
|
||||
|
@ -26,7 +26,6 @@
|
||||
import org.apache.sqoop.framework.FrameworkManager;
|
||||
import org.apache.sqoop.repository.RepositoryManager;
|
||||
|
||||
|
||||
/**
|
||||
* Initializes the Sqoop server. This listener is also responsible for
|
||||
* cleaning up any resources occupied by the server during the system shutdown.
|
||||
@ -53,9 +52,9 @@ public void contextInitialized(ServletContextEvent arg0) {
|
||||
ConnectorManager.initialize();
|
||||
FrameworkManager.initialize();
|
||||
LOG.info("Sqoop server has successfully boot up");
|
||||
} catch (RuntimeException ex) {
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Server startup failure", ex);
|
||||
throw ex;
|
||||
throw new RuntimeException("Failure in server initialization", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -169,6 +169,11 @@ public boolean submit(SubmissionRequest generalRequest) {
|
||||
configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_JOB,
|
||||
FormUtils.toJson(request.getConfigFrameworkConnection()));
|
||||
|
||||
// Set up notification URL if it's available
|
||||
if(request.getNotificationUrl() != null) {
|
||||
configuration.set("job.end.notification.url", request.getNotificationUrl());
|
||||
}
|
||||
|
||||
// Promote all required jars to the job
|
||||
StringBuilder sb = new StringBuilder();
|
||||
boolean first = true;
|
||||
|
Loading…
Reference in New Issue
Block a user