diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java index d4528b11..2235a1f4 100644 --- a/core/src/main/java/org/apache/sqoop/repository/Repository.java +++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java @@ -412,6 +412,7 @@ private void deleteJobs(List jobs, RepositoryTransaction tx) { public final void upgradeConnector(MConnector oldConnector, MConnector newConnector) { LOG.info("Upgrading connector: " + oldConnector.getUniqueName()); long connectorId = oldConnector.getPersistenceId(); + String connectorName = oldConnector.getUniqueName(); newConnector.setPersistenceId(connectorId); RepositoryTransaction tx = null; @@ -439,6 +440,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec // dont always rely on the repository implementation to return empty list for links if (existingLinksByConnector != null) { for (MLink link : existingLinksByConnector) { + LOG.info(" Link upgrade for link:" + link.getName() + " for connector:" + connectorName); // Make a new copy of the configs List linkConfig = newConnector.getLinkConfig().clone(false).getConfigs(); MLinkConfig newLinkConfig = new MLinkConfig(linkConfig); @@ -458,14 +460,17 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec // and stop the bootup of Sqoop server logInvalidModelObject("link", newlink, validationResult); upgradeSuccessful = false; + LOG.info(" LINK config upgrade FAILED for link: " + link.getName() + " for connector:" + connectorName); } } } + LOG.info(" All Link and configs for this connector processed"); // 8. Run upgrade logic for the configs related to the job objects if (existingJobsByConnector != null) { for (MJob job : existingJobsByConnector) { // every job has 2 parts, the FROM and the TO links and their // corresponding connectors. + LOG.info(" Job upgrade for job:" + job.getName()+ " for connector:" + connectorName); List fromConfig = newConnector.getFromConfig().clone(false).getConfigs(); if (job.getFromConnectorId() == newConnector.getPersistenceId()) { MFromConfig newFromConfig = new MFromConfig(fromConfig); @@ -484,8 +489,9 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec if(validationResult.getStatus().canProceed()) { updateJob(newJob, tx); } else { - logInvalidModelObject("job", newJob, validationResult); + logInvalidModelObject("fromJob", newJob, validationResult); upgradeSuccessful = false; + LOG.error(" From JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName); } } @@ -507,17 +513,18 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec if(validationResult.getStatus().canProceed()) { updateJob(newJob, tx); } else { - logInvalidModelObject("job", newJob, validationResult); + logInvalidModelObject("toJob", newJob, validationResult); upgradeSuccessful = false; + LOG.error(" TO JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName); } } } } - + LOG.info(" All Job and configs for this connector processed"); if (upgradeSuccessful) { tx.commit(); } else { - throw new SqoopException(RepositoryError.JDBCREPO_0027); + throw new SqoopException(RepositoryError.JDBCREPO_0027, " for connector:" + connectorName); } } catch (SqoopException ex) { if (tx != null) { @@ -533,7 +540,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec if (tx != null) { tx.close(); } - LOG.info("Connector upgrade finished: " + oldConnector.getUniqueName()); + LOG.info("Connector upgrade finished for: " + connectorName); } } @@ -582,7 +589,7 @@ public final void upgradeDriver(MDriver driver) { if (upgradeSuccessful) { tx.commit(); } else { - throw new SqoopException(RepositoryError.JDBCREPO_0027); + throw new SqoopException(RepositoryError.JDBCREPO_0027, " Driver"); } } catch (SqoopException ex) { if(tx != null) { diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index aa3a1e4b..f152859a 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -20,6 +20,7 @@ import static org.apache.sqoop.repository.derby.DerbySchemaCreateQuery.*; import static org.apache.sqoop.repository.derby.DerbySchemaInsertUpdateDeleteSelectQuery.*; import static org.apache.sqoop.repository.derby.DerbySchemaUpgradeQuery.*; + import java.net.URL; import java.sql.Connection; import java.sql.DriverManager; @@ -426,11 +427,16 @@ public void createOrUpgradeRepository(Connection conn) { // Data modifications only for non-fresh install. if (repositoryVersion > 0) { - // Register HDFS connector - updateJobRepositorySchemaAndData(conn, registerHdfsConnector(conn)); + LOG.info("Force registering the HDFS connector as a new configurable"); + long hdfsConnectorId = registerHdfsConnector(conn); + LOG.info("Finished Force registering the HDFS connector as a new configurable"); + + LOG.info("Updating config and inputs for the hdfs connector."); + updateJobConfigInputForHdfsConnector(conn, hdfsConnectorId); + LOG.info("Finished Updating config and inputs for the hdfs connector."); } - // Wait to remove SQB_TYPE (IMPORT/EXPORT) until we update data. + // Wait to remove SQB_TYPE (IMPORT/EXPORT) until we update all the job data. // Data updates depend on knowledge of the type of job. runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn); @@ -645,13 +651,10 @@ protected void updateDirections(Connection conn, Map directionM /** * Upgrade job data from IMPORT/EXPORT to FROM/TO. - * Since the framework is no longer responsible for HDFS, + * Since Sqoop is no longer responsible for HDFS, * the HDFS connector/link must be added. - * Also, the framework configs are moved around such that - * they belong to the added HDFS connector. Any extra configs - * are removed. * NOTE: Connector configs should have a direction (FROM/TO), - * but framework configs should not. + * but link configs should not. * * Here's a brief list describing the data migration process. * 1. Change SQ_CONFIG.SQ_CFG_DIRECTION from IMPORT to FROM. @@ -663,28 +666,23 @@ protected void updateDirections(Connection conn, Map directionM * This should affect connectors only since Connector configs * should have had a value for SQ_CFG_OPERATION. * 5. Add HDFS connector for jobs to reference. - * 6. Set 'input' and 'output' configs connector. - * to HDFS connector. - * 7. Throttling config was originally the second config in - * the framework. It should now be the first config. + * 6. Set 'fromJobConfig' and 'toJobConfig' configs for HDFS connector. + * 7. Add 'linkConfig' and 'linkConfig.uri' to the configs for the hdfs * 8. Remove the EXPORT throttling config and ensure all of * its dependencies point to the IMPORT throttling config. * Then make sure the throttling config does not have a direction. - * Framework configs should not have a direction. * 9. Create an HDFS link to reference and update * jobs to reference that link. IMPORT jobs * should have TO HDFS connector, EXPORT jobs should have * FROM HDFS connector. - * 10. Update 'table' config names to 'fromJobConfig' and 'toTable'. + * 10. Update 'table' config names to 'fromJobConfig' and 'toJobConfig'. * Also update the relevant inputs as well. * @param conn */ // NOTE: This upgrade code happened before the SQOOP-1498 renaming, hence it uses the form/connection // tables instead of the latest config/link tables - private void updateJobRepositorySchemaAndData(Connection conn, long connectorId) { - if (LOG.isTraceEnabled()) { - LOG.trace("Updating existing data for generic connectors."); - } + @Deprecated + private void updateJobConfigInputForHdfsConnector(Connection conn, long hdfsConnectorId) { runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION, conn, Direction.FROM.toString(), "IMPORT"); @@ -699,7 +697,21 @@ private void updateJobRepositorySchemaAndData(Connection conn, long connectorId) "output"); runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR, conn, - new Long(connectorId), "input", "output"); + new Long(hdfsConnectorId), "input", "output"); + //update the names of the configs + // 1. input ==> fromJobConfig + runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_NAME, conn, + "fromJobConfig", + "input"); + // 2. output ===> toJobConfig + runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_NAME, conn, + "toJobConfig", + "output"); + + // create the link config + Long linkFormId = createHdfsLinkForm(conn, hdfsConnectorId); + // create the link config input + runQuery(STMT_INSERT_INTO_INPUT_WITH_FORM, conn, "linkConfig.uri", linkFormId, 0, MInputType.STRING.name(), false, 255, null); runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_INPUT_UPDATE_THROTTLING_FORM_INPUTS, conn, "IMPORT", "EXPORT"); @@ -712,7 +724,7 @@ private void updateJobRepositorySchemaAndData(Connection conn, long connectorId) runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DRIVER_INDEX, conn, new Long(0), "throttling"); - Long connectionId = createHdfsConnection(conn, connectorId); + Long connectionId = createHdfsConnection(conn, hdfsConnectorId); runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION, conn, "EXPORT"); runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION, conn, @@ -720,20 +732,13 @@ private void updateJobRepositorySchemaAndData(Connection conn, long connectorId) runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION, conn, new Long(connectionId), "IMPORT"); - runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn, - "fromJobConfig", "table", Direction.FROM.toString()); - runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES, conn, - Direction.FROM.toString().toLowerCase(), "fromJobConfig", Direction.FROM.toString()); - runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn, - "toJobConfig", "table", Direction.TO.toString()); - runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES, conn, - Direction.TO.toString().toLowerCase(), "toJobConfig", Direction.TO.toString()); - - if (LOG.isTraceEnabled()) { - LOG.trace("Updated existing data for generic connectors."); - } + runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_FROM_JOB_INPUT_NAMES, conn, + "fromJobConfig", "fromJobConfig", Direction.FROM.toString()); + runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_TO_JOB_INPUT_NAMES, conn, + "toJobConfig", "toJobConfig", Direction.TO.toString()); } + /** * Pre-register HDFS Connector so that config upgrade will work. * NOTE: This should be used only in the upgrade path @@ -761,7 +766,7 @@ protected long registerHdfsConnector(Connection conn) { if (handler.getUniqueName().equals(CONNECTOR_HDFS)) { try { PreparedStatement baseConnectorStmt = conn.prepareStatement( - STMT_INSERT_INTO_CONFIGURABLE_WITHOUT_SUPPORTED_DIRECTIONS, + STMT_INSERT_INTO_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS, Statement.RETURN_GENERATED_KEYS); baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName()); baseConnectorStmt.setString(2, handler.getConnectorConfigurable().getClassName()); @@ -814,13 +819,13 @@ private Long createHdfsConnection(Connection conn, Long connectorId) { result = stmt.executeUpdate(); if (result != 1) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0012, + throw new SqoopException(DerbyRepoError.DERBYREPO_0003, Integer.toString(result)); } ResultSet rsetConnectionId = stmt.getGeneratedKeys(); if (!rsetConnectionId.next()) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0013); + throw new SqoopException(DerbyRepoError.DERBYREPO_0004); } if (LOG.isTraceEnabled()) { @@ -828,6 +833,49 @@ private Long createHdfsConnection(Connection conn, Long connectorId) { } return rsetConnectionId.getLong(1); + } catch (SQLException ex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0005, ex); + } finally { + closeStatements(stmt); + } + } + + + /** + * We are creating the LINK FORM for HDFS and later it the schema will + * be renamed to LINK CONFIG + * NOTE: Should be used only in the upgrade path! + */ + @Deprecated + private Long createHdfsLinkForm(Connection conn, Long connectorId) { + if (LOG.isTraceEnabled()) { + LOG.trace("Creating HDFS link."); + } + + PreparedStatement stmt = null; + int result; + try { + short index = 0; + stmt = conn.prepareStatement(STMT_INSERT_INTO_FORM, Statement.RETURN_GENERATED_KEYS); + stmt.setLong(1, connectorId); + stmt.setString(2, "linkConfig"); + // it could also be set to the deprecated "CONNECTION" + stmt.setString(3, MConfigType.LINK.name()); + stmt.setShort(4, index); + result = stmt.executeUpdate(); + if (result != 1) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0012, Integer.toString(result)); + } + ResultSet rsetFormId = stmt.getGeneratedKeys(); + + if (!rsetFormId.next()) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0013); + } + + if (LOG.isTraceEnabled()) { + LOG.trace("Created HDFS connector link FORM."); + } + return rsetFormId.getLong(1); } catch (SQLException ex) { throw new SqoopException(DerbyRepoError.DERBYREPO_0019, ex); } finally { diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java index f349a757..2e24805e 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaInsertUpdateDeleteSelectQuery.java @@ -103,7 +103,7 @@ public final class DerbySchemaInsertUpdateDeleteSelectQuery { + ") VALUES (?, ?, ?, ?)"; @Deprecated // used only in the upgrade path - public static final String STMT_INSERT_INTO_CONFIGURABLE_WITHOUT_SUPPORTED_DIRECTIONS = + public static final String STMT_INSERT_INTO_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS = "INSERT INTO " + TABLE_SQ_CONNECTOR+ " (" + COLUMN_SQC_NAME + ", " + COLUMN_SQC_CLASS + ", " diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java index 0bdb4c62..968cf2b8 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java @@ -119,6 +119,28 @@ public final class DerbySchemaUpgradeQuery { "RENAME COLUMN " + TABLE_SQ_FORM + "." + COLUMN_SQF_OPERATION + " TO " + COLUMN_SQF_DIRECTION; + //DML: Insert into form + public static final String STMT_INSERT_INTO_FORM = + "INSERT INTO " + TABLE_SQ_FORM+ " (" + + COLUMN_SQF_CONNECTOR + ", " + + COLUMN_SQF_NAME + ", " + + COLUMN_SQF_TYPE + ", " + + COLUMN_SQF_INDEX + + ") VALUES ( ?, ?, ?, ?)"; + + // DML: Insert into inpu with form name + public static final String STMT_INSERT_INTO_INPUT_WITH_FORM = + "INSERT INTO " + TABLE_SQ_INPUT + " (" + + COLUMN_SQI_NAME + ", " + + COLUMN_SQI_FORM + ", " + + COLUMN_SQI_INDEX + ", " + + COLUMN_SQI_TYPE + ", " + + COLUMN_SQI_STRMASK + ", " + + COLUMN_SQI_STRLENGTH + ", " + + COLUMN_SQI_ENUMVALS + + ") VALUES (?, ?, ?, ?, ?, ?, ?)"; + + public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION = "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION + "=? WHERE " + COLUMN_SQF_DIRECTION + "=?" @@ -129,6 +151,10 @@ public final class DerbySchemaUpgradeQuery { + " WHERE " + COLUMN_SQF_CONNECTOR + " IS NULL AND " + COLUMN_SQF_NAME + " IN (?, ?)"; + public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_NAME = + "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_NAME + "= ?" + + " WHERE " + COLUMN_SQF_NAME + "= ?"; + public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_DIRECTION = "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION + "= ?" + " WHERE " + COLUMN_SQF_NAME + "= ?"; @@ -152,16 +178,20 @@ public final class DerbySchemaUpgradeQuery { + " WHERE " + COLUMN_SQF_NAME + "= ?" + " AND " + COLUMN_SQF_DIRECTION + "= ?"; - /** - * Intended to rename forms based on direction. - * e.g. If SQ_FORM.SQF_NAME = 'table' and parameter 1 = 'from' - * then SQ_FORM.SQF_NAME = 'fromTable'. - */ - public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES = + // remove "input" from the prefix of the name for hdfs configs + public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_FROM_JOB_INPUT_NAMES = "UPDATE " + TABLE_SQ_INPUT + " SET " + COLUMN_SQI_NAME + "=(" - + "? || UPPER(SUBSTR(" + COLUMN_SQI_NAME + ",1,1))" - + " || SUBSTR(" + COLUMN_SQI_NAME + ",2) )" + + "? || SUBSTR(" + COLUMN_SQI_NAME + ", 6) )" + + " WHERE " + COLUMN_SQI_FORM + " IN (" + + " SELECT " + COLUMN_SQF_ID + " FROM " + TABLE_SQ_FORM + " WHERE " + COLUMN_SQF_NAME + "= ?" + + " AND " + COLUMN_SQF_DIRECTION + "= ?)"; + + // remove output from the prefix of the name for hdfs configs + public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_TO_JOB_INPUT_NAMES = + "UPDATE " + TABLE_SQ_INPUT + " SET " + + COLUMN_SQI_NAME + "=(" + + "? || SUBSTR(" + COLUMN_SQI_NAME + ", 7) )" + " WHERE " + COLUMN_SQI_FORM + " IN (" + " SELECT " + COLUMN_SQF_ID + " FROM " + TABLE_SQ_FORM + " WHERE " + COLUMN_SQF_NAME + "= ?" + " AND " + COLUMN_SQF_DIRECTION + "= ?)";