mirror of
https://github.com/apache/sqoop.git
synced 2025-05-04 06:10:18 +08:00
SQOOP-15. Do not assume that Job.getCounters() returns non-null.
If jobs are already retired, display a useful message on how to enable the completed job store and suppress performance counter usage, rather than crashing with a NullPointerException. From: Aaron Kimball <aaron@cloudera.com> git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149939 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0fcbcd759a
commit
0f35d54f21
@ -30,6 +30,7 @@
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.InputFormat;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
@ -234,16 +235,23 @@ protected int configureNumTasks(Job job) throws IOException {
|
||||
protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
|
||||
InterruptedException {
|
||||
|
||||
PerfCounters counters = new PerfCounters();
|
||||
counters.startClock();
|
||||
PerfCounters perfCounters = new PerfCounters();
|
||||
perfCounters.startClock();
|
||||
|
||||
boolean success = job.waitForCompletion(true);
|
||||
counters.stopClock();
|
||||
counters.addBytes(job.getCounters().getGroup("FileSystemCounters")
|
||||
perfCounters.stopClock();
|
||||
|
||||
Counters jobCounters = job.getCounters();
|
||||
// If the job has been retired, these may be unavailable.
|
||||
if (null == jobCounters) {
|
||||
displayRetiredJobNotice(LOG);
|
||||
} else {
|
||||
perfCounters.addBytes(jobCounters.getGroup("FileSystemCounters")
|
||||
.findCounter("HDFS_BYTES_READ").getValue());
|
||||
LOG.info("Transferred " + counters.toString());
|
||||
LOG.info("Transferred " + perfCounters.toString());
|
||||
long numRecords = HadoopShim.get().getNumMapInputRecords(job);
|
||||
LOG.info("Exported " + numRecords + " records.");
|
||||
}
|
||||
|
||||
return success;
|
||||
}
|
||||
|
@ -27,6 +27,7 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.SequenceFile.CompressionType;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.InputFormat;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
@ -100,16 +101,23 @@ protected void configureOutputFormat(Job job, String tableName,
|
||||
protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
|
||||
InterruptedException {
|
||||
|
||||
PerfCounters counters = new PerfCounters();
|
||||
counters.startClock();
|
||||
PerfCounters perfCounters = new PerfCounters();
|
||||
perfCounters.startClock();
|
||||
|
||||
boolean success = job.waitForCompletion(true);
|
||||
counters.stopClock();
|
||||
counters.addBytes(job.getCounters().getGroup("FileSystemCounters")
|
||||
perfCounters.stopClock();
|
||||
|
||||
Counters jobCounters = job.getCounters();
|
||||
// If the job has been retired, these may be unavailable.
|
||||
if (null == jobCounters) {
|
||||
displayRetiredJobNotice(LOG);
|
||||
} else {
|
||||
perfCounters.addBytes(jobCounters.getGroup("FileSystemCounters")
|
||||
.findCounter("HDFS_BYTES_WRITTEN").getValue());
|
||||
LOG.info("Transferred " + counters.toString());
|
||||
LOG.info("Transferred " + perfCounters.toString());
|
||||
long numRecords = HadoopShim.get().getNumMapOutputRecords(job);
|
||||
LOG.info("Retrieved " + numRecords + " records.");
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
||||
|
@ -193,4 +193,20 @@ protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
|
||||
InterruptedException {
|
||||
return job.waitForCompletion(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Display a notice on the log that the current MapReduce job has
|
||||
* been retired, and thus Counters are unavailable.
|
||||
* @param log the Log to display the info to.
|
||||
*/
|
||||
protected void displayRetiredJobNotice(Log log) {
|
||||
log.info("The MapReduce job has already been retired. Performance");
|
||||
log.info("counters are unavailable. To get this information, ");
|
||||
log.info("you will need to enable the completed job store on ");
|
||||
log.info("the jobtracker with:");
|
||||
log.info("mapreduce.jobtracker.persist.jobstatus.active = true");
|
||||
log.info("mapreduce.jobtracker.persist.jobstatus.hours = 1");
|
||||
log.info("A jobtracker restart is required for these settings");
|
||||
log.info("to take effect.");
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user