5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-07 23:20:42 +08:00

SQOOP-1629: Sqoop2: Add unique constraint on the Config table for name and type

(Veena Basavaraj via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2014-10-28 17:54:47 -07:00
parent a203e77264
commit 31f30cc279
7 changed files with 391 additions and 302 deletions

View File

@ -444,6 +444,7 @@ public void createOrUpgradeRepository(Connection conn) {
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME, conn); runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME, conn); runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_UNIQUE_CONSTRAINT_NAME, conn); runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_UNIQUE_CONSTRAINT_NAME, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_ADD_UNIQUE_CONSTRAINT_NAME_TYPE_AND_CONFIGURABLE_ID, conn);
} }
// last step upgrade the repository version to the latest value in the code // last step upgrade the repository version to the latest value in the code
@ -2743,7 +2744,9 @@ public void loadConnectorConfigTypes(List<MConfig> linkConfig, List<MConfig> fro
+ "; config: " + config + "; config: " + config
+ "; index: " + configIndex + "; index: " + configIndex
+ "; expected: " + jobConfigs.size() + "; expected: " + jobConfigs.size()
); + "; configIndex: " + configIndex
);
} }
jobConfigs.add(config); jobConfigs.add(config);

View File

@ -140,6 +140,10 @@ public final class DerbySchemaConstants {
public static final String CONSTRAINT_SQ_CFG_SQC_NAME = CONSTRAINT_PREFIX + "SQ_CFG_SQC"; public static final String CONSTRAINT_SQ_CFG_SQC_NAME = CONSTRAINT_PREFIX + "SQ_CFG_SQC";
public static final String CONSTRAINT_SQ_CFG_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_SQC_NAME; public static final String CONSTRAINT_SQ_CFG_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_SQC_NAME;
// unique name constraint
public static final String CONSTRAINT_SQ_CONFIG_UNIQUE_NAME_TYPE_CONFIGURABLE = CONSTRAINT_PREFIX + "SQ_CFG_NAME_TYPE_CONFIGURABLE_UNIQUE";
public static final String CONSTRAINT_SQ_CONFIG_UNIQUE = SCHEMA_PREFIX + CONSTRAINT_SQ_CONFIG_UNIQUE_NAME_TYPE_CONFIGURABLE;
// SQ_CONFIG_DIRECTIONS // SQ_CONFIG_DIRECTIONS
public static final String TABLE_SQ_CONFIG_DIRECTIONS_NAME = "SQ_CONFIG_DIRECTIONS"; public static final String TABLE_SQ_CONFIG_DIRECTIONS_NAME = "SQ_CONFIG_DIRECTIONS";

View File

@ -421,13 +421,21 @@ public final class DerbySchemaUpgradeQuery {
public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR = "ALTER TABLE " public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR = "ALTER TABLE "
+ TABLE_SQ_CONFIG + " DROP COLUMN " + COLUMN_SQ_CFG_DIRECTION; + TABLE_SQ_CONFIG + " DROP COLUMN " + COLUMN_SQ_CFG_DIRECTION;
// add unique constraint on the configurable table // add unique constraint on the configurable table for name
public static final String QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_UNIQUE_CONSTRAINT_NAME = "ALTER TABLE " public static final String QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_UNIQUE_CONSTRAINT_NAME = "ALTER TABLE "
+ TABLE_SQ_CONFIGURABLE + TABLE_SQ_CONFIGURABLE
+ " ADD CONSTRAINT " + " ADD CONSTRAINT "
+ CONSTRAINT_SQ_CONFIGURABLE_UNIQUE + CONSTRAINT_SQ_CONFIGURABLE_UNIQUE
+ " UNIQUE (" + COLUMN_SQC_NAME + ")"; + " UNIQUE (" + COLUMN_SQC_NAME + ")";
// add unique constraint on the config table for name and type and configurableId
public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_ADD_UNIQUE_CONSTRAINT_NAME_TYPE_AND_CONFIGURABLE_ID = "ALTER TABLE "
+ TABLE_SQ_CONFIG
+ " ADD CONSTRAINT "
+ CONSTRAINT_SQ_CONFIG_UNIQUE
+ " UNIQUE ("
+ COLUMN_SQ_CFG_NAME + ", " + COLUMN_SQ_CFG_TYPE + ", " + COLUMN_SQ_CFG_CONFIGURABLE + ")";
private DerbySchemaUpgradeQuery() { private DerbySchemaUpgradeQuery() {
// Disable explicit object creation // Disable explicit object creation
} }

View File

@ -57,11 +57,9 @@
*/ */
abstract public class DerbyTestCase { abstract public class DerbyTestCase {
public static final String DERBY_DRIVER = public static final String DERBY_DRIVER = "org.apache.derby.jdbc.EmbeddedDriver";
"org.apache.derby.jdbc.EmbeddedDriver";
public static final String JDBC_URL = public static final String JDBC_URL = "jdbc:derby:memory:myDB";
"jdbc:derby:memory:myDB";
private Connection connection; private Connection connection;
@ -75,7 +73,7 @@ public void setUp() throws Exception {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
// Close active link // Close active link
if(connection != null) { if (connection != null) {
connection.close(); connection.close();
} }
@ -87,30 +85,6 @@ public void tearDown() throws Exception {
} }
} }
private Map<String, List<Long>> getNameToIdListMap(PreparedStatement ps) throws SQLException {
Map<String, List<Long>> nameToIdListMap = new TreeMap<String, List<Long>>();
ResultSet rs = null;
try {
rs = ps.executeQuery();
while (rs.next()) {
if (!nameToIdListMap.containsKey(rs.getString(1))) {
nameToIdListMap.put(rs.getString(1), new LinkedList<Long>());
}
nameToIdListMap.get(rs.getString(1)).add(rs.getLong(2));
}
} finally {
if (rs != null) {
rs.close();
}
if (ps != null) {
ps.close();
}
}
return nameToIdListMap;
}
void renameEntitiesForConnectionAndForm() throws Exception { void renameEntitiesForConnectionAndForm() throws Exception {
// SQ_LINK schema upgrades // SQ_LINK schema upgrades
// drop the constraint before rename and add it back later // drop the constraint before rename and add it back later
@ -237,12 +211,14 @@ protected void createOrUpgradeSchema(int version) throws Exception {
renameConnectorToConfigurable(); renameConnectorToConfigurable();
// add the name constraint for configurable // add the name constraint for configurable
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_UNIQUE_CONSTRAINT_NAME); runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_UNIQUE_CONSTRAINT_NAME);
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_ADD_UNIQUE_CONSTRAINT_NAME_TYPE_AND_CONFIGURABLE_ID);
} }
// deprecated repository version // deprecated repository version
runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('version', '" + version + "')"); runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('version', '" + version + "')");
// new repository version // new repository version
runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('repository.version', '" + version + "')"); runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('repository.version', '"
+ version + "')");
} }
@ -252,7 +228,6 @@ protected void createOrUpgradeSchemaForLatestVersion() throws Exception {
/** /**
* Run arbitrary query on derby memory repository. * Run arbitrary query on derby memory repository.
*
* @param query Query to execute * @param query Query to execute
* @throws Exception * @throws Exception
*/ */
@ -263,9 +238,9 @@ protected void runQuery(String query, Object... args) throws Exception {
for (int i = 0; i < args.length; ++i) { for (int i = 0; i < args.length; ++i) {
if (args[i] instanceof String) { if (args[i] instanceof String) {
stmt.setString(i + 1, (String)args[i]); stmt.setString(i + 1, (String) args[i]);
} else if (args[i] instanceof Long) { } else if (args[i] instanceof Long) {
stmt.setLong(i + 1, (Long)args[i]); stmt.setLong(i + 1, (Long) args[i]);
} else { } else {
stmt.setString(i + 1, args[i].toString()); stmt.setString(i + 1, args[i].toString());
} }
@ -289,13 +264,14 @@ protected void runQuery(String query, Object... args) throws Exception {
protected Long runInsertQuery(String query, Object... args) throws Exception { protected Long runInsertQuery(String query, Object... args) throws Exception {
PreparedStatement stmt = null; PreparedStatement stmt = null;
try { try {
stmt = getDerbyDatabaseConnection().prepareStatement(query, PreparedStatement.RETURN_GENERATED_KEYS); stmt = getDerbyDatabaseConnection().prepareStatement(query,
PreparedStatement.RETURN_GENERATED_KEYS);
for (int i = 0; i < args.length; ++i) { for (int i = 0; i < args.length; ++i) {
if (args[i] instanceof String) { if (args[i] instanceof String) {
stmt.setString(i + 1, (String)args[i]); stmt.setString(i + 1, (String) args[i]);
} else if (args[i] instanceof Long) { } else if (args[i] instanceof Long) {
stmt.setLong(i + 1, (Long)args[i]); stmt.setLong(i + 1, (Long) args[i]);
} else { } else {
stmt.setString(i + 1, args[i].toString()); stmt.setString(i + 1, args[i].toString());
} }
@ -337,67 +313,57 @@ protected void loadConnectorAndDriverConfigVersion2() throws Exception {
+ "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test')"); + "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test')");
String connector = "1"; String connector = "1";
int index = 0;
// Connector config entries // Connector config entries
for(String operation : new String[] {"null", "'IMPORT'", "'EXPORT'"}) { for (String operation : new String[] { "null", "'IMPORT'", "'EXPORT'" }) {
String type; String type;
if(operation.equals("null")) { if (operation.equals("null")) {
type = "LINK"; type = "LINK";
} else { } else {
type = "JOB"; type = "JOB";
} }
String configName = "C1" + type + index;
runQuery("INSERT INTO SQOOP.SQ_FORM" runQuery("INSERT INTO SQOOP.SQ_FORM"
+ "(SQF_CONNECTOR, SQF_OPERATION, SQF_NAME, SQF_TYPE, SQF_INDEX) " + "(SQF_CONNECTOR, SQF_OPERATION, SQF_NAME, SQF_TYPE, SQF_INDEX) " + "VALUES("
+ "VALUES(" + connector + ", " + operation + ",'" + configName + "'," + "'" + type + "'," + " 0)");
+ connector + ", " configName = "C2" + type + index;
+ operation
+ ", 'C1', '"
+ type
+ "', 0)");
runQuery("INSERT INTO SQOOP.SQ_FORM" runQuery("INSERT INTO SQOOP.SQ_FORM"
+ "(SQF_CONNECTOR, SQF_OPERATION, SQF_NAME, SQF_TYPE, SQF_INDEX) " + "(SQF_CONNECTOR, SQF_OPERATION, SQF_NAME, SQF_TYPE, SQF_INDEX) " + "VALUES("
+ "VALUES(" + connector + ", " + operation + ",'" + configName + "'," + "'" + type + "'," + " 1)");
+ connector + ", " index++;
+ operation
+ ", 'C2', '"
+ type
+ "', 1)");
} }
// Driver config entries // Driver config entries
runQuery("INSERT INTO SQOOP.SQ_FORM" runQuery("INSERT INTO SQOOP.SQ_FORM"
+ "(SQF_CONNECTOR, SQF_OPERATION, SQF_NAME, SQF_TYPE, SQF_INDEX) VALUES" + "(SQF_CONNECTOR, SQF_OPERATION, SQF_NAME, SQF_TYPE, SQF_INDEX) VALUES"
+ "(NULL, 'IMPORT', 'output', 'JOB', 0)," + "(NULL, 'IMPORT', 'output', 'JOB', 0)," + "(NULL, 'IMPORT', 'throttling', 'JOB', 1),"
+ "(NULL, 'IMPORT', 'throttling', 'JOB', 1)," + "(NULL, 'EXPORT', 'input', 'JOB', 0)," + "(NULL, 'EXPORT', 'throttling', 'JOB', 1),"
+ "(NULL, 'EXPORT', 'input', 'JOB', 0),"
+ "(NULL, 'EXPORT', 'throttling', 'JOB', 1),"
+ "(NULL, NULL, 'security', 'LINK', 0)"); + "(NULL, NULL, 'security', 'LINK', 0)");
// Connector input entries // Connector input entries
for(int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
// First config // First config
runQuery("INSERT INTO SQOOP.SQ_INPUT" runQuery("INSERT INTO SQOOP.SQ_INPUT"
+"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)" + "(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I1', " + (i * 2 + 1) + ", 0, 'STRING', false, 30)"); + " VALUES('I1', " + (i * 2 + 1) + ", 0, 'STRING', false, 30)");
runQuery("INSERT INTO SQOOP.SQ_INPUT" runQuery("INSERT INTO SQOOP.SQ_INPUT"
+"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)" + "(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I2', " + (i * 2 + 1) + ", 1, 'MAP', false, 30)"); + " VALUES('I2', " + (i * 2 + 1) + ", 1, 'MAP', false, 30)");
// Second config // Second config
runQuery("INSERT INTO SQOOP.SQ_INPUT" runQuery("INSERT INTO SQOOP.SQ_INPUT"
+"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)" + "(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I3', " + (i * 2 + 2) + ", 0, 'STRING', false, 30)"); + " VALUES('I3', " + (i * 2 + 2) + ", 0, 'STRING', false, 30)");
runQuery("INSERT INTO SQOOP.SQ_INPUT" runQuery("INSERT INTO SQOOP.SQ_INPUT"
+"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)" + "(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I4', " + (i * 2 + 2) + ", 1, 'MAP', false, 30)"); + " VALUES('I4', " + (i * 2 + 2) + ", 1, 'MAP', false, 30)");
} }
// Driver input entries. // Driver input entries.
runQuery("INSERT INTO SQOOP.SQ_INPUT (SQI_NAME, SQI_FORM, SQI_INDEX," runQuery("INSERT INTO SQOOP.SQ_INPUT (SQI_NAME, SQI_FORM, SQI_INDEX,"
+ " SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH, SQI_ENUMVALS)" + " SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH, SQI_ENUMVALS)"
+" VALUES ('security.maxConnections',11,0,'INTEGER','false',NULL,NULL)," + " VALUES ('security.maxConnections',11,0,'INTEGER','false',NULL,NULL),"
+ "('input.inputDirectory',9,0,'STRING','false',255,NULL)," + "('input.inputDirectory',9,0,'STRING','false',255,NULL),"
+ "('throttling.extractors',8,0,'INTEGER','false',NULL,NULL)," + "('throttling.extractors',8,0,'INTEGER','false',NULL,NULL),"
+ "('throttling.loaders',8,1,'INTEGER','false',NULL,NULL)," + "('throttling.loaders',8,1,'INTEGER','false',NULL,NULL),"
@ -409,20 +375,92 @@ protected void loadConnectorAndDriverConfigVersion2() throws Exception {
+ "('throttling.loaders',10,1,'INTEGER','false',NULL,NULL)"); + "('throttling.loaders',10,1,'INTEGER','false',NULL,NULL)");
} }
protected void addConnectorA() throws Exception {
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)"
+ "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test', 'CONNECTOR')");
// insert 2 directions for connector A
runInsertQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
+ "VALUES(1, 1)");// FROM
runInsertQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
+ "VALUES(1, 2)");// TO
// insert the 2 link configs
// NOTE: link config has no direction
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(1, 'l1', 'LINK', 0)");// SQ_CFG_INDEX holds the ordering of
// the config for each type
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(1, 'l2', 'LINK', 1)");
// insert 2 FROM JOB configs
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(1, 'from1', 'JOB', 0)");
// insert the config direction for the from1-config
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS"
+ "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) " + "VALUES(3, 1)");
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(1, 'from2', 'JOB', 1)");
// insert the config direction for the from2-config
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS"
+ "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) " + "VALUES(4, 1)");
// insert 2 TO JOB configs
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(1, 'to1', 'JOB', 0)");
// insert the config direction for the to1-config
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS"
+ "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) " + "VALUES(5, 2)");
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(1, 'to2', 'JOB', 1)");
// insert the config direction for the to2-config
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS"
+ "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) " + "VALUES(6, 2)");
for (int i = 0; i < 3; i++) {
// First config
runInsertQuery("INSERT INTO SQOOP.SQ_INPUT"
+ "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I1', " + (i * 2 + 1) + ", 0, 'STRING', false, 30)");
runInsertQuery("INSERT INTO SQOOP.SQ_INPUT"
+ "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I2', " + (i * 2 + 1) + ", 1, 'MAP', false, 30)");
// Second config
runInsertQuery("INSERT INTO SQOOP.SQ_INPUT"
+ "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I3', " + (i * 2 + 2) + ", 0, 'STRING', false, 30)");
runInsertQuery("INSERT INTO SQOOP.SQ_INPUT"
+ "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I4', " + (i * 2 + 2) + ", 1, 'MAP', false, 30)");
}
}
protected void loadConnectorAndDriverConfigVersion4() throws Exception { protected void loadConnectorAndDriverConfigVersion4() throws Exception {
Long configId; Long configId;
runQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)" runInsertQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)"
+ "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test', 'CONNECTOR')"); + "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test', 'CONNECTOR')");
for (String connector : new String[] { "1" }) { for (String connector : new String[] { "1" }) {
// Directions // Directions
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)" runInsertQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
+ "VALUES(" + connector + ", 1)"); + "VALUES(" + connector + ", 1)");
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)" runInsertQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
+ "VALUES(" + connector + ", 2)"); + "VALUES(" + connector + ", 2)");
// connector configs with connectorId as 1 // connector configs with connectorId as 1
for (String direction : new String[]{null, "1", "2"}) { // all config names have to be unique per type
int index = 0;
for (String direction : new String[] { null, "1", "2" }) {
String type; String type;
if (direction == null) { if (direction == null) {
@ -430,41 +468,42 @@ protected void loadConnectorAndDriverConfigVersion4() throws Exception {
} else { } else {
type = "JOB"; type = "JOB";
} }
String configName = "C1" + type + index;
configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG" configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " + "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " + "VALUES("
+ "VALUES(" + connector + ", 'C1', '" + type + "', 0)"); + connector + ", '" + configName + "', '" + type + "', 0)");
if (direction != null) { if (direction != null) {
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS" runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS"
+ "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) " + "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) " + "VALUES(" + configId + ", "
+ "VALUES(" + configId + ", " + direction + ")"); + direction + ")");
} }
configName = "C2" + type + index;
configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG" configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " + "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " + "VALUES("
+ "VALUES(" + connector + ", 'C2', '" + type + "', 1)"); + connector + ", '" + configName + "', '" + type + "', 1)");
if (direction != null) { if (direction != null) {
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS" runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS"
+ "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) " + "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) " + "VALUES(" + configId + ", "
+ "VALUES(" + configId + ", " + direction + ")"); + direction + ")");
} }
index++;
} }
} }
// insert a driver // insert a driver
runQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)" runInsertQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)"
+ "VALUES('SqoopDriver', 'org.apache.sqoop.driver.Driver', '1.0-test', 'DRIVER')"); + "VALUES('SqoopDriver', 'org.apache.sqoop.driver.Driver', '1.0-test', 'DRIVER')");
// driver config with driverId as 2 // driver config with driverId as 2
for (String type : new String[]{"JOB"}) { for (String type : new String[] { "JOB" }) {
runQuery("INSERT INTO SQOOP.SQ_CONFIG" runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " + "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " + "VALUES(2"
+ "VALUES(2" + ", 'C1', '" + type + "', 0)"); + ", 'd1', '" + type + "', 0)");
runQuery("INSERT INTO SQOOP.SQ_CONFIG" runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " + "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) " + "VALUES(2"
+ "VALUES(2" + ", 'C2', '" + type + "', 1)"); + ", 'd2', '" + type + "', 1)");
} }
// Input entries // Input entries
@ -473,44 +512,43 @@ protected void loadConnectorAndDriverConfigVersion4() throws Exception {
// Connector job (TO) config: 8-11 // Connector job (TO) config: 8-11
// Driver JOB config: 12-15 // Driver JOB config: 12-15
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
// First config // First config
runQuery("INSERT INTO SQOOP.SQ_INPUT" runInsertQuery("INSERT INTO SQOOP.SQ_INPUT"
+ "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)" + "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I1', " + (i * 2 + 1) + ", 0, 'STRING', false, 30)"); + " VALUES('I1', " + (i * 2 + 1) + ", 0, 'STRING', false, 30)");
runQuery("INSERT INTO SQOOP.SQ_INPUT" runInsertQuery("INSERT INTO SQOOP.SQ_INPUT"
+ "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)" + "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I2', " + (i * 2 + 1) + ", 1, 'MAP', false, 30)"); + " VALUES('I2', " + (i * 2 + 1) + ", 1, 'MAP', false, 30)");
// Second config // Second config
runQuery("INSERT INTO SQOOP.SQ_INPUT" runInsertQuery("INSERT INTO SQOOP.SQ_INPUT"
+ "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)" + "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I3', " + (i * 2 + 2) + ", 0, 'STRING', false, 30)"); + " VALUES('I3', " + (i * 2 + 2) + ", 0, 'STRING', false, 30)");
runQuery("INSERT INTO SQOOP.SQ_INPUT" runInsertQuery("INSERT INTO SQOOP.SQ_INPUT"
+ "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)" + "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I4', " + (i * 2 + 2) + ", 1, 'MAP', false, 30)"); + " VALUES('I4', " + (i * 2 + 2) + ", 1, 'MAP', false, 30)");
} }
} }
/** /**
* Load testing connector and driver config into repository. * Load testing connector and driver config into repository.
* *
* @param version system version (2 or 4) * @param version
* system version (2 or 4)
* @throws Exception * @throws Exception
*/ */
protected void loadConnectorAndDriverConfig(int version) throws Exception { protected void loadConnectorAndDriverConfig(int version) throws Exception {
switch(version) { switch (version) {
case 2: case 2:
loadConnectorAndDriverConfigVersion2(); loadConnectorAndDriverConfigVersion2();
break; break;
case 4: case 4:
loadConnectorAndDriverConfigVersion4(); loadConnectorAndDriverConfigVersion4();
break; break;
default: default:
throw new AssertionError("Invalid connector and framework version: " + version); throw new AssertionError("Invalid connector and framework version: " + version);
} }
} }
@ -519,8 +557,7 @@ protected void loadConnectorAndDriverConfig() throws Exception {
} }
/** /**
* Load testing link objects into repository. * Load testing link objects into repository.
*
* @param version system version (2 or 4) * @param version system version (2 or 4)
* @throws Exception * @throws Exception
*/ */
@ -528,38 +565,34 @@ public void loadConnectionsOrLinks(int version) throws Exception {
switch (version) { switch (version) {
case 2: case 2:
// Insert two connections - CA and CB // Insert two connections - CA and CB
runQuery("INSERT INTO SQOOP.SQ_CONNECTION(SQN_NAME, SQN_CONNECTOR) " runQuery("INSERT INTO SQOOP.SQ_CONNECTION(SQN_NAME, SQN_CONNECTOR) " + "VALUES('CA', 1)");
+ "VALUES('CA', 1)"); runQuery("INSERT INTO SQOOP.SQ_CONNECTION(SQN_NAME, SQN_CONNECTOR) " + "VALUES('CB', 1)");
runQuery("INSERT INTO SQOOP.SQ_CONNECTION(SQN_NAME, SQN_CONNECTOR) "
+ "VALUES('CB', 1)");
for(String ci : new String[] {"1", "2"}) { for (String ci : new String[] { "1", "2" }) {
for(String i : new String[] {"1", "3", "13", "15"}) { for (String i : new String[] { "1", "3", "13", "15" }) {
runQuery("INSERT INTO SQOOP.SQ_CONNECTION_INPUT" runQuery("INSERT INTO SQOOP.SQ_CONNECTION_INPUT"
+ "(SQNI_CONNECTION, SQNI_INPUT, SQNI_VALUE) " + "(SQNI_CONNECTION, SQNI_INPUT, SQNI_VALUE) " + "VALUES(" + ci + ", " + i
+ "VALUES(" + ci + ", " + i + ", 'Value" + i + "')"); + ", 'Value" + i + "')");
} }
} }
break; break;
case 4: case 4:
// Insert two links - CA and CB // Insert two links - CA and CB
runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) " runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) " + "VALUES('CA', 1)");
+ "VALUES('CA', 1)"); runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) " + "VALUES('CB', 1)");
runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) "
+ "VALUES('CB', 1)");
for (String ci : new String[]{"1", "2"}) { for (String ci : new String[] { "1", "2" }) {
for (String i : new String[]{"1", "3", "13", "15"}) { for (String i : new String[] { "1", "3", "13", "15" }) {
runQuery("INSERT INTO SQOOP.SQ_LINK_INPUT" runQuery("INSERT INTO SQOOP.SQ_LINK_INPUT"
+ "(SQ_LNKI_LINK, SQ_LNKI_INPUT, SQ_LNKI_VALUE) " + "(SQ_LNKI_LINK, SQ_LNKI_INPUT, SQ_LNKI_VALUE) " + "VALUES(" + ci + ", " + i
+ "VALUES(" + ci + ", " + i + ", 'Value" + i + "')"); + ", 'Value" + i + "')");
}
} }
break; }
break;
default: default:
throw new AssertionError("Invalid connector and framework version: " + version); throw new AssertionError("Invalid connector and framework version: " + version);
} }
} }
@ -568,74 +601,65 @@ public void loadLinksForLatestVersion() throws Exception {
} }
/** /**
* Load testing job objects into repository. * Load testing job objects into repository.
*
* @param version system version (2 or 4) * @param version system version (2 or 4)
* @throws Exception * @throws Exception
*/ */
public void loadJobs(int version) throws Exception { public void loadJobs(int version) throws Exception {
int index = 0; int index = 0;
switch (version) { switch (version) {
case 2: case 2:
for(String type : new String[] {"IMPORT", "EXPORT"}) { for (String type : new String[] { "IMPORT", "EXPORT" }) {
for(String name : new String[] {"JA", "JB"} ) { for (String name : new String[] { "JA", "JB" }) {
runQuery("INSERT INTO SQOOP.SQ_JOB(SQB_NAME, SQB_CONNECTION, SQB_TYPE)" runQuery("INSERT INTO SQOOP.SQ_JOB(SQB_NAME, SQB_CONNECTION, SQB_TYPE)" + " VALUES('"
+ " VALUES('" + type + "_" + name + "', 1, '" + type + "')"); + type + "_" + name + "', 1, '" + type + "')");
} }
}
// Import inputs
for (String ci : new String[] { "1", "2" }) {
for (String i : new String[] { "5", "7", "17", "19" }) {
runQuery("INSERT INTO SQOOP.SQ_JOB_INPUT" + "(SQBI_JOB, SQBI_INPUT, SQBI_VALUE) "
+ "VALUES(" + ci + ", " + i + ", 'Value" + i + "')");
}
}
// Export inputs
for (String ci : new String[] { "3", "4" }) {
for (String i : new String[] { "9", "11" }) {
runQuery("INSERT INTO SQOOP.SQ_JOB_INPUT" + "(SQBI_JOB, SQBI_INPUT, SQBI_VALUE) "
+ "VALUES(" + ci + ", " + i + ", 'Value" + i + "')");
}
}
break;
case 4:
for (String name : new String[] { "JA", "JB", "JC", "JD" }) {
runQuery("INSERT INTO SQOOP.SQ_JOB(SQB_NAME, SQB_FROM_LINK, SQB_TO_LINK)" + " VALUES('"
+ name + index + "', 1, 1)");
}
// Odd IDs inputs have values
for (String ci : new String[] { "1", "2", "3", "4" }) {
for (String i : new String[] { "5", "9", "13" }) {
runQuery("INSERT INTO SQOOP.SQ_JOB_INPUT" + "(SQBI_JOB, SQBI_INPUT, SQBI_VALUE) "
+ "VALUES(" + ci + ", " + i + ", 'Value" + i + "')");
} }
// Import inputs for (String i : new String[] { "7", "11", "15" }) {
for(String ci : new String[] {"1", "2"}) { runQuery("INSERT INTO SQOOP.SQ_JOB_INPUT" + "(SQBI_JOB, SQBI_INPUT, SQBI_VALUE) "
for(String i : new String[] {"5", "7", "17", "19"}) { + "VALUES(" + ci + ", " + i + ", 'Value" + i + "')");
runQuery("INSERT INTO SQOOP.SQ_JOB_INPUT"
+ "(SQBI_JOB, SQBI_INPUT, SQBI_VALUE) "
+ "VALUES(" + ci + ", " + i + ", 'Value" + i + "')");
}
} }
}
break;
// Export inputs default:
for(String ci : new String[] {"3", "4"}) { throw new AssertionError("Invalid connector and framework version: " + version);
for(String i : new String[] {"9", "11"}) {
runQuery("INSERT INTO SQOOP.SQ_JOB_INPUT"
+ "(SQBI_JOB, SQBI_INPUT, SQBI_VALUE) "
+ "VALUES(" + ci + ", " + i + ", 'Value" + i + "')");
}
}
break;
case 4:
for (String name : new String[]{"JA", "JB", "JC", "JD"}) {
runQuery("INSERT INTO SQOOP.SQ_JOB(SQB_NAME, SQB_FROM_LINK, SQB_TO_LINK)"
+ " VALUES('" + name + index + "', 1, 1)");
}
// Odd IDs inputs have values
for (String ci : new String[]{"1", "2", "3", "4"}) {
for (String i : new String[]{"5", "9", "13"}) {
runQuery("INSERT INTO SQOOP.SQ_JOB_INPUT"
+ "(SQBI_JOB, SQBI_INPUT, SQBI_VALUE) "
+ "VALUES(" + ci + ", " + i + ", 'Value" + i + "')");
}
for (String i : new String[]{"7", "11", "15"}) {
runQuery("INSERT INTO SQOOP.SQ_JOB_INPUT"
+ "(SQBI_JOB, SQBI_INPUT, SQBI_VALUE) "
+ "VALUES(" + ci + ", " + i + ", 'Value" + i + "')");
}
}
break;
default:
throw new AssertionError("Invalid connector and framework version: " + version);
} }
} }
/** /**
* testing job with non unique name objects into repository. * testing job with non unique name objects into repository.
*
* @param version
* system version 4
* @throws Exception * @throws Exception
*/ */
public void loadNonUniqueJobsInVersion4() throws Exception { public void loadNonUniqueJobsInVersion4() throws Exception {
@ -649,23 +673,18 @@ public void loadNonUniqueJobsInVersion4() throws Exception {
/** /**
* testing link with non unique name objects into repository. * testing link with non unique name objects into repository.
*
* @param version
* system version 4
* @throws Exception * @throws Exception
*/ */
public void loadNonUniqueLinksInVersion4() throws Exception { public void loadNonUniqueLinksInVersion4() throws Exception {
// Insert two links - CA and CA // Insert two links - CA and CA
runInsertQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) " + "VALUES('CA', 1)"); runInsertQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) "
+ "VALUES('CA', 1)");
runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) " + "VALUES('CA', 1)"); runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) " + "VALUES('CA', 1)");
} }
/** /**
* testing configurable with non unique name objects into repository. * testing configurable with non unique name objects into repository.
*
* @param version
* system version 4
* @throws Exception * @throws Exception
*/ */
public void loadNonUniqueConfigurablesInVersion4() throws Exception { public void loadNonUniqueConfigurablesInVersion4() throws Exception {
@ -678,6 +697,48 @@ public void loadNonUniqueConfigurablesInVersion4() throws Exception {
} }
/**
* testing config with non unique nam/type objects into repository.
* @throws Exception
*/
public void loadNonUniqueConfigNameTypeInVersion4() throws Exception {
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(1, 'C1', 'LINK', 0)");
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(1, 'C1', 'LINK', 0)");
}
/**
* testing config with non unique nam/type objects into repository.
* @throws Exception
*/
public void loadNonUniqueConfigNameButUniqueTypeInVersion4() throws Exception {
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(1, 'C1', 'LINK', 0)");
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(1, 'C1', 'JOB', 0)");
}
/**
* testing config with non unique name/type objects into repository.
* @throws Exception
*/
public void loadNonUniqueConfigNameAndTypeButUniqueConfigurableInVersion4() throws Exception {
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(1, 'C1', 'LINK', 0)");
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(2, 'C1', 'LINK', 0)");
}
public void loadJobsForLatestVersion() throws Exception { public void loadJobsForLatestVersion() throws Exception {
loadJobs(DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION); loadJobs(DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION);
} }
@ -689,50 +750,37 @@ public void addConnectorB() throws Exception {
// Connector entry // Connector entry
Long connectorId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)" Long connectorId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)"
+ "VALUES('B', 'org.apache.sqoop.test.B', '1.0-test', 'CONNECTOR')"); + "VALUES('B', 'org.apache.sqoop.test.B', '1.0-test', 'CONNECTOR')");
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES (" + connectorId + ", 1)"); runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES ("
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES (" + connectorId + ", 2)"); + connectorId + ", 1)");
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES ("
+ connectorId + ", 2)");
} }
/** /**
* Load testing submissions into the repository. * Load testing submissions into the repository.
*
* @throws Exception * @throws Exception
*/ */
public void loadSubmissions() throws Exception { public void loadSubmissions() throws Exception {
runQuery("INSERT INTO SQOOP.SQ_COUNTER_GROUP " runQuery("INSERT INTO SQOOP.SQ_COUNTER_GROUP " + "(SQG_NAME) " + "VALUES" + "('gA'), ('gB')");
+ "(SQG_NAME) "
+ "VALUES"
+ "('gA'), ('gB')"
);
runQuery("INSERT INTO SQOOP.SQ_COUNTER " runQuery("INSERT INTO SQOOP.SQ_COUNTER " + "(SQR_NAME) " + "VALUES" + "('cA'), ('cB')");
+ "(SQR_NAME) "
+ "VALUES"
+ "('cA'), ('cB')"
);
runQuery("INSERT INTO SQOOP.SQ_SUBMISSION" runQuery("INSERT INTO SQOOP.SQ_SUBMISSION"
+ "(SQS_JOB, SQS_STATUS, SQS_CREATION_DATE, SQS_UPDATE_DATE," + "(SQS_JOB, SQS_STATUS, SQS_CREATION_DATE, SQS_UPDATE_DATE,"
+ " SQS_EXTERNAL_ID, SQS_EXTERNAL_LINK, SQS_EXCEPTION," + " SQS_EXTERNAL_ID, SQS_EXTERNAL_LINK, SQS_EXCEPTION," + " SQS_EXCEPTION_TRACE)"
+ " SQS_EXCEPTION_TRACE)" + "VALUES " + "(1, 'RUNNING', '2012-01-01 01:01:01', '2012-01-01 01:01:01', 'job_1',"
+ "VALUES " + "NULL, NULL, NULL),"
+ "(1, 'RUNNING', '2012-01-01 01:01:01', '2012-01-01 01:01:01', 'job_1'," + "(2, 'SUCCEEDED', '2012-01-01 01:01:01', '2012-01-02 01:01:01', 'job_2',"
+ "NULL, NULL, NULL)," + " NULL, NULL, NULL),"
+ "(2, 'SUCCEEDED', '2012-01-01 01:01:01', '2012-01-02 01:01:01', 'job_2'," + "(3, 'FAILED', '2012-01-01 01:01:01', '2012-01-03 01:01:01', 'job_3',"
+ " NULL, NULL, NULL)," + " NULL, NULL, NULL),"
+ "(3, 'FAILED', '2012-01-01 01:01:01', '2012-01-03 01:01:01', 'job_3'," + "(4, 'UNKNOWN', '2012-01-01 01:01:01', '2012-01-04 01:01:01', 'job_4',"
+ " NULL, NULL, NULL)," + " NULL, NULL, NULL),"
+ "(4, 'UNKNOWN', '2012-01-01 01:01:01', '2012-01-04 01:01:01', 'job_4'," + "(1, 'RUNNING', '2012-01-01 01:01:01', '2012-01-05 01:01:01', 'job_5',"
+ " NULL, NULL, NULL)," + " NULL, NULL, NULL)");
+ "(1, 'RUNNING', '2012-01-01 01:01:01', '2012-01-05 01:01:01', 'job_5',"
+ " NULL, NULL, NULL)"
);
runQuery("INSERT INTO SQOOP.SQ_COUNTER_SUBMISSION " runQuery("INSERT INTO SQOOP.SQ_COUNTER_SUBMISSION "
+ "(SQRS_GROUP, SQRS_COUNTER, SQRS_SUBMISSION, SQRS_VALUE) " + "(SQRS_GROUP, SQRS_COUNTER, SQRS_SUBMISSION, SQRS_VALUE) " + "VALUES" + "(1, 1, 4, 300)");
+ "VALUES"
+ "(1, 1, 4, 300)"
);
} }
@ -741,79 +789,76 @@ protected MConnector getConnector() {
} }
protected MConnector getConnector(boolean from, boolean to) { protected MConnector getConnector(boolean from, boolean to) {
MFromConfig fromJobForms = null; MFromConfig fromConfig = null;
MToConfig toJobForms = null; MToConfig toConfig = null;
if (from) { if (from) {
fromJobForms = getFromConfig(); fromConfig = getFromConfig();
} }
if (to) { if (to) {
toJobForms = getToConfig(); toConfig = getToConfig();
} }
return new MConnector("A", "org.apache.sqoop.test.A", "1.0-test", return new MConnector("A", "org.apache.sqoop.test.A", "1.0-test", getLinkConfig(), fromConfig,
getLinkConfig(), fromJobForms, toJobForms); toConfig);
} }
protected MDriver getDriver() { protected MDriver getDriver() {
return new MDriver(getDriverConfig(), DriverBean.CURRENT_DRIVER_VERSION); return new MDriver(getDriverConfig(), DriverBean.CURRENT_DRIVER_VERSION);
} }
protected void fillLink(MLink link) { protected void fillLink(MLink link) {
List<MConfig> configs = link.getConnectorLinkConfig().getConfigs(); List<MConfig> configs = link.getConnectorLinkConfig().getConfigs();
((MStringInput)configs.get(0).getInputs().get(0)).setValue("Value1"); ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
((MStringInput)configs.get(1).getInputs().get(0)).setValue("Value2"); ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
} }
protected void fillJob(MJob job) { protected void fillJob(MJob job) {
List<MConfig> configs = job.getJobConfig(Direction.FROM).getConfigs(); List<MConfig> configs = job.getJobConfig(Direction.FROM).getConfigs();
((MStringInput)configs.get(0).getInputs().get(0)).setValue("Value1"); ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
((MStringInput)configs.get(1).getInputs().get(0)).setValue("Value2"); ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
configs = job.getJobConfig(Direction.TO).getConfigs(); configs = job.getJobConfig(Direction.TO).getConfigs();
((MStringInput)configs.get(0).getInputs().get(0)).setValue("Value1"); ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value1");
((MStringInput)configs.get(1).getInputs().get(0)).setValue("Value2"); ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value2");
configs = job.getDriverConfig().getConfigs(); configs = job.getDriverConfig().getConfigs();
((MStringInput)configs.get(0).getInputs().get(0)).setValue("Value13"); ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Value13");
((MStringInput)configs.get(1).getInputs().get(0)).setValue("Value15"); ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Value15");
} }
protected MLinkConfig getLinkConfig() { protected MLinkConfig getLinkConfig() {
return new MLinkConfig(getConfigs()); return new MLinkConfig(getConfigs("l1", "l2"));
} }
protected MFromConfig getFromConfig() { protected MFromConfig getFromConfig() {
return new MFromConfig(getConfigs()); return new MFromConfig(getConfigs("from1", "from2"));
} }
protected MToConfig getToConfig() { protected MToConfig getToConfig() {
return new MToConfig(getConfigs()); return new MToConfig(getConfigs("to1", "to2"));
} }
protected MDriverConfig getDriverConfig() { protected MDriverConfig getDriverConfig() {
return new MDriverConfig(getConfigs()); return new MDriverConfig(getConfigs("d1", "d2"));
} }
protected List<MConfig> getConfigs() { protected List<MConfig> getConfigs(String configName1, String configName2) {
List<MConfig> jobConfigs = new LinkedList<MConfig>(); List<MConfig> configs = new LinkedList<MConfig>();
List<MInput<?>> inputs = new LinkedList<MInput<?>>(); List<MInput<?>> inputs = new LinkedList<MInput<?>>();
MInput input = new MStringInput("I1", false, (short) 30); MInput input = new MStringInput("I1", false, (short) 30);
inputs.add(input); inputs.add(input);
input = new MMapInput("I2", false); input = new MMapInput("I2", false);
inputs.add(input); inputs.add(input);
// adding the from part of the job config configs.add(new MConfig(configName1, inputs));
jobConfigs.add(new MConfig("C1", inputs));
// to
inputs = new LinkedList<MInput<?>>(); inputs = new LinkedList<MInput<?>>();
input = new MStringInput("I3", false, (short)30); input = new MStringInput("I3", false, (short) 30);
inputs.add(input); inputs.add(input);
input = new MMapInput("I4", false); input = new MMapInput("I4", false);
inputs.add(input); inputs.add(input);
// adding the to part of the job config configs.add(new MConfig(configName2, inputs));
jobConfigs.add(new MConfig("C2", inputs));
return jobConfigs; return configs;
} }
/** /**
@ -830,15 +875,15 @@ protected long countForTable(String table) throws Exception {
try { try {
stmt = getDerbyDatabaseConnection().createStatement(); stmt = getDerbyDatabaseConnection().createStatement();
rs = stmt.executeQuery("SELECT COUNT(*) FROM "+ table); rs = stmt.executeQuery("SELECT COUNT(*) FROM " + table);
rs.next(); rs.next();
return rs.getLong(1); return rs.getLong(1);
} finally { } finally {
if(stmt != null) { if (stmt != null) {
stmt.close(); stmt.close();
} }
if(rs != null) { if (rs != null) {
rs.close(); rs.close();
} }
} }
@ -846,13 +891,11 @@ protected long countForTable(String table) throws Exception {
/** /**
* Assert row count for given table. * Assert row count for given table.
*
* @param table Table name * @param table Table name
* @param expected Expected number of rows * @param expected Expected number of rows
* @throws Exception * @throws Exception
*/ */
protected void assertCountForTable(String table, long expected) protected void assertCountForTable(String table, long expected) throws Exception {
throws Exception {
long count = countForTable(table); long count = countForTable(table);
assertEquals(expected, count); assertEquals(expected, count);
} }
@ -860,21 +903,20 @@ protected void assertCountForTable(String table, long expected)
/** /**
* Printout repository content for advance debugging. * Printout repository content for advance debugging.
* *
* This method is currently unused, but might be helpful in the future, so * This method is currently unused, but might be helpful in the future, so I'm
* I'm letting it here. * letting it here.
* *
* @throws Exception * @throws Exception
*/ */
protected void generateDatabaseState() throws Exception { protected void generateDatabaseState() throws Exception {
for(String tbl : new String[] {"SQ_CONNECTOR", "SQ_CONFIG", "SQ_INPUT", for (String tbl : new String[] { "SQ_CONNECTOR", "SQ_CONFIG", "SQ_INPUT", "SQ_LINK",
"SQ_LINK", "SQ_LINK_INPUT", "SQ_JOB", "SQ_JOB_INPUT"}) { "SQ_LINK_INPUT", "SQ_JOB", "SQ_JOB_INPUT" }) {
generateTableState("SQOOP." + tbl); generateTableState("SQOOP." + tbl);
} }
} }
/** /**
* Printout one single table. * Printout one single table.
*
* @param table Table name * @param table Table name
* @throws Exception * @throws Exception
*/ */
@ -892,15 +934,15 @@ protected void generateTableState(String table) throws Exception {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
System.out.println("Table " + table + ":"); System.out.println("Table " + table + ":");
for(int i = 1; i <= rsmt.getColumnCount(); i++) { for (int i = 1; i <= rsmt.getColumnCount(); i++) {
sb.append("| ").append(rsmt.getColumnName(i)).append(" "); sb.append("| ").append(rsmt.getColumnName(i)).append(" ");
} }
sb.append("|"); sb.append("|");
System.out.println(sb.toString()); System.out.println(sb.toString());
while(rs.next()) { while (rs.next()) {
sb = new StringBuilder(); sb = new StringBuilder();
for(int i = 1; i <= rsmt.getColumnCount(); i++) { for (int i = 1; i <= rsmt.getColumnCount(); i++) {
sb.append("| ").append(rs.getString(i)).append(" "); sb.append("| ").append(rs.getString(i)).append(" ");
} }
sb.append("|"); sb.append("|");
@ -910,12 +952,12 @@ protected void generateTableState(String table) throws Exception {
System.out.println(""); System.out.println("");
} finally { } finally {
if(rs != null) { if (rs != null) {
rs.close(); rs.close();
} }
if(ps != null) { if (ps != null) {
ps.close(); ps.close();
} }
} }
} }
} }

View File

@ -46,10 +46,8 @@ public void setUp() throws Exception {
public void testFindConnector() throws Exception { public void testFindConnector() throws Exception {
// On empty repository, no connectors should be there // On empty repository, no connectors should be there
assertNull(handler.findConnector("A", getDerbyDatabaseConnection())); assertNull(handler.findConnector("A", getDerbyDatabaseConnection()));
assertNull(handler.findConnector("B", getDerbyDatabaseConnection()));
// Load connector into repository // Load connector into repository
loadConnectorAndDriverConfig(); addConnectorA();
// Retrieve it // Retrieve it
MConnector connector = handler.findConnector("A", getDerbyDatabaseConnection()); MConnector connector = handler.findConnector("A", getDerbyDatabaseConnection());
assertNotNull(connector); assertNotNull(connector);

View File

@ -72,7 +72,8 @@ public void testEntitySerialization() throws Exception {
assertNotSame(connector.getPersistenceId(), MPersistableEntity.PERSISTANCE_ID_DEFAULT); assertNotSame(connector.getPersistenceId(), MPersistableEntity.PERSISTANCE_ID_DEFAULT);
// Retrieve registered connector // Retrieve registered connector
MConnector retrieved = handler.findConnector(connector.getUniqueName(), getDerbyDatabaseConnection()); MConnector retrieved = handler.findConnector(connector.getUniqueName(),
getDerbyDatabaseConnection());
assertNotNull(retrieved); assertNotNull(retrieved);
// Original and retrieved connectors should be the same // Original and retrieved connectors should be the same
@ -97,12 +98,12 @@ public void testEntityDataSerialization() throws Exception {
// Connection object with all various values // Connection object with all various values
MLink link = new MLink(connector.getPersistenceId(), connector.getLinkConfig()); MLink link = new MLink(connector.getPersistenceId(), connector.getLinkConfig());
MLinkConfig forms = link.getConnectorLinkConfig(); MLinkConfig linkConfig = link.getConnectorLinkConfig();
forms.getStringInput("f.String").setValue("A"); linkConfig.getStringInput("l1.String").setValue("A");
forms.getMapInput("f.Map").setValue(map); linkConfig.getMapInput("l1.Map").setValue(map);
forms.getIntegerInput("f.Integer").setValue(1); linkConfig.getIntegerInput("l1.Integer").setValue(1);
forms.getBooleanInput("f.Boolean").setValue(true); linkConfig.getBooleanInput("l1.Boolean").setValue(true);
forms.getEnumInput("f.Enum").setValue("YES"); linkConfig.getEnumInput("l1.Enum").setValue("YES");
// Create the link in repository // Create the link in repository
handler.createLink(link, getDerbyDatabaseConnection()); handler.createLink(link, getDerbyDatabaseConnection());
@ -110,12 +111,12 @@ public void testEntityDataSerialization() throws Exception {
// Retrieve created link // Retrieve created link
MLink retrieved = handler.findLink(link.getPersistenceId(), getDerbyDatabaseConnection()); MLink retrieved = handler.findLink(link.getPersistenceId(), getDerbyDatabaseConnection());
forms = retrieved.getConnectorLinkConfig(); linkConfig = retrieved.getConnectorLinkConfig();
assertEquals("A", forms.getStringInput("f.String").getValue()); assertEquals("A", linkConfig.getStringInput("l1.String").getValue());
assertEquals(map, forms.getMapInput("f.Map").getValue()); assertEquals(map, linkConfig.getMapInput("l1.Map").getValue());
assertEquals(1, (int)forms.getIntegerInput("f.Integer").getValue()); assertEquals(1, (int) linkConfig.getIntegerInput("l1.Integer").getValue());
assertEquals(true, (boolean)forms.getBooleanInput("f.Boolean").getValue()); assertEquals(true, (boolean) linkConfig.getBooleanInput("l1.Boolean").getValue());
assertEquals("YES", forms.getEnumInput("f.Enum").getValue()); assertEquals("YES", linkConfig.getEnumInput("l1.Enum").getValue());
} }
/** /**
@ -124,30 +125,30 @@ public void testEntityDataSerialization() throws Exception {
* @return Forms with all data types * @return Forms with all data types
*/ */
@Override @Override
protected List<MConfig> getConfigs() { protected List<MConfig> getConfigs(String configName1, String configName2) {
List<MConfig> forms = new LinkedList<MConfig>(); List<MConfig> configs = new LinkedList<MConfig>();
List<MInput<?>> inputs; List<MInput<?>> inputs;
MInput input; MInput input;
inputs = new LinkedList<MInput<?>>(); inputs = new LinkedList<MInput<?>>();
input = new MStringInput("f.String", false, (short)30); input = new MStringInput(configName1 + ".String", false, (short) 30);
inputs.add(input); inputs.add(input);
input = new MMapInput("f.Map", false); input = new MMapInput(configName1 + ".Map", false);
inputs.add(input); inputs.add(input);
input = new MIntegerInput("f.Integer", false); input = new MIntegerInput(configName1 + ".Integer", false);
inputs.add(input); inputs.add(input);
input = new MBooleanInput("f.Boolean", false); input = new MBooleanInput(configName1 + ".Boolean", false);
inputs.add(input); inputs.add(input);
input = new MEnumInput("f.Enum", false, new String[] {"YES", "NO"}); input = new MEnumInput(configName1 + ".Enum", false, new String[] { "YES", "NO" });
inputs.add(input); inputs.add(input);
forms.add(new MConfig("f", inputs)); configs.add(new MConfig(configName1, inputs));
return forms; return configs;
} }
} }

View File

@ -39,7 +39,8 @@ public void setUp() throws Exception {
@Test @Test
public void testHasLatestRepositoryVersion() throws Exception { public void testHasLatestRepositoryVersion() throws Exception {
assertFalse(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection())); assertFalse(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
createOrUpgradeSchemaForLatestVersion(); // Test code is building the structures createOrUpgradeSchemaForLatestVersion(); // Test code is building the
// structures
assertTrue(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection())); assertTrue(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
} }
@ -50,26 +51,58 @@ public void testCreateorUpdateRepositorySchema() throws Exception {
assertTrue(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection())); assertTrue(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
} }
@Test(expected=SQLIntegrityConstraintViolationException.class) @Test(expected = SQLIntegrityConstraintViolationException.class)
public void testUpgradeVersion4WithNonUniqueJobNameFailure() throws Exception { public void testUpgradeVersion4WithNonUniqueJobNameFailure() throws Exception {
super.createOrUpgradeSchema(4); super.createOrUpgradeSchema(4);
// try loading duplicate job names in version 4 and it should throw an exception // try loading duplicate job names in version 4 and it should throw an
// exception
super.loadNonUniqueJobsInVersion4(); super.loadNonUniqueJobsInVersion4();
} }
@Test(expected=SQLIntegrityConstraintViolationException.class)
@Test(expected = SQLIntegrityConstraintViolationException.class)
public void testUpgradeVersion4WithNonUniqueLinkNamesAdded() throws Exception { public void testUpgradeVersion4WithNonUniqueLinkNamesAdded() throws Exception {
super.createOrUpgradeSchema(4); super.createOrUpgradeSchema(4);
// try loading duplicate link names in version 4 and it should throw an exception // try loading duplicate link names in version 4 and it should throw an
// exception
super.loadNonUniqueLinksInVersion4(); super.loadNonUniqueLinksInVersion4();
} }
@Test(expected=SQLIntegrityConstraintViolationException.class) @Test(expected = SQLIntegrityConstraintViolationException.class)
public void testUpgradeVersion4WithNonUniqueConfigurableNamesAdded() throws Exception { public void testUpgradeVersion4WithNonUniqueConfigurableNamesAdded() throws Exception {
super.createOrUpgradeSchema(4); super.createOrUpgradeSchema(4);
// try loading duplicate configurable names in version 4 and it should throw an exception // try loading duplicate configurable names in version 4 and it should throw
// an exception
super.loadNonUniqueConfigurablesInVersion4(); super.loadNonUniqueConfigurablesInVersion4();
} }
@Test(expected = SQLIntegrityConstraintViolationException.class)
public void testUpgradeVersion4WithNonUniqueConfigNameAndTypeAdded() throws Exception {
super.createOrUpgradeSchema(4);
super.addConnectorB();
// try loading duplicate config names in version 4 and it should throw an
// exception
super.loadNonUniqueConfigNameTypeInVersion4();
}
@Test
public void testUpgradeVersion4WithNonUniqueConfigNameButUniqueTypeAdded() throws Exception {
super.createOrUpgradeSchema(4);
super.addConnectorB();
// try loading duplicate config names but unique type, hence no exception
super.loadNonUniqueConfigNameButUniqueTypeInVersion4();
}
@Test
public void testUpgradeVersion4WithNonUniqueConfigNameAndTypeButUniqueConfigurable()
throws Exception {
super.createOrUpgradeSchema(4);
super.addConnectorA();
super.addConnectorB();
// try loading duplicate config names and type but unique connector, hence
// no exception
super.loadNonUniqueConfigNameAndTypeButUniqueConfigurableInVersion4();
}
@Test @Test
public void testUpgradeRepoVersion2ToVersion4() throws Exception { public void testUpgradeRepoVersion2ToVersion4() throws Exception {
// in case of version 2 schema there is no unique job/ link constraint // in case of version 2 schema there is no unique job/ link constraint
@ -88,7 +121,7 @@ protected long registerHdfsConnector(Connection conn) {
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)" runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
+ "VALUES('hdfs-connector', 'org.apache.sqoop.test.B', '1.0-test')"); + "VALUES('hdfs-connector', 'org.apache.sqoop.test.B', '1.0-test')");
return 2L; return 2L;
} catch(Exception e) { } catch (Exception e) {
return -1L; return -1L;
} }
} }