This commit is contained in:
Likename Haojie 2025-04-10 16:24:35 +08:00 committed by GitHub
commit f818863ac0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 196 additions and 122 deletions

View File

@ -161,13 +161,19 @@ public final class JobAssignUtil {
} }
} }
} }
Long jobId = taskGroupTemplate.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
Configuration tempTaskGroupConfig; Configuration tempTaskGroupConfig;
for (int i = 0; i < taskGroupNumber; i++) { for (int i = 0; i < taskGroupNumber; i++) {
tempTaskGroupConfig = taskGroupTemplate.clone(); tempTaskGroupConfig = taskGroupTemplate.clone();
tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i)); tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i));
tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i); //tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);
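/**
 * Encodes the owning job into the task group id so that statistics can be
 * attributed to the right JobContainer. The job id and the task group index are
 * joined as strings instead of being added, because sums collide:
 * 1+3=4
 * 2+2=4
 * String concatenation avoids that problem.
 * NOTE(review): concatenation is still ambiguous when the parts differ in length
 * (job 1 + index 11 and job 11 + index 1 both give 111), and the joined digits
 * must fit into an int.
 */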
tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID,Integer.parseInt(jobId+ ""+ i));
result.add(tempTaskGroupConfig); result.add(tempTaskGroupConfig);
} }

View File

@ -35,7 +35,9 @@ import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* Created by jingxing on 14-8-24. * Created by jingxing on 14-8-24.
@ -216,10 +218,10 @@ public class JobContainer extends AbstractContainer {
this.readerPluginName = this.configuration.getString( this.readerPluginName = this.configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_READER_NAME); CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName)); PluginType.READER, this.readerPluginName,this.jobId));
Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin( Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
PluginType.READER, this.readerPluginName); PluginType.READER, this.readerPluginName,this.jobId);
this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER + ".dryRun", true); this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER + ".dryRun", true);
@ -229,7 +231,7 @@ public class JobContainer extends AbstractContainer {
// 设置reader的readerConfig // 设置reader的readerConfig
jobReader.setPeerPluginJobConf(this.configuration.getConfiguration( jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER)); CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER+".jobid",this.jobId);
jobReader.setJobPluginCollector(jobPluginCollector); jobReader.setJobPluginCollector(jobPluginCollector);
classLoaderSwapper.restoreCurrentThreadClassLoader(); classLoaderSwapper.restoreCurrentThreadClassLoader();
@ -241,10 +243,10 @@ public class JobContainer extends AbstractContainer {
this.writerPluginName = this.configuration.getString( this.writerPluginName = this.configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME); CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName)); PluginType.WRITER, this.writerPluginName,this.jobId));
Writer.Job jobWriter = (Writer.Job) LoadUtil.loadJobPlugin( Writer.Job jobWriter = (Writer.Job) LoadUtil.loadJobPlugin(
PluginType.WRITER, this.writerPluginName); PluginType.WRITER, this.writerPluginName,this.jobId);
this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER + ".dryRun", true); this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER + ".dryRun", true);
@ -265,7 +267,7 @@ public class JobContainer extends AbstractContainer {
private void preCheckReader() { private void preCheckReader() {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName)); PluginType.READER, this.readerPluginName,this.jobId));
LOG.info(String.format("DataX Reader.Job [%s] do preCheck work .", LOG.info(String.format("DataX Reader.Job [%s] do preCheck work .",
this.readerPluginName)); this.readerPluginName));
this.jobReader.preCheck(); this.jobReader.preCheck();
@ -274,7 +276,7 @@ public class JobContainer extends AbstractContainer {
private void preCheckWriter() { private void preCheckWriter() {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName)); PluginType.WRITER, this.writerPluginName,this.jobId));
LOG.info(String.format("DataX Writer.Job [%s] do preCheck work .", LOG.info(String.format("DataX Writer.Job [%s] do preCheck work .",
this.writerPluginName)); this.writerPluginName));
this.jobWriter.preCheck(); this.jobWriter.preCheck();
@ -328,10 +330,10 @@ public class JobContainer extends AbstractContainer {
CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME); CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
handlerPluginType, handlerPluginName)); handlerPluginType, handlerPluginName,this.jobId));
AbstractJobPlugin handler = LoadUtil.loadJobPlugin( AbstractJobPlugin handler = LoadUtil.loadJobPlugin(
handlerPluginType, handlerPluginName); handlerPluginType, handlerPluginName,this.jobId);
JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector( JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
this.getContainerCommunicator()); this.getContainerCommunicator());
@ -364,10 +366,10 @@ public class JobContainer extends AbstractContainer {
CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME); CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
handlerPluginType, handlerPluginName)); handlerPluginType, handlerPluginName,this.jobId));
AbstractJobPlugin handler = LoadUtil.loadJobPlugin( AbstractJobPlugin handler = LoadUtil.loadJobPlugin(
handlerPluginType, handlerPluginName); handlerPluginType, handlerPluginName,this.jobId);
JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector( JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
this.getContainerCommunicator()); this.getContainerCommunicator());
@ -513,7 +515,7 @@ public class JobContainer extends AbstractContainer {
ExecuteMode executeMode = null; ExecuteMode executeMode = null;
AbstractScheduler scheduler; AbstractScheduler scheduler;
try { try {
executeMode = ExecuteMode.STANDALONE; executeMode = ExecuteMode.STANDALONE;
scheduler = initStandaloneScheduler(this.configuration); scheduler = initStandaloneScheduler(this.configuration);
//设置 executeMode //设置 executeMode
@ -602,8 +604,16 @@ public class JobContainer extends AbstractContainer {
reportCommunication.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeedPerSecond); reportCommunication.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeedPerSecond);
super.getContainerCommunicator().report(reportCommunication); super.getContainerCommunicator().report(reportCommunication);
LOG.info(CommunicationTool.Stringify.getSnapshot(communication));
Map<String,Object> log = new HashMap<String,Object>();
log.put("startTimeStamp",startTimeStamp);
log.put("endTimeStamp",endTimeStamp);
log.put("totalCosts",totalCosts);
log.put("byteSpeedPerSecond",byteSpeedPerSecond);
log.put("recordSpeedPerSecond",recordSpeedPerSecond);
log.put("communication",communication);
//TODO 记录日志
LOG.info(String.format( LOG.info(String.format(
"\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n" "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
+ "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
@ -656,11 +666,11 @@ public class JobContainer extends AbstractContainer {
this.readerPluginName = this.configuration.getString( this.readerPluginName = this.configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_READER_NAME); CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName)); PluginType.READER, this.readerPluginName,this.jobId));
Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin( Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
PluginType.READER, this.readerPluginName); PluginType.READER, this.readerPluginName,this.jobId);
this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER_JOBID,this.jobId);
// 设置reader的jobConfig // 设置reader的jobConfig
jobReader.setPluginJobConf(this.configuration.getConfiguration( jobReader.setPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER)); CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
@ -668,7 +678,6 @@ public class JobContainer extends AbstractContainer {
// 设置reader的readerConfig // 设置reader的readerConfig
jobReader.setPeerPluginJobConf(this.configuration.getConfiguration( jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER)); CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
jobReader.setJobPluginCollector(jobPluginCollector); jobReader.setJobPluginCollector(jobPluginCollector);
jobReader.init(); jobReader.init();
@ -686,15 +695,14 @@ public class JobContainer extends AbstractContainer {
this.writerPluginName = this.configuration.getString( this.writerPluginName = this.configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME); CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName)); PluginType.WRITER, this.writerPluginName,this.jobId));
Writer.Job jobWriter = (Writer.Job) LoadUtil.loadJobPlugin( Writer.Job jobWriter = (Writer.Job) LoadUtil.loadJobPlugin(
PluginType.WRITER, this.writerPluginName); PluginType.WRITER, this.writerPluginName,this.jobId);
this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER_JOBID,this.jobId);
// 设置writer的jobConfig // 设置writer的jobConfig
jobWriter.setPluginJobConf(this.configuration.getConfiguration( jobWriter.setPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER)); CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
// 设置reader的readerConfig // 设置reader的readerConfig
jobWriter.setPeerPluginJobConf(this.configuration.getConfiguration( jobWriter.setPeerPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER)); CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
@ -709,7 +717,7 @@ public class JobContainer extends AbstractContainer {
private void prepareJobReader() { private void prepareJobReader() {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName)); PluginType.READER, this.readerPluginName,this.jobId));
LOG.info(String.format("DataX Reader.Job [%s] do prepare work .", LOG.info(String.format("DataX Reader.Job [%s] do prepare work .",
this.readerPluginName)); this.readerPluginName));
this.jobReader.prepare(); this.jobReader.prepare();
@ -718,7 +726,7 @@ public class JobContainer extends AbstractContainer {
private void prepareJobWriter() { private void prepareJobWriter() {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName)); PluginType.WRITER, this.writerPluginName,this.jobId));
LOG.info(String.format("DataX Writer.Job [%s] do prepare work .", LOG.info(String.format("DataX Writer.Job [%s] do prepare work .",
this.writerPluginName)); this.writerPluginName));
this.jobWriter.prepare(); this.jobWriter.prepare();
@ -728,7 +736,7 @@ public class JobContainer extends AbstractContainer {
// TODO: 如果源头就是空数据 // TODO: 如果源头就是空数据
private List<Configuration> doReaderSplit(int adviceNumber) { private List<Configuration> doReaderSplit(int adviceNumber) {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName)); PluginType.READER, this.readerPluginName,this.jobId));
List<Configuration> readerSlicesConfigs = List<Configuration> readerSlicesConfigs =
this.jobReader.split(adviceNumber); this.jobReader.split(adviceNumber);
if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0) { if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0) {
@ -744,7 +752,7 @@ public class JobContainer extends AbstractContainer {
private List<Configuration> doWriterSplit(int readerTaskNumber) { private List<Configuration> doWriterSplit(int readerTaskNumber) {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName)); PluginType.WRITER, this.writerPluginName,this.jobId));
List<Configuration> writerSlicesConfigs = this.jobWriter List<Configuration> writerSlicesConfigs = this.jobWriter
.split(readerTaskNumber); .split(readerTaskNumber);
@ -938,7 +946,7 @@ public class JobContainer extends AbstractContainer {
private void postJobReader() { private void postJobReader() {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName)); PluginType.READER, this.readerPluginName,this.jobId));
LOG.info("DataX Reader.Job [{}] do post work.", LOG.info("DataX Reader.Job [{}] do post work.",
this.readerPluginName); this.readerPluginName);
this.jobReader.post(); this.jobReader.post();
@ -947,7 +955,7 @@ public class JobContainer extends AbstractContainer {
private void postJobWriter() { private void postJobWriter() {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName)); PluginType.WRITER, this.writerPluginName,this.jobId));
LOG.info("DataX Writer.Job [{}] do post work.", LOG.info("DataX Writer.Job [{}] do post work.",
this.writerPluginName); this.writerPluginName);
this.jobWriter.post(); this.jobWriter.post();

View File

@ -40,6 +40,8 @@ public class Communication extends BaseObject implements Cloneable {
*/ */
Map<String, List<String>> message; Map<String, List<String>> message;
// Id of the job this communication is tagged with; remains null until setJobId is called.
private Long jobId;
/**
 * Creates a communication and immediately runs {@code init()} on it.
 */
public Communication() {
    init();
}
@ -277,5 +279,12 @@ public class Communication extends BaseObject implements Cloneable {
return this.state == State.SUCCEEDED || this.state == State.FAILED return this.state == State.SUCCEEDED || this.state == State.FAILED
|| this.state == State.KILLED; || this.state == State.KILLED;
} }
/**
 * Returns the id of the job this communication was tagged with.
 *
 * @return the job id, or {@code null} if none has been set
 */
public Long getJobId() {
    return this.jobId;
}
/**
 * Tags this communication with the id of the job it belongs to.
 *
 * @param jobId the job id to record; stored exactly as given
 */
public void setJobId(Long jobId) {
this.jobId = jobId;
}
} }

View File

@ -16,13 +16,20 @@ public final class LocalTGCommunicationManager {
taskGroupCommunicationMap.put(taskGroupId, communication); taskGroupCommunicationMap.put(taskGroupId, communication);
} }
public static Communication getJobCommunication() { public static Communication getJobCommunication(Long jobId) {
Communication communication = new Communication(); Communication communication = new Communication();
communication.setState(State.SUCCEEDED); communication.setState(State.SUCCEEDED);
for (Communication taskGroupCommunication : for (Communication taskGroupCommunication :
taskGroupCommunicationMap.values()) { taskGroupCommunicationMap.values()) {
communication.mergeFrom(taskGroupCommunication); if(taskGroupCommunication.getJobId()==null){
communication.mergeFrom(taskGroupCommunication);
}
if(taskGroupCommunication.getJobId()==null
||jobId.equals(taskGroupCommunication.getJobId())){ //如JOB在正式启动后过段时间才会设置JobId所以这里把getJobId为空的也合并进去
communication.mergeFrom(taskGroupCommunication); //因为如果为空就说明里面啥都没有合并了也不会影响什么
}
} }
return communication; return communication;

View File

@ -11,7 +11,13 @@ public class ProcessInnerCollector extends AbstractCollector {
@Override @Override
public Communication collectFromTaskGroup() { public Communication collectFromTaskGroup() {
return LocalTGCommunicationManager.getJobCommunication(); /**
* 在整合到JAVA中启动时会使多个Jobcontiner 使用同一个LocalTGCommunicationManager.taskGroupCommunicationMap
* taskGroupCommunicationMap中存放当前Job的分组情况一般KEY从0开始
* 因为是静态的共享的每个JobContiner的taskgroup都是从0开始这样就到导致新的jobcontiner的收集器会收集的老的jobcontiner中的统计信息
* 这里通过传指定的JobId来区分统计信息
*/
return LocalTGCommunicationManager.getJobCommunication(this.getJobId());
} }
} }

View File

@ -13,6 +13,7 @@ public class StandaloneTGContainerCommunicator extends AbstractTGContainerCommun
@Override @Override
public void report(Communication communication) { public void report(Communication communication) {
communication.setJobId(super.jobId);//给当前
super.getReporter().reportTGCommunication(super.taskGroupId, communication); super.getReporter().reportTGCommunication(super.taskGroupId, communication);
} }

View File

@ -12,6 +12,7 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.AbstractContainer; import com.alibaba.datax.core.AbstractContainer;
import com.alibaba.datax.core.statistics.communication.Communication; import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool; import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.core.statistics.communication.LocalTGCommunicationManager;
import com.alibaba.datax.core.statistics.container.communicator.taskgroup.StandaloneTGContainerCommunicator; import com.alibaba.datax.core.statistics.container.communicator.taskgroup.StandaloneTGContainerCommunicator;
import com.alibaba.datax.core.statistics.plugin.task.AbstractTaskPluginCollector; import com.alibaba.datax.core.statistics.plugin.task.AbstractTaskPluginCollector;
import com.alibaba.datax.core.taskgroup.runner.AbstractRunner; import com.alibaba.datax.core.taskgroup.runner.AbstractRunner;
@ -118,7 +119,7 @@ public class TaskGroupContainer extends AbstractContainer {
CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC, 10000); CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC, 10000);
long taskMaxWaitInMsec = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC, 60000); long taskMaxWaitInMsec = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC, 60000);
List<Configuration> taskConfigs = this.configuration List<Configuration> taskConfigs = this.configuration
.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT); .getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
@ -126,12 +127,12 @@ public class TaskGroupContainer extends AbstractContainer {
LOG.debug("taskGroup[{}]'s task configs[{}]", this.taskGroupId, LOG.debug("taskGroup[{}]'s task configs[{}]", this.taskGroupId,
JSON.toJSONString(taskConfigs)); JSON.toJSONString(taskConfigs));
} }
int taskCountInThisTaskGroup = taskConfigs.size(); int taskCountInThisTaskGroup = taskConfigs.size();
LOG.info(String.format( LOG.info(String.format(
"taskGroupId=[%d] start [%d] channels for [%d] tasks.", "taskGroupId=[%d] start [%d] channels for [%d] tasks.",
this.taskGroupId, channelNumber, taskCountInThisTaskGroup)); this.taskGroupId, channelNumber, taskCountInThisTaskGroup));
this.containerCommunicator.registerCommunication(taskConfigs); this.containerCommunicator.registerCommunication(taskConfigs);
Map<Integer, Configuration> taskConfigMap = buildTaskConfigMap(taskConfigs); //taskId与task配置 Map<Integer, Configuration> taskConfigMap = buildTaskConfigMap(taskConfigs); //taskId与task配置
@ -144,12 +145,12 @@ public class TaskGroupContainer extends AbstractContainer {
Communication lastTaskGroupContainerCommunication = new Communication(); Communication lastTaskGroupContainerCommunication = new Communication();
while (true) { while (true) {
//1.判断task状态 //1.判断task状态
boolean failedOrKilled = false; boolean failedOrKilled = false;
Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap(); Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap();
for(Map.Entry<Integer, Communication> entry : communicationMap.entrySet()){ for(Map.Entry<Integer, Communication> entry : communicationMap.entrySet()){
Integer taskId = entry.getKey(); Integer taskId = entry.getKey();
Communication taskCommunication = entry.getValue(); Communication taskCommunication = entry.getValue();
if(!taskCommunication.isFinished()){ if(!taskCommunication.isFinished()){
continue; continue;
} }
@ -159,21 +160,21 @@ public class TaskGroupContainer extends AbstractContainer {
taskMonitor.removeTask(taskId); taskMonitor.removeTask(taskId);
//失败看task是否支持failover重试次数未超过最大限制 //失败看task是否支持failover重试次数未超过最大限制
if(taskCommunication.getState() == State.FAILED){ if(taskCommunication.getState() == State.FAILED){
taskFailedExecutorMap.put(taskId, taskExecutor); taskFailedExecutorMap.put(taskId, taskExecutor);
if(taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetryTimes){ if(taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetryTimes){
taskExecutor.shutdown(); //关闭老的executor taskExecutor.shutdown(); //关闭老的executor
containerCommunicator.resetCommunication(taskId); //将task的状态重置 containerCommunicator.resetCommunication(taskId); //将task的状态重置
Configuration taskConfig = taskConfigMap.get(taskId); Configuration taskConfig = taskConfigMap.get(taskId);
taskQueue.add(taskConfig); //重新加入任务列表 taskQueue.add(taskConfig); //重新加入任务列表
}else{ }else{
failedOrKilled = true; failedOrKilled = true;
break; break;
} }
}else if(taskCommunication.getState() == State.KILLED){ }else if(taskCommunication.getState() == State.KILLED){
failedOrKilled = true; failedOrKilled = true;
break; break;
}else if(taskCommunication.getState() == State.SUCCEEDED){ }else if(taskCommunication.getState() == State.SUCCEEDED){
Long taskStartTime = taskStartTimeMap.get(taskId); Long taskStartTime = taskStartTimeMap.get(taskId);
if(taskStartTime != null){ if(taskStartTime != null){
Long usedTime = System.currentTimeMillis() - taskStartTime; Long usedTime = System.currentTimeMillis() - taskStartTime;
@ -185,8 +186,8 @@ public class TaskGroupContainer extends AbstractContainer {
taskConfigMap.remove(taskId); taskConfigMap.remove(taskId);
} }
} }
} }
// 2.发现该taskGroup下taskExecutor的总状态失败则汇报错误 // 2.发现该taskGroup下taskExecutor的总状态失败则汇报错误
if (failedOrKilled) { if (failedOrKilled) {
lastTaskGroupContainerCommunication = reportTaskGroupCommunication( lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
@ -195,7 +196,7 @@ public class TaskGroupContainer extends AbstractContainer {
throw DataXException.asDataXException( throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable()); FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());
} }
//3.有任务未执行且正在运行的任务数小于最大通道限制 //3.有任务未执行且正在运行的任务数小于最大通道限制
Iterator<Configuration> iterator = taskQueue.iterator(); Iterator<Configuration> iterator = taskQueue.iterator();
while(iterator.hasNext() && runTasks.size() < channelNumber){ while(iterator.hasNext() && runTasks.size() < channelNumber){
@ -225,9 +226,9 @@ public class TaskGroupContainer extends AbstractContainer {
} }
} }
Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig; Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount); TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
taskStartTimeMap.put(taskId, System.currentTimeMillis()); taskStartTimeMap.put(taskId, System.currentTimeMillis());
taskExecutor.doStart(); taskExecutor.doStart();
iterator.remove(); iterator.remove();
runTasks.add(taskExecutor); runTasks.add(taskExecutor);
@ -242,10 +243,9 @@ public class TaskGroupContainer extends AbstractContainer {
//4.任务列表为空executor已结束, 搜集状态为success--->成功 //4.任务列表为空executor已结束, 搜集状态为success--->成功
if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) { if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) {
// 成功的情况下也需要汇报一次否则在任务结束非常快的情况下采集的信息将会不准确 // 成功的情况下也需要汇报一次否则在任务结束非常快的情况下采集的信息将会不准确
lastTaskGroupContainerCommunication = reportTaskGroupCommunication( lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
lastTaskGroupContainerCommunication, taskCountInThisTaskGroup); lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId); LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId);
break; break;
} }
@ -293,46 +293,47 @@ public class TaskGroupContainer extends AbstractContainer {
} }
LOG.info(PerfTrace.getInstance().summarizeNoException()); LOG.info(PerfTrace.getInstance().summarizeNoException());
this.removeTaskGroup();//移除指定JobId中的统计Map
} }
} }
} }
private Map<Integer, Configuration> buildTaskConfigMap(List<Configuration> configurations){ private Map<Integer, Configuration> buildTaskConfigMap(List<Configuration> configurations){
Map<Integer, Configuration> map = new HashMap<Integer, Configuration>(); Map<Integer, Configuration> map = new HashMap<Integer, Configuration>();
for(Configuration taskConfig : configurations){ for(Configuration taskConfig : configurations){
int taskId = taskConfig.getInt(CoreConstant.TASK_ID); int taskId = taskConfig.getInt(CoreConstant.TASK_ID);
map.put(taskId, taskConfig); map.put(taskId, taskConfig);
} }
return map; return map;
} }
private List<Configuration> buildRemainTasks(List<Configuration> configurations){ private List<Configuration> buildRemainTasks(List<Configuration> configurations){
List<Configuration> remainTasks = new LinkedList<Configuration>(); List<Configuration> remainTasks = new LinkedList<Configuration>();
for(Configuration taskConfig : configurations){ for(Configuration taskConfig : configurations){
remainTasks.add(taskConfig); remainTasks.add(taskConfig);
} }
return remainTasks; return remainTasks;
} }
private TaskExecutor removeTask(List<TaskExecutor> taskList, int taskId){ private TaskExecutor removeTask(List<TaskExecutor> taskList, int taskId){
Iterator<TaskExecutor> iterator = taskList.iterator(); Iterator<TaskExecutor> iterator = taskList.iterator();
while(iterator.hasNext()){ while(iterator.hasNext()){
TaskExecutor taskExecutor = iterator.next(); TaskExecutor taskExecutor = iterator.next();
if(taskExecutor.getTaskId() == taskId){ if(taskExecutor.getTaskId() == taskId){
iterator.remove(); iterator.remove();
return taskExecutor; return taskExecutor;
} }
} }
return null; return null;
} }
private boolean isAllTaskDone(List<TaskExecutor> taskList){ private boolean isAllTaskDone(List<TaskExecutor> taskList){
for(TaskExecutor taskExecutor : taskList){ for(TaskExecutor taskExecutor : taskList){
if(!taskExecutor.isTaskFinished()){ if(!taskExecutor.isTaskFinished()){
return false; return false;
} }
} }
return true; return true;
} }
private Communication reportTaskGroupCommunication(Communication lastTaskGroupContainerCommunication, int taskCount){ private Communication reportTaskGroupCommunication(Communication lastTaskGroupContainerCommunication, int taskCount){
@ -365,9 +366,9 @@ public class TaskGroupContainer extends AbstractContainer {
private Thread readerThread; private Thread readerThread;
private Thread writerThread; private Thread writerThread;
private ReaderRunner readerRunner; private ReaderRunner readerRunner;
private WriterRunner writerRunner; private WriterRunner writerRunner;
/** /**
@ -417,7 +418,7 @@ public class TaskGroupContainer extends AbstractContainer {
//通过设置thread的contextClassLoader即可实现同步和主程序不通的加载器 //通过设置thread的contextClassLoader即可实现同步和主程序不通的加载器
this.writerThread.setContextClassLoader(LoadUtil.getJarLoader( this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.taskConfig.getString( PluginType.WRITER, this.taskConfig.getString(
CoreConstant.JOB_WRITER_NAME))); CoreConstant.JOB_WRITER_NAME),getJobId()));
/** /**
* 生成readerThread * 生成readerThread
@ -431,7 +432,7 @@ public class TaskGroupContainer extends AbstractContainer {
*/ */
this.readerThread.setContextClassLoader(LoadUtil.getJarLoader( this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.taskConfig.getString( PluginType.READER, this.taskConfig.getString(
CoreConstant.JOB_READER_NAME))); CoreConstant.JOB_READER_NAME),getJobId()));
} }
public void doStart() { public void doStart() {
@ -468,7 +469,7 @@ public class TaskGroupContainer extends AbstractContainer {
switch (pluginType) { switch (pluginType) {
case READER: case READER:
newRunner = LoadUtil.loadPluginRunner(pluginType, newRunner = LoadUtil.loadPluginRunner(pluginType,
this.taskConfig.getString(CoreConstant.JOB_READER_NAME)); this.taskConfig.getString(CoreConstant.JOB_READER_NAME),getJobId());
newRunner.setJobConf(this.taskConfig.getConfiguration( newRunner.setJobConf(this.taskConfig.getConfiguration(
CoreConstant.JOB_READER_PARAMETER)); CoreConstant.JOB_READER_PARAMETER));
@ -493,7 +494,7 @@ public class TaskGroupContainer extends AbstractContainer {
break; break;
case WRITER: case WRITER:
newRunner = LoadUtil.loadPluginRunner(pluginType, newRunner = LoadUtil.loadPluginRunner(pluginType,
this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME)); this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME),getJobId());
newRunner.setJobConf(this.taskConfig newRunner.setJobConf(this.taskConfig
.getConfiguration(CoreConstant.JOB_WRITER_PARAMETER)); .getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));
@ -527,14 +528,14 @@ public class TaskGroupContainer extends AbstractContainer {
} }
if(taskCommunication==null || !taskCommunication.isFinished()){ if(taskCommunication==null || !taskCommunication.isFinished()){
return false; return false;
} }
return true; return true;
} }
private int getTaskId(){ private int getTaskId(){
return taskId; return taskId;
} }
private long getTimeStamp(){ private long getTimeStamp(){
@ -544,9 +545,9 @@ public class TaskGroupContainer extends AbstractContainer {
private int getAttemptCount(){ private int getAttemptCount(){
return attemptCount; return attemptCount;
} }
private boolean supportFailOver(){ private boolean supportFailOver(){
return writerRunner.supportFailOver(); return writerRunner.supportFailOver();
} }
private void shutdown(){ private void shutdown(){
@ -564,4 +565,26 @@ public class TaskGroupContainer extends AbstractContainer {
return !readerThread.isAlive() && !writerThread.isAlive(); return !readerThread.isAlive() && !writerThread.isAlive();
} }
} }
/**
 * Drops the state that was registered under this container's job id so that
 * it does not pile up in memory: the job's entry in
 * {@code LoadUtil.getConfigurationSet()} and every entry of
 * {@code LocalTGCommunicationManager.getTaskGroupCommunicationMap()} whose
 * key, rendered as a decimal string, starts with the job id.
 *
 * Any exception raised while cleaning up is caught and its stack trace is
 * printed; nothing is propagated to the caller.
 *
 * NOTE(review): the prefix match is loose -- for job id 1 it also matches
 * keys such as 12 or 120, which may have been registered by another job.
 * Confirm against the code that generates the task-group ids.
 */
public void removeTaskGroup(){
    try {
        LoadUtil.getConfigurationSet().remove(this.jobId);

        // The prefix is identical for every key, so render it only once.
        final String jobIdPrefix = String.valueOf(this.jobId);
        Iterator<Integer> taskGroupIds =
                LocalTGCommunicationManager.getTaskGroupCommunicationMap().keySet().iterator();
        while (taskGroupIds.hasNext()) {
            String taskGroupId = String.valueOf(taskGroupIds.next());
            if (taskGroupId.startsWith(jobIdPrefix)) {
                // Removing through the key-set iterator removes the mapping itself.
                taskGroupIds.remove();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
} }

View File

@ -148,6 +148,8 @@ public class CoreConstant {
public static final String CURRENT_SERVICE_USERNAME = "current.service.username"; public static final String CURRENT_SERVICE_USERNAME = "current.service.username";
public static final String CURRENT_SERVICE_PASSWORD = "current.service.password"; public static final String CURRENT_SERVICE_PASSWORD = "current.service.password";
// Configuration paths of the "jobid" entry under the writer / reader parameter of the first job content item.
public static final String DATAX_JOB_CONTENT_WRITER_PARAMETER_JOBID = "job.content[0].writer.parameter.jobid";
public static final String DATAX_JOB_CONTENT_READER_PARAMETER_JOBID = "job.content[0].reader.parameter.jobid";
// ----------------------------- 环境变量 --------------------------------- // ----------------------------- 环境变量 ---------------------------------

View File

@ -12,8 +12,8 @@ import com.alibaba.datax.core.taskgroup.runner.WriterRunner;
import com.alibaba.datax.core.util.FrameworkErrorCode; import com.alibaba.datax.core.util.FrameworkErrorCode;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.util.HashMap; import java.util.*;
import java.util.Map; import java.util.concurrent.ConcurrentHashMap;
/** /**
* Created by jingxing on 14-8-24. * Created by jingxing on 14-8-24.
@ -46,6 +46,10 @@ public class LoadUtil {
*/ */
private static Configuration pluginRegisterCenter; private static Configuration pluginRegisterCenter;
/**
 * Plugin configurations keyed by job id; filled by {@code bind}.
 */
private static final Map<Long, Configuration> configurationSet = new ConcurrentHashMap<Long, Configuration>();

/**
 * Returns the live job-id to configuration map (not a copy), so callers can
 * remove a job's entry from it.
 *
 * @return the map backing the per-job plugin configurations
 */
public static Map<Long, Configuration> getConfigurationSet(){
    return configurationSet;
}
/** /**
* jarLoader的缓冲 * jarLoader的缓冲
*/ */
@ -56,10 +60,16 @@ public class LoadUtil {
* *
* @param pluginConfigs * @param pluginConfigs
*/ */
public static void bind(Configuration pluginConfigs) { public static synchronized void bind(Configuration pluginConfigs) {
pluginRegisterCenter = pluginConfigs; pluginRegisterCenter = pluginConfigs;
Long jobId = pluginConfigs.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
configurationSet.put(jobId,pluginConfigs);
} }
private static String generatePluginKey(PluginType pluginType, private static String generatePluginKey(PluginType pluginType,
String pluginName) { String pluginName) {
return String.format(pluginTypeNameFormat, pluginType.toString(), return String.format(pluginTypeNameFormat, pluginType.toString(),
@ -67,10 +77,12 @@ public class LoadUtil {
} }
private static Configuration getPluginConf(PluginType pluginType, private static Configuration getPluginConf(PluginType pluginType,
String pluginName) { String pluginName,Long jobId) {
Configuration pluginConf = pluginRegisterCenter // Configuration pluginConf = pluginRegisterCenter
// .getConfiguration(generatePluginKey(pluginType, pluginName));
Configuration pluginConf
= configurationSet.get(jobId)
.getConfiguration(generatePluginKey(pluginType, pluginName)); .getConfiguration(generatePluginKey(pluginType, pluginName));
if (null == pluginConf) { if (null == pluginConf) {
throw DataXException.asDataXException( throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_INSTALL_ERROR, FrameworkErrorCode.PLUGIN_INSTALL_ERROR,
@ -89,14 +101,14 @@ public class LoadUtil {
* @return * @return
*/ */
public static AbstractJobPlugin loadJobPlugin(PluginType pluginType, public static AbstractJobPlugin loadJobPlugin(PluginType pluginType,
String pluginName) { String pluginName,Long jobId) {
Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass( Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
pluginType, pluginName, ContainerType.Job); pluginType, pluginName, ContainerType.Job,jobId);
try { try {
AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz
.newInstance(); .newInstance();
jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName)); jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName,jobId));
return jobPlugin; return jobPlugin;
} catch (Exception e) { } catch (Exception e) {
throw DataXException.asDataXException( throw DataXException.asDataXException(
@ -114,14 +126,14 @@ public class LoadUtil {
* @return * @return
*/ */
public static AbstractTaskPlugin loadTaskPlugin(PluginType pluginType, public static AbstractTaskPlugin loadTaskPlugin(PluginType pluginType,
String pluginName) { String pluginName,Long jobId) {
Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass( Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
pluginType, pluginName, ContainerType.Task); pluginType, pluginName, ContainerType.Task,jobId);
try { try {
AbstractTaskPlugin taskPlugin = (AbstractTaskPlugin) clazz AbstractTaskPlugin taskPlugin = (AbstractTaskPlugin) clazz
.newInstance(); .newInstance();
taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName)); taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName,jobId));
return taskPlugin; return taskPlugin;
} catch (Exception e) { } catch (Exception e) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
@ -137,9 +149,9 @@ public class LoadUtil {
* @param pluginName * @param pluginName
* @return * @return
*/ */
public static AbstractRunner loadPluginRunner(PluginType pluginType, String pluginName) { public static AbstractRunner loadPluginRunner(PluginType pluginType, String pluginName,Long jobId) {
AbstractTaskPlugin taskPlugin = LoadUtil.loadTaskPlugin(pluginType, AbstractTaskPlugin taskPlugin = LoadUtil.loadTaskPlugin(pluginType,
pluginName); pluginName,jobId);
switch (pluginType) { switch (pluginType) {
case READER: case READER:
@ -165,9 +177,9 @@ public class LoadUtil {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private static synchronized Class<? extends AbstractPlugin> loadPluginClass( private static synchronized Class<? extends AbstractPlugin> loadPluginClass(
PluginType pluginType, String pluginName, PluginType pluginType, String pluginName,
ContainerType pluginRunType) { ContainerType pluginRunType,Long jobId) {
Configuration pluginConf = getPluginConf(pluginType, pluginName); Configuration pluginConf = getPluginConf(pluginType, pluginName,jobId);
JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName); JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName,jobId);
try { try {
return (Class<? extends AbstractPlugin>) jarLoader return (Class<? extends AbstractPlugin>) jarLoader
.loadClass(pluginConf.getString("class") + "$" .loadClass(pluginConf.getString("class") + "$"
@ -178,8 +190,8 @@ public class LoadUtil {
} }
public static synchronized JarLoader getJarLoader(PluginType pluginType, public static synchronized JarLoader getJarLoader(PluginType pluginType,
String pluginName) { String pluginName,Long jobId) {
Configuration pluginConf = getPluginConf(pluginType, pluginName); Configuration pluginConf = getPluginConf(pluginType, pluginName,jobId);
JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType, JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
pluginName)); pluginName));