diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java index b012d23d..0cd69698 100644 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java @@ -139,6 +139,32 @@ public final class FrameworkManager { */ private static final Object submissionMutex = new Object(); + /** + * Base notification URL. + * + * Framework manager will always add job id. + */ + private static String notificationBaseUrl; + + /** + * Set notification base URL. + * + * @param url Base URL + */ + public static void setNotificationBaseUrl(String url) { + LOG.debug("Setting notification base URL to " + url); + notificationBaseUrl = url; + } + + /** + * Get base notification url. + * + * @return String representation of the URL + */ + public static String getNotificationBaseUrl() { + return notificationBaseUrl; + } + static { MConnectionForms connectionForms = new MConnectionForms( FormUtils.toForms(getConnectionConfigurationClass()) @@ -319,6 +345,7 @@ public static MSubmission submit(long jobId) { request.setJobType(job.getType()); request.setJobName(job.getName()); request.setJobId(job.getPersistenceId()); + request.setNotificationUrl(notificationBaseUrl + jobId); // Let's register all important jars // sqoop-common diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java index 8392a10d..fb6b6a94 100644 --- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java +++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java @@ -92,6 +92,10 @@ public class SubmissionRequest { */ String outputDirectory; + /** + * Optional notification URL for job progress + */ + String notificationUrl; public SubmissionRequest() { this.jars = new LinkedList(); @@ -210,4 +214,12 @@ public String getOutputDirectory() { public void setOutputDirectory(String outputDirectory) { this.outputDirectory = outputDirectory; } + + public String getNotificationUrl() { + return notificationUrl; + } + + public void setNotificationUrl(String url) { + this.notificationUrl = url; + } } diff --git a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java index e9e65511..6e541d50 100644 --- a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java @@ -39,6 +39,9 @@ * DELETE /v1/submission/action/:jid * Stop last submission for job with id :jid * + * GET /v1/submission/notification/:jid + * Notification endpoint to get job status outside normal interval + * * Possible additions in the future: /v1/submission/history/* for history. */ public class SubmissionRequestHandler implements RequestHandler { @@ -65,10 +68,20 @@ public JsonBean handleEvent(RequestContext ctx) { return handleActionEvent(ctx, urlElements[length - 1]); } + if(action.equals("notification")) { + return handleNotification(ctx, urlElements[length - 1]); + } + throw new SqoopException(ServerError.SERVER_0003, "Do not know what to do."); } + private JsonBean handleNotification(RequestContext ctx, String sjid) { + logger.debug("Received notification request for job " + sjid); + FrameworkManager.status(Long.parseLong(sjid)); + return JsonBean.EMPTY_BEAN; + } + private JsonBean handleActionEvent(RequestContext ctx, String sjid) { long jid = Long.parseLong(sjid); @@ -76,6 +89,12 @@ private JsonBean handleActionEvent(RequestContext ctx, String sjid) { case GET: return submissionStatus(jid); case POST: + // TODO: This should be outsourced somewhere more suitable than here + if(FrameworkManager.getNotificationBaseUrl() == null) { + String url = ctx.getRequest().getRequestURL().toString(); + FrameworkManager.setNotificationBaseUrl( + url.split("v1")[0] + "/v1/submission/notification/"); + } return submissionSubmit(jid); case DELETE: return submissionStop(jid); diff --git a/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java b/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java index ae0735ba..256262bb 100644 --- a/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java +++ b/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java @@ -26,7 +26,6 @@ import org.apache.sqoop.framework.FrameworkManager; import org.apache.sqoop.repository.RepositoryManager; - /** * Initializes the Sqoop server. This listener is also responsible for * cleaning up any resources occupied by the server during the system shutdown. @@ -53,9 +52,9 @@ public void contextInitialized(ServletContextEvent arg0) { ConnectorManager.initialize(); FrameworkManager.initialize(); LOG.info("Sqoop server has successfully boot up"); - } catch (RuntimeException ex) { + } catch (Exception ex) { LOG.error("Server startup failure", ex); - throw ex; + throw new RuntimeException("Failure in server initialization", ex); } } } diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index 68f21fd7..a64a4775 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -169,6 +169,11 @@ public boolean submit(SubmissionRequest generalRequest) { configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_JOB, FormUtils.toJson(request.getConfigFrameworkConnection())); + // Set up notification URL if it's available + if(request.getNotificationUrl() != null) { + configuration.set("job.end.notification.url", request.getNotificationUrl()); + } + // Promote all required jars to the job StringBuilder sb = new StringBuilder(); boolean first = true;