diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java new file mode 100644 index 00000000..7e872500 --- /dev/null +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.mr; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; + + +/** + * Runnable that logs at debug level and calls progress() on its mapreduce context once per run. + */ +public class ProgressRunnable implements Runnable { + + public static final Log LOG = LogFactory.getLog(ProgressRunnable.class); + + /** + * Context to ping. NOTE(review): if progress() throws, a fixed-rate schedule stops - confirm it cannot. 
+ */ + private final TaskInputOutputContext context; + + public ProgressRunnable(final TaskInputOutputContext ctxt) { + this.context = ctxt; + } + + @Override + public void run() { + LOG.debug("Auto-progress thread reporting progress"); + this.context.progress(); + } +} diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 2a823032..7715d5f7 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -18,6 +18,9 @@ package org.apache.sqoop.job.mr; import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,11 +41,14 @@ /** * A mapper to perform map function. */ -public class SqoopMapper - extends Mapper { +public class SqoopMapper extends Mapper { - public static final Log LOG = - LogFactory.getLog(SqoopMapper.class.getName()); + public static final Log LOG = LogFactory.getLog(SqoopMapper.class); + + /** + * Single-thread scheduler that runs ProgressRunnable every 2 minutes while the extractor works. 
+ */ + private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor(); @Override public void run(Context context) throws IOException, InterruptedException { @@ -76,6 +82,9 @@ public void run(Context context) throws IOException, InterruptedException { ExtractorContext extractorContext = new ExtractorContext(subContext, new MapDataWriter(context)); try { + LOG.info("Starting progress service"); + progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES); + LOG.info("Running extractor class " + extractorName); extractor.extract(extractorContext, configConnection, configJob, split.getPartition()); LOG.info("Extractor has finished"); @@ -83,6 +92,13 @@ public void run(Context context) throws IOException, InterruptedException { .increment(extractor.getRowsRead()); } catch (Exception e) { throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e); + } finally { + LOG.info("Stopping progress service"); + progressService.shutdown(); + if(!progressService.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.info("Stopping progress service with shutdownNow"); + progressService.shutdownNow(); + } } } diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java index d2361482..e4ad6ba9 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java @@ -23,13 +23,38 @@ import org.apache.hadoop.mapreduce.Reducer; import org.apache.sqoop.job.io.Data; +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + /** * A reducer to perform reduce function. 
*/ -public class SqoopReducer - extends Reducer { +public class SqoopReducer extends Reducer { - public static final Log LOG = - LogFactory.getLog(SqoopReducer.class.getName()); + public static final Log LOG = LogFactory.getLog(SqoopReducer.class); + /** + * Single-thread scheduler that runs ProgressRunnable every 2 minutes while super.run executes. + */ + private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor(); + + @Override + public void run(Context context) throws IOException, InterruptedException { + try { + LOG.info("Starting progress service"); + progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES); + + // Delegate the reduce work itself to the parent run; this override only adds progress reporting + super.run(context); + } finally { + LOG.info("Stopping progress service"); + progressService.shutdown(); + if(!progressService.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.info("Stopping progress service with shutdownNow"); + progressService.shutdownNow(); + } + } + } }