diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java index 228af811..e92e5025 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java @@ -3,6 +3,7 @@ package com.alibaba.datax.plugin.reader.oceanbasev10reader; import java.sql.Connection; import java.util.List; +import com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ObReaderKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +53,21 @@ public class OceanBaseReader extends Reader { @Override public List split(int adviceNumber) { + String splitPk = originalConfig.getString(Key.SPLIT_PK); + List quotedColumns = originalConfig.getList(Key.COLUMN_LIST, String.class); + if (splitPk != null && splitPk.length() > 0 && quotedColumns != null) { + String escapeChar = ObReaderUtils.isOracleMode(originalConfig.getString(ObReaderKey.OB_COMPATIBILITY_MODE)) + ? "\"" : "`"; + if (!splitPk.startsWith(escapeChar) && !splitPk.endsWith(escapeChar)) { + splitPk = escapeChar + splitPk + escapeChar; + } + for (String column : quotedColumns) { + if (column.equals(splitPk)) { + LOG.info("splitPk is an ob reserved keyword, set to {}", splitPk); + originalConfig.set(Key.SPLIT_PK, splitPk); + } + } + } return this.readerJob.split(this.originalConfig, adviceNumber); } @@ -86,6 +102,7 @@ public class OceanBaseReader extends Reader { String obJdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:"); Connection conn = DBUtil.getConnection(DataBaseType.OceanBase, obJdbcUrl, username, password); String compatibleMode = ObReaderUtils.getCompatibleMode(conn); + config.set(ObReaderKey.OB_COMPATIBILITY_MODE, compatibleMode); if (ObReaderUtils.isOracleMode(compatibleMode)) { ObReaderUtils.compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE; } diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/Constant.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/Constant.java new file mode 100644 index 00000000..57977ca4 --- /dev/null +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/Constant.java @@ -0,0 +1,11 @@ +package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext; + +/** + * @author johnrobbet + */ +public class Constant { + + public static String WEAK_READ_QUERY_SQL_TEMPLATE_WITHOUT_WHERE = "select /*+read_consistency(weak)*/ %s from %s "; + + public static String WEAK_READ_QUERY_SQL_TEMPLATE = "select /*+read_consistency(weak)*/ %s from %s where (%s)"; +} diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ObReaderKey.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ObReaderKey.java new file mode 100644 index 00000000..bc8f4525 --- /dev/null +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ObReaderKey.java @@ -0,0 +1,16 @@ +package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext; + +/** + * @author johnrobbet + */ +public class ObReaderKey { + + public final static String READ_BY_PARTITION = "readByPartition"; + + public final static String PARTITION_NAME = "partitionName"; + + public final static String PARTITION_TYPE = "partitionType"; + + public final static String OB_COMPATIBILITY_MODE = "obCompatibilityMode"; + +} diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java index f69a9166..291dc785 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java @@ -1,15 +1,16 @@ package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext; +import java.util.Arrays; import java.util.List; import com.alibaba.datax.common.constant.CommonConstant; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader; import com.alibaba.datax.plugin.rdbms.reader.Key; -import com.alibaba.datax.plugin.rdbms.util.DataBaseType; -import com.alibaba.datax.plugin.rdbms.writer.Constant; +import com.alibaba.datax.plugin.rdbms.reader.Constant; import com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader; import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils; +import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.PartitionSplitUtil; import com.alibaba.fastjson.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,37 +30,62 @@ public class ReaderJob extends CommonRdbmsReader.Job { ObReaderUtils.escapeDatabaseKeywords(columns); originalConfig.set(Key.COLUMN, columns); - List conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, JSONObject.class); + List conns = originalConfig.getList(Constant.CONN_MARK, JSONObject.class); for (int i = 0; i < conns.size(); i++) { JSONObject conn = conns.get(i); Configuration connConfig = Configuration.from(conn.toString()); List tables = connConfig.getList(Key.TABLE, String.class); - ObReaderUtils.escapeDatabaseKeywords(tables); - originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, Key.TABLE), tables); + + // tables will be null when querySql is configured + if (tables != null) { + ObReaderUtils.escapeDatabaseKeywords(tables); + originalConfig.set(String.format("%s[%d].%s", Constant.CONN_MARK, i, Key.TABLE), + tables); + } } super.init(originalConfig); } @Override public List split(Configuration originalConfig, int adviceNumber) { - List list = super.split(originalConfig, adviceNumber); + List list; + // readByPartition is lower priority than splitPk. + // and readByPartition only works in table mode. + if (!isSplitPkValid(originalConfig) && + originalConfig.getBool(Constant.IS_TABLE_MODE) && + originalConfig.getBool(ObReaderKey.READ_BY_PARTITION, false)) { + LOG.info("try to split reader job by partition."); + list = PartitionSplitUtil.splitByPartition(originalConfig); + } else { + LOG.info("try to split reader job by splitPk."); + list = super.split(originalConfig, adviceNumber); + } + for (Configuration config : list) { String jdbcUrl = config.getString(Key.JDBC_URL); String obRegionName = getObRegionName(jdbcUrl); config.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, obRegionName); } + return list; } + private boolean isSplitPkValid(Configuration originalConfig) { + String splitPk = originalConfig.getString(Key.SPLIT_PK); + return splitPk != null && splitPk.trim().length() > 0; + } + private String getObRegionName(String jdbcUrl) { - if (jdbcUrl.startsWith(Constant.OB10_SPLIT_STRING)) { - String[] ss = jdbcUrl.split(Constant.OB10_SPLIT_STRING_PATTERN); + final String obJdbcDelimiter = com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING; + if (jdbcUrl.startsWith(obJdbcDelimiter)) { + String[] ss = jdbcUrl.split(obJdbcDelimiter); if (ss.length >= 2) { String tenant = ss[1].trim(); String[] sss = tenant.split(":"); return sss[0]; } } + return null; } } diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java index 20c2f922..6356b97b 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java @@ -1,8 +1,10 @@ package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; import com.alibaba.datax.common.element.*; +import com.alibaba.datax.plugin.rdbms.reader.util.SingleTableSplitUtil; import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.Constant; import com.alibaba.druid.sql.SQLUtils; import com.alibaba.druid.sql.ast.SQLExpr; import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr; @@ -18,11 +20,13 @@ import java.util.Map.Entry; import java.util.regex.Matcher; import java.util.regex.Pattern; +/** + * @author johnrobbet + */ public class ObReaderUtils { - private static final Logger LOG = LoggerFactory.getLogger(ObReaderUtils.class); private static final String MYSQL_KEYWORDS = "ACCESSIBLE,ACCOUNT,ACTION,ADD,AFTER,AGAINST,AGGREGATE,ALGORITHM,ALL,ALTER,ALWAYS,ANALYSE,AND,ANY,AS,ASC,ASCII,ASENSITIVE,AT,AUTO_INCREMENT,AUTOEXTEND_SIZE,AVG,AVG_ROW_LENGTH,BACKUP,BEFORE,BEGIN,BETWEEN,BIGINT,BINARY,BINLOG,BIT,BLOB,BLOCK,BOOL,BOOLEAN,BOTH,BTREE,BY,BYTE,CACHE,CALL,CASCADE,CASCADED,CASE,CATALOG_NAME,CHAIN,CHANGE,CHANGED,CHANNEL,CHAR,CHARACTER,CHARSET,CHECK,CHECKSUM,CIPHER,CLASS_ORIGIN,CLIENT,CLOSE,COALESCE,CODE,COLLATE,COLLATION,COLUMN,COLUMN_FORMAT,COLUMN_NAME,COLUMNS,COMMENT,COMMIT,COMMITTED,COMPACT,COMPLETION,COMPRESSED,COMPRESSION,CONCURRENT,CONDITION,CONNECTION,CONSISTENT,CONSTRAINT,CONSTRAINT_CATALOG,CONSTRAINT_NAME,CONSTRAINT_SCHEMA,CONTAINS,CONTEXT,CONTINUE,CONVERT,CPU,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,CURSOR,CURSOR_NAME,DATA,DATABASE,DATABASES,DATAFILE,DATE,DATETIME,DAY,DAY_HOUR,DAY_MICROSECOND,DAY_MINUTE,DAY_SECOND,DEALLOCATE,DEC,DECIMAL,DECLARE,DEFAULT,DEFAULT_AUTH,DEFINER,DELAY_KEY_WRITE,DELAYED,DELETE,DES_KEY_FILE,DESC,DESCRIBE,DETERMINISTIC,DIAGNOSTICS,DIRECTORY,DISABLE,DISCARD,DISK,DISTINCT,DISTINCTROW,DIV,DO,DOUBLE,DROP,DUAL,DUMPFILE,DUPLICATE,DYNAMIC,EACH,ELSE,ELSEIF,ENABLE,ENCLOSED,ENCRYPTION,END,ENDS,ENGINE,ENGINES,ENUM,ERROR,ERRORS,ESCAPE,ESCAPED,EVENT,EVENTS,EVERY,EXCHANGE,EXECUTE,EXISTS,EXIT,EXPANSION,EXPIRE,EXPLAIN,EXPORT,EXTENDED,EXTENT_SIZE,FAST,FAULTS,FETCH,FIELDS,FILE,FILE_BLOCK_SIZE,FILTER,FIRST,FIXED,FLOAT,FLOAT4,FLOAT8,FLUSH,FOLLOWS,FOR,FORCE,FOREIGN,FORMAT,FOUND,FROM,FULL,FULLTEXT,FUNCTION,GENERAL,GENERATED,GEOMETRY,GEOMETRYCOLLECTION,GET,GET_FORMAT,GLOBAL,GRANT,GRANTS,GROUP,GROUP_REPLICATION,HANDLER,HASH,HAVING,HELP,HIGH_PRIORITY,HOST,HOSTS,HOUR,HOUR_MICROSECOND,HOUR_MINUTE,HOUR_SECOND,IDENTIFIED,IF,IGNORE,IGNORE_SERVER_IDS,IMPORT,IN,INDEX,INDEXES,INFILE,INITIAL_SIZE,INNER,INOUT,INSENSITIVE,INSERT,INSERT_METHOD,INSTALL,INSTANCE,INT,INT1,INT2,INT3,INT4,INT8,INTEGER,INTERVAL,INTO,INVOKER,IO,IO_AFTER_GTIDS,IO_BEFORE_GTIDS,IO_THREAD,IPC,IS,ISOLATION,ISSUER,ITERATE,JOIN,JSON,KEY,KEY_BLOCK_SIZE,KEYS,KILL,LANGUAGE,LAST,LEADING,LEAVE,LEAVES,LEFT,LESS,LEVEL,LIKE,LIMIT,LINEAR,LINES,LINESTRING,LIST,LOAD,LOCAL,LOCALTIME,LOCALTIMESTAMP,LOCK,LOCKS,LOGFILE,LOGS,LONG,LONGBLOB,LONGTEXT,LOOP,LOW_PRIORITY,MASTER,MASTER_AUTO_POSITION,MASTER_BIND,MASTER_CONNECT_RETRY,MASTER_DELAY,MASTER_HEARTBEAT_PERIOD,MASTER_HOST,MASTER_LOG_FILE,MASTER_LOG_POS,MASTER_PASSWORD,MASTER_PORT,MASTER_RETRY_COUNT,MASTER_SERVER_ID,MASTER_SSL,MASTER_SSL_CA,MASTER_SSL_CAPATH,MASTER_SSL_CERT,MASTER_SSL_CIPHER,MASTER_SSL_CRL,MASTER_SSL_CRLPATH,MASTER_SSL_KEY,MASTER_SSL_VERIFY_SERVER_CERT,MASTER_TLS_VERSION,MASTER_USER,MATCH,MAX_CONNECTIONS_PER_HOUR,MAX_QUERIES_PER_HOUR,MAX_ROWS,MAX_SIZE,MAX_STATEMENT_TIME,MAX_UPDATES_PER_HOUR,MAX_USER_CONNECTIONS,MAXVALUE,MEDIUM,MEDIUMBLOB,MEDIUMINT,MEDIUMTEXT,MEMORY,MERGE,MESSAGE_TEXT,MICROSECOND,MIDDLEINT,MIGRATE,MIN_ROWS,MINUTE,MINUTE_MICROSECOND,MINUTE_SECOND,MOD,MODE,MODIFIES,MODIFY,MONTH,MULTILINESTRING,MULTIPOINT,MULTIPOLYGON,MUTEX,MYSQL_ERRNO,NAME,NAMES,NATIONAL,NATURAL,NCHAR,NDB,NDBCLUSTER,NEVER,NEW,NEXT,NO,NO_WAIT,NO_WRITE_TO_BINLOG,NODEGROUP,NONBLOCKING,NONE,NOT,NULL,NUMBER,NUMERIC,NVARCHAR,OFFSET,OLD_PASSWORD,ON,ONE,ONLY,OPEN,OPTIMIZE,OPTIMIZER_COSTS,OPTION,OPTIONALLY,OPTIONS,OR,ORDER,OUT,OUTER,OUTFILE,OWNER,PACK_KEYS,PAGE,PARSE_GCOL_EXPR,PARSER,PARTIAL,PARTITION,PARTITIONING,PARTITIONS,PASSWORD,PHASE,PLUGIN,PLUGIN_DIR,PLUGINS,POINT,POLYGON,PORT,PRECEDES,PRECISION,PREPARE,PRESERVE,PREV,PRIMARY,PRIVILEGES,PROCEDURE,PROCESSLIST,PROFILE,PROFILES,PROXY,PURGE,QUARTER,QUERY,QUICK,RANGE,READ,READ_ONLY,READ_WRITE,READS,REAL,REBUILD,RECOVER,REDO_BUFFER_SIZE,REDOFILE,REDUNDANT,REFERENCES,REGEXP,RELAY,RELAY_LOG_FILE,RELAY_LOG_POS,RELAY_THREAD,RELAYLOG,RELEASE,RELOAD,REMOVE,RENAME,REORGANIZE,REPAIR,REPEAT,REPEATABLE,REPLACE,REPLICATE_DO_DB,REPLICATE_DO_TABLE,REPLICATE_IGNORE_DB,REPLICATE_IGNORE_TABLE,REPLICATE_REWRITE_DB,REPLICATE_WILD_DO_TABLE,REPLICATE_WILD_IGNORE_TABLE,REPLICATION,REQUIRE,RESET,RESIGNAL,RESTORE,RESTRICT,RESUME,RETURN,RETURNED_SQLSTATE,RETURNS,REVERSE,REVOKE,RIGHT,RLIKE,ROLLBACK,ROLLUP,ROTATE,ROUTINE,ROW,ROW_COUNT,ROW_FORMAT,ROWS,RTREE,SAVEPOINT,SCHEDULE,SCHEMA,SCHEMA_NAME,SCHEMAS,SECOND,SECOND_MICROSECOND,SECURITY,SELECT,SENSITIVE,SEPARATOR,SERIAL,SERIALIZABLE,SERVER,SESSION,SET,SHARE,SHOW,SHUTDOWN,SIGNAL,SIGNED,SIMPLE,SLAVE,SLOW,SMALLINT,SNAPSHOT,SOCKET,SOME,SONAME,SOUNDS,SOURCE,SPATIAL,SPECIFIC,SQL,SQL_AFTER_GTIDS,SQL_AFTER_MTS_GAPS,SQL_BEFORE_GTIDS,SQL_BIG_RESULT,SQL_BUFFER_RESULT,SQL_CACHE,SQL_CALC_FOUND_ROWS,SQL_NO_CACHE,SQL_SMALL_RESULT,SQL_THREAD,SQL_TSI_DAY,SQL_TSI_HOUR,SQL_TSI_MINUTE,SQL_TSI_MONTH,SQL_TSI_QUARTER,SQL_TSI_SECOND,SQL_TSI_WEEK,SQL_TSI_YEAR,SQLEXCEPTION,SQLSTATE,SQLWARNING,SSL,STACKED,START,STARTING,STARTS,STATS_AUTO_RECALC,STATS_PERSISTENT,STATS_SAMPLE_PAGES,STATUS,STOP,STORAGE,STORED,STRAIGHT_JOIN,STRING,SUBCLASS_ORIGIN,SUBJECT,SUBPARTITION,SUBPARTITIONS,SUPER,SUSPEND,SWAPS,SWITCHES,TABLE,TABLE_CHECKSUM,TABLE_NAME,TABLES,TABLESPACE,TEMPORARY,TEMPTABLE,TERMINATED,TEXT,THAN,THEN,TIME,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TINYBLOB,TINYINT,TINYTEXT,TO,TRAILING,TRANSACTION,TRIGGER,TRIGGERS,TRUNCATE,TYPE,TYPES,UNCOMMITTED,UNDEFINED,UNDO,UNDO_BUFFER_SIZE,UNDOFILE,UNICODE,UNINSTALL,UNION,UNIQUE,UNKNOWN,UNLOCK,UNSIGNED,UNTIL,UPDATE,UPGRADE,USAGE,USE,USE_FRM,USER,USER_RESOURCES,USING,UTC_DATE,UTC_TIME,UTC_TIMESTAMP,VALIDATION,VALUE,VALUES,VARBINARY,VARCHAR,VARCHARACTER,VARIABLES,VARYING,VIEW,VIRTUAL,WAIT,WARNINGS,WEEK,WEIGHT_STRING,WHEN,WHERE,WHILE,WITH,WITHOUT,WORK,WRAPPER,WRITE,X509,XA,XID,XML,XOR,YEAR,YEAR_MONTH,ZEROFILL,FALSE,TRUE"; - private static final String ORACLE_KEYWORDS = "ACCESS,ADD,ALL,ALTER,AND,ANY,ARRAYLEN,AS,ASC,AUDIT,BETWEEN,BY,CHAR,CHECK,CLUSTER,COLUMN,COMMENT,COMPRESS,CONNECT,CREATE,CURRENT,DATE,DECIMAL,DEFAULT,DELETE,DESC,DISTINCT,DROP,ELSE,EXCLUSIVE,EXISTS,FILE,FLOAT,FOR,FROM,GRANT,GROUP,HAVING,IDENTIFIED,IMMEDIATE,IN,INCREMENT,INDEX,INITIAL,INSERT,INTEGER,INTERSECT,INTO,IS,LEVEL,LIKE,LOCK,LONG,MAXEXTENTS,MINUS,MODE,MODIFY,NOAUDIT,NOCOMPRESS,NOT,NOTFOUND,NOWAIT,NULL,NUMBER,OF,OFFLINE,ON,ONLINE,OPTION,OR,ORDER,PCTFREE,PRIOR,PRIVILEGES,PUBLIC,RAW,RENAME,RESOURCE,REVOKE,ROW,ROWID,ROWLABEL,ROWNUM,ROWS,SELECT,SESSION,SET,SHARE,SIZE,SMALLINT,SQLBUF,START,SUCCESSFUL,SYNONYM,TABLE,THEN,TO,TRIGGER,UID,UNION,UNIQUE,UPDATE,USER,VALIDATE,VALUES,VARCHAR,VARCHAR2,VIEW,WHENEVER,WHERE,WITH"; + private static final String ORACLE_KEYWORDS = "ACCESS,ADD,ALL,ALTER,AND,ANY,ARRAYLEN,AS,ASC,AUDIT,BETWEEN,BY,CHAR,CHECK,CLUSTER,COLUMN,COMMENT,COMPRESS,CONNECT,CREATE,CURRENT,DATE,DECIMAL,DEFAULT,DELETE,DESC,DISTINCT,DROP,ELSE,EXCLUSIVE,EXISTS,FILE,FLOAT,FOR,FROM,GRANT,GROUP,HAVING,IDENTIFIED,IMMEDIATE,IN,INCREMENT,INDEX,INITIAL,INSERT,INTEGER,INTERSECT,INTO,IS,LEVEL,LIKE,LOCK,LONG,MAXEXTENTS,MINUS,MODE,MODIFY,NOAUDIT,NOCOMPRESS,NOT,NOTFOUND,NOWAIT,NUMBER,OF,OFFLINE,ON,ONLINE,OPTION,OR,ORDER,PCTFREE,PRIOR,PRIVILEGES,PUBLIC,RAW,RENAME,RESOURCE,REVOKE,ROW,ROWID,ROWLABEL,ROWNUM,ROWS,SELECT,SESSION,SET,SHARE,SIZE,SMALLINT,SQLBUF,START,SUCCESSFUL,SYNONYM,TABLE,THEN,TO,TRIGGER,UID,UNION,UNIQUE,UPDATE,USER,VALIDATE,VALUES,VARCHAR,VARCHAR2,VIEW,WHENEVER,WHERE,WITH,KEY,NAME,VALUE,TYPE"; private static Set databaseKeywords; final static public String OB_COMPATIBLE_MODE = "obCompatibilityMode"; @@ -53,9 +57,11 @@ public class ObReaderUtils { return keyword; } - public static void escapeDatabaseKeywords(List keywords) { - for (int i = 0; i < keywords.size(); i++) { - keywords.set(i, escapeDatabaseKeywords(keywords.get(i))); + public static void escapeDatabaseKeywords(List ids) { + if (ids != null && ids.size() > 0) { + for (int i = 0; i < ids.size(); i++) { + ids.set(i, escapeDatabaseKeywords(ids.get(i))); + } } } @@ -107,15 +113,19 @@ public class ObReaderUtils { context.setPkColumns(pkColumns); + final String escapeChar = isOracleMode(context.getCompatibleMode()) ? "\"" : "`"; int[] pkIndexs = new int[pkColumns.length]; for (int i = 0, n = pkColumns.length; i < n; i++) { String pkc = pkColumns[i]; + String escapedPkc = String.format("%s%s%s", escapeChar, pkc, escapeChar); int j = 0; for (int k = columns.size(); j < k; j++) { // 如果用户定义的 columns中 带有 ``,也不影响, // 最多只是在select里多加了几列PK column - if (StringUtils.equalsIgnoreCase(pkc, columns.get(j))) { + if (StringUtils.equalsIgnoreCase(pkc, columns.get(j)) + || StringUtils.equalsIgnoreCase(escapedPkc, columns.get(j))) { pkIndexs[i] = j; + pkColumns[i] = columns.get(j); break; } } @@ -133,10 +143,20 @@ public class ObReaderUtils { String sql = "show index from " + tableName + " where Key_name='PRIMARY'"; if (isOracleMode(context.getCompatibleMode())) { tableName = tableName.toUpperCase(); - sql = "SELECT cols.column_name Column_name " + + String schema; + if (tableName.contains(".")) { + schema = String.format("'%s'", tableName.substring(0, tableName.indexOf("."))); + tableName = tableName.substring(tableName.indexOf(".") + 1); + } else { + schema = "(select sys_context('USERENV','current_schema') from dual)"; + } + sql = String.format( + "SELECT cols.column_name Column_name " + "FROM all_constraints cons, all_cons_columns cols " + - "WHERE cols.table_name = '" + tableName + "' AND cons.constraint_type = 'P' " + - "AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner"; + "WHERE cols.table_name = '%s' AND cons.constraint_type = 'P' " + + "AND cons.constraint_name = cols.constraint_name " + + "AND cons.owner = cols.owner and cons.OWNER = %s", + tableName, schema); } LOG.info("get primary key by sql: " + sql); Statement ps = null; @@ -146,25 +166,27 @@ public class ObReaderUtils { try { ps = conn.createStatement(); rs = ps.executeQuery(sql); + boolean hasPk = false; while (rs.next()) { + hasPk = true; String columnName = rs.getString("Column_name"); columnName = escapeDatabaseKeywords(columnName); - if (!isEscapeMode(columnName)) { - columnName.toLowerCase(); - } if (!realIndex.contains(columnName)) { realIndex.add(columnName); } } - String[] pks = new String[realIndex.size()]; - realIndex.toArray(pks); - return pks; + if (hasPk) { + String[] pks = new String[realIndex.size()]; + realIndex.toArray(pks); + return pks; + } } catch (Throwable e) { LOG.error("show index from table fail :" + sql, e); } finally { close(rs, ps, null); } + return null; } @@ -347,7 +369,6 @@ public class ObReaderUtils { return; } SQLExpr expr = SQLUtils.toSQLExpr(context.getWhere(), "mysql"); - LOG.info("expr: " + expr); List allColumnsInTab = getAllColumnFromTab(conn, context.getTable()); List allColNames = getColNames(allColumnsInTab, expr); @@ -439,9 +460,19 @@ public class ObReaderUtils { Map> allIndex = new HashMap>(); String sql = "show index from " + tableName; if (isOracleMode(compatibleMode)) { + String schema; tableName = tableName.toUpperCase(); - sql = "SELECT INDEX_NAME Key_name, COLUMN_NAME Column_name " + - "from dba_ind_columns where TABLE_NAME = '" + tableName + "' " + + if (tableName.contains(".")) { + schema = String.format("'%s'", tableName.substring(0, tableName.indexOf("."))); + tableName = tableName.substring(tableName.indexOf(".") + 1); + } else { + schema = "(select sys_context('USERENV','current_schema') from dual)"; + } + + sql = String.format( + "SELECT INDEX_NAME Key_name, COLUMN_NAME Column_name " + + "from all_ind_columns " + + "where TABLE_NAME = '%s' and TABLE_OWNER = %s " + " union all " + "SELECT DISTINCT " + "CASE " + @@ -451,9 +482,12 @@ public class ObReaderUtils { "END AS Key_name, " + "cols.column_name Column_name " + "FROM all_constraints cons, all_cons_columns cols " + - "WHERE cols.table_name = '" + tableName + "' AND cons.constraint_type in('P', 'U') " + - "AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner"; + "WHERE cols.table_name = '%s' AND cons.constraint_type in('P', 'U') " + + "AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner " + + "AND cons.owner = %s", + tableName, schema, tableName, schema); } + Statement stmt = null; ResultSet rs = null; @@ -476,11 +510,26 @@ public class ObReaderUtils { // add primary key to all index if (allIndex.containsKey("PRIMARY")) { List colsInPrimary = allIndex.get("PRIMARY"); - for (String keyName : allIndex.keySet()) { - if (keyName.equals("PRIMARY")) { + Iterator>> iterator = allIndex.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry> entry = iterator.next(); + if (entry.getKey().equals("PRIMARY")) { continue; } - allIndex.get(keyName).addAll(colsInPrimary); + + // remove the index which is identical with primary key + List indexColumns = entry.getValue(); + if (colsInPrimary.equals(indexColumns)) { + iterator.remove(); + } else { + // add primary key to the index if the index is not on the column + colsInPrimary.forEach( + c -> { + if (!indexColumns.contains(c)) { + indexColumns.add(c); + } + }); + } } } } catch (Exception e) { @@ -494,6 +543,7 @@ public class ObReaderUtils { } /** + * find out the indexes which contains all columns in where conditions * @param conn * @param table * @param colNamesInCondition @@ -507,7 +557,7 @@ public class ObReaderUtils { return indexNames; } - LOG.info("columNamesInConditions: " + String.join(",", colNamesInCondition)); + LOG.info("columnNamesInConditions: " + String.join(",", colNamesInCondition)); Map> allIndex = getAllIndex(conn, table, compatibleMode); for (String keyName : allIndex.keySet()) { @@ -518,7 +568,7 @@ public class ObReaderUtils { if (allIndex.get(keyName).size() < colNamesInCondition.size()) { indexNotMatch = true; } else { - // the the first number columns of this index + // the first number columns of this index int num = colNamesInCondition.size(); for (String colName : allIndex.get(keyName)) { if (!colNamesInCondition.contains(colName)) { @@ -718,4 +768,64 @@ public class ObReaderUtils { public static boolean isOracleMode(String mode) { return (mode != null && OB_COMPATIBLE_MODE_ORACLE.equalsIgnoreCase(mode)); } + + public static String getDbNameFromJdbcUrl(String jdbcUrl) { + final Pattern pattern = Pattern.compile("jdbc:(oceanbase|mysql)://([\\w\\.-]+:\\d+)/([\\w\\.-]+)"); + + Matcher matcher = pattern.matcher(jdbcUrl); + if (matcher.find()) { + return matcher.group(3); + } else { + LOG.error("jdbc url {} is not valid.", jdbcUrl); + } + + return null; + } + + public static String buildQuerySql(boolean weakRead, String column, String table, String where) { + if (weakRead) { + return buildWeakReadQuerySql(column, table, where); + } else { + return SingleTableSplitUtil.buildQuerySql(column, table, where); + } + } + + public static String buildWeakReadQuerySql(String column, String table, String where) { + String querySql; + + if (StringUtils.isBlank(where)) { + querySql = String.format(Constant.WEAK_READ_QUERY_SQL_TEMPLATE_WITHOUT_WHERE, column, table); + } else { + querySql = String.format(Constant.WEAK_READ_QUERY_SQL_TEMPLATE, column, table, where); + } + + return querySql; + } + + /** + * compare two ob versions + * @param version1 + * @param version2 + * @return 0 when the two versions are the same + * -1 when version1 is smaller (earlier) than version2 + * 1 when version is bigger (later) than version2 + */ + public static int compareObVersion(String version1, String version2) { + if (version1 == null || version2 == null) { + throw new RuntimeException("can not compare null version"); + } + + String[] ver1Part = version1.split("\\."); + String[] ver2Part = version2.split("\\."); + for (int i = 0; i < ver1Part.length; i++) { + int v1 = Integer.parseInt(ver1Part[i]), v2 = Integer.parseInt(ver2Part[i]); + if (v1 > v2) { + return 1; + } else if (v1 < v2) { + return -1; + } + } + + return 0; + } } diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartInfo.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartInfo.java new file mode 100644 index 00000000..7a9a6f70 --- /dev/null +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartInfo.java @@ -0,0 +1,35 @@ +package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author johnrobbet + */ +public class PartInfo { + + private PartType partType; + + List partList; + + public PartInfo(PartType partType) { + this.partType = partType; + this.partList = new ArrayList(); + } + + public String getPartType () { + return partType.getTypeString(); + } + + public void addPart(List partList) { + this.partList.addAll(partList); + } + + public List getPartList() { + return partList; + } + + public boolean isPartitionTable() { + return partType != PartType.NONPARTITION && partList.size() > 0; + } +} diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartType.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartType.java new file mode 100644 index 00000000..be190755 --- /dev/null +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartType.java @@ -0,0 +1,23 @@ +package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; + +/** + * @author johnrobbet + */ + +public enum PartType { + NONPARTITION("NONPARTITION"), + PARTITION("PARTITION"), + SUBPARTITION("SUBPARTITION"); + + private String typeString; + + PartType (String typeString) { + this.typeString = typeString; + } + + public String getTypeString() { + return typeString; + } +} + + diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartitionSplitUtil.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartitionSplitUtil.java new file mode 100644 index 00000000..3bf2320a --- /dev/null +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartitionSplitUtil.java @@ -0,0 +1,165 @@ +package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.reader.Constant; +import com.alibaba.datax.plugin.rdbms.reader.Key; +import com.alibaba.datax.plugin.rdbms.reader.util.HintUtil; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ObReaderKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +/** + * @author johnrobbet + */ +public class PartitionSplitUtil { + private static final Logger LOG = LoggerFactory.getLogger(PartitionSplitUtil.class); + + public static List splitByPartition (Configuration configuration) { + List allSlices = new ArrayList<>(); + List conns = configuration.getList(Constant.CONN_MARK, Object.class); + for (int i = 0, len = conns.size(); i < len; i++) { + Configuration sliceConfig = configuration.clone(); + Configuration connConf = Configuration.from(conns.get(i).toString()); + String jdbcUrl = connConf.getString(Key.JDBC_URL); + sliceConfig.set(Key.JDBC_URL, jdbcUrl); + sliceConfig.remove(Constant.CONN_MARK); + + List tables = connConf.getList(Key.TABLE, String.class); + for (String table : tables) { + Configuration tempSlice = sliceConfig.clone(); + tempSlice.set(Key.TABLE, table); + allSlices.addAll(splitSinglePartitionTable(tempSlice)); + } + } + + return allSlices; + } + + private static List splitSinglePartitionTable(Configuration configuration) { + String table = configuration.getString(Key.TABLE); + String where = configuration.getString(Key.WHERE, null); + String column = configuration.getString(Key.COLUMN); + final boolean weakRead = configuration.getBool(Key.WEAK_READ, true); + + List slices = new ArrayList(); + PartInfo partInfo = getObPartInfoBySQL(configuration, table); + if (partInfo != null && partInfo.isPartitionTable()) { + String partitionType = partInfo.getPartType(); + for (String partitionName : partInfo.getPartList()) { + LOG.info(String.format("add %s %s for table %s", partitionType, partitionName, table)); + Configuration slice = configuration.clone(); + slice.set(ObReaderKey.PARTITION_NAME, partitionName); + slice.set(ObReaderKey.PARTITION_TYPE, partitionType); + slice.set(Key.QUERY_SQL, + ObReaderUtils.buildQuerySql(weakRead, column, + String.format("%s partition(%s)", table, partitionName), where)); + slices.add(slice); + } + } else { + LOG.info("fail to get table part info or table is not partitioned, proceed as non-partitioned table."); + + Configuration slice = configuration.clone(); + slice.set(Key.QUERY_SQL, ObReaderUtils.buildQuerySql(weakRead, column, table, where)); + slices.add(slice); + } + + return slices; + } + + private static PartInfo getObPartInfoBySQL(Configuration config, String table) { + PartInfo partInfo = new PartInfo(PartType.NONPARTITION); + List partList; + Connection conn = null; + try { + String jdbcUrl = config.getString(Key.JDBC_URL); + String username = config.getString(Key.USERNAME); + String password = config.getString(Key.PASSWORD); + String dbname = ObReaderUtils.getDbNameFromJdbcUrl(jdbcUrl); + String allTable = "__all_table"; + + conn = DBUtil.getConnection(DataBaseType.OceanBase, jdbcUrl, username, password); + String obVersion = getResultsFromSql(conn, "select version()").get(0); + + LOG.info("obVersion: " + obVersion); + + if (ObReaderUtils.compareObVersion("2.2.76", obVersion) < 0) { + allTable = "__all_table_v2"; + } + + String queryPart = String.format( + "select p.part_name " + + "from oceanbase.__all_part p, oceanbase.%s t, oceanbase.__all_database d " + + "where p.table_id = t.table_id " + + "and d.database_id = t.database_id " + + "and d.database_name = '%s' " + + "and t.table_name = '%s'", allTable, dbname, table); + String querySubPart = String.format( + "select p.sub_part_name " + + "from oceanbase.__all_sub_part p, oceanbase.%s t, oceanbase.__all_database d " + + "where p.table_id = t.table_id " + + "and d.database_id = t.database_id " + + "and d.database_name = '%s' " + + "and t.table_name = '%s'", allTable, dbname, table); + if (config.getString(ObReaderKey.OB_COMPATIBILITY_MODE).equals("ORACLE")) { + queryPart = String.format( + "select partition_name from all_tab_partitions where TABLE_OWNER = '%s' and table_name = '%s'", + dbname.toUpperCase(), table.toUpperCase()); + querySubPart = String.format( + "select subpartition_name from all_tab_subpartitions where TABLE_OWNER = '%s' and table_name = '%s'", + dbname.toUpperCase(), table.toUpperCase()); + } + + PartType partType = PartType.SUBPARTITION; + + // try subpartition first + partList = getResultsFromSql(conn, querySubPart); + + // if table is not sub-partitioned, the try partition + if (partList.isEmpty()) { + partList = getResultsFromSql(conn, queryPart); + partType = PartType.PARTITION; + } + + if (!partList.isEmpty()) { + partInfo = new PartInfo(partType); + partInfo.addPart(partList); + } + } catch (Exception ex) { + LOG.error("error when get partition list: " + ex.getMessage()); + } finally { + DBUtil.closeDBResources(null, conn); + } + + return partInfo; + } + + private static List getResultsFromSql(Connection conn, String sql) { + List list = new ArrayList(); + Statement stmt = null; + ResultSet rs = null; + + LOG.info("executing sql: " + sql); + + try { + stmt = conn.createStatement(); + rs = stmt.executeQuery(sql); + while (rs.next()) { + list.add(rs.getString(1)); + } + } catch (Exception e) { + LOG.error("error when executing sql: " + e.getMessage()); + } finally { + DBUtil.closeDBResources(rs, stmt, null); + } + + return list; + } +} diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java index 17655a52..d482232a 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java @@ -19,15 +19,6 @@ public class TaskContext { private boolean weakRead = true; private String userSavePoint; private String compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_MYSQL; - - public String getPartitionName() { - return partitionName; - } - - public void setPartitionName(String partitionName) { - this.partitionName = partitionName; - } - private String partitionName; // 断点续读的保存点 @@ -174,4 +165,12 @@ public class TaskContext { public void setCompatibleMode(String compatibleMode) { this.compatibleMode = compatibleMode; } + + public String getPartitionName() { + return partitionName; + } + + public void setPartitionName(String partitionName) { + this.partitionName = partitionName; + } } diff --git a/oceanbasev10reader/src/test/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtilsTest.java b/oceanbasev10reader/src/test/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtilsTest.java new file mode 100644 index 00000000..bc387767 --- /dev/null +++ b/oceanbasev10reader/src/test/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtilsTest.java @@ -0,0 +1,22 @@ +package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; + +import org.junit.Test; + +public class ObReaderUtilsTest { + + @Test + public void getDbTest() { + assert ObReaderUtils.getDbNameFromJdbcUrl("jdbc:mysql://127.0.0.1:3306/testdb").equalsIgnoreCase("testdb"); + assert ObReaderUtils.getDbNameFromJdbcUrl("jdbc:oceanbase://127.0.0.1:2883/testdb").equalsIgnoreCase("testdb"); + assert ObReaderUtils.getDbNameFromJdbcUrl("||_dsc_ob10_dsc_||obcluster:mysql||_dsc_ob10_dsc_||jdbc:mysql://127.0.0.1:3306/testdb").equalsIgnoreCase("testdb"); + assert ObReaderUtils.getDbNameFromJdbcUrl("||_dsc_ob10_dsc_||obcluster:oracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:3306/testdb").equalsIgnoreCase("testdb"); + } + + @Test + public void compareObVersionTest() { + assert ObReaderUtils.compareObVersion("2.2.70", "3.2.2") == -1; + assert ObReaderUtils.compareObVersion("2.2.70", "2.2.50") == 1; + assert ObReaderUtils.compareObVersion("2.2.70", "3.1.2") == -1; + assert ObReaderUtils.compareObVersion("3.1.2", "3.1.2") == 0; + } +} diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java index d9846b39..7e09cce5 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java @@ -93,6 +93,7 @@ public class SingleTableSplitUtil { allQuerySql.add(tempQuerySql); tempConfig.set(Key.QUERY_SQL, tempQuerySql); + tempConfig.set(Key.WHERE, (hasWhere ? ("(" + where + ") and") : "") + range); pluginParams.add(tempConfig); } } else { @@ -103,6 +104,7 @@ public class SingleTableSplitUtil { + String.format(" %s IS NOT NULL", splitPkName); allQuerySql.add(tempQuerySql); tempConfig.set(Key.QUERY_SQL, tempQuerySql); + tempConfig.set(Key.WHERE, (hasWhere ? "(" + where + ") and" : "") + String.format(" %s IS NOT NULL", splitPkName)); pluginParams.add(tempConfig); } @@ -118,6 +120,7 @@ public class SingleTableSplitUtil { StringUtils.join(allQuerySql, "\n")); tempConfig.set(Key.QUERY_SQL, tempQuerySql); + tempConfig.set(Key.WHERE, (hasWhere ? "(" + where + ") and" : "") + String.format(" %s IS NULL", splitPkName)); pluginParams.add(tempConfig); return pluginParams; @@ -254,6 +257,7 @@ public class SingleTableSplitUtil { switch (SingleTableSplitUtil.DATABASE_TYPE) { case Oracle: + case OceanBase: isValidLongType |= type == Types.NUMERIC; break; default: