diff --git a/oceanbasev10reader/pom.xml b/oceanbasev10reader/pom.xml new file mode 100644 index 00000000..49477241 --- /dev/null +++ b/oceanbasev10reader/pom.xml @@ -0,0 +1,97 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + oceanbasev10reader + com.alibaba.datax + 0.0.1-SNAPSHOT + jar + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + mysql + mysql-connector-java + 5.1.40 + + + log4j + log4j + 1.2.16 + + + junit + junit + 4.11 + test + + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + diff --git a/oceanbasev10reader/src/main/assembly/package.xml b/oceanbasev10reader/src/main/assembly/package.xml new file mode 100644 index 00000000..76673e91 --- /dev/null +++ b/oceanbasev10reader/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/oceanbasev10reader + + + target/ + + oceanbasev10reader-0.0.1-SNAPSHOT.jar + + plugin/reader/oceanbasev10reader + + + + + + false + plugin/reader/oceanbasev10reader/libs + runtime + + + diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/Config.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/Config.java new file mode 100644 index 00000000..ca803c49 --- /dev/null +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/Config.java @@ -0,0 +1,16 @@ +package com.alibaba.datax.plugin.reader.oceanbasev10reader; + +public interface Config { + // queryTimeoutSecond + String QUERY_TIMEOUT_SECOND = "memstoreCheckIntervalSecond"; + + int DEFAULT_QUERY_TIMEOUT_SECOND = 60 * 60 * 48;// 2天 + + // readBatchSize + String READ_BATCH_SIZE = "readBatchSize"; + + int DEFAULT_READ_BATCH_SIZE = 100000;// 10万 + + String RETRY_LIMIT = "retryLimit"; + int DEFAULT_RETRY_LIMIT = 10; +} diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java new file mode 100644 index 00000000..0a4934a1 --- /dev/null +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/OceanBaseReader.java @@ -0,0 +1,127 @@ +package com.alibaba.datax.plugin.reader.oceanbasev10reader; + +import java.sql.Connection; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.reader.Constant; +import com.alibaba.datax.plugin.rdbms.reader.Key; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ReaderJob; +import com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ReaderTask; +import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils; + +public class OceanBaseReader extends Reader { + + public static class Job extends 
Reader.Job {
+		private Configuration originalConfig = null;
+		private ReaderJob readerJob;
+		private static final Logger LOG = LoggerFactory.getLogger(Job.class);
+
+		@Override
+		public void init() {
+			this.originalConfig = super.getPluginJobConf();
+
+			Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);
+			if (userConfigedFetchSize != null) {
+				LOG.warn("The [fetchSize] is not recognized, please use readBatchSize instead.");
+			}
+
+			this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);
+
+			setDatabaseType(originalConfig);
+
+			this.readerJob = new ReaderJob();
+			this.readerJob.init(this.originalConfig);
+		}
+
+		@Override
+		public void preCheck() {
+			init();
+			this.readerJob.preCheck(this.originalConfig, ObReaderUtils.DATABASE_TYPE);
+		}
+
+		@Override
+		public List<Configuration> split(int adviceNumber) {
+			return this.readerJob.split(this.originalConfig, adviceNumber);
+		}
+
+		@Override
+		public void post() {
+			this.readerJob.post(this.originalConfig);
+		}
+
+		@Override
+		public void destroy() {
+			this.readerJob.destroy(this.originalConfig);
+		}
+
+		private void setDatabaseType(Configuration config) {
+			String username = config.getString(Key.USERNAME);
+			String password = config.getString(Key.PASSWORD);
+			List<Object> conns = originalConfig.getList(Constant.CONN_MARK, Object.class);
+			Configuration connConf = Configuration.from(conns.get(0).toString());
+			List<String> jdbcUrls = connConf.getList(Key.JDBC_URL, String.class);
+			String jdbcUrl = jdbcUrls.get(0);
+			if (jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) {
+				String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN);
+				if (ss.length != 3) {
+					LOG.warn("unrecognized jdbc url: " + jdbcUrl);
+					return;
+				}
+				username = ss[1].trim() + ":" + username;
+				jdbcUrl = ss[2];
+			}
+			// Use ob-client to get compatible mode.
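+			// The probe below swaps the driver scheme from jdbc:mysql: to jdbc:oceanbase:,
+			// reads the 'ob_compatibility_mode' variable through the ob-client connection,
+			// and switches DATABASE_TYPE to OceanBase when the cluster runs in Oracle mode;
+			// on any failure it logs a warning and keeps the MySQL default.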
+ try { + String obJdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:"); + Connection conn = DBUtil.getConnection(DataBaseType.OceanBase, obJdbcUrl, username, password); + String compatibleMode = ObReaderUtils.getCompatibleMode(conn); + if (ObReaderUtils.isOracleMode(compatibleMode)) { + ObReaderUtils.DATABASE_TYPE = DataBaseType.OceanBase; + } + } catch (Exception e){ + LOG.warn("error in get compatible mode, using mysql as default: " + e.getMessage()); + } + } + } + + public static class Task extends Reader.Task { + private Configuration readerSliceConfig; + private ReaderTask commonRdbmsReaderTask; + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + @Override + public void init() { + this.readerSliceConfig = super.getPluginJobConf(); + this.commonRdbmsReaderTask = new ReaderTask(super.getTaskGroupId(), super.getTaskId()); + this.commonRdbmsReaderTask.init(this.readerSliceConfig); + + } + + @Override + public void startRead(RecordSender recordSender) { + int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE); + this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender, super.getTaskPluginCollector(), + fetchSize); + } + + @Override + public void post() { + this.commonRdbmsReaderTask.post(this.readerSliceConfig); + } + + @Override + public void destroy() { + this.commonRdbmsReaderTask.destroy(this.readerSliceConfig); + } + } + +} diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java new file mode 100644 index 00000000..c56155f6 --- /dev/null +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderJob.java @@ -0,0 +1,40 @@ +package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext; + +import java.util.List; + +import com.alibaba.datax.common.constant.CommonConstant; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader; +import com.alibaba.datax.plugin.rdbms.reader.Key; +import com.alibaba.datax.plugin.rdbms.writer.Constant; +import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils; + +public class ReaderJob extends CommonRdbmsReader.Job { + + public ReaderJob() { + super(ObReaderUtils.DATABASE_TYPE); + } + + @Override + public List split(Configuration originalConfig, int adviceNumber) { + List list = super.split(originalConfig, adviceNumber); + for (Configuration config : list) { + String jdbcUrl = config.getString(Key.JDBC_URL); + String obRegionName = getObRegionName(jdbcUrl); + config.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, obRegionName); + } + return list; + } + + private String getObRegionName(String jdbcUrl) { + if (jdbcUrl.startsWith(Constant.OB10_SPLIT_STRING)) { + String[] ss = jdbcUrl.split(Constant.OB10_SPLIT_STRING_PATTERN); + if (ss.length >= 2) { + String tenant = ss[1].trim(); + String[] sss = tenant.split(":"); + return sss[0]; + } + } + return null; + } +} diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java new file mode 100644 index 00000000..073bb3cb --- /dev/null +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java @@ -0,0 +1,301 @@ +package 
com.alibaba.datax.plugin.reader.oceanbasev10reader.ext; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.statistics.PerfRecord; +import com.alibaba.datax.common.statistics.PerfTrace; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader; +import com.alibaba.datax.plugin.rdbms.reader.Constant; +import com.alibaba.datax.plugin.rdbms.reader.Key; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.rdbms.util.RdbmsException; +import com.alibaba.datax.plugin.reader.oceanbasev10reader.Config; +import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils; +import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.TaskContext; + +public class ReaderTask extends CommonRdbmsReader.Task { + private static final Logger LOG = LoggerFactory.getLogger(ReaderTask.class); + private int taskGroupId = -1; + private int taskId = -1; + + private String username; + private String password; + private String jdbcUrl; + private String mandatoryEncoding; + private int queryTimeoutSeconds;// 查询超时 默认48小时 + private int readBatchSize; + private int retryLimit = 0; + private String compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_MYSQL; + private boolean reuseConn = false; + + public ReaderTask(int taskGroupId, int taskId) { + super(ObReaderUtils.DATABASE_TYPE, taskGroupId, taskId); + this.taskGroupId = taskGroupId; + this.taskId = taskId; + } + + public void init(Configuration readerSliceConfig) { + /* for database connection */ + username = readerSliceConfig.getString(Key.USERNAME); + password = readerSliceConfig.getString(Key.PASSWORD); + jdbcUrl = readerSliceConfig.getString(Key.JDBC_URL); + queryTimeoutSeconds = readerSliceConfig.getInt(Config.QUERY_TIMEOUT_SECOND, + Config.DEFAULT_QUERY_TIMEOUT_SECOND); + // ob10的处理 + if(jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) { + String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN); + if (ss.length == 3) { + LOG.info("this is ob1_0 jdbc url."); + username = ss[1].trim() + ":" + username; + jdbcUrl = ss[2]; + } + } + + if (ObReaderUtils.DATABASE_TYPE == DataBaseType.OceanBase) { + jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时 + compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE; + } else { + jdbcUrl = jdbcUrl + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时 + } + LOG.info("this is ob1_0 jdbc url. user=" + username + " :url=" + jdbcUrl); + mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, ""); + retryLimit = readerSliceConfig.getInt(Config.RETRY_LIMIT, Config.DEFAULT_RETRY_LIMIT); + LOG.info("retryLimit: "+ retryLimit); + } + + private void buildSavePoint(TaskContext context) { + if (!ObReaderUtils.isUserSavePointValid(context)) { + LOG.info("user save point is not valid, set to null."); + context.setUserSavePoint(null); + } + } + + /** + * + * 如果isTableMode && table有PK + *
+	 * <br>
+	 * then checkpoint-resume reading is supported (if a PK column is not among the
+	 * original columns, it is appended to the tail but not passed downstream).
+	 * <br>
+ * 否则,则使用旧模式 + */ + @Override + public void startRead(Configuration readerSliceConfig, RecordSender recordSender, + TaskPluginCollector taskPluginCollector, int fetchSize) { + String querySql = readerSliceConfig.getString(Key.QUERY_SQL); + String table = readerSliceConfig.getString(Key.TABLE); + PerfTrace.getInstance().addTaskDetails(taskId, table + "," + jdbcUrl); + List columns = readerSliceConfig.getList(Key.COLUMN_LIST, String.class); + String where = readerSliceConfig.getString(Key.WHERE); + boolean weakRead = readerSliceConfig.getBool(Key.WEAK_READ, true); // default true, using weak read + String userSavePoint = readerSliceConfig.getString(Key.SAVE_POINT, null); + reuseConn = readerSliceConfig.getBool(Key.REUSE_CONN, false); + String partitionName = readerSliceConfig.getString(Key.PARTITION_NAME, null); + // 从配置文件中取readBatchSize,若无则用默认值 + readBatchSize = readerSliceConfig.getInt(Config.READ_BATCH_SIZE, Config.DEFAULT_READ_BATCH_SIZE); + // 不能少于1万 + if (readBatchSize < 10000) { + readBatchSize = 10000; + } + TaskContext context = new TaskContext(table, columns, where, fetchSize); + context.setQuerySql(querySql); + context.setWeakRead(weakRead); + context.setCompatibleMode(compatibleMode); + if (partitionName != null) { + context.setPartitionName(partitionName); + } + // Add the user save point into the context + context.setUserSavePoint(userSavePoint); + PerfRecord allPerf = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL); + allPerf.start(); + boolean isTableMode = readerSliceConfig.getBool(Constant.IS_TABLE_MODE); + try { + startRead0(isTableMode, context, recordSender, taskPluginCollector); + } finally { + ObReaderUtils.close(null, null, context.getConn()); + } + allPerf.end(context.getCost()); + // 目前大盘是依赖这个打印,而之前这个Finish read record是包含了sql查询和result next的全部时间 + LOG.info("finished read record by Sql: [{}\n] {}.", context.getQuerySql(), jdbcUrl); + } + + private void startRead0(boolean isTableMode, TaskContext context, RecordSender recordSender, + TaskPluginCollector taskPluginCollector) { + // 不是table模式 直接使用原来的做法 + if (!isTableMode) { + doRead(recordSender, taskPluginCollector, context); + return; + } + // check primary key index + Connection conn = DBUtil.getConnection(ObReaderUtils.DATABASE_TYPE, jdbcUrl, username, password); + ObReaderUtils.initConn4Reader(conn, queryTimeoutSeconds); + context.setConn(conn); + try { + ObReaderUtils.initIndex(conn, context); + ObReaderUtils.matchPkIndexs(conn, context); + } catch (Throwable e) { + LOG.warn("fetch PkIndexs fail,table=" + context.getTable(), e); + } + // 如果不是table 且 pk不存在 则仍然使用原来的做法 + if (context.getPkIndexs() == null) { + doRead(recordSender, taskPluginCollector, context); + return; + } + + // setup the user defined save point + buildSavePoint(context); + + // 从这里开始就是 断点续读功能 + // while(true) { + // 正常读 (需 order by pk asc) + // 如果遇到失败,分两种情况: + // a)已读出记录,则开始走增量读逻辑 + // b)未读出记录,则走正常读逻辑(仍然需要order by pk asc) + // 正常结束 则 break + // } + context.setReadBatchSize(readBatchSize); + String getFirstQuerySql = ObReaderUtils.buildFirstQuerySql(context); + String appendQuerySql = ObReaderUtils.buildAppendQuerySql(conn, context); + LOG.warn("start table scan key : {}", context.getIndexName() == null ? 
"primary" : context.getIndexName()); + context.setQuerySql(getFirstQuerySql); + boolean firstQuery = true; + // 原来打算firstQuery时 limit 1 减少 + // 后来经过对比发现其实是多余的,因为: + // 1.假如走gmt_modified辅助索引,则直接索引扫描 不需要topN的order by + // 2.假如不走辅助索引,而是pk table scan,则减少排序规模并没有好处,因为下一次仍然要排序 + // 减少这个多余的优化tip 可以让代码更易读 + int retryCount = 0; + while (true) { + try { + boolean finish = doRead(recordSender, taskPluginCollector, context); + if (finish) { + break; + } + } catch (Throwable e) { + if (retryLimit == ++retryCount) { + throw RdbmsException.asQueryException(ObReaderUtils.DATABASE_TYPE, new Exception(e), + context.getQuerySql(), context.getTable(), username); + } + LOG.error("read fail, retry count " + retryCount + ", sleep 60 second, save point:" + + context.getSavePoint() + ", error: "+ e.getMessage()); + ObReaderUtils.sleep(60000); // sleep 10s + } + // 假如原来的查询有查出数据,则改成增量查询 + if (firstQuery && context.getPkIndexs() != null && context.getSavePoint() != null) { + context.setQuerySql(appendQuerySql); + firstQuery = false; + } + } + DBUtil.closeDBResources(null, context.getConn()); + } + + private boolean isConnectionAlive(Connection conn) { + if (conn == null) { + return false; + } + Statement stmt = null; + ResultSet rs = null; + String sql = "select 1" + (compatibleMode == ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE ? " from dual" : ""); + try { + stmt = conn.createStatement(); + rs = stmt.executeQuery(sql); + rs.next(); + } catch (Exception ex) { + LOG.info("connection is not alive: " + ex.getMessage()); + return false; + } finally { + DBUtil.closeDBResources(rs, stmt, null); + } + return true; + } + + private boolean doRead(RecordSender recordSender, TaskPluginCollector taskPluginCollector, TaskContext context) { + LOG.info("exe sql: {}", context.getQuerySql()); + Connection conn = context.getConn(); + if (reuseConn && isConnectionAlive(conn)) { + LOG.info("connection is alive, will reuse this connection."); + } else { + LOG.info("Create new connection for reader."); + conn = DBUtil.getConnection(ObReaderUtils.DATABASE_TYPE, jdbcUrl, username, password); + ObReaderUtils.initConn4Reader(conn, queryTimeoutSeconds); + context.setConn(conn); + } + PreparedStatement ps = null; + ResultSet rs = null; + PerfRecord perfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.SQL_QUERY); + perfRecord.start(); + try { + ps = conn.prepareStatement(context.getQuerySql(), + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + if (context.getPkIndexs() != null && context.getSavePoint() != null) { + Record savePoint = context.getSavePoint(); + List point = ObReaderUtils.buildPoint(savePoint, context.getPkIndexs()); + ObReaderUtils.binding(ps, point); + if (LOG.isWarnEnabled()) { + List pointForLog = new ArrayList(); + for (Column c : point) { + pointForLog.add(c.asString()); + } + LOG.warn("{} save point : {}", context.getTable(), StringUtils.join(pointForLog, ',')); + } + } + // 打开流式接口 + ps.setFetchSize(context.getFetchSize()); + rs = ps.executeQuery(); + ResultSetMetaData metaData = rs.getMetaData(); + int columnNumber = metaData.getColumnCount(); + long lastTime = System.nanoTime(); + int count = 0; + for (; rs.next(); count++) { + context.addCost(System.nanoTime() - lastTime); + Record row = buildRecord(recordSender, rs, metaData, columnNumber, mandatoryEncoding, + taskPluginCollector); + // // 如果第一个record重复了,则不需要发送 + // if (count == 0 && + // ObReaderUtils.isPkEquals(context.getSavePoint(), row, + // context.getPkIndexs())) { + // continue; + // } + // 如果是querySql + if (context.getTransferColumnNumber() 
== -1 + || row.getColumnNumber() == context.getTransferColumnNumber()) { + recordSender.sendToWriter(row); + } else { + Record newRow = recordSender.createRecord(); + for (int i = 0; i < context.getTransferColumnNumber(); i++) { + newRow.addColumn(row.getColumn(i)); + } + recordSender.sendToWriter(newRow); + } + context.setSavePoint(row); + lastTime = System.nanoTime(); + } + LOG.info("end of sql: {}, " + count + "rows are read.", context.getQuerySql()); + return context.getReadBatchSize() <= 0 || count < readBatchSize; + } catch (Exception e) { + ObReaderUtils.close(null, null, context.getConn()); + context.setConn(null); + LOG.error("reader data fail", e); + throw RdbmsException.asQueryException(ObReaderUtils.DATABASE_TYPE, e, context.getQuerySql(), + context.getTable(), username); + } finally { + perfRecord.end(); + if (reuseConn) { + ObReaderUtils.close(rs, ps, null); + } else { + ObReaderUtils.close(rs, ps, conn); + } + } + } +} diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java new file mode 100644 index 00000000..2290fb43 --- /dev/null +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java @@ -0,0 +1,697 @@ +package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.datax.common.element.BoolColumn; +import com.alibaba.datax.common.element.BytesColumn; +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.DateColumn; +import com.alibaba.datax.common.element.DoubleColumn; +import com.alibaba.datax.common.element.LongColumn; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.druid.sql.SQLUtils; +import com.alibaba.druid.sql.ast.SQLExpr; +import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr; +import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator; + +public class ObReaderUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ObReaderUtils.class); + + final static public String OB_COMPATIBLE_MODE = "obCompatibilityMode"; + final static public String OB_COMPATIBLE_MODE_ORACLE = "ORACLE"; + final static public String OB_COMPATIBLE_MODE_MYSQL = "MYSQL"; + + public static DataBaseType DATABASE_TYPE = DataBaseType.MySql; + + public static void initConn4Reader(Connection conn, long queryTimeoutSeconds) { + String setQueryTimeout = "set ob_query_timeout=" + (queryTimeoutSeconds * 1000 * 1000L); + String setTrxTimeout = "set ob_trx_timeout=" + ((queryTimeoutSeconds + 5) * 1000 * 1000L); + Statement stmt = null; + try { + conn.setAutoCommit(true); + stmt = conn.createStatement(); 
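+			// OceanBase's ob_query_timeout and ob_trx_timeout are expressed in
+			// microseconds, hence the seconds * 1000 * 1000 conversion above; the
+			// transaction timeout gets 5 extra seconds so it outlives the query.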
+ stmt.execute(setQueryTimeout); + stmt.execute(setTrxTimeout); + LOG.warn("setAutoCommit=true;"+setQueryTimeout+";"+setTrxTimeout+";"); + } catch (Throwable e) { + LOG.warn("initConn4Reader fail", e); + } finally { + DBUtil.closeDBResources(stmt, null); + } + } + + public static void sleep(int ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + } + } + + /** + * + * @param conn + * @param context + */ + public static void matchPkIndexs(Connection conn, TaskContext context) { + String[] pkColumns = getPkColumns(conn, context); + if (ArrayUtils.isEmpty(pkColumns)) { + LOG.warn("table=" + context.getTable() + " has no primary key"); + return; + } + List columns = context.getColumns(); + // 最后参与排序的索引列 + context.setPkColumns(pkColumns); + int[] pkIndexs = new int[pkColumns.length]; + for (int i = 0, n = pkColumns.length; i < n; i++) { + String pkc = pkColumns[i]; + int j = 0; + for (int k = columns.size(); j < k; j++) { + // 如果用户定义的 columns中 带有 ``,也不影响, + // 最多只是在select里多加了几列PK column + if (StringUtils.equalsIgnoreCase(pkc, columns.get(j))) { + pkIndexs[i] = j; + break; + } + } + // 到这里 说明主键列不在columns中,则主动追加到尾部 + if (j == columns.size()) { + columns.add(pkc); + pkIndexs[i] = columns.size() - 1; + } + } + context.setPkIndexs(pkIndexs); + } + + private static String[] getPkColumns(Connection conn, TaskContext context) { + String tableName = context.getTable(); + String sql = "show index from " + tableName + " where Key_name='PRIMARY'"; + if (isOracleMode(context.getCompatibleMode())) { + tableName = tableName.toUpperCase(); + sql = "SELECT cols.column_name Column_name "+ + "FROM all_constraints cons, all_cons_columns cols " + + "WHERE cols.table_name = '" + tableName+ "' AND cons.constraint_type = 'P' " + + "AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner"; + } + LOG.info("get primary key by sql: " + sql); + Statement ps = null; + ResultSet rs = null; + List realIndex = new ArrayList(); + realIndex.addAll(context.getSecondaryIndexColumns()); + try { + ps = conn.createStatement(); + rs = ps.executeQuery(sql); + while (rs.next()) { + String columnName = StringUtils.lowerCase(rs.getString("Column_name")); + if (!realIndex.contains(columnName)) { + realIndex.add(columnName); + } + } + String[] pks = new String[realIndex.size()]; + realIndex.toArray(pks); + return pks; + } catch (Throwable e) { + LOG.error("show index from table fail :" + sql, e); + } finally { + close(rs, ps, null); + } + return null; + } + + /** + * 首次查的SQL + * + * @param context + * @return + */ + public static String buildFirstQuerySql(TaskContext context) { + String userSavePoint = context.getUserSavePoint(); + String indexName = context.getIndexName(); + String sql = "select "; + boolean weakRead = context.getWeakRead(); + if (StringUtils.isNotEmpty(indexName)) { + String weakReadHint = weakRead ? "+READ_CONSISTENCY(WEAK)," : "+"; + sql += " /*" + weakReadHint + "index(" + context.getTable() + " " + indexName + ")*/ "; + } else if (weakRead){ + sql += " /*+READ_CONSISTENCY(WEAK)*/ "; + } + sql += StringUtils.join(context.getColumns(), ','); + sql += " from " + context.getTable(); + if (context.getPartitionName() != null) { + sql += String.format(" partition(%s) ", context.getPartitionName()); + } + if (StringUtils.isNotEmpty(context.getWhere())) { + sql += " where " + context.getWhere(); + } + + if (userSavePoint != null && userSavePoint.length() != 0) { + userSavePoint = userSavePoint.replace("=", ">"); + sql += (StringUtils.isNotEmpty(context.getWhere()) ? 
" and " : " where ") + userSavePoint; + } + + sql += " order by " + StringUtils.join(context.getPkColumns(), ',') + " asc"; + + // Using sub-query to apply rownum < readBatchSize since where has higher priority than order by + if (ObReaderUtils.isOracleMode(context.getCompatibleMode()) && context.getReadBatchSize() != -1) { + sql = String.format("select * from (%s) where rownum <= %d", sql, context.getReadBatchSize()); + } + + return sql; + } + + /** + * 增量查的SQL + * + * @param conn + * + * @param context + * @return sql + */ + public static String buildAppendQuerySql(Connection conn, TaskContext context) { + String indexName = context.getIndexName(); + boolean weakRead = context.getWeakRead(); + String sql = "select "; + if (StringUtils.isNotEmpty(indexName)) { + String weakReadHint = weakRead ? "+READ_CONSISTENCY(WEAK)," : "+"; + sql += " /*"+ weakReadHint + "index(" + context.getTable() + " " + indexName + ")*/ "; + } else if (weakRead){ + sql += " /*+READ_CONSISTENCY(WEAK)*/ "; + } + sql += StringUtils.join(context.getColumns(), ',') + " from " + context.getTable(); + + if (context.getPartitionName() != null) { + sql += String.format(" partition(%s) ", context.getPartitionName()); + } + + sql += " where "; + String append = "(" + StringUtils.join(context.getPkColumns(), ',') + ") > (" + + buildPlaceHolder(context.getPkColumns().length) + ")"; + + if (StringUtils.isNotEmpty(context.getWhere())) { + sql += "(" + context.getWhere() + ") and "; + } + + sql = String.format("%s %s order by %s asc", sql, append, StringUtils.join(context.getPkColumns(), ',')); + + // Using sub-query to apply rownum < readBatchSize since where has higher priority than order by + if (ObReaderUtils.isOracleMode(context.getCompatibleMode()) && context.getReadBatchSize() != -1) { + sql = String.format("select * from (%s) where rownum <= %d", sql, context.getReadBatchSize()); + } + + return sql; + } + + /** + * check if the userSavePoint is valid + * + * @param context + * @return true - valid, false - invalid + */ + public static boolean isUserSavePointValid(TaskContext context) { + String userSavePoint = context.getUserSavePoint(); + if (userSavePoint == null || userSavePoint.length() == 0) { + LOG.info("user save point is empty!"); + return false; + } + + LOG.info("validating user save point: " + userSavePoint); + + final String patternString = "(.+)=(.+)"; + Pattern parttern = Pattern.compile(patternString); + Matcher matcher = parttern.matcher(userSavePoint); + if (!matcher.find()) { + LOG.error("user save point format is not correct: " + userSavePoint); + return false; + } + + List columnsInUserSavePoint = getColumnsFromUserSavePoint(userSavePoint); + List valuesInUserSavePoint = getValuesFromUserSavePoint(userSavePoint); + if (columnsInUserSavePoint.size() == 0 || valuesInUserSavePoint.size() == 0 || + columnsInUserSavePoint.size() != valuesInUserSavePoint.size()) { + LOG.error("number of columns and values in user save point are different:" + userSavePoint); + return false; + } + + String where = context.getWhere(); + if (StringUtils.isNotEmpty(where)) { + for (String column : columnsInUserSavePoint) { + if (where.contains(column)) { + LOG.error("column " + column + " is conflict with where: " + where); + return false; + } + } + } + + // Columns in userSavePoint must be the selected index. 
+ String[] pkColumns = context.getPkColumns(); + if (pkColumns.length != columnsInUserSavePoint.size()) { + LOG.error("user save point is not on the selected index."); + return false; + } + + for (String column : columnsInUserSavePoint) { + boolean found = false; + for (String pkCol : pkColumns) { + if (pkCol.equals(column)) { + found = true; + break; + } + } + if (!found) { + LOG.error("column " + column + " is not on the selected index."); + return false; + } + } + + return true; + } + + private static String removeBracket(String str) { + final char leftBracket = '('; + final char rightBracket = ')'; + if (str != null && str.contains(String.valueOf(leftBracket)) && str.contains(String.valueOf(rightBracket)) && + str.indexOf(leftBracket) < str.indexOf(rightBracket)) { + return str.substring(str.indexOf(leftBracket)+1, str.indexOf(rightBracket)); + } + return str; + } + + private static List getColumnsFromUserSavePoint(String userSavePoint) { + return Arrays.asList(removeBracket(userSavePoint.split("=")[0]).split(",")); + } + + private static List getValuesFromUserSavePoint(String userSavePoint) { + return Arrays.asList(removeBracket(userSavePoint.split("=")[1]).split(",")); + } + + /** + * 先解析成where + *
+	 * <br>
+ * 再判断是否存在索引 + * + * @param conn + * @param context + * @return + */ + public static void initIndex(Connection conn, TaskContext context) { + if (StringUtils.isEmpty(context.getWhere())) { + return; + } + SQLExpr expr = SQLUtils.toSQLExpr(context.getWhere(), "mysql"); + LOG.info("expr: " + expr); + List allColumnsInTab = getAllColumnFromTab(conn, context.getTable()); + List allColNames = getColNames(allColumnsInTab, expr); + + if (allColNames == null) { + return; + } + + // Remove the duplicated column names + Set colNames = new TreeSet(); + for (String colName : allColNames) { + if (!colNames.contains(colName)) { + colNames.add(colName); + } + } + List indexNames = getIndexName(conn, context.getTable(), colNames, context.getCompatibleMode()); + findBestIndex(conn, indexNames, context.getTable(), context); + } + + private static List getAllColumnFromTab(Connection conn, String tableName) { + String sql = "show columns from " + tableName; + Statement stmt = null; + ResultSet rs = null; + List allColumns = new ArrayList(); + try { + stmt = conn.createStatement(); + rs = stmt.executeQuery(sql); + while (rs.next()) { + allColumns.add(rs.getString("Field").toUpperCase()); + } + } catch (Exception e) { + LOG.warn("fail to get all columns from table " + tableName, e); + } finally { + close(rs, stmt, null); + } + + LOG.info("all columns in tab: " + String.join(",", allColumns)); + return allColumns; + } + + /** + * 找出where条件中的列名,目前仅支持全部为and条件,并且操作符为大于、大约等于、等于、小于、小于等于和不等于的表达式。 + * + * test coverage: - c6 = 20180710 OR c4 = 320: no index selected - 20180710 + * = c6: correct index selected - 20180710 = c6 and c4 = 320 or c2 < 100: no + * index selected + * + * @param expr + * @return + */ + private static List getColNames(List allColInTab, SQLExpr expr) { + List colNames = new ArrayList(); + if (expr instanceof SQLBinaryOpExpr) { + SQLBinaryOpExpr exp = (SQLBinaryOpExpr) expr; + if (exp.getOperator() == SQLBinaryOperator.BooleanAnd) { + List leftColumns = getColNames(allColInTab, exp.getLeft()); + List rightColumns = getColNames(allColInTab, exp.getRight()); + if (leftColumns == null || rightColumns == null) { + return null; + } + colNames.addAll(leftColumns); + colNames.addAll(rightColumns); + } else if (exp.getOperator() == SQLBinaryOperator.GreaterThan + || exp.getOperator() == SQLBinaryOperator.GreaterThanOrEqual + || exp.getOperator() == SQLBinaryOperator.Equality + || exp.getOperator() == SQLBinaryOperator.LessThan + || exp.getOperator() == SQLBinaryOperator.LessThanOrEqual + || exp.getOperator() == SQLBinaryOperator.NotEqual) { + // only support simple comparison operators + String left = SQLUtils.toMySqlString(exp.getLeft()).toUpperCase(); + String right = SQLUtils.toMySqlString(exp.getRight()).toUpperCase(); + LOG.debug("left: " + left + ", right: " + right); + if (allColInTab.contains(left)) { + colNames.add(left); + } + + if (allColInTab.contains(right)) { + colNames.add(right); + } + } else { + // unsupported operators + return null; + } + } + + return colNames; + } + + private static Map> getAllIndex(Connection conn, String tableName, String compatibleMode) { + Map> allIndex = new HashMap>(); + String sql = "show index from " + tableName; + if (isOracleMode(compatibleMode)) { + tableName = tableName.toUpperCase(); + sql = "SELECT INDEX_NAME Key_name, COLUMN_NAME Column_name " + + "from dba_ind_columns where TABLE_NAME = '" + tableName +"' " + + " union all " + + "SELECT DISTINCT " + + "CASE " + + "WHEN cons.CONSTRAINT_TYPE = 'P' THEN 'PRIMARY' " + + "WHEN cons.CONSTRAINT_TYPE = 'U' 
THEN cons.CONSTRAINT_NAME " + + "ELSE '' " + + "END AS Key_name, " + + "cols.column_name Column_name " + + "FROM all_constraints cons, all_cons_columns cols " + + "WHERE cols.table_name = '" + tableName + "' AND cons.constraint_type in('P', 'U') " + + "AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner"; + } + Statement stmt = null; + ResultSet rs = null; + + try { + LOG.info("running sql to get index: " + sql); + stmt = conn.createStatement(); + rs = stmt.executeQuery(sql); + while (rs.next()) { + String keyName = rs.getString("Key_name"); + String colName = rs.getString("Column_name").toUpperCase(); + if (allIndex.containsKey(keyName)) { + allIndex.get(keyName).add(colName); + } else { + List allColumns = new ArrayList(); + allColumns.add(colName); + allIndex.put(keyName, allColumns); + } + } + + // add primary key to all index + if (allIndex.containsKey("PRIMARY")) { + List colsInPrimary = allIndex.get("PRIMARY"); + for (String keyName : allIndex.keySet()) { + if (keyName.equals("PRIMARY")) { + continue; + } + allIndex.get(keyName).addAll(colsInPrimary); + } + } + } catch (Exception e) { + LOG.error("fail to get all keys from table" + sql, e); + } finally { + close(rs, stmt, null); + } + + LOG.info("all index: " + allIndex.toString()); + return allIndex; + } + + /** + * + * @param conn + * @param table + * @param colNamesInCondition + * @return + */ + private static List getIndexName(Connection conn, String table, + Set colNamesInCondition, String compatibleMode) { + List indexNames = new ArrayList(); + if (colNamesInCondition == null || colNamesInCondition.size() == 0) { + LOG.info("there is no qulified conditions in the where clause, skip index selection."); + return indexNames; + } + + LOG.info("columNamesInConditions: " + String.join(",", colNamesInCondition)); + + Map> allIndex = getAllIndex(conn, table, compatibleMode); + for (String keyName : allIndex.keySet()) { + boolean indexNotMatch = false; + // If the index does not have all the column in where conditions, it + // can not be chosen + // the selected index must start with the columns in where condition + if (allIndex.get(keyName).size() < colNamesInCondition.size()) { + indexNotMatch = true; + } else { + // the the first number columns of this index + int num = colNamesInCondition.size(); + for (String colName : allIndex.get(keyName)) { + if (!colNamesInCondition.contains(colName)) { + indexNotMatch = true; + break; + } + if (--num == 0) { + break; + } + } + } + + if (indexNotMatch) { + continue; + } else { + indexNames.add(keyName); + } + } + + return indexNames; + } + + /** + * 以 column开头的索引,可能有多个,也可能存在多列的情形 + *
+	 * <br>
+ * 所以,需要选择列数最少的 + * + * @param indexNames + * @param context + */ + private static void findBestIndex(Connection conn, List indexNames, String table, TaskContext context) { + if (indexNames.size() == 0) { + LOG.warn("table has no index."); + return; + } + + Map> allIndexs = new HashMap>(); + String sql = "show index from " + table + " where key_name in (" + buildPlaceHolder(indexNames.size()) + ")"; + if (isOracleMode(context.getCompatibleMode())) { + Map> allIndexInTab = getAllIndex(conn, table, context.getCompatibleMode()); + for (String indexName : indexNames) { + if (allIndexInTab.containsKey(indexName)) { + Map index = new TreeMap(); + List columnList = allIndexInTab.get(indexName); + for (int i = 1; i <= columnList.size(); i++) { + index.put(i, columnList.get(i-1)); + } + allIndexs.put(indexName, index); + } else { + LOG.error("index does not exist: " + indexName); + } + } + } else { + PreparedStatement ps = null; + ResultSet rs = null; + try { + ps = conn.prepareStatement(sql); + for (int i = 0, n = indexNames.size(); i < n; i++) { + ps.setString(i + 1, indexNames.get(i)); + } + rs = ps.executeQuery(); + while (rs.next()) { + String keyName = rs.getString("Key_name"); + Map index = allIndexs.get(keyName); + if (index == null) { + index = new TreeMap(); + allIndexs.put(keyName, index); + } + int keyInIndex = rs.getInt("Seq_in_index"); + String column = rs.getString("Column_name"); + index.put(keyInIndex, column); + } + } catch (Throwable e) { + LOG.error("show index from table fail :" + sql, e); + } finally { + close(rs, ps, null); + } + } + + LOG.info("possible index:" + allIndexs + ",where:" + context.getWhere()); + + Entry> chooseIndex = null; + int columnCount = Integer.MAX_VALUE; + for (Entry> entry : allIndexs.entrySet()) { + if (entry.getValue().size() < columnCount) { + columnCount = entry.getValue().size(); + chooseIndex = entry; + } + } + + if (chooseIndex != null) { + LOG.info("choose index name:" + chooseIndex.getKey() + ",columns:" + chooseIndex.getValue()); + context.setIndexName(chooseIndex.getKey()); + context.setSecondaryIndexColumns(new ArrayList(chooseIndex.getValue().values())); + } + } + + /** + * 由于ObProxy存在bug,事务超时或事务被杀时,conn的close是没有响应的 + * + * @param rs + * @param stmt + * @param conn + */ + public static void close(final ResultSet rs, final Statement stmt, final Connection conn) { + DBUtil.closeDBResources(rs, stmt, conn); + } + + /** + * 判断是否重复record + * + * @param savePoint + * @param row + * @param pkIndexs + * @return + */ + public static boolean isPkEquals(Record savePoint, Record row, int[] pkIndexs) { + if (savePoint == null || row == null) { + return false; + } + try { + for (int index : pkIndexs) { + Object left = savePoint.getColumn(index).getRawData(); + Object right = row.getColumn(index).getRawData(); + if (!left.equals(right)) { + return false; + } + } + } catch (Throwable e) { + return false; + } + return true; + } + + public static String buildPlaceHolder(int n) { + if (n <= 0) { + return ""; + } + StringBuilder str = new StringBuilder(2 * n); + str.append('?'); + for (int i = 1; i < n; i++) { + str.append(",?"); + } + return str.toString(); + } + + public static void binding(PreparedStatement ps, List list) throws SQLException { + for (int i = 0, n = list.size(); i < n; i++) { + Column c = list.get(i); + if(c instanceof BoolColumn){ + ps.setLong(i + 1, ((BoolColumn)c).asLong()); + }else if(c instanceof BytesColumn){ + ps.setBytes(i + 1, ((BytesColumn)c).asBytes()); + }else if(c instanceof DateColumn){ + ps.setTimestamp(i + 1, new 
Timestamp(((DateColumn)c).asDate().getTime())); + }else if(c instanceof DoubleColumn){ + ps.setDouble(i + 1, ((DoubleColumn)c).asDouble()); + }else if(c instanceof LongColumn){ + ps.setLong(i + 1, ((LongColumn)c).asLong()); + }else if(c instanceof StringColumn){ + ps.setString(i + 1, ((StringColumn)c).asString()); + }else{ + ps.setObject(i + 1, c.getRawData()); + } + } + } + + public static List buildPoint(Record savePoint, int[] pkIndexs) { + List result = new ArrayList(pkIndexs.length); + for (int i = 0, n = pkIndexs.length; i < n; i++) { + result.add(savePoint.getColumn(pkIndexs[i])); + } + return result; + } + + public static String getCompatibleMode(Connection conn) { + String compatibleMode = OB_COMPATIBLE_MODE_MYSQL; + String getCompatibleModeSql = "SHOW VARIABLES LIKE 'ob_compatibility_mode'"; + Statement stmt = null; + ResultSet rs = null; + try { + stmt = conn.createStatement(); + rs = stmt.executeQuery(getCompatibleModeSql); + if (rs.next()) { + compatibleMode = rs.getString("VALUE"); + } + } catch (Exception e) { + LOG.error("fail to get ob compatible mode, using mysql as default: " + e.getMessage()); + } finally { + DBUtil.closeDBResources(rs, stmt, conn); + } + + LOG.info("ob compatible mode is " + compatibleMode); + return compatibleMode; + } + + public static boolean isOracleMode(String mode) { + return (mode != null && OB_COMPATIBLE_MODE_ORACLE.equals(mode)); + } +} diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java new file mode 100644 index 00000000..ba754a37 --- /dev/null +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java @@ -0,0 +1,176 @@ +package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; + +import java.sql.Connection; +import java.util.Collections; +import java.util.List; + +import com.alibaba.datax.common.element.Record; + +public class TaskContext { + private Connection conn; + private final String table; + private String indexName; + // 辅助索引的字段列表 + private List secondaryIndexColumns = Collections.emptyList(); + private String querySql; + private final String where; + private final int fetchSize; + private long readBatchSize = -1; + private boolean weakRead = true; + private String userSavePoint; + private String compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_MYSQL; + + public String getPartitionName() { + return partitionName; + } + + public void setPartitionName(String partitionName) { + this.partitionName = partitionName; + } + + private String partitionName; + + // 断点续读的保存点 + private volatile Record savePoint; + + // pk在column中的index,用于绑定变量时从savePoint中读取值 + // 如果这个值为null,则表示 不是断点续读的场景 + private int[] pkIndexs; + + private final List columns; + + private String[] pkColumns; + + private long cost; + + private final int transferColumnNumber; + + public TaskContext(String table, List columns, String where, int fetchSize) { + super(); + this.table = table; + this.columns = columns; + // 针对只有querySql的场景 + this.transferColumnNumber = columns == null ? 
-1 : columns.size(); + this.where = where; + this.fetchSize = fetchSize; + } + + public Connection getConn() { + return conn; + } + + public void setConn(Connection conn) { + this.conn = conn; + } + + public String getIndexName() { + return indexName; + } + + public void setIndexName(String indexName) { + this.indexName = indexName; + } + + public List getSecondaryIndexColumns() { + return secondaryIndexColumns; + } + + public void setSecondaryIndexColumns(List secondaryIndexColumns) { + this.secondaryIndexColumns = secondaryIndexColumns; + } + + public String getQuerySql() { + if (readBatchSize == -1 || ObReaderUtils.isOracleMode(compatibleMode)) { + return querySql; + } else { + return querySql + " limit " + readBatchSize; + } + } + + public void setQuerySql(String querySql) { + this.querySql = querySql; + } + + public String getWhere() { + return where; + } + + public Record getSavePoint() { + return savePoint; + } + + public void setSavePoint(Record savePoint) { + this.savePoint = savePoint; + } + + public int[] getPkIndexs() { + return pkIndexs; + } + + public void setPkIndexs(int[] pkIndexs) { + this.pkIndexs = pkIndexs; + } + + public List getColumns() { + return columns; + } + + public String[] getPkColumns() { + return pkColumns; + } + + public void setPkColumns(String[] pkColumns) { + this.pkColumns = pkColumns; + } + + public String getTable() { + return table; + } + + public int getFetchSize() { + return fetchSize; + } + + public long getCost() { + return cost; + } + + public void addCost(long cost) { + this.cost += cost; + } + + public int getTransferColumnNumber() { + return transferColumnNumber; + } + + public long getReadBatchSize() { + return readBatchSize; + } + + public void setReadBatchSize(long readBatchSize) { + this.readBatchSize = readBatchSize; + } + + public boolean getWeakRead() { + return weakRead; + } + + public void setWeakRead(boolean weakRead) { + this.weakRead = weakRead; + } + + public String getUserSavePoint() { + return userSavePoint; + } + public void setUserSavePoint(String userSavePoint) { + this.userSavePoint = userSavePoint; + } + + public String getCompatibleMode() { + return compatibleMode; + } + + public void setCompatibleMode(String compatibleMode) { + this.compatibleMode = compatibleMode; + } +} diff --git a/oceanbasev10reader/src/main/resources/plugin.json b/oceanbasev10reader/src/main/resources/plugin.json new file mode 100644 index 00000000..66acbd62 --- /dev/null +++ b/oceanbasev10reader/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "oceanbasev10reader", + "class": "com.alibaba.datax.plugin.reader.oceanbasev10reader.OceanBaseReader", + "description": "read data from oceanbase with SQL interface", + "developer": "oceanbase" +} \ No newline at end of file diff --git a/oceanbasev10writer/pom.xml b/oceanbasev10writer/pom.xml new file mode 100644 index 00000000..cbe19732 --- /dev/null +++ b/oceanbasev10writer/pom.xml @@ -0,0 +1,126 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + oceanbasev10writer + + com.alibaba.datax + 0.0.1-SNAPSHOT + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + org.springframework + spring-test + 4.0.4.RELEASE + test + + + + + com.alipay.oceanbase + oceanbase-connector-java + 3.2.0 + system + ${basedir}/src/main/libs/oceanbase-connector-java-3.2.0.jar + + + com.alipay.oceanbase + 
oceanbase-client + + + + + + log4j + log4j + 1.2.16 + + + org.json + json + 20160810 + + + junit + junit + 4.11 + test + + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + diff --git a/oceanbasev10writer/src/main/assembly/package.xml b/oceanbasev10writer/src/main/assembly/package.xml new file mode 100644 index 00000000..2529b4d4 --- /dev/null +++ b/oceanbasev10writer/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/oceanbasev10writer + + + target/ + + oceanbasev10writer-0.0.1-SNAPSHOT.jar + + plugin/writer/oceanbasev10writer + + + + + + false + plugin/writer/oceanbasev10writer/libs + runtime + + + diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/Config.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/Config.java new file mode 100644 index 00000000..9fa3cd9a --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/Config.java @@ -0,0 +1,62 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer; + +public interface Config { + + String MEMSTORE_THRESHOLD = "memstoreThreshold"; + + double DEFAULT_MEMSTORE_THRESHOLD = 0.9d; + + String MEMSTORE_CHECK_INTERVAL_SECOND = "memstoreCheckIntervalSecond"; + + long DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND = 30; + + int DEFAULT_BATCH_SIZE = 100; + int MAX_BATCH_SIZE = 4096; + + String FAIL_TRY_COUNT = "failTryCount"; + + int DEFAULT_FAIL_TRY_COUNT = 10000; + + String WRITER_THREAD_COUNT = "writerThreadCount"; + + int DEFAULT_WRITER_THREAD_COUNT = 1; + + String CONCURRENT_WRITE = "concurrentWrite"; + + boolean DEFAULT_CONCURRENT_WRITE = true; + + String OB_VERSION = "obVersion"; + String TIMEOUT = "timeout"; + + String PRINT_COST = "printCost"; + boolean DEFAULT_PRINT_COST = false; + + String COST_BOUND = "costBound"; + long DEFAULT_COST_BOUND = 20; + + String MAX_ACTIVE_CONNECTION = "maxActiveConnection"; + int DEFAULT_MAX_ACTIVE_CONNECTION = 2000; + + String WRITER_SUB_TASK_COUNT = "writerSubTaskCount"; + int DEFAULT_WRITER_SUB_TASK_COUNT = 1; + int MAX_WRITER_SUB_TASK_COUNT = 4096; + + String OB_WRITE_MODE = "obWriteMode"; + String OB_COMPATIBLE_MODE = "obCompatibilityMode"; + String OB_COMPATIBLE_MODE_ORACLE = "ORACLE"; + String OB_COMPATIBLE_MODE_MYSQL = "MYSQL"; + + String OCJ_GET_CONNECT_TIMEOUT = "ocjGetConnectTimeout"; + int DEFAULT_OCJ_GET_CONNECT_TIMEOUT = 5000; // 5s + + String OCJ_PROXY_CONNECT_TIMEOUT = "ocjProxyConnectTimeout"; + int DEFAULT_OCJ_PROXY_CONNECT_TIMEOUT = 5000; // 5s + + String OCJ_CREATE_RESOURCE_TIMEOUT = "ocjCreateResourceTimeout"; + int DEFAULT_OCJ_CREATE_RESOURCE_TIMEOUT = 60000; // 60s + + String OB_UPDATE_COLUMNS = "obUpdateColumns"; + + String USE_PART_CALCULATOR = "usePartCalculator"; + boolean DEFAULT_USE_PART_CALCULATOR = false; +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java new file mode 100644 index 00000000..89ef1c52 --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java @@ -0,0 +1,246 @@ 
+package com.alibaba.datax.plugin.writer.oceanbasev10writer; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; +import com.alibaba.datax.plugin.rdbms.writer.Constant; +import com.alibaba.datax.plugin.rdbms.writer.Key; +import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.task.ConcurrentTableWriterTask; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.task.SingleTableWriterTask; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils; + +/** + * 2016-04-07 + *
+ * <br>
+ * A Writer dedicated to OceanBase 1.0.
+ *
+ * @author biliang.wbl
+ *
+ */
+public class OceanBaseV10Writer extends Writer {
+	private static DataBaseType DATABASE_TYPE = DataBaseType.OceanBase;
+
+	/**
+	 * The methods in Job are executed only once; the methods in Task are run in
+	 * parallel by multiple Task threads started by the framework.
+	 * <br>
+	 * The overall Writer execution flow is:
+	 *
+	 * <pre>
+	 * Job: init-->prepare-->split
+	 *
+	 *                          Task: init-->prepare-->startWrite-->post-->destroy
+	 *                          Task: init-->prepare-->startWrite-->post-->destroy
+	 *
+	 *                                                                            Job: post-->destroy
+	 * </pre>
+ */ + public static class Job extends Writer.Job { + private Configuration originalConfig = null; + private CommonRdbmsWriter.Job commonJob; + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + + /** + * 注意:此方法仅执行一次。 最佳实践:通常在这里对用户的配置进行校验:是否缺失必填项?有无错误值?有没有无关配置项?... + * 并给出清晰的报错/警告提示。校验通常建议采用静态工具类进行,以保证本类结构清晰。 + */ + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + checkCompatibleMode(originalConfig); + this.commonJob = new CommonRdbmsWriter.Job(DATABASE_TYPE); + this.commonJob.init(this.originalConfig); + } + + /** + * 注意:此方法仅执行一次。 最佳实践:如果 Job 中有需要进行数据同步之前的处理,可以在此处完成,如果没有必要则可以直接去掉。 + */ + // 一般来说,是需要推迟到 task 中进行pre 的执行(单表情况例外) + @Override + public void prepare() { + int tableNumber = originalConfig.getInt(Constant.TABLE_NUMBER_MARK); + if (tableNumber == 1) { + this.commonJob.prepare(this.originalConfig); + final String version = fetchServerVersion(originalConfig); + originalConfig.set(Config.OB_VERSION, version); + } + + String username = originalConfig.getString(Key.USERNAME); + String password = originalConfig.getString(Key.PASSWORD); + + // 获取presql配置,并执行 + List preSqls = originalConfig.getList(Key.PRE_SQL, String.class); + if (preSqls == null || preSqls.size() == 0) { + return; + } + + List conns = originalConfig.getList(Constant.CONN_MARK, Object.class); + for (Object connConfObject : conns) { + Configuration connConf = Configuration.from(connConfObject.toString()); + // 这里的 jdbcUrl 已经 append 了合适后缀参数 + String jdbcUrl = connConf.getString(Key.JDBC_URL); + + List tableList = connConf.getList(Key.TABLE, String.class); + for (String table : tableList) { + List renderedPreSqls = WriterUtil.renderPreOrPostSqls(preSqls, table); + if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) { + Connection conn = DBUtil.getConnection(DATABASE_TYPE, jdbcUrl, username, password); + LOG.info("Begin to execute preSqls:[{}]. 
context info:{}.", + StringUtils.join(renderedPreSqls, ";"), jdbcUrl); + WriterUtil.executeSqls(conn, renderedPreSqls, jdbcUrl, DATABASE_TYPE); + ObWriterUtils.asyncClose(null, null, conn); + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("After job prepare(), originalConfig now is:[\n{}\n]", originalConfig.toJSON()); + } + } + + /** + * 注意:此方法仅执行一次。 最佳实践:通常采用工具静态类完成把 Job 配置切分成多个 Task 配置的工作。 这里的 + * mandatoryNumber 是强制必须切分的份数。 + */ + @Override + public List split(int mandatoryNumber) { + int tableNumber = originalConfig.getInt(Constant.TABLE_NUMBER_MARK); + if (tableNumber == 1) { + return this.commonJob.split(this.originalConfig, mandatoryNumber); + } + Configuration simplifiedConf = this.originalConfig; + + List splitResultConfigs = new ArrayList(); + for (int j = 0; j < mandatoryNumber; j++) { + splitResultConfigs.add(simplifiedConf.clone()); + } + return splitResultConfigs; + } + + /** + * 注意:此方法仅执行一次。 最佳实践:如果 Job 中有需要进行数据同步之后的后续处理,可以在此处完成。 + */ + @Override + public void post() { + int tableNumber = originalConfig.getInt(Constant.TABLE_NUMBER_MARK); + if (tableNumber == 1) { + commonJob.post(this.originalConfig); + return; + } + String username = originalConfig.getString(Key.USERNAME); + String password = originalConfig.getString(Key.PASSWORD); + List conns = originalConfig.getList(Constant.CONN_MARK, Object.class); + List postSqls = originalConfig.getList(Key.POST_SQL, String.class); + if (postSqls == null || postSqls.size() == 0) { + return; + } + + for (Object connConfObject : conns) { + Configuration connConf = Configuration.from(connConfObject.toString()); + String jdbcUrl = connConf.getString(Key.JDBC_URL); + List tableList = connConf.getList(Key.TABLE, String.class); + + for (String table : tableList) { + List renderedPostSqls = WriterUtil.renderPreOrPostSqls(postSqls, table); + if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) { + // 说明有 postSql 配置,则此处删除掉 + Connection conn = DBUtil.getConnection(DATABASE_TYPE, jdbcUrl, username, password); + LOG.info("Begin to execute postSqls:[{}]. 
context info:{}.", + StringUtils.join(renderedPostSqls, ";"), jdbcUrl); + WriterUtil.executeSqls(conn, renderedPostSqls, jdbcUrl, DATABASE_TYPE); + ObWriterUtils.asyncClose(null, null, conn); + } + } + } + originalConfig.remove(Key.POST_SQL); + } + + /** + * 注意:此方法仅执行一次。 最佳实践:通常配合 Job 中的 post() 方法一起完成 Job 的资源释放。 + */ + @Override + public void destroy() { + this.commonJob.destroy(this.originalConfig); + } + + private String fetchServerVersion(Configuration config) { + final String fetchVersionSql = "show variables like 'version'"; + return DbUtils.fetchSingleValueWithRetry(config, fetchVersionSql); + } + + private void checkCompatibleMode(Configuration configure) { + final String fetchCompatibleModeSql = "SHOW VARIABLES LIKE 'ob_compatibility_mode'"; + String compatibleMode = DbUtils.fetchSingleValueWithRetry(configure, fetchCompatibleModeSql); + ObWriterUtils.setCompatibleMode(compatibleMode); + configure.set(Config.OB_COMPATIBLE_MODE, compatibleMode); + } + } + + public static class Task extends Writer.Task { + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + private Configuration writerSliceConfig; + private CommonRdbmsWriter.Task writerTask; + + /** + * 注意:此方法每个 Task 都会执行一次。 最佳实践:此处通过对 taskConfig 配置的读取,进而初始化一些资源为 + * startWrite()做准备。 + */ + @Override + public void init() { + this.writerSliceConfig = super.getPluginJobConf(); + int tableNumber = writerSliceConfig.getInt(Constant.TABLE_NUMBER_MARK); + if (tableNumber == 1) { + // always use concurrentTableWriter + this.writerTask = new ConcurrentTableWriterTask(DATABASE_TYPE); + } else { + throw new RuntimeException("writing to multi-tables is not supported."); + } + LOG.info("tableNumber:" + tableNumber + ",writerTask Class:" + writerTask.getClass().getName()); + this.writerTask.init(this.writerSliceConfig); + } + + /** + * 注意:此方法每个 Task 都会执行一次。 最佳实践:如果 Task + * 中有需要进行数据同步之前的处理,可以在此处完成,如果没有必要则可以直接去掉。 + */ + @Override + public void prepare() { + this.writerTask.prepare(this.writerSliceConfig); + } + + /** + * 注意:此方法每个 Task 都会执行一次。 最佳实践:此处适当封装确保简洁清晰完成数据写入工作。 + */ + public void startWrite(RecordReceiver recordReceiver) { + this.writerTask.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector()); + } + + /** + * 注意:此方法每个 Task 都会执行一次。 最佳实践:如果 Task 中有需要进行数据同步之后的后续处理,可以在此处完成。 + */ + @Override + public void post() { + this.writerTask.post(this.writerSliceConfig); + } + + /** + * 注意:此方法每个 Task 都会执行一次。 最佳实践:通常配合Task 中的 post() 方法一起完成 Task 的资源释放。 + */ + @Override + public void destroy() { + this.writerTask.destroy(this.writerSliceConfig); + } + } +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ConnHolder.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ConnHolder.java new file mode 100644 index 00000000..785f47bf --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ConnHolder.java @@ -0,0 +1,37 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; + +import java.sql.Connection; + +public abstract class ConnHolder { + + protected final Configuration config; + protected Connection conn; + + public ConnHolder(Configuration config) { + this.config = config; + } + + public abstract Connection initConnection(); + + public Configuration getConfig() { + return config; + } + + public Connection getConn() { + return conn; + } + + public 
Connection reconnect() { + DBUtil.closeDBResources(null, conn); + return initConnection(); + } + + public abstract String getJdbcUrl(); + + public abstract String getUserName(); + + public abstract void destroy(); +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/DataBaseWriterBuffer.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/DataBaseWriterBuffer.java new file mode 100644 index 00000000..53172495 --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/DataBaseWriterBuffer.java @@ -0,0 +1,101 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext; + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * @author oceanbase + * + */ +public class DataBaseWriterBuffer { + private static final Logger LOG = LoggerFactory.getLogger(DataBaseWriterBuffer.class); + + private final ConnHolder connHolder; + private final String dbName; + private Map> tableBuffer = new HashMap>(); + private long lastCheckMemstoreTime; + + public DataBaseWriterBuffer(Configuration config,String jdbcUrl, String userName, String password,String dbName){ + this.connHolder = new ObClientConnHolder(config, jdbcUrl, userName, password); + this.dbName=dbName; + } + + public ConnHolder getConnHolder(){ + return connHolder; + } + + public void initTableBuffer(List tableList) { + for (String table : tableList) { + tableBuffer.put(table, new LinkedList()); + } + } + + public List getTableList(){ + return new ArrayList(tableBuffer.keySet()); + } + + public void addRecord(Record record, String tableName) { + LinkedList recordList = tableBuffer.get(tableName); + if (recordList == null) { + throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, + String.format("The [table] calculated based on the rules does not exist. The calculated [tableName]=%s, [db]=%s. Please check the rules you configured.", + tableName, connHolder.getJdbcUrl())); + } + recordList.add(record); + } + + public Map> getTableBuffer() { + return tableBuffer; + } + + public String getDbName() { + return dbName; + } + + public long getLastCheckMemstoreTime() { + return lastCheckMemstoreTime; + } + + public void setLastCheckMemstoreTime(long lastCheckMemstoreTime) { + this.lastCheckMemstoreTime = lastCheckMemstoreTime; + } + + /** + * 检查当前DB的memstore使用状态 + *
+ * 若超过阈值,则休眠 + * + * @param memstoreCheckIntervalSecond + * @param memstoreThreshold + */ + public synchronized void checkMemstore(long memstoreCheckIntervalSecond, double memstoreThreshold) { + long now = System.currentTimeMillis(); + if (now - getLastCheckMemstoreTime() < 1000 * memstoreCheckIntervalSecond) { + return; + } + + LOG.debug(String.format("checking memstore usage: lastCheckTime=%d, now=%d, check interval=%d, threshold=%f", + getLastCheckMemstoreTime(), now, memstoreCheckIntervalSecond, memstoreThreshold)); + + Connection conn = getConnHolder().getConn(); + while (ObWriterUtils.isMemstoreFull(conn, memstoreThreshold)) { + LOG.warn("OB memstore is full,sleep 60 seconds, jdbc=" + getConnHolder().getJdbcUrl() + + ",threshold=" + memstoreThreshold); + ObWriterUtils.sleep(60000); + } + setLastCheckMemstoreTime(now); + } +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/OBDataSourceV10.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/OBDataSourceV10.java new file mode 100644 index 00000000..2c1f516f --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/OBDataSourceV10.java @@ -0,0 +1,190 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.alibaba.datax.plugin.rdbms.reader.Key; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; +import com.alipay.oceanbase.obproxy.datasource.ObGroupDataSource; +import com.alipay.oceanbase.obproxy.exception.ConnectionPropertiesNotSupportedException; +import com.alipay.oceanbase.obproxy.util.StringParser.IllegalFormatException; +import com.google.common.collect.Maps; + +public class OBDataSourceV10 { + private static final Logger LOG = LoggerFactory.getLogger(OBDataSourceV10.class); + + private static final Map dataSources = Maps.newHashMap(); + + private static int ocjGetConnectionTimeout = 0; + private static int ocjGlobalProxyroGetConnectionTimeout = 0; + private static int ocjMaxWaitOfCreateClusterResourceMs = 0; + + private static Configuration taskConfig; + + public static String genKey(String fullUserName, String dbName) { + //username@tenantName#clusterName/dbName + return fullUserName + "/" + dbName; + } + + public static synchronized void init(Configuration configuration, + final String fullUsername, + final String password, + final String dbName) { + taskConfig = configuration; + final String rsUrl = ""; + final String dataSourceKey = genKey(fullUsername, dbName); + final int maxActiveConnection = configuration.getInt(Config.MAX_ACTIVE_CONNECTION, Config.DEFAULT_MAX_ACTIVE_CONNECTION); + if (dataSources.containsKey(dataSourceKey)) { + dataSources.get(dataSourceKey).increseRefercnce(); + } else { + long timeout = configuration.getInt(Config.TIMEOUT, 30); + if (timeout < 30) { + timeout = 30; + } + if (ocjGetConnectionTimeout == 0) { + ocjGetConnectionTimeout = configuration.getInt(Config.OCJ_GET_CONNECT_TIMEOUT, + Config.DEFAULT_OCJ_GET_CONNECT_TIMEOUT); + ocjGlobalProxyroGetConnectionTimeout = 
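+ /*
+ * These OCJ timeouts are read once (guarded by ocjGetConnectionTimeout == 0) and then
+ * shared, as statics, by every DataSourceHolder created afterwards. Illustrative
+ * lifecycle sketch (user/db names are placeholders, not values from this patch):
+ *
+ *   OBDataSourceV10.init(conf, "u@tenant#cluster", "pwd", "db1"); // creates the holder
+ *   OBDataSourceV10.init(conf, "u@tenant#cluster", "pwd", "db1"); // same key: refcount++
+ *   OBDataSourceV10.destory(OBDataSourceV10.genKey("u@tenant#cluster", "db1")); // refcount--, closes at 0
+ */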
configuration.getInt(Config.OCJ_PROXY_CONNECT_TIMEOUT, + Config.DEFAULT_OCJ_PROXY_CONNECT_TIMEOUT); + ocjMaxWaitOfCreateClusterResourceMs = configuration.getInt(Config.OCJ_CREATE_RESOURCE_TIMEOUT, + Config.DEFAULT_OCJ_CREATE_RESOURCE_TIMEOUT); + + LOG.info(String.format("initializing OCJ with ocjGetConnectionTimeout=%d, " + + "ocjGlobalProxyroGetConnectionTimeout=%d, ocjMaxWaitOfCreateClusterResourceMs=%d", + ocjGetConnectionTimeout, ocjGlobalProxyroGetConnectionTimeout, ocjMaxWaitOfCreateClusterResourceMs)); + } + DataSourceHolder holder = null; + try { + holder = new DataSourceHolder(rsUrl, fullUsername, password, dbName, maxActiveConnection, timeout); + dataSources.put(dataSourceKey, holder); + } catch (ConnectionPropertiesNotSupportedException e) { + e.printStackTrace(); + throw new DataXException(ObDataSourceErrorCode.DESC, "connect error"); + } catch (IllegalArgumentException e) { + e.printStackTrace(); + throw new DataXException(ObDataSourceErrorCode.DESC, "connect error"); + } catch (IllegalFormatException e) { + e.printStackTrace(); + throw new DataXException(ObDataSourceErrorCode.DESC, "connect error"); + } catch (SQLException e) { + e.printStackTrace(); + throw new DataXException(ObDataSourceErrorCode.DESC, "connect error"); + } + } + } + + public static synchronized void destory(final String dataSourceKey){ + DataSourceHolder holder = dataSources.get(dataSourceKey); + holder.decreaseReference(); + if (holder.canClose()) { + dataSources.remove(dataSourceKey); + holder.close(); + LOG.info(String.format("close datasource success [%s]", dataSourceKey)); + } + } + + public static Connection getConnection(final String url) { + Connection conn = null; + try { + conn = dataSources.get(url).getconnection(); + } catch (SQLException e) { + e.printStackTrace(); + } + return conn; + } + + private static Map buildJdbcProperty() { + Map property = new HashMap(); + property.put("useServerPrepStmts", "false"); + property.put("characterEncoding", "UTF-8"); + property.put("useLocalSessionState", "false"); + property.put("rewriteBatchedStatements", "true"); + property.put("socketTimeout", "25000"); + + return property; + } + + private static class DataSourceHolder { + private volatile int reference; + private final ObGroupDataSource groupDataSource; + public static final Map jdbcProperty = buildJdbcProperty();; + + public DataSourceHolder(final String rsUrl, + final String fullUsername, + final String password, + final String dbName, + final int maxActive, + final long timeout) throws ConnectionPropertiesNotSupportedException, IllegalFormatException, IllegalArgumentException, SQLException { + this.reference = 1; + this.groupDataSource = new ObGroupDataSource(); + this.groupDataSource.setUrl(rsUrl); + this.groupDataSource.setFullUsername(fullUsername); + this.groupDataSource.setPassword(password); + this.groupDataSource.setDatabase(dbName); + this.groupDataSource.setConnectionProperties(jdbcProperty); + this.groupDataSource.setGetConnectionTimeout(ocjGetConnectionTimeout); + this.groupDataSource.setGlobalProxyroGetConnectionTimeout(ocjGlobalProxyroGetConnectionTimeout); + this.groupDataSource.setMaxWaitOfCreateClusterResourceMs(ocjMaxWaitOfCreateClusterResourceMs); + this.groupDataSource.setMaxActive(maxActive); + this.groupDataSource.setGlobalSlowQueryThresholdUs(3000000); // 3s, sql with response time more than 3s will be logged + this.groupDataSource.setGlobalCleanLogFileEnabled(true); // enable log cleanup + this.groupDataSource.setGlobalLogFileSizeThreshold(17179869184L); // 16G, log file 
total size
+ this.groupDataSource.setGlobalCleanLogFileInterval(10000); // 10s, check interval
+ this.groupDataSource.setInitialSize(1);
+
+ List<String> initSqls = new ArrayList<String>();
+ if (taskConfig != null) {
+ List<String> sessionConfig = taskConfig.getList(Key.SESSION, new ArrayList<String>(), String.class);
+ if (sessionConfig != null && !sessionConfig.isEmpty()) {
+ initSqls.addAll(sessionConfig);
+ }
+ }
+ // set up for writing timestamp columns
+ if (ObWriterUtils.isOracleMode()) {
+ initSqls.add("ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'");
+ initSqls.add("ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS.FF'");
+ initSqls.add("ALTER SESSION SET NLS_TIMESTAMP_TZ_FORMAT='YYYY-MM-DD HH24:MI:SS.FF TZR TZD'");
+ }
+
+ this.groupDataSource.setConnectionInitSqls(initSqls);
+
+ this.groupDataSource.init();
+ LOG.info("Create GroupDataSource rsUrl=[{}], fullUserName=[{}], dbName=[{}], getConnectionTimeout={}ms, maxActive={}",
+ rsUrl, fullUsername, dbName, ocjGetConnectionTimeout, maxActive);
+ }
+
+ public Connection getconnection() throws SQLException {
+ return groupDataSource.getConnection();
+ }
+
+ public synchronized void increseRefercnce() {
+ this.reference++;
+ }
+
+ public synchronized void decreaseReference() {
+ this.reference--;
+ }
+
+ public synchronized boolean canClose() {
+ return reference == 0;
+ }
+
+ public synchronized void close() {
+ if (this.canClose()) {
+ groupDataSource.destroy();
+ }
+ }
+ }
+
+}
diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/OCJConnHolder.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/OCJConnHolder.java new file mode 100644 index 00000000..10de5615 --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/OCJConnHolder.java @@ -0,0 +1,55 @@
+package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
+
+import java.sql.Connection;
+
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.util.DBUtil;
+import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
+
+/**
+ * Wraps the OceanBase Java client (OCJ) datasource.
+ * @author oceanbase
+ */
+public class OCJConnHolder extends ConnHolder {
+ private ServerConnectInfo connectInfo;
+ private String dataSourceKey;
+
+ public OCJConnHolder(Configuration config, ServerConnectInfo connInfo) {
+ super(config);
+ this.connectInfo = connInfo;
+ this.dataSourceKey = OBDataSourceV10.genKey(connectInfo.getFullUserName(), connectInfo.databaseName);
+ OBDataSourceV10.init(config, connectInfo.getFullUserName(), connectInfo.password, connectInfo.databaseName);
+ }
+
+ @Override
+ public Connection initConnection() {
+ conn = OBDataSourceV10.getConnection(dataSourceKey);
+ return conn;
+ }
+
+ @Override
+ public Connection reconnect() {
+ DBUtil.closeDBResources(null, conn);
+ return initConnection();
+ }
+
+ @Override
+ public Connection getConn() {
+ return conn;
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return connectInfo.jdbcUrl;
+ }
+
+ @Override
+ public String getUserName() {
+ return connectInfo.userName;
+ }
+
+ @Override
+ public void destroy() {
+ OBDataSourceV10.destory(this.dataSourceKey);
+ }
+}
diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ObClientConnHolder.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ObClientConnHolder.java new file mode 100644 index 00000000..8ff53039 --- /dev/null +++
b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ObClientConnHolder.java @@ -0,0 +1,63 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext; + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.List; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.reader.Key; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils; + +/** + * 数据库连接代理对象,负责创建连接,重新连接 + * + * @author oceanbase + * + */ +public class ObClientConnHolder extends ConnHolder { + private final String jdbcUrl; + private final String userName; + private final String password; + + public ObClientConnHolder(Configuration config, String jdbcUrl, String userName, String password) { + super(config); + this.jdbcUrl = jdbcUrl; + this.userName = userName; + this.password = password; + } + + // Connect to ob with obclient and obproxy + @Override + public Connection initConnection() { + String BASIC_MESSAGE = String.format("jdbcUrl:[%s]", this.jdbcUrl); + DataBaseType dbType = DataBaseType.OceanBase; + if (ObWriterUtils.isOracleMode()) { + // set up for writing timestamp columns + List sessionConfig = config.getList(Key.SESSION, new ArrayList(), String.class); + sessionConfig.add("ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'"); + sessionConfig.add("ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS.FF'"); + sessionConfig.add("ALTER SESSION SET NLS_TIMESTAMP_TZ_FORMAT='YYYY-MM-DD HH24:MI:SS.FF TZR TZD'"); + config.set(Key.SESSION, sessionConfig); + } + conn = DBUtil.getConnection(dbType, jdbcUrl, userName, password); + DBUtil.dealWithSessionConfig(conn, config, dbType, BASIC_MESSAGE); + return conn; + } + + @Override + public String getJdbcUrl() { + return jdbcUrl; + } + + @Override + public String getUserName() { + return userName; + } + + @Override + public void destroy() { + DBUtil.closeDBResources(null, conn); + } +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ObDataSourceErrorCode.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ObDataSourceErrorCode.java new file mode 100644 index 00000000..6509c766 --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ObDataSourceErrorCode.java @@ -0,0 +1,31 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext; + +import com.alibaba.datax.common.spi.ErrorCode; + +public enum ObDataSourceErrorCode implements ErrorCode { + DESC("ObDataSourceError code","connect error"); + + private final String code; + private final String describe; + + private ObDataSourceErrorCode(String code, String describe) { + this.code = code; + this.describe = describe; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.describe; + } + + @Override + public String toString() { + return String.format("Code:[%s], Describe:[%s]. 
", this.code, + this.describe); + } +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ServerConnectInfo.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ServerConnectInfo.java new file mode 100644 index 00000000..49e5c05f --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ServerConnectInfo.java @@ -0,0 +1,58 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ServerConnectInfo { + + public String clusterName; + public String tenantName; + public String userName; + public String password; + public String databaseName; + public String ipPort; + public String jdbcUrl; + + public ServerConnectInfo(final String jdbcUrl, final String username, final String password) { + if (jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) { + String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN); + if (ss.length != 3) { + throw new RuntimeException("jdbc url format is not correct: " + jdbcUrl); + } + this.userName = username; + this.clusterName = ss[1].trim().split(":")[0]; + this.tenantName = ss[1].trim().split(":")[1]; + this.jdbcUrl = ss[2].replace("jdbc:mysql:", "jdbc:oceanbase:"); + } else { + throw new RuntimeException ("jdbc url format is not correct: " + jdbcUrl); + } + this.password = password; + parseJdbcUrl(jdbcUrl); + } + + private void parseJdbcUrl(final String jdbcUrl) { + Pattern pattern = Pattern.compile("//([\\w\\.\\-]+:\\d+)/([\\w]+)\\?"); + Matcher matcher = pattern.matcher(jdbcUrl); + if (matcher.find()) { + String ipPort = matcher.group(1); + String dbName = matcher.group(2); + this.ipPort = ipPort; + this.databaseName = dbName; + } else { + throw new RuntimeException("Invalid argument:" + jdbcUrl); + } + } + + public String toString() { + StringBuffer strBuffer = new StringBuffer(); + return strBuffer.append("clusterName:").append(clusterName).append(", tenantName:").append(tenantName) + .append(", userName:").append(userName).append(", databaseName:").append(databaseName) + .append(", ipPort:").append(ipPort).append(", jdbcUrl:").append(jdbcUrl).toString(); + } + + public String getFullUserName() { + StringBuilder builder = new StringBuilder(); + builder.append(userName).append("@").append(tenantName).append("#").append(clusterName); + return builder.toString(); + } +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ColumnMetaCache.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ColumnMetaCache.java new file mode 100644 index 00000000..13339e0b --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ColumnMetaCache.java @@ -0,0 +1,41 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.task; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Triple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.datax.plugin.rdbms.util.DBUtil; + +public class ColumnMetaCache { + private static final Logger LOG = LoggerFactory.getLogger(ColumnMetaCache.class); + + private static String tableName; + private static Triple, List, List> columnMeta = null; + + 
public ColumnMetaCache() { + + } + + public static void init(Connection connection, final String tableName, final List columns) throws SQLException { + if (columnMeta == null) { + synchronized(ColumnMetaCache.class) { + ColumnMetaCache.tableName = tableName; + if (columnMeta == null) { + columnMeta = DBUtil.getColumnMetaData(connection, + tableName, StringUtils.join(columns, ",")); + LOG.info("fetch columnMeta of table {} success", tableName); + } + } + } + } + + public static Triple, List, List> getColumnMeta() { + return columnMeta; + } + +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java new file mode 100644 index 00000000..084acbeb --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java @@ -0,0 +1,523 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.task; + +import java.sql.Connection; +//import java.sql.PreparedStatement; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils; +import com.alipay.oceanbase.obproxy.data.TableEntryKey; +import com.alipay.oceanbase.obproxy.util.ObPartitionIdCalculator; + +public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { + private static final Logger LOG = LoggerFactory.getLogger(ConcurrentTableWriterTask.class); + + // memstore_total 与 memstore_limit 比例的阈值,一旦超过这个值,则暂停写入 + private double memstoreThreshold = Config.DEFAULT_MEMSTORE_THRESHOLD; + // memstore检查的间隔 + private long memstoreCheckIntervalSecond = Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND; + // 最后一次检查 + private long lastCheckMemstoreTime; + + private static AtomicLong totalTask = new AtomicLong(0); + private long taskId = -1; + + private AtomicBoolean isMemStoreFull = new AtomicBoolean(false); + private ConnHolder checkConnHolder; + + public ConcurrentTableWriterTask(DataBaseType 
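+ /*
+ * Write pipeline implemented by this task, summarizing the code below:
+ *   1. startWriteWithConnection() pulls records and groups them by OB partition id
+ *      (addRecordToCache -> calPartitionId);
+ *   2. groups reaching batchSize are offered to a bounded queue
+ *      (ConcurrentTableWriter.addBatchRecords);
+ *   3. a pool of InsertTask workers drains the queue and batch-inserts, throttled by
+ *      the shared isMemStoreFull flag.
+ */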
dataBaseType) { + super(dataBaseType); + taskId = totalTask.getAndIncrement(); + } + + private ObPartitionIdCalculator partCalculator = null; + + private HashMap> groupInsertValues; +// private List unknownPartRecords; + private List partitionKeyIndexes; + + private ConcurrentTableWriter concurrentWriter = null; + + private ConnHolder connHolder; + + private boolean allTaskInQueue = false; + + private Lock lock = new ReentrantLock(); + private Condition condition = lock.newCondition(); + + private long startTime; + private boolean isOb2 = false; + private String obWriteMode = "update"; + private boolean isOracleCompatibleMode = false; + private String obUpdateColumns = null; + private List> deleteColPos; + private String dbName; + + @Override + public void init(Configuration config) { + super.init(config); + // OceanBase 所有操作都是 insert into on duplicate key update 模式 + // writeMode应该使用enum来定义 + this.writeMode = "update"; + obWriteMode = config.getString(Config.OB_WRITE_MODE, "update"); + ServerConnectInfo connectInfo = new ServerConnectInfo(jdbcUrl, username, password); + dbName = connectInfo.databaseName; + //init check memstore + this.memstoreThreshold = config.getDouble(Config.MEMSTORE_THRESHOLD, Config.DEFAULT_MEMSTORE_THRESHOLD); + this.memstoreCheckIntervalSecond = config.getLong(Config.MEMSTORE_CHECK_INTERVAL_SECOND, + Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND); + this.isOracleCompatibleMode = ObWriterUtils.isOracleMode(); + + LOG.info("configure url is unavailable, use obclient for connections."); + this.checkConnHolder = new ObClientConnHolder(config, connectInfo.jdbcUrl, + connectInfo.getFullUserName(), connectInfo.password); + this.connHolder = new ObClientConnHolder(config, connectInfo.jdbcUrl, + connectInfo.getFullUserName(), connectInfo.password); + checkConnHolder.initConnection(); + if (isOracleCompatibleMode) { + connectInfo.databaseName = connectInfo.databaseName.toUpperCase(); + table = table.toUpperCase(); + LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s", + connectInfo.databaseName, table)); + } + + if (config.getBool(Config.USE_PART_CALCULATOR, Config.DEFAULT_USE_PART_CALCULATOR)) { + initPartCalculator(connectInfo); + } else { + LOG.info("Disable partition calculation feature."); + } + + obUpdateColumns = config.getString(Config.OB_UPDATE_COLUMNS, null); + groupInsertValues = new HashMap>(); + partitionKeyIndexes = new ArrayList(); + rewriteSql(); + + if (null == concurrentWriter) { + concurrentWriter = new ConcurrentTableWriter(config, connectInfo, writeRecordSql); + allTaskInQueue = false; + } + + String version = config.getString(Config.OB_VERSION); + int pIdx = version.lastIndexOf('.'); + if ((Float.valueOf(version.substring(0, pIdx)) >= 2.1f)) { + isOb2 = true; + } + } + + private void initPartCalculator(ServerConnectInfo connectInfo) { + int retry = 0; + LOG.info(String.format("create tableEntryKey with clusterName %s, tenantName %s, databaseName %s, tableName %s", + connectInfo.clusterName, connectInfo.tenantName, connectInfo.databaseName, table)); + TableEntryKey tableEntryKey = new TableEntryKey(connectInfo.clusterName, connectInfo.tenantName, + connectInfo.databaseName, table); + do { + try { + if (retry > 0) { + int sleep = retry > 8 ? 
500 : (1 << retry); + TimeUnit.SECONDS.sleep(sleep); + LOG.info("retry create new part calculator, the {} times", retry); + } + LOG.info("create partCalculator with address: " + connectInfo.ipPort); + partCalculator = new ObPartitionIdCalculator(connectInfo.ipPort, tableEntryKey); + } catch (Exception ex) { + ++retry; + LOG.warn("create new part calculator failed, retry ... {}", retry, ex); + } + } while (partCalculator == null && retry < 3); // try 3 times + } + + public boolean isFinished() { + return allTaskInQueue && concurrentWriter.checkFinish(); + } + + public boolean allTaskInQueue() { + return allTaskInQueue; + } + + public void setPutAllTaskInQueue() { + this.allTaskInQueue = true; + LOG.info("ConcurrentTableWriter has put all task in queue, queueSize = {}, total = {}, finished = {}", + concurrentWriter.getTaskQueueSize(), + concurrentWriter.getTotalTaskCount(), + concurrentWriter.getFinishTaskCount()); + } + + private void rewriteSql() { + Connection conn = connHolder.initConnection(); + if (isOracleCompatibleMode && obWriteMode.equalsIgnoreCase("update")) { + // change obWriteMode to insert so the insert statement will be generated. + obWriteMode = "insert"; + deleteColPos = ObWriterUtils.buildDeleteSql(conn, dbName, table, columns); + } + this.writeRecordSql = ObWriterUtils.buildWriteSql(table, columns, conn, obWriteMode, obUpdateColumns); + LOG.info("writeRecordSql :{}", this.writeRecordSql); + } + + public void prepare(Configuration writerSliceConfig) { + super.prepare(writerSliceConfig); + calPartitionKeyIndex(partitionKeyIndexes); + concurrentWriter.start(); + } + + private void calPartitionKeyIndex(List partKeyIndexes) { + partKeyIndexes.clear(); + if (null == partCalculator) { + LOG.error("partCalculator is null"); + return; + } + for (int i = 0; i < columns.size(); ++i) { + if (partCalculator.isPartitionKeyColumn(columns.get(i))) { + LOG.info(columns.get(i) + " is partition key."); + partKeyIndexes.add(i); + } + } + } + + private Long calPartitionId(List partKeyIndexes, Record record) { + if (partCalculator == null) { + return null; + } + for (Integer i : partKeyIndexes) { + partCalculator.addColumn(columns.get(i), record.getColumn(i).asString()); + } + return partCalculator.calculate(); + } + + @Override + public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) { + this.taskPluginCollector = taskPluginCollector; + + // 用于写入数据的时候的类型根据目的表字段类型转换 + int retryTimes = 0; + boolean needRetry = false; + do { + try { + if (retryTimes > 0) { + TimeUnit.SECONDS.sleep((1 << retryTimes)); + DBUtil.closeDBResources(null, connection); + connection = DBUtil.getConnection(dataBaseType, jdbcUrl, username, password); + LOG.warn("getColumnMetaData of table {} failed, retry the {} times ...", this.table, retryTimes); + } + ColumnMetaCache.init(connection, this.table, this.columns); + this.resultSetMetaData = ColumnMetaCache.getColumnMeta(); + needRetry = false; + } catch (SQLException e) { + needRetry = true; + ++retryTimes; + e.printStackTrace(); + LOG.warn("fetch column meta of [{}] failed..., retry {} times", this.table, retryTimes); + } catch (InterruptedException e) { + LOG.warn("startWriteWithConnection interrupt, ignored"); + } finally { + } + } while (needRetry && retryTimes < 100); + + try { + Record record; + startTime = System.currentTimeMillis(); + while ((record = recordReceiver.getFromReader()) != null) { + if (record.getColumnNumber() != this.columnNumber) { + // 源头读取字段列数与目的表字段写入列数不相等,直接报错 + 
LOG.error("column count mismatch: the destination table has {} columns but the record has {}, record = {}",
+ this.columnNumber, record.getColumnNumber(), record.toString());
+ throw DataXException
+ .asDataXException(
+ DBUtilErrorCode.CONF_ERROR,
+ String.format("The number of columns read from the source (%d) does not match the number of columns to write (%d). Please check the job configuration.",
+ record.getColumnNumber(),
+ this.columnNumber));
+ }
+ addRecordToCache(record);
+ }
+ addLeftRecords();
+ waitTaskFinish();
+ } catch (Exception e) {
+ throw DataXException.asDataXException(
+ DBUtilErrorCode.WRITE_DATA_ERROR, e);
+ } finally {
+ DBUtil.closeDBResources(null, null, connection);
+ }
+ }
+
+ public PreparedStatement fillStatement(PreparedStatement preparedStatement, Record record)
+ throws SQLException {
+ return fillPreparedStatement(preparedStatement, record);
+ }
+
+ public PreparedStatement fillStatementIndex(PreparedStatement preparedStatement,
+ int prepIdx, int columnIndex, Column column) throws SQLException {
+ int columnSqltype = this.resultSetMetaData.getMiddle().get(columnIndex);
+ String typeName = this.resultSetMetaData.getRight().get(columnIndex);
+ return fillPreparedStatementColumnType(preparedStatement, prepIdx, columnSqltype, typeName, column);
+ }
+
+ public void collectDirtyRecord(Record record, SQLException e) {
+ taskPluginCollector.collectDirtyRecord(record, e);
+ }
+
+ public void insertOneRecord(Connection connection, List<Record> buffer) {
+ doOneInsert(connection, buffer);
+ }
+
+ private void addLeftRecords() {
+ for (List<Record> groupValues : groupInsertValues.values()) {
+ if (groupValues.size() > 0) {
+ int retry = 0;
+ while (true) {
+ try {
+ concurrentWriter.addBatchRecords(groupValues);
+ break;
+ } catch (InterruptedException e) {
+ retry++;
+ LOG.info("Concurrent table writer is interrupted, retry {}", retry);
+ }
+ }
+ }
+ }
+ }
+
+ private void addRecordToCache(final Record record) {
+ Long partId = null;
+ try {
+ partId = calPartitionId(partitionKeyIndexes, record);
+ } catch (Exception e1) {
+ LOG.warn("fail to get partition id: " + e1.getMessage() + ", record: " + record);
+ }
+
+ if (partId == null && isOb2) {
+ LOG.debug("fail to calculate partition id, just put it into the default buffer.");
+ partId = Long.MAX_VALUE;
+ }
+
+ if (partId != null) {
+ List<Record> groupValues = groupInsertValues.get(partId);
+ if (groupValues == null) {
+ groupValues = new ArrayList<Record>(batchSize);
+ groupInsertValues.put(partId, groupValues);
+ }
+ groupValues.add(record);
+ if (groupValues.size() >= batchSize) {
+ int i = 0;
+ while (true) {
+ if (i > 0) {
+ LOG.info("retry adding batch records, the {} time", i);
+ }
+ try {
+ concurrentWriter.addBatchRecords(groupValues);
+ printEveryTime();
+ break;
+ } catch (InterruptedException e) {
+ ++i;
+ LOG.info("Concurrent table writer is interrupted");
+ }
+ }
+ groupValues = new ArrayList<Record>(batchSize);
+ groupInsertValues.put(partId, groupValues);
+ }
+ } else {
+ LOG.warn("add unknown part record {}", record);
+ List<Record> unknownPartRecords = new ArrayList<Record>();
+ unknownPartRecords.add(record);
+ int i = 0;
+ while (true) {
+ if (i > 0) {
+ LOG.info("retry adding batch records, the {} time", i);
+ }
+ try {
+ concurrentWriter.addBatchRecords(unknownPartRecords);
+ break;
+ } catch (InterruptedException e) {
+ ++i;
+ LOG.info("Concurrent table writer is interrupted");
+ }
+ }
+ }
+ }
+
+ private void checkMemStore() {
+ long now = System.currentTimeMillis();
+ if (now - lastCheckMemstoreTime < 1000 * memstoreCheckIntervalSecond) {
+ return;
+ }
+ // reconnect only when a check is actually due, instead of on every call
+ Connection checkConn = checkConnHolder.reconnect();
+ boolean isFull =
ObWriterUtils.isMemstoreFull(checkConn, memstoreThreshold); + this.isMemStoreFull.set(isFull); + if (isFull) { + LOG.warn("OB memstore is full,sleep 30 seconds, threshold=" + memstoreThreshold); + } + lastCheckMemstoreTime = now; + } + + public boolean isMemStoreFull() { + return isMemStoreFull.get(); + } + + public void printEveryTime() { + long cost = System.currentTimeMillis() - startTime; + if (cost > 10000) { //10s + print(); + startTime = System.currentTimeMillis(); + } + } + + public void print() { + LOG.debug("Statistic total task {}, finished {}, queue Size {}", + concurrentWriter.getTotalTaskCount(), + concurrentWriter.getFinishTaskCount(), + concurrentWriter.getTaskQueueSize()); + concurrentWriter.printStatistics(); + } + + public void waitTaskFinish() { + setPutAllTaskInQueue(); + lock.lock(); + try { + while (!concurrentWriter.checkFinish()) { + condition.await(15, TimeUnit.SECONDS); + print(); + checkMemStore(); + } + } catch (InterruptedException e) { + LOG.warn("Concurrent table writer wait task finish interrupt"); + } finally { + lock.unlock(); + } + LOG.debug("wait all InsertTask finished ..."); + } + + public void singalTaskFinish() { + lock.lock(); + condition.signal(); + lock.unlock(); + } + + @Override + public void destroy(Configuration writerSliceConfig) { + if(concurrentWriter!=null) { + concurrentWriter.destory(); + } + // 把本级持有的conn关闭掉 + DBUtil.closeDBResources(null, connHolder.getConn()); + DBUtil.closeDBResources(null, checkConnHolder.getConn()); + checkConnHolder.destroy(); + super.destroy(writerSliceConfig); + } + + public class ConcurrentTableWriter { + private BlockingQueue> queue; + private List insertTasks; + private Configuration config; + private ServerConnectInfo connectInfo; + private String rewriteRecordSql; + private AtomicLong totalTaskCount; + private AtomicLong finishTaskCount; + private final int threadCount; + + public ConcurrentTableWriter(Configuration config, ServerConnectInfo connInfo, String rewriteRecordSql) { + threadCount = config.getInt(Config.WRITER_THREAD_COUNT, Config.DEFAULT_WRITER_THREAD_COUNT); + queue = new LinkedBlockingQueue>(threadCount << 1); + insertTasks = new ArrayList(threadCount); + this.config = config; + this.connectInfo = connInfo; + this.rewriteRecordSql = rewriteRecordSql; + this.totalTaskCount = new AtomicLong(0); + this.finishTaskCount = new AtomicLong(0); + } + + public long getTotalTaskCount() { + return totalTaskCount.get(); + } + + public long getFinishTaskCount() { + return finishTaskCount.get(); + } + + public int getTaskQueueSize() { + return queue.size(); + } + + public void increFinishCount() { + finishTaskCount.incrementAndGet(); + } + + //should check after put all the task in the queue + public boolean checkFinish() { + long finishCount = finishTaskCount.get(); + long totalCount = totalTaskCount.get(); + return finishCount == totalCount; + } + + public synchronized void start() { + for (int i = 0; i < threadCount; ++i) { + LOG.info("start {} insert task.", (i+1)); + InsertTask insertTask = new InsertTask(taskId, queue, config, connectInfo, rewriteRecordSql, deleteColPos); + insertTask.setWriterTask(ConcurrentTableWriterTask.this); + insertTask.setWriter(this); + insertTasks.add(insertTask); + } + WriterThreadPool.executeBatch(insertTasks); + } + + public void printStatistics() { + long insertTotalCost = 0; + long insertTotalCount = 0; + for (InsertTask task: insertTasks) { + insertTotalCost += task.getTotalCost(); + insertTotalCount += task.getInsertCount(); + } + long avgCost = 0; + if 
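+ // guard against division by zero when no batch has completed yet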
(insertTotalCount != 0) { + avgCost = insertTotalCost / insertTotalCount; + } + ConcurrentTableWriterTask.LOG.debug("Insert {} times, totalCost {} ms, average {} ms", + insertTotalCount, insertTotalCost, avgCost); + } + + public void addBatchRecords(final List records) throws InterruptedException { + boolean isSucc = false; + while (!isSucc) { + isSucc = queue.offer(records, 5, TimeUnit.SECONDS); + checkMemStore(); + } + totalTaskCount.incrementAndGet(); + } + + public synchronized void destory() { + if (insertTasks != null) { + for(InsertTask task : insertTasks) { + task.setStop(); + } + for(InsertTask task: insertTasks) { + task.destroy(); + } + } + } + } +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/InsertTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/InsertTask.java new file mode 100644 index 00000000..522d91a5 --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/InsertTask.java @@ -0,0 +1,287 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.task; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.TimeUnit; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.task.ConcurrentTableWriterTask.ConcurrentTableWriter; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils; + +public class InsertTask implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(InsertTask.class); + + private ConcurrentTableWriterTask writerTask; + private ConcurrentTableWriter writer; + + private String writeRecordSql; + private long totalCost = 0; + private long insertCount = 0; + + private Queue> queue; + private boolean isStop; + private ConnHolder connHolder; + + private final long taskId; + private ServerConnectInfo connInfo; + + // 失败重试次数 + private int failTryCount = Config.DEFAULT_FAIL_TRY_COUNT; + private boolean printCost = Config.DEFAULT_PRINT_COST; + private long costBound = Config.DEFAULT_COST_BOUND; + private List> deleteMeta; + + public InsertTask( + final long taskId, + Queue> recordsQueue, + Configuration config, + ServerConnectInfo connectInfo, + String writeRecordSql, + List> deleteMeta) { + this.taskId = taskId; + this.queue = recordsQueue; + this.connInfo = connectInfo; + failTryCount = config.getInt(Config.FAIL_TRY_COUNT, Config.DEFAULT_FAIL_TRY_COUNT); + printCost = config.getBool(Config.PRINT_COST, Config.DEFAULT_PRINT_COST); + costBound = config.getLong(Config.COST_BOUND, Config.DEFAULT_COST_BOUND); + this.connHolder = new ObClientConnHolder(config, connInfo.jdbcUrl, + connInfo.getFullUserName(), connInfo.password); + 
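+ // Each InsertTask owns its own ObClientConnHolder (and hence its own JDBC connection);
+ // records arrive through the shared bounded queue passed in above, and run() keeps
+ // polling it until writerTask.isFinished().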
this.writeRecordSql = writeRecordSql; + this.isStop = false; + this.deleteMeta = deleteMeta; + } + + void setWriterTask(ConcurrentTableWriterTask writerTask) { + this.writerTask = writerTask; + } + + void setWriter(ConcurrentTableWriter writer) { + this.writer = writer; + } + + private boolean isStop() { return isStop; } + public void setStop() { isStop = true; } + public long getTotalCost() { return totalCost; } + public long getInsertCount() { return insertCount; } + + @Override + public void run() { + Thread.currentThread().setName(String.format("%d-insertTask-%d", taskId, Thread.currentThread().getId())); + LOG.debug("Task {} start to execute...", taskId); + while (!isStop()) { + try { + List records = queue.poll(); + if (null != records) { + doMultiInsert(records, this.printCost, this.costBound); + + } else if (writerTask.isFinished()) { + writerTask.singalTaskFinish(); + LOG.debug("not more task, thread exist ..."); + break; + } else { + TimeUnit.MILLISECONDS.sleep(5); + } + } catch (InterruptedException e) { + LOG.debug("TableWriter is interrupt"); + } catch (Exception e) { + LOG.warn("ERROR UNEXPECTED {}", e); + } + } + LOG.debug("Thread exist..."); + } + + public void destroy() { + connHolder.destroy(); + }; + + public void calStatistic(final long cost) { + writer.increFinishCount(); + ++insertCount; + totalCost += cost; + if (this.printCost && cost > this.costBound) { + LOG.info("slow multi insert cost {}ms", cost); + } + } + + private void doDelete(Connection conn, final List buffer) throws SQLException { + if(deleteMeta == null || deleteMeta.size() == 0) { + return; + } + for (int i = 0; i < deleteMeta.size(); i++) { + String deleteSql = deleteMeta.get(i).getKey(); + int[] valueIdx = deleteMeta.get(i).getValue(); + PreparedStatement ps = null; + try { + ps = conn.prepareStatement(deleteSql); + StringBuilder builder = new StringBuilder(); + for (Record record : buffer) { + int bindIndex = 0; + for (int idx : valueIdx) { + writerTask.fillStatementIndex(ps, bindIndex++, idx, record.getColumn(idx)); + builder.append(record.getColumn(idx).asString()).append(","); + } + ps.addBatch(); + } + LOG.debug("delete values: " + builder.toString()); + ps.executeBatch(); + } catch (SQLException ex) { + LOG.error("SQL Exception when delete records with {}", deleteSql, ex); + throw ex; + } finally { + DBUtil.closeDBResources(ps, null); + } + } + } + + public void doMultiInsert(final List buffer, final boolean printCost, final long restrict) { + checkMemstore(); + connHolder.initConnection(); + Connection conn = connHolder.getConn(); + boolean success = false; + long cost = 0; + long startTime = 0; + try { + for (int i = 0; i < failTryCount; ++i) { + if (i > 0) { + try { + int sleep = i >= 9 ? 
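+ /*
+ * Retry backoff: 2, 4, 8, ... 256 seconds for attempts 1..8, then a flat value once
+ * i >= 9. Note the unit is seconds (TimeUnit.SECONDS.sleep below), so the cap of 500
+ * amounts to 500 seconds; the same pattern appears in initPartCalculator and DbUtils.
+ */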
500 : 1 << i;//不明白为什么要sleep 500s + TimeUnit.SECONDS.sleep(sleep); + } catch (InterruptedException e) { + LOG.info("thread interrupted ..., ignore"); + } + connHolder.initConnection(); + conn = connHolder.getConn(); + LOG.info("retry {}, start do batch insert, size={}", i, buffer.size()); + checkMemstore(); + } + startTime = System.currentTimeMillis(); + PreparedStatement ps = null; + try { + conn.setAutoCommit(false); + + // do delete if necessary + doDelete(conn, buffer); + + ps = conn.prepareStatement(writeRecordSql); + for (Record record : buffer) { + ps = writerTask.fillStatement(ps, record); + ps.addBatch(); + } + ps.executeBatch(); + conn.commit(); + success = true; + cost = System.currentTimeMillis() - startTime; + calStatistic(cost); + break; + } catch (SQLException e) { + LOG.warn("Insert fatal error SqlState ={}, errorCode = {}, {}", e.getSQLState(), e.getErrorCode(), e); + if (i == 0 || i > 10 ) { + for (Record record : buffer) { + LOG.warn("ERROR : record {}", record); + } + } + // 按照错误码分类,分情况处理 + // 如果是OB系统级异常,则需要重建连接 + boolean fatalFail = ObWriterUtils.isFatalError(e); + if (fatalFail) { + ObWriterUtils.sleep(300000); + connHolder.reconnect(); + // 如果是可恢复的异常,则重试 + } else if (ObWriterUtils.isRecoverableError(e)) { + conn.rollback(); + ObWriterUtils.sleep(60000); + } else {// 其它异常直接退出,采用逐条写入方式 + conn.rollback(); + ObWriterUtils.sleep(1000); + break; + } + } catch (Exception e) { + e.printStackTrace(); + LOG.warn("Insert error unexpected {}", e); + } finally { + DBUtil.closeDBResources(ps, null); + } + } + } catch (SQLException e) { + LOG.warn("ERROR:retry failSql State ={}, errorCode = {}, {}", e.getSQLState(), e.getErrorCode(), e); + } + + if (!success) { + try { + LOG.info("do one insert"); + conn = connHolder.reconnect(); + doOneInsert(conn, buffer); + cost = System.currentTimeMillis() - startTime; + calStatistic(cost); + } finally { + } + } + } + + // process one row, delete before insert + private void doOneInsert(Connection connection, List buffer) { + List deletePstmtList = new ArrayList(); + PreparedStatement preparedStatement = null; + try { + connection.setAutoCommit(false); + if (deleteMeta != null && deleteMeta.size() > 0) { + for (int i = 0; i < deleteMeta.size(); i++) { + String deleteSql = deleteMeta.get(i).getKey(); + deletePstmtList.add(connection.prepareStatement(deleteSql)); + } + } + + preparedStatement = connection.prepareStatement(this.writeRecordSql); + for (Record record : buffer) { + try { + for (int i = 0; i < deletePstmtList.size(); i++) { + PreparedStatement deleteStmt = deletePstmtList.get(i); + int[] valueIdx = deleteMeta.get(i).getValue(); + int bindIndex = 0; + for (int idx : valueIdx) { + writerTask.fillStatementIndex(deleteStmt, bindIndex++, idx, record.getColumn(idx)); + } + deleteStmt.execute(); + } + preparedStatement = writerTask.fillStatement(preparedStatement, record); + preparedStatement.execute(); + connection.commit(); + } catch (SQLException e) { + writerTask.collectDirtyRecord(record, e); + } finally { + // 此处不应该关闭statement,后续的数据还需要用到 + } + } + } catch (Exception e) { + throw DataXException.asDataXException( + DBUtilErrorCode.WRITE_DATA_ERROR, e); + } finally { + DBUtil.closeDBResources(preparedStatement, null); + for (PreparedStatement pstmt : deletePstmtList) { + DBUtil.closeDBResources(pstmt, null); + } + } + } + + private void checkMemstore() { + while (writerTask.isMemStoreFull()) { + ObWriterUtils.sleep(30000); + } + } +} diff --git 
a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/SingleTableWriterTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/SingleTableWriterTask.java new file mode 100644 index 00000000..637a3be4 --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/SingleTableWriterTask.java @@ -0,0 +1,152 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.task; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; +import com.alibaba.datax.plugin.rdbms.writer.Key; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils; + +public class SingleTableWriterTask extends CommonRdbmsWriter.Task { + + // memstore_total 与 memstore_limit 比例的阈值,一旦超过这个值,则暂停写入 + private double memstoreThreshold = Config.DEFAULT_MEMSTORE_THRESHOLD; + + // memstore检查的间隔 + private long memstoreCheckIntervalSecond = Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND; + + // 最后一次检查 + private long lastCheckMemstoreTime; + + // 失败重试次数 + private int failTryCount = Config.DEFAULT_FAIL_TRY_COUNT; + + private ConnHolder connHolder; + private String obWriteMode = "update"; + private boolean isOracleCompatibleMode = false; + private String obUpdateColumns = null; + + public SingleTableWriterTask(DataBaseType dataBaseType) { + super(dataBaseType); + } + + @Override + public void init(Configuration config) { + super.init(config); + this.memstoreThreshold = config.getDouble(Config.MEMSTORE_THRESHOLD, Config.DEFAULT_MEMSTORE_THRESHOLD); + this.memstoreCheckIntervalSecond = config.getLong(Config.MEMSTORE_CHECK_INTERVAL_SECOND, + Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND); + failTryCount = config.getInt(Config.FAIL_TRY_COUNT, Config.DEFAULT_FAIL_TRY_COUNT); + // OceanBase 所有操作都是 insert into on duplicate key update 模式 + // writeMode应该使用enum来定义 + this.writeMode = "update"; + this.connHolder = new ObClientConnHolder(config, jdbcUrl, username, password); + //ob1.0里面, + this.batchSize = Math.min(128, config.getInt(Key.BATCH_SIZE, 128)); + LOG.info("In Write OceanBase 1.0, Real Batch Size : " + this.batchSize); + + isOracleCompatibleMode = ObWriterUtils.isOracleMode(); + LOG.info("isOracleCompatibleMode=" + isOracleCompatibleMode); + + obUpdateColumns = config.getString(Config.OB_UPDATE_COLUMNS, null); + + obWriteMode = config.getString(Config.OB_WRITE_MODE, "update"); + if (isOracleCompatibleMode) { + obWriteMode = "insert"; + } + rewriteSql(); + } + + private void rewriteSql() { + Connection conn = connHolder.initConnection(); + this.writeRecordSql = ObWriterUtils.buildWriteSql(table, columns, conn, obWriteMode, obUpdateColumns); + } + + protected void doBatchInsert(Connection conn, List buffer) throws SQLException { + doBatchInsert(buffer); + } + + private void doBatchInsert(List buffer) { + Connection conn = connHolder.getConn(); + // 检查内存 + checkMemstore(conn); + boolean success = false; + try { + for (int i = 0; i < 
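+ /*
+ * Retry loop; the error handling below mirrors InsertTask.doMultiInsert:
+ *   - fatal OB error      -> sleep 5 minutes, rebuild the connection, retry;
+ *   - recoverable error   -> rollback, sleep 1 minute, retry;
+ *   - any other SQL error -> rollback, sleep 1 second, give up batching and fall back
+ *                            to row-by-row writes via doOneInsert().
+ */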
failTryCount; i++) { + PreparedStatement ps = null; + try { + conn.setAutoCommit(false); + ps = conn.prepareStatement(this.writeRecordSql); + for (Record record : buffer) { + ps = fillPreparedStatement(ps, record); + ps.addBatch(); + } + ps.executeBatch(); + conn.commit(); + // 标记执行正常,且退出for循环 + success = true; + break; + } catch (SQLException e) { + // 如果是OB系统级异常,则需要重建连接 + boolean fatalFail = ObWriterUtils.isFatalError(e); + if (fatalFail) { + LOG.warn("Fatal exception in OB. Roll back this write and hibernate for five minutes. SQLState: {}. ErrorCode: {}", + e.getSQLState(), e.getErrorCode(), e); + ObWriterUtils.sleep(300000); + DBUtil.closeDBResources(null, conn); + conn = connHolder.reconnect(); + // 如果是可恢复的异常,则重试 + } else if (ObWriterUtils.isRecoverableError(e)) { + LOG.warn("Recoverable exception in OB. Roll back this write and hibernate for one minute. SQLState: {}. ErrorCode: {}", + e.getSQLState(), e.getErrorCode(), e); + conn.rollback(); + ObWriterUtils.sleep(60000); + // 其它异常直接退出,采用逐条写入方式 + } else { + LOG.warn("Exception in OB. Roll back this write and hibernate for one second. Write and submit the records one by one. SQLState: {}. ErrorCode: {}", + e.getSQLState(), e.getErrorCode(), e); + conn.rollback(); + ObWriterUtils.sleep(1000); + break; + } + } finally { + DBUtil.closeDBResources(ps, null); + } + } + } catch (SQLException e) { + LOG.warn("Exception in OB. Roll back this write. Write and submit the records one by one. SQLState: {}. ErrorCode: {}", + e.getSQLState(), e.getErrorCode(), e); + } + if (!success) { + doOneInsert(conn, buffer); + } + } + + private void checkMemstore(Connection conn) { + long now = System.currentTimeMillis(); + if (now - lastCheckMemstoreTime < 1000 * memstoreCheckIntervalSecond) { + return; + } + while (ObWriterUtils.isMemstoreFull(conn, memstoreThreshold)) { + LOG.warn("OB memstore is full,sleep 60 seconds, threshold=" + memstoreThreshold); + ObWriterUtils.sleep(60000); + } + lastCheckMemstoreTime = now; + } + + @Override + public void destroy(Configuration writerSliceConfig) { + // 把本级持有的conn关闭掉 + DBUtil.closeDBResources(null, connHolder.getConn()); + super.destroy(writerSliceConfig); + } +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/WriterThreadPool.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/WriterThreadPool.java new file mode 100644 index 00000000..8add5382 --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/WriterThreadPool.java @@ -0,0 +1,37 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.task; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WriterThreadPool { + private static final Logger LOG = LoggerFactory.getLogger(InsertTask.class); + + private static ExecutorService executorService = Executors.newCachedThreadPool(); + + public WriterThreadPool() { + } + + public static ExecutorService getInstance() { + return executorService; + } + + public static synchronized void shutdown() { + LOG.info("start shutdown executor service..."); + executorService.shutdown(); + LOG.info("shutdown executor service success..."); + } + + public static synchronized void execute(InsertTask task) { + executorService.execute(task); + } + + public static synchronized void executeBatch(List tasks) { + for (InsertTask task : tasks) { + 
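+ // All writer tasks in this process share the single cached thread pool above; each
+ // submitted InsertTask becomes a long-lived worker that runs until setStop()/shutdown().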
executorService.execute(task); + } + } +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/DbUtils.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/DbUtils.java new file mode 100644 index 00000000..5138c9cb --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/DbUtils.java @@ -0,0 +1,71 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.util; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; +import com.alibaba.datax.plugin.rdbms.writer.Constant; +import com.alibaba.datax.plugin.rdbms.writer.Key; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class DbUtils { + + protected static final Logger LOG = LoggerFactory.getLogger(DbUtils.class); + + public static String fetchSingleValueWithRetry(Configuration config, String query) { + final String username = config.getString(Key.USERNAME); + final String password = config.getString(Key.PASSWORD); + String jdbcUrl = config.getString(Key.JDBC_URL); + + if(jdbcUrl == null) { + List conns = config.getList(Constant.CONN_MARK, Object.class); + Configuration connConf = Configuration.from(conns.get(0).toString()); + jdbcUrl = connConf.getString(Key.JDBC_URL); + } + + Connection conn = null; + PreparedStatement stmt = null; + ResultSet result = null; + boolean need_retry = false; + String value = null; + int retry = 0; + do { + try { + if (retry > 0) { + int sleep = retry > 9 ? 
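+ /*
+ * Known callers pass single-row "show variables" queries and read back the "Value"
+ * column, e.g. (from OceanBaseWriter.Job):
+ *   DbUtils.fetchSingleValueWithRetry(config, "show variables like 'version'");
+ *   DbUtils.fetchSingleValueWithRetry(config, "SHOW VARIABLES LIKE 'ob_compatibility_mode'");
+ */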
500 : 1 << retry; + try { + TimeUnit.SECONDS.sleep(sleep); + } catch (InterruptedException e) { + } + LOG.warn("retry fetch value for {} the {} times", query, retry); + } + conn = DBUtil.getConnection(DataBaseType.OceanBase, jdbcUrl, username, password); + stmt = conn.prepareStatement(query); + result = stmt.executeQuery(); + if (result.next()) { + value = result.getString("Value"); + } else { + throw new RuntimeException("no values returned for " + query); + } + LOG.info("value for query [{}] is [{}]", query, value); + break; + } catch (SQLException e) { + need_retry = true; + ++retry; + LOG.warn("fetch value with {} error {}", query, e); + } finally { + DBUtil.closeDBResources(result, stmt, null); + } + } while (need_retry); + + return value; + } +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java new file mode 100644 index 00000000..368c3d17 --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java @@ -0,0 +1,390 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.util; + +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.util.*; +import java.util.stream.Collectors; + +public class ObWriterUtils { + protected static final Logger LOG = LoggerFactory.getLogger(Task.class); + + private static String CHECK_MEMSTORE = "select 1 from %s.gv$memstore t where t.total>t.mem_limit * ?"; + + private static String compatibleMode = null; + + public static boolean isMemstoreFull(Connection conn, double memstoreThreshold) { + PreparedStatement ps = null; + ResultSet rs = null; + boolean result = false; + try { + String sysDbName = "oceanbase"; + if (isOracleMode()) { + sysDbName = "sys"; + } + ps = conn.prepareStatement(String.format(CHECK_MEMSTORE, sysDbName)); + ps.setDouble(1, memstoreThreshold); + rs = ps.executeQuery(); + // 只要有满足条件的,则表示当前租户 有个机器的memstore即将满 + result = rs.next(); + } catch (Throwable e) { + LOG.error("check memstore fail" + e.getMessage()); + result = false; + } finally { + //do not need to close the statment in ob1.0 + } + + LOG.info("isMemstoreFull=" + result); + return result; + } + + public static boolean isOracleMode(){ + return (compatibleMode.equals(Config.OB_COMPATIBLE_MODE_ORACLE)); + } + + public static String getCompatibleMode() { + return compatibleMode; + } + + public static void setCompatibleMode(String mode) { + compatibleMode = mode; + } + + private static String buildDeleteSql (String tableName, List columns) { + StringBuilder builder = new StringBuilder("DELETE FROM "); + builder.append(tableName).append(" WHERE "); + for (int i = 0; i < columns.size(); i++) { + builder.append(columns.get(i)).append(" = ?"); + if (i != columns.size() - 1) { + builder.append(" and "); + } + } + return builder.toString(); + } + + private static int[] getColumnIndex(List columnsInIndex, List allColumns) { + allColumns = allColumns.stream().map(String::toUpperCase).collect(Collectors.toList()); + int[] colIdx = new int[columnsInIndex.size()]; + for (int i 
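+ /*
+ * Maps key/index column names onto positions in the user-configured column list,
+ * case-insensitively. Illustrative example (names invented): columnsInIndex =
+ * ["ID", "SUB_ID"], allColumns = ["id", "sub_id", "name"] -> colIdx = [0, 1].
+ */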
diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java
new file mode 100644
index 00000000..368c3d17
--- /dev/null
+++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java
@@ -0,0 +1,390 @@
+package com.alibaba.datax.plugin.writer.oceanbasev10writer.util;
+
+import com.alibaba.datax.plugin.rdbms.util.DBUtil;
+import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class ObWriterUtils {
+    protected static final Logger LOG = LoggerFactory.getLogger(ObWriterUtils.class);
+
+    private static final String CHECK_MEMSTORE = "select 1 from %s.gv$memstore t where t.total>t.mem_limit * ?";
+
+    private static String compatibleMode = null;
+
+    public static boolean isMemstoreFull(Connection conn, double memstoreThreshold) {
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+        boolean result = false;
+        try {
+            String sysDbName = "oceanbase";
+            if (isOracleMode()) {
+                sysDbName = "sys";
+            }
+            ps = conn.prepareStatement(String.format(CHECK_MEMSTORE, sysDbName));
+            ps.setDouble(1, memstoreThreshold);
+            rs = ps.executeQuery();
+            // If any row matches, at least one server of the current tenant has a
+            // memstore that is about to be full.
+            result = rs.next();
+        } catch (Throwable e) {
+            LOG.error("check memstore failed: " + e.getMessage(), e);
+            result = false;
+        } finally {
+            // no need to close the statement on OB 1.0
+        }
+
+        LOG.info("isMemstoreFull=" + result);
+        return result;
+    }
+
+    public static boolean isOracleMode() {
+        // constant-first comparison avoids an NPE when the mode has not been set yet
+        return Config.OB_COMPATIBLE_MODE_ORACLE.equals(compatibleMode);
+    }
+
+    public static String getCompatibleMode() {
+        return compatibleMode;
+    }
+
+    public static void setCompatibleMode(String mode) {
+        compatibleMode = mode;
+    }
+
+    private static String buildDeleteSql(String tableName, List<String> columns) {
+        StringBuilder builder = new StringBuilder("DELETE FROM ");
+        builder.append(tableName).append(" WHERE ");
+        for (int i = 0; i < columns.size(); i++) {
+            builder.append(columns.get(i)).append(" = ?");
+            if (i != columns.size() - 1) {
+                builder.append(" and ");
+            }
+        }
+        return builder.toString();
+    }
+
+    private static int[] getColumnIndex(List<String> columnsInIndex, List<String> allColumns) {
+        allColumns = allColumns.stream().map(String::toUpperCase).collect(Collectors.toList());
+        int[] colIdx = new int[columnsInIndex.size()];
+        for (int i = 0; i < columnsInIndex.size(); i++) {
+            int index = allColumns.indexOf(columnsInIndex.get(i));
+            if (index < 0) {
+                throw new RuntimeException(
+                        String.format("column %s is in a unique or primary key but not in the column list.",
+                                columnsInIndex.get(i)));
+            }
+            colIdx[i] = index;
+        }
+        return colIdx;
+    }
+
+    public static List<Pair<String, int[]>> buildDeleteSql(Connection conn, String dbName, String tableName,
+                                                           List<String> columns) {
+        List<Pair<String, int[]>> deleteMeta = new ArrayList<Pair<String, int[]>>();
+        Map<String, List<String>> uniqueKeys = getAllUniqueIndex(conn, dbName, tableName);
+        for (Map.Entry<String, List<String>> entry : uniqueKeys.entrySet()) {
+            List<String> colNames = entry.getValue();
+            String deleteSql = buildDeleteSql(tableName, colNames);
+            int[] colIdx = getColumnIndex(colNames, columns);
+            LOG.info("delete sql [{}], column index: {}", deleteSql, Arrays.toString(colIdx));
+            deleteMeta.add(new ImmutablePair<String, int[]>(deleteSql, colIdx));
+        }
+        return deleteMeta;
+    }
+
+    // this function is for oracle mode only
+    private static Map<String, List<String>> getAllUniqueIndex(Connection conn, String dbName, String tableName) {
+        Map<String, List<String>> uniqueKeys = new HashMap<String, List<String>>();
+        if (tableName.contains(".")) {
+            dbName = tableName.split("\\.")[0];
+            tableName = tableName.split("\\.")[1];
+        }
+        dbName = dbName.toUpperCase();
+        String sql = String.format("select cons.CONSTRAINT_NAME AS KEY_NAME, cols.COLUMN_NAME COLUMN_NAME "
+                + "from all_constraints cons, all_cons_columns cols "
+                + "WHERE cols.table_name = '%s' AND cons.constraint_type in('P', 'U') "
+                + " AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner "
+                + " AND cols.owner = '%s' "
+                + "Order by KEY_NAME, cols.POSITION", tableName, dbName);
+
+        LOG.info("get all unique keys by sql: {}", sql);
+
+        Statement stmt = null;
+        ResultSet rs = null;
+        try {
+            stmt = conn.createStatement();
+            rs = stmt.executeQuery(sql);
+            while (rs.next()) {
+                String keyName = rs.getString("KEY_NAME");
+                String columnName = StringUtils.upperCase(rs.getString("COLUMN_NAME"));
+                List<String> s = uniqueKeys.get(keyName);
+                if (s == null) {
+                    s = new ArrayList<String>();
+                    uniqueKeys.put(keyName, s);
+                }
+                s.add(columnName);
+            }
+        } catch (Throwable e) {
+            LOG.error("fetching unique keys failed: " + sql, e);
+        } finally {
+            asyncClose(rs, stmt, null);
+        }
+        return uniqueKeys;
+    }
+
+    /**
+     * Builds the INSERT statement, appending an ON DUPLICATE KEY UPDATE clause
+     * when the write mode is not plain insert.
+     */
+    public static String buildWriteSql(String tableName, List<String> columnHolders,
+                                       Connection conn, String writeMode, String obUpdateColumns) {
+        List<String> valueHolders = new ArrayList<String>(columnHolders.size());
+        for (int i = 0; i < columnHolders.size(); i++) {
+            valueHolders.add("?");
+        }
+        String writeDataSqlTemplate = new StringBuilder().append("INSERT INTO " + tableName + " (")
+                .append(StringUtils.join(columnHolders, ",")).append(") VALUES(")
+                .append(StringUtils.join(valueHolders, ",")).append(")").toString();
+
+        LOG.info("write mode: " + writeMode);
+
+        // update mode
+        if (!writeMode.equals("insert")) {
+            if (obUpdateColumns == null) {
+                Set<String> skipColumns = getSkipColumns(conn, tableName);
+                LOG.info("Skip columns: " + StringUtils.join(skipColumns, ","));
+                writeDataSqlTemplate = writeDataSqlTemplate + onDuplicateKeyUpdateString(columnHolders, skipColumns);
+            } else {
+                LOG.info("Update columns: " + obUpdateColumns);
+                writeDataSqlTemplate = writeDataSqlTemplate + onDuplicateKeyUpdateString(obUpdateColumns);
+            }
+        }
+
+        return writeDataSqlTemplate;
+    }
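// Illustrative only, not part of the patch: for a table t(id, c1, c2) whose only
// unique constraint is the primary key (id), update mode with obUpdateColumns == null
// yields the template below -- the key column is skipped, everything else is updated:
//
//   buildWriteSql("t", Arrays.asList("id", "c1", "c2"), conn, "update", null)
//   => INSERT INTO t (id,c1,c2) VALUES(?,?,?)
//      ON DUPLICATE KEY UPDATE c1=VALUES(c1),c2=VALUES(c2)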
+    private static Set<String> getSkipColumns(Connection conn, String tableName) {
+        String sql = "show index from " + tableName;
+        Statement stmt = null;
+        ResultSet rs = null;
+        try {
+            stmt = conn.createStatement();
+            rs = stmt.executeQuery(sql);
+            Map<String, Set<String>> uniqueKeys = new HashMap<String, Set<String>>();
+            while (rs.next()) {
+                String nonUnique = rs.getString("Non_unique");
+                if (!"0".equals(nonUnique)) {
+                    continue;
+                }
+                String keyName = rs.getString("Key_name");
+                String columnName = StringUtils.upperCase(rs.getString("Column_name"));
+                Set<String> s = uniqueKeys.get(keyName);
+                if (s == null) {
+                    s = new HashSet<String>();
+                    uniqueKeys.put(keyName, s);
+                }
+                s.add(columnName);
+            }
+            // If the table has only one primary/unique key, just skip its columns in the
+            // update list. This is safe because the key cannot change when an inserted
+            // row conflicts with an existing one.
+            if (uniqueKeys.size() == 1) {
+                return uniqueKeys.values().iterator().next();
+            } else if (uniqueKeys.size() > 1) {
+                // With more than one primary/unique key, skip only the columns common to
+                // all of them. A common column appears in every key, so it must stay
+                // intact whenever at least one key conflicts between the new data and the
+                // existing data; keeping it unchanged is therefore safe.
+                //
+                // We cannot skip every column of every key: a key that does not conflict
+                // may carry new values, and dropping its columns from the update list
+                // would leave the row with a mix of old and new values, breaking data
+                // consistency.
+                Iterator<String> keyNameIterator = uniqueKeys.keySet().iterator();
+                Set<String> skipColumns = uniqueKeys.get(keyNameIterator.next());
+                while (keyNameIterator.hasNext()) {
+                    skipColumns.retainAll(uniqueKeys.get(keyNameIterator.next()));
+                }
+                return skipColumns;
+            }
+        } catch (Throwable e) {
+            LOG.error("show index from table failed: " + sql, e);
+        } finally {
+            asyncClose(rs, stmt, null);
+        }
+        return Collections.emptySet();
+    }
+
+    /*
+     * Builds the ON DUPLICATE KEY UPDATE sub-clause from the user-specified update columns.
+     */
+    private static String onDuplicateKeyUpdateString(String updateColumns) {
+        if (updateColumns == null || updateColumns.length() < 1) {
+            return "";
+        }
+        StringBuilder builder = new StringBuilder();
+        builder.append(" ON DUPLICATE KEY UPDATE ");
+        List<String> list = new ArrayList<String>();
+        for (String column : updateColumns.split(",")) {
+            list.add(column + "=VALUES(" + column + ")");
+        }
+        builder.append(StringUtils.join(list, ','));
+        return builder.toString();
+    }
+
+    private static String onDuplicateKeyUpdateString(List<String> columnHolders, Set<String> skipColumns) {
+        if (columnHolders == null || columnHolders.size() < 1) {
+            return "";
+        }
+        StringBuilder builder = new StringBuilder();
+        builder.append(" ON DUPLICATE KEY UPDATE ");
+        List<String> list = new ArrayList<String>();
+        for (String column : columnHolders) {
+            // skip the key columns that must stay unchanged
+            if (skipColumns.contains(column.toUpperCase())) {
+                continue;
+            }
+            list.add(column + "=VALUES(" + column + ")");
+        }
+        if (!list.isEmpty()) {
+            builder.append(StringUtils.join(list, ','));
+        } else {
+            // if nothing is left besides the unique-key columns, update the first
+            // column with its own value
+            String column = columnHolders.get(0);
+            builder.append(column + "=VALUES(" + column + ")");
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Sleeps for the given number of milliseconds.
+     */
+    public static void sleep(long ms) {
+        try {
+            Thread.sleep(ms);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
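// Illustrative only, not part of the patch: with two keys PRIMARY(id) and uk_ib(id, b),
// the retainAll loop in getSkipColumns keeps just the common column:
//
//   Set<String> skip = new HashSet<>(Arrays.asList("ID"));      // columns of PRIMARY
//   skip.retainAll(new HashSet<>(Arrays.asList("ID", "B")));    // columns of uk_ib
//   // skip == [ID]: only id is guaranteed unchanged on any key conflict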
+    /**
+     * Fatal errors: connection-level failures that cannot succeed on the same connection.
+     */
+    public static boolean isFatalError(SQLException e) {
+        String sqlState = e.getSQLState();
+        if (StringUtils.startsWith(sqlState, "08")) {
+            return true;
+        }
+        final int errorCode = Math.abs(e.getErrorCode());
+        switch (errorCode) {
+        // Communications errors
+        case 1040: // ER_CON_COUNT_ERROR
+        case 1042: // ER_BAD_HOST_ERROR
+        case 1043: // ER_HANDSHAKE_ERROR
+        case 1047: // ER_UNKNOWN_COM_ERROR
+        case 1081: // ER_IPSOCK_ERROR
+        case 1129: // ER_HOST_IS_BLOCKED
+        case 1130: // ER_HOST_NOT_PRIVILEGED
+        // Authentication errors
+        case 1045: // ER_ACCESS_DENIED_ERROR
+        // Resource errors
+        case 1004: // ER_CANT_CREATE_FILE
+        case 1005: // ER_CANT_CREATE_TABLE
+        case 1015: // ER_CANT_LOCK
+        case 1021: // ER_DISK_FULL
+        case 1041: // ER_OUT_OF_RESOURCES
+        case 1094: // Unknown thread id: %lu
+        // Out-of-memory errors
+        case 1037: // ER_OUTOFMEMORY
+        case 1038: // ER_OUT_OF_SORTMEMORY
+            return true;
+        }
+
+        if (StringUtils.isNotBlank(e.getMessage())) {
+            final String errorText = e.getMessage().toUpperCase();
+
+            if (errorCode == 0
+                    && (errorText.indexOf("COMMUNICATIONS LINK FAILURE") > -1
+                            || errorText.indexOf("COULD NOT CREATE CONNECTION") > -1)
+                    || errorText.indexOf("NO DATASOURCE") > -1
+                    || errorText.indexOf("NO ALIVE DATASOURCE") > -1
+                    || errorText.indexOf("NO OPERATIONS ALLOWED AFTER CONNECTION CLOSED") > -1) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Recoverable errors: the statement may succeed if retried.
+     */
+    public static boolean isRecoverableError(SQLException e) {
+        int error = Math.abs(e.getErrorCode());
+        // explicitly recoverable
+        if (white.contains(error)) {
+            return true;
+        }
+        // explicitly unrecoverable
+        if (black.contains(error)) {
+            return false;
+        }
+        // error codes above 4000 are OceanBase-specific
+        return error > 4020;
+    }
+
+    private static final Set<Integer> white = new HashSet<Integer>();
+    static {
+        int[] errList = { 1213, 1047, 1041, 1094, 4000, 4012 };
+        for (int err : errList) {
+            white.add(err);
+        }
+    }
+
+    // error codes below 4000 are not classified here
+    private static final Set<Integer> black = new HashSet<Integer>();
+    static {
+        int[] errList = { 4022, 4025, 4026, 4028, 4029, 4031, 4033, 4034, 4037, 4041, 4044 };
+        for (int err : errList) {
+            black.add(err);
+        }
+    }
+
+    /**
+     * ObProxy has a bug: when a transaction times out or is killed, closing the
+     * connection gets no response, so the resources are closed asynchronously in
+     * a daemon thread.
+     */
+    public static void asyncClose(final ResultSet rs, final Statement stmt, final Connection conn) {
+        Thread t = new Thread() {
+            public void run() {
+                DBUtil.closeDBResources(rs, stmt, conn);
+            }
+        };
+        t.setDaemon(true);
+        t.start();
+    }
+}
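A sketch of how a writer task might combine the two classifiers above when a batch fails; `executeBatch`, `reconnect`, `conn`, and `records` are hypothetical names, not part of this patch:

try {
    executeBatch(conn, records); // hypothetical batch write
} catch (SQLException e) {
    if (ObWriterUtils.isFatalError(e)) {
        conn = reconnect();          // the connection itself is broken; rebuild it
    } else if (ObWriterUtils.isRecoverableError(e)) {
        ObWriterUtils.sleep(1000);   // transient error; back off, then retry the batch
    } else {
        throw e;                     // data error; surface it to the framework
    }
}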
diff --git a/oceanbasev10writer/src/main/libs/oceanbase-connector-java-3.2.0.jar b/oceanbasev10writer/src/main/libs/oceanbase-connector-java-3.2.0.jar
new file mode 100644
index 00000000..239f3dc4
Binary files /dev/null and b/oceanbasev10writer/src/main/libs/oceanbase-connector-java-3.2.0.jar differ
diff --git a/oceanbasev10writer/src/main/resources/plugin.json b/oceanbasev10writer/src/main/resources/plugin.json
new file mode 100644
index 00000000..23154c31
--- /dev/null
+++ b/oceanbasev10writer/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+    "name": "oceanbasev10writer",
+    "class": "com.alibaba.datax.plugin.writer.oceanbasev10writer.OceanBaseV10Writer",
+    "description": "write data into oceanbase with sql interface",
+    "developer": "oceanbase"
+}
\ No newline at end of file
diff --git a/package.xml b/package.xml
index 49e3c4ec..882dd23b 100755
--- a/package.xml
+++ b/package.xml
@@ -33,7 +33,7 @@
             <outputDirectory>datax</outputDirectory>
         </fileSet>
         <fileSet>
-            <directory>oceanbasereader/target/datax/</directory>
+            <directory>oceanbasev10reader/target/datax/</directory>
            <includes>
                <include>**/*.*</include>
            </includes>
@@ -385,5 +385,12 @@
             </includes>
             <outputDirectory>datax</outputDirectory>
         </fileSet>
+        <fileSet>
+            <directory>oceanbasev10writer/target/datax/</directory>
+            <includes>
+                <include>**/*.*</include>
+            </includes>
+            <outputDirectory>datax</outputDirectory>
+        </fileSet>
     </fileSets>
 </assembly>
diff --git a/plugin-rdbms-util/pom.xml b/plugin-rdbms-util/pom.xml
index e6884fb1..282a39e8 100755
--- a/plugin-rdbms-util/pom.xml
+++ b/plugin-rdbms-util/pom.xml
@@ -63,5 +63,12 @@
             <artifactId>guava</artifactId>
             <version>r05</version>
         </dependency>
-    </dependencies>
+        <dependency>
+            <groupId>com.alipay.oceanbase</groupId>
+            <artifactId>oceanbase-client</artifactId>
+            <version>1.1.10</version>
+            <scope>system</scope>
+            <systemPath>${basedir}/src/main/libs/oceanbase-client-1.1.10.jar</systemPath>
+        </dependency>
+    </dependencies>
 </project>
diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java
index 0e10c742..9f2939c4 100755
--- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java
+++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java
@@ -48,4 +48,11 @@ public final class Key {
 
     public static String SPLIT_FACTOR = "splitFactor";
-}
\ No newline at end of file
+    public final static String WEAK_READ = "weakRead";
+
+    public final static String SAVE_POINT = "savePoint";
+
+    public final static String REUSE_CONN = "reuseConn";
+
+    public final static String PARTITION_NAME = "partitionName";
+}
diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java
index 63d1621b..2392d1ca 100755
--- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java
+++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java
@@ -358,7 +358,7 @@ public final class DBUtil {
             String url, String user, String pass, String socketTimeout) {
 
         // OB 1.0 URL handling
-        if (url.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING) && dataBaseType == DataBaseType.MySql) {
+        if (url.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) {
             String[] ss = url.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN);
             if (ss.length != 3) {
                 throw DataXException
@@ -367,7 +367,7 @@ public final class DBUtil {
             }
             LOG.info("this is ob1_0 jdbc url.");
             user = ss[1].trim() +":"+user;
-            url = ss[2];
+            url = ss[2].replace("jdbc:mysql:", "jdbc:oceanbase:");
             LOG.info("this is ob1_0 jdbc url. user="+user+" :url="+url);
         }
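Taken together, the two hunks above accept an OB 1.0-style URL for any database type and rewrite its scheme for the OceanBase driver. A sketch of the rewrite, with an invented host and port:

String raw = "jdbc:mysql://127.0.0.1:2883/test";   // ss[2] after splitting the OB10 url
String url = raw.replace("jdbc:mysql:", "jdbc:oceanbase:");
// url == "jdbc:oceanbase://127.0.0.1:2883/test", matching the
// com.alipay.oceanbase.jdbc.Driver registered below for DataBaseType.OceanBase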
diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java
index b6b8140c..205919fe 100755
--- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java
+++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java
@@ -21,7 +21,8 @@ public enum DataBaseType {
     ADS("ads","com.mysql.jdbc.Driver"),
     ClickHouse("clickhouse", "ru.yandex.clickhouse.ClickHouseDriver"),
     KingbaseES("kingbasees", "com.kingbase8.Driver"),
-    Oscar("oscar", "com.oscar.Driver");
+    Oscar("oscar", "com.oscar.Driver"),
+    OceanBase("oceanbase", "com.alipay.oceanbase.jdbc.Driver");
 
     private String typeName;
@@ -42,6 +43,7 @@
         switch (this) {
             case MySql:
             case DRDS:
+            case OceanBase:
                 suffix = "yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true";
                 if (jdbc.contains("?")) {
                     result = jdbc + "&" + suffix;
@@ -108,6 +110,14 @@ public enum DataBaseType {
                 break;
             case Oscar:
                 break;
+            case OceanBase:
+                suffix = "yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true";
+                if (jdbc.contains("?")) {
+                    result = jdbc + "&" + suffix;
+                } else {
+                    result = jdbc + "?" + suffix;
+                }
+                break;
             default:
                 throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type.");
         }
diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java
index 440aac2a..27b88f44 100755
--- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java
+++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java
@@ -402,13 +402,20 @@ public class CommonRdbmsWriter {
             throws SQLException {
         for (int i = 0; i < this.columnNumber; i++) {
             int columnSqltype = this.resultSetMetaData.getMiddle().get(i);
-            preparedStatement = fillPreparedStatementColumnType(preparedStatement, i, columnSqltype, record.getColumn(i));
+            String typeName = this.resultSetMetaData.getRight().get(i);
+            preparedStatement = fillPreparedStatementColumnType(preparedStatement, i, columnSqltype, typeName, record.getColumn(i));
         }
 
         return preparedStatement;
     }
 
-    protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, Column column) throws SQLException {
+    protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex,
+                                                                int columnSqltype, Column column) throws SQLException {
+        return fillPreparedStatementColumnType(preparedStatement, columnIndex, columnSqltype, null, column);
+    }
+
+    protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex,
+                                                                int columnSqltype, String typeName, Column column) throws SQLException {
         java.util.Date utilDate;
         switch (columnSqltype) {
             case Types.CHAR:
@@ -451,8 +458,11 @@
 
             // for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
             case Types.DATE:
-                if (this.resultSetMetaData.getRight().get(columnIndex)
-                        .equalsIgnoreCase("year")) {
+                if (typeName == null) {
+                    typeName = this.resultSetMetaData.getRight().get(columnIndex);
+                }
+
+                if (typeName.equalsIgnoreCase("year")) {
                     if (column.asBigInteger() == null) {
                         preparedStatement.setString(columnIndex + 1, null);
                     } else {
diff --git a/plugin-rdbms-util/src/main/libs/oceanbase-client-1.1.10.jar b/plugin-rdbms-util/src/main/libs/oceanbase-client-1.1.10.jar
new file mode 100644
index 00000000..38162912
Binary files /dev/null and b/plugin-rdbms-util/src/main/libs/oceanbase-client-1.1.10.jar differ
diff --git a/pom.xml b/pom.xml
index 4f0bf65f..3bd75a31 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,6 +69,7 @@
         <module>opentsdbreader</module>
         <module>cassandrareader</module>
         <module>gdbreader</module>
+        <module>oceanbasev10reader</module>
 
         <module>mysqlwriter</module>
@@ -99,6 +100,7 @@
         <module>cassandrawriter</module>
         <module>clickhousewriter</module>
         <module>oscarwriter</module>
+        <module>oceanbasev10writer</module>
 
         <module>plugin-rdbms-util</module>
         <module>plugin-unstructured-storage-util</module>
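With the OceanBase branch added above, suffix appending behaves the same as for MySQL. Illustrative only; the URL is invented and appendJDBCSuffixForWriter is assumed to be the method the second DataBaseType hunk extends:

String jdbc = "jdbc:oceanbase://127.0.0.1:2883/test";
String withSuffix = DataBaseType.OceanBase.appendJDBCSuffixForWriter(jdbc);
// withSuffix == "jdbc:oceanbase://127.0.0.1:2883/test?yearIsDateType=false"
//     + "&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true"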