diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java index 56c4f1ad..6ed15e40 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java @@ -6,7 +6,7 @@ import com.alibaba.datax.common.constant.CommonConstant; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader; import com.alibaba.datax.plugin.rdbms.reader.Key; -import com.alibaba.datax.plugin.rdbms.writer.Constant; +import com.alibaba.datax.plugin.rdbms.reader.Constant; import com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader; import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils; import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.PartitionSplitUtil; @@ -29,7 +29,7 @@ public class ReaderJob extends CommonRdbmsReader.Job { ObReaderUtils.escapeDatabaseKeywords(columns); originalConfig.set(Key.COLUMN, columns); - List conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, JSONObject.class); + List conns = originalConfig.getList(Constant.CONN_MARK, JSONObject.class); for (int i = 0; i < conns.size(); i++) { JSONObject conn = conns.get(i); Configuration connConfig = Configuration.from(conn.toString()); @@ -38,7 +38,7 @@ public class ReaderJob extends CommonRdbmsReader.Job { // tables will be null when querySql is configured if (tables != null) { ObReaderUtils.escapeDatabaseKeywords(tables); - originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, Key.TABLE), + originalConfig.set(String.format("%s[%d].%s", Constant.CONN_MARK, i, Key.TABLE), tables); } } @@ -48,7 +48,8 @@ public class ReaderJob extends CommonRdbmsReader.Job { @Override public List split(Configuration originalConfig, int adviceNumber) { List list; - if (originalConfig.getBool(ObReaderKey.READ_BY_PARTITION, false)) { + // readByPartition is lower priority than splitPk + if (!isSplitPkValid(originalConfig) && originalConfig.getBool(ObReaderKey.READ_BY_PARTITION, false)) { LOG.info("try to split reader job by partition."); list = PartitionSplitUtil.splitByPartition(originalConfig); } else { @@ -65,9 +66,15 @@ public class ReaderJob extends CommonRdbmsReader.Job { return list; } + private boolean isSplitPkValid(Configuration originalConfig) { + String splitPk = originalConfig.getString(Key.SPLIT_PK); + return splitPk != null && splitPk.trim().length() > 0; + } + private String getObRegionName(String jdbcUrl) { - if (jdbcUrl.startsWith(Constant.OB10_SPLIT_STRING)) { - String[] ss = jdbcUrl.split(Constant.OB10_SPLIT_STRING_PATTERN); + final String obJdbcDelimiter = com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING; + if (jdbcUrl.startsWith(obJdbcDelimiter)) { + String[] ss = jdbcUrl.split(obJdbcDelimiter); if (ss.length >= 2) { String tenant = ss[1].trim(); String[] sss = tenant.split(":");