5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 12:40:34 +08:00

SQOOP-1782: Sqoop2: Get submissions per job

(Veena Basavaraj via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2015-01-22 18:59:39 -08:00
parent 3a0c46fd66
commit 6c233b9b51
6 changed files with 112 additions and 86 deletions

View File

@ -41,9 +41,11 @@ public SubmissionResourceRequest(DelegationTokenAuthenticatedURL.Token token){
public SubmissionsBean read(String serverUrl, Long jid) {
String response;
if (jid == null) {
// all submissions
response = super.get(serverUrl + RESOURCE);
} else {
response = super.get(serverUrl + RESOURCE + jid);
// submission per job ( name preferred, we fall back to id)
response = super.get(serverUrl + RESOURCE + "?jname=" + jid);
}
JSONObject jsonObject = JSONUtils.parse(response);
SubmissionsBean submissionBean = new SubmissionsBean();

View File

@ -72,18 +72,7 @@ public JsonBean handleEvent(RequestContext ctx) {
} else {
// NOTE: we now support using unique name as well as the connector id
// NOTE: connectorId is a fallback for older sqoop clients if any, since we want to primarily use unique conenctorNames
boolean cIdNameIdentfierUsed = true;
Long cId = ConnectorManager.getInstance().getConnectorId(cIdentifier);
if (cId == null) {
// support for cId in the query
cIdNameIdentfierUsed = false;
cId = Long.parseLong(cIdentifier);
}
// Check that user is not asking for non existing connector id or non
// existing unique connector name
if (!cIdNameIdentfierUsed && !ConnectorManager.getInstance().getConnectorIds().contains(cId)) {
throw new SqoopException(ServerError.SERVER_0004, "Invalid connector id " + cId);
}
long cId = HandlerUtils.getConnectorIdFromIdentifier(cIdentifier);
connectors = new LinkedList<MConnector>();
configParamBundles = new HashMap<Long, ResourceBundle>();

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.handler;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.server.common.ServerError;
public class HandlerUtils {
public static long getJobIdFromIdentifier(String identifier, Repository repository) {
// support jobName or jobId for the api
// NOTE: jobId is a fallback for older sqoop clients if any, since we want
// to primarily use unique jobNames
long jobId;
if (repository.findJob(identifier) != null) {
jobId = repository.findJob(identifier).getPersistenceId();
} else {
try {
jobId = Long.valueOf(identifier);
} catch (NumberFormatException ex) {
// this means name nor Id existed and we want to throw a user friendly
// message than a number format exception
throw new SqoopException(ServerError.SERVER_0005, "Invalid job: " + identifier
+ " requested");
}
}
return jobId;
}
public static long getLinkIdFromIdentifier(String identifier, Repository repository) {
// support linkName or linkId for the api
// NOTE: linkId is a fallback for older sqoop clients if any, since we want
// to primarily use unique linkNames
long linkId;
if (repository.findLink(identifier) != null) {
linkId = repository.findLink(identifier).getPersistenceId();
} else {
try {
linkId = Long.valueOf(identifier);
} catch (NumberFormatException ex) {
// this means name nor Id existed and we want to throw a user friendly
// message than a number format exception
throw new SqoopException(ServerError.SERVER_0005, "Invalid link: " + identifier
+ " requested");
}
}
return linkId;
}
public static long getConnectorIdFromIdentifier(String identifier) {
long connectorId;
if (ConnectorManager.getInstance().getConnectorId(identifier) != null) {
connectorId = ConnectorManager.getInstance().getConnectorId(identifier);
} else {
try {
connectorId = Long.valueOf(identifier);
} catch (NumberFormatException ex) {
// this means name nor Id existed and we want to throw a user friendly
// message than a number format exception
throw new SqoopException(ServerError.SERVER_0005, "Invalid connector: " + identifier
+ " requested");
}
}
return connectorId;
}
}

View File

@ -135,7 +135,7 @@ private JsonBean deleteJob(RequestContext ctx) {
Repository repository = RepositoryManager.getInstance().getRepository();
String jobIdentifier = ctx.getLastURLElement();
long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository);
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "delete", "job", jobIdentifier);
@ -192,7 +192,7 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean create) {
if (!create) {
String jobIdentifier = ctx.getLastURLElement();
// support jobName or jobId for the api
long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository);
if (postedJob.getPersistenceId() == MPersistableEntity.PERSISTANCE_ID_DEFAULT) {
MJob existingJob = repository.findJob(jobId);
postedJob.setPersistenceId(existingJob.getPersistenceId());
@ -253,28 +253,21 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean create) {
}
private JsonBean getJobs(RequestContext ctx) {
String identifier = ctx.getLastURLElement();
String connectorIdentifier = ctx.getLastURLElement();
JobBean jobBean;
Locale locale = ctx.getAcceptLanguageHeader();
Repository repository = RepositoryManager.getInstance().getRepository();
// jobs by connector
if (ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM) != null) {
identifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM);
connectorIdentifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM);
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "get", "jobsByConnector", identifier);
if (repository.findConnector(identifier) != null) {
long connectorId = repository.findConnector(identifier).getPersistenceId();
jobBean = createJobsBean(repository.findJobsForConnector(connectorId), locale);
} else {
// this means name nor Id existed
throw new SqoopException(ServerError.SERVER_0005, "Invalid connector: " + identifier
+ " name for jobs given");
}
ctx.getRequest().getRemoteAddr(), "get", "jobsByConnector", connectorIdentifier);
long connectorId = HandlerUtils.getConnectorIdFromIdentifier(connectorIdentifier);
jobBean = createJobsBean(repository.findJobsForConnector(connectorId), locale);
} else
// all jobs in the system
if (ctx.getPath().contains(JOBS_PATH)
|| (ctx.getPath().contains(JOB_PATH) && identifier.equals("all"))) {
|| (ctx.getPath().contains(JOB_PATH) && connectorIdentifier.equals("all"))) {
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "get", "jobs", "all");
jobBean = createJobsBean(repository.findJobs(), locale);
@ -282,9 +275,9 @@ private JsonBean getJobs(RequestContext ctx) {
// job by Id
else {
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "get", "job", identifier);
ctx.getRequest().getRemoteAddr(), "get", "job", connectorIdentifier);
long jobId = getJobIdFromIdentifier(identifier, repository);
long jobId = HandlerUtils.getJobIdFromIdentifier(connectorIdentifier, repository);
List<MJob> jobList = new ArrayList<MJob>();
// a list of single element
jobList.add(repository.findJob(jobId));
@ -293,24 +286,6 @@ private JsonBean getJobs(RequestContext ctx) {
return jobBean;
}
private long getJobIdFromIdentifier(String identifier, Repository repository) {
// support jobName or jobId for the api
// NOTE: jobId is a fallback for older sqoop clients if any, since we want to primarily use unique jobNames
long jobId;
if (repository.findJob(identifier) != null) {
jobId = repository.findJob(identifier).getPersistenceId();
} else {
try {
jobId = Long.valueOf(identifier);
} catch (NumberFormatException ex) {
// this means name nor Id existed and we want to throw a user friendly message than a number format exception
throw new SqoopException(ServerError.SERVER_0005, "Invalid job: " + identifier
+ " requested");
}
}
return jobId;
}
private JobBean createJobBean(List<MJob> jobs, Locale locale) {
JobBean jobBean = new JobBean(jobs);
addJob(jobs, locale, jobBean);
@ -344,7 +319,7 @@ private JsonBean enableJob(RequestContext ctx, boolean enabled) {
Repository repository = RepositoryManager.getInstance().getRepository();
String[] elements = ctx.getUrlElements();
String jobIdentifier = elements[elements.length - 2];
long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository);
repository.enableJob(jobId, enabled);
return JsonBean.EMPTY_BEAN;
}
@ -353,7 +328,7 @@ private JsonBean startJob(RequestContext ctx) {
Repository repository = RepositoryManager.getInstance().getRepository();
String[] elements = ctx.getUrlElements();
String jobIdentifier = elements[elements.length - 2];
long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository);
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "submit", "job", String.valueOf(jobId));
// TODO(SQOOP-1638): This should be outsourced somewhere more suitable than
@ -373,7 +348,7 @@ private JsonBean stopJob(RequestContext ctx) {
Repository repository = RepositoryManager.getInstance().getRepository();
String[] elements = ctx.getUrlElements();
String jobIdentifier = elements[elements.length - 2];
long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository);
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "stop", "job", String.valueOf(jobId));
MSubmission submission = JobManager.getInstance().stop(jobId, prepareRequestEventContext(ctx));
@ -384,7 +359,7 @@ private JsonBean getJobStatus(RequestContext ctx) {
Repository repository = RepositoryManager.getInstance().getRepository();
String[] elements = ctx.getUrlElements();
String jobIdentifier = elements[elements.length - 2];
long jobId = getJobIdFromIdentifier(jobIdentifier, repository);
long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository);
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "status", "job", String.valueOf(jobId));
MSubmission submission = JobManager.getInstance().status(jobId);

View File

@ -89,7 +89,7 @@ private JsonBean deleteLink(RequestContext ctx) {
Repository repository = RepositoryManager.getInstance().getRepository();
String linkIdentifier = ctx.getLastURLElement();
// support linkName or linkId for the api
long linkId = getLinkIdFromIdentifier(linkIdentifier, repository);
long linkId = HandlerUtils.getLinkIdFromIdentifier(linkIdentifier, repository);
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "delete", "link", linkIdentifier);
@ -135,7 +135,7 @@ private JsonBean createUpdateLink(RequestContext ctx, boolean create) {
if (!create) {
String linkIdentifier = ctx.getLastURLElement();
// support linkName or linkId for the api
long linkId = getLinkIdFromIdentifier(linkIdentifier, repository);
long linkId = HandlerUtils.getLinkIdFromIdentifier(linkIdentifier, repository);
if (postedLink.getPersistenceId() == MPersistableEntity.PERSISTANCE_ID_DEFAULT) {
MLink existingLink = repository.findLink(linkId);
postedLink.setPersistenceId(existingLink.getPersistenceId());
@ -206,7 +206,7 @@ private JsonBean getLinks(RequestContext ctx) {
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "get", "link", identifier);
long linkId = getLinkIdFromIdentifier(identifier, repository);
long linkId = HandlerUtils.getLinkIdFromIdentifier(identifier, repository);
List<MLink> linkList = new ArrayList<MLink>();
// a list of single element
linkList.add(repository.findLink(linkId));
@ -215,24 +215,6 @@ private JsonBean getLinks(RequestContext ctx) {
return linkBean;
}
private long getLinkIdFromIdentifier(String identifier, Repository repository) {
// support linkName or linkId for the api
// NOTE: linkId is a fallback for older sqoop clients if any, since we want to primarily use unique linkNames
long linkId;
if (repository.findLink(identifier) != null) {
linkId = repository.findLink(identifier).getPersistenceId();
} else {
try {
linkId = Long.valueOf(identifier);
} catch (NumberFormatException ex) {
// this means name nor Id existed and we want to throw a user friendly message than a number format exception
throw new SqoopException(ServerError.SERVER_0005, "Invalid link: " + identifier
+ " requested");
}
}
return linkId;
}
private LinkBean createLinkBean(List<MLink> links, Locale locale) {
LinkBean linkBean = new LinkBean(links);
addLink(links, locale, linkBean);
@ -260,7 +242,7 @@ private JsonBean enableLink(RequestContext ctx, boolean enabled) {
Repository repository = RepositoryManager.getInstance().getRepository();
String[] elements = ctx.getUrlElements();
String linkIdentifier = elements[elements.length - 2];
long linkId = getLinkIdFromIdentifier(linkIdentifier, repository);
long linkId = HandlerUtils.getLinkIdFromIdentifier(linkIdentifier, repository);
repository.enableLink(linkId, enabled);
return JsonBean.EMPTY_BEAN;
}

View File

@ -48,22 +48,16 @@ public JsonBean handleEvent(RequestContext ctx) {
throw new SqoopException(ServerError.SERVER_0002, "Unsupported HTTP method for connector:"
+ ctx.getMethod());
}
String identifier = ctx.getLastURLElement();
String jobIdentifier = ctx.getLastURLElement();
Repository repository = RepositoryManager.getInstance().getRepository();
// links by connector ordered by updated time
// submissions per job are ordered by update time
// hence the latest submission is on the top
if (ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM) != null) {
identifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM);
if (ctx.getParameterValue(JOB_NAME_QUERY_PARAM) != null) {
jobIdentifier = ctx.getParameterValue(JOB_NAME_QUERY_PARAM);
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "get", "submissionsByJob", identifier);
if (repository.findJob(identifier) != null) {
long jobId = repository.findJob(identifier).getPersistenceId();
ctx.getRequest().getRemoteAddr(), "get", "submissionsByJob", jobIdentifier);
long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository);
return getSubmissionsForJob(jobId);
} else {
// this means name nor Id existed
throw new SqoopException(ServerError.SERVER_0005, "Invalid job: " + identifier
+ " name given");
}
} else {
// all submissions in the system
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),