SQOOP-1733: Port SQOOP-1728 to sqoop2 branch
(Abraham Elmahrek via Jarek Jarcec Cecho)

commit dfd1fd3481
parent deacc90c49

@@ -412,6 +412,7 @@ private void deleteJobs(List<MJob> jobs, RepositoryTransaction tx) {
   public final void upgradeConnector(MConnector oldConnector, MConnector newConnector) {
     LOG.info("Upgrading connector: " + oldConnector.getUniqueName());
     long connectorId = oldConnector.getPersistenceId();
+    String connectorName = oldConnector.getUniqueName();
     newConnector.setPersistenceId(connectorId);
 
     RepositoryTransaction tx = null;
@@ -439,6 +440,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnector) {
       // dont always rely on the repository implementation to return empty list for links
       if (existingLinksByConnector != null) {
         for (MLink link : existingLinksByConnector) {
+          LOG.info(" Link upgrade for link:" + link.getName() + " for connector:" + connectorName);
           // Make a new copy of the configs
           List<MConfig> linkConfig = newConnector.getLinkConfig().clone(false).getConfigs();
           MLinkConfig newLinkConfig = new MLinkConfig(linkConfig);
@@ -458,14 +460,17 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnector) {
             // and stop the bootup of Sqoop server
             logInvalidModelObject("link", newlink, validationResult);
             upgradeSuccessful = false;
+            LOG.info(" LINK config upgrade FAILED for link: " + link.getName() + " for connector:" + connectorName);
           }
         }
       }
+      LOG.info(" All Link and configs for this connector processed");
       // 8. Run upgrade logic for the configs related to the job objects
       if (existingJobsByConnector != null) {
         for (MJob job : existingJobsByConnector) {
           // every job has 2 parts, the FROM and the TO links and their
           // corresponding connectors.
+          LOG.info(" Job upgrade for job:" + job.getName()+ " for connector:" + connectorName);
           List<MConfig> fromConfig = newConnector.getFromConfig().clone(false).getConfigs();
           if (job.getFromConnectorId() == newConnector.getPersistenceId()) {
             MFromConfig newFromConfig = new MFromConfig(fromConfig);
@@ -484,8 +489,9 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnector) {
             if(validationResult.getStatus().canProceed()) {
               updateJob(newJob, tx);
             } else {
-              logInvalidModelObject("job", newJob, validationResult);
+              logInvalidModelObject("fromJob", newJob, validationResult);
               upgradeSuccessful = false;
+              LOG.error(" From JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
             }
           }
 
@@ -507,17 +513,18 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnector) {
             if(validationResult.getStatus().canProceed()) {
               updateJob(newJob, tx);
             } else {
-              logInvalidModelObject("job", newJob, validationResult);
+              logInvalidModelObject("toJob", newJob, validationResult);
               upgradeSuccessful = false;
+              LOG.error(" TO JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
             }
           }
         }
       }
-
+      LOG.info(" All Job and configs for this connector processed");
       if (upgradeSuccessful) {
         tx.commit();
       } else {
-        throw new SqoopException(RepositoryError.JDBCREPO_0027);
+        throw new SqoopException(RepositoryError.JDBCREPO_0027, " for connector:" + connectorName);
       }
     } catch (SqoopException ex) {
       if (tx != null) {
@@ -533,7 +540,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnector) {
       if (tx != null) {
         tx.close();
       }
-      LOG.info("Connector upgrade finished: " + oldConnector.getUniqueName());
+      LOG.info("Connector upgrade finished for: " + connectorName);
     }
   }
 
@@ -582,7 +589,7 @@ public final void upgradeDriver(MDriver driver) {
       if (upgradeSuccessful) {
         tx.commit();
       } else {
-        throw new SqoopException(RepositoryError.JDBCREPO_0027);
+        throw new SqoopException(RepositoryError.JDBCREPO_0027, " Driver");
       }
     } catch (SqoopException ex) {
       if(tx != null) {
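
Both failure paths above now pass an extra-info string alongside RepositoryError.JDBCREPO_0027. A minimal, hypothetical caller-side sketch of what that detail buys (the exact message formatting of org.apache.sqoop.common.SqoopException may differ):

    try {
      repository.upgradeConnector(oldConnector, newConnector);
    } catch (SqoopException ex) {
      // The error code still identifies JDBCREPO_0027; the message now also names
      // the connector (or " Driver") whose config upgrade failed validation.
      LOG.error("Upgrade aborted: " + ex.getErrorCode() + " - " + ex.getMessage(), ex);
    }
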
@@ -22,6 +22,7 @@
 import static org.apache.sqoop.repository.common.CommonRepositoryInsertUpdateDeleteSelectQuery.*;
+
 import static org.apache.sqoop.repository.derby.DerbySchemaUpgradeQuery.*;
 
 import java.net.URL;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -43,6 +44,8 @@
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.ConnectorHandler;
 import org.apache.sqoop.connector.ConnectorManagerUtils;
+import org.apache.sqoop.model.MConfigType;
+import org.apache.sqoop.model.MInputType;
 import org.apache.sqoop.repository.JdbcRepositoryContext;
 import org.apache.sqoop.repository.common.CommonRepoConstants;
 import org.apache.sqoop.repository.common.CommonRepositoryHandler;
@@ -241,11 +244,16 @@ public void createOrUpgradeRepository(Connection conn) {
 
     // Data modifications only for non-fresh install.
     if (repositoryVersion > 0) {
-      // Register HDFS connector
-      updateJobRepositorySchemaAndData(conn, registerHdfsConnector(conn));
+      LOG.info("Force registering the HDFS connector as a new configurable");
+      long hdfsConnectorId = registerHdfsConnector(conn);
+      LOG.info("Finished Force registering the HDFS connector as a new configurable");
+
+      LOG.info("Updating config and inputs for the hdfs connector.");
+      updateJobConfigInputForHdfsConnector(conn, hdfsConnectorId);
+      LOG.info("Finished Updating config and inputs for the hdfs connector.");
     }
 
-    // Wait to remove SQB_TYPE (IMPORT/EXPORT) until we update data.
+    // Wait to remove SQB_TYPE (IMPORT/EXPORT) until we update all the job data.
     // Data updates depend on knowledge of the type of job.
     runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn);
 
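
For an existing (version > 0) repository, the single updateJobRepositorySchemaAndData() call is split so the id of the freshly registered HDFS connector can be threaded through the data migration explicitly. A condensed sketch of the resulting order of operations, using only names shown in this diff:

    long hdfsConnectorId = registerHdfsConnector(conn);          // pre-register the connector row
    updateJobConfigInputForHdfsConnector(conn, hdfsConnectorId); // re-home forms/inputs, create the link form
    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn); // drop SQB_TYPE only afterwards
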
@@ -463,13 +471,10 @@ protected void updateDirections(Connection conn, Map<Direction, Long> directionM
 
   /**
    * Upgrade job data from IMPORT/EXPORT to FROM/TO.
-   * Since the framework is no longer responsible for HDFS,
+   * Since Sqoop is no longer responsible for HDFS,
    * the HDFS connector/link must be added.
-   * Also, the framework configs are moved around such that
-   * they belong to the added HDFS connector. Any extra configs
-   * are removed.
    * NOTE: Connector configs should have a direction (FROM/TO),
-   * but framework configs should not.
+   * but link configs should not.
    *
    * Here's a brief list describing the data migration process.
    * 1. Change SQ_CONFIG.SQ_CFG_DIRECTION from IMPORT to FROM.
@@ -481,28 +486,23 @@ protected void updateDirections(Connection conn, Map<Direction, Long> directionM
    * This should affect connectors only since Connector configs
    * should have had a value for SQ_CFG_OPERATION.
    * 5. Add HDFS connector for jobs to reference.
-   * 6. Set 'input' and 'output' configs connector.
-   *    to HDFS connector.
-   * 7. Throttling config was originally the second config in
-   *    the framework. It should now be the first config.
+   * 6. Set 'fromJobConfig' and 'toJobConfig' configs for HDFS connector.
+   * 7. Add 'linkConfig' and 'linkConfig.uri' to the configs for the hdfs
    * 8. Remove the EXPORT throttling config and ensure all of
    *    its dependencies point to the IMPORT throttling config.
    *    Then make sure the throttling config does not have a direction.
-   *    Framework configs should not have a direction.
    * 9. Create an HDFS link to reference and update
    *    jobs to reference that link. IMPORT jobs
    *    should have TO HDFS connector, EXPORT jobs should have
    *    FROM HDFS connector.
-   * 10. Update 'table' config names to 'fromJobConfig' and 'toTable'.
+   * 10. Update 'table' config names to 'fromJobConfig' and 'toJobConfig'.
    *    Also update the relevant inputs as well.
    * @param conn
    */
   // NOTE: This upgrade code happened before the SQOOP-1498 renaming, hence it uses the form/connection
   // tables instead of the latest config/link tables
-  private void updateJobRepositorySchemaAndData(Connection conn, long connectorId) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Updating existing data for generic connectors.");
-    }
+  @Deprecated
+  private void updateJobConfigInputForHdfsConnector(Connection conn, long hdfsConnectorId) {
 
     runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION, conn,
         Direction.FROM.toString(), "IMPORT");
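
The runQuery() call just above performs step 1 of the migration list. A sketch of what it amounts to, assuming runQuery() binds its trailing arguments as positional parameters and that TABLE_SQ_FORM resolves to the SQOOP.SQ_FORM table (both assumptions here); the real statement, defined further down in DerbySchemaUpgradeQuery, may carry additional WHERE conditions:

    try (PreparedStatement ps = conn.prepareStatement(
        "UPDATE SQOOP.SQ_FORM SET SQF_DIRECTION = ? WHERE SQF_DIRECTION = ?")) {
      ps.setString(1, Direction.FROM.toString()); // "FROM"
      ps.setString(2, "IMPORT");
      ps.executeUpdate();
    }
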
@@ -517,7 +517,21 @@ private void updateJobRepositorySchemaAndData(Connection conn, long connectorId)
         "output");
 
     runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR, conn,
-        new Long(connectorId), "input", "output");
+        new Long(hdfsConnectorId), "input", "output");
+    //update the names of the configs
+    // 1. input ==> fromJobConfig
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_NAME, conn,
+        "fromJobConfig",
+        "input");
+    // 2. output ===> toJobConfig
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_NAME, conn,
+        "toJobConfig",
+        "output");
+
+    // create the link config
+    Long linkFormId = createHdfsLinkForm(conn, hdfsConnectorId);
+    // create the link config input
+    runQuery(STMT_INSERT_INTO_INPUT_WITH_FORM, conn, "linkConfig.uri", linkFormId, 0, MInputType.STRING.name(), false, 255, null);
 
     runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_INPUT_UPDATE_THROTTLING_FORM_INPUTS, conn,
         "IMPORT", "EXPORT");
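
The two QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_NAME calls rename the old framework form names to connector-scoped ones. With the constant added to DerbySchemaUpgradeQuery later in this commit, and assuming the SQOOP schema prefix, they boil down to roughly:

    try (PreparedStatement ps = conn.prepareStatement(
        "UPDATE SQOOP.SQ_FORM SET SQF_NAME = ? WHERE SQF_NAME = ?")) {
      ps.setString(1, "fromJobConfig"); ps.setString(2, "input");  ps.executeUpdate();
      ps.setString(1, "toJobConfig");   ps.setString(2, "output"); ps.executeUpdate();
    }
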
@@ -530,7 +544,7 @@ private void updateJobRepositorySchemaAndData(Connection conn, long connectorId)
     runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DRIVER_INDEX, conn,
         new Long(0), "throttling");
 
-    Long connectionId = createHdfsConnection(conn, connectorId);
+    Long connectionId = createHdfsConnection(conn, hdfsConnectorId);
     runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION, conn,
         "EXPORT");
     runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION, conn,
@@ -538,20 +552,13 @@ private void updateJobRepositorySchemaAndData(Connection conn, long connectorId)
     runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION, conn,
         new Long(connectionId), "IMPORT");
 
-    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn,
-        "fromJobConfig", "table", Direction.FROM.toString());
-    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES, conn,
-        Direction.FROM.toString().toLowerCase(), "fromJobConfig", Direction.FROM.toString());
-    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn,
-        "toJobConfig", "table", Direction.TO.toString());
-    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES, conn,
-        Direction.TO.toString().toLowerCase(), "toJobConfig", Direction.TO.toString());
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Updated existing data for generic connectors.");
-    }
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_FROM_JOB_INPUT_NAMES, conn,
+        "fromJobConfig", "fromJobConfig", Direction.FROM.toString());
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_TO_JOB_INPUT_NAMES, conn,
+        "toJobConfig", "toJobConfig", Direction.TO.toString());
   }
 
 
   /**
    * Pre-register HDFS Connector so that config upgrade will work.
    * NOTE: This should be used only in the upgrade path
@@ -579,7 +586,7 @@ protected long registerHdfsConnector(Connection conn) {
       if (handler.getUniqueName().equals(CONNECTOR_HDFS)) {
         try {
           PreparedStatement baseConnectorStmt = conn.prepareStatement(
-              STMT_INSERT_INTO_CONFIGURABLE_WITHOUT_SUPPORTED_DIRECTIONS,
+              STMT_INSERT_INTO_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS,
               Statement.RETURN_GENERATED_KEYS);
           baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName());
           baseConnectorStmt.setString(2, handler.getConnectorConfigurable().getClassName());
@@ -653,6 +660,49 @@ private Long createHdfsConnection(Connection conn, Long connectorId) {
     }
   }
 
+  /**
+   * We are creating the LINK FORM for HDFS and later it the schema will
+   * be renamed to LINK CONFIG
+   * NOTE: Should be used only in the upgrade path!
+   */
+  @Deprecated
+  private Long createHdfsLinkForm(Connection conn, Long connectorId) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Creating HDFS link.");
+    }
+
+    PreparedStatement stmt = null;
+    int result;
+    try {
+      short index = 0;
+      stmt = conn.prepareStatement(STMT_INSERT_INTO_FORM, Statement.RETURN_GENERATED_KEYS);
+      stmt.setLong(1, connectorId);
+      stmt.setString(2, "linkConfig");
+      // it could also be set to the deprecated "CONNECTION"
+      stmt.setString(3, MConfigType.LINK.name());
+      stmt.setShort(4, index);
+      result = stmt.executeUpdate();
+      if (result != 1) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0003, Integer.toString(result));
+      }
+      ResultSet rsetFormId = stmt.getGeneratedKeys();
+
+      if (!rsetFormId.next()) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0004);
+      }
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Created HDFS connector link FORM.");
+      }
+      return rsetFormId.getLong(1);
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0005, ex);
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
   /**
    * {@inheritDoc}
    */
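
createHdfsLinkForm() together with the STMT_INSERT_INTO_INPUT_WITH_FORM call in updateJobConfigInputForHdfsConnector() seeds the single 'linkConfig.uri' input for the HDFS link. A sketch of that pairing, mapping the call arguments onto the column list of the constant added below (assuming runQuery() binds them in order):

    Long linkFormId = createHdfsLinkForm(conn, hdfsConnectorId); // SQ_FORM row: (connector, 'linkConfig', LINK, 0)
    runQuery(STMT_INSERT_INTO_INPUT_WITH_FORM, conn,
        "linkConfig.uri",            // SQI_NAME
        linkFormId,                  // SQI_FORM   (generated form id from above)
        0,                           // SQI_INDEX
        MInputType.STRING.name(),    // SQI_TYPE
        false,                       // SQI_STRMASK (value is not masked)
        255,                         // SQI_STRLENGTH
        null);                       // SQI_ENUMVALS (not an enum input)
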
@@ -63,8 +63,8 @@ public final class DerbySchemaInsertUpdateDeleteSelectQuery {
       + COLUMN_SQC_VERSION
       + " FROM " + TABLE_SQ_CONNECTOR;
 
   @Deprecated // used only in the upgrade path
-  public static final String STMT_INSERT_INTO_CONFIGURABLE_WITHOUT_SUPPORTED_DIRECTIONS =
+  public static final String STMT_INSERT_INTO_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS =
       "INSERT INTO " + TABLE_SQ_CONNECTOR+ " ("
       + COLUMN_SQC_NAME + ", "
       + COLUMN_SQC_CLASS + ", "
@@ -119,6 +119,28 @@ public final class DerbySchemaUpgradeQuery {
       "RENAME COLUMN " + TABLE_SQ_FORM + "." + COLUMN_SQF_OPERATION
       + " TO " + COLUMN_SQF_DIRECTION;
 
+  //DML: Insert into form
+  public static final String STMT_INSERT_INTO_FORM =
+      "INSERT INTO " + TABLE_SQ_FORM+ " ("
+      + COLUMN_SQF_CONNECTOR + ", "
+      + COLUMN_SQF_NAME + ", "
+      + COLUMN_SQF_TYPE + ", "
+      + COLUMN_SQF_INDEX
+      + ") VALUES ( ?, ?, ?, ?)";
+
+  // DML: Insert into inpu with form name
+  public static final String STMT_INSERT_INTO_INPUT_WITH_FORM =
+      "INSERT INTO " + TABLE_SQ_INPUT + " ("
+      + COLUMN_SQI_NAME + ", "
+      + COLUMN_SQI_FORM + ", "
+      + COLUMN_SQI_INDEX + ", "
+      + COLUMN_SQI_TYPE + ", "
+      + COLUMN_SQI_STRMASK + ", "
+      + COLUMN_SQI_STRLENGTH + ", "
+      + COLUMN_SQI_ENUMVALS
+      + ") VALUES (?, ?, ?, ?, ?, ?, ?)";
+
+
   public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION =
       "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION
       + "=? WHERE " + COLUMN_SQF_DIRECTION + "=?"
@@ -129,6 +151,10 @@ public final class DerbySchemaUpgradeQuery {
       + " WHERE " + COLUMN_SQF_CONNECTOR + " IS NULL AND "
       + COLUMN_SQF_NAME + " IN (?, ?)";
 
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_NAME =
+      "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_NAME + "= ?"
+      + " WHERE " + COLUMN_SQF_NAME + "= ?";
+
   public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_DIRECTION =
       "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION + "= ?"
       + " WHERE " + COLUMN_SQF_NAME + "= ?";
@@ -152,16 +178,20 @@ public final class DerbySchemaUpgradeQuery {
       + " WHERE " + COLUMN_SQF_NAME + "= ?"
       + " AND " + COLUMN_SQF_DIRECTION + "= ?";
 
-  /**
-   * Intended to rename forms based on direction.
-   * e.g. If SQ_FORM.SQF_NAME = 'table' and parameter 1 = 'from'
-   * then SQ_FORM.SQF_NAME = 'fromTable'.
-   */
-  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES =
+  // remove "input" from the prefix of the name for hdfs configs
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_FROM_JOB_INPUT_NAMES =
       "UPDATE " + TABLE_SQ_INPUT + " SET "
       + COLUMN_SQI_NAME + "=("
-      + "? || UPPER(SUBSTR(" + COLUMN_SQI_NAME + ",1,1))"
-      + " || SUBSTR(" + COLUMN_SQI_NAME + ",2) )"
+      + "? || SUBSTR(" + COLUMN_SQI_NAME + ", 6) )"
+      + " WHERE " + COLUMN_SQI_FORM + " IN ("
+      + " SELECT " + COLUMN_SQF_ID + " FROM " + TABLE_SQ_FORM + " WHERE " + COLUMN_SQF_NAME + "= ?"
+      + " AND " + COLUMN_SQF_DIRECTION + "= ?)";
+
+  // remove output from the prefix of the name for hdfs configs
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_TO_JOB_INPUT_NAMES =
+      "UPDATE " + TABLE_SQ_INPUT + " SET "
+      + COLUMN_SQI_NAME + "=("
+      + "? || SUBSTR(" + COLUMN_SQI_NAME + ", 7) )"
       + " WHERE " + COLUMN_SQI_FORM + " IN ("
       + " SELECT " + COLUMN_SQF_ID + " FROM " + TABLE_SQ_FORM + " WHERE " + COLUMN_SQF_NAME + "= ?"
       + " AND " + COLUMN_SQF_DIRECTION + "= ?)";
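
The two rewritten input-rename queries strip the old form-name prefix and graft on the new one; Derby's SUBSTR is 1-indexed, so start positions 6 and 7 land on the '.' that follows 'input' and 'output' respectively. A minimal plain-Java sketch of the string transformation (the sample input names are only illustrative):

    public class InputRenameSketch {
      static String renameFromSide(String oldName) {        // "? || SUBSTR(SQI_NAME, 6)"
        return "fromJobConfig" + oldName.substring(5);      // keep everything from the '.' on
      }
      static String renameToSide(String oldName) {          // "? || SUBSTR(SQI_NAME, 7)"
        return "toJobConfig" + oldName.substring(6);
      }
      public static void main(String[] args) {
        System.out.println(renameFromSide("input.inputDirectory"));   // fromJobConfig.inputDirectory
        System.out.println(renameToSide("output.outputDirectory"));   // toJobConfig.outputDirectory
      }
    }
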