diff --git a/drdsreader/doc/drdsreader.md b/drdsreader/doc/drdsreader.md index c54e6bd1..8657aae1 100644 --- a/drdsreader/doc/drdsreader.md +++ b/drdsreader/doc/drdsreader.md @@ -187,6 +187,15 @@ DRDS的插件目前DataX只适配了Mysql引擎的场景,DRDS对于DataX而言 * 描述:暂时不支持配置querySql模式
+* **networkTimeout** + + * 描述:当同步DRDS时,如果数据量过大,可能会超过数据库设置的DRDS的net_write_timeout,导致任务失败 目前默认 + 是8h 可根据实际数据量进行设置。 + + * 必选:否
+ + * 默认值:28800000
+ ### 3.3 类型转换 diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java index f3180402..fbf3bf0c 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java @@ -8,6 +8,7 @@ import com.alibaba.datax.common.element.LongColumn; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.element.StringColumn; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.exception.ExceptionTracker; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.statistics.PerfRecord; @@ -27,10 +28,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.Types; +import java.sql.*; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -131,6 +129,11 @@ public class CommonRdbmsReader { // 作为日志显示信息时,需要附带的通用信息。比如信息所对应的数据库连接等信息,针对哪个表做的操作 private String basicMsg; + // 目前 + private Integer netWorkTimeOut; + + private ExecutorService executorService; + public Task(DataBaseType dataBaseType) { this(dataBaseType, -1, -1); } @@ -148,6 +151,7 @@ public class CommonRdbmsReader { this.username = readerSliceConfig.getString(Key.USERNAME); this.password = readerSliceConfig.getString(Key.PASSWORD); this.jdbcUrl = readerSliceConfig.getString(Key.JDBC_URL); + this.netWorkTimeOut=readerSliceConfig.getInt(Key.NETWORK_TIMEOUT,Constant.NETWORK_TIMEOUT_DEFAULT); //ob10的处理 if (this.jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING) && this.dataBaseType == DataBaseType.MySql) { @@ -167,6 +171,8 @@ public class CommonRdbmsReader { basicMsg = String.format("jdbcUrl:[%s]", this.jdbcUrl); + executorService = Executors.newSingleThreadExecutor(); + } public void startRead(Configuration readerSliceConfig, @@ -189,6 +195,18 @@ public class CommonRdbmsReader { DBUtil.dealWithSessionConfig(conn, readerSliceConfig, this.dataBaseType, basicMsg); + //deal with drds big data error net_write_timeout + if (DataBaseType.DRDS.equals(dataBaseType)){ + LOG.info("start DRDS setNetWorkTimeOut..."); + try { + conn.setNetworkTimeout(executorService, netWorkTimeOut); + LOG.info("end DRDS setNetWorkTimeOut..."); + } catch (SQLException throwables) { + LOG.warn("setNetWorkTimeOut error ["+throwables.getMessage()+"]"); + LOG.warn(ExceptionTracker.trace(throwables)); + } + } + int columnNumber = 0; ResultSet rs = null; try { @@ -219,6 +237,7 @@ public class CommonRdbmsReader { }catch (Exception e) { throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username); } finally { + executorService.shutdown(); DBUtil.closeDBResources(null, conn); } } @@ -228,7 +247,8 @@ public class CommonRdbmsReader { } public void destroy(Configuration originalConfig) { - // do nothing + // 关闭连接池 + executorService.shutdown(); } protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Constant.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Constant.java index f998357e..a897ac08 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Constant.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Constant.java @@ -27,4 +27,7 @@ public final class Constant { public static Integer SPLIT_FACTOR = 5; + //增加优化参数 + public final static Integer NETWORK_TIMEOUT_DEFAULT=28800000; + } diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java index 9f2939c4..d5d9899a 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java @@ -55,4 +55,7 @@ public final class Key { public final static String REUSE_CONN = "reuseConn"; public final static String PARTITION_NAME = "partitionName"; + + //增加优化参数 + public final static String NETWORK_TIMEOUT="networkTimeout"; }