diff --git a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java index 33a0c3cf..e139132d 100644 --- a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java +++ b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java @@ -81,6 +81,8 @@ public class SqoopClient { /** * Status flags used when updating the submission callback status */ + //TODO(https://issues.apache.org/jira/browse/SQOOP-1652): Why do wee need a duplicate status enum in client when shell is using the server status? + // NOTE: the getStatus method is on the job resource and this needs to be revisited private enum SubmissionStatus { SUBMITTED, UPDATED, @@ -438,8 +440,8 @@ public void deleteJob(long jobId) { * @param jobId Job id * @return */ - public MSubmission startSubmission(long jobId) { - return resourceRequests.createSubmission(jobId).getSubmissions().get(0); + public MSubmission startJob(long jobId) { + return resourceRequests.startJob(jobId).getSubmissions().get(0); } /** @@ -452,24 +454,27 @@ public MSubmission startSubmission(long jobId) { * @return MSubmission - Final status of job submission * @throws InterruptedException */ - public MSubmission startSubmission(long jobId, SubmissionCallback callback, long pollTime) + public MSubmission startJob(long jobId, SubmissionCallback callback, long pollTime) throws InterruptedException { if(pollTime <= 0) { throw new SqoopException(ClientError.CLIENT_0002); } + //TODO(https://issues.apache.org/jira/browse/SQOOP-1652): address the submit/start/first terminology difference + // What does first even mean in s distributed client/server model? boolean first = true; - MSubmission submission = resourceRequests.createSubmission(jobId).getSubmissions().get(0); + MSubmission submission = resourceRequests.startJob(jobId).getSubmissions().get(0); + // what happens when the server fails, do we just say finished? while(submission.getStatus().isRunning()) { if(first) { - submissionCallback(callback, submission, SubmissionStatus.SUBMITTED); + invokeSubmissionCallback(callback, submission, SubmissionStatus.SUBMITTED); first = false; } else { - submissionCallback(callback, submission, SubmissionStatus.UPDATED); + invokeSubmissionCallback(callback, submission, SubmissionStatus.UPDATED); } Thread.sleep(pollTime); - submission = getSubmissionStatus(jobId); + submission = getJobStatus(jobId); } - submissionCallback(callback, submission, SubmissionStatus.FINISHED); + invokeSubmissionCallback(callback, submission, SubmissionStatus.FINISHED); return submission; } @@ -480,7 +485,7 @@ public MSubmission startSubmission(long jobId, SubmissionCallback callback, long * @param submission * @param status */ - private void submissionCallback(SubmissionCallback callback, MSubmission submission, + private void invokeSubmissionCallback(SubmissionCallback callback, MSubmission submission, SubmissionStatus status) { if (callback == null) { return; @@ -494,17 +499,19 @@ private void submissionCallback(SubmissionCallback callback, MSubmission submiss break; case FINISHED: callback.finished(submission); + default: + break; } } /** - * Stop job with given id. + * stop job with given id. * * @param jid Job id * @return */ - public MSubmission stopSubmission(long jid) { - return resourceRequests.deleteSubmission(jid).getSubmissions().get(0); + public MSubmission stopJob(long jid) { + return resourceRequests.stopJob(jid).getSubmissions().get(0); } /** @@ -513,8 +520,8 @@ public MSubmission stopSubmission(long jid) { * @param jid Job id * @return */ - public MSubmission getSubmissionStatus(long jid) { - return resourceRequests.readSubmission(jid).getSubmissions().get(0); + public MSubmission getJobStatus(long jid) { + return resourceRequests.getJobStatus(jid).getSubmissions().get(0); } /** @@ -523,7 +530,7 @@ public MSubmission getSubmissionStatus(long jid) { * @return */ public List getSubmissions() { - return resourceRequests.readHistory(null).getSubmissions(); + return resourceRequests.readSubmission(null).getSubmissions(); } /** @@ -533,7 +540,7 @@ public List getSubmissions() { * @return */ public List getSubmissionsForJob(long jobId) { - return resourceRequests.readHistory(jobId).getSubmissions(); + return resourceRequests.readSubmission(jobId).getSubmissions(); } private Status applyLinkValidations(ValidationResultBean bean, MLink link) { @@ -541,12 +548,13 @@ private Status applyLinkValidations(ValidationResultBean bean, MLink link) { // Apply validation results ConfigUtils.applyValidation(link.getConnectorLinkConfig().getConfigs(), linkConfig); Long id = bean.getId(); - if(id != null) { + if (id != null) { link.setPersistenceId(id); } return Status.getWorstStatus(linkConfig.getStatus()); } + private Status applyJobValidations(ValidationResultBean bean, MJob job) { ConfigValidationResult fromConfig = bean.getValidationResults()[0]; ConfigValidationResult toConfig = bean.getValidationResults()[1]; diff --git a/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java b/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java index de7211ad..e2e48608 100644 --- a/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java +++ b/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java @@ -20,8 +20,9 @@ import org.apache.sqoop.model.MSubmission; /** - * Callback interface for Synchronous job submission + * Callback interface for synchronous job submission */ +//TODO(https://issues.apache.org/jira/browse/SQOOP-1652): address the submit/start consistent usage public interface SubmissionCallback { /** diff --git a/client/src/main/java/org/apache/sqoop/client/request/JobResourceRequest.java b/client/src/main/java/org/apache/sqoop/client/request/JobResourceRequest.java index 83c08b32..55c8db22 100644 --- a/client/src/main/java/org/apache/sqoop/client/request/JobResourceRequest.java +++ b/client/src/main/java/org/apache/sqoop/client/request/JobResourceRequest.java @@ -18,14 +18,16 @@ package org.apache.sqoop.client.request; import org.apache.sqoop.json.JobBean; +import org.apache.sqoop.json.JobsBean; +import org.apache.sqoop.json.SubmissionBean; import org.apache.sqoop.json.ValidationResultBean; import org.apache.sqoop.model.MJob; import org.json.simple.JSONObject; import org.json.simple.JSONValue; /** - * Provide CRUD semantics over RESTfull HTTP API for jobs. All operations - * are normally supported. + * Provide CRUD semantics over RESTfull HTTP API for jobs. All operations are + * normally supported. */ public class JobResourceRequest extends ResourceRequest { @@ -33,18 +35,25 @@ public class JobResourceRequest extends ResourceRequest { private static final String ENABLE = "/enable"; private static final String DISABLE = "/disable"; + private static final String START = "/start"; + private static final String STOP = "/stop"; + private static final String STATUS = "/status"; - public JobBean read(String serverUrl, Long linkId) { + public JobBean read(String serverUrl, Long jobId) { String response; - if (linkId == null) { + if (jobId == null) { response = super.get(serverUrl + RESOURCE + "all"); } else { - response = super.get(serverUrl + RESOURCE + linkId); + response = super.get(serverUrl + RESOURCE + jobId); } JSONObject jsonObject = (JSONObject) JSONValue.parse(response); - JobBean jobBean = new JobBean(); - jobBean.restore(jsonObject); - return jobBean; + // defaults to all + JobBean bean = new JobsBean(); + if (jobId != null) { + bean = new JobBean(); + } + bean.restore(jsonObject); + return bean; } public ValidationResultBean create(String serverUrl, MJob job) { @@ -61,21 +70,43 @@ public ValidationResultBean update(String serverUrl, MJob job) { JobBean jobBean = new JobBean(job); // Extract all config inputs including sensitive inputs JSONObject jobJson = jobBean.extract(false); - String response = super.put(serverUrl + RESOURCE + job.getPersistenceId(), jobJson.toJSONString()); + String response = super.put(serverUrl + RESOURCE + job.getPersistenceId(), + jobJson.toJSONString()); ValidationResultBean validationBean = new ValidationResultBean(); validationBean.restore((JSONObject) JSONValue.parse(response)); return validationBean; } - public void delete(String serverUrl, Long id) { - super.delete(serverUrl + RESOURCE + id); + public void delete(String serverUrl, Long jobId) { + super.delete(serverUrl + RESOURCE + jobId); } - public void enable(String serverUrl, Long id, Boolean enabled) { + public void enable(String serverUrl, Long jobId, Boolean enabled) { if (enabled) { - super.put(serverUrl + RESOURCE + id + ENABLE, null); + super.put(serverUrl + RESOURCE + jobId + ENABLE, null); } else { - super.put(serverUrl + RESOURCE + id + DISABLE, null); + super.put(serverUrl + RESOURCE + jobId + DISABLE, null); } } + + public SubmissionBean start(String serverUrl, Long jobId) { + String response = super.put(serverUrl + RESOURCE + jobId + START, null); + return createJobSubmissionResponse(response); + } + + public SubmissionBean stop(String serverUrl, Long jobId) { + String response = super.put(serverUrl + RESOURCE + jobId + STOP, null); + return createJobSubmissionResponse(response); + } + + public SubmissionBean status(String serverUrl, Long jobId) { + String response = super.get(serverUrl + RESOURCE + jobId + STATUS); + return createJobSubmissionResponse(response); + } + + private SubmissionBean createJobSubmissionResponse(String response) { + SubmissionBean submissionBean = new SubmissionBean(); + submissionBean.restore((JSONObject) JSONValue.parse(response)); + return submissionBean; + } } diff --git a/client/src/main/java/org/apache/sqoop/client/request/SqoopResourceRequests.java b/client/src/main/java/org/apache/sqoop/client/request/SqoopResourceRequests.java index 4a56bb7a..fe528f26 100644 --- a/client/src/main/java/org/apache/sqoop/client/request/SqoopResourceRequests.java +++ b/client/src/main/java/org/apache/sqoop/client/request/SqoopResourceRequests.java @@ -17,14 +17,15 @@ */ package org.apache.sqoop.client.request; -import org.apache.sqoop.json.LinkBean; import org.apache.sqoop.json.ConnectorBean; import org.apache.sqoop.json.DriverBean; import org.apache.sqoop.json.JobBean; +import org.apache.sqoop.json.LinkBean; import org.apache.sqoop.json.SubmissionBean; +import org.apache.sqoop.json.SubmissionsBean; import org.apache.sqoop.json.ValidationResultBean; -import org.apache.sqoop.model.MLink; import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MLink; /** * Unified class for all request objects. @@ -131,19 +132,19 @@ public void deleteJob(Long jid) { getJobResourceRequest().delete(serverUrl, jid); } - public SubmissionBean readHistory(Long jid) { - return getSubmissionResourceRequest().readHistory(serverUrl, jid); + public SubmissionBean getJobStatus(Long jid) { + return getJobResourceRequest().status(serverUrl, jid); } - public SubmissionBean readSubmission(Long jid) { + public SubmissionBean startJob(Long jid) { + return getJobResourceRequest().start(serverUrl, jid); + } + + public SubmissionBean stopJob(Long jid) { + return getJobResourceRequest().stop(serverUrl, jid); + } + + public SubmissionsBean readSubmission(Long jid) { return getSubmissionResourceRequest().read(serverUrl, jid); } - - public SubmissionBean createSubmission(Long jid) { - return getSubmissionResourceRequest().create(serverUrl, jid); - } - - public SubmissionBean deleteSubmission(Long jid) { - return getSubmissionResourceRequest().delete(serverUrl, jid); - } } diff --git a/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java b/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java index 50557833..e3b70bc0 100644 --- a/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java +++ b/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java @@ -17,7 +17,7 @@ */ package org.apache.sqoop.client.request; -import org.apache.sqoop.json.SubmissionBean; +import org.apache.sqoop.json.SubmissionsBean; import org.json.simple.JSONObject; import org.json.simple.JSONValue; @@ -27,54 +27,18 @@ */ public class SubmissionResourceRequest extends ResourceRequest { - public static final String RESOURCE = "v1/submission/"; + public static final String RESOURCE = "v1/submissions/"; - public static final String ACTION = RESOURCE + "action/"; - - public static final String HISTORY = RESOURCE + "history/"; - - public SubmissionBean readHistory(String serverUrl, Long jid) { + public SubmissionsBean read(String serverUrl, Long jid) { String response; if (jid == null) { - response = super.get(serverUrl + HISTORY + "all"); + response = super.get(serverUrl + RESOURCE); } else { - response = super.get(serverUrl + HISTORY + jid); + response = super.get(serverUrl + RESOURCE + jid); } - JSONObject jsonObject = (JSONObject) JSONValue.parse(response); - - SubmissionBean submissionBean = new SubmissionBean(); + SubmissionsBean submissionBean = new SubmissionsBean(); submissionBean.restore(jsonObject); - - return submissionBean; - } - - public SubmissionBean read(String serverUrl, Long jid) { - String response = super.get(serverUrl + ACTION + jid); - - JSONObject jsonObject = (JSONObject) JSONValue.parse(response); - - SubmissionBean submissionBean = new SubmissionBean(); - submissionBean.restore(jsonObject); - - return submissionBean; - } - - public SubmissionBean create(String serverUrl, Long jid) { - String response = super.post(serverUrl + ACTION + jid, null); - - SubmissionBean submissionBean = new SubmissionBean(); - submissionBean.restore((JSONObject) JSONValue.parse(response)); - - return submissionBean; - } - - public SubmissionBean delete(String serverUrl, Long id) { - String response = super.delete(serverUrl + ACTION + id); - - SubmissionBean submissionBean = new SubmissionBean(); - submissionBean.restore((JSONObject) JSONValue.parse(response)); - return submissionBean; } } diff --git a/common/src/main/java/org/apache/sqoop/json/JobBean.java b/common/src/main/java/org/apache/sqoop/json/JobBean.java index 082d5915..efc2efcd 100644 --- a/common/src/main/java/org/apache/sqoop/json/JobBean.java +++ b/common/src/main/java/org/apache/sqoop/json/JobBean.java @@ -19,8 +19,6 @@ import static org.apache.sqoop.json.util.ConfigInputSerialization.extractConfigList; import static org.apache.sqoop.json.util.ConfigInputSerialization.restoreConfigList; -import static org.apache.sqoop.json.util.ConfigBundleSerialization.extractConfigParamBundle; -import static org.apache.sqoop.json.util.ConfigBundleSerialization.restoreConfigParamBundle; import java.util.ArrayList; import java.util.Date; @@ -39,7 +37,7 @@ import org.json.simple.JSONObject; /** - * Json representation of the job config + * Json representation of the job */ public class JobBean implements JsonBean { @@ -50,6 +48,7 @@ public class JobBean implements JsonBean { static final String FROM_CONFIG = "from-config"; static final String TO_CONFIG = "to-config"; static final String DRIVER_CONFIG = "driver-config"; + private static final String JOB = "job"; // Required private List jobs; @@ -102,9 +101,16 @@ public ResourceBundle getDriverConfigBundle() { @Override @SuppressWarnings("unchecked") public JSONObject extract(boolean skipSensitive) { - JSONArray array = new JSONArray(); + JSONArray jobArray = extractJobs(skipSensitive); + JSONObject job = new JSONObject(); + job.put(JOB, jobArray); + return job; + } - for(MJob job : jobs) { + @SuppressWarnings("unchecked") + protected JSONArray extractJobs(boolean skipSensitive) { + JSONArray jobArray = new JSONArray(); + for (MJob job : jobs) { JSONObject object = new JSONObject(); object.put(ID, job.getPersistenceId()); @@ -115,6 +121,7 @@ public JSONObject extract(boolean skipSensitive) { object.put(UPDATE_USER, job.getLastUpdateUser()); object.put(UPDATE_DATE, job.getLastUpdateDate().getTime()); // job link associated connectors + // TODO(SQOOP-1634): fix not to require the connectorIds in the post data object.put(FROM_CONNECTOR_ID, job.getConnectorId(Direction.FROM)); object.put(TO_CONNECTOR_ID, job.getConnectorId(Direction.TO)); // job associated links @@ -122,25 +129,30 @@ public JSONObject extract(boolean skipSensitive) { object.put(TO_LINK_ID, job.getLinkId(Direction.TO)); // job configs MFromConfig fromConfigList = job.getFromJobConfig(); - object.put(FROM_CONFIG, extractConfigList(fromConfigList.getConfigs(), fromConfigList.getType(), skipSensitive)); + object.put(FROM_CONFIG, + extractConfigList(fromConfigList.getConfigs(), fromConfigList.getType(), skipSensitive)); MToConfig toConfigList = job.getToJobConfig(); - object.put(TO_CONFIG, extractConfigList(toConfigList.getConfigs(), toConfigList.getType(), skipSensitive)); + object.put(TO_CONFIG, + extractConfigList(toConfigList.getConfigs(), toConfigList.getType(), skipSensitive)); MDriverConfig driverConfigList = job.getDriverConfig(); - object.put(DRIVER_CONFIG, extractConfigList(driverConfigList.getConfigs(), driverConfigList.getType(), skipSensitive)); + object.put( + DRIVER_CONFIG, + extractConfigList(driverConfigList.getConfigs(), driverConfigList.getType(), + skipSensitive)); - array.add(object); + jobArray.add(object); } - - JSONObject all = new JSONObject(); - all.put(ALL, array); - return all; + return jobArray; } @Override public void restore(JSONObject jsonObject) { - jobs = new ArrayList(); + JSONArray array = (JSONArray) jsonObject.get(JOB); + restoreJobs(array); + } - JSONArray array = (JSONArray) jsonObject.get(ALL); + protected void restoreJobs(JSONArray array) { + jobs = new ArrayList(); for (Object obj : array) { JSONObject object = (JSONObject) obj; @@ -178,4 +190,4 @@ public void restore(JSONObject jsonObject) { jobs.add(job); } } -} +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/sqoop/json/JobsBean.java b/common/src/main/java/org/apache/sqoop/json/JobsBean.java new file mode 100644 index 00000000..3c454ea4 --- /dev/null +++ b/common/src/main/java/org/apache/sqoop/json/JobsBean.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.json; + +import java.util.List; + +import org.apache.sqoop.model.MJob; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; + +/** + * Json representation of the jobs + */ +public class JobsBean extends JobBean { + + private static final String JOBS = "jobs"; + + public JobsBean(MJob job) { + super(job); + } + + public JobsBean(List jobs) { + super(jobs); + } + + // For "restore" + public JobsBean() { + } + + @Override + @SuppressWarnings("unchecked") + public JSONObject extract(boolean skipSensitive) { + JSONArray jobArray = super.extractJobs(skipSensitive); + JSONObject jobs = new JSONObject(); + jobs.put(JOBS, jobArray); + return jobs; + } + + @Override + public void restore(JSONObject jsonObject) { + JSONArray array = (JSONArray) jsonObject.get(JOBS); + restoreJobs(array); + } +} diff --git a/common/src/main/java/org/apache/sqoop/json/JsonBean.java b/common/src/main/java/org/apache/sqoop/json/JsonBean.java index ba865110..164b6047 100644 --- a/common/src/main/java/org/apache/sqoop/json/JsonBean.java +++ b/common/src/main/java/org/apache/sqoop/json/JsonBean.java @@ -25,6 +25,7 @@ public interface JsonBean { static final String CONFIGURABLE_VERSION = "version"; static final String ALL_CONFIGS = "all-configs"; + @Deprecated // should not be used anymore in the rest api static final String ALL = "all"; static final String ID = "id"; static final String NAME = "name"; diff --git a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java index 4b803380..b7bdaadf 100644 --- a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java +++ b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java @@ -17,6 +17,15 @@ */ package org.apache.sqoop.json; +import static org.apache.sqoop.json.util.SchemaSerialization.extractSchema; +import static org.apache.sqoop.json.util.SchemaSerialization.restoreSchema; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.submission.SubmissionStatus; import org.apache.sqoop.submission.counter.Counter; @@ -25,21 +34,12 @@ import org.json.simple.JSONArray; import org.json.simple.JSONObject; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.apache.sqoop.json.util.SchemaSerialization.extractSchema; -import static org.apache.sqoop.json.util.SchemaSerialization.restoreSchema; - /** * */ public class SubmissionBean implements JsonBean { - private static final String ALL = "all"; + private static final String SUBMISSION = "submission"; private static final String JOB = "job"; private static final String CREATION_USER = "creation-user"; private static final String CREATION_DATE = "creation-date"; @@ -52,8 +52,8 @@ public class SubmissionBean implements JsonBean { private static final String EXCEPTION_TRACE = "exception-trace"; private static final String PROGRESS = "progress"; private static final String COUNTERS = "counters"; - private static final String FROM_SCHEMA = "schema-from"; - private static final String TO_SCHEMA = "schema-to"; + private static final String FROM_SCHEMA = "from-schema"; + private static final String TO_SCHEMA = "to-schema"; private List submissions; @@ -80,79 +80,83 @@ public SubmissionBean() { @Override @SuppressWarnings("unchecked") public JSONObject extract(boolean skipSensitive) { - JSONArray array = new JSONArray(); + JSONArray submissionArray = extractSubmissions(); + JSONObject submission = new JSONObject(); + submission.put(SUBMISSION, submissionArray); + return submission; + } - for(MSubmission submission : this.submissions) { + @SuppressWarnings("unchecked") + protected JSONArray extractSubmissions() { + JSONArray submissionsArray = new JSONArray(); + + for (MSubmission submission : this.submissions) { JSONObject object = new JSONObject(); object.put(JOB, submission.getJobId()); object.put(STATUS, submission.getStatus().name()); object.put(PROGRESS, submission.getProgress()); - if(submission.getCreationUser() != null) { + if (submission.getCreationUser() != null) { object.put(CREATION_USER, submission.getCreationUser()); } - if(submission.getCreationDate() != null) { + if (submission.getCreationDate() != null) { object.put(CREATION_DATE, submission.getCreationDate().getTime()); } - if(submission.getLastUpdateUser() != null) { + if (submission.getLastUpdateUser() != null) { object.put(LAST_UPDATE_USER, submission.getLastUpdateUser()); } - if(submission.getLastUpdateDate() != null) { + if (submission.getLastUpdateDate() != null) { object.put(LAST_UPDATE_DATE, submission.getLastUpdateDate().getTime()); } - if(submission.getExternalId() != null) { + if (submission.getExternalId() != null) { object.put(EXTERNAL_ID, submission.getExternalId()); } - if(submission.getExternalLink() != null) { + if (submission.getExternalLink() != null) { object.put(EXTERNAL_LINK, submission.getExternalLink()); } - if(submission.getExceptionInfo() != null) { + if (submission.getExceptionInfo() != null) { object.put(EXCEPTION, submission.getExceptionInfo()); } - if(submission.getExceptionStackTrace() != null) { + if (submission.getExceptionStackTrace() != null) { object.put(EXCEPTION_TRACE, submission.getExceptionStackTrace()); } - if(submission.getCounters() != null) { + if (submission.getCounters() != null) { object.put(COUNTERS, extractCounters(submission.getCounters())); } - if(submission.getFromSchema() != null) { + if (submission.getFromSchema() != null) { object.put(FROM_SCHEMA, extractSchema(submission.getFromSchema())); } - if(submission.getToSchema() != null) { + if (submission.getToSchema() != null) { object.put(TO_SCHEMA, extractSchema(submission.getToSchema())); } - - array.add(object); + submissionsArray.add(object); } - - JSONObject all = new JSONObject(); - all.put(ALL, array); - - return all; + return submissionsArray; } @SuppressWarnings("unchecked") - public JSONObject extractCounters(Counters counters) { - JSONObject ret = new JSONObject(); - for(CounterGroup group : counters) { + private JSONObject extractCounters(Counters counters) { + JSONObject counterArray = new JSONObject(); + for (CounterGroup group : counters) { JSONObject counterGroup = new JSONObject(); - for(Counter counter : group) { + for (Counter counter : group) { counterGroup.put(counter.getName(), counter.getValue()); } - - ret.put(group.getName(), counterGroup); + counterArray.put(group.getName(), counterGroup); } - return ret; + return counterArray; } @Override public void restore(JSONObject json) { + JSONArray submissionArray = (JSONArray) json.get(SUBMISSION); + restoreSubmissions(submissionArray); + } + + protected void restoreSubmissions(JSONArray array) { this.submissions = new ArrayList(); - - JSONArray array = (JSONArray) json.get(ALL); - for (Object obj : array) { JSONObject object = (JSONObject) obj; MSubmission submission = new MSubmission(); @@ -161,38 +165,38 @@ public void restore(JSONObject json) { submission.setStatus(SubmissionStatus.valueOf((String) object.get(STATUS))); submission.setProgress((Double) object.get(PROGRESS)); - if(object.containsKey(CREATION_USER)) { + if (object.containsKey(CREATION_USER)) { submission.setCreationUser((String) object.get(CREATION_USER)); } - if(object.containsKey(CREATION_DATE)) { + if (object.containsKey(CREATION_DATE)) { submission.setCreationDate(new Date((Long) object.get(CREATION_DATE))); } - if(object.containsKey(LAST_UPDATE_USER)) { + if (object.containsKey(LAST_UPDATE_USER)) { submission.setLastUpdateUser((String) object.get(LAST_UPDATE_USER)); } - if(object.containsKey(LAST_UPDATE_DATE)) { + if (object.containsKey(LAST_UPDATE_DATE)) { submission.setLastUpdateDate(new Date((Long) object.get(LAST_UPDATE_DATE))); } - if(object.containsKey(EXTERNAL_ID)) { + if (object.containsKey(EXTERNAL_ID)) { submission.setExternalId((String) object.get(EXTERNAL_ID)); } - if(object.containsKey(EXTERNAL_LINK)) { + if (object.containsKey(EXTERNAL_LINK)) { submission.setExternalLink((String) object.get(EXTERNAL_LINK)); } - if(object.containsKey(EXCEPTION)) { + if (object.containsKey(EXCEPTION)) { submission.setExceptionInfo((String) object.get(EXCEPTION)); } - if(object.containsKey(EXCEPTION_TRACE)) { + if (object.containsKey(EXCEPTION_TRACE)) { submission.setExceptionStackTrace((String) object.get(EXCEPTION_TRACE)); } - if(object.containsKey(COUNTERS)) { + if (object.containsKey(COUNTERS)) { submission.setCounters(restoreCounters((JSONObject) object.get(COUNTERS))); } - if(object.containsKey(FROM_SCHEMA)) { + if (object.containsKey(FROM_SCHEMA)) { submission.setFromSchema(restoreSchema((JSONObject) object.get(FROM_SCHEMA))); } - if(object.containsKey(TO_SCHEMA)) { + if (object.containsKey(TO_SCHEMA)) { submission.setToSchema(restoreSchema((JSONObject) object.get(TO_SCHEMA))); } @@ -200,24 +204,20 @@ public void restore(JSONObject json) { } } + @SuppressWarnings("unchecked") public Counters restoreCounters(JSONObject object) { Set> groupSet = object.entrySet(); Counters counters = new Counters(); - for(Map.Entry groupEntry: groupSet) { - + for (Map.Entry groupEntry : groupSet) { CounterGroup group = new CounterGroup(groupEntry.getKey()); - Set> counterSet = groupEntry.getValue().entrySet(); - - for(Map.Entry counterEntry: counterSet) { + for (Map.Entry counterEntry : counterSet) { Counter counter = new Counter(counterEntry.getKey(), counterEntry.getValue()); group.addCounter(counter); } - counters.addCounterGroup(group); } - return counters; } } diff --git a/common/src/main/java/org/apache/sqoop/json/SubmissionsBean.java b/common/src/main/java/org/apache/sqoop/json/SubmissionsBean.java new file mode 100644 index 00000000..74b61797 --- /dev/null +++ b/common/src/main/java/org/apache/sqoop/json/SubmissionsBean.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.json; + +import java.util.List; + +import org.apache.sqoop.model.MSubmission; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; + +public class SubmissionsBean extends SubmissionBean { + + private static final String SUBMISSIONS = "submissions"; + + // For "extract" + public SubmissionsBean(MSubmission submission) { + super(submission); + } + + public SubmissionsBean(List submissions) { + super(submissions); + + } + + // For "restore" + public SubmissionsBean() { + } + + @Override + @SuppressWarnings("unchecked") + public JSONObject extract(boolean skipSensitive) { + JSONArray submissionsArray = super.extractSubmissions(); + JSONObject submissions = new JSONObject(); + submissions.put(SUBMISSIONS, submissionsArray); + return submissions; + } + + @Override + public void restore(JSONObject json) { + JSONArray submissionsArray = (JSONArray) json.get(SUBMISSIONS); + restoreSubmissions(submissionsArray); + } + +} diff --git a/common/src/main/java/org/apache/sqoop/model/MSubmission.java b/common/src/main/java/org/apache/sqoop/model/MSubmission.java index 7290df50..26487120 100644 --- a/common/src/main/java/org/apache/sqoop/model/MSubmission.java +++ b/common/src/main/java/org/apache/sqoop/model/MSubmission.java @@ -36,9 +36,6 @@ public class MSubmission extends MAccountableEntity { /** * Job id that this submission object belongs. * - * By transitivity of metadata structure you can get also connection and - * connector ids. - * * This property is required and will be always present. */ private long jobId; diff --git a/common/src/test/java/org/apache/sqoop/json/TestJobBean.java b/common/src/test/java/org/apache/sqoop/json/TestJobBean.java index 1fc8dbd2..923ad474 100644 --- a/common/src/test/java/org/apache/sqoop/json/TestJobBean.java +++ b/common/src/test/java/org/apache/sqoop/json/TestJobBean.java @@ -55,13 +55,13 @@ public void testSerialization() throws ParseException { // Serialize it to JSON object JobBean jobBean = new JobBean(job); - JSONObject jobJson = jobBean.extract(false); + JSONObject json = jobBean.extract(false); // "Move" it across network in text form - String jobJsonString = jobJson.toJSONString(); + String jobJsonString = json.toJSONString(); // Retrieved transferred object - JSONObject parsedJobJson = (JSONObject)JSONValue.parseWithException(jobJsonString); + JSONObject parsedJobJson = (JSONObject)JSONValue.parse(jobJsonString); JobBean parsedJobBean = new JobBean(); parsedJobBean.restore(parsedJobJson); MJob target = parsedJobBean.getJobs().get(0); diff --git a/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java b/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java index e4d50bf6..c5b8781b 100644 --- a/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java +++ b/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java @@ -457,13 +457,13 @@ private Schema getSchema() { * @return */ private MSubmission transfer(MSubmission submission) { - SubmissionBean bean = new SubmissionBean(submission); + SubmissionsBean bean = new SubmissionsBean(submission); JSONObject json = bean.extract(false); String string = json.toString(); JSONObject retrievedJson = (JSONObject) JSONValue.parse(string); - SubmissionBean retrievedBean = new SubmissionBean(); + SubmissionsBean retrievedBean = new SubmissionsBean(); retrievedBean.restore(retrievedJson); return retrievedBean.getSubmissions().get(0); @@ -476,13 +476,13 @@ private MSubmission transfer(MSubmission submission) { * @return */ private List transfer(List submissions) { - SubmissionBean bean = new SubmissionBean(submissions); + SubmissionsBean bean = new SubmissionsBean(submissions); JSONObject json = bean.extract(false); String string = json.toString(); JSONObject retrievedJson = (JSONObject) JSONValue.parse(string); - SubmissionBean retrievedBean = new SubmissionBean(); + SubmissionsBean retrievedBean = new SubmissionsBean(); retrievedBean.restore(retrievedJson); return retrievedBean.getSubmissions(); diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverError.java b/core/src/main/java/org/apache/sqoop/driver/DriverError.java index ddee2822..25a1b70f 100644 --- a/core/src/main/java/org/apache/sqoop/driver/DriverError.java +++ b/core/src/main/java/org/apache/sqoop/driver/DriverError.java @@ -22,6 +22,8 @@ /** * */ +//TODO(https://issues.apache.org/jira/browse/SQOOP-1652): why is this called Driver Error since it is used in JobManager? + public enum DriverError implements ErrorCode { DRIVER_0001("Invalid submission engine"), @@ -40,6 +42,7 @@ public enum DriverError implements ErrorCode { DRIVER_0008("Invalid combination of submission and execution engines"), + //TODO(https://issues.apache.org/jira/browse/SQOOP-1652): address the submit/start terminology difference DRIVER_0009("Job has been disabled. Cannot submit this job."), DRIVER_0010("Link for this job has been disabled. Cannot submit this job."), diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index f6447c65..3ffbaade 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -269,11 +269,11 @@ public synchronized void initialize() { LOG.info("Submission manager initialized: OK"); } - public MSubmission submit(long jobId, HttpEventContext ctx) { + public MSubmission start(long jobId, HttpEventContext ctx) { MSubmission mSubmission = createJobSubmission(ctx, jobId); JobRequest jobRequest = createJobRequest(jobId, mSubmission); - // Bootstrap job to execute + // Bootstrap job to execute in the configured execution engine prepareJob(jobRequest); // Make sure that this job id is not currently running and submit the job // only if it's not. @@ -283,14 +283,17 @@ public MSubmission submit(long jobId, HttpEventContext ctx) { if (lastSubmission != null && lastSubmission.getStatus().isRunning()) { throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId); } - // TODO(jarcec): We might need to catch all exceptions here to ensure - // that Destroyer will be executed in all cases. // NOTE: the following is a blocking call boolean success = submissionEngine.submit(jobRequest); if (!success) { - destroySubmission(jobRequest); + // TODO(jarcec): We might need to catch all exceptions here to ensure + // that Destroyer will be executed in all cases. + invokeDestroyerOnJobFailure(jobRequest); mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT); } + // persist submission record to repository. + // on failure we persist the FAILURE status, on success it is the SUCCESS + // status ( which is the default one) RepositoryManager.getInstance().getRepository().createSubmission(mSubmission); } return mSubmission; @@ -435,6 +438,7 @@ MJob getJob(long jobId) { return job; } + @SuppressWarnings({ "unchecked", "rawtypes" }) private void initializeConnector(JobRequest jobRequest, Direction direction) { Initializer initializer = getConnectorInitializer(jobRequest, direction); InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction); @@ -444,6 +448,7 @@ private void initializeConnector(JobRequest jobRequest, Direction direction) { jobRequest.getJobConfig(direction)); } + @SuppressWarnings({ "unchecked", "rawtypes" }) private Schema getSchemaForConnector(JobRequest jobRequest, Direction direction) { Initializer initializer = getConnectorInitializer(jobRequest, direction); @@ -453,6 +458,7 @@ private Schema getSchemaForConnector(JobRequest jobRequest, Direction direction) jobRequest.getJobConfig(direction)); } + @SuppressWarnings({ "unchecked", "rawtypes" }) private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) { Initializer initializer = getConnectorInitializer(jobRequest, direction); @@ -462,6 +468,7 @@ private void addConnectorInitializerJars(JobRequest jobRequest, Direction direct jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction))); } + @SuppressWarnings({ "rawtypes" }) private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) { Transferable transferable = direction.equals(Direction.FROM) ? jobRequest.getFrom() : jobRequest.getTo(); Class initializerClass = transferable.getInitializer(); @@ -494,7 +501,8 @@ void prepareJob(JobRequest request) { * Callback that will be called only if we failed to submit the job to the * remote cluster. */ - void destroySubmission(JobRequest request) { + @SuppressWarnings({ "unchecked", "rawtypes" }) + void invokeDestroyerOnJobFailure(JobRequest request) { Transferable from = request.getFrom(); Transferable to = request.getTo(); @@ -520,7 +528,6 @@ void destroySubmission(JobRequest request) { request.getConnectorContext(Direction.TO), false, request.getSummary() .getToSchema()); - // destroy submission from connector perspective fromDestroyer.destroy(fromDestroyerContext, request.getConnectorLinkConfig(Direction.FROM), request.getJobConfig(Direction.FROM)); toDestroyer.destroy(toDestroyerContext, request.getConnectorLinkConfig(Direction.TO), @@ -534,7 +541,7 @@ public MSubmission stop(long jobId, HttpEventContext ctx) { if (mSubmission == null || !mSubmission.getStatus().isRunning()) { throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId - + " is not running"); + + " is not running hence cannot stop"); } submissionEngine.stop(mSubmission.getExternalId()); @@ -554,8 +561,7 @@ public MSubmission status(long jobId) { if (mSubmission == null) { return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED); } - - // If the submission is in running state, let's update it + // If the submission isin running state, let's update it if (mSubmission.getStatus().isRunning()) { update(mSubmission); } @@ -696,4 +702,4 @@ public void run() { LOG.info("Ending submission manager update thread"); } } -} \ No newline at end of file +} diff --git a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java index 2666320f..eed79a52 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java @@ -91,7 +91,7 @@ public class JobRequest { MutableMapContext toConnectorContext; /** - * Framework context (submission specific configuration) + * Driver context (submission specific configuration) */ MutableMapContext driverContext; diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index 2aeb07eb..4fb9afdf 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -488,6 +488,20 @@ public Object doIt(Connection conn) { }); } + + /** + * {@inheritDoc} + */ + @Override + public MJob findJob(final String name) { + return (MJob) doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) { + return handler.findJob(name, conn); + } + }); + } + /** * {@inheritDoc} */ diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java index ad380d39..8aee603e 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java @@ -334,6 +334,15 @@ public abstract class JdbcRepositoryHandler { */ public abstract MJob findJob(long jobId, Connection conn); + /** + * Find job with given name in repository. + * + * @param name unique name for the job + * @param conn Connection to the repository + * @return job for a given name that is present in the repository or null if not present + */ + public abstract MJob findJob(String name, Connection conn); + /** * Get all job objects. * diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java index be053832..d4528b11 100644 --- a/core/src/main/java/org/apache/sqoop/repository/Repository.java +++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java @@ -246,6 +246,14 @@ public abstract class Repository { */ public abstract MJob findJob(long id); + /** + * Find job object with given name. + * + * @param name unique name for the job + * @return job with given name loaded from repository or null if not present + */ + public abstract MJob findJob(String name); + /** * Get all job objects. * diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index 514b5ac2..dcdc7a2d 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -1586,6 +1586,33 @@ public MJob findJob(long id, Connection conn) { } } + /** + * {@inheritDoc} + */ + @Override + public MJob findJob(String name, Connection conn) { + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(STMT_SELECT_JOB_SINGLE_BY_NAME); + stmt.setString(1, name); + + List jobs = loadJobs(stmt, conn); + + if (jobs.size() != 1) { + return null; + } + + // Return the first and only one link object + return jobs.get(0); + + } catch (SQLException ex) { + logException(ex, name); + throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex); + } finally { + closeStatements(stmt); + } + } + /** * {@inheritDoc} */ diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java index c894d063..6c5fad77 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java @@ -435,6 +435,10 @@ public final class DerbySchemaInsertUpdateDeleteSelectQuery { public static final String STMT_SELECT_JOB_SINGLE_BY_ID = STMT_SELECT_JOB + " WHERE " + COLUMN_SQB_ID + " = ?"; +// DML: Select one specific job + public static final String STMT_SELECT_JOB_SINGLE_BY_NAME = + STMT_SELECT_JOB + " WHERE " + COLUMN_SQB_NAME + " = ?"; + // DML: Select all jobs for a Connector public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE = STMT_SELECT_JOB diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java index 55479880..81308058 100644 --- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java @@ -18,6 +18,7 @@ package org.apache.sqoop.handler; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -28,16 +29,22 @@ import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.driver.Driver; +import org.apache.sqoop.driver.JobManager; import org.apache.sqoop.json.JobBean; +import org.apache.sqoop.json.JobsBean; import org.apache.sqoop.json.JsonBean; +import org.apache.sqoop.json.SubmissionBean; import org.apache.sqoop.json.ValidationResultBean; import org.apache.sqoop.model.ConfigUtils; import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MFromConfig; import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MPersistableEntity; +import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.model.MToConfig; import org.apache.sqoop.repository.Repository; import org.apache.sqoop.repository.RepositoryManager; +import org.apache.sqoop.request.HttpEventContext; import org.apache.sqoop.server.RequestContext; import org.apache.sqoop.server.RequestHandler; import org.apache.sqoop.server.common.ServerError; @@ -46,41 +53,38 @@ import org.json.simple.JSONObject; import org.json.simple.JSONValue; -/** - * Job request handler is supporting following resources: - * - * GET /v1/job/:jid - * Return details about one particular job with id :jid or about all of - * them if :jid equals to "all". - * - * POST /v1/job - * Create new job - * - * PUT /v1/job/:jid - * Update job with id :jid. - * - * PUT /v1/job/:jid/enable - * Enable job with id :jid - * - * PUT /v1/job/:jid/disable - * Disable job with id :jid - * - * DELETE /v1/job/:jid - * Remove job with id :jid - * - * Planned resources: - * - * GET /v1/job - * Get brief list of all jobs present in the system. This resource is not yet - * implemented. - */ public class JobRequestHandler implements RequestHandler { - private static final Logger LOG = - Logger.getLogger(JobRequestHandler.class); + /** enum for representing the actions supported on the job resource*/ + enum JobAction { + ENABLE("enable"), + DISABLE("disable"), + START("start"), + STOP("stop"), + ; + JobAction(String name) { + this.name = name; + } - private static final String ENABLE = "enable"; - private static final String DISABLE = "disable"; + String name; + + public static JobAction fromString(String name) { + if (name != null) { + for (JobAction action : JobAction.values()) { + if (name.equalsIgnoreCase(action.name)) { + return action; + } + } + } + return null; + } + } + + private static final Logger LOG = Logger.getLogger(JobRequestHandler.class); + + static final String JOBS_PATH = "jobs"; + static final String JOB_PATH = "job"; + static final String STATUS = "status"; public JobRequestHandler() { LOG.info("JobRequestHandler initialized"); @@ -89,208 +93,306 @@ public JobRequestHandler() { @Override public JsonBean handleEvent(RequestContext ctx) { switch (ctx.getMethod()) { - case GET: - return getJobs(ctx); - case POST: - return createUpdateJob(ctx, false); - case PUT: - if (ctx.getLastURLElement().equals(ENABLE)) { - return enableJob(ctx, true); - } else if (ctx.getLastURLElement().equals(DISABLE)) { - return enableJob(ctx, false); - } else { - return createUpdateJob(ctx, true); - } - case DELETE: - return deleteJob(ctx); + case GET: + if (STATUS.equals(ctx.getLastURLElement())) { + return getJobStatus(ctx); + } + return getJobs(ctx); + case POST: + return createUpdateJob(ctx, true); + case PUT: + JobAction action = JobAction.fromString(ctx.getLastURLElement()); + switch (action) { + case ENABLE: + return enableJob(ctx, true); + case DISABLE: + return enableJob(ctx, false); + case START: + return startJob(ctx); + case STOP: + return stopJob(ctx); + default: + return createUpdateJob(ctx, false); + } + case DELETE: + return deleteJob(ctx); } return null; } /** - * Delete job from repository. + * Delete job from repository. * - * @param ctx Context object + * @param ctx + * Context object * @return Empty bean */ private JsonBean deleteJob(RequestContext ctx) { - String sxid = ctx.getLastURLElement(); - long jid = Long.valueOf(sxid); - - AuditLoggerManager.getInstance() - .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), - "delete", "job", sxid); Repository repository = RepositoryManager.getInstance().getRepository(); - repository.deleteJob(jid); + String jobIdentifier = ctx.getLastURLElement(); + long jobId = getJobIdFromIdentifier(jobIdentifier, repository); + + AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), + ctx.getRequest().getRemoteAddr(), "delete", "job", jobIdentifier); + repository.deleteJob(jobId); return JsonBean.EMPTY_BEAN; } /** * Update or create job in repository. * - * @param ctx Context object + * @param ctx + * Context object * @return Validation bean object */ - private JsonBean createUpdateJob(RequestContext ctx, boolean update) { + private JsonBean createUpdateJob(RequestContext ctx, boolean create) { + + Repository repository = RepositoryManager.getInstance().getRepository(); String username = ctx.getUserName(); JobBean bean = new JobBean(); try { - JSONObject json = - (JSONObject) JSONValue.parse(ctx.getRequest().getReader()); + JSONObject json = (JSONObject) JSONValue.parse(ctx.getRequest().getReader()); bean.restore(json); } catch (IOException e) { - throw new SqoopException(ServerError.SERVER_0003, - "Can't read request content", e); + throw new SqoopException(ServerError.SERVER_0003, "Can't read request content", e); } // Get job object List jobs = bean.getJobs(); - if(jobs.size() != 1) { - throw new SqoopException(ServerError.SERVER_0003, - "Expected one job but got " + jobs.size()); + if (jobs.size() != 1) { + throw new SqoopException(ServerError.SERVER_0003, "Expected one job but got " + jobs.size()); } // Job object - MJob job = jobs.get(0); + MJob postedJob = jobs.get(0); // Verify that user is not trying to spoof us MFromConfig fromConfig = ConnectorManager.getInstance() - .getConnectorConfigurable(job.getConnectorId(Direction.FROM)) - .getFromConfig(); + .getConnectorConfigurable(postedJob.getConnectorId(Direction.FROM)).getFromConfig(); MToConfig toConfig = ConnectorManager.getInstance() - .getConnectorConfigurable(job.getConnectorId(Direction.TO)) - .getToConfig(); + .getConnectorConfigurable(postedJob.getConnectorId(Direction.TO)).getToConfig(); MDriverConfig driverConfig = Driver.getInstance().getDriver().getDriverConfig(); - if(!fromConfig.equals(job.getJobConfig(Direction.FROM)) - || !driverConfig.equals(job.getDriverConfig()) - || !toConfig.equals(job.getJobConfig(Direction.TO))) { - throw new SqoopException(ServerError.SERVER_0003, - "Detected incorrect config structure"); + if (!fromConfig.equals(postedJob.getJobConfig(Direction.FROM)) + || !driverConfig.equals(postedJob.getDriverConfig()) + || !toConfig.equals(postedJob.getJobConfig(Direction.TO))) { + throw new SqoopException(ServerError.SERVER_0003, "Detected incorrect config structure"); + } + + // if update get the job id from the request URI + if (!create) { + String jobIdentifier = ctx.getLastURLElement(); + // support jobName or jobId for the api + long jobId = getJobIdFromIdentifier(jobIdentifier, repository); + if (postedJob.getPersistenceId() == MPersistableEntity.PERSISTANCE_ID_DEFAULT) { + MJob existingJob = repository.findJob(jobId); + postedJob.setPersistenceId(existingJob.getPersistenceId()); + } } // Corresponding connectors for this - SqoopConnector fromConnector = ConnectorManager.getInstance().getSqoopConnector(job.getConnectorId(Direction.FROM)); - SqoopConnector toConnector = ConnectorManager.getInstance().getSqoopConnector(job.getConnectorId(Direction.TO)); + SqoopConnector fromConnector = ConnectorManager.getInstance().getSqoopConnector( + postedJob.getConnectorId(Direction.FROM)); + SqoopConnector toConnector = ConnectorManager.getInstance().getSqoopConnector( + postedJob.getConnectorId(Direction.TO)); if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) { - throw new SqoopException(ServerError.SERVER_0004, "Connector " + fromConnector.getClass().getCanonicalName() - + " does not support FROM direction."); + throw new SqoopException(ServerError.SERVER_0004, "Connector " + + fromConnector.getClass().getCanonicalName() + " does not support FROM direction."); } if (!toConnector.getSupportedDirections().contains(Direction.TO)) { - throw new SqoopException(ServerError.SERVER_0004, "Connector " + toConnector.getClass().getCanonicalName() - + " does not support TO direction."); + throw new SqoopException(ServerError.SERVER_0004, "Connector " + + toConnector.getClass().getCanonicalName() + " does not support TO direction."); } // Validate user supplied data ConfigValidationResult fromConfigValidator = ConfigUtils.validateConfigs( - job.getJobConfig(Direction.FROM).getConfigs(), - fromConnector.getJobConfigurationClass(Direction.FROM) - ); + postedJob.getJobConfig(Direction.FROM).getConfigs(), + fromConnector.getJobConfigurationClass(Direction.FROM)); ConfigValidationResult toConfigValidator = ConfigUtils.validateConfigs( - job.getJobConfig(Direction.TO).getConfigs(), - toConnector.getJobConfigurationClass(Direction.TO) - ); - ConfigValidationResult driverConfigValidator = ConfigUtils.validateConfigs( - job.getDriverConfig().getConfigs(), - Driver.getInstance().getDriverJobConfigurationClass() - ); - - Status finalStatus = Status.getWorstStatus(fromConfigValidator.getStatus(), toConfigValidator.getStatus(), driverConfigValidator.getStatus()); - + postedJob.getJobConfig(Direction.TO).getConfigs(), + toConnector.getJobConfigurationClass(Direction.TO)); + ConfigValidationResult driverConfigValidator = ConfigUtils.validateConfigs(postedJob + .getDriverConfig().getConfigs(), Driver.getInstance().getDriverJobConfigurationClass()); + Status finalStatus = Status.getWorstStatus(fromConfigValidator.getStatus(), + toConfigValidator.getStatus(), driverConfigValidator.getStatus()); // Return back validations in all cases - ValidationResultBean validationResultBean = new ValidationResultBean(fromConfigValidator, toConfigValidator); + ValidationResultBean validationResultBean = new ValidationResultBean(fromConfigValidator, + toConfigValidator); // If we're good enough let's perform the action - if(finalStatus.canProceed()) { - if(update) { - AuditLoggerManager.getInstance() - .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), - "update", "job", String.valueOf(job.getPersistenceId())); + if (finalStatus.canProceed()) { + if (create) { + AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), + ctx.getRequest().getRemoteAddr(), "create", "job", + String.valueOf(postedJob.getPersistenceId())); - job.setLastUpdateUser(username); - RepositoryManager.getInstance().getRepository().updateJob(job); + postedJob.setCreationUser(username); + postedJob.setLastUpdateUser(username); + repository.createJob(postedJob); + validationResultBean.setId(postedJob.getPersistenceId()); } else { - job.setCreationUser(username); - job.setLastUpdateUser(username); - RepositoryManager.getInstance().getRepository().createJob(job); - validationResultBean.setId(job.getPersistenceId()); + AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), + ctx.getRequest().getRemoteAddr(), "update", "job", + String.valueOf(postedJob.getPersistenceId())); - AuditLoggerManager.getInstance() - .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), - "create", "job", String.valueOf(job.getPersistenceId())); + postedJob.setLastUpdateUser(username); + repository.updateJob(postedJob); } - } - return validationResultBean; } private JsonBean getJobs(RequestContext ctx) { - String sjid = ctx.getLastURLElement(); - JobBean bean; - - AuditLoggerManager.getInstance() - .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), - "get", "job", sjid); - + String identifier = ctx.getLastURLElement(); + JobBean jobBean; Locale locale = ctx.getAcceptLanguageHeader(); Repository repository = RepositoryManager.getInstance().getRepository(); - if (sjid.equals(JsonBean.ALL)) { - - List jobs = repository.findJobs(); - bean = new JobBean(jobs); - - // Add associated resources into the bean - for( MJob job : jobs) { - long fromConnectorId = job.getConnectorId(Direction.FROM); - long toConnectorId = job.getConnectorId(Direction.TO); - if(!bean.hasConnectorConfigBundle(fromConnectorId)) { - bean.addConnectorConfigBundle(fromConnectorId, - ConnectorManager.getInstance().getResourceBundle(fromConnectorId, locale)); - } - if(!bean.hasConnectorConfigBundle(toConnectorId)) { - bean.addConnectorConfigBundle(toConnectorId, - ConnectorManager.getInstance().getResourceBundle(toConnectorId, locale)); - } + // jobs by connector + if (ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM) != null) { + identifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM); + AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), + ctx.getRequest().getRemoteAddr(), "get", "jobsByConnector", identifier); + if (repository.findConnector(identifier) != null) { + long connectorId = repository.findConnector(identifier).getPersistenceId(); + jobBean = createJobsBean(repository.findJobsForConnector(connectorId), locale); + } else { + // this means name nor Id existed + throw new SqoopException(ServerError.SERVER_0005, "Invalid connector: " + identifier + + " name for jobs given"); } + } else + // all jobs in the system + if (ctx.getPath().contains(JOBS_PATH) + || (ctx.getPath().contains(JOB_PATH) && identifier.equals("all"))) { + AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), + ctx.getRequest().getRemoteAddr(), "get", "jobs", "all"); + jobBean = createJobsBean(repository.findJobs(), locale); + } + // job by Id + else { + AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), + ctx.getRequest().getRemoteAddr(), "get", "job", identifier); + + long jobId = getJobIdFromIdentifier(identifier, repository); + List jobList = new ArrayList(); + // a list of single element + jobList.add(repository.findJob(jobId)); + jobBean = createJobBean(jobList, locale); + } + return jobBean; + } + + private long getJobIdFromIdentifier(String identifier, Repository repository) { + // support jobName or jobId for the api + // NOTE: jobId is a fallback for older sqoop clients if any, since we want to primarily use unique jobNames + long jobId; + if (repository.findJob(identifier) != null) { + jobId = repository.findJob(identifier).getPersistenceId(); } else { - long jid = Long.valueOf(sjid); - - MJob job = repository.findJob(jid); - long fromConnectorId = job.getConnectorId(Direction.FROM); - long toConnectorId = job.getConnectorId(Direction.TO); - bean = new JobBean(job); - if(!bean.hasConnectorConfigBundle(fromConnectorId)) { - bean.addConnectorConfigBundle(fromConnectorId, - ConnectorManager.getInstance().getResourceBundle(fromConnectorId, locale)); - } - if(!bean.hasConnectorConfigBundle(toConnectorId)) { - bean.addConnectorConfigBundle(toConnectorId, - ConnectorManager.getInstance().getResourceBundle(toConnectorId, locale)); + try { + jobId = Long.valueOf(identifier); + } catch (NumberFormatException ex) { + // this means name nor Id existed and we want to throw a user friendly message than a number format exception + throw new SqoopException(ServerError.SERVER_0005, "Invalid job: " + identifier + + " requested"); } } + return jobId; + } - // set driver config bundle - bean.setDriverConfigBundle(Driver.getInstance().getBundle(locale)); - return bean; + private JobBean createJobBean(List jobs, Locale locale) { + JobBean jobBean = new JobBean(jobs); + addJob(jobs, locale, jobBean); + return jobBean; + } + + private JobsBean createJobsBean(List jobs, Locale locale) { + JobsBean jobsBean = new JobsBean(jobs); + addJob(jobs, locale, jobsBean); + return jobsBean; + } + + private void addJob(List jobs, Locale locale, JobBean bean) { + // Add associated resources into the bean + for (MJob job : jobs) { + long fromConnectorId = job.getConnectorId(Direction.FROM); + long toConnectorId = job.getConnectorId(Direction.TO); + // replace it only if it does not already exist + if (!bean.hasConnectorConfigBundle(fromConnectorId)) { + bean.addConnectorConfigBundle(fromConnectorId, ConnectorManager.getInstance() + .getResourceBundle(fromConnectorId, locale)); + } + if (!bean.hasConnectorConfigBundle(toConnectorId)) { + bean.addConnectorConfigBundle(toConnectorId, ConnectorManager.getInstance() + .getResourceBundle(toConnectorId, locale)); + } + } } private JsonBean enableJob(RequestContext ctx, boolean enabled) { - String[] elements = ctx.getUrlElements(); - String sjid = elements[elements.length - 2]; - long xid = Long.valueOf(sjid); - Repository repository = RepositoryManager.getInstance().getRepository(); - repository.enableJob(xid, enabled); + String[] elements = ctx.getUrlElements(); + String jobIdentifier = elements[elements.length - 2]; + long jobId = getJobIdFromIdentifier(jobIdentifier, repository); + repository.enableJob(jobId, enabled); return JsonBean.EMPTY_BEAN; } + + private JsonBean startJob(RequestContext ctx) { + Repository repository = RepositoryManager.getInstance().getRepository(); + String[] elements = ctx.getUrlElements(); + String jobIdentifier = elements[elements.length - 2]; + long jobId = getJobIdFromIdentifier(jobIdentifier, repository); + AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), + ctx.getRequest().getRemoteAddr(), "submit", "job", String.valueOf(jobId)); + // TODO(SQOOP-1638): This should be outsourced somewhere more suitable than + // here + if (JobManager.getInstance().getNotificationBaseUrl() == null) { + String url = ctx.getRequest().getRequestURL().toString(); + JobManager.getInstance().setNotificationBaseUrl( + url.split("v1")[0] + "/v1/job/status/notification/"); + } + + MSubmission submission = JobManager.getInstance() + .start(jobId, prepareRequestEventContext(ctx)); + return new SubmissionBean(submission); + } + + private JsonBean stopJob(RequestContext ctx) { + Repository repository = RepositoryManager.getInstance().getRepository(); + String[] elements = ctx.getUrlElements(); + String jobIdentifier = elements[elements.length - 2]; + long jobId = getJobIdFromIdentifier(jobIdentifier, repository); + AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), + ctx.getRequest().getRemoteAddr(), "stop", "job", String.valueOf(jobId)); + MSubmission submission = JobManager.getInstance().stop(jobId, prepareRequestEventContext(ctx)); + return new SubmissionBean(submission); + } + + private JsonBean getJobStatus(RequestContext ctx) { + Repository repository = RepositoryManager.getInstance().getRepository(); + String[] elements = ctx.getUrlElements(); + String jobIdentifier = elements[elements.length - 2]; + long jobId = getJobIdFromIdentifier(jobIdentifier, repository); + AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), + ctx.getRequest().getRemoteAddr(), "status", "job", String.valueOf(jobId)); + MSubmission submission = JobManager.getInstance().status(jobId); + return new SubmissionBean(submission); + } + + private HttpEventContext prepareRequestEventContext(RequestContext ctx) { + HttpEventContext httpEventContext = new HttpEventContext(); + httpEventContext.setUsername(ctx.getUserName()); + return httpEventContext; + } + } diff --git a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java index 8555b0cf..cfbb5245 100644 --- a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java @@ -22,37 +22,19 @@ import org.apache.log4j.Logger; import org.apache.sqoop.audit.AuditLoggerManager; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.driver.JobManager; import org.apache.sqoop.json.JsonBean; -import org.apache.sqoop.json.SubmissionBean; +import org.apache.sqoop.json.SubmissionsBean; import org.apache.sqoop.model.MSubmission; +import org.apache.sqoop.repository.Repository; import org.apache.sqoop.repository.RepositoryManager; -import org.apache.sqoop.request.HttpEventContext; import org.apache.sqoop.server.RequestContext; +import org.apache.sqoop.server.RequestContext.Method; import org.apache.sqoop.server.RequestHandler; import org.apache.sqoop.server.common.ServerError; -/** - * Submission request handler is supporting following resources: - * - * GET /v1/submission/action/:jid - * Get status of last submission for job with id :jid - * - * POST /v1/submission/action/:jid - * Create new submission for job with id :jid - * - * DELETE /v1/submission/action/:jid - * Stop last submission for job with id :jid - * - * GET /v1/submission/notification/:jid - * Notification endpoint to get job status outside normal interval - * - * Possible additions in the future: /v1/submission/history/* for history. - */ public class SubmissionRequestHandler implements RequestHandler { - private static final Logger LOG = - Logger.getLogger(SubmissionRequestHandler.class); + private static final Logger LOG = Logger.getLogger(SubmissionRequestHandler.class); public SubmissionRequestHandler() { LOG.info("SubmissionRequestHandler initialized"); @@ -60,110 +42,45 @@ public SubmissionRequestHandler() { @Override public JsonBean handleEvent(RequestContext ctx) { - String[] urlElements = ctx.getUrlElements(); - if (urlElements.length < 2) { - throw new SqoopException(ServerError.SERVER_0003, - "Invalid URL, too few arguments for this servlet."); + + // submission only support GET requests + if (ctx.getMethod() != Method.GET) { + throw new SqoopException(ServerError.SERVER_0002, "Unsupported HTTP method for connector:" + + ctx.getMethod()); } - - // Let's check - int length = urlElements.length; - String action = urlElements[length - 2]; - - if(action.equals("action")) { - return handleActionEvent(ctx, urlElements[length - 1]); - } - - if(action.equals("notification")) { - return handleNotification(ctx, urlElements[length - 1]); - } - - if(action.equals("history")) { - return handleHistoryEvent(ctx, urlElements[length - 1]); - } - - throw new SqoopException(ServerError.SERVER_0003, - "Do not know what to do."); - } - - private JsonBean handleNotification(RequestContext ctx, String sjid) { - LOG.debug("Received notification request for job " + sjid); - JobManager.getInstance().status(Long.parseLong(sjid)); - return JsonBean.EMPTY_BEAN; - } - - private JsonBean handleActionEvent(RequestContext ctx, String sjid) { - long jid = Long.parseLong(sjid); - - String username = ctx.getUserName(); - HttpEventContext ectx = new HttpEventContext(); - ectx.setUsername(username); - - switch (ctx.getMethod()) { - case GET: - AuditLoggerManager.getInstance() - .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), - "status", "submission", String.valueOf(jid)); - - return submissionStatus(jid); - case POST: - // TODO: This should be outsourced somewhere more suitable than here - if(JobManager.getInstance().getNotificationBaseUrl() == null) { - String url = ctx.getRequest().getRequestURL().toString(); - JobManager.getInstance().setNotificationBaseUrl( - url.split("v1")[0] + "/v1/submission/notification/"); - } - - AuditLoggerManager.getInstance() - .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), - "submit", "submission", String.valueOf(jid)); - - return submissionSubmit(jid, ectx); - case DELETE: - AuditLoggerManager.getInstance() - .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), - "stop", "submission", String.valueOf(jid)); - - return submissionStop(jid, ectx); - } - - return null; - } - - private JsonBean handleHistoryEvent(RequestContext ctx, String sjid) { - AuditLoggerManager.getInstance() - .logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), - "get", "submission", sjid); - - if (sjid.equals("all")) { - return getSubmissions(); + String identifier = ctx.getLastURLElement(); + Repository repository = RepositoryManager.getInstance().getRepository(); + // links by connector ordered by updated time + // hence the latest submission is on the top + if (ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM) != null) { + identifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM); + AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), + ctx.getRequest().getRemoteAddr(), "get", "submissionsByJob", identifier); + if (repository.findJob(identifier) != null) { + long jobId = repository.findJob(identifier).getPersistenceId(); + return getSubmissionsForJob(jobId); + } else { + // this means name nor Id existed + throw new SqoopException(ServerError.SERVER_0005, "Invalid job: " + identifier + + " name given"); + } } else { - return getSubmissionsForJob(Long.parseLong(sjid)); + // all submissions in the system + AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), + ctx.getRequest().getRemoteAddr(), "get", "submissions", "all"); + return getSubmissions(); } } - private JsonBean submissionStop(long jid, HttpEventContext ctx) { - MSubmission submission = JobManager.getInstance().stop(jid, ctx); - return new SubmissionBean(submission); - } - - private JsonBean submissionSubmit(long jid, HttpEventContext ctx) { - MSubmission submission = JobManager.getInstance().submit(jid, ctx); - return new SubmissionBean(submission); - } - - private JsonBean submissionStatus(long jid) { - MSubmission submission = JobManager.getInstance().status(jid); - return new SubmissionBean(submission); - } - private JsonBean getSubmissions() { - List submissions = RepositoryManager.getInstance().getRepository().findSubmissions(); - return new SubmissionBean(submissions); + List submissions = RepositoryManager.getInstance().getRepository() + .findSubmissions(); + return new SubmissionsBean(submissions); } private JsonBean getSubmissionsForJob(long jid) { - List submissions = RepositoryManager.getInstance().getRepository().findSubmissionsForJob(jid); - return new SubmissionBean(submissions); + List submissions = RepositoryManager.getInstance().getRepository() + .findSubmissionsForJob(jid); + return new SubmissionsBean(submissions); } } diff --git a/server/src/main/java/org/apache/sqoop/server/v1/JobServlet.java b/server/src/main/java/org/apache/sqoop/server/v1/JobServlet.java index d2952373..0d15d0a8 100644 --- a/server/src/main/java/org/apache/sqoop/server/v1/JobServlet.java +++ b/server/src/main/java/org/apache/sqoop/server/v1/JobServlet.java @@ -23,7 +23,53 @@ import org.apache.sqoop.server.RequestHandler; import org.apache.sqoop.server.SqoopProtocolServlet; + /** + * Provides operations for job resource + * + * GET /v1/job/{jid} + * Return details about one particular job with id:jid or about all of + * them if jid equals to "all" + * + * POST /v1/job + * Create new job + * POST /v1/job/ with {from-link-id}, {to-link-id} and other job details in the post data + * Create job with from and to link + * PUT /v1/link/ with {from-link-id}, {to-link-id} and other job details in the post data + * Edit/Update job for the from and to link + * + * PUT /v1/job/{jid} and the job details in the post data + * Update job with id jid. + * + * PUT /v1/job/{jid}/enable + * Enable job with id jid + * PUT /v1/job/{jname}s/disable + * Enable job with name jname + * + * PUT /v1/job/{jid}/disable + * Disable job with id jid + * PUT /v1/job/{jname}/disable + * Disable job with name jname + * + * DELETE /v1/job/{jid} + * Remove job with id jid + * DELETE /v1/job/{jname} + * Remove job with name jname + * + * PUT /v1/job/{jid}/submit + * Submit job with id jid to create a submission record + * PUT /v1/job/{jname}/submit + * Submit job with name jname to create a submission record + * + * PUT /v1/job/{jid}/stop + * Abort/Stop last running associated submission with job id jid + * PUT /v1/job/{jname}/stop + * Abort/Stop last running associated submission with job name jname + * + * GET /v1/job/{jid}/status + * get status of running job with job id jid + * GET /v1/job/{jname}/status + * get status of running job with job name jname * */ @SuppressWarnings("serial") diff --git a/server/src/main/java/org/apache/sqoop/server/v1/JobsServlet.java b/server/src/main/java/org/apache/sqoop/server/v1/JobsServlet.java new file mode 100644 index 00000000..5184a0bd --- /dev/null +++ b/server/src/main/java/org/apache/sqoop/server/v1/JobsServlet.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.server.v1; + +import org.apache.sqoop.handler.JobRequestHandler; +import org.apache.sqoop.json.JsonBean; +import org.apache.sqoop.server.RequestContext; +import org.apache.sqoop.server.RequestHandler; +import org.apache.sqoop.server.SqoopProtocolServlet; + + +/** + * Displays all or jobs per connector in sqoop + * + * GET /v1/jobs + * Return details about every jobs that exists in the sqoop system + * GET /v1/jobs?cname= + * Return details about job(s) for a given connector name {cname} +*/ +@SuppressWarnings("serial") +public class JobsServlet extends SqoopProtocolServlet { + + private RequestHandler jobRequestHandler; + + public JobsServlet() { + jobRequestHandler = new JobRequestHandler(); + } + + @Override + protected JsonBean handleGetRequest(RequestContext ctx) throws Exception { + return jobRequestHandler.handleEvent(ctx); + } +} diff --git a/server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java b/server/src/main/java/org/apache/sqoop/server/v1/SubmissionsServlet.java similarity index 78% rename from server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java rename to server/src/main/java/org/apache/sqoop/server/v1/SubmissionsServlet.java index 5c1d8832..5337fdde 100644 --- a/server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java +++ b/server/src/main/java/org/apache/sqoop/server/v1/SubmissionsServlet.java @@ -24,14 +24,21 @@ import org.apache.sqoop.server.SqoopProtocolServlet; /** + * Display job submissions in the sqoop system + * + * GET /v1/submissions + * Get all submissions in the system + * + * GET /v1/submissions?jname= + * Get all submissions for the given job ordered by updated time * */ @SuppressWarnings("serial") -public class SubmissionServlet extends SqoopProtocolServlet { +public class SubmissionsServlet extends SqoopProtocolServlet { private RequestHandler submissionRequestHandler; - public SubmissionServlet() { + public SubmissionsServlet() { submissionRequestHandler = new SubmissionRequestHandler(); } @@ -39,14 +46,4 @@ public SubmissionServlet() { protected JsonBean handleGetRequest(RequestContext ctx) throws Exception { return submissionRequestHandler.handleEvent(ctx); } - - @Override - protected JsonBean handlePostRequest(RequestContext ctx) throws Exception { - return submissionRequestHandler.handleEvent(ctx); - } - - @Override - protected JsonBean handleDeleteRequest(RequestContext ctx) throws Exception { - return submissionRequestHandler.handleEvent(ctx); - } } diff --git a/server/src/main/webapp/WEB-INF/web.xml b/server/src/main/webapp/WEB-INF/web.xml index 6ad90d2f..d405c883 100644 --- a/server/src/main/webapp/WEB-INF/web.xml +++ b/server/src/main/webapp/WEB-INF/web.xml @@ -123,16 +123,28 @@ limitations under the License. /v1/job/* - + - v1.SubmissionServlet - org.apache.sqoop.server.v1.SubmissionServlet + v1.JobsServlet + org.apache.sqoop.server.v1.JobsServlet 1 - v1.SubmissionServlet - /v1/submission/* + v1.JobsServlet + /v1/jobs/* + + + + + v1.SubmissionsServlet + org.apache.sqoop.server.v1.SubmissionsServlet + 1 + + + + v1.SubmissionsServlet + /v1/submissions/* diff --git a/shell/src/main/java/org/apache/sqoop/shell/StatusJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/ShowJobStatusFunction.java similarity index 92% rename from shell/src/main/java/org/apache/sqoop/shell/StatusJobFunction.java rename to shell/src/main/java/org/apache/sqoop/shell/ShowJobStatusFunction.java index 922977af..8e57d338 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/StatusJobFunction.java +++ b/shell/src/main/java/org/apache/sqoop/shell/ShowJobStatusFunction.java @@ -29,10 +29,10 @@ import org.apache.sqoop.validation.Status; @SuppressWarnings("serial") -public class StatusJobFunction extends SqoopFunction { +public class ShowJobStatusFunction extends SqoopFunction { @SuppressWarnings("static-access") - public StatusJobFunction() { + public ShowJobStatusFunction() { this.addOption(OptionBuilder.hasArg().withArgName(Constants.OPT_JID) .withDescription(resourceString(Constants.RES_PROMPT_JOB_ID)) .withLongOpt(Constants.OPT_JID) @@ -42,7 +42,7 @@ public StatusJobFunction() { @Override public Object executeFunction(CommandLine line, boolean isInteractive) { if (line.hasOption(Constants.OPT_JID)) { - MSubmission submission = client.getSubmissionStatus(getLong(line, Constants.OPT_JID)); + MSubmission submission = client.getJobStatus(getLong(line, Constants.OPT_JID)); if(submission.getStatus().isFailure() || submission.getStatus().equals(SubmissionStatus.SUCCEEDED)) { SubmissionDisplayer.displayHeader(submission); SubmissionDisplayer.displayFooter(submission); diff --git a/shell/src/main/java/org/apache/sqoop/shell/StartJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/StartJobFunction.java index 0dc4e781..dd61d7a1 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/StartJobFunction.java +++ b/shell/src/main/java/org/apache/sqoop/shell/StartJobFunction.java @@ -72,12 +72,12 @@ public void finished(MSubmission submission) { }; try { - client.startSubmission(getLong(line, Constants.OPT_JID), callback, pollTimeout); + client.startJob(getLong(line, Constants.OPT_JID), callback, pollTimeout); } catch (InterruptedException e) { throw new SqoopException(ShellError.SHELL_0007, e); } } else if (line.hasOption(Constants.OPT_JID)) { - MSubmission submission = client.startSubmission(getLong(line, Constants.OPT_JID)); + MSubmission submission = client.startJob(getLong(line, Constants.OPT_JID)); if(submission.getStatus().isFailure()) { SubmissionDisplayer.displayFooter(submission); } else { diff --git a/shell/src/main/java/org/apache/sqoop/shell/StatusCommand.java b/shell/src/main/java/org/apache/sqoop/shell/StatusCommand.java index 3447a87c..5b4ef1fa 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/StatusCommand.java +++ b/shell/src/main/java/org/apache/sqoop/shell/StatusCommand.java @@ -28,7 +28,7 @@ protected StatusCommand(Shell shell) { Constants.CMD_STATUS, Constants.CMD_STATUS_SC, new ImmutableMap.Builder>() - .put(Constants.FN_JOB, StatusJobFunction.class) + .put(Constants.FN_JOB, ShowJobStatusFunction.class) .build() ); } diff --git a/shell/src/main/java/org/apache/sqoop/shell/StopJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/StopJobFunction.java index c34152e3..2b28b3b7 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/StopJobFunction.java +++ b/shell/src/main/java/org/apache/sqoop/shell/StopJobFunction.java @@ -40,7 +40,7 @@ public StopJobFunction() { @Override public Object executeFunction(CommandLine line, boolean isInteractive) { if (line.hasOption(Constants.OPT_JID)) { - MSubmission submission = client.stopSubmission(getLong(line, Constants.OPT_JID)); + MSubmission submission = client.stopJob(getLong(line, Constants.OPT_JID)); if(submission.getStatus().isFailure()) { SubmissionDisplayer.displayFooter(submission); } else { diff --git a/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java index 79a97944..02e8c8c4 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java +++ b/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java @@ -61,13 +61,13 @@ private Status updateJob(Long jobId, List args, boolean isInteractive) t ConsoleReader reader = new ConsoleReader(); + // TODO(SQOOP-1634): using from/to and driver config id, this call can be avoided MJob job = client.getJob(jobId); ResourceBundle fromConnectorBundle = client.getConnectorConfigBundle( job.getConnectorId(Direction.FROM)); ResourceBundle toConnectorBundle = client.getConnectorConfigBundle( job.getConnectorId(Direction.TO)); - ResourceBundle driverConfigBundle = client.getDriverConfigBundle(); Status status = Status.OK; diff --git a/shell/src/main/java/org/apache/sqoop/shell/UpdateLinkFunction.java b/shell/src/main/java/org/apache/sqoop/shell/UpdateLinkFunction.java index c3775add..c0c47e14 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/UpdateLinkFunction.java +++ b/shell/src/main/java/org/apache/sqoop/shell/UpdateLinkFunction.java @@ -104,4 +104,4 @@ private Status updateLink(Long linkId, List args, boolean isInteractive) return status; } -} \ No newline at end of file +} diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java index c81364b7..e9c15a12 100644 --- a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java +++ b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java @@ -242,7 +242,7 @@ protected void saveJob(MJob job) { * @throws Exception */ protected void executeJob(long jid) throws Exception { - getClient().startSubmission(jid, DEFAULT_SUBMISSION_CALLBACKS, 100); + getClient().startJob(jid, DEFAULT_SUBMISSION_CALLBACKS, 100); } /** diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java index 36f74431..e482ac59 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java @@ -101,14 +101,14 @@ public void testColumns() throws Exception { fillHdfsToConfig(job, ToFormat.TEXT_FILE); saveJob(job); - MSubmission submission = getClient().startSubmission(job.getPersistenceId()); + MSubmission submission = getClient().startJob(job.getPersistenceId()); assertTrue(submission.getStatus().isRunning()); // Wait until the job finish - this active waiting will be removed once // Sqoop client API will get blocking support. do { Thread.sleep(5000); - submission = getClient().getSubmissionStatus(job.getPersistenceId()); + submission = getClient().getJobStatus(job.getPersistenceId()); } while(submission.getStatus().isRunning()); // Assert correct output diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java index c219e684..53dac1b5 100644 --- a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java +++ b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java @@ -34,7 +34,7 @@ import org.apache.sqoop.json.JobBean; import org.apache.sqoop.json.JsonBean; import org.apache.sqoop.json.LinkBean; -import org.apache.sqoop.json.SubmissionBean; +import org.apache.sqoop.json.SubmissionsBean; import org.apache.sqoop.repository.Repository; import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.tools.ConfiguredTool; @@ -110,7 +110,7 @@ private JSONObject dump(boolean skipSensitive) { result.put(JSONConstants.JOBS, addConnectorName(jobs.extract(skipSensitive))); LOG.info("Dumping Submissions with skipSensitive=" + String.valueOf(skipSensitive)); - SubmissionBean submissions = new SubmissionBean(repository.findSubmissions()); + SubmissionsBean submissions = new SubmissionsBean(repository.findSubmissions()); result.put(JSONConstants.SUBMISSIONS, submissions.extract(skipSensitive)); result.put(JSONConstants.METADATA, repoMetadata(skipSensitive));