From 68be30c1cb5065d652c20c89057bf44971b88283 Mon Sep 17 00:00:00 2001 From: Abraham Elmahrek Date: Mon, 15 Dec 2014 16:35:58 -0800 Subject: [PATCH] SQOOP-1897: Sqoop2: Submission Engine API change for better performance (Veena Basavaraj via Abraham Elmahrek) --- .../org/apache/sqoop/driver/JobManager.java | 31 +----- .../apache/sqoop/driver/SubmissionEngine.java | 57 +---------- .../mapreduce/MapreduceSubmissionEngine.java | 94 ++++++++++--------- 3 files changed, 56 insertions(+), 126 deletions(-) diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index ff263aec..f286c02e 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -556,7 +556,7 @@ public MSubmission stop(long jobId, HttpEventContext ctx) { mSubmission.setLastUpdateUser(ctx.getUsername()); // Fetch new information to verify that the stop command has actually worked - update(mSubmission); + submissionEngine.update(mSubmission); // Return updated structure return mSubmission; @@ -571,37 +571,12 @@ public MSubmission status(long jobId) { } // If the submission isin running state, let's update it if (mSubmission.getStatus().isRunning()) { - update(mSubmission); + submissionEngine.update(mSubmission); } return mSubmission; } - private void update(MSubmission submission) { - double progress = -1; - Counters counters = null; - String externalJobId = submission.getExternalJobId(); - SubmissionStatus newStatus = submissionEngine.status(externalJobId); - SubmissionError error = submissionEngine.error(externalJobId); - String externalLink = submissionEngine.externalLink(externalJobId); - - if (newStatus.isRunning()) { - progress = submissionEngine.progress(externalJobId); - } else { - counters = submissionEngine.counters(externalJobId); - } - - submission.setStatus(newStatus); - submission.setError(error); - submission.setProgress(progress); - submission.setCounters(counters); - submission.setExternalLink(externalLink); - submission.setLastUpdateDate(new Date()); - - RepositoryManager.getInstance().getRepository() - .updateSubmission(submission); - } - @Override public synchronized void configurationChanged() { LOG.info("Begin submission engine manager reconfiguring"); @@ -700,7 +675,7 @@ public void run() { .findUnfinishedSubmissions(); for (MSubmission submission : unfinishedSubmissions) { - update(submission); + submissionEngine.update(submission); } Thread.sleep(updateSleep); diff --git a/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java index 62e0d8fc..f2995d2b 100644 --- a/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java +++ b/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java @@ -18,9 +18,7 @@ package org.apache.sqoop.driver; import org.apache.sqoop.common.MapContext; -import org.apache.sqoop.model.SubmissionError; -import org.apache.sqoop.submission.counter.Counters; -import org.apache.sqoop.submission.SubmissionStatus; +import org.apache.sqoop.model.MSubmission; /** * Submission engine is responsible in conveying the information about the @@ -69,56 +67,9 @@ public void destroy() { public abstract void stop(String externalJobId); /** - * Return status of given submission. - * - * @param externalJobId Submission external job id. - * @return Current submission status. + * Update the given submission + * @param submission record to update */ - public abstract SubmissionStatus status(String externalJobId); + public abstract void update(MSubmission submission); - /** - * Return failure info if the job status is FAILED - * - * @param submissionId Submission internal id. - * @return Current failure info - */ - public abstract SubmissionError error(String externalJobId); - - /** - * Return submission progress. - * - * Expected is number from interval <0, 1> denoting how far the processing - * has gone or -1 in case that this submission engine do not supports - * progress reporting. - * - * @param externalJobId Submission external job id. - * @return {-1} union <0, 1> - */ - public double progress(String externalJobId) { - return -1; - } - - /** - * Return statistics for given submission id. - * - * Sqoop will call counters only for submission in state SUCCEEDED, - * it's consider exceptional state to call this method for other states. - * - * @param externalJobId Submission external job id. - * @return Submission statistics - */ - public Counters counters(String externalJobId) { - return null; - } - - /** - * Return link to external web page with given submission. - * - * @param externalJobId Submission external job id. - * @return Null in case that external page is not supported or available or - * HTTP link to given submission. - */ - public String externalLink(String externalJobId) { - return null; - } } diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index e04c888c..22a9736d 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -23,6 +23,7 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.net.MalformedURLException; +import java.util.Date; import java.util.Map; import org.apache.commons.lang.StringUtils; @@ -43,7 +44,9 @@ import org.apache.sqoop.driver.JobRequest; import org.apache.sqoop.job.MRJobConstants; import org.apache.sqoop.job.mr.MRConfigurationUtils; +import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.model.SubmissionError; +import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.submission.SubmissionStatus; import org.apache.sqoop.submission.counter.Counter; import org.apache.sqoop.submission.counter.CounterGroup; @@ -286,13 +289,8 @@ public void stop(String externalJobId) { } } - /** - * {@inheritDoc} - */ - @Override - public SubmissionStatus status(String externalJobId) { + private SubmissionStatus status(RunningJob runningJob) { try { - RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId)); if(runningJob == null) { return SubmissionStatus.UNKNOWN; } @@ -306,13 +304,8 @@ public SubmissionStatus status(String externalJobId) { } - /** - * {@inheritDoc} - */ - @Override - public SubmissionError error(String externalJobId) { + private SubmissionError error(RunningJob runningJob) { try { - RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId)); if (runningJob == null) { return null; } @@ -323,43 +316,30 @@ public SubmissionError error(String externalJobId) { error.setErrorDetails(runningJob.getFailureInfo()); return error; } - } catch (IOException e) { throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e); } return null; } - /** - * {@inheritDoc} - */ - @Override - public double progress(String externalJobId) { + private double progress(RunningJob runningJob) { try { - // Get some reasonable approximation of map-reduce job progress - // TODO(jarcec): What if we're running without reducers? - RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId)); if(runningJob == null) { // Return default value - return super.progress(externalJobId); + return -1; } - return (runningJob.mapProgress() + runningJob.reduceProgress()) / 2; } catch (IOException e) { throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e); } } - /** - * {@inheritDoc} - */ - @Override - public Counters counters(String externalJobId) { + + private Counters counters(RunningJob runningJob) { try { - RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId)); if(runningJob == null) { // Return default value - return super.counters(externalJobId); + return null; } return convertMapreduceCounters(runningJob.getCounters()); @@ -368,21 +348,12 @@ public Counters counters(String externalJobId) { } } - /** - * {@inheritDoc} - */ - @Override - public String externalLink(String externalJobId) { - try { - RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId)); - if(runningJob == null) { - return null; - } - - return runningJob.getTrackingURL(); - } catch (IOException e) { - throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e); + private String externalLink(RunningJob runningJob) { + if (runningJob == null) { + return null; } + + return runningJob.getTrackingURL(); } /** @@ -392,7 +363,7 @@ public String externalLink(String externalJobId) { * @param status Map-reduce job constant * @return Equivalent submission status */ - protected SubmissionStatus convertMapreduceState(int status) { + private SubmissionStatus convertMapreduceState(int status) { if(status == JobStatus.PREP) { return SubmissionStatus.BOOTING; } else if (status == JobStatus.RUNNING) { @@ -435,6 +406,39 @@ private Counters convertMapreduceCounters(org.apache.hadoop.mapred.Counters hado return sqoopCounters; } + /** + * {@inheritDoc} + */ + @Override + public void update(MSubmission submission) { + double progress = -1; + Counters counters = null; + String externalJobId = submission.getExternalJobId(); + try { + RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId)); + + SubmissionStatus newStatus = status(runningJob); + SubmissionError error = error(runningJob); + String externalLink = externalLink(runningJob); + + if (newStatus.isRunning()) { + progress = progress(runningJob); + } else { + counters = counters(runningJob); + } + + submission.setStatus(newStatus); + submission.setError(error); + submission.setProgress(progress); + submission.setCounters(counters); + submission.setExternalLink(externalLink); + submission.setLastUpdateDate(new Date()); + + RepositoryManager.getInstance().getRepository().updateSubmission(submission); + } catch (IOException e) { + throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e); + } + } /** * Detect MapReduce local mode. *