diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index f286c02e..dc441bc2 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -335,7 +335,7 @@ private JobRequest createJobRequest(long jobId, MSubmission submission) { // Create a job request for submit/execution JobRequest jobRequest = executionEngine.createJobRequest(); // Save important variables to the job request - jobRequest.setSummary(submission); + jobRequest.setJobSubmission(submission); jobRequest.setConnector(Direction.FROM, fromConnector); jobRequest.setConnector(Direction.TO, toConnector); diff --git a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java index d2496bd6..cfa45b23 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java @@ -28,8 +28,6 @@ import org.apache.sqoop.utils.ClassUtils; import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; import java.util.Set; /** @@ -140,7 +138,7 @@ public MSubmission getJobSubmission() { return jobSubmission; } - public void setSummary(MSubmission submission) { + public void setJobSubmission(MSubmission submission) { this.jobSubmission = submission; } diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index 22a9736d..d15bcfca 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -246,17 +246,14 @@ public boolean submit(JobRequest mrJobRequest) { // If we're in local mode than wait on completion. Local job runner do not // seems to be exposing API to get previously submitted job which makes // other methods of the submission engine quite useless. - if(isLocal()) { - job.waitForCompletion(true); + // NOTE: The minicluster mode is not local. It runs similar to a real MR cluster but + // only that it is in the same JVM + if (isLocal()) { + submitToLocalRunner(request, job); } else { - job.submit(); + submitToCluster(request, job); } - - String jobId = job.getJobID().toString(); - request.getJobSubmission().setExternalJobId(jobId); - request.getJobSubmission().setExternalLink(job.getTrackingURL()); - - LOG.debug("Executed new map-reduce job with id " + jobId); + LOG.debug("Executed new map-reduce job with id " + job.getJobID().toString()); } catch (Exception e) { SubmissionError error = new SubmissionError(); error.setErrorSummary(e.toString()); @@ -272,6 +269,32 @@ public boolean submit(JobRequest mrJobRequest) { return true; } + private void submitToCluster(MRJobRequest request, Job job) throws IOException, InterruptedException, ClassNotFoundException { + job.submit(); + request.getJobSubmission().setExternalJobId(job.getJobID().toString()); + request.getJobSubmission().setExternalLink(job.getTrackingURL()); + } + + private void submitToLocalRunner(MRJobRequest request, Job job) throws IOException, InterruptedException, + ClassNotFoundException { + boolean successful = job.waitForCompletion(true); + if (successful) { + request.getJobSubmission().setStatus(SubmissionStatus.SUCCEEDED); + } else { + // treat any other state as failed + request.getJobSubmission().setStatus(SubmissionStatus.FAILED); + } + request.getJobSubmission().setExternalJobId(job.getJobID().toString()); + request.getJobSubmission().setExternalLink(job.getTrackingURL()); + + request.getJobSubmission().setStatus(convertMapreduceState(job.getJobState().getValue())); + // there is no failure info in this job api, unlike the running job + request.getJobSubmission().setError(null); + request.getJobSubmission().setProgress((job.mapProgress() + job.reduceProgress()) / 2); + request.getJobSubmission().setCounters(convertHadoop2MapreduceCounters(job.getCounters())); + request.getJobSubmission().setLastUpdateDate(new Date()); + } + /** * {@inheritDoc} */ @@ -342,20 +365,12 @@ private Counters counters(RunningJob runningJob) { return null; } - return convertMapreduceCounters(runningJob.getCounters()); + return convertHadoop1MapreduceCounters(runningJob.getCounters()); } catch (IOException e) { throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e); } } - private String externalLink(RunningJob runningJob) { - if (runningJob == null) { - return null; - } - - return runningJob.getTrackingURL(); - } - /** * Convert map-reduce specific job status constants to Sqoop job status * constants. @@ -382,21 +397,22 @@ private SubmissionStatus convertMapreduceState(int status) { /** - * Convert Hadoop counters to Sqoop counters. + * Convert Hadoop1 counters to Sqoop counters. * * @param hadoopCounters Hadoop counters * @return Appropriate Sqoop counters */ - private Counters convertMapreduceCounters(org.apache.hadoop.mapred.Counters hadoopCounters) { + + private Counters convertHadoop1MapreduceCounters(org.apache.hadoop.mapred.Counters hadoopCounters) { Counters sqoopCounters = new Counters(); - if(hadoopCounters == null) { + if (hadoopCounters == null) { return sqoopCounters; } - for(org.apache.hadoop.mapred.Counters.Group hadoopGroup : hadoopCounters) { - CounterGroup sqoopGroup = new CounterGroup(hadoopGroup.getName()); - for(org.apache.hadoop.mapred.Counters.Counter hadoopCounter : hadoopGroup) { + for (org.apache.hadoop.mapred.Counters.Group counterGroup : hadoopCounters) { + CounterGroup sqoopGroup = new CounterGroup(counterGroup.getName()); + for (org.apache.hadoop.mapred.Counters.Counter hadoopCounter : counterGroup) { Counter sqoopCounter = new Counter(hadoopCounter.getName(), hadoopCounter.getValue()); sqoopGroup.addCounter(sqoopCounter); } @@ -406,6 +422,32 @@ private Counters convertMapreduceCounters(org.apache.hadoop.mapred.Counters hado return sqoopCounters; } + /** + * Convert Hadoop2 counters to Sqoop counters. + * + * @param hadoopCounters Hadoop counters + * @return Appropriate Sqoop counters + */ + private Counters convertHadoop2MapreduceCounters(org.apache.hadoop.mapreduce.Counters hadoopCounters) { + Counters sqoopCounters = new Counters(); + + if (hadoopCounters == null) { + return sqoopCounters; + } + + for (org.apache.hadoop.mapreduce.CounterGroup counterGroup : hadoopCounters) { + CounterGroup sqoopGroup = new CounterGroup(counterGroup.getName()); + for (org.apache.hadoop.mapreduce.Counter hadoopCounter : counterGroup) { + Counter sqoopCounter = new Counter(hadoopCounter.getName(), hadoopCounter.getValue()); + sqoopGroup.addCounter(sqoopCounter); + } + sqoopCounters.addCounterGroup(sqoopGroup); + } + + return sqoopCounters; + } + + /** * {@inheritDoc} */ @@ -419,19 +461,18 @@ public void update(MSubmission submission) { SubmissionStatus newStatus = status(runningJob); SubmissionError error = error(runningJob); - String externalLink = externalLink(runningJob); if (newStatus.isRunning()) { progress = progress(runningJob); } else { counters = counters(runningJob); } - + // these properties change as the job runs, rest of the submission attributes + // do not change as job runs submission.setStatus(newStatus); submission.setError(error); submission.setProgress(progress); submission.setCounters(counters); - submission.setExternalLink(externalLink); submission.setLastUpdateDate(new Date()); RepositoryManager.getInstance().getRepository().updateSubmission(submission); diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java index a423785f..9a76c4b3 100644 --- a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java +++ b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java @@ -249,7 +249,7 @@ protected void executeJob(long jid) throws Exception { LOG.error("Submission has failed: " + finalSubmission.getError().getErrorSummary()); LOG.error("Corresponding error details: " + finalSubmission.getError().getErrorDetails()); } - assertEquals("Submission has failed with " + finalSubmission.getError().getErrorSummary(), SubmissionStatus.SUCCEEDED, finalSubmission.getStatus()); + assertEquals("Submission finished with error: " + finalSubmission.getError().getErrorSummary(), SubmissionStatus.SUCCEEDED, finalSubmission.getStatus()); } /**