最终版

This commit is contained in:
sanChouIsACat 2021-12-23 15:20:56 +08:00
parent 30ed711131
commit b017102e0e

View File

@ -17,32 +17,35 @@ import org.slf4j.LoggerFactory;
public class ReaderJob extends CommonRdbmsReader.Job { public class ReaderJob extends CommonRdbmsReader.Job {
private Logger LOG=LoggerFactory.getLogger(OceanBaseReader.Task.class); private Logger LOG = LoggerFactory.getLogger(OceanBaseReader.Task.class);
public ReaderJob() { public ReaderJob() {
super(ObReaderUtils.DATABASE_TYPE); super(ObReaderUtils.DATABASE_TYPE);
} }
public void init(Configuration originalConfig,DataBaseType databaseType){
public void init(Configuration originalConfig, DataBaseType databaseType) {
//将config中的column和table中的关键字进行转义 //将config中的column和table中的关键字进行转义
try { try {
DatabaseKeywordTransformer.setDatabaseType(databaseType); DatabaseKeywordTransformer.setDatabaseType(databaseType);
} catch (Exception e) { } catch (Exception e) {
LOG.warn("database type is " + databaseType + e.getMessage()); LOG.warn("database type is " + databaseType + e.getMessage());
} }
List<String> columns=originalConfig.getList(Key.COLUMN,String.class); List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
DatabaseKeywordTransformer.transferDatabaseKeywords(columns); DatabaseKeywordTransformer.transferDatabaseKeywords(columns);
originalConfig.set(Key.COLUMN, columns); originalConfig.set(Key.COLUMN, columns);
List<JSONObject> conns=originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK,JSONObject.class); List<JSONObject> conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, JSONObject.class);
for(int i=0;i<conns.size();i++){ for (int i = 0; i < conns.size(); i++) {
JSONObject conn=conns.get(i); JSONObject conn = conns.get(i);
Configuration connConfig=Configuration.from(conn.toString()); Configuration connConfig = Configuration.from(conn.toString());
List<String> tables=connConfig.getList(Key.TABLE,String.class); List<String> tables = connConfig.getList(Key.TABLE, String.class);
DatabaseKeywordTransformer.transferDatabaseKeywords(tables); DatabaseKeywordTransformer.transferDatabaseKeywords(tables);
originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK,i,Key.TABLE),tables); originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, Key.TABLE), tables);
} }
super.init(originalConfig); super.init(originalConfig);
} }
@Override @Override
public List<Configuration> split(Configuration originalConfig, int adviceNumber) { public List<Configuration> split(Configuration originalConfig, int adviceNumber) {
List<Configuration> list = super.split(originalConfig, adviceNumber); List<Configuration> list = super.split(originalConfig, adviceNumber);