5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-06 14:03:08 +08:00

SQOOP-612 Provide unit tests for derby metadata repository

This commit is contained in:
Bilung Lee 2012-09-26 10:34:43 -07:00
parent 866d46dfe7
commit 982c76e03b
8 changed files with 1039 additions and 31 deletions

View File

@ -187,6 +187,7 @@ limitations under the License.
<exclude>.gitignore</exclude> <exclude>.gitignore</exclude>
<exclude>.idea/</exclude> <exclude>.idea/</exclude>
<exclude>*.iml</exclude> <exclude>*.iml</exclude>
<exclude>*.log</exclude>
</excludes> </excludes>
</configuration> </configuration>
</execution> </execution>

View File

@ -61,12 +61,6 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
private static final Logger LOG = private static final Logger LOG =
Logger.getLogger(DerbyRepositoryHandler.class); Logger.getLogger(DerbyRepositoryHandler.class);
private static final String SCHEMA_SQOOP = "SQOOP";
private static final String QUERY_SYSSCHEMA_SQOOP =
"SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = '"
+ SCHEMA_SQOOP + "'";
private static final String EMBEDDED_DERBY_DRIVER_CLASSNAME = private static final String EMBEDDED_DERBY_DRIVER_CLASSNAME =
"org.apache.derby.jdbc.EmbeddedDriver"; "org.apache.derby.jdbc.EmbeddedDriver";
@ -177,8 +171,8 @@ public synchronized void shutdown() {
+ "45000 as expected."); + "45000 as expected.");
} }
} else { } else {
LOG.warn("Even though embedded Derby drvier was loaded, the connect " LOG.warn("Even though embedded Derby driver was loaded, the connect "
+ "URL is of an unexpected form: " + connectUrl + ". Therfore no " + "URL is of an unexpected form: " + connectUrl + ". Therefore no "
+ "attempt will be made to shutdown embedded Derby instance."); + "attempt will be made to shutdown embedded Derby instance.");
} }
} }
@ -209,7 +203,7 @@ public boolean schemaExists() {
try { try {
connection = dataSource.getConnection(); connection = dataSource.getConnection();
stmt = connection.createStatement(); stmt = connection.createStatement();
ResultSet rset = stmt.executeQuery(QUERY_SYSSCHEMA_SQOOP); ResultSet rset = stmt.executeQuery(QUERY_SYSSCHEMA_SQOOP);
if (!rset.next()) { if (!rset.next()) {
LOG.warn("Schema for SQOOP does not exist"); LOG.warn("Schema for SQOOP does not exist");
@ -276,7 +270,7 @@ public MConnector findConnector(String shortName, Connection conn) {
Map<MJob.Type, List<MForm>> jobForms = Map<MJob.Type, List<MForm>> jobForms =
new HashMap<MJob.Type, List<MForm>>(); new HashMap<MJob.Type, List<MForm>>();
loadForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt); loadForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1);
mc = new MConnector(connectorName, connectorClassName, mc = new MConnector(connectorName, connectorClassName,
new MConnectionForms(connectionForms), new MConnectionForms(connectionForms),
@ -353,7 +347,7 @@ public MFramework findFramework(Connection conn) {
Map<MJob.Type, List<MForm>> jobForms = Map<MJob.Type, List<MForm>> jobForms =
new HashMap<MJob.Type, List<MForm>>(); new HashMap<MJob.Type, List<MForm>>();
loadForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt); loadForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1);
// Return nothing If there aren't any framework metadata // Return nothing If there aren't any framework metadata
if(connectionForms.isEmpty() && jobForms.isEmpty()) { if(connectionForms.isEmpty() && jobForms.isEmpty()) {
@ -437,6 +431,8 @@ public void createConnection(MConnection connection, Connection conn) {
connection.getFrameworkPart().getForms(), connection.getFrameworkPart().getForms(),
conn); conn);
connection.setPersistenceId(connectionId);
} catch (SQLException ex) { } catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0019, ex); throw new SqoopException(DerbyRepoError.DERBYREPO_0019, ex);
} finally { } finally {
@ -607,6 +603,8 @@ public void createJob(MJob job, Connection conn) {
job.getFrameworkPart().getForms(), job.getFrameworkPart().getForms(),
conn); conn);
job.setPersistenceId(jobId);
} catch (SQLException ex) { } catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0026, ex); throw new SqoopException(DerbyRepoError.DERBYREPO_0026, ex);
} finally { } finally {
@ -764,8 +762,9 @@ private List<MConnection> loadConnections(PreparedStatement stmt,
conn.prepareStatement(STMT_FETCH_FORM_FRAMEWORK); conn.prepareStatement(STMT_FETCH_FORM_FRAMEWORK);
inputFetchStmt = conn.prepareStatement(STMT_FETCH_CONNECTION_INPUT); inputFetchStmt = conn.prepareStatement(STMT_FETCH_CONNECTION_INPUT);
//inputFetchStmt.setLong(1, XXX); // Will be filled by loadForms inputFetchStmt.setLong(1, id);
inputFetchStmt.setLong(2, id); //inputFetchStmt.setLong(2, XXX); // Will be filled by loadForms
inputFetchStmt.setLong(3, id);
List<MForm> connectorConnForms = new ArrayList<MForm>(); List<MForm> connectorConnForms = new ArrayList<MForm>();
List<MForm> frameworkConnForms = new ArrayList<MForm>(); List<MForm> frameworkConnForms = new ArrayList<MForm>();
@ -776,9 +775,9 @@ private List<MConnection> loadConnections(PreparedStatement stmt,
= new HashMap<MJob.Type, List<MForm>>(); = new HashMap<MJob.Type, List<MForm>>();
loadForms(connectorConnForms, connectorJobForms, loadForms(connectorConnForms, connectorJobForms,
formConnectorFetchStmt, inputFetchStmt); formConnectorFetchStmt, inputFetchStmt, 2);
loadForms(frameworkConnForms, frameworkJobForms, loadForms(frameworkConnForms, frameworkJobForms,
formFrameworkFetchStmt, inputFetchStmt); formFrameworkFetchStmt, inputFetchStmt, 2);
MConnection connection = new MConnection(connectorId, MConnection connection = new MConnection(connectorId,
new MConnectionForms(connectorConnForms), new MConnectionForms(connectorConnForms),
@ -827,8 +826,9 @@ private List<MJob> loadJobs(PreparedStatement stmt,
conn.prepareStatement(STMT_FETCH_FORM_FRAMEWORK); conn.prepareStatement(STMT_FETCH_FORM_FRAMEWORK);
inputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT); inputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT);
inputFetchStmt.setLong(1, id);
//inputFetchStmt.setLong(1, XXX); // Will be filled by loadForms //inputFetchStmt.setLong(1, XXX); // Will be filled by loadForms
inputFetchStmt.setLong(2, id); inputFetchStmt.setLong(3, id);
List<MForm> connectorConnForms = new ArrayList<MForm>(); List<MForm> connectorConnForms = new ArrayList<MForm>();
List<MForm> frameworkConnForms = new ArrayList<MForm>(); List<MForm> frameworkConnForms = new ArrayList<MForm>();
@ -839,9 +839,9 @@ private List<MJob> loadJobs(PreparedStatement stmt,
= new HashMap<MJob.Type, List<MForm>>(); = new HashMap<MJob.Type, List<MForm>>();
loadForms(connectorConnForms, connectorJobForms, loadForms(connectorConnForms, connectorJobForms,
formConnectorFetchStmt, inputFetchStmt); formConnectorFetchStmt, inputFetchStmt, 2);
loadForms(frameworkConnForms, frameworkJobForms, loadForms(frameworkConnForms, frameworkJobForms,
formFrameworkFetchStmt, inputFetchStmt); formFrameworkFetchStmt, inputFetchStmt, 2);
MJob job = new MJob(connectorId, connectionId, type, MJob job = new MJob(connectorId, connectionId, type,
new MJobForms(type, connectorJobForms.get(type)), new MJobForms(type, connectorJobForms.get(type)),
@ -1018,13 +1018,14 @@ private void runQuery(String query) {
public void loadForms(List<MForm> connectionForms, public void loadForms(List<MForm> connectionForms,
Map<MJob.Type, List<MForm>> jobForms, Map<MJob.Type, List<MForm>> jobForms,
PreparedStatement formFetchStmt, PreparedStatement formFetchStmt,
PreparedStatement inputFetchStmt) throws SQLException { PreparedStatement inputFetchStmt,
int formPosition) throws SQLException {
// Get list of structures from database // Get list of structures from database
ResultSet rsetForm = formFetchStmt.executeQuery(); ResultSet rsetForm = formFetchStmt.executeQuery();
while (rsetForm.next()) { while (rsetForm.next()) {
long formId = rsetForm.getLong(1); long formId = rsetForm.getLong(1);
long formConnectorId = rsetForm.getLong(2); Long formConnectorId = rsetForm.getLong(2);
String operation = rsetForm.getString(3); String operation = rsetForm.getString(3);
String formName = rsetForm.getString(4); String formName = rsetForm.getString(4);
String formType = rsetForm.getString(5); String formType = rsetForm.getString(5);
@ -1034,7 +1035,7 @@ public void loadForms(List<MForm> connectionForms,
MForm mf = new MForm(formName, formInputs); MForm mf = new MForm(formName, formInputs);
mf.setPersistenceId(formId); mf.setPersistenceId(formId);
inputFetchStmt.setLong(1, formId); inputFetchStmt.setLong(formPosition, formId);
ResultSet rsetInput = inputFetchStmt.executeQuery(); ResultSet rsetInput = inputFetchStmt.executeQuery();
while (rsetInput.next()) { while (rsetInput.next()) {
@ -1075,7 +1076,11 @@ public void loadForms(List<MForm> connectionForms,
if (mf.getInputs().size() != inputIndex) { if (mf.getInputs().size() != inputIndex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0009, throw new SqoopException(DerbyRepoError.DERBYREPO_0009,
"form: " + mf + "; input: " + input); "form: " + mf
+ "; input: " + input
+ "; index: " + inputIndex
+ "; expected: " + mf.getInputs().size()
);
} }
mf.getInputs().add(input); mf.getInputs().add(input);
@ -1083,7 +1088,9 @@ public void loadForms(List<MForm> connectionForms,
if (mf.getInputs().size() == 0) { if (mf.getInputs().size() == 0) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0008, throw new SqoopException(DerbyRepoError.DERBYREPO_0008,
"connector-" + formConnectorId + ": " + mf); "connector-" + formConnectorId
+ "; form: " + mf
);
} }
MFormType mft = MFormType.valueOf(formType); MFormType mft = MFormType.valueOf(formType);
@ -1091,7 +1098,11 @@ public void loadForms(List<MForm> connectionForms,
case CONNECTION: case CONNECTION:
if (connectionForms.size() != formIndex) { if (connectionForms.size() != formIndex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0010, throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
"connector-i" + formConnectorId + "; form: " + mf); "connector-" + formConnectorId
+ "; form: " + mf
+ "; index: " + formIndex
+ "; expected: " + connectionForms.size()
);
} }
connectionForms.add(mf); connectionForms.add(mf);
break; break;
@ -1103,7 +1114,11 @@ public void loadForms(List<MForm> connectionForms,
if (jobForms.get(jobType).size() != formIndex) { if (jobForms.get(jobType).size() != formIndex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0010, throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
"connector-" + formConnectorId + "; form: " + mf); "connector-" + formConnectorId
+ "; form: " + mf
+ "; index: " + formIndex
+ "; expected: " + jobForms.get(jobType).size()
);
} }
jobForms.get(jobType).add(mf); jobForms.get(jobType).add(mf);
break; break;

View File

@ -122,6 +122,10 @@ public final class DerbySchemaQuery {
public static final String QUERY_CREATE_SCHEMA_SQOOP = public static final String QUERY_CREATE_SCHEMA_SQOOP =
"CREATE SCHEMA " + SCHEMA_SQOOP; "CREATE SCHEMA " + SCHEMA_SQOOP;
public static final String QUERY_SYSSCHEMA_SQOOP =
"SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = '"
+ SCHEMA_SQOOP + "'";
// DDL: Create table SQ_CONNECTOR // DDL: Create table SQ_CONNECTOR
public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR = public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR =
"CREATE TABLE " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID "CREATE TABLE " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID
@ -223,10 +227,10 @@ public final class DerbySchemaQuery {
+ ", " + COLUMN_SQI_STRMASK + ", " + COLUMN_SQI_STRLENGTH + ", " + COLUMN_SQI_STRMASK + ", " + COLUMN_SQI_STRLENGTH
+ ", " + COLUMN_SQNI_VALUE + " FROM " + TABLE_SQ_INPUT + ", " + COLUMN_SQNI_VALUE + " FROM " + TABLE_SQ_INPUT
+ " LEFT OUTER JOIN " + TABLE_SQ_CONNECTION_INPUT + " ON " + " LEFT OUTER JOIN " + TABLE_SQ_CONNECTION_INPUT + " ON "
+ COLUMN_SQNI_INPUT + " = " + COLUMN_SQI_ID + " WHERE " + COLUMN_SQNI_INPUT + " = " + COLUMN_SQI_ID + " AND "
+ COLUMN_SQI_FORM + " = ? AND (" + COLUMN_SQNI_CONNECTION + COLUMN_SQNI_CONNECTION + " = ? WHERE " + COLUMN_SQI_FORM + " = ? AND ("
+ " = ? OR " + COLUMN_SQNI_CONNECTION + " IS NULL) ORDER BY " + COLUMN_SQNI_CONNECTION + " = ? OR " + COLUMN_SQNI_CONNECTION
+ COLUMN_SQI_INDEX; + " IS NULL) ORDER BY " + COLUMN_SQI_INDEX;
// DML: Fetch inputs and values for a given job // DML: Fetch inputs and values for a given job
public static final String STMT_FETCH_JOB_INPUT = public static final String STMT_FETCH_JOB_INPUT =
@ -235,8 +239,8 @@ public final class DerbySchemaQuery {
+ ", " + COLUMN_SQI_STRMASK + ", " + COLUMN_SQI_STRLENGTH + ", " + COLUMN_SQI_STRMASK + ", " + COLUMN_SQI_STRLENGTH
+ ", " + COLUMN_SQBI_VALUE + " FROM " + TABLE_SQ_INPUT + ", " + COLUMN_SQBI_VALUE + " FROM " + TABLE_SQ_INPUT
+ " LEFT OUTER JOIN " + TABLE_SQ_JOB_INPUT + " ON " + " LEFT OUTER JOIN " + TABLE_SQ_JOB_INPUT + " ON "
+ COLUMN_SQBI_INPUT + " = " + COLUMN_SQI_ID + " WHERE " + COLUMN_SQBI_INPUT + " = " + COLUMN_SQI_ID + " AND " + COLUMN_SQBI_JOB
+ COLUMN_SQI_FORM + " = ? AND (" + COLUMN_SQBI_JOB + " = ? WHERE " + COLUMN_SQI_FORM + " = ? AND (" + COLUMN_SQBI_JOB
+ " = ? OR " + COLUMN_SQBI_JOB + " IS NULL) ORDER BY " + " = ? OR " + COLUMN_SQBI_JOB + " IS NULL) ORDER BY "
+ COLUMN_SQI_INDEX; + COLUMN_SQI_INDEX;

View File

@ -0,0 +1,418 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.derby;
import junit.framework.TestCase;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnectionForms;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MForm;
import org.apache.sqoop.model.MFramework;
import org.apache.sqoop.model.MInput;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MJobForms;
import org.apache.sqoop.model.MStringInput;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.LinkedList;
import java.util.List;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.*;
/**
* Abstract class with convenience methods for testing derby repository.
*/
abstract public class DerbyTestCase extends TestCase {
public static final String DERBY_DRIVER =
"org.apache.derby.jdbc.EmbeddedDriver";
public static final String JDBC_URL =
"jdbc:derby:memory:myDB";
private Connection connection;
@Override
public void setUp() throws Exception {
super.setUp();
// Create connection to the database
Class.forName(DERBY_DRIVER).newInstance();
connection = DriverManager.getConnection(getStartJdbcUrl());
}
@Override
public void tearDown() throws Exception {
// Close active connection
if(connection != null) {
connection.close();
}
try {
// Drop in memory database
DriverManager.getConnection(getStopJdbcUrl());
} catch (SQLException ex) {
// Dropping Derby database leads always to exception
}
// Call parent tear down
super.tearDown();
}
/**
* Create derby schema.
*
* @throws Exception
*/
protected void createSchema() throws Exception {
runQuery(QUERY_CREATE_SCHEMA_SQOOP);
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR);
runQuery(QUERY_CREATE_TABLE_SQ_FORM);
runQuery(QUERY_CREATE_TABLE_SQ_INPUT);
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION);
runQuery(QUERY_CREATE_TABLE_SQ_JOB);
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT);
runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT);
}
/**
* Run arbitrary query on derby memory repository.
*
* @param query Query to execute
* @throws Exception
*/
protected void runQuery(String query) throws Exception {
Statement stmt = null;
try {
stmt = getDerbyConnection().createStatement();
stmt.execute(query);
} finally {
if (stmt != null) {
stmt.close();
}
}
}
protected Connection getDerbyConnection() {
return connection;
}
protected String getJdbcUrl() {
return JDBC_URL;
}
protected String getStartJdbcUrl() {
return JDBC_URL + ";create=true";
}
protected String getStopJdbcUrl() {
return JDBC_URL + ";drop=true";
}
/**
* Load testing connector and framework metadata into repository.
*
* @throws Exception
*/
protected void loadConnectorAndFramework() throws Exception {
// Connector entry
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS)"
+ "VALUES('A', 'org.apache.sqoop.test.A')");
for(String connector : new String[] {"1", "NULL"}) {
// Form entries
for(String operation : new String[] {"null", "'IMPORT'", "'EXPORT'"}) {
String type;
if(operation.equals("null")) {
type = "CONNECTION";
} else {
type = "JOB";
}
runQuery("INSERT INTO SQOOP.SQ_FORM"
+ "(SQF_CONNECTOR, SQF_OPERATION, SQF_NAME, SQF_TYPE, SQF_INDEX) "
+ "VALUES("
+ connector + ", "
+ operation
+ ", 'F1', '"
+ type
+ "', 0)");
runQuery("INSERT INTO SQOOP.SQ_FORM"
+ "(SQF_CONNECTOR, SQF_OPERATION, SQF_NAME, SQF_TYPE, SQF_INDEX) "
+ "VALUES("
+ connector + ", "
+ operation
+ ", 'F2', '"
+ type
+ "', 1)");
}
}
// Input entries
for(int x = 0; x < 2; x++ ) {
for(int i = 0; i < 3; i++) {
// First form
runQuery("INSERT INTO SQOOP.SQ_INPUT"
+"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I1', " + (x * 6 + i * 2 + 1) + ", 0, 'STRING', false, 30)");
runQuery("INSERT INTO SQOOP.SQ_INPUT"
+"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I2', " + (x * 6 + i * 2 + 1) + ", 1, 'STRING', false, 30)");
// Second form
runQuery("INSERT INTO SQOOP.SQ_INPUT"
+"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I3', " + (x * 6 + i * 2 + 2) + ", 0, 'STRING', false, 30)");
runQuery("INSERT INTO SQOOP.SQ_INPUT"
+"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+ " VALUES('I4', " + (x * 6 + i * 2 + 2) + ", 1, 'STRING', false, 30)");
}
}
}
/**
* Load testing connection objects into metadata repository.
*
* @throws Exception
*/
public void loadConnections() throws Exception {
// Insert two connections - CA and CB
runQuery("INSERT INTO SQOOP.SQ_CONNECTION(SQN_NAME, SQN_CONNECTOR) "
+ "VALUES('CA', 1)");
runQuery("INSERT INTO SQOOP.SQ_CONNECTION(SQN_NAME, SQN_CONNECTOR) "
+ "VALUES('CB', 1)");
for(String ci : new String[] {"1", "2"}) {
for(String i : new String[] {"1", "3", "13", "15"}) {
runQuery("INSERT INTO SQOOP.SQ_CONNECTION_INPUT"
+ "(SQNI_CONNECTION, SQNI_INPUT, SQNI_VALUE) "
+ "VALUES(" + ci + ", " + i + ", 'Value" + i + "')");
}
}
}
/**
* Load testing job objects into metadata repository.
*
* @throws Exception
*/
public void loadJobs() throws Exception {
for(String type : new String[] {"IMPORT", "EXPORT"}) {
for(String name : new String[] {"JA", "JB"} ) {
runQuery("INSERT INTO SQOOP.SQ_JOB(SQB_NAME, SQB_CONNECTION, SQB_TYPE)"
+ " VALUES('" + name + "', 1, '" + type + "')");
}
}
// Import inputs
for(String ci : new String[] {"1", "2"}) {
for(String i : new String[] {"5", "7", "17", "19"}) {
runQuery("INSERT INTO SQOOP.SQ_JOB_INPUT"
+ "(SQBI_JOB, SQBI_INPUT, SQBI_VALUE) "
+ "VALUES(" + ci + ", " + i + ", 'Value" + i + "')");
}
}
// Export inputs
for(String ci : new String[] {"3", "4"}) {
for(String i : new String[] {"9", "11", "21", "23"}) {
runQuery("INSERT INTO SQOOP.SQ_JOB_INPUT"
+ "(SQBI_JOB, SQBI_INPUT, SQBI_VALUE) "
+ "VALUES(" + ci + ", " + i + ", 'Value" + i + "')");
}
}
}
protected MConnector getConnector() {
return new MConnector("A", "org.apache.sqoop.test.A",
getConnectionForms(), getJobForms());
}
protected MFramework getFramework() {
return new MFramework(getConnectionForms(), getJobForms());
}
protected void fillConnection(MConnection connection) {
List<MForm> forms;
forms = connection.getConnectorPart().getForms();
((MStringInput)forms.get(0).getInputs().get(0)).setValue("Value1");
((MStringInput)forms.get(1).getInputs().get(0)).setValue("Value2");
forms = connection.getFrameworkPart().getForms();
((MStringInput)forms.get(0).getInputs().get(0)).setValue("Value13");
((MStringInput)forms.get(1).getInputs().get(0)).setValue("Value15");
}
protected void fillJob(MJob job) {
List<MForm> forms;
forms = job.getConnectorPart().getForms();
((MStringInput)forms.get(0).getInputs().get(0)).setValue("Value1");
((MStringInput)forms.get(1).getInputs().get(0)).setValue("Value2");
forms = job.getFrameworkPart().getForms();
((MStringInput)forms.get(0).getInputs().get(0)).setValue("Value13");
((MStringInput)forms.get(1).getInputs().get(0)).setValue("Value15");
}
protected List<MJobForms> getJobForms() {
List <MJobForms> jobForms = new LinkedList<MJobForms>();
jobForms.add(new MJobForms(MJob.Type.IMPORT, getForms()));
jobForms.add(new MJobForms(MJob.Type.EXPORT, getForms()));
return jobForms;
}
protected MConnectionForms getConnectionForms() {
return new MConnectionForms(getForms());
}
protected List<MForm> getForms() {
List<MForm> forms = new LinkedList<MForm>();
List<MInput<?>> inputs;
MInput input;
inputs = new LinkedList<MInput<?>>();
input = new MStringInput("I1", false, (short)30);
inputs.add(input);
input = new MStringInput("I2", false, (short)30);
inputs.add(input);
forms.add(new MForm("F1", inputs));
inputs = new LinkedList<MInput<?>>();
input = new MStringInput("I3", false, (short)30);
inputs.add(input);
input = new MStringInput("I4", false, (short)30);
inputs.add(input);
forms.add(new MForm("F2", inputs));
return forms;
}
/**
* Find out number of entries in given table.
*
* @param table Table name
* @return Number of rows in the table
* @throws Exception
*/
protected long countForTable(String table) throws Exception {
Statement stmt = null;
ResultSet rs = null;
try {
stmt = getDerbyConnection().createStatement();
rs = stmt.executeQuery("SELECT COUNT(*) FROM "+ table);
rs.next();
return rs.getLong(1);
} finally {
if(stmt != null) {
stmt.close();
}
if(rs != null) {
rs.close();
}
}
}
/**
* Assert row count for given table.
*
* @param table Table name
* @param expected Expected number of rows
* @throws Exception
*/
protected void assertCountForTable(String table, long expected)
throws Exception {
long count = countForTable(table);
assertEquals(expected, count);
}
/**
* Printout repository content for advance debugging.
*
* This method is currently unused, but might be helpful in the future, so
* I'm letting it here.
*
* @throws Exception
*/
protected void generateDatabaseState() throws Exception {
for(String tbl : new String[] {"SQ_CONNECTOR", "SQ_FORM", "SQ_INPUT",
"SQ_CONNECTION", "SQ_CONNECTION_INPUT", "SQ_JOB", "SQ_JOB_INPUT"}) {
generateTableState("SQOOP." + tbl);
}
}
/**
* Printout one single table.
*
* @param table Table name
* @throws Exception
*/
protected void generateTableState(String table) throws Exception {
PreparedStatement ps = null;
ResultSet rs = null;
ResultSetMetaData rsmt = null;
try {
ps = getDerbyConnection().prepareStatement("SELECT * FROM " + table);
rs = ps.executeQuery();
rsmt = rs.getMetaData();
StringBuilder sb = new StringBuilder();
System.out.println("Table " + table + ":");
for(int i = 1; i <= rsmt.getColumnCount(); i++) {
sb.append("| ").append(rsmt.getColumnName(i)).append(" ");
}
sb.append("|");
System.out.println(sb.toString());
while(rs.next()) {
sb = new StringBuilder();
for(int i = 1; i <= rsmt.getColumnCount(); i++) {
sb.append("| ").append(rs.getString(i)).append(" ");
}
sb.append("|");
System.out.println(sb.toString());
}
System.out.println("");
} finally {
if(rs != null) {
rs.close();
}
if(ps != null) {
ps.close();
}
}
}
}

View File

@ -0,0 +1,195 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.derby;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MForm;
import org.apache.sqoop.model.MStringInput;
import java.util.List;
/**
* Test connection methods on Derby repository.
*/
public class TestConnectionHandling extends DerbyTestCase {
DerbyRepositoryHandler handler;
@Override
public void setUp() throws Exception {
super.setUp();
handler = new DerbyRepositoryHandler();
// We always needs schema for this test case
createSchema();
// We always needs connector and framework structures in place
loadConnectorAndFramework();
}
public void testFindConnection() throws Exception {
// Let's try to find non existing connection
try {
handler.findConnection(1, getDerbyConnection());
fail();
} catch(SqoopException ex) {
assertEquals(DerbyRepoError.DERBYREPO_0024, ex.getErrorCode());
}
// Load prepared connections into database
loadConnections();
MConnection connA = handler.findConnection(1, getDerbyConnection());
assertNotNull(connA);
assertEquals(1, connA.getPersistenceId());
assertEquals("CA", connA.getName());
List<MForm> forms;
// Check connector part
forms = connA.getConnectorPart().getForms();
assertEquals("Value1", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value3", forms.get(1).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
// Check framework part
forms = connA.getFrameworkPart().getForms();
assertEquals("Value13", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value15", forms.get(1).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
}
public void testFindConnections() throws Exception {
List<MConnection> list;
// Load empty list on empty repository
list = handler.findConnections(getDerbyConnection());
assertEquals(0, list.size());
loadConnections();
// Load all two connections on loaded repository
list = handler.findConnections(getDerbyConnection());
assertEquals(2, list.size());
assertEquals("CA", list.get(0).getName());
assertEquals("CB", list.get(1).getName());
}
public void testExistsConnection() throws Exception {
// There shouldn't be anything on empty repository
assertFalse(handler.existsConnection(1, getDerbyConnection()));
assertFalse(handler.existsConnection(2, getDerbyConnection()));
assertFalse(handler.existsConnection(3, getDerbyConnection()));
loadConnections();
assertTrue(handler.existsConnection(1, getDerbyConnection()));
assertTrue(handler.existsConnection(2, getDerbyConnection()));
assertFalse(handler.existsConnection(3, getDerbyConnection()));
}
public void testCreateConnection() throws Exception {
MConnection connection = getConnection();
// Load some data
fillConnection(connection);
handler.createConnection(connection, getDerbyConnection());
assertEquals(1, connection.getPersistenceId());
assertCountForTable("SQOOP.SQ_CONNECTION", 1);
assertCountForTable("SQOOP.SQ_CONNECTION_INPUT", 4);
MConnection retrieved = handler.findConnection(1, getDerbyConnection());
assertEquals(1, retrieved.getPersistenceId());
List<MForm> forms;
forms = connection.getConnectorPart().getForms();
assertEquals("Value1", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value2", forms.get(1).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
forms = connection.getFrameworkPart().getForms();
assertEquals("Value13", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value15", forms.get(1).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
// Let's create second connection
connection = getConnection();
fillConnection(connection);
handler.createConnection(connection, getDerbyConnection());
assertEquals(2, connection.getPersistenceId());
assertCountForTable("SQOOP.SQ_CONNECTION", 2);
assertCountForTable("SQOOP.SQ_CONNECTION_INPUT", 8);
}
public void testUpdateConnection() throws Exception {
loadConnections();
MConnection connection = handler.findConnection(1, getDerbyConnection());
List<MForm> forms;
forms = connection.getConnectorPart().getForms();
((MStringInput)forms.get(0).getInputs().get(1)).setValue("Injected");
forms = connection.getFrameworkPart().getForms();
((MStringInput)forms.get(1).getInputs().get(1)).setValue("Injected");
handler.updateConnection(connection, getDerbyConnection());
assertEquals(1, connection.getPersistenceId());
assertCountForTable("SQOOP.SQ_CONNECTION", 2);
assertCountForTable("SQOOP.SQ_CONNECTION_INPUT", 10);
MConnection retrieved = handler.findConnection(1, getDerbyConnection());
forms = retrieved.getConnectorPart().getForms();
assertEquals("Injected", forms.get(0).getInputs().get(1).getValue());
forms = retrieved.getFrameworkPart().getForms();
assertEquals("Injected", forms.get(1).getInputs().get(1).getValue());
}
public void testDeleteConnection() throws Exception {
loadConnections();
handler.deleteConnection(1, getDerbyConnection());
assertCountForTable("SQOOP.SQ_CONNECTION", 1);
assertCountForTable("SQOOP.SQ_CONNECTION_INPUT", 4);
handler.deleteConnection(2, getDerbyConnection());
assertCountForTable("SQOOP.SQ_CONNECTION", 0);
assertCountForTable("SQOOP.SQ_CONNECTION_INPUT", 0);
}
public MConnection getConnection() {
return new MConnection(1,
handler.findConnector("A", getDerbyConnection()).getConnectionForms(),
handler.findFramework(getDerbyConnection()).getConnectionForms()
);
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.derby;
import org.apache.sqoop.model.MConnector;
/**
* Test connector methods on Derby repository.
*/
public class TestConnectorHandling extends DerbyTestCase {
DerbyRepositoryHandler handler;
@Override
public void setUp() throws Exception {
super.setUp();
handler = new DerbyRepositoryHandler();
// We always needs schema for this test case
createSchema();
}
public void testFindConnector() throws Exception {
// On empty repository, no connectors should be there
assertNull(handler.findConnector("A", getDerbyConnection()));
assertNull(handler.findConnector("B", getDerbyConnection()));
// Load connector into repository
loadConnectorAndFramework();
// Retrieve it
MConnector connector = handler.findConnector("A", getDerbyConnection());
assertNotNull(connector);
// Get original structure
MConnector original = getConnector();
// And compare them
assertEquals(original, connector);
}
public void testRegisterConnector() throws Exception {
MConnector connector = getConnector();
handler.registerConnector(connector, getDerbyConnection());
// Connector should get persistence ID
assertEquals(1, connector.getPersistenceId());
// Now check content in corresponding tables
assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
assertCountForTable("SQOOP.SQ_FORM", 6);
assertCountForTable("SQOOP.SQ_INPUT", 12);
// Registered connector should be easily recovered back
MConnector retrieved = handler.findConnector("A", getDerbyConnection());
assertNotNull(retrieved);
assertEquals(connector, retrieved);
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.derby;
import org.apache.sqoop.model.MFramework;
/**
* Test framework methods on Derby repository.
*/
public class TestFrameworkHandling extends DerbyTestCase {
DerbyRepositoryHandler handler;
@Override
public void setUp() throws Exception {
super.setUp();
handler = new DerbyRepositoryHandler();
// We always needs schema for this test case
createSchema();
}
public void testFindFramework() throws Exception {
// On empty repository, no framework should be there
assertNull(handler.findFramework(getDerbyConnection()));
// Load framework into repository
loadConnectorAndFramework();
// Retrieve it
MFramework framework = handler.findFramework(getDerbyConnection());
assertNotNull(framework);
// Get original structure
MFramework original = getFramework();
// And compare them
assertEquals(original, framework);
}
public void testRegisterConnector() throws Exception {
MFramework framework = getFramework();
handler.registerFramework(framework, getDerbyConnection());
// Connector should get persistence ID
assertEquals(1, framework.getPersistenceId());
// Now check content in corresponding tables
assertCountForTable("SQOOP.SQ_CONNECTOR", 0);
assertCountForTable("SQOOP.SQ_FORM", 6);
assertCountForTable("SQOOP.SQ_INPUT", 12);
// Registered framework should be easily recovered back
MFramework retrieved = handler.findFramework(getDerbyConnection());
assertNotNull(retrieved);
assertEquals(framework, retrieved);
}
}

View File

@ -0,0 +1,224 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.repository.derby;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.model.MForm;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MStringInput;
import java.util.List;
/**
* Test job methods on Derby repository.
*/
public class TestJobHandling extends DerbyTestCase {
DerbyRepositoryHandler handler;
@Override
public void setUp() throws Exception {
super.setUp();
handler = new DerbyRepositoryHandler();
// We always needs schema for this test case
createSchema();
// We always needs connector and framework structures in place
loadConnectorAndFramework();
// We always needs connection metadata in place
loadConnections();
}
public void testFindJob() throws Exception {
// Let's try to find non existing job
try {
handler.findJob(1, getDerbyConnection());
fail();
} catch(SqoopException ex) {
assertEquals(DerbyRepoError.DERBYREPO_0030, ex.getErrorCode());
}
// Load prepared connections into database
loadJobs();
MJob jobImport = handler.findJob(1, getDerbyConnection());
assertNotNull(jobImport);
assertEquals(1, jobImport.getPersistenceId());
assertEquals("JA", jobImport.getName());
assertEquals(MJob.Type.IMPORT, jobImport.getType());
List<MForm> forms;
// Check connector part
forms = jobImport.getConnectorPart().getForms();
assertEquals("Value5", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value7", forms.get(1).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
// Check framework part
forms = jobImport.getFrameworkPart().getForms();
assertEquals("Value17", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value19", forms.get(1).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
}
public void testFindJobs() throws Exception {
List<MJob> list;
// Load empty list on empty repository
list = handler.findJobs(getDerbyConnection());
assertEquals(0, list.size());
loadJobs();
// Load all two connections on loaded repository
list = handler.findJobs(getDerbyConnection());
assertEquals(4, list.size());
assertEquals("JA", list.get(0).getName());
assertEquals(MJob.Type.IMPORT, list.get(0).getType());
assertEquals("JB", list.get(1).getName());
assertEquals(MJob.Type.IMPORT, list.get(1).getType());
assertEquals("JA", list.get(2).getName());
assertEquals(MJob.Type.EXPORT, list.get(2).getType());
assertEquals("JB", list.get(3).getName());
assertEquals(MJob.Type.EXPORT, list.get(3).getType());
}
public void testExistsJob() throws Exception {
// There shouldn't be anything on empty repository
assertFalse(handler.existsJob(1, getDerbyConnection()));
assertFalse(handler.existsJob(2, getDerbyConnection()));
assertFalse(handler.existsJob(3, getDerbyConnection()));
assertFalse(handler.existsJob(4, getDerbyConnection()));
assertFalse(handler.existsJob(5, getDerbyConnection()));
loadJobs();
assertTrue(handler.existsJob(1, getDerbyConnection()));
assertTrue(handler.existsJob(2, getDerbyConnection()));
assertTrue(handler.existsJob(3, getDerbyConnection()));
assertTrue(handler.existsJob(4, getDerbyConnection()));
assertFalse(handler.existsJob(5, getDerbyConnection()));
}
public void testCreateJob() throws Exception {
MJob job = getJob();
// Load some data
fillJob(job);
handler.createJob(job, getDerbyConnection());
assertEquals(1, job.getPersistenceId());
assertCountForTable("SQOOP.SQ_JOB", 1);
assertCountForTable("SQOOP.SQ_JOB_INPUT", 4);
MJob retrieved = handler.findJob(1, getDerbyConnection());
assertEquals(1, retrieved.getPersistenceId());
List<MForm> forms;
forms = job.getConnectorPart().getForms();
assertEquals("Value1", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value2", forms.get(1).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
forms = job.getFrameworkPart().getForms();
assertEquals("Value13", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value15", forms.get(1).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
// Let's create second job
job = getJob();
fillJob(job);
handler.createJob(job, getDerbyConnection());
assertEquals(2, job.getPersistenceId());
assertCountForTable("SQOOP.SQ_JOB", 2);
assertCountForTable("SQOOP.SQ_JOB_INPUT", 8);
}
public void testUpdateJob() throws Exception {
loadJobs();
MJob job = handler.findJob(1, getDerbyConnection());
List<MForm> forms;
forms = job.getConnectorPart().getForms();
((MStringInput)forms.get(0).getInputs().get(1)).setValue("Injected");
forms = job.getFrameworkPart().getForms();
((MStringInput)forms.get(1).getInputs().get(1)).setValue("Injected");
handler.updateJob(job, getDerbyConnection());
assertEquals(1, job.getPersistenceId());
assertCountForTable("SQOOP.SQ_JOB", 4);
assertCountForTable("SQOOP.SQ_JOB_INPUT", 18);
MJob retrieved = handler.findJob(1, getDerbyConnection());
forms = retrieved.getConnectorPart().getForms();
assertEquals("Injected", forms.get(0).getInputs().get(1).getValue());
forms = retrieved.getFrameworkPart().getForms();
assertEquals("Injected", forms.get(1).getInputs().get(1).getValue());
}
public void testDeleteJob() throws Exception {
loadJobs();
handler.deleteJob(1, getDerbyConnection());
assertCountForTable("SQOOP.SQ_JOB", 3);
assertCountForTable("SQOOP.SQ_JOB_INPUT", 12);
handler.deleteJob(2, getDerbyConnection());
assertCountForTable("SQOOP.SQ_JOB", 2);
assertCountForTable("SQOOP.SQ_JOB_INPUT", 8);
handler.deleteJob(3, getDerbyConnection());
assertCountForTable("SQOOP.SQ_JOB", 1);
assertCountForTable("SQOOP.SQ_JOB_INPUT", 4);
handler.deleteJob(4, getDerbyConnection());
assertCountForTable("SQOOP.SQ_JOB", 0);
assertCountForTable("SQOOP.SQ_JOB_INPUT", 0);
}
public MJob getJob() {
return new MJob(1, 1, MJob.Type.IMPORT,
handler.findConnector("A",
getDerbyConnection()).getJobForms(MJob.Type.IMPORT
),
handler.findFramework(
getDerbyConnection()).getJobForms(MJob.Type.IMPORT
)
);
}
}