diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index 2d9bf4e7..095f3e9b 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -127,11 +127,8 @@ public void createOrUpdateInternals() { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) throws Exception { - if (!handler.schemaExists()) { - LOG.info("Creating repository schema objects"); - handler.createSchema(); - } - + LOG.info("Creating repository schema objects"); + handler.createOrUpdateInternals(conn); return null; } }); @@ -145,7 +142,7 @@ public boolean haveSuitableInternals() { return (Boolean) doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) throws Exception { - return handler.schemaExists(); + return handler.haveSuitableInternals(conn); } }); } diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java index 538def56..d468b794 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java @@ -135,16 +135,24 @@ public abstract List findJobsForConnector(long connectorID, public abstract void registerFramework(MFramework mf, Connection conn); /** - * Check if schema is already present in the repository. + * Return true if repository tables exists and are suitable for use. * - * @return true if schema is already present or false if it's not + * This method should return false in case that the tables do exists, but + * are not suitable for use or if they requires upgrade. + * + * @return Boolean values if internal structures are suitable for use */ - public abstract boolean schemaExists(); + public abstract boolean haveSuitableInternals(Connection conn); /** - * Create required schema in repository. + * Create or update tables in the repository. + * + * This method will be called only if Sqoop server is enabled with changing + * repository on disk structures. Repository should not change its disk structures + * outside of this method. This method must be no-op in case that the structures + * do not need any maintenance. */ - public abstract void createSchema(); + public abstract void createOrUpdateInternals(Connection conn); /** * Termination callback for repository. diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConfigurationConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java similarity index 68% rename from repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConfigurationConstants.java rename to repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java index beb983c8..607b8d5f 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConfigurationConstants.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java @@ -17,11 +17,23 @@ */ package org.apache.sqoop.repository.derby; -public final class DerbyRepoConfigurationConstants { +public final class DerbyRepoConstants { - public static final String PREFIX_DERBY = "derby."; + public static final String CONF_PREFIX_DERBY = "derby."; - private DerbyRepoConfigurationConstants() { + public static final String SYSKEY_VERSION = "version"; + + /** + * Expected version of the repository structures. + * + * History: + * 0 - empty/unknown state + * 1 - First two releases (1.99.1, 1.99.2) + * 2 - added SQ_SYSTEM + */ + public static final int VERSION = 2; + + private DerbyRepoConstants() { // Disable explicit object creation } } diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java index 4455f489..aeb7533d 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java @@ -169,6 +169,9 @@ public enum DerbyRepoError implements ErrorCode { /** Can't retrieve submissions for a job **/ DERBYREPO_0040("Can't retrieve submissions for a job"), + /** Can't detect version of the database structures **/ + DERBYREPO_0041("Can't detect version of repository storage"), + ; private final String message; diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index 0ec4da1c..72c72bbf 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -30,9 +30,11 @@ import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import javax.sql.DataSource; @@ -245,64 +247,126 @@ public synchronized void shutdown() { } } + /** + * Detect version of underlying database structures. + * + * @param conn JDBC Connection + * @return + */ + public int detectVersion(Connection conn) { + ResultSet rs = null; + PreparedStatement stmt = null; + + // First release went out without system table, so we have to detect + // this version differently. + try { + rs = conn.getMetaData().getTables(null, null, null, null); + + Set tableNames = new HashSet(); + while(rs.next()) { + tableNames.add(rs.getString("TABLE_NAME")); + } + closeResultSets(rs); + + LOG.debug("Detecting old version of repository"); + boolean foundAll = true; + for( String expectedTable : DerbySchemaConstants.tablesV1) { + if(!tableNames.contains(expectedTable)) { + foundAll = false; + LOG.debug("Missing table " + expectedTable); + } + } + + // If we find all expected tables, then we are on version 1 + if(foundAll && !tableNames.contains(DerbySchemaConstants.TABLE_SQ_SYSTEM_NAME)) { + return 1; + } + + } catch (SQLException e) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0041, e); + } finally { + closeResultSets(rs); + } + + // Normal version detection, select and return the version + try { + stmt = conn.prepareStatement(STMT_SELECT_SYSTEM); + stmt.setString(1, DerbyRepoConstants.SYSKEY_VERSION); + rs = stmt.executeQuery(); + + if(!rs.next()) { + return 0; + } + + return rs.getInt(1); + } catch (SQLException e) { + LOG.info("Can't fetch repository structure version.", e); + return 0; + } finally { + closeResultSets(rs); + closeStatements(stmt); + } + } + + /** * {@inheritDoc} */ @Override - public void createSchema() { - runQuery(QUERY_CREATE_SCHEMA_SQOOP); - runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR); - runQuery(QUERY_CREATE_TABLE_SQ_FORM); - runQuery(QUERY_CREATE_TABLE_SQ_INPUT); - runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION); - runQuery(QUERY_CREATE_TABLE_SQ_JOB); - runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT); - runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT); - runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION); - runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP); - runQuery(QUERY_CREATE_TABLE_SQ_COUNTER); - runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION); + public void createOrUpdateInternals(Connection conn) { + int version = detectVersion(conn); + + if(version <= 0) { + runQuery(QUERY_CREATE_SCHEMA_SQOOP, conn); + runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR, conn); + runQuery(QUERY_CREATE_TABLE_SQ_FORM, conn); + runQuery(QUERY_CREATE_TABLE_SQ_INPUT, conn); + runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION, conn); + runQuery(QUERY_CREATE_TABLE_SQ_JOB, conn); + runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT, conn); + runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT, conn); + runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION, conn); + runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP, conn); + runQuery(QUERY_CREATE_TABLE_SQ_COUNTER, conn); + runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION, conn); + } + if(version <= 1) { + runQuery(QUERY_CREATE_TABLE_SQ_SYSTEM, conn); + } + + ResultSet rs = null; + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(STMT_DELETE_SYSTEM); + stmt.setString(1, DerbyRepoConstants.SYSKEY_VERSION); + stmt.executeUpdate(); + + closeStatements(stmt); + + stmt = conn.prepareStatement(STMT_INSERT_SYSTEM); + stmt.setString(1, DerbyRepoConstants.SYSKEY_VERSION); + stmt.setString(2, "" + DerbyRepoConstants.VERSION); + stmt.executeUpdate(); + } catch (SQLException e) { + LOG.error("Can't persist the repository version", e); + } finally { + closeResultSets(rs); + closeStatements(stmt); + } } /** * {@inheritDoc} */ @Override - public boolean schemaExists() { - Connection connection = null; - Statement stmt = null; - try { - connection = dataSource.getConnection(); - stmt = connection.createStatement(); - ResultSet rset = stmt.executeQuery(QUERY_SYSSCHEMA_SQOOP); + public boolean haveSuitableInternals(Connection conn) { + int version = detectVersion(conn); - if (!rset.next()) { - LOG.warn("Schema for SQOOP does not exist"); - return false; - } - String sqoopSchemaId = rset.getString(1); - LOG.debug("SQOOP schema ID: " + sqoopSchemaId); - connection.commit(); - } catch (SQLException ex) { - if (connection != null) { - try { - connection.rollback(); - } catch (SQLException ex2) { - LOG.error("Unable to rollback transaction", ex2); - } - } - throw new SqoopException(DerbyRepoError.DERBYREPO_0001, ex); - } finally { - closeStatements(stmt); - if (connection != null) { - try { - connection.close(); - } catch (SQLException ex) { - LOG.error("Unable to close connection", ex); - } - } + if(version != DerbyRepoConstants.VERSION) { + return false; } + // TODO(jarcec): Verify that all structures are present (e.g. something like corruption validation) return true; } @@ -1656,52 +1720,27 @@ private void registerFormInputs(long formId, List> inputs, /** * Execute given query on database. * - * Passed query will be executed in it's own transaction - * * @param query Query that should be executed */ - private void runQuery(String query) { - Connection connection = null; + private void runQuery(String query, Connection conn) { Statement stmt = null; try { - connection = dataSource.getConnection(); - stmt = connection.createStatement(); + stmt = conn.createStatement(); if (stmt.execute(query)) { ResultSet rset = stmt.getResultSet(); int count = 0; while (rset.next()) { count++; } - LOG.info("QUERY(" + query + ") produced unused resultset with " - + count + " rows"); + LOG.info("QUERY(" + query + ") produced unused resultset with "+ count + " rows"); } else { int updateCount = stmt.getUpdateCount(); LOG.info("QUERY(" + query + ") Update count: " + updateCount); } - connection.commit(); } catch (SQLException ex) { - try { - connection.rollback(); - } catch (SQLException ex2) { - LOG.error("Unable to rollback transaction", ex2); - } - throw new SqoopException(DerbyRepoError.DERBYREPO_0003, - query, ex); + throw new SqoopException(DerbyRepoError.DERBYREPO_0003, query, ex); } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException ex) { - LOG.error("Unable to close statement", ex); - } - if (connection != null) { - try { - connection.close(); - } catch (SQLException ex) { - LOG.error("Unable to close connection", ex); - } - } - } + closeStatements(stmt); } } diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java index 8cdbc63d..68cb1c08 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java @@ -17,6 +17,9 @@ */ package org.apache.sqoop.repository.derby; +import java.util.HashSet; +import java.util.Set; + public final class DerbySchemaConstants { public static final String SCHEMA_SQOOP = "SQOOP"; @@ -25,6 +28,19 @@ public final class DerbySchemaConstants { private static final String CONSTRAINT_PREFIX = "FK_"; + // SQ_SYSTEM + + public static final String TABLE_SQ_SYSTEM_NAME = "SQ_SYSTEM"; + + public static final String TABLE_SQ_SYSTEM = SCHEMA_PREFIX + + TABLE_SQ_SYSTEM_NAME; + + public static final String COLUMN_SQM_ID = "SQM_ID"; + + public static final String COLUMN_SQM_KEY = "SQM_KEY"; + + public static final String COLUMN_SQM_VALUE = "SQM_VALUE"; + // SQ_CONNECTOR public static final String TABLE_SQ_CONNECTOR_NAME = "SQ_CONNECTOR"; @@ -260,6 +276,26 @@ public final class DerbySchemaConstants { public static final String CONSTRAINT_SQRS_SQS = SCHEMA_PREFIX + CONSTRAINT_SQRS_SQS_NAME; + /** + * List of expected tables for first version; + * This list here is for backward compatibility. + */ + public static final Set tablesV1; + static { + tablesV1 = new HashSet(); + tablesV1.add(TABLE_SQ_CONNECTOR_NAME); + tablesV1.add(TABLE_SQ_CONNECTION_NAME); + tablesV1.add(TABLE_SQ_CONNECTION_INPUT_NAME); + tablesV1.add(TABLE_SQ_COUNTER_NAME); + tablesV1.add(TABLE_SQ_COUNTER_GROUP_NAME); + tablesV1.add(TABLE_SQ_COUNTER_SUBMISSION_NAME); + tablesV1.add(TABLE_SQ_FORM_NAME); + tablesV1.add(TABLE_SQ_INPUT_NAME); + tablesV1.add(TABLE_SQ_JOB_NAME); + tablesV1.add(TABLE_SQ_JOB_INPUT_NAME); + tablesV1.add(TABLE_SQ_SUBMISSION_NAME); + } + private DerbySchemaConstants() { // Disable explicit object creation } diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java index 7a9ce505..b2cd6cc6 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java @@ -22,7 +22,18 @@ /** * DDL queries that create the Sqoop repository schema in Derby database. These * queries create the following tables: - * + *

+ * SQ_SYSTEM: Store for various state information + *

+ *    +----------------------------+
+ *    | SQ_SYSTEM                  |
+ *    +----------------------------+
+ *    | SQM_ID: BIGINT PK          |
+ *    | SQM_KEY: VARCHAR(64)       |
+ *    | SQM_VALUE: VARCHAR(64)     |
+ *    +----------------------------+
+ * 
+ *

*

* SQ_CONNECTOR: Connector registration. *

@@ -185,6 +196,14 @@ public final class DerbySchemaQuery {
     "SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = '"
     + SCHEMA_SQOOP + "'";
 
+  // DDL: Create table SQ_SYSTEM
+  public static final String QUERY_CREATE_TABLE_SQ_SYSTEM =
+    "CREATE TABLE " + TABLE_SQ_SYSTEM + " ("
+    + COLUMN_SQM_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+    + COLUMN_SQM_KEY + " VARCHAR(64), "
+    + COLUMN_SQM_VALUE + " VARCHAR(64) "
+    + ")";
+
   // DDL: Create table SQ_CONNECTOR
   public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR =
       "CREATE TABLE " + TABLE_SQ_CONNECTOR + " ("
@@ -336,6 +355,25 @@ public final class DerbySchemaQuery {
         + "REFERENCES " + TABLE_SQ_SUBMISSION + "(" + COLUMN_SQS_ID + ") ON DELETE CASCADE "
     + ")";
 
+  // DML: Get system key
+  public static final String STMT_SELECT_SYSTEM =
+    "SELECT "
+    + COLUMN_SQM_VALUE
+    + " FROM " + TABLE_SQ_SYSTEM
+    + " WHERE " + COLUMN_SQM_KEY + " = ?";
+
+  // DML: Remove system key
+  public static final String STMT_DELETE_SYSTEM =
+    "DELETE FROM "  + TABLE_SQ_SYSTEM
+    + " WHERE " + COLUMN_SQM_KEY + " = ?";
+
+  // DML: Insert new system key
+  public static final String STMT_INSERT_SYSTEM =
+    "INSERT INTO " + TABLE_SQ_SYSTEM + "("
+    + COLUMN_SQM_KEY + ", "
+    + COLUMN_SQM_VALUE + ") "
+    + "VALUES(?, ?)";
+
   // DML: Fetch connector Given Name
   public static final String STMT_FETCH_BASE_CONNECTOR =
       "SELECT "
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
index 5ace598e..677b0bee 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
@@ -98,6 +98,8 @@ protected void createSchema() throws Exception {
     runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP);
     runQuery(QUERY_CREATE_TABLE_SQ_COUNTER);
     runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION);
+    runQuery(QUERY_CREATE_TABLE_SQ_SYSTEM);
+    runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('version', '2')");
   }
 
   /**
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInternals.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInternals.java
new file mode 100644
index 00000000..25e6196d
--- /dev/null
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInternals.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.repository.derby;
+
+/**
+ *
+ */
+public class TestInternals extends DerbyTestCase {
+
+  DerbyRepositoryHandler handler;
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    handler = new DerbyRepositoryHandler();
+  }
+
+  public void testSuitableInternals() throws Exception {
+    assertFalse(handler.haveSuitableInternals(getDerbyConnection()));
+    createSchema(); // Test code is building the structures
+    assertTrue(handler.haveSuitableInternals(getDerbyConnection()));
+  }
+
+  public void testCreateorUpdateInternals() throws Exception {
+    assertFalse(handler.haveSuitableInternals(getDerbyConnection()));
+    handler.createOrUpdateInternals(getDerbyConnection());
+    assertTrue(handler.haveSuitableInternals(getDerbyConnection()));
+  }
+
+
+}