From 3e15d43a055e3650fb8b70252ab4d7f72ea210ca Mon Sep 17 00:00:00 2001 From: mengxiaopeng <602012854@qq.com> Date: Mon, 14 Aug 2023 14:46:33 +0800 Subject: [PATCH] [improve][plugin][DRDS] 1.update DRDS reader doc [+] Fix thread pool never close. --- .../datax/plugin/rdbms/reader/CommonRdbmsReader.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java index 5e383d80..fbf3bf0c 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java @@ -132,6 +132,8 @@ public class CommonRdbmsReader { // 目前 private Integer netWorkTimeOut; + private ExecutorService executorService; + public Task(DataBaseType dataBaseType) { this(dataBaseType, -1, -1); } @@ -169,6 +171,8 @@ public class CommonRdbmsReader { basicMsg = String.format("jdbcUrl:[%s]", this.jdbcUrl); + executorService = Executors.newSingleThreadExecutor(); + } public void startRead(Configuration readerSliceConfig, @@ -195,7 +199,7 @@ public class CommonRdbmsReader { if (DataBaseType.DRDS.equals(dataBaseType)){ LOG.info("start DRDS setNetWorkTimeOut..."); try { - conn.setNetworkTimeout(Executors.newFixedThreadPool(1), netWorkTimeOut); + conn.setNetworkTimeout(executorService, netWorkTimeOut); LOG.info("end DRDS setNetWorkTimeOut..."); } catch (SQLException throwables) { LOG.warn("setNetWorkTimeOut error ["+throwables.getMessage()+"]"); @@ -233,6 +237,7 @@ public class CommonRdbmsReader { }catch (Exception e) { throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username); } finally { + executorService.shutdown(); DBUtil.closeDBResources(null, conn); } } @@ -242,7 +247,8 @@ public class CommonRdbmsReader { } public void destroy(Configuration originalConfig) { - // do nothing + // 关闭连接池 + executorService.shutdown(); } protected Record transportOneRecord(RecordSender recordSender, ResultSet rs,