From 738c11e0e3e3e71d57b5b1799e560ecd96d0cb2e Mon Sep 17 00:00:00 2001 From: sanChouIsACat <993924507@qq.com> Date: Wed, 22 Dec 2021 19:54:38 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BA=86=E7=94=B1=E4=BA=8ESQ?= =?UTF-8?q?L=E4=B8=AD=E7=9A=84=E4=BF=9D=E7=95=99=E5=AD=97=E4=BD=9C?= =?UTF-8?q?=E4=B8=BA=E8=A1=A8=E5=90=8D=E6=88=96=E8=80=85=E5=AD=97=E6=AE=B5?= =?UTF-8?q?=E5=90=8D=E8=80=8C=E5=BC=95=E8=B5=B7=E7=9A=84sql=E9=94=99?= =?UTF-8?q?=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../oceanbasev10reader/OceanBaseReader.java | 21 +++--- .../util/DatabaseKeywordTransformer.java | 68 ++++++++++--------- 2 files changed, 44 insertions(+), 45 deletions(-) diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java index db9d34e0..04d28482 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java @@ -37,20 +37,17 @@ public class OceanBaseReader extends Reader { if (userConfigedFetchSize != null) { LOG.warn("The [fetchSize] is not recognized, please use readBatchSize instead."); } - this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE); - setDatabaseType(originalConfig); - - - this.readerJob = new ReaderJob(); - this.readerJob.init(this.originalConfig,DATABASE_TYPE); + this.readerJob.init(this.originalConfig, DATABASE_TYPE); } + @Override - public void prepare(){ + public void prepare() { //ObReaderUtils.DATABASE_TYPE获取当前数据库的语法模式 } + @Override public void preCheck() { init(); @@ -80,7 +77,7 @@ public class OceanBaseReader extends Reader { Configuration connConf = Configuration.from(conns.get(0).toString()); List jdbcUrls = connConf.getList(Key.JDBC_URL, String.class); String jdbcUrl = jdbcUrls.get(0); - if(jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) { + if (jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) { String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN); if (ss.length != 3) { LOG.warn("unrecognized jdbc url: " + jdbcUrl); @@ -97,11 +94,11 @@ public class OceanBaseReader extends Reader { if (ObReaderUtils.isOracleMode(compatibleMode)) { ObReaderUtils.DATABASE_TYPE = DataBaseType.OceanBase; } - } catch (Exception e){ + + } catch (Exception e) { LOG.warn("error in get compatible mode, using mysql as default: " + e.getMessage()); - } - finally { - DATABASE_TYPE=ObReaderUtils.DATABASE_TYPE; + } finally { + DATABASE_TYPE = ObReaderUtils.DATABASE_TYPE; } } } diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/DatabaseKeywordTransformer.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/DatabaseKeywordTransformer.java index 2a330ccb..6d85cf69 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/DatabaseKeywordTransformer.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/DatabaseKeywordTransformer.java @@ -1,65 +1,67 @@ package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; //java api + import java.util.HashSet; import java.util.List; //dataX api import com.alibaba.datax.plugin.rdbms.util.DataBaseType; -//该类用于转义数据库中的关键字 -public class DatabaseKeywordTransformer { +/** + * 该类用于转义数据库中的关键字 + * + * @author:qianzhang + */ +public class DatabaseKeywordTransformer { private static DataBaseType databaseType; static HashSet databaseKeywords; - private static HashSet keywordsFromString2HashSet(final String Keywords){ - String[] keywordArray =Keywords.split(","); - HashSet res=new HashSet(); - for(String keyword: keywordArray){ + + private static HashSet keywordsFromString2HashSet(final String keywords) { + String[] keywordArray = keywords.split(","); + HashSet res = new HashSet(); + for (String keyword : keywordArray) { res.add(keyword); } return res; } + public static void setDatabaseType(final DataBaseType databaseType) throws Exception { - if(databaseType==DatabaseKeywordTransformer.databaseType){ - return ; + if (databaseType == DatabaseKeywordTransformer.databaseType) { + return; } DatabaseKeywordTransformer.databaseType = databaseType; - if(databaseType==DataBaseType.MySql){ - databaseKeywords=keywordsFromString2HashSet(DatabaseKeywords.MYSQL_KEYWORDS); - } - else if(databaseType==DataBaseType.Oracle || databaseType==DataBaseType.OceanBase){ - databaseKeywords=keywordsFromString2HashSet(DatabaseKeywords.ORACLE_KEYWORDS); - } - else if(databaseType==DataBaseType.SQLServer){ - databaseKeywords=keywordsFromString2HashSet(DatabaseKeywords.SQLSERVER_KEYWORDS); - } - else{ + if (databaseType == DataBaseType.MySql) { + databaseKeywords = keywordsFromString2HashSet(DatabaseKeywords.MYSQL_KEYWORDS); + } else if (databaseType == DataBaseType.Oracle || databaseType == DataBaseType.OceanBase) { + databaseKeywords = keywordsFromString2HashSet(DatabaseKeywords.ORACLE_KEYWORDS); + } else if (databaseType == DataBaseType.SQLServer) { + databaseKeywords = keywordsFromString2HashSet(DatabaseKeywords.SQLSERVER_KEYWORDS); + } else { throw new Exception("sorry,unknown database tpye..."); } } - public static void transferDatabaseKeywords(List keywords){ - for(int i=0;i keywords) { + for (int i = 0; i < keywords.size(); i++) { + String keyword = keywords.get(i).toUpperCase(); + if (databaseKeywords.contains(keyword)) { + if (databaseType == DataBaseType.MySql) { + keyword = '`' + keyword + '`'; + } else if (databaseType == DataBaseType.Oracle || databaseType == DataBaseType.OceanBase) { + keyword = '"' + keyword + '"'; + } else if (databaseType == DataBaseType.SQLServer) { + keyword = '[' + keyword + ']'; } } - keyword=keyword.toLowerCase(); - keywords.set(i,keyword); + keyword = keyword.toLowerCase(); + keywords.set(i, keyword); } } } -final class DatabaseKeywords{ +final class DatabaseKeywords { public static final String MYSQL_KEYWORDS = "ACCESSIBLE,ACCOUNT,ACTION,ADD,AFTER,AGAINST,AGGREGATE,ALGORITHM,ALL,ALTER,ALWAYS,ANALYSE,AND,ANY,AS,ASC,ASCII,ASENSITIVE,AT,AUTO_INCREMENT,AUTOEXTEND_SIZE,AVG,AVG_ROW_LENGTH,BACKUP,BEFORE,BEGIN,BETWEEN,BIGINT,BINARY,BINLOG,BIT,BLOB,BLOCK,BOOL,BOOLEAN,BOTH,BTREE,BY,BYTE,CACHE,CALL,CASCADE,CASCADED,CASE,CATALOG_NAME,CHAIN,CHANGE,CHANGED,CHANNEL,CHAR,CHARACTER,CHARSET,CHECK,CHECKSUM,CIPHER,CLASS_ORIGIN,CLIENT,CLOSE,COALESCE,CODE,COLLATE,COLLATION,COLUMN,COLUMN_FORMAT,COLUMN_NAME,COLUMNS,COMMENT,COMMIT,COMMITTED,COMPACT,COMPLETION,COMPRESSED,COMPRESSION,CONCURRENT,CONDITION,CONNECTION,CONSISTENT,CONSTRAINT,CONSTRAINT_CATALOG,CONSTRAINT_NAME,CONSTRAINT_SCHEMA,CONTAINS,CONTEXT,CONTINUE,CONVERT,CPU,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,CURSOR,CURSOR_NAME,DATA,DATABASE,DATABASES,DATAFILE,DATE,DATETIME,DAY,DAY_HOUR,DAY_MICROSECOND,DAY_MINUTE,DAY_SECOND,DEALLOCATE,DEC,DECIMAL,DECLARE,DEFAULT,DEFAULT_AUTH,DEFINER,DELAY_KEY_WRITE,DELAYED,DELETE,DES_KEY_FILE,DESC,DESCRIBE,DETERMINISTIC,DIAGNOSTICS,DIRECTORY,DISABLE,DISCARD,DISK,DISTINCT,DISTINCTROW,DIV,DO,DOUBLE,DROP,DUAL,DUMPFILE,DUPLICATE,DYNAMIC,EACH,ELSE,ELSEIF,ENABLE,ENCLOSED,ENCRYPTION,END,ENDS,ENGINE,ENGINES,ENUM,ERROR,ERRORS,ESCAPE,ESCAPED,EVENT,EVENTS,EVERY,EXCHANGE,EXECUTE,EXISTS,EXIT,EXPANSION,EXPIRE,EXPLAIN,EXPORT,EXTENDED,EXTENT_SIZE,FAST,FAULTS,FETCH,FIELDS,FILE,FILE_BLOCK_SIZE,FILTER,FIRST,FIXED,FLOAT,FLOAT4,FLOAT8,FLUSH,FOLLOWS,FOR,FORCE,FOREIGN,FORMAT,FOUND,FROM,FULL,FULLTEXT,FUNCTION,GENERAL,GENERATED,GEOMETRY,GEOMETRYCOLLECTION,GET,GET_FORMAT,GLOBAL,GRANT,GRANTS,GROUP,GROUP_REPLICATION,HANDLER,HASH,HAVING,HELP,HIGH_PRIORITY,HOST,HOSTS,HOUR,HOUR_MICROSECOND,HOUR_MINUTE,HOUR_SECOND,IDENTIFIED,IF,IGNORE,IGNORE_SERVER_IDS,IMPORT,IN,INDEX,INDEXES,INFILE,INITIAL_SIZE,INNER,INOUT,INSENSITIVE,INSERT,INSERT_METHOD,INSTALL,INSTANCE,INT,INT1,INT2,INT3,INT4,INT8,INTEGER,INTERVAL,INTO,INVOKER,IO,IO_AFTER_GTIDS,IO_BEFORE_GTIDS,IO_THREAD,IPC,IS,ISOLATION,ISSUER,ITERATE,JOIN,JSON,KEY,KEY_BLOCK_SIZE,KEYS,KILL,LANGUAGE,LAST,LEADING,LEAVE,LEAVES,LEFT,LESS,LEVEL,LIKE,LIMIT,LINEAR,LINES,LINESTRING,LIST,LOAD,LOCAL,LOCALTIME,LOCALTIMESTAMP,LOCK,LOCKS,LOGFILE,LOGS,LONG,LONGBLOB,LONGTEXT,LOOP,LOW_PRIORITY,MASTER,MASTER_AUTO_POSITION,MASTER_BIND,MASTER_CONNECT_RETRY,MASTER_DELAY,MASTER_HEARTBEAT_PERIOD,MASTER_HOST,MASTER_LOG_FILE,MASTER_LOG_POS,MASTER_PASSWORD,MASTER_PORT,MASTER_RETRY_COUNT,MASTER_SERVER_ID,MASTER_SSL,MASTER_SSL_CA,MASTER_SSL_CAPATH,MASTER_SSL_CERT,MASTER_SSL_CIPHER,MASTER_SSL_CRL,MASTER_SSL_CRLPATH,MASTER_SSL_KEY,MASTER_SSL_VERIFY_SERVER_CERT,MASTER_TLS_VERSION,MASTER_USER,MATCH,MAX_CONNECTIONS_PER_HOUR,MAX_QUERIES_PER_HOUR,MAX_ROWS,MAX_SIZE,MAX_STATEMENT_TIME,MAX_UPDATES_PER_HOUR,MAX_USER_CONNECTIONS,MAXVALUE,MEDIUM,MEDIUMBLOB,MEDIUMINT,MEDIUMTEXT,MEMORY,MERGE,MESSAGE_TEXT,MICROSECOND,MIDDLEINT,MIGRATE,MIN_ROWS,MINUTE,MINUTE_MICROSECOND,MINUTE_SECOND,MOD,MODE,MODIFIES,MODIFY,MONTH,MULTILINESTRING,MULTIPOINT,MULTIPOLYGON,MUTEX,MYSQL_ERRNO,NAME,NAMES,NATIONAL,NATURAL,NCHAR,NDB,NDBCLUSTER,NEVER,NEW,NEXT,NO,NO_WAIT,NO_WRITE_TO_BINLOG,NODEGROUP,NONBLOCKING,NONE,NOT,NULL,NUMBER,NUMERIC,NVARCHAR,OFFSET,OLD_PASSWORD,ON,ONE,ONLY,OPEN,OPTIMIZE,OPTIMIZER_COSTS,OPTION,OPTIONALLY,OPTIONS,OR,ORDER,OUT,OUTER,OUTFILE,OWNER,PACK_KEYS,PAGE,PARSE_GCOL_EXPR,PARSER,PARTIAL,PARTITION,PARTITIONING,PARTITIONS,PASSWORD,PHASE,PLUGIN,PLUGIN_DIR,PLUGINS,POINT,POLYGON,PORT,PRECEDES,PRECISION,PREPARE,PRESERVE,PREV,PRIMARY,PRIVILEGES,PROCEDURE,PROCESSLIST,PROFILE,PROFILES,PROXY,PURGE,QUARTER,QUERY,QUICK,RANGE,READ,READ_ONLY,READ_WRITE,READS,REAL,REBUILD,RECOVER,REDO_BUFFER_SIZE,REDOFILE,REDUNDANT,REFERENCES,REGEXP,RELAY,RELAY_LOG_FILE,RELAY_LOG_POS,RELAY_THREAD,RELAYLOG,RELEASE,RELOAD,REMOVE,RENAME,REORGANIZE,REPAIR,REPEAT,REPEATABLE,REPLACE,REPLICATE_DO_DB,REPLICATE_DO_TABLE,REPLICATE_IGNORE_DB,REPLICATE_IGNORE_TABLE,REPLICATE_REWRITE_DB,REPLICATE_WILD_DO_TABLE,REPLICATE_WILD_IGNORE_TABLE,REPLICATION,REQUIRE,RESET,RESIGNAL,RESTORE,RESTRICT,RESUME,RETURN,RETURNED_SQLSTATE,RETURNS,REVERSE,REVOKE,RIGHT,RLIKE,ROLLBACK,ROLLUP,ROTATE,ROUTINE,ROW,ROW_COUNT,ROW_FORMAT,ROWS,RTREE,SAVEPOINT,SCHEDULE,SCHEMA,SCHEMA_NAME,SCHEMAS,SECOND,SECOND_MICROSECOND,SECURITY,SELECT,SENSITIVE,SEPARATOR,SERIAL,SERIALIZABLE,SERVER,SESSION,SET,SHARE,SHOW,SHUTDOWN,SIGNAL,SIGNED,SIMPLE,SLAVE,SLOW,SMALLINT,SNAPSHOT,SOCKET,SOME,SONAME,SOUNDS,SOURCE,SPATIAL,SPECIFIC,SQL,SQL_AFTER_GTIDS,SQL_AFTER_MTS_GAPS,SQL_BEFORE_GTIDS,SQL_BIG_RESULT,SQL_BUFFER_RESULT,SQL_CACHE,SQL_CALC_FOUND_ROWS,SQL_NO_CACHE,SQL_SMALL_RESULT,SQL_THREAD,SQL_TSI_DAY,SQL_TSI_HOUR,SQL_TSI_MINUTE,SQL_TSI_MONTH,SQL_TSI_QUARTER,SQL_TSI_SECOND,SQL_TSI_WEEK,SQL_TSI_YEAR,SQLEXCEPTION,SQLSTATE,SQLWARNING,SSL,STACKED,START,STARTING,STARTS,STATS_AUTO_RECALC,STATS_PERSISTENT,STATS_SAMPLE_PAGES,STATUS,STOP,STORAGE,STORED,STRAIGHT_JOIN,STRING,SUBCLASS_ORIGIN,SUBJECT,SUBPARTITION,SUBPARTITIONS,SUPER,SUSPEND,SWAPS,SWITCHES,TABLE,TABLE_CHECKSUM,TABLE_NAME,TABLES,TABLESPACE,TEMPORARY,TEMPTABLE,TERMINATED,TEXT,THAN,THEN,TIME,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TINYBLOB,TINYINT,TINYTEXT,TO,TRAILING,TRANSACTION,TRIGGER,TRIGGERS,TRUNCATE,TYPE,TYPES,UNCOMMITTED,UNDEFINED,UNDO,UNDO_BUFFER_SIZE,UNDOFILE,UNICODE,UNINSTALL,UNION,UNIQUE,UNKNOWN,UNLOCK,UNSIGNED,UNTIL,UPDATE,UPGRADE,USAGE,USE,USE_FRM,USER,USER_RESOURCES,USING,UTC_DATE,UTC_TIME,UTC_TIMESTAMP,VALIDATION,VALUE,VALUES,VARBINARY,VARCHAR,VARCHARACTER,VARIABLES,VARYING,VIEW,VIRTUAL,WAIT,WARNINGS,WEEK,WEIGHT_STRING,WHEN,WHERE,WHILE,WITH,WITHOUT,WORK,WRAPPER,WRITE,X509,XA,XID,XML,XOR,YEAR,YEAR_MONTH,ZEROFILL,FALSE,TRUE";