mirror of
https://github.com/apache/sqoop.git
synced 2025-05-04 00:43:42 +08:00
SQOOP-2900: Support encrypting map inputs with sensitive fields
(Abraham Fine via Jarek Jarcec Cecho)
This commit is contained in:
parent
9817be56b1
commit
1c6a8f4d6e
@ -115,12 +115,16 @@ static JSONObject extractConfig(MConfig mConfig, MConfigType type, boolean skipS
|
||||
StringUtils.join(((MEnumInput)mInput).getValues(), ","));
|
||||
}
|
||||
|
||||
// Map specific serialization
|
||||
if(mInput.getType() == MInputType.MAP) {
|
||||
input.put(ConfigInputConstants.CONFIG_INPUT_SENSITIVE_KEY_PATTERN, ((MMapInput)mInput).getSensitiveKeyPattern());
|
||||
}
|
||||
|
||||
// Serialize value if is there
|
||||
// Skip if sensitive
|
||||
if (!mInput.isEmpty() && !(skipSensitive && mInput.isSensitive())) {
|
||||
if (mInput.getType() == MInputType.MAP) {
|
||||
MMapInput mMapInput = (MMapInput)mInput;
|
||||
input.put(ConfigInputConstants.CONFIG_INPUT_SENSITIVE_KEY_PATTERN, mMapInput.getSensitiveKeyPattern());
|
||||
if (skipSensitive) {
|
||||
input.put(ConfigInputConstants.CONFIG_INPUT_VALUE, mMapInput.getNonsenstiveValue());
|
||||
} else {
|
||||
|
@ -42,9 +42,11 @@
|
||||
import org.apache.sqoop.common.SqoopException;
|
||||
import org.apache.sqoop.common.SupportedDirections;
|
||||
import org.apache.sqoop.connector.ConnectorManager;
|
||||
import org.apache.sqoop.connector.spi.SqoopConnector;
|
||||
import org.apache.sqoop.core.SqoopConfiguration;
|
||||
import org.apache.sqoop.driver.Driver;
|
||||
import org.apache.sqoop.error.code.CommonRepositoryError;
|
||||
import org.apache.sqoop.model.ConfigUtils;
|
||||
import org.apache.sqoop.model.InputEditable;
|
||||
import org.apache.sqoop.model.MBooleanInput;
|
||||
import org.apache.sqoop.model.MConfig;
|
||||
@ -70,9 +72,11 @@
|
||||
import org.apache.sqoop.model.MStringInput;
|
||||
import org.apache.sqoop.model.MSubmission;
|
||||
import org.apache.sqoop.model.MToConfig;
|
||||
import org.apache.sqoop.model.ModelError;
|
||||
import org.apache.sqoop.model.SubmissionError;
|
||||
import org.apache.sqoop.repository.JdbcRepositoryHandler;
|
||||
import org.apache.sqoop.repository.MasterKeyManager;
|
||||
import org.apache.sqoop.repository.RepositoryError;
|
||||
import org.apache.sqoop.security.SecurityConstants;
|
||||
import org.apache.sqoop.submission.SubmissionStatus;
|
||||
import org.apache.sqoop.submission.counter.Counter;
|
||||
@ -2277,8 +2281,37 @@ private void transitionInputs(MasterKeyManager fromMasterKeyManager, MasterKeyMa
|
||||
try (ResultSet inputs = selectInputsStatement.executeQuery()) {
|
||||
while (inputs.next()) {
|
||||
long inputId = inputs.getLong(1);
|
||||
String inputName = inputs.getString(2);
|
||||
|
||||
boolean encrypted = inputs.getBoolean(11);
|
||||
boolean sensitive = inputs.getBoolean(6);
|
||||
String configType = inputs.getString(15);
|
||||
String connectorName = inputs.getString(16);
|
||||
|
||||
SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector(connectorName);
|
||||
Class configurationClass;
|
||||
if (MConfigType.LINK.name().equals(configType)) {
|
||||
configurationClass = connector.getLinkConfigurationClass();
|
||||
} else {
|
||||
String direction = inputs.getString(17);
|
||||
if (direction == null) {
|
||||
configurationClass = Driver.getInstance().getDriverJobConfigurationClass();
|
||||
} else {
|
||||
configurationClass = connector.getJobConfigurationClass(Direction.valueOf(direction));
|
||||
}
|
||||
}
|
||||
|
||||
List<MConfig> mConfigList = ConfigUtils.toConfigs(configurationClass);
|
||||
MInput mInput = null;
|
||||
for (MConfig mConfig : mConfigList) {
|
||||
if (mConfig.getInputNames().contains(inputName)) {
|
||||
mInput = mConfig.getInput(inputName);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (mInput == null) {
|
||||
throw new SqoopException(RepositoryError.REPO_0002);
|
||||
}
|
||||
|
||||
if (encrypted) {
|
||||
assert(fromMasterKeyManager != null);
|
||||
@ -2289,7 +2322,7 @@ private void transitionInputs(MasterKeyManager fromMasterKeyManager, MasterKeyMa
|
||||
|
||||
String plainTextValue = readInputValue(fromMasterKeyManager, encryptedValue, encrypted, iv, hmac);
|
||||
|
||||
if (toMasterKeyManager != null && sensitive) {
|
||||
if (toMasterKeyManager != null && shoudEncryptInput(mInput)) {
|
||||
// We need to encrypt the input
|
||||
String newIv = toMasterKeyManager.generateRandomIv();
|
||||
String encryptedInput = toMasterKeyManager.encryptWithMasterKey(plainTextValue, newIv);
|
||||
@ -2308,7 +2341,7 @@ private void transitionInputs(MasterKeyManager fromMasterKeyManager, MasterKeyMa
|
||||
updateInputsStatement.setLong(5, inputId);
|
||||
}
|
||||
updateInputsStatement.executeUpdate();
|
||||
} else if (toMasterKeyManager != null && sensitive) {
|
||||
} else if (toMasterKeyManager != null && shoudEncryptInput(mInput)) {
|
||||
// We need to encrypt the input
|
||||
String plainTextValue = inputs.getString(10);
|
||||
String newIv = toMasterKeyManager.generateRandomIv();
|
||||
@ -2586,7 +2619,7 @@ private void createInputValues(String query, long id, List<MConfig> configs, Con
|
||||
try (PreparedStatement stmt = conn.prepareStatement(query)) {
|
||||
stmt.setLong(1, id);
|
||||
stmt.setLong(2, input.getPersistenceId());
|
||||
if (input.isSensitive() && encryptionEnabled) {
|
||||
if (shoudEncryptInput(input) && encryptionEnabled) {
|
||||
String iv = MasterKeyManager.getInstance().generateRandomIv();
|
||||
String hmac = null;
|
||||
String encryptedInput = masterKeyManager.encryptWithMasterKey(input.getUrlSafeValueString(), iv);
|
||||
@ -2610,6 +2643,11 @@ private void createInputValues(String query, long id, List<MConfig> configs, Con
|
||||
}
|
||||
}
|
||||
|
||||
private boolean shoudEncryptInput(MInput input) {
|
||||
boolean hasSensitiveKeyPattern = (input instanceof MMapInput) && StringUtils.isNotEmpty(((MMapInput) input).getSensitiveKeyPattern());
|
||||
return input.isSensitive() || hasSensitiveKeyPattern;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute given query via a PreparedStatement.
|
||||
* A list of args can be passed to the query.
|
||||
|
@ -251,10 +251,17 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery {
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNKI_VALUE) + ", "
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNKI_ENCRYPTED) + ", "
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNKI_IV) + ", "
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNKI_HMAC)
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNKI_HMAC) + ", "
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_NAME) + ", "
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_TYPE) + ", "
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQC_NAME)
|
||||
+ " FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_INPUT_NAME)
|
||||
+ " RIGHT OUTER JOIN " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_LINK_INPUT_NAME)
|
||||
+ " ON " + CommonRepoUtils.escapeColumnName(COLUMN_SQI_ID) + " = " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNKI_INPUT);
|
||||
+ " ON " + CommonRepoUtils.escapeColumnName(COLUMN_SQI_ID) + " = " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNKI_INPUT)
|
||||
+ " LEFT OUTER JOIN " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONFIG_NAME)
|
||||
+ " ON " + CommonRepoUtils.escapeColumnName(COLUMN_SQI_CONFIG) + " = " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_ID)
|
||||
+ " LEFT OUTER JOIN " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONFIGURABLE_NAME)
|
||||
+ " ON " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_CONFIGURABLE) + " = " + CommonRepoUtils.escapeColumnName(COLUMN_SQC_ID);
|
||||
|
||||
private static final String UPDATE_LINK_INPUT =
|
||||
"UPDATE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_LINK_INPUT_NAME) + " SET "
|
||||
@ -305,11 +312,22 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery {
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQBI_VALUE) + ", "
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQBI_ENCRYPTED) + ", "
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQBI_IV) + ", "
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQBI_HMAC)
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQBI_HMAC) + ", "
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_NAME) + ", "
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_TYPE) + ", "
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQC_NAME) + ", "
|
||||
+ CommonRepoUtils.escapeColumnName(COLUMN_SQD_NAME)
|
||||
+ " FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_INPUT_NAME)
|
||||
+ " RIGHT OUTER JOIN " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_JOB_INPUT_NAME)
|
||||
+ " ON " + CommonRepoUtils.escapeColumnName(COLUMN_SQBI_INPUT) + " = " + CommonRepoUtils.escapeColumnName(COLUMN_SQI_ID)
|
||||
+ " ORDER BY " + CommonRepoUtils.escapeColumnName(COLUMN_SQI_INDEX);
|
||||
+ " LEFT OUTER JOIN " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONFIG_NAME)
|
||||
+ " ON " + CommonRepoUtils.escapeColumnName(COLUMN_SQI_CONFIG) + " = " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_ID)
|
||||
+ " LEFT OUTER JOIN " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONFIGURABLE_NAME)
|
||||
+ " ON " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_CONFIGURABLE) + " = " + CommonRepoUtils.escapeColumnName(COLUMN_SQC_ID)
|
||||
+ " LEFT OUTER JOIN " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_CONFIG_DIRECTIONS_NAME)
|
||||
+ " ON " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_ID) + " = " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_DIR_CONFIG)
|
||||
+ " LEFT OUTER JOIN " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_DIRECTION_NAME)
|
||||
+ " ON " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_CFG_DIR_DIRECTION) + " = " + CommonRepoUtils.escapeColumnName(COLUMN_SQD_ID);
|
||||
|
||||
private static final String UPDATE_JOB_INPUT =
|
||||
"UPDATE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_JOB_INPUT_NAME) + " SET "
|
||||
|
@ -20,9 +20,14 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.sqoop.client.SqoopClient;
|
||||
import org.apache.sqoop.common.MapContext;
|
||||
import org.apache.sqoop.connector.ConnectorManager;
|
||||
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
|
||||
import org.apache.sqoop.core.PropertiesConfigurationProvider;
|
||||
import org.apache.sqoop.core.SqoopConfiguration;
|
||||
import org.apache.sqoop.model.MInput;
|
||||
import org.apache.sqoop.model.MJob;
|
||||
import org.apache.sqoop.model.MLink;
|
||||
import org.apache.sqoop.model.MMapInput;
|
||||
import org.apache.sqoop.model.MStringInput;
|
||||
import org.apache.sqoop.repository.MasterKeyManager;
|
||||
import org.apache.sqoop.repository.RepositoryManager;
|
||||
@ -45,6 +50,7 @@
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.sqoop.repository.common.CommonRepositorySchemaConstants.COLUMN_SQ_LNKI_ENCRYPTED;
|
||||
@ -83,6 +89,9 @@ public class RepositoryEncryptionToolTest extends SqoopTestCase {
|
||||
private int pbkdf2Rounds;
|
||||
private int ivLength;
|
||||
|
||||
private MStringInput sensitiveInput;
|
||||
private MMapInput sensitiveKeyPatternInput;
|
||||
|
||||
public static class SqoopMiniCluster extends JettySqoopMiniCluster {
|
||||
|
||||
private boolean repositoryEncryptionEnabled;
|
||||
@ -143,6 +152,10 @@ protected Map<String, String> getSecurityConfiguration() {
|
||||
|
||||
return properties;
|
||||
}
|
||||
|
||||
public boolean isRepositoryEncryptionEnabled() {
|
||||
return repositoryEncryptionEnabled;
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeMethod
|
||||
@ -163,21 +176,12 @@ public void before() throws Exception {
|
||||
|
||||
@Test
|
||||
public void testNotEncryptedToEncrypted() throws Exception {
|
||||
// Start nonencrypted sqoop instance
|
||||
sqoopMiniCluster = new SqoopMiniCluster(temporaryPath, getHadoopConf());
|
||||
sqoopMiniCluster.start();
|
||||
|
||||
verifyMasterKeyDoesNotExist();
|
||||
|
||||
// Create a link and a job with a secure input
|
||||
SqoopClient client = new SqoopClient(sqoopMiniCluster.getServerUrl());
|
||||
MLink link = client.createLink("generic-jdbc-connector");
|
||||
link.setName("zelda");
|
||||
fillRdbmsLinkConfig(link);
|
||||
client.saveLink(link);
|
||||
|
||||
MStringInput sensitiveInput = link.getConnectorLinkConfig().getStringInput("linkConfig.password");
|
||||
verifyPlaintextInput(sensitiveInput.getPersistenceId(), sensitiveInput.getValue());
|
||||
createInputsAndJob();
|
||||
|
||||
// Stop sqoop instance
|
||||
sqoopMiniCluster.stop();
|
||||
@ -197,20 +201,13 @@ public void testNotEncryptedToEncrypted() throws Exception {
|
||||
|
||||
cleanUpAfterTool();
|
||||
|
||||
// Verify that the data is encrypted
|
||||
StringBuffer cipherText = new StringBuffer();
|
||||
StringBuffer iv = new StringBuffer();
|
||||
StringBuffer hmac = new StringBuffer();
|
||||
readEncryptedInput(sensitiveInput.getPersistenceId(), cipherText, iv, hmac);
|
||||
|
||||
// Read the encrypted data by using the MasterKeyManager the server initializes
|
||||
sqoopMiniCluster = new SqoopMiniCluster(temporaryPath, getHadoopConf(), passwordGenerator,
|
||||
hmacAlgorithm, cipherAlgorithm, cipherKeySize, cipherSpec, pbkdf2Algorithm, pbkdf2Rounds, ivLength);
|
||||
sqoopMiniCluster.start();
|
||||
|
||||
String decrypted = MasterKeyManager.getInstance().decryptWithMasterKey(cipherText.toString(), iv.toString(), hmac.toString());
|
||||
|
||||
Assert.assertEquals(sensitiveInput.getValue(), decrypted);
|
||||
verifyEncryptedInput(sensitiveInput);
|
||||
verifyEncryptedInput(sensitiveKeyPatternInput);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -219,21 +216,11 @@ public void testEncryptedToNotEncrypted() throws Exception {
|
||||
hmacAlgorithm, cipherAlgorithm, cipherKeySize, cipherSpec, pbkdf2Algorithm, pbkdf2Rounds, ivLength);
|
||||
sqoopMiniCluster.start();
|
||||
|
||||
SqoopClient client = new SqoopClient(sqoopMiniCluster.getServerUrl());
|
||||
MLink link = client.createLink("generic-jdbc-connector");
|
||||
link.setName("zelda");
|
||||
fillRdbmsLinkConfig(link);
|
||||
client.saveLink(link);
|
||||
MStringInput sensitiveInput = link.getConnectorLinkConfig().getStringInput("linkConfig.password");
|
||||
createInputsAndJob();
|
||||
|
||||
StringBuffer cipherText = new StringBuffer();
|
||||
StringBuffer iv = new StringBuffer();
|
||||
StringBuffer hmac = new StringBuffer();
|
||||
readEncryptedInput(sensitiveInput.getPersistenceId(), cipherText, iv, hmac);
|
||||
verifyEncryptedInput(sensitiveInput);
|
||||
verifyEncryptedInput(sensitiveKeyPatternInput);
|
||||
|
||||
String decrypted = MasterKeyManager.getInstance().decryptWithMasterKey(cipherText.toString(), iv.toString(), hmac.toString());
|
||||
|
||||
// Stop sqoop instance
|
||||
sqoopMiniCluster.stop();
|
||||
|
||||
// Run tool
|
||||
@ -254,7 +241,8 @@ public void testEncryptedToNotEncrypted() throws Exception {
|
||||
sqoopMiniCluster = new SqoopMiniCluster(temporaryPath, getHadoopConf());
|
||||
sqoopMiniCluster.start();
|
||||
|
||||
verifyPlaintextInput(sensitiveInput.getPersistenceId(), decrypted);
|
||||
verifyPlaintextInput(sensitiveInput);
|
||||
verifyPlaintextInput(sensitiveKeyPatternInput);
|
||||
|
||||
verifyMasterKeyDoesNotExist();
|
||||
}
|
||||
@ -265,24 +253,13 @@ public void testEncryptedToEncrypted() throws Exception {
|
||||
hmacAlgorithm, cipherAlgorithm, cipherKeySize, cipherSpec, pbkdf2Algorithm, pbkdf2Rounds, ivLength);
|
||||
sqoopMiniCluster.start();
|
||||
|
||||
SqoopClient client = new SqoopClient(sqoopMiniCluster.getServerUrl());
|
||||
MLink link = client.createLink("generic-jdbc-connector");
|
||||
link.setName("zelda");
|
||||
fillRdbmsLinkConfig(link);
|
||||
client.saveLink(link);
|
||||
MStringInput sensitiveInput = link.getConnectorLinkConfig().getStringInput("linkConfig.password");
|
||||
createInputsAndJob();
|
||||
|
||||
StringBuffer cipherTextFrom = new StringBuffer();
|
||||
StringBuffer ivFrom = new StringBuffer();
|
||||
StringBuffer hmacFrom = new StringBuffer();
|
||||
readEncryptedInput(sensitiveInput.getPersistenceId(), cipherTextFrom, ivFrom, hmacFrom);
|
||||
String fromSensitiveCiphertext = verifyEncryptedInput(sensitiveInput);
|
||||
String fromSensitiveKeyPatternCiphertext = verifyEncryptedInput(sensitiveKeyPatternInput);
|
||||
|
||||
String decryptedFirst = MasterKeyManager.getInstance().decryptWithMasterKey(cipherTextFrom.toString(), ivFrom.toString(), hmacFrom.toString());
|
||||
|
||||
// Stop sqoop instance
|
||||
sqoopMiniCluster.stop();
|
||||
|
||||
// Run tool
|
||||
RepositoryEncryptionTool repositoryEncryptionTool = new RepositoryEncryptionTool();
|
||||
repositoryEncryptionTool.runToolWithConfiguration(new String[] {
|
||||
"-F" + SecurityConstants.REPO_ENCRYPTION_PASSWORD_GENERATOR + "=" + passwordGenerator,
|
||||
@ -306,22 +283,15 @@ public void testEncryptedToEncrypted() throws Exception {
|
||||
|
||||
cleanUpAfterTool();
|
||||
|
||||
StringBuffer cipherTextTo = new StringBuffer();
|
||||
StringBuffer ivTo = new StringBuffer();
|
||||
StringBuffer hmacTo = new StringBuffer();
|
||||
|
||||
Assert.assertNotEquals(cipherTextFrom, cipherTextTo);
|
||||
|
||||
readEncryptedInput(sensitiveInput.getPersistenceId(), cipherTextTo, ivTo, hmacTo);
|
||||
|
||||
// Read the encrypted data by using the MasterKeyManager the server initializes
|
||||
sqoopMiniCluster = new SqoopMiniCluster(temporaryPath, getHadoopConf(), passwordGenerator,
|
||||
hmacAlgorithm, cipherAlgorithm, cipherKeySize, cipherSpec, pbkdf2Algorithm, pbkdf2Rounds, ivLength);
|
||||
sqoopMiniCluster.start();
|
||||
|
||||
String decryptedSecond = MasterKeyManager.getInstance().decryptWithMasterKey(cipherTextTo.toString(), ivTo.toString(), hmacTo.toString());
|
||||
String toSesitiveCipherText = verifyEncryptedInput(sensitiveInput);
|
||||
String toSensitiveKeyPatternCiphertext = verifyEncryptedInput(sensitiveKeyPatternInput);
|
||||
|
||||
Assert.assertEquals(decryptedFirst, decryptedSecond);
|
||||
Assert.assertNotEquals(fromSensitiveCiphertext, toSesitiveCipherText);
|
||||
Assert.assertNotEquals(fromSensitiveKeyPatternCiphertext, toSensitiveKeyPatternCiphertext);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -330,24 +300,14 @@ public void testEncryptedToEncryptedUsingConfiguration() throws Exception {
|
||||
hmacAlgorithm, cipherAlgorithm, cipherKeySize, cipherSpec, pbkdf2Algorithm, pbkdf2Rounds, ivLength);
|
||||
sqoopMiniCluster.start();
|
||||
|
||||
SqoopClient client = new SqoopClient(sqoopMiniCluster.getServerUrl());
|
||||
MLink link = client.createLink("generic-jdbc-connector");
|
||||
link.setName("zelda");
|
||||
fillRdbmsLinkConfig(link);
|
||||
client.saveLink(link);
|
||||
MStringInput sensitiveInput = link.getConnectorLinkConfig().getStringInput("linkConfig.password");
|
||||
createInputsAndJob();
|
||||
|
||||
StringBuffer cipherTextFrom = new StringBuffer();
|
||||
StringBuffer ivFrom = new StringBuffer();
|
||||
StringBuffer hmacFrom = new StringBuffer();
|
||||
readEncryptedInput(sensitiveInput.getPersistenceId(), cipherTextFrom, ivFrom, hmacFrom);
|
||||
|
||||
String decryptedFirst = MasterKeyManager.getInstance().decryptWithMasterKey(cipherTextFrom.toString(), ivFrom.toString(), hmacFrom.toString());
|
||||
String fromSensitiveCiphertext = verifyEncryptedInput(sensitiveInput);
|
||||
String fromSensitiveKeyPatternCiphertext = verifyEncryptedInput(sensitiveKeyPatternInput);
|
||||
|
||||
// Read the configuration context that we will need for the tool
|
||||
MapContext configurationMapContext = SqoopConfiguration.getInstance().getContext();
|
||||
|
||||
// Stop sqoop instance
|
||||
sqoopMiniCluster.stop();
|
||||
|
||||
// Set the configuration
|
||||
@ -356,8 +316,6 @@ public void testEncryptedToEncryptedUsingConfiguration() throws Exception {
|
||||
when(configurationMock.getContext()).thenReturn(configurationMapContext);
|
||||
when(configurationMock.getProvider()).thenReturn(new PropertiesConfigurationProvider());
|
||||
SqoopConfiguration.setInstance(configurationMock);
|
||||
|
||||
// Run tool
|
||||
RepositoryEncryptionTool repositoryEncryptionTool = new RepositoryEncryptionTool();
|
||||
repositoryEncryptionTool.runToolWithConfiguration(new String[] {
|
||||
"-FuseConf",
|
||||
@ -366,27 +324,59 @@ public void testEncryptedToEncryptedUsingConfiguration() throws Exception {
|
||||
|
||||
cleanUpAfterTool();
|
||||
|
||||
StringBuffer cipherTextTo = new StringBuffer();
|
||||
StringBuffer ivTo = new StringBuffer();
|
||||
StringBuffer hmacTo = new StringBuffer();
|
||||
|
||||
Assert.assertNotEquals(cipherTextFrom, cipherTextTo);
|
||||
|
||||
readEncryptedInput(sensitiveInput.getPersistenceId(), cipherTextTo, ivTo, hmacTo);
|
||||
|
||||
// Read the encrypted data by using the MasterKeyManager the server initializes
|
||||
sqoopMiniCluster = new SqoopMiniCluster(temporaryPath, getHadoopConf(), passwordGenerator,
|
||||
hmacAlgorithm, cipherAlgorithm, cipherKeySize, cipherSpec, pbkdf2Algorithm, pbkdf2Rounds, ivLength);
|
||||
sqoopMiniCluster.start();
|
||||
|
||||
String decryptedSecond = MasterKeyManager.getInstance().decryptWithMasterKey(cipherTextTo.toString(), ivTo.toString(), hmacTo.toString());
|
||||
String toSesitiveCipherText = verifyEncryptedInput(sensitiveInput);
|
||||
String toSensitiveKeyPatternCiphertext = verifyEncryptedInput(sensitiveKeyPatternInput);
|
||||
|
||||
Assert.assertEquals(decryptedFirst, decryptedSecond);
|
||||
Assert.assertNotEquals(fromSensitiveCiphertext, toSesitiveCipherText);
|
||||
Assert.assertNotEquals(fromSensitiveKeyPatternCiphertext, toSensitiveKeyPatternCiphertext);
|
||||
|
||||
SqoopConfiguration.setInstance(oldSqoopConfiguration);
|
||||
}
|
||||
|
||||
private void createInputsAndJob() throws Exception {
|
||||
SqoopClient client = new SqoopClient(sqoopMiniCluster.getServerUrl());
|
||||
MLink jdbcLink = client.createLink("generic-jdbc-connector");
|
||||
jdbcLink.setName("jdbcLink");
|
||||
fillRdbmsLinkConfig(jdbcLink);
|
||||
client.saveLink(jdbcLink);
|
||||
|
||||
MLink hdfsLink = client.createLink("hdfs-connector");
|
||||
hdfsLink.setName("hdfsLink");
|
||||
hdfsLink.getConnectorLinkConfig().getStringInput("linkConfig.confDir").setValue((sqoopMiniCluster.getConfigurationPath()));
|
||||
Map<String, String> hdfsConfigOverrides = new HashMap<>();
|
||||
|
||||
// This will be considered sensitive
|
||||
hdfsConfigOverrides.put("password", "secret");
|
||||
hdfsLink.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides").setValue(hdfsConfigOverrides);
|
||||
|
||||
client.saveLink(hdfsLink);
|
||||
|
||||
sensitiveInput = jdbcLink.getConnectorLinkConfig().getStringInput("linkConfig.password");
|
||||
|
||||
sensitiveKeyPatternInput = hdfsLink.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides");
|
||||
|
||||
if (sqoopMiniCluster.isRepositoryEncryptionEnabled()) {
|
||||
verifyEncryptedInput(sensitiveInput);
|
||||
verifyEncryptedInput(sensitiveKeyPatternInput);
|
||||
} else {
|
||||
verifyPlaintextInput(sensitiveInput);
|
||||
verifyPlaintextInput(sensitiveKeyPatternInput);
|
||||
}
|
||||
|
||||
MJob job = client.createJob(jdbcLink.getName(), hdfsLink.getName());
|
||||
job.setName("job");
|
||||
job.getDriverConfig().getIntegerInput("throttlingConfig.numExtractors").setValue(1);
|
||||
fillRdbmsFromConfig(job, "id");
|
||||
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
|
||||
client.saveJob(job);
|
||||
}
|
||||
|
||||
private void cleanUpAfterTool() {
|
||||
ConnectorManager.getInstance().destroy();
|
||||
RepositoryManager.getInstance().destroy();
|
||||
MasterKeyManager.getInstance().destroy();
|
||||
SqoopConfiguration.getInstance().destroy();
|
||||
@ -400,12 +390,24 @@ private void verifyMasterKeyDoesNotExist() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyPlaintextInput(long persistenceId, String expectedValue) throws Exception {
|
||||
private String verifyEncryptedInput(MInput<?> input) throws Exception {
|
||||
StringBuffer cipherText = new StringBuffer();
|
||||
StringBuffer iv = new StringBuffer();
|
||||
StringBuffer hmac = new StringBuffer();
|
||||
readEncryptedInput(input.getPersistenceId(), cipherText, iv, hmac);
|
||||
|
||||
String sensitiveDecrypted = MasterKeyManager.getInstance().decryptWithMasterKey(cipherText.toString(), iv.toString(), hmac.toString());
|
||||
Assert.assertEquals(input.getUrlSafeValueString(), sensitiveDecrypted);
|
||||
|
||||
return cipherText.toString();
|
||||
}
|
||||
|
||||
private void verifyPlaintextInput(MInput<?> input) throws Exception {
|
||||
try (PreparedStatement inputSelection = DriverManager.getConnection(JDBC_URL).prepareStatement(INPUT_VALUE_QUERY)) {
|
||||
inputSelection.setLong(1, persistenceId);
|
||||
inputSelection.setLong(1, input.getPersistenceId());
|
||||
try (ResultSet resultSet = inputSelection.executeQuery()) {
|
||||
while (resultSet.next()) {
|
||||
Assert.assertEquals(expectedValue, resultSet.getString(2));
|
||||
Assert.assertEquals(input.getUrlSafeValueString(), resultSet.getString(2));
|
||||
Assert.assertFalse(resultSet.getBoolean(3));
|
||||
Assert.assertNull(resultSet.getString(4));
|
||||
Assert.assertNull(resultSet.getString(5));
|
||||
|
@ -26,6 +26,7 @@
|
||||
import org.apache.sqoop.cli.SqoopGnuParser;
|
||||
import org.apache.sqoop.common.MapContext;
|
||||
import org.apache.sqoop.common.SqoopException;
|
||||
import org.apache.sqoop.connector.ConnectorManager;
|
||||
import org.apache.sqoop.core.SqoopConfiguration;
|
||||
import org.apache.sqoop.repository.MasterKeyManager;
|
||||
import org.apache.sqoop.repository.Repository;
|
||||
@ -57,6 +58,7 @@ public boolean runToolWithConfiguration(String[] arguments) {
|
||||
CommandLineParser parser = new SqoopGnuParser();
|
||||
SqoopConfiguration.getInstance().initialize();
|
||||
RepositoryManager.getInstance().initialize();
|
||||
ConnectorManager.getInstance().initialize();
|
||||
Repository repository = RepositoryManager.getInstance().getRepository();
|
||||
|
||||
CommandLine line;
|
||||
|
Loading…
Reference in New Issue
Block a user