5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-05 06:12:25 +08:00

SQOOP-2629: Sqoop2: Add ability to "blacklist" connectors

(Abraham Fine via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2015-10-27 20:14:19 -07:00
parent c54f928547
commit f33554a19e
12 changed files with 169 additions and 15 deletions

View File

@ -18,6 +18,7 @@
package org.apache.sqoop.connector; package org.apache.sqoop.connector;
import java.net.URL; import java.net.URL;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
@ -27,6 +28,7 @@
import java.util.ResourceBundle; import java.util.ResourceBundle;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang.ArrayUtils;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.connector.spi.SqoopConnector;
@ -40,6 +42,7 @@
import org.apache.sqoop.repository.Repository; import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.repository.RepositoryTransaction; import org.apache.sqoop.repository.RepositoryTransaction;
import org.apache.sqoop.utils.ContextUtils;
public class ConnectorManager implements Reconfigurable { public class ConnectorManager implements Reconfigurable {
@ -58,6 +61,8 @@ public class ConnectorManager implements Reconfigurable {
*/ */
private static boolean DEFAULT_AUTO_UPGRADE = false; private static boolean DEFAULT_AUTO_UPGRADE = false;
private Set<String> blacklistedConnectors;
/** /**
* Create default object by default. * Create default object by default.
* *
@ -70,7 +75,15 @@ public class ConnectorManager implements Reconfigurable {
/** /**
* The private constructor for the singleton class. * The private constructor for the singleton class.
*/ */
private ConnectorManager() {} private ConnectorManager() {
String blacklistedConnectorsString =
SqoopConfiguration.getInstance().getContext().getString(ConfigurationConstants.BLACKLISTED_CONNECTORS);
if (blacklistedConnectorsString == null) {
blacklistedConnectors = Collections.EMPTY_SET;
} else {
blacklistedConnectors = ContextUtils.getUniqueStrings(blacklistedConnectorsString);
}
}
/** /**
* Return current instance. * Return current instance.
@ -176,7 +189,7 @@ public synchronized void initialize(boolean autoUpgrade) {
LOG.trace("Begin connector manager initialization"); LOG.trace("Begin connector manager initialization");
} }
List<URL> connectorConfigs = ConnectorManagerUtils.getConnectorConfigs(); List<URL> connectorConfigs = ConnectorManagerUtils.getConnectorConfigs(blacklistedConnectors);
LOG.info("Connector config urls: " + connectorConfigs); LOG.info("Connector config urls: " + connectorConfigs);

View File

@ -19,6 +19,7 @@
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.core.ConfigurationConstants; import org.apache.sqoop.core.ConfigurationConstants;
import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.error.code.ConnectorError; import org.apache.sqoop.error.code.ConnectorError;
import java.io.File; import java.io.File;
@ -27,6 +28,8 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.List; import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.jar.JarEntry; import java.util.jar.JarEntry;
import java.util.jar.JarFile; import java.util.jar.JarFile;
@ -40,15 +43,18 @@ public class ConnectorManagerUtils {
* *
* @return List of URLs. * @return List of URLs.
*/ */
public static List<URL> getConnectorConfigs() { public static List<URL> getConnectorConfigs(Set<String> connectorBlacklist) {
List<URL> connectorConfigs = new ArrayList<URL>(); List<URL> connectorConfigs = new ArrayList<>();
try { try {
// Check ConnectorManager classloader. // Check ConnectorManager classloader.
Enumeration<URL> appPathConfigs = ConnectorManager.class.getClassLoader().getResources( Enumeration<URL> appPathConfigs = ConnectorManager.class.getClassLoader().getResources(
ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES); ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES);
while (appPathConfigs.hasMoreElements()) { while (appPathConfigs.hasMoreElements()) {
connectorConfigs.add(appPathConfigs.nextElement()); URL connectorConfig = appPathConfigs.nextElement();
if (!isBlacklisted(connectorConfig, connectorBlacklist)) {
connectorConfigs.add(connectorConfig);
}
} }
// Check thread context classloader. // Check thread context classloader.
@ -58,7 +64,7 @@ public static List<URL> getConnectorConfigs() {
while (ctxPathConfigs.hasMoreElements()) { while (ctxPathConfigs.hasMoreElements()) {
URL configUrl = ctxPathConfigs.nextElement(); URL configUrl = ctxPathConfigs.nextElement();
if (!connectorConfigs.contains(configUrl)) { if (!connectorConfigs.contains(configUrl) && !isBlacklisted(configUrl, connectorBlacklist)) {
connectorConfigs.add(configUrl); connectorConfigs.add(configUrl);
} }
} }
@ -70,6 +76,17 @@ public static List<URL> getConnectorConfigs() {
return connectorConfigs; return connectorConfigs;
} }
static boolean isBlacklisted(URL connectorConfig, Set<String> connectorBlacklist) throws IOException {
Properties properties = new Properties();
properties.load(connectorConfig.openStream());
String connectorName = properties.getProperty(ConfigurationConstants.CONNPROP_CONNECTOR_NAME);
if (connectorName == null) {
throw new IOException("malformed connector properties: " + connectorConfig.getPath());
} else {
return connectorBlacklist.contains(connectorName);
}
}
static boolean isConnectorJar(File file) { static boolean isConnectorJar(File file) {
try (JarFile jarFile = new JarFile(file)) { try (JarFile jarFile = new JarFile(file)) {
@SuppressWarnings("resource") @SuppressWarnings("resource")

View File

@ -86,6 +86,11 @@ public final class ConfigurationConstants {
*/ */
public static final String JOB_CLASSPATH = "org.apache.sqoop.classpath.job"; public static final String JOB_CLASSPATH = "org.apache.sqoop.classpath.job";
/**
* List of connectors that will not be loaded at startup
*/
public static final String BLACKLISTED_CONNECTORS = "org.apache.sqoop.connector.blacklist";
private ConfigurationConstants() { private ConfigurationConstants() {
// Disable explicit object creation // Disable explicit object creation
} }

View File

@ -20,34 +20,55 @@
import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
import org.apache.sqoop.core.ConfigurationConstants;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.io.File; import java.io.File;
import java.net.URL;
import java.util.HashSet;
import java.util.Set;
public class TestConnectorManagerUtils { public class TestConnectorManagerUtils {
private String workingDir; private String workingDir;
private String testConnectorPath;
private String testNonConnectorPath;
@BeforeMethod(alwaysRun = true) @BeforeMethod(alwaysRun = true)
public void setUp() { public void setUp() {
workingDir = System.getProperty("user.dir"); workingDir = System.getProperty("user.dir");
testConnectorPath = workingDir + "/src/test/resources/test-connector.jar";
testNonConnectorPath = workingDir + "/src/test/resources/test-non-connector.jar";
} }
@Test @Test
public void testIsConnectorJar() { public void testIsConnectorJar() {
String path = workingDir + "/src/test/resources/test-connector.jar"; File connectorJar = new File(testConnectorPath);
File connectorJar = new File(path);
assertTrue(connectorJar.exists()); assertTrue(connectorJar.exists());
assertTrue(ConnectorManagerUtils.isConnectorJar(connectorJar)); assertTrue(ConnectorManagerUtils.isConnectorJar(connectorJar));
} }
@Test @Test
public void testIsNotConnectorJar() { public void testIsNotConnectorJar() {
String path = workingDir + "/src/test/resources/test-non-connector.jar"; File file = new File(testNonConnectorPath);
File file = new File(path);
assertTrue(file.exists()); assertTrue(file.exists());
assertFalse(ConnectorManagerUtils.isConnectorJar(file)); assertFalse(ConnectorManagerUtils.isConnectorJar(file));
} }
@Test
public void testIsBlacklisted() throws Exception {
URL url = new URL("jar:file:" + testConnectorPath + "!/" + ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES);
Set<String> blacklistedConnectors = new HashSet<>();
// The test connector is a copy of the generic jdbc connector
blacklistedConnectors.add("generic-jdbc-connector");
assertTrue(ConnectorManagerUtils.isBlacklisted(url, blacklistedConnectors));
blacklistedConnectors = new HashSet<>();
blacklistedConnectors.add("not-there");
assertFalse(ConnectorManagerUtils.isBlacklisted(url, blacklistedConnectors));
}
} }

View File

@ -18,8 +18,10 @@
package org.apache.sqoop.driver; package org.apache.sqoop.driver;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail; import static org.testng.Assert.fail;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -28,9 +30,11 @@
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.error.code.DriverError; import org.apache.sqoop.error.code.DriverError;
import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink; import org.apache.sqoop.model.MLink;
@ -48,9 +52,14 @@ public class TestJobManager {
private ConnectorManager connectorMgrMock; private ConnectorManager connectorMgrMock;
private RepositoryManager repositoryManagerMock; private RepositoryManager repositoryManagerMock;
private Repository jdbcRepoMock; private Repository jdbcRepoMock;
private SqoopConfiguration configurationMock;
@BeforeMethod(alwaysRun = true) @BeforeMethod(alwaysRun = true)
public void setUp() { public void setUp() {
configurationMock = mock(SqoopConfiguration.class);
doReturn(new MapContext(Collections.EMPTY_MAP)).when(configurationMock).getContext();
SqoopConfiguration.setInstance(configurationMock);
jobManager = JobManager.getInstance(); jobManager = JobManager.getInstance();
connectorMgrMock = mock(ConnectorManager.class); connectorMgrMock = mock(ConnectorManager.class);
sqoopConnectorMock = mock(SqoopConnector.class); sqoopConnectorMock = mock(SqoopConnector.class);

View File

@ -40,11 +40,13 @@
import java.util.List; import java.util.List;
import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.common.EmptyConfiguration; import org.apache.sqoop.connector.common.EmptyConfiguration;
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.driver.Driver; import org.apache.sqoop.driver.Driver;
import org.apache.sqoop.driver.DriverUpgrader; import org.apache.sqoop.driver.DriverUpgrader;
import org.apache.sqoop.json.DriverBean; import org.apache.sqoop.json.DriverBean;
@ -76,9 +78,14 @@ public class TestJdbcRepository {
private JdbcRepositoryHandler repoHandlerMock; private JdbcRepositoryHandler repoHandlerMock;
private ConnectorConfigurableUpgrader connectorUpgraderMock; private ConnectorConfigurableUpgrader connectorUpgraderMock;
private DriverUpgrader driverUpgraderMock; private DriverUpgrader driverUpgraderMock;
private SqoopConfiguration configurationMock;
@BeforeMethod(alwaysRun = true) @BeforeMethod(alwaysRun = true)
public void setUp() throws Exception { public void setUp() throws Exception {
configurationMock = mock(SqoopConfiguration.class);
doReturn(new MapContext(Collections.EMPTY_MAP)).when(configurationMock).getContext();
SqoopConfiguration.setInstance(configurationMock);
repoTransactionMock = mock(JdbcRepositoryTransaction.class); repoTransactionMock = mock(JdbcRepositoryTransaction.class);
connectorMgrMock = mock(ConnectorManager.class); connectorMgrMock = mock(ConnectorManager.class);
driverMock = mock(Driver.class); driverMock = mock(Driver.class);

View File

@ -199,3 +199,8 @@ org.apache.sqoop.classpath.job=
#org.apache.sqoop.jetty.thread.pool.worker.min=5 #org.apache.sqoop.jetty.thread.pool.worker.min=5
#org.apache.sqoop.jetty.thread.pool.worker.alive.time=60 #org.apache.sqoop.jetty.thread.pool.worker.alive.time=60
#org.apache.sqoop.jetty.port=12000 #org.apache.sqoop.jetty.port=12000
# Blacklisted Connectors
# ":" separated list of connector names as specified in their
# sqoopconnector.properties file
org.apache.sqoop.connector.blacklist=

View File

@ -32,6 +32,7 @@
import java.sql.Timestamp; import java.sql.Timestamp;
import java.sql.Types; import java.sql.Types;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -752,7 +753,7 @@ protected long registerHdfsConnector(Connection conn) {
LOG.trace("Begin HDFS Connector pre-loading."); LOG.trace("Begin HDFS Connector pre-loading.");
} }
List<URL> connectorConfigs = ConnectorManagerUtils.getConnectorConfigs(); List<URL> connectorConfigs = ConnectorManagerUtils.getConnectorConfigs(Collections.EMPTY_SET);
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
LOG.info("Connector configs: " + connectorConfigs); LOG.info("Connector configs: " + connectorConfigs);

View File

@ -140,6 +140,7 @@ protected void prepareTemporaryPath() throws Exception {
mapToProperties(sqoopProperties, getConnectorManagerConfiguration()); mapToProperties(sqoopProperties, getConnectorManagerConfiguration());
mapToProperties(sqoopProperties, getDriverManagerConfiguration()); mapToProperties(sqoopProperties, getDriverManagerConfiguration());
mapToProperties(sqoopProperties, getClasspathConfiguration()); mapToProperties(sqoopProperties, getClasspathConfiguration());
mapToProperties(sqoopProperties, getBlacklistedConnectorConfiguration());
FileUtils.writeLines(f, sqoopProperties); FileUtils.writeLines(f, sqoopProperties);
@ -230,4 +231,8 @@ protected Map<String, String> getDriverManagerConfiguration() {
protected Map<String, String> getClasspathConfiguration() { protected Map<String, String> getClasspathConfiguration() {
return MapUtils.EMPTY_MAP; return MapUtils.EMPTY_MAP;
} }
protected Map<String, String> getBlacklistedConnectorConfiguration() {
return MapUtils.EMPTY_MAP;
}
} }

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.integration.connectorloading;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.core.ConfigurationConstants;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.test.minicluster.JettySqoopMiniCluster;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.apache.sqoop.test.utils.HdfsUtils;
import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Map;
public class BlacklistedConnectorTest extends ConnectorTestCase {
public static class DerbySqoopMiniCluster extends JettySqoopMiniCluster {
public DerbySqoopMiniCluster(String temporaryPath, Configuration configuration) throws Exception {
super(temporaryPath, configuration);
}
@Override
protected Map<String, String> getBlacklistedConnectorConfiguration() {
Map<String, String> properties = new HashMap<>();
properties.put(ConfigurationConstants.BLACKLISTED_CONNECTORS, "fake-connector:generic-jdbc-connector");
return properties;
}
}
public void startSqoopMiniCluster() throws Exception {
// And use them for new Derby repo instance
setCluster(new DerbySqoopMiniCluster(HdfsUtils.joinPathFragments(super
.getSqoopMiniClusterTemporaryPath(), getTestName()), hadoopCluster.getConfiguration()));
// Start server
getCluster().start();
// Initialize Sqoop Client API
setClient(new SqoopClient(getServerUrl()));
}
@Test(expectedExceptions = {SqoopException.class})
public void testCreateLinkWithNonexistantConnector() throws Exception {
startSqoopMiniCluster();
getClient().createLink("generic-jdbc-connector");
}
@Override
public void startSqoop() throws Exception {
// Do nothing so that Sqoop isn't started before Suite.
}
}

View File

@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.sqoop.integration.classpath; package org.apache.sqoop.integration.connectorloading;
import org.apache.commons.collections.ListUtils; import org.apache.commons.collections.ListUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;

View File

@ -18,15 +18,15 @@ limitations under the License.
<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" > <!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
<suite name="ClasspathTests" verbose="2" parallel="false"> <suite name="ConnectorLoadingTests" verbose="2" parallel="false">
<listeners> <listeners>
<listener class-name="org.apache.sqoop.test.testng.SqoopTestListener" /> <listener class-name="org.apache.sqoop.test.testng.SqoopTestListener" />
</listeners> </listeners>
<test name="ClasspathTests"> <test name="ConnectorLoadingTests">
<packages> <packages>
<package name="org.apache.sqoop.integration.classpath"/> <package name="org.apache.sqoop.integration.connectorloading"/>
</packages> </packages>
</test> </test>