5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-02 20:52:31 +08:00

SQOOP-3300: Implement JDBC and Kerberos tools for HiveServer2 support

(Szabolcs Vasas via Boglarka Egyed)
This commit is contained in:
Boglarka Egyed 2018-03-23 14:18:49 +01:00
parent d57f9fb06b
commit d67bb816ce
10 changed files with 588 additions and 4 deletions

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.authentication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
public class KerberosAuthenticator {
private final Configuration configuration;
private final String principal;
private final String keytabLocation;
public KerberosAuthenticator(Configuration configuration, String principal, String keytabLocation) {
this.configuration = configuration;
this.principal = principal;
this.keytabLocation = keytabLocation;
}
public UserGroupInformation authenticate() {
UserGroupInformation.setConfiguration(configuration);
try {
return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabLocation);
} catch (IOException e) {
throw new RuntimeException("Kerberos authentication failed!", e);
}
}
public Configuration getConfiguration() {
return configuration;
}
public String getPrincipal() {
return principal;
}
public String getKeytabLocation() {
return keytabLocation;
}
}

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.db;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
public class DriverManagerJdbcConnectionFactory implements JdbcConnectionFactory {
private final String driverClass;
private final String connectionString;
private final String username;
private final String password;
private final Properties additionalProps;
public DriverManagerJdbcConnectionFactory(String driverClass, String connectionString, String username,
String password, Properties additionalProps) {
this.driverClass = driverClass;
this.connectionString = connectionString;
this.username = username;
this.password = password;
this.additionalProps = additionalProps;
}
public DriverManagerJdbcConnectionFactory(String driverClass, String connectionString, String username, String password) {
this(driverClass, connectionString, username, password, new Properties());
}
@Override
public Connection createConnection() {
loadDriverClass();
Properties connectionProperties = new Properties();
if (username != null) {
connectionProperties.put("user", username);
}
if (password != null) {
connectionProperties.put("password", password);
}
connectionProperties.putAll(additionalProps);
try {
return DriverManager.getConnection(connectionString, connectionProperties);
} catch (SQLException e) {
throw new RuntimeException("Establishing connection failed!", e);
}
}
private void loadDriverClass() {
try {
Class.forName(driverClass);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Could not load db driver class: " + driverClass);
}
}
public String getDriverClass() {
return driverClass;
}
public String getConnectionString() {
return connectionString;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public Properties getAdditionalProps() {
return new Properties(additionalProps);
}
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.db;
import java.sql.Connection;
public interface JdbcConnectionFactory {
Connection createConnection();
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.db.decorator;
import org.apache.sqoop.db.JdbcConnectionFactory;
public abstract class JdbcConnectionFactoryDecorator implements JdbcConnectionFactory {
protected final JdbcConnectionFactory decorated;
public JdbcConnectionFactoryDecorator(JdbcConnectionFactory decorated) {
this.decorated = decorated;
}
public JdbcConnectionFactory getDecorated() {
return decorated;
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.db.decorator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.authentication.KerberosAuthenticator;
import org.apache.sqoop.db.JdbcConnectionFactory;
import java.security.PrivilegedAction;
import java.sql.Connection;
public class KerberizedConnectionFactoryDecorator extends JdbcConnectionFactoryDecorator {
private final KerberosAuthenticator authenticator;
public KerberizedConnectionFactoryDecorator(JdbcConnectionFactory decorated, KerberosAuthenticator authenticator) {
super(decorated);
this.authenticator = authenticator;
}
@Override
public Connection createConnection() {
UserGroupInformation ugi = authenticator.authenticate();
return ugi.doAs(new PrivilegedAction<Connection>() {
@Override
public Connection run() {
return decorated.createConnection();
}
});
}
public KerberosAuthenticator getAuthenticator() {
return authenticator;
}
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.authentication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.infrastructure.kerberos.MiniKdcInfrastructureRule;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
public class TestKerberosAuthenticator {
private static final String KERBEROS_RULE_TEMPLATE = "RULE:[2:$1@$0](.*@%s)s/@%s//";
@ClassRule
public static MiniKdcInfrastructureRule miniKdc = new MiniKdcInfrastructureRule();
private KerberosAuthenticator kerberosAuthenticator;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testAuthenticateReturnsCurrentUserIfKerberosIsNotEnabled() throws Exception {
kerberosAuthenticator = new KerberosAuthenticator(new Configuration(), miniKdc.getTestPrincipal(), miniKdc.getKeytabFilePath());
assertSame(UserGroupInformation.getCurrentUser(), kerberosAuthenticator.authenticate());
}
@Test
public void testAuthenticateReturnsAUserDifferentThanCurrentUserIfKerberosIsEnabled() throws Exception {
kerberosAuthenticator = new KerberosAuthenticator(createKerberosConfiguration(), miniKdc.getTestPrincipal(), miniKdc.getKeytabFilePath());
assertNotSame(UserGroupInformation.getCurrentUser(), kerberosAuthenticator.authenticate());
}
@Test
public void testAuthenticateReturnsAKerberosAuthenticatedUserIfKerberosIsEnabled() throws Exception {
kerberosAuthenticator = new KerberosAuthenticator(createKerberosConfiguration(), miniKdc.getTestPrincipal(), miniKdc.getKeytabFilePath());
UserGroupInformation authenticatedUser = kerberosAuthenticator.authenticate();
assertEquals(KERBEROS, authenticatedUser.getRealAuthenticationMethod());
}
@Test
public void testAuthenticateReturnsAnAuthenticatedUserWithProperUsernameIfKerberosIsEnabled() throws Exception {
kerberosAuthenticator = new KerberosAuthenticator(createKerberosConfiguration(), miniKdc.getTestPrincipal(), miniKdc.getKeytabFilePath());
UserGroupInformation authenticatedUser = kerberosAuthenticator.authenticate();
assertEquals(miniKdc.getTestPrincipal(), authenticatedUser.getUserName());
}
@Test
public void testAuthenticateThrowsIfKerberosIsEnabledAndInvalidKeytabIsProvided() throws Exception {
String invalidKeytabLocation = "invalid_keytab_location";
kerberosAuthenticator = new KerberosAuthenticator(createKerberosConfiguration(), miniKdc.getTestPrincipal(), invalidKeytabLocation);
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("Kerberos authentication failed!");
kerberosAuthenticator.authenticate();
}
@Test
public void testAuthenticateThrowsIfKerberosIsEnabledAndInvalidPrincipalIsProvided() throws Exception {
String invalidPrincipal = "invalid_principal";
kerberosAuthenticator = new KerberosAuthenticator(createKerberosConfiguration(), invalidPrincipal, miniKdc.getKeytabFilePath());
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("Kerberos authentication failed!");
kerberosAuthenticator.authenticate();
}
private Configuration createKerberosConfiguration() {
Configuration configuration = new Configuration();
configuration.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
// Adding a rule for the realm used by the MiniKdc since the default kerberos configuration might contain another realm.
configuration.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTH_TO_LOCAL, buildKerberosRule());
return configuration;
}
private String buildKerberosRule() {
return String.format(KERBEROS_RULE_TEMPLATE, miniKdc.getRealm(), miniKdc.getRealm());
}
}

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.db;
import org.apache.sqoop.testutil.HsqldbTestServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.sql.Connection;
import static org.junit.Assert.assertFalse;
public class TestDriverManagerJdbcConnectionFactory {
private static final String HSQLDB_DRIVER_CLASS = "org.hsqldb.jdbcDriver";
private static final String POSTGRESQL_DRIVER_CLASS = "org.postgresql.Driver";
private static final String DB_USERNAME = "testuser";
private static final String DB_PASSWORD = "testpassword";
private static HsqldbTestServer hsqldbTestServer;
@Rule
public ExpectedException expectedException = ExpectedException.none();
private DriverManagerJdbcConnectionFactory connectionFactory;
@BeforeClass
public static void beforeClass() throws Exception {
hsqldbTestServer = new HsqldbTestServer();
hsqldbTestServer.start();
hsqldbTestServer.createNewUser(DB_USERNAME, DB_PASSWORD);
}
@AfterClass
public static void afterClass() throws Exception {
hsqldbTestServer.stop();
}
@Test
public void testCreateConnectionThrowsWithInvalidDriverClass() throws Exception {
String invalidDriverClass = "this_is_an_invalid_driver_class";
connectionFactory = new DriverManagerJdbcConnectionFactory(invalidDriverClass, HsqldbTestServer.getUrl(), DB_USERNAME, DB_PASSWORD);
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("Could not load db driver class: this_is_an_invalid_driver_class");
connectionFactory.createConnection();
}
@Test
public void testCreateConnectionThrowsWithoutRunningDatabase() throws Exception {
String notRunningDb = "jdbc:postgresql://myhost:1234/database";
connectionFactory = new DriverManagerJdbcConnectionFactory(POSTGRESQL_DRIVER_CLASS, notRunningDb, DB_USERNAME, DB_PASSWORD);
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("Establishing connection failed!");
connectionFactory.createConnection();
}
@Test
public void testCreateConnectionThrowsWithInvalidUsername() throws Exception {
String invalidUsername = "invalid_username";
connectionFactory = new DriverManagerJdbcConnectionFactory(HSQLDB_DRIVER_CLASS, HsqldbTestServer.getUrl(), invalidUsername, DB_PASSWORD);
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("Establishing connection failed!");
connectionFactory.createConnection();
}
@Test
public void testCreateConnectionThrowsWithInvalidPassword() throws Exception {
String invalidPassword = "invalid_password";
connectionFactory = new DriverManagerJdbcConnectionFactory(HSQLDB_DRIVER_CLASS, HsqldbTestServer.getUrl(), DB_USERNAME, invalidPassword);
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("Establishing connection failed!");
connectionFactory.createConnection();
}
@Test
public void testCreateConnectionSucceedsWithValidParameters() throws Exception {
connectionFactory = new DriverManagerJdbcConnectionFactory(HSQLDB_DRIVER_CLASS, HsqldbTestServer.getUrl(), DB_USERNAME, DB_PASSWORD);
try (Connection connection = connectionFactory.createConnection()) {
assertFalse(connection.isClosed());
}
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.db.decorator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.authentication.KerberosAuthenticator;
import org.apache.sqoop.db.JdbcConnectionFactory;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.sql.Connection;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestKerberizedConnectionFactoryDecorator {
private KerberizedConnectionFactoryDecorator kerberizedConnectionFactoryDecorator;
private KerberosAuthenticator kerberosAuthenticator;
private JdbcConnectionFactory decoratedFactory;
private UserGroupInformation testUser;
private UserGroupInformation capturedCurrentUser;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void before() throws Exception {
decoratedFactory = mock(JdbcConnectionFactory.class);
kerberosAuthenticator = mock(KerberosAuthenticator.class);
testUser = UserGroupInformation.createUserForTesting("testUser", new String[]{});
when(kerberosAuthenticator.authenticate()).thenReturn(testUser);
kerberizedConnectionFactoryDecorator = new KerberizedConnectionFactoryDecorator(decoratedFactory, kerberosAuthenticator);
}
@Test
public void testCreateConnectionIsInvokedAsAuthenticatedUser() throws Exception {
// We want to capture the current user when the createConnection() method is invoked on the decorated factory.
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
capturedCurrentUser = UserGroupInformation.getCurrentUser();
return null;
}
}).when(decoratedFactory).createConnection();
kerberizedConnectionFactoryDecorator.createConnection();
assertEquals(testUser, capturedCurrentUser);
}
@Test
public void testCreateConnectionReturnsConnectionCreatedByDecoratedFactory() throws Exception {
Connection expected = mock(Connection.class);
when(decoratedFactory.createConnection()).thenReturn(expected);
assertSame(expected, kerberizedConnectionFactoryDecorator.createConnection());
}
@Test
public void testCreateConnectionThrowsTheSameExceptionDecoratedFactoryThrows() throws Exception {
RuntimeException expected = mock(RuntimeException.class);
when(decoratedFactory.createConnection()).thenThrow(expected);
expectedException.expect(equalTo(expected));
kerberizedConnectionFactoryDecorator.createConnection();
}
}

View File

@ -193,8 +193,7 @@ private void setupKerberos() {
return;
}
String servicePrincipal = kerberosConfigurationProvider.getTestPrincipal() + "@" + kerberosConfigurationProvider.getRealm();
HBaseKerberosUtils.setPrincipalForTesting(servicePrincipal);
HBaseKerberosUtils.setPrincipalForTesting(kerberosConfigurationProvider.getTestPrincipal());
HBaseKerberosUtils.setKeytabFileForTesting(kerberosConfigurationProvider.getKeytabFilePath());
Configuration configuration = hbaseTestUtil.getConfiguration();
@ -202,7 +201,7 @@ private void setupKerberos() {
UserGroupInformation.setConfiguration(configuration);
configuration.setStrings(REGION_COPROCESSOR_CONF_KEY, TokenProvider.class.getName());
setupKerberosForHdfs(servicePrincipal, configuration);
setupKerberosForHdfs(kerberosConfigurationProvider.getTestPrincipal(), configuration);
}
private void setupKerberosForHdfs(String servicePrincipal, Configuration configuration) {

View File

@ -103,7 +103,7 @@ public void evaluate() throws Throwable {
@Override
public String getTestPrincipal() {
return testPrincipal;
return testPrincipal + "@" + miniKdc.getRealm();
}
@Override