readByPartition is lower priority than splitPk

This commit is contained in:
johnrobbet 2022-03-08 15:57:27 +08:00
parent 8d9e936047
commit da9134bda4

View File

@ -6,7 +6,7 @@ import com.alibaba.datax.common.constant.CommonConstant;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader; import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.plugin.rdbms.reader.Key; import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.writer.Constant; import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader; import com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils; import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.PartitionSplitUtil; import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.PartitionSplitUtil;
@ -29,7 +29,7 @@ public class ReaderJob extends CommonRdbmsReader.Job {
ObReaderUtils.escapeDatabaseKeywords(columns); ObReaderUtils.escapeDatabaseKeywords(columns);
originalConfig.set(Key.COLUMN, columns); originalConfig.set(Key.COLUMN, columns);
List<JSONObject> conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, JSONObject.class); List<JSONObject> conns = originalConfig.getList(Constant.CONN_MARK, JSONObject.class);
for (int i = 0; i < conns.size(); i++) { for (int i = 0; i < conns.size(); i++) {
JSONObject conn = conns.get(i); JSONObject conn = conns.get(i);
Configuration connConfig = Configuration.from(conn.toString()); Configuration connConfig = Configuration.from(conn.toString());
@ -38,7 +38,7 @@ public class ReaderJob extends CommonRdbmsReader.Job {
// tables will be null when querySql is configured // tables will be null when querySql is configured
if (tables != null) { if (tables != null) {
ObReaderUtils.escapeDatabaseKeywords(tables); ObReaderUtils.escapeDatabaseKeywords(tables);
originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, Key.TABLE), originalConfig.set(String.format("%s[%d].%s", Constant.CONN_MARK, i, Key.TABLE),
tables); tables);
} }
} }
@ -48,7 +48,8 @@ public class ReaderJob extends CommonRdbmsReader.Job {
@Override @Override
public List<Configuration> split(Configuration originalConfig, int adviceNumber) { public List<Configuration> split(Configuration originalConfig, int adviceNumber) {
List<Configuration> list; List<Configuration> list;
if (originalConfig.getBool(ObReaderKey.READ_BY_PARTITION, false)) { // readByPartition is lower priority than splitPk
if (!isSplitPkValid(originalConfig) && originalConfig.getBool(ObReaderKey.READ_BY_PARTITION, false)) {
LOG.info("try to split reader job by partition."); LOG.info("try to split reader job by partition.");
list = PartitionSplitUtil.splitByPartition(originalConfig); list = PartitionSplitUtil.splitByPartition(originalConfig);
} else { } else {
@ -65,9 +66,15 @@ public class ReaderJob extends CommonRdbmsReader.Job {
return list; return list;
} }
private boolean isSplitPkValid(Configuration originalConfig) {
String splitPk = originalConfig.getString(Key.SPLIT_PK);
return splitPk != null && splitPk.trim().length() > 0;
}
private String getObRegionName(String jdbcUrl) { private String getObRegionName(String jdbcUrl) {
if (jdbcUrl.startsWith(Constant.OB10_SPLIT_STRING)) { final String obJdbcDelimiter = com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING;
String[] ss = jdbcUrl.split(Constant.OB10_SPLIT_STRING_PATTERN); if (jdbcUrl.startsWith(obJdbcDelimiter)) {
String[] ss = jdbcUrl.split(obJdbcDelimiter);
if (ss.length >= 2) { if (ss.length >= 2) {
String tenant = ss[1].trim(); String tenant = ss[1].trim();
String[] sss = tenant.split(":"); String[] sss = tenant.split(":");