
SQOOP-1728: Sqoop2: Force HDFS connector as a configurable fixes

(Veena Basavaraj via Abraham Elmahrek)
Abraham Elmahrek 2014-11-14 11:15:55 -08:00
parent c1cb0bfc00
commit 68d5beeab4
4 changed files with 135 additions and 50 deletions


@@ -412,6 +412,7 @@ private void deleteJobs(List<MJob> jobs, RepositoryTransaction tx) {
public final void upgradeConnector(MConnector oldConnector, MConnector newConnector) {
LOG.info("Upgrading connector: " + oldConnector.getUniqueName());
long connectorId = oldConnector.getPersistenceId();
String connectorName = oldConnector.getUniqueName();
newConnector.setPersistenceId(connectorId);
RepositoryTransaction tx = null;
@@ -439,6 +440,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
// don't always rely on the repository implementation to return an empty list for links
if (existingLinksByConnector != null) {
for (MLink link : existingLinksByConnector) {
LOG.info(" Link upgrade for link:" + link.getName() + " for connector:" + connectorName);
// Make a new copy of the configs
List<MConfig> linkConfig = newConnector.getLinkConfig().clone(false).getConfigs();
MLinkConfig newLinkConfig = new MLinkConfig(linkConfig);
@@ -458,14 +460,17 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
// and stop the bootup of Sqoop server
logInvalidModelObject("link", newlink, validationResult);
upgradeSuccessful = false;
LOG.info(" LINK config upgrade FAILED for link: " + link.getName() + " for connector:" + connectorName);
}
}
}
LOG.info(" All Link and configs for this connector processed");
// 8. Run upgrade logic for the configs related to the job objects
if (existingJobsByConnector != null) {
for (MJob job : existingJobsByConnector) {
// every job has 2 parts, the FROM and the TO links and their
// corresponding connectors.
LOG.info(" Job upgrade for job:" + job.getName()+ " for connector:" + connectorName);
List<MConfig> fromConfig = newConnector.getFromConfig().clone(false).getConfigs();
if (job.getFromConnectorId() == newConnector.getPersistenceId()) {
MFromConfig newFromConfig = new MFromConfig(fromConfig);
@@ -484,8 +489,9 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
if(validationResult.getStatus().canProceed()) {
updateJob(newJob, tx);
} else {
logInvalidModelObject("job", newJob, validationResult);
logInvalidModelObject("fromJob", newJob, validationResult);
upgradeSuccessful = false;
LOG.error(" From JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
}
}
@@ -507,17 +513,18 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
if(validationResult.getStatus().canProceed()) {
updateJob(newJob, tx);
} else {
logInvalidModelObject("job", newJob, validationResult);
logInvalidModelObject("toJob", newJob, validationResult);
upgradeSuccessful = false;
LOG.error(" TO JOB config upgrade FAILED for job: " + job.getName() + " for connector:" + connectorName);
}
}
}
}
LOG.info(" All Job and configs for this connector processed");
if (upgradeSuccessful) {
tx.commit();
} else {
throw new SqoopException(RepositoryError.JDBCREPO_0027);
throw new SqoopException(RepositoryError.JDBCREPO_0027, " for connector:" + connectorName);
}
} catch (SqoopException ex) {
if (tx != null) {
@@ -533,7 +540,7 @@ public final void upgradeConnector(MConnector oldConnector, MConnector newConnec
if (tx != null) {
tx.close();
}
LOG.info("Connector upgrade finished: " + oldConnector.getUniqueName());
LOG.info("Connector upgrade finished for: " + connectorName);
}
}
@@ -582,7 +589,7 @@ public final void upgradeDriver(MDriver driver) {
if (upgradeSuccessful) {
tx.commit();
} else {
throw new SqoopException(RepositoryError.JDBCREPO_0027);
throw new SqoopException(RepositoryError.JDBCREPO_0027, " Driver");
}
} catch (SqoopException ex) {
if(tx != null) {


@@ -20,6 +20,7 @@
import static org.apache.sqoop.repository.derby.DerbySchemaCreateQuery.*;
import static org.apache.sqoop.repository.derby.DerbySchemaInsertUpdateDeleteSelectQuery.*;
import static org.apache.sqoop.repository.derby.DerbySchemaUpgradeQuery.*;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -426,11 +427,16 @@ public void createOrUpgradeRepository(Connection conn) {
// Data modifications only for non-fresh install.
if (repositoryVersion > 0) {
// Register HDFS connector
updateJobRepositorySchemaAndData(conn, registerHdfsConnector(conn));
LOG.info("Force registering the HDFS connector as a new configurable");
long hdfsConnectorId = registerHdfsConnector(conn);
LOG.info("Finished Force registering the HDFS connector as a new configurable");
LOG.info("Updating config and inputs for the hdfs connector.");
updateJobConfigInputForHdfsConnector(conn, hdfsConnectorId);
LOG.info("Finished Updating config and inputs for the hdfs connector.");
}
// Wait to remove SQB_TYPE (IMPORT/EXPORT) until we update data.
// Wait to remove SQB_TYPE (IMPORT/EXPORT) until we update all the job data.
// Data updates depend on knowledge of the type of job.
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn);
@@ -645,13 +651,10 @@ protected void updateDirections(Connection conn, Map<Direction, Long> directionM
/**
* Upgrade job data from IMPORT/EXPORT to FROM/TO.
* Since the framework is no longer responsible for HDFS,
* Since Sqoop is no longer responsible for HDFS,
* the HDFS connector/link must be added.
* Also, the framework configs are moved around such that
* they belong to the added HDFS connector. Any extra configs
* are removed.
* NOTE: Connector configs should have a direction (FROM/TO),
* but framework configs should not.
* but link configs should not.
*
* Here's a brief list describing the data migration process.
* 1. Change SQ_CONFIG.SQ_CFG_DIRECTION from IMPORT to FROM.
@@ -663,28 +666,23 @@ protected void updateDirections(Connection conn, Map<Direction, Long> directionM
* This should affect connectors only since Connector configs
* should have had a value for SQ_CFG_OPERATION.
* 5. Add HDFS connector for jobs to reference.
* 6. Set the connector of the 'input' and 'output' configs
* to the HDFS connector.
* 7. Throttling config was originally the second config in
* the framework. It should now be the first config.
* 6. Set 'fromJobConfig' and 'toJobConfig' configs for HDFS connector.
* 7. Add 'linkConfig' and 'linkConfig.uri' to the configs for the hdfs connector.
* 8. Remove the EXPORT throttling config and ensure all of
* its dependencies point to the IMPORT throttling config.
* Then make sure the throttling config does not have a direction.
* Framework configs should not have a direction.
* 9. Create an HDFS link to reference and update
* jobs to reference that link. IMPORT jobs
* should have TO HDFS connector, EXPORT jobs should have
* FROM HDFS connector.
* 10. Update 'table' config names to 'fromJobConfig' and 'toTable'.
* 10. Update 'table' config names to 'fromJobConfig' and 'toJobConfig'.
* Also update the relevant inputs as well.
* @param conn
*/
// NOTE: This upgrade code happened before the SQOOP-1498 renaming, hence it uses the form/connection
// tables instead of the latest config/link tables
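// Illustrative sketch of the end state (a reading of the steps above, not
// verbatim repository data): a pre-upgrade EXPORT job ends up with the newly
// registered HDFS connector on its FROM side, an IMPORT job with HDFS on its
// TO side, both referencing the auto-created HDFS link; the former framework
// 'input'/'output' configs now belong to that connector as
// 'fromJobConfig'/'toJobConfig'.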
private void updateJobRepositorySchemaAndData(Connection conn, long connectorId) {
if (LOG.isTraceEnabled()) {
LOG.trace("Updating existing data for generic connectors.");
}
@Deprecated
private void updateJobConfigInputForHdfsConnector(Connection conn, long hdfsConnectorId) {
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION, conn,
Direction.FROM.toString(), "IMPORT");
@@ -699,7 +697,21 @@ private void updateJobRepositorySchemaAndData(Connection conn, long connectorId)
"output");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR, conn,
new Long(connectorId), "input", "output");
new Long(hdfsConnectorId), "input", "output");
// update the names of the configs
// 1. input ==> fromJobConfig
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_NAME, conn,
"fromJobConfig",
"input");
// 2. output ==> toJobConfig
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_NAME, conn,
"toJobConfig",
"output");
// create the link config
Long linkFormId = createHdfsLinkForm(conn, hdfsConnectorId);
// create the link config input
runQuery(STMT_INSERT_INTO_INPUT_WITH_FORM, conn, "linkConfig.uri", linkFormId, 0, MInputType.STRING.name(), false, 255, null);
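// The seven bound values line up with the columns in
// STMT_INSERT_INTO_INPUT_WITH_FORM: SQI_NAME="linkConfig.uri",
// SQI_FORM=linkFormId, SQI_INDEX=0, SQI_TYPE=STRING,
// SQI_STRMASK=false (not a masked/sensitive input), SQI_STRLENGTH=255,
// SQI_ENUMVALS=null (not an enum input).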
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_INPUT_UPDATE_THROTTLING_FORM_INPUTS, conn,
"IMPORT", "EXPORT");
@@ -712,7 +724,7 @@ private void updateJobRepositorySchemaAndData(Connection conn, long connectorId)
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DRIVER_INDEX, conn,
new Long(0), "throttling");
Long connectionId = createHdfsConnection(conn, connectorId);
Long connectionId = createHdfsConnection(conn, hdfsConnectorId);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION, conn,
"EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION, conn,
@@ -720,20 +732,13 @@ private void updateJobRepositorySchemaAndData(Connection conn, long connectorId)
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION, conn,
new Long(connectionId), "IMPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn,
"fromJobConfig", "table", Direction.FROM.toString());
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES, conn,
Direction.FROM.toString().toLowerCase(), "fromJobConfig", Direction.FROM.toString());
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn,
"toJobConfig", "table", Direction.TO.toString());
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES, conn,
Direction.TO.toString().toLowerCase(), "toJobConfig", Direction.TO.toString());
if (LOG.isTraceEnabled()) {
LOG.trace("Updated existing data for generic connectors.");
}
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_FROM_JOB_INPUT_NAMES, conn,
"fromJobConfig", "fromJobConfig", Direction.FROM.toString());
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_TO_JOB_INPUT_NAMES, conn,
"toJobConfig", "toJobConfig", Direction.TO.toString());
}
/**
* Pre-register HDFS Connector so that config upgrade will work.
* NOTE: This should be used only in the upgrade path
@@ -761,7 +766,7 @@ protected long registerHdfsConnector(Connection conn) {
if (handler.getUniqueName().equals(CONNECTOR_HDFS)) {
try {
PreparedStatement baseConnectorStmt = conn.prepareStatement(
STMT_INSERT_INTO_CONFIGURABLE_WITHOUT_SUPPORTED_DIRECTIONS,
STMT_INSERT_INTO_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS,
Statement.RETURN_GENERATED_KEYS);
baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName());
baseConnectorStmt.setString(2, handler.getConnectorConfigurable().getClassName());
@@ -814,13 +819,13 @@ private Long createHdfsConnection(Connection conn, Long connectorId) {
result = stmt.executeUpdate();
if (result != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0012,
throw new SqoopException(DerbyRepoError.DERBYREPO_0003,
Integer.toString(result));
}
ResultSet rsetConnectionId = stmt.getGeneratedKeys();
if (!rsetConnectionId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
throw new SqoopException(DerbyRepoError.DERBYREPO_0004);
}
if (LOG.isTraceEnabled()) {
@@ -828,6 +833,49 @@ private Long createHdfsConnection(Connection conn, Long connectorId) {
}
return rsetConnectionId.getLong(1);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0005, ex);
} finally {
closeStatements(stmt);
}
}
/**
* We are creating the LINK FORM for HDFS; later in the schema it will
* be renamed to LINK CONFIG.
* NOTE: Should be used only in the upgrade path!
*/
@Deprecated
private Long createHdfsLinkForm(Connection conn, Long connectorId) {
if (LOG.isTraceEnabled()) {
LOG.trace("Creating HDFS link.");
}
PreparedStatement stmt = null;
int result;
try {
short index = 0;
stmt = conn.prepareStatement(STMT_INSERT_INTO_FORM, Statement.RETURN_GENERATED_KEYS);
stmt.setLong(1, connectorId);
stmt.setString(2, "linkConfig");
// it could also be set to the deprecated "CONNECTION"
stmt.setString(3, MConfigType.LINK.name());
stmt.setShort(4, index);
result = stmt.executeUpdate();
if (result != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0012, Integer.toString(result));
}
ResultSet rsetFormId = stmt.getGeneratedKeys();
if (!rsetFormId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
}
if (LOG.isTraceEnabled()) {
LOG.trace("Created HDFS connector link FORM.");
}
return rsetFormId.getLong(1);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0019, ex);
} finally {


@@ -103,7 +103,7 @@ public final class DerbySchemaInsertUpdateDeleteSelectQuery {
+ ") VALUES (?, ?, ?, ?)";
@Deprecated // used only in the upgrade path
public static final String STMT_INSERT_INTO_CONFIGURABLE_WITHOUT_SUPPORTED_DIRECTIONS =
public static final String STMT_INSERT_INTO_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS =
"INSERT INTO " + TABLE_SQ_CONNECTOR+ " ("
+ COLUMN_SQC_NAME + ", "
+ COLUMN_SQC_CLASS + ", "


@@ -119,6 +119,28 @@ public final class DerbySchemaUpgradeQuery {
"RENAME COLUMN " + TABLE_SQ_FORM + "." + COLUMN_SQF_OPERATION
+ " TO " + COLUMN_SQF_DIRECTION;
// DML: Insert into form
public static final String STMT_INSERT_INTO_FORM =
"INSERT INTO " + TABLE_SQ_FORM+ " ("
+ COLUMN_SQF_CONNECTOR + ", "
+ COLUMN_SQF_NAME + ", "
+ COLUMN_SQF_TYPE + ", "
+ COLUMN_SQF_INDEX
+ ") VALUES ( ?, ?, ?, ?)";
// DML: Insert into input with form id
public static final String STMT_INSERT_INTO_INPUT_WITH_FORM =
"INSERT INTO " + TABLE_SQ_INPUT + " ("
+ COLUMN_SQI_NAME + ", "
+ COLUMN_SQI_FORM + ", "
+ COLUMN_SQI_INDEX + ", "
+ COLUMN_SQI_TYPE + ", "
+ COLUMN_SQI_STRMASK + ", "
+ COLUMN_SQI_STRLENGTH + ", "
+ COLUMN_SQI_ENUMVALS
+ ") VALUES (?, ?, ?, ?, ?, ?, ?)";
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION =
"UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION
+ "=? WHERE " + COLUMN_SQF_DIRECTION + "=?"
@@ -129,6 +151,10 @@ public final class DerbySchemaUpgradeQuery {
+ " WHERE " + COLUMN_SQF_CONNECTOR + " IS NULL AND "
+ COLUMN_SQF_NAME + " IN (?, ?)";
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_NAME =
"UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_NAME + "= ?"
+ " WHERE " + COLUMN_SQF_NAME + "= ?";
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_DIRECTION =
"UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION + "= ?"
+ " WHERE " + COLUMN_SQF_NAME + "= ?";
@@ -152,16 +178,20 @@ public final class DerbySchemaUpgradeQuery {
+ " WHERE " + COLUMN_SQF_NAME + "= ?"
+ " AND " + COLUMN_SQF_DIRECTION + "= ?";
/**
* Intended to rename forms based on direction.
* e.g. If SQ_FORM.SQF_NAME = 'table' and parameter 1 = 'from'
* then SQ_FORM.SQF_NAME = 'fromTable'.
*/
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES =
// remove "input" from the prefix of the name for hdfs configs
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_FROM_JOB_INPUT_NAMES =
"UPDATE " + TABLE_SQ_INPUT + " SET "
+ COLUMN_SQI_NAME + "=("
+ "? || UPPER(SUBSTR(" + COLUMN_SQI_NAME + ",1,1))"
+ " || SUBSTR(" + COLUMN_SQI_NAME + ",2) )"
+ "? || SUBSTR(" + COLUMN_SQI_NAME + ", 6) )"
+ " WHERE " + COLUMN_SQI_FORM + " IN ("
+ " SELECT " + COLUMN_SQF_ID + " FROM " + TABLE_SQ_FORM + " WHERE " + COLUMN_SQF_NAME + "= ?"
+ " AND " + COLUMN_SQF_DIRECTION + "= ?)";
// remove the "output" prefix from the names of the hdfs config inputs
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_TO_JOB_INPUT_NAMES =
"UPDATE " + TABLE_SQ_INPUT + " SET "
+ COLUMN_SQI_NAME + "=("
+ "? || SUBSTR(" + COLUMN_SQI_NAME + ", 7) )"
+ " WHERE " + COLUMN_SQI_FORM + " IN ("
+ " SELECT " + COLUMN_SQF_ID + " FROM " + TABLE_SQ_FORM + " WHERE " + COLUMN_SQF_NAME + "= ?"
+ " AND " + COLUMN_SQF_DIRECTION + "= ?)";