mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 07:10:23 +08:00
obreader: readByPartition
This commit is contained in:
parent
cb518114dc
commit
2d37cf10b9
@ -3,6 +3,7 @@ package com.alibaba.datax.plugin.reader.oceanbasev10reader;
|
||||
import java.sql.Connection;
|
||||
import java.util.List;
|
||||
|
||||
import com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ObReaderKey;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -86,6 +87,7 @@ public class OceanBaseReader extends Reader {
|
||||
String obJdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:");
|
||||
Connection conn = DBUtil.getConnection(DataBaseType.OceanBase, obJdbcUrl, username, password);
|
||||
String compatibleMode = ObReaderUtils.getCompatibleMode(conn);
|
||||
config.set(ObReaderKey.OB_COMPATIBILITY_MODE, compatibleMode);
|
||||
if (ObReaderUtils.isOracleMode(compatibleMode)) {
|
||||
ObReaderUtils.compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE;
|
||||
}
|
||||
|
@ -0,0 +1,11 @@
|
||||
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;
|
||||
|
||||
/**
 * SQL templates used to build weak-consistency read queries.
 *
 * <p>Both templates carry the {@code read_consistency(weak)} hint and are meant to be filled
 * with {@link String#format(String, Object...)}.
 *
 * @author johnrobbet
 */
public class Constant {

    /**
     * Query template without a filter. Placeholders, in order: select list, table expression.
     * Note the trailing space, which is part of the template.
     */
    public static final String WEAK_READ_QUERY_SQL_TEMPLATE_WITHOUT_WHERE = "select /*+read_consistency(weak)*/ %s from %s ";

    /**
     * Query template with a filter. Placeholders, in order: select list, table expression,
     * where condition (wrapped in parentheses by the template).
     */
    public static final String WEAK_READ_QUERY_SQL_TEMPLATE = "select /*+read_consistency(weak)*/ %s from %s where (%s)";
}
|
@ -0,0 +1,16 @@
|
||||
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;
|
||||
|
||||
/**
 * Configuration key names specific to the OceanBase reader.
 *
 * @author johnrobbet
 */
public class ObReaderKey {

    /** Boolean job option: when true the reader job is split by table partition. */
    public static final String READ_BY_PARTITION = "readByPartition";

    /** Per-slice key holding the name of the partition the slice reads. */
    public static final String PARTITION_NAME = "partitionName";

    /** Per-slice key holding the partition level string of the slice's partition. */
    public static final String PARTITION_TYPE = "partitionType";

    /** Key under which the detected OceanBase compatibility mode is stored. */
    public static final String OB_COMPATIBILITY_MODE = "obCompatibilityMode";
}
|
@ -6,10 +6,10 @@ import com.alibaba.datax.common.constant.CommonConstant;
|
||||
import com.alibaba.datax.common.util.Configuration;
|
||||
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
|
||||
import com.alibaba.datax.plugin.rdbms.reader.Key;
|
||||
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
||||
import com.alibaba.datax.plugin.rdbms.writer.Constant;
|
||||
import com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader;
|
||||
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
|
||||
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.PartitionSplitUtil;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -34,20 +34,34 @@ public class ReaderJob extends CommonRdbmsReader.Job {
|
||||
JSONObject conn = conns.get(i);
|
||||
Configuration connConfig = Configuration.from(conn.toString());
|
||||
List<String> tables = connConfig.getList(Key.TABLE, String.class);
|
||||
ObReaderUtils.escapeDatabaseKeywords(tables);
|
||||
originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, Key.TABLE), tables);
|
||||
|
||||
// tables will be null when querySql is configured
|
||||
if (tables != null) {
|
||||
ObReaderUtils.escapeDatabaseKeywords(tables);
|
||||
originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, Key.TABLE),
|
||||
tables);
|
||||
}
|
||||
}
|
||||
super.init(originalConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Configuration> split(Configuration originalConfig, int adviceNumber) {
|
||||
List<Configuration> list = super.split(originalConfig, adviceNumber);
|
||||
List<Configuration> list;
|
||||
if (originalConfig.getBool(ObReaderKey.READ_BY_PARTITION, false)) {
|
||||
LOG.info("try to split reader job by partition.");
|
||||
list = PartitionSplitUtil.splitByPartition(originalConfig);
|
||||
} else {
|
||||
LOG.info("try to split reader job by splitPk.");
|
||||
list = super.split(originalConfig, adviceNumber);
|
||||
}
|
||||
|
||||
for (Configuration config : list) {
|
||||
String jdbcUrl = config.getString(Key.JDBC_URL);
|
||||
String obRegionName = getObRegionName(jdbcUrl);
|
||||
config.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, obRegionName);
|
||||
}
|
||||
|
||||
return list;
|
||||
}
|
||||
|
||||
@ -60,6 +74,7 @@ public class ReaderJob extends CommonRdbmsReader.Job {
|
||||
return sss[0];
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
File diff suppressed because one or more lines are too long
@ -0,0 +1,35 @@
|
||||
package com.alibaba.datax.plugin.reader.oceanbasev10reader.util;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author johnrobbet
|
||||
*/
|
||||
public class PartInfo {
|
||||
|
||||
private PartType partType;
|
||||
|
||||
List<String> partList;
|
||||
|
||||
public PartInfo(PartType partType) {
|
||||
this.partType = partType;
|
||||
this.partList = new ArrayList();
|
||||
}
|
||||
|
||||
public String getPartType () {
|
||||
return partType.getTypeString();
|
||||
}
|
||||
|
||||
public void addPart(List partList) {
|
||||
this.partList.addAll(partList);
|
||||
}
|
||||
|
||||
public List<String> getPartList() {
|
||||
return partList;
|
||||
}
|
||||
|
||||
public boolean isPartitionTable() {
|
||||
return partType != PartType.NONPARTITION && partList.size() > 0;
|
||||
}
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
package com.alibaba.datax.plugin.reader.oceanbasev10reader.util;
|
||||
|
||||
/**
 * Partition level of a table as seen by the reader.
 *
 * @author johnrobbet
 */
public enum PartType {
    /** No partition names were found for the table. */
    NONPARTITION("NONPARTITION"),
    /** The table is read by first-level partition. */
    PARTITION("PARTITION"),
    /** The table is read by sub-partition. */
    SUBPARTITION("SUBPARTITION");

    // Enum constants are shared singletons, so their state must be immutable.
    private final String typeString;

    PartType(String typeString) {
        this.typeString = typeString;
    }

    /**
     * @return the string form of this partition level
     */
    public String getTypeString() {
        return typeString;
    }
}
|
||||
|
||||
|
@ -0,0 +1,165 @@
|
||||
package com.alibaba.datax.plugin.reader.oceanbasev10reader.util;
|
||||
|
||||
import com.alibaba.datax.common.util.Configuration;
|
||||
import com.alibaba.datax.plugin.rdbms.reader.Constant;
|
||||
import com.alibaba.datax.plugin.rdbms.reader.Key;
|
||||
import com.alibaba.datax.plugin.rdbms.reader.util.HintUtil;
|
||||
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
||||
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
||||
import com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ObReaderKey;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author johnrobbet
|
||||
*/
|
||||
public class PartitionSplitUtil {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(PartitionSplitUtil.class);
|
||||
|
||||
public static List<Configuration> splitByPartition (Configuration configuration) {
|
||||
List<Configuration> allSlices = new ArrayList<>();
|
||||
List<Object> conns = configuration.getList(Constant.CONN_MARK, Object.class);
|
||||
for (int i = 0, len = conns.size(); i < len; i++) {
|
||||
Configuration sliceConfig = configuration.clone();
|
||||
Configuration connConf = Configuration.from(conns.get(i).toString());
|
||||
String jdbcUrl = connConf.getString(Key.JDBC_URL);
|
||||
sliceConfig.set(Key.JDBC_URL, jdbcUrl);
|
||||
sliceConfig.remove(Constant.CONN_MARK);
|
||||
|
||||
List<String> tables = connConf.getList(Key.TABLE, String.class);
|
||||
for (String table : tables) {
|
||||
Configuration tempSlice = sliceConfig.clone();
|
||||
tempSlice.set(Key.TABLE, table);
|
||||
allSlices.addAll(splitSinglePartitionTable(tempSlice));
|
||||
}
|
||||
}
|
||||
|
||||
return allSlices;
|
||||
}
|
||||
|
||||
private static List<Configuration> splitSinglePartitionTable(Configuration configuration) {
|
||||
String table = configuration.getString(Key.TABLE);
|
||||
String where = configuration.getString(Key.WHERE, null);
|
||||
String column = configuration.getString(Key.COLUMN);
|
||||
final boolean weakRead = configuration.getBool(Key.WEAK_READ, true);
|
||||
|
||||
List<Configuration> slices = new ArrayList();
|
||||
PartInfo partInfo = getObPartInfoBySQL(configuration, table);
|
||||
if (partInfo != null && partInfo.isPartitionTable()) {
|
||||
String partitionType = partInfo.getPartType();
|
||||
for (String partitionName : partInfo.getPartList()) {
|
||||
LOG.info(String.format("add %s %s for table %s", partitionType, partitionName, table));
|
||||
Configuration slice = configuration.clone();
|
||||
slice.set(ObReaderKey.PARTITION_NAME, partitionName);
|
||||
slice.set(ObReaderKey.PARTITION_TYPE, partitionType);
|
||||
slice.set(Key.QUERY_SQL,
|
||||
ObReaderUtils.buildQuerySql(weakRead, column,
|
||||
String.format("%s partition(%s)", table, partitionName), where));
|
||||
slices.add(slice);
|
||||
}
|
||||
} else {
|
||||
LOG.info("fail to get table part info or table is not partitioned, proceed as non-partitioned table.");
|
||||
|
||||
Configuration slice = configuration.clone();
|
||||
slice.set(Key.QUERY_SQL, ObReaderUtils.buildQuerySql(weakRead, column, table, where));
|
||||
slices.add(slice);
|
||||
}
|
||||
|
||||
return slices;
|
||||
}
|
||||
|
||||
private static PartInfo getObPartInfoBySQL(Configuration config, String table) {
|
||||
PartInfo partInfo = new PartInfo(PartType.NONPARTITION);
|
||||
List<String> partList;
|
||||
Connection conn = null;
|
||||
try {
|
||||
String jdbcUrl = config.getString(Key.JDBC_URL);
|
||||
String username = config.getString(Key.USERNAME);
|
||||
String password = config.getString(Key.PASSWORD);
|
||||
String dbname = ObReaderUtils.getDbNameFromJdbcUrl(jdbcUrl);
|
||||
String allTable = "__all_table";
|
||||
|
||||
conn = DBUtil.getConnection(DataBaseType.OceanBase, jdbcUrl, username, password);
|
||||
String obVersion = getResultsFromSql(conn, "select version()").get(0);
|
||||
|
||||
LOG.info("obVersion: " + obVersion);
|
||||
|
||||
if (ObReaderUtils.compareObVersion("2.2.76", obVersion) < 0) {
|
||||
allTable = "__all_table_v2";
|
||||
}
|
||||
|
||||
String queryPart = String.format(
|
||||
"select p.part_name " +
|
||||
"from oceanbase.__all_part p, oceanbase.%s t, oceanbase.__all_database d " +
|
||||
"where p.table_id = t.table_id " +
|
||||
"and d.database_id = t.database_id " +
|
||||
"and d.database_name = '%s' " +
|
||||
"and t.table_name = '%s'", allTable, dbname, table);
|
||||
String querySubPart = String.format(
|
||||
"select p.sub_part_name " +
|
||||
"from oceanbase.__all_sub_part p, oceanbase.%s t, oceanbase.__all_database d " +
|
||||
"where p.table_id = t.table_id " +
|
||||
"and d.database_id = t.database_id " +
|
||||
"and d.database_name = '%s' " +
|
||||
"and t.table_name = '%s'", allTable, dbname, table);
|
||||
if (config.getString(ObReaderKey.OB_COMPATIBILITY_MODE).equals("ORACLE")) {
|
||||
queryPart = String.format(
|
||||
"select partition_name from all_tab_partitions where TABLE_OWNER = '%s' and table_name = '%s'",
|
||||
dbname.toUpperCase(), table.toUpperCase());
|
||||
querySubPart = String.format(
|
||||
"select subpartition_name from all_tab_subpartitions where TABLE_OWNER = '%s' and table_name = '%s'",
|
||||
dbname.toUpperCase(), table.toUpperCase());
|
||||
}
|
||||
|
||||
PartType partType = PartType.SUBPARTITION;
|
||||
|
||||
// try subpartition first
|
||||
partList = getResultsFromSql(conn, querySubPart);
|
||||
|
||||
// if table is not sub-partitioned, the try partition
|
||||
if (partList.isEmpty()) {
|
||||
partList = getResultsFromSql(conn, queryPart);
|
||||
partType = PartType.PARTITION;
|
||||
}
|
||||
|
||||
if (!partList.isEmpty()) {
|
||||
partInfo = new PartInfo(partType);
|
||||
partInfo.addPart(partList);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.error("error when get partition list: " + ex.getMessage());
|
||||
} finally {
|
||||
DBUtil.closeDBResources(null, conn);
|
||||
}
|
||||
|
||||
return partInfo;
|
||||
}
|
||||
|
||||
private static List<String> getResultsFromSql(Connection conn, String sql) {
|
||||
List<String> list = new ArrayList();
|
||||
Statement stmt = null;
|
||||
ResultSet rs = null;
|
||||
|
||||
LOG.info("executing sql: " + sql);
|
||||
|
||||
try {
|
||||
stmt = conn.createStatement();
|
||||
rs = stmt.executeQuery(sql);
|
||||
while (rs.next()) {
|
||||
list.add(rs.getString(1));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("error when executing sql: " + e.getMessage());
|
||||
} finally {
|
||||
DBUtil.closeDBResources(rs, stmt, null);
|
||||
}
|
||||
|
||||
return list;
|
||||
}
|
||||
}
|
@ -19,15 +19,6 @@ public class TaskContext {
|
||||
private boolean weakRead = true;
|
||||
private String userSavePoint;
|
||||
private String compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_MYSQL;
|
||||
|
||||
public String getPartitionName() {
|
||||
return partitionName;
|
||||
}
|
||||
|
||||
public void setPartitionName(String partitionName) {
|
||||
this.partitionName = partitionName;
|
||||
}
|
||||
|
||||
private String partitionName;
|
||||
|
||||
// Save point used to resume an interrupted read
|
||||
@ -174,4 +165,12 @@ public class TaskContext {
|
||||
public void setCompatibleMode(String compatibleMode) {
|
||||
this.compatibleMode = compatibleMode;
|
||||
}
|
||||
|
||||
public String getPartitionName() {
|
||||
return partitionName;
|
||||
}
|
||||
|
||||
public void setPartitionName(String partitionName) {
|
||||
this.partitionName = partitionName;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,22 @@
|
||||
package com.alibaba.datax.plugin.reader.oceanbasev10reader.util;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class ObReaderUtilsTest {
|
||||
|
||||
@Test
|
||||
public void getDbTest() {
|
||||
assert ObReaderUtils.getDbNameFromJdbcUrl("jdbc:mysql://127.0.0.1:3306/testdb").equalsIgnoreCase("testdb");
|
||||
assert ObReaderUtils.getDbNameFromJdbcUrl("jdbc:oceanbase://127.0.0.1:2883/testdb").equalsIgnoreCase("testdb");
|
||||
assert ObReaderUtils.getDbNameFromJdbcUrl("||_dsc_ob10_dsc_||obcluster:mysql||_dsc_ob10_dsc_||jdbc:mysql://127.0.0.1:3306/testdb").equalsIgnoreCase("testdb");
|
||||
assert ObReaderUtils.getDbNameFromJdbcUrl("||_dsc_ob10_dsc_||obcluster:oracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:3306/testdb").equalsIgnoreCase("testdb");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void compareObVersionTest() {
|
||||
assert ObReaderUtils.compareObVersion("2.2.70", "3.2.2") == -1;
|
||||
assert ObReaderUtils.compareObVersion("2.2.70", "2.2.50") == 1;
|
||||
assert ObReaderUtils.compareObVersion("2.2.70", "3.1.2") == -1;
|
||||
assert ObReaderUtils.compareObVersion("3.1.2", "3.1.2") == 0;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user