mirror of
https://github.com/apache/sqoop.git
synced 2025-05-10 20:52:21 +08:00
SQOOP-1111. Sqoop2: Implement version detection and structure updates into Derby repository
(Jarek Jarcec Cecho via Hari Shreedharan)
This commit is contained in:
parent
debf60d5cf
commit
76fbc835c9
@ -127,11 +127,8 @@ public void createOrUpdateInternals() {
|
|||||||
doWithConnection(new DoWithConnection() {
|
doWithConnection(new DoWithConnection() {
|
||||||
@Override
|
@Override
|
||||||
public Object doIt(Connection conn) throws Exception {
|
public Object doIt(Connection conn) throws Exception {
|
||||||
if (!handler.schemaExists()) {
|
LOG.info("Creating repository schema objects");
|
||||||
LOG.info("Creating repository schema objects");
|
handler.createOrUpdateInternals(conn);
|
||||||
handler.createSchema();
|
|
||||||
}
|
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -145,7 +142,7 @@ public boolean haveSuitableInternals() {
|
|||||||
return (Boolean) doWithConnection(new DoWithConnection() {
|
return (Boolean) doWithConnection(new DoWithConnection() {
|
||||||
@Override
|
@Override
|
||||||
public Object doIt(Connection conn) throws Exception {
|
public Object doIt(Connection conn) throws Exception {
|
||||||
return handler.schemaExists();
|
return handler.haveSuitableInternals(conn);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -135,16 +135,24 @@ public abstract List<MJob> findJobsForConnector(long connectorID,
|
|||||||
public abstract void registerFramework(MFramework mf, Connection conn);
|
public abstract void registerFramework(MFramework mf, Connection conn);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if schema is already present in the repository.
|
* Return true if repository tables exists and are suitable for use.
|
||||||
*
|
*
|
||||||
* @return true if schema is already present or false if it's not
|
* This method should return false in case that the tables do exists, but
|
||||||
|
* are not suitable for use or if they requires upgrade.
|
||||||
|
*
|
||||||
|
* @return Boolean values if internal structures are suitable for use
|
||||||
*/
|
*/
|
||||||
public abstract boolean schemaExists();
|
public abstract boolean haveSuitableInternals(Connection conn);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create required schema in repository.
|
* Create or update tables in the repository.
|
||||||
|
*
|
||||||
|
* This method will be called only if Sqoop server is enabled with changing
|
||||||
|
* repository on disk structures. Repository should not change its disk structures
|
||||||
|
* outside of this method. This method must be no-op in case that the structures
|
||||||
|
* do not need any maintenance.
|
||||||
*/
|
*/
|
||||||
public abstract void createSchema();
|
public abstract void createOrUpdateInternals(Connection conn);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Termination callback for repository.
|
* Termination callback for repository.
|
||||||
|
@ -17,11 +17,23 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.sqoop.repository.derby;
|
package org.apache.sqoop.repository.derby;
|
||||||
|
|
||||||
public final class DerbyRepoConfigurationConstants {
|
public final class DerbyRepoConstants {
|
||||||
|
|
||||||
public static final String PREFIX_DERBY = "derby.";
|
public static final String CONF_PREFIX_DERBY = "derby.";
|
||||||
|
|
||||||
private DerbyRepoConfigurationConstants() {
|
public static final String SYSKEY_VERSION = "version";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Expected version of the repository structures.
|
||||||
|
*
|
||||||
|
* History:
|
||||||
|
* 0 - empty/unknown state
|
||||||
|
* 1 - First two releases (1.99.1, 1.99.2)
|
||||||
|
* 2 - added SQ_SYSTEM
|
||||||
|
*/
|
||||||
|
public static final int VERSION = 2;
|
||||||
|
|
||||||
|
private DerbyRepoConstants() {
|
||||||
// Disable explicit object creation
|
// Disable explicit object creation
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -169,6 +169,9 @@ public enum DerbyRepoError implements ErrorCode {
|
|||||||
/** Can't retrieve submissions for a job **/
|
/** Can't retrieve submissions for a job **/
|
||||||
DERBYREPO_0040("Can't retrieve submissions for a job"),
|
DERBYREPO_0040("Can't retrieve submissions for a job"),
|
||||||
|
|
||||||
|
/** Can't detect version of the database structures **/
|
||||||
|
DERBYREPO_0041("Can't detect version of repository storage"),
|
||||||
|
|
||||||
;
|
;
|
||||||
|
|
||||||
private final String message;
|
private final String message;
|
||||||
|
@ -30,9 +30,11 @@
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.sql.DataSource;
|
import javax.sql.DataSource;
|
||||||
|
|
||||||
@ -245,64 +247,126 @@ public synchronized void shutdown() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Detect version of underlying database structures.
|
||||||
|
*
|
||||||
|
* @param conn JDBC Connection
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public int detectVersion(Connection conn) {
|
||||||
|
ResultSet rs = null;
|
||||||
|
PreparedStatement stmt = null;
|
||||||
|
|
||||||
|
// First release went out without system table, so we have to detect
|
||||||
|
// this version differently.
|
||||||
|
try {
|
||||||
|
rs = conn.getMetaData().getTables(null, null, null, null);
|
||||||
|
|
||||||
|
Set<String> tableNames = new HashSet<String>();
|
||||||
|
while(rs.next()) {
|
||||||
|
tableNames.add(rs.getString("TABLE_NAME"));
|
||||||
|
}
|
||||||
|
closeResultSets(rs);
|
||||||
|
|
||||||
|
LOG.debug("Detecting old version of repository");
|
||||||
|
boolean foundAll = true;
|
||||||
|
for( String expectedTable : DerbySchemaConstants.tablesV1) {
|
||||||
|
if(!tableNames.contains(expectedTable)) {
|
||||||
|
foundAll = false;
|
||||||
|
LOG.debug("Missing table " + expectedTable);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we find all expected tables, then we are on version 1
|
||||||
|
if(foundAll && !tableNames.contains(DerbySchemaConstants.TABLE_SQ_SYSTEM_NAME)) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new SqoopException(DerbyRepoError.DERBYREPO_0041, e);
|
||||||
|
} finally {
|
||||||
|
closeResultSets(rs);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Normal version detection, select and return the version
|
||||||
|
try {
|
||||||
|
stmt = conn.prepareStatement(STMT_SELECT_SYSTEM);
|
||||||
|
stmt.setString(1, DerbyRepoConstants.SYSKEY_VERSION);
|
||||||
|
rs = stmt.executeQuery();
|
||||||
|
|
||||||
|
if(!rs.next()) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return rs.getInt(1);
|
||||||
|
} catch (SQLException e) {
|
||||||
|
LOG.info("Can't fetch repository structure version.", e);
|
||||||
|
return 0;
|
||||||
|
} finally {
|
||||||
|
closeResultSets(rs);
|
||||||
|
closeStatements(stmt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void createSchema() {
|
public void createOrUpdateInternals(Connection conn) {
|
||||||
runQuery(QUERY_CREATE_SCHEMA_SQOOP);
|
int version = detectVersion(conn);
|
||||||
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR);
|
|
||||||
runQuery(QUERY_CREATE_TABLE_SQ_FORM);
|
if(version <= 0) {
|
||||||
runQuery(QUERY_CREATE_TABLE_SQ_INPUT);
|
runQuery(QUERY_CREATE_SCHEMA_SQOOP, conn);
|
||||||
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION);
|
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR, conn);
|
||||||
runQuery(QUERY_CREATE_TABLE_SQ_JOB);
|
runQuery(QUERY_CREATE_TABLE_SQ_FORM, conn);
|
||||||
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT);
|
runQuery(QUERY_CREATE_TABLE_SQ_INPUT, conn);
|
||||||
runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT);
|
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION, conn);
|
||||||
runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION);
|
runQuery(QUERY_CREATE_TABLE_SQ_JOB, conn);
|
||||||
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP);
|
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT, conn);
|
||||||
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER);
|
runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT, conn);
|
||||||
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION);
|
runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION, conn);
|
||||||
|
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP, conn);
|
||||||
|
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER, conn);
|
||||||
|
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION, conn);
|
||||||
|
}
|
||||||
|
if(version <= 1) {
|
||||||
|
runQuery(QUERY_CREATE_TABLE_SQ_SYSTEM, conn);
|
||||||
|
}
|
||||||
|
|
||||||
|
ResultSet rs = null;
|
||||||
|
PreparedStatement stmt = null;
|
||||||
|
try {
|
||||||
|
stmt = conn.prepareStatement(STMT_DELETE_SYSTEM);
|
||||||
|
stmt.setString(1, DerbyRepoConstants.SYSKEY_VERSION);
|
||||||
|
stmt.executeUpdate();
|
||||||
|
|
||||||
|
closeStatements(stmt);
|
||||||
|
|
||||||
|
stmt = conn.prepareStatement(STMT_INSERT_SYSTEM);
|
||||||
|
stmt.setString(1, DerbyRepoConstants.SYSKEY_VERSION);
|
||||||
|
stmt.setString(2, "" + DerbyRepoConstants.VERSION);
|
||||||
|
stmt.executeUpdate();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
LOG.error("Can't persist the repository version", e);
|
||||||
|
} finally {
|
||||||
|
closeResultSets(rs);
|
||||||
|
closeStatements(stmt);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean schemaExists() {
|
public boolean haveSuitableInternals(Connection conn) {
|
||||||
Connection connection = null;
|
int version = detectVersion(conn);
|
||||||
Statement stmt = null;
|
|
||||||
try {
|
|
||||||
connection = dataSource.getConnection();
|
|
||||||
stmt = connection.createStatement();
|
|
||||||
ResultSet rset = stmt.executeQuery(QUERY_SYSSCHEMA_SQOOP);
|
|
||||||
|
|
||||||
if (!rset.next()) {
|
if(version != DerbyRepoConstants.VERSION) {
|
||||||
LOG.warn("Schema for SQOOP does not exist");
|
return false;
|
||||||
return false;
|
|
||||||
}
|
|
||||||
String sqoopSchemaId = rset.getString(1);
|
|
||||||
LOG.debug("SQOOP schema ID: " + sqoopSchemaId);
|
|
||||||
connection.commit();
|
|
||||||
} catch (SQLException ex) {
|
|
||||||
if (connection != null) {
|
|
||||||
try {
|
|
||||||
connection.rollback();
|
|
||||||
} catch (SQLException ex2) {
|
|
||||||
LOG.error("Unable to rollback transaction", ex2);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
throw new SqoopException(DerbyRepoError.DERBYREPO_0001, ex);
|
|
||||||
} finally {
|
|
||||||
closeStatements(stmt);
|
|
||||||
if (connection != null) {
|
|
||||||
try {
|
|
||||||
connection.close();
|
|
||||||
} catch (SQLException ex) {
|
|
||||||
LOG.error("Unable to close connection", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(jarcec): Verify that all structures are present (e.g. something like corruption validation)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1656,52 +1720,27 @@ private void registerFormInputs(long formId, List<MInput<?>> inputs,
|
|||||||
/**
|
/**
|
||||||
* Execute given query on database.
|
* Execute given query on database.
|
||||||
*
|
*
|
||||||
* Passed query will be executed in it's own transaction
|
|
||||||
*
|
|
||||||
* @param query Query that should be executed
|
* @param query Query that should be executed
|
||||||
*/
|
*/
|
||||||
private void runQuery(String query) {
|
private void runQuery(String query, Connection conn) {
|
||||||
Connection connection = null;
|
|
||||||
Statement stmt = null;
|
Statement stmt = null;
|
||||||
try {
|
try {
|
||||||
connection = dataSource.getConnection();
|
stmt = conn.createStatement();
|
||||||
stmt = connection.createStatement();
|
|
||||||
if (stmt.execute(query)) {
|
if (stmt.execute(query)) {
|
||||||
ResultSet rset = stmt.getResultSet();
|
ResultSet rset = stmt.getResultSet();
|
||||||
int count = 0;
|
int count = 0;
|
||||||
while (rset.next()) {
|
while (rset.next()) {
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
LOG.info("QUERY(" + query + ") produced unused resultset with "
|
LOG.info("QUERY(" + query + ") produced unused resultset with "+ count + " rows");
|
||||||
+ count + " rows");
|
|
||||||
} else {
|
} else {
|
||||||
int updateCount = stmt.getUpdateCount();
|
int updateCount = stmt.getUpdateCount();
|
||||||
LOG.info("QUERY(" + query + ") Update count: " + updateCount);
|
LOG.info("QUERY(" + query + ") Update count: " + updateCount);
|
||||||
}
|
}
|
||||||
connection.commit();
|
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
try {
|
throw new SqoopException(DerbyRepoError.DERBYREPO_0003, query, ex);
|
||||||
connection.rollback();
|
|
||||||
} catch (SQLException ex2) {
|
|
||||||
LOG.error("Unable to rollback transaction", ex2);
|
|
||||||
}
|
|
||||||
throw new SqoopException(DerbyRepoError.DERBYREPO_0003,
|
|
||||||
query, ex);
|
|
||||||
} finally {
|
} finally {
|
||||||
if (stmt != null) {
|
closeStatements(stmt);
|
||||||
try {
|
|
||||||
stmt.close();
|
|
||||||
} catch (SQLException ex) {
|
|
||||||
LOG.error("Unable to close statement", ex);
|
|
||||||
}
|
|
||||||
if (connection != null) {
|
|
||||||
try {
|
|
||||||
connection.close();
|
|
||||||
} catch (SQLException ex) {
|
|
||||||
LOG.error("Unable to close connection", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,6 +17,9 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.sqoop.repository.derby;
|
package org.apache.sqoop.repository.derby;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
public final class DerbySchemaConstants {
|
public final class DerbySchemaConstants {
|
||||||
|
|
||||||
public static final String SCHEMA_SQOOP = "SQOOP";
|
public static final String SCHEMA_SQOOP = "SQOOP";
|
||||||
@ -25,6 +28,19 @@ public final class DerbySchemaConstants {
|
|||||||
|
|
||||||
private static final String CONSTRAINT_PREFIX = "FK_";
|
private static final String CONSTRAINT_PREFIX = "FK_";
|
||||||
|
|
||||||
|
// SQ_SYSTEM
|
||||||
|
|
||||||
|
public static final String TABLE_SQ_SYSTEM_NAME = "SQ_SYSTEM";
|
||||||
|
|
||||||
|
public static final String TABLE_SQ_SYSTEM = SCHEMA_PREFIX
|
||||||
|
+ TABLE_SQ_SYSTEM_NAME;
|
||||||
|
|
||||||
|
public static final String COLUMN_SQM_ID = "SQM_ID";
|
||||||
|
|
||||||
|
public static final String COLUMN_SQM_KEY = "SQM_KEY";
|
||||||
|
|
||||||
|
public static final String COLUMN_SQM_VALUE = "SQM_VALUE";
|
||||||
|
|
||||||
// SQ_CONNECTOR
|
// SQ_CONNECTOR
|
||||||
|
|
||||||
public static final String TABLE_SQ_CONNECTOR_NAME = "SQ_CONNECTOR";
|
public static final String TABLE_SQ_CONNECTOR_NAME = "SQ_CONNECTOR";
|
||||||
@ -260,6 +276,26 @@ public final class DerbySchemaConstants {
|
|||||||
|
|
||||||
public static final String CONSTRAINT_SQRS_SQS = SCHEMA_PREFIX + CONSTRAINT_SQRS_SQS_NAME;
|
public static final String CONSTRAINT_SQRS_SQS = SCHEMA_PREFIX + CONSTRAINT_SQRS_SQS_NAME;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List of expected tables for first version;
|
||||||
|
* This list here is for backward compatibility.
|
||||||
|
*/
|
||||||
|
public static final Set<String> tablesV1;
|
||||||
|
static {
|
||||||
|
tablesV1 = new HashSet<String>();
|
||||||
|
tablesV1.add(TABLE_SQ_CONNECTOR_NAME);
|
||||||
|
tablesV1.add(TABLE_SQ_CONNECTION_NAME);
|
||||||
|
tablesV1.add(TABLE_SQ_CONNECTION_INPUT_NAME);
|
||||||
|
tablesV1.add(TABLE_SQ_COUNTER_NAME);
|
||||||
|
tablesV1.add(TABLE_SQ_COUNTER_GROUP_NAME);
|
||||||
|
tablesV1.add(TABLE_SQ_COUNTER_SUBMISSION_NAME);
|
||||||
|
tablesV1.add(TABLE_SQ_FORM_NAME);
|
||||||
|
tablesV1.add(TABLE_SQ_INPUT_NAME);
|
||||||
|
tablesV1.add(TABLE_SQ_JOB_NAME);
|
||||||
|
tablesV1.add(TABLE_SQ_JOB_INPUT_NAME);
|
||||||
|
tablesV1.add(TABLE_SQ_SUBMISSION_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
private DerbySchemaConstants() {
|
private DerbySchemaConstants() {
|
||||||
// Disable explicit object creation
|
// Disable explicit object creation
|
||||||
}
|
}
|
||||||
|
@ -22,7 +22,18 @@
|
|||||||
/**
|
/**
|
||||||
* DDL queries that create the Sqoop repository schema in Derby database. These
|
* DDL queries that create the Sqoop repository schema in Derby database. These
|
||||||
* queries create the following tables:
|
* queries create the following tables:
|
||||||
*
|
* <p>
|
||||||
|
* <strong>SQ_SYSTEM</strong>: Store for various state information
|
||||||
|
* <pre>
|
||||||
|
* +----------------------------+
|
||||||
|
* | SQ_SYSTEM |
|
||||||
|
* +----------------------------+
|
||||||
|
* | SQM_ID: BIGINT PK |
|
||||||
|
* | SQM_KEY: VARCHAR(64) |
|
||||||
|
* | SQM_VALUE: VARCHAR(64) |
|
||||||
|
* +----------------------------+
|
||||||
|
* </pre>
|
||||||
|
* </p>
|
||||||
* <p>
|
* <p>
|
||||||
* <strong>SQ_CONNECTOR</strong>: Connector registration.
|
* <strong>SQ_CONNECTOR</strong>: Connector registration.
|
||||||
* <pre>
|
* <pre>
|
||||||
@ -185,6 +196,14 @@ public final class DerbySchemaQuery {
|
|||||||
"SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = '"
|
"SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = '"
|
||||||
+ SCHEMA_SQOOP + "'";
|
+ SCHEMA_SQOOP + "'";
|
||||||
|
|
||||||
|
// DDL: Create table SQ_SYSTEM
|
||||||
|
public static final String QUERY_CREATE_TABLE_SQ_SYSTEM =
|
||||||
|
"CREATE TABLE " + TABLE_SQ_SYSTEM + " ("
|
||||||
|
+ COLUMN_SQM_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
|
||||||
|
+ COLUMN_SQM_KEY + " VARCHAR(64), "
|
||||||
|
+ COLUMN_SQM_VALUE + " VARCHAR(64) "
|
||||||
|
+ ")";
|
||||||
|
|
||||||
// DDL: Create table SQ_CONNECTOR
|
// DDL: Create table SQ_CONNECTOR
|
||||||
public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR =
|
public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR =
|
||||||
"CREATE TABLE " + TABLE_SQ_CONNECTOR + " ("
|
"CREATE TABLE " + TABLE_SQ_CONNECTOR + " ("
|
||||||
@ -336,6 +355,25 @@ public final class DerbySchemaQuery {
|
|||||||
+ "REFERENCES " + TABLE_SQ_SUBMISSION + "(" + COLUMN_SQS_ID + ") ON DELETE CASCADE "
|
+ "REFERENCES " + TABLE_SQ_SUBMISSION + "(" + COLUMN_SQS_ID + ") ON DELETE CASCADE "
|
||||||
+ ")";
|
+ ")";
|
||||||
|
|
||||||
|
// DML: Get system key
|
||||||
|
public static final String STMT_SELECT_SYSTEM =
|
||||||
|
"SELECT "
|
||||||
|
+ COLUMN_SQM_VALUE
|
||||||
|
+ " FROM " + TABLE_SQ_SYSTEM
|
||||||
|
+ " WHERE " + COLUMN_SQM_KEY + " = ?";
|
||||||
|
|
||||||
|
// DML: Remove system key
|
||||||
|
public static final String STMT_DELETE_SYSTEM =
|
||||||
|
"DELETE FROM " + TABLE_SQ_SYSTEM
|
||||||
|
+ " WHERE " + COLUMN_SQM_KEY + " = ?";
|
||||||
|
|
||||||
|
// DML: Insert new system key
|
||||||
|
public static final String STMT_INSERT_SYSTEM =
|
||||||
|
"INSERT INTO " + TABLE_SQ_SYSTEM + "("
|
||||||
|
+ COLUMN_SQM_KEY + ", "
|
||||||
|
+ COLUMN_SQM_VALUE + ") "
|
||||||
|
+ "VALUES(?, ?)";
|
||||||
|
|
||||||
// DML: Fetch connector Given Name
|
// DML: Fetch connector Given Name
|
||||||
public static final String STMT_FETCH_BASE_CONNECTOR =
|
public static final String STMT_FETCH_BASE_CONNECTOR =
|
||||||
"SELECT "
|
"SELECT "
|
||||||
|
@ -98,6 +98,8 @@ protected void createSchema() throws Exception {
|
|||||||
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP);
|
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP);
|
||||||
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER);
|
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER);
|
||||||
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION);
|
runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION);
|
||||||
|
runQuery(QUERY_CREATE_TABLE_SQ_SYSTEM);
|
||||||
|
runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('version', '2')");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -0,0 +1,47 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.sqoop.repository.derby;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class TestInternals extends DerbyTestCase {
|
||||||
|
|
||||||
|
DerbyRepositoryHandler handler;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
|
||||||
|
handler = new DerbyRepositoryHandler();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSuitableInternals() throws Exception {
|
||||||
|
assertFalse(handler.haveSuitableInternals(getDerbyConnection()));
|
||||||
|
createSchema(); // Test code is building the structures
|
||||||
|
assertTrue(handler.haveSuitableInternals(getDerbyConnection()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCreateorUpdateInternals() throws Exception {
|
||||||
|
assertFalse(handler.haveSuitableInternals(getDerbyConnection()));
|
||||||
|
handler.createOrUpdateInternals(getDerbyConnection());
|
||||||
|
assertTrue(handler.haveSuitableInternals(getDerbyConnection()));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user