修复了由于SQL中的保留字作为表名或者字段名而引起的sql错误

This commit is contained in:
sanChouIsACat 2021-12-22 19:54:38 +08:00
parent 4e916b0f4b
commit 738c11e0e3
2 changed files with 44 additions and 45 deletions

View File

@ -37,20 +37,17 @@ public class OceanBaseReader extends Reader {
if (userConfigedFetchSize != null) { if (userConfigedFetchSize != null) {
LOG.warn("The [fetchSize] is not recognized, please use readBatchSize instead."); LOG.warn("The [fetchSize] is not recognized, please use readBatchSize instead.");
} }
this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE); this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);
setDatabaseType(originalConfig); setDatabaseType(originalConfig);
this.readerJob = new ReaderJob(); this.readerJob = new ReaderJob();
this.readerJob.init(this.originalConfig, DATABASE_TYPE); this.readerJob.init(this.originalConfig, DATABASE_TYPE);
} }
@Override @Override
public void prepare() { public void prepare() {
//ObReaderUtils.DATABASE_TYPE获取当前数据库的语法模式 //ObReaderUtils.DATABASE_TYPE获取当前数据库的语法模式
} }
@Override @Override
public void preCheck() { public void preCheck() {
init(); init();
@ -97,10 +94,10 @@ public class OceanBaseReader extends Reader {
if (ObReaderUtils.isOracleMode(compatibleMode)) { if (ObReaderUtils.isOracleMode(compatibleMode)) {
ObReaderUtils.DATABASE_TYPE = DataBaseType.OceanBase; ObReaderUtils.DATABASE_TYPE = DataBaseType.OceanBase;
} }
} catch (Exception e) { } catch (Exception e) {
LOG.warn("error in get compatible mode, using mysql as default: " + e.getMessage()); LOG.warn("error in get compatible mode, using mysql as default: " + e.getMessage());
} } finally {
finally {
DATABASE_TYPE = ObReaderUtils.DATABASE_TYPE; DATABASE_TYPE = ObReaderUtils.DATABASE_TYPE;
} }
} }

View File

@ -1,24 +1,31 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; package com.alibaba.datax.plugin.reader.oceanbasev10reader.util;
//java api //java api
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
//dataX api //dataX api
import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
//该类用于转义数据库中的关键字 /**
* 该类用于转义数据库中的关键字
*
* @author:qianzhang
*/
public class DatabaseKeywordTransformer { public class DatabaseKeywordTransformer {
private static DataBaseType databaseType; private static DataBaseType databaseType;
static HashSet<String> databaseKeywords; static HashSet<String> databaseKeywords;
private static HashSet<String> keywordsFromString2HashSet(final String Keywords){
String[] keywordArray =Keywords.split(","); private static HashSet<String> keywordsFromString2HashSet(final String keywords) {
String[] keywordArray = keywords.split(",");
HashSet<String> res = new HashSet<String>(); HashSet<String> res = new HashSet<String>();
for (String keyword : keywordArray) { for (String keyword : keywordArray) {
res.add(keyword); res.add(keyword);
} }
return res; return res;
} }
public static void setDatabaseType(final DataBaseType databaseType) throws Exception { public static void setDatabaseType(final DataBaseType databaseType) throws Exception {
if (databaseType == DatabaseKeywordTransformer.databaseType) { if (databaseType == DatabaseKeywordTransformer.databaseType) {
return; return;
@ -26,14 +33,11 @@ public class DatabaseKeywordTransformer {
DatabaseKeywordTransformer.databaseType = databaseType; DatabaseKeywordTransformer.databaseType = databaseType;
if (databaseType == DataBaseType.MySql) { if (databaseType == DataBaseType.MySql) {
databaseKeywords = keywordsFromString2HashSet(DatabaseKeywords.MYSQL_KEYWORDS); databaseKeywords = keywordsFromString2HashSet(DatabaseKeywords.MYSQL_KEYWORDS);
} } else if (databaseType == DataBaseType.Oracle || databaseType == DataBaseType.OceanBase) {
else if(databaseType==DataBaseType.Oracle || databaseType==DataBaseType.OceanBase){
databaseKeywords = keywordsFromString2HashSet(DatabaseKeywords.ORACLE_KEYWORDS); databaseKeywords = keywordsFromString2HashSet(DatabaseKeywords.ORACLE_KEYWORDS);
} } else if (databaseType == DataBaseType.SQLServer) {
else if(databaseType==DataBaseType.SQLServer){
databaseKeywords = keywordsFromString2HashSet(DatabaseKeywords.SQLSERVER_KEYWORDS); databaseKeywords = keywordsFromString2HashSet(DatabaseKeywords.SQLSERVER_KEYWORDS);
} } else {
else{
throw new Exception("sorry,unknown database tpye..."); throw new Exception("sorry,unknown database tpye...");
} }
} }
@ -44,11 +48,9 @@ public class DatabaseKeywordTransformer {
if (databaseKeywords.contains(keyword)) { if (databaseKeywords.contains(keyword)) {
if (databaseType == DataBaseType.MySql) { if (databaseType == DataBaseType.MySql) {
keyword = '`' + keyword + '`'; keyword = '`' + keyword + '`';
} } else if (databaseType == DataBaseType.Oracle || databaseType == DataBaseType.OceanBase) {
else if(databaseType==DataBaseType.Oracle || databaseType==DataBaseType.OceanBase){
keyword = '"' + keyword + '"'; keyword = '"' + keyword + '"';
} } else if (databaseType == DataBaseType.SQLServer) {
else if(databaseType==DataBaseType.SQLServer){
keyword = '[' + keyword + ']'; keyword = '[' + keyword + ']';
} }
} }