
SQOOP-2709. Sqoop2: HDFS: Make sure impersonation works on secured cluster.

(Jarcec via Hari)
Hari Shreedharan 2015-12-02 12:14:21 -08:00
parent e2fc4a75e8
commit 408e3d5663
9 changed files with 217 additions and 10 deletions


@@ -35,4 +35,6 @@ public final class HdfsConstants extends Constants {
  public static final String WORK_DIRECTORY = PREFIX + "work_dir";
  public static final String MAX_IMPORT_DATE = PREFIX + "max_import_date";
+ public static final String DELEGATION_TOKENS = PREFIX + "delegation_tokens";
}


@@ -37,6 +37,7 @@
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.Extractor;
@@ -60,7 +61,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
  @Override
  public void extract(final ExtractorContext context, final LinkConfiguration linkConfiguration, final FromJobConfiguration jobConfiguration, final HdfsPartition partition) {
    try {
-     UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+     SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
        public Void run() throws Exception {
          HdfsUtils.contextToConfiguration(context.getContext(), conf);
          dataWriter = context.getDataWriter();


@@ -27,6 +27,7 @@
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
@@ -62,7 +63,7 @@ public void initialize(final InitializerContext context, final LinkConfiguration
    // In case of incremental import, we need to persist the highest last modified
    try {
-     UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+     SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() {
        public Void run() throws Exception {
          FileSystem fs = FileSystem.get(configuration);
          Path path = new Path(jobConfig.fromJobConfig.inputDirectory);
@@ -89,6 +90,10 @@ public Void run() throws Exception {
            LOG.info("Maximal age of file is: " + maxModifiedTime);
            context.getContext().setLong(HdfsConstants.MAX_IMPORT_DATE, maxModifiedTime);
          }
+         // Generate delegation tokens if we are on secured cluster
+         SecurityUtils.generateDelegationTokens(context.getContext(), path, configuration);
          return null;
        }
      });


@@ -34,6 +34,7 @@
import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
@@ -56,8 +57,7 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
  @Override
  public void load(final LoaderContext context, final LinkConfiguration linkConfiguration,
                   final ToJobConfiguration toJobConfig) throws Exception {
-   UserGroupInformation.createProxyUser(context.getUser(),
-       UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+   SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
      public Void run() throws Exception {
        Configuration conf = new Configuration();
        HdfsUtils.contextToConfiguration(context.getContext(), conf);


@@ -45,6 +45,7 @@
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
@@ -83,8 +84,7 @@ public List<Partition> getPartitions(final PartitionerContext context,
    final List<Partition> partitions = new ArrayList<>();
    try {
-     UserGroupInformation.createProxyUser(context.getUser(),
-         UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+     SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
        public Void run() throws Exception {
          long numInputBytes = getInputSize(conf, fromJobConfig.fromJobConfig.inputDirectory);
          maxSplitSize = numInputBytes / context.getMaxPartitions();


@@ -26,6 +26,7 @@
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
@@ -50,8 +51,7 @@ public void destroy(final DestroyerContext context, final LinkConfiguration link
    final Path targetDirectory = new Path(jobConfig.toJobConfig.outputDirectory);
    try {
-     UserGroupInformation.createProxyUser(context.getUser(),
-         UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+     SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
      public Void run() throws Exception {
        FileSystem fs = FileSystem.get(configuration);


@@ -27,6 +27,7 @@
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
@@ -58,8 +59,7 @@ public void initialize(final InitializerContext context, final LinkConfiguration
    // Verification that given HDFS directory either don't exists or is empty
    try {
-     UserGroupInformation.createProxyUser(context.getUser(),
-         UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+     SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() {
        public Void run() throws Exception {
          FileSystem fs = FileSystem.get(configuration);
          Path path = new Path(jobConfig.toJobConfig.outputDirectory);
@@ -76,6 +76,10 @@ public Void run() throws Exception {
            }
          }
        }
+       // Generate delegation tokens if we are on secured cluster
+       SecurityUtils.generateDelegationTokens(context.getContext(), path, configuration);
        return null;
      }
    });


@@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs.security;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.connector.hdfs.HdfsConstants;
import org.apache.sqoop.job.etl.TransferableContext;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
/**
* Sqoop is designed to abstract connectors away from the execution engine. Hence the security portion
* (such as generating and distributing delegation tokens) won't happen automatically under the hood
* and we have to do everything manually.
*/
public class SecurityUtils {
private static final Logger LOG = Logger.getLogger(SecurityUtils.class);
/**
* Creates a proxy user for the user who submitted the Sqoop job (e.g. who issued the "start job" command)
*/
static public UserGroupInformation createProxyUser(TransferableContext context) throws IOException {
return UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser());
}
/**
* Creates a proxy user and loads it up with all delegation tokens that we have created ourselves
*/
static public UserGroupInformation createProxyUserAndLoadDelegationTokens(TransferableContext context) throws IOException {
UserGroupInformation proxyUser = createProxyUser(context);
loadDelegationTokensToUGI(proxyUser, context.getContext());
return proxyUser;
}
/**
* Generates delegation tokens for the current user (this code is supposed to run inside doAs) and stores them
* serialized in the given mutable context.
*/
static public void generateDelegationTokens(MutableContext context, Path path, Configuration configuration) throws IOException {
if(!UserGroupInformation.isSecurityEnabled()) {
LOG.info("Running on unsecured cluster, skipping delegation token generation.");
return;
}
// String representation of all tokens that we will create (most likely single one)
List<String> tokens = new LinkedList<>();
Credentials credentials = new Credentials();
TokenCache.obtainTokensForNamenodes(credentials, new Path[]{path}, configuration);
for (Token token : credentials.getAllTokens()) {
LOG.info("Generated token: " + token.toString());
tokens.add(serializeToken(token));
}
// The context classes are transferred via "Credentials" rather than with the jobconf, so we're not leaking the DT out here
if(tokens.size() > 0) {
context.setString(HdfsConstants.DELEGATION_TOKENS, StringUtils.join(tokens, " "));
}
}
/**
* Loads delegation tokens that we created and serialized into the mutable context
*/
static public void loadDelegationTokensToUGI(UserGroupInformation ugi, ImmutableContext context) throws IOException {
String tokenList = context.getString(HdfsConstants.DELEGATION_TOKENS);
if(tokenList == null) {
LOG.info("No delegation tokens found");
return;
}
for(String stringToken: tokenList.split(" ")) {
Token token = deserializeToken(stringToken);
LOG.info("Loaded delegation token: " + token.toString());
ugi.addToken(token);
}
}
/**
* Serialize given token into String.
*
* We'll convert the token to byte[] using Writable methods for I/O and then Base64
* encode the bytes to a human readable string.
*/
static public String serializeToken(Token token) throws IOException {
// Serialize the Token to a byte array
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
token.write(dos);
baos.flush();
return Base64.encodeBase64String(baos.toByteArray());
}
/**
* Deserialize token from given String.
*
* See serializeToken for details how the token is expected to be serialized.
*/
static public Token deserializeToken(String stringToken) throws IOException {
Token token = new Token();
byte[] tokenBytes = Base64.decodeBase64(stringToken);
ByteArrayInputStream bais = new ByteArrayInputStream(tokenBytes);
DataInputStream dis = new DataInputStream(bais);
token.readFields(dis);
return token;
}
private SecurityUtils() {
// Initialization is prohibited
}
}
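
Taken together, the changes form a simple hand-off: the From/To initializers run inside doAs() as the proxied user while Kerberos credentials are still available, obtain HDFS delegation tokens, and stash their Base64 form in the job's mutable context under DELEGATION_TOKENS; the extractor, loader, partitioner and destroyer later rebuild the proxy user from that context and attach the tokens before touching HDFS. The sketch below is illustrative only; the class DelegationTokenFlowSketch, its method names and the directory check are made up and merely assume the SecurityUtils API added by this commit.

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.etl.InitializerContext;

public class DelegationTokenFlowSketch {

  // Server side (initializer): Kerberos credentials are available here, so we can
  // talk to the NameNode as the proxied user and ask for delegation tokens.
  public static void initializerSide(final InitializerContext context, final Configuration conf,
                                     final String directory) throws Exception {
    SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() {
      public Void run() throws Exception {
        Path path = new Path(directory);
        FileSystem fs = FileSystem.get(conf);
        if (!fs.exists(path)) {
          throw new IllegalArgumentException("Directory does not exist: " + path);
        }
        // On a secured cluster this stores the Base64-serialized tokens under
        // HdfsConstants.DELEGATION_TOKENS in the mutable context; otherwise it is a no-op.
        SecurityUtils.generateDelegationTokens(context.getContext(), path, conf);
        return null;
      }
    });
  }

  // Worker side (extractor/loader/partitioner/destroyer): no Kerberos TGT here, only the
  // serialized tokens that travelled with the job context.
  public static void workerSide(final ExtractorContext context, final Configuration conf) throws Exception {
    SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
      public Void run() throws Exception {
        // FileSystem calls made here authenticate with the loaded delegation tokens.
        FileSystem fs = FileSystem.get(conf);
        fs.getFileStatus(new Path("/"));
        return null;
      }
    });
  }
}

Since generateDelegationTokens() returns early when security is disabled, the same code path works on unsecured clusters as well.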


@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.hdfs.security;
import org.apache.hadoop.io.Text;
import org.testng.annotations.Test;
import org.apache.hadoop.security.token.Token;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
public class TestSecurityUtils {
@Test
public void testTokenSerializationDeserialization() throws Exception {
byte[] identifier = "identifier".getBytes();
byte[] password = "password".getBytes();
Text kind = new Text("kind");
Text service = new Text("service");
Token token = new Token(identifier, password, kind, service);
String serializedForm = SecurityUtils.serializeToken(token);
assertNotNull(serializedForm);
Token deserializedToken = SecurityUtils.deserializeToken(serializedForm);
assertNotNull(deserializedToken);
assertEquals(identifier, deserializedToken.getIdentifier());
assertEquals(password, deserializedToken.getPassword());
assertEquals(kind.toString(), deserializedToken.getKind().toString());
assertEquals(service.toString(), deserializedToken.getService().toString());
}
}
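
Beyond the serialization round trip covered by the test, the other half of the worker-side contract is attaching a deserialized token to a UGI, as loadDelegationTokensToUGI() does. A quick manual check of that step could look like the sketch below; the class name TokenRoundTripCheck and the "sqoop-test" user are hypothetical, and a plain (non-Kerberos) UGI is assumed.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.sqoop.connector.hdfs.security.SecurityUtils;

public class TokenRoundTripCheck {
  public static void main(String[] args) throws Exception {
    // Build a dummy token and push it through the same Base64 serialization the connector uses.
    Token token = new Token("identifier".getBytes(), "password".getBytes(),
        new Text("kind"), new Text("service"));
    String wireForm = SecurityUtils.serializeToken(token);

    // Attach the deserialized token to a freshly created UGI, mirroring what
    // loadDelegationTokensToUGI() does for the proxy user before doAs() runs.
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("sqoop-test");
    ugi.addToken(SecurityUtils.deserializeToken(wireForm));
    System.out.println("Tokens attached to UGI: " + ugi.getTokens().size());
  }
}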