优化了代码的一部分

This commit is contained in:
sanChouIsACat 2021-12-23 18:10:08 +08:00
parent b017102e0e
commit 19a44e8d9d
5 changed files with 94 additions and 86 deletions

View File

@ -1,12 +1,8 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader;
import java.sql.Array;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.DatabaseKeywordTransformer;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -26,7 +22,6 @@ public class OceanBaseReader extends Reader {
public static class Job extends Reader.Job {
private Configuration originalConfig = null;
private ReaderJob readerJob;
private DataBaseType DATABASE_TYPE;
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
@Override
@ -40,7 +35,7 @@ public class OceanBaseReader extends Reader {
this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);
setDatabaseType(originalConfig);
this.readerJob = new ReaderJob();
this.readerJob.init(this.originalConfig, DATABASE_TYPE);
this.readerJob.init(this.originalConfig);
}
@Override
@ -51,7 +46,7 @@ public class OceanBaseReader extends Reader {
@Override
public void preCheck() {
init();
this.readerJob.preCheck(this.originalConfig, ObReaderUtils.DATABASE_TYPE);
this.readerJob.preCheck(this.originalConfig, ObReaderUtils.compatibleMode);
}
@ -92,13 +87,12 @@ public class OceanBaseReader extends Reader {
Connection conn = DBUtil.getConnection(DataBaseType.OceanBase, obJdbcUrl, username, password);
String compatibleMode = ObReaderUtils.getCompatibleMode(conn);
if (ObReaderUtils.isOracleMode(compatibleMode)) {
ObReaderUtils.DATABASE_TYPE = DataBaseType.OceanBase;
ObReaderUtils.compatibleMode = DataBaseType.Oracle;
ObReaderUtils.databaseType = DataBaseType.OceanBase;
}
} catch (Exception e) {
LOG.warn("error in get compatible mode, using mysql as default: " + e.getMessage());
} finally {
DATABASE_TYPE = ObReaderUtils.DATABASE_TYPE;
}
}
}

View File

@ -9,7 +9,6 @@ import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.DatabaseKeywordTransformer;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
@ -20,19 +19,14 @@ public class ReaderJob extends CommonRdbmsReader.Job {
private Logger LOG = LoggerFactory.getLogger(OceanBaseReader.Task.class);
public ReaderJob() {
super(ObReaderUtils.DATABASE_TYPE);
super(ObReaderUtils.databaseType);
}
public void init(Configuration originalConfig, DataBaseType databaseType) {
@Override
public void init(Configuration originalConfig) {
//将config中的column和table中的关键字进行转义
try {
DatabaseKeywordTransformer.setDatabaseType(databaseType);
} catch (Exception e) {
LOG.warn("database type is " + databaseType + e.getMessage());
}
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
DatabaseKeywordTransformer.transferDatabaseKeywords(columns);
ObReaderUtils.transferDatabaseKeywords(columns);
originalConfig.set(Key.COLUMN, columns);
List<JSONObject> conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, JSONObject.class);
@ -40,7 +34,7 @@ public class ReaderJob extends CommonRdbmsReader.Job {
JSONObject conn = conns.get(i);
Configuration connConfig = Configuration.from(conn.toString());
List<String> tables = connConfig.getList(Key.TABLE, String.class);
DatabaseKeywordTransformer.transferDatabaseKeywords(tables);
ObReaderUtils.transferDatabaseKeywords(tables);
originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, Key.TABLE), tables);
}
super.init(originalConfig);

View File

@ -41,11 +41,12 @@ public class ReaderTask extends CommonRdbmsReader.Task {
private boolean reuseConn = false;
public ReaderTask(int taskGroupId, int taskId) {
super(ObReaderUtils.DATABASE_TYPE, taskGroupId, taskId);
super(ObReaderUtils.compatibleMode, taskGroupId, taskId);
this.taskGroupId = taskGroupId;
this.taskId = taskId;
}
@Override
public void init(Configuration readerSliceConfig) {
/* for database connection */
username = readerSliceConfig.getString(Key.USERNAME);
@ -54,7 +55,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
queryTimeoutSeconds = readerSliceConfig.getInt(Config.QUERY_TIMEOUT_SECOND,
Config.DEFAULT_QUERY_TIMEOUT_SECOND);
// ob10的处理
if(jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) {
if (jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) {
String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN);
if (ss.length == 3) {
LOG.info("this is ob1_0 jdbc url.");
@ -63,7 +64,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
}
}
if (ObReaderUtils.DATABASE_TYPE == DataBaseType.OceanBase) {
if (ObReaderUtils.databaseType == DataBaseType.OceanBase) {
jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时
compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE;
} else {
@ -72,7 +73,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
LOG.info("this is ob1_0 jdbc url. user=" + username + " :url=" + jdbcUrl);
mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "");
retryLimit = readerSliceConfig.getInt(Config.RETRY_LIMIT, Config.DEFAULT_RETRY_LIMIT);
LOG.info("retryLimit: "+ retryLimit);
LOG.info("retryLimit: " + retryLimit);
}
private void buildSavePoint(TaskContext context) {
@ -83,7 +84,6 @@ public class ReaderTask extends CommonRdbmsReader.Task {
}
/**
*
* 如果isTableMode && table有PK
* <p>
* 则支持断点续读 (若pk不在原始的columns中,则追加到尾部,但不传给下游)
@ -92,7 +92,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
*/
@Override
public void startRead(Configuration readerSliceConfig, RecordSender recordSender,
TaskPluginCollector taskPluginCollector, int fetchSize) {
TaskPluginCollector taskPluginCollector, int fetchSize) {
String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
String table = readerSliceConfig.getString(Key.TABLE);
PerfTrace.getInstance().addTaskDetails(taskId, table + "," + jdbcUrl);
@ -131,14 +131,14 @@ public class ReaderTask extends CommonRdbmsReader.Task {
}
private void startRead0(boolean isTableMode, TaskContext context, RecordSender recordSender,
TaskPluginCollector taskPluginCollector) {
TaskPluginCollector taskPluginCollector) {
// 不是table模式 直接使用原来的做法
if (!isTableMode) {
doRead(recordSender, taskPluginCollector, context);
return;
}
// check primary key index
Connection conn = DBUtil.getConnection(ObReaderUtils.DATABASE_TYPE, jdbcUrl, username, password);
Connection conn = DBUtil.getConnection(ObReaderUtils.databaseType, jdbcUrl, username, password);
ObReaderUtils.initConn4Reader(conn, queryTimeoutSeconds);
context.setConn(conn);
try {
@ -184,11 +184,11 @@ public class ReaderTask extends CommonRdbmsReader.Task {
}
} catch (Throwable e) {
if (retryLimit == ++retryCount) {
throw RdbmsException.asQueryException(ObReaderUtils.DATABASE_TYPE, new Exception(e),
throw RdbmsException.asQueryException(ObReaderUtils.compatibleMode, new Exception(e),
context.getQuerySql(), context.getTable(), username);
}
LOG.error("read fail, retry count " + retryCount + ", sleep 60 second, save point:" +
context.getSavePoint() + ", error: "+ e.getMessage());
context.getSavePoint() + ", error: " + e.getMessage());
ObReaderUtils.sleep(60000); // sleep 10s
}
// 假如原来的查询有查出数据,则改成增量查询
@ -227,7 +227,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
LOG.info("connection is alive, will reuse this connection.");
} else {
LOG.info("Create new connection for reader.");
conn = DBUtil.getConnection(ObReaderUtils.DATABASE_TYPE, jdbcUrl, username, password);
conn = DBUtil.getConnection(ObReaderUtils.databaseType, jdbcUrl, username, password);
ObReaderUtils.initConn4Reader(conn, queryTimeoutSeconds);
context.setConn(conn);
}
@ -287,7 +287,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
ObReaderUtils.close(null, null, context.getConn());
context.setConn(null);
LOG.error("reader data fail", e);
throw RdbmsException.asQueryException(ObReaderUtils.DATABASE_TYPE, e, context.getQuerySql(),
throw RdbmsException.asQueryException(ObReaderUtils.compatibleMode, e, context.getQuerySql(),
context.getTable(), username);
} finally {
perfRecord.end();

View File

@ -162,6 +162,7 @@ public class TaskContext {
public String getUserSavePoint() {
return userSavePoint;
}
public void setUserSavePoint(String userSavePoint) {
this.userSavePoint = userSavePoint;
}