5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 01:19:38 +08:00

SQOOP-1736: Sqoop2: Driver missing during upgrade

(Veena Basavaraj via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2014-11-14 17:20:05 -08:00
parent 68d5beeab4
commit 6406f573ff
5 changed files with 123 additions and 11 deletions

View File

@ -220,14 +220,14 @@ public MDriver registerDriver(final MDriver mDriver, final boolean autoUpgrade)
return (MDriver) doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
MDriver existingDriverConfig = handler.findDriver(mDriver.getUniqueName(), conn);
if (existingDriverConfig == null) {
MDriver existingDriver = handler.findDriver(mDriver.getUniqueName(), conn);
if (existingDriver == null) {
handler.registerDriver(mDriver, conn);
return mDriver;
} else {
// We're currently not serializing version into repository
// so let's just compare the structure to see if we need upgrade.
if(!mDriver.equals(existingDriverConfig)) {
if(!mDriver.equals(existingDriver)) {
if (autoUpgrade) {
upgradeDriver(mDriver);
return mDriver;
@ -236,7 +236,7 @@ public Object doIt(Connection conn) {
"Driver: " + mDriver.getPersistenceId());
}
}
return existingDriverConfig;
return existingDriver;
}
}
});

View File

@ -425,7 +425,10 @@ public void createOrUpgradeRepository(Connection conn) {
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO, conn);
// Data modifications only for non-fresh install.
// force register HDFS-connector as a first class citizen in the connector list
// and re-associate old frameworks configs and connections/links with the new hdfs connector
// Data modifications only for non-fresh install hence the > 0 check
if (repositoryVersion > 0) {
LOG.info("Force registering the HDFS connector as a new configurable");
long hdfsConnectorId = registerHdfsConnector(conn);
@ -450,6 +453,17 @@ public void createOrUpgradeRepository(Connection conn) {
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_UNIQUE_CONSTRAINT_NAME, conn);
// force register the driver as a first class citizen and re-associate the old framework configs with the new driver Id
// Data modifications only for non-fresh install hence the > 0 check
if (repositoryVersion > 0) {
LOG.info("Force registering the Driver as a new configurable");
long driverId = registerDriver(conn);
LOG.info("Finished Force registering of the driver as a new configurable");
LOG.info("Updating config and inputs for the driver.");
updateDriverConfigInput(conn, driverId);
LOG.info("Finished Updating config and inputs for the driver.");
}
}
// last step upgrade the repository version to the latest value in the code
@ -678,6 +692,7 @@ protected void updateDirections(Connection conn, Map<Direction, Long> directionM
* 10. Update 'table' config names to 'fromJobConfig' and 'toJobConfig'.
* Also update the relevant inputs as well.
* @param conn
* @param hdfsConnectorId
*/
// NOTE: This upgrade code happened before the SQOOP-1498 renaming, hence it uses the form/connection
// tables instead of the latest config/link tables
@ -738,6 +753,74 @@ private void updateJobConfigInputForHdfsConnector(Connection conn, long hdfsConn
"toJobConfig", "toJobConfig", Direction.TO.toString());
}
// NOTE: This upgrade code happened after the SQOOP-1498 renaming, hence it
// uses the configurable and config
@Deprecated
private void updateDriverConfigInput(Connection conn, long driverId) {
// update configs and inputs for driver
// update the name from throttling ==> throttlingConfig config and associate
// it with the driverId
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_NAME_FOR_DRIVER, conn,
"throttlingConfig", "throttling");
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_CONFIGURABLE_ID_FOR_DRIVER, conn,
driverId, "throttlingConfig");
// nuke security.maxConnections
runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_REMOVE_SECURITY_CONFIG_INPUT_FOR_DRIVER, conn,
"security.maxConnections");
// nuke the security config since 1.99.3 we do not use it
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_SECURITY_CONFIG_FOR_DRIVER, conn,
"security");
// update throttling.extractors ==> throttlingConfig.numExtractors
runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_UPDATE_CONFIG_INPUT_FOR_DRIVER, conn,
"throttlingConfig.numExtractors", "throttling.extractors");
// update throttling.loaders ==> throttlingConfig.numLoaders
runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_UPDATE_CONFIG_INPUT_FOR_DRIVER, conn,
"throttlingConfig.numLoaders", "throttling.loaders");
}
/**
* Pre-register Driver since the 1.99.3 release NOTE: This should be used only
* in the upgrade path
*/
@Deprecated
protected long registerDriver(Connection conn) {
if (LOG.isTraceEnabled()) {
LOG.trace("Begin Driver loading.");
}
PreparedStatement baseDriverStmt = null;
try {
baseDriverStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE,
Statement.RETURN_GENERATED_KEYS);
baseDriverStmt.setString(1, MDriver.DRIVER_NAME);
baseDriverStmt.setString(2, Driver.getClassName());
baseDriverStmt.setString(3, "1");
baseDriverStmt.setString(4, MConfigurableType.DRIVER.name());
int baseDriverCount = baseDriverStmt.executeUpdate();
if (baseDriverCount != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0012, Integer.toString(baseDriverCount));
}
ResultSet rsetDriverId = baseDriverStmt.getGeneratedKeys();
if (!rsetDriverId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
}
return rsetDriverId.getLong(1);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0050, ex);
} finally {
closeStatements(baseDriverStmt);
}
}
/**
* Pre-register HDFS Connector so that config upgrade will work.
@ -762,10 +845,10 @@ protected long registerHdfsConnector(Connection conn) {
if (handler.getConnectorConfigurable().getPersistenceId() != -1) {
return handler.getConnectorConfigurable().getPersistenceId();
}
PreparedStatement baseConnectorStmt = null;
if (handler.getUniqueName().equals(CONNECTOR_HDFS)) {
try {
PreparedStatement baseConnectorStmt = conn.prepareStatement(
baseConnectorStmt = conn.prepareStatement(
STMT_INSERT_INTO_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS,
Statement.RETURN_GENERATED_KEYS);
baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName());
@ -782,6 +865,8 @@ protected long registerHdfsConnector(Connection conn) {
}
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
} finally {
closeStatements(baseConnectorStmt);
}
break;
@ -968,7 +1053,7 @@ private void insertConfigsforDriver(MDriver mDriver, Connection conn) {
baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT,
Statement.RETURN_GENERATED_KEYS);
// Register a driver config as a job type with no owner/connector and direction
// Register a driver config as a job type with no direction
registerConfigs(mDriver.getPersistenceId(), null /* no direction*/, mDriver.getDriverConfig().getConfigs(),
MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);

View File

@ -200,6 +200,32 @@ public final class DerbySchemaUpgradeQuery {
"UPDATE " + TABLE_SQ_FORM + " SET "
+ COLUMN_SQF_DIRECTION + "= NULL"
+ " WHERE " + COLUMN_SQF_NAME + "= ?";
/** Intended for force driver creation and its related upgrades*/
public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_NAME_FOR_DRIVER =
"UPDATE " + TABLE_SQ_CONFIG + " SET "
+ COLUMN_SQ_CFG_NAME + "= ?"
+ " WHERE " + COLUMN_SQ_CFG_NAME + "= ?";
public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_CONFIGURABLE_ID_FOR_DRIVER =
"UPDATE " + TABLE_SQ_CONFIG + " SET "
+ COLUMN_SQ_CFG_CONFIGURABLE + "= ?"
+ " WHERE " + COLUMN_SQ_CFG_NAME + "= ?";
public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_SECURITY_CONFIG_FOR_DRIVER =
"DELETE FROM " + TABLE_SQ_CONFIG
+ " WHERE " + COLUMN_SQ_CFG_NAME + "= ?";
public static final String QUERY_UPGRADE_TABLE_SQ_INPUT_REMOVE_SECURITY_CONFIG_INPUT_FOR_DRIVER =
"DELETE FROM " + TABLE_SQ_INPUT
+ " WHERE " + COLUMN_SQI_NAME + "= ?";
public static final String QUERY_UPGRADE_TABLE_SQ_INPUT_UPDATE_CONFIG_INPUT_FOR_DRIVER =
"UPDATE " + TABLE_SQ_INPUT + " SET "
+ COLUMN_SQI_NAME + "= ?"
+ " WHERE " + COLUMN_SQI_NAME + "= ?";
/**
* Intended to change SQ_JOB_INPUT.SQBI_INPUT from EXPORT
* throttling form, to IMPORT throttling form.

View File

@ -79,6 +79,7 @@ public void testUpgradeRepoVersion2ToVersion4() throws Exception {
super.loadConnectionsOrLinks(2);
super.loadJobs(2);
handler.createOrUpgradeRepository(getDerbyDatabaseConnection());
handler.findDriver("SqoopDriver", getDerbyDatabaseConnection());
assertTrue(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
}

View File

@ -38,12 +38,12 @@ public boolean runToolWithConfiguration(String[] arguments) {
LOG.info("Initializing the RepositoryManager with immutable option turned off.");
RepositoryManager.getInstance().initialize(false);
LOG.info("Initializing the Driver with upgrade option turned on.");
Driver.getInstance().initialize(true);
LOG.info("Initializing the Connection Manager with upgrade option turned on.");
ConnectorManager.getInstance().initialize(true);
LOG.info("Initializing the Driver with upgrade option turned on.");
Driver.getInstance().initialize(true);
LOG.info("Upgrade completed successfully.");
LOG.info("Tearing all managers down.");