Merge pull request #1318 from johnrobbet/obreader_read_by_partition

Obreader read by partition
This commit is contained in:
Trafalgar 2022-06-16 21:26:42 +08:00 committed by GitHub
commit 894b57bd3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 470 additions and 42 deletions

View File

@ -3,6 +3,7 @@ package com.alibaba.datax.plugin.reader.oceanbasev10reader;
import java.sql.Connection; import java.sql.Connection;
import java.util.List; import java.util.List;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ObReaderKey;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -52,6 +53,21 @@ public class OceanBaseReader extends Reader {
@Override @Override
public List<Configuration> split(int adviceNumber) { public List<Configuration> split(int adviceNumber) {
String splitPk = originalConfig.getString(Key.SPLIT_PK);
List<String> quotedColumns = originalConfig.getList(Key.COLUMN_LIST, String.class);
if (splitPk != null && splitPk.length() > 0 && quotedColumns != null) {
String escapeChar = ObReaderUtils.isOracleMode(originalConfig.getString(ObReaderKey.OB_COMPATIBILITY_MODE))
? "\"" : "`";
if (!splitPk.startsWith(escapeChar) && !splitPk.endsWith(escapeChar)) {
splitPk = escapeChar + splitPk + escapeChar;
}
for (String column : quotedColumns) {
if (column.equals(splitPk)) {
LOG.info("splitPk is an ob reserved keyword, set to {}", splitPk);
originalConfig.set(Key.SPLIT_PK, splitPk);
}
}
}
return this.readerJob.split(this.originalConfig, adviceNumber); return this.readerJob.split(this.originalConfig, adviceNumber);
} }
@ -86,6 +102,7 @@ public class OceanBaseReader extends Reader {
String obJdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:"); String obJdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:");
Connection conn = DBUtil.getConnection(DataBaseType.OceanBase, obJdbcUrl, username, password); Connection conn = DBUtil.getConnection(DataBaseType.OceanBase, obJdbcUrl, username, password);
String compatibleMode = ObReaderUtils.getCompatibleMode(conn); String compatibleMode = ObReaderUtils.getCompatibleMode(conn);
config.set(ObReaderKey.OB_COMPATIBILITY_MODE, compatibleMode);
if (ObReaderUtils.isOracleMode(compatibleMode)) { if (ObReaderUtils.isOracleMode(compatibleMode)) {
ObReaderUtils.compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE; ObReaderUtils.compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE;
} }

View File

@ -0,0 +1,11 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;
/**
 * SQL templates used by the OceanBase reader for weak-consistency reads.
 *
 * <p>Both templates are {@link String#format(String, Object...)} patterns. The first
 * placeholder is the select list, the second is the table expression and, for the
 * variant with a filter, the third is the condition placed inside {@code where (...)}.
 *
 * @author johnrobbet
 */
public class Constant {

    /** Weak-read query without a filter; takes the select list and the table expression. */
    public static final String WEAK_READ_QUERY_SQL_TEMPLATE_WITHOUT_WHERE =
            "select /*+read_consistency(weak)*/ %s from %s ";

    /** Weak-read query with a filter; takes the select list, the table expression and the condition. */
    public static final String WEAK_READ_QUERY_SQL_TEMPLATE =
            "select /*+read_consistency(weak)*/ %s from %s where (%s)";
}

View File

@ -0,0 +1,16 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;
/**
 * Names of the configuration entries that are specific to the OceanBase reader.
 *
 * @author johnrobbet
 */
public class ObReaderKey {

    /** Boolean switch: when true the reader job may be split per table partition. */
    public static final String READ_BY_PARTITION = "readByPartition";

    /** Set on a slice to the name of the partition that slice reads. */
    public static final String PARTITION_NAME = "partitionName";

    /** Set on a slice to the type string of the partition that slice reads. */
    public static final String PARTITION_TYPE = "partitionType";

    /** Holds the compatibility mode detected from the database connection. */
    public static final String OB_COMPATIBILITY_MODE = "obCompatibilityMode";
}

View File

@ -1,15 +1,16 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext; package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;
import java.util.Arrays;
import java.util.List; import java.util.List;
import com.alibaba.datax.common.constant.CommonConstant; import com.alibaba.datax.common.constant.CommonConstant;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader; import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.plugin.rdbms.reader.Key; import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader; import com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils; import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.PartitionSplitUtil;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -29,37 +30,62 @@ public class ReaderJob extends CommonRdbmsReader.Job {
ObReaderUtils.escapeDatabaseKeywords(columns); ObReaderUtils.escapeDatabaseKeywords(columns);
originalConfig.set(Key.COLUMN, columns); originalConfig.set(Key.COLUMN, columns);
List<JSONObject> conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, JSONObject.class); List<JSONObject> conns = originalConfig.getList(Constant.CONN_MARK, JSONObject.class);
for (int i = 0; i < conns.size(); i++) { for (int i = 0; i < conns.size(); i++) {
JSONObject conn = conns.get(i); JSONObject conn = conns.get(i);
Configuration connConfig = Configuration.from(conn.toString()); Configuration connConfig = Configuration.from(conn.toString());
List<String> tables = connConfig.getList(Key.TABLE, String.class); List<String> tables = connConfig.getList(Key.TABLE, String.class);
ObReaderUtils.escapeDatabaseKeywords(tables);
originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, Key.TABLE), tables); // tables will be null when querySql is configured
if (tables != null) {
ObReaderUtils.escapeDatabaseKeywords(tables);
originalConfig.set(String.format("%s[%d].%s", Constant.CONN_MARK, i, Key.TABLE),
tables);
}
} }
super.init(originalConfig); super.init(originalConfig);
} }
@Override @Override
public List<Configuration> split(Configuration originalConfig, int adviceNumber) { public List<Configuration> split(Configuration originalConfig, int adviceNumber) {
List<Configuration> list = super.split(originalConfig, adviceNumber); List<Configuration> list;
// readByPartition is lower priority than splitPk.
// and readByPartition only works in table mode.
if (!isSplitPkValid(originalConfig) &&
originalConfig.getBool(Constant.IS_TABLE_MODE) &&
originalConfig.getBool(ObReaderKey.READ_BY_PARTITION, false)) {
LOG.info("try to split reader job by partition.");
list = PartitionSplitUtil.splitByPartition(originalConfig);
} else {
LOG.info("try to split reader job by splitPk.");
list = super.split(originalConfig, adviceNumber);
}
for (Configuration config : list) { for (Configuration config : list) {
String jdbcUrl = config.getString(Key.JDBC_URL); String jdbcUrl = config.getString(Key.JDBC_URL);
String obRegionName = getObRegionName(jdbcUrl); String obRegionName = getObRegionName(jdbcUrl);
config.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, obRegionName); config.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, obRegionName);
} }
return list; return list;
} }
/**
 * Tells whether a usable split key is configured.
 *
 * @param originalConfig job configuration to inspect
 * @return {@code true} when {@code Key.SPLIT_PK} is present and not blank
 */
private boolean isSplitPkValid(Configuration originalConfig) {
    final String configuredSplitPk = originalConfig.getString(Key.SPLIT_PK);
    if (configuredSplitPk == null) {
        return false;
    }
    return !configuredSplitPk.trim().isEmpty();
}
private String getObRegionName(String jdbcUrl) { private String getObRegionName(String jdbcUrl) {
if (jdbcUrl.startsWith(Constant.OB10_SPLIT_STRING)) { final String obJdbcDelimiter = com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING;
String[] ss = jdbcUrl.split(Constant.OB10_SPLIT_STRING_PATTERN); if (jdbcUrl.startsWith(obJdbcDelimiter)) {
String[] ss = jdbcUrl.split(obJdbcDelimiter);
if (ss.length >= 2) { if (ss.length >= 2) {
String tenant = ss[1].trim(); String tenant = ss[1].trim();
String[] sss = tenant.split(":"); String[] sss = tenant.split(":");
return sss[0]; return sss[0];
} }
} }
return null; return null;
} }
} }

View File

@ -0,0 +1,35 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.util;
import java.util.ArrayList;
import java.util.List;
/**
 * Partition description of one table: the partitioning level together with the names of
 * the partitions (or sub-partitions) collected for it.
 *
 * @author johnrobbet
 */
public class PartInfo {

    private final PartType partType;

    // Visibility intentionally left package-private, as before, so that any existing
    // same-package access keeps compiling.
    List<String> partList;

    /**
     * Creates an empty description.
     *
     * @param partType partitioning level of the table
     */
    public PartInfo(PartType partType) {
        this.partType = partType;
        this.partList = new ArrayList<>();
    }

    /**
     * @return the type string of the partitioning level
     */
    public String getPartType () {
        return partType.getTypeString();
    }

    /**
     * Appends partition names to this description.
     *
     * @param partList names to add; {@code null} is treated as an empty list
     */
    public void addPart(List<String> partList) {
        if (partList != null) {
            this.partList.addAll(partList);
        }
    }

    /**
     * @return the live list of partition names collected so far
     */
    public List<String> getPartList() {
        return partList;
    }

    /**
     * @return {@code true} when the level is not {@code NONPARTITION} and at least one
     *         partition name has been added
     */
    public boolean isPartitionTable() {
        return partType != PartType.NONPARTITION && !partList.isEmpty();
    }
}

View File

@ -0,0 +1,23 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.util;
/**
 * Partitioning level of a table as seen by the reader.
 *
 * @author johnrobbet
 */
public enum PartType {
    /** The table has no partitions. */
    NONPARTITION("NONPARTITION"),
    /** The table is read by first-level partition. */
    PARTITION("PARTITION"),
    /** The table is read by sub-partition. */
    SUBPARTITION("SUBPARTITION");

    // Enum constants are shared singletons, so their state must never change.
    private final String typeString;

    PartType (String typeString) {
        this.typeString = typeString;
    }

    /**
     * @return the textual form of this level, as stored in slice configurations
     */
    public String getTypeString() {
        return typeString;
    }
}

View File

@ -0,0 +1,165 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.util;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.reader.util.HintUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ObReaderKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
/**
 * Splits an OceanBase reader job into slices, one per table partition.
 *
 * <p>For every configured connection and table, the partition (or sub-partition) names are
 * looked up in the database dictionary. Each name becomes one slice whose query reads
 * {@code <table> partition(<name>)}. A table without partitions, or whose lookup fails,
 * yields a single slice that reads the whole table.
 *
 * @author johnrobbet
 */
public class PartitionSplitUtil {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionSplitUtil.class);

    /** Compatibility-mode value for which the Oracle dictionary views are queried. */
    private static final String ORACLE_MODE = "ORACLE";

    /**
     * Builds the slices for all connections and tables of a table-mode job.
     *
     * @param configuration job configuration holding the connection list
     * @return one configuration per partition, or per table when it is not partitioned
     */
    public static List<Configuration> splitByPartition (Configuration configuration) {
        List<Configuration> allSlices = new ArrayList<>();
        List<Object> conns = configuration.getList(Constant.CONN_MARK, Object.class);
        for (Object conn : conns) {
            Configuration sliceConfig = configuration.clone();
            Configuration connConf = Configuration.from(conn.toString());
            String jdbcUrl = connConf.getString(Key.JDBC_URL);
            sliceConfig.set(Key.JDBC_URL, jdbcUrl);
            // Each slice targets exactly one jdbc url, so the connection list is dropped.
            sliceConfig.remove(Constant.CONN_MARK);

            List<String> tables = connConf.getList(Key.TABLE, String.class);
            if (tables == null) {
                // No table list on this connection: nothing can be split by partition.
                LOG.warn("no table is configured for jdbcUrl {}, skip splitting by partition.", jdbcUrl);
                continue;
            }
            for (String table : tables) {
                Configuration tempSlice = sliceConfig.clone();
                tempSlice.set(Key.TABLE, table);
                allSlices.addAll(splitSinglePartitionTable(tempSlice));
            }
        }
        return allSlices;
    }

    /**
     * Builds the slices of a single table.
     *
     * @param configuration configuration that already carries one jdbc url and one table
     * @return one slice per partition, or a single slice for the whole table
     */
    private static List<Configuration> splitSinglePartitionTable(Configuration configuration) {
        String table = configuration.getString(Key.TABLE);
        String where = configuration.getString(Key.WHERE, null);
        String column = configuration.getString(Key.COLUMN);
        final boolean weakRead = configuration.getBool(Key.WEAK_READ, true);
        List<Configuration> slices = new ArrayList<>();

        PartInfo partInfo = getObPartInfoBySQL(configuration, table);
        if (partInfo != null && partInfo.isPartitionTable()) {
            String partitionType = partInfo.getPartType();
            for (String partitionName : partInfo.getPartList()) {
                LOG.info("add {} {} for table {}", partitionType, partitionName, table);
                Configuration slice = configuration.clone();
                slice.set(ObReaderKey.PARTITION_NAME, partitionName);
                slice.set(ObReaderKey.PARTITION_TYPE, partitionType);
                slice.set(Key.QUERY_SQL,
                    ObReaderUtils.buildQuerySql(weakRead, column,
                        String.format("%s partition(%s)", table, partitionName), where));
                slices.add(slice);
            }
        } else {
            LOG.info("fail to get table part info or table is not partitioned, proceed as non-partitioned table.");
            Configuration slice = configuration.clone();
            slice.set(Key.QUERY_SQL, ObReaderUtils.buildQuerySql(weakRead, column, table, where));
            slices.add(slice);
        }
        return slices;
    }

    /**
     * Looks up the partitions of a table in the database dictionary.
     *
     * <p>Sub-partitions are tried first; when there are none, first-level partitions are used.
     * Any failure is logged and reported as a non-partitioned table, so the caller falls
     * back to reading the whole table.
     *
     * @param config configuration providing jdbc url, credentials and compatibility mode
     * @param table  table name as configured
     * @return the partition description; never {@code null}
     */
    private static PartInfo getObPartInfoBySQL(Configuration config, String table) {
        PartInfo partInfo = new PartInfo(PartType.NONPARTITION);
        Connection conn = null;
        try {
            String jdbcUrl = config.getString(Key.JDBC_URL);
            String username = config.getString(Key.USERNAME);
            String password = config.getString(Key.PASSWORD);
            String dbname = ObReaderUtils.getDbNameFromJdbcUrl(jdbcUrl);
            conn = DBUtil.getConnection(DataBaseType.OceanBase, jdbcUrl, username, password);

            List<String> versions = getResultsFromSql(conn, "select version()");
            if (versions.isEmpty()) {
                // Without the version the right system table cannot be chosen.
                LOG.error("error when get partition list: fail to query ob version");
                return partInfo;
            }
            String obVersion = versions.get(0);
            LOG.info("obVersion: {}", obVersion);

            String allTable = "__all_table";
            if (ObReaderUtils.compareObVersion("2.2.76", obVersion) < 0) {
                allTable = "__all_table_v2";
            }

            // NOTE(review): the statements below are assembled with String.format from
            // configured values (database and table name). Presumably these come from a
            // trusted job file, and the table name may already carry keyword quoting —
            // confirm, and consider bound parameters.
            String queryPart;
            String querySubPart;
            // Constant-first comparison: a missing mode is handled as "not Oracle" instead
            // of raising a NullPointerException that would disable partition splitting.
            if (ORACLE_MODE.equals(config.getString(ObReaderKey.OB_COMPATIBILITY_MODE))) {
                queryPart = String.format(
                    "select partition_name from all_tab_partitions where TABLE_OWNER = '%s' and table_name = '%s'",
                    dbname.toUpperCase(), table.toUpperCase());
                querySubPart = String.format(
                    "select subpartition_name from all_tab_subpartitions where TABLE_OWNER = '%s' and table_name = '%s'",
                    dbname.toUpperCase(), table.toUpperCase());
            } else {
                queryPart = String.format(
                    "select p.part_name " +
                    "from oceanbase.__all_part p, oceanbase.%s t, oceanbase.__all_database d " +
                    "where p.table_id = t.table_id " +
                    "and d.database_id = t.database_id " +
                    "and d.database_name = '%s' " +
                    "and t.table_name = '%s'", allTable, dbname, table);
                querySubPart = String.format(
                    "select p.sub_part_name " +
                    "from oceanbase.__all_sub_part p, oceanbase.%s t, oceanbase.__all_database d " +
                    "where p.table_id = t.table_id " +
                    "and d.database_id = t.database_id " +
                    "and d.database_name = '%s' " +
                    "and t.table_name = '%s'", allTable, dbname, table);
            }

            PartType partType = PartType.SUBPARTITION;
            // try subpartition first
            List<String> partList = getResultsFromSql(conn, querySubPart);
            // if the table is not sub-partitioned, then try partition
            if (partList.isEmpty()) {
                partList = getResultsFromSql(conn, queryPart);
                partType = PartType.PARTITION;
            }

            if (!partList.isEmpty()) {
                partInfo = new PartInfo(partType);
                partInfo.addPart(partList);
            }
        } catch (Exception ex) {
            // Best effort: keep the non-partitioned default, but keep the stack trace too.
            LOG.error("error when get partition list: " + ex.getMessage(), ex);
        } finally {
            DBUtil.closeDBResources(null, conn);
        }
        return partInfo;
    }

    /**
     * Runs a query and collects the first column of every row as a string.
     *
     * @param conn open connection; it is not closed here
     * @param sql  statement to execute
     * @return the collected values; rows read before a failure are kept, and the list is
     *         empty when the query fails outright
     */
    private static List<String> getResultsFromSql(Connection conn, String sql) {
        List<String> list = new ArrayList<>();
        Statement stmt = null;
        ResultSet rs = null;

        LOG.info("executing sql: {}", sql);
        try {
            stmt = conn.createStatement();
            rs = stmt.executeQuery(sql);
            while (rs.next()) {
                list.add(rs.getString(1));
            }
        } catch (Exception e) {
            LOG.error("error when executing sql: " + e.getMessage(), e);
        } finally {
            DBUtil.closeDBResources(rs, stmt, null);
        }
        return list;
    }
}

View File

@ -19,15 +19,6 @@ public class TaskContext {
private boolean weakRead = true; private boolean weakRead = true;
private String userSavePoint; private String userSavePoint;
private String compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_MYSQL; private String compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_MYSQL;
public String getPartitionName() {
return partitionName;
}
public void setPartitionName(String partitionName) {
this.partitionName = partitionName;
}
private String partitionName; private String partitionName;
// 断点续读的保存点 // 断点续读的保存点
@ -174,4 +165,12 @@ public class TaskContext {
public void setCompatibleMode(String compatibleMode) { public void setCompatibleMode(String compatibleMode) {
this.compatibleMode = compatibleMode; this.compatibleMode = compatibleMode;
} }
public String getPartitionName() {
return partitionName;
}
public void setPartitionName(String partitionName) {
this.partitionName = partitionName;
}
} }

View File

@ -0,0 +1,22 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.util;
import org.junit.Test;
public class ObReaderUtilsTest {
@Test
public void getDbTest() {
assert ObReaderUtils.getDbNameFromJdbcUrl("jdbc:mysql://127.0.0.1:3306/testdb").equalsIgnoreCase("testdb");
assert ObReaderUtils.getDbNameFromJdbcUrl("jdbc:oceanbase://127.0.0.1:2883/testdb").equalsIgnoreCase("testdb");
assert ObReaderUtils.getDbNameFromJdbcUrl("||_dsc_ob10_dsc_||obcluster:mysql||_dsc_ob10_dsc_||jdbc:mysql://127.0.0.1:3306/testdb").equalsIgnoreCase("testdb");
assert ObReaderUtils.getDbNameFromJdbcUrl("||_dsc_ob10_dsc_||obcluster:oracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:3306/testdb").equalsIgnoreCase("testdb");
}
@Test
public void compareObVersionTest() {
assert ObReaderUtils.compareObVersion("2.2.70", "3.2.2") == -1;
assert ObReaderUtils.compareObVersion("2.2.70", "2.2.50") == 1;
assert ObReaderUtils.compareObVersion("2.2.70", "3.1.2") == -1;
assert ObReaderUtils.compareObVersion("3.1.2", "3.1.2") == 0;
}
}

View File

@ -93,6 +93,7 @@ public class SingleTableSplitUtil {
allQuerySql.add(tempQuerySql); allQuerySql.add(tempQuerySql);
tempConfig.set(Key.QUERY_SQL, tempQuerySql); tempConfig.set(Key.QUERY_SQL, tempQuerySql);
tempConfig.set(Key.WHERE, (hasWhere ? ("(" + where + ") and") : "") + range);
pluginParams.add(tempConfig); pluginParams.add(tempConfig);
} }
} else { } else {
@ -103,6 +104,7 @@ public class SingleTableSplitUtil {
+ String.format(" %s IS NOT NULL", splitPkName); + String.format(" %s IS NOT NULL", splitPkName);
allQuerySql.add(tempQuerySql); allQuerySql.add(tempQuerySql);
tempConfig.set(Key.QUERY_SQL, tempQuerySql); tempConfig.set(Key.QUERY_SQL, tempQuerySql);
tempConfig.set(Key.WHERE, (hasWhere ? "(" + where + ") and" : "") + String.format(" %s IS NOT NULL", splitPkName));
pluginParams.add(tempConfig); pluginParams.add(tempConfig);
} }
@ -118,6 +120,7 @@ public class SingleTableSplitUtil {
StringUtils.join(allQuerySql, "\n")); StringUtils.join(allQuerySql, "\n"));
tempConfig.set(Key.QUERY_SQL, tempQuerySql); tempConfig.set(Key.QUERY_SQL, tempQuerySql);
tempConfig.set(Key.WHERE, (hasWhere ? "(" + where + ") and" : "") + String.format(" %s IS NULL", splitPkName));
pluginParams.add(tempConfig); pluginParams.add(tempConfig);
return pluginParams; return pluginParams;
@ -254,6 +257,7 @@ public class SingleTableSplitUtil {
switch (SingleTableSplitUtil.DATABASE_TYPE) { switch (SingleTableSplitUtil.DATABASE_TYPE) {
case Oracle: case Oracle:
case OceanBase:
isValidLongType |= type == Types.NUMERIC; isValidLongType |= type == Types.NUMERIC;
break; break;
default: default: