diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
index 39ee4a31..f06300ae 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
@@ -35,4 +35,6 @@ public final class HdfsConstants extends Constants {
   public static final String WORK_DIRECTORY = PREFIX + "work_dir";
 
   public static final String MAX_IMPORT_DATE = PREFIX + "max_import_date";
+
+  public static final String DELEGATION_TOKENS = PREFIX + "delegation_tokens";
 }
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index 583acddc..441fe303 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -37,6 +37,7 @@ import org.apache.sqoop.connector.common.SqoopIDFUtils;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;
@@ -60,7 +61,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfiguration, HdfsPartition> {
-    UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+    SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
       public Void run() throws Exception {
         HdfsUtils.contextToConfiguration(context.getContext(), conf);
         dataWriter = context.getDataWriter();
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
index be837ca0..3a0d626c 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
@@ -27,6 +27,7 @@ import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
@@ -62,7 +63,7 @@ public void initialize(final InitializerContext context, final LinkConfiguration
 
     // In case of incremental import, we need to persist the highest last modified
     try {
-      UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+      SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() {
        public Void run() throws Exception {
          FileSystem fs = FileSystem.get(configuration);
          Path path = new Path(jobConfig.fromJobConfig.inputDirectory);
@@ -89,6 +90,10 @@ public Void run() throws Exception {
            LOG.info("Maximal age of file is: " + maxModifiedTime);
            context.getContext().setLong(HdfsConstants.MAX_IMPORT_DATE, maxModifiedTime);
          }
+
+         // Generate delegation tokens if we are on secured cluster
+         SecurityUtils.generateDelegationTokens(context.getContext(), path, configuration);
+
          return null;
        }
      });
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 04acd188..a6551e6d 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -34,6 +34,7 @@ import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
 import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter;
 import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.etl.io.DataReader;
 import org.apache.sqoop.job.etl.Loader;
@@ -56,8 +57,7 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
   @Override
   public void load(final LoaderContext context, final LinkConfiguration linkConfiguration, final ToJobConfiguration toJobConfig) throws Exception {
-    UserGroupInformation.createProxyUser(context.getUser(),
-        UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+    SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
       public Void run() throws Exception {
         Configuration conf = new Configuration();
         HdfsUtils.contextToConfiguration(context.getContext(), conf);
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
index 998b9036..d01e9324 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
@@ -45,6 +45,7 @@ import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
@@ -83,8 +84,7 @@ public List<Partition> getPartitions(final PartitionerContext context, final
     List<Partition> partitions = new ArrayList<>();
 
     try {
-      UserGroupInformation.createProxyUser(context.getUser(),
-          UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+      SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
       public Void run() throws Exception {
         long numInputBytes = getInputSize(conf, fromJobConfig.fromJobConfig.inputDirectory);
         maxSplitSize = numInputBytes / context.getMaxPartitions();
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
index 2bad23aa..858042c3 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
@@ -26,6 +26,7 @@ import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
@@ -50,8 +51,7 @@ public void destroy(final DestroyerContext context, final LinkConfiguration link
     final Path targetDirectory = new Path(jobConfig.toJobConfig.outputDirectory);
 
     try {
-      UserGroupInformation.createProxyUser(context.getUser(),
-          UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+      SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
       public Void run() throws Exception {
         FileSystem fs = FileSystem.get(configuration);
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index 5856371c..204c9786 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -27,6 +27,7 @@ import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
@@ -58,8 +59,7 @@ public void initialize(final InitializerContext context, final LinkConfiguration
 
     // Verification that given HDFS directory either don't exists or is empty
     try {
-      UserGroupInformation.createProxyUser(context.getUser(),
-          UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+      SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() {
       public Void run() throws Exception {
         FileSystem fs = FileSystem.get(configuration);
         Path path = new Path(jobConfig.toJobConfig.outputDirectory);
@@ -76,6 +76,10 @@ public Void run() throws Exception {
             }
           }
         }
+
+        // Generate delegation tokens if we are on secured cluster
+        SecurityUtils.generateDelegationTokens(context.getContext(), path, configuration);
+
         return null;
       }
     });
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java
new file mode 100644
index 00000000..0a429361
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.security;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.connector.hdfs.HdfsConstants;
+import org.apache.sqoop.job.etl.TransferableContext;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Sqoop is designed in a way that abstracts connectors from the execution engine. Hence the security portion
+ * (like generating and distributing delegation tokens) won't happen automatically for us under the hood
+ * and we have to do everything manually.
+ */
+public class SecurityUtils {
+
+  private static final Logger LOG = Logger.getLogger(SecurityUtils.class);
+
+  /**
+   * Creates a proxy user for the user who submitted the Sqoop job (e.g. who issued the "start job" command).
+   */
+  static public UserGroupInformation createProxyUser(TransferableContext context) throws IOException {
+    return UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser());
+  }
+
+  /**
+   * Creates a proxy user and loads it up with all delegation tokens that we have created ourselves.
+   */
+  static public UserGroupInformation createProxyUserAndLoadDelegationTokens(TransferableContext context) throws IOException {
+    UserGroupInformation proxyUser = createProxyUser(context);
+    loadDelegationTokensToUGI(proxyUser, context.getContext());
+
+    return proxyUser;
+  }
+
+  /**
+   * Generates delegation tokens for the current user (this code is supposed to run inside doAs) and stores them
+   * serialized in the given mutable context.
+   */
+  static public void generateDelegationTokens(MutableContext context, Path path, Configuration configuration) throws IOException {
+    if(!UserGroupInformation.isSecurityEnabled()) {
+      LOG.info("Running on unsecured cluster, skipping delegation token generation.");
+      return;
+    }
+
+    // String representation of all tokens that we will create (most likely a single one)
+    List<String> tokens = new LinkedList<>();
+
+    Credentials credentials = new Credentials();
+    TokenCache.obtainTokensForNamenodes(credentials, new Path[]{path}, configuration);
+    for (Token token : credentials.getAllTokens()) {
+      LOG.info("Generated token: " + token.toString());
+      tokens.add(serializeToken(token));
+    }
+
+    // The context classes are transferred via "Credentials" rather than with the jobconf, so we're not leaking the DT out here
+    if(tokens.size() > 0) {
+      context.setString(HdfsConstants.DELEGATION_TOKENS, StringUtils.join(tokens, " "));
+    }
+  }
+
+  /**
+   * Loads delegation tokens that we created and serialized into the mutable context.
+   */
+  static public void loadDelegationTokensToUGI(UserGroupInformation ugi, ImmutableContext context) throws IOException {
+    String tokenList = context.getString(HdfsConstants.DELEGATION_TOKENS);
+    if(tokenList == null) {
+      LOG.info("No delegation tokens found");
+      return;
+    }
+
+    for(String stringToken: tokenList.split(" ")) {
+      Token token = deserializeToken(stringToken);
+      LOG.info("Loaded delegation token: " + token.toString());
+      ugi.addToken(token);
+    }
+  }
+
+  /**
+   * Serializes the given token into a String.
+   *
+   * We'll convert the token to byte[] using the Writable methods for I/O and then Base64
+   * encode the bytes to a human readable string.
+   */
+  static public String serializeToken(Token token) throws IOException {
+    // Serialize the Token to a byte array
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    token.write(dos);
+    baos.flush();
+
+    return Base64.encodeBase64String(baos.toByteArray());
+  }
+
+  /**
+   * Deserializes a token from the given String.
+   *
+   * See serializeToken for details on how the token is expected to be serialized.
+   */
+  static public Token deserializeToken(String stringToken) throws IOException {
+    Token token = new Token();
+    byte[] tokenBytes = Base64.decodeBase64(stringToken);
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(tokenBytes);
+    DataInputStream dis = new DataInputStream(bais);
+    token.readFields(dis);
+
+    return token;
+  }
+
+  private SecurityUtils() {
+    // Initialization is prohibited
+  }
+}
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java
new file mode 100644
index 00000000..713c7041
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.security;
+
+import org.apache.hadoop.io.Text;
+import org.testng.annotations.Test;
+import org.apache.hadoop.security.token.Token;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class TestSecurityUtils {
+
+  @Test
+  public void testTokenSerializationDeserialization() throws Exception {
+    byte[] identifier = "identifier".getBytes();
+    byte[] password = "password".getBytes();
+    Text kind = new Text("kind");
+    Text service = new Text("service");
+
+    Token token = new Token(identifier, password, kind, service);
+    String serializedForm = SecurityUtils.serializeToken(token);
+    assertNotNull(serializedForm);
+
+    Token deserializedToken = SecurityUtils.deserializeToken(serializedForm);
+    assertNotNull(deserializedToken);
+
+    assertEquals(identifier, deserializedToken.getIdentifier());
+    assertEquals(password, deserializedToken.getPassword());
+    assertEquals(kind.toString(), deserializedToken.getKind().toString());
+    assertEquals(service.toString(), deserializedToken.getService().toString());
+  }
+
+}
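
For orientation, the call pattern introduced by this patch is: a server-side initializer runs as a proxy user for the submitting user and persists freshly obtained delegation tokens into the mutable job context, while executor-side components (extractor, partitioner, loader, destroyer) recreate the proxy user and load those tokens back before touching HDFS. The sketch below illustrates that flow only; ExampleSecuredComponent, its method names and its parameters are hypothetical, and just the SecurityUtils calls mirror the signatures added above.

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.job.etl.LoaderContext;

public class ExampleSecuredComponent {

  // Server side: Kerberos credentials are available here, so we can ask the
  // NameNode for delegation tokens and stash them in the mutable job context.
  public void onInitialize(final InitializerContext context, final Path directory,
                           final Configuration configuration) throws Exception {
    SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() {
      public Void run() throws Exception {
        // ... connector specific validation of "directory" would go here ...

        // No-op on unsecured clusters; otherwise serializes the tokens under
        // HdfsConstants.DELEGATION_TOKENS in the job context.
        SecurityUtils.generateDelegationTokens(context.getContext(), directory, configuration);
        return null;
      }
    });
  }

  // Executor side: no Kerberos credentials are present, so the proxy user is
  // loaded with the previously generated tokens before any HDFS access happens.
  public void onLoad(final LoaderContext context, final Configuration configuration) throws Exception {
    SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
      public Void run() throws Exception {
        FileSystem fs = FileSystem.get(configuration);
        // ... actual data transfer using "fs" ...
        return null;
      }
    });
  }
}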