This commit is contained in:
mengxiaopeng 2025-04-10 16:20:56 +08:00 committed by GitHub
commit dead3ce52d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 40 additions and 5 deletions

View File

@ -187,6 +187,15 @@ DRDS的插件目前DataX只适配了Mysql引擎的场景DRDS对于DataX而言
* 描述暂时不支持配置querySql模式 <br /> * 描述暂时不支持配置querySql模式 <br />
* **networkTimeout**
* 描述当同步DRDS时如果数据量过大可能会超过数据库设置的DRDS的net_write_timeout,导致任务失败 目前默认
是8h 可根据实际数据量进行设置。
* 必选:否 <br />
* 默认值28800000 <br />
### 3.3 类型转换 ### 3.3 类型转换

View File

@ -8,6 +8,7 @@ import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn; import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.exception.ExceptionTracker;
import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.statistics.PerfRecord; import com.alibaba.datax.common.statistics.PerfRecord;
@ -27,10 +28,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.sql.Connection; import java.sql.*;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Types;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@ -131,6 +129,11 @@ public class CommonRdbmsReader {
// 作为日志显示信息时需要附带的通用信息比如信息所对应的数据库连接等信息针对哪个表做的操作 // 作为日志显示信息时需要附带的通用信息比如信息所对应的数据库连接等信息针对哪个表做的操作
private String basicMsg; private String basicMsg;
// 目前
private Integer netWorkTimeOut;
private ExecutorService executorService;
public Task(DataBaseType dataBaseType) { public Task(DataBaseType dataBaseType) {
this(dataBaseType, -1, -1); this(dataBaseType, -1, -1);
} }
@ -148,6 +151,7 @@ public class CommonRdbmsReader {
this.username = readerSliceConfig.getString(Key.USERNAME); this.username = readerSliceConfig.getString(Key.USERNAME);
this.password = readerSliceConfig.getString(Key.PASSWORD); this.password = readerSliceConfig.getString(Key.PASSWORD);
this.jdbcUrl = readerSliceConfig.getString(Key.JDBC_URL); this.jdbcUrl = readerSliceConfig.getString(Key.JDBC_URL);
this.netWorkTimeOut=readerSliceConfig.getInt(Key.NETWORK_TIMEOUT,Constant.NETWORK_TIMEOUT_DEFAULT);
//ob10的处理 //ob10的处理
if (this.jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING) && this.dataBaseType == DataBaseType.MySql) { if (this.jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING) && this.dataBaseType == DataBaseType.MySql) {
@ -167,6 +171,8 @@ public class CommonRdbmsReader {
basicMsg = String.format("jdbcUrl:[%s]", this.jdbcUrl); basicMsg = String.format("jdbcUrl:[%s]", this.jdbcUrl);
executorService = Executors.newSingleThreadExecutor();
} }
public void startRead(Configuration readerSliceConfig, public void startRead(Configuration readerSliceConfig,
@ -189,6 +195,18 @@ public class CommonRdbmsReader {
DBUtil.dealWithSessionConfig(conn, readerSliceConfig, DBUtil.dealWithSessionConfig(conn, readerSliceConfig,
this.dataBaseType, basicMsg); this.dataBaseType, basicMsg);
//deal with drds big data error net_write_timeout
if (DataBaseType.DRDS.equals(dataBaseType)){
LOG.info("start DRDS setNetWorkTimeOut...");
try {
conn.setNetworkTimeout(executorService, netWorkTimeOut);
LOG.info("end DRDS setNetWorkTimeOut...");
} catch (SQLException throwables) {
LOG.warn("setNetWorkTimeOut error ["+throwables.getMessage()+"]");
LOG.warn(ExceptionTracker.trace(throwables));
}
}
int columnNumber = 0; int columnNumber = 0;
ResultSet rs = null; ResultSet rs = null;
try { try {
@ -219,6 +237,7 @@ public class CommonRdbmsReader {
}catch (Exception e) { }catch (Exception e) {
throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username); throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
} finally { } finally {
executorService.shutdown();
DBUtil.closeDBResources(null, conn); DBUtil.closeDBResources(null, conn);
} }
} }
@ -228,7 +247,8 @@ public class CommonRdbmsReader {
} }
public void destroy(Configuration originalConfig) { public void destroy(Configuration originalConfig) {
// do nothing // 关闭连接池
executorService.shutdown();
} }
protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, protected Record transportOneRecord(RecordSender recordSender, ResultSet rs,

View File

@ -27,4 +27,7 @@ public final class Constant {
public static Integer SPLIT_FACTOR = 5; public static Integer SPLIT_FACTOR = 5;
//增加优化参数
public final static Integer NETWORK_TIMEOUT_DEFAULT=28800000;
} }

View File

@ -55,4 +55,7 @@ public final class Key {
public final static String REUSE_CONN = "reuseConn"; public final static String REUSE_CONN = "reuseConn";
public final static String PARTITION_NAME = "partitionName"; public final static String PARTITION_NAME = "partitionName";
//增加优化参数
public final static String NETWORK_TIMEOUT="networkTimeout";
} }