diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index 4fb9afdf..5a76af6f 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -220,14 +220,14 @@ public MDriver registerDriver(final MDriver mDriver, final boolean autoUpgrade) return (MDriver) doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - MDriver existingDriverConfig = handler.findDriver(mDriver.getUniqueName(), conn); - if (existingDriverConfig == null) { + MDriver existingDriver = handler.findDriver(mDriver.getUniqueName(), conn); + if (existingDriver == null) { handler.registerDriver(mDriver, conn); return mDriver; } else { // We're currently not serializing version into repository // so let's just compare the structure to see if we need upgrade. - if(!mDriver.equals(existingDriverConfig)) { + if(!mDriver.equals(existingDriver)) { if (autoUpgrade) { upgradeDriver(mDriver); return mDriver; @@ -236,7 +236,7 @@ public Object doIt(Connection conn) { "Driver: " + mDriver.getPersistenceId()); } } - return existingDriverConfig; + return existingDriver; } } }); diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index f152859a..060f2962 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -425,7 +425,10 @@ public void createOrUpgradeRepository(Connection conn) { runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM, conn); runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO, conn); - // Data modifications only for non-fresh install. + // force register HDFS-connector as a first class citizen in the connector list + // and re-associate old frameworks configs and connections/links with the new hdfs connector + // Data modifications only for non-fresh install hence the > 0 check + if (repositoryVersion > 0) { LOG.info("Force registering the HDFS connector as a new configurable"); long hdfsConnectorId = registerHdfsConnector(conn); @@ -450,6 +453,17 @@ public void createOrUpgradeRepository(Connection conn) { runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME, conn); runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME, conn); runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_UNIQUE_CONSTRAINT_NAME, conn); + // force register the driver as a first class citizen and re-associate the old framework configs with the new driver Id + // Data modifications only for non-fresh install hence the > 0 check + if (repositoryVersion > 0) { + LOG.info("Force registering the Driver as a new configurable"); + long driverId = registerDriver(conn); + LOG.info("Finished Force registering of the driver as a new configurable"); + + LOG.info("Updating config and inputs for the driver."); + updateDriverConfigInput(conn, driverId); + LOG.info("Finished Updating config and inputs for the driver."); + } } // last step upgrade the repository version to the latest value in the code @@ -678,6 +692,7 @@ protected void updateDirections(Connection conn, Map directionM * 10. Update 'table' config names to 'fromJobConfig' and 'toJobConfig'. * Also update the relevant inputs as well. * @param conn + * @param hdfsConnectorId */ // NOTE: This upgrade code happened before the SQOOP-1498 renaming, hence it uses the form/connection // tables instead of the latest config/link tables @@ -738,6 +753,74 @@ private void updateJobConfigInputForHdfsConnector(Connection conn, long hdfsConn "toJobConfig", "toJobConfig", Direction.TO.toString()); } + // NOTE: This upgrade code happened after the SQOOP-1498 renaming, hence it + // uses the configurable and config + @Deprecated + private void updateDriverConfigInput(Connection conn, long driverId) { + + // update configs and inputs for driver + // update the name from throttling ==> throttlingConfig config and associate + // it with the driverId + runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_NAME_FOR_DRIVER, conn, + "throttlingConfig", "throttling"); + + runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_CONFIGURABLE_ID_FOR_DRIVER, conn, + driverId, "throttlingConfig"); + + // nuke security.maxConnections + runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_REMOVE_SECURITY_CONFIG_INPUT_FOR_DRIVER, conn, + "security.maxConnections"); + + // nuke the security config since 1.99.3 we do not use it + runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_SECURITY_CONFIG_FOR_DRIVER, conn, + "security"); + + // update throttling.extractors ==> throttlingConfig.numExtractors + runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_UPDATE_CONFIG_INPUT_FOR_DRIVER, conn, + "throttlingConfig.numExtractors", "throttling.extractors"); + + // update throttling.loaders ==> throttlingConfig.numLoaders + runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_UPDATE_CONFIG_INPUT_FOR_DRIVER, conn, + "throttlingConfig.numLoaders", "throttling.loaders"); + + } + + /** + * Pre-register Driver since the 1.99.3 release NOTE: This should be used only + * in the upgrade path + */ + @Deprecated + protected long registerDriver(Connection conn) { + if (LOG.isTraceEnabled()) { + LOG.trace("Begin Driver loading."); + } + + PreparedStatement baseDriverStmt = null; + try { + baseDriverStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE, + Statement.RETURN_GENERATED_KEYS); + baseDriverStmt.setString(1, MDriver.DRIVER_NAME); + baseDriverStmt.setString(2, Driver.getClassName()); + baseDriverStmt.setString(3, "1"); + baseDriverStmt.setString(4, MConfigurableType.DRIVER.name()); + + int baseDriverCount = baseDriverStmt.executeUpdate(); + if (baseDriverCount != 1) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0012, Integer.toString(baseDriverCount)); + } + + ResultSet rsetDriverId = baseDriverStmt.getGeneratedKeys(); + + if (!rsetDriverId.next()) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0013); + } + return rsetDriverId.getLong(1); + } catch (SQLException ex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0050, ex); + } finally { + closeStatements(baseDriverStmt); + } + } /** * Pre-register HDFS Connector so that config upgrade will work. @@ -762,10 +845,10 @@ protected long registerHdfsConnector(Connection conn) { if (handler.getConnectorConfigurable().getPersistenceId() != -1) { return handler.getConnectorConfigurable().getPersistenceId(); } - + PreparedStatement baseConnectorStmt = null; if (handler.getUniqueName().equals(CONNECTOR_HDFS)) { try { - PreparedStatement baseConnectorStmt = conn.prepareStatement( + baseConnectorStmt = conn.prepareStatement( STMT_INSERT_INTO_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS, Statement.RETURN_GENERATED_KEYS); baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName()); @@ -782,6 +865,8 @@ protected long registerHdfsConnector(Connection conn) { } } catch (SQLException e) { throw new SqoopException(DerbyRepoError.DERBYREPO_0013); + } finally { + closeStatements(baseConnectorStmt); } break; @@ -968,7 +1053,7 @@ private void insertConfigsforDriver(MDriver mDriver, Connection conn) { baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT, Statement.RETURN_GENERATED_KEYS); - // Register a driver config as a job type with no owner/connector and direction + // Register a driver config as a job type with no direction registerConfigs(mDriver.getPersistenceId(), null /* no direction*/, mDriver.getDriverConfig().getConfigs(), MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn); diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java index 968cf2b8..37311e68 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java @@ -200,6 +200,32 @@ public final class DerbySchemaUpgradeQuery { "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION + "= NULL" + " WHERE " + COLUMN_SQF_NAME + "= ?"; + + /** Intended for force driver creation and its related upgrades*/ + + public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_NAME_FOR_DRIVER = + "UPDATE " + TABLE_SQ_CONFIG + " SET " + + COLUMN_SQ_CFG_NAME + "= ?" + + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?"; + + public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_CONFIGURABLE_ID_FOR_DRIVER = + "UPDATE " + TABLE_SQ_CONFIG + " SET " + + COLUMN_SQ_CFG_CONFIGURABLE + "= ?" + + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?"; + + public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_SECURITY_CONFIG_FOR_DRIVER = + "DELETE FROM " + TABLE_SQ_CONFIG + + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?"; + + public static final String QUERY_UPGRADE_TABLE_SQ_INPUT_REMOVE_SECURITY_CONFIG_INPUT_FOR_DRIVER = + "DELETE FROM " + TABLE_SQ_INPUT + + " WHERE " + COLUMN_SQI_NAME + "= ?"; + + public static final String QUERY_UPGRADE_TABLE_SQ_INPUT_UPDATE_CONFIG_INPUT_FOR_DRIVER = + "UPDATE " + TABLE_SQ_INPUT + " SET " + + COLUMN_SQI_NAME + "= ?" + + " WHERE " + COLUMN_SQI_NAME + "= ?"; + /** * Intended to change SQ_JOB_INPUT.SQBI_INPUT from EXPORT * throttling form, to IMPORT throttling form. diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestRespositorySchemaUpgrade.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestRespositorySchemaUpgrade.java index 7687be78..9792d9b3 100644 --- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestRespositorySchemaUpgrade.java +++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestRespositorySchemaUpgrade.java @@ -79,6 +79,7 @@ public void testUpgradeRepoVersion2ToVersion4() throws Exception { super.loadConnectionsOrLinks(2); super.loadJobs(2); handler.createOrUpgradeRepository(getDerbyDatabaseConnection()); + handler.findDriver("SqoopDriver", getDerbyDatabaseConnection()); assertTrue(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection())); } diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/UpgradeTool.java b/tools/src/main/java/org/apache/sqoop/tools/tool/UpgradeTool.java index f117411b..ba88ddd7 100644 --- a/tools/src/main/java/org/apache/sqoop/tools/tool/UpgradeTool.java +++ b/tools/src/main/java/org/apache/sqoop/tools/tool/UpgradeTool.java @@ -38,12 +38,12 @@ public boolean runToolWithConfiguration(String[] arguments) { LOG.info("Initializing the RepositoryManager with immutable option turned off."); RepositoryManager.getInstance().initialize(false); - LOG.info("Initializing the Driver with upgrade option turned on."); - Driver.getInstance().initialize(true); - LOG.info("Initializing the Connection Manager with upgrade option turned on."); ConnectorManager.getInstance().initialize(true); + LOG.info("Initializing the Driver with upgrade option turned on."); + Driver.getInstance().initialize(true); + LOG.info("Upgrade completed successfully."); LOG.info("Tearing all managers down.");