5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 20:11:54 +08:00

SQOOP-1897: Sqoop2: Submission Engine API change for better performance

(Veena Basavaraj via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2014-12-15 16:35:58 -08:00
parent 4b8fa56931
commit 68be30c1cb
3 changed files with 56 additions and 126 deletions

View File

@ -556,7 +556,7 @@ public MSubmission stop(long jobId, HttpEventContext ctx) {
mSubmission.setLastUpdateUser(ctx.getUsername());
// Fetch new information to verify that the stop command has actually worked
update(mSubmission);
submissionEngine.update(mSubmission);
// Return updated structure
return mSubmission;
@ -571,37 +571,12 @@ public MSubmission status(long jobId) {
}
// If the submission isin running state, let's update it
if (mSubmission.getStatus().isRunning()) {
update(mSubmission);
submissionEngine.update(mSubmission);
}
return mSubmission;
}
private void update(MSubmission submission) {
double progress = -1;
Counters counters = null;
String externalJobId = submission.getExternalJobId();
SubmissionStatus newStatus = submissionEngine.status(externalJobId);
SubmissionError error = submissionEngine.error(externalJobId);
String externalLink = submissionEngine.externalLink(externalJobId);
if (newStatus.isRunning()) {
progress = submissionEngine.progress(externalJobId);
} else {
counters = submissionEngine.counters(externalJobId);
}
submission.setStatus(newStatus);
submission.setError(error);
submission.setProgress(progress);
submission.setCounters(counters);
submission.setExternalLink(externalLink);
submission.setLastUpdateDate(new Date());
RepositoryManager.getInstance().getRepository()
.updateSubmission(submission);
}
@Override
public synchronized void configurationChanged() {
LOG.info("Begin submission engine manager reconfiguring");
@ -700,7 +675,7 @@ public void run() {
.findUnfinishedSubmissions();
for (MSubmission submission : unfinishedSubmissions) {
update(submission);
submissionEngine.update(submission);
}
Thread.sleep(updateSleep);

View File

@ -18,9 +18,7 @@
package org.apache.sqoop.driver;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.model.SubmissionError;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.model.MSubmission;
/**
* Submission engine is responsible in conveying the information about the
@ -69,56 +67,9 @@ public void destroy() {
public abstract void stop(String externalJobId);
/**
* Return status of given submission.
*
* @param externalJobId Submission external job id.
* @return Current submission status.
* Update the given submission
* @param submission record to update
*/
public abstract SubmissionStatus status(String externalJobId);
public abstract void update(MSubmission submission);
/**
* Return failure info if the job status is FAILED
*
* @param submissionId Submission internal id.
* @return Current failure info
*/
public abstract SubmissionError error(String externalJobId);
/**
* Return submission progress.
*
* Expected is number from interval <0, 1> denoting how far the processing
* has gone or -1 in case that this submission engine do not supports
* progress reporting.
*
* @param externalJobId Submission external job id.
* @return {-1} union <0, 1>
*/
public double progress(String externalJobId) {
return -1;
}
/**
* Return statistics for given submission id.
*
* Sqoop will call counters only for submission in state SUCCEEDED,
* it's consider exceptional state to call this method for other states.
*
* @param externalJobId Submission external job id.
* @return Submission statistics
*/
public Counters counters(String externalJobId) {
return null;
}
/**
* Return link to external web page with given submission.
*
* @param externalJobId Submission external job id.
* @return Null in case that external page is not supported or available or
* HTTP link to given submission.
*/
public String externalLink(String externalJobId) {
return null;
}
}

View File

@ -23,6 +23,7 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.MalformedURLException;
import java.util.Date;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
@ -43,7 +44,9 @@
import org.apache.sqoop.driver.JobRequest;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.mr.MRConfigurationUtils;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.SubmissionError;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
@ -286,13 +289,8 @@ public void stop(String externalJobId) {
}
}
/**
* {@inheritDoc}
*/
@Override
public SubmissionStatus status(String externalJobId) {
private SubmissionStatus status(RunningJob runningJob) {
try {
RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
if(runningJob == null) {
return SubmissionStatus.UNKNOWN;
}
@ -306,13 +304,8 @@ public SubmissionStatus status(String externalJobId) {
}
/**
* {@inheritDoc}
*/
@Override
public SubmissionError error(String externalJobId) {
private SubmissionError error(RunningJob runningJob) {
try {
RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
if (runningJob == null) {
return null;
}
@ -323,43 +316,30 @@ public SubmissionError error(String externalJobId) {
error.setErrorDetails(runningJob.getFailureInfo());
return error;
}
} catch (IOException e) {
throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
}
return null;
}
/**
* {@inheritDoc}
*/
@Override
public double progress(String externalJobId) {
private double progress(RunningJob runningJob) {
try {
// Get some reasonable approximation of map-reduce job progress
// TODO(jarcec): What if we're running without reducers?
RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
if(runningJob == null) {
// Return default value
return super.progress(externalJobId);
return -1;
}
return (runningJob.mapProgress() + runningJob.reduceProgress()) / 2;
} catch (IOException e) {
throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
}
}
/**
* {@inheritDoc}
*/
@Override
public Counters counters(String externalJobId) {
private Counters counters(RunningJob runningJob) {
try {
RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
if(runningJob == null) {
// Return default value
return super.counters(externalJobId);
return null;
}
return convertMapreduceCounters(runningJob.getCounters());
@ -368,21 +348,12 @@ public Counters counters(String externalJobId) {
}
}
/**
* {@inheritDoc}
*/
@Override
public String externalLink(String externalJobId) {
try {
RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
if(runningJob == null) {
return null;
}
return runningJob.getTrackingURL();
} catch (IOException e) {
throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
private String externalLink(RunningJob runningJob) {
if (runningJob == null) {
return null;
}
return runningJob.getTrackingURL();
}
/**
@ -392,7 +363,7 @@ public String externalLink(String externalJobId) {
* @param status Map-reduce job constant
* @return Equivalent submission status
*/
protected SubmissionStatus convertMapreduceState(int status) {
private SubmissionStatus convertMapreduceState(int status) {
if(status == JobStatus.PREP) {
return SubmissionStatus.BOOTING;
} else if (status == JobStatus.RUNNING) {
@ -435,6 +406,39 @@ private Counters convertMapreduceCounters(org.apache.hadoop.mapred.Counters hado
return sqoopCounters;
}
/**
* {@inheritDoc}
*/
@Override
public void update(MSubmission submission) {
double progress = -1;
Counters counters = null;
String externalJobId = submission.getExternalJobId();
try {
RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
SubmissionStatus newStatus = status(runningJob);
SubmissionError error = error(runningJob);
String externalLink = externalLink(runningJob);
if (newStatus.isRunning()) {
progress = progress(runningJob);
} else {
counters = counters(runningJob);
}
submission.setStatus(newStatus);
submission.setError(error);
submission.setProgress(progress);
submission.setCounters(counters);
submission.setExternalLink(externalLink);
submission.setLastUpdateDate(new Date());
RepositoryManager.getInstance().getRepository().updateSubmission(submission);
} catch (IOException e) {
throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
}
}
/**
* Detect MapReduce local mode.
*