diff --git a/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java b/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java index 31079e41..1337fad6 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java +++ b/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; @@ -234,16 +235,23 @@ protected int configureNumTasks(Job job) throws IOException { protected boolean runJob(Job job) throws ClassNotFoundException, IOException, InterruptedException { - PerfCounters counters = new PerfCounters(); - counters.startClock(); + PerfCounters perfCounters = new PerfCounters(); + perfCounters.startClock(); boolean success = job.waitForCompletion(true); - counters.stopClock(); - counters.addBytes(job.getCounters().getGroup("FileSystemCounters") - .findCounter("HDFS_BYTES_READ").getValue()); - LOG.info("Transferred " + counters.toString()); - long numRecords = HadoopShim.get().getNumMapInputRecords(job); - LOG.info("Exported " + numRecords + " records."); + perfCounters.stopClock(); + + Counters jobCounters = job.getCounters(); + // If the job has been retired, these may be unavailable. + if (null == jobCounters) { + displayRetiredJobNotice(LOG); + } else { + perfCounters.addBytes(jobCounters.getGroup("FileSystemCounters") + .findCounter("HDFS_BYTES_READ").getValue()); + LOG.info("Transferred " + perfCounters.toString()); + long numRecords = HadoopShim.get().getNumMapInputRecords(job); + LOG.info("Exported " + numRecords + " records."); + } return success; } diff --git a/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java b/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java index 069c5119..f4bfe815 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java +++ b/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; @@ -100,16 +101,23 @@ protected void configureOutputFormat(Job job, String tableName, protected boolean runJob(Job job) throws ClassNotFoundException, IOException, InterruptedException { - PerfCounters counters = new PerfCounters(); - counters.startClock(); + PerfCounters perfCounters = new PerfCounters(); + perfCounters.startClock(); boolean success = job.waitForCompletion(true); - counters.stopClock(); - counters.addBytes(job.getCounters().getGroup("FileSystemCounters") - .findCounter("HDFS_BYTES_WRITTEN").getValue()); - LOG.info("Transferred " + counters.toString()); - long numRecords = HadoopShim.get().getNumMapOutputRecords(job); - LOG.info("Retrieved " + numRecords + " records."); + perfCounters.stopClock(); + + Counters jobCounters = job.getCounters(); + // If the job has been retired, these may be unavailable. + if (null == jobCounters) { + displayRetiredJobNotice(LOG); + } else { + perfCounters.addBytes(jobCounters.getGroup("FileSystemCounters") + .findCounter("HDFS_BYTES_WRITTEN").getValue()); + LOG.info("Transferred " + perfCounters.toString()); + long numRecords = HadoopShim.get().getNumMapOutputRecords(job); + LOG.info("Retrieved " + numRecords + " records."); + } return success; } diff --git a/src/java/com/cloudera/sqoop/mapreduce/JobBase.java b/src/java/com/cloudera/sqoop/mapreduce/JobBase.java index 1646b824..b80cfc97 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/JobBase.java +++ b/src/java/com/cloudera/sqoop/mapreduce/JobBase.java @@ -193,4 +193,20 @@ protected boolean runJob(Job job) throws ClassNotFoundException, IOException, InterruptedException { return job.waitForCompletion(true); } + + /** + * Display a notice on the log that the current MapReduce job has + * been retired, and thus Counters are unavailable. + * @param log the Log to display the info to. + */ + protected void displayRetiredJobNotice(Log log) { + log.info("The MapReduce job has already been retired. Performance"); + log.info("counters are unavailable. To get this information, "); + log.info("you will need to enable the completed job store on "); + log.info("the jobtracker with:"); + log.info("mapreduce.jobtracker.persist.jobstatus.active = true"); + log.info("mapreduce.jobtracker.persist.jobstatus.hours = 1"); + log.info("A jobtracker restart is required for these settings"); + log.info("to take effect."); + } }