From dcb541f04801afb9fc95f28e6aec348e216952a8 Mon Sep 17 00:00:00 2001 From: sanChouIsACat <993924507@qq.com> Date: Mon, 27 Dec 2021 17:02:51 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=86=E4=B8=8A=E4=B8=80?= =?UTF-8?q?=E4=B8=AA=E7=89=88=E6=9C=AC=E6=8F=90=E5=88=B0=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../reader/oceanbasev10reader/ext/ReaderJob.java | 4 ++-- .../reader/oceanbasev10reader/ext/ReaderTask.java | 4 +++- .../oceanbasev10reader/util/ObReaderUtils.java | 14 +++++--------- .../oceanbasev10writer/OceanBaseV10Writer.java | 15 +++++++-------- .../task/ConcurrentTableWriterTask.java | 7 ++++++- .../oceanbasev10writer/util/ObWriterUtils.java | 10 ++++------ 6 files changed, 27 insertions(+), 27 deletions(-) diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java index 2a31aaa1..f69a9166 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java @@ -26,7 +26,7 @@ public class ReaderJob extends CommonRdbmsReader.Job { public void init(Configuration originalConfig) { //将config中的column和table中的关键字进行转义 List columns = originalConfig.getList(Key.COLUMN, String.class); - ObReaderUtils.transferDatabaseKeywords(columns); + ObReaderUtils.escapeDatabaseKeywords(columns); originalConfig.set(Key.COLUMN, columns); List conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, JSONObject.class); @@ -34,7 +34,7 @@ public class ReaderJob extends CommonRdbmsReader.Job { JSONObject conn = conns.get(i); Configuration connConfig = Configuration.from(conn.toString()); List tables = connConfig.getList(Key.TABLE, String.class); - ObReaderUtils.transferDatabaseKeywords(tables); + ObReaderUtils.escapeDatabaseKeywords(tables); originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, Key.TABLE), tables); } super.init(originalConfig); diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java index fab0b1fb..254b334c 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java @@ -66,10 +66,12 @@ public class ReaderTask extends CommonRdbmsReader.Task { if (ObReaderUtils.databaseType == DataBaseType.OceanBase) { jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时 - compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE; } else { jdbcUrl = jdbcUrl + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时 } + if(ObReaderUtils.compatibleMode==ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE){ + compatibleMode=ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE; + } LOG.info("this is ob1_0 jdbc url. user=" + username + " :url=" + jdbcUrl); mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, ""); retryLimit = readerSliceConfig.getInt(Config.RETRY_LIMIT, Config.DEFAULT_RETRY_LIMIT); diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java index 8efee64f..143171db 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java @@ -40,7 +40,6 @@ public class ObReaderUtils { private static final String ORACLE_KEYWORDS = "ACCESS,ADD,ALL,ALTER,AND,ANY,ARRAYLEN,AS,ASC,AUDIT,BETWEEN,BY,CHAR,CHECK,CLUSTER,COLUMN,COMMENT,COMPRESS,CONNECT,CREATE,CURRENT,DATE,DECIMAL,DEFAULT,DELETE,DESC,DISTINCT,DROP,ELSE,EXCLUSIVE,EXISTS,FILE,FLOAT,FOR,FROM,GRANT,GROUP,HAVING,IDENTIFIED,IMMEDIATE,IN,INCREMENT,INDEX,INITIAL,INSERT,INTEGER,INTERSECT,INTO,IS,LEVEL,LIKE,LOCK,LONG,MAXEXTENTS,MINUS,MODE,MODIFY,NOAUDIT,NOCOMPRESS,NOT,NOTFOUND,NOWAIT,NULL,NUMBER,OF,OFFLINE,ON,ONLINE,OPTION,OR,ORDER,PCTFREE,PRIOR,PRIVILEGES,PUBLIC,RAW,RENAME,RESOURCE,REVOKE,ROW,ROWID,ROWLABEL,ROWNUM,ROWS,SELECT,SESSION,SET,SHARE,SIZE,SMALLINT,SQLBUF,START,SUCCESSFUL,SYNONYM,TABLE,THEN,TO,TRIGGER,UID,UNION,UNIQUE,UPDATE,USER,VALIDATE,VALUES,VARCHAR,VARCHAR2,VIEW,WHENEVER,WHERE,WITH"; private static Set databaseKeywords; - private static String currentKeywordsTpye; final static public String OB_COMPATIBLE_MODE = "obCompatibilityMode"; final static public String OB_COMPATIBLE_MODE_ORACLE = "ORACLE"; final static public String OB_COMPATIBLE_MODE_MYSQL = "MYSQL"; @@ -54,23 +53,20 @@ public class ObReaderUtils { return new HashSet(Arrays.asList(keywords.split(","))); } - public static void transferDatabaseKeywords(List keywords) { - //判断是否需要更改关键字集合 - if (databaseKeywords == null || currentKeywordsTpye != compatibleMode) { + public static void escapeDatabaseKeywords(List keywords) { + if (databaseKeywords == null) { if (isOracleMode(compatibleMode.toString())) { databaseKeywords = keywordsFromString2HashSet(ORACLE_KEYWORDS); } else { databaseKeywords = keywordsFromString2HashSet(MYSQL_KEYWORDS); } - currentKeywordsTpye = compatibleMode; } char escapeChar = isOracleMode(compatibleMode.toString()) ? '"' : '`'; for (int i = 0; i < keywords.size(); i++) { - String keyword = keywords.get(i).toUpperCase(); - if (databaseKeywords.contains(keyword)) { + String keyword = keywords.get(i); + if (databaseKeywords.contains(keyword.toUpperCase())) { keyword = escapeChar + keyword + escapeChar; } - keyword = keyword.toLowerCase(); keywords.set(i, keyword); } } @@ -160,7 +156,7 @@ public class ObReaderUtils { realIndex.add(columnName); } } - transferDatabaseKeywords(realIndex); + escapeDatabaseKeywords(realIndex); String[] pks = new String[realIndex.size()]; realIndex.toArray(pks); diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java index bf1c9f60..417fccb3 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java @@ -3,7 +3,6 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer; import java.sql.*; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils; import com.alibaba.fastjson.JSONObject; @@ -62,17 +61,17 @@ public class OceanBaseV10Writer extends Writer { this.originalConfig = super.getPluginJobConf(); checkCompatibleMode(originalConfig); //将config中的column和table中的关键字进行转义 - List columns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Key.COLUMN, String.class); - ObWriterUtils.transferDatabaseKeywords(columns); - originalConfig.set(com.alibaba.datax.plugin.rdbms.reader.Key.COLUMN, columns); + List columns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.writer.Key.COLUMN, String.class); + ObWriterUtils.escapeDatabaseKeywords(columns); + originalConfig.set(com.alibaba.datax.plugin.rdbms.writer.Key.COLUMN, columns); - List conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, JSONObject.class); + List conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.writer.Constant.CONN_MARK, JSONObject.class); for (int i = 0; i < conns.size(); i++) { JSONObject conn = conns.get(i); Configuration connConfig = Configuration.from(conn.toString()); - List tables = connConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Key.TABLE, String.class); - ObWriterUtils.transferDatabaseKeywords(tables); - originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, com.alibaba.datax.plugin.rdbms.reader.Key.TABLE), tables); + List tables = connConfig.getList(com.alibaba.datax.plugin.rdbms.writer.Key.TABLE, String.class); + ObWriterUtils.escapeDatabaseKeywords(tables); + originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.writer.Constant.CONN_MARK, i, com.alibaba.datax.plugin.rdbms.writer.Key.TABLE), tables); } this.commonJob = new CommonRdbmsWriter.Job(DATABASE_TYPE); this.commonJob.init(this.originalConfig); diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java index cbc9a936..f07c543b 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java @@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Pattern; import com.alibaba.datax.common.element.Column; import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder; @@ -105,7 +106,11 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { checkConnHolder.initConnection(); if (isOracleCompatibleMode) { connectInfo.databaseName = connectInfo.databaseName.toUpperCase(); - table = table.toUpperCase(); + //在转义的情况下不翻译 + if(!Pattern.matches("\"\\w*\"",table)){ + table = table.toUpperCase(); + } + LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s", connectInfo.databaseName, table)); } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java index a8ff7a04..8e9b4204 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java @@ -21,26 +21,24 @@ public class ObWriterUtils { private static String CHECK_MEMSTORE = "select 1 from %s.gv$memstore t where t.total>t.mem_limit * ?"; private static Set databaseKeywords; private static String compatibleMode = null; - private static String currentKeywordsTpye =null; protected static final Logger LOG = LoggerFactory.getLogger(Task.class); private static Set keywordsFromString2HashSet(final String keywords) { return new HashSet(Arrays.asList(keywords.split(","))); } - public static void transferDatabaseKeywords(List keywords) { + public static void escapeDatabaseKeywords(List keywords) { //判断是否需要更改关键字集合 - if (databaseKeywords == null || currentKeywordsTpye != compatibleMode) { + if (databaseKeywords == null) { if (isOracleMode()) { databaseKeywords = keywordsFromString2HashSet(ORACLE_KEYWORDS); } else { databaseKeywords = keywordsFromString2HashSet(MYSQL_KEYWORDS); } - currentKeywordsTpye = compatibleMode; } char escapeChar = isOracleMode() ? '"' : '`'; for (int i = 0; i < keywords.size(); i++) { - String keyword = keywords.get(i).toUpperCase(); - if (databaseKeywords.contains(keyword)) { + String keyword = keywords.get(i); + if (databaseKeywords.contains(keyword.toUpperCase())) { keyword = escapeChar + keyword + escapeChar; } keyword = keyword.toLowerCase();