diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index f6447c65..ba56c776 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -279,7 +279,7 @@ public MSubmission submit(long jobId, HttpEventContext ctx) { // only if it's not. synchronized (getClass()) { MSubmission lastSubmission = RepositoryManager.getInstance().getRepository() - .findSubmissionLastForJob(jobId); + .findLastSubmissionForJob(jobId); if (lastSubmission != null && lastSubmission.getStatus().isRunning()) { throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId); } @@ -530,7 +530,7 @@ void destroySubmission(JobRequest request) { public MSubmission stop(long jobId, HttpEventContext ctx) { Repository repository = RepositoryManager.getInstance().getRepository(); - MSubmission mSubmission = repository.findSubmissionLastForJob(jobId); + MSubmission mSubmission = repository.findLastSubmissionForJob(jobId); if (mSubmission == null || !mSubmission.getStatus().isRunning()) { throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId @@ -549,7 +549,7 @@ public MSubmission stop(long jobId, HttpEventContext ctx) { public MSubmission status(long jobId) { Repository repository = RepositoryManager.getInstance().getRepository(); - MSubmission mSubmission = repository.findSubmissionLastForJob(jobId); + MSubmission mSubmission = repository.findLastSubmissionForJob(jobId); if (mSubmission == null) { return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED); @@ -681,7 +681,7 @@ public void run() { // Let's get all running submissions from repository to check them out List unfinishedSubmissions = RepositoryManager.getInstance().getRepository() - .findSubmissionsUnfinished(); + .findUnfinishedSubmissions(); for (MSubmission submission : unfinishedSubmissions) { update(submission); diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index 2aeb07eb..976223d2 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -385,6 +385,20 @@ public Object doIt(Connection conn) { }); } + /** + * {@inheritDoc} + */ + @SuppressWarnings("unchecked") + @Override + public List findLinksForConnector(final long connectorId) { + return (List) doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) throws Exception { + return handler.findLinksForConnector(connectorId, conn); + } + }); + } + /** * {@inheritDoc} */ @@ -460,13 +474,11 @@ public void deleteJob(final long id) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - if(!handler.existsJob(id, conn)) { - throw new SqoopException(RepositoryError.JDBCREPO_0020, - "Invalid id: " + id); + if (!handler.existsJob(id, conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + id); } - if(handler.inUseJob(id, conn)) { - throw new SqoopException(RepositoryError.JDBCREPO_0022, - "Id in use: " + id); + if (handler.inUseJob(id, conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0022, "Id in use: " + id); } handler.deleteJob(id, conn); @@ -502,6 +514,20 @@ public Object doIt(Connection conn) { }); } + /** + * {@inheritDoc} + */ + @SuppressWarnings("unchecked") + @Override + public List findJobsForConnector(final long connectorId) { + return (List) doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) throws Exception { + return handler.findJobsForConnector(connectorId, conn); + } + }); + } + /** * {@inheritDoc} */ @@ -513,7 +539,6 @@ public Object doIt(Connection conn) { if(submission.hasPersistenceId()) { throw new SqoopException(RepositoryError.JDBCREPO_0023); } - handler.createSubmission(submission, conn); return null; } @@ -561,11 +586,11 @@ public Object doIt(Connection conn) { */ @SuppressWarnings("unchecked") @Override - public List findSubmissionsUnfinished() { + public List findUnfinishedSubmissions() { return (List) doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - return handler.findSubmissionsUnfinished(conn); + return handler.findUnfinishedSubmissions(conn); } }); } @@ -606,43 +631,14 @@ public Object doIt(Connection conn) throws Exception { * {@inheritDoc} */ @Override - public MSubmission findSubmissionLastForJob(final long jobId) { + public MSubmission findLastSubmissionForJob(final long jobId) { return (MSubmission) doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - if(!handler.existsJob(jobId, conn)) { - throw new SqoopException(RepositoryError.JDBCREPO_0020, - "Invalid id: " + jobId); + if (!handler.existsJob(jobId, conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId); } - return handler.findSubmissionLastForJob(jobId, conn); - } - }); - } - - /** - * {@inheritDoc} - */ - @SuppressWarnings("unchecked") - @Override - public List findLinksForConnector(final long connectorId) { - return (List) doWithConnection(new DoWithConnection() { - @Override - public Object doIt(Connection conn) throws Exception { - return handler.findLinksForConnector(connectorId, conn); - } - }); - } - - /** - * {@inheritDoc} - */ - @SuppressWarnings("unchecked") - @Override - public List findJobsForConnector(final long connectorId) { - return (List) doWithConnection(new DoWithConnection() { - @Override - public Object doIt(Connection conn) throws Exception { - return handler.findJobsForConnector(connectorId, conn); + return handler.findLastSubmissionForJob(jobId, conn); } }); } diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java index ad380d39..1e227590 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java @@ -330,7 +330,7 @@ public abstract class JdbcRepositoryHandler { * * @param jobId Job id * @param conn Connection to the repository - * @return Deserialized config of the job that is present in the repository + * @return job for a given id that is present in the repository */ public abstract MJob findJob(long jobId, Connection conn); @@ -343,7 +343,7 @@ public abstract class JdbcRepositoryHandler { public abstract List findJobs(Connection conn); /** - * Save given submission in repository. + * Save given submission associates with a job in repository. * * @param submission Submission object * @param conn Connection to the repository @@ -380,7 +380,7 @@ public abstract class JdbcRepositoryHandler { * @param conn Connection to the repository * @return List of unfinished submissions. */ - public abstract List findSubmissionsUnfinished(Connection conn); + public abstract List findUnfinishedSubmissions(Connection conn); /** * Return list of all submissions from the repository. @@ -405,5 +405,5 @@ public abstract class JdbcRepositoryHandler { * @param conn Connection to the repository * @return Most recent submission */ - public abstract MSubmission findSubmissionLastForJob(long jobId, Connection conn); + public abstract MSubmission findLastSubmissionForJob(long jobId, Connection conn); } \ No newline at end of file diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java index 79742b9d..09989e01 100644 --- a/core/src/main/java/org/apache/sqoop/repository/Repository.java +++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java @@ -115,7 +115,7 @@ public abstract class Repository { /** * Get all connectors in repository * - * @return List will all connectors in repository + * @return List with all connectors in repository */ public abstract List findConnectors(); @@ -188,6 +188,13 @@ public abstract class Repository { */ public abstract MLink findLink(String name); + /** + * Retrieve links which use the given connector. + * @param connectorId Connector id whose links should be fetched + * @return List of MLink that use connectorId. + */ + public abstract List findLinksForConnector(long connectorId); + /** * Get all Link objects. * @@ -242,7 +249,7 @@ public abstract class Repository { * Find job object with given id. * * @param id Job id - * @return Deserialized config of job loaded from repository + * @return job with given id loaded from repository */ public abstract MJob findJob(long id); @@ -253,6 +260,14 @@ public abstract class Repository { */ public abstract List findJobs(); + /** + * Retrieve jobs which use the given link. + * + * @param connectorId Connector ID whose jobs should be fetched + * @return List of MJobs that use linkID. + */ + public abstract List findJobsForConnector(long connectorId); + /** * Create new submission record in repository. * @@ -279,7 +294,7 @@ public abstract class Repository { * * @return List of unfinished submissions */ - public abstract List findSubmissionsUnfinished(); + public abstract List findUnfinishedSubmissions(); /** * Return all submissions from repository @@ -301,22 +316,7 @@ public abstract class Repository { * @param jobId Job id * @return Most recent submission */ - public abstract MSubmission findSubmissionLastForJob(long jobId); - - /** - * Retrieve links which use the given connector. - * @param connectorId Connector ID whose links should be fetched - * @return List of MLink that use connectorID. - */ - public abstract List findLinksForConnector(long connectorId); - - /** - * Retrieve jobs which use the given link. - * - * @param connectorId Connector ID whose jobs should be fetched - * @return List of MJobs that use linkID. - */ - public abstract List findJobsForConnector(long connectorId); + public abstract MSubmission findLastSubmissionForJob(long jobId); /** * Update the connector with the new data supplied in the diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index b996a0b0..b324f4f1 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -1205,9 +1205,9 @@ public MLink findLink(long linkId, Connection conn) { List links = loadLinks(linkFetchStmt, conn); - if(links.size() != 1) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0024, "Couldn't find" - + " link with id " + linkId); + if (links.size() != 1) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0024, "Couldn't find link with id " + + linkId); } // Return the first and only one link object with the given id @@ -1492,7 +1492,7 @@ public boolean existsJob(long id, Connection conn) { @Override public boolean inUseJob(long jobId, Connection conn) { - MSubmission submission = findSubmissionLastForJob(jobId, conn); + MSubmission submission = findLastSubmissionForJob(jobId, conn); // We have no submissions and thus job can't be in use if(submission == null) { @@ -1573,9 +1573,9 @@ public MJob findJob(long id, Connection conn) { List jobs = loadJobs(stmt, conn); - if(jobs.size() != 1) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0030, "Couldn't find" - + " job with id " + id); + if (jobs.size() != 1) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0030, "Couldn't find job with id " + + id); } // Return the first and only one link object @@ -1763,7 +1763,7 @@ public void purgeSubmissions(Date threshold, Connection conn) { * {@inheritDoc} */ @Override - public List findSubmissionsUnfinished(Connection conn) { + public List findUnfinishedSubmissions(Connection conn) { List submissions = new LinkedList(); PreparedStatement stmt = null; ResultSet rs = null; @@ -1852,7 +1852,7 @@ public List findSubmissionsForJob(long jobId, Connection conn) { * {@inheritDoc} */ @Override - public MSubmission findSubmissionLastForJob(long jobId, Connection conn) { + public MSubmission findLastSubmissionForJob(long jobId, Connection conn) { PreparedStatement stmt = null; ResultSet rs = null; try { diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java index 4c2d062a..e2e80730 100644 --- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java +++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java @@ -60,13 +60,13 @@ public void setUp() throws Exception { public void testFindSubmissionsUnfinished() throws Exception { List submissions; - submissions = handler.findSubmissionsUnfinished(getDerbyDatabaseConnection()); + submissions = handler.findUnfinishedSubmissions(getDerbyDatabaseConnection()); assertNotNull(submissions); assertEquals(0, submissions.size()); loadSubmissions(); - submissions = handler.findSubmissionsUnfinished(getDerbyDatabaseConnection()); + submissions = handler.findUnfinishedSubmissions(getDerbyDatabaseConnection()); assertNotNull(submissions); assertEquals(2, submissions.size()); } @@ -123,7 +123,7 @@ public void testCreateSubmission() throws Exception { assertCountForTable("SQOOP.SQ_SUBMISSION", 1); List submissions = - handler.findSubmissionsUnfinished(getDerbyDatabaseConnection()); + handler.findUnfinishedSubmissions(getDerbyDatabaseConnection()); assertNotNull(submissions); assertEquals(1, submissions.size()); @@ -179,7 +179,7 @@ public void testUpdateConnection() throws Exception { loadSubmissions(); List submissions = - handler.findSubmissionsUnfinished(getDerbyDatabaseConnection()); + handler.findUnfinishedSubmissions(getDerbyDatabaseConnection()); assertNotNull(submissions); assertEquals(2, submissions.size()); @@ -188,7 +188,7 @@ public void testUpdateConnection() throws Exception { handler.updateSubmission(submission, getDerbyDatabaseConnection()); - submissions = handler.findSubmissionsUnfinished(getDerbyDatabaseConnection()); + submissions = handler.findUnfinishedSubmissions(getDerbyDatabaseConnection()); assertNotNull(submissions); assertEquals(1, submissions.size()); } @@ -198,7 +198,7 @@ public void testPurgeSubmissions() throws Exception { loadSubmissions(); List submissions; - submissions = handler.findSubmissionsUnfinished(getDerbyDatabaseConnection()); + submissions = handler.findUnfinishedSubmissions(getDerbyDatabaseConnection()); assertNotNull(submissions); assertEquals(2, submissions.size()); assertCountForTable("SQOOP.SQ_SUBMISSION", 5); @@ -208,21 +208,21 @@ public void testPurgeSubmissions() throws Exception { calendar.set(2012, Calendar.JANUARY, 3, 5, 5, 5); handler.purgeSubmissions(calendar.getTime(), getDerbyDatabaseConnection()); - submissions = handler.findSubmissionsUnfinished(getDerbyDatabaseConnection()); + submissions = handler.findUnfinishedSubmissions(getDerbyDatabaseConnection()); assertNotNull(submissions); assertEquals(1, submissions.size()); assertCountForTable("SQOOP.SQ_SUBMISSION", 2); handler.purgeSubmissions(new Date(), getDerbyDatabaseConnection()); - submissions = handler.findSubmissionsUnfinished(getDerbyDatabaseConnection()); + submissions = handler.findUnfinishedSubmissions(getDerbyDatabaseConnection()); assertNotNull(submissions); assertEquals(0, submissions.size()); assertCountForTable("SQOOP.SQ_SUBMISSION", 0); handler.purgeSubmissions(new Date(), getDerbyDatabaseConnection()); - submissions = handler.findSubmissionsUnfinished(getDerbyDatabaseConnection()); + submissions = handler.findUnfinishedSubmissions(getDerbyDatabaseConnection()); assertNotNull(submissions); assertEquals(0, submissions.size()); assertCountForTable("SQOOP.SQ_SUBMISSION", 0);