[improve][plugin][DRDS]

1.update DRDS reader doc
  [+] Fix thread pool never close.
This commit is contained in:
mengxiaopeng 2023-08-14 14:46:33 +08:00
parent e87ecd27f3
commit 3e15d43a05

View File

@ -132,6 +132,8 @@ public class CommonRdbmsReader {
// 目前 // 目前
private Integer netWorkTimeOut; private Integer netWorkTimeOut;
private ExecutorService executorService;
public Task(DataBaseType dataBaseType) { public Task(DataBaseType dataBaseType) {
this(dataBaseType, -1, -1); this(dataBaseType, -1, -1);
} }
@ -169,6 +171,8 @@ public class CommonRdbmsReader {
basicMsg = String.format("jdbcUrl:[%s]", this.jdbcUrl); basicMsg = String.format("jdbcUrl:[%s]", this.jdbcUrl);
executorService = Executors.newSingleThreadExecutor();
} }
public void startRead(Configuration readerSliceConfig, public void startRead(Configuration readerSliceConfig,
@ -195,7 +199,7 @@ public class CommonRdbmsReader {
if (DataBaseType.DRDS.equals(dataBaseType)){ if (DataBaseType.DRDS.equals(dataBaseType)){
LOG.info("start DRDS setNetWorkTimeOut..."); LOG.info("start DRDS setNetWorkTimeOut...");
try { try {
conn.setNetworkTimeout(Executors.newFixedThreadPool(1), netWorkTimeOut); conn.setNetworkTimeout(executorService, netWorkTimeOut);
LOG.info("end DRDS setNetWorkTimeOut..."); LOG.info("end DRDS setNetWorkTimeOut...");
} catch (SQLException throwables) { } catch (SQLException throwables) {
LOG.warn("setNetWorkTimeOut error ["+throwables.getMessage()+"]"); LOG.warn("setNetWorkTimeOut error ["+throwables.getMessage()+"]");
@ -233,6 +237,7 @@ public class CommonRdbmsReader {
}catch (Exception e) { }catch (Exception e) {
throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username); throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
} finally { } finally {
executorService.shutdown();
DBUtil.closeDBResources(null, conn); DBUtil.closeDBResources(null, conn);
} }
} }
@ -242,7 +247,8 @@ public class CommonRdbmsReader {
} }
public void destroy(Configuration originalConfig) { public void destroy(Configuration originalConfig) {
// do nothing // 关闭连接池
executorService.shutdown();
} }
protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, protected Record transportOneRecord(RecordSender recordSender, ResultSet rs,