mirror of
https://github.com/apache/sqoop.git
synced 2025-05-06 11:09:26 +08:00
SQOOP-1619: Sqoop2: Enforce Unique constraint for name on configurable table
(Veena Basavaraj via Abraham Elmahrek)
This commit is contained in:
parent
24e9b4c5b8
commit
d8e9ebc45c
@ -437,13 +437,13 @@ public void createOrUpgradeRepository(Connection conn) {
|
||||
renameEntitiesForConnectionAndForm(conn);
|
||||
// Change direction from VARCHAR to BIGINT + foreign key.
|
||||
updateDirections(conn, insertDirections(conn));
|
||||
|
||||
renameConnectorToConfigurable(conn);
|
||||
}
|
||||
// Add unique constraints on job and links for version 4 onwards
|
||||
if (repositoryVersion > 3) {
|
||||
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME, conn);
|
||||
runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME, conn);
|
||||
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_UNIQUE_CONSTRAINT_NAME, conn);
|
||||
}
|
||||
// last step upgrade the repository version to the latest value in the code
|
||||
upgradeRepositoryVersion(conn);
|
||||
|
@ -68,6 +68,9 @@ public final class DerbySchemaConstants {
|
||||
|
||||
public static final String COLUMN_SQC_TYPE = "SQC_TYPE";
|
||||
|
||||
public static final String CONSTRAINT_SQ_CONFIGURABLE_UNIQUE_NAME = CONSTRAINT_PREFIX + "SQC_NAME_UNIQUE";
|
||||
public static final String CONSTRAINT_SQ_CONFIGURABLE_UNIQUE = SCHEMA_PREFIX + CONSTRAINT_SQ_CONFIGURABLE_UNIQUE_NAME;
|
||||
|
||||
// SQ_CONNECTOR_DIRECTIONS
|
||||
|
||||
public static final String TABLE_SQ_CONNECTOR_DIRECTIONS_NAME = "SQ_CONNECTOR_DIRECTIONS";
|
||||
|
@ -1423,6 +1423,11 @@ public final class DerbySchemaQuery {
|
||||
STMT_SELECT_SQ_CONFIG_DIRECTIONS_ALL + " WHERE "
|
||||
+ COLUMN_SQ_CFG_DIR_CONFIG + " = ?";
|
||||
|
||||
//add unique constraint on the configurable table
|
||||
public static final String QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_UNIQUE_CONSTRAINT_NAME = "ALTER TABLE "
|
||||
+ TABLE_SQ_CONFIGURABLE + " ADD CONSTRAINT " + CONSTRAINT_SQ_CONFIGURABLE_UNIQUE + " UNIQUE ("
|
||||
+ COLUMN_SQC_NAME + ")";
|
||||
|
||||
private DerbySchemaQuery() {
|
||||
// Disable explicit object creation
|
||||
}
|
||||
|
@ -219,8 +219,9 @@ protected void createOrUpgradeSchema(int version) throws Exception {
|
||||
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO);
|
||||
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE);
|
||||
renameEntitiesForConnectionAndForm();
|
||||
// add the name constraints
|
||||
// add the name constraint for job
|
||||
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME);
|
||||
// add the name constraint for link
|
||||
runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME);
|
||||
|
||||
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR);
|
||||
@ -231,6 +232,8 @@ protected void createOrUpgradeSchema(int version) throws Exception {
|
||||
runQuery(STMT_INSERT_DIRECTION, direction.toString());
|
||||
}
|
||||
renameConnectorToConfigurable();
|
||||
// add the name constraint for configurable
|
||||
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_UNIQUE_CONSTRAINT_NAME);
|
||||
}
|
||||
|
||||
// deprecated repository version
|
||||
@ -625,49 +628,57 @@ public void loadJobs(int version) throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* testing job with non unique name objects into repository.
|
||||
*
|
||||
* @param version
|
||||
* system version 4
|
||||
* @throws Exception
|
||||
*/
|
||||
public void loadNonUniqueJobsInVersion4() throws Exception {
|
||||
int index = 0;
|
||||
// insert JAs
|
||||
for (String name : new String[] { "JA", "JA", "JA", "JA" }) {
|
||||
runQuery("INSERT INTO SQOOP.SQ_JOB(SQB_NAME, SQB_FROM_LINK, SQB_TO_LINK)" + " VALUES('"
|
||||
+ name + index + "', 1, 1)");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* testing link with non unique name objects into repository.
|
||||
*
|
||||
* @param version
|
||||
* system version 4
|
||||
* @throws Exception
|
||||
*/
|
||||
public void loadNonUniqueLinksInVersion4() throws Exception {
|
||||
|
||||
// Insert two links - CA and CA
|
||||
runInsertQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) " + "VALUES('CA', 1)");
|
||||
runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) " + "VALUES('CA', 1)");
|
||||
}
|
||||
|
||||
/**
|
||||
* testing configurable with non unique name objects into repository.
|
||||
*
|
||||
* @param version
|
||||
* system version 4
|
||||
* @throws Exception
|
||||
*/
|
||||
public void loadNonUniqueConfigurablesInVersion4() throws Exception {
|
||||
|
||||
// Insert two configurable - CB and CB
|
||||
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)"
|
||||
+ "VALUES('CB', 'org.apache.sqoop.test.B', '1.0-test', 'CONNECTOR')");
|
||||
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)"
|
||||
+ "VALUES('CB', 'org.apache.sqoop.test.B', '1.0-test', 'CONNECTOR')");
|
||||
|
||||
}
|
||||
|
||||
public void loadJobsForLatestVersion() throws Exception {
|
||||
loadJobs(DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION);
|
||||
}
|
||||
|
||||
protected void removeDuplicateLinkNames(int version) throws Exception {
|
||||
switch (version) {
|
||||
case 2:
|
||||
// nothing to do
|
||||
break;
|
||||
case 4:
|
||||
Map<String, List<Long>> nameIdMap = getNameToIdListMap(getDerbyDatabaseConnection()
|
||||
.prepareStatement("SELECT SQ_LNK_NAME, SQ_LNK_ID FROM SQOOP.SQ_LINK"));
|
||||
for (String name : nameIdMap.keySet()) {
|
||||
if (nameIdMap.get(name).size() > 1) {
|
||||
for (Long id : nameIdMap.get(name)) {
|
||||
runQuery("UPDATE SQOOP.SQ_LINK SET SQ_LNK_NAME=? WHERE SQ_LNK_ID=?", name + "-" + id,
|
||||
id);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
protected void removeDuplicateJobNames(int version) throws Exception {
|
||||
switch (version) {
|
||||
case 2:
|
||||
// nothing to do
|
||||
break;
|
||||
case 4:
|
||||
Map<String, List<Long>> nameIdMap = getNameToIdListMap(getDerbyDatabaseConnection()
|
||||
.prepareStatement("SELECT SQB_NAME, SQB_ID FROM SQOOP.SQ_JOB"));
|
||||
|
||||
for (String name : nameIdMap.keySet()) {
|
||||
if (nameIdMap.get(name).size() > 1) {
|
||||
for (Long id : nameIdMap.get(name)) {
|
||||
runQuery("UPDATE SQOOP.SQ_JOB SET SQB_NAME=? WHERE SQB_ID=?", name + "-" + id, id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a second connector for testing with multiple connectors
|
||||
*/
|
||||
|
@ -21,8 +21,8 @@
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLIntegrityConstraintViolationException;
|
||||
|
||||
import org.apache.sqoop.common.SqoopException;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -50,42 +50,34 @@ public void testCreateorUpdateRepositorySchema() throws Exception {
|
||||
assertTrue(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
|
||||
}
|
||||
|
||||
// TODO(VB): This should really test for a specific SQL exception that violates the constraints
|
||||
@Test(expected=SqoopException.class)
|
||||
public void testUpgradeVersion4WithLinkNameAndJobNameDuplicateFailure() throws Exception {
|
||||
@Test(expected=SQLIntegrityConstraintViolationException.class)
|
||||
public void testUpgradeVersion4WithNonUniqueJobNameFailure() throws Exception {
|
||||
super.createOrUpgradeSchema(4);
|
||||
super.loadConnectorAndDriverConfig(4);
|
||||
super.loadConnectionsOrLinks(4);
|
||||
super.loadJobs(4);
|
||||
// no removing of dupes for job name and link names, hence there should be a exception due to the unique name constraint
|
||||
handler.createOrUpgradeRepository(getDerbyDatabaseConnection());
|
||||
assertTrue(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
|
||||
// try loading duplicate job names in version 4 and it should throw an exception
|
||||
super.loadNonUniqueJobsInVersion4();
|
||||
}
|
||||
// TODO: VB: follow up with the constraint code, which really does not test with examples that has
|
||||
// duplicate names, the id list is always of size 1
|
||||
//@Test
|
||||
public void testUpgradeVersion4WithLinkNameAndJobNameWithNoDuplication() throws Exception {
|
||||
@Test(expected=SQLIntegrityConstraintViolationException.class)
|
||||
public void testUpgradeVersion4WithNonUniqueLinkNamesAdded() throws Exception {
|
||||
super.createOrUpgradeSchema(4);
|
||||
super.loadConnectorAndDriverConfig(4);
|
||||
super.loadConnectionsOrLinks(4);
|
||||
super.loadJobs(4);
|
||||
super.removeDuplicateLinkNames(4);
|
||||
super.removeDuplicateJobNames(4);
|
||||
// removing duplicate job name and link name, hence there should be no exception with unique name constraint
|
||||
handler.createOrUpgradeRepository(getDerbyDatabaseConnection());
|
||||
assertTrue(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
|
||||
// try loading duplicate link names in version 4 and it should throw an exception
|
||||
super.loadNonUniqueLinksInVersion4();
|
||||
}
|
||||
|
||||
@Test(expected=SQLIntegrityConstraintViolationException.class)
|
||||
public void testUpgradeVersion4WithNonUniqueConfigurableNamesAdded() throws Exception {
|
||||
super.createOrUpgradeSchema(4);
|
||||
// try loading duplicate configurable names in version 4 and it should throw an exception
|
||||
super.loadNonUniqueConfigurablesInVersion4();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpgradeRepoVersion2ToVersion4() throws Exception {
|
||||
// in case of version 2 schema there is no unique job/ link constraint
|
||||
super.createOrUpgradeSchema(2);
|
||||
assertFalse(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
|
||||
loadConnectorAndDriverConfig(2);
|
||||
super.loadConnectionsOrLinks(2);
|
||||
super.loadJobs(2);
|
||||
super.removeDuplicateLinkNames(2);
|
||||
super.removeDuplicateJobNames(2);
|
||||
// in case of version 2 schema there is no unique job/ link constraint
|
||||
handler.createOrUpgradeRepository(getDerbyDatabaseConnection());
|
||||
assertTrue(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user