mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 20:39:48 +08:00
修复多个JobContainer运行在同一个JVM中时，统计报告合并后每个报告日志变成所有JobContainer全部任务总和的问题
This commit is contained in:
parent
4b99fab875
commit
b4bdc6d0e7
@ -161,13 +161,19 @@ public final class JobAssignUtil {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Long jobId = taskGroupTemplate.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
|
||||
Configuration tempTaskGroupConfig;
|
||||
for (int i = 0; i < taskGroupNumber; i++) {
|
||||
tempTaskGroupConfig = taskGroupTemplate.clone();
|
||||
tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i));
|
||||
tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);
|
||||
|
||||
//tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);
|
||||
/**
|
||||
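* The id set below tells which JobContainer a set of statistics belongs to. The job id and the task group index are joined as strings rather than added,
|
||||
* because addition collides: 1+3=4
|
||||
* and 2+2=4.
|
||||
* Joining them as strings avoids that particular collision.
|
||||
* NOTE(review): joining can still collide (job 1 + index 11 and job 11 + index 1 both give 111), and Integer.parseInt throws NumberFormatException once the joined digits exceed the int range - confirm job ids stay small.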
|
||||
*/
|
||||
tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID,Integer.parseInt(jobId+ ""+ i));
|
||||
result.add(tempTaskGroupConfig);
|
||||
}
|
||||
|
||||
|
@ -40,6 +40,8 @@ public class Communication extends BaseObject implements Cloneable {
|
||||
*/
|
||||
Map<String, List<String>> message;
|
||||
|
||||
private Long jobId;
|
||||
|
||||
public Communication() {
|
||||
this.init();
|
||||
}
|
||||
@ -277,5 +279,12 @@ public class Communication extends BaseObject implements Cloneable {
|
||||
return this.state == State.SUCCEEDED || this.state == State.FAILED
|
||||
|| this.state == State.KILLED;
|
||||
}
|
||||
|
||||
|
||||
public Long getJobId() {
|
||||
return jobId;
|
||||
}
|
||||
|
||||
public void setJobId(Long jobId) {
|
||||
this.jobId = jobId;
|
||||
}
|
||||
}
|
||||
|
@ -16,13 +16,20 @@ public final class LocalTGCommunicationManager {
|
||||
taskGroupCommunicationMap.put(taskGroupId, communication);
|
||||
}
|
||||
|
||||
public static Communication getJobCommunication() {
|
||||
public static Communication getJobCommunication(Long jobId) {
|
||||
Communication communication = new Communication();
|
||||
communication.setState(State.SUCCEEDED);
|
||||
|
||||
for (Communication taskGroupCommunication :
|
||||
taskGroupCommunicationMap.values()) {
|
||||
communication.mergeFrom(taskGroupCommunication);
|
||||
if(taskGroupCommunication.getJobId()==null){
|
||||
communication.mergeFrom(taskGroupCommunication);
|
||||
}
|
||||
if(taskGroupCommunication.getJobId()==null
|
||||
||jobId.equals(taskGroupCommunication.getJobId())){ //如JOB在正式启动后过段时间才会设置JobId所以这里把getJobId为空的也合并进去
|
||||
communication.mergeFrom(taskGroupCommunication); //因为如果为空就说明里面啥都没有合并了也不会影响什么
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return communication;
|
||||
|
@ -11,7 +11,13 @@ public class ProcessInnerCollector extends AbstractCollector {
|
||||
|
||||
@Override
|
||||
public Communication collectFromTaskGroup() {
|
||||
return LocalTGCommunicationManager.getJobCommunication();
|
||||
/**
|
||||
* 在整合到JAVA中启动时会使多个Jobcontiner 使用同一个LocalTGCommunicationManager.taskGroupCommunicationMap
|
||||
* taskGroupCommunicationMap中存放当前Job的分组情况一般KEY从0开始
|
||||
* 因为是静态的共享的每个JobContiner的taskgroup都是从0开始这样就到导致新的jobcontiner的收集器会收集的老的jobcontiner中的统计信息
|
||||
* 这里通过传指定的JobId来区分统计信息
|
||||
*/
|
||||
return LocalTGCommunicationManager.getJobCommunication(this.getJobId());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ public class StandaloneTGContainerCommunicator extends AbstractTGContainerCommun
|
||||
|
||||
@Override
|
||||
public void report(Communication communication) {
|
||||
communication.setJobId(super.jobId);//给当前
|
||||
super.getReporter().reportTGCommunication(super.taskGroupId, communication);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user