5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-11 22:41:50 +08:00

SQOOP-1723: Sqoop2: findJobsByConnector fails

(Veena Basavaraj via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2014-11-12 23:41:59 -08:00
parent 0ace9710eb
commit a2e87bef01
6 changed files with 79 additions and 49 deletions

View File

@ -149,7 +149,8 @@ public void registerConnector(MConnector mc, Connection conn) {
public List<MJob> findJobsForConnector(long connectorId, Connection conn) { public List<MJob> findJobsForConnector(long connectorId, Connection conn) {
PreparedStatement stmt = null; PreparedStatement stmt = null;
try { try {
stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE); stmt = conn
.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE);
stmt.setLong(1, connectorId); stmt.setLong(1, connectorId);
stmt.setLong(2, connectorId); stmt.setLong(2, connectorId);
return loadJobs(stmt, conn); return loadJobs(stmt, conn);
@ -894,10 +895,9 @@ public MJob findJob(String name, Connection conn) {
public List<MJob> findJobs(Connection conn) { public List<MJob> findJobs(Connection conn) {
PreparedStatement stmt = null; PreparedStatement stmt = null;
try { try {
stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_JOB); stmt = conn
.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_JOB_ALL);
return loadJobs(stmt, conn); return loadJobs(stmt, conn);
} catch (SQLException ex) { } catch (SQLException ex) {
logException(ex); logException(ex);
throw new SqoopException(CommonRepositoryError.COMMON_0028, ex); throw new SqoopException(CommonRepositoryError.COMMON_0028, ex);

View File

@ -343,7 +343,7 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery {
+ " WHERE " + COLUMN_SQ_LNK_ID + " = ? "; + " WHERE " + COLUMN_SQ_LNK_ID + " = ? ";
//DML: Select all jobs //DML: Select all jobs
public static final String STMT_SELECT_JOB = public static final String STMT_SELECT_JOB_ALL =
"SELECT " "SELECT "
+ "FROM_CONNECTOR." + COLUMN_SQ_LNK_CONFIGURABLE + ", " + "FROM_CONNECTOR." + COLUMN_SQ_LNK_CONFIGURABLE + ", "
+ "TO_CONNECTOR." + COLUMN_SQ_LNK_CONFIGURABLE + ", " + "TO_CONNECTOR." + COLUMN_SQ_LNK_CONFIGURABLE + ", "
@ -364,17 +364,19 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery {
// DML: Select one specific job // DML: Select one specific job
public static final String STMT_SELECT_JOB_SINGLE_BY_ID = public static final String STMT_SELECT_JOB_SINGLE_BY_ID =
STMT_SELECT_JOB + " WHERE " + COLUMN_SQB_ID + " = ?"; STMT_SELECT_JOB_ALL +
" WHERE " + COLUMN_SQB_ID + " = ?";
// DML: Select one specific job // DML: Select one specific job
public static final String STMT_SELECT_JOB_SINGLE_BY_NAME = public static final String STMT_SELECT_JOB_SINGLE_BY_NAME =
STMT_SELECT_JOB + " WHERE " + COLUMN_SQB_NAME + " = ?"; STMT_SELECT_JOB_ALL +
" WHERE " + COLUMN_SQB_NAME + " = ?";
// DML: Select all jobs for a Connector // DML: Select all jobs for a Connector
public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE = public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE =
STMT_SELECT_JOB STMT_SELECT_JOB_ALL +
+ " WHERE FROM_LINK." + COLUMN_SQ_LNK_CONFIGURABLE + " = ? OR TO_LINK." " WHERE FROM_CONNECTOR." + COLUMN_SQ_LNK_CONFIGURABLE + " = ?" +
+ COLUMN_SQ_LNK_CONFIGURABLE + " = ?"; " OR TO_CONNECTOR." + COLUMN_SQ_LNK_CONFIGURABLE + " = ?";
/**********SUBMISSION TABLE **************/ /**********SUBMISSION TABLE **************/
// DML: Insert new submission // DML: Insert new submission

View File

@ -19,6 +19,8 @@
import static org.apache.sqoop.repository.derby.DerbySchemaCreateQuery.*; import static org.apache.sqoop.repository.derby.DerbySchemaCreateQuery.*;
import static org.apache.sqoop.repository.derby.DerbySchemaInsertUpdateDeleteSelectQuery.*; import static org.apache.sqoop.repository.derby.DerbySchemaInsertUpdateDeleteSelectQuery.*;
import static org.apache.sqoop.repository.common.CommonRepositoryInsertUpdateDeleteSelectQuery.*;
import static org.apache.sqoop.repository.derby.DerbySchemaUpgradeQuery.*; import static org.apache.sqoop.repository.derby.DerbySchemaUpgradeQuery.*;
import java.net.URL; import java.net.URL;
import java.sql.Connection; import java.sql.Connection;

View File

@ -83,28 +83,6 @@ public final class DerbySchemaInsertUpdateDeleteSelectQuery {
+ COLUMN_SQN_UPDATE_USER + ", " + COLUMN_SQN_UPDATE_DATE + COLUMN_SQN_UPDATE_USER + ", " + COLUMN_SQN_UPDATE_DATE
+ ") VALUES (?, ?, ?, ?, ?, ?, ?)"; + ") VALUES (?, ?, ?, ?, ?, ?, ?)";
/**********JOB TABLE **************/
//DML: Select all jobs
public static final String STMT_SELECT_JOB =
"SELECT "
+ "FROM_CONNECTOR." + COLUMN_SQ_LNK_CONFIGURABLE + ", "
+ "TO_CONNECTOR." + COLUMN_SQ_LNK_CONFIGURABLE + ", "
+ "JOB." + COLUMN_SQB_ID + ", "
+ "JOB." + COLUMN_SQB_NAME + ", "
+ "JOB." + COLUMN_SQB_FROM_LINK + ", "
+ "JOB." + COLUMN_SQB_TO_LINK + ", "
+ "JOB." + COLUMN_SQB_ENABLED + ", "
+ "JOB." + COLUMN_SQB_CREATION_USER + ", "
+ "JOB." + COLUMN_SQB_CREATION_DATE + ", "
+ "JOB." + COLUMN_SQB_UPDATE_USER + ", "
+ "JOB." + COLUMN_SQB_UPDATE_DATE
+ " FROM " + TABLE_SQ_JOB + " JOB"
+ " LEFT JOIN " + TABLE_SQ_LINK + " FROM_CONNECTOR"
+ " ON " + COLUMN_SQB_FROM_LINK + " = FROM_CONNECTOR." + COLUMN_SQ_LNK_ID
+ " LEFT JOIN " + TABLE_SQ_LINK + " TO_CONNECTOR"
+ " ON " + COLUMN_SQB_TO_LINK + " = TO_CONNECTOR." + COLUMN_SQ_LNK_ID;
/******* CONFIG and CONNECTOR DIRECTIONS ****/ /******* CONFIG and CONNECTOR DIRECTIONS ****/
public static final String STMT_INSERT_DIRECTION = "INSERT INTO " + TABLE_SQ_DIRECTION + " " public static final String STMT_INSERT_DIRECTION = "INSERT INTO " + TABLE_SQ_DIRECTION + " "
+ "(" + COLUMN_SQD_NAME + ") VALUES (?)"; + "(" + COLUMN_SQD_NAME + ") VALUES (?)";
@ -115,16 +93,6 @@ public final class DerbySchemaInsertUpdateDeleteSelectQuery {
+ COLUMN_SQ_CFG_DIRECTION + COLUMN_SQ_CFG_DIRECTION
+ " FROM " + TABLE_SQ_CONFIG; + " FROM " + TABLE_SQ_CONFIG;
public static final String STMT_INSERT_SQ_CONNECTOR_DIRECTIONS =
"INSERT INTO " + TABLE_SQ_CONNECTOR_DIRECTIONS + " "
+ "(" + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION + ")"
+ " VALUES (?, ?)";
public static final String STMT_INSERT_SQ_CONFIG_DIRECTIONS =
"INSERT INTO " + TABLE_SQ_CONFIG_DIRECTIONS + " "
+ "(" + COLUMN_SQ_CFG_DIR_CONFIG + ", " + COLUMN_SQ_CFG_DIR_DIRECTION + ")"
+ " VALUES (?, ?)";
private DerbySchemaInsertUpdateDeleteSelectQuery() { private DerbySchemaInsertUpdateDeleteSelectQuery() {
// Disable explicit object creation // Disable explicit object creation
} }

View File

@ -108,19 +108,46 @@ public void testFindJobs() throws Exception {
assertEquals(0, list.size()); assertEquals(0, list.size());
loadJobsForLatestVersion(); loadJobsForLatestVersion();
// Load all two connections on loaded repository // Load all two links on loaded repository
list = handler.findJobs(derbyConnection); list = handler.findJobs(derbyConnection);
assertEquals(4, list.size()); assertEquals(4, list.size());
assertEquals("JA0", list.get(0).getName()); assertEquals("JA0", list.get(0).getName());
assertEquals("JB0", list.get(1).getName()); assertEquals("JB0", list.get(1).getName());
assertEquals("JC0", list.get(2).getName()); assertEquals("JC0", list.get(2).getName());
assertEquals("JD0", list.get(3).getName()); assertEquals("JD0", list.get(3).getName());
} }
@Test
public void testFindJobsByConnector() throws Exception {
List<MJob> list;
// Load empty list on empty repository
list = handler.findJobs(derbyConnection);
assertEquals(0, list.size());
loadJobsForLatestVersion();
// Load all 4 jobs on loaded repository
list = handler.findJobsForConnector(1, derbyConnection);
assertEquals(4, list.size());
assertEquals("JA0", list.get(0).getName());
assertEquals("JB0", list.get(1).getName());
assertEquals("JC0", list.get(2).getName());
assertEquals("JD0", list.get(3).getName());
}
@Test
public void testFindJobsForNonExistingConnector() throws Exception {
List<MJob> list;
// Load empty list on empty repository
list = handler.findJobs(derbyConnection);
assertEquals(0, list.size());
loadJobsForLatestVersion();
list = handler.findJobsForConnector(11, derbyConnection);
assertEquals(0, list.size());
}
@Test @Test
public void testExistsJob() throws Exception { public void testExistsJob() throws Exception {
// There shouldn't be anything on empty repository // There shouldn't be anything on empty repository

View File

@ -63,7 +63,7 @@ public void testFindLink() throws Exception {
assertEquals(CommonRepositoryError.COMMON_0021, ex.getErrorCode()); assertEquals(CommonRepositoryError.COMMON_0021, ex.getErrorCode());
} }
// Load prepared connections into database // Load prepared links into database
loadLinksForLatestVersion(); loadLinksForLatestVersion();
MLink linkA = handler.findLink(1, getDerbyDatabaseConnection()); MLink linkA = handler.findLink(1, getDerbyDatabaseConnection());
@ -85,7 +85,7 @@ public void testFindLink() throws Exception {
public void testFindLinkByName() throws Exception { public void testFindLinkByName() throws Exception {
// Let's try to find non existing link // Let's try to find non existing link
assertNull(handler.findLink("non-existing", getDerbyDatabaseConnection())); assertNull(handler.findLink("non-existing", getDerbyDatabaseConnection()));
// Load prepared connections into database // Load prepared links into database
loadLinksForLatestVersion(); loadLinksForLatestVersion();
MLink linkA = handler.findLink("CA", getDerbyDatabaseConnection()); MLink linkA = handler.findLink("CA", getDerbyDatabaseConnection());
@ -113,7 +113,7 @@ public void testFindLinks() throws Exception {
loadLinksForLatestVersion(); loadLinksForLatestVersion();
// Load all two connections on loaded repository // Load all two links on loaded repository
list = handler.findLinks(getDerbyDatabaseConnection()); list = handler.findLinks(getDerbyDatabaseConnection());
assertEquals(2, list.size()); assertEquals(2, list.size());
@ -121,6 +121,37 @@ public void testFindLinks() throws Exception {
assertEquals("CB", list.get(1).getName()); assertEquals("CB", list.get(1).getName());
} }
@Test
public void testFindLinksByConnector() throws Exception {
List<MLink> list;
// Load empty list on empty repository
list = handler.findLinks(getDerbyDatabaseConnection());
assertEquals(0, list.size());
loadLinksForLatestVersion();
// Load all two links on loaded repository
list = handler.findLinksForConnector(1, getDerbyDatabaseConnection());
assertEquals(2, list.size());
assertEquals("CA", list.get(0).getName());
assertEquals("CB", list.get(1).getName());
}
public void testFindLinksByNonExistingConnector() throws Exception {
List<MLink> list;
// Load empty list on empty repository
list = handler.findLinks(getDerbyDatabaseConnection());
assertEquals(0, list.size());
loadLinksForLatestVersion();
list = handler.findLinksForConnector(2, getDerbyDatabaseConnection());
assertEquals(0, list.size());
}
@Test @Test
public void testExistsLink() throws Exception { public void testExistsLink() throws Exception {
// There shouldn't be anything on empty repository // There shouldn't be anything on empty repository