add oceanbase plugins

This commit is contained in:
江煦 2021-05-20 17:22:58 +08:00
parent 6c3fb66711
commit d3bd2c5fcf
34 changed files with 4043 additions and 9 deletions

oceanbasev10reader/pom.xml Normal file

@@ -0,0 +1,97 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>oceanbasev10reader</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.40</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

oceanbasev10reader/src/main/assembly/package.xml Normal file

@@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/oceanbasev10reader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>oceanbasev10reader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/oceanbasev10reader</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/oceanbasev10reader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/Config.java Normal file

@@ -0,0 +1,16 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader;
public interface Config {
// queryTimeoutSecond
String QUERY_TIMEOUT_SECOND = "queryTimeoutSecond";
int DEFAULT_QUERY_TIMEOUT_SECOND = 60 * 60 * 48; // 48 hours (2 days)
// readBatchSize
String READ_BATCH_SIZE = "readBatchSize";
int DEFAULT_READ_BATCH_SIZE = 100000; // 100,000
String RETRY_LIMIT = "retryLimit";
int DEFAULT_RETRY_LIMIT = 10;
}

oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java Normal file

@@ -0,0 +1,127 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader;
import java.sql.Connection;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ReaderJob;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ReaderTask;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
public class OceanBaseReader extends Reader {
public static class Job extends Reader.Job {
private Configuration originalConfig = null;
private ReaderJob readerJob;
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);
if (userConfigedFetchSize != null) {
LOG.warn("The [fetchSize] is not recognized, please use readBatchSize instead.");
}
this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);
setDatabaseType(originalConfig);
this.readerJob = new ReaderJob();
this.readerJob.init(this.originalConfig);
}
@Override
public void preCheck() {
init();
this.readerJob.preCheck(this.originalConfig, ObReaderUtils.DATABASE_TYPE);
}
@Override
public List<Configuration> split(int adviceNumber) {
return this.readerJob.split(this.originalConfig, adviceNumber);
}
@Override
public void post() {
this.readerJob.post(this.originalConfig);
}
@Override
public void destroy() {
this.readerJob.destroy(this.originalConfig);
}
private void setDatabaseType(Configuration config) {
String username = config.getString(Key.USERNAME);
String password = config.getString(Key.PASSWORD);
List<Object> conns = originalConfig.getList(Constant.CONN_MARK, Object.class);
Configuration connConf = Configuration.from(conns.get(0).toString());
List<String> jdbcUrls = connConf.getList(Key.JDBC_URL, String.class);
String jdbcUrl = jdbcUrls.get(0);
if(jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) {
String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN);
if (ss.length != 3) {
LOG.warn("unrecognized jdbc url: " + jdbcUrl);
return;
}
username = ss[1].trim() + ":" + username;
jdbcUrl = ss[2];
}
// Use ob-client to get compatible mode.
try {
String obJdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:");
Connection conn = DBUtil.getConnection(DataBaseType.OceanBase, obJdbcUrl, username, password);
String compatibleMode = ObReaderUtils.getCompatibleMode(conn);
if (ObReaderUtils.isOracleMode(compatibleMode)) {
ObReaderUtils.DATABASE_TYPE = DataBaseType.OceanBase;
}
} catch (Exception e){
LOG.warn("error in get compatible mode, using mysql as default: " + e.getMessage());
}
}
}
public static class Task extends Reader.Task {
private Configuration readerSliceConfig;
private ReaderTask commonRdbmsReaderTask;
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
@Override
public void init() {
this.readerSliceConfig = super.getPluginJobConf();
this.commonRdbmsReaderTask = new ReaderTask(super.getTaskGroupId(), super.getTaskId());
this.commonRdbmsReaderTask.init(this.readerSliceConfig);
}
@Override
public void startRead(RecordSender recordSender) {
int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);
this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender, super.getTaskPluginCollector(),
fetchSize);
}
@Override
public void post() {
this.commonRdbmsReaderTask.post(this.readerSliceConfig);
}
@Override
public void destroy() {
this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);
}
}
}
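The OB 1.0 jdbc url handled in setDatabaseType above (and again in ReaderTask.init) is a marker-delimited string that carries the cluster and tenant names in front of a plain jdbc url. A minimal, self-contained sketch of that parsing, assuming Constant.OB10_SPLIT_STRING is the "||_dsc_ob10_dsc_||" marker (check com.alibaba.datax.plugin.rdbms.writer.Constant for the authoritative value):

// Ob10UrlDemo.java -- illustrative only, not part of the plugin.
public class Ob10UrlDemo {
    // assumed values of Constant.OB10_SPLIT_STRING / OB10_SPLIT_STRING_PATTERN
    static final String SPLIT = "||_dsc_ob10_dsc_||";
    static final String SPLIT_PATTERN = "\\|\\|_dsc_ob10_dsc_\\|\\|";

    public static void main(String[] args) {
        String jdbcUrl = SPLIT + "obcluster:tenant1" + SPLIT + "jdbc:mysql://127.0.0.1:2883/test";
        String username = "user";
        if (jdbcUrl.startsWith(SPLIT)) {
            String[] ss = jdbcUrl.split(SPLIT_PATTERN);
            // ss[0] is the empty string before the first marker,
            // ss[1] is "cluster:tenant", ss[2] is the plain jdbc url
            username = ss[1].trim() + ":" + username;   // -> "obcluster:tenant1:user"
            jdbcUrl = ss[2];
        }
        System.out.println(username + " @ " + jdbcUrl);
    }
}

ReaderJob.getObRegionName below applies the same split but keeps only the cluster name (the part of ss[1] before ':') as the load-balancing mark.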

oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java Normal file

@@ -0,0 +1,40 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;
import java.util.List;
import com.alibaba.datax.common.constant.CommonConstant;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
public class ReaderJob extends CommonRdbmsReader.Job {
public ReaderJob() {
super(ObReaderUtils.DATABASE_TYPE);
}
@Override
public List<Configuration> split(Configuration originalConfig, int adviceNumber) {
List<Configuration> list = super.split(originalConfig, adviceNumber);
for (Configuration config : list) {
String jdbcUrl = config.getString(Key.JDBC_URL);
String obRegionName = getObRegionName(jdbcUrl);
config.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, obRegionName);
}
return list;
}
private String getObRegionName(String jdbcUrl) {
if (jdbcUrl.startsWith(Constant.OB10_SPLIT_STRING)) {
String[] ss = jdbcUrl.split(Constant.OB10_SPLIT_STRING_PATTERN);
if (ss.length >= 2) {
String tenant = ss[1].trim();
String[] sss = tenant.split(":");
return sss[0];
}
}
return null;
}
}

oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java Normal file

@@ -0,0 +1,301 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.statistics.PerfRecord;
import com.alibaba.datax.common.statistics.PerfTrace;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.Config;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.TaskContext;
public class ReaderTask extends CommonRdbmsReader.Task {
private static final Logger LOG = LoggerFactory.getLogger(ReaderTask.class);
private int taskGroupId = -1;
private int taskId = -1;
private String username;
private String password;
private String jdbcUrl;
private String mandatoryEncoding;
private int queryTimeoutSeconds; // query timeout, default 48 hours
private int readBatchSize;
private int retryLimit = 0;
private String compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_MYSQL;
private boolean reuseConn = false;
public ReaderTask(int taskGroupId, int taskId) {
super(ObReaderUtils.DATABASE_TYPE, taskGroupId, taskId);
this.taskGroupId = taskGroupId;
this.taskId = taskId;
}
public void init(Configuration readerSliceConfig) {
/* for database connection */
username = readerSliceConfig.getString(Key.USERNAME);
password = readerSliceConfig.getString(Key.PASSWORD);
jdbcUrl = readerSliceConfig.getString(Key.JDBC_URL);
queryTimeoutSeconds = readerSliceConfig.getInt(Config.QUERY_TIMEOUT_SECOND,
Config.DEFAULT_QUERY_TIMEOUT_SECOND);
// handle the OB 1.0 style jdbc url
if(jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) {
String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN);
if (ss.length == 3) {
LOG.info("this is ob1_0 jdbc url.");
username = ss[1].trim() + ":" + username;
jdbcUrl = ss[2];
}
}
if (ObReaderUtils.DATABASE_TYPE == DataBaseType.OceanBase) {
jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; // socketTimeout: 30 minutes
compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE;
} else {
jdbcUrl = jdbcUrl + "&socketTimeout=1800000&connectTimeout=60000"; // socketTimeout: 30 minutes
}
LOG.info("jdbc connection info: user=" + username + ", url=" + jdbcUrl);
mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "");
retryLimit = readerSliceConfig.getInt(Config.RETRY_LIMIT, Config.DEFAULT_RETRY_LIMIT);
LOG.info("retryLimit: "+ retryLimit);
}
private void buildSavePoint(TaskContext context) {
if (!ObReaderUtils.isUserSavePointValid(context)) {
LOG.info("user save point is not valid, set to null.");
context.setUserSavePoint(null);
}
}
/**
* If isTableMode and the table has a PK, resumable reading (checkpoint restart) is
* supported; if a pk column is not among the original columns, it is appended to the
* tail of the select list but not passed downstream.
* <p>
* Otherwise the legacy read mode is used.
*/
@Override
public void startRead(Configuration readerSliceConfig, RecordSender recordSender,
TaskPluginCollector taskPluginCollector, int fetchSize) {
String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
String table = readerSliceConfig.getString(Key.TABLE);
PerfTrace.getInstance().addTaskDetails(taskId, table + "," + jdbcUrl);
List<String> columns = readerSliceConfig.getList(Key.COLUMN_LIST, String.class);
String where = readerSliceConfig.getString(Key.WHERE);
boolean weakRead = readerSliceConfig.getBool(Key.WEAK_READ, true); // default true, using weak read
String userSavePoint = readerSliceConfig.getString(Key.SAVE_POINT, null);
reuseConn = readerSliceConfig.getBool(Key.REUSE_CONN, false);
String partitionName = readerSliceConfig.getString(Key.PARTITION_NAME, null);
// take readBatchSize from the job config, falling back to the default
readBatchSize = readerSliceConfig.getInt(Config.READ_BATCH_SIZE, Config.DEFAULT_READ_BATCH_SIZE);
// no less than 10,000
if (readBatchSize < 10000) {
readBatchSize = 10000;
}
TaskContext context = new TaskContext(table, columns, where, fetchSize);
context.setQuerySql(querySql);
context.setWeakRead(weakRead);
context.setCompatibleMode(compatibleMode);
if (partitionName != null) {
context.setPartitionName(partitionName);
}
// Add the user save point into the context
context.setUserSavePoint(userSavePoint);
PerfRecord allPerf = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
allPerf.start();
boolean isTableMode = readerSliceConfig.getBool(Constant.IS_TABLE_MODE);
try {
startRead0(isTableMode, context, recordSender, taskPluginCollector);
} finally {
ObReaderUtils.close(null, null, context.getConn());
}
allPerf.end(context.getCost());
// The monitoring dashboard currently depends on this log line; the earlier "Finish read
// record" figure covered the full time of both the sql query and result iteration.
LOG.info("finished read record by Sql: [{}\n] {}.", context.getQuerySql(), jdbcUrl);
}
private void startRead0(boolean isTableMode, TaskContext context, RecordSender recordSender,
TaskPluginCollector taskPluginCollector) {
// not table mode: fall back to the original approach directly
if (!isTableMode) {
doRead(recordSender, taskPluginCollector, context);
return;
}
// check primary key index
Connection conn = DBUtil.getConnection(ObReaderUtils.DATABASE_TYPE, jdbcUrl, username, password);
ObReaderUtils.initConn4Reader(conn, queryTimeoutSeconds);
context.setConn(conn);
try {
ObReaderUtils.initIndex(conn, context);
ObReaderUtils.matchPkIndexs(conn, context);
} catch (Throwable e) {
LOG.warn("fetch PkIndexs fail,table=" + context.getTable(), e);
}
// 如果不是table pk不存在 则仍然使用原来的做法
if (context.getPkIndexs() == null) {
doRead(recordSender, taskPluginCollector, context);
return;
}
// setup the user defined save point
buildSavePoint(context);
// From here on is the resumable-read (checkpoint restart) logic:
// while(true) {
//   normal read (order by pk asc)
//   on failure there are two cases:
//     a) some records were already read: switch to the incremental (append) read logic
//     b) no records read yet: keep the normal read logic (still order by pk asc)
//   on normal completion: break
// }
context.setReadBatchSize(readBatchSize);
String getFirstQuerySql = ObReaderUtils.buildFirstQuerySql(context);
String appendQuerySql = ObReaderUtils.buildAppendQuerySql(conn, context);
LOG.warn("start table scan key : {}", context.getIndexName() == null ? "primary" : context.getIndexName());
context.setQuerySql(getFirstQuerySql);
boolean firstQuery = true;
// We originally planned to add "limit 1" to the first query to reduce work, but a
// comparison showed it is redundant, because:
// 1. with a secondary index such as gmt_modified, it is a direct index scan and no
//    top-N order by is needed;
// 2. without a secondary index (pk table scan), shrinking the sort size does not help,
//    since the next query still has to sort.
// Dropping this redundant optimization keeps the code easier to read.
int retryCount = 0;
while (true) {
try {
boolean finish = doRead(recordSender, taskPluginCollector, context);
if (finish) {
break;
}
} catch (Throwable e) {
if (retryLimit == ++retryCount) {
throw RdbmsException.asQueryException(ObReaderUtils.DATABASE_TYPE, new Exception(e),
context.getQuerySql(), context.getTable(), username);
}
LOG.error("read fail, retry count " + retryCount + ", sleep 60 second, save point:" +
context.getSavePoint() + ", error: "+ e.getMessage());
ObReaderUtils.sleep(60000); // sleep 10s
}
// if the previous query returned data, switch to the incremental (append) query
if (firstQuery && context.getPkIndexs() != null && context.getSavePoint() != null) {
context.setQuerySql(appendQuerySql);
firstQuery = false;
}
}
DBUtil.closeDBResources(null, context.getConn());
}
private boolean isConnectionAlive(Connection conn) {
if (conn == null) {
return false;
}
Statement stmt = null;
ResultSet rs = null;
String sql = "select 1" + (compatibleMode == ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE ? " from dual" : "");
try {
stmt = conn.createStatement();
rs = stmt.executeQuery(sql);
rs.next();
} catch (Exception ex) {
LOG.info("connection is not alive: " + ex.getMessage());
return false;
} finally {
DBUtil.closeDBResources(rs, stmt, null);
}
return true;
}
private boolean doRead(RecordSender recordSender, TaskPluginCollector taskPluginCollector, TaskContext context) {
LOG.info("exe sql: {}", context.getQuerySql());
Connection conn = context.getConn();
if (reuseConn && isConnectionAlive(conn)) {
LOG.info("connection is alive, will reuse this connection.");
} else {
LOG.info("Create new connection for reader.");
conn = DBUtil.getConnection(ObReaderUtils.DATABASE_TYPE, jdbcUrl, username, password);
ObReaderUtils.initConn4Reader(conn, queryTimeoutSeconds);
context.setConn(conn);
}
PreparedStatement ps = null;
ResultSet rs = null;
PerfRecord perfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.SQL_QUERY);
perfRecord.start();
try {
ps = conn.prepareStatement(context.getQuerySql(),
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
if (context.getPkIndexs() != null && context.getSavePoint() != null) {
Record savePoint = context.getSavePoint();
List<Column> point = ObReaderUtils.buildPoint(savePoint, context.getPkIndexs());
ObReaderUtils.binding(ps, point);
if (LOG.isWarnEnabled()) {
List<String> pointForLog = new ArrayList<String>();
for (Column c : point) {
pointForLog.add(c.asString());
}
LOG.warn("{} save point : {}", context.getTable(), StringUtils.join(pointForLog, ','));
}
}
// enable the streaming result interface
ps.setFetchSize(context.getFetchSize());
rs = ps.executeQuery();
ResultSetMetaData metaData = rs.getMetaData();
int columnNumber = metaData.getColumnCount();
long lastTime = System.nanoTime();
int count = 0;
for (; rs.next(); count++) {
context.addCost(System.nanoTime() - lastTime);
Record row = buildRecord(recordSender, rs, metaData, columnNumber, mandatoryEncoding,
taskPluginCollector);
// // if the first record duplicates the save point, skip sending it
// if (count == 0 &&
// ObReaderUtils.isPkEquals(context.getSavePoint(), row,
// context.getPkIndexs())) {
// continue;
// }
// querySql mode (transferColumnNumber == -1): send the row as-is
if (context.getTransferColumnNumber() == -1
|| row.getColumnNumber() == context.getTransferColumnNumber()) {
recordSender.sendToWriter(row);
} else {
Record newRow = recordSender.createRecord();
for (int i = 0; i < context.getTransferColumnNumber(); i++) {
newRow.addColumn(row.getColumn(i));
}
recordSender.sendToWriter(newRow);
}
context.setSavePoint(row);
lastTime = System.nanoTime();
}
LOG.info("end of sql: {}, " + count + "rows are read.", context.getQuerySql());
return context.getReadBatchSize() <= 0 || count < readBatchSize;
} catch (Exception e) {
ObReaderUtils.close(null, null, context.getConn());
context.setConn(null);
LOG.error("reader data fail", e);
throw RdbmsException.asQueryException(ObReaderUtils.DATABASE_TYPE, e, context.getQuerySql(),
context.getTable(), username);
} finally {
perfRecord.end();
if (reuseConn) {
ObReaderUtils.close(rs, ps, null);
} else {
ObReaderUtils.close(rs, ps, conn);
}
}
}
}
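The control flow above amounts to keyset pagination: read batches ordered by pk, remember the last row as the save point, and re-enter with a "(pk) > (save point)" predicate until a batch comes back smaller than readBatchSize. A minimal sketch of that loop against an in-memory stand-in for the table (all names hypothetical):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// ResumeLoopDemo.java -- illustrative only, not part of the plugin.
public class ResumeLoopDemo {
    static final int READ_BATCH_SIZE = 3;

    // stands in for one run of the append query: (pk) > (savePoint) order by pk limit N
    static List<Integer> readBatch(List<Integer> table, Integer savePoint) {
        List<Integer> batch = new ArrayList<Integer>();
        for (Integer pk : table) {                                // table is ordered by pk
            if (savePoint != null && pk <= savePoint) continue;
            batch.add(pk);
            if (batch.size() == READ_BATCH_SIZE) break;
        }
        return batch;
    }

    public static void main(String[] args) {
        List<Integer> table = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        Integer savePoint = null;                                 // no user save point
        while (true) {
            List<Integer> rows = readBatch(table, savePoint);
            for (Integer pk : rows) {
                System.out.println("send row " + pk);             // sendToWriter(...)
                savePoint = pk;                                   // context.setSavePoint(row)
            }
            if (rows.size() < READ_BATCH_SIZE) break;             // same test as doRead()'s return
        }
    }
}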

oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java Normal file

@@ -0,0 +1,697 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.util;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.element.BoolColumn;
import com.alibaba.datax.common.element.BytesColumn;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.DoubleColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr;
import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator;
public class ObReaderUtils {
private static final Logger LOG = LoggerFactory.getLogger(ObReaderUtils.class);
final static public String OB_COMPATIBLE_MODE = "obCompatibilityMode";
final static public String OB_COMPATIBLE_MODE_ORACLE = "ORACLE";
final static public String OB_COMPATIBLE_MODE_MYSQL = "MYSQL";
public static DataBaseType DATABASE_TYPE = DataBaseType.MySql;
public static void initConn4Reader(Connection conn, long queryTimeoutSeconds) {
String setQueryTimeout = "set ob_query_timeout=" + (queryTimeoutSeconds * 1000 * 1000L);
String setTrxTimeout = "set ob_trx_timeout=" + ((queryTimeoutSeconds + 5) * 1000 * 1000L);
Statement stmt = null;
try {
conn.setAutoCommit(true);
stmt = conn.createStatement();
stmt.execute(setQueryTimeout);
stmt.execute(setTrxTimeout);
LOG.warn("setAutoCommit=true;"+setQueryTimeout+";"+setTrxTimeout+";");
} catch (Throwable e) {
LOG.warn("initConn4Reader fail", e);
} finally {
DBUtil.closeDBResources(stmt, null);
}
}
public static void sleep(int ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the interrupt flag
}
}
/**
* Matches the pk (or chosen index) columns against the configured column list and
* records their positions in the context; pk columns missing from the list are
* appended to its tail.
*
* @param conn
* @param context
*/
public static void matchPkIndexs(Connection conn, TaskContext context) {
String[] pkColumns = getPkColumns(conn, context);
if (ArrayUtils.isEmpty(pkColumns)) {
LOG.warn("table=" + context.getTable() + " has no primary key");
return;
}
List<String> columns = context.getColumns();
// the index columns that ultimately take part in the ordering
context.setPkColumns(pkColumns);
int[] pkIndexs = new int[pkColumns.length];
for (int i = 0, n = pkColumns.length; i < n; i++) {
String pkc = pkColumns[i];
int j = 0;
for (int k = columns.size(); j < k; j++) {
// backquotes in the user-defined columns do no harm here;
// at worst a few extra PK columns are added to the select list
if (StringUtils.equalsIgnoreCase(pkc, columns.get(j))) {
pkIndexs[i] = j;
break;
}
}
// reaching here means the pk column is not in columns, so append it to the tail
if (j == columns.size()) {
columns.add(pkc);
pkIndexs[i] = columns.size() - 1;
}
}
context.setPkIndexs(pkIndexs);
}
private static String[] getPkColumns(Connection conn, TaskContext context) {
String tableName = context.getTable();
String sql = "show index from " + tableName + " where Key_name='PRIMARY'";
if (isOracleMode(context.getCompatibleMode())) {
tableName = tableName.toUpperCase();
sql = "SELECT cols.column_name Column_name "+
"FROM all_constraints cons, all_cons_columns cols " +
"WHERE cols.table_name = '" + tableName+ "' AND cons.constraint_type = 'P' " +
"AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner";
}
LOG.info("get primary key by sql: " + sql);
Statement ps = null;
ResultSet rs = null;
List<String> realIndex = new ArrayList<String>();
realIndex.addAll(context.getSecondaryIndexColumns());
try {
ps = conn.createStatement();
rs = ps.executeQuery(sql);
while (rs.next()) {
String columnName = StringUtils.lowerCase(rs.getString("Column_name"));
if (!realIndex.contains(columnName)) {
realIndex.add(columnName);
}
}
String[] pks = new String[realIndex.size()];
realIndex.toArray(pks);
return pks;
} catch (Throwable e) {
LOG.error("show index from table fail :" + sql, e);
} finally {
close(rs, ps, null);
}
return null;
}
/**
* Builds the SQL for the first (cold-start) query.
*
* @param context
* @return the first query sql
*/
public static String buildFirstQuerySql(TaskContext context) {
String userSavePoint = context.getUserSavePoint();
String indexName = context.getIndexName();
String sql = "select ";
boolean weakRead = context.getWeakRead();
if (StringUtils.isNotEmpty(indexName)) {
String weakReadHint = weakRead ? "+READ_CONSISTENCY(WEAK)," : "+";
sql += " /*" + weakReadHint + "index(" + context.getTable() + " " + indexName + ")*/ ";
} else if (weakRead){
sql += " /*+READ_CONSISTENCY(WEAK)*/ ";
}
sql += StringUtils.join(context.getColumns(), ',');
sql += " from " + context.getTable();
if (context.getPartitionName() != null) {
sql += String.format(" partition(%s) ", context.getPartitionName());
}
if (StringUtils.isNotEmpty(context.getWhere())) {
sql += " where " + context.getWhere();
}
if (userSavePoint != null && userSavePoint.length() != 0) {
userSavePoint = userSavePoint.replace("=", ">");
sql += (StringUtils.isNotEmpty(context.getWhere()) ? " and " : " where ") + userSavePoint;
}
sql += " order by " + StringUtils.join(context.getPkColumns(), ',') + " asc";
// Use a sub-query to apply rownum <= readBatchSize, since where has higher priority than order by
if (ObReaderUtils.isOracleMode(context.getCompatibleMode()) && context.getReadBatchSize() != -1) {
sql = String.format("select * from (%s) where rownum <= %d", sql, context.getReadBatchSize());
}
return sql;
}
/**
* Builds the SQL for the incremental (append) query used once a save point exists.
*
* @param conn
* @param context
* @return sql
*/
public static String buildAppendQuerySql(Connection conn, TaskContext context) {
String indexName = context.getIndexName();
boolean weakRead = context.getWeakRead();
String sql = "select ";
if (StringUtils.isNotEmpty(indexName)) {
String weakReadHint = weakRead ? "+READ_CONSISTENCY(WEAK)," : "+";
sql += " /*"+ weakReadHint + "index(" + context.getTable() + " " + indexName + ")*/ ";
} else if (weakRead){
sql += " /*+READ_CONSISTENCY(WEAK)*/ ";
}
sql += StringUtils.join(context.getColumns(), ',') + " from " + context.getTable();
if (context.getPartitionName() != null) {
sql += String.format(" partition(%s) ", context.getPartitionName());
}
sql += " where ";
String append = "(" + StringUtils.join(context.getPkColumns(), ',') + ") > ("
+ buildPlaceHolder(context.getPkColumns().length) + ")";
if (StringUtils.isNotEmpty(context.getWhere())) {
sql += "(" + context.getWhere() + ") and ";
}
sql = String.format("%s %s order by %s asc", sql, append, StringUtils.join(context.getPkColumns(), ','));
// Use a sub-query to apply rownum <= readBatchSize, since where has higher priority than order by
if (ObReaderUtils.isOracleMode(context.getCompatibleMode()) && context.getReadBatchSize() != -1) {
sql = String.format("select * from (%s) where rownum <= %d", sql, context.getReadBatchSize());
}
return sql;
}
/**
* check if the userSavePoint is valid
*
* @param context
* @return true - valid, false - invalid
*/
public static boolean isUserSavePointValid(TaskContext context) {
String userSavePoint = context.getUserSavePoint();
if (userSavePoint == null || userSavePoint.length() == 0) {
LOG.info("user save point is empty!");
return false;
}
LOG.info("validating user save point: " + userSavePoint);
final String patternString = "(.+)=(.+)";
Pattern pattern = Pattern.compile(patternString);
Matcher matcher = pattern.matcher(userSavePoint);
if (!matcher.find()) {
LOG.error("user save point format is not correct: " + userSavePoint);
return false;
}
List<String> columnsInUserSavePoint = getColumnsFromUserSavePoint(userSavePoint);
List<String> valuesInUserSavePoint = getValuesFromUserSavePoint(userSavePoint);
if (columnsInUserSavePoint.size() == 0 || valuesInUserSavePoint.size() == 0 ||
columnsInUserSavePoint.size() != valuesInUserSavePoint.size()) {
LOG.error("number of columns and values in user save point are different:" + userSavePoint);
return false;
}
String where = context.getWhere();
if (StringUtils.isNotEmpty(where)) {
for (String column : columnsInUserSavePoint) {
if (where.contains(column)) {
LOG.error("column " + column + " is conflict with where: " + where);
return false;
}
}
}
// Columns in userSavePoint must be the selected index.
String[] pkColumns = context.getPkColumns();
if (pkColumns.length != columnsInUserSavePoint.size()) {
LOG.error("user save point is not on the selected index.");
return false;
}
for (String column : columnsInUserSavePoint) {
boolean found = false;
for (String pkCol : pkColumns) {
if (pkCol.equals(column)) {
found = true;
break;
}
}
if (!found) {
LOG.error("column " + column + " is not on the selected index.");
return false;
}
}
return true;
}
private static String removeBracket(String str) {
final char leftBracket = '(';
final char rightBracket = ')';
if (str != null && str.contains(String.valueOf(leftBracket)) && str.contains(String.valueOf(rightBracket)) &&
str.indexOf(leftBracket) < str.indexOf(rightBracket)) {
return str.substring(str.indexOf(leftBracket)+1, str.indexOf(rightBracket));
}
return str;
}
private static List<String> getColumnsFromUserSavePoint(String userSavePoint) {
return Arrays.asList(removeBracket(userSavePoint.split("=")[0]).split(","));
}
private static List<String> getValuesFromUserSavePoint(String userSavePoint) {
return Arrays.asList(removeBracket(userSavePoint.split("=")[1]).split(","));
}
/**
* First parses the where clause, then checks whether a usable index exists.
*
* @param conn
* @param context
*/
public static void initIndex(Connection conn, TaskContext context) {
if (StringUtils.isEmpty(context.getWhere())) {
return;
}
SQLExpr expr = SQLUtils.toSQLExpr(context.getWhere(), "mysql");
LOG.info("expr: " + expr);
List<String> allColumnsInTab = getAllColumnFromTab(conn, context.getTable());
List<String> allColNames = getColNames(allColumnsInTab, expr);
if (allColNames == null) {
return;
}
// Remove the duplicated column names
Set<String> colNames = new TreeSet<String>(allColNames);
List<String> indexNames = getIndexName(conn, context.getTable(), colNames, context.getCompatibleMode());
findBestIndex(conn, indexNames, context.getTable(), context);
}
private static List<String> getAllColumnFromTab(Connection conn, String tableName) {
String sql = "show columns from " + tableName;
Statement stmt = null;
ResultSet rs = null;
List<String> allColumns = new ArrayList<String>();
try {
stmt = conn.createStatement();
rs = stmt.executeQuery(sql);
while (rs.next()) {
allColumns.add(rs.getString("Field").toUpperCase());
}
} catch (Exception e) {
LOG.warn("fail to get all columns from table " + tableName, e);
} finally {
close(rs, stmt, null);
}
LOG.info("all columns in tab: " + String.join(",", allColumns));
return allColumns;
}
/**
* Finds the column names used in the where condition. Only expressions joined entirely
* by AND are currently supported, with the operators >, >=, =, <, <= and !=.
*
* test coverage:
*   - c6 = 20180710 OR c4 = 320: no index selected
*   - 20180710 = c6: correct index selected
*   - 20180710 = c6 and c4 = 320 or c2 < 100: no index selected
*
* @param allColInTab
* @param expr
* @return
*/
private static List<String> getColNames(List<String> allColInTab, SQLExpr expr) {
List<String> colNames = new ArrayList<String>();
if (expr instanceof SQLBinaryOpExpr) {
SQLBinaryOpExpr exp = (SQLBinaryOpExpr) expr;
if (exp.getOperator() == SQLBinaryOperator.BooleanAnd) {
List<String> leftColumns = getColNames(allColInTab, exp.getLeft());
List<String> rightColumns = getColNames(allColInTab, exp.getRight());
if (leftColumns == null || rightColumns == null) {
return null;
}
colNames.addAll(leftColumns);
colNames.addAll(rightColumns);
} else if (exp.getOperator() == SQLBinaryOperator.GreaterThan
|| exp.getOperator() == SQLBinaryOperator.GreaterThanOrEqual
|| exp.getOperator() == SQLBinaryOperator.Equality
|| exp.getOperator() == SQLBinaryOperator.LessThan
|| exp.getOperator() == SQLBinaryOperator.LessThanOrEqual
|| exp.getOperator() == SQLBinaryOperator.NotEqual) {
// only support simple comparison operators
String left = SQLUtils.toMySqlString(exp.getLeft()).toUpperCase();
String right = SQLUtils.toMySqlString(exp.getRight()).toUpperCase();
LOG.debug("left: " + left + ", right: " + right);
if (allColInTab.contains(left)) {
colNames.add(left);
}
if (allColInTab.contains(right)) {
colNames.add(right);
}
} else {
// unsupported operators
return null;
}
}
return colNames;
}
private static Map<String, List<String>> getAllIndex(Connection conn, String tableName, String compatibleMode) {
Map<String, List<String>> allIndex = new HashMap<String, List<String>>();
String sql = "show index from " + tableName;
if (isOracleMode(compatibleMode)) {
tableName = tableName.toUpperCase();
sql = "SELECT INDEX_NAME Key_name, COLUMN_NAME Column_name " +
"from dba_ind_columns where TABLE_NAME = '" + tableName +"' " +
" union all " +
"SELECT DISTINCT " +
"CASE " +
"WHEN cons.CONSTRAINT_TYPE = 'P' THEN 'PRIMARY' " +
"WHEN cons.CONSTRAINT_TYPE = 'U' THEN cons.CONSTRAINT_NAME " +
"ELSE '' " +
"END AS Key_name, " +
"cols.column_name Column_name " +
"FROM all_constraints cons, all_cons_columns cols " +
"WHERE cols.table_name = '" + tableName + "' AND cons.constraint_type in('P', 'U') " +
"AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner";
}
Statement stmt = null;
ResultSet rs = null;
try {
LOG.info("running sql to get index: " + sql);
stmt = conn.createStatement();
rs = stmt.executeQuery(sql);
while (rs.next()) {
String keyName = rs.getString("Key_name");
String colName = rs.getString("Column_name").toUpperCase();
if (allIndex.containsKey(keyName)) {
allIndex.get(keyName).add(colName);
} else {
List<String> allColumns = new ArrayList<String>();
allColumns.add(colName);
allIndex.put(keyName, allColumns);
}
}
// add primary key to all index
if (allIndex.containsKey("PRIMARY")) {
List<String> colsInPrimary = allIndex.get("PRIMARY");
for (String keyName : allIndex.keySet()) {
if (keyName.equals("PRIMARY")) {
continue;
}
allIndex.get(keyName).addAll(colsInPrimary);
}
}
} catch (Exception e) {
LOG.error("fail to get all keys from table" + sql, e);
} finally {
close(rs, stmt, null);
}
LOG.info("all index: " + allIndex.toString());
return allIndex;
}
/**
* Returns the names of the indexes whose leading columns cover all the columns that
* appear in the where condition.
*
* @param conn
* @param table
* @param colNamesInCondition
* @return
*/
private static List<String> getIndexName(Connection conn, String table,
Set<String> colNamesInCondition, String compatibleMode) {
List<String> indexNames = new ArrayList<String>();
if (colNamesInCondition == null || colNamesInCondition.size() == 0) {
LOG.info("there is no qulified conditions in the where clause, skip index selection.");
return indexNames;
}
LOG.info("columNamesInConditions: " + String.join(",", colNamesInCondition));
Map<String, List<String>> allIndex = getAllIndex(conn, table, compatibleMode);
for (String keyName : allIndex.keySet()) {
boolean indexNotMatch = false;
// If the index does not contain all the columns in the where condition, it cannot
// be chosen; the selected index must start with the columns in the where condition.
if (allIndex.get(keyName).size() < colNamesInCondition.size()) {
indexNotMatch = true;
} else {
// check only the leading colNamesInCondition.size() columns of this index
int num = colNamesInCondition.size();
for (String colName : allIndex.get(keyName)) {
if (!colNamesInCondition.contains(colName)) {
indexNotMatch = true;
break;
}
if (--num == 0) {
break;
}
}
}
if (indexNotMatch) {
continue;
} else {
indexNames.add(keyName);
}
}
return indexNames;
}
/**
* There may be several indexes starting with the given columns, some spanning multiple
* columns, so pick the one with the fewest columns.
*
* @param indexNames
* @param context
*/
private static void findBestIndex(Connection conn, List<String> indexNames, String table, TaskContext context) {
if (indexNames.size() == 0) {
LOG.warn("table has no index.");
return;
}
Map<String, Map<Integer, String>> allIndexs = new HashMap<String, Map<Integer, String>>();
String sql = "show index from " + table + " where key_name in (" + buildPlaceHolder(indexNames.size()) + ")";
if (isOracleMode(context.getCompatibleMode())) {
Map<String, List<String>> allIndexInTab = getAllIndex(conn, table, context.getCompatibleMode());
for (String indexName : indexNames) {
if (allIndexInTab.containsKey(indexName)) {
Map<Integer, String> index = new TreeMap<Integer, String>();
List<String> columnList = allIndexInTab.get(indexName);
for (int i = 1; i <= columnList.size(); i++) {
index.put(i, columnList.get(i-1));
}
allIndexs.put(indexName, index);
} else {
LOG.error("index does not exist: " + indexName);
}
}
} else {
PreparedStatement ps = null;
ResultSet rs = null;
try {
ps = conn.prepareStatement(sql);
for (int i = 0, n = indexNames.size(); i < n; i++) {
ps.setString(i + 1, indexNames.get(i));
}
rs = ps.executeQuery();
while (rs.next()) {
String keyName = rs.getString("Key_name");
Map<Integer, String> index = allIndexs.get(keyName);
if (index == null) {
index = new TreeMap<Integer, String>();
allIndexs.put(keyName, index);
}
int keyInIndex = rs.getInt("Seq_in_index");
String column = rs.getString("Column_name");
index.put(keyInIndex, column);
}
} catch (Throwable e) {
LOG.error("show index from table fail :" + sql, e);
} finally {
close(rs, ps, null);
}
}
LOG.info("possible index:" + allIndexs + ",where:" + context.getWhere());
Entry<String, Map<Integer, String>> chooseIndex = null;
int columnCount = Integer.MAX_VALUE;
for (Entry<String, Map<Integer, String>> entry : allIndexs.entrySet()) {
if (entry.getValue().size() < columnCount) {
columnCount = entry.getValue().size();
chooseIndex = entry;
}
}
if (chooseIndex != null) {
LOG.info("choose index name:" + chooseIndex.getKey() + ",columns:" + chooseIndex.getValue());
context.setIndexName(chooseIndex.getKey());
context.setSecondaryIndexColumns(new ArrayList<String>(chooseIndex.getValue().values()));
}
}
/**
* Due to an ObProxy bug, conn.close() gets no response when the transaction has timed out or been killed.
*
* @param rs
* @param stmt
* @param conn
*/
public static void close(final ResultSet rs, final Statement stmt, final Connection conn) {
DBUtil.closeDBResources(rs, stmt, conn);
}
/**
* Checks whether two records share the same primary key (i.e. the record is a duplicate).
*
* @param savePoint
* @param row
* @param pkIndexs
* @return
*/
public static boolean isPkEquals(Record savePoint, Record row, int[] pkIndexs) {
if (savePoint == null || row == null) {
return false;
}
try {
for (int index : pkIndexs) {
Object left = savePoint.getColumn(index).getRawData();
Object right = row.getColumn(index).getRawData();
if (!left.equals(right)) {
return false;
}
}
} catch (Throwable e) {
return false;
}
return true;
}
public static String buildPlaceHolder(int n) {
if (n <= 0) {
return "";
}
StringBuilder str = new StringBuilder(2 * n);
str.append('?');
for (int i = 1; i < n; i++) {
str.append(",?");
}
return str.toString();
}
public static void binding(PreparedStatement ps, List<Column> list) throws SQLException {
for (int i = 0, n = list.size(); i < n; i++) {
Column c = list.get(i);
if(c instanceof BoolColumn){
ps.setLong(i + 1, ((BoolColumn)c).asLong());
}else if(c instanceof BytesColumn){
ps.setBytes(i + 1, ((BytesColumn)c).asBytes());
}else if(c instanceof DateColumn){
ps.setTimestamp(i + 1, new Timestamp(((DateColumn)c).asDate().getTime()));
}else if(c instanceof DoubleColumn){
ps.setDouble(i + 1, ((DoubleColumn)c).asDouble());
}else if(c instanceof LongColumn){
ps.setLong(i + 1, ((LongColumn)c).asLong());
}else if(c instanceof StringColumn){
ps.setString(i + 1, ((StringColumn)c).asString());
}else{
ps.setObject(i + 1, c.getRawData());
}
}
}
public static List<Column> buildPoint(Record savePoint, int[] pkIndexs) {
List<Column> result = new ArrayList<Column>(pkIndexs.length);
for (int i = 0, n = pkIndexs.length; i < n; i++) {
result.add(savePoint.getColumn(pkIndexs[i]));
}
return result;
}
public static String getCompatibleMode(Connection conn) {
String compatibleMode = OB_COMPATIBLE_MODE_MYSQL;
String getCompatibleModeSql = "SHOW VARIABLES LIKE 'ob_compatibility_mode'";
Statement stmt = null;
ResultSet rs = null;
try {
stmt = conn.createStatement();
rs = stmt.executeQuery(getCompatibleModeSql);
if (rs.next()) {
compatibleMode = rs.getString("VALUE");
}
} catch (Exception e) {
LOG.error("fail to get ob compatible mode, using mysql as default: " + e.getMessage());
} finally {
DBUtil.closeDBResources(rs, stmt, conn);
}
LOG.info("ob compatible mode is " + compatibleMode);
return compatibleMode;
}
public static boolean isOracleMode(String mode) {
return (mode != null && OB_COMPATIBLE_MODE_ORACLE.equals(mode));
}
}
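To make the output of buildFirstQuerySql and buildAppendQuerySql concrete, here is a sketch of the statement shapes for a hypothetical table t1 with pk (c1, c2), columns (c1, c2, c3), where clause "c3 > 0", weak read on and readBatchSize 100000; the trailing limit is appended by TaskContext.getQuerySql() in MySQL mode, while Oracle mode instead uses the rownum sub-query shown above:

// QueryShapeDemo.java -- illustrative only; shows the shape of the generated SQL.
public class QueryShapeDemo {
    public static void main(String[] args) {
        // first (cold-start) query:
        String first = "select /*+READ_CONSISTENCY(WEAK)*/ c1,c2,c3 from t1"
                + " where c3 > 0 order by c1,c2 asc limit 100000";
        // append (resume) query; the (?,?) tuple is bound from the save point:
        String append = "select /*+READ_CONSISTENCY(WEAK)*/ c1,c2,c3 from t1"
                + " where (c3 > 0) and (c1,c2) > (?,?) order by c1,c2 asc limit 100000";
        System.out.println(first);
        System.out.println(append);
    }
}

Note also the unit conversion in initConn4Reader: the default queryTimeoutSecond of 172,800 s (48 h) becomes ob_query_timeout = 172,800,000,000 µs.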

oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java Normal file

@@ -0,0 +1,176 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.util;
import java.sql.Connection;
import java.util.Collections;
import java.util.List;
import com.alibaba.datax.common.element.Record;
public class TaskContext {
private Connection conn;
private final String table;
private String indexName;
// column list of the secondary index
private List<String> secondaryIndexColumns = Collections.emptyList();
private String querySql;
private final String where;
private final int fetchSize;
private long readBatchSize = -1;
private boolean weakRead = true;
private String userSavePoint;
private String compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_MYSQL;
private String partitionName;
public String getPartitionName() {
return partitionName;
}
public void setPartitionName(String partitionName) {
this.partitionName = partitionName;
}
// save point for resumable reading
private volatile Record savePoint;
// positions of the pk columns within the column list, used to read values from the
// savePoint when binding variables; null means this is not a resumable-read scenario
private int[] pkIndexs;
private final List<String> columns;
private String[] pkColumns;
private long cost;
private final int transferColumnNumber;
public TaskContext(String table, List<String> columns, String where, int fetchSize) {
super();
this.table = table;
this.columns = columns;
// for the querySql-only scenario
this.transferColumnNumber = columns == null ? -1 : columns.size();
this.where = where;
this.fetchSize = fetchSize;
}
public Connection getConn() {
return conn;
}
public void setConn(Connection conn) {
this.conn = conn;
}
public String getIndexName() {
return indexName;
}
public void setIndexName(String indexName) {
this.indexName = indexName;
}
public List<String> getSecondaryIndexColumns() {
return secondaryIndexColumns;
}
public void setSecondaryIndexColumns(List<String> secondaryIndexColumns) {
this.secondaryIndexColumns = secondaryIndexColumns;
}
public String getQuerySql() {
if (readBatchSize == -1 || ObReaderUtils.isOracleMode(compatibleMode)) {
return querySql;
} else {
return querySql + " limit " + readBatchSize;
}
}
public void setQuerySql(String querySql) {
this.querySql = querySql;
}
public String getWhere() {
return where;
}
public Record getSavePoint() {
return savePoint;
}
public void setSavePoint(Record savePoint) {
this.savePoint = savePoint;
}
public int[] getPkIndexs() {
return pkIndexs;
}
public void setPkIndexs(int[] pkIndexs) {
this.pkIndexs = pkIndexs;
}
public List<String> getColumns() {
return columns;
}
public String[] getPkColumns() {
return pkColumns;
}
public void setPkColumns(String[] pkColumns) {
this.pkColumns = pkColumns;
}
public String getTable() {
return table;
}
public int getFetchSize() {
return fetchSize;
}
public long getCost() {
return cost;
}
public void addCost(long cost) {
this.cost += cost;
}
public int getTransferColumnNumber() {
return transferColumnNumber;
}
public long getReadBatchSize() {
return readBatchSize;
}
public void setReadBatchSize(long readBatchSize) {
this.readBatchSize = readBatchSize;
}
public boolean getWeakRead() {
return weakRead;
}
public void setWeakRead(boolean weakRead) {
this.weakRead = weakRead;
}
public String getUserSavePoint() {
return userSavePoint;
}
public void setUserSavePoint(String userSavePoint) {
this.userSavePoint = userSavePoint;
}
public String getCompatibleMode() {
return compatibleMode;
}
public void setCompatibleMode(String compatibleMode) {
this.compatibleMode = compatibleMode;
}
}

oceanbasev10reader/src/main/resources/plugin.json Normal file

@@ -0,0 +1,6 @@
{
"name": "oceanbasev10reader",
"class": "com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader",
"description": "read data from oceanbase with SQL interface",
"developer": "oceanbase"
}

oceanbasev10writer/pom.xml Normal file

@@ -0,0 +1,126 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>oceanbasev10writer</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.0.4.RELEASE</version>
<scope>test</scope>
</dependency>
<!--
<dependency>
<groupId>com.alipay.oceanbase</groupId>
<artifactId>oceanbase-partition</artifactId>
<version>0.0.5</version>
</dependency>
-->
<dependency>
<groupId>com.alipay.oceanbase</groupId>
<artifactId>oceanbase-connector-java</artifactId>
<version>3.2.0</version>
<scope>system</scope>
<systemPath>${basedir}/src/main/libs/oceanbase-connector-java-3.2.0.jar</systemPath>
<exclusions>
<exclusion>
<groupId>com.alipay.oceanbase</groupId>
<artifactId>oceanbase-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20160810</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

oceanbasev10writer/src/main/assembly/package.xml Normal file

@@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/oceanbasev10writer</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>oceanbasev10writer-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/oceanbasev10writer</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/oceanbasev10writer/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/Config.java Normal file

@@ -0,0 +1,62 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer;
public interface Config {
String MEMSTORE_THRESHOLD = "memstoreThreshold";
double DEFAULT_MEMSTORE_THRESHOLD = 0.9d;
String MEMSTORE_CHECK_INTERVAL_SECOND = "memstoreCheckIntervalSecond";
long DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND = 30;
int DEFAULT_BATCH_SIZE = 100;
int MAX_BATCH_SIZE = 4096;
String FAIL_TRY_COUNT = "failTryCount";
int DEFAULT_FAIL_TRY_COUNT = 10000;
String WRITER_THREAD_COUNT = "writerThreadCount";
int DEFAULT_WRITER_THREAD_COUNT = 1;
String CONCURRENT_WRITE = "concurrentWrite";
boolean DEFAULT_CONCURRENT_WRITE = true;
String OB_VERSION = "obVersion";
String TIMEOUT = "timeout";
String PRINT_COST = "printCost";
boolean DEFAULT_PRINT_COST = false;
String COST_BOUND = "costBound";
long DEFAULT_COST_BOUND = 20;
String MAX_ACTIVE_CONNECTION = "maxActiveConnection";
int DEFAULT_MAX_ACTIVE_CONNECTION = 2000;
String WRITER_SUB_TASK_COUNT = "writerSubTaskCount";
int DEFAULT_WRITER_SUB_TASK_COUNT = 1;
int MAX_WRITER_SUB_TASK_COUNT = 4096;
String OB_WRITE_MODE = "obWriteMode";
String OB_COMPATIBLE_MODE = "obCompatibilityMode";
String OB_COMPATIBLE_MODE_ORACLE = "ORACLE";
String OB_COMPATIBLE_MODE_MYSQL = "MYSQL";
String OCJ_GET_CONNECT_TIMEOUT = "ocjGetConnectTimeout";
int DEFAULT_OCJ_GET_CONNECT_TIMEOUT = 5000; // 5s
String OCJ_PROXY_CONNECT_TIMEOUT = "ocjProxyConnectTimeout";
int DEFAULT_OCJ_PROXY_CONNECT_TIMEOUT = 5000; // 5s
String OCJ_CREATE_RESOURCE_TIMEOUT = "ocjCreateResourceTimeout";
int DEFAULT_OCJ_CREATE_RESOURCE_TIMEOUT = 60000; // 60s
String OB_UPDATE_COLUMNS = "obUpdateColumns";
String USE_PART_CALCULATOR = "usePartCalculator";
boolean DEFAULT_USE_PART_CALCULATOR = false;
}
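memstoreThreshold and memstoreCheckIntervalSecond together describe a back-pressure policy: the writer periodically samples OceanBase memstore usage and holds off writing while usage exceeds the threshold. A minimal sketch of that policy under this assumption (the real check in the writer tasks queries server-side memstore statistics; the supplier here just fakes three readings):

import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;

// MemstoreThrottleDemo.java -- illustrative only, not the plugin's actual check.
public class MemstoreThrottleDemo {
    // Block until the sampled memstore usage drops to or below the threshold.
    static void waitForMemstore(DoubleSupplier usage, double threshold,
                                long checkIntervalSecond) throws InterruptedException {
        while (usage.getAsDouble() > threshold) {          // e.g. > DEFAULT_MEMSTORE_THRESHOLD (0.9)
            TimeUnit.SECONDS.sleep(checkIntervalSecond);   // DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND (30)
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final double[] samples = {0.95, 0.92, 0.85};       // fake usage readings
        final int[] i = {0};
        waitForMemstore(() -> samples[Math.min(i[0]++, samples.length - 1)], 0.9, 0);
        System.out.println("memstore below threshold, resume writing");
    }
}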

oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java Normal file

@@ -0,0 +1,246 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.task.ConcurrentTableWriterTask;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.task.SingleTableWriterTask;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
/**
* 2016-04-07
* <p>
* Writer dedicated to OceanBase 1.0.
*
* @author biliang.wbl
*
*/
public class OceanBaseV10Writer extends Writer {
private static DataBaseType DATABASE_TYPE = DataBaseType.OceanBase;
/**
* Methods in Job are executed only once; methods in Task are run in parallel by the
* multiple Task threads started by the framework.
* <p/>
* The overall Writer execution flow is:
*
* <pre>
* Job: init --> prepare --> split
*
* Task: init --> prepare --> startWrite --> post --> destroy
* Task: init --> prepare --> startWrite --> post --> destroy
*
* Job: post --> destroy
* </pre>
*/
public static class Job extends Writer.Job {
private Configuration originalConfig = null;
private CommonRdbmsWriter.Job commonJob;
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
/**
* Note: this method is executed only once. Best practice: validate the user
* configuration here (missing required options, invalid values, irrelevant options...)
* with clear error/warning messages; validation is usually best done by a static
* utility class to keep this class clean.
*/
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
checkCompatibleMode(originalConfig);
this.commonJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
this.commonJob.init(this.originalConfig);
}
/**
* Note: this method is executed only once. Best practice: if the Job needs any
* processing before data synchronization, do it here; otherwise it can simply be removed.
*/
// in general pre execution should be deferred to the task; the single-table case is the exception
@Override
public void prepare() {
int tableNumber = originalConfig.getInt(Constant.TABLE_NUMBER_MARK);
if (tableNumber == 1) {
this.commonJob.prepare(this.originalConfig);
final String version = fetchServerVersion(originalConfig);
originalConfig.set(Config.OB_VERSION, version);
}
String username = originalConfig.getString(Key.USERNAME);
String password = originalConfig.getString(Key.PASSWORD);
// fetch the preSql config and execute it
List<String> preSqls = originalConfig.getList(Key.PRE_SQL, String.class);
if (preSqls == null || preSqls.size() == 0) {
return;
}
List<Object> conns = originalConfig.getList(Constant.CONN_MARK, Object.class);
for (Object connConfObject : conns) {
Configuration connConf = Configuration.from(connConfObject.toString());
// the jdbcUrl here already has the appropriate suffix parameters appended
String jdbcUrl = connConf.getString(Key.JDBC_URL);
List<String> tableList = connConf.getList(Key.TABLE, String.class);
for (String table : tableList) {
List<String> renderedPreSqls = WriterUtil.renderPreOrPostSqls(preSqls, table);
if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
Connection conn = DBUtil.getConnection(DATABASE_TYPE, jdbcUrl, username, password);
LOG.info("Begin to execute preSqls:[{}]. context info:{}.",
StringUtils.join(renderedPreSqls, ";"), jdbcUrl);
WriterUtil.executeSqls(conn, renderedPreSqls, jdbcUrl, DATABASE_TYPE);
ObWriterUtils.asyncClose(null, null, conn);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("After job prepare(), originalConfig now is:[\n{}\n]", originalConfig.toJSON());
}
}
/**
* Note: this method is executed only once. Best practice: use a static utility class to
* split the Job configuration into multiple Task configurations; mandatoryNumber is the
* number of splits that must be produced.
*/
@Override
public List<Configuration> split(int mandatoryNumber) {
int tableNumber = originalConfig.getInt(Constant.TABLE_NUMBER_MARK);
if (tableNumber == 1) {
return this.commonJob.split(this.originalConfig, mandatoryNumber);
}
Configuration simplifiedConf = this.originalConfig;
List<Configuration> splitResultConfigs = new ArrayList<Configuration>();
for (int j = 0; j < mandatoryNumber; j++) {
splitResultConfigs.add(simplifiedConf.clone());
}
return splitResultConfigs;
}
/**
* Note: this method is executed only once. Best practice: if the Job needs any post-processing after data synchronization, do it here.
*/
@Override
public void post() {
int tableNumber = originalConfig.getInt(Constant.TABLE_NUMBER_MARK);
if (tableNumber == 1) {
commonJob.post(this.originalConfig);
return;
}
String username = originalConfig.getString(Key.USERNAME);
String password = originalConfig.getString(Key.PASSWORD);
List<Object> conns = originalConfig.getList(Constant.CONN_MARK, Object.class);
List<String> postSqls = originalConfig.getList(Key.POST_SQL, String.class);
if (postSqls == null || postSqls.size() == 0) {
return;
}
for (Object connConfObject : conns) {
Configuration connConf = Configuration.from(connConfObject.toString());
String jdbcUrl = connConf.getString(Key.JDBC_URL);
List<String> tableList = connConf.getList(Key.TABLE, String.class);
for (String table : tableList) {
List<String> renderedPostSqls = WriterUtil.renderPreOrPostSqls(postSqls, table);
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
// postSql is configured, so execute it here (the config entry is removed afterwards)
Connection conn = DBUtil.getConnection(DATABASE_TYPE, jdbcUrl, username, password);
LOG.info("Begin to execute postSqls:[{}]. context info:{}.",
StringUtils.join(renderedPostSqls, ";"), jdbcUrl);
WriterUtil.executeSqls(conn, renderedPostSqls, jdbcUrl, DATABASE_TYPE);
ObWriterUtils.asyncClose(null, null, conn);
}
}
}
originalConfig.remove(Key.POST_SQL);
}
/**
* Note: this method is executed only once. Best practice: usually releases the Job's resources together with post().
*/
@Override
public void destroy() {
this.commonJob.destroy(this.originalConfig);
}
private String fetchServerVersion(Configuration config) {
final String fetchVersionSql = "show variables like 'version'";
return DbUtils.fetchSingleValueWithRetry(config, fetchVersionSql);
}
private void checkCompatibleMode(Configuration configure) {
final String fetchCompatibleModeSql = "SHOW VARIABLES LIKE 'ob_compatibility_mode'";
String compatibleMode = DbUtils.fetchSingleValueWithRetry(configure, fetchCompatibleModeSql);
ObWriterUtils.setCompatibleMode(compatibleMode);
configure.set(Config.OB_COMPATIBLE_MODE, compatibleMode);
}
}
public static class Task extends Writer.Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration writerSliceConfig;
private CommonRdbmsWriter.Task writerTask;
/**
* Note: this method is executed once per Task. Best practice: read the taskConfig here
* and initialize resources in preparation for startWrite().
*/
@Override
public void init() {
this.writerSliceConfig = super.getPluginJobConf();
int tableNumber = writerSliceConfig.getInt(Constant.TABLE_NUMBER_MARK);
if (tableNumber == 1) {
// always use concurrentTableWriter
this.writerTask = new ConcurrentTableWriterTask(DATABASE_TYPE);
} else {
throw new RuntimeException("writing to multi-tables is not supported.");
}
LOG.info("tableNumber:" + tableNumber + ",writerTask Class:" + writerTask.getClass().getName());
this.writerTask.init(this.writerSliceConfig);
}
/**
* Note: this method is executed once per Task.
* Best practice: perform any pre-processing needed before data synchronization here;
* remove this method if it is not needed.
*/
@Override
public void prepare() {
this.writerTask.prepare(this.writerSliceConfig);
}
/**
* Note: this method is executed once per Task.
* Best practice: keep this method concise and well encapsulated; it performs the actual data writing.
*/
public void startWrite(RecordReceiver recordReceiver) {
this.writerTask.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector());
}
/**
* Note: this method is executed once per Task.
* Best practice: perform any post-processing required after data synchronization here.
*/
@Override
public void post() {
this.writerTask.post(this.writerSliceConfig);
}
/**
* Note: this method is executed once per Task.
* Best practice: release Task-level resources here, usually together with post().
*/
@Override
public void destroy() {
this.writerTask.destroy(this.writerSliceConfig);
}
}
}

View File

@ -0,0 +1,37 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import java.sql.Connection;
public abstract class ConnHolder {
protected final Configuration config;
protected Connection conn;
public ConnHolder(Configuration config) {
this.config = config;
}
public abstract Connection initConnection();
public Configuration getConfig() {
return config;
}
public Connection getConn() {
return conn;
}
public Connection reconnect() {
DBUtil.closeDBResources(null, conn);
return initConnection();
}
public abstract String getJdbcUrl();
public abstract String getUserName();
public abstract void destroy();
}

View File

@ -0,0 +1,101 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author oceanbase
*
*/
public class DataBaseWriterBuffer {
private static final Logger LOG = LoggerFactory.getLogger(DataBaseWriterBuffer.class);
private final ConnHolder connHolder;
private final String dbName;
private Map<String, LinkedList<Record>> tableBuffer = new HashMap<String, LinkedList<Record>>();
private long lastCheckMemstoreTime;
public DataBaseWriterBuffer(Configuration config, String jdbcUrl, String userName, String password, String dbName) {
this.connHolder = new ObClientConnHolder(config, jdbcUrl, userName, password);
this.dbName = dbName;
}
public ConnHolder getConnHolder(){
return connHolder;
}
public void initTableBuffer(List<String> tableList) {
for (String table : tableList) {
tableBuffer.put(table, new LinkedList<Record>());
}
}
public List<String> getTableList(){
return new ArrayList<String>(tableBuffer.keySet());
}
public void addRecord(Record record, String tableName) {
LinkedList<Record> recordList = tableBuffer.get(tableName);
if (recordList == null) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR,
String.format("The [table] calculated based on the rules does not exist. The calculated [tableName]=%s, [db]=%s. Please check the rules you configured.",
tableName, connHolder.getJdbcUrl()));
}
recordList.add(record);
}
public Map<String, LinkedList<Record>> getTableBuffer() {
return tableBuffer;
}
public String getDbName() {
return dbName;
}
public long getLastCheckMemstoreTime() {
return lastCheckMemstoreTime;
}
public void setLastCheckMemstoreTime(long lastCheckMemstoreTime) {
this.lastCheckMemstoreTime = lastCheckMemstoreTime;
}
/**
* Check the memstore usage of the current DB.
* <p>
* If usage exceeds the threshold, sleep until it drops back below it.
*
* @param memstoreCheckIntervalSecond minimum interval between two checks, in seconds
* @param memstoreThreshold ratio of memstore_total to mem_limit above which writing pauses
*/
public synchronized void checkMemstore(long memstoreCheckIntervalSecond, double memstoreThreshold) {
long now = System.currentTimeMillis();
if (now - getLastCheckMemstoreTime() < 1000 * memstoreCheckIntervalSecond) {
return;
}
LOG.debug(String.format("checking memstore usage: lastCheckTime=%d, now=%d, check interval=%d, threshold=%f",
getLastCheckMemstoreTime(), now, memstoreCheckIntervalSecond, memstoreThreshold));
Connection conn = getConnHolder().getConn();
while (ObWriterUtils.isMemstoreFull(conn, memstoreThreshold)) {
LOG.warn("OB memstore is full,sleep 60 seconds, jdbc=" + getConnHolder().getJdbcUrl()
+ ",threshold=" + memstoreThreshold);
ObWriterUtils.sleep(60000);
}
setLastCheckMemstoreTime(now);
}
}

View File

@ -0,0 +1,190 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
import com.alipay.oceanbase.obproxy.datasource.ObGroupDataSource;
import com.alipay.oceanbase.obproxy.exception.ConnectionPropertiesNotSupportedException;
import com.alipay.oceanbase.obproxy.util.StringParser.IllegalFormatException;
import com.google.common.collect.Maps;
public class OBDataSourceV10 {
private static final Logger LOG = LoggerFactory.getLogger(OBDataSourceV10.class);
private static final Map<String, DataSourceHolder> dataSources = Maps.newHashMap();
private static int ocjGetConnectionTimeout = 0;
private static int ocjGlobalProxyroGetConnectionTimeout = 0;
private static int ocjMaxWaitOfCreateClusterResourceMs = 0;
private static Configuration taskConfig;
public static String genKey(String fullUserName, String dbName) {
//username@tenantName#clusterName/dbName
return fullUserName + "/" + dbName;
}
public static synchronized void init(Configuration configuration,
final String fullUsername,
final String password,
final String dbName) {
taskConfig = configuration;
final String rsUrl = "";
final String dataSourceKey = genKey(fullUsername, dbName);
final int maxActiveConnection = configuration.getInt(Config.MAX_ACTIVE_CONNECTION, Config.DEFAULT_MAX_ACTIVE_CONNECTION);
if (dataSources.containsKey(dataSourceKey)) {
dataSources.get(dataSourceKey).increaseReference();
} else {
long timeout = configuration.getInt(Config.TIMEOUT, 30);
if (timeout < 30) {
timeout = 30;
}
if (ocjGetConnectionTimeout == 0) {
ocjGetConnectionTimeout = configuration.getInt(Config.OCJ_GET_CONNECT_TIMEOUT,
Config.DEFAULT_OCJ_GET_CONNECT_TIMEOUT);
ocjGlobalProxyroGetConnectionTimeout = configuration.getInt(Config.OCJ_PROXY_CONNECT_TIMEOUT,
Config.DEFAULT_OCJ_PROXY_CONNECT_TIMEOUT);
ocjMaxWaitOfCreateClusterResourceMs = configuration.getInt(Config.OCJ_CREATE_RESOURCE_TIMEOUT,
Config.DEFAULT_OCJ_CREATE_RESOURCE_TIMEOUT);
LOG.info(String.format("initializing OCJ with ocjGetConnectionTimeout=%d, " +
"ocjGlobalProxyroGetConnectionTimeout=%d, ocjMaxWaitOfCreateClusterResourceMs=%d",
ocjGetConnectionTimeout, ocjGlobalProxyroGetConnectionTimeout, ocjMaxWaitOfCreateClusterResourceMs));
}
DataSourceHolder holder = null;
try {
holder = new DataSourceHolder(rsUrl, fullUsername, password, dbName, maxActiveConnection, timeout);
dataSources.put(dataSourceKey, holder);
} catch (ConnectionPropertiesNotSupportedException e) {
LOG.error("failed to create ob datasource: connection properties not supported", e);
throw new DataXException(ObDataSourceErrorCode.DESC, "connect error");
} catch (IllegalArgumentException e) {
LOG.error("failed to create ob datasource: illegal argument", e);
throw new DataXException(ObDataSourceErrorCode.DESC, "connect error");
} catch (IllegalFormatException e) {
LOG.error("failed to create ob datasource: illegal url format", e);
throw new DataXException(ObDataSourceErrorCode.DESC, "connect error");
} catch (SQLException e) {
LOG.error("failed to create ob datasource: sql exception", e);
throw new DataXException(ObDataSourceErrorCode.DESC, "connect error");
}
}
}
public static synchronized void destroy(final String dataSourceKey) {
DataSourceHolder holder = dataSources.get(dataSourceKey);
holder.decreaseReference();
if (holder.canClose()) {
dataSources.remove(dataSourceKey);
holder.close();
LOG.info(String.format("close datasource success [%s]", dataSourceKey));
}
}
public static Connection getConnection(final String url) {
Connection conn = null;
try {
conn = dataSources.get(url).getConnection();
} catch (SQLException e) {
LOG.error("failed to get connection from datasource " + url, e);
}
return conn;
}
private static Map<String, String> buildJdbcProperty() {
Map<String, String> property = new HashMap<String, String>();
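// Connection properties tuned for batch writing: client-side prepared statements, UTF-8,
// rewriteBatchedStatements for multi-row inserts, and a 25s socket timeout.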
property.put("useServerPrepStmts", "false");
property.put("characterEncoding", "UTF-8");
property.put("useLocalSessionState", "false");
property.put("rewriteBatchedStatements", "true");
property.put("socketTimeout", "25000");
return property;
}
private static class DataSourceHolder {
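// Reference-counted holder: init() increments the count, destroy() decrements it, and the
// underlying ObGroupDataSource is closed once the count drops to zero.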
private volatile int reference;
private final ObGroupDataSource groupDataSource;
public static final Map<String, String> jdbcProperty = buildJdbcProperty();
public DataSourceHolder(final String rsUrl,
final String fullUsername,
final String password,
final String dbName,
final int maxActive,
final long timeout) throws ConnectionPropertiesNotSupportedException, IllegalFormatException, IllegalArgumentException, SQLException {
this.reference = 1;
this.groupDataSource = new ObGroupDataSource();
this.groupDataSource.setUrl(rsUrl);
this.groupDataSource.setFullUsername(fullUsername);
this.groupDataSource.setPassword(password);
this.groupDataSource.setDatabase(dbName);
this.groupDataSource.setConnectionProperties(jdbcProperty);
this.groupDataSource.setGetConnectionTimeout(ocjGetConnectionTimeout);
this.groupDataSource.setGlobalProxyroGetConnectionTimeout(ocjGlobalProxyroGetConnectionTimeout);
this.groupDataSource.setMaxWaitOfCreateClusterResourceMs(ocjMaxWaitOfCreateClusterResourceMs);
this.groupDataSource.setMaxActive(maxActive);
this.groupDataSource.setGlobalSlowQueryThresholdUs(3000000); // 3s, sql with response time more than 3s will be logged
this.groupDataSource.setGlobalCleanLogFileEnabled(true); // enable log cleanup
this.groupDataSource.setGlobalLogFileSizeThreshold(17179869184L); // 16G, log file total size
this.groupDataSource.setGlobalCleanLogFileInterval(10000); // 10s, check interval
this.groupDataSource.setInitialSize(1);
List<String> initSqls = new ArrayList<String>();
if (taskConfig != null) {
List<String> sessionConfig = taskConfig.getList(Key.SESSION, new ArrayList(), String.class);
if (sessionConfig != null && !sessionConfig.isEmpty()) {
initSqls.addAll(sessionConfig);
}
}
// set up for writing timestamp columns
if (ObWriterUtils.isOracleMode()) {
initSqls.add("ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS';");
initSqls.add("ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS.FF';");
initSqls.add("ALTER SESSION SET NLS_TIMESTAMP_TZ_FORMAT='YYYY-MM-DD HH24:MI:SS.FF TZR TZD';");
}
this.groupDataSource.setConnectionInitSqls(initSqls);
this.groupDataSource.init();
LOG.info("Create GroupDataSource rsUrl=[{}], fullUserName=[{}], dbName=[{}], getConnectionTimeout={}ms, maxActive={}",
rsUrl, fullUsername, dbName, ocjGetConnectionTimeout, maxActive);
}
public Connection getConnection() throws SQLException {
return groupDataSource.getConnection();
}
public synchronized void increaseReference() {
this.reference++;
}
public synchronized void decreaseReference() {
this.reference--;
}
public synchronized boolean canClose() {
return reference == 0;
}
public synchronized void close() {
if (this.canClose()) {
groupDataSource.destroy();
}
}
}
}

View File

@ -0,0 +1,55 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
import java.sql.Connection;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
/**
* wrap oceanbase java client
* @author oceanbase
*/
public class OCJConnHolder extends ConnHolder {
private ServerConnectInfo connectInfo;
private String dataSourceKey;
public OCJConnHolder (Configuration config, ServerConnectInfo connInfo) {
super(config);
this.connectInfo = connInfo;
this.dataSourceKey = OBDataSourceV10.genKey(connectInfo.getFullUserName(), connectInfo.databaseName);
OBDataSourceV10.init(config, connectInfo.getFullUserName(), connectInfo.password, connectInfo.databaseName);
}
@Override
public Connection initConnection() {
conn = OBDataSourceV10.getConnection(dataSourceKey);
return conn;
}
@Override
public Connection reconnect() {
DBUtil.closeDBResources(null, conn);
return initConnection();
}
@Override
public Connection getConn() {
return conn;
}
@Override
public String getJdbcUrl() {
return connectInfo.jdbcUrl;
}
@Override
public String getUserName() {
return connectInfo.userName;
}
@Override
public void destroy() {
OBDataSourceV10.destroy(this.dataSourceKey);
}
}

View File

@ -0,0 +1,63 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
/**
* Database connection holder: creates connections and re-creates them when needed.
*
* @author oceanbase
*/
public class ObClientConnHolder extends ConnHolder {
private final String jdbcUrl;
private final String userName;
private final String password;
public ObClientConnHolder(Configuration config, String jdbcUrl, String userName, String password) {
super(config);
this.jdbcUrl = jdbcUrl;
this.userName = userName;
this.password = password;
}
// Connect to ob with obclient and obproxy
@Override
public Connection initConnection() {
String BASIC_MESSAGE = String.format("jdbcUrl:[%s]", this.jdbcUrl);
DataBaseType dbType = DataBaseType.OceanBase;
if (ObWriterUtils.isOracleMode()) {
// set up for writing timestamp columns
List<String> sessionConfig = config.getList(Key.SESSION, new ArrayList<String>(), String.class);
sessionConfig.add("ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'");
sessionConfig.add("ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS.FF'");
sessionConfig.add("ALTER SESSION SET NLS_TIMESTAMP_TZ_FORMAT='YYYY-MM-DD HH24:MI:SS.FF TZR TZD'");
config.set(Key.SESSION, sessionConfig);
}
conn = DBUtil.getConnection(dbType, jdbcUrl, userName, password);
DBUtil.dealWithSessionConfig(conn, config, dbType, BASIC_MESSAGE);
return conn;
}
@Override
public String getJdbcUrl() {
return jdbcUrl;
}
@Override
public String getUserName() {
return userName;
}
@Override
public void destroy() {
DBUtil.closeDBResources(null, conn);
}
}

View File

@ -0,0 +1,31 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
import com.alibaba.datax.common.spi.ErrorCode;
public enum ObDataSourceErrorCode implements ErrorCode {
DESC("ObDataSourceError code","connect error");
private final String code;
private final String describe;
private ObDataSourceErrorCode(String code, String describe) {
this.code = code;
this.describe = describe;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.describe;
}
@Override
public String toString() {
return String.format("Code:[%s], Describe:[%s]. ", this.code,
this.describe);
}
}

View File

@ -0,0 +1,58 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class ServerConnectInfo {
public String clusterName;
public String tenantName;
public String userName;
public String password;
public String databaseName;
public String ipPort;
public String jdbcUrl;
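/*
* Expected OB 1.0 style connection string (an assumption inferred from the OB10_SPLIT_STRING
* handling below), e.g. ||_dsc_ob10_dsc_||clusterName:tenantName||_dsc_ob10_dsc_||jdbc:mysql://host:port/db?...;
* the cluster/tenant prefix is stripped and the remainder is rewritten to a jdbc:oceanbase: URL.
*/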
public ServerConnectInfo(final String jdbcUrl, final String username, final String password) {
if (jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) {
String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN);
if (ss.length != 3) {
throw new RuntimeException("jdbc url format is not correct: " + jdbcUrl);
}
this.userName = username;
this.clusterName = ss[1].trim().split(":")[0];
this.tenantName = ss[1].trim().split(":")[1];
this.jdbcUrl = ss[2].replace("jdbc:mysql:", "jdbc:oceanbase:");
} else {
throw new RuntimeException("jdbc url format is not correct: " + jdbcUrl);
}
this.password = password;
parseJdbcUrl(jdbcUrl);
}
private void parseJdbcUrl(final String jdbcUrl) {
Pattern pattern = Pattern.compile("//([\\w\\.\\-]+:\\d+)/([\\w]+)\\?");
Matcher matcher = pattern.matcher(jdbcUrl);
if (matcher.find()) {
String ipPort = matcher.group(1);
String dbName = matcher.group(2);
this.ipPort = ipPort;
this.databaseName = dbName;
} else {
throw new RuntimeException("Invalid argument:" + jdbcUrl);
}
}
public String toString() {
StringBuffer strBuffer = new StringBuffer();
return strBuffer.append("clusterName:").append(clusterName).append(", tenantName:").append(tenantName)
.append(", userName:").append(userName).append(", databaseName:").append(databaseName)
.append(", ipPort:").append(ipPort).append(", jdbcUrl:").append(jdbcUrl).toString();
}
public String getFullUserName() {
StringBuilder builder = new StringBuilder();
builder.append(userName).append("@").append(tenantName).append("#").append(clusterName);
return builder.toString();
}
}

View File

@ -0,0 +1,41 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
public class ColumnMetaCache {
private static final Logger LOG = LoggerFactory.getLogger(ColumnMetaCache.class);
private static String tableName;
// volatile so the double-checked locking in init() publishes the Triple safely
private static volatile Triple<List<String>, List<Integer>, List<String>> columnMeta = null;
public ColumnMetaCache() {
}
public static void init(Connection connection, final String tableName, final List<String> columns) throws SQLException {
if (columnMeta == null) {
synchronized(ColumnMetaCache.class) {
ColumnMetaCache.tableName = tableName;
if (columnMeta == null) {
columnMeta = DBUtil.getColumnMetaData(connection,
tableName, StringUtils.join(columns, ","));
LOG.info("fetch columnMeta of table {} success", tableName);
}
}
}
}
public static Triple<List<String>, List<Integer>, List<String>> getColumnMeta() {
return columnMeta;
}
}

View File

@ -0,0 +1,523 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
import com.alipay.oceanbase.obproxy.data.TableEntryKey;
import com.alipay.oceanbase.obproxy.util.ObPartitionIdCalculator;
public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentTableWriterTask.class);
// Threshold on the ratio of memstore_total to memstore_limit; once exceeded, writing pauses
private double memstoreThreshold = Config.DEFAULT_MEMSTORE_THRESHOLD;
// Interval between memstore checks, in seconds
private long memstoreCheckIntervalSecond = Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND;
// Time of the last memstore check
private long lastCheckMemstoreTime;
private static AtomicLong totalTask = new AtomicLong(0);
private long taskId = -1;
private AtomicBoolean isMemStoreFull = new AtomicBoolean(false);
private ConnHolder checkConnHolder;
public ConcurrentTableWriterTask(DataBaseType dataBaseType) {
super(dataBaseType);
taskId = totalTask.getAndIncrement();
}
private ObPartitionIdCalculator partCalculator = null;
private HashMap<Long, List<Record>> groupInsertValues;
private List<Integer> partitionKeyIndexes;
private ConcurrentTableWriter concurrentWriter = null;
private ConnHolder connHolder;
private boolean allTaskInQueue = false;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private long startTime;
private boolean isOb2 = false;
private String obWriteMode = "update";
private boolean isOracleCompatibleMode = false;
private String obUpdateColumns = null;
private List<Pair<String, int[]>> deleteColPos;
private String dbName;
@Override
public void init(Configuration config) {
super.init(config);
// OceanBase always writes in "insert into ... on duplicate key update" mode
// TODO: writeMode should be defined as an enum
this.writeMode = "update";
obWriteMode = config.getString(Config.OB_WRITE_MODE, "update");
ServerConnectInfo connectInfo = new ServerConnectInfo(jdbcUrl, username, password);
dbName = connectInfo.databaseName;
//init check memstore
this.memstoreThreshold = config.getDouble(Config.MEMSTORE_THRESHOLD, Config.DEFAULT_MEMSTORE_THRESHOLD);
this.memstoreCheckIntervalSecond = config.getLong(Config.MEMSTORE_CHECK_INTERVAL_SECOND,
Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND);
this.isOracleCompatibleMode = ObWriterUtils.isOracleMode();
LOG.info("configure url is unavailable, use obclient for connections.");
this.checkConnHolder = new ObClientConnHolder(config, connectInfo.jdbcUrl,
connectInfo.getFullUserName(), connectInfo.password);
this.connHolder = new ObClientConnHolder(config, connectInfo.jdbcUrl,
connectInfo.getFullUserName(), connectInfo.password);
checkConnHolder.initConnection();
if (isOracleCompatibleMode) {
connectInfo.databaseName = connectInfo.databaseName.toUpperCase();
table = table.toUpperCase();
LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s",
connectInfo.databaseName, table));
}
if (config.getBool(Config.USE_PART_CALCULATOR, Config.DEFAULT_USE_PART_CALCULATOR)) {
initPartCalculator(connectInfo);
} else {
LOG.info("Disable partition calculation feature.");
}
obUpdateColumns = config.getString(Config.OB_UPDATE_COLUMNS, null);
groupInsertValues = new HashMap<Long, List<Record>>();
partitionKeyIndexes = new ArrayList<Integer>();
rewriteSql();
if (null == concurrentWriter) {
concurrentWriter = new ConcurrentTableWriter(config, connectInfo, writeRecordSql);
allTaskInQueue = false;
}
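// Derive isOb2 from the major.minor prefix of the reported version,
// e.g. "2.2.50" -> "2.2" -> 2.2f >= 2.1f (assumes a numeric major.minor version string).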
String version = config.getString(Config.OB_VERSION);
int pIdx = version.lastIndexOf('.');
if ((Float.valueOf(version.substring(0, pIdx)) >= 2.1f)) {
isOb2 = true;
}
}
private void initPartCalculator(ServerConnectInfo connectInfo) {
int retry = 0;
LOG.info(String.format("create tableEntryKey with clusterName %s, tenantName %s, databaseName %s, tableName %s",
connectInfo.clusterName, connectInfo.tenantName, connectInfo.databaseName, table));
TableEntryKey tableEntryKey = new TableEntryKey(connectInfo.clusterName, connectInfo.tenantName,
connectInfo.databaseName, table);
do {
try {
if (retry > 0) {
int sleep = retry > 8 ? 500 : (1 << retry);
TimeUnit.SECONDS.sleep(sleep);
LOG.info("retry create new part calculator, the {} times", retry);
}
LOG.info("create partCalculator with address: " + connectInfo.ipPort);
partCalculator = new ObPartitionIdCalculator(connectInfo.ipPort, tableEntryKey);
} catch (Exception ex) {
++retry;
LOG.warn("create new part calculator failed, retry ... {}", retry, ex);
}
} while (partCalculator == null && retry < 3); // try 3 times
}
public boolean isFinished() {
return allTaskInQueue && concurrentWriter.checkFinish();
}
public boolean allTaskInQueue() {
return allTaskInQueue;
}
public void setPutAllTaskInQueue() {
this.allTaskInQueue = true;
LOG.info("ConcurrentTableWriter has put all task in queue, queueSize = {}, total = {}, finished = {}",
concurrentWriter.getTaskQueueSize(),
concurrentWriter.getTotalTaskCount(),
concurrentWriter.getFinishTaskCount());
}
private void rewriteSql() {
Connection conn = connHolder.initConnection();
if (isOracleCompatibleMode && obWriteMode.equalsIgnoreCase("update")) {
// change obWriteMode to insert so the insert statement will be generated.
obWriteMode = "insert";
deleteColPos = ObWriterUtils.buildDeleteSql(conn, dbName, table, columns);
}
this.writeRecordSql = ObWriterUtils.buildWriteSql(table, columns, conn, obWriteMode, obUpdateColumns);
LOG.info("writeRecordSql :{}", this.writeRecordSql);
}
public void prepare(Configuration writerSliceConfig) {
super.prepare(writerSliceConfig);
calPartitionKeyIndex(partitionKeyIndexes);
concurrentWriter.start();
}
private void calPartitionKeyIndex(List<Integer> partKeyIndexes) {
partKeyIndexes.clear();
if (null == partCalculator) {
LOG.error("partCalculator is null");
return;
}
for (int i = 0; i < columns.size(); ++i) {
if (partCalculator.isPartitionKeyColumn(columns.get(i))) {
LOG.info(columns.get(i) + " is partition key.");
partKeyIndexes.add(i);
}
}
}
private Long calPartitionId(List<Integer> partKeyIndexes, Record record) {
if (partCalculator == null) {
return null;
}
for (Integer i : partKeyIndexes) {
partCalculator.addColumn(columns.get(i), record.getColumn(i).asString());
}
return partCalculator.calculate();
}
@Override
public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
this.taskPluginCollector = taskPluginCollector;
// Column values are converted to match the destination table's field types when writing
int retryTimes = 0;
boolean needRetry = false;
do {
try {
if (retryTimes > 0) {
TimeUnit.SECONDS.sleep((1 << retryTimes));
DBUtil.closeDBResources(null, connection);
connection = DBUtil.getConnection(dataBaseType, jdbcUrl, username, password);
LOG.warn("getColumnMetaData of table {} failed, retry the {} times ...", this.table, retryTimes);
}
ColumnMetaCache.init(connection, this.table, this.columns);
this.resultSetMetaData = ColumnMetaCache.getColumnMeta();
needRetry = false;
} catch (SQLException e) {
needRetry = true;
++retryTimes;
e.printStackTrace();
LOG.warn("fetch column meta of [{}] failed..., retry {} times", this.table, retryTimes);
} catch (InterruptedException e) {
LOG.warn("startWriteWithConnection interrupted, ignored");
}
} while (needRetry && retryTimes < 100);
try {
Record record;
startTime = System.currentTimeMillis();
while ((record = recordReceiver.getFromReader()) != null) {
if (record.getColumnNumber() != this.columnNumber) {
// Fail fast when the number of columns read from the source does not match
// the number of columns to write to the destination table
LOG.error("column count mismatch {} != {}, record = {}",
this.columnNumber, record.getColumnNumber(), record.toString());
throw DataXException
.asDataXException(
DBUtilErrorCode.CONF_ERROR,
String.format("Column configuration error: the source produced %d columns but the destination table expects %d. Please check your configuration.",
record.getColumnNumber(),
this.columnNumber));
}
addRecordToCache(record);
}
addLeftRecords();
waitTaskFinish();
} catch (Exception e) {
throw DataXException.asDataXException(
DBUtilErrorCode.WRITE_DATA_ERROR, e);
} finally {
DBUtil.closeDBResources(null, null, connection);
}
}
public PreparedStatement fillStatement(PreparedStatement preparedStatement, Record record)
throws SQLException {
return fillPreparedStatement(preparedStatement, record);
}
public PreparedStatement fillStatementIndex(PreparedStatement preparedStatement,
int prepIdx, int columnIndex, Column column) throws SQLException {
int columnSqltype = this.resultSetMetaData.getMiddle().get(columnIndex);
String typeName = this.resultSetMetaData.getRight().get(columnIndex);
return fillPreparedStatementColumnType(preparedStatement, prepIdx, columnSqltype, typeName, column);
}
public void collectDirtyRecord(Record record, SQLException e) {
taskPluginCollector.collectDirtyRecord(record, e);
}
public void insertOneRecord(Connection connection, List<Record> buffer) {
doOneInsert(connection, buffer);
}
private void addLeftRecords() {
for (List<Record> groupValues : groupInsertValues.values()) {
if (groupValues.size() > 0 ) {
int retry = 0;
while (true) {
try {
concurrentWriter.addBatchRecords(groupValues);
break;
} catch (InterruptedException e) {
retry++;
LOG.info("Concurrent table writer is interrupted, retry {}", retry);
}
}
}
}
}
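// addRecordToCache groups records by calculated partition id and flushes a group once it
// reaches batchSize. On OB >= 2.1 a record whose partition id cannot be calculated falls
// into a shared Long.MAX_VALUE bucket instead of being written as a single-record batch.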
private void addRecordToCache(final Record record) {
Long partId = null;
try {
partId = calPartitionId(partitionKeyIndexes, record);
} catch (Exception e1) {
LOG.warn("fail to get partition id: " + e1.getMessage() + ", record: " + record);
}
if (partId == null && isOb2) {
LOG.debug("fail to calculate parition id, just put into the default buffer.");
partId = Long.MAX_VALUE;
}
if (partId != null) {
List<Record> groupValues = groupInsertValues.get(partId);
if (groupValues == null) {
groupValues = new ArrayList<Record>(batchSize);
groupInsertValues.put(partId, groupValues);
}
groupValues.add(record);
if (groupValues.size() >= batchSize) {
int i = 0;
while (true) {
if (i > 0) {
LOG.info("retry add batch record the {} times", i);
}
try {
concurrentWriter.addBatchRecords(groupValues);
printEveryTime();
break;
} catch (InterruptedException e) {
i++;
LOG.info("Concurrent table writer is interrupted");
}
}
groupValues = new ArrayList<Record>(batchSize);
groupInsertValues.put(partId, groupValues);
}
} else {
LOG.warn("add unknown part record {}", record);
List<Record> unknownPartRecords = new ArrayList<Record>();
unknownPartRecords.add(record);
int i = 0;
while (true) {
if (i > 0) {
LOG.info("retry add batch record the {} times", i);
}
try {
concurrentWriter.addBatchRecords(unknownPartRecords);
break;
} catch (InterruptedException e) {
i++;
LOG.info("Concurrent table writer is interrupted");
}
}
}
}
private void checkMemStore() {
long now = System.currentTimeMillis();
if (now - lastCheckMemstoreTime < 1000 * memstoreCheckIntervalSecond) {
return;
}
// reconnect only when a check is actually due, to avoid rebuilding the connection on every call
Connection checkConn = checkConnHolder.reconnect();
boolean isFull = ObWriterUtils.isMemstoreFull(checkConn, memstoreThreshold);
this.isMemStoreFull.set(isFull);
if (isFull) {
LOG.warn("OB memstore is full, sleep 30 seconds, threshold=" + memstoreThreshold);
}
lastCheckMemstoreTime = now;
}
public boolean isMemStoreFull() {
return isMemStoreFull.get();
}
public void printEveryTime() {
long cost = System.currentTimeMillis() - startTime;
if (cost > 10000) { //10s
print();
startTime = System.currentTimeMillis();
}
}
public void print() {
LOG.debug("Statistic total task {}, finished {}, queue Size {}",
concurrentWriter.getTotalTaskCount(),
concurrentWriter.getFinishTaskCount(),
concurrentWriter.getTaskQueueSize());
concurrentWriter.printStatistics();
}
public void waitTaskFinish() {
setPutAllTaskInQueue();
lock.lock();
try {
while (!concurrentWriter.checkFinish()) {
condition.await(15, TimeUnit.SECONDS);
print();
checkMemStore();
}
} catch (InterruptedException e) {
LOG.warn("Concurrent table writer wait task finish interrupt");
} finally {
lock.unlock();
}
LOG.debug("wait all InsertTask finished ...");
}
public void signalTaskFinish() {
lock.lock();
condition.signal();
lock.unlock();
}
@Override
public void destroy(Configuration writerSliceConfig) {
if (concurrentWriter != null) {
concurrentWriter.destroy();
}
// close the connections held by this task
DBUtil.closeDBResources(null, connHolder.getConn());
DBUtil.closeDBResources(null, checkConnHolder.getConn());
checkConnHolder.destroy();
super.destroy(writerSliceConfig);
}
public class ConcurrentTableWriter {
private BlockingQueue<List<Record>> queue;
private List<InsertTask> insertTasks;
private Configuration config;
private ServerConnectInfo connectInfo;
private String rewriteRecordSql;
private AtomicLong totalTaskCount;
private AtomicLong finishTaskCount;
private final int threadCount;
public ConcurrentTableWriter(Configuration config, ServerConnectInfo connInfo, String rewriteRecordSql) {
threadCount = config.getInt(Config.WRITER_THREAD_COUNT, Config.DEFAULT_WRITER_THREAD_COUNT);
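// bounded at twice the writer thread count so a slow database back-pressures the producer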
queue = new LinkedBlockingQueue<List<Record>>(threadCount << 1);
insertTasks = new ArrayList<InsertTask>(threadCount);
this.config = config;
this.connectInfo = connInfo;
this.rewriteRecordSql = rewriteRecordSql;
this.totalTaskCount = new AtomicLong(0);
this.finishTaskCount = new AtomicLong(0);
}
public long getTotalTaskCount() {
return totalTaskCount.get();
}
public long getFinishTaskCount() {
return finishTaskCount.get();
}
public int getTaskQueueSize() {
return queue.size();
}
public void increFinishCount() {
finishTaskCount.incrementAndGet();
}
//should check after put all the task in the queue
public boolean checkFinish() {
long finishCount = finishTaskCount.get();
long totalCount = totalTaskCount.get();
return finishCount == totalCount;
}
public synchronized void start() {
for (int i = 0; i < threadCount; ++i) {
LOG.info("start {} insert task.", (i+1));
InsertTask insertTask = new InsertTask(taskId, queue, config, connectInfo, rewriteRecordSql, deleteColPos);
insertTask.setWriterTask(ConcurrentTableWriterTask.this);
insertTask.setWriter(this);
insertTasks.add(insertTask);
}
WriterThreadPool.executeBatch(insertTasks);
}
public void printStatistics() {
long insertTotalCost = 0;
long insertTotalCount = 0;
for (InsertTask task: insertTasks) {
insertTotalCost += task.getTotalCost();
insertTotalCount += task.getInsertCount();
}
long avgCost = 0;
if (insertTotalCount != 0) {
avgCost = insertTotalCost / insertTotalCount;
}
ConcurrentTableWriterTask.LOG.debug("Insert {} times, totalCost {} ms, average {} ms",
insertTotalCount, insertTotalCost, avgCost);
}
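// Blocks until the queue accepts the batch; memstore pressure is re-checked each time the
// 5-second offer times out.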
public void addBatchRecords(final List<Record> records) throws InterruptedException {
boolean isSucc = false;
while (!isSucc) {
isSucc = queue.offer(records, 5, TimeUnit.SECONDS);
checkMemStore();
}
totalTaskCount.incrementAndGet();
}
public synchronized void destroy() {
if (insertTasks != null) {
for (InsertTask task : insertTasks) {
task.setStop();
}
for (InsertTask task : insertTasks) {
task.destroy();
}
}
}
}
}

View File

@ -0,0 +1,287 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.task.ConcurrentTableWriterTask.ConcurrentTableWriter;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
public class InsertTask implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(InsertTask.class);
private ConcurrentTableWriterTask writerTask;
private ConcurrentTableWriter writer;
private String writeRecordSql;
private long totalCost = 0;
private long insertCount = 0;
private Queue<List<Record>> queue;
private boolean isStop;
private ConnHolder connHolder;
private final long taskId;
private ServerConnectInfo connInfo;
// Number of retries on failure
private int failTryCount = Config.DEFAULT_FAIL_TRY_COUNT;
private boolean printCost = Config.DEFAULT_PRINT_COST;
private long costBound = Config.DEFAULT_COST_BOUND;
private List<Pair<String, int[]>> deleteMeta;
public InsertTask(
final long taskId,
Queue<List<Record>> recordsQueue,
Configuration config,
ServerConnectInfo connectInfo,
String writeRecordSql,
List<Pair<String, int[]>> deleteMeta) {
this.taskId = taskId;
this.queue = recordsQueue;
this.connInfo = connectInfo;
failTryCount = config.getInt(Config.FAIL_TRY_COUNT, Config.DEFAULT_FAIL_TRY_COUNT);
printCost = config.getBool(Config.PRINT_COST, Config.DEFAULT_PRINT_COST);
costBound = config.getLong(Config.COST_BOUND, Config.DEFAULT_COST_BOUND);
this.connHolder = new ObClientConnHolder(config, connInfo.jdbcUrl,
connInfo.getFullUserName(), connInfo.password);
this.writeRecordSql = writeRecordSql;
this.isStop = false;
this.deleteMeta = deleteMeta;
}
void setWriterTask(ConcurrentTableWriterTask writerTask) {
this.writerTask = writerTask;
}
void setWriter(ConcurrentTableWriter writer) {
this.writer = writer;
}
private boolean isStop() { return isStop; }
public void setStop() { isStop = true; }
public long getTotalCost() { return totalCost; }
public long getInsertCount() { return insertCount; }
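// Worker loop: poll batches from the shared queue and write them; exit once the parent
// task has queued all work and every batch is finished.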
@Override
public void run() {
Thread.currentThread().setName(String.format("%d-insertTask-%d", taskId, Thread.currentThread().getId()));
LOG.debug("Task {} start to execute...", taskId);
while (!isStop()) {
try {
List<Record> records = queue.poll();
if (null != records) {
doMultiInsert(records, this.printCost, this.costBound);
} else if (writerTask.isFinished()) {
writerTask.signalTaskFinish();
LOG.debug("no more tasks, thread exiting ...");
break;
} else {
TimeUnit.MILLISECONDS.sleep(5);
}
} catch (InterruptedException e) {
LOG.debug("InsertTask is interrupted");
} catch (Exception e) {
LOG.warn("unexpected error in insert task", e);
}
}
LOG.debug("Thread exiting ...");
}
public void destroy() {
connHolder.destroy();
}
public void calStatistic(final long cost) {
writer.increFinishCount();
++insertCount;
totalCost += cost;
if (this.printCost && cost > this.costBound) {
LOG.info("slow multi insert cost {}ms", cost);
}
}
private void doDelete(Connection conn, final List<Record> buffer) throws SQLException {
if(deleteMeta == null || deleteMeta.size() == 0) {
return;
}
for (int i = 0; i < deleteMeta.size(); i++) {
String deleteSql = deleteMeta.get(i).getKey();
int[] valueIdx = deleteMeta.get(i).getValue();
PreparedStatement ps = null;
try {
ps = conn.prepareStatement(deleteSql);
StringBuilder builder = new StringBuilder();
for (Record record : buffer) {
int bindIndex = 0;
for (int idx : valueIdx) {
writerTask.fillStatementIndex(ps, bindIndex++, idx, record.getColumn(idx));
builder.append(record.getColumn(idx).asString()).append(",");
}
ps.addBatch();
}
LOG.debug("delete values: " + builder.toString());
ps.executeBatch();
} catch (SQLException ex) {
LOG.error("SQL Exception when delete records with {}", deleteSql, ex);
throw ex;
} finally {
DBUtil.closeDBResources(ps, null);
}
}
}
public void doMultiInsert(final List<Record> buffer, final boolean printCost, final long restrict) {
checkMemstore();
connHolder.initConnection();
Connection conn = connHolder.getConn();
boolean success = false;
long cost = 0;
long startTime = 0;
try {
for (int i = 0; i < failTryCount; ++i) {
if (i > 0) {
try {
int sleep = i >= 9 ? 500 : 1 << i; // TODO: unclear why the backoff is capped at 500 seconds
TimeUnit.SECONDS.sleep(sleep);
} catch (InterruptedException e) {
LOG.info("thread interrupted ..., ignore");
}
connHolder.initConnection();
conn = connHolder.getConn();
LOG.info("retry {}, start do batch insert, size={}", i, buffer.size());
checkMemstore();
}
startTime = System.currentTimeMillis();
PreparedStatement ps = null;
try {
conn.setAutoCommit(false);
// do delete if necessary
doDelete(conn, buffer);
ps = conn.prepareStatement(writeRecordSql);
for (Record record : buffer) {
ps = writerTask.fillStatement(ps, record);
ps.addBatch();
}
ps.executeBatch();
conn.commit();
success = true;
cost = System.currentTimeMillis() - startTime;
calStatistic(cost);
break;
} catch (SQLException e) {
LOG.warn("Insert fatal error SqlState ={}, errorCode = {}, {}", e.getSQLState(), e.getErrorCode(), e);
if (i == 0 || i > 10 ) {
for (Record record : buffer) {
LOG.warn("ERROR : record {}", record);
}
}
// Handle by error code category:
// fatal OB system-level errors require rebuilding the connection
boolean fatalFail = ObWriterUtils.isFatalError(e);
if (fatalFail) {
ObWriterUtils.sleep(300000);
connHolder.reconnect();
// recoverable errors are retried
} else if (ObWriterUtils.isRecoverableError(e)) {
conn.rollback();
ObWriterUtils.sleep(60000);
} else {
// any other error: stop retrying and fall back to row-by-row writes
conn.rollback();
ObWriterUtils.sleep(1000);
break;
}
} catch (Exception e) {
LOG.warn("unexpected insert error", e);
} finally {
DBUtil.closeDBResources(ps, null);
}
}
} catch (SQLException e) {
LOG.warn("ERROR:retry failSql State ={}, errorCode = {}, {}", e.getSQLState(), e.getErrorCode(), e);
}
if (!success) {
LOG.info("batch insert failed, fall back to row-by-row insert");
conn = connHolder.reconnect();
doOneInsert(conn, buffer);
cost = System.currentTimeMillis() - startTime;
calStatistic(cost);
}
}
// process one row, delete before insert
private void doOneInsert(Connection connection, List<Record> buffer) {
List<PreparedStatement> deletePstmtList = new ArrayList<PreparedStatement>();
PreparedStatement preparedStatement = null;
try {
connection.setAutoCommit(false);
if (deleteMeta != null && deleteMeta.size() > 0) {
for (int i = 0; i < deleteMeta.size(); i++) {
String deleteSql = deleteMeta.get(i).getKey();
deletePstmtList.add(connection.prepareStatement(deleteSql));
}
}
preparedStatement = connection.prepareStatement(this.writeRecordSql);
for (Record record : buffer) {
try {
for (int i = 0; i < deletePstmtList.size(); i++) {
PreparedStatement deleteStmt = deletePstmtList.get(i);
int[] valueIdx = deleteMeta.get(i).getValue();
int bindIndex = 0;
for (int idx : valueIdx) {
writerTask.fillStatementIndex(deleteStmt, bindIndex++, idx, record.getColumn(idx));
}
deleteStmt.execute();
}
preparedStatement = writerTask.fillStatement(preparedStatement, record);
preparedStatement.execute();
connection.commit();
} catch (SQLException e) {
writerTask.collectDirtyRecord(record, e);
} finally {
// do not close the statements here; they are reused for subsequent records
}
}
} catch (Exception e) {
throw DataXException.asDataXException(
DBUtilErrorCode.WRITE_DATA_ERROR, e);
} finally {
DBUtil.closeDBResources(preparedStatement, null);
for (PreparedStatement pstmt : deletePstmtList) {
DBUtil.closeDBResources(pstmt, null);
}
}
}
private void checkMemstore() {
while (writerTask.isMemStoreFull()) {
ObWriterUtils.sleep(30000);
}
}
}

View File

@ -0,0 +1,152 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
public class SingleTableWriterTask extends CommonRdbmsWriter.Task {
// Threshold on the ratio of memstore_total to memstore_limit; once exceeded, writing pauses
private double memstoreThreshold = Config.DEFAULT_MEMSTORE_THRESHOLD;
// Interval between memstore checks, in seconds
private long memstoreCheckIntervalSecond = Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND;
// Time of the last memstore check
private long lastCheckMemstoreTime;
// Number of retries on failure
private int failTryCount = Config.DEFAULT_FAIL_TRY_COUNT;
private ConnHolder connHolder;
private String obWriteMode = "update";
private boolean isOracleCompatibleMode = false;
private String obUpdateColumns = null;
public SingleTableWriterTask(DataBaseType dataBaseType) {
super(dataBaseType);
}
@Override
public void init(Configuration config) {
super.init(config);
this.memstoreThreshold = config.getDouble(Config.MEMSTORE_THRESHOLD, Config.DEFAULT_MEMSTORE_THRESHOLD);
this.memstoreCheckIntervalSecond = config.getLong(Config.MEMSTORE_CHECK_INTERVAL_SECOND,
Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND);
failTryCount = config.getInt(Config.FAIL_TRY_COUNT, Config.DEFAULT_FAIL_TRY_COUNT);
// OceanBase always writes in "insert into ... on duplicate key update" mode
// TODO: writeMode should be defined as an enum
this.writeMode = "update";
this.connHolder = new ObClientConnHolder(config, jdbcUrl, username, password);
// In OceanBase 1.0, cap the batch size at 128
this.batchSize = Math.min(128, config.getInt(Key.BATCH_SIZE, 128));
LOG.info("In Write OceanBase 1.0, Real Batch Size : " + this.batchSize);
isOracleCompatibleMode = ObWriterUtils.isOracleMode();
LOG.info("isOracleCompatibleMode=" + isOracleCompatibleMode);
obUpdateColumns = config.getString(Config.OB_UPDATE_COLUMNS, null);
obWriteMode = config.getString(Config.OB_WRITE_MODE, "update");
if (isOracleCompatibleMode) {
obWriteMode = "insert";
}
rewriteSql();
}
private void rewriteSql() {
Connection conn = connHolder.initConnection();
this.writeRecordSql = ObWriterUtils.buildWriteSql(table, columns, conn, obWriteMode, obUpdateColumns);
}
protected void doBatchInsert(Connection conn, List<Record> buffer) throws SQLException {
doBatchInsert(buffer);
}
private void doBatchInsert(List<Record> buffer) {
Connection conn = connHolder.getConn();
// check memstore usage before writing
checkMemstore(conn);
boolean success = false;
try {
for (int i = 0; i < failTryCount; i++) {
PreparedStatement ps = null;
try {
conn.setAutoCommit(false);
ps = conn.prepareStatement(this.writeRecordSql);
for (Record record : buffer) {
ps = fillPreparedStatement(ps, record);
ps.addBatch();
}
ps.executeBatch();
conn.commit();
// mark success and exit the retry loop
success = true;
break;
} catch (SQLException e) {
// fatal OB system-level errors require rebuilding the connection
boolean fatalFail = ObWriterUtils.isFatalError(e);
if (fatalFail) {
LOG.warn("Fatal exception in OB. Roll back this write and hibernate for five minutes. SQLState: {}. ErrorCode: {}",
e.getSQLState(), e.getErrorCode(), e);
ObWriterUtils.sleep(300000);
DBUtil.closeDBResources(null, conn);
conn = connHolder.reconnect();
// recoverable errors are retried
} else if (ObWriterUtils.isRecoverableError(e)) {
LOG.warn("Recoverable exception in OB. Roll back this write and hibernate for one minute. SQLState: {}. ErrorCode: {}",
e.getSQLState(), e.getErrorCode(), e);
conn.rollback();
ObWriterUtils.sleep(60000);
// any other error: stop retrying and fall back to row-by-row writes
} else {
LOG.warn("Exception in OB. Roll back this write and hibernate for one second. Write and submit the records one by one. SQLState: {}. ErrorCode: {}",
e.getSQLState(), e.getErrorCode(), e);
conn.rollback();
ObWriterUtils.sleep(1000);
break;
}
} finally {
DBUtil.closeDBResources(ps, null);
}
}
} catch (SQLException e) {
LOG.warn("Exception in OB. Roll back this write. Write and submit the records one by one. SQLState: {}. ErrorCode: {}",
e.getSQLState(), e.getErrorCode(), e);
}
if (!success) {
doOneInsert(conn, buffer);
}
}
private void checkMemstore(Connection conn) {
long now = System.currentTimeMillis();
if (now - lastCheckMemstoreTime < 1000 * memstoreCheckIntervalSecond) {
return;
}
while (ObWriterUtils.isMemstoreFull(conn, memstoreThreshold)) {
LOG.warn("OB memstore is full,sleep 60 seconds, threshold=" + memstoreThreshold);
ObWriterUtils.sleep(60000);
}
lastCheckMemstoreTime = now;
}
@Override
public void destroy(Configuration writerSliceConfig) {
// close the connection held by this task
DBUtil.closeDBResources(null, connHolder.getConn());
super.destroy(writerSliceConfig);
}
}

View File

@ -0,0 +1,37 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WriterThreadPool {
private static final Logger LOG = LoggerFactory.getLogger(WriterThreadPool.class);
private static ExecutorService executorService = Executors.newCachedThreadPool();
public WriterThreadPool() {
}
public static ExecutorService getInstance() {
return executorService;
}
public static synchronized void shutdown() {
LOG.info("start shutdown executor service...");
executorService.shutdown();
LOG.info("shutdown executor service success...");
}
public static synchronized void execute(InsertTask task) {
executorService.execute(task);
}
public static synchronized void executeBatch(List<InsertTask> tasks) {
for (InsertTask task : tasks) {
executorService.execute(task);
}
}
}

View File

@ -0,0 +1,71 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.util;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class DbUtils {
protected static final Logger LOG = LoggerFactory.getLogger(DbUtils.class);
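/**
* Execute a single-value query (e.g. SHOW VARIABLES LIKE 'version') and return the "Value"
* column of the first row, retrying with exponential backoff on SQL errors.
*/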
public static String fetchSingleValueWithRetry(Configuration config, String query) {
final String username = config.getString(Key.USERNAME);
final String password = config.getString(Key.PASSWORD);
String jdbcUrl = config.getString(Key.JDBC_URL);
if(jdbcUrl == null) {
List<Object> conns = config.getList(Constant.CONN_MARK, Object.class);
Configuration connConf = Configuration.from(conns.get(0).toString());
jdbcUrl = connConf.getString(Key.JDBC_URL);
}
Connection conn = null;
PreparedStatement stmt = null;
ResultSet result = null;
boolean needRetry = false;
String value = null;
int retry = 0;
do {
try {
if (retry > 0) {
int sleep = retry > 9 ? 500 : 1 << retry;
try {
TimeUnit.SECONDS.sleep(sleep);
} catch (InterruptedException e) {
// interrupted while backing off; proceed with the retry
}
LOG.warn("retry fetch value for {} the {} times", query, retry);
}
conn = DBUtil.getConnection(DataBaseType.OceanBase, jdbcUrl, username, password);
stmt = conn.prepareStatement(query);
result = stmt.executeQuery();
if (result.next()) {
value = result.getString("Value");
} else {
throw new RuntimeException("no values returned for " + query);
}
LOG.info("value for query [{}] is [{}]", query, value);
break;
} catch (SQLException e) {
needRetry = true;
++retry;
LOG.warn("fetch value with {} error {}", query, e);
} finally {
// close the connection as well to avoid leaking one per retry
DBUtil.closeDBResources(result, stmt, conn);
}
} while (needRetry);
return value;
}
}

View File

@ -0,0 +1,390 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.util;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;
public class ObWriterUtils {
protected static final Logger LOG = LoggerFactory.getLogger(ObWriterUtils.class);
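// The query returns a row when any server's memstore usage for the tenant exceeds
// mem_limit * threshold; gv$memstore lives in the oceanbase (MySQL mode) or sys (Oracle mode) schema.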
private static String CHECK_MEMSTORE = "select 1 from %s.gv$memstore t where t.total>t.mem_limit * ?";
private static String compatibleMode = null;
public static boolean isMemstoreFull(Connection conn, double memstoreThreshold) {
PreparedStatement ps = null;
ResultSet rs = null;
boolean result = false;
try {
String sysDbName = "oceanbase";
if (isOracleMode()) {
sysDbName = "sys";
}
ps = conn.prepareStatement(String.format(CHECK_MEMSTORE, sysDbName));
ps.setDouble(1, memstoreThreshold);
rs = ps.executeQuery();
// if any row is returned, at least one server's memstore for this tenant is nearly full
result = rs.next();
} catch (Throwable e) {
LOG.error("check memstore failed: " + e.getMessage(), e);
result = false;
} finally {
// no need to close the statement in OB 1.0
}
LOG.info("isMemstoreFull=" + result);
return result;
}
public static boolean isOracleMode() {
return Config.OB_COMPATIBLE_MODE_ORACLE.equals(compatibleMode);
}
public static String getCompatibleMode() {
return compatibleMode;
}
public static void setCompatibleMode(String mode) {
compatibleMode = mode;
}
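// e.g. buildDeleteSql("t1", [c1, c2]) -> "DELETE FROM t1 WHERE c1 = ? and c2 = ?"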
private static String buildDeleteSql(String tableName, List<String> columns) {
StringBuilder builder = new StringBuilder("DELETE FROM ");
builder.append(tableName).append(" WHERE ");
for (int i = 0; i < columns.size(); i++) {
builder.append(columns.get(i)).append(" = ?");
if (i != columns.size() - 1) {
builder.append(" and ");
}
}
return builder.toString();
}
private static int[] getColumnIndex(List<String> columnsInIndex, List<String> allColumns) {
allColumns = allColumns.stream().map(String::toUpperCase).collect(Collectors.toList());
int[] colIdx = new int[columnsInIndex.size()];
for (int i = 0; i < columnsInIndex.size(); i++) {
int index = allColumns.indexOf(columnsInIndex.get(i));
if (index < 0) {
throw new RuntimeException(
String.format("column %s is in a unique or primary key but not in the column list.",
columnsInIndex.get(i)));
}
colIdx[i] = index;
}
return colIdx;
}
public static List<Pair<String, int[]>> buildDeleteSql(Connection conn, String dbName, String tableName,
List<String> columns) {
List<Pair<String, int[]>> deleteMeta = new ArrayList<Pair<String, int[]>>();
Map<String, List<String>> uniqueKeys = getAllUniqueIndex(conn, dbName, tableName);
for (Map.Entry<String, List<String>> entry : uniqueKeys.entrySet()) {
List<String> colNames = entry.getValue();
String deleteSql = buildDeleteSql(tableName, colNames);
int[] colIdx = getColumnIndex(colNames, columns);
LOG.info("delete sql [{}], column index: {}", deleteSql, Arrays.toString(colIdx));
deleteMeta.add(new ImmutablePair<String, int[]>(deleteSql, colIdx));
}
return deleteMeta;
}
// this function is just for oracle mode
private static Map<String, List<String>> getAllUniqueIndex(Connection conn, String dbName, String tableName) {
Map<String, List<String>> uniqueKeys = new HashMap<String, List<String>>();
if (tableName.contains(".")) {
dbName = tableName.split("\\.")[0];
tableName = tableName.split("\\.")[1];
}
dbName = dbName.toUpperCase();
String sql = String.format("select cons.CONSTRAINT_NAME AS KEY_NAME, cols.COLUMN_NAME COLUMN_NAME " +
"from all_constraints cons, all_cons_columns cols " +
"WHERE cols.table_name = '%s' AND cons.constraint_type in('P', 'U') " +
" AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner " +
" AND cols.owner = '%s' " +
"Order by KEY_NAME, cols.POSITION", tableName, dbName);
LOG.info("get all unique keys by sql {}", sql);
Statement stmt = null;
ResultSet rs = null;
try {
stmt = conn.createStatement();
rs = stmt.executeQuery(sql);
while (rs.next()) {
String keyName = rs.getString("Key_name");
String columnName = StringUtils.upperCase(rs.getString("Column_name"));
List<String> s = uniqueKeys.get(keyName);
if (s == null) {
s = new ArrayList<String>();
uniqueKeys.put(keyName, s);
}
s.add(columnName);
}
} catch (Throwable e) {
LOG.error("show index from table fail :" + sql, e);
} finally {
asyncClose(rs, stmt, null);
}
return uniqueKeys;
}
/**
* Build the INSERT statement, appending an ON DUPLICATE KEY UPDATE clause in update mode.
*
* @param tableName destination table
* @param columnHolders columns to write
* @param conn connection used to look up unique indexes
* @param writeMode "insert" or "update"
* @param obUpdateColumns user-specified update column list, may be null
* @return the rewritten write SQL
*/
public static String buildWriteSql(String tableName, List<String> columnHolders,
Connection conn, String writeMode, String obUpdateColumns) {
List<String> valueHolders = new ArrayList<String>(columnHolders.size());
for (int i = 0; i < columnHolders.size(); i++) {
valueHolders.add("?");
}
String writeDataSqlTemplate = new StringBuilder().append("INSERT INTO " + tableName + " (")
.append(StringUtils.join(columnHolders, ",")).append(") VALUES(")
.append(StringUtils.join(valueHolders, ",")).append(")").toString();
LOG.info("write mode: " + writeMode);
// update mode
if (!writeMode.equals("insert")) {
if (obUpdateColumns == null) {
Set<String> skipColumns = getSkipColumns(conn, tableName);
LOG.info("Skip columns: " + StringUtils.join(skipColumns, ","));
writeDataSqlTemplate = writeDataSqlTemplate + onDuplicateKeyUpdateString(columnHolders, skipColumns);
} else {
LOG.info("Update columns: " + obUpdateColumns);
writeDataSqlTemplate = writeDataSqlTemplate + onDuplicateKeyUpdateString(obUpdateColumns);
}
}
return writeDataSqlTemplate;
}
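// Illustrative output (hypothetical table t with columns c1,c2 where c1 is the
// only unique key): writeMode "insert" yields "INSERT INTO t (c1,c2) VALUES(?,?)";
// any other writeMode appends the update clause, e.g.
// "INSERT INTO t (c1,c2) VALUES(?,?) ON DUPLICATE KEY UPDATE c2=VALUES(c2)".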
private static Set<String> getSkipColumns(Connection conn, String tableName) {
String sql = "show index from " + tableName;
Statement stmt = null;
ResultSet rs = null;
try {
stmt = conn.createStatement();
rs = stmt.executeQuery(sql);
Map<String, Set<String>> uniqueKeys = new HashMap<String, Set<String>>();
while (rs.next()) {
String nonUnique = rs.getString("Non_unique");
if (!"0".equals(nonUnique)) {
continue;
}
String keyName = rs.getString("Key_name");
String columnName = StringUtils.upperCase(rs.getString("Column_name"));
Set<String> s = uniqueKeys.get(keyName);
if (s == null) {
s = new HashSet<String>();
uniqueKeys.put(keyName, s);
}
s.add(columnName);
}
// If the table has only one primary/unique key, just skip its columns in the update
// list. This is safe because that primary/unique key cannot change when the row being
// inserted conflicts with an existing row.
if (uniqueKeys.size() == 1) {
return uniqueKeys.values().iterator().next();
} else if (uniqueKeys.size() > 1) {
// If the table has more than one primary/unique key, skip only the columns common to
// all of them. These columns appear in every primary/unique key, so they must be
// intact whenever at least one primary/unique key of the new data conflicts with the
// existing data; keeping them unchanged is safe.
//
// We cannot skip all the columns of the primary/unique keys, because some of those
// fields might not conflict with existing values. If we dropped them from the update
// list of the INSERT statement, they would not get updated, leaving the row with a
// mix of new and old values and breaking data consistency.
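// Worked example: with unique keys UK1 = (a, b) and UK2 = (b, c), only the
// common column b is skipped; a and c stay in the update list so a conflicting
// row still receives their new values.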
Iterator<String> keyNameIterator = uniqueKeys.keySet().iterator();
Set<String> skipColumns = uniqueKeys.get(keyNameIterator.next());
while (keyNameIterator.hasNext()) {
skipColumns.retainAll(uniqueKeys.get(keyNameIterator.next()));
}
return skipColumns;
}
} catch (Throwable e) {
LOG.error("show index from table fail :" + sql, e);
} finally {
asyncClose(rs, stmt, null);
}
return Collections.emptySet();
}
/*
 * Build the ON DUPLICATE KEY UPDATE sub-clause from the user-specified updateColumns.
 */
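// e.g. updateColumns = "c1,c2" produces
// " ON DUPLICATE KEY UPDATE c1=VALUES(c1),c2=VALUES(c2)"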
private static String onDuplicateKeyUpdateString(String updateColumns) {
if (updateColumns == null || updateColumns.length() < 1) {
return "";
}
StringBuilder builder = new StringBuilder();
builder.append(" ON DUPLICATE KEY UPDATE ");
List<String> list = new ArrayList<String>();
for (String column : updateColumns.split(",")) {
list.add(column + "=VALUES(" + column + ")");
}
builder.append(StringUtils.join(list, ','));
return builder.toString();
}
private static String onDuplicateKeyUpdateString(List<String> columnHolders, Set<String> skipColumns) {
if (columnHolders == null || columnHolders.size() < 1) {
return "";
}
StringBuilder builder = new StringBuilder();
builder.append(" ON DUPLICATE KEY UPDATE ");
List<String> list = new ArrayList<String>();
for (String column : columnHolders) {
// skip update columns
if (skipColumns.contains(column.toUpperCase())) {
continue;
}
list.add(column + "=VALUES(" + column + ")");
}
if (!list.isEmpty()) {
builder.append(StringUtils.join(list, ','));
} else {
// If there are no fields to update other than the unique key columns, update the first field.
String column = columnHolders.get(0);
builder.append(column + "=VALUES(" + column + ")");
}
return builder.toString();
}
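// Illustrative example: columnHolders = [c1, c2, c3] with skipColumns = {C1}
// produces " ON DUPLICATE KEY UPDATE c2=VALUES(c2),c3=VALUES(c3)".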
/**
 * Sleep for the given number of milliseconds; if interrupted, the interrupt
 * flag is restored.
 *
 * @param ms
 *            milliseconds to sleep
 */
public static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
// restore the interrupt flag rather than swallowing the interrupt
Thread.currentThread().interrupt();
}
}
/**
 * Check whether the error is fatal, i.e. connection-level or resource-level
 * and not worth retrying on the same connection.
 *
 * @param e the SQLException to classify
 * @return true if the error is fatal
 */
public static boolean isFatalError(SQLException e) {
String sqlState = e.getSQLState();
if (StringUtils.startsWith(sqlState, "08")) {
return true;
}
final int errorCode = Math.abs(e.getErrorCode());
switch (errorCode) {
// Communications Errors
case 1040: // ER_CON_COUNT_ERROR
case 1042: // ER_BAD_HOST_ERROR
case 1043: // ER_HANDSHAKE_ERROR
case 1047: // ER_UNKNOWN_COM_ERROR
case 1081: // ER_IPSOCK_ERROR
case 1129: // ER_HOST_IS_BLOCKED
case 1130: // ER_HOST_NOT_PRIVILEGED
// Authentication Errors
case 1045: // ER_ACCESS_DENIED_ERROR
// Resource errors
case 1004: // ER_CANT_CREATE_FILE
case 1005: // ER_CANT_CREATE_TABLE
case 1015: // ER_CANT_LOCK
case 1021: // ER_DISK_FULL
case 1041: // ER_OUT_OF_RESOURCES
case 1094: // Unknown thread id: %lu
// Out-of-memory errors
case 1037: // ER_OUTOFMEMORY
case 1038: // ER_OUT_OF_SORTMEMORY
return true;
}
if (StringUtils.isNotBlank(e.getMessage())) {
final String errorText = e.getMessage().toUpperCase();
if ((errorCode == 0
&& (errorText.contains("COMMUNICATIONS LINK FAILURE")
|| errorText.contains("COULD NOT CREATE CONNECTION")))
|| errorText.contains("NO DATASOURCE") || errorText.contains("NO ALIVE DATASOURCE")
|| errorText.contains("NO OPERATIONS ALLOWED AFTER CONNECTION CLOSED")) {
return true;
}
}
return false;
}
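// Illustrative classification: SQLState "08S01" (communications link failure) or
// error code 1045 (access denied) is fatal; an error such as a lock-wait timeout
// matches neither branch and is not considered fatal here.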
/**
 * Check whether the error is recoverable, i.e. whether the operation may
 * succeed if retried.
 *
 * @param e the SQLException to classify
 * @return true if the error is recoverable
 */
public static boolean isRecoverableError(SQLException e) {
int error = Math.abs(e.getErrorCode());
// explicitly recoverable
if (white.contains(error)) {
return true;
}
// explicitly unrecoverable
if (black.contains(error)) {
return false;
}
// error codes above 4000 are all OceanBase-specific
return error > 4020;
}
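// Illustrative classification: 1213 (deadlock) is whitelisted and retried; 4025 is
// blacklisted and fails fast; an unlisted OceanBase code such as 4100 falls through
// to the "error > 4020" default and is treated as recoverable.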
private static final Set<Integer> white = new HashSet<Integer>();
static {
int[] errList = { 1213, 1047, 1041, 1094, 4000, 4012 };
for (int err : errList) {
white.add(err);
}
}
// error codes below 4000 are not considered
private static final Set<Integer> black = new HashSet<Integer>();
static {
int[] errList = { 4022, 4025, 4026, 4028, 4029, 4031, 4033, 4034, 4037, 4041, 4044 };
for (int err : errList) {
black.add(err);
}
}
/**
 * Close JDBC resources asynchronously: due to a bug in ObProxy, closing the
 * connection gets no response when the transaction has timed out or been killed.
 *
 * @param rs the ResultSet to close, may be null
 * @param stmt the Statement to close, may be null
 * @param conn the Connection to close, may be null
 */
public static void asyncClose(final ResultSet rs, final Statement stmt, final Connection conn) {
Thread t = new Thread() {
public void run() {
DBUtil.closeDBResources(rs, stmt, conn);
}
};
t.setDaemon(true);
t.start();
}
}

View File

@ -0,0 +1,6 @@
{
"name": "oceanbasev10writer",
"class": "com.alibaba.datax.plugin.writer.oceanbasev10writer.OceanBaseV10Writer",
"description": "write data into oceanbase with sql interface",
"developer": "oceanbase"
}

View File

@ -33,7 +33,7 @@
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>oceanbasereader/target/datax/</directory>
<directory>oceanbasev10reader/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
@ -385,5 +385,12 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>oceanbasev10writer/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
</fileSets>
</assembly>

View File

@ -48,4 +48,11 @@ public final class Key {
public static String SPLIT_FACTOR = "splitFactor";
}
public final static String WEAK_READ = "weakRead";
public final static String SAVE_POINT = "savePoint";
public final static String REUSE_CONN = "reuseConn";
public final static String PARTITION_NAME = "partitionName";
}

View File

@ -358,7 +358,7 @@ public final class DBUtil {
String url, String user, String pass, String socketTimeout) {
//ob10的处理
if (url.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING) && dataBaseType == DataBaseType.MySql) {
if (url.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) {
String[] ss = url.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN);
if (ss.length != 3) {
throw DataXException
@ -367,7 +367,7 @@ public final class DBUtil {
}
LOG.info("this is ob1_0 jdbc url.");
user = ss[1].trim() +":"+user;
url = ss[2];
url = ss[2].replace("jdbc:mysql:", "jdbc:oceanbase:");
LOG.info("this is ob1_0 jdbc url. user="+user+" :url="+url);
}
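// Illustrative transformation (the separator literal below is an assumption about
// the OB10_SPLIT_STRING constant): a url such as
// "||_dsc_ob10_dsc_||cluster:tenant||_dsc_ob10_dsc_||jdbc:mysql://127.0.0.1:2883/db"
// splits into three parts, so user becomes "cluster:tenant:<user>" and url becomes
// "jdbc:oceanbase://127.0.0.1:2883/db".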

View File

@ -21,7 +21,8 @@ public enum DataBaseType {
ADS("ads","com.mysql.jdbc.Driver"),
ClickHouse("clickhouse", "ru.yandex.clickhouse.ClickHouseDriver"),
KingbaseES("kingbasees", "com.kingbase8.Driver"),
Oscar("oscar", "com.oscar.Driver");
Oscar("oscar", "com.oscar.Driver"),
OceanBase("oceanbase", "com.alipay.oceanbase.jdbc.Driver");
private String typeName;
@ -42,6 +43,7 @@ public enum DataBaseType {
switch (this) {
case MySql:
case DRDS:
case OceanBase:
suffix = "yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true";
if (jdbc.contains("?")) {
result = jdbc + "&" + suffix;

View File

@ -402,13 +402,20 @@ public class CommonRdbmsWriter {
throws SQLException {
for (int i = 0; i < this.columnNumber; i++) {
int columnSqltype = this.resultSetMetaData.getMiddle().get(i);
preparedStatement = fillPreparedStatementColumnType(preparedStatement, i, columnSqltype, record.getColumn(i));
String typeName = this.resultSetMetaData.getRight().get(i);
preparedStatement = fillPreparedStatementColumnType(preparedStatement, i, columnSqltype, typeName, record.getColumn(i));
}
return preparedStatement;
}
protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, Column column) throws SQLException {
protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex,
int columnSqltype, Column column) throws SQLException {
return fillPreparedStatementColumnType(preparedStatement, columnIndex, columnSqltype, null, column);
}
protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex,
int columnSqltype, String typeName, Column column) throws SQLException {
java.util.Date utilDate;
switch (columnSqltype) {
case Types.CHAR:
@ -451,8 +458,11 @@ public class CommonRdbmsWriter {
// for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
case Types.DATE:
if (this.resultSetMetaData.getRight().get(columnIndex)
.equalsIgnoreCase("year")) {
if (typeName == null) {
typeName = this.resultSetMetaData.getRight().get(columnIndex);
}
if (typeName.equalsIgnoreCase("year")) {
if (column.asBigInteger() == null) {
preparedStatement.setString(columnIndex + 1, null);
} else {

View File

@ -69,6 +69,7 @@
<module>opentsdbreader</module>
<module>cassandrareader</module>
<module>gdbreader</module>
<module>oceanbasev10reader</module>
<!-- writer -->
<module>mysqlwriter</module>
@ -99,6 +100,7 @@
<module>cassandrawriter</module>
<module>clickhousewriter</module>
<module>oscarwriter</module>
<module>oceanbasev10writer</module>
<!-- common support module -->
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>