5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-12 15:01:45 +08:00

SQOOP-1879: Sqoop2: Submission Engine does not set all details on SubmissionRecord in Local mode

(Veena Basavaraj via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2014-12-16 18:07:53 -08:00
parent 89d9660c7b
commit cf9af00812
4 changed files with 71 additions and 32 deletions

View File

@ -335,7 +335,7 @@ private JobRequest createJobRequest(long jobId, MSubmission submission) {
// Create a job request for submit/execution
JobRequest jobRequest = executionEngine.createJobRequest();
// Save important variables to the job request
jobRequest.setSummary(submission);
jobRequest.setJobSubmission(submission);
jobRequest.setConnector(Direction.FROM, fromConnector);
jobRequest.setConnector(Direction.TO, toConnector);

View File

@ -28,8 +28,6 @@
import org.apache.sqoop.utils.ClassUtils;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
/**
@ -140,7 +138,7 @@ public MSubmission getJobSubmission() {
return jobSubmission;
}
public void setSummary(MSubmission submission) {
public void setJobSubmission(MSubmission submission) {
this.jobSubmission = submission;
}

View File

@ -246,17 +246,14 @@ public boolean submit(JobRequest mrJobRequest) {
// If we're in local mode than wait on completion. Local job runner do not
// seems to be exposing API to get previously submitted job which makes
// other methods of the submission engine quite useless.
if(isLocal()) {
job.waitForCompletion(true);
// NOTE: The minicluster mode is not local. It runs similar to a real MR cluster but
// only that it is in the same JVM
if (isLocal()) {
submitToLocalRunner(request, job);
} else {
job.submit();
submitToCluster(request, job);
}
String jobId = job.getJobID().toString();
request.getJobSubmission().setExternalJobId(jobId);
request.getJobSubmission().setExternalLink(job.getTrackingURL());
LOG.debug("Executed new map-reduce job with id " + jobId);
LOG.debug("Executed new map-reduce job with id " + job.getJobID().toString());
} catch (Exception e) {
SubmissionError error = new SubmissionError();
error.setErrorSummary(e.toString());
@ -272,6 +269,32 @@ public boolean submit(JobRequest mrJobRequest) {
return true;
}
private void submitToCluster(MRJobRequest request, Job job) throws IOException, InterruptedException, ClassNotFoundException {
job.submit();
request.getJobSubmission().setExternalJobId(job.getJobID().toString());
request.getJobSubmission().setExternalLink(job.getTrackingURL());
}
private void submitToLocalRunner(MRJobRequest request, Job job) throws IOException, InterruptedException,
ClassNotFoundException {
boolean successful = job.waitForCompletion(true);
if (successful) {
request.getJobSubmission().setStatus(SubmissionStatus.SUCCEEDED);
} else {
// treat any other state as failed
request.getJobSubmission().setStatus(SubmissionStatus.FAILED);
}
request.getJobSubmission().setExternalJobId(job.getJobID().toString());
request.getJobSubmission().setExternalLink(job.getTrackingURL());
request.getJobSubmission().setStatus(convertMapreduceState(job.getJobState().getValue()));
// there is no failure info in this job api, unlike the running job
request.getJobSubmission().setError(null);
request.getJobSubmission().setProgress((job.mapProgress() + job.reduceProgress()) / 2);
request.getJobSubmission().setCounters(convertHadoop2MapreduceCounters(job.getCounters()));
request.getJobSubmission().setLastUpdateDate(new Date());
}
/**
* {@inheritDoc}
*/
@ -342,20 +365,12 @@ private Counters counters(RunningJob runningJob) {
return null;
}
return convertMapreduceCounters(runningJob.getCounters());
return convertHadoop1MapreduceCounters(runningJob.getCounters());
} catch (IOException e) {
throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
}
}
private String externalLink(RunningJob runningJob) {
if (runningJob == null) {
return null;
}
return runningJob.getTrackingURL();
}
/**
* Convert map-reduce specific job status constants to Sqoop job status
* constants.
@ -382,21 +397,22 @@ private SubmissionStatus convertMapreduceState(int status) {
/**
* Convert Hadoop counters to Sqoop counters.
* Convert Hadoop1 counters to Sqoop counters.
*
* @param hadoopCounters Hadoop counters
* @return Appropriate Sqoop counters
*/
private Counters convertMapreduceCounters(org.apache.hadoop.mapred.Counters hadoopCounters) {
private Counters convertHadoop1MapreduceCounters(org.apache.hadoop.mapred.Counters hadoopCounters) {
Counters sqoopCounters = new Counters();
if(hadoopCounters == null) {
if (hadoopCounters == null) {
return sqoopCounters;
}
for(org.apache.hadoop.mapred.Counters.Group hadoopGroup : hadoopCounters) {
CounterGroup sqoopGroup = new CounterGroup(hadoopGroup.getName());
for(org.apache.hadoop.mapred.Counters.Counter hadoopCounter : hadoopGroup) {
for (org.apache.hadoop.mapred.Counters.Group counterGroup : hadoopCounters) {
CounterGroup sqoopGroup = new CounterGroup(counterGroup.getName());
for (org.apache.hadoop.mapred.Counters.Counter hadoopCounter : counterGroup) {
Counter sqoopCounter = new Counter(hadoopCounter.getName(), hadoopCounter.getValue());
sqoopGroup.addCounter(sqoopCounter);
}
@ -406,6 +422,32 @@ private Counters convertMapreduceCounters(org.apache.hadoop.mapred.Counters hado
return sqoopCounters;
}
/**
* Convert Hadoop2 counters to Sqoop counters.
*
* @param hadoopCounters Hadoop counters
* @return Appropriate Sqoop counters
*/
private Counters convertHadoop2MapreduceCounters(org.apache.hadoop.mapreduce.Counters hadoopCounters) {
Counters sqoopCounters = new Counters();
if (hadoopCounters == null) {
return sqoopCounters;
}
for (org.apache.hadoop.mapreduce.CounterGroup counterGroup : hadoopCounters) {
CounterGroup sqoopGroup = new CounterGroup(counterGroup.getName());
for (org.apache.hadoop.mapreduce.Counter hadoopCounter : counterGroup) {
Counter sqoopCounter = new Counter(hadoopCounter.getName(), hadoopCounter.getValue());
sqoopGroup.addCounter(sqoopCounter);
}
sqoopCounters.addCounterGroup(sqoopGroup);
}
return sqoopCounters;
}
/**
* {@inheritDoc}
*/
@ -419,19 +461,18 @@ public void update(MSubmission submission) {
SubmissionStatus newStatus = status(runningJob);
SubmissionError error = error(runningJob);
String externalLink = externalLink(runningJob);
if (newStatus.isRunning()) {
progress = progress(runningJob);
} else {
counters = counters(runningJob);
}
// these properties change as the job runs, rest of the submission attributes
// do not change as job runs
submission.setStatus(newStatus);
submission.setError(error);
submission.setProgress(progress);
submission.setCounters(counters);
submission.setExternalLink(externalLink);
submission.setLastUpdateDate(new Date());
RepositoryManager.getInstance().getRepository().updateSubmission(submission);

View File

@ -249,7 +249,7 @@ protected void executeJob(long jid) throws Exception {
LOG.error("Submission has failed: " + finalSubmission.getError().getErrorSummary());
LOG.error("Corresponding error details: " + finalSubmission.getError().getErrorDetails());
}
assertEquals("Submission has failed with " + finalSubmission.getError().getErrorSummary(), SubmissionStatus.SUCCEEDED, finalSubmission.getStatus());
assertEquals("Submission finished with error: " + finalSubmission.getError().getErrorSummary(), SubmissionStatus.SUCCEEDED, finalSubmission.getStatus());
}
/**