5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-08 01:11:29 +08:00

SQOOP-1510: Sqoop2: Refactor JobRequestHandler for submit/abort job and SubmissionHandler for get operation only

(Veena Basavaraj via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2014-11-03 21:23:18 -08:00
parent f62e161dc2
commit fedc12a65e
36 changed files with 815 additions and 489 deletions

View File

@ -81,6 +81,8 @@ public class SqoopClient {
/** /**
* Status flags used when updating the submission callback status * Status flags used when updating the submission callback status
*/ */
//TODO(https://issues.apache.org/jira/browse/SQOOP-1652): Why do wee need a duplicate status enum in client when shell is using the server status?
// NOTE: the getStatus method is on the job resource and this needs to be revisited
private enum SubmissionStatus { private enum SubmissionStatus {
SUBMITTED, SUBMITTED,
UPDATED, UPDATED,
@ -438,8 +440,8 @@ public void deleteJob(long jobId) {
* @param jobId Job id * @param jobId Job id
* @return * @return
*/ */
public MSubmission startSubmission(long jobId) { public MSubmission startJob(long jobId) {
return resourceRequests.createSubmission(jobId).getSubmissions().get(0); return resourceRequests.startJob(jobId).getSubmissions().get(0);
} }
/** /**
@ -452,24 +454,27 @@ public MSubmission startSubmission(long jobId) {
* @return MSubmission - Final status of job submission * @return MSubmission - Final status of job submission
* @throws InterruptedException * @throws InterruptedException
*/ */
public MSubmission startSubmission(long jobId, SubmissionCallback callback, long pollTime) public MSubmission startJob(long jobId, SubmissionCallback callback, long pollTime)
throws InterruptedException { throws InterruptedException {
if(pollTime <= 0) { if(pollTime <= 0) {
throw new SqoopException(ClientError.CLIENT_0002); throw new SqoopException(ClientError.CLIENT_0002);
} }
//TODO(https://issues.apache.org/jira/browse/SQOOP-1652): address the submit/start/first terminology difference
// What does first even mean in s distributed client/server model?
boolean first = true; boolean first = true;
MSubmission submission = resourceRequests.createSubmission(jobId).getSubmissions().get(0); MSubmission submission = resourceRequests.startJob(jobId).getSubmissions().get(0);
// what happens when the server fails, do we just say finished?
while(submission.getStatus().isRunning()) { while(submission.getStatus().isRunning()) {
if(first) { if(first) {
submissionCallback(callback, submission, SubmissionStatus.SUBMITTED); invokeSubmissionCallback(callback, submission, SubmissionStatus.SUBMITTED);
first = false; first = false;
} else { } else {
submissionCallback(callback, submission, SubmissionStatus.UPDATED); invokeSubmissionCallback(callback, submission, SubmissionStatus.UPDATED);
} }
Thread.sleep(pollTime); Thread.sleep(pollTime);
submission = getSubmissionStatus(jobId); submission = getJobStatus(jobId);
} }
submissionCallback(callback, submission, SubmissionStatus.FINISHED); invokeSubmissionCallback(callback, submission, SubmissionStatus.FINISHED);
return submission; return submission;
} }
@ -480,7 +485,7 @@ public MSubmission startSubmission(long jobId, SubmissionCallback callback, long
* @param submission * @param submission
* @param status * @param status
*/ */
private void submissionCallback(SubmissionCallback callback, MSubmission submission, private void invokeSubmissionCallback(SubmissionCallback callback, MSubmission submission,
SubmissionStatus status) { SubmissionStatus status) {
if (callback == null) { if (callback == null) {
return; return;
@ -494,17 +499,19 @@ private void submissionCallback(SubmissionCallback callback, MSubmission submiss
break; break;
case FINISHED: case FINISHED:
callback.finished(submission); callback.finished(submission);
default:
break;
} }
} }
/** /**
* Stop job with given id. * stop job with given id.
* *
* @param jid Job id * @param jid Job id
* @return * @return
*/ */
public MSubmission stopSubmission(long jid) { public MSubmission stopJob(long jid) {
return resourceRequests.deleteSubmission(jid).getSubmissions().get(0); return resourceRequests.stopJob(jid).getSubmissions().get(0);
} }
/** /**
@ -513,8 +520,8 @@ public MSubmission stopSubmission(long jid) {
* @param jid Job id * @param jid Job id
* @return * @return
*/ */
public MSubmission getSubmissionStatus(long jid) { public MSubmission getJobStatus(long jid) {
return resourceRequests.readSubmission(jid).getSubmissions().get(0); return resourceRequests.getJobStatus(jid).getSubmissions().get(0);
} }
/** /**
@ -523,7 +530,7 @@ public MSubmission getSubmissionStatus(long jid) {
* @return * @return
*/ */
public List<MSubmission> getSubmissions() { public List<MSubmission> getSubmissions() {
return resourceRequests.readHistory(null).getSubmissions(); return resourceRequests.readSubmission(null).getSubmissions();
} }
/** /**
@ -533,7 +540,7 @@ public List<MSubmission> getSubmissions() {
* @return * @return
*/ */
public List<MSubmission> getSubmissionsForJob(long jobId) { public List<MSubmission> getSubmissionsForJob(long jobId) {
return resourceRequests.readHistory(jobId).getSubmissions(); return resourceRequests.readSubmission(jobId).getSubmissions();
} }
private Status applyLinkValidations(ValidationResultBean bean, MLink link) { private Status applyLinkValidations(ValidationResultBean bean, MLink link) {
@ -541,12 +548,13 @@ private Status applyLinkValidations(ValidationResultBean bean, MLink link) {
// Apply validation results // Apply validation results
ConfigUtils.applyValidation(link.getConnectorLinkConfig().getConfigs(), linkConfig); ConfigUtils.applyValidation(link.getConnectorLinkConfig().getConfigs(), linkConfig);
Long id = bean.getId(); Long id = bean.getId();
if(id != null) { if (id != null) {
link.setPersistenceId(id); link.setPersistenceId(id);
} }
return Status.getWorstStatus(linkConfig.getStatus()); return Status.getWorstStatus(linkConfig.getStatus());
} }
private Status applyJobValidations(ValidationResultBean bean, MJob job) { private Status applyJobValidations(ValidationResultBean bean, MJob job) {
ConfigValidationResult fromConfig = bean.getValidationResults()[0]; ConfigValidationResult fromConfig = bean.getValidationResults()[0];
ConfigValidationResult toConfig = bean.getValidationResults()[1]; ConfigValidationResult toConfig = bean.getValidationResults()[1];

View File

@ -20,8 +20,9 @@
import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.model.MSubmission;
/** /**
* Callback interface for Synchronous job submission * Callback interface for synchronous job submission
*/ */
//TODO(https://issues.apache.org/jira/browse/SQOOP-1652): address the submit/start consistent usage
public interface SubmissionCallback { public interface SubmissionCallback {
/** /**

View File

@ -18,14 +18,16 @@
package org.apache.sqoop.client.request; package org.apache.sqoop.client.request;
import org.apache.sqoop.json.JobBean; import org.apache.sqoop.json.JobBean;
import org.apache.sqoop.json.JobsBean;
import org.apache.sqoop.json.SubmissionBean;
import org.apache.sqoop.json.ValidationResultBean; import org.apache.sqoop.json.ValidationResultBean;
import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MJob;
import org.json.simple.JSONObject; import org.json.simple.JSONObject;
import org.json.simple.JSONValue; import org.json.simple.JSONValue;
/** /**
* Provide CRUD semantics over RESTfull HTTP API for jobs. All operations * Provide CRUD semantics over RESTfull HTTP API for jobs. All operations are
* are normally supported. * normally supported.
*/ */
public class JobResourceRequest extends ResourceRequest { public class JobResourceRequest extends ResourceRequest {
@ -33,18 +35,25 @@ public class JobResourceRequest extends ResourceRequest {
private static final String ENABLE = "/enable"; private static final String ENABLE = "/enable";
private static final String DISABLE = "/disable"; private static final String DISABLE = "/disable";
private static final String START = "/start";
private static final String STOP = "/stop";
private static final String STATUS = "/status";
public JobBean read(String serverUrl, Long linkId) { public JobBean read(String serverUrl, Long jobId) {
String response; String response;
if (linkId == null) { if (jobId == null) {
response = super.get(serverUrl + RESOURCE + "all"); response = super.get(serverUrl + RESOURCE + "all");
} else { } else {
response = super.get(serverUrl + RESOURCE + linkId); response = super.get(serverUrl + RESOURCE + jobId);
} }
JSONObject jsonObject = (JSONObject) JSONValue.parse(response); JSONObject jsonObject = (JSONObject) JSONValue.parse(response);
JobBean jobBean = new JobBean(); // defaults to all
jobBean.restore(jsonObject); JobBean bean = new JobsBean();
return jobBean; if (jobId != null) {
bean = new JobBean();
}
bean.restore(jsonObject);
return bean;
} }
public ValidationResultBean create(String serverUrl, MJob job) { public ValidationResultBean create(String serverUrl, MJob job) {
@ -61,21 +70,43 @@ public ValidationResultBean update(String serverUrl, MJob job) {
JobBean jobBean = new JobBean(job); JobBean jobBean = new JobBean(job);
// Extract all config inputs including sensitive inputs // Extract all config inputs including sensitive inputs
JSONObject jobJson = jobBean.extract(false); JSONObject jobJson = jobBean.extract(false);
String response = super.put(serverUrl + RESOURCE + job.getPersistenceId(), jobJson.toJSONString()); String response = super.put(serverUrl + RESOURCE + job.getPersistenceId(),
jobJson.toJSONString());
ValidationResultBean validationBean = new ValidationResultBean(); ValidationResultBean validationBean = new ValidationResultBean();
validationBean.restore((JSONObject) JSONValue.parse(response)); validationBean.restore((JSONObject) JSONValue.parse(response));
return validationBean; return validationBean;
} }
public void delete(String serverUrl, Long id) { public void delete(String serverUrl, Long jobId) {
super.delete(serverUrl + RESOURCE + id); super.delete(serverUrl + RESOURCE + jobId);
} }
public void enable(String serverUrl, Long id, Boolean enabled) { public void enable(String serverUrl, Long jobId, Boolean enabled) {
if (enabled) { if (enabled) {
super.put(serverUrl + RESOURCE + id + ENABLE, null); super.put(serverUrl + RESOURCE + jobId + ENABLE, null);
} else { } else {
super.put(serverUrl + RESOURCE + id + DISABLE, null); super.put(serverUrl + RESOURCE + jobId + DISABLE, null);
} }
} }
public SubmissionBean start(String serverUrl, Long jobId) {
String response = super.put(serverUrl + RESOURCE + jobId + START, null);
return createJobSubmissionResponse(response);
}
public SubmissionBean stop(String serverUrl, Long jobId) {
String response = super.put(serverUrl + RESOURCE + jobId + STOP, null);
return createJobSubmissionResponse(response);
}
public SubmissionBean status(String serverUrl, Long jobId) {
String response = super.get(serverUrl + RESOURCE + jobId + STATUS);
return createJobSubmissionResponse(response);
}
private SubmissionBean createJobSubmissionResponse(String response) {
SubmissionBean submissionBean = new SubmissionBean();
submissionBean.restore((JSONObject) JSONValue.parse(response));
return submissionBean;
}
} }

View File

@ -17,14 +17,15 @@
*/ */
package org.apache.sqoop.client.request; package org.apache.sqoop.client.request;
import org.apache.sqoop.json.LinkBean;
import org.apache.sqoop.json.ConnectorBean; import org.apache.sqoop.json.ConnectorBean;
import org.apache.sqoop.json.DriverBean; import org.apache.sqoop.json.DriverBean;
import org.apache.sqoop.json.JobBean; import org.apache.sqoop.json.JobBean;
import org.apache.sqoop.json.LinkBean;
import org.apache.sqoop.json.SubmissionBean; import org.apache.sqoop.json.SubmissionBean;
import org.apache.sqoop.json.SubmissionsBean;
import org.apache.sqoop.json.ValidationResultBean; import org.apache.sqoop.json.ValidationResultBean;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
/** /**
* Unified class for all request objects. * Unified class for all request objects.
@ -131,19 +132,19 @@ public void deleteJob(Long jid) {
getJobResourceRequest().delete(serverUrl, jid); getJobResourceRequest().delete(serverUrl, jid);
} }
public SubmissionBean readHistory(Long jid) { public SubmissionBean getJobStatus(Long jid) {
return getSubmissionResourceRequest().readHistory(serverUrl, jid); return getJobResourceRequest().status(serverUrl, jid);
} }
public SubmissionBean readSubmission(Long jid) { public SubmissionBean startJob(Long jid) {
return getJobResourceRequest().start(serverUrl, jid);
}
public SubmissionBean stopJob(Long jid) {
return getJobResourceRequest().stop(serverUrl, jid);
}
public SubmissionsBean readSubmission(Long jid) {
return getSubmissionResourceRequest().read(serverUrl, jid); return getSubmissionResourceRequest().read(serverUrl, jid);
} }
public SubmissionBean createSubmission(Long jid) {
return getSubmissionResourceRequest().create(serverUrl, jid);
}
public SubmissionBean deleteSubmission(Long jid) {
return getSubmissionResourceRequest().delete(serverUrl, jid);
}
} }

View File

@ -17,7 +17,7 @@
*/ */
package org.apache.sqoop.client.request; package org.apache.sqoop.client.request;
import org.apache.sqoop.json.SubmissionBean; import org.apache.sqoop.json.SubmissionsBean;
import org.json.simple.JSONObject; import org.json.simple.JSONObject;
import org.json.simple.JSONValue; import org.json.simple.JSONValue;
@ -27,54 +27,18 @@
*/ */
public class SubmissionResourceRequest extends ResourceRequest { public class SubmissionResourceRequest extends ResourceRequest {
public static final String RESOURCE = "v1/submission/"; public static final String RESOURCE = "v1/submissions/";
public static final String ACTION = RESOURCE + "action/"; public SubmissionsBean read(String serverUrl, Long jid) {
public static final String HISTORY = RESOURCE + "history/";
public SubmissionBean readHistory(String serverUrl, Long jid) {
String response; String response;
if (jid == null) { if (jid == null) {
response = super.get(serverUrl + HISTORY + "all"); response = super.get(serverUrl + RESOURCE);
} else { } else {
response = super.get(serverUrl + HISTORY + jid); response = super.get(serverUrl + RESOURCE + jid);
} }
JSONObject jsonObject = (JSONObject) JSONValue.parse(response); JSONObject jsonObject = (JSONObject) JSONValue.parse(response);
SubmissionsBean submissionBean = new SubmissionsBean();
SubmissionBean submissionBean = new SubmissionBean();
submissionBean.restore(jsonObject); submissionBean.restore(jsonObject);
return submissionBean;
}
public SubmissionBean read(String serverUrl, Long jid) {
String response = super.get(serverUrl + ACTION + jid);
JSONObject jsonObject = (JSONObject) JSONValue.parse(response);
SubmissionBean submissionBean = new SubmissionBean();
submissionBean.restore(jsonObject);
return submissionBean;
}
public SubmissionBean create(String serverUrl, Long jid) {
String response = super.post(serverUrl + ACTION + jid, null);
SubmissionBean submissionBean = new SubmissionBean();
submissionBean.restore((JSONObject) JSONValue.parse(response));
return submissionBean;
}
public SubmissionBean delete(String serverUrl, Long id) {
String response = super.delete(serverUrl + ACTION + id);
SubmissionBean submissionBean = new SubmissionBean();
submissionBean.restore((JSONObject) JSONValue.parse(response));
return submissionBean; return submissionBean;
} }
} }

View File

@ -19,8 +19,6 @@
import static org.apache.sqoop.json.util.ConfigInputSerialization.extractConfigList; import static org.apache.sqoop.json.util.ConfigInputSerialization.extractConfigList;
import static org.apache.sqoop.json.util.ConfigInputSerialization.restoreConfigList; import static org.apache.sqoop.json.util.ConfigInputSerialization.restoreConfigList;
import static org.apache.sqoop.json.util.ConfigBundleSerialization.extractConfigParamBundle;
import static org.apache.sqoop.json.util.ConfigBundleSerialization.restoreConfigParamBundle;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
@ -39,7 +37,7 @@
import org.json.simple.JSONObject; import org.json.simple.JSONObject;
/** /**
* Json representation of the job config * Json representation of the job
*/ */
public class JobBean implements JsonBean { public class JobBean implements JsonBean {
@ -50,6 +48,7 @@ public class JobBean implements JsonBean {
static final String FROM_CONFIG = "from-config"; static final String FROM_CONFIG = "from-config";
static final String TO_CONFIG = "to-config"; static final String TO_CONFIG = "to-config";
static final String DRIVER_CONFIG = "driver-config"; static final String DRIVER_CONFIG = "driver-config";
private static final String JOB = "job";
// Required // Required
private List<MJob> jobs; private List<MJob> jobs;
@ -102,9 +101,16 @@ public ResourceBundle getDriverConfigBundle() {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public JSONObject extract(boolean skipSensitive) { public JSONObject extract(boolean skipSensitive) {
JSONArray array = new JSONArray(); JSONArray jobArray = extractJobs(skipSensitive);
JSONObject job = new JSONObject();
job.put(JOB, jobArray);
return job;
}
for(MJob job : jobs) { @SuppressWarnings("unchecked")
protected JSONArray extractJobs(boolean skipSensitive) {
JSONArray jobArray = new JSONArray();
for (MJob job : jobs) {
JSONObject object = new JSONObject(); JSONObject object = new JSONObject();
object.put(ID, job.getPersistenceId()); object.put(ID, job.getPersistenceId());
@ -115,6 +121,7 @@ public JSONObject extract(boolean skipSensitive) {
object.put(UPDATE_USER, job.getLastUpdateUser()); object.put(UPDATE_USER, job.getLastUpdateUser());
object.put(UPDATE_DATE, job.getLastUpdateDate().getTime()); object.put(UPDATE_DATE, job.getLastUpdateDate().getTime());
// job link associated connectors // job link associated connectors
// TODO(SQOOP-1634): fix not to require the connectorIds in the post data
object.put(FROM_CONNECTOR_ID, job.getConnectorId(Direction.FROM)); object.put(FROM_CONNECTOR_ID, job.getConnectorId(Direction.FROM));
object.put(TO_CONNECTOR_ID, job.getConnectorId(Direction.TO)); object.put(TO_CONNECTOR_ID, job.getConnectorId(Direction.TO));
// job associated links // job associated links
@ -122,25 +129,30 @@ public JSONObject extract(boolean skipSensitive) {
object.put(TO_LINK_ID, job.getLinkId(Direction.TO)); object.put(TO_LINK_ID, job.getLinkId(Direction.TO));
// job configs // job configs
MFromConfig fromConfigList = job.getFromJobConfig(); MFromConfig fromConfigList = job.getFromJobConfig();
object.put(FROM_CONFIG, extractConfigList(fromConfigList.getConfigs(), fromConfigList.getType(), skipSensitive)); object.put(FROM_CONFIG,
extractConfigList(fromConfigList.getConfigs(), fromConfigList.getType(), skipSensitive));
MToConfig toConfigList = job.getToJobConfig(); MToConfig toConfigList = job.getToJobConfig();
object.put(TO_CONFIG, extractConfigList(toConfigList.getConfigs(), toConfigList.getType(), skipSensitive)); object.put(TO_CONFIG,
extractConfigList(toConfigList.getConfigs(), toConfigList.getType(), skipSensitive));
MDriverConfig driverConfigList = job.getDriverConfig(); MDriverConfig driverConfigList = job.getDriverConfig();
object.put(DRIVER_CONFIG, extractConfigList(driverConfigList.getConfigs(), driverConfigList.getType(), skipSensitive)); object.put(
DRIVER_CONFIG,
extractConfigList(driverConfigList.getConfigs(), driverConfigList.getType(),
skipSensitive));
array.add(object); jobArray.add(object);
} }
return jobArray;
JSONObject all = new JSONObject();
all.put(ALL, array);
return all;
} }
@Override @Override
public void restore(JSONObject jsonObject) { public void restore(JSONObject jsonObject) {
jobs = new ArrayList<MJob>(); JSONArray array = (JSONArray) jsonObject.get(JOB);
restoreJobs(array);
}
JSONArray array = (JSONArray) jsonObject.get(ALL); protected void restoreJobs(JSONArray array) {
jobs = new ArrayList<MJob>();
for (Object obj : array) { for (Object obj : array) {
JSONObject object = (JSONObject) obj; JSONObject object = (JSONObject) obj;

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.json;
import java.util.List;
import org.apache.sqoop.model.MJob;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
/**
* Json representation of the jobs
*/
public class JobsBean extends JobBean {
private static final String JOBS = "jobs";
public JobsBean(MJob job) {
super(job);
}
public JobsBean(List<MJob> jobs) {
super(jobs);
}
// For "restore"
public JobsBean() {
}
@Override
@SuppressWarnings("unchecked")
public JSONObject extract(boolean skipSensitive) {
JSONArray jobArray = super.extractJobs(skipSensitive);
JSONObject jobs = new JSONObject();
jobs.put(JOBS, jobArray);
return jobs;
}
@Override
public void restore(JSONObject jsonObject) {
JSONArray array = (JSONArray) jsonObject.get(JOBS);
restoreJobs(array);
}
}

View File

@ -25,6 +25,7 @@ public interface JsonBean {
static final String CONFIGURABLE_VERSION = "version"; static final String CONFIGURABLE_VERSION = "version";
static final String ALL_CONFIGS = "all-configs"; static final String ALL_CONFIGS = "all-configs";
@Deprecated // should not be used anymore in the rest api
static final String ALL = "all"; static final String ALL = "all";
static final String ID = "id"; static final String ID = "id";
static final String NAME = "name"; static final String NAME = "name";

View File

@ -17,6 +17,15 @@
*/ */
package org.apache.sqoop.json; package org.apache.sqoop.json;
import static org.apache.sqoop.json.util.SchemaSerialization.extractSchema;
import static org.apache.sqoop.json.util.SchemaSerialization.restoreSchema;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.submission.SubmissionStatus; import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counter; import org.apache.sqoop.submission.counter.Counter;
@ -25,21 +34,12 @@
import org.json.simple.JSONArray; import org.json.simple.JSONArray;
import org.json.simple.JSONObject; import org.json.simple.JSONObject;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.sqoop.json.util.SchemaSerialization.extractSchema;
import static org.apache.sqoop.json.util.SchemaSerialization.restoreSchema;
/** /**
* *
*/ */
public class SubmissionBean implements JsonBean { public class SubmissionBean implements JsonBean {
private static final String ALL = "all"; private static final String SUBMISSION = "submission";
private static final String JOB = "job"; private static final String JOB = "job";
private static final String CREATION_USER = "creation-user"; private static final String CREATION_USER = "creation-user";
private static final String CREATION_DATE = "creation-date"; private static final String CREATION_DATE = "creation-date";
@ -52,8 +52,8 @@ public class SubmissionBean implements JsonBean {
private static final String EXCEPTION_TRACE = "exception-trace"; private static final String EXCEPTION_TRACE = "exception-trace";
private static final String PROGRESS = "progress"; private static final String PROGRESS = "progress";
private static final String COUNTERS = "counters"; private static final String COUNTERS = "counters";
private static final String FROM_SCHEMA = "schema-from"; private static final String FROM_SCHEMA = "from-schema";
private static final String TO_SCHEMA = "schema-to"; private static final String TO_SCHEMA = "to-schema";
private List<MSubmission> submissions; private List<MSubmission> submissions;
@ -80,79 +80,83 @@ public SubmissionBean() {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public JSONObject extract(boolean skipSensitive) { public JSONObject extract(boolean skipSensitive) {
JSONArray array = new JSONArray(); JSONArray submissionArray = extractSubmissions();
JSONObject submission = new JSONObject();
submission.put(SUBMISSION, submissionArray);
return submission;
}
for(MSubmission submission : this.submissions) { @SuppressWarnings("unchecked")
protected JSONArray extractSubmissions() {
JSONArray submissionsArray = new JSONArray();
for (MSubmission submission : this.submissions) {
JSONObject object = new JSONObject(); JSONObject object = new JSONObject();
object.put(JOB, submission.getJobId()); object.put(JOB, submission.getJobId());
object.put(STATUS, submission.getStatus().name()); object.put(STATUS, submission.getStatus().name());
object.put(PROGRESS, submission.getProgress()); object.put(PROGRESS, submission.getProgress());
if(submission.getCreationUser() != null) { if (submission.getCreationUser() != null) {
object.put(CREATION_USER, submission.getCreationUser()); object.put(CREATION_USER, submission.getCreationUser());
} }
if(submission.getCreationDate() != null) { if (submission.getCreationDate() != null) {
object.put(CREATION_DATE, submission.getCreationDate().getTime()); object.put(CREATION_DATE, submission.getCreationDate().getTime());
} }
if(submission.getLastUpdateUser() != null) { if (submission.getLastUpdateUser() != null) {
object.put(LAST_UPDATE_USER, submission.getLastUpdateUser()); object.put(LAST_UPDATE_USER, submission.getLastUpdateUser());
} }
if(submission.getLastUpdateDate() != null) { if (submission.getLastUpdateDate() != null) {
object.put(LAST_UPDATE_DATE, submission.getLastUpdateDate().getTime()); object.put(LAST_UPDATE_DATE, submission.getLastUpdateDate().getTime());
} }
if(submission.getExternalId() != null) { if (submission.getExternalId() != null) {
object.put(EXTERNAL_ID, submission.getExternalId()); object.put(EXTERNAL_ID, submission.getExternalId());
} }
if(submission.getExternalLink() != null) { if (submission.getExternalLink() != null) {
object.put(EXTERNAL_LINK, submission.getExternalLink()); object.put(EXTERNAL_LINK, submission.getExternalLink());
} }
if(submission.getExceptionInfo() != null) { if (submission.getExceptionInfo() != null) {
object.put(EXCEPTION, submission.getExceptionInfo()); object.put(EXCEPTION, submission.getExceptionInfo());
} }
if(submission.getExceptionStackTrace() != null) { if (submission.getExceptionStackTrace() != null) {
object.put(EXCEPTION_TRACE, submission.getExceptionStackTrace()); object.put(EXCEPTION_TRACE, submission.getExceptionStackTrace());
} }
if(submission.getCounters() != null) { if (submission.getCounters() != null) {
object.put(COUNTERS, extractCounters(submission.getCounters())); object.put(COUNTERS, extractCounters(submission.getCounters()));
} }
if(submission.getFromSchema() != null) { if (submission.getFromSchema() != null) {
object.put(FROM_SCHEMA, extractSchema(submission.getFromSchema())); object.put(FROM_SCHEMA, extractSchema(submission.getFromSchema()));
} }
if(submission.getToSchema() != null) { if (submission.getToSchema() != null) {
object.put(TO_SCHEMA, extractSchema(submission.getToSchema())); object.put(TO_SCHEMA, extractSchema(submission.getToSchema()));
} }
submissionsArray.add(object);
array.add(object);
} }
return submissionsArray;
JSONObject all = new JSONObject();
all.put(ALL, array);
return all;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public JSONObject extractCounters(Counters counters) { private JSONObject extractCounters(Counters counters) {
JSONObject ret = new JSONObject(); JSONObject counterArray = new JSONObject();
for(CounterGroup group : counters) { for (CounterGroup group : counters) {
JSONObject counterGroup = new JSONObject(); JSONObject counterGroup = new JSONObject();
for(Counter counter : group) { for (Counter counter : group) {
counterGroup.put(counter.getName(), counter.getValue()); counterGroup.put(counter.getName(), counter.getValue());
} }
counterArray.put(group.getName(), counterGroup);
ret.put(group.getName(), counterGroup);
} }
return ret; return counterArray;
} }
@Override @Override
public void restore(JSONObject json) { public void restore(JSONObject json) {
JSONArray submissionArray = (JSONArray) json.get(SUBMISSION);
restoreSubmissions(submissionArray);
}
protected void restoreSubmissions(JSONArray array) {
this.submissions = new ArrayList<MSubmission>(); this.submissions = new ArrayList<MSubmission>();
JSONArray array = (JSONArray) json.get(ALL);
for (Object obj : array) { for (Object obj : array) {
JSONObject object = (JSONObject) obj; JSONObject object = (JSONObject) obj;
MSubmission submission = new MSubmission(); MSubmission submission = new MSubmission();
@ -161,38 +165,38 @@ public void restore(JSONObject json) {
submission.setStatus(SubmissionStatus.valueOf((String) object.get(STATUS))); submission.setStatus(SubmissionStatus.valueOf((String) object.get(STATUS)));
submission.setProgress((Double) object.get(PROGRESS)); submission.setProgress((Double) object.get(PROGRESS));
if(object.containsKey(CREATION_USER)) { if (object.containsKey(CREATION_USER)) {
submission.setCreationUser((String) object.get(CREATION_USER)); submission.setCreationUser((String) object.get(CREATION_USER));
} }
if(object.containsKey(CREATION_DATE)) { if (object.containsKey(CREATION_DATE)) {
submission.setCreationDate(new Date((Long) object.get(CREATION_DATE))); submission.setCreationDate(new Date((Long) object.get(CREATION_DATE)));
} }
if(object.containsKey(LAST_UPDATE_USER)) { if (object.containsKey(LAST_UPDATE_USER)) {
submission.setLastUpdateUser((String) object.get(LAST_UPDATE_USER)); submission.setLastUpdateUser((String) object.get(LAST_UPDATE_USER));
} }
if(object.containsKey(LAST_UPDATE_DATE)) { if (object.containsKey(LAST_UPDATE_DATE)) {
submission.setLastUpdateDate(new Date((Long) object.get(LAST_UPDATE_DATE))); submission.setLastUpdateDate(new Date((Long) object.get(LAST_UPDATE_DATE)));
} }
if(object.containsKey(EXTERNAL_ID)) { if (object.containsKey(EXTERNAL_ID)) {
submission.setExternalId((String) object.get(EXTERNAL_ID)); submission.setExternalId((String) object.get(EXTERNAL_ID));
} }
if(object.containsKey(EXTERNAL_LINK)) { if (object.containsKey(EXTERNAL_LINK)) {
submission.setExternalLink((String) object.get(EXTERNAL_LINK)); submission.setExternalLink((String) object.get(EXTERNAL_LINK));
} }
if(object.containsKey(EXCEPTION)) { if (object.containsKey(EXCEPTION)) {
submission.setExceptionInfo((String) object.get(EXCEPTION)); submission.setExceptionInfo((String) object.get(EXCEPTION));
} }
if(object.containsKey(EXCEPTION_TRACE)) { if (object.containsKey(EXCEPTION_TRACE)) {
submission.setExceptionStackTrace((String) object.get(EXCEPTION_TRACE)); submission.setExceptionStackTrace((String) object.get(EXCEPTION_TRACE));
} }
if(object.containsKey(COUNTERS)) { if (object.containsKey(COUNTERS)) {
submission.setCounters(restoreCounters((JSONObject) object.get(COUNTERS))); submission.setCounters(restoreCounters((JSONObject) object.get(COUNTERS)));
} }
if(object.containsKey(FROM_SCHEMA)) { if (object.containsKey(FROM_SCHEMA)) {
submission.setFromSchema(restoreSchema((JSONObject) object.get(FROM_SCHEMA))); submission.setFromSchema(restoreSchema((JSONObject) object.get(FROM_SCHEMA)));
} }
if(object.containsKey(TO_SCHEMA)) { if (object.containsKey(TO_SCHEMA)) {
submission.setToSchema(restoreSchema((JSONObject) object.get(TO_SCHEMA))); submission.setToSchema(restoreSchema((JSONObject) object.get(TO_SCHEMA)));
} }
@ -200,24 +204,20 @@ public void restore(JSONObject json) {
} }
} }
@SuppressWarnings("unchecked")
public Counters restoreCounters(JSONObject object) { public Counters restoreCounters(JSONObject object) {
Set<Map.Entry<String, JSONObject>> groupSet = object.entrySet(); Set<Map.Entry<String, JSONObject>> groupSet = object.entrySet();
Counters counters = new Counters(); Counters counters = new Counters();
for(Map.Entry<String, JSONObject> groupEntry: groupSet) { for (Map.Entry<String, JSONObject> groupEntry : groupSet) {
CounterGroup group = new CounterGroup(groupEntry.getKey()); CounterGroup group = new CounterGroup(groupEntry.getKey());
Set<Map.Entry<String, Long>> counterSet = groupEntry.getValue().entrySet(); Set<Map.Entry<String, Long>> counterSet = groupEntry.getValue().entrySet();
for (Map.Entry<String, Long> counterEntry : counterSet) {
for(Map.Entry<String, Long> counterEntry: counterSet) {
Counter counter = new Counter(counterEntry.getKey(), counterEntry.getValue()); Counter counter = new Counter(counterEntry.getKey(), counterEntry.getValue());
group.addCounter(counter); group.addCounter(counter);
} }
counters.addCounterGroup(group); counters.addCounterGroup(group);
} }
return counters; return counters;
} }
} }

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.json;
import java.util.List;
import org.apache.sqoop.model.MSubmission;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
public class SubmissionsBean extends SubmissionBean {
private static final String SUBMISSIONS = "submissions";
// For "extract"
public SubmissionsBean(MSubmission submission) {
super(submission);
}
public SubmissionsBean(List<MSubmission> submissions) {
super(submissions);
}
// For "restore"
public SubmissionsBean() {
}
@Override
@SuppressWarnings("unchecked")
public JSONObject extract(boolean skipSensitive) {
JSONArray submissionsArray = super.extractSubmissions();
JSONObject submissions = new JSONObject();
submissions.put(SUBMISSIONS, submissionsArray);
return submissions;
}
@Override
public void restore(JSONObject json) {
JSONArray submissionsArray = (JSONArray) json.get(SUBMISSIONS);
restoreSubmissions(submissionsArray);
}
}

View File

@ -36,9 +36,6 @@ public class MSubmission extends MAccountableEntity {
/** /**
* Job id that this submission object belongs. * Job id that this submission object belongs.
* *
* By transitivity of metadata structure you can get also connection and
* connector ids.
*
* This property is required and will be always present. * This property is required and will be always present.
*/ */
private long jobId; private long jobId;

View File

@ -55,13 +55,13 @@ public void testSerialization() throws ParseException {
// Serialize it to JSON object // Serialize it to JSON object
JobBean jobBean = new JobBean(job); JobBean jobBean = new JobBean(job);
JSONObject jobJson = jobBean.extract(false); JSONObject json = jobBean.extract(false);
// "Move" it across network in text form // "Move" it across network in text form
String jobJsonString = jobJson.toJSONString(); String jobJsonString = json.toJSONString();
// Retrieved transferred object // Retrieved transferred object
JSONObject parsedJobJson = (JSONObject)JSONValue.parseWithException(jobJsonString); JSONObject parsedJobJson = (JSONObject)JSONValue.parse(jobJsonString);
JobBean parsedJobBean = new JobBean(); JobBean parsedJobBean = new JobBean();
parsedJobBean.restore(parsedJobJson); parsedJobBean.restore(parsedJobJson);
MJob target = parsedJobBean.getJobs().get(0); MJob target = parsedJobBean.getJobs().get(0);

View File

@ -457,13 +457,13 @@ private Schema getSchema() {
* @return * @return
*/ */
private MSubmission transfer(MSubmission submission) { private MSubmission transfer(MSubmission submission) {
SubmissionBean bean = new SubmissionBean(submission); SubmissionsBean bean = new SubmissionsBean(submission);
JSONObject json = bean.extract(false); JSONObject json = bean.extract(false);
String string = json.toString(); String string = json.toString();
JSONObject retrievedJson = (JSONObject) JSONValue.parse(string); JSONObject retrievedJson = (JSONObject) JSONValue.parse(string);
SubmissionBean retrievedBean = new SubmissionBean(); SubmissionsBean retrievedBean = new SubmissionsBean();
retrievedBean.restore(retrievedJson); retrievedBean.restore(retrievedJson);
return retrievedBean.getSubmissions().get(0); return retrievedBean.getSubmissions().get(0);
@ -476,13 +476,13 @@ private MSubmission transfer(MSubmission submission) {
* @return * @return
*/ */
private List<MSubmission> transfer(List<MSubmission> submissions) { private List<MSubmission> transfer(List<MSubmission> submissions) {
SubmissionBean bean = new SubmissionBean(submissions); SubmissionsBean bean = new SubmissionsBean(submissions);
JSONObject json = bean.extract(false); JSONObject json = bean.extract(false);
String string = json.toString(); String string = json.toString();
JSONObject retrievedJson = (JSONObject) JSONValue.parse(string); JSONObject retrievedJson = (JSONObject) JSONValue.parse(string);
SubmissionBean retrievedBean = new SubmissionBean(); SubmissionsBean retrievedBean = new SubmissionsBean();
retrievedBean.restore(retrievedJson); retrievedBean.restore(retrievedJson);
return retrievedBean.getSubmissions(); return retrievedBean.getSubmissions();

View File

@ -22,6 +22,8 @@
/** /**
* *
*/ */
//TODO(https://issues.apache.org/jira/browse/SQOOP-1652): why is this called Driver Error since it is used in JobManager?
public enum DriverError implements ErrorCode { public enum DriverError implements ErrorCode {
DRIVER_0001("Invalid submission engine"), DRIVER_0001("Invalid submission engine"),
@ -40,6 +42,7 @@ public enum DriverError implements ErrorCode {
DRIVER_0008("Invalid combination of submission and execution engines"), DRIVER_0008("Invalid combination of submission and execution engines"),
//TODO(https://issues.apache.org/jira/browse/SQOOP-1652): address the submit/start terminology difference
DRIVER_0009("Job has been disabled. Cannot submit this job."), DRIVER_0009("Job has been disabled. Cannot submit this job."),
DRIVER_0010("Link for this job has been disabled. Cannot submit this job."), DRIVER_0010("Link for this job has been disabled. Cannot submit this job."),

View File

@ -269,11 +269,11 @@ public synchronized void initialize() {
LOG.info("Submission manager initialized: OK"); LOG.info("Submission manager initialized: OK");
} }
public MSubmission submit(long jobId, HttpEventContext ctx) { public MSubmission start(long jobId, HttpEventContext ctx) {
MSubmission mSubmission = createJobSubmission(ctx, jobId); MSubmission mSubmission = createJobSubmission(ctx, jobId);
JobRequest jobRequest = createJobRequest(jobId, mSubmission); JobRequest jobRequest = createJobRequest(jobId, mSubmission);
// Bootstrap job to execute // Bootstrap job to execute in the configured execution engine
prepareJob(jobRequest); prepareJob(jobRequest);
// Make sure that this job id is not currently running and submit the job // Make sure that this job id is not currently running and submit the job
// only if it's not. // only if it's not.
@ -283,14 +283,17 @@ public MSubmission submit(long jobId, HttpEventContext ctx) {
if (lastSubmission != null && lastSubmission.getStatus().isRunning()) { if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId); throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId);
} }
// TODO(jarcec): We might need to catch all exceptions here to ensure
// that Destroyer will be executed in all cases.
// NOTE: the following is a blocking call // NOTE: the following is a blocking call
boolean success = submissionEngine.submit(jobRequest); boolean success = submissionEngine.submit(jobRequest);
if (!success) { if (!success) {
destroySubmission(jobRequest); // TODO(jarcec): We might need to catch all exceptions here to ensure
// that Destroyer will be executed in all cases.
invokeDestroyerOnJobFailure(jobRequest);
mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT); mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
} }
// persist submission record to repository.
// on failure we persist the FAILURE status, on success it is the SUCCESS
// status ( which is the default one)
RepositoryManager.getInstance().getRepository().createSubmission(mSubmission); RepositoryManager.getInstance().getRepository().createSubmission(mSubmission);
} }
return mSubmission; return mSubmission;
@ -435,6 +438,7 @@ MJob getJob(long jobId) {
return job; return job;
} }
@SuppressWarnings({ "unchecked", "rawtypes" })
private void initializeConnector(JobRequest jobRequest, Direction direction) { private void initializeConnector(JobRequest jobRequest, Direction direction) {
Initializer initializer = getConnectorInitializer(jobRequest, direction); Initializer initializer = getConnectorInitializer(jobRequest, direction);
InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction); InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction);
@ -444,6 +448,7 @@ private void initializeConnector(JobRequest jobRequest, Direction direction) {
jobRequest.getJobConfig(direction)); jobRequest.getJobConfig(direction));
} }
@SuppressWarnings({ "unchecked", "rawtypes" })
private Schema getSchemaForConnector(JobRequest jobRequest, Direction direction) { private Schema getSchemaForConnector(JobRequest jobRequest, Direction direction) {
Initializer initializer = getConnectorInitializer(jobRequest, direction); Initializer initializer = getConnectorInitializer(jobRequest, direction);
@ -453,6 +458,7 @@ private Schema getSchemaForConnector(JobRequest jobRequest, Direction direction)
jobRequest.getJobConfig(direction)); jobRequest.getJobConfig(direction));
} }
@SuppressWarnings({ "unchecked", "rawtypes" })
private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) { private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) {
Initializer initializer = getConnectorInitializer(jobRequest, direction); Initializer initializer = getConnectorInitializer(jobRequest, direction);
@ -462,6 +468,7 @@ private void addConnectorInitializerJars(JobRequest jobRequest, Direction direct
jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction))); jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction)));
} }
@SuppressWarnings({ "rawtypes" })
private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) { private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) {
Transferable transferable = direction.equals(Direction.FROM) ? jobRequest.getFrom() : jobRequest.getTo(); Transferable transferable = direction.equals(Direction.FROM) ? jobRequest.getFrom() : jobRequest.getTo();
Class<? extends Initializer> initializerClass = transferable.getInitializer(); Class<? extends Initializer> initializerClass = transferable.getInitializer();
@ -494,7 +501,8 @@ void prepareJob(JobRequest request) {
* Callback that will be called only if we failed to submit the job to the * Callback that will be called only if we failed to submit the job to the
* remote cluster. * remote cluster.
*/ */
void destroySubmission(JobRequest request) { @SuppressWarnings({ "unchecked", "rawtypes" })
void invokeDestroyerOnJobFailure(JobRequest request) {
Transferable from = request.getFrom(); Transferable from = request.getFrom();
Transferable to = request.getTo(); Transferable to = request.getTo();
@ -520,7 +528,6 @@ void destroySubmission(JobRequest request) {
request.getConnectorContext(Direction.TO), false, request.getSummary() request.getConnectorContext(Direction.TO), false, request.getSummary()
.getToSchema()); .getToSchema());
// destroy submission from connector perspective
fromDestroyer.destroy(fromDestroyerContext, request.getConnectorLinkConfig(Direction.FROM), fromDestroyer.destroy(fromDestroyerContext, request.getConnectorLinkConfig(Direction.FROM),
request.getJobConfig(Direction.FROM)); request.getJobConfig(Direction.FROM));
toDestroyer.destroy(toDestroyerContext, request.getConnectorLinkConfig(Direction.TO), toDestroyer.destroy(toDestroyerContext, request.getConnectorLinkConfig(Direction.TO),
@ -534,7 +541,7 @@ public MSubmission stop(long jobId, HttpEventContext ctx) {
if (mSubmission == null || !mSubmission.getStatus().isRunning()) { if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId
+ " is not running"); + " is not running hence cannot stop");
} }
submissionEngine.stop(mSubmission.getExternalId()); submissionEngine.stop(mSubmission.getExternalId());
@ -554,8 +561,7 @@ public MSubmission status(long jobId) {
if (mSubmission == null) { if (mSubmission == null) {
return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED); return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
} }
// If the submission isin running state, let's update it
// If the submission is in running state, let's update it
if (mSubmission.getStatus().isRunning()) { if (mSubmission.getStatus().isRunning()) {
update(mSubmission); update(mSubmission);
} }

View File

@ -91,7 +91,7 @@ public class JobRequest {
MutableMapContext toConnectorContext; MutableMapContext toConnectorContext;
/** /**
* Framework context (submission specific configuration) * Driver context (submission specific configuration)
*/ */
MutableMapContext driverContext; MutableMapContext driverContext;

View File

@ -488,6 +488,20 @@ public Object doIt(Connection conn) {
}); });
} }
/**
* {@inheritDoc}
*/
@Override
public MJob findJob(final String name) {
return (MJob) doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
return handler.findJob(name, conn);
}
});
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */

View File

@ -334,6 +334,15 @@ public abstract class JdbcRepositoryHandler {
*/ */
public abstract MJob findJob(long jobId, Connection conn); public abstract MJob findJob(long jobId, Connection conn);
/**
* Find job with given name in repository.
*
* @param name unique name for the job
* @param conn Connection to the repository
* @return job for a given name that is present in the repository or null if not present
*/
public abstract MJob findJob(String name, Connection conn);
/** /**
* Get all job objects. * Get all job objects.
* *

View File

@ -246,6 +246,14 @@ public abstract class Repository {
*/ */
public abstract MJob findJob(long id); public abstract MJob findJob(long id);
/**
* Find job object with given name.
*
* @param name unique name for the job
* @return job with given name loaded from repository or null if not present
*/
public abstract MJob findJob(String name);
/** /**
* Get all job objects. * Get all job objects.
* *

View File

@ -1586,6 +1586,33 @@ public MJob findJob(long id, Connection conn) {
} }
} }
/**
* {@inheritDoc}
*/
@Override
public MJob findJob(String name, Connection conn) {
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(STMT_SELECT_JOB_SINGLE_BY_NAME);
stmt.setString(1, name);
List<MJob> jobs = loadJobs(stmt, conn);
if (jobs.size() != 1) {
return null;
}
// Return the first and only one link object
return jobs.get(0);
} catch (SQLException ex) {
logException(ex, name);
throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex);
} finally {
closeStatements(stmt);
}
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */

View File

@ -435,6 +435,10 @@ public final class DerbySchemaInsertUpdateDeleteSelectQuery {
public static final String STMT_SELECT_JOB_SINGLE_BY_ID = public static final String STMT_SELECT_JOB_SINGLE_BY_ID =
STMT_SELECT_JOB + " WHERE " + COLUMN_SQB_ID + " = ?"; STMT_SELECT_JOB + " WHERE " + COLUMN_SQB_ID + " = ?";
// DML: Select one specific job
public static final String STMT_SELECT_JOB_SINGLE_BY_NAME =
STMT_SELECT_JOB + " WHERE " + COLUMN_SQB_NAME + " = ?";
// DML: Select all jobs for a Connector // DML: Select all jobs for a Connector
public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE = public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE =
STMT_SELECT_JOB STMT_SELECT_JOB

View File

@ -18,6 +18,7 @@
package org.apache.sqoop.handler; package org.apache.sqoop.handler;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
@ -28,16 +29,22 @@
import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.driver.Driver; import org.apache.sqoop.driver.Driver;
import org.apache.sqoop.driver.JobManager;
import org.apache.sqoop.json.JobBean; import org.apache.sqoop.json.JobBean;
import org.apache.sqoop.json.JobsBean;
import org.apache.sqoop.json.JsonBean; import org.apache.sqoop.json.JsonBean;
import org.apache.sqoop.json.SubmissionBean;
import org.apache.sqoop.json.ValidationResultBean; import org.apache.sqoop.json.ValidationResultBean;
import org.apache.sqoop.model.ConfigUtils; import org.apache.sqoop.model.ConfigUtils;
import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MFromConfig; import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MPersistableEntity;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig; import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.repository.Repository; import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.request.HttpEventContext;
import org.apache.sqoop.server.RequestContext; import org.apache.sqoop.server.RequestContext;
import org.apache.sqoop.server.RequestHandler; import org.apache.sqoop.server.RequestHandler;
import org.apache.sqoop.server.common.ServerError; import org.apache.sqoop.server.common.ServerError;
@ -46,41 +53,38 @@
import org.json.simple.JSONObject; import org.json.simple.JSONObject;
import org.json.simple.JSONValue; import org.json.simple.JSONValue;
/**
* Job request handler is supporting following resources:
*
* GET /v1/job/:jid
* Return details about one particular job with id :jid or about all of
* them if :jid equals to "all".
*
* POST /v1/job
* Create new job
*
* PUT /v1/job/:jid
* Update job with id :jid.
*
* PUT /v1/job/:jid/enable
* Enable job with id :jid
*
* PUT /v1/job/:jid/disable
* Disable job with id :jid
*
* DELETE /v1/job/:jid
* Remove job with id :jid
*
* Planned resources:
*
* GET /v1/job
* Get brief list of all jobs present in the system. This resource is not yet
* implemented.
*/
public class JobRequestHandler implements RequestHandler { public class JobRequestHandler implements RequestHandler {
private static final Logger LOG = /** enum for representing the actions supported on the job resource*/
Logger.getLogger(JobRequestHandler.class); enum JobAction {
ENABLE("enable"),
DISABLE("disable"),
START("start"),
STOP("stop"),
;
JobAction(String name) {
this.name = name;
}
private static final String ENABLE = "enable"; String name;
private static final String DISABLE = "disable";
public static JobAction fromString(String name) {
if (name != null) {
for (JobAction action : JobAction.values()) {
if (name.equalsIgnoreCase(action.name)) {
return action;
}
}
}
return null;
}
}
private static final Logger LOG = Logger.getLogger(JobRequestHandler.class);
static final String JOBS_PATH = "jobs";
static final String JOB_PATH = "job";
static final String STATUS = "status";
public JobRequestHandler() { public JobRequestHandler() {
LOG.info("JobRequestHandler initialized"); LOG.info("JobRequestHandler initialized");
@ -89,208 +93,306 @@ public JobRequestHandler() {
@Override @Override
public JsonBean handleEvent(RequestContext ctx) { public JsonBean handleEvent(RequestContext ctx) {
switch (ctx.getMethod()) { switch (ctx.getMethod()) {
case GET: case GET:
return getJobs(ctx); if (STATUS.equals(ctx.getLastURLElement())) {
case POST: return getJobStatus(ctx);
return createUpdateJob(ctx, false); }
case PUT: return getJobs(ctx);
if (ctx.getLastURLElement().equals(ENABLE)) { case POST:
return enableJob(ctx, true); return createUpdateJob(ctx, true);
} else if (ctx.getLastURLElement().equals(DISABLE)) { case PUT:
return enableJob(ctx, false); JobAction action = JobAction.fromString(ctx.getLastURLElement());
} else { switch (action) {
return createUpdateJob(ctx, true); case ENABLE:
} return enableJob(ctx, true);
case DELETE: case DISABLE:
return deleteJob(ctx); return enableJob(ctx, false);
case START:
return startJob(ctx);
case STOP:
return stopJob(ctx);
default:
return createUpdateJob(ctx, false);
}
case DELETE:
return deleteJob(ctx);
} }
return null; return null;
} }
/** /**
* Delete job from repository. * Delete job from repository.
* *
* @param ctx Context object * @param ctx
* Context object
* @return Empty bean * @return Empty bean
*/ */
private JsonBean deleteJob(RequestContext ctx) { private JsonBean deleteJob(RequestContext ctx) {
String sxid = ctx.getLastURLElement();
long jid = Long.valueOf(sxid);
AuditLoggerManager.getInstance()
.logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
"delete", "job", sxid);
Repository repository = RepositoryManager.getInstance().getRepository(); Repository repository = RepositoryManager.getInstance().getRepository();
repository.deleteJob(jid);
String jobIdentifier = ctx.getLastURLElement();
long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "delete", "job", jobIdentifier);
repository.deleteJob(jobId);
return JsonBean.EMPTY_BEAN; return JsonBean.EMPTY_BEAN;
} }
/** /**
* Update or create job in repository. * Update or create job in repository.
* *
* @param ctx Context object * @param ctx
* Context object
* @return Validation bean object * @return Validation bean object
*/ */
private JsonBean createUpdateJob(RequestContext ctx, boolean update) { private JsonBean createUpdateJob(RequestContext ctx, boolean create) {
Repository repository = RepositoryManager.getInstance().getRepository();
String username = ctx.getUserName(); String username = ctx.getUserName();
JobBean bean = new JobBean(); JobBean bean = new JobBean();
try { try {
JSONObject json = JSONObject json = (JSONObject) JSONValue.parse(ctx.getRequest().getReader());
(JSONObject) JSONValue.parse(ctx.getRequest().getReader());
bean.restore(json); bean.restore(json);
} catch (IOException e) { } catch (IOException e) {
throw new SqoopException(ServerError.SERVER_0003, throw new SqoopException(ServerError.SERVER_0003, "Can't read request content", e);
"Can't read request content", e);
} }
// Get job object // Get job object
List<MJob> jobs = bean.getJobs(); List<MJob> jobs = bean.getJobs();
if(jobs.size() != 1) { if (jobs.size() != 1) {
throw new SqoopException(ServerError.SERVER_0003, throw new SqoopException(ServerError.SERVER_0003, "Expected one job but got " + jobs.size());
"Expected one job but got " + jobs.size());
} }
// Job object // Job object
MJob job = jobs.get(0); MJob postedJob = jobs.get(0);
// Verify that user is not trying to spoof us // Verify that user is not trying to spoof us
MFromConfig fromConfig = ConnectorManager.getInstance() MFromConfig fromConfig = ConnectorManager.getInstance()
.getConnectorConfigurable(job.getConnectorId(Direction.FROM)) .getConnectorConfigurable(postedJob.getConnectorId(Direction.FROM)).getFromConfig();
.getFromConfig();
MToConfig toConfig = ConnectorManager.getInstance() MToConfig toConfig = ConnectorManager.getInstance()
.getConnectorConfigurable(job.getConnectorId(Direction.TO)) .getConnectorConfigurable(postedJob.getConnectorId(Direction.TO)).getToConfig();
.getToConfig();
MDriverConfig driverConfig = Driver.getInstance().getDriver().getDriverConfig(); MDriverConfig driverConfig = Driver.getInstance().getDriver().getDriverConfig();
if(!fromConfig.equals(job.getJobConfig(Direction.FROM)) if (!fromConfig.equals(postedJob.getJobConfig(Direction.FROM))
|| !driverConfig.equals(job.getDriverConfig()) || !driverConfig.equals(postedJob.getDriverConfig())
|| !toConfig.equals(job.getJobConfig(Direction.TO))) { || !toConfig.equals(postedJob.getJobConfig(Direction.TO))) {
throw new SqoopException(ServerError.SERVER_0003, throw new SqoopException(ServerError.SERVER_0003, "Detected incorrect config structure");
"Detected incorrect config structure"); }
// if update get the job id from the request URI
if (!create) {
String jobIdentifier = ctx.getLastURLElement();
// support jobName or jobId for the api
long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
if (postedJob.getPersistenceId() == MPersistableEntity.PERSISTANCE_ID_DEFAULT) {
MJob existingJob = repository.findJob(jobId);
postedJob.setPersistenceId(existingJob.getPersistenceId());
}
} }
// Corresponding connectors for this // Corresponding connectors for this
SqoopConnector fromConnector = ConnectorManager.getInstance().getSqoopConnector(job.getConnectorId(Direction.FROM)); SqoopConnector fromConnector = ConnectorManager.getInstance().getSqoopConnector(
SqoopConnector toConnector = ConnectorManager.getInstance().getSqoopConnector(job.getConnectorId(Direction.TO)); postedJob.getConnectorId(Direction.FROM));
SqoopConnector toConnector = ConnectorManager.getInstance().getSqoopConnector(
postedJob.getConnectorId(Direction.TO));
if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) { if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) {
throw new SqoopException(ServerError.SERVER_0004, "Connector " + fromConnector.getClass().getCanonicalName() throw new SqoopException(ServerError.SERVER_0004, "Connector "
+ " does not support FROM direction."); + fromConnector.getClass().getCanonicalName() + " does not support FROM direction.");
} }
if (!toConnector.getSupportedDirections().contains(Direction.TO)) { if (!toConnector.getSupportedDirections().contains(Direction.TO)) {
throw new SqoopException(ServerError.SERVER_0004, "Connector " + toConnector.getClass().getCanonicalName() throw new SqoopException(ServerError.SERVER_0004, "Connector "
+ " does not support TO direction."); + toConnector.getClass().getCanonicalName() + " does not support TO direction.");
} }
// Validate user supplied data // Validate user supplied data
ConfigValidationResult fromConfigValidator = ConfigUtils.validateConfigs( ConfigValidationResult fromConfigValidator = ConfigUtils.validateConfigs(
job.getJobConfig(Direction.FROM).getConfigs(), postedJob.getJobConfig(Direction.FROM).getConfigs(),
fromConnector.getJobConfigurationClass(Direction.FROM) fromConnector.getJobConfigurationClass(Direction.FROM));
);
ConfigValidationResult toConfigValidator = ConfigUtils.validateConfigs( ConfigValidationResult toConfigValidator = ConfigUtils.validateConfigs(
job.getJobConfig(Direction.TO).getConfigs(), postedJob.getJobConfig(Direction.TO).getConfigs(),
toConnector.getJobConfigurationClass(Direction.TO) toConnector.getJobConfigurationClass(Direction.TO));
); ConfigValidationResult driverConfigValidator = ConfigUtils.validateConfigs(postedJob
ConfigValidationResult driverConfigValidator = ConfigUtils.validateConfigs( .getDriverConfig().getConfigs(), Driver.getInstance().getDriverJobConfigurationClass());
job.getDriverConfig().getConfigs(), Status finalStatus = Status.getWorstStatus(fromConfigValidator.getStatus(),
Driver.getInstance().getDriverJobConfigurationClass() toConfigValidator.getStatus(), driverConfigValidator.getStatus());
);
Status finalStatus = Status.getWorstStatus(fromConfigValidator.getStatus(), toConfigValidator.getStatus(), driverConfigValidator.getStatus());
// Return back validations in all cases // Return back validations in all cases
ValidationResultBean validationResultBean = new ValidationResultBean(fromConfigValidator, toConfigValidator); ValidationResultBean validationResultBean = new ValidationResultBean(fromConfigValidator,
toConfigValidator);
// If we're good enough let's perform the action // If we're good enough let's perform the action
if(finalStatus.canProceed()) { if (finalStatus.canProceed()) {
if(update) { if (create) {
AuditLoggerManager.getInstance() AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
.logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), ctx.getRequest().getRemoteAddr(), "create", "job",
"update", "job", String.valueOf(job.getPersistenceId())); String.valueOf(postedJob.getPersistenceId()));
job.setLastUpdateUser(username); postedJob.setCreationUser(username);
RepositoryManager.getInstance().getRepository().updateJob(job); postedJob.setLastUpdateUser(username);
repository.createJob(postedJob);
validationResultBean.setId(postedJob.getPersistenceId());
} else { } else {
job.setCreationUser(username); AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
job.setLastUpdateUser(username); ctx.getRequest().getRemoteAddr(), "update", "job",
RepositoryManager.getInstance().getRepository().createJob(job); String.valueOf(postedJob.getPersistenceId()));
validationResultBean.setId(job.getPersistenceId());
AuditLoggerManager.getInstance() postedJob.setLastUpdateUser(username);
.logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), repository.updateJob(postedJob);
"create", "job", String.valueOf(job.getPersistenceId()));
} }
} }
return validationResultBean; return validationResultBean;
} }
private JsonBean getJobs(RequestContext ctx) { private JsonBean getJobs(RequestContext ctx) {
String sjid = ctx.getLastURLElement(); String identifier = ctx.getLastURLElement();
JobBean bean; JobBean jobBean;
AuditLoggerManager.getInstance()
.logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
"get", "job", sjid);
Locale locale = ctx.getAcceptLanguageHeader(); Locale locale = ctx.getAcceptLanguageHeader();
Repository repository = RepositoryManager.getInstance().getRepository(); Repository repository = RepositoryManager.getInstance().getRepository();
if (sjid.equals(JsonBean.ALL)) { // jobs by connector
if (ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM) != null) {
List<MJob> jobs = repository.findJobs(); identifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM);
bean = new JobBean(jobs); AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "get", "jobsByConnector", identifier);
// Add associated resources into the bean if (repository.findConnector(identifier) != null) {
for( MJob job : jobs) { long connectorId = repository.findConnector(identifier).getPersistenceId();
long fromConnectorId = job.getConnectorId(Direction.FROM); jobBean = createJobsBean(repository.findJobsForConnector(connectorId), locale);
long toConnectorId = job.getConnectorId(Direction.TO); } else {
if(!bean.hasConnectorConfigBundle(fromConnectorId)) { // this means name nor Id existed
bean.addConnectorConfigBundle(fromConnectorId, throw new SqoopException(ServerError.SERVER_0005, "Invalid connector: " + identifier
ConnectorManager.getInstance().getResourceBundle(fromConnectorId, locale)); + " name for jobs given");
}
if(!bean.hasConnectorConfigBundle(toConnectorId)) {
bean.addConnectorConfigBundle(toConnectorId,
ConnectorManager.getInstance().getResourceBundle(toConnectorId, locale));
}
} }
} else
// all jobs in the system
if (ctx.getPath().contains(JOBS_PATH)
|| (ctx.getPath().contains(JOB_PATH) && identifier.equals("all"))) {
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "get", "jobs", "all");
jobBean = createJobsBean(repository.findJobs(), locale);
}
// job by Id
else {
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "get", "job", identifier);
long jobId = getJobIdFromIdentifier(identifier, repository);
List<MJob> jobList = new ArrayList<MJob>();
// a list of single element
jobList.add(repository.findJob(jobId));
jobBean = createJobBean(jobList, locale);
}
return jobBean;
}
private long getJobIdFromIdentifier(String identifier, Repository repository) {
// support jobName or jobId for the api
// NOTE: jobId is a fallback for older sqoop clients if any, since we want to primarily use unique jobNames
long jobId;
if (repository.findJob(identifier) != null) {
jobId = repository.findJob(identifier).getPersistenceId();
} else { } else {
long jid = Long.valueOf(sjid); try {
jobId = Long.valueOf(identifier);
MJob job = repository.findJob(jid); } catch (NumberFormatException ex) {
long fromConnectorId = job.getConnectorId(Direction.FROM); // this means name nor Id existed and we want to throw a user friendly message than a number format exception
long toConnectorId = job.getConnectorId(Direction.TO); throw new SqoopException(ServerError.SERVER_0005, "Invalid job: " + identifier
bean = new JobBean(job); + " requested");
if(!bean.hasConnectorConfigBundle(fromConnectorId)) {
bean.addConnectorConfigBundle(fromConnectorId,
ConnectorManager.getInstance().getResourceBundle(fromConnectorId, locale));
}
if(!bean.hasConnectorConfigBundle(toConnectorId)) {
bean.addConnectorConfigBundle(toConnectorId,
ConnectorManager.getInstance().getResourceBundle(toConnectorId, locale));
} }
} }
return jobId;
}
// set driver config bundle private JobBean createJobBean(List<MJob> jobs, Locale locale) {
bean.setDriverConfigBundle(Driver.getInstance().getBundle(locale)); JobBean jobBean = new JobBean(jobs);
return bean; addJob(jobs, locale, jobBean);
return jobBean;
}
private JobsBean createJobsBean(List<MJob> jobs, Locale locale) {
JobsBean jobsBean = new JobsBean(jobs);
addJob(jobs, locale, jobsBean);
return jobsBean;
}
private void addJob(List<MJob> jobs, Locale locale, JobBean bean) {
// Add associated resources into the bean
for (MJob job : jobs) {
long fromConnectorId = job.getConnectorId(Direction.FROM);
long toConnectorId = job.getConnectorId(Direction.TO);
// replace it only if it does not already exist
if (!bean.hasConnectorConfigBundle(fromConnectorId)) {
bean.addConnectorConfigBundle(fromConnectorId, ConnectorManager.getInstance()
.getResourceBundle(fromConnectorId, locale));
}
if (!bean.hasConnectorConfigBundle(toConnectorId)) {
bean.addConnectorConfigBundle(toConnectorId, ConnectorManager.getInstance()
.getResourceBundle(toConnectorId, locale));
}
}
} }
private JsonBean enableJob(RequestContext ctx, boolean enabled) { private JsonBean enableJob(RequestContext ctx, boolean enabled) {
String[] elements = ctx.getUrlElements();
String sjid = elements[elements.length - 2];
long xid = Long.valueOf(sjid);
Repository repository = RepositoryManager.getInstance().getRepository(); Repository repository = RepositoryManager.getInstance().getRepository();
repository.enableJob(xid, enabled); String[] elements = ctx.getUrlElements();
String jobIdentifier = elements[elements.length - 2];
long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
repository.enableJob(jobId, enabled);
return JsonBean.EMPTY_BEAN; return JsonBean.EMPTY_BEAN;
} }
private JsonBean startJob(RequestContext ctx) {
Repository repository = RepositoryManager.getInstance().getRepository();
String[] elements = ctx.getUrlElements();
String jobIdentifier = elements[elements.length - 2];
long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "submit", "job", String.valueOf(jobId));
// TODO(SQOOP-1638): This should be outsourced somewhere more suitable than
// here
if (JobManager.getInstance().getNotificationBaseUrl() == null) {
String url = ctx.getRequest().getRequestURL().toString();
JobManager.getInstance().setNotificationBaseUrl(
url.split("v1")[0] + "/v1/job/status/notification/");
}
MSubmission submission = JobManager.getInstance()
.start(jobId, prepareRequestEventContext(ctx));
return new SubmissionBean(submission);
}
private JsonBean stopJob(RequestContext ctx) {
Repository repository = RepositoryManager.getInstance().getRepository();
String[] elements = ctx.getUrlElements();
String jobIdentifier = elements[elements.length - 2];
long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "stop", "job", String.valueOf(jobId));
MSubmission submission = JobManager.getInstance().stop(jobId, prepareRequestEventContext(ctx));
return new SubmissionBean(submission);
}
private JsonBean getJobStatus(RequestContext ctx) {
Repository repository = RepositoryManager.getInstance().getRepository();
String[] elements = ctx.getUrlElements();
String jobIdentifier = elements[elements.length - 2];
long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "status", "job", String.valueOf(jobId));
MSubmission submission = JobManager.getInstance().status(jobId);
return new SubmissionBean(submission);
}
private HttpEventContext prepareRequestEventContext(RequestContext ctx) {
HttpEventContext httpEventContext = new HttpEventContext();
httpEventContext.setUsername(ctx.getUserName());
return httpEventContext;
}
} }

View File

@ -22,37 +22,19 @@
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.audit.AuditLoggerManager; import org.apache.sqoop.audit.AuditLoggerManager;
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.driver.JobManager;
import org.apache.sqoop.json.JsonBean; import org.apache.sqoop.json.JsonBean;
import org.apache.sqoop.json.SubmissionBean; import org.apache.sqoop.json.SubmissionsBean;
import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.request.HttpEventContext;
import org.apache.sqoop.server.RequestContext; import org.apache.sqoop.server.RequestContext;
import org.apache.sqoop.server.RequestContext.Method;
import org.apache.sqoop.server.RequestHandler; import org.apache.sqoop.server.RequestHandler;
import org.apache.sqoop.server.common.ServerError; import org.apache.sqoop.server.common.ServerError;
/**
* Submission request handler is supporting following resources:
*
* GET /v1/submission/action/:jid
* Get status of last submission for job with id :jid
*
* POST /v1/submission/action/:jid
* Create new submission for job with id :jid
*
* DELETE /v1/submission/action/:jid
* Stop last submission for job with id :jid
*
* GET /v1/submission/notification/:jid
* Notification endpoint to get job status outside normal interval
*
* Possible additions in the future: /v1/submission/history/* for history.
*/
public class SubmissionRequestHandler implements RequestHandler { public class SubmissionRequestHandler implements RequestHandler {
private static final Logger LOG = private static final Logger LOG = Logger.getLogger(SubmissionRequestHandler.class);
Logger.getLogger(SubmissionRequestHandler.class);
public SubmissionRequestHandler() { public SubmissionRequestHandler() {
LOG.info("SubmissionRequestHandler initialized"); LOG.info("SubmissionRequestHandler initialized");
@ -60,110 +42,45 @@ public SubmissionRequestHandler() {
@Override @Override
public JsonBean handleEvent(RequestContext ctx) { public JsonBean handleEvent(RequestContext ctx) {
String[] urlElements = ctx.getUrlElements();
if (urlElements.length < 2) { // submission only support GET requests
throw new SqoopException(ServerError.SERVER_0003, if (ctx.getMethod() != Method.GET) {
"Invalid URL, too few arguments for this servlet."); throw new SqoopException(ServerError.SERVER_0002, "Unsupported HTTP method for connector:"
+ ctx.getMethod());
} }
String identifier = ctx.getLastURLElement();
// Let's check Repository repository = RepositoryManager.getInstance().getRepository();
int length = urlElements.length; // links by connector ordered by updated time
String action = urlElements[length - 2]; // hence the latest submission is on the top
if (ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM) != null) {
if(action.equals("action")) { identifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM);
return handleActionEvent(ctx, urlElements[length - 1]); AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
} ctx.getRequest().getRemoteAddr(), "get", "submissionsByJob", identifier);
if (repository.findJob(identifier) != null) {
if(action.equals("notification")) { long jobId = repository.findJob(identifier).getPersistenceId();
return handleNotification(ctx, urlElements[length - 1]); return getSubmissionsForJob(jobId);
} } else {
// this means name nor Id existed
if(action.equals("history")) { throw new SqoopException(ServerError.SERVER_0005, "Invalid job: " + identifier
return handleHistoryEvent(ctx, urlElements[length - 1]); + " name given");
} }
throw new SqoopException(ServerError.SERVER_0003,
"Do not know what to do.");
}
private JsonBean handleNotification(RequestContext ctx, String sjid) {
LOG.debug("Received notification request for job " + sjid);
JobManager.getInstance().status(Long.parseLong(sjid));
return JsonBean.EMPTY_BEAN;
}
private JsonBean handleActionEvent(RequestContext ctx, String sjid) {
long jid = Long.parseLong(sjid);
String username = ctx.getUserName();
HttpEventContext ectx = new HttpEventContext();
ectx.setUsername(username);
switch (ctx.getMethod()) {
case GET:
AuditLoggerManager.getInstance()
.logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
"status", "submission", String.valueOf(jid));
return submissionStatus(jid);
case POST:
// TODO: This should be outsourced somewhere more suitable than here
if(JobManager.getInstance().getNotificationBaseUrl() == null) {
String url = ctx.getRequest().getRequestURL().toString();
JobManager.getInstance().setNotificationBaseUrl(
url.split("v1")[0] + "/v1/submission/notification/");
}
AuditLoggerManager.getInstance()
.logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
"submit", "submission", String.valueOf(jid));
return submissionSubmit(jid, ectx);
case DELETE:
AuditLoggerManager.getInstance()
.logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
"stop", "submission", String.valueOf(jid));
return submissionStop(jid, ectx);
}
return null;
}
private JsonBean handleHistoryEvent(RequestContext ctx, String sjid) {
AuditLoggerManager.getInstance()
.logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(),
"get", "submission", sjid);
if (sjid.equals("all")) {
return getSubmissions();
} else { } else {
return getSubmissionsForJob(Long.parseLong(sjid)); // all submissions in the system
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "get", "submissions", "all");
return getSubmissions();
} }
} }
private JsonBean submissionStop(long jid, HttpEventContext ctx) {
MSubmission submission = JobManager.getInstance().stop(jid, ctx);
return new SubmissionBean(submission);
}
private JsonBean submissionSubmit(long jid, HttpEventContext ctx) {
MSubmission submission = JobManager.getInstance().submit(jid, ctx);
return new SubmissionBean(submission);
}
private JsonBean submissionStatus(long jid) {
MSubmission submission = JobManager.getInstance().status(jid);
return new SubmissionBean(submission);
}
private JsonBean getSubmissions() { private JsonBean getSubmissions() {
List<MSubmission> submissions = RepositoryManager.getInstance().getRepository().findSubmissions(); List<MSubmission> submissions = RepositoryManager.getInstance().getRepository()
return new SubmissionBean(submissions); .findSubmissions();
return new SubmissionsBean(submissions);
} }
private JsonBean getSubmissionsForJob(long jid) { private JsonBean getSubmissionsForJob(long jid) {
List<MSubmission> submissions = RepositoryManager.getInstance().getRepository().findSubmissionsForJob(jid); List<MSubmission> submissions = RepositoryManager.getInstance().getRepository()
return new SubmissionBean(submissions); .findSubmissionsForJob(jid);
return new SubmissionsBean(submissions);
} }
} }

View File

@ -23,7 +23,53 @@
import org.apache.sqoop.server.RequestHandler; import org.apache.sqoop.server.RequestHandler;
import org.apache.sqoop.server.SqoopProtocolServlet; import org.apache.sqoop.server.SqoopProtocolServlet;
/** /**
* Provides operations for job resource
*
* GET /v1/job/{jid}
* Return details about one particular job with id:jid or about all of
* them if jid equals to "all"
*
* POST /v1/job
* Create new job
* POST /v1/job/ with {from-link-id}, {to-link-id} and other job details in the post data
* Create job with from and to link
* PUT /v1/link/ with {from-link-id}, {to-link-id} and other job details in the post data
* Edit/Update job for the from and to link
*
* PUT /v1/job/{jid} and the job details in the post data
* Update job with id jid.
*
* PUT /v1/job/{jid}/enable
* Enable job with id jid
* PUT /v1/job/{jname}s/disable
* Enable job with name jname
*
* PUT /v1/job/{jid}/disable
* Disable job with id jid
* PUT /v1/job/{jname}/disable
* Disable job with name jname
*
* DELETE /v1/job/{jid}
* Remove job with id jid
* DELETE /v1/job/{jname}
* Remove job with name jname
*
* PUT /v1/job/{jid}/submit
* Submit job with id jid to create a submission record
* PUT /v1/job/{jname}/submit
* Submit job with name jname to create a submission record
*
* PUT /v1/job/{jid}/stop
* Abort/Stop last running associated submission with job id jid
* PUT /v1/job/{jname}/stop
* Abort/Stop last running associated submission with job name jname
*
* GET /v1/job/{jid}/status
* get status of running job with job id jid
* GET /v1/job/{jname}/status
* get status of running job with job name jname
* *
*/ */
@SuppressWarnings("serial") @SuppressWarnings("serial")

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.server.v1;
import org.apache.sqoop.handler.JobRequestHandler;
import org.apache.sqoop.json.JsonBean;
import org.apache.sqoop.server.RequestContext;
import org.apache.sqoop.server.RequestHandler;
import org.apache.sqoop.server.SqoopProtocolServlet;
/**
* Displays all or jobs per connector in sqoop
*
* GET /v1/jobs
* Return details about every jobs that exists in the sqoop system
* GET /v1/jobs?cname=
* Return details about job(s) for a given connector name {cname}
*/
@SuppressWarnings("serial")
public class JobsServlet extends SqoopProtocolServlet {
private RequestHandler jobRequestHandler;
public JobsServlet() {
jobRequestHandler = new JobRequestHandler();
}
@Override
protected JsonBean handleGetRequest(RequestContext ctx) throws Exception {
return jobRequestHandler.handleEvent(ctx);
}
}

View File

@ -24,14 +24,21 @@
import org.apache.sqoop.server.SqoopProtocolServlet; import org.apache.sqoop.server.SqoopProtocolServlet;
/** /**
* Display job submissions in the sqoop system
*
* GET /v1/submissions
* Get all submissions in the system
*
* GET /v1/submissions?jname=
* Get all submissions for the given job ordered by updated time
* *
*/ */
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class SubmissionServlet extends SqoopProtocolServlet { public class SubmissionsServlet extends SqoopProtocolServlet {
private RequestHandler submissionRequestHandler; private RequestHandler submissionRequestHandler;
public SubmissionServlet() { public SubmissionsServlet() {
submissionRequestHandler = new SubmissionRequestHandler(); submissionRequestHandler = new SubmissionRequestHandler();
} }
@ -39,14 +46,4 @@ public SubmissionServlet() {
protected JsonBean handleGetRequest(RequestContext ctx) throws Exception { protected JsonBean handleGetRequest(RequestContext ctx) throws Exception {
return submissionRequestHandler.handleEvent(ctx); return submissionRequestHandler.handleEvent(ctx);
} }
@Override
protected JsonBean handlePostRequest(RequestContext ctx) throws Exception {
return submissionRequestHandler.handleEvent(ctx);
}
@Override
protected JsonBean handleDeleteRequest(RequestContext ctx) throws Exception {
return submissionRequestHandler.handleEvent(ctx);
}
} }

View File

@ -123,16 +123,28 @@ limitations under the License.
<url-pattern>/v1/job/*</url-pattern> <url-pattern>/v1/job/*</url-pattern>
</servlet-mapping> </servlet-mapping>
<!-- Submission servlet --> <!-- Jobs servlet -->
<servlet> <servlet>
<servlet-name>v1.SubmissionServlet</servlet-name> <servlet-name>v1.JobsServlet</servlet-name>
<servlet-class>org.apache.sqoop.server.v1.SubmissionServlet</servlet-class> <servlet-class>org.apache.sqoop.server.v1.JobsServlet</servlet-class>
<load-on-startup>1</load-on-startup> <load-on-startup>1</load-on-startup>
</servlet> </servlet>
<servlet-mapping> <servlet-mapping>
<servlet-name>v1.SubmissionServlet</servlet-name> <servlet-name>v1.JobsServlet</servlet-name>
<url-pattern>/v1/submission/*</url-pattern> <url-pattern>/v1/jobs/*</url-pattern>
</servlet-mapping>
<!-- Submissions servlet -->
<servlet>
<servlet-name>v1.SubmissionsServlet</servlet-name>
<servlet-class>org.apache.sqoop.server.v1.SubmissionsServlet</servlet-class>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>v1.SubmissionsServlet</servlet-name>
<url-pattern>/v1/submissions/*</url-pattern>
</servlet-mapping> </servlet-mapping>

View File

@ -29,10 +29,10 @@
import org.apache.sqoop.validation.Status; import org.apache.sqoop.validation.Status;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class StatusJobFunction extends SqoopFunction { public class ShowJobStatusFunction extends SqoopFunction {
@SuppressWarnings("static-access") @SuppressWarnings("static-access")
public StatusJobFunction() { public ShowJobStatusFunction() {
this.addOption(OptionBuilder.hasArg().withArgName(Constants.OPT_JID) this.addOption(OptionBuilder.hasArg().withArgName(Constants.OPT_JID)
.withDescription(resourceString(Constants.RES_PROMPT_JOB_ID)) .withDescription(resourceString(Constants.RES_PROMPT_JOB_ID))
.withLongOpt(Constants.OPT_JID) .withLongOpt(Constants.OPT_JID)
@ -42,7 +42,7 @@ public StatusJobFunction() {
@Override @Override
public Object executeFunction(CommandLine line, boolean isInteractive) { public Object executeFunction(CommandLine line, boolean isInteractive) {
if (line.hasOption(Constants.OPT_JID)) { if (line.hasOption(Constants.OPT_JID)) {
MSubmission submission = client.getSubmissionStatus(getLong(line, Constants.OPT_JID)); MSubmission submission = client.getJobStatus(getLong(line, Constants.OPT_JID));
if(submission.getStatus().isFailure() || submission.getStatus().equals(SubmissionStatus.SUCCEEDED)) { if(submission.getStatus().isFailure() || submission.getStatus().equals(SubmissionStatus.SUCCEEDED)) {
SubmissionDisplayer.displayHeader(submission); SubmissionDisplayer.displayHeader(submission);
SubmissionDisplayer.displayFooter(submission); SubmissionDisplayer.displayFooter(submission);

View File

@ -72,12 +72,12 @@ public void finished(MSubmission submission) {
}; };
try { try {
client.startSubmission(getLong(line, Constants.OPT_JID), callback, pollTimeout); client.startJob(getLong(line, Constants.OPT_JID), callback, pollTimeout);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new SqoopException(ShellError.SHELL_0007, e); throw new SqoopException(ShellError.SHELL_0007, e);
} }
} else if (line.hasOption(Constants.OPT_JID)) { } else if (line.hasOption(Constants.OPT_JID)) {
MSubmission submission = client.startSubmission(getLong(line, Constants.OPT_JID)); MSubmission submission = client.startJob(getLong(line, Constants.OPT_JID));
if(submission.getStatus().isFailure()) { if(submission.getStatus().isFailure()) {
SubmissionDisplayer.displayFooter(submission); SubmissionDisplayer.displayFooter(submission);
} else { } else {

View File

@ -28,7 +28,7 @@ protected StatusCommand(Shell shell) {
Constants.CMD_STATUS, Constants.CMD_STATUS,
Constants.CMD_STATUS_SC, Constants.CMD_STATUS_SC,
new ImmutableMap.Builder<String, Class<? extends SqoopFunction>>() new ImmutableMap.Builder<String, Class<? extends SqoopFunction>>()
.put(Constants.FN_JOB, StatusJobFunction.class) .put(Constants.FN_JOB, ShowJobStatusFunction.class)
.build() .build()
); );
} }

View File

@ -40,7 +40,7 @@ public StopJobFunction() {
@Override @Override
public Object executeFunction(CommandLine line, boolean isInteractive) { public Object executeFunction(CommandLine line, boolean isInteractive) {
if (line.hasOption(Constants.OPT_JID)) { if (line.hasOption(Constants.OPT_JID)) {
MSubmission submission = client.stopSubmission(getLong(line, Constants.OPT_JID)); MSubmission submission = client.stopJob(getLong(line, Constants.OPT_JID));
if(submission.getStatus().isFailure()) { if(submission.getStatus().isFailure()) {
SubmissionDisplayer.displayFooter(submission); SubmissionDisplayer.displayFooter(submission);
} else { } else {

View File

@ -61,13 +61,13 @@ private Status updateJob(Long jobId, List<String> args, boolean isInteractive) t
ConsoleReader reader = new ConsoleReader(); ConsoleReader reader = new ConsoleReader();
// TODO(SQOOP-1634): using from/to and driver config id, this call can be avoided
MJob job = client.getJob(jobId); MJob job = client.getJob(jobId);
ResourceBundle fromConnectorBundle = client.getConnectorConfigBundle( ResourceBundle fromConnectorBundle = client.getConnectorConfigBundle(
job.getConnectorId(Direction.FROM)); job.getConnectorId(Direction.FROM));
ResourceBundle toConnectorBundle = client.getConnectorConfigBundle( ResourceBundle toConnectorBundle = client.getConnectorConfigBundle(
job.getConnectorId(Direction.TO)); job.getConnectorId(Direction.TO));
ResourceBundle driverConfigBundle = client.getDriverConfigBundle(); ResourceBundle driverConfigBundle = client.getDriverConfigBundle();
Status status = Status.OK; Status status = Status.OK;

View File

@ -242,7 +242,7 @@ protected void saveJob(MJob job) {
* @throws Exception * @throws Exception
*/ */
protected void executeJob(long jid) throws Exception { protected void executeJob(long jid) throws Exception {
getClient().startSubmission(jid, DEFAULT_SUBMISSION_CALLBACKS, 100); getClient().startJob(jid, DEFAULT_SUBMISSION_CALLBACKS, 100);
} }
/** /**

View File

@ -101,14 +101,14 @@ public void testColumns() throws Exception {
fillHdfsToConfig(job, ToFormat.TEXT_FILE); fillHdfsToConfig(job, ToFormat.TEXT_FILE);
saveJob(job); saveJob(job);
MSubmission submission = getClient().startSubmission(job.getPersistenceId()); MSubmission submission = getClient().startJob(job.getPersistenceId());
assertTrue(submission.getStatus().isRunning()); assertTrue(submission.getStatus().isRunning());
// Wait until the job finish - this active waiting will be removed once // Wait until the job finish - this active waiting will be removed once
// Sqoop client API will get blocking support. // Sqoop client API will get blocking support.
do { do {
Thread.sleep(5000); Thread.sleep(5000);
submission = getClient().getSubmissionStatus(job.getPersistenceId()); submission = getClient().getJobStatus(job.getPersistenceId());
} while(submission.getStatus().isRunning()); } while(submission.getStatus().isRunning());
// Assert correct output // Assert correct output

View File

@ -34,7 +34,7 @@
import org.apache.sqoop.json.JobBean; import org.apache.sqoop.json.JobBean;
import org.apache.sqoop.json.JsonBean; import org.apache.sqoop.json.JsonBean;
import org.apache.sqoop.json.LinkBean; import org.apache.sqoop.json.LinkBean;
import org.apache.sqoop.json.SubmissionBean; import org.apache.sqoop.json.SubmissionsBean;
import org.apache.sqoop.repository.Repository; import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.tools.ConfiguredTool; import org.apache.sqoop.tools.ConfiguredTool;
@ -110,7 +110,7 @@ private JSONObject dump(boolean skipSensitive) {
result.put(JSONConstants.JOBS, addConnectorName(jobs.extract(skipSensitive))); result.put(JSONConstants.JOBS, addConnectorName(jobs.extract(skipSensitive)));
LOG.info("Dumping Submissions with skipSensitive=" + String.valueOf(skipSensitive)); LOG.info("Dumping Submissions with skipSensitive=" + String.valueOf(skipSensitive));
SubmissionBean submissions = new SubmissionBean(repository.findSubmissions()); SubmissionsBean submissions = new SubmissionsBean(repository.findSubmissions());
result.put(JSONConstants.SUBMISSIONS, submissions.extract(skipSensitive)); result.put(JSONConstants.SUBMISSIONS, submissions.extract(skipSensitive));
result.put(JSONConstants.METADATA, repoMetadata(skipSensitive)); result.put(JSONConstants.METADATA, repoMetadata(skipSensitive));