From a27844a84b248f9649d65d38a6a9a32655697a40 Mon Sep 17 00:00:00 2001 From: mickyuan <384077767@qq.com> Date: Tue, 28 Feb 2023 15:48:09 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8DtaskExecutor=E5=A4=B1?= =?UTF-8?q?=E8=B4=A5=E5=90=8E=EF=BC=8C=E6=B2=A1=E6=9C=89=E6=AD=A3=E5=B8=B8?= =?UTF-8?q?=E5=85=B3=E9=97=AD=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../alibaba/datax/core/taskgroup/TaskGroupContainer.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java b/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java index b4b45695..3db320dd 100755 --- a/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java +++ b/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java @@ -187,8 +187,15 @@ public class TaskGroupContainer extends AbstractContainer { } } - // 2.发现该taskGroup下taskExecutor的总状态失败则汇报错误 + // 2.发现该taskGroup下taskExecutor的总状态失败则汇报错误,同时关闭失败的taskExecutor if (failedOrKilled) { + + taskFailedExecutorMap.forEach((taskId, taskExecutor) -> { + if (!taskExecutor.isShutdown()) { + taskExecutor.shutdown(); + } + }); + lastTaskGroupContainerCommunication = reportTaskGroupCommunication( lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);