diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java index 0a4934a1..228af811 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java @@ -32,19 +32,21 @@ public class OceanBaseReader extends Reader { if (userConfigedFetchSize != null) { LOG.warn("The [fetchSize] is not recognized, please use readBatchSize instead."); } - this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE); - setDatabaseType(originalConfig); - this.readerJob = new ReaderJob(); this.readerJob.init(this.originalConfig); } + @Override + public void prepare() { + //ObReaderUtils.DATABASE_TYPE获取当前数据库的语法模式 + } + @Override public void preCheck() { init(); - this.readerJob.preCheck(this.originalConfig, ObReaderUtils.DATABASE_TYPE); + this.readerJob.preCheck(this.originalConfig, ObReaderUtils.databaseType); } @@ -70,7 +72,7 @@ public class OceanBaseReader extends Reader { Configuration connConf = Configuration.from(conns.get(0).toString()); List jdbcUrls = connConf.getList(Key.JDBC_URL, String.class); String jdbcUrl = jdbcUrls.get(0); - if(jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) { + if (jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) { String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN); if (ss.length != 3) { LOG.warn("unrecognized jdbc url: " + jdbcUrl); @@ -85,9 +87,10 @@ public class OceanBaseReader extends Reader { Connection conn = DBUtil.getConnection(DataBaseType.OceanBase, obJdbcUrl, username, password); String compatibleMode = ObReaderUtils.getCompatibleMode(conn); if (ObReaderUtils.isOracleMode(compatibleMode)) { - 
ObReaderUtils.DATABASE_TYPE = DataBaseType.OceanBase; + ObReaderUtils.compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE; } - } catch (Exception e){ + + } catch (Exception e) { LOG.warn("error in get compatible mode, using mysql as default: " + e.getMessage()); } } diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java index c56155f6..f69a9166 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java @@ -6,13 +6,38 @@ import com.alibaba.datax.common.constant.CommonConstant; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader; import com.alibaba.datax.plugin.rdbms.reader.Key; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.rdbms.writer.Constant; +import com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader; import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils; +import com.alibaba.fastjson.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class ReaderJob extends CommonRdbmsReader.Job { + private static final Logger LOG = LoggerFactory.getLogger(ReaderJob.class); public ReaderJob() { - super(ObReaderUtils.DATABASE_TYPE); + super(ObReaderUtils.databaseType); + } + + @Override + public void init(Configuration originalConfig) { + // Escape reserved database keywords found in the configured column and table names + List columns = originalConfig.getList(Key.COLUMN, String.class); + ObReaderUtils.escapeDatabaseKeywords(columns); + originalConfig.set(Key.COLUMN, columns); + + List conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, JSONObject.class); + for (int i = 0; i < conns.size(); 
i++) { + JSONObject conn = conns.get(i); + Configuration connConfig = Configuration.from(conn.toString()); + List tables = connConfig.getList(Key.TABLE, String.class); + ObReaderUtils.escapeDatabaseKeywords(tables); + originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, Key.TABLE), tables); + } + super.init(originalConfig); } @Override diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java index 073bb3cb..a43dcebd 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java @@ -1,13 +1,5 @@ package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext; -import java.sql.*; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.plugin.RecordSender; @@ -19,11 +11,17 @@ import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader; import com.alibaba.datax.plugin.rdbms.reader.Constant; import com.alibaba.datax.plugin.rdbms.reader.Key; import com.alibaba.datax.plugin.rdbms.util.DBUtil; -import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.rdbms.util.RdbmsException; import com.alibaba.datax.plugin.reader.oceanbasev10reader.Config; import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils; import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.TaskContext; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import 
java.util.ArrayList; +import java.util.List; public class ReaderTask extends CommonRdbmsReader.Task { private static final Logger LOG = LoggerFactory.getLogger(ReaderTask.class); @@ -41,11 +39,12 @@ public class ReaderTask extends CommonRdbmsReader.Task { private boolean reuseConn = false; public ReaderTask(int taskGroupId, int taskId) { - super(ObReaderUtils.DATABASE_TYPE, taskGroupId, taskId); + super(ObReaderUtils.databaseType, taskGroupId, taskId); this.taskGroupId = taskGroupId; this.taskId = taskId; } + @Override public void init(Configuration readerSliceConfig) { /* for database connection */ username = readerSliceConfig.getString(Key.USERNAME); @@ -54,7 +53,7 @@ public class ReaderTask extends CommonRdbmsReader.Task { queryTimeoutSeconds = readerSliceConfig.getInt(Config.QUERY_TIMEOUT_SECOND, Config.DEFAULT_QUERY_TIMEOUT_SECOND); // ob10的处理 - if(jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) { + if (jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) { String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN); if (ss.length == 3) { LOG.info("this is ob1_0 jdbc url."); @@ -63,16 +62,14 @@ public class ReaderTask extends CommonRdbmsReader.Task { } } - if (ObReaderUtils.DATABASE_TYPE == DataBaseType.OceanBase) { - jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时 + jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时 + if (ObReaderUtils.compatibleMode.equals(ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE)) { compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE; - } else { - jdbcUrl = jdbcUrl + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时 } LOG.info("this is ob1_0 jdbc url. 
user=" + username + " :url=" + jdbcUrl); mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, ""); retryLimit = readerSliceConfig.getInt(Config.RETRY_LIMIT, Config.DEFAULT_RETRY_LIMIT); - LOG.info("retryLimit: "+ retryLimit); + LOG.info("retryLimit: " + retryLimit); } private void buildSavePoint(TaskContext context) { @@ -83,7 +80,6 @@ public class ReaderTask extends CommonRdbmsReader.Task { } /** - * * 如果isTableMode && table有PK *

* 则支持断点续读 (若pk不在原始的columns中,则追加到尾部,但不传给下游) @@ -92,7 +88,7 @@ public class ReaderTask extends CommonRdbmsReader.Task { */ @Override public void startRead(Configuration readerSliceConfig, RecordSender recordSender, - TaskPluginCollector taskPluginCollector, int fetchSize) { + TaskPluginCollector taskPluginCollector, int fetchSize) { String querySql = readerSliceConfig.getString(Key.QUERY_SQL); String table = readerSliceConfig.getString(Key.TABLE); PerfTrace.getInstance().addTaskDetails(taskId, table + "," + jdbcUrl); @@ -131,14 +127,14 @@ public class ReaderTask extends CommonRdbmsReader.Task { } private void startRead0(boolean isTableMode, TaskContext context, RecordSender recordSender, - TaskPluginCollector taskPluginCollector) { + TaskPluginCollector taskPluginCollector) { // 不是table模式 直接使用原来的做法 if (!isTableMode) { doRead(recordSender, taskPluginCollector, context); return; } // check primary key index - Connection conn = DBUtil.getConnection(ObReaderUtils.DATABASE_TYPE, jdbcUrl, username, password); + Connection conn = DBUtil.getConnection(ObReaderUtils.databaseType, jdbcUrl, username, password); ObReaderUtils.initConn4Reader(conn, queryTimeoutSeconds); context.setConn(conn); try { @@ -184,11 +180,11 @@ public class ReaderTask extends CommonRdbmsReader.Task { } } catch (Throwable e) { if (retryLimit == ++retryCount) { - throw RdbmsException.asQueryException(ObReaderUtils.DATABASE_TYPE, new Exception(e), + throw RdbmsException.asQueryException(ObReaderUtils.databaseType, new Exception(e), context.getQuerySql(), context.getTable(), username); } LOG.error("read fail, retry count " + retryCount + ", sleep 60 second, save point:" + - context.getSavePoint() + ", error: "+ e.getMessage()); + context.getSavePoint() + ", error: " + e.getMessage()); ObReaderUtils.sleep(60000); // sleep 10s } // 假如原来的查询有查出数据,则改成增量查询 @@ -227,7 +223,7 @@ public class ReaderTask extends CommonRdbmsReader.Task { LOG.info("connection is alive, will reuse this connection."); } else { 
LOG.info("Create new connection for reader."); - conn = DBUtil.getConnection(ObReaderUtils.DATABASE_TYPE, jdbcUrl, username, password); + conn = DBUtil.getConnection(ObReaderUtils.databaseType, jdbcUrl, username, password); ObReaderUtils.initConn4Reader(conn, queryTimeoutSeconds); context.setConn(conn); } @@ -287,7 +283,7 @@ public class ReaderTask extends CommonRdbmsReader.Task { ObReaderUtils.close(null, null, context.getConn()); context.setConn(null); LOG.error("reader data fail", e); - throw RdbmsException.asQueryException(ObReaderUtils.DATABASE_TYPE, e, context.getQuerySql(), + throw RdbmsException.asQueryException(ObReaderUtils.databaseType, e, context.getQuerySql(), context.getTable(), username); } finally { perfRecord.end(); diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java index 2290fb43..20c2f922 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java @@ -1,52 +1,71 @@ package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.TreeMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import 
com.alibaba.datax.common.element.BoolColumn; -import com.alibaba.datax.common.element.BytesColumn; -import com.alibaba.datax.common.element.Column; -import com.alibaba.datax.common.element.DateColumn; -import com.alibaba.datax.common.element.DoubleColumn; -import com.alibaba.datax.common.element.LongColumn; -import com.alibaba.datax.common.element.Record; -import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.element.*; import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.druid.sql.SQLUtils; import com.alibaba.druid.sql.ast.SQLExpr; import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr; import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.util.*; +import java.util.Map.Entry; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class ObReaderUtils { private static final Logger LOG = LoggerFactory.getLogger(ObReaderUtils.class); + private static final String MYSQL_KEYWORDS = 
"ACCESSIBLE,ACCOUNT,ACTION,ADD,AFTER,AGAINST,AGGREGATE,ALGORITHM,ALL,ALTER,ALWAYS,ANALYSE,AND,ANY,AS,ASC,ASCII,ASENSITIVE,AT,AUTO_INCREMENT,AUTOEXTEND_SIZE,AVG,AVG_ROW_LENGTH,BACKUP,BEFORE,BEGIN,BETWEEN,BIGINT,BINARY,BINLOG,BIT,BLOB,BLOCK,BOOL,BOOLEAN,BOTH,BTREE,BY,BYTE,CACHE,CALL,CASCADE,CASCADED,CASE,CATALOG_NAME,CHAIN,CHANGE,CHANGED,CHANNEL,CHAR,CHARACTER,CHARSET,CHECK,CHECKSUM,CIPHER,CLASS_ORIGIN,CLIENT,CLOSE,COALESCE,CODE,COLLATE,COLLATION,COLUMN,COLUMN_FORMAT,COLUMN_NAME,COLUMNS,COMMENT,COMMIT,COMMITTED,COMPACT,COMPLETION,COMPRESSED,COMPRESSION,CONCURRENT,CONDITION,CONNECTION,CONSISTENT,CONSTRAINT,CONSTRAINT_CATALOG,CONSTRAINT_NAME,CONSTRAINT_SCHEMA,CONTAINS,CONTEXT,CONTINUE,CONVERT,CPU,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,CURSOR,CURSOR_NAME,DATA,DATABASE,DATABASES,DATAFILE,DATE,DATETIME,DAY,DAY_HOUR,DAY_MICROSECOND,DAY_MINUTE,DAY_SECOND,DEALLOCATE,DEC,DECIMAL,DECLARE,DEFAULT,DEFAULT_AUTH,DEFINER,DELAY_KEY_WRITE,DELAYED,DELETE,DES_KEY_FILE,DESC,DESCRIBE,DETERMINISTIC,DIAGNOSTICS,DIRECTORY,DISABLE,DISCARD,DISK,DISTINCT,DISTINCTROW,DIV,DO,DOUBLE,DROP,DUAL,DUMPFILE,DUPLICATE,DYNAMIC,EACH,ELSE,ELSEIF,ENABLE,ENCLOSED,ENCRYPTION,END,ENDS,ENGINE,ENGINES,ENUM,ERROR,ERRORS,ESCAPE,ESCAPED,EVENT,EVENTS,EVERY,EXCHANGE,EXECUTE,EXISTS,EXIT,EXPANSION,EXPIRE,EXPLAIN,EXPORT,EXTENDED,EXTENT_SIZE,FAST,FAULTS,FETCH,FIELDS,FILE,FILE_BLOCK_SIZE,FILTER,FIRST,FIXED,FLOAT,FLOAT4,FLOAT8,FLUSH,FOLLOWS,FOR,FORCE,FOREIGN,FORMAT,FOUND,FROM,FULL,FULLTEXT,FUNCTION,GENERAL,GENERATED,GEOMETRY,GEOMETRYCOLLECTION,GET,GET_FORMAT,GLOBAL,GRANT,GRANTS,GROUP,GROUP_REPLICATION,HANDLER,HASH,HAVING,HELP,HIGH_PRIORITY,HOST,HOSTS,HOUR,HOUR_MICROSECOND,HOUR_MINUTE,HOUR_SECOND,IDENTIFIED,IF,IGNORE,IGNORE_SERVER_IDS,IMPORT,IN,INDEX,INDEXES,INFILE,INITIAL_SIZE,INNER,INOUT,INSENSITIVE,INSERT,INSERT_METHOD,INSTALL,INSTANCE,INT,INT1,INT2,INT3,INT4,INT8,INTEGER,INTERVAL,INTO,INVOKER,IO,IO_AFTER_GTIDS,IO_BEFORE_GTIDS,IO_THREAD,IPC,IS,ISOLATION,ISSUER,ITERATE,JOIN,JSON,
KEY,KEY_BLOCK_SIZE,KEYS,KILL,LANGUAGE,LAST,LEADING,LEAVE,LEAVES,LEFT,LESS,LEVEL,LIKE,LIMIT,LINEAR,LINES,LINESTRING,LIST,LOAD,LOCAL,LOCALTIME,LOCALTIMESTAMP,LOCK,LOCKS,LOGFILE,LOGS,LONG,LONGBLOB,LONGTEXT,LOOP,LOW_PRIORITY,MASTER,MASTER_AUTO_POSITION,MASTER_BIND,MASTER_CONNECT_RETRY,MASTER_DELAY,MASTER_HEARTBEAT_PERIOD,MASTER_HOST,MASTER_LOG_FILE,MASTER_LOG_POS,MASTER_PASSWORD,MASTER_PORT,MASTER_RETRY_COUNT,MASTER_SERVER_ID,MASTER_SSL,MASTER_SSL_CA,MASTER_SSL_CAPATH,MASTER_SSL_CERT,MASTER_SSL_CIPHER,MASTER_SSL_CRL,MASTER_SSL_CRLPATH,MASTER_SSL_KEY,MASTER_SSL_VERIFY_SERVER_CERT,MASTER_TLS_VERSION,MASTER_USER,MATCH,MAX_CONNECTIONS_PER_HOUR,MAX_QUERIES_PER_HOUR,MAX_ROWS,MAX_SIZE,MAX_STATEMENT_TIME,MAX_UPDATES_PER_HOUR,MAX_USER_CONNECTIONS,MAXVALUE,MEDIUM,MEDIUMBLOB,MEDIUMINT,MEDIUMTEXT,MEMORY,MERGE,MESSAGE_TEXT,MICROSECOND,MIDDLEINT,MIGRATE,MIN_ROWS,MINUTE,MINUTE_MICROSECOND,MINUTE_SECOND,MOD,MODE,MODIFIES,MODIFY,MONTH,MULTILINESTRING,MULTIPOINT,MULTIPOLYGON,MUTEX,MYSQL_ERRNO,NAME,NAMES,NATIONAL,NATURAL,NCHAR,NDB,NDBCLUSTER,NEVER,NEW,NEXT,NO,NO_WAIT,NO_WRITE_TO_BINLOG,NODEGROUP,NONBLOCKING,NONE,NOT,NULL,NUMBER,NUMERIC,NVARCHAR,OFFSET,OLD_PASSWORD,ON,ONE,ONLY,OPEN,OPTIMIZE,OPTIMIZER_COSTS,OPTION,OPTIONALLY,OPTIONS,OR,ORDER,OUT,OUTER,OUTFILE,OWNER,PACK_KEYS,PAGE,PARSE_GCOL_EXPR,PARSER,PARTIAL,PARTITION,PARTITIONING,PARTITIONS,PASSWORD,PHASE,PLUGIN,PLUGIN_DIR,PLUGINS,POINT,POLYGON,PORT,PRECEDES,PRECISION,PREPARE,PRESERVE,PREV,PRIMARY,PRIVILEGES,PROCEDURE,PROCESSLIST,PROFILE,PROFILES,PROXY,PURGE,QUARTER,QUERY,QUICK,RANGE,READ,READ_ONLY,READ_WRITE,READS,REAL,REBUILD,RECOVER,REDO_BUFFER_SIZE,REDOFILE,REDUNDANT,REFERENCES,REGEXP,RELAY,RELAY_LOG_FILE,RELAY_LOG_POS,RELAY_THREAD,RELAYLOG,RELEASE,RELOAD,REMOVE,RENAME,REORGANIZE,REPAIR,REPEAT,REPEATABLE,REPLACE,REPLICATE_DO_DB,REPLICATE_DO_TABLE,REPLICATE_IGNORE_DB,REPLICATE_IGNORE_TABLE,REPLICATE_REWRITE_DB,REPLICATE_WILD_DO_TABLE,REPLICATE_WILD_IGNORE_TABLE,REPLICATION,REQUIRE,RESET,RESIGNAL,RESTORE,RESTRICT,RESUME,RETURN,RETURNED
_SQLSTATE,RETURNS,REVERSE,REVOKE,RIGHT,RLIKE,ROLLBACK,ROLLUP,ROTATE,ROUTINE,ROW,ROW_COUNT,ROW_FORMAT,ROWS,RTREE,SAVEPOINT,SCHEDULE,SCHEMA,SCHEMA_NAME,SCHEMAS,SECOND,SECOND_MICROSECOND,SECURITY,SELECT,SENSITIVE,SEPARATOR,SERIAL,SERIALIZABLE,SERVER,SESSION,SET,SHARE,SHOW,SHUTDOWN,SIGNAL,SIGNED,SIMPLE,SLAVE,SLOW,SMALLINT,SNAPSHOT,SOCKET,SOME,SONAME,SOUNDS,SOURCE,SPATIAL,SPECIFIC,SQL,SQL_AFTER_GTIDS,SQL_AFTER_MTS_GAPS,SQL_BEFORE_GTIDS,SQL_BIG_RESULT,SQL_BUFFER_RESULT,SQL_CACHE,SQL_CALC_FOUND_ROWS,SQL_NO_CACHE,SQL_SMALL_RESULT,SQL_THREAD,SQL_TSI_DAY,SQL_TSI_HOUR,SQL_TSI_MINUTE,SQL_TSI_MONTH,SQL_TSI_QUARTER,SQL_TSI_SECOND,SQL_TSI_WEEK,SQL_TSI_YEAR,SQLEXCEPTION,SQLSTATE,SQLWARNING,SSL,STACKED,START,STARTING,STARTS,STATS_AUTO_RECALC,STATS_PERSISTENT,STATS_SAMPLE_PAGES,STATUS,STOP,STORAGE,STORED,STRAIGHT_JOIN,STRING,SUBCLASS_ORIGIN,SUBJECT,SUBPARTITION,SUBPARTITIONS,SUPER,SUSPEND,SWAPS,SWITCHES,TABLE,TABLE_CHECKSUM,TABLE_NAME,TABLES,TABLESPACE,TEMPORARY,TEMPTABLE,TERMINATED,TEXT,THAN,THEN,TIME,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TINYBLOB,TINYINT,TINYTEXT,TO,TRAILING,TRANSACTION,TRIGGER,TRIGGERS,TRUNCATE,TYPE,TYPES,UNCOMMITTED,UNDEFINED,UNDO,UNDO_BUFFER_SIZE,UNDOFILE,UNICODE,UNINSTALL,UNION,UNIQUE,UNKNOWN,UNLOCK,UNSIGNED,UNTIL,UPDATE,UPGRADE,USAGE,USE,USE_FRM,USER,USER_RESOURCES,USING,UTC_DATE,UTC_TIME,UTC_TIMESTAMP,VALIDATION,VALUE,VALUES,VARBINARY,VARCHAR,VARCHARACTER,VARIABLES,VARYING,VIEW,VIRTUAL,WAIT,WARNINGS,WEEK,WEIGHT_STRING,WHEN,WHERE,WHILE,WITH,WITHOUT,WORK,WRAPPER,WRITE,X509,XA,XID,XML,XOR,YEAR,YEAR_MONTH,ZEROFILL,FALSE,TRUE"; + private static final String ORACLE_KEYWORDS = 
"ACCESS,ADD,ALL,ALTER,AND,ANY,ARRAYLEN,AS,ASC,AUDIT,BETWEEN,BY,CHAR,CHECK,CLUSTER,COLUMN,COMMENT,COMPRESS,CONNECT,CREATE,CURRENT,DATE,DECIMAL,DEFAULT,DELETE,DESC,DISTINCT,DROP,ELSE,EXCLUSIVE,EXISTS,FILE,FLOAT,FOR,FROM,GRANT,GROUP,HAVING,IDENTIFIED,IMMEDIATE,IN,INCREMENT,INDEX,INITIAL,INSERT,INTEGER,INTERSECT,INTO,IS,LEVEL,LIKE,LOCK,LONG,MAXEXTENTS,MINUS,MODE,MODIFY,NOAUDIT,NOCOMPRESS,NOT,NOTFOUND,NOWAIT,NULL,NUMBER,OF,OFFLINE,ON,ONLINE,OPTION,OR,ORDER,PCTFREE,PRIOR,PRIVILEGES,PUBLIC,RAW,RENAME,RESOURCE,REVOKE,ROW,ROWID,ROWLABEL,ROWNUM,ROWS,SELECT,SESSION,SET,SHARE,SIZE,SMALLINT,SQLBUF,START,SUCCESSFUL,SYNONYM,TABLE,THEN,TO,TRIGGER,UID,UNION,UNIQUE,UPDATE,USER,VALIDATE,VALUES,VARCHAR,VARCHAR2,VIEW,WHENEVER,WHERE,WITH"; + private static Set databaseKeywords; final static public String OB_COMPATIBLE_MODE = "obCompatibilityMode"; final static public String OB_COMPATIBLE_MODE_ORACLE = "ORACLE"; final static public String OB_COMPATIBLE_MODE_MYSQL = "MYSQL"; - public static DataBaseType DATABASE_TYPE = DataBaseType.MySql; + public static String compatibleMode = OB_COMPATIBLE_MODE_MYSQL; + + public static final DataBaseType databaseType = DataBaseType.OceanBase; + + + private static Set keywordsFromString2HashSet(final String keywords) { + return new HashSet(Arrays.asList(keywords.split(","))); + } + + public static String escapeDatabaseKeywords(String keyword) { + if (databaseKeywords == null) { + if (isOracleMode(compatibleMode)) { + databaseKeywords = keywordsFromString2HashSet(ORACLE_KEYWORDS); + } else { + databaseKeywords = keywordsFromString2HashSet(MYSQL_KEYWORDS); + } + } + char escapeChar = isOracleMode(compatibleMode) ? 
'"' : '`'; + if (databaseKeywords.contains(keyword.toUpperCase())) { + keyword = escapeChar + keyword + escapeChar; + } + return keyword; + } + + public static void escapeDatabaseKeywords(List keywords) { + for (int i = 0; i < keywords.size(); i++) { + keywords.set(i, escapeDatabaseKeywords(keywords.get(i))); + } + } + + public static Boolean isEscapeMode(String keyword) { + if (isOracleMode(compatibleMode)) { + return keyword.startsWith("\"") && keyword.endsWith("\""); + } else { + return keyword.startsWith("`") && keyword.endsWith("`"); + } + } public static void initConn4Reader(Connection conn, long queryTimeoutSeconds) { String setQueryTimeout = "set ob_query_timeout=" + (queryTimeoutSeconds * 1000 * 1000L); @@ -57,7 +76,7 @@ public class ObReaderUtils { stmt = conn.createStatement(); stmt.execute(setQueryTimeout); stmt.execute(setTrxTimeout); - LOG.warn("setAutoCommit=true;"+setQueryTimeout+";"+setTrxTimeout+";"); + LOG.warn("setAutoCommit=true;" + setQueryTimeout + ";" + setTrxTimeout + ";"); } catch (Throwable e) { LOG.warn("initConn4Reader fail", e); } finally { @@ -73,7 +92,6 @@ public class ObReaderUtils { } /** - * * @param conn * @param context */ @@ -84,8 +102,11 @@ public class ObReaderUtils { return; } List columns = context.getColumns(); + // 最后参与排序的索引列 + context.setPkColumns(pkColumns); + int[] pkIndexs = new int[pkColumns.length]; for (int i = 0, n = pkColumns.length; i < n; i++) { String pkc = pkColumns[i]; @@ -112,9 +133,9 @@ public class ObReaderUtils { String sql = "show index from " + tableName + " where Key_name='PRIMARY'"; if (isOracleMode(context.getCompatibleMode())) { tableName = tableName.toUpperCase(); - sql = "SELECT cols.column_name Column_name "+ + sql = "SELECT cols.column_name Column_name " + "FROM all_constraints cons, all_cons_columns cols " + - "WHERE cols.table_name = '" + tableName+ "' AND cons.constraint_type = 'P' " + + "WHERE cols.table_name = '" + tableName + "' AND cons.constraint_type = 'P' " + "AND cons.constraint_name 
= cols.constraint_name AND cons.owner = cols.owner"; } LOG.info("get primary key by sql: " + sql); @@ -126,11 +147,16 @@ public class ObReaderUtils { ps = conn.createStatement(); rs = ps.executeQuery(sql); while (rs.next()) { - String columnName = StringUtils.lowerCase(rs.getString("Column_name")); + String columnName = rs.getString("Column_name"); + columnName = escapeDatabaseKeywords(columnName); + if (!isEscapeMode(columnName)) { + columnName = columnName.toLowerCase(); // String is immutable: the lower-cased value must be reassigned + } if (!realIndex.contains(columnName)) { realIndex.add(columnName); } } + String[] pks = new String[realIndex.size()]; realIndex.toArray(pks); return pks; @@ -156,7 +182,7 @@ public class ObReaderUtils { if (StringUtils.isNotEmpty(indexName)) { String weakReadHint = weakRead ? "+READ_CONSISTENCY(WEAK)," : "+"; sql += " /*" + weakReadHint + "index(" + context.getTable() + " " + indexName + ")*/ "; - } else if (weakRead){ + } else if (weakRead) { sql += " /*+READ_CONSISTENCY(WEAK)*/ "; } sql += StringUtils.join(context.getColumns(), ','); @@ -187,7 +213,6 @@ public class ObReaderUtils { * 增量查的SQL * * @param conn - * * @param context * @return sql */ @@ -197,8 +222,8 @@ public class ObReaderUtils { String sql = "select "; if (StringUtils.isNotEmpty(indexName)) { String weakReadHint = weakRead ? 
"+READ_CONSISTENCY(WEAK)," : "+"; - sql += " /*"+ weakReadHint + "index(" + context.getTable() + " " + indexName + ")*/ "; - } else if (weakRead){ + sql += " /*" + weakReadHint + "index(" + context.getTable() + " " + indexName + ")*/ "; + } else if (weakRead) { sql += " /*+READ_CONSISTENCY(WEAK)*/ "; } sql += StringUtils.join(context.getColumns(), ',') + " from " + context.getTable(); @@ -295,7 +320,7 @@ public class ObReaderUtils { final char rightBracket = ')'; if (str != null && str.contains(String.valueOf(leftBracket)) && str.contains(String.valueOf(rightBracket)) && str.indexOf(leftBracket) < str.indexOf(rightBracket)) { - return str.substring(str.indexOf(leftBracket)+1, str.indexOf(rightBracket)); + return str.substring(str.indexOf(leftBracket) + 1, str.indexOf(rightBracket)); } return str; } @@ -364,7 +389,7 @@ public class ObReaderUtils { /** * 找出where条件中的列名,目前仅支持全部为and条件,并且操作符为大于、大约等于、等于、小于、小于等于和不等于的表达式。 - * + *

* test coverage: - c6 = 20180710 OR c4 = 320: no index selected - 20180710 * = c6: correct index selected - 20180710 = c6 and c4 = 320 or c2 < 100: no * index selected @@ -416,17 +441,17 @@ public class ObReaderUtils { if (isOracleMode(compatibleMode)) { tableName = tableName.toUpperCase(); sql = "SELECT INDEX_NAME Key_name, COLUMN_NAME Column_name " + - "from dba_ind_columns where TABLE_NAME = '" + tableName +"' " + + "from dba_ind_columns where TABLE_NAME = '" + tableName + "' " + " union all " + - "SELECT DISTINCT " + - "CASE " + - "WHEN cons.CONSTRAINT_TYPE = 'P' THEN 'PRIMARY' " + - "WHEN cons.CONSTRAINT_TYPE = 'U' THEN cons.CONSTRAINT_NAME " + - "ELSE '' " + - "END AS Key_name, " + - "cols.column_name Column_name " + - "FROM all_constraints cons, all_cons_columns cols " + - "WHERE cols.table_name = '" + tableName + "' AND cons.constraint_type in('P', 'U') " + + "SELECT DISTINCT " + + "CASE " + + "WHEN cons.CONSTRAINT_TYPE = 'P' THEN 'PRIMARY' " + + "WHEN cons.CONSTRAINT_TYPE = 'U' THEN cons.CONSTRAINT_NAME " + + "ELSE '' " + + "END AS Key_name, " + + "cols.column_name Column_name " + + "FROM all_constraints cons, all_cons_columns cols " + + "WHERE cols.table_name = '" + tableName + "' AND cons.constraint_type in('P', 'U') " + "AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner"; } Statement stmt = null; @@ -469,14 +494,13 @@ public class ObReaderUtils { } /** - * * @param conn * @param table * @param colNamesInCondition * @return */ private static List getIndexName(Connection conn, String table, - Set colNamesInCondition, String compatibleMode) { + Set colNamesInCondition, String compatibleMode) { List indexNames = new ArrayList(); if (colNamesInCondition == null || colNamesInCondition.size() == 0) { LOG.info("there is no qulified conditions in the where clause, skip index selection."); @@ -540,7 +564,7 @@ public class ObReaderUtils { Map index = new TreeMap(); List columnList = allIndexInTab.get(indexName); for (int i = 1; i <= 
columnList.size(); i++) { - index.put(i, columnList.get(i-1)); + index.put(i, columnList.get(i - 1)); } allIndexs.put(indexName, index); } else { @@ -644,19 +668,19 @@ public class ObReaderUtils { public static void binding(PreparedStatement ps, List list) throws SQLException { for (int i = 0, n = list.size(); i < n; i++) { Column c = list.get(i); - if(c instanceof BoolColumn){ - ps.setLong(i + 1, ((BoolColumn)c).asLong()); - }else if(c instanceof BytesColumn){ - ps.setBytes(i + 1, ((BytesColumn)c).asBytes()); - }else if(c instanceof DateColumn){ - ps.setTimestamp(i + 1, new Timestamp(((DateColumn)c).asDate().getTime())); - }else if(c instanceof DoubleColumn){ - ps.setDouble(i + 1, ((DoubleColumn)c).asDouble()); - }else if(c instanceof LongColumn){ - ps.setLong(i + 1, ((LongColumn)c).asLong()); - }else if(c instanceof StringColumn){ - ps.setString(i + 1, ((StringColumn)c).asString()); - }else{ + if (c instanceof BoolColumn) { + ps.setLong(i + 1, ((BoolColumn) c).asLong()); + } else if (c instanceof BytesColumn) { + ps.setBytes(i + 1, ((BytesColumn) c).asBytes()); + } else if (c instanceof DateColumn) { + ps.setTimestamp(i + 1, new Timestamp(((DateColumn) c).asDate().getTime())); + } else if (c instanceof DoubleColumn) { + ps.setDouble(i + 1, ((DoubleColumn) c).asDouble()); + } else if (c instanceof LongColumn) { + ps.setLong(i + 1, ((LongColumn) c).asLong()); + } else if (c instanceof StringColumn) { + ps.setString(i + 1, ((StringColumn) c).asString()); + } else { ps.setObject(i + 1, c.getRawData()); } } @@ -692,6 +716,6 @@ public class ObReaderUtils { } public static boolean isOracleMode(String mode) { - return (mode != null && OB_COMPATIBLE_MODE_ORACLE.equals(mode)); + return (mode != null && OB_COMPATIBLE_MODE_ORACLE.equalsIgnoreCase(mode)); } } diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java 
b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java index ba754a37..17655a52 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java @@ -162,6 +162,7 @@ public class TaskContext { public String getUserSavePoint() { return userSavePoint; } + public void setUserSavePoint(String userSavePoint) { this.userSavePoint = userSavePoint; } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java index 4ffaffed..ede2eb01 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java @@ -1,15 +1,5 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer; -import java.sql.*; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.util.Configuration; @@ -20,7 +10,16 @@ import com.alibaba.datax.plugin.rdbms.writer.Constant; import com.alibaba.datax.plugin.rdbms.writer.Key; import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil; import com.alibaba.datax.plugin.writer.oceanbasev10writer.task.ConcurrentTableWriterTask; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils; import 
com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils; +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.List; /** * 2016-04-07 @@ -60,6 +59,19 @@ public class OceanBaseV10Writer extends Writer { public void init() { this.originalConfig = super.getPluginJobConf(); checkCompatibleMode(originalConfig); + //将config中的column和table中的关键字进行转义 + List columns = originalConfig.getList(Key.COLUMN, String.class); + ObWriterUtils.escapeDatabaseKeywords(columns); + originalConfig.set(Key.COLUMN, columns); + + List conns = originalConfig.getList(Constant.CONN_MARK, JSONObject.class); + for (int i = 0; i < conns.size(); i++) { + JSONObject conn = conns.get(i); + Configuration connConfig = Configuration.from(conn.toString()); + List tables = connConfig.getList(Key.TABLE, String.class); + ObWriterUtils.escapeDatabaseKeywords(tables); + originalConfig.set(String.format("%s[%d].%s", Constant.CONN_MARK, i, Key.TABLE), tables); + } this.commonJob = new CommonRdbmsWriter.Job(DATABASE_TYPE); this.commonJob.init(this.originalConfig); } @@ -222,6 +234,7 @@ public class OceanBaseV10Writer extends Writer { /** * 注意:此方法每个 Task 都会执行一次。 最佳实践:此处适当封装确保简洁清晰完成数据写入工作。 */ + @Override public void startWrite(RecordReceiver recordReceiver) { this.writerTask.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector()); } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java index cbc9a936..e6b4a561 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java +++ 
b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java @@ -1,7 +1,27 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer.task; +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils; +import com.alipay.oceanbase.obproxy.data.TableEntryKey; +import com.alipay.oceanbase.obproxy.util.ObPartitionIdCalculator; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.sql.Connection; -//import java.sql.PreparedStatement; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.ArrayList; @@ -16,27 +36,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import com.alibaba.datax.common.element.Column; -import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.datax.common.element.Record; -import 
com.alibaba.datax.common.exception.DataXException; -import com.alibaba.datax.common.plugin.RecordReceiver; -import com.alibaba.datax.common.plugin.TaskPluginCollector; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.plugin.rdbms.util.DBUtil; -import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; -import com.alibaba.datax.plugin.rdbms.util.DataBaseType; -import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; -import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; -import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder; -import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo; -import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils; -import com.alipay.oceanbase.obproxy.data.TableEntryKey; -import com.alipay.oceanbase.obproxy.util.ObPartitionIdCalculator; +//import java.sql.PreparedStatement; public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { private static final Logger LOG = LoggerFactory.getLogger(ConcurrentTableWriterTask.class); @@ -62,6 +62,7 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { private ObPartitionIdCalculator partCalculator = null; private HashMap> groupInsertValues; + List unknownPartRecords = new ArrayList(); // private List unknownPartRecords; private List partitionKeyIndexes; @@ -104,10 +105,14 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { connectInfo.getFullUserName(), connectInfo.password); checkConnHolder.initConnection(); if (isOracleCompatibleMode) { - connectInfo.databaseName = connectInfo.databaseName.toUpperCase(); - table = table.toUpperCase(); - LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s", - connectInfo.databaseName, table)); + connectInfo.databaseName = connectInfo.databaseName.toUpperCase(); + //在转义的情况下不翻译 + if (!(table.startsWith("\"") && table.endsWith("\""))) { + table = table.toUpperCase(); 
+ } + + LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s", + connectInfo.databaseName, table)); } if (config.getBool(Config.USE_PART_CALCULATOR, Config.DEFAULT_USE_PART_CALCULATOR)) { @@ -289,20 +294,15 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { } private void addLeftRecords() { + //不需要刷新Cache,已经是最后一批数据了 for (List groupValues : groupInsertValues.values()) { if (groupValues.size() > 0 ) { - int retry = 0; - while (true) { - try { - concurrentWriter.addBatchRecords(groupValues); - break; - } catch (InterruptedException e) { - retry++; - LOG.info("Concurrent table writer is interrupted, retry {}", retry); - } - } + addRecordsToWriteQueue(groupValues); } } + if (unknownPartRecords.size() > 0) { + addRecordsToWriteQueue(unknownPartRecords); + } } private void addRecordToCache(final Record record) { @@ -326,41 +326,40 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { } groupValues.add(record); if (groupValues.size() >= batchSize) { - int i = 0; - while (true) { - if (i > 0) { - LOG.info("retry add batch record the {} times", i); - } - try { - concurrentWriter.addBatchRecords(groupValues); - printEveryTime(); - break; - } catch (InterruptedException e) { - LOG.info("Concurrent table writer is interrupted"); - } - } - groupValues = new ArrayList(batchSize); + groupValues = addRecordsToWriteQueue(groupValues); groupInsertValues.put(partId, groupValues); } } else { - LOG.warn("add unknown part record {}", record); - List unknownPartRecords = new ArrayList(); + LOG.debug("add unknown part record {}", record); unknownPartRecords.add(record); - int i = 0; - while (true) { - if (i > 0) { - LOG.info("retry add batch record the {} times", i); - } - try { - concurrentWriter.addBatchRecords(unknownPartRecords); - break; - } catch (InterruptedException e) { - LOG.info("Concurrent table writer is interrupted"); - } + if (unknownPartRecords.size() >= batchSize) { + unknownPartRecords = 
addRecordsToWriteQueue(unknownPartRecords); } + } } + /** + * + * @param records + * @return 返回一个新的Cache用于存储接下来的数据 + */ + private List addRecordsToWriteQueue(List records) { + int i = 0; + while (true) { + if (i > 0) { + LOG.info("retry add batch record the {} times", i); + } + try { + concurrentWriter.addBatchRecords(records); + break; + } catch (InterruptedException e) { + i++; + LOG.info("Concurrent table writer is interrupted"); + } + } + return new ArrayList(batchSize); + } private void checkMemStore() { Connection checkConn = checkConnHolder.reconnect(); long now = System.currentTimeMillis(); diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java index 368c3d17..ff1648a1 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java @@ -11,15 +11,47 @@ import org.slf4j.LoggerFactory; import java.sql.*; import java.util.*; -import java.util.stream.Collectors; public class ObWriterUtils { - protected static final Logger LOG = LoggerFactory.getLogger(Task.class); + + private static final String MYSQL_KEYWORDS = 
"ACCESSIBLE,ACCOUNT,ACTION,ADD,AFTER,AGAINST,AGGREGATE,ALGORITHM,ALL,ALTER,ALWAYS,ANALYSE,AND,ANY,AS,ASC,ASCII,ASENSITIVE,AT,AUTO_INCREMENT,AUTOEXTEND_SIZE,AVG,AVG_ROW_LENGTH,BACKUP,BEFORE,BEGIN,BETWEEN,BIGINT,BINARY,BINLOG,BIT,BLOB,BLOCK,BOOL,BOOLEAN,BOTH,BTREE,BY,BYTE,CACHE,CALL,CASCADE,CASCADED,CASE,CATALOG_NAME,CHAIN,CHANGE,CHANGED,CHANNEL,CHAR,CHARACTER,CHARSET,CHECK,CHECKSUM,CIPHER,CLASS_ORIGIN,CLIENT,CLOSE,COALESCE,CODE,COLLATE,COLLATION,COLUMN,COLUMN_FORMAT,COLUMN_NAME,COLUMNS,COMMENT,COMMIT,COMMITTED,COMPACT,COMPLETION,COMPRESSED,COMPRESSION,CONCURRENT,CONDITION,CONNECTION,CONSISTENT,CONSTRAINT,CONSTRAINT_CATALOG,CONSTRAINT_NAME,CONSTRAINT_SCHEMA,CONTAINS,CONTEXT,CONTINUE,CONVERT,CPU,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,CURSOR,CURSOR_NAME,DATA,DATABASE,DATABASES,DATAFILE,DATE,DATETIME,DAY,DAY_HOUR,DAY_MICROSECOND,DAY_MINUTE,DAY_SECOND,DEALLOCATE,DEC,DECIMAL,DECLARE,DEFAULT,DEFAULT_AUTH,DEFINER,DELAY_KEY_WRITE,DELAYED,DELETE,DES_KEY_FILE,DESC,DESCRIBE,DETERMINISTIC,DIAGNOSTICS,DIRECTORY,DISABLE,DISCARD,DISK,DISTINCT,DISTINCTROW,DIV,DO,DOUBLE,DROP,DUAL,DUMPFILE,DUPLICATE,DYNAMIC,EACH,ELSE,ELSEIF,ENABLE,ENCLOSED,ENCRYPTION,END,ENDS,ENGINE,ENGINES,ENUM,ERROR,ERRORS,ESCAPE,ESCAPED,EVENT,EVENTS,EVERY,EXCHANGE,EXECUTE,EXISTS,EXIT,EXPANSION,EXPIRE,EXPLAIN,EXPORT,EXTENDED,EXTENT_SIZE,FAST,FAULTS,FETCH,FIELDS,FILE,FILE_BLOCK_SIZE,FILTER,FIRST,FIXED,FLOAT,FLOAT4,FLOAT8,FLUSH,FOLLOWS,FOR,FORCE,FOREIGN,FORMAT,FOUND,FROM,FULL,FULLTEXT,FUNCTION,GENERAL,GENERATED,GEOMETRY,GEOMETRYCOLLECTION,GET,GET_FORMAT,GLOBAL,GRANT,GRANTS,GROUP,GROUP_REPLICATION,HANDLER,HASH,HAVING,HELP,HIGH_PRIORITY,HOST,HOSTS,HOUR,HOUR_MICROSECOND,HOUR_MINUTE,HOUR_SECOND,IDENTIFIED,IF,IGNORE,IGNORE_SERVER_IDS,IMPORT,IN,INDEX,INDEXES,INFILE,INITIAL_SIZE,INNER,INOUT,INSENSITIVE,INSERT,INSERT_METHOD,INSTALL,INSTANCE,INT,INT1,INT2,INT3,INT4,INT8,INTEGER,INTERVAL,INTO,INVOKER,IO,IO_AFTER_GTIDS,IO_BEFORE_GTIDS,IO_THREAD,IPC,IS,ISOLATION,ISSUER,ITERATE,JOIN,JSON,
KEY,KEY_BLOCK_SIZE,KEYS,KILL,LANGUAGE,LAST,LEADING,LEAVE,LEAVES,LEFT,LESS,LEVEL,LIKE,LIMIT,LINEAR,LINES,LINESTRING,LIST,LOAD,LOCAL,LOCALTIME,LOCALTIMESTAMP,LOCK,LOCKS,LOGFILE,LOGS,LONG,LONGBLOB,LONGTEXT,LOOP,LOW_PRIORITY,MASTER,MASTER_AUTO_POSITION,MASTER_BIND,MASTER_CONNECT_RETRY,MASTER_DELAY,MASTER_HEARTBEAT_PERIOD,MASTER_HOST,MASTER_LOG_FILE,MASTER_LOG_POS,MASTER_PASSWORD,MASTER_PORT,MASTER_RETRY_COUNT,MASTER_SERVER_ID,MASTER_SSL,MASTER_SSL_CA,MASTER_SSL_CAPATH,MASTER_SSL_CERT,MASTER_SSL_CIPHER,MASTER_SSL_CRL,MASTER_SSL_CRLPATH,MASTER_SSL_KEY,MASTER_SSL_VERIFY_SERVER_CERT,MASTER_TLS_VERSION,MASTER_USER,MATCH,MAX_CONNECTIONS_PER_HOUR,MAX_QUERIES_PER_HOUR,MAX_ROWS,MAX_SIZE,MAX_STATEMENT_TIME,MAX_UPDATES_PER_HOUR,MAX_USER_CONNECTIONS,MAXVALUE,MEDIUM,MEDIUMBLOB,MEDIUMINT,MEDIUMTEXT,MEMORY,MERGE,MESSAGE_TEXT,MICROSECOND,MIDDLEINT,MIGRATE,MIN_ROWS,MINUTE,MINUTE_MICROSECOND,MINUTE_SECOND,MOD,MODE,MODIFIES,MODIFY,MONTH,MULTILINESTRING,MULTIPOINT,MULTIPOLYGON,MUTEX,MYSQL_ERRNO,NAME,NAMES,NATIONAL,NATURAL,NCHAR,NDB,NDBCLUSTER,NEVER,NEW,NEXT,NO,NO_WAIT,NO_WRITE_TO_BINLOG,NODEGROUP,NONBLOCKING,NONE,NOT,NULL,NUMBER,NUMERIC,NVARCHAR,OFFSET,OLD_PASSWORD,ON,ONE,ONLY,OPEN,OPTIMIZE,OPTIMIZER_COSTS,OPTION,OPTIONALLY,OPTIONS,OR,ORDER,OUT,OUTER,OUTFILE,OWNER,PACK_KEYS,PAGE,PARSE_GCOL_EXPR,PARSER,PARTIAL,PARTITION,PARTITIONING,PARTITIONS,PASSWORD,PHASE,PLUGIN,PLUGIN_DIR,PLUGINS,POINT,POLYGON,PORT,PRECEDES,PRECISION,PREPARE,PRESERVE,PREV,PRIMARY,PRIVILEGES,PROCEDURE,PROCESSLIST,PROFILE,PROFILES,PROXY,PURGE,QUARTER,QUERY,QUICK,RANGE,READ,READ_ONLY,READ_WRITE,READS,REAL,REBUILD,RECOVER,REDO_BUFFER_SIZE,REDOFILE,REDUNDANT,REFERENCES,REGEXP,RELAY,RELAY_LOG_FILE,RELAY_LOG_POS,RELAY_THREAD,RELAYLOG,RELEASE,RELOAD,REMOVE,RENAME,REORGANIZE,REPAIR,REPEAT,REPEATABLE,REPLACE,REPLICATE_DO_DB,REPLICATE_DO_TABLE,REPLICATE_IGNORE_DB,REPLICATE_IGNORE_TABLE,REPLICATE_REWRITE_DB,REPLICATE_WILD_DO_TABLE,REPLICATE_WILD_IGNORE_TABLE,REPLICATION,REQUIRE,RESET,RESIGNAL,RESTORE,RESTRICT,RESUME,RETURN,RETURNED
_SQLSTATE,RETURNS,REVERSE,REVOKE,RIGHT,RLIKE,ROLLBACK,ROLLUP,ROTATE,ROUTINE,ROW,ROW_COUNT,ROW_FORMAT,ROWS,RTREE,SAVEPOINT,SCHEDULE,SCHEMA,SCHEMA_NAME,SCHEMAS,SECOND,SECOND_MICROSECOND,SECURITY,SELECT,SENSITIVE,SEPARATOR,SERIAL,SERIALIZABLE,SERVER,SESSION,SET,SHARE,SHOW,SHUTDOWN,SIGNAL,SIGNED,SIMPLE,SLAVE,SLOW,SMALLINT,SNAPSHOT,SOCKET,SOME,SONAME,SOUNDS,SOURCE,SPATIAL,SPECIFIC,SQL,SQL_AFTER_GTIDS,SQL_AFTER_MTS_GAPS,SQL_BEFORE_GTIDS,SQL_BIG_RESULT,SQL_BUFFER_RESULT,SQL_CACHE,SQL_CALC_FOUND_ROWS,SQL_NO_CACHE,SQL_SMALL_RESULT,SQL_THREAD,SQL_TSI_DAY,SQL_TSI_HOUR,SQL_TSI_MINUTE,SQL_TSI_MONTH,SQL_TSI_QUARTER,SQL_TSI_SECOND,SQL_TSI_WEEK,SQL_TSI_YEAR,SQLEXCEPTION,SQLSTATE,SQLWARNING,SSL,STACKED,START,STARTING,STARTS,STATS_AUTO_RECALC,STATS_PERSISTENT,STATS_SAMPLE_PAGES,STATUS,STOP,STORAGE,STORED,STRAIGHT_JOIN,STRING,SUBCLASS_ORIGIN,SUBJECT,SUBPARTITION,SUBPARTITIONS,SUPER,SUSPEND,SWAPS,SWITCHES,TABLE,TABLE_CHECKSUM,TABLE_NAME,TABLES,TABLESPACE,TEMPORARY,TEMPTABLE,TERMINATED,TEXT,THAN,THEN,TIME,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TINYBLOB,TINYINT,TINYTEXT,TO,TRAILING,TRANSACTION,TRIGGER,TRIGGERS,TRUNCATE,TYPE,TYPES,UNCOMMITTED,UNDEFINED,UNDO,UNDO_BUFFER_SIZE,UNDOFILE,UNICODE,UNINSTALL,UNION,UNIQUE,UNKNOWN,UNLOCK,UNSIGNED,UNTIL,UPDATE,UPGRADE,USAGE,USE,USE_FRM,USER,USER_RESOURCES,USING,UTC_DATE,UTC_TIME,UTC_TIMESTAMP,VALIDATION,VALUE,VALUES,VARBINARY,VARCHAR,VARCHARACTER,VARIABLES,VARYING,VIEW,VIRTUAL,WAIT,WARNINGS,WEEK,WEIGHT_STRING,WHEN,WHERE,WHILE,WITH,WITHOUT,WORK,WRAPPER,WRITE,X509,XA,XID,XML,XOR,YEAR,YEAR_MONTH,ZEROFILL,FALSE,TRUE"; + private static final String ORACLE_KEYWORDS = 
"ACCESS,ADD,ALL,ALTER,AND,ANY,ARRAYLEN,AS,ASC,AUDIT,BETWEEN,BY,CHAR,CHECK,CLUSTER,COLUMN,COMMENT,COMPRESS,CONNECT,CREATE,CURRENT,DATE,DECIMAL,DEFAULT,DELETE,DESC,DISTINCT,DROP,ELSE,EXCLUSIVE,EXISTS,FILE,FLOAT,FOR,FROM,GRANT,GROUP,HAVING,IDENTIFIED,IMMEDIATE,IN,INCREMENT,INDEX,INITIAL,INSERT,INTEGER,INTERSECT,INTO,IS,LEVEL,LIKE,LOCK,LONG,MAXEXTENTS,MINUS,MODE,MODIFY,NOAUDIT,NOCOMPRESS,NOT,NOTFOUND,NOWAIT,NULL,NUMBER,OF,OFFLINE,ON,ONLINE,OPTION,OR,ORDER,PCTFREE,PRIOR,PRIVILEGES,PUBLIC,RAW,RENAME,RESOURCE,REVOKE,ROW,ROWID,ROWLABEL,ROWNUM,ROWS,SELECT,SESSION,SET,SHARE,SIZE,SMALLINT,SQLBUF,START,SUCCESSFUL,SYNONYM,TABLE,THEN,TO,TRIGGER,UID,UNION,UNIQUE,UPDATE,USER,VALIDATE,VALUES,VARCHAR,VARCHAR2,VIEW,WHENEVER,WHERE,WITH"; private static String CHECK_MEMSTORE = "select 1 from %s.gv$memstore t where t.total>t.mem_limit * ?"; - + private static Set databaseKeywords; private static String compatibleMode = null; + protected static final Logger LOG = LoggerFactory.getLogger(Task.class); + private static Set keywordsFromString2HashSet(final String keywords) { + return new HashSet(Arrays.asList(keywords.split(","))); + } + public static String escapeDatabaseKeywords(String keyword) { + if (databaseKeywords == null) { + if (isOracleMode()) { + databaseKeywords = keywordsFromString2HashSet(ORACLE_KEYWORDS); + } else { + databaseKeywords = keywordsFromString2HashSet(MYSQL_KEYWORDS); + } + } + char escapeChar = isOracleMode() ? 
'"' : '`'; + if (databaseKeywords.contains(keyword.toUpperCase())) { + keyword = escapeChar + keyword + escapeChar; + } + return keyword; + } + + public static void escapeDatabaseKeywords(List keywords) { + for (int i = 0; i < keywords.size(); i++) { + keywords.set(i, escapeDatabaseKeywords(keywords.get(i))); + } + } + public static Boolean isEscapeMode(String keyword){ + if(isOracleMode()){ + return keyword.startsWith("\"") && keyword.endsWith("\""); + }else{ + return keyword.startsWith("`") && keyword.endsWith("`"); + } + } public static boolean isMemstoreFull(Connection conn, double memstoreThreshold) { PreparedStatement ps = null; ResultSet rs = null; @@ -70,7 +102,11 @@ public class ObWriterUtils { } private static int[] getColumnIndex(List columnsInIndex, List allColumns) { - allColumns = allColumns.stream().map(String::toUpperCase).collect(Collectors.toList()); + for (int i = 0; i < allColumns.size(); i++) { + if (!ObWriterUtils.isEscapeMode(allColumns.get(i))) { + allColumns.set(i, allColumns.get(i).toUpperCase()); + } + } int[] colIdx = new int[columnsInIndex.size()]; for (int i = 0; i < columnsInIndex.size(); i++) { int index = allColumns.indexOf(columnsInIndex.get(i)); @@ -122,7 +158,11 @@ public class ObWriterUtils { rs = stmt.executeQuery(sql); while (rs.next()) { String keyName = rs.getString("Key_name"); - String columnName = StringUtils.upperCase(rs.getString("Column_name")); + String columnName = rs.getString("Column_name"); + columnName=escapeDatabaseKeywords(columnName); + if(!ObWriterUtils.isEscapeMode(columnName)){ + columnName = columnName.toUpperCase(); + } List s = uniqueKeys.get(keyName); if (s == null) { s = new ArrayList(); @@ -135,6 +175,7 @@ public class ObWriterUtils { } finally { asyncClose(rs, stmt, null); } + //ObWriterUtils.escapeDatabaseKeywords(uniqueKeys); return uniqueKeys; } @@ -291,6 +332,7 @@ public class ObWriterUtils { * @param e * @return */ + public static boolean isFatalError(SQLException e) { String sqlState = 
e.getSQLState(); if (StringUtils.startsWith(sqlState, "08")) {