5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-20 19:00:48 +08:00

SQOOP-971: Sqoop2: Component reconfigurability

(Mengwei Ding via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2013-06-24 09:31:22 -07:00
parent 156facc491
commit d62567ddf3
9 changed files with 281 additions and 15 deletions

View File

@ -33,12 +33,15 @@
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.core.ConfigurationConstants; import org.apache.sqoop.core.ConfigurationConstants;
import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
import org.apache.sqoop.repository.Repository; import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.repository.RepositoryTransaction; import org.apache.sqoop.repository.RepositoryTransaction;
import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MConnector;
public class ConnectorManager { public class ConnectorManager implements Reconfigurable {
/** /**
* Logger object. * Logger object.
@ -184,6 +187,8 @@ public synchronized void initialize() {
registerConnectors(); registerConnectors();
SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
LOG.info("Connectors loaded: " + handlerMap); LOG.info("Connectors loaded: " + handlerMap);
} }
@ -231,4 +236,13 @@ public synchronized void destroy() {
handlerMap = null; handlerMap = null;
nameMap = null; nameMap = null;
} }
@Override
public synchronized void configurationChanged() {
LOG.info("Begin connector manager reconfiguring");
// If there are configuration options for ConnectorManager,
// implement the reconfiguration procedure right here.
LOG.info("Connector manager reconfigured");
}
} }

View File

@ -51,6 +51,9 @@ public enum CoreError implements ErrorCode {
/** The configuration system has not been initialized correctly. */ /** The configuration system has not been initialized correctly. */
CORE_0007("System not initialized"), CORE_0007("System not initialized"),
/** The system has not been reconfigured */
CORE_0008("System not reconfigured");
; ;
private final String message; private final String message;

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.core;
/**
* Interface that make Sqoop Server components sensitive to
* configuration file changes at the runtime
*/
public interface Reconfigurable {
/**
* Method to notify each reconfigurable components
*/
public void configurationChanged();
}

View File

@ -21,7 +21,6 @@
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
@ -33,7 +32,7 @@
/** /**
* Configuration manager that loads Sqoop configuration. * Configuration manager that loads Sqoop configuration.
*/ */
public class SqoopConfiguration { public class SqoopConfiguration implements Reconfigurable {
/** /**
* Logger object. * Logger object.
@ -79,6 +78,7 @@ public static void setInstance(SqoopConfiguration newInstance) {
private boolean initialized = false; private boolean initialized = false;
private ConfigurationProvider provider = null; private ConfigurationProvider provider = null;
private Map<String, String> config = null; private Map<String, String> config = null;
private Map<String, String> oldConfig = null;
public synchronized void initialize() { public synchronized void initialize() {
if (initialized) { if (initialized) {
@ -165,8 +165,9 @@ public synchronized void initialize() {
// Initialize the configuration provider // Initialize the configuration provider
provider.initialize(configDir, bootstrapProperties); provider.initialize(configDir, bootstrapProperties);
refreshConfiguration(); configurationChanged();
provider.registerListener(new CoreConfigurationListener());
provider.registerListener(new CoreConfigurationListener(SqoopConfiguration.getInstance()));
initialized = true; initialized = true;
} }
@ -176,10 +177,19 @@ public synchronized MapContext getContext() {
throw new SqoopException(CoreError.CORE_0007); throw new SqoopException(CoreError.CORE_0007);
} }
Map<String,String> parameters = new HashMap<String, String>(); return new MapContext(config);
parameters.putAll(config); }
return new MapContext(parameters); public synchronized MapContext getOldContext() {
if (!initialized) {
throw new SqoopException(CoreError.CORE_0007);
}
if (oldConfig == null) {
throw new SqoopException(CoreError.CORE_0008);
}
return new MapContext(oldConfig);
} }
public synchronized void destroy() { public synchronized void destroy() {
@ -193,6 +203,7 @@ public synchronized void destroy() {
provider = null; provider = null;
configDir = null; configDir = null;
config = null; config = null;
oldConfig = null;
initialized = false; initialized = false;
} }
@ -209,15 +220,28 @@ private synchronized void configureLogging() {
PropertyConfigurator.configure(props); PropertyConfigurator.configure(props);
} }
private synchronized void refreshConfiguration() { public ConfigurationProvider getProvider() {
return provider;
}
@Override
public synchronized void configurationChanged() {
oldConfig = config;
config = provider.getConfiguration(); config = provider.getConfiguration();
configureLogging(); configureLogging();
} }
public class CoreConfigurationListener implements ConfigurationListener { public static class CoreConfigurationListener implements ConfigurationListener {
private Reconfigurable listener;
public CoreConfigurationListener(Reconfigurable target) {
listener = target;
}
@Override @Override
public void configurationChanged() { public void configurationChanged() {
refreshConfiguration(); listener.configurationChanged();
} }
} }
} }

View File

@ -19,6 +19,9 @@
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.connector.spi.MetadataUpgrader; import org.apache.sqoop.connector.spi.MetadataUpgrader;
import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
import org.apache.sqoop.framework.configuration.ConnectionConfiguration; import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
import org.apache.sqoop.framework.configuration.ExportJobConfiguration; import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.framework.configuration.ImportJobConfiguration; import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
@ -45,7 +48,7 @@
* be the fastest way and we might want to introduce internal structures for * be the fastest way and we might want to introduce internal structures for
* running jobs in case that this approach will be too slow. * running jobs in case that this approach will be too slow.
*/ */
public class FrameworkManager { public class FrameworkManager implements Reconfigurable {
/** /**
* Logger object. * Logger object.
@ -141,6 +144,8 @@ public synchronized void initialize() {
// Register framework metadata in repository // Register framework metadata in repository
mFramework = RepositoryManager.getInstance().getRepository().registerFramework(mFramework); mFramework = RepositoryManager.getInstance().getRepository().registerFramework(mFramework);
SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
LOG.info("Submission manager initialized: OK"); LOG.info("Submission manager initialized: OK");
} }
@ -165,4 +170,11 @@ public ResourceBundle getBundle(Locale locale) {
FrameworkConstants.RESOURCE_BUNDLE_NAME, locale); FrameworkConstants.RESOURCE_BUNDLE_NAME, locale);
} }
@Override
public void configurationChanged() {
LOG.info("Begin framework manager reconfiguring");
// If there are configuration options for FrameworkManager,
// implement the reconfiguration procedure right here.
LOG.info("Framework manager reconfigured");
}
} }

View File

@ -22,7 +22,9 @@
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration; import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
import org.apache.sqoop.framework.configuration.ExportJobConfiguration; import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
import org.apache.sqoop.framework.configuration.ImportJobConfiguration; import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.etl.*; import org.apache.sqoop.job.etl.*;
@ -40,7 +42,7 @@
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
public class JobManager { public class JobManager implements Reconfigurable {
/** /**
* Logger object. * Logger object.
*/ */
@ -248,6 +250,8 @@ public synchronized void initialize() {
updateThread = new UpdateThread(); updateThread = new UpdateThread();
updateThread.start(); updateThread.start();
SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
LOG.info("Submission manager initialized: OK"); LOG.info("Submission manager initialized: OK");
} }
public MSubmission submit(long jobId) { public MSubmission submit(long jobId) {
@ -495,6 +499,57 @@ private void update(MSubmission submission) {
RepositoryManager.getInstance().getRepository().updateSubmission(submission); RepositoryManager.getInstance().getRepository().updateSubmission(submission);
} }
@Override
public synchronized void configurationChanged() {
LOG.info("Begin submission engine manager reconfiguring");
MapContext newContext = SqoopConfiguration.getInstance().getContext();
MapContext oldContext = SqoopConfiguration.getInstance().getOldContext();
String newSubmissionEngineClassName = newContext.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
if (newSubmissionEngineClassName == null
|| newSubmissionEngineClassName.trim().length() == 0) {
throw new SqoopException(FrameworkError.FRAMEWORK_0001,
newSubmissionEngineClassName);
}
String oldSubmissionEngineClassName = oldContext.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
if (!newSubmissionEngineClassName.equals(oldSubmissionEngineClassName)) {
LOG.warn("Submission engine cannot be replaced at the runtime. " +
"You might need to restart the server.");
}
String newExecutionEngineClassName = newContext.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
if (newExecutionEngineClassName == null
|| newExecutionEngineClassName.trim().length() == 0) {
throw new SqoopException(FrameworkError.FRAMEWORK_0007,
newExecutionEngineClassName);
}
String oldExecutionEngineClassName = oldContext.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
if (!newExecutionEngineClassName.equals(oldExecutionEngineClassName)) {
LOG.warn("Execution engine cannot be replaced at the runtime. " +
"You might need to restart the server.");
}
// Set up worker threads
purgeThreshold = newContext.getLong(
FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
DEFAULT_PURGE_THRESHOLD
);
purgeSleep = newContext.getLong(
FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
DEFAULT_PURGE_SLEEP
);
purgeThread.interrupt();
updateSleep = newContext.getLong(
FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
DEFAULT_UPDATE_SLEEP
);
updateThread.interrupt();
LOG.info("Submission engine manager reconfigured.");
}
private class PurgeThread extends Thread { private class PurgeThread extends Thread {
public PurgeThread() { public PurgeThread() {

View File

@ -164,4 +164,103 @@ private void initializeRepositoryHandler() {
public synchronized Repository getRepository() { public synchronized Repository getRepository() {
return repository; return repository;
} }
@Override
public void configurationChanged() {
LOG.info("Begin JdbcRepository reconfiguring.");
JdbcRepositoryContext oldRepoContext = repoContext;
repoContext = new JdbcRepositoryContext(SqoopConfiguration.getInstance().getContext());
// reconfigure jdbc handler
String newJdbcHandlerClassName = repoContext.getHandlerClassName();
if (newJdbcHandlerClassName == null
|| newJdbcHandlerClassName.trim().length() == 0) {
throw new SqoopException(RepositoryError.JDBCREPO_0001,
newJdbcHandlerClassName);
}
String oldJdbcHandlerClassName = oldRepoContext.getHandlerClassName();
if (!newJdbcHandlerClassName.equals(oldJdbcHandlerClassName)) {
LOG.warn("Repository JDBC handler cannot be replaced at the runtime. " +
"You might need to restart the server.");
}
// reconfigure jdbc driver
String newJdbcDriverClassName = repoContext.getDriverClass();
if (newJdbcDriverClassName == null
|| newJdbcDriverClassName.trim().length() == 0) {
throw new SqoopException(RepositoryError.JDBCREPO_0003,
newJdbcDriverClassName);
}
String oldJdbcDriverClassName = oldRepoContext.getDriverClass();
if (!newJdbcDriverClassName.equals(oldJdbcDriverClassName)) {
LOG.warn("Repository JDBC driver cannot be replaced at the runtime. " +
"You might need to restart the server.");
}
// reconfigure max connection
connectionPool.setMaxActive(repoContext.getMaximumConnections());
// reconfigure the url of repository
String connectUrl = repoContext.getConnectionUrl();
String oldurl = oldRepoContext.getConnectionUrl();
if (connectUrl != null && !connectUrl.equals(oldurl)) {
LOG.warn("Repository URL cannot be replaced at the runtime. " +
"You might need to restart the server.");
}
// if connection properties or transaction isolation option changes
boolean connFactoryChanged = false;
// compare connection properties
if (!connFactoryChanged) {
Properties oldProp = oldRepoContext.getConnectionProperties();
Properties newProp = repoContext.getConnectionProperties();
if (newProp.size() != oldProp.size()) {
connFactoryChanged = true;
} else {
for (Object key : newProp.keySet()) {
if (!newProp.getProperty((String) key).equals(oldProp.getProperty((String) key))) {
connFactoryChanged = true;
break;
}
}
}
}
// compare the transaction isolation option
if (!connFactoryChanged) {
String oldTxOption = oldRepoContext.getTransactionIsolation().toString();
String newTxOption = repoContext.getTransactionIsolation().toString();
if (!newTxOption.equals(oldTxOption)) {
connFactoryChanged = true;
}
}
if (connFactoryChanged) {
// try to reconfigure connection factory
try {
LOG.info("Reconfiguring Connection Factory.");
Properties jdbcProps = repoContext.getConnectionProperties();
ConnectionFactory connFactory =
new DriverManagerConnectionFactory(connectUrl, jdbcProps);
new PoolableConnectionFactory(connFactory, connectionPool, statementPool,
handler.validationQuery(), false, false,
repoContext.getTransactionIsolation().getCode());
} catch (IllegalStateException ex) {
// failed to reconfigure connection factory
LOG.warn("Repository connection cannot be reconfigured currently. " +
"You might need to restart the server.");
}
}
// ignore the create schema option, because the repo url is not allowed to change
LOG.info("JdbcRepository reconfigured.");
}
} }

View File

@ -22,10 +22,12 @@
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration; import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
import org.apache.sqoop.utils.ClassUtils; import org.apache.sqoop.utils.ClassUtils;
public class RepositoryManager { public class RepositoryManager implements Reconfigurable {
/** /**
* Logger object. * Logger object.
@ -120,6 +122,8 @@ public synchronized void initialize() {
throw new SqoopException(RepositoryError.REPO_0002); throw new SqoopException(RepositoryError.REPO_0002);
} }
SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
LOG.info("Repository initialized: OK"); LOG.info("Repository initialized: OK");
} }
@ -134,4 +138,29 @@ public synchronized void destroy() {
public synchronized Repository getRepository() { public synchronized Repository getRepository() {
return provider.getRepository(); return provider.getRepository();
} }
@Override
public synchronized void configurationChanged() {
LOG.info("Begin repository manager reconfiguring");
MapContext newContext = SqoopConfiguration.getInstance().getContext();
MapContext oldContext = SqoopConfiguration.getInstance().getOldContext();
String newProviderClassName = newContext.getString(RepoConfigurationConstants.SYSCFG_REPO_PROVIDER);
if (newProviderClassName == null
|| newProviderClassName.trim().length() == 0) {
throw new SqoopException(RepositoryError.REPO_0001,
RepoConfigurationConstants.SYSCFG_REPO_PROVIDER);
}
String oldProviderClassName = oldContext.getString(RepoConfigurationConstants.SYSCFG_REPO_PROVIDER);
if (!newProviderClassName.equals(oldProviderClassName)) {
LOG.warn("Repository provider cannot be replaced at the runtime. " +
"You might need to restart the server.");
}
provider.configurationChanged();
LOG.info("Repository manager reconfigured.");
}
} }

View File

@ -18,8 +18,9 @@
package org.apache.sqoop.repository; package org.apache.sqoop.repository;
import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.core.Reconfigurable;
public interface RepositoryProvider { public interface RepositoryProvider extends Reconfigurable {
void initialize(MapContext context); void initialize(MapContext context);