This commit is contained in:
sanChouIsACat 2021-12-22 16:14:41 +08:00
parent 6980d89883
commit ca39f422d0
4 changed files with 125 additions and 4 deletions

View File

@ -1,8 +1,12 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader;
import java.sql.Array;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.DatabaseKeywordTransformer;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -22,6 +26,7 @@ public class OceanBaseReader extends Reader {
public static class Job extends Reader.Job {
private Configuration originalConfig = null;
private ReaderJob readerJob;
private DataBaseType DATABASE_TYPE;
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
@Override
@ -37,10 +42,15 @@ public class OceanBaseReader extends Reader {
setDatabaseType(originalConfig);
this.readerJob = new ReaderJob();
this.readerJob.init(this.originalConfig);
}
this.readerJob = new ReaderJob();
this.readerJob.init(this.originalConfig,DATABASE_TYPE);
}
@Override
public void prepare(){
//ObReaderUtils.DATABASE_TYPE获取当前数据库的语法模式
}
@Override
public void preCheck() {
init();
@ -90,6 +100,9 @@ public class OceanBaseReader extends Reader {
} catch (Exception e){
LOG.warn("error in get compatible mode, using mysql as default: " + e.getMessage());
}
finally {
DATABASE_TYPE=ObReaderUtils.DATABASE_TYPE;
}
}
}

View File

@ -6,15 +6,44 @@ import com.alibaba.datax.common.constant.CommonConstant;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.DatabaseKeywordTransformer;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ReaderJob extends CommonRdbmsReader.Job {
public ReaderJob() {
super(ObReaderUtils.DATABASE_TYPE);
}
}
public void init(Configuration originalConfig,DataBaseType databaseType){
//将config中的column和table中的关键字进行转义
final Logger LOG = LoggerFactory.getLogger(OceanBaseReader.Task.class);
try {
DatabaseKeywordTransformer.setDatabaseType(databaseType);
}catch (Exception e){
LOG.warn("database type is "+databaseType+e.getMessage());
}
List<String> columns=originalConfig.getList(Key.COLUMN,String.class);
DatabaseKeywordTransformer.transferDatabaseKeywords(columns);
originalConfig.set(Key.COLUMN, columns);
List<JSONObject> conns=originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK,JSONObject.class);
for(int i=0;i<conns.size();i++){
JSONObject conn=conns.get(i);
Configuration connConfig=Configuration.from(conn.toString());
List<String> tables=connConfig.getList(Key.TABLE,String.class);
DatabaseKeywordTransformer.transferDatabaseKeywords(tables);
originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK,i,Key.TABLE),tables);
}
super.init(originalConfig);
}
@Override
public List<Configuration> split(Configuration originalConfig, int adviceNumber) {
List<Configuration> list = super.split(originalConfig, adviceNumber);

View File

@ -84,8 +84,11 @@ public class ObReaderUtils {
return;
}
List<String> columns = context.getColumns();
// 最后参与排序的索引列
context.setPkColumns(pkColumns);
int[] pkIndexs = new int[pkColumns.length];
for (int i = 0, n = pkColumns.length; i < n; i++) {
String pkc = pkColumns[i];
@ -131,6 +134,13 @@ public class ObReaderUtils {
realIndex.add(columnName);
}
}
//fix:将主键中的关键字转义
DatabaseKeywordTransformer.setDatabaseType(DataBaseType.MySql);
if(isOracleMode(context.getCompatibleMode())){
DatabaseKeywordTransformer.setDatabaseType(DataBaseType.Oracle);
}
DatabaseKeywordTransformer.transferDatabaseKeywords(realIndex);
String[] pks = new String[realIndex.size()];
realIndex.toArray(pks);
return pks;