From b4bdc6d0e755ee28c6ebf5ac86f8c8475ddc9e21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E7=8F=82?= Date: Thu, 30 May 2019 13:19:58 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E5=A4=9AJobContiner?= =?UTF-8?q?=E5=9C=A8=E5=90=8C=E4=B8=80=E4=B8=AAJVM=E4=B8=8B=E6=97=B6?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E6=8A=A5=E5=91=8A=E5=90=88=E5=B9=B6=E6=97=B6?= =?UTF-8?q?=E9=80=A0=E6=88=90=E7=9A=84=E6=8A=A5=E5=91=8A=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E4=B8=BA=E6=89=80=E6=9C=89JobContiner=E4=B8=AD=E7=9A=84?= =?UTF-8?q?=E6=89=80=E6=9C=89=E4=BB=BB=E5=8A=A1=E7=9A=84=E6=8A=A5=E5=91=8A?= =?UTF-8?q?=E6=80=BB=E5=90=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datax/core/container/util/JobAssignUtil.java | 12 +++++++++--- .../core/statistics/communication/Communication.java | 11 ++++++++++- .../communication/LocalTGCommunicationManager.java | 11 +++++++++-- .../container/collector/ProcessInnerCollector.java | 8 +++++++- .../taskgroup/StandaloneTGContainerCommunicator.java | 1 + 5 files changed, 36 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/com/alibaba/datax/core/container/util/JobAssignUtil.java b/core/src/main/java/com/alibaba/datax/core/container/util/JobAssignUtil.java index 31ba60a4..77fd8ef5 100755 --- a/core/src/main/java/com/alibaba/datax/core/container/util/JobAssignUtil.java +++ b/core/src/main/java/com/alibaba/datax/core/container/util/JobAssignUtil.java @@ -161,13 +161,19 @@ public final class JobAssignUtil { } } } - + Long jobId = taskGroupTemplate.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID); Configuration tempTaskGroupConfig; for (int i = 0; i < taskGroupNumber; i++) { tempTaskGroupConfig = taskGroupTemplate.clone(); tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i)); - tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i); - + //tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i); + /** + * 这里为会区分该统计信息属于哪个Jobcontiner 用字符串相连的原因是 + * 1+3=4 + * 2+2=4 + * 用字符串相连可以规避这个问题 + */ + tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID,Integer.parseInt(jobId+ ""+ i)); result.add(tempTaskGroupConfig); } diff --git a/core/src/main/java/com/alibaba/datax/core/statistics/communication/Communication.java b/core/src/main/java/com/alibaba/datax/core/statistics/communication/Communication.java index 97867c95..b636ab7a 100755 --- a/core/src/main/java/com/alibaba/datax/core/statistics/communication/Communication.java +++ b/core/src/main/java/com/alibaba/datax/core/statistics/communication/Communication.java @@ -40,6 +40,8 @@ public class Communication extends BaseObject implements Cloneable { */ Map> message; + private Long jobId; + public Communication() { this.init(); } @@ -277,5 +279,12 @@ public class Communication extends BaseObject implements Cloneable { return this.state == State.SUCCEEDED || this.state == State.FAILED || this.state == State.KILLED; } - + + public Long getJobId() { + return jobId; + } + + public void setJobId(Long jobId) { + this.jobId = jobId; + } } diff --git a/core/src/main/java/com/alibaba/datax/core/statistics/communication/LocalTGCommunicationManager.java b/core/src/main/java/com/alibaba/datax/core/statistics/communication/LocalTGCommunicationManager.java index 0b0529f8..5356b50a 100755 --- a/core/src/main/java/com/alibaba/datax/core/statistics/communication/LocalTGCommunicationManager.java +++ b/core/src/main/java/com/alibaba/datax/core/statistics/communication/LocalTGCommunicationManager.java @@ -16,13 +16,20 @@ public final class LocalTGCommunicationManager { taskGroupCommunicationMap.put(taskGroupId, communication); } - public static Communication getJobCommunication() { + public static Communication getJobCommunication(Long jobId) { Communication communication = new Communication(); communication.setState(State.SUCCEEDED); for (Communication taskGroupCommunication : taskGroupCommunicationMap.values()) { - communication.mergeFrom(taskGroupCommunication); + if(taskGroupCommunication.getJobId()==null){ + communication.mergeFrom(taskGroupCommunication); + } + if(taskGroupCommunication.getJobId()==null + ||jobId.equals(taskGroupCommunication.getJobId())){ //如JOB在正式启动后过段时间才会设置JobId所以这里把getJobId为空的也合并进去 + communication.mergeFrom(taskGroupCommunication); //因为如果为空就说明里面啥都没有合并了也不会影响什么 + + } } return communication; diff --git a/core/src/main/java/com/alibaba/datax/core/statistics/container/collector/ProcessInnerCollector.java b/core/src/main/java/com/alibaba/datax/core/statistics/container/collector/ProcessInnerCollector.java index 530794b5..43db3822 100755 --- a/core/src/main/java/com/alibaba/datax/core/statistics/container/collector/ProcessInnerCollector.java +++ b/core/src/main/java/com/alibaba/datax/core/statistics/container/collector/ProcessInnerCollector.java @@ -11,7 +11,13 @@ public class ProcessInnerCollector extends AbstractCollector { @Override public Communication collectFromTaskGroup() { - return LocalTGCommunicationManager.getJobCommunication(); + /** + * 在整合到JAVA中启动时会使多个Jobcontiner 使用同一个LocalTGCommunicationManager.taskGroupCommunicationMap + * taskGroupCommunicationMap中存放当前Job的分组情况一般KEY从0开始 + * 因为是静态的共享的每个JobContiner的taskgroup都是从0开始这样就到导致新的jobcontiner的收集器会收集的老的jobcontiner中的统计信息 + * 这里通过传指定的JobId来区分统计信息 + */ + return LocalTGCommunicationManager.getJobCommunication(this.getJobId()); } } diff --git a/core/src/main/java/com/alibaba/datax/core/statistics/container/communicator/taskgroup/StandaloneTGContainerCommunicator.java b/core/src/main/java/com/alibaba/datax/core/statistics/container/communicator/taskgroup/StandaloneTGContainerCommunicator.java index 7852154d..7ba0e7f4 100755 --- a/core/src/main/java/com/alibaba/datax/core/statistics/container/communicator/taskgroup/StandaloneTGContainerCommunicator.java +++ b/core/src/main/java/com/alibaba/datax/core/statistics/container/communicator/taskgroup/StandaloneTGContainerCommunicator.java @@ -13,6 +13,7 @@ public class StandaloneTGContainerCommunicator extends AbstractTGContainerCommun @Override public void report(Communication communication) { + communication.setJobId(super.jobId);//给当前 super.getReporter().reportTGCommunication(super.taskGroupId, communication); } From 9b5ed99a0cff3a2a5069b0f8acccd1250dde9337 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E7=8F=82?= Date: Tue, 4 Jun 2019 14:34:30 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E5=A4=9AJobContiner?= =?UTF-8?q?=E5=9C=A8=E5=90=8C=E4=B8=80=E4=B8=AAJVM=E4=B8=8B=E6=97=B6?= =?UTF-8?q?=E6=8F=92=E4=BB=B6=E8=B7=AF=E5=BE=84=E6=89=BE=E4=B8=8D=E5=88=B0?= =?UTF-8?q?BUG=20=E4=BB=A5=E4=B8=8B=E6=98=AF=E8=BD=A6=E7=A5=B8=E7=8E=B0?= =?UTF-8?q?=E5=9C=BA=20=20=20/**=20=20=20=20=20=20*=20=E6=89=80=E6=9C=89?= =?UTF-8?q?=E6=8F=92=E4=BB=B6=E9=85=8D=E7=BD=AE=E6=94=BE=E7=BD=AE=E5=9C=A8?= =?UTF-8?q?pluginRegisterCenter=E4=B8=AD=EF=BC=8C=E4=B8=BA=E5=8C=BA?= =?UTF-8?q?=E5=88=ABreader=E3=80=81transformer=E5=92=8Cwriter=EF=BC=8C?= =?UTF-8?q?=E8=BF=98=E8=83=BD=E5=8C=BA=E5=88=AB=20=20=20=20=20=20*=20?= =?UTF-8?q?=E5=85=B7=E4=BD=93pluginName=EF=BC=8C=E6=95=85=E4=BD=BF?= =?UTF-8?q?=E7=94=A8pluginType.pluginName=E4=BD=9C=E4=B8=BAkey=E6=94=BE?= =?UTF-8?q?=E7=BD=AE=E5=9C=A8=E8=AF=A5map=E4=B8=AD=20=20=20=20=20=20*/=20?= =?UTF-8?q?=20=20=20=20private=20static=20Configuration=20pluginRegisterCe?= =?UTF-8?q?nter;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit */ public static void bind(Configuration pluginConfigs) { pluginRegisterCenter = pluginConfigs; } --- .../alibaba/datax/core/job/JobContainer.java | 62 ++++--- .../core/taskgroup/TaskGroupContainer.java | 161 ++++++++++-------- .../core/util/container/CoreConstant.java | 2 + .../datax/core/util/container/LoadUtil.java | 50 +++--- 4 files changed, 160 insertions(+), 115 deletions(-) diff --git a/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java b/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java index 50f1cf7b..9e9d3251 100755 --- a/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java +++ b/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java @@ -35,7 +35,9 @@ import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Created by jingxing on 14-8-24. @@ -216,10 +218,10 @@ public class JobContainer extends AbstractContainer { this.readerPluginName = this.configuration.getString( CoreConstant.DATAX_JOB_CONTENT_READER_NAME); classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( - PluginType.READER, this.readerPluginName)); + PluginType.READER, this.readerPluginName,this.jobId)); Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin( - PluginType.READER, this.readerPluginName); + PluginType.READER, this.readerPluginName,this.jobId); this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER + ".dryRun", true); @@ -229,7 +231,7 @@ public class JobContainer extends AbstractContainer { // 设置reader的readerConfig jobReader.setPeerPluginJobConf(this.configuration.getConfiguration( CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER)); - + this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER+".jobid",this.jobId); jobReader.setJobPluginCollector(jobPluginCollector); classLoaderSwapper.restoreCurrentThreadClassLoader(); @@ -241,10 +243,10 @@ public class JobContainer extends AbstractContainer { this.writerPluginName = this.configuration.getString( CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME); classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( - PluginType.WRITER, this.writerPluginName)); + PluginType.WRITER, this.writerPluginName,this.jobId)); Writer.Job jobWriter = (Writer.Job) LoadUtil.loadJobPlugin( - PluginType.WRITER, this.writerPluginName); + PluginType.WRITER, this.writerPluginName,this.jobId); this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER + ".dryRun", true); @@ -265,7 +267,7 @@ public class JobContainer extends AbstractContainer { private void preCheckReader() { classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( - PluginType.READER, this.readerPluginName)); + PluginType.READER, this.readerPluginName,this.jobId)); LOG.info(String.format("DataX Reader.Job [%s] do preCheck work .", this.readerPluginName)); this.jobReader.preCheck(); @@ -274,7 +276,7 @@ public class JobContainer extends AbstractContainer { private void preCheckWriter() { classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( - PluginType.WRITER, this.writerPluginName)); + PluginType.WRITER, this.writerPluginName,this.jobId)); LOG.info(String.format("DataX Writer.Job [%s] do preCheck work .", this.writerPluginName)); this.jobWriter.preCheck(); @@ -328,10 +330,10 @@ public class JobContainer extends AbstractContainer { CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME); classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( - handlerPluginType, handlerPluginName)); + handlerPluginType, handlerPluginName,this.jobId)); AbstractJobPlugin handler = LoadUtil.loadJobPlugin( - handlerPluginType, handlerPluginName); + handlerPluginType, handlerPluginName,this.jobId); JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector( this.getContainerCommunicator()); @@ -364,10 +366,10 @@ public class JobContainer extends AbstractContainer { CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME); classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( - handlerPluginType, handlerPluginName)); + handlerPluginType, handlerPluginName,this.jobId)); AbstractJobPlugin handler = LoadUtil.loadJobPlugin( - handlerPluginType, handlerPluginName); + handlerPluginType, handlerPluginName,this.jobId); JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector( this.getContainerCommunicator()); @@ -513,7 +515,7 @@ public class JobContainer extends AbstractContainer { ExecuteMode executeMode = null; AbstractScheduler scheduler; try { - executeMode = ExecuteMode.STANDALONE; + executeMode = ExecuteMode.STANDALONE; scheduler = initStandaloneScheduler(this.configuration); //设置 executeMode @@ -602,8 +604,16 @@ public class JobContainer extends AbstractContainer { reportCommunication.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeedPerSecond); super.getContainerCommunicator().report(reportCommunication); + LOG.info(CommunicationTool.Stringify.getSnapshot(communication)); + Map log = new HashMap(); + log.put("startTimeStamp",startTimeStamp); + log.put("endTimeStamp",endTimeStamp); + log.put("totalCosts",totalCosts); + log.put("byteSpeedPerSecond",byteSpeedPerSecond); + log.put("recordSpeedPerSecond",recordSpeedPerSecond); + log.put("communication",communication); - + //TODO 记录日志 LOG.info(String.format( "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" @@ -656,11 +666,11 @@ public class JobContainer extends AbstractContainer { this.readerPluginName = this.configuration.getString( CoreConstant.DATAX_JOB_CONTENT_READER_NAME); classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( - PluginType.READER, this.readerPluginName)); + PluginType.READER, this.readerPluginName,this.jobId)); Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin( - PluginType.READER, this.readerPluginName); - + PluginType.READER, this.readerPluginName,this.jobId); + this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER_JOBID,this.jobId); // 设置reader的jobConfig jobReader.setPluginJobConf(this.configuration.getConfiguration( CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER)); @@ -668,7 +678,6 @@ public class JobContainer extends AbstractContainer { // 设置reader的readerConfig jobReader.setPeerPluginJobConf(this.configuration.getConfiguration( CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER)); - jobReader.setJobPluginCollector(jobPluginCollector); jobReader.init(); @@ -686,15 +695,14 @@ public class JobContainer extends AbstractContainer { this.writerPluginName = this.configuration.getString( CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME); classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( - PluginType.WRITER, this.writerPluginName)); + PluginType.WRITER, this.writerPluginName,this.jobId)); Writer.Job jobWriter = (Writer.Job) LoadUtil.loadJobPlugin( - PluginType.WRITER, this.writerPluginName); - + PluginType.WRITER, this.writerPluginName,this.jobId); + this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER_JOBID,this.jobId); // 设置writer的jobConfig jobWriter.setPluginJobConf(this.configuration.getConfiguration( CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER)); - // 设置reader的readerConfig jobWriter.setPeerPluginJobConf(this.configuration.getConfiguration( CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER)); @@ -709,7 +717,7 @@ public class JobContainer extends AbstractContainer { private void prepareJobReader() { classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( - PluginType.READER, this.readerPluginName)); + PluginType.READER, this.readerPluginName,this.jobId)); LOG.info(String.format("DataX Reader.Job [%s] do prepare work .", this.readerPluginName)); this.jobReader.prepare(); @@ -718,7 +726,7 @@ public class JobContainer extends AbstractContainer { private void prepareJobWriter() { classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( - PluginType.WRITER, this.writerPluginName)); + PluginType.WRITER, this.writerPluginName,this.jobId)); LOG.info(String.format("DataX Writer.Job [%s] do prepare work .", this.writerPluginName)); this.jobWriter.prepare(); @@ -728,7 +736,7 @@ public class JobContainer extends AbstractContainer { // TODO: 如果源头就是空数据 private List doReaderSplit(int adviceNumber) { classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( - PluginType.READER, this.readerPluginName)); + PluginType.READER, this.readerPluginName,this.jobId)); List readerSlicesConfigs = this.jobReader.split(adviceNumber); if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0) { @@ -744,7 +752,7 @@ public class JobContainer extends AbstractContainer { private List doWriterSplit(int readerTaskNumber) { classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( - PluginType.WRITER, this.writerPluginName)); + PluginType.WRITER, this.writerPluginName,this.jobId)); List writerSlicesConfigs = this.jobWriter .split(readerTaskNumber); @@ -938,7 +946,7 @@ public class JobContainer extends AbstractContainer { private void postJobReader() { classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( - PluginType.READER, this.readerPluginName)); + PluginType.READER, this.readerPluginName,this.jobId)); LOG.info("DataX Reader.Job [{}] do post work.", this.readerPluginName); this.jobReader.post(); @@ -947,7 +955,7 @@ public class JobContainer extends AbstractContainer { private void postJobWriter() { classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( - PluginType.WRITER, this.writerPluginName)); + PluginType.WRITER, this.writerPluginName,this.jobId)); LOG.info("DataX Writer.Job [{}] do post work.", this.writerPluginName); this.jobWriter.post(); diff --git a/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java b/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java index c30c94d9..161d8d5e 100755 --- a/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java +++ b/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java @@ -12,6 +12,7 @@ import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.core.AbstractContainer; import com.alibaba.datax.core.statistics.communication.Communication; import com.alibaba.datax.core.statistics.communication.CommunicationTool; +import com.alibaba.datax.core.statistics.communication.LocalTGCommunicationManager; import com.alibaba.datax.core.statistics.container.communicator.taskgroup.StandaloneTGContainerCommunicator; import com.alibaba.datax.core.statistics.plugin.task.AbstractTaskPluginCollector; import com.alibaba.datax.core.taskgroup.runner.AbstractRunner; @@ -118,7 +119,7 @@ public class TaskGroupContainer extends AbstractContainer { CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC, 10000); long taskMaxWaitInMsec = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC, 60000); - + List taskConfigs = this.configuration .getListConfiguration(CoreConstant.DATAX_JOB_CONTENT); @@ -126,12 +127,12 @@ public class TaskGroupContainer extends AbstractContainer { LOG.debug("taskGroup[{}]'s task configs[{}]", this.taskGroupId, JSON.toJSONString(taskConfigs)); } - + int taskCountInThisTaskGroup = taskConfigs.size(); LOG.info(String.format( "taskGroupId=[%d] start [%d] channels for [%d] tasks.", this.taskGroupId, channelNumber, taskCountInThisTaskGroup)); - + this.containerCommunicator.registerCommunication(taskConfigs); Map taskConfigMap = buildTaskConfigMap(taskConfigs); //taskId与task配置 @@ -144,12 +145,12 @@ public class TaskGroupContainer extends AbstractContainer { Communication lastTaskGroupContainerCommunication = new Communication(); while (true) { - //1.判断task状态 - boolean failedOrKilled = false; - Map communicationMap = containerCommunicator.getCommunicationMap(); - for(Map.Entry entry : communicationMap.entrySet()){ - Integer taskId = entry.getKey(); - Communication taskCommunication = entry.getValue(); + //1.判断task状态 + boolean failedOrKilled = false; + Map communicationMap = containerCommunicator.getCommunicationMap(); + for(Map.Entry entry : communicationMap.entrySet()){ + Integer taskId = entry.getKey(); + Communication taskCommunication = entry.getValue(); if(!taskCommunication.isFinished()){ continue; } @@ -159,21 +160,21 @@ public class TaskGroupContainer extends AbstractContainer { taskMonitor.removeTask(taskId); //失败,看task是否支持failover,重试次数未超过最大限制 - if(taskCommunication.getState() == State.FAILED){ + if(taskCommunication.getState() == State.FAILED){ taskFailedExecutorMap.put(taskId, taskExecutor); - if(taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetryTimes){ + if(taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetryTimes){ taskExecutor.shutdown(); //关闭老的executor containerCommunicator.resetCommunication(taskId); //将task的状态重置 - Configuration taskConfig = taskConfigMap.get(taskId); - taskQueue.add(taskConfig); //重新加入任务列表 - }else{ - failedOrKilled = true; - break; - } - }else if(taskCommunication.getState() == State.KILLED){ - failedOrKilled = true; - break; - }else if(taskCommunication.getState() == State.SUCCEEDED){ + Configuration taskConfig = taskConfigMap.get(taskId); + taskQueue.add(taskConfig); //重新加入任务列表 + }else{ + failedOrKilled = true; + break; + } + }else if(taskCommunication.getState() == State.KILLED){ + failedOrKilled = true; + break; + }else if(taskCommunication.getState() == State.SUCCEEDED){ Long taskStartTime = taskStartTimeMap.get(taskId); if(taskStartTime != null){ Long usedTime = System.currentTimeMillis() - taskStartTime; @@ -185,8 +186,8 @@ public class TaskGroupContainer extends AbstractContainer { taskConfigMap.remove(taskId); } } - } - + } + // 2.发现该taskGroup下taskExecutor的总状态失败则汇报错误 if (failedOrKilled) { lastTaskGroupContainerCommunication = reportTaskGroupCommunication( @@ -195,7 +196,7 @@ public class TaskGroupContainer extends AbstractContainer { throw DataXException.asDataXException( FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable()); } - + //3.有任务未执行,且正在运行的任务数小于最大通道限制 Iterator iterator = taskQueue.iterator(); while(iterator.hasNext() && runTasks.size() < channelNumber){ @@ -225,9 +226,9 @@ public class TaskGroupContainer extends AbstractContainer { } } Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig; - TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount); + TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount); taskStartTimeMap.put(taskId, System.currentTimeMillis()); - taskExecutor.doStart(); + taskExecutor.doStart(); iterator.remove(); runTasks.add(taskExecutor); @@ -242,10 +243,9 @@ public class TaskGroupContainer extends AbstractContainer { //4.任务列表为空,executor已结束, 搜集状态为success--->成功 if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) { - // 成功的情况下,也需要汇报一次。否则在任务结束非常快的情况下,采集的信息将会不准确 + // 成功的情况下,也需要汇报一次。否则在任务结束非常快的情况下,采集的信息将会不准确 lastTaskGroupContainerCommunication = reportTaskGroupCommunication( lastTaskGroupContainerCommunication, taskCountInThisTaskGroup); - LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId); break; } @@ -293,46 +293,47 @@ public class TaskGroupContainer extends AbstractContainer { } LOG.info(PerfTrace.getInstance().summarizeNoException()); + this.removeTaskGroup();//移除指定JobId中的统计Map } } } - + private Map buildTaskConfigMap(List configurations){ - Map map = new HashMap(); - for(Configuration taskConfig : configurations){ - int taskId = taskConfig.getInt(CoreConstant.TASK_ID); - map.put(taskId, taskConfig); - } - return map; + Map map = new HashMap(); + for(Configuration taskConfig : configurations){ + int taskId = taskConfig.getInt(CoreConstant.TASK_ID); + map.put(taskId, taskConfig); + } + return map; } private List buildRemainTasks(List configurations){ - List remainTasks = new LinkedList(); - for(Configuration taskConfig : configurations){ - remainTasks.add(taskConfig); - } - return remainTasks; + List remainTasks = new LinkedList(); + for(Configuration taskConfig : configurations){ + remainTasks.add(taskConfig); + } + return remainTasks; } - + private TaskExecutor removeTask(List taskList, int taskId){ - Iterator iterator = taskList.iterator(); - while(iterator.hasNext()){ - TaskExecutor taskExecutor = iterator.next(); - if(taskExecutor.getTaskId() == taskId){ - iterator.remove(); - return taskExecutor; - } - } - return null; + Iterator iterator = taskList.iterator(); + while(iterator.hasNext()){ + TaskExecutor taskExecutor = iterator.next(); + if(taskExecutor.getTaskId() == taskId){ + iterator.remove(); + return taskExecutor; + } + } + return null; } - + private boolean isAllTaskDone(List taskList){ - for(TaskExecutor taskExecutor : taskList){ - if(!taskExecutor.isTaskFinished()){ - return false; - } - } - return true; + for(TaskExecutor taskExecutor : taskList){ + if(!taskExecutor.isTaskFinished()){ + return false; + } + } + return true; } private Communication reportTaskGroupCommunication(Communication lastTaskGroupContainerCommunication, int taskCount){ @@ -365,9 +366,9 @@ public class TaskGroupContainer extends AbstractContainer { private Thread readerThread; private Thread writerThread; - + private ReaderRunner readerRunner; - + private WriterRunner writerRunner; /** @@ -417,7 +418,7 @@ public class TaskGroupContainer extends AbstractContainer { //通过设置thread的contextClassLoader,即可实现同步和主程序不通的加载器 this.writerThread.setContextClassLoader(LoadUtil.getJarLoader( PluginType.WRITER, this.taskConfig.getString( - CoreConstant.JOB_WRITER_NAME))); + CoreConstant.JOB_WRITER_NAME),getJobId())); /** * 生成readerThread @@ -431,7 +432,7 @@ public class TaskGroupContainer extends AbstractContainer { */ this.readerThread.setContextClassLoader(LoadUtil.getJarLoader( PluginType.READER, this.taskConfig.getString( - CoreConstant.JOB_READER_NAME))); + CoreConstant.JOB_READER_NAME),getJobId())); } public void doStart() { @@ -468,7 +469,7 @@ public class TaskGroupContainer extends AbstractContainer { switch (pluginType) { case READER: newRunner = LoadUtil.loadPluginRunner(pluginType, - this.taskConfig.getString(CoreConstant.JOB_READER_NAME)); + this.taskConfig.getString(CoreConstant.JOB_READER_NAME),getJobId()); newRunner.setJobConf(this.taskConfig.getConfiguration( CoreConstant.JOB_READER_PARAMETER)); @@ -493,7 +494,7 @@ public class TaskGroupContainer extends AbstractContainer { break; case WRITER: newRunner = LoadUtil.loadPluginRunner(pluginType, - this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME)); + this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME),getJobId()); newRunner.setJobConf(this.taskConfig .getConfiguration(CoreConstant.JOB_WRITER_PARAMETER)); @@ -527,14 +528,14 @@ public class TaskGroupContainer extends AbstractContainer { } if(taskCommunication==null || !taskCommunication.isFinished()){ - return false; - } + return false; + } return true; } - + private int getTaskId(){ - return taskId; + return taskId; } private long getTimeStamp(){ @@ -544,9 +545,9 @@ public class TaskGroupContainer extends AbstractContainer { private int getAttemptCount(){ return attemptCount; } - + private boolean supportFailOver(){ - return writerRunner.supportFailOver(); + return writerRunner.supportFailOver(); } private void shutdown(){ @@ -564,4 +565,26 @@ public class TaskGroupContainer extends AbstractContainer { return !readerThread.isAlive() && !writerThread.isAlive(); } } + + public void removeTaskGroup(){ + try { + /** + * 移除根据JOBID做的一些标记 防止内存溢出 + */ + LoadUtil.getConfigurationSet().remove(this.jobId); + Iterator> it = + LocalTGCommunicationManager.getTaskGroupCommunicationMap().entrySet().iterator(); + while(it.hasNext()){ + Map.Entry entry = it.next(); + String strJobId = String.valueOf(this.jobId); + String key = String.valueOf(entry.getKey()); + if (key.startsWith(strJobId)) { + it.remove(); + } + } + }catch (Exception e){ + e.printStackTrace(); + } + + } } diff --git a/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java b/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java index 6a0b6205..517cfe05 100755 --- a/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java +++ b/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java @@ -148,6 +148,8 @@ public class CoreConstant { public static final String CURRENT_SERVICE_USERNAME = "current.service.username"; public static final String CURRENT_SERVICE_PASSWORD = "current.service.password"; + public static final String DATAX_JOB_CONTENT_WRITER_PARAMETER_JOBID = "job.content[0].writer.parameter.jobid"; + public static final String DATAX_JOB_CONTENT_READER_PARAMETER_JOBID = "job.content[0].reader.parameter.jobid"; // ----------------------------- 环境变量 --------------------------------- diff --git a/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java b/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java index 30e926c3..603eaa24 100755 --- a/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java +++ b/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java @@ -12,8 +12,8 @@ import com.alibaba.datax.core.taskgroup.runner.WriterRunner; import com.alibaba.datax.core.util.FrameworkErrorCode; import org.apache.commons.lang3.StringUtils; -import java.util.HashMap; -import java.util.Map; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; /** * Created by jingxing on 14-8-24. @@ -46,6 +46,10 @@ public class LoadUtil { */ private static Configuration pluginRegisterCenter; + private final static Map configurationSet = new ConcurrentHashMap(); + public static Map getConfigurationSet(){ + return configurationSet; + } /** * jarLoader的缓冲 */ @@ -56,10 +60,16 @@ public class LoadUtil { * * @param pluginConfigs */ - public static void bind(Configuration pluginConfigs) { + public static synchronized void bind(Configuration pluginConfigs) { pluginRegisterCenter = pluginConfigs; + Long jobId = pluginConfigs.getLong( + CoreConstant.DATAX_CORE_CONTAINER_JOB_ID); + configurationSet.put(jobId,pluginConfigs); + } + + private static String generatePluginKey(PluginType pluginType, String pluginName) { return String.format(pluginTypeNameFormat, pluginType.toString(), @@ -67,10 +77,12 @@ public class LoadUtil { } private static Configuration getPluginConf(PluginType pluginType, - String pluginName) { - Configuration pluginConf = pluginRegisterCenter + String pluginName,Long jobId) { +// Configuration pluginConf = pluginRegisterCenter +// .getConfiguration(generatePluginKey(pluginType, pluginName)); + Configuration pluginConf + = configurationSet.get(jobId) .getConfiguration(generatePluginKey(pluginType, pluginName)); - if (null == pluginConf) { throw DataXException.asDataXException( FrameworkErrorCode.PLUGIN_INSTALL_ERROR, @@ -89,14 +101,14 @@ public class LoadUtil { * @return */ public static AbstractJobPlugin loadJobPlugin(PluginType pluginType, - String pluginName) { + String pluginName,Long jobId) { Class clazz = LoadUtil.loadPluginClass( - pluginType, pluginName, ContainerType.Job); + pluginType, pluginName, ContainerType.Job,jobId); try { AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz .newInstance(); - jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName)); + jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName,jobId)); return jobPlugin; } catch (Exception e) { throw DataXException.asDataXException( @@ -114,14 +126,14 @@ public class LoadUtil { * @return */ public static AbstractTaskPlugin loadTaskPlugin(PluginType pluginType, - String pluginName) { + String pluginName,Long jobId) { Class clazz = LoadUtil.loadPluginClass( - pluginType, pluginName, ContainerType.Task); + pluginType, pluginName, ContainerType.Task,jobId); try { AbstractTaskPlugin taskPlugin = (AbstractTaskPlugin) clazz .newInstance(); - taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName)); + taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName,jobId)); return taskPlugin; } catch (Exception e) { throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, @@ -137,9 +149,9 @@ public class LoadUtil { * @param pluginName * @return */ - public static AbstractRunner loadPluginRunner(PluginType pluginType, String pluginName) { + public static AbstractRunner loadPluginRunner(PluginType pluginType, String pluginName,Long jobId) { AbstractTaskPlugin taskPlugin = LoadUtil.loadTaskPlugin(pluginType, - pluginName); + pluginName,jobId); switch (pluginType) { case READER: @@ -165,9 +177,9 @@ public class LoadUtil { @SuppressWarnings("unchecked") private static synchronized Class loadPluginClass( PluginType pluginType, String pluginName, - ContainerType pluginRunType) { - Configuration pluginConf = getPluginConf(pluginType, pluginName); - JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName); + ContainerType pluginRunType,Long jobId) { + Configuration pluginConf = getPluginConf(pluginType, pluginName,jobId); + JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName,jobId); try { return (Class) jarLoader .loadClass(pluginConf.getString("class") + "$" @@ -178,8 +190,8 @@ public class LoadUtil { } public static synchronized JarLoader getJarLoader(PluginType pluginType, - String pluginName) { - Configuration pluginConf = getPluginConf(pluginType, pluginName); + String pluginName,Long jobId) { + Configuration pluginConf = getPluginConf(pluginType, pluginName,jobId); JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType, pluginName));