From e26f82ad769e2a3c915fe704bcef2a89c545bf76 Mon Sep 17 00:00:00 2001
From: mengxiaopeng <602012854@qq.com>
Date: Wed, 12 Jul 2023 17:14:57 +0800
Subject: [PATCH 1/4] =?UTF-8?q?[improve][plugin][DRDS]=20The=20task=20of?=
=?UTF-8?q?=20DRDS-Reader=20Big=20data=20fails=20with=20the=20following=20?=
=?UTF-8?q?error=20=E3=80=90net=5Fwrite=5Ftimeout=E3=80=91?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../plugin/rdbms/reader/CommonRdbmsReader.java | 17 +++++++++++++----
1 file changed, 13 insertions(+), 4 deletions(-)
diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java
index f3180402..81d5b7f2 100755
--- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java
+++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java
@@ -27,10 +27,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.Types;
+import java.sql.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -189,6 +186,18 @@ public class CommonRdbmsReader {
DBUtil.dealWithSessionConfig(conn, readerSliceConfig,
this.dataBaseType, basicMsg);
+ //deal with drds big data error net_write_timeout
+ if (DataBaseType.DRDS.equals(dataBaseType)){
+ LOG.info("start DRDS setNetWorkTimeOut...");
+ try {
+ conn.setNetworkTimeout(Executors.newFixedThreadPool(1), 172800000);
+ LOG.info("end DRDS setNetWorkTimeOut...");
+ } catch (SQLException throwables) {
+ LOG.error("setNetWorkTimeOut error ["+throwables.getMessage()+"]");
+ throwables.printStackTrace();
+ }
+ }
+
int columnNumber = 0;
ResultSet rs = null;
try {
From d80ca30fea428b9432566105d4c9e2d3a0ea7f9d Mon Sep 17 00:00:00 2001
From: mengxiaopeng <602012854@qq.com>
Date: Tue, 18 Jul 2023 17:43:55 +0800
Subject: [PATCH 2/4] [improve][plugin][DRDS] 1.Make NetworkTimeout
configurable. 2.Let logger Print stack trace.
---
.../datax/plugin/rdbms/reader/CommonRdbmsReader.java | 11 ++++++++---
.../alibaba/datax/plugin/rdbms/reader/Constant.java | 3 +++
.../com/alibaba/datax/plugin/rdbms/reader/Key.java | 3 +++
3 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java
index 81d5b7f2..5e383d80 100755
--- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java
+++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java
@@ -8,6 +8,7 @@ import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.exception.ExceptionTracker;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.statistics.PerfRecord;
@@ -128,6 +129,9 @@ public class CommonRdbmsReader {
// 作为日志显示信息时,需要附带的通用信息。比如信息所对应的数据库连接等信息,针对哪个表做的操作
private String basicMsg;
+ // 目前
+ private Integer netWorkTimeOut;
+
public Task(DataBaseType dataBaseType) {
this(dataBaseType, -1, -1);
}
@@ -145,6 +149,7 @@ public class CommonRdbmsReader {
this.username = readerSliceConfig.getString(Key.USERNAME);
this.password = readerSliceConfig.getString(Key.PASSWORD);
this.jdbcUrl = readerSliceConfig.getString(Key.JDBC_URL);
+ this.netWorkTimeOut=readerSliceConfig.getInt(Key.NETWORK_TIMEOUT,Constant.NETWORK_TIMEOUT_DEFAULT);
//ob10的处理
if (this.jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING) && this.dataBaseType == DataBaseType.MySql) {
@@ -190,11 +195,11 @@ public class CommonRdbmsReader {
if (DataBaseType.DRDS.equals(dataBaseType)){
LOG.info("start DRDS setNetWorkTimeOut...");
try {
- conn.setNetworkTimeout(Executors.newFixedThreadPool(1), 172800000);
+ conn.setNetworkTimeout(Executors.newFixedThreadPool(1), netWorkTimeOut);
LOG.info("end DRDS setNetWorkTimeOut...");
} catch (SQLException throwables) {
- LOG.error("setNetWorkTimeOut error ["+throwables.getMessage()+"]");
- throwables.printStackTrace();
+ LOG.warn("setNetWorkTimeOut error ["+throwables.getMessage()+"]");
+ LOG.warn(ExceptionTracker.trace(throwables));
}
}
diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Constant.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Constant.java
index f998357e..a897ac08 100755
--- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Constant.java
+++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Constant.java
@@ -27,4 +27,7 @@ public final class Constant {
public static Integer SPLIT_FACTOR = 5;
+ //增加优化参数
+ public final static Integer NETWORK_TIMEOUT_DEFAULT=28800000;
+
}
diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java
index 9f2939c4..d5d9899a 100755
--- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java
+++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java
@@ -55,4 +55,7 @@ public final class Key {
public final static String REUSE_CONN = "reuseConn";
public final static String PARTITION_NAME = "partitionName";
+
+ //增加优化参数
+ public final static String NETWORK_TIMEOUT="networkTimeout";
}
From e87ecd27f30fd542e23d6a3ba6734122a3d2b2e8 Mon Sep 17 00:00:00 2001
From: mengxiaopeng <602012854@qq.com>
Date: Tue, 18 Jul 2023 17:53:40 +0800
Subject: [PATCH 3/4] =?UTF-8?q?[improve][plugin][DRDS]=201.update=20DRDS?=
=?UTF-8?q?=20reader=20doc=20=20=20[+]=20=E5=A2=9E=E5=8A=A0networkTimeout?=
=?UTF-8?q?=E5=8F=82=E6=95=B0=E9=85=8D=E7=BD=AE=E8=AF=B4=E6=98=8E?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
drdsreader/doc/drdsreader.md | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/drdsreader/doc/drdsreader.md b/drdsreader/doc/drdsreader.md
index c54e6bd1..8657aae1 100644
--- a/drdsreader/doc/drdsreader.md
+++ b/drdsreader/doc/drdsreader.md
@@ -187,6 +187,15 @@ DRDS的插件目前DataX只适配了Mysql引擎的场景,DRDS对于DataX而言
* 描述:暂时不支持配置querySql模式
+* **networkTimeout**
+
+ * 描述:当同步DRDS时,如果数据量过大,可能会超过数据库设置的DRDS的net_write_timeout,导致任务失败 目前默认
+ 是8h 可根据实际数据量进行设置。
+
+ * 必选:否
+
+ * 默认值:28800000
+
### 3.3 类型转换
From 3e15d43a055e3650fb8b70252ab4d7f72ea210ca Mon Sep 17 00:00:00 2001
From: mengxiaopeng <602012854@qq.com>
Date: Mon, 14 Aug 2023 14:46:33 +0800
Subject: [PATCH 4/4] [improve][plugin][DRDS] 1.update DRDS reader doc [+]
Fix thread pool never close.
---
.../datax/plugin/rdbms/reader/CommonRdbmsReader.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java
index 5e383d80..fbf3bf0c 100755
--- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java
+++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java
@@ -132,6 +132,8 @@ public class CommonRdbmsReader {
// 目前
private Integer netWorkTimeOut;
+ private ExecutorService executorService;
+
public Task(DataBaseType dataBaseType) {
this(dataBaseType, -1, -1);
}
@@ -169,6 +171,8 @@ public class CommonRdbmsReader {
basicMsg = String.format("jdbcUrl:[%s]", this.jdbcUrl);
+ executorService = Executors.newSingleThreadExecutor();
+
}
public void startRead(Configuration readerSliceConfig,
@@ -195,7 +199,7 @@ public class CommonRdbmsReader {
if (DataBaseType.DRDS.equals(dataBaseType)){
LOG.info("start DRDS setNetWorkTimeOut...");
try {
- conn.setNetworkTimeout(Executors.newFixedThreadPool(1), netWorkTimeOut);
+ conn.setNetworkTimeout(executorService, netWorkTimeOut);
LOG.info("end DRDS setNetWorkTimeOut...");
} catch (SQLException throwables) {
LOG.warn("setNetWorkTimeOut error ["+throwables.getMessage()+"]");
@@ -233,6 +237,7 @@ public class CommonRdbmsReader {
}catch (Exception e) {
throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
} finally {
+ executorService.shutdown();
DBUtil.closeDBResources(null, conn);
}
}
@@ -242,7 +247,8 @@ public class CommonRdbmsReader {
}
public void destroy(Configuration originalConfig) {
- // do nothing
+ // 关闭连接池
+ executorService.shutdown();
}
protected Record transportOneRecord(RecordSender recordSender, ResultSet rs,