5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-10 16:41:58 +08:00

SQOOP-1980: Sqoop2: Rule based rest API protection

(Richard Zhou via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2015-02-02 19:12:04 -08:00
parent f7efa38005
commit fc32358abc
5 changed files with 283 additions and 4 deletions

View File

@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.security.Authorization;
import com.beust.jcommander.internal.Lists;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.delegation.web.HttpUserGroupInformation;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.model.*;
import org.apache.sqoop.security.AuthorizationHandler;
import org.apache.sqoop.security.AuthorizationManager;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
public class AuthorizationEngine {
private static final Logger LOG = Logger.getLogger(AuthorizationEngine.class);
/**
* Role type
*/
public enum RoleType {
USER, GROUP, ROLE
}
/**
* Resource type
*/
public enum ResourceType {
CONNECTOR, LINK, JOB
}
/**
* Action type in Privilege
*/
public enum PrivilegeActionType {
VIEW, USE, CREATE, UPDATE, DELETE, ENABlE_DISABLE, START_STOP, STATUS
}
/**
* Filter resources, get all valid resources from all resources
*/
public static <T extends MPersistableEntity> List<T> filterResource(final ResourceType type, List<T> resources) throws SqoopException {
Collection<T> collection = Collections2.filter(resources, new Predicate<T>() {
@Override
public boolean apply(T input) {
try {
String name = String.valueOf(input.getPersistenceId());
checkPrivilege(getPrivilege(type, name, PrivilegeActionType.VIEW));
// add valid resource
return true;
} catch (Exception e) {
//do not add into result if invalid resource
return false;
}
}
});
return Lists.newArrayList(collection);
}
/**
* Link related function
*/
public static void createLink(String connectorId) throws SqoopException {
MPrivilege privilege1 = getPrivilege(ResourceType.CONNECTOR, connectorId, PrivilegeActionType.USE);
// resource id is empty, means it is a global privilege
MPrivilege privilege2 = getPrivilege(ResourceType.LINK, StringUtils.EMPTY, PrivilegeActionType.CREATE);
checkPrivilege(privilege1, privilege2);
}
public static void updateLink(String connectorId, String linkId) throws SqoopException {
MPrivilege privilege1 = getPrivilege(ResourceType.CONNECTOR, connectorId, PrivilegeActionType.USE);
MPrivilege privilege2 = getPrivilege(ResourceType.LINK, linkId, PrivilegeActionType.UPDATE);
checkPrivilege(privilege1, privilege2);
}
public static void deleteLink(String linkId) throws SqoopException {
checkPrivilege(getPrivilege(ResourceType.LINK, linkId, PrivilegeActionType.DELETE));
}
public static void enableDisableLink(String linkId) throws SqoopException {
checkPrivilege(getPrivilege(ResourceType.LINK, linkId, PrivilegeActionType.ENABlE_DISABLE));
}
/**
* Job related function
*/
public static void createJob(String linkId1, String linkId2) throws SqoopException {
MPrivilege privilege1 = getPrivilege(ResourceType.LINK, linkId1, PrivilegeActionType.USE);
MPrivilege privilege2 = getPrivilege(ResourceType.LINK, linkId2, PrivilegeActionType.USE);
// resource id is empty, means it is a global privilege
MPrivilege privilege3 = getPrivilege(ResourceType.JOB, StringUtils.EMPTY, PrivilegeActionType.CREATE);
checkPrivilege(privilege1, privilege2, privilege3);
}
public static void updateJob(String linkId1, String linkId2, String jobId) throws SqoopException {
MPrivilege privilege1 = getPrivilege(ResourceType.LINK, linkId1, PrivilegeActionType.USE);
MPrivilege privilege2 = getPrivilege(ResourceType.LINK, linkId2, PrivilegeActionType.USE);
MPrivilege privilege3 = getPrivilege(ResourceType.JOB, jobId, PrivilegeActionType.UPDATE);
checkPrivilege(privilege1, privilege2, privilege3);
}
public static void deleteJob(String jobId) throws SqoopException {
checkPrivilege(getPrivilege(ResourceType.JOB, jobId, PrivilegeActionType.DELETE));
}
public static void enableDisableJob(String jobId) throws SqoopException {
checkPrivilege(getPrivilege(ResourceType.JOB, jobId, PrivilegeActionType.ENABlE_DISABLE));
}
public static void startJob(String jobId) throws SqoopException {
;
checkPrivilege(getPrivilege(ResourceType.JOB, jobId, PrivilegeActionType.START_STOP));
}
public static void stopJob(String jobId) throws SqoopException {
checkPrivilege(getPrivilege(ResourceType.JOB, jobId, PrivilegeActionType.START_STOP));
}
public static void statusJob(String jobId) throws SqoopException {
checkPrivilege(getPrivilege(ResourceType.JOB, jobId, PrivilegeActionType.STATUS));
}
/**
* Filter resources, get all valid resources from all resources
*/
public static List<MSubmission> filterSubmission(List<MSubmission> submissions) throws SqoopException {
Collection<MSubmission> collection = Collections2.filter(submissions, new Predicate<MSubmission>() {
@Override
public boolean apply(MSubmission input) {
try {
String jobId = String.valueOf(input.getJobId());
checkPrivilege(getPrivilege(ResourceType.JOB, jobId, PrivilegeActionType.STATUS));
// add valid submission
return true;
} catch (Exception e) {
//do not add into result if invalid submission
return false;
}
}
});
return Lists.newArrayList(collection);
}
/**
* Help function
*/
private static MPrivilege getPrivilege(ResourceType resourceType,
String resourceId,
PrivilegeActionType privilegeActionType) {
// Do a transfer. "all" means global instances in Restful API, whilst empty
// string means global instances in role based access controller.
resourceId = (resourceId == null || resourceId.equals("all")) ? StringUtils.EMPTY : resourceId;
return new MPrivilege(new MResource(resourceId, resourceType.name()), privilegeActionType.name());
}
private static void checkPrivilege(MPrivilege... privileges) {
AuthorizationHandler handler = AuthorizationManager.getAuthorizationHandler();
UserGroupInformation user = HttpUserGroupInformation.get();
MPrincipal principal = new MPrincipal(user.getUserName(), RoleType.USER.name());
handler.checkPrivileges(principal, Arrays.asList(privileges));
}
}

View File

@ -32,6 +32,7 @@
import org.apache.sqoop.json.ConnectorsBean;
import org.apache.sqoop.json.JsonBean;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.security.Authorization.AuthorizationEngine;
import org.apache.sqoop.server.RequestContext;
import org.apache.sqoop.server.RequestContext.Method;
import org.apache.sqoop.server.RequestHandler;
@ -67,6 +68,10 @@ public JsonBean handleEvent(RequestContext ctx) {
configParamBundles = ConnectorManager.getInstance().getResourceBundles(locale);
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "get", "connectors", "all");
// Authorization check
connectors = AuthorizationEngine.filterResource(AuthorizationEngine.ResourceType.CONNECTOR, connectors);
return new ConnectorsBean(connectors, configParamBundles);
} else {
@ -82,6 +87,10 @@ public JsonBean handleEvent(RequestContext ctx) {
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "get", "connector", String.valueOf(cIdentifier));
// Authorization check
connectors = AuthorizationEngine.filterResource(AuthorizationEngine.ResourceType.CONNECTOR, connectors);
return new ConnectorBean(connectors, configParamBundles);
}
}

View File

@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
@ -46,6 +47,7 @@
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.request.HttpEventContext;
import org.apache.sqoop.security.Authorization.AuthorizationEngine;
import org.apache.sqoop.server.RequestContext;
import org.apache.sqoop.server.RequestHandler;
import org.apache.sqoop.error.code.ServerError;
@ -137,6 +139,9 @@ private JsonBean deleteJob(RequestContext ctx) {
String jobIdentifier = ctx.getLastURLElement();
long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository);
// Authorization check
AuthorizationEngine.deleteJob(String.valueOf(jobId));
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "delete", "job", jobIdentifier);
repository.deleteJob(jobId);
@ -175,6 +180,16 @@ private JsonBean createUpdateJob(RequestContext ctx, boolean create) {
// Job object
MJob postedJob = jobs.get(0);
// Authorization check
if (create) {
AuthorizationEngine.createJob(String.valueOf(postedJob.getFromConnectorId()),
String.valueOf(postedJob.getToConnectorId()));
} else {
AuthorizationEngine.updateJob(String.valueOf(postedJob.getFromConnectorId()),
String.valueOf(postedJob.getToConnectorId()),
String.valueOf(postedJob.getPersistenceId()));
}
// Verify that user is not trying to spoof us
MFromConfig fromConfig = ConnectorManager.getInstance()
.getConnectorConfigurable(postedJob.getConnectorId(Direction.FROM)).getFromConfig();
@ -263,14 +278,24 @@ private JsonBean getJobs(RequestContext ctx) {
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "get", "jobsByConnector", connectorIdentifier);
long connectorId = HandlerUtils.getConnectorIdFromIdentifier(connectorIdentifier);
jobBean = createJobsBean(repository.findJobsForConnector(connectorId), locale);
List<MJob> jobList = repository.findJobsForConnector(connectorId);
// Authorization check
jobList = AuthorizationEngine.filterResource(AuthorizationEngine.ResourceType.JOB, jobList);
jobBean = createJobsBean(jobList, locale);
} else
// all jobs in the system
if (ctx.getPath().contains(JOBS_PATH)
|| (ctx.getPath().contains(JOB_PATH) && connectorIdentifier.equals("all"))) {
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "get", "jobs", "all");
jobBean = createJobsBean(repository.findJobs(), locale);
List<MJob> jobList = repository.findJobs();
// Authorization check
jobList = AuthorizationEngine.filterResource(AuthorizationEngine.ResourceType.JOB, jobList);
jobBean = createJobsBean(jobList, locale);
}
// job by Id
else {
@ -281,6 +306,10 @@ private JsonBean getJobs(RequestContext ctx) {
List<MJob> jobList = new ArrayList<MJob>();
// a list of single element
jobList.add(repository.findJob(jobId));
// Authorization check
jobList = AuthorizationEngine.filterResource(AuthorizationEngine.ResourceType.JOB, jobList);
jobBean = createJobBean(jobList, locale);
}
return jobBean;
@ -320,6 +349,10 @@ private JsonBean enableJob(RequestContext ctx, boolean enabled) {
String[] elements = ctx.getUrlElements();
String jobIdentifier = elements[elements.length - 2];
long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository);
// Authorization check
AuthorizationEngine.enableDisableJob(String.valueOf(jobId));
repository.enableJob(jobId, enabled);
return JsonBean.EMPTY_BEAN;
}
@ -329,6 +362,10 @@ private JsonBean startJob(RequestContext ctx) {
String[] elements = ctx.getUrlElements();
String jobIdentifier = elements[elements.length - 2];
long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository);
// Authorization check
AuthorizationEngine.startJob(String.valueOf(jobId));
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "submit", "job", String.valueOf(jobId));
// TODO(SQOOP-1638): This should be outsourced somewhere more suitable than
@ -349,6 +386,10 @@ private JsonBean stopJob(RequestContext ctx) {
String[] elements = ctx.getUrlElements();
String jobIdentifier = elements[elements.length - 2];
long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository);
// Authorization check
AuthorizationEngine.stopJob(String.valueOf(jobId));
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "stop", "job", String.valueOf(jobId));
MSubmission submission = JobManager.getInstance().stop(jobId, prepareRequestEventContext(ctx));
@ -360,9 +401,14 @@ private JsonBean getJobStatus(RequestContext ctx) {
String[] elements = ctx.getUrlElements();
String jobIdentifier = elements[elements.length - 2];
long jobId = HandlerUtils.getJobIdFromIdentifier(jobIdentifier, repository);
// Authorization check
AuthorizationEngine.statusJob(String.valueOf(jobId));
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "status", "job", String.valueOf(jobId));
MSubmission submission = JobManager.getInstance().status(jobId);
return new SubmissionBean(submission);
}

View File

@ -38,6 +38,7 @@
import org.apache.sqoop.model.MPersistableEntity;
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.security.Authorization.AuthorizationEngine;
import org.apache.sqoop.server.RequestContext;
import org.apache.sqoop.server.RequestHandler;
import org.apache.sqoop.error.code.ServerError;
@ -91,6 +92,9 @@ private JsonBean deleteLink(RequestContext ctx) {
// support linkName or linkId for the api
long linkId = HandlerUtils.getLinkIdFromIdentifier(linkIdentifier, repository);
// Authorization check
AuthorizationEngine.deleteLink(String.valueOf(linkId));
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "delete", "link", linkIdentifier);
@ -126,6 +130,15 @@ private JsonBean createUpdateLink(RequestContext ctx, boolean create) {
}
MLink postedLink = links.get(0);
// Authorization check
if (create) {
AuthorizationEngine.createLink(String.valueOf(postedLink.getConnectorId()));
} else {
AuthorizationEngine.updateLink(String.valueOf(postedLink.getConnectorId()),
String.valueOf(postedLink.getPersistenceId()));
}
MLinkConfig linkConfig = ConnectorManager.getInstance()
.getConnectorConfigurable(postedLink.getConnectorId()).getLinkConfig();
if (!linkConfig.equals(postedLink.getConnectorLinkConfig())) {
@ -187,7 +200,12 @@ private JsonBean getLinks(RequestContext ctx) {
ctx.getRequest().getRemoteAddr(), "get", "linksByConnector", identifier);
if (repository.findConnector(identifier) != null) {
long connectorId = repository.findConnector(identifier).getPersistenceId();
linkBean = createLinksBean(repository.findLinksForConnector(connectorId), locale);
List<MLink> linkList = repository.findLinksForConnector(connectorId);
// Authorization check
linkList = AuthorizationEngine.filterResource(AuthorizationEngine.ResourceType.LINK, linkList);
linkBean = createLinksBean(linkList, locale);
} else {
// this means name nor Id existed
throw new SqoopException(ServerError.SERVER_0005, "Invalid connector: " + identifier
@ -199,7 +217,12 @@ private JsonBean getLinks(RequestContext ctx) {
|| (ctx.getPath().contains(LINK_PATH) && identifier.equals("all"))) {
AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
ctx.getRequest().getRemoteAddr(), "get", "links", "all");
linkBean = createLinksBean(repository.findLinks(), locale);
List<MLink> linkList = repository.findLinks();
// Authorization check
linkList = AuthorizationEngine.filterResource(AuthorizationEngine.ResourceType.LINK, linkList);
linkBean = createLinksBean(linkList, locale);
}
// link by Id
else {
@ -210,6 +233,10 @@ private JsonBean getLinks(RequestContext ctx) {
List<MLink> linkList = new ArrayList<MLink>();
// a list of single element
linkList.add(repository.findLink(linkId));
// Authorization check
linkList = AuthorizationEngine.filterResource(AuthorizationEngine.ResourceType.LINK, linkList);
linkBean = createLinkBean(linkList, locale);
}
return linkBean;
@ -243,6 +270,10 @@ private JsonBean enableLink(RequestContext ctx, boolean enabled) {
String[] elements = ctx.getUrlElements();
String linkIdentifier = elements[elements.length - 2];
long linkId = HandlerUtils.getLinkIdFromIdentifier(linkIdentifier, repository);
// Authorization check
AuthorizationEngine.enableDisableLink(String.valueOf(linkId));
repository.enableLink(linkId, enabled);
return JsonBean.EMPTY_BEAN;
}

View File

@ -27,6 +27,7 @@
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.security.Authorization.AuthorizationEngine;
import org.apache.sqoop.server.RequestContext;
import org.apache.sqoop.server.RequestContext.Method;
import org.apache.sqoop.server.RequestHandler;
@ -69,12 +70,20 @@ public JsonBean handleEvent(RequestContext ctx) {
private JsonBean getSubmissions() {
List<MSubmission> submissions = RepositoryManager.getInstance().getRepository()
.findSubmissions();
//Authorization check
submissions = AuthorizationEngine.filterSubmission(submissions);
return new SubmissionsBean(submissions);
}
private JsonBean getSubmissionsForJob(long jid) {
//Authorization check
AuthorizationEngine.statusJob(String.valueOf(jid));
List<MSubmission> submissions = RepositoryManager.getInstance().getRepository()
.findSubmissionsForJob(jid);
return new SubmissionsBean(submissions);
}
}