From 6c233b9b513cb13da9256322ae5abf2adbcc0f2c Mon Sep 17 00:00:00 2001 From: Abraham Elmahrek Date: Thu, 22 Jan 2015 18:59:39 -0800 Subject: [PATCH] SQOOP-1782: Sqoop2: Get submissions per job (Veena Basavaraj via Abraham Elmahrek) --- .../request/SubmissionResourceRequest.java | 4 +- .../handler/ConnectorRequestHandler.java | 13 +-- .../apache/sqoop/handler/HandlerUtils.java | 84 +++++++++++++++++++ .../sqoop/handler/JobRequestHandler.java | 53 ++++-------- .../sqoop/handler/LinkRequestHandler.java | 26 +----- .../handler/SubmissionRequestHandler.java | 18 ++-- 6 files changed, 112 insertions(+), 86 deletions(-) create mode 100644 server/src/main/java/org/apache/sqoop/handler/HandlerUtils.java diff --git a/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java b/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java index 7beea08b..2d2d751c 100644 --- a/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java +++ b/client/src/main/java/org/apache/sqoop/client/request/SubmissionResourceRequest.java @@ -41,9 +41,11 @@ public SubmissionResourceRequest(DelegationTokenAuthenticatedURL.Token token){ public SubmissionsBean read(String serverUrl, Long jid) { String response; if (jid == null) { + // all submissions response = super.get(serverUrl + RESOURCE); } else { - response = super.get(serverUrl + RESOURCE + jid); + // submission per job ( name preferred, we fall back to id) + response = super.get(serverUrl + RESOURCE + "?jname=" + jid); } JSONObject jsonObject = JSONUtils.parse(response); SubmissionsBean submissionBean = new SubmissionsBean(); diff --git a/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java index 7903713b..ccf928eb 100644 --- a/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java @@ -72,18 +72,7 @@ public JsonBean handleEvent(RequestContext ctx) { } else { // NOTE: we now support using unique name as well as the connector id // NOTE: connectorId is a fallback for older sqoop clients if any, since we want to primarily use unique conenctorNames - boolean cIdNameIdentfierUsed = true; - Long cId = ConnectorManager.getInstance().getConnectorId(cIdentifier); - if (cId == null) { - // support for cId in the query - cIdNameIdentfierUsed = false; - cId = Long.parseLong(cIdentifier); - } - // Check that user is not asking for non existing connector id or non - // existing unique connector name - if (!cIdNameIdentfierUsed && !ConnectorManager.getInstance().getConnectorIds().contains(cId)) { - throw new SqoopException(ServerError.SERVER_0004, "Invalid connector id " + cId); - } + long cId = HandlerUtils.getConnectorIdFromIdentifier(cIdentifier); connectors = new LinkedList(); configParamBundles = new HashMap(); diff --git a/server/src/main/java/org/apache/sqoop/handler/HandlerUtils.java b/server/src/main/java/org/apache/sqoop/handler/HandlerUtils.java new file mode 100644 index 00000000..93ff60b0 --- /dev/null +++ b/server/src/main/java/org/apache/sqoop/handler/HandlerUtils.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.handler; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.ConnectorManager; +import org.apache.sqoop.repository.Repository; +import org.apache.sqoop.server.common.ServerError; + +public class HandlerUtils { + + public static long getJobIdFromIdentifier(String identifier, Repository repository) { + // support jobName or jobId for the api + // NOTE: jobId is a fallback for older sqoop clients if any, since we want + // to primarily use unique jobNames + long jobId; + if (repository.findJob(identifier) != null) { + jobId = repository.findJob(identifier).getPersistenceId(); + } else { + try { + jobId = Long.valueOf(identifier); + } catch (NumberFormatException ex) { + // this means name nor Id existed and we want to throw a user friendly + // message than a number format exception + throw new SqoopException(ServerError.SERVER_0005, "Invalid job: " + identifier + + " requested"); + } + } + return jobId; + } + + public static long getLinkIdFromIdentifier(String identifier, Repository repository) { + // support linkName or linkId for the api + // NOTE: linkId is a fallback for older sqoop clients if any, since we want + // to primarily use unique linkNames + long linkId; + if (repository.findLink(identifier) != null) { + linkId = repository.findLink(identifier).getPersistenceId(); + } else { + try { + linkId = Long.valueOf(identifier); + } catch (NumberFormatException ex) { + // this means name nor Id existed and we want to throw a user friendly + // message than a number format exception + throw new SqoopException(ServerError.SERVER_0005, "Invalid link: " + identifier + + " requested"); + } + } + return linkId; + } + + public static long getConnectorIdFromIdentifier(String identifier) { + long connectorId; + if (ConnectorManager.getInstance().getConnectorId(identifier) != null) { + connectorId = ConnectorManager.getInstance().getConnectorId(identifier); + } else { + try { + connectorId = Long.valueOf(identifier); + } catch (NumberFormatException ex) { + // this means name nor Id existed and we want to throw a user friendly + // message than a number format exception + throw new SqoopException(ServerError.SERVER_0005, "Invalid connector: " + identifier + + " requested"); + } + } + return connectorId; + } + +} diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java index c9f3dd78..f4482cee 100644 --- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java @@ -135,7 +135,7 @@ private JsonBean deleteJob(RequestContext ctx) { Repository repository = RepositoryManager.getInstance().getRepository(); String jobIdentifier = ctx.getLastURLElement(); - long jobId = getJobIdFromIdentifier(jobIdentifier, repository); + long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository); AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), "delete", "job", jobIdentifier); @@ -192,7 +192,7 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean create) { if (!create) { String jobIdentifier = ctx.getLastURLElement(); // support jobName or jobId for the api - long jobId = getJobIdFromIdentifier(jobIdentifier, repository); + long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository); if (postedJob.getPersistenceId() == MPersistableEntity.PERSISTANCE_ID_DEFAULT) { MJob existingJob = repository.findJob(jobId); postedJob.setPersistenceId(existingJob.getPersistenceId()); @@ -253,28 +253,21 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean create) { } private JsonBean getJobs(RequestContext ctx) { - String identifier = ctx.getLastURLElement(); + String connectorIdentifier = ctx.getLastURLElement(); JobBean jobBean; Locale locale = ctx.getAcceptLanguageHeader(); Repository repository = RepositoryManager.getInstance().getRepository(); - // jobs by connector if (ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM) != null) { - identifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM); + connectorIdentifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM); AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), - ctx.getRequest().getRemoteAddr(), "get", "jobsByConnector", identifier); - if (repository.findConnector(identifier) != null) { - long connectorId = repository.findConnector(identifier).getPersistenceId(); - jobBean = createJobsBean(repository.findJobsForConnector(connectorId), locale); - } else { - // this means name nor Id existed - throw new SqoopException(ServerError.SERVER_0005, "Invalid connector: " + identifier - + " name for jobs given"); - } + ctx.getRequest().getRemoteAddr(), "get", "jobsByConnector", connectorIdentifier); + long connectorId = HandlerUtils.getConnectorIdFromIdentifier(connectorIdentifier); + jobBean = createJobsBean(repository.findJobsForConnector(connectorId), locale); } else // all jobs in the system if (ctx.getPath().contains(JOBS_PATH) - || (ctx.getPath().contains(JOB_PATH) && identifier.equals("all"))) { + || (ctx.getPath().contains(JOB_PATH) && connectorIdentifier.equals("all"))) { AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), "get", "jobs", "all"); jobBean = createJobsBean(repository.findJobs(), locale); @@ -282,9 +275,9 @@ private JsonBean getJobs(RequestContext ctx) { // job by Id else { AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), - ctx.getRequest().getRemoteAddr(), "get", "job", identifier); + ctx.getRequest().getRemoteAddr(), "get", "job", connectorIdentifier); - long jobId = getJobIdFromIdentifier(identifier, repository); + long jobId = HandlerUtils.getJobIdFromIdentifier(connectorIdentifier, repository); List jobList = new ArrayList(); // a list of single element jobList.add(repository.findJob(jobId)); @@ -293,24 +286,6 @@ private JsonBean getJobs(RequestContext ctx) { return jobBean; } - private long getJobIdFromIdentifier(String identifier, Repository repository) { - // support jobName or jobId for the api - // NOTE: jobId is a fallback for older sqoop clients if any, since we want to primarily use unique jobNames - long jobId; - if (repository.findJob(identifier) != null) { - jobId = repository.findJob(identifier).getPersistenceId(); - } else { - try { - jobId = Long.valueOf(identifier); - } catch (NumberFormatException ex) { - // this means name nor Id existed and we want to throw a user friendly message than a number format exception - throw new SqoopException(ServerError.SERVER_0005, "Invalid job: " + identifier - + " requested"); - } - } - return jobId; - } - private JobBean createJobBean(List jobs, Locale locale) { JobBean jobBean = new JobBean(jobs); addJob(jobs, locale, jobBean); @@ -344,7 +319,7 @@ private JsonBean enableJob(RequestContext ctx, boolean enabled) { Repository repository = RepositoryManager.getInstance().getRepository(); String[] elements = ctx.getUrlElements(); String jobIdentifier = elements[elements.length - 2]; - long jobId = getJobIdFromIdentifier(jobIdentifier, repository); + long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository); repository.enableJob(jobId, enabled); return JsonBean.EMPTY_BEAN; } @@ -353,7 +328,7 @@ private JsonBean startJob(RequestContext ctx) { Repository repository = RepositoryManager.getInstance().getRepository(); String[] elements = ctx.getUrlElements(); String jobIdentifier = elements[elements.length - 2]; - long jobId = getJobIdFromIdentifier(jobIdentifier, repository); + long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository); AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), "submit", "job", String.valueOf(jobId)); // TODO(SQOOP-1638): This should be outsourced somewhere more suitable than @@ -373,7 +348,7 @@ private JsonBean stopJob(RequestContext ctx) { Repository repository = RepositoryManager.getInstance().getRepository(); String[] elements = ctx.getUrlElements(); String jobIdentifier = elements[elements.length - 2]; - long jobId = getJobIdFromIdentifier(jobIdentifier, repository); + long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository); AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), "stop", "job", String.valueOf(jobId)); MSubmission submission = JobManager.getInstance().stop(jobId, prepareRequestEventContext(ctx)); @@ -384,7 +359,7 @@ private JsonBean getJobStatus(RequestContext ctx) { Repository repository = RepositoryManager.getInstance().getRepository(); String[] elements = ctx.getUrlElements(); String jobIdentifier = elements[elements.length - 2]; - long jobId = getJobIdFromIdentifier(jobIdentifier, repository); + long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository); AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), "status", "job", String.valueOf(jobId)); MSubmission submission = JobManager.getInstance().status(jobId); diff --git a/server/src/main/java/org/apache/sqoop/handler/LinkRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/LinkRequestHandler.java index 2e122175..74fa321a 100644 --- a/server/src/main/java/org/apache/sqoop/handler/LinkRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/LinkRequestHandler.java @@ -89,7 +89,7 @@ private JsonBean deleteLink(RequestContext ctx) { Repository repository = RepositoryManager.getInstance().getRepository(); String linkIdentifier = ctx.getLastURLElement(); // support linkName or linkId for the api - long linkId = getLinkIdFromIdentifier(linkIdentifier, repository); + long linkId = HandlerUtils.getLinkIdFromIdentifier(linkIdentifier, repository); AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), "delete", "link", linkIdentifier); @@ -135,7 +135,7 @@ private JsonBean createUpdateLink(RequestContext ctx, boolean create) { if (!create) { String linkIdentifier = ctx.getLastURLElement(); // support linkName or linkId for the api - long linkId = getLinkIdFromIdentifier(linkIdentifier, repository); + long linkId = HandlerUtils.getLinkIdFromIdentifier(linkIdentifier, repository); if (postedLink.getPersistenceId() == MPersistableEntity.PERSISTANCE_ID_DEFAULT) { MLink existingLink = repository.findLink(linkId); postedLink.setPersistenceId(existingLink.getPersistenceId()); @@ -206,7 +206,7 @@ private JsonBean getLinks(RequestContext ctx) { AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), "get", "link", identifier); - long linkId = getLinkIdFromIdentifier(identifier, repository); + long linkId = HandlerUtils.getLinkIdFromIdentifier(identifier, repository); List linkList = new ArrayList(); // a list of single element linkList.add(repository.findLink(linkId)); @@ -215,24 +215,6 @@ private JsonBean getLinks(RequestContext ctx) { return linkBean; } - private long getLinkIdFromIdentifier(String identifier, Repository repository) { - // support linkName or linkId for the api - // NOTE: linkId is a fallback for older sqoop clients if any, since we want to primarily use unique linkNames - long linkId; - if (repository.findLink(identifier) != null) { - linkId = repository.findLink(identifier).getPersistenceId(); - } else { - try { - linkId = Long.valueOf(identifier); - } catch (NumberFormatException ex) { - // this means name nor Id existed and we want to throw a user friendly message than a number format exception - throw new SqoopException(ServerError.SERVER_0005, "Invalid link: " + identifier - + " requested"); - } - } - return linkId; - } - private LinkBean createLinkBean(List links, Locale locale) { LinkBean linkBean = new LinkBean(links); addLink(links, locale, linkBean); @@ -260,7 +242,7 @@ private JsonBean enableLink(RequestContext ctx, boolean enabled) { Repository repository = RepositoryManager.getInstance().getRepository(); String[] elements = ctx.getUrlElements(); String linkIdentifier = elements[elements.length - 2]; - long linkId = getLinkIdFromIdentifier(linkIdentifier, repository); + long linkId = HandlerUtils.getLinkIdFromIdentifier(linkIdentifier, repository); repository.enableLink(linkId, enabled); return JsonBean.EMPTY_BEAN; } diff --git a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java index cfbb5245..a0b29c86 100644 --- a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java @@ -48,22 +48,16 @@ public JsonBean handleEvent(RequestContext ctx) { throw new SqoopException(ServerError.SERVER_0002, "Unsupported HTTP method for connector:" + ctx.getMethod()); } - String identifier = ctx.getLastURLElement(); + String jobIdentifier = ctx.getLastURLElement(); Repository repository = RepositoryManager.getInstance().getRepository(); - // links by connector ordered by updated time + // submissions per job are ordered by update time // hence the latest submission is on the top - if (ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM) != null) { - identifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM); + if (ctx.getParameterValue(JOB_NAME_QUERY_PARAM) != null) { + jobIdentifier = ctx.getParameterValue(JOB_NAME_QUERY_PARAM); AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), - ctx.getRequest().getRemoteAddr(), "get", "submissionsByJob", identifier); - if (repository.findJob(identifier) != null) { - long jobId = repository.findJob(identifier).getPersistenceId(); + ctx.getRequest().getRemoteAddr(), "get", "submissionsByJob", jobIdentifier); + long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository); return getSubmissionsForJob(jobId); - } else { - // this means name nor Id existed - throw new SqoopException(ServerError.SERVER_0005, "Invalid job: " + identifier - + " name given"); - } } else { // all submissions in the system AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),