mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 19:50:29 +08:00
Merge pull request #1255 from sanChouIsACat/master
修复了由于数据库保留字是表名或者字段名时引起的sql错误
This commit is contained in:
commit
f68c4134d6
@ -32,19 +32,21 @@ public class OceanBaseReader extends Reader {
|
|||||||
if (userConfigedFetchSize != null) {
|
if (userConfigedFetchSize != null) {
|
||||||
LOG.warn("The [fetchSize] is not recognized, please use readBatchSize instead.");
|
LOG.warn("The [fetchSize] is not recognized, please use readBatchSize instead.");
|
||||||
}
|
}
|
||||||
|
|
||||||
this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);
|
this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);
|
||||||
|
|
||||||
setDatabaseType(originalConfig);
|
setDatabaseType(originalConfig);
|
||||||
|
|
||||||
this.readerJob = new ReaderJob();
|
this.readerJob = new ReaderJob();
|
||||||
this.readerJob.init(this.originalConfig);
|
this.readerJob.init(this.originalConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void prepare() {
|
||||||
|
//ObReaderUtils.DATABASE_TYPE获取当前数据库的语法模式
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preCheck() {
|
public void preCheck() {
|
||||||
init();
|
init();
|
||||||
this.readerJob.preCheck(this.originalConfig, ObReaderUtils.DATABASE_TYPE);
|
this.readerJob.preCheck(this.originalConfig, ObReaderUtils.databaseType);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -70,7 +72,7 @@ public class OceanBaseReader extends Reader {
|
|||||||
Configuration connConf = Configuration.from(conns.get(0).toString());
|
Configuration connConf = Configuration.from(conns.get(0).toString());
|
||||||
List<String> jdbcUrls = connConf.getList(Key.JDBC_URL, String.class);
|
List<String> jdbcUrls = connConf.getList(Key.JDBC_URL, String.class);
|
||||||
String jdbcUrl = jdbcUrls.get(0);
|
String jdbcUrl = jdbcUrls.get(0);
|
||||||
if(jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) {
|
if (jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) {
|
||||||
String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN);
|
String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN);
|
||||||
if (ss.length != 3) {
|
if (ss.length != 3) {
|
||||||
LOG.warn("unrecognized jdbc url: " + jdbcUrl);
|
LOG.warn("unrecognized jdbc url: " + jdbcUrl);
|
||||||
@ -85,9 +87,10 @@ public class OceanBaseReader extends Reader {
|
|||||||
Connection conn = DBUtil.getConnection(DataBaseType.OceanBase, obJdbcUrl, username, password);
|
Connection conn = DBUtil.getConnection(DataBaseType.OceanBase, obJdbcUrl, username, password);
|
||||||
String compatibleMode = ObReaderUtils.getCompatibleMode(conn);
|
String compatibleMode = ObReaderUtils.getCompatibleMode(conn);
|
||||||
if (ObReaderUtils.isOracleMode(compatibleMode)) {
|
if (ObReaderUtils.isOracleMode(compatibleMode)) {
|
||||||
ObReaderUtils.DATABASE_TYPE = DataBaseType.OceanBase;
|
ObReaderUtils.compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE;
|
||||||
}
|
}
|
||||||
} catch (Exception e){
|
|
||||||
|
} catch (Exception e) {
|
||||||
LOG.warn("error in get compatible mode, using mysql as default: " + e.getMessage());
|
LOG.warn("error in get compatible mode, using mysql as default: " + e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -6,13 +6,38 @@ import com.alibaba.datax.common.constant.CommonConstant;
|
|||||||
import com.alibaba.datax.common.util.Configuration;
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
|
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
|
||||||
import com.alibaba.datax.plugin.rdbms.reader.Key;
|
import com.alibaba.datax.plugin.rdbms.reader.Key;
|
||||||
|
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
||||||
import com.alibaba.datax.plugin.rdbms.writer.Constant;
|
import com.alibaba.datax.plugin.rdbms.writer.Constant;
|
||||||
|
import com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader;
|
||||||
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
|
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
||||||
public class ReaderJob extends CommonRdbmsReader.Job {
|
public class ReaderJob extends CommonRdbmsReader.Job {
|
||||||
|
private Logger LOG = LoggerFactory.getLogger(OceanBaseReader.Task.class);
|
||||||
|
|
||||||
public ReaderJob() {
|
public ReaderJob() {
|
||||||
super(ObReaderUtils.DATABASE_TYPE);
|
super(ObReaderUtils.databaseType);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(Configuration originalConfig) {
|
||||||
|
//将config中的column和table中的关键字进行转义
|
||||||
|
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
|
||||||
|
ObReaderUtils.escapeDatabaseKeywords(columns);
|
||||||
|
originalConfig.set(Key.COLUMN, columns);
|
||||||
|
|
||||||
|
List<JSONObject> conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, JSONObject.class);
|
||||||
|
for (int i = 0; i < conns.size(); i++) {
|
||||||
|
JSONObject conn = conns.get(i);
|
||||||
|
Configuration connConfig = Configuration.from(conn.toString());
|
||||||
|
List<String> tables = connConfig.getList(Key.TABLE, String.class);
|
||||||
|
ObReaderUtils.escapeDatabaseKeywords(tables);
|
||||||
|
originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, Key.TABLE), tables);
|
||||||
|
}
|
||||||
|
super.init(originalConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1,13 +1,5 @@
|
|||||||
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;
|
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;
|
||||||
|
|
||||||
import java.sql.*;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.Column;
|
import com.alibaba.datax.common.element.Column;
|
||||||
import com.alibaba.datax.common.element.Record;
|
import com.alibaba.datax.common.element.Record;
|
||||||
import com.alibaba.datax.common.plugin.RecordSender;
|
import com.alibaba.datax.common.plugin.RecordSender;
|
||||||
@ -19,11 +11,17 @@ import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
|
|||||||
import com.alibaba.datax.plugin.rdbms.reader.Constant;
|
import com.alibaba.datax.plugin.rdbms.reader.Constant;
|
||||||
import com.alibaba.datax.plugin.rdbms.reader.Key;
|
import com.alibaba.datax.plugin.rdbms.reader.Key;
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
|
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
|
||||||
import com.alibaba.datax.plugin.reader.oceanbasev10reader.Config;
|
import com.alibaba.datax.plugin.reader.oceanbasev10reader.Config;
|
||||||
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
|
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
|
||||||
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.TaskContext;
|
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.TaskContext;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.sql.*;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
public class ReaderTask extends CommonRdbmsReader.Task {
|
public class ReaderTask extends CommonRdbmsReader.Task {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ReaderTask.class);
|
private static final Logger LOG = LoggerFactory.getLogger(ReaderTask.class);
|
||||||
@ -41,11 +39,12 @@ public class ReaderTask extends CommonRdbmsReader.Task {
|
|||||||
private boolean reuseConn = false;
|
private boolean reuseConn = false;
|
||||||
|
|
||||||
public ReaderTask(int taskGroupId, int taskId) {
|
public ReaderTask(int taskGroupId, int taskId) {
|
||||||
super(ObReaderUtils.DATABASE_TYPE, taskGroupId, taskId);
|
super(ObReaderUtils.databaseType, taskGroupId, taskId);
|
||||||
this.taskGroupId = taskGroupId;
|
this.taskGroupId = taskGroupId;
|
||||||
this.taskId = taskId;
|
this.taskId = taskId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void init(Configuration readerSliceConfig) {
|
public void init(Configuration readerSliceConfig) {
|
||||||
/* for database connection */
|
/* for database connection */
|
||||||
username = readerSliceConfig.getString(Key.USERNAME);
|
username = readerSliceConfig.getString(Key.USERNAME);
|
||||||
@ -54,7 +53,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
|
|||||||
queryTimeoutSeconds = readerSliceConfig.getInt(Config.QUERY_TIMEOUT_SECOND,
|
queryTimeoutSeconds = readerSliceConfig.getInt(Config.QUERY_TIMEOUT_SECOND,
|
||||||
Config.DEFAULT_QUERY_TIMEOUT_SECOND);
|
Config.DEFAULT_QUERY_TIMEOUT_SECOND);
|
||||||
// ob10的处理
|
// ob10的处理
|
||||||
if(jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) {
|
if (jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) {
|
||||||
String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN);
|
String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN);
|
||||||
if (ss.length == 3) {
|
if (ss.length == 3) {
|
||||||
LOG.info("this is ob1_0 jdbc url.");
|
LOG.info("this is ob1_0 jdbc url.");
|
||||||
@ -63,16 +62,14 @@ public class ReaderTask extends CommonRdbmsReader.Task {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ObReaderUtils.DATABASE_TYPE == DataBaseType.OceanBase) {
|
jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时
|
||||||
jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时
|
if (ObReaderUtils.compatibleMode.equals(ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE)) {
|
||||||
compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE;
|
compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE;
|
||||||
} else {
|
|
||||||
jdbcUrl = jdbcUrl + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时
|
|
||||||
}
|
}
|
||||||
LOG.info("this is ob1_0 jdbc url. user=" + username + " :url=" + jdbcUrl);
|
LOG.info("this is ob1_0 jdbc url. user=" + username + " :url=" + jdbcUrl);
|
||||||
mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "");
|
mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "");
|
||||||
retryLimit = readerSliceConfig.getInt(Config.RETRY_LIMIT, Config.DEFAULT_RETRY_LIMIT);
|
retryLimit = readerSliceConfig.getInt(Config.RETRY_LIMIT, Config.DEFAULT_RETRY_LIMIT);
|
||||||
LOG.info("retryLimit: "+ retryLimit);
|
LOG.info("retryLimit: " + retryLimit);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void buildSavePoint(TaskContext context) {
|
private void buildSavePoint(TaskContext context) {
|
||||||
@ -83,7 +80,6 @@ public class ReaderTask extends CommonRdbmsReader.Task {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
|
||||||
* 如果isTableMode && table有PK
|
* 如果isTableMode && table有PK
|
||||||
* <p>
|
* <p>
|
||||||
* 则支持断点续读 (若pk不在原始的columns中,则追加到尾部,但不传给下游)
|
* 则支持断点续读 (若pk不在原始的columns中,则追加到尾部,但不传给下游)
|
||||||
@ -92,7 +88,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void startRead(Configuration readerSliceConfig, RecordSender recordSender,
|
public void startRead(Configuration readerSliceConfig, RecordSender recordSender,
|
||||||
TaskPluginCollector taskPluginCollector, int fetchSize) {
|
TaskPluginCollector taskPluginCollector, int fetchSize) {
|
||||||
String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
|
String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
|
||||||
String table = readerSliceConfig.getString(Key.TABLE);
|
String table = readerSliceConfig.getString(Key.TABLE);
|
||||||
PerfTrace.getInstance().addTaskDetails(taskId, table + "," + jdbcUrl);
|
PerfTrace.getInstance().addTaskDetails(taskId, table + "," + jdbcUrl);
|
||||||
@ -131,14 +127,14 @@ public class ReaderTask extends CommonRdbmsReader.Task {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void startRead0(boolean isTableMode, TaskContext context, RecordSender recordSender,
|
private void startRead0(boolean isTableMode, TaskContext context, RecordSender recordSender,
|
||||||
TaskPluginCollector taskPluginCollector) {
|
TaskPluginCollector taskPluginCollector) {
|
||||||
// 不是table模式 直接使用原来的做法
|
// 不是table模式 直接使用原来的做法
|
||||||
if (!isTableMode) {
|
if (!isTableMode) {
|
||||||
doRead(recordSender, taskPluginCollector, context);
|
doRead(recordSender, taskPluginCollector, context);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// check primary key index
|
// check primary key index
|
||||||
Connection conn = DBUtil.getConnection(ObReaderUtils.DATABASE_TYPE, jdbcUrl, username, password);
|
Connection conn = DBUtil.getConnection(ObReaderUtils.databaseType, jdbcUrl, username, password);
|
||||||
ObReaderUtils.initConn4Reader(conn, queryTimeoutSeconds);
|
ObReaderUtils.initConn4Reader(conn, queryTimeoutSeconds);
|
||||||
context.setConn(conn);
|
context.setConn(conn);
|
||||||
try {
|
try {
|
||||||
@ -184,11 +180,11 @@ public class ReaderTask extends CommonRdbmsReader.Task {
|
|||||||
}
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
if (retryLimit == ++retryCount) {
|
if (retryLimit == ++retryCount) {
|
||||||
throw RdbmsException.asQueryException(ObReaderUtils.DATABASE_TYPE, new Exception(e),
|
throw RdbmsException.asQueryException(ObReaderUtils.databaseType, new Exception(e),
|
||||||
context.getQuerySql(), context.getTable(), username);
|
context.getQuerySql(), context.getTable(), username);
|
||||||
}
|
}
|
||||||
LOG.error("read fail, retry count " + retryCount + ", sleep 60 second, save point:" +
|
LOG.error("read fail, retry count " + retryCount + ", sleep 60 second, save point:" +
|
||||||
context.getSavePoint() + ", error: "+ e.getMessage());
|
context.getSavePoint() + ", error: " + e.getMessage());
|
||||||
ObReaderUtils.sleep(60000); // sleep 10s
|
ObReaderUtils.sleep(60000); // sleep 10s
|
||||||
}
|
}
|
||||||
// 假如原来的查询有查出数据,则改成增量查询
|
// 假如原来的查询有查出数据,则改成增量查询
|
||||||
@ -227,7 +223,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
|
|||||||
LOG.info("connection is alive, will reuse this connection.");
|
LOG.info("connection is alive, will reuse this connection.");
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Create new connection for reader.");
|
LOG.info("Create new connection for reader.");
|
||||||
conn = DBUtil.getConnection(ObReaderUtils.DATABASE_TYPE, jdbcUrl, username, password);
|
conn = DBUtil.getConnection(ObReaderUtils.databaseType, jdbcUrl, username, password);
|
||||||
ObReaderUtils.initConn4Reader(conn, queryTimeoutSeconds);
|
ObReaderUtils.initConn4Reader(conn, queryTimeoutSeconds);
|
||||||
context.setConn(conn);
|
context.setConn(conn);
|
||||||
}
|
}
|
||||||
@ -287,7 +283,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
|
|||||||
ObReaderUtils.close(null, null, context.getConn());
|
ObReaderUtils.close(null, null, context.getConn());
|
||||||
context.setConn(null);
|
context.setConn(null);
|
||||||
LOG.error("reader data fail", e);
|
LOG.error("reader data fail", e);
|
||||||
throw RdbmsException.asQueryException(ObReaderUtils.DATABASE_TYPE, e, context.getQuerySql(),
|
throw RdbmsException.asQueryException(ObReaderUtils.databaseType, e, context.getQuerySql(),
|
||||||
context.getTable(), username);
|
context.getTable(), username);
|
||||||
} finally {
|
} finally {
|
||||||
perfRecord.end();
|
perfRecord.end();
|
||||||
|
File diff suppressed because one or more lines are too long
@ -162,6 +162,7 @@ public class TaskContext {
|
|||||||
public String getUserSavePoint() {
|
public String getUserSavePoint() {
|
||||||
return userSavePoint;
|
return userSavePoint;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setUserSavePoint(String userSavePoint) {
|
public void setUserSavePoint(String userSavePoint) {
|
||||||
this.userSavePoint = userSavePoint;
|
this.userSavePoint = userSavePoint;
|
||||||
}
|
}
|
||||||
|
@ -1,15 +1,5 @@
|
|||||||
package com.alibaba.datax.plugin.writer.oceanbasev10writer;
|
package com.alibaba.datax.plugin.writer.oceanbasev10writer;
|
||||||
|
|
||||||
import java.sql.*;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.plugin.RecordReceiver;
|
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||||
import com.alibaba.datax.common.spi.Writer;
|
import com.alibaba.datax.common.spi.Writer;
|
||||||
import com.alibaba.datax.common.util.Configuration;
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
@ -20,7 +10,16 @@ import com.alibaba.datax.plugin.rdbms.writer.Constant;
|
|||||||
import com.alibaba.datax.plugin.rdbms.writer.Key;
|
import com.alibaba.datax.plugin.rdbms.writer.Key;
|
||||||
import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
|
import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.task.ConcurrentTableWriterTask;
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.task.ConcurrentTableWriterTask;
|
||||||
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils;
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 2016-04-07
|
* 2016-04-07
|
||||||
@ -60,6 +59,19 @@ public class OceanBaseV10Writer extends Writer {
|
|||||||
public void init() {
|
public void init() {
|
||||||
this.originalConfig = super.getPluginJobConf();
|
this.originalConfig = super.getPluginJobConf();
|
||||||
checkCompatibleMode(originalConfig);
|
checkCompatibleMode(originalConfig);
|
||||||
|
//将config中的column和table中的关键字进行转义
|
||||||
|
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
|
||||||
|
ObWriterUtils.escapeDatabaseKeywords(columns);
|
||||||
|
originalConfig.set(Key.COLUMN, columns);
|
||||||
|
|
||||||
|
List<JSONObject> conns = originalConfig.getList(Constant.CONN_MARK, JSONObject.class);
|
||||||
|
for (int i = 0; i < conns.size(); i++) {
|
||||||
|
JSONObject conn = conns.get(i);
|
||||||
|
Configuration connConfig = Configuration.from(conn.toString());
|
||||||
|
List<String> tables = connConfig.getList(Key.TABLE, String.class);
|
||||||
|
ObWriterUtils.escapeDatabaseKeywords(tables);
|
||||||
|
originalConfig.set(String.format("%s[%d].%s", Constant.CONN_MARK, i, Key.TABLE), tables);
|
||||||
|
}
|
||||||
this.commonJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
|
this.commonJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
|
||||||
this.commonJob.init(this.originalConfig);
|
this.commonJob.init(this.originalConfig);
|
||||||
}
|
}
|
||||||
@ -222,6 +234,7 @@ public class OceanBaseV10Writer extends Writer {
|
|||||||
/**
|
/**
|
||||||
* 注意:此方法每个 Task 都会执行一次。 最佳实践:此处适当封装确保简洁清晰完成数据写入工作。
|
* 注意:此方法每个 Task 都会执行一次。 最佳实践:此处适当封装确保简洁清晰完成数据写入工作。
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void startWrite(RecordReceiver recordReceiver) {
|
public void startWrite(RecordReceiver recordReceiver) {
|
||||||
this.writerTask.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector());
|
this.writerTask.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector());
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,27 @@
|
|||||||
package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
|
package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Column;
|
||||||
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
|
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||||
|
import com.alibaba.datax.common.plugin.TaskPluginCollector;
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
||||||
|
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
||||||
|
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
||||||
|
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
|
||||||
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
|
||||||
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder;
|
||||||
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder;
|
||||||
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo;
|
||||||
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
|
||||||
|
import com.alipay.oceanbase.obproxy.data.TableEntryKey;
|
||||||
|
import com.alipay.oceanbase.obproxy.util.ObPartitionIdCalculator;
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
//import java.sql.PreparedStatement;
|
|
||||||
import java.sql.PreparedStatement;
|
import java.sql.PreparedStatement;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -16,27 +36,7 @@ import java.util.concurrent.locks.Condition;
|
|||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.Column;
|
//import java.sql.PreparedStatement;
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder;
|
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.Record;
|
|
||||||
import com.alibaba.datax.common.exception.DataXException;
|
|
||||||
import com.alibaba.datax.common.plugin.RecordReceiver;
|
|
||||||
import com.alibaba.datax.common.plugin.TaskPluginCollector;
|
|
||||||
import com.alibaba.datax.common.util.Configuration;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
|
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
|
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder;
|
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo;
|
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
|
|
||||||
import com.alipay.oceanbase.obproxy.data.TableEntryKey;
|
|
||||||
import com.alipay.oceanbase.obproxy.util.ObPartitionIdCalculator;
|
|
||||||
|
|
||||||
public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
|
public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentTableWriterTask.class);
|
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentTableWriterTask.class);
|
||||||
@ -62,6 +62,7 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
|
|||||||
private ObPartitionIdCalculator partCalculator = null;
|
private ObPartitionIdCalculator partCalculator = null;
|
||||||
|
|
||||||
private HashMap<Long, List<Record>> groupInsertValues;
|
private HashMap<Long, List<Record>> groupInsertValues;
|
||||||
|
List<Record> unknownPartRecords = new ArrayList<Record>();
|
||||||
// private List<Record> unknownPartRecords;
|
// private List<Record> unknownPartRecords;
|
||||||
private List<Integer> partitionKeyIndexes;
|
private List<Integer> partitionKeyIndexes;
|
||||||
|
|
||||||
@ -104,10 +105,14 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
|
|||||||
connectInfo.getFullUserName(), connectInfo.password);
|
connectInfo.getFullUserName(), connectInfo.password);
|
||||||
checkConnHolder.initConnection();
|
checkConnHolder.initConnection();
|
||||||
if (isOracleCompatibleMode) {
|
if (isOracleCompatibleMode) {
|
||||||
connectInfo.databaseName = connectInfo.databaseName.toUpperCase();
|
connectInfo.databaseName = connectInfo.databaseName.toUpperCase();
|
||||||
table = table.toUpperCase();
|
//在转义的情况下不翻译
|
||||||
LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s",
|
if (!(table.startsWith("\"") && table.endsWith("\""))) {
|
||||||
connectInfo.databaseName, table));
|
table = table.toUpperCase();
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s",
|
||||||
|
connectInfo.databaseName, table));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config.getBool(Config.USE_PART_CALCULATOR, Config.DEFAULT_USE_PART_CALCULATOR)) {
|
if (config.getBool(Config.USE_PART_CALCULATOR, Config.DEFAULT_USE_PART_CALCULATOR)) {
|
||||||
@ -289,20 +294,15 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void addLeftRecords() {
|
private void addLeftRecords() {
|
||||||
|
//不需要刷新Cache,已经是最后一批数据了
|
||||||
for (List<Record> groupValues : groupInsertValues.values()) {
|
for (List<Record> groupValues : groupInsertValues.values()) {
|
||||||
if (groupValues.size() > 0 ) {
|
if (groupValues.size() > 0 ) {
|
||||||
int retry = 0;
|
addRecordsToWriteQueue(groupValues);
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
concurrentWriter.addBatchRecords(groupValues);
|
|
||||||
break;
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
retry++;
|
|
||||||
LOG.info("Concurrent table writer is interrupted, retry {}", retry);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (unknownPartRecords.size() > 0) {
|
||||||
|
addRecordsToWriteQueue(unknownPartRecords);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addRecordToCache(final Record record) {
|
private void addRecordToCache(final Record record) {
|
||||||
@ -326,41 +326,40 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
|
|||||||
}
|
}
|
||||||
groupValues.add(record);
|
groupValues.add(record);
|
||||||
if (groupValues.size() >= batchSize) {
|
if (groupValues.size() >= batchSize) {
|
||||||
int i = 0;
|
groupValues = addRecordsToWriteQueue(groupValues);
|
||||||
while (true) {
|
|
||||||
if (i > 0) {
|
|
||||||
LOG.info("retry add batch record the {} times", i);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
concurrentWriter.addBatchRecords(groupValues);
|
|
||||||
printEveryTime();
|
|
||||||
break;
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.info("Concurrent table writer is interrupted");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
groupValues = new ArrayList<Record>(batchSize);
|
|
||||||
groupInsertValues.put(partId, groupValues);
|
groupInsertValues.put(partId, groupValues);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("add unknown part record {}", record);
|
LOG.debug("add unknown part record {}", record);
|
||||||
List<Record> unknownPartRecords = new ArrayList<Record>();
|
|
||||||
unknownPartRecords.add(record);
|
unknownPartRecords.add(record);
|
||||||
int i = 0;
|
if (unknownPartRecords.size() >= batchSize) {
|
||||||
while (true) {
|
unknownPartRecords = addRecordsToWriteQueue(unknownPartRecords);
|
||||||
if (i > 0) {
|
|
||||||
LOG.info("retry add batch record the {} times", i);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
concurrentWriter.addBatchRecords(unknownPartRecords);
|
|
||||||
break;
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.info("Concurrent table writer is interrupted");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param records
|
||||||
|
* @return 返回一个新的Cache用于存储接下来的数据
|
||||||
|
*/
|
||||||
|
private List<Record> addRecordsToWriteQueue(List<Record> records) {
|
||||||
|
int i = 0;
|
||||||
|
while (true) {
|
||||||
|
if (i > 0) {
|
||||||
|
LOG.info("retry add batch record the {} times", i);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
concurrentWriter.addBatchRecords(records);
|
||||||
|
break;
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
i++;
|
||||||
|
LOG.info("Concurrent table writer is interrupted");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new ArrayList<Record>(batchSize);
|
||||||
|
}
|
||||||
private void checkMemStore() {
|
private void checkMemStore() {
|
||||||
Connection checkConn = checkConnHolder.reconnect();
|
Connection checkConn = checkConnHolder.reconnect();
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue
Block a user