mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-03 03:40:24 +08:00
修改了上一个版本提到的问题
This commit is contained in:
parent
55351fcbaa
commit
dcb541f048
@ -26,7 +26,7 @@ public class ReaderJob extends CommonRdbmsReader.Job {
|
|||||||
public void init(Configuration originalConfig) {
|
public void init(Configuration originalConfig) {
|
||||||
//将config中的column和table中的关键字进行转义
|
//将config中的column和table中的关键字进行转义
|
||||||
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
|
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
|
||||||
ObReaderUtils.transferDatabaseKeywords(columns);
|
ObReaderUtils.escapeDatabaseKeywords(columns);
|
||||||
originalConfig.set(Key.COLUMN, columns);
|
originalConfig.set(Key.COLUMN, columns);
|
||||||
|
|
||||||
List<JSONObject> conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, JSONObject.class);
|
List<JSONObject> conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, JSONObject.class);
|
||||||
@ -34,7 +34,7 @@ public class ReaderJob extends CommonRdbmsReader.Job {
|
|||||||
JSONObject conn = conns.get(i);
|
JSONObject conn = conns.get(i);
|
||||||
Configuration connConfig = Configuration.from(conn.toString());
|
Configuration connConfig = Configuration.from(conn.toString());
|
||||||
List<String> tables = connConfig.getList(Key.TABLE, String.class);
|
List<String> tables = connConfig.getList(Key.TABLE, String.class);
|
||||||
ObReaderUtils.transferDatabaseKeywords(tables);
|
ObReaderUtils.escapeDatabaseKeywords(tables);
|
||||||
originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, Key.TABLE), tables);
|
originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, Key.TABLE), tables);
|
||||||
}
|
}
|
||||||
super.init(originalConfig);
|
super.init(originalConfig);
|
||||||
|
@ -66,10 +66,12 @@ public class ReaderTask extends CommonRdbmsReader.Task {
|
|||||||
|
|
||||||
if (ObReaderUtils.databaseType == DataBaseType.OceanBase) {
|
if (ObReaderUtils.databaseType == DataBaseType.OceanBase) {
|
||||||
jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时
|
jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时
|
||||||
compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE;
|
|
||||||
} else {
|
} else {
|
||||||
jdbcUrl = jdbcUrl + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时
|
jdbcUrl = jdbcUrl + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时
|
||||||
}
|
}
|
||||||
|
if(ObReaderUtils.compatibleMode==ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE){
|
||||||
|
compatibleMode=ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE;
|
||||||
|
}
|
||||||
LOG.info("this is ob1_0 jdbc url. user=" + username + " :url=" + jdbcUrl);
|
LOG.info("this is ob1_0 jdbc url. user=" + username + " :url=" + jdbcUrl);
|
||||||
mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "");
|
mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "");
|
||||||
retryLimit = readerSliceConfig.getInt(Config.RETRY_LIMIT, Config.DEFAULT_RETRY_LIMIT);
|
retryLimit = readerSliceConfig.getInt(Config.RETRY_LIMIT, Config.DEFAULT_RETRY_LIMIT);
|
||||||
|
@ -40,7 +40,6 @@ public class ObReaderUtils {
|
|||||||
private static final String ORACLE_KEYWORDS = "ACCESS,ADD,ALL,ALTER,AND,ANY,ARRAYLEN,AS,ASC,AUDIT,BETWEEN,BY,CHAR,CHECK,CLUSTER,COLUMN,COMMENT,COMPRESS,CONNECT,CREATE,CURRENT,DATE,DECIMAL,DEFAULT,DELETE,DESC,DISTINCT,DROP,ELSE,EXCLUSIVE,EXISTS,FILE,FLOAT,FOR,FROM,GRANT,GROUP,HAVING,IDENTIFIED,IMMEDIATE,IN,INCREMENT,INDEX,INITIAL,INSERT,INTEGER,INTERSECT,INTO,IS,LEVEL,LIKE,LOCK,LONG,MAXEXTENTS,MINUS,MODE,MODIFY,NOAUDIT,NOCOMPRESS,NOT,NOTFOUND,NOWAIT,NULL,NUMBER,OF,OFFLINE,ON,ONLINE,OPTION,OR,ORDER,PCTFREE,PRIOR,PRIVILEGES,PUBLIC,RAW,RENAME,RESOURCE,REVOKE,ROW,ROWID,ROWLABEL,ROWNUM,ROWS,SELECT,SESSION,SET,SHARE,SIZE,SMALLINT,SQLBUF,START,SUCCESSFUL,SYNONYM,TABLE,THEN,TO,TRIGGER,UID,UNION,UNIQUE,UPDATE,USER,VALIDATE,VALUES,VARCHAR,VARCHAR2,VIEW,WHENEVER,WHERE,WITH";
|
private static final String ORACLE_KEYWORDS = "ACCESS,ADD,ALL,ALTER,AND,ANY,ARRAYLEN,AS,ASC,AUDIT,BETWEEN,BY,CHAR,CHECK,CLUSTER,COLUMN,COMMENT,COMPRESS,CONNECT,CREATE,CURRENT,DATE,DECIMAL,DEFAULT,DELETE,DESC,DISTINCT,DROP,ELSE,EXCLUSIVE,EXISTS,FILE,FLOAT,FOR,FROM,GRANT,GROUP,HAVING,IDENTIFIED,IMMEDIATE,IN,INCREMENT,INDEX,INITIAL,INSERT,INTEGER,INTERSECT,INTO,IS,LEVEL,LIKE,LOCK,LONG,MAXEXTENTS,MINUS,MODE,MODIFY,NOAUDIT,NOCOMPRESS,NOT,NOTFOUND,NOWAIT,NULL,NUMBER,OF,OFFLINE,ON,ONLINE,OPTION,OR,ORDER,PCTFREE,PRIOR,PRIVILEGES,PUBLIC,RAW,RENAME,RESOURCE,REVOKE,ROW,ROWID,ROWLABEL,ROWNUM,ROWS,SELECT,SESSION,SET,SHARE,SIZE,SMALLINT,SQLBUF,START,SUCCESSFUL,SYNONYM,TABLE,THEN,TO,TRIGGER,UID,UNION,UNIQUE,UPDATE,USER,VALIDATE,VALUES,VARCHAR,VARCHAR2,VIEW,WHENEVER,WHERE,WITH";
|
||||||
|
|
||||||
private static Set<String> databaseKeywords;
|
private static Set<String> databaseKeywords;
|
||||||
private static String currentKeywordsTpye;
|
|
||||||
final static public String OB_COMPATIBLE_MODE = "obCompatibilityMode";
|
final static public String OB_COMPATIBLE_MODE = "obCompatibilityMode";
|
||||||
final static public String OB_COMPATIBLE_MODE_ORACLE = "ORACLE";
|
final static public String OB_COMPATIBLE_MODE_ORACLE = "ORACLE";
|
||||||
final static public String OB_COMPATIBLE_MODE_MYSQL = "MYSQL";
|
final static public String OB_COMPATIBLE_MODE_MYSQL = "MYSQL";
|
||||||
@ -54,23 +53,20 @@ public class ObReaderUtils {
|
|||||||
return new HashSet(Arrays.asList(keywords.split(",")));
|
return new HashSet(Arrays.asList(keywords.split(",")));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void transferDatabaseKeywords(List<String> keywords) {
|
public static void escapeDatabaseKeywords(List<String> keywords) {
|
||||||
//判断是否需要更改关键字集合
|
if (databaseKeywords == null) {
|
||||||
if (databaseKeywords == null || currentKeywordsTpye != compatibleMode) {
|
|
||||||
if (isOracleMode(compatibleMode.toString())) {
|
if (isOracleMode(compatibleMode.toString())) {
|
||||||
databaseKeywords = keywordsFromString2HashSet(ORACLE_KEYWORDS);
|
databaseKeywords = keywordsFromString2HashSet(ORACLE_KEYWORDS);
|
||||||
} else {
|
} else {
|
||||||
databaseKeywords = keywordsFromString2HashSet(MYSQL_KEYWORDS);
|
databaseKeywords = keywordsFromString2HashSet(MYSQL_KEYWORDS);
|
||||||
}
|
}
|
||||||
currentKeywordsTpye = compatibleMode;
|
|
||||||
}
|
}
|
||||||
char escapeChar = isOracleMode(compatibleMode.toString()) ? '"' : '`';
|
char escapeChar = isOracleMode(compatibleMode.toString()) ? '"' : '`';
|
||||||
for (int i = 0; i < keywords.size(); i++) {
|
for (int i = 0; i < keywords.size(); i++) {
|
||||||
String keyword = keywords.get(i).toUpperCase();
|
String keyword = keywords.get(i);
|
||||||
if (databaseKeywords.contains(keyword)) {
|
if (databaseKeywords.contains(keyword.toUpperCase())) {
|
||||||
keyword = escapeChar + keyword + escapeChar;
|
keyword = escapeChar + keyword + escapeChar;
|
||||||
}
|
}
|
||||||
keyword = keyword.toLowerCase();
|
|
||||||
keywords.set(i, keyword);
|
keywords.set(i, keyword);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -160,7 +156,7 @@ public class ObReaderUtils {
|
|||||||
realIndex.add(columnName);
|
realIndex.add(columnName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
transferDatabaseKeywords(realIndex);
|
escapeDatabaseKeywords(realIndex);
|
||||||
|
|
||||||
String[] pks = new String[realIndex.size()];
|
String[] pks = new String[realIndex.size()];
|
||||||
realIndex.toArray(pks);
|
realIndex.toArray(pks);
|
||||||
|
@ -3,7 +3,6 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer;
|
|||||||
import java.sql.*;
|
import java.sql.*;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils;
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils;
|
||||||
import com.alibaba.fastjson.JSONObject;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
@ -62,17 +61,17 @@ public class OceanBaseV10Writer extends Writer {
|
|||||||
this.originalConfig = super.getPluginJobConf();
|
this.originalConfig = super.getPluginJobConf();
|
||||||
checkCompatibleMode(originalConfig);
|
checkCompatibleMode(originalConfig);
|
||||||
//将config中的column和table中的关键字进行转义
|
//将config中的column和table中的关键字进行转义
|
||||||
List<String> columns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Key.COLUMN, String.class);
|
List<String> columns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.writer.Key.COLUMN, String.class);
|
||||||
ObWriterUtils.transferDatabaseKeywords(columns);
|
ObWriterUtils.escapeDatabaseKeywords(columns);
|
||||||
originalConfig.set(com.alibaba.datax.plugin.rdbms.reader.Key.COLUMN, columns);
|
originalConfig.set(com.alibaba.datax.plugin.rdbms.writer.Key.COLUMN, columns);
|
||||||
|
|
||||||
List<JSONObject> conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, JSONObject.class);
|
List<JSONObject> conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.writer.Constant.CONN_MARK, JSONObject.class);
|
||||||
for (int i = 0; i < conns.size(); i++) {
|
for (int i = 0; i < conns.size(); i++) {
|
||||||
JSONObject conn = conns.get(i);
|
JSONObject conn = conns.get(i);
|
||||||
Configuration connConfig = Configuration.from(conn.toString());
|
Configuration connConfig = Configuration.from(conn.toString());
|
||||||
List<String> tables = connConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Key.TABLE, String.class);
|
List<String> tables = connConfig.getList(com.alibaba.datax.plugin.rdbms.writer.Key.TABLE, String.class);
|
||||||
ObWriterUtils.transferDatabaseKeywords(tables);
|
ObWriterUtils.escapeDatabaseKeywords(tables);
|
||||||
originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, com.alibaba.datax.plugin.rdbms.reader.Key.TABLE), tables);
|
originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.writer.Constant.CONN_MARK, i, com.alibaba.datax.plugin.rdbms.writer.Key.TABLE), tables);
|
||||||
}
|
}
|
||||||
this.commonJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
|
this.commonJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
|
||||||
this.commonJob.init(this.originalConfig);
|
this.commonJob.init(this.originalConfig);
|
||||||
|
@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.Column;
|
import com.alibaba.datax.common.element.Column;
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder;
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder;
|
||||||
@ -105,7 +106,11 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
|
|||||||
checkConnHolder.initConnection();
|
checkConnHolder.initConnection();
|
||||||
if (isOracleCompatibleMode) {
|
if (isOracleCompatibleMode) {
|
||||||
connectInfo.databaseName = connectInfo.databaseName.toUpperCase();
|
connectInfo.databaseName = connectInfo.databaseName.toUpperCase();
|
||||||
table = table.toUpperCase();
|
//在转义的情况下不翻译
|
||||||
|
if(!Pattern.matches("\"\\w*\"",table)){
|
||||||
|
table = table.toUpperCase();
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s",
|
LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s",
|
||||||
connectInfo.databaseName, table));
|
connectInfo.databaseName, table));
|
||||||
}
|
}
|
||||||
|
@ -21,26 +21,24 @@ public class ObWriterUtils {
|
|||||||
private static String CHECK_MEMSTORE = "select 1 from %s.gv$memstore t where t.total>t.mem_limit * ?";
|
private static String CHECK_MEMSTORE = "select 1 from %s.gv$memstore t where t.total>t.mem_limit * ?";
|
||||||
private static Set<String> databaseKeywords;
|
private static Set<String> databaseKeywords;
|
||||||
private static String compatibleMode = null;
|
private static String compatibleMode = null;
|
||||||
private static String currentKeywordsTpye =null;
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(Task.class);
|
protected static final Logger LOG = LoggerFactory.getLogger(Task.class);
|
||||||
private static Set<String> keywordsFromString2HashSet(final String keywords) {
|
private static Set<String> keywordsFromString2HashSet(final String keywords) {
|
||||||
return new HashSet(Arrays.asList(keywords.split(",")));
|
return new HashSet(Arrays.asList(keywords.split(",")));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void transferDatabaseKeywords(List<String> keywords) {
|
public static void escapeDatabaseKeywords(List<String> keywords) {
|
||||||
//判断是否需要更改关键字集合
|
//判断是否需要更改关键字集合
|
||||||
if (databaseKeywords == null || currentKeywordsTpye != compatibleMode) {
|
if (databaseKeywords == null) {
|
||||||
if (isOracleMode()) {
|
if (isOracleMode()) {
|
||||||
databaseKeywords = keywordsFromString2HashSet(ORACLE_KEYWORDS);
|
databaseKeywords = keywordsFromString2HashSet(ORACLE_KEYWORDS);
|
||||||
} else {
|
} else {
|
||||||
databaseKeywords = keywordsFromString2HashSet(MYSQL_KEYWORDS);
|
databaseKeywords = keywordsFromString2HashSet(MYSQL_KEYWORDS);
|
||||||
}
|
}
|
||||||
currentKeywordsTpye = compatibleMode;
|
|
||||||
}
|
}
|
||||||
char escapeChar = isOracleMode() ? '"' : '`';
|
char escapeChar = isOracleMode() ? '"' : '`';
|
||||||
for (int i = 0; i < keywords.size(); i++) {
|
for (int i = 0; i < keywords.size(); i++) {
|
||||||
String keyword = keywords.get(i).toUpperCase();
|
String keyword = keywords.get(i);
|
||||||
if (databaseKeywords.contains(keyword)) {
|
if (databaseKeywords.contains(keyword.toUpperCase())) {
|
||||||
keyword = escapeChar + keyword + escapeChar;
|
keyword = escapeChar + keyword + escapeChar;
|
||||||
}
|
}
|
||||||
keyword = keyword.toLowerCase();
|
keyword = keyword.toLowerCase();
|
||||||
|
Loading…
Reference in New Issue
Block a user