From ca39f422d0f78f44afd2178ceb17930098661d45 Mon Sep 17 00:00:00 2001 From: sanChouIsACat <993924507@qq.com> Date: Wed, 22 Dec 2021 16:14:41 +0800 Subject: [PATCH] sd --- .../oceanbasev10reader/OceanBaseReader.java | 19 ++++- .../oceanbasev10reader/ext/ReaderJob.java | 31 ++++++++- .../util/DatabaseKeywordTransformer.java | 69 +++++++++++++++++++ .../util/ObReaderUtils.java | 10 +++ 4 files changed, 125 insertions(+), 4 deletions(-) create mode 100644 oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/DatabaseKeywordTransformer.java diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java index 0a4934a1..db9d34e0 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java @@ -1,8 +1,12 @@ package com.alibaba.datax.plugin.reader.oceanbasev10reader; +import java.sql.Array; import java.sql.Connection; +import java.util.ArrayList; import java.util.List; +import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.DatabaseKeywordTransformer; +import com.alibaba.fastjson.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,6 +26,7 @@ public class OceanBaseReader extends Reader { public static class Job extends Reader.Job { private Configuration originalConfig = null; private ReaderJob readerJob; + private DataBaseType DATABASE_TYPE; private static final Logger LOG = LoggerFactory.getLogger(Task.class); @Override @@ -37,10 +42,15 @@ public class OceanBaseReader extends Reader { setDatabaseType(originalConfig); - this.readerJob = new ReaderJob(); - this.readerJob.init(this.originalConfig); - } + + this.readerJob = new ReaderJob(); + this.readerJob.init(this.originalConfig,DATABASE_TYPE); + } + @Override + public void prepare(){ + //ObReaderUtils.DATABASE_TYPE获取当前数据库的语法模式 + } @Override public void preCheck() { init(); @@ -90,6 +100,9 @@ public class OceanBaseReader extends Reader { } catch (Exception e){ LOG.warn("error in get compatible mode, using mysql as default: " + e.getMessage()); } + finally { + DATABASE_TYPE=ObReaderUtils.DATABASE_TYPE; + } } } diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java index c56155f6..efeec2e9 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java @@ -6,15 +6,44 @@ import com.alibaba.datax.common.constant.CommonConstant; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader; import com.alibaba.datax.plugin.rdbms.reader.Key; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.rdbms.writer.Constant; +import com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader; +import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.DatabaseKeywordTransformer; import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils; +import com.alibaba.fastjson.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class ReaderJob extends CommonRdbmsReader.Job { public ReaderJob() { super(ObReaderUtils.DATABASE_TYPE); - } + } + public void init(Configuration originalConfig,DataBaseType databaseType){ + //将config中的column和table中的关键字进行转义 + final Logger LOG = LoggerFactory.getLogger(OceanBaseReader.Task.class); + try { + DatabaseKeywordTransformer.setDatabaseType(databaseType); + }catch (Exception e){ + LOG.warn("database type is "+databaseType+e.getMessage()); + } + List columns=originalConfig.getList(Key.COLUMN,String.class); + DatabaseKeywordTransformer.transferDatabaseKeywords(columns); + originalConfig.set(Key.COLUMN, columns); + + List conns=originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK,JSONObject.class); + for(int i=0;i tables=connConfig.getList(Key.TABLE,String.class); + DatabaseKeywordTransformer.transferDatabaseKeywords(tables); + originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK,i,Key.TABLE),tables); + } + super.init(originalConfig); + } @Override public List split(Configuration originalConfig, int adviceNumber) { List list = super.split(originalConfig, adviceNumber); diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/DatabaseKeywordTransformer.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/DatabaseKeywordTransformer.java new file mode 100644 index 00000000..ccbbfb25 --- /dev/null +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/DatabaseKeywordTransformer.java @@ -0,0 +1,69 @@ +package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; + +//java api +import java.util.HashSet; +import java.util.List; + +//dataX api +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; + +//该类用于转义数据库中的关键字 +public class DatabaseKeywordTransformer { + private static DataBaseType databaseType; + static HashSet databaseKeywords; + private static HashSet keywordsFromString2HashSet(final String Keywords){ + String[] keywordArray =Keywords.split(","); + HashSet res=new HashSet(); + for(String keyword: keywordArray){ + res.add(keyword); + } + return res; + } + public static void setDatabaseType(final DataBaseType databaseType) throws Exception { + if(databaseType==DatabaseKeywordTransformer.databaseType){ + return ; + } + DatabaseKeywordTransformer.databaseType = databaseType; + if(databaseType==DataBaseType.MySql){ + databaseKeywords=keywordsFromString2HashSet(DatabaseKeywords.MYSQL_KEYWORDS); + } + else if(databaseType==DataBaseType.Oracle){ + databaseKeywords=keywordsFromString2HashSet(DatabaseKeywords.ORACLE_KEYWORDS); + } + else if(databaseType==DataBaseType.SQLServer){ + databaseKeywords=keywordsFromString2HashSet(DatabaseKeywords.SQLSERVER_KEYWORDS); + } + else{ + throw new Exception("sorry,unknown database tpye..."); + } + } + + public static void transferDatabaseKeywords(List keywords){ + for(int i=0;i columns = context.getColumns(); + // 最后参与排序的索引列 + context.setPkColumns(pkColumns); + int[] pkIndexs = new int[pkColumns.length]; for (int i = 0, n = pkColumns.length; i < n; i++) { String pkc = pkColumns[i]; @@ -131,6 +134,13 @@ public class ObReaderUtils { realIndex.add(columnName); } } + //fix:将主键中的关键字转义 + DatabaseKeywordTransformer.setDatabaseType(DataBaseType.MySql); + if(isOracleMode(context.getCompatibleMode())){ + DatabaseKeywordTransformer.setDatabaseType(DataBaseType.Oracle); + } + DatabaseKeywordTransformer.transferDatabaseKeywords(realIndex); + String[] pks = new String[realIndex.size()]; realIndex.toArray(pks); return pks;