mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-03 03:40:24 +08:00
同时修改了reader和writer,同时将DatabaseKeywordTransformer类中的功能合并到writer(reader)utils中
This commit is contained in:
parent
19a44e8d9d
commit
55351fcbaa
@ -46,7 +46,7 @@ public class OceanBaseReader extends Reader {
|
|||||||
@Override
|
@Override
|
||||||
public void preCheck() {
|
public void preCheck() {
|
||||||
init();
|
init();
|
||||||
this.readerJob.preCheck(this.originalConfig, ObReaderUtils.compatibleMode);
|
this.readerJob.preCheck(this.originalConfig, ObReaderUtils.databaseType);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -87,8 +87,7 @@ public class OceanBaseReader extends Reader {
|
|||||||
Connection conn = DBUtil.getConnection(DataBaseType.OceanBase, obJdbcUrl, username, password);
|
Connection conn = DBUtil.getConnection(DataBaseType.OceanBase, obJdbcUrl, username, password);
|
||||||
String compatibleMode = ObReaderUtils.getCompatibleMode(conn);
|
String compatibleMode = ObReaderUtils.getCompatibleMode(conn);
|
||||||
if (ObReaderUtils.isOracleMode(compatibleMode)) {
|
if (ObReaderUtils.isOracleMode(compatibleMode)) {
|
||||||
ObReaderUtils.compatibleMode = DataBaseType.Oracle;
|
ObReaderUtils.compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE;
|
||||||
ObReaderUtils.databaseType = DataBaseType.OceanBase;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -41,7 +41,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
|
|||||||
private boolean reuseConn = false;
|
private boolean reuseConn = false;
|
||||||
|
|
||||||
public ReaderTask(int taskGroupId, int taskId) {
|
public ReaderTask(int taskGroupId, int taskId) {
|
||||||
super(ObReaderUtils.compatibleMode, taskGroupId, taskId);
|
super(ObReaderUtils.databaseType, taskGroupId, taskId);
|
||||||
this.taskGroupId = taskGroupId;
|
this.taskGroupId = taskGroupId;
|
||||||
this.taskId = taskId;
|
this.taskId = taskId;
|
||||||
}
|
}
|
||||||
@ -184,7 +184,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
|
|||||||
}
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
if (retryLimit == ++retryCount) {
|
if (retryLimit == ++retryCount) {
|
||||||
throw RdbmsException.asQueryException(ObReaderUtils.compatibleMode, new Exception(e),
|
throw RdbmsException.asQueryException(ObReaderUtils.databaseType, new Exception(e),
|
||||||
context.getQuerySql(), context.getTable(), username);
|
context.getQuerySql(), context.getTable(), username);
|
||||||
}
|
}
|
||||||
LOG.error("read fail, retry count " + retryCount + ", sleep 60 second, save point:" +
|
LOG.error("read fail, retry count " + retryCount + ", sleep 60 second, save point:" +
|
||||||
@ -287,7 +287,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
|
|||||||
ObReaderUtils.close(null, null, context.getConn());
|
ObReaderUtils.close(null, null, context.getConn());
|
||||||
context.setConn(null);
|
context.setConn(null);
|
||||||
LOG.error("reader data fail", e);
|
LOG.error("reader data fail", e);
|
||||||
throw RdbmsException.asQueryException(ObReaderUtils.compatibleMode, e, context.getQuerySql(),
|
throw RdbmsException.asQueryException(ObReaderUtils.databaseType, e, context.getQuerySql(),
|
||||||
context.getTable(), username);
|
context.getTable(), username);
|
||||||
} finally {
|
} finally {
|
||||||
perfRecord.end();
|
perfRecord.end();
|
||||||
|
File diff suppressed because one or more lines are too long
@ -40,14 +40,14 @@ public class ObReaderUtils {
|
|||||||
private static final String ORACLE_KEYWORDS = "ACCESS,ADD,ALL,ALTER,AND,ANY,ARRAYLEN,AS,ASC,AUDIT,BETWEEN,BY,CHAR,CHECK,CLUSTER,COLUMN,COMMENT,COMPRESS,CONNECT,CREATE,CURRENT,DATE,DECIMAL,DEFAULT,DELETE,DESC,DISTINCT,DROP,ELSE,EXCLUSIVE,EXISTS,FILE,FLOAT,FOR,FROM,GRANT,GROUP,HAVING,IDENTIFIED,IMMEDIATE,IN,INCREMENT,INDEX,INITIAL,INSERT,INTEGER,INTERSECT,INTO,IS,LEVEL,LIKE,LOCK,LONG,MAXEXTENTS,MINUS,MODE,MODIFY,NOAUDIT,NOCOMPRESS,NOT,NOTFOUND,NOWAIT,NULL,NUMBER,OF,OFFLINE,ON,ONLINE,OPTION,OR,ORDER,PCTFREE,PRIOR,PRIVILEGES,PUBLIC,RAW,RENAME,RESOURCE,REVOKE,ROW,ROWID,ROWLABEL,ROWNUM,ROWS,SELECT,SESSION,SET,SHARE,SIZE,SMALLINT,SQLBUF,START,SUCCESSFUL,SYNONYM,TABLE,THEN,TO,TRIGGER,UID,UNION,UNIQUE,UPDATE,USER,VALIDATE,VALUES,VARCHAR,VARCHAR2,VIEW,WHENEVER,WHERE,WITH";
|
private static final String ORACLE_KEYWORDS = "ACCESS,ADD,ALL,ALTER,AND,ANY,ARRAYLEN,AS,ASC,AUDIT,BETWEEN,BY,CHAR,CHECK,CLUSTER,COLUMN,COMMENT,COMPRESS,CONNECT,CREATE,CURRENT,DATE,DECIMAL,DEFAULT,DELETE,DESC,DISTINCT,DROP,ELSE,EXCLUSIVE,EXISTS,FILE,FLOAT,FOR,FROM,GRANT,GROUP,HAVING,IDENTIFIED,IMMEDIATE,IN,INCREMENT,INDEX,INITIAL,INSERT,INTEGER,INTERSECT,INTO,IS,LEVEL,LIKE,LOCK,LONG,MAXEXTENTS,MINUS,MODE,MODIFY,NOAUDIT,NOCOMPRESS,NOT,NOTFOUND,NOWAIT,NULL,NUMBER,OF,OFFLINE,ON,ONLINE,OPTION,OR,ORDER,PCTFREE,PRIOR,PRIVILEGES,PUBLIC,RAW,RENAME,RESOURCE,REVOKE,ROW,ROWID,ROWLABEL,ROWNUM,ROWS,SELECT,SESSION,SET,SHARE,SIZE,SMALLINT,SQLBUF,START,SUCCESSFUL,SYNONYM,TABLE,THEN,TO,TRIGGER,UID,UNION,UNIQUE,UPDATE,USER,VALIDATE,VALUES,VARCHAR,VARCHAR2,VIEW,WHENEVER,WHERE,WITH";
|
||||||
|
|
||||||
private static Set<String> databaseKeywords;
|
private static Set<String> databaseKeywords;
|
||||||
private static DataBaseType currentKeywordsTpye;
|
private static String currentKeywordsTpye;
|
||||||
final static public String OB_COMPATIBLE_MODE = "obCompatibilityMode";
|
final static public String OB_COMPATIBLE_MODE = "obCompatibilityMode";
|
||||||
final static public String OB_COMPATIBLE_MODE_ORACLE = "ORACLE";
|
final static public String OB_COMPATIBLE_MODE_ORACLE = "ORACLE";
|
||||||
final static public String OB_COMPATIBLE_MODE_MYSQL = "MYSQL";
|
final static public String OB_COMPATIBLE_MODE_MYSQL = "MYSQL";
|
||||||
|
|
||||||
public static DataBaseType compatibleMode = DataBaseType.MySql;
|
public static String compatibleMode = OB_COMPATIBLE_MODE_MYSQL;
|
||||||
|
|
||||||
public static DataBaseType databaseType = DataBaseType.MySql;
|
public static final DataBaseType databaseType = DataBaseType.OceanBase;
|
||||||
|
|
||||||
|
|
||||||
private static Set<String> keywordsFromString2HashSet(final String keywords) {
|
private static Set<String> keywordsFromString2HashSet(final String keywords) {
|
||||||
|
@ -6,6 +6,7 @@ import java.util.List;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils;
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils;
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -60,6 +61,19 @@ public class OceanBaseV10Writer extends Writer {
|
|||||||
public void init() {
|
public void init() {
|
||||||
this.originalConfig = super.getPluginJobConf();
|
this.originalConfig = super.getPluginJobConf();
|
||||||
checkCompatibleMode(originalConfig);
|
checkCompatibleMode(originalConfig);
|
||||||
|
//将config中的column和table中的关键字进行转义
|
||||||
|
List<String> columns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Key.COLUMN, String.class);
|
||||||
|
ObWriterUtils.transferDatabaseKeywords(columns);
|
||||||
|
originalConfig.set(com.alibaba.datax.plugin.rdbms.reader.Key.COLUMN, columns);
|
||||||
|
|
||||||
|
List<JSONObject> conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, JSONObject.class);
|
||||||
|
for (int i = 0; i < conns.size(); i++) {
|
||||||
|
JSONObject conn = conns.get(i);
|
||||||
|
Configuration connConfig = Configuration.from(conn.toString());
|
||||||
|
List<String> tables = connConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Key.TABLE, String.class);
|
||||||
|
ObWriterUtils.transferDatabaseKeywords(tables);
|
||||||
|
originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, com.alibaba.datax.plugin.rdbms.reader.Key.TABLE), tables);
|
||||||
|
}
|
||||||
this.commonJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
|
this.commonJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
|
||||||
this.commonJob.init(this.originalConfig);
|
this.commonJob.init(this.originalConfig);
|
||||||
}
|
}
|
||||||
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue
Block a user