解决多JobContiner在同一个JVM下时插件路径找不到BUG

以下是车祸现场
  /**
     * 所有插件配置放置在pluginRegisterCenter中,为区别reader、transformer和writer,还能区别
     * 具体pluginName,故使用pluginType.pluginName作为key放置在该map中
     */
    private static Configuration pluginRegisterCenter;

         */
        public static void bind(Configuration pluginConfigs) {
            pluginRegisterCenter = pluginConfigs;
        }
This commit is contained in:
李珂 2019-06-04 14:34:30 +08:00
parent b4bdc6d0e7
commit 9b5ed99a0c
4 changed files with 160 additions and 115 deletions

View File

@ -35,7 +35,9 @@ import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Created by jingxing on 14-8-24.
@ -216,10 +218,10 @@ public class JobContainer extends AbstractContainer {
this.readerPluginName = this.configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName));
PluginType.READER, this.readerPluginName,this.jobId));
Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
PluginType.READER, this.readerPluginName);
PluginType.READER, this.readerPluginName,this.jobId);
this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER + ".dryRun", true);
@ -229,7 +231,7 @@ public class JobContainer extends AbstractContainer {
// 设置reader的readerConfig
jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER+".jobid",this.jobId);
jobReader.setJobPluginCollector(jobPluginCollector);
classLoaderSwapper.restoreCurrentThreadClassLoader();
@ -241,10 +243,10 @@ public class JobContainer extends AbstractContainer {
this.writerPluginName = this.configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName));
PluginType.WRITER, this.writerPluginName,this.jobId));
Writer.Job jobWriter = (Writer.Job) LoadUtil.loadJobPlugin(
PluginType.WRITER, this.writerPluginName);
PluginType.WRITER, this.writerPluginName,this.jobId);
this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER + ".dryRun", true);
@ -265,7 +267,7 @@ public class JobContainer extends AbstractContainer {
private void preCheckReader() {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName));
PluginType.READER, this.readerPluginName,this.jobId));
LOG.info(String.format("DataX Reader.Job [%s] do preCheck work .",
this.readerPluginName));
this.jobReader.preCheck();
@ -274,7 +276,7 @@ public class JobContainer extends AbstractContainer {
private void preCheckWriter() {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName));
PluginType.WRITER, this.writerPluginName,this.jobId));
LOG.info(String.format("DataX Writer.Job [%s] do preCheck work .",
this.writerPluginName));
this.jobWriter.preCheck();
@ -328,10 +330,10 @@ public class JobContainer extends AbstractContainer {
CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
handlerPluginType, handlerPluginName));
handlerPluginType, handlerPluginName,this.jobId));
AbstractJobPlugin handler = LoadUtil.loadJobPlugin(
handlerPluginType, handlerPluginName);
handlerPluginType, handlerPluginName,this.jobId);
JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
this.getContainerCommunicator());
@ -364,10 +366,10 @@ public class JobContainer extends AbstractContainer {
CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
handlerPluginType, handlerPluginName));
handlerPluginType, handlerPluginName,this.jobId));
AbstractJobPlugin handler = LoadUtil.loadJobPlugin(
handlerPluginType, handlerPluginName);
handlerPluginType, handlerPluginName,this.jobId);
JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
this.getContainerCommunicator());
@ -602,8 +604,16 @@ public class JobContainer extends AbstractContainer {
reportCommunication.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeedPerSecond);
super.getContainerCommunicator().report(reportCommunication);
LOG.info(CommunicationTool.Stringify.getSnapshot(communication));
Map<String,Object> log = new HashMap<String,Object>();
log.put("startTimeStamp",startTimeStamp);
log.put("endTimeStamp",endTimeStamp);
log.put("totalCosts",totalCosts);
log.put("byteSpeedPerSecond",byteSpeedPerSecond);
log.put("recordSpeedPerSecond",recordSpeedPerSecond);
log.put("communication",communication);
//TODO 记录日志
LOG.info(String.format(
"\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
+ "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
@ -656,11 +666,11 @@ public class JobContainer extends AbstractContainer {
this.readerPluginName = this.configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName));
PluginType.READER, this.readerPluginName,this.jobId));
Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
PluginType.READER, this.readerPluginName);
PluginType.READER, this.readerPluginName,this.jobId);
this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER_JOBID,this.jobId);
// 设置reader的jobConfig
jobReader.setPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
@ -668,7 +678,6 @@ public class JobContainer extends AbstractContainer {
// 设置reader的readerConfig
jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
jobReader.setJobPluginCollector(jobPluginCollector);
jobReader.init();
@ -686,15 +695,14 @@ public class JobContainer extends AbstractContainer {
this.writerPluginName = this.configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName));
PluginType.WRITER, this.writerPluginName,this.jobId));
Writer.Job jobWriter = (Writer.Job) LoadUtil.loadJobPlugin(
PluginType.WRITER, this.writerPluginName);
PluginType.WRITER, this.writerPluginName,this.jobId);
this.configuration.set(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER_JOBID,this.jobId);
// 设置writer的jobConfig
jobWriter.setPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
// 设置reader的readerConfig
jobWriter.setPeerPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
@ -709,7 +717,7 @@ public class JobContainer extends AbstractContainer {
private void prepareJobReader() {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName));
PluginType.READER, this.readerPluginName,this.jobId));
LOG.info(String.format("DataX Reader.Job [%s] do prepare work .",
this.readerPluginName));
this.jobReader.prepare();
@ -718,7 +726,7 @@ public class JobContainer extends AbstractContainer {
private void prepareJobWriter() {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName));
PluginType.WRITER, this.writerPluginName,this.jobId));
LOG.info(String.format("DataX Writer.Job [%s] do prepare work .",
this.writerPluginName));
this.jobWriter.prepare();
@ -728,7 +736,7 @@ public class JobContainer extends AbstractContainer {
// TODO: 如果源头就是空数据
private List<Configuration> doReaderSplit(int adviceNumber) {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName));
PluginType.READER, this.readerPluginName,this.jobId));
List<Configuration> readerSlicesConfigs =
this.jobReader.split(adviceNumber);
if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0) {
@ -744,7 +752,7 @@ public class JobContainer extends AbstractContainer {
private List<Configuration> doWriterSplit(int readerTaskNumber) {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName));
PluginType.WRITER, this.writerPluginName,this.jobId));
List<Configuration> writerSlicesConfigs = this.jobWriter
.split(readerTaskNumber);
@ -938,7 +946,7 @@ public class JobContainer extends AbstractContainer {
private void postJobReader() {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName));
PluginType.READER, this.readerPluginName,this.jobId));
LOG.info("DataX Reader.Job [{}] do post work.",
this.readerPluginName);
this.jobReader.post();
@ -947,7 +955,7 @@ public class JobContainer extends AbstractContainer {
private void postJobWriter() {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName));
PluginType.WRITER, this.writerPluginName,this.jobId));
LOG.info("DataX Writer.Job [{}] do post work.",
this.writerPluginName);
this.jobWriter.post();

View File

@ -12,6 +12,7 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.AbstractContainer;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.core.statistics.communication.LocalTGCommunicationManager;
import com.alibaba.datax.core.statistics.container.communicator.taskgroup.StandaloneTGContainerCommunicator;
import com.alibaba.datax.core.statistics.plugin.task.AbstractTaskPluginCollector;
import com.alibaba.datax.core.taskgroup.runner.AbstractRunner;
@ -245,7 +246,6 @@ public class TaskGroupContainer extends AbstractContainer {
// 成功的情况下也需要汇报一次否则在任务结束非常快的情况下采集的信息将会不准确
lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId);
break;
}
@ -293,6 +293,7 @@ public class TaskGroupContainer extends AbstractContainer {
}
LOG.info(PerfTrace.getInstance().summarizeNoException());
this.removeTaskGroup();//移除指定JobId中的统计Map
}
}
}
@ -417,7 +418,7 @@ public class TaskGroupContainer extends AbstractContainer {
//通过设置thread的contextClassLoader即可实现同步和主程序不通的加载器
this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.taskConfig.getString(
CoreConstant.JOB_WRITER_NAME)));
CoreConstant.JOB_WRITER_NAME),getJobId()));
/**
* 生成readerThread
@ -431,7 +432,7 @@ public class TaskGroupContainer extends AbstractContainer {
*/
this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.taskConfig.getString(
CoreConstant.JOB_READER_NAME)));
CoreConstant.JOB_READER_NAME),getJobId()));
}
public void doStart() {
@ -468,7 +469,7 @@ public class TaskGroupContainer extends AbstractContainer {
switch (pluginType) {
case READER:
newRunner = LoadUtil.loadPluginRunner(pluginType,
this.taskConfig.getString(CoreConstant.JOB_READER_NAME));
this.taskConfig.getString(CoreConstant.JOB_READER_NAME),getJobId());
newRunner.setJobConf(this.taskConfig.getConfiguration(
CoreConstant.JOB_READER_PARAMETER));
@ -493,7 +494,7 @@ public class TaskGroupContainer extends AbstractContainer {
break;
case WRITER:
newRunner = LoadUtil.loadPluginRunner(pluginType,
this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME));
this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME),getJobId());
newRunner.setJobConf(this.taskConfig
.getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));
@ -564,4 +565,26 @@ public class TaskGroupContainer extends AbstractContainer {
return !readerThread.isAlive() && !writerThread.isAlive();
}
}
public void removeTaskGroup(){
try {
/**
* 移除根据JOBID做的一些标记 防止内存溢出
*/
LoadUtil.getConfigurationSet().remove(this.jobId);
Iterator<Map.Entry<Integer, Communication>> it =
LocalTGCommunicationManager.getTaskGroupCommunicationMap().entrySet().iterator();
while(it.hasNext()){
Map.Entry<Integer, Communication> entry = it.next();
String strJobId = String.valueOf(this.jobId);
String key = String.valueOf(entry.getKey());
if (key.startsWith(strJobId)) {
it.remove();
}
}
}catch (Exception e){
e.printStackTrace();
}
}
}

View File

@ -148,6 +148,8 @@ public class CoreConstant {
public static final String CURRENT_SERVICE_USERNAME = "current.service.username";
public static final String CURRENT_SERVICE_PASSWORD = "current.service.password";
public static final String DATAX_JOB_CONTENT_WRITER_PARAMETER_JOBID = "job.content[0].writer.parameter.jobid";
public static final String DATAX_JOB_CONTENT_READER_PARAMETER_JOBID = "job.content[0].reader.parameter.jobid";
// ----------------------------- 环境变量 ---------------------------------

View File

@ -12,8 +12,8 @@ import com.alibaba.datax.core.taskgroup.runner.WriterRunner;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* Created by jingxing on 14-8-24.
@ -46,6 +46,10 @@ public class LoadUtil {
*/
private static Configuration pluginRegisterCenter;
private final static Map <Long ,Configuration> configurationSet = new ConcurrentHashMap<Long, Configuration>();
public static Map getConfigurationSet(){
return configurationSet;
}
/**
* jarLoader的缓冲
*/
@ -56,10 +60,16 @@ public class LoadUtil {
*
* @param pluginConfigs
*/
public static void bind(Configuration pluginConfigs) {
public static synchronized void bind(Configuration pluginConfigs) {
pluginRegisterCenter = pluginConfigs;
Long jobId = pluginConfigs.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
configurationSet.put(jobId,pluginConfigs);
}
private static String generatePluginKey(PluginType pluginType,
String pluginName) {
return String.format(pluginTypeNameFormat, pluginType.toString(),
@ -67,10 +77,12 @@ public class LoadUtil {
}
private static Configuration getPluginConf(PluginType pluginType,
String pluginName) {
Configuration pluginConf = pluginRegisterCenter
String pluginName,Long jobId) {
// Configuration pluginConf = pluginRegisterCenter
// .getConfiguration(generatePluginKey(pluginType, pluginName));
Configuration pluginConf
= configurationSet.get(jobId)
.getConfiguration(generatePluginKey(pluginType, pluginName));
if (null == pluginConf) {
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_INSTALL_ERROR,
@ -89,14 +101,14 @@ public class LoadUtil {
* @return
*/
public static AbstractJobPlugin loadJobPlugin(PluginType pluginType,
String pluginName) {
String pluginName,Long jobId) {
Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
pluginType, pluginName, ContainerType.Job);
pluginType, pluginName, ContainerType.Job,jobId);
try {
AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz
.newInstance();
jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName,jobId));
return jobPlugin;
} catch (Exception e) {
throw DataXException.asDataXException(
@ -114,14 +126,14 @@ public class LoadUtil {
* @return
*/
public static AbstractTaskPlugin loadTaskPlugin(PluginType pluginType,
String pluginName) {
String pluginName,Long jobId) {
Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
pluginType, pluginName, ContainerType.Task);
pluginType, pluginName, ContainerType.Task,jobId);
try {
AbstractTaskPlugin taskPlugin = (AbstractTaskPlugin) clazz
.newInstance();
taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName,jobId));
return taskPlugin;
} catch (Exception e) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
@ -137,9 +149,9 @@ public class LoadUtil {
* @param pluginName
* @return
*/
public static AbstractRunner loadPluginRunner(PluginType pluginType, String pluginName) {
public static AbstractRunner loadPluginRunner(PluginType pluginType, String pluginName,Long jobId) {
AbstractTaskPlugin taskPlugin = LoadUtil.loadTaskPlugin(pluginType,
pluginName);
pluginName,jobId);
switch (pluginType) {
case READER:
@ -165,9 +177,9 @@ public class LoadUtil {
@SuppressWarnings("unchecked")
private static synchronized Class<? extends AbstractPlugin> loadPluginClass(
PluginType pluginType, String pluginName,
ContainerType pluginRunType) {
Configuration pluginConf = getPluginConf(pluginType, pluginName);
JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);
ContainerType pluginRunType,Long jobId) {
Configuration pluginConf = getPluginConf(pluginType, pluginName,jobId);
JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName,jobId);
try {
return (Class<? extends AbstractPlugin>) jarLoader
.loadClass(pluginConf.getString("class") + "$"
@ -178,8 +190,8 @@ public class LoadUtil {
}
public static synchronized JarLoader getJarLoader(PluginType pluginType,
String pluginName) {
Configuration pluginConf = getPluginConf(pluginType, pluginName);
String pluginName,Long jobId) {
Configuration pluginConf = getPluginConf(pluginType, pluginName,jobId);
JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
pluginName));