From 8478f7a21282e2bdb3d1697a67bfe57fef4813dc Mon Sep 17 00:00:00 2001 From: "ranyu.zyh" Date: Thu, 3 Apr 2025 13:25:32 +0800 Subject: [PATCH] feature:oceanbase plugin add direct path support --- oceanbasev10writer/pom.xml | 5 + .../writer/oceanbasev10writer/Config.java | 92 +- .../oceanbasev10writer/common/Table.java | 88 ++ .../oceanbasev10writer/common/TableCache.java | 21 + .../AbstractRestrictedConnection.java | 257 +++++ .../AbstractRestrictedPreparedStatement.java | 663 +++++++++++++ .../directPath/DirectLoaderBuilder.java | 170 ++++ .../directPath/DirectPathConnection.java | 398 ++++++++ .../directPath/DirectPathConstants.java | 12 + .../DirectPathPreparedStatement.java | 164 ++++ .../directPath/ObTableDirectLoad.java | 154 +++ .../ext/AbstractConnHolder.java | 13 + .../ext/DirectPathAbstractConnHolder.java | 61 ++ .../ext/DirectPathConnHolder.java | 115 +++ .../oceanbasev10writer/ext/OCJConnHolder.java | 62 +- .../ext/ObClientConnHolder.java | 89 +- .../ext/ServerConnectInfo.java | 185 ++-- .../task/AbstractInsertTask.java | 127 +++ .../task/ConcurrentTableWriterTask.java | 911 +++++++++--------- .../task/DirectPathInsertTask.java | 68 ++ .../oceanbasev10writer/task/InsertTask.java | 15 +- .../task/WriterThreadPool.java | 5 +- 22 files changed, 3045 insertions(+), 630 deletions(-) create mode 100644 oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/common/Table.java create mode 100644 oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/common/TableCache.java create mode 100644 oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/AbstractRestrictedConnection.java create mode 100644 oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/AbstractRestrictedPreparedStatement.java create mode 100644 oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectLoaderBuilder.java create mode 100644 oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectPathConnection.java create mode 100644 oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectPathConstants.java create mode 100644 oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectPathPreparedStatement.java create mode 100644 oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/ObTableDirectLoad.java create mode 100644 oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/DirectPathAbstractConnHolder.java create mode 100644 oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/DirectPathConnHolder.java create mode 100644 oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/AbstractInsertTask.java create mode 100644 oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/DirectPathInsertTask.java diff --git a/oceanbasev10writer/pom.xml b/oceanbasev10writer/pom.xml index 22b64c58..d1986401 100644 --- a/oceanbasev10writer/pom.xml +++ b/oceanbasev10writer/pom.xml @@ -115,6 +115,11 @@ 4.11 test + + com.oceanbase + obkv-table-client + 1.4.0 + com.oceanbase obkv-hbase-client diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/Config.java 
b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/Config.java index 6776196b..a90de0a3 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/Config.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/Config.java @@ -2,62 +2,86 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer; public interface Config { - String MEMSTORE_THRESHOLD = "memstoreThreshold"; + String MEMSTORE_THRESHOLD = "memstoreThreshold"; - double DEFAULT_MEMSTORE_THRESHOLD = 0.9d; + double DEFAULT_MEMSTORE_THRESHOLD = 0.9d; - double DEFAULT_SLOW_MEMSTORE_THRESHOLD = 0.75d; - String MEMSTORE_CHECK_INTERVAL_SECOND = "memstoreCheckIntervalSecond"; + double DEFAULT_SLOW_MEMSTORE_THRESHOLD = 0.75d; - long DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND = 30; + String MEMSTORE_CHECK_INTERVAL_SECOND = "memstoreCheckIntervalSecond"; - int DEFAULT_BATCH_SIZE = 100; - int MAX_BATCH_SIZE = 4096; + long DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND = 30; - String FAIL_TRY_COUNT = "failTryCount"; + int DEFAULT_BATCH_SIZE = 100; - int DEFAULT_FAIL_TRY_COUNT = 10000; + int MAX_BATCH_SIZE = 4096; - String WRITER_THREAD_COUNT = "writerThreadCount"; + String FAIL_TRY_COUNT = "failTryCount"; - int DEFAULT_WRITER_THREAD_COUNT = 1; + int DEFAULT_FAIL_TRY_COUNT = 10000; - String CONCURRENT_WRITE = "concurrentWrite"; + String WRITER_THREAD_COUNT = "writerThreadCount"; - boolean DEFAULT_CONCURRENT_WRITE = true; + int DEFAULT_WRITER_THREAD_COUNT = 1; - String OB_VERSION = "obVersion"; - String TIMEOUT = "timeout"; + String CONCURRENT_WRITE = "concurrentWrite"; - String PRINT_COST = "printCost"; - boolean DEFAULT_PRINT_COST = false; + boolean DEFAULT_CONCURRENT_WRITE = true; - String COST_BOUND = "costBound"; - long DEFAULT_COST_BOUND = 20; + String OB_VERSION = "obVersion"; - String MAX_ACTIVE_CONNECTION = "maxActiveConnection"; - int DEFAULT_MAX_ACTIVE_CONNECTION = 2000; + String TIMEOUT = "timeout"; - String WRITER_SUB_TASK_COUNT = "writerSubTaskCount"; - int DEFAULT_WRITER_SUB_TASK_COUNT = 1; - int MAX_WRITER_SUB_TASK_COUNT = 4096; + String PRINT_COST = "printCost"; + + boolean DEFAULT_PRINT_COST = false; + + String COST_BOUND = "costBound"; + + long DEFAULT_COST_BOUND = 20; + + String MAX_ACTIVE_CONNECTION = "maxActiveConnection"; + + int DEFAULT_MAX_ACTIVE_CONNECTION = 2000; + + String WRITER_SUB_TASK_COUNT = "writerSubTaskCount"; + + int DEFAULT_WRITER_SUB_TASK_COUNT = 1; + + int MAX_WRITER_SUB_TASK_COUNT = 4096; + + String OB_WRITE_MODE = "obWriteMode"; - String OB_WRITE_MODE = "obWriteMode"; String OB_COMPATIBLE_MODE = "obCompatibilityMode"; + String OB_COMPATIBLE_MODE_ORACLE = "ORACLE"; + String OB_COMPATIBLE_MODE_MYSQL = "MYSQL"; - String OCJ_GET_CONNECT_TIMEOUT = "ocjGetConnectTimeout"; - int DEFAULT_OCJ_GET_CONNECT_TIMEOUT = 5000; // 5s + String OCJ_GET_CONNECT_TIMEOUT = "ocjGetConnectTimeout"; - String OCJ_PROXY_CONNECT_TIMEOUT = "ocjProxyConnectTimeout"; - int DEFAULT_OCJ_PROXY_CONNECT_TIMEOUT = 5000; // 5s + int DEFAULT_OCJ_GET_CONNECT_TIMEOUT = 5000; // 5s - String OCJ_CREATE_RESOURCE_TIMEOUT = "ocjCreateResourceTimeout"; - int DEFAULT_OCJ_CREATE_RESOURCE_TIMEOUT = 60000; // 60s + String OCJ_PROXY_CONNECT_TIMEOUT = "ocjProxyConnectTimeout"; - String OB_UPDATE_COLUMNS = "obUpdateColumns"; + int DEFAULT_OCJ_PROXY_CONNECT_TIMEOUT = 5000; // 5s - String USE_PART_CALCULATOR = "usePartCalculator"; - boolean DEFAULT_USE_PART_CALCULATOR = false; + String OCJ_CREATE_RESOURCE_TIMEOUT = "ocjCreateResourceTimeout"; + + int 
DEFAULT_OCJ_CREATE_RESOURCE_TIMEOUT = 60000; // 60s
+
+    String OB_UPDATE_COLUMNS = "obUpdateColumns";
+
+    String USE_PART_CALCULATOR = "usePartCalculator";
+
+    boolean DEFAULT_USE_PART_CALCULATOR = false;
+
+    String BLOCKS_COUNT = "blocksCount";
+
+    String DIRECT_PATH = "directPath";
+
+    String RPC_PORT = "rpcPort";
+
+    // Unlike recordLimit, this limit applies to a single table only: one table exceeding its maximum error count does not affect other tables. Used only for direct-path (bypass) load.
+    String MAX_ERRORS = "maxErrors";
 }
diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/common/Table.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/common/Table.java
new file mode 100644
index 00000000..53c70254
--- /dev/null
+++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/common/Table.java
@@ -0,0 +1,88 @@
+package com.alibaba.datax.plugin.writer.oceanbasev10writer.common;
+
+import java.util.Objects;
+
+public class Table {
+    private String tableName;
+    private String dbName;
+    private Throwable error;
+    private Status status;
+
+    public Table(String dbName, String tableName) {
+        this.dbName = dbName;
+        this.tableName = tableName;
+        this.status = Status.INITIAL;
+    }
+
+    public Throwable getError() {
+        return error;
+    }
+
+    public void setError(Throwable error) {
+        this.error = error;
+    }
+
+    public Status getStatus() {
+        return status;
+    }
+
+    public void setStatus(Status status) {
+        this.status = status;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Table table = (Table) o;
+        return tableName.equals(table.tableName) && dbName.equals(table.dbName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(tableName, dbName);
+    }
+
+    public enum Status {
+        /**
+         * Initial state; loading has not started.
+         */
+        INITIAL(0),
+
+        /**
+         * Loading is in progress.
+         */
+        RUNNING(1),
+
+        /**
+         * Loading failed.
+         */
+        FAILURE(2),
+
+        /**
+         * Loading finished successfully.
+         */
+        SUCCESS(3);
+
+        private int code;
+
+        /**
+         * @param code status code
+         */
+        private Status(int code) {
+            this.code = code;
+        }
+
+        public int getCode() {
+            return code;
+        }
+
+        public void setCode(int code) {
+            this.code = code;
+        }
+    }
+}
\ No newline at end of file
diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/common/TableCache.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/common/TableCache.java
new file mode 100644
index 00000000..c59dca06
--- /dev/null
+++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/common/TableCache.java
@@ -0,0 +1,21 @@
+package com.alibaba.datax.plugin.writer.oceanbasev10writer.common;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TableCache {
+    private static final TableCache INSTANCE = new TableCache();
+    private final ConcurrentHashMap<String, Table> TABLE_CACHE;
+
+    private TableCache() {
+        TABLE_CACHE = new ConcurrentHashMap<>();
+    }
+
+    public static TableCache getInstance() {
+        return INSTANCE;
+    }
+
+    public Table getTable(String dbName, String tableName) {
+        String fullTableName = String.join("-", dbName, tableName);
+        return TABLE_CACHE.computeIfAbsent(fullTableName, (k) -> new Table(dbName, tableName));
+    }
+}
\ No newline at end of file
diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/AbstractRestrictedConnection.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/AbstractRestrictedConnection.java
new file mode 100644
index
00000000..717e2e36 --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/AbstractRestrictedConnection.java @@ -0,0 +1,257 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +public abstract class AbstractRestrictedConnection implements java.sql.Connection { + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + throw new UnsupportedOperationException("prepareCall(String) is unsupported"); + } + + @Override + public String nativeSQL(String sql) throws SQLException { + throw new UnsupportedOperationException("nativeSQL(String) is unsupported"); + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + throw new UnsupportedOperationException("setAutoCommit(boolean) is unsupported"); + } + + @Override + public boolean getAutoCommit() throws SQLException { + throw new UnsupportedOperationException("getAutoCommit is unsupported"); + } + + @Override + public void abort(Executor executor) throws SQLException { + throw new UnsupportedOperationException("abort(Executor) is unsupported"); + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + throw new UnsupportedOperationException("setNetworkTimeout(Executor, int) is unsupported"); + } + + @Override + public int getNetworkTimeout() throws SQLException { + throw new UnsupportedOperationException("getNetworkTimeout is unsupported"); + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + throw new UnsupportedOperationException("getMetaData is unsupported"); + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + throw new UnsupportedOperationException("setReadOnly(boolean) is unsupported"); + } + + @Override + public boolean isReadOnly() throws SQLException { + throw new UnsupportedOperationException("isReadOnly is unsupported"); + } + + @Override + public void setCatalog(String catalog) throws SQLException { + throw new UnsupportedOperationException("setCatalog(String) is unsupported"); + } + + @Override + public String getCatalog() throws SQLException { + throw new UnsupportedOperationException("getCatalog is unsupported"); + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + throw new UnsupportedOperationException("setTransactionIsolation(int) is unsupported"); + } + + @Override + public int getTransactionIsolation() throws SQLException { + throw new UnsupportedOperationException("getTransactionIsolation is unsupported"); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + throw new UnsupportedOperationException("getWarnings is unsupported"); + } + + @Override + public void clearWarnings() throws SQLException { + throw new UnsupportedOperationException("clearWarnings is unsupported"); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + throw new 
UnsupportedOperationException("createStatement(int, int) is unsupported"); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + throw new UnsupportedOperationException("prepareStatement(String, int, int) is unsupported"); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + throw new UnsupportedOperationException("prepareCall(String, int, int) is unsupported"); + } + + @Override + public Map> getTypeMap() throws SQLException { + throw new UnsupportedOperationException("getTypeMap is unsupported"); + } + + @Override + public void setTypeMap(Map> map) throws SQLException { + throw new UnsupportedOperationException("setTypeMap(Map>) is unsupported"); + } + + @Override + public void setHoldability(int holdability) throws SQLException { + throw new UnsupportedOperationException("setHoldability is unsupported"); + } + + @Override + public int getHoldability() throws SQLException { + throw new UnsupportedOperationException("getHoldability is unsupported"); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + throw new UnsupportedOperationException("setSavepoint is unsupported"); + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + throw new UnsupportedOperationException("setSavepoint(String) is unsupported"); + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + throw new UnsupportedOperationException("rollback(Savepoint) is unsupported"); + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + throw new UnsupportedOperationException("releaseSavepoint(Savepoint) is unsupported"); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + throw new UnsupportedOperationException("createStatement(int, int, int) is unsupported"); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + throw new UnsupportedOperationException("prepareStatement(String, int, int, int) is unsupported"); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + throw new UnsupportedOperationException("prepareCall(String, int, int, int) is unsupported"); + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + throw new UnsupportedOperationException("prepareStatement(String, int) is unsupported"); + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + throw new UnsupportedOperationException("prepareStatement(String, int[]) is unsupported"); + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + throw new UnsupportedOperationException("prepareStatement(String, String[]) is unsupported"); + } + + @Override + public Clob createClob() throws SQLException { + throw new UnsupportedOperationException("createClob is unsupported"); + } + + @Override + public Blob createBlob() throws SQLException { + throw new UnsupportedOperationException("createBlob is unsupported"); + } + + @Override + public NClob createNClob() throws SQLException { + throw 
new UnsupportedOperationException("createNClob is unsupported"); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + throw new UnsupportedOperationException("createSQLXML is unsupported"); + } + + @Override + public boolean isValid(int timeout) throws SQLException { + throw new UnsupportedOperationException("isValid(int) is unsupported"); + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + throw new UnsupportedOperationException("setClientInfo(String, String) is unsupported"); + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + throw new UnsupportedOperationException("setClientInfo(Properties) is unsupported"); + } + + @Override + public String getClientInfo(String name) throws SQLException { + throw new UnsupportedOperationException("getClientInfo(String) is unsupported"); + } + + @Override + public Properties getClientInfo() throws SQLException { + throw new UnsupportedOperationException("getClientInfo is unsupported"); + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + throw new UnsupportedOperationException("createArrayOf(String, Object[]) is unsupported"); + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + throw new UnsupportedOperationException("createStruct(String, Object[]) is unsupported"); + } + + @Override + public void setSchema(String schema) throws SQLException { + throw new UnsupportedOperationException("setSchema(String) is unsupported"); + } + + @Override + public T unwrap(Class iface) throws SQLException { + throw new UnsupportedOperationException("unwrap(Class) is unsupported"); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + throw new UnsupportedOperationException("isWrapperFor(Class) is unsupported"); + } +} \ No newline at end of file diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/AbstractRestrictedPreparedStatement.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/AbstractRestrictedPreparedStatement.java new file mode 100644 index 00000000..47b2c98b --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/AbstractRestrictedPreparedStatement.java @@ -0,0 +1,663 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath; + +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.net.URL; +import java.nio.charset.Charset; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Date; +import java.sql.NClob; +import java.sql.ParameterMetaData; +import java.sql.Ref; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.ZonedDateTime; +import java.util.Calendar; +import java.util.List; + +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjType; +import 
com.alipay.oceanbase.rpc.util.ObVString; +import org.apache.commons.io.IOUtils; + +public abstract class AbstractRestrictedPreparedStatement implements java.sql.PreparedStatement { + + private boolean closed; + + @Override + public void setNull(int parameterIndex, int sqlType) throws SQLException { + this.setParameter(parameterIndex, createObObj(null)); + } + + @Override + public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException { + throw new UnsupportedOperationException("setNull(int, int, String) is unsupported"); + } + + @Override + public void setBoolean(int parameterIndex, boolean x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setByte(int parameterIndex, byte x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setShort(int parameterIndex, short x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setInt(int parameterIndex, int x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setLong(int parameterIndex, long x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setFloat(int parameterIndex, float x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setDouble(int parameterIndex, double x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setString(int parameterIndex, String x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setBytes(int parameterIndex, byte[] x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setDate(int parameterIndex, Date x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException { + throw new UnsupportedOperationException("setDate(int, Date, Calendar) is unsupported"); + } + + @Override + public void setTime(int parameterIndex, Time x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException { + throw new UnsupportedOperationException("setTime(int, Time, Calendar) is unsupported"); + } + + @Override + public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException { + throw new UnsupportedOperationException("setTimestamp(int, Timestamp, Calendar) is unsupported"); + } + + @Override + public void setObject(int parameterIndex, Object x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException { + throw new UnsupportedOperationException("setObject(int, Object, int) is unsupported"); + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException { + throw new 
UnsupportedOperationException("setObject(int, Object, int, int) is unsupported"); + } + + @Override + public void setRef(int parameterIndex, Ref x) throws SQLException { + throw new UnsupportedOperationException("setRef(int, Ref) is unsupported"); + } + + @Override + public void setArray(int parameterIndex, Array x) throws SQLException { + throw new UnsupportedOperationException("setArray(int, Array) is unsupported"); + } + + @Override + public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException { + throw new UnsupportedOperationException("setSQLXML(int, SQLXML) is unsupported"); + } + + @Override + public void setURL(int parameterIndex, URL x) throws SQLException { + // if (x == null) { + // this.setParameter(parameterIndex, createObObj(x)); + // } else { + // // TODO If need BackslashEscapes and character encoding ? + // this.setParameter(parameterIndex, createObObj(x.toString())); + // } + throw new UnsupportedOperationException("setURL(int, URL) is unsupported"); + } + + @Override + public void setRowId(int parameterIndex, RowId x) throws SQLException { + throw new UnsupportedOperationException("setRowId(int, RowId) is unsupported"); + } + + @Override + public void setNString(int parameterIndex, String value) throws SQLException { + this.setParameter(parameterIndex, createObObj(value)); + } + + @Override + public void setBlob(int parameterIndex, Blob x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setBlob(int parameterIndex, InputStream x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setBlob(int parameterIndex, InputStream x, long length) throws SQLException { + throw new UnsupportedOperationException("setBlob(int, InputStream, length) is unsupported"); + } + + @Override + public void setClob(int parameterIndex, Clob x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setClob(int parameterIndex, Reader x) throws SQLException { + this.setCharacterStream(parameterIndex, x); + } + + @Override + public void setClob(int parameterIndex, Reader x, long length) throws SQLException { + throw new UnsupportedOperationException("setClob(int, Reader, length) is unsupported"); + } + + @Override + public void setNClob(int parameterIndex, NClob x) throws SQLException { + this.setClob(parameterIndex, (Clob) (x)); + } + + @Override + public void setNClob(int parameterIndex, Reader x) throws SQLException { + this.setClob(parameterIndex, x); + } + + @Override + public void setNClob(int parameterIndex, Reader x, long length) throws SQLException { + throw new UnsupportedOperationException("setNClob(int, Reader, length) is unsupported"); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Deprecated + @Override + public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException { + throw new UnsupportedOperationException("setAsciiStream(int, InputStream, length) is unsupported"); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException { + throw new UnsupportedOperationException("setAsciiStream(int, InputStream, length) is unsupported"); + } + + 
@Override + public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException { + throw new UnsupportedOperationException("setBinaryStream(int, InputStream, length) is unsupported"); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException { + throw new UnsupportedOperationException("setBinaryStream(int, InputStream, length) is unsupported"); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader x, int length) throws SQLException { + throw new UnsupportedOperationException("setCharacterStream(int, InputStream, length) is unsupported"); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader x, long length) throws SQLException { + throw new UnsupportedOperationException("setCharacterStream(int, InputStream, length) is unsupported"); + } + + @Override + public void setNCharacterStream(int parameterIndex, Reader x) throws SQLException { + this.setParameter(parameterIndex, createObObj(x)); + } + + @Override + public void setNCharacterStream(int parameterIndex, Reader x, long length) throws SQLException { + throw new UnsupportedOperationException("setNCharacterStream(int, InputStream, length) is unsupported"); + } + + /** + * @return boolean + */ + protected abstract boolean isOracleMode(); + + /** + * Set parameter to the target position. + * + * @param parameterIndex + * @param obObj + * @throws SQLException + */ + protected abstract void setParameter(int parameterIndex, ObObj obObj) throws SQLException; + + /** + * Close the current prepared statement. + * + * @throws SQLException + */ + @Override + public void close() throws SQLException { + this.closed = true; + } + + /** + * Return whether the current prepared statement is closed? + * + * @return boolean + * @throws SQLException + */ + @Override + public boolean isClosed() throws SQLException { + return this.closed; + } + + /** + * Create a {@link ObObj } array with input values. + * + * @param values Original row value + * @return ObObj[] + */ + public ObObj[] createObObjArray(Object[] values) { + if (values == null) { + return null; + } + ObObj[] array = new ObObj[values.length]; + for (int i = 0; i < values.length; i++) { + array[i] = createObObj(values[i]); + } + return array; + } + + /** + * Create a {@link ObObj } array with input values. + * + * @param values Original row value + * @return ObObj[] + */ + public ObObj[] createObObjArray(List values) { + if (values == null) { + return null; + } + ObObj[] array = new ObObj[values.size()]; + for (int i = 0; i < values.size(); i++) { + array[i] = createObObj(values.get(i)); + } + return array; + } + + /** + * Create a {@link ObObj } instance. + * + * @param value Original column value + * @return ObObj + */ + public ObObj createObObj(Object value) { + try { + // Only used for strongly typed declared variables + Object convertedValue = value == null ? null : convertValue(value); + return new ObObj(ObObjType.defaultObjMeta(convertedValue), convertedValue); + } catch (Exception ex) { + throw new IllegalArgumentException(ex); + } + } + + /** + * Some values with data type is unsupported by ObObjType#valueOfType. 
+ * We should convert the input value to supported value data type. + * + * @param value + * @return Object + * @throws Exception + */ + public static Object convertValue(Object value) throws Exception { + if (value instanceof BigDecimal) { + return value.toString(); + } else if (value instanceof BigInteger) { + return value.toString(); + } else if (value instanceof Instant) { + return Timestamp.from(((Instant) value)); + } else if (value instanceof LocalDate) { + // Warn: java.sql.Date.valueOf() is deprecated. As local zone is used. + return Date.valueOf(((LocalDate) value)); + } else if (value instanceof LocalTime) { + // Warn: java.sql.Time.valueOf() is deprecated. + Time t = Time.valueOf((LocalTime) value); + return new Timestamp(t.getTime()); + } else if (value instanceof LocalDateTime) { + return Timestamp.valueOf(((LocalDateTime) value)); + } else if (value instanceof OffsetDateTime) { + return Timestamp.from(((OffsetDateTime) value).toInstant()); + } else if (value instanceof Time) { + return new Timestamp(((Time) value).getTime()); + } else if (value instanceof ZonedDateTime) { + // Note: Be care of time zone!!! + return Timestamp.from(((ZonedDateTime) value).toInstant()); + } else if (value instanceof OffsetTime) { + LocalTime lt = ((OffsetTime) value).toLocalTime(); + // Warn: java.sql.Time.valueOf() is deprecated. + return new Timestamp(Time.valueOf(lt).getTime()); + } else if (value instanceof InputStream) { + try (InputStream is = ((InputStream) value)) { + // Note: Be care of character set!!! + return new ObVString(IOUtils.toString(is, Charset.defaultCharset())); + } + } else if (value instanceof Blob) { + Blob b = (Blob) value; + try (InputStream is = b.getBinaryStream()) { + if (is == null) { + return null; + } + // Note: Be care of character set!!! + return new ObVString(IOUtils.toString(is, Charset.defaultCharset())); + } finally { + b.free(); + } + } else if (value instanceof Reader) { + try (Reader r = ((Reader) value)) { + return IOUtils.toString(r); + } + } else if (value instanceof Clob) { + Clob c = (Clob) value; + try (Reader r = c.getCharacterStream()) { + return r == null ? 
null : IOUtils.toString(r); + } finally { + c.free(); + } + } else { + return value; + } + } + + // *********************************************************************************** // + + @Override + public boolean getMoreResults(int current) throws SQLException { + throw new UnsupportedOperationException("getMoreResults(int) is unsupported"); + } + + @Override + public ResultSet getGeneratedKeys() throws SQLException { + throw new UnsupportedOperationException("getGeneratedKeys is unsupported"); + } + + @Override + public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + throw new UnsupportedOperationException("executeUpdate(String, int) is unsupported"); + } + + @Override + public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { + throw new UnsupportedOperationException("executeUpdate(String, int[]) is unsupported"); + } + + @Override + public int executeUpdate(String sql, String[] columnNames) throws SQLException { + throw new UnsupportedOperationException("executeUpdate(String, String[]) is unsupported"); + } + + @Override + public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { + throw new UnsupportedOperationException("execute(String, int) is unsupported"); + } + + @Override + public boolean execute(String sql, int[] columnIndexes) throws SQLException { + throw new UnsupportedOperationException("execute(String, int[]) is unsupported"); + } + + @Override + public boolean execute(String sql, String[] columnNames) throws SQLException { + throw new UnsupportedOperationException("execute(String, String[]) is unsupported"); + } + + @Override + public int getResultSetHoldability() throws SQLException { + throw new UnsupportedOperationException("getResultSetHoldability is unsupported"); + } + + @Override + public void setPoolable(boolean poolable) throws SQLException { + throw new UnsupportedOperationException("setPoolable(boolean) is unsupported"); + } + + @Override + public boolean isPoolable() throws SQLException { + throw new UnsupportedOperationException("isPoolable is unsupported"); + } + + @Override + public void closeOnCompletion() throws SQLException { + throw new UnsupportedOperationException("closeOnCompletion is unsupported"); + } + + @Override + public boolean isCloseOnCompletion() throws SQLException { + throw new UnsupportedOperationException("isCloseOnCompletion is unsupported"); + } + + @Override + public ResultSet executeQuery(String sql) throws SQLException { + throw new UnsupportedOperationException("executeQuery(String) is unsupported"); + } + + @Override + public int executeUpdate(String sql) throws SQLException { + throw new UnsupportedOperationException("executeUpdate(String) is unsupported"); + } + + @Override + public int getMaxFieldSize() throws SQLException { + throw new UnsupportedOperationException("getMaxFieldSize is unsupported"); + } + + @Override + public void setMaxFieldSize(int max) throws SQLException { + throw new UnsupportedOperationException("setMaxFieldSize(int) is unsupported"); + } + + @Override + public int getMaxRows() throws SQLException { + throw new UnsupportedOperationException("getMaxRows is unsupported"); + } + + @Override + public void setMaxRows(int max) throws SQLException { + throw new UnsupportedOperationException("setMaxRows(int) is unsupported"); + } + + @Override + public void setEscapeProcessing(boolean enable) throws SQLException { + throw new UnsupportedOperationException("setEscapeProcessing(boolean) is unsupported"); + } + + @Override + public int 
getQueryTimeout() throws SQLException { + throw new UnsupportedOperationException("getQueryTimeout is unsupported"); + } + + @Override + public void setQueryTimeout(int seconds) throws SQLException { + throw new UnsupportedOperationException("setQueryTimeout(int) is unsupported"); + } + + @Override + public void cancel() throws SQLException { + throw new UnsupportedOperationException("cancel is unsupported"); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + throw new UnsupportedOperationException("getWarnings is unsupported"); + } + + @Override + public void clearWarnings() throws SQLException { + throw new UnsupportedOperationException("clearWarnings is unsupported"); + } + + @Override + public void setCursorName(String name) throws SQLException { + throw new UnsupportedOperationException("setCursorName(String) is unsupported"); + } + + @Override + public boolean execute(String sql) throws SQLException { + throw new UnsupportedOperationException("execute(String) is unsupported"); + } + + @Override + public ResultSet getResultSet() throws SQLException { + throw new UnsupportedOperationException("getResultSet is unsupported"); + } + + @Override + public int getUpdateCount() throws SQLException { + throw new UnsupportedOperationException("getUpdateCount is unsupported"); + } + + @Override + public boolean getMoreResults() throws SQLException { + throw new UnsupportedOperationException("getMoreResults is unsupported"); + } + + @Override + public void setFetchDirection(int direction) throws SQLException { + throw new UnsupportedOperationException("setFetchDirection(int) is unsupported"); + } + + @Override + public int getFetchDirection() throws SQLException { + throw new UnsupportedOperationException("getFetchDirection is unsupported"); + } + + @Override + public void setFetchSize(int rows) throws SQLException { + throw new UnsupportedOperationException("setFetchSize(int) is unsupported"); + } + + @Override + public int getFetchSize() throws SQLException { + throw new UnsupportedOperationException("getFetchSize is unsupported"); + } + + @Override + public int getResultSetConcurrency() throws SQLException { + throw new UnsupportedOperationException("getResultSetConcurrency is unsupported"); + } + + @Override + public int getResultSetType() throws SQLException { + throw new UnsupportedOperationException("getResultSetType is unsupported"); + } + + @Override + public void addBatch(String sql) throws SQLException { + throw new UnsupportedOperationException("addBatch(String) is unsupported"); + } + + @Override + public ResultSet executeQuery() throws SQLException { + throw new UnsupportedOperationException("executeQuery is unsupported"); + } + + @Override + public int executeUpdate() throws SQLException { + throw new UnsupportedOperationException("executeUpdate is unsupported"); + } + + @Override + public boolean execute() throws SQLException { + throw new UnsupportedOperationException("execute is unsupported"); + } + + @Override + public ParameterMetaData getParameterMetaData() throws SQLException { + throw new UnsupportedOperationException("getParameterMetaData is unsupported"); + } + + @Override + public ResultSetMetaData getMetaData() throws SQLException { + throw new UnsupportedOperationException("getMetaData is unsupported"); + } + + @Override + public T unwrap(Class iface) throws SQLException { + throw new UnsupportedOperationException("isWrapperFor(Class) is unsupported"); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + throw 
new UnsupportedOperationException("isWrapperFor(Class) is unsupported"); + } +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectLoaderBuilder.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectLoaderBuilder.java new file mode 100644 index 00000000..22664947 --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectLoaderBuilder.java @@ -0,0 +1,170 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath; + +import java.io.Serializable; + +import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadConnection; +import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadManager; +import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadStatement; +import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException; +import com.alipay.oceanbase.rpc.exception.ObTableException; +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObLoadDupActionType; +import org.apache.commons.lang.ObjectUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The builder for {@link ObTableDirectLoad}. + */ +public class DirectLoaderBuilder implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(DirectLoaderBuilder.class); + private String host; + private int port; + + private String user; + private String tenant; + private String password; + + private String schema; + private String table; + + /** + * Server-side parallelism. 
+     */
+    private int parallel;
+
+    private long maxErrorCount;
+
+    private ObLoadDupActionType duplicateKeyAction;
+
+    /**
+     * The overall timeout of the direct load task.
+     */
+    private Long timeout;
+
+    private Long heartBeatTimeout;
+
+    private Long heartBeatInterval;
+
+    public DirectLoaderBuilder host(String host) {
+        this.host = host;
+        return this;
+    }
+
+    public DirectLoaderBuilder port(int port) {
+        this.port = port;
+        return this;
+    }
+
+    public DirectLoaderBuilder user(String user) {
+        // obkv client 1.4.0 only needs the bare user name; it must not carry tenant or cluster information.
+        int indexOf = user.indexOf("@");
+        this.user = user;
+        if (indexOf > 0) {
+            this.user = user.substring(0, indexOf);
+        }
+        return this;
+    }
+
+    public DirectLoaderBuilder tenant(String tenant) {
+        this.tenant = tenant;
+        return this;
+    }
+
+    public DirectLoaderBuilder password(String password) {
+        this.password = password;
+        return this;
+    }
+
+    public DirectLoaderBuilder schema(String schema) {
+        this.schema = schema;
+        return this;
+    }
+
+    public DirectLoaderBuilder table(String table) {
+        this.table = table;
+        return this;
+    }
+
+    public DirectLoaderBuilder parallel(int parallel) {
+        this.parallel = parallel;
+        return this;
+    }
+
+    public DirectLoaderBuilder maxErrorCount(long maxErrorCount) {
+        this.maxErrorCount = maxErrorCount;
+        return this;
+    }
+
+    public DirectLoaderBuilder duplicateKeyAction(ObLoadDupActionType duplicateKeyAction) {
+        this.duplicateKeyAction = duplicateKeyAction;
+        return this;
+    }
+
+    public DirectLoaderBuilder timeout(long timeout) {
+        this.timeout = timeout;
+        return this;
+    }
+
+    public DirectLoaderBuilder heartBeatTimeout(Long heartBeatTimeout) {
+        this.heartBeatTimeout = heartBeatTimeout;
+        return this;
+    }
+
+    public DirectLoaderBuilder heartBeatInterval(Long heartBeatInterval) {
+        this.heartBeatInterval = heartBeatInterval;
+        return this;
+    }
+
+    public ObTableDirectLoad build() {
+        try {
+            ObDirectLoadConnection obDirectLoadConnection = buildConnection(parallel);
+            ObDirectLoadStatement obDirectLoadStatement = buildStatement(obDirectLoadConnection);
+            return new ObTableDirectLoad(schema, table, obDirectLoadStatement, obDirectLoadConnection);
+        } catch (ObDirectLoadException e) {
+            throw new ObTableException(e.getMessage(), e);
+        }
+    }
+
+    private ObDirectLoadConnection buildConnection(int writeThreadNum) throws ObDirectLoadException {
+        if (heartBeatTimeout == null || heartBeatInterval == null) {
+            throw new IllegalArgumentException("heartBeatTimeout and heartBeatInterval must not be null");
+        }
+        ObDirectLoadConnection build = ObDirectLoadManager.getConnectionBuilder()
+                .setServerInfo(host, port)
+                .setLoginInfo(tenant, user, password, schema)
+                .setHeartBeatInfo(heartBeatTimeout, heartBeatInterval)
+                .enableParallelWrite(writeThreadNum)
+                .build();
+        log.info("ObDirectLoadConnection value is:{}", ObjectUtils.toString(build));
+        return build;
+    }
+
+    private ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection) throws ObDirectLoadException {
+        ObDirectLoadStatement build = connection.getStatementBuilder()
+                .setTableName(table)
+                .setParallel(parallel)
+                .setQueryTimeout(timeout)
+                .setDupAction(duplicateKeyAction)
+                .setMaxErrorRowCount(maxErrorCount)
+                .build();
+        log.info("ObDirectLoadStatement value is:{}", ObjectUtils.toString(build));
+        return build;
+    }
+}
diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectPathConnection.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectPathConnection.java
new file mode 100644
index 00000000..ce7ef7e3
--- /dev/null
+++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectPathConnection.java
@@ -0,0 +1,398 @@
+package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+
+import com.alibaba.datax.common.util.Configuration;
+
+import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadBucket;
+import com.alipay.oceanbase.rpc.protocol.payload.impl.ObLoadDupActionType;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class DirectPathConnection extends AbstractRestrictedConnection {
+
+    private static final int OB_DIRECT_PATH_DEFAULT_BLOCKS = 1;
+    private static final long OB_DIRECT_PATH_HEART_BEAT_TIMEOUT = 60000;
+    private static final long OB_DIRECT_PATH_HEART_BEAT_INTERVAL = 10000;
+    private static final int DEFAULT_BUFFERSIZE = 1048576;
+    private final Configuration configuration;
+
+    private State state;
+    private int committers;
+
+    private final int blocks;
+    private final ObTableDirectLoad load;
+    private final Object lock = new Object();
+
+    private static final Logger log = LoggerFactory.getLogger(DirectPathConnection.class);
+
+    /**
+     * Construct a new instance.
+     *
+     * @param load underlying direct load handle
+     * @param blocks number of client blocks that share this connection
+     */
+    private DirectPathConnection(ObTableDirectLoad load, int blocks, Configuration configuration) {
+        this.configuration = configuration;
+        this.load = load;
+        this.blocks = blocks;
+    }
+
+    /**
+     * Begin a new {@link DirectPathConnection }.
+     *
+     * @return DirectPathConnection
+     * @throws SQLException
+     */
+    public DirectPathConnection begin() throws SQLException {
+        synchronized (lock) {
+            if (state == null || state == State.CLOSED) {
+                try {
+                    this.load.begin();
+                    this.state = State.BEGIN;
+                } catch (Exception ex) {
+                    throw new SQLException(ex);
+                }
+            } else {
+                throw new IllegalStateException("Begin transaction failed as connection state is already BEGIN");
+            }
+        }
+        return this;
+    }
+
+    /**
+     * Commit buffered data with MAXIMUM timeout.
+     *
+     * @throws SQLException
+     */
+    @Override
+    public void commit() throws SQLException {
+        synchronized (lock) {
+            if (state == State.BEGIN) {
+                this.committers++;
+                if (committers == blocks) {
+                    try {
+                        this.load.commit();
+                        state = State.FINISHED;
+                    } catch (Exception ex) {
+                        throw new SQLException(ex);
+                    }
+                } else if (committers > blocks) {
+                    throw new IllegalStateException("Your commits have exceeded the limit. (" + committers + ">" + blocks + ")");
+                }
+            } else {
+                throw new IllegalStateException("Commit transaction failed as connection state is not BEGIN");
+            }
+        }
+    }
+
+    /**
+     * Rollback if an error occurred.
+     *
+     * @throws SQLException
+     */
+    @Override
+    public void rollback() throws SQLException {
+        synchronized (lock) {
+            if (state == State.BEGIN) {
+                try {
+                    // The close() method of obkv-table-client 2.1.0 already contains the rollback logic.
+                    this.load.close();
+                } catch (Exception ex) {
+                    throw new SQLException(ex);
+                }
+            } else {
+                throw new IllegalStateException("Rollback transaction failed as connection state is not BEGIN");
+            }
+        }
+    }
+
+    /**
+     * Close this connection.
+ */ + @Override + public void close() { + synchronized (lock) { + // Closed only if state is BEGIN + this.load.close(); + this.state = State.CLOSED; + } + } + + /** + * @return DirectPathPreparedStatement + */ + @Override + public DirectPathPreparedStatement createStatement() throws SQLException { + return this.prepareStatement(null); + } + + /** + * A new batch need create a new {@link DirectPathPreparedStatement }. + * The {@link DirectPathPreparedStatement } can not be reuse, otherwise it may cause duplicate records. + * + * @return DirectPathStatement + */ + @Override + public DirectPathPreparedStatement prepareStatement(String sql) throws SQLException { + if (state == State.BEGIN) { + Integer bufferSize = configuration.getInt(DirectPathConstants.BUFFERSIZE, DEFAULT_BUFFERSIZE); + log.info("The current bufferSize size is{}", bufferSize); + return new DirectPathPreparedStatement(this, bufferSize); + } else { + throw new IllegalStateException("Create statement failed as connection state is not BEGIN"); + } + } + + /** + * Return the schema name of this connection instance. + * + * @return String + */ + @Override + public String getSchema() { + if (state == State.BEGIN) { + return this.load.getTable().getDatabase(); + } else { + throw new IllegalStateException("Get schema failed as connection state is not BEGIN"); + } + } + + /** + * Return the table name of this connection instance. + * + * @return String + */ + public String getTableName() { + if (state == State.BEGIN) { + return this.load.getTableName(); + } else { + throw new IllegalStateException("Get table failed as connection state is not BEGIN"); + } + } + + /** + * Return whether this connection is closed. + * + * @return boolean + */ + @Override + public boolean isClosed() { + synchronized (lock) { + return this.state == State.CLOSED; + } + } + + public boolean isFinished() { + return this.state.equals(State.FINISHED); + } + + /** + * Insert bucket data into buffer. + * + * @param bucket + * @return int[] + * @throws SQLException + */ + int[] insert(ObDirectLoadBucket bucket) throws SQLException { + try { + this.load.write(bucket); + int[] result = new int[bucket.getRowNum()]; + Arrays.fill(result, 1); + return result; + } catch (Exception ex) { + throw new SQLException(ex); + } + } + + /** + * Indicates the state of {@link DirectPathConnection } + */ + enum State { + + /** + * Begin transaction + */ + BEGIN, + /** + * Transaction is finished, ready to close. + */ + FINISHED, + + /** + * Transaction is closed. + */ + CLOSED; + } + + /** + * This builder used to build a new {@link DirectPathConnection } + */ + public static class Builder { + + private String host; + private int port; + + private String user; + private String tenant; + private String password; + + private String schema; + private String table; + + /** + * Client job count. + */ + private int blocks = OB_DIRECT_PATH_DEFAULT_BLOCKS; + + /** + * Server threads used to sort. 
+ */ + private int parallel; + + private long maxErrorCount; + + private ObLoadDupActionType duplicateKeyAction; + + // Used for load data + private long serverTimeout; + + private Configuration configuration; + + public Builder host(String host) { + this.host = host; + return this; + } + + public Builder port(int port) { + this.port = port; + return this; + } + + public Builder user(String user) { + this.user = user; + return this; + } + + public Builder tenant(String tenant) { + this.tenant = tenant; + return this; + } + + public Builder password(String password) { + this.password = password; + return this; + } + + public Builder schema(String schema) { + this.schema = schema; + return this; + } + + public Builder table(String table) { + this.table = table; + return this; + } + + public Builder blocks(int blocks) { + this.blocks = blocks; + return this; + } + + public Builder parallel(int parallel) { + this.parallel = parallel; + return this; + } + + public Builder maxErrorCount(long maxErrorCount) { + this.maxErrorCount = maxErrorCount; + return this; + } + + public Builder duplicateKeyAction(ObLoadDupActionType duplicateKeyAction) { + this.duplicateKeyAction = duplicateKeyAction; + return this; + } + + public Builder serverTimeout(long serverTimeout) { + this.serverTimeout = serverTimeout; + return this; + } + + public Builder configuration(Configuration configuration) { + this.configuration = configuration; + return this; + } + + /** + * Build a new {@link DirectPathConnection } + * + * @return DirectPathConnection + */ + public DirectPathConnection build() throws Exception { + return createConnection(host, port, user, tenant, password, schema, table, // + blocks, parallel, maxErrorCount, duplicateKeyAction, serverTimeout, duplicateKeyAction).begin(); + } + + /** + * Create a new {@link DirectPathConnection } + * + * @param host + * @param port + * @param user + * @param tenant + * @param password + * @param schema + * @param table + * @param parallel + * @param maxErrorCount + * @param action + * @param serverTimeout + * @return DirectPathConnection + * @throws Exception + */ + DirectPathConnection createConnection(String host, int port, String user, String tenant, String password, String schema, String table, // + int blocks, int parallel, long maxErrorCount, ObLoadDupActionType action, long serverTimeout, ObLoadDupActionType obLoadDupActionType) throws Exception { + + checkArgument(StringUtils.isNotBlank(host), "Host is null.(host=%s)", host); + checkArgument((port > 0 && port < 65535), "Port is invalid.(port=%s)", port); + checkArgument(StringUtils.isNotBlank(user), "User Name is null.(user=%s)", user); + checkArgument(StringUtils.isNotBlank(tenant), "Tenant Name is null.(tenant=%s)", tenant); + checkArgument(StringUtils.isNotBlank(schema), "Schema Name is null.(schema=%s)", schema); + checkArgument(StringUtils.isNotBlank(table), "Table Name is null.(table=%s)", table); + + checkArgument(blocks > 0, "Client Blocks is invalid.(blocks=%s)", blocks); + checkArgument(parallel > 0, "Server Parallel is invalid.(parallel=%s)", parallel); + checkArgument(maxErrorCount > -1, "MaxErrorCount is invalid.(maxErrorCount=%s)", maxErrorCount); + checkArgument(action != null, "ObLoadDupActionType is null.(obLoadDupActionType=%s)", action); + checkArgument(serverTimeout > 0, "Server timeout is invalid.(timeout=%s)", serverTimeout); + Long heartBeatTimeout = 0L; + Long heartBeatInterval = 0L; + if (configuration != null) { + heartBeatTimeout = 
configuration.getLong(DirectPathConstants.HEART_BEAT_TIMEOUT, OB_DIRECT_PATH_HEART_BEAT_TIMEOUT);
+                heartBeatInterval = configuration.getLong(DirectPathConstants.HEART_BEAT_INTERVAL, OB_DIRECT_PATH_HEART_BEAT_INTERVAL);
+                parallel = configuration.getInt(DirectPathConstants.PARALLEL, parallel);
+            }
+            DirectLoaderBuilder builder = new DirectLoaderBuilder()
+                    .host(host).port(port)
+                    .user(user)
+                    .tenant(tenant)
+                    .password(password)
+                    .schema(schema)
+                    .table(table)
+                    .parallel(parallel)
+                    .maxErrorCount(maxErrorCount)
+                    .timeout(serverTimeout)
+                    .duplicateKeyAction(obLoadDupActionType)
+                    .heartBeatTimeout(heartBeatTimeout)
+                    .heartBeatInterval(heartBeatInterval);
+            ObTableDirectLoad directLoad = builder.build();
+
+            return new DirectPathConnection(directLoad, blocks, configuration);
+        }
+    }
+}
diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectPathConstants.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectPathConstants.java
new file mode 100644
index 00000000..d32c966c
--- /dev/null
+++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectPathConstants.java
@@ -0,0 +1,12 @@
+package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
+
+public class DirectPathConstants {
+    // The configuration keys below are read in DirectPathConnection.
+    public static final String HEART_BEAT_TIMEOUT = "heartBeatTimeout";
+
+    public static final String HEART_BEAT_INTERVAL = "heartBeatInterval";
+
+    public static final String PARALLEL = "parallel";
+
+    public static final String BUFFERSIZE = "bufferSize";
+}
diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectPathPreparedStatement.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectPathPreparedStatement.java
new file mode 100644
index 00000000..8f4aa7cf
--- /dev/null
+++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/DirectPathPreparedStatement.java
@@ -0,0 +1,164 @@
+package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+
+import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadBucket;
+import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException;
+import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class DirectPathPreparedStatement extends AbstractRestrictedPreparedStatement {
+
+    private ObDirectLoadBucket bucket;
+    private final DirectPathConnection conn;
+    private final Map<Integer, ObObj> parameters;
+    private final Integer bufferSize;
+    private static final int DEFAULT_BUFFERSIZE = 1048576;
+    public static final int[] EMPTY_ARRAY = new int[0];
+
+    /**
+     * Construct a new {@link DirectPathPreparedStatement } instance.
+ * + * @param conn + */ + public DirectPathPreparedStatement(DirectPathConnection conn) { + this.conn = conn; + this.parameters = new HashMap<>(); + this.bufferSize = DEFAULT_BUFFERSIZE; + this.bucket = new ObDirectLoadBucket(); + } + + public DirectPathPreparedStatement(DirectPathConnection conn, Integer bufferSize) { + this.conn = conn; + this.parameters = new HashMap<>(); + this.bufferSize = bufferSize; + this.bucket = new ObDirectLoadBucket(bufferSize); + } + + /** + * Return current direct path connection. + * + * @return DirectPathConnection + * @throws SQLException + */ + @Override + public DirectPathConnection getConnection() throws SQLException { + return this.conn; + } + + /** + * Copy a new row data avoid overwrite. + * + * @throws SQLException + */ + @Override + public void addBatch() throws SQLException { + checkRange(); + ObObj[] objObjArray = new ObObj[parameters.size()]; + for (Map.Entry entry : parameters.entrySet()) { + objObjArray[entry.getKey() - 1] = entry.getValue(); + } + this.addBatch(objObjArray); + } + + /** + * Add a new row into buffer with input original value list. + * + * @param values One original row data. + */ + public void addBatch(List values) { + this.addBatch(createObObjArray(values)); + } + + /** + * Add a new row into buffer with input original value array. + * + * @param values One original row data. + */ + public void addBatch(Object[] values) { + this.addBatch(createObObjArray(values)); + } + + /** + * Add a new row into buffer with input ObObj array. + * + * @param arr One row data described as ObObj. + */ + private void addBatch(ObObj[] arr) { + checkArgument(arr != null && arr.length > 0, "Input values is null"); + try { + this.bucket.addRow(arr); + } catch (ObDirectLoadException e) { + throw new RuntimeException(e); + } + } + + /** + * Buffered the row data in memory. (defined in the bucket) + * You must invoke {@code ObDirectLoadBucket.clearBatch } after executeBatch. + * + * @return int[] + * @throws SQLException + */ + @Override + public int[] executeBatch() throws SQLException { + return this.bucket.isEmpty() ? EMPTY_ARRAY : this.conn.insert(bucket); + } + + /** + * Clear batch is always recreate a new {@link ObDirectLoadBucket} + */ + @Override + public void clearBatch() { + this.parameters.clear(); + this.bucket = new ObDirectLoadBucket(bufferSize); + } + + /** + * Clear the holder parameters. + * + * @throws SQLException + */ + @Override + public void clearParameters() throws SQLException { + this.parameters.clear(); + } + + /** + * @return boolean + */ + @Override + public boolean isOracleMode() { + return false; + } + + /** + * Set parameter to the target position. + * + * @param parameterIndex Start From 1 + * @param obObj Convert original value to {@link ObObj } + * @throws SQLException + */ + @Override + protected void setParameter(int parameterIndex, ObObj obObj) throws SQLException { + checkArgument(parameterIndex > 0, "Parameter index should start from 1"); + this.parameters.put(parameterIndex, obObj); + } + + /** + * Avoid range exception: + *
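+     * Parameter indexes set via {@code setParameter} are expected to be contiguous
+     * and to start from 1, so the following sequence is rejected: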

+ * Map.put(1, "abc"); + * Map.put(5, "def"); // Error: parameter index is 5, but 2 values exists. + */ + private void checkRange() { + OptionalInt optionalInt = parameters.keySet().stream().mapToInt(e -> e).max(); + int parameterIndex = optionalInt.orElseThrow(() -> new IllegalArgumentException("No parameter index found")); + checkArgument(parameterIndex == parameters.size(), "Parameter index(%s) is unmatched with value list(%s)", parameterIndex, parameters.size()); + } +} \ No newline at end of file diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/ObTableDirectLoad.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/ObTableDirectLoad.java new file mode 100644 index 00000000..859fcedd --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/directPath/ObTableDirectLoad.java @@ -0,0 +1,154 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath; + +import java.sql.SQLException; +import java.util.Objects; + +import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadBucket; +import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadConnection; +import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadStatement; +import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadTraceId; +import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException; +import com.alipay.oceanbase.rpc.direct_load.protocol.payload.ObTableLoadClientStatus; +import com.alipay.oceanbase.rpc.table.ObTable; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wrapper of the direct-load API for OceanBase. + */ +public class ObTableDirectLoad implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(ObTableDirectLoad.class); + + private final String tableName; + private final String schemaTableName; + private final ObDirectLoadStatement statement; + private final ObDirectLoadConnection connection; + + public ObTableDirectLoad(String schemaName, String tableName, ObDirectLoadStatement statement, ObDirectLoadConnection connection) { + Objects.requireNonNull(schemaName, "schemaName must not be null"); + Objects.requireNonNull(tableName, "tableName must not be null"); + Objects.requireNonNull(statement, "statement must not be null"); + Objects.requireNonNull(connection, "connection must not be null"); + this.tableName = tableName; + this.schemaTableName = String.format("%s.%s", schemaName, tableName); + this.statement = statement; + this.connection = connection; + } + + /** + * Begin the direct load operation. + * + * @throws ObDirectLoadException if an error occurs during the operation. + */ + public void begin() throws ObDirectLoadException { + statement.begin(); + } + + /** + * Write data into the direct load operation. + * + * @param bucket The data bucket to write. + * @throws SQLException if writing fails. 
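+     *
+     * <p>Typical lifecycle (sketch only): {@code begin()} once, then {@code write(bucket)}
+     * for each buffered batch, then {@code commit()} and finally {@code close()}.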
+ */ + public void write(ObDirectLoadBucket bucket) throws SQLException { + try { + if (bucket == null || bucket.isEmpty()) { + throw new IllegalArgumentException("Bucket must not be null or empty."); + } + LOG.info("Writing {} rows to table: {}", bucket.getRowNum(), schemaTableName); + statement.write(bucket); + LOG.info("Successfully wrote bucket data to table: {}", schemaTableName); + } catch (ObDirectLoadException e) { + LOG.error("Failed to write to table: {}", schemaTableName, e); + throw new SQLException(String.format("Failed to write to table: %s", schemaTableName), e); + } + } + + /** + * Commit the current direct load operation. + * + * @throws SQLException if commit fails. + */ + public void commit() throws SQLException { + try { + LOG.info("Committing direct load for table: {}", schemaTableName); + statement.commit(); + LOG.info("Successfully committed direct load for table: {}", schemaTableName); + } catch (ObDirectLoadException e) { + LOG.error("Failed to commit for table: {}", schemaTableName, e); + throw new SQLException(String.format("Failed to commit for table: %s", schemaTableName), e); + } + } + + /** + * Close the direct load operation. + */ + public void close() { + LOG.info("Closing direct load for table: {}", schemaTableName); + statement.close(); + connection.close(); + LOG.info("Direct load closed for table: {}", schemaTableName); + } + + /** + * Gets the status from the current connection based on the traceId + */ + public ObTableLoadClientStatus getStatus() throws SQLException { + ObDirectLoadTraceId traceId = statement.getTraceId(); + // Check if traceId is null and throw an exception with a clear message + if (traceId == null) { + throw new SQLException("traceId is null."); + } + // Retrieve the status using the traceId + ObTableLoadClientStatus status = statement.getConnection().getProtocol().getHeartBeatRpc(traceId).getStatus(); + if (status == null) { + LOG.info("Direct load connect protocol heartBeatRpc for table is null: {}", schemaTableName); + throw new SQLException("status is null."); + } + // Return status if not null; otherwise, return ERROR + return status; + } + + /** + * Gets the current table + */ + public ObTable getTable() { + try { + return this.statement.getObTablePool().getControlObTable(); + } catch (ObDirectLoadException e) { + throw new RuntimeException(e); + } + } + + public String getTableName() { + if (StringUtils.isBlank(tableName)) { + throw new IllegalArgumentException("tableName is blank."); + } + return tableName; + } + + /** + * Inserts data into the direct load operation. + * + * @param bucket The data bucket containing rows to insert. + * @throws SQLException if an error occurs during the insert operation. 
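+     *
+     * <p>Behaves like {@link #write(ObDirectLoadBucket)}, but wraps any failure,
+     * not only {@code ObDirectLoadException}, in a {@link SQLException}.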
+ */ + public void insert(ObDirectLoadBucket bucket) throws SQLException { + LOG.info("Inserting {} rows to table: {}", bucket.getRowNum(), schemaTableName); + + if (bucket.isEmpty()) { + LOG.warn("Parameter 'bucket' must not be empty."); + throw new IllegalArgumentException("Parameter 'bucket' must not be empty."); + } + + try { + // Perform the insertion into the load operation + statement.write(bucket); + LOG.info("Successfully inserted data into table: {}", schemaTableName); + } catch (Exception ex) { + LOG.error("Unexpected error during insert operation for table: {}", schemaTableName, ex); + throw new SQLException("Unexpected error during insert operation.", ex); + } + } +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/AbstractConnHolder.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/AbstractConnHolder.java index c8630cd0..6e4d4aab 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/AbstractConnHolder.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/AbstractConnHolder.java @@ -14,6 +14,17 @@ public abstract class AbstractConnHolder { protected final Configuration config; protected Connection conn; + protected String jdbcUrl; + protected String userName; + protected String password; + + protected AbstractConnHolder(Configuration config, String jdbcUrl, String userName, String password) { + this.config = config; + this.jdbcUrl = jdbcUrl; + this.userName = userName; + this.password = password; + } + public AbstractConnHolder(Configuration config) { this.config = config; } @@ -45,4 +56,6 @@ public abstract class AbstractConnHolder { public abstract String getUserName(); public abstract void destroy(); + + public abstract void doCommit(); } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/DirectPathAbstractConnHolder.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/DirectPathAbstractConnHolder.java new file mode 100644 index 00000000..c5c6dbe0 --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/DirectPathAbstractConnHolder.java @@ -0,0 +1,61 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext; + +import java.sql.Connection; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class DirectPathAbstractConnHolder { + private static final Logger LOG = LoggerFactory.getLogger(AbstractConnHolder.class); + protected Configuration config; + protected String jdbcUrl; + protected String userName; + protected String password; + + protected Connection conn; + + protected DirectPathAbstractConnHolder(Configuration config, String jdbcUrl, String userName, String password) { + this.config = config; + this.jdbcUrl = jdbcUrl; + this.userName = userName; + this.password = password; + } + + public Connection reconnect() { + DBUtil.closeDBResources(null, conn); + return initConnection(); + } + + public Connection getConn() { + if (conn == null) { + return initConnection(); + } else { + try { + if (conn.isClosed()) { + return reconnect(); + } + return conn; + } catch (Exception e) { + LOG.debug("can not judge whether the hold connection is closed or not, just reuse the hold connection"); + return conn; + } + } + } 
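+
+    // Illustrative usage for a concrete subclass (sketch; no particular subclass is implied):
+    //   Connection c = holder.getConn();  // lazily initialized, re-opened if already closed
+    //   holder.doCommit();                // no-op here; a subclass may override it
+    //   holder.destroy();                 // subclass decides how to release the connection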
+ + public String getJdbcUrl() { + return jdbcUrl; + } + + public Configuration getConfig() { + return config; + } + + public void doCommit() {} + + public abstract void destroy(); + + public abstract Connection initConnection(); +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/DirectPathConnHolder.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/DirectPathConnHolder.java new file mode 100644 index 00000000..352eda1c --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/DirectPathConnHolder.java @@ -0,0 +1,115 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.common.Table; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath.DirectPathConnection; + +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObLoadDupActionType; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DirectPathConnHolder extends AbstractConnHolder { + private static final Logger LOG = LoggerFactory.getLogger(DirectPathConnHolder.class); + + /** + * The server side timeout. + */ + private static final long SERVER_TIMEOUT = 24L * 60 * 60 * 1000 * 1000; + + private static final ConcurrentHashMap cache = new ConcurrentHashMap<>(); + + private String tableName; + private String host; + private int rpcPort; + private String tenantName; + private String databaseName; + private int blocks; + private int threads; + private int maxErrors; + private ObLoadDupActionType duplicateKeyAction; + + public DirectPathConnHolder(Configuration config, ServerConnectInfo connectInfo, String tableName, int threadsPerChannel) { + super(config, connectInfo.jdbcUrl, connectInfo.userName, connectInfo.password); + // direct path: + //● publicCloud & odp - single or full + //● publicCloud & observer - not support + //● !publicCloud & odp - full + //● !publicCloud & observer - single + this.userName = connectInfo.getFullUserName(); + this.host = connectInfo.host; + this.rpcPort = connectInfo.rpcPort; + this.tenantName = connectInfo.tenantName; + if (!connectInfo.publicCloud && StringUtils.isEmpty(tenantName)) { + throw new IllegalStateException("tenant name is needed when using direct path load in private cloud."); + } + this.databaseName = connectInfo.databaseName; + this.tableName = tableName; + this.blocks = config.getInt(Config.BLOCKS_COUNT, 1); + this.threads = threadsPerChannel * Math.min(blocks, 32); + this.maxErrors = config.getInt(Config.MAX_ERRORS, 0); + this.duplicateKeyAction = "insert".equalsIgnoreCase(config.getString(Config.OB_WRITE_MODE)) ? 
ObLoadDupActionType.IGNORE : ObLoadDupActionType.REPLACE; + } + + @Override + public Connection initConnection() { + synchronized (cache) { + conn = cache.computeIfAbsent(new Table(databaseName, tableName), e -> { + try { + return new DirectPathConnection.Builder().host(host) // + .port(rpcPort) // + .tenant(tenantName) // + .user(userName) // + .password(Optional.ofNullable(password).orElse("")) // + .schema(databaseName) // + .table(tableName) // + .blocks(blocks) // + .parallel(threads) // + .maxErrorCount(maxErrors) // + .duplicateKeyAction(duplicateKeyAction) // + .serverTimeout(SERVER_TIMEOUT) // + .configuration(config) + .build(); + } catch (Exception ex) { + throw DataXException.asDataXException(DBUtilErrorCode.CONN_DB_ERROR, ex); + } + }); + } + return conn; + } + + public String getJdbcUrl() { + return ""; + } + + public String getUserName() { + return ""; + } + + @Override + public void destroy() { + if (conn != null && ((DirectPathConnection) conn).isFinished()) { + DBUtil.closeDBResources(null, conn); + } + } + + @Override + public void doCommit() { + try { + if (conn != null) { + conn.commit(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/OCJConnHolder.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/OCJConnHolder.java index 262fb1cb..02277fbe 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/OCJConnHolder.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/OCJConnHolder.java @@ -1,42 +1,54 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext; import java.sql.Connection; +import java.sql.SQLException; import com.alibaba.datax.common.util.Configuration; /** * wrap oceanbase java client + * * @author oceanbase */ public class OCJConnHolder extends AbstractConnHolder { - private ServerConnectInfo connectInfo; - private String dataSourceKey; + private ServerConnectInfo connectInfo; + private String dataSourceKey; - public OCJConnHolder (Configuration config, ServerConnectInfo connInfo) { - super(config); - this.connectInfo = connInfo; - this.dataSourceKey = OBDataSourceV10.genKey(connectInfo.getFullUserName(), connectInfo.databaseName); - OBDataSourceV10.init(config, connectInfo.getFullUserName(), connectInfo.password, connectInfo.databaseName); - } + public OCJConnHolder(Configuration config, ServerConnectInfo connInfo) { + super(config); + this.connectInfo = connInfo; + this.dataSourceKey = OBDataSourceV10.genKey(connectInfo.getFullUserName(), connectInfo.databaseName); + OBDataSourceV10.init(config, connectInfo.getFullUserName(), connectInfo.password, connectInfo.databaseName); + } - @Override - public Connection initConnection() { - conn = OBDataSourceV10.getConnection(dataSourceKey); - return conn; - } + @Override + public Connection initConnection() { + conn = OBDataSourceV10.getConnection(dataSourceKey); + return conn; + } - @Override - public String getJdbcUrl() { - return connectInfo.jdbcUrl; - } + @Override + public String getJdbcUrl() { + return connectInfo.jdbcUrl; + } - @Override - public String getUserName() { - return connectInfo.userName; - } - - public void destroy() { - OBDataSourceV10.destory(this.dataSourceKey); - } + @Override + public String getUserName() { + return connectInfo.userName; + } + + public void destroy() { + OBDataSourceV10.destory(this.dataSourceKey); + } + + 
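+    /**
+     * Commit pending work on the pooled OCJ connection; a SQLException is wrapped
+     * in a RuntimeException, matching the other connection holders in this patch.
+     */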
public void doCommit() { + try { + if (conn != null) { + conn.commit(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ObClientConnHolder.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ObClientConnHolder.java index ac75d359..c0e885c2 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ObClientConnHolder.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ObClientConnHolder.java @@ -1,6 +1,7 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext; import java.sql.Connection; +import java.sql.SQLException; import java.util.ArrayList; import java.util.List; @@ -14,50 +15,60 @@ import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils; * 数据库连接代理对象,负责创建连接,重新连接 * * @author oceanbase - * */ public class ObClientConnHolder extends AbstractConnHolder { - private final String jdbcUrl; - private final String userName; - private final String password; + private final String jdbcUrl; + private final String userName; + private final String password; - public ObClientConnHolder(Configuration config, String jdbcUrl, String userName, String password) { - super(config); - this.jdbcUrl = jdbcUrl; - this.userName = userName; - this.password = password; - } + public ObClientConnHolder(Configuration config, String jdbcUrl, String userName, String password) { + super(config); + this.jdbcUrl = jdbcUrl; + this.userName = userName; + this.password = password; + } - // Connect to ob with obclient and obproxy - @Override - public Connection initConnection() { - String BASIC_MESSAGE = String.format("jdbcUrl:[%s]", this.jdbcUrl); - DataBaseType dbType = DataBaseType.OceanBase; - if (ObWriterUtils.isOracleMode()) { - // set up for writing timestamp columns - List sessionConfig = config.getList(Key.SESSION, new ArrayList(), String.class); - sessionConfig.add("ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'"); - sessionConfig.add("ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS.FF'"); - sessionConfig.add("ALTER SESSION SET NLS_TIMESTAMP_TZ_FORMAT='YYYY-MM-DD HH24:MI:SS.FF TZR TZD'"); - config.set(Key.SESSION, sessionConfig); - } - conn = DBUtil.getConnection(dbType, jdbcUrl, userName, password); - DBUtil.dealWithSessionConfig(conn, config, dbType, BASIC_MESSAGE); - return conn; - } + // Connect to ob with obclient and obproxy + @Override + public Connection initConnection() { + String BASIC_MESSAGE = String.format("jdbcUrl:[%s]", this.jdbcUrl); + DataBaseType dbType = DataBaseType.OceanBase; + if (ObWriterUtils.isOracleMode()) { + // set up for writing timestamp columns + List sessionConfig = config.getList(Key.SESSION, new ArrayList(), String.class); + sessionConfig.add("ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'"); + sessionConfig.add("ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS.FF'"); + sessionConfig.add("ALTER SESSION SET NLS_TIMESTAMP_TZ_FORMAT='YYYY-MM-DD HH24:MI:SS.FF TZR TZD'"); + config.set(Key.SESSION, sessionConfig); + } + conn = DBUtil.getConnection(dbType, jdbcUrl, userName, password); + DBUtil.dealWithSessionConfig(conn, config, dbType, BASIC_MESSAGE); + return conn; + } - @Override - public String getJdbcUrl() { - return jdbcUrl; - } + @Override + public String getJdbcUrl() { + return jdbcUrl; + } - @Override - public String getUserName() { - return 
userName; - } + @Override + public String getUserName() { + return userName; + } - @Override - public void destroy() { - DBUtil.closeDBResources(null, conn); - } + @Override + public void destroy() { + DBUtil.closeDBResources(null, conn); + } + + @Override + public void doCommit() { + try { + if (conn != null) { + conn.commit(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ServerConnectInfo.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ServerConnectInfo.java index e7489dda..2bcc8541 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ServerConnectInfo.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ServerConnectInfo.java @@ -5,95 +5,112 @@ import static org.apache.commons.lang3.StringUtils.EMPTY; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.alibaba.datax.common.util.Configuration; + public class ServerConnectInfo { - - public String clusterName; - public String tenantName; - public String userName; - public String password; - public String databaseName; - public String ipPort; - public String jdbcUrl; - public boolean publicCloud; - /** - * - * @param jdbcUrl format is jdbc:oceanbase//ip:port - * @param username format is cluster:tenant:username or username@tenant#cluster or user@tenant or user - * @param password - */ - public ServerConnectInfo(final String jdbcUrl, final String username, final String password) { - this.jdbcUrl = jdbcUrl; - this.password = password; - parseJdbcUrl(jdbcUrl); - parseFullUserName(username); - } + public String clusterName; + public String tenantName; + // userName doesn't contain tenantName or clusterName + public String userName; + public String password; + public String databaseName; + public String ipPort; + public String jdbcUrl; + public String host; + public String port; + public boolean publicCloud; + public int rpcPort; + public Configuration config; - private void parseJdbcUrl(final String jdbcUrl) { - Pattern pattern = Pattern.compile("//([\\w\\.\\-]+:\\d+)/([\\w-]+)\\?"); - Matcher matcher = pattern.matcher(jdbcUrl); - if (matcher.find()) { - String ipPort = matcher.group(1); - String dbName = matcher.group(2); - this.ipPort = ipPort; - this.databaseName = dbName; - this.publicCloud = ipPort.split(":")[0].endsWith("aliyuncs.com"); - } else { - throw new RuntimeException("Invalid argument:" + jdbcUrl); - } - } + public ServerConnectInfo(final String jdbcUrl, final String username, final String password, Configuration config) { + this.jdbcUrl = jdbcUrl; + this.password = password; + this.config = config; + parseJdbcUrl(jdbcUrl); + parseFullUserName(username); + } - private void parseFullUserName(final String fullUserName) { - int tenantIndex = fullUserName.indexOf("@"); - int clusterIndex = fullUserName.indexOf("#"); - if (fullUserName.contains(":") && tenantIndex < 0) { - String[] names = fullUserName.split(":"); - if (names.length != 3) { - throw new RuntimeException("invalid argument: " + fullUserName); - } else { - this.clusterName = names[0]; - this.tenantName = names[1]; - this.userName = names[2]; - } - } else if (!publicCloud || tenantIndex < 0) { - this.userName = tenantIndex < 0 ? fullUserName : fullUserName.substring(0, tenantIndex); - this.clusterName = clusterIndex < 0 ? 
EMPTY : fullUserName.substring(clusterIndex + 1); - // Avoid reporting errors when users do not write # - this.tenantName = tenantIndex < 0 ? EMPTY : fullUserName.substring(tenantIndex + 1, clusterIndex < 0 ? fullUserName.length() : clusterIndex); - } else { - // If in public cloud, the username with format user@tenant#cluster should be parsed, otherwise, connection can't be created. - this.userName = fullUserName.substring(0, tenantIndex); - if (clusterIndex > tenantIndex) { - this.tenantName = fullUserName.substring(tenantIndex + 1, clusterIndex); - this.clusterName = fullUserName.substring(clusterIndex + 1); - } else { - this.tenantName = fullUserName.substring(tenantIndex + 1); - this.clusterName = EMPTY; - } - } - } + private void parseJdbcUrl(final String jdbcUrl) { + Pattern pattern = Pattern.compile("//([\\w\\.\\-]+:\\d+)/([\\w]+)\\?"); + Matcher matcher = pattern.matcher(jdbcUrl); + if (matcher.find()) { + String ipPort = matcher.group(1); + String dbName = matcher.group(2); + this.ipPort = ipPort; + this.host = ipPort.split(":")[0]; + this.port = ipPort.split(":")[1]; + this.databaseName = dbName; + this.publicCloud = host.endsWith("aliyuncs.com"); + } else { + throw new RuntimeException("Invalid argument:" + jdbcUrl); + } + } - @Override - public String toString() { - StringBuffer strBuffer = new StringBuffer(); - return strBuffer.append("clusterName:").append(clusterName).append(", tenantName:").append(tenantName) - .append(", userName:").append(userName).append(", databaseName:").append(databaseName) - .append(", ipPort:").append(ipPort).append(", jdbcUrl:").append(jdbcUrl).toString(); - } + protected void parseFullUserName(final String fullUserName) { + int tenantIndex = fullUserName.indexOf("@"); + int clusterIndex = fullUserName.indexOf("#"); + // 适用于jdbcUrl以||_dsc_ob10_dsc_开头的场景 + if (fullUserName.contains(":") && tenantIndex < 0) { + String[] names = fullUserName.split(":"); + if (names.length != 3) { + throw new RuntimeException("invalid argument: " + fullUserName); + } else { + this.clusterName = names[0]; + this.tenantName = names[1]; + this.userName = names[2]; + } + } else if (tenantIndex < 0) { + // 适用于short jdbcUrl,且username中不含租户名(主要是公有云场景,此场景下不计算分区) + this.userName = fullUserName; + this.clusterName = EMPTY; + this.tenantName = EMPTY; + } else { + // 适用于short jdbcUrl,且username中含租户名 + this.userName = fullUserName.substring(0, tenantIndex); + if (clusterIndex < 0) { + this.clusterName = EMPTY; + this.tenantName = fullUserName.substring(tenantIndex + 1); + } else { + this.clusterName = fullUserName.substring(clusterIndex + 1); + this.tenantName = fullUserName.substring(tenantIndex + 1, clusterIndex); + } + } + } - public String getFullUserName() { - StringBuilder builder = new StringBuilder(); - builder.append(userName); - if (!EMPTY.equals(tenantName)) { - builder.append("@").append(tenantName); - } + @Override + public String toString() { + return "ServerConnectInfo{" + + "clusterName='" + clusterName + '\'' + + ", tenantName='" + tenantName + '\'' + + ", userName='" + userName + '\'' + + ", password='" + password + '\'' + + ", databaseName='" + databaseName + '\'' + + ", ipPort='" + ipPort + '\'' + + ", jdbcUrl='" + jdbcUrl + '\'' + + ", host='" + host + '\'' + + ", publicCloud=" + publicCloud + + ", rpcPort=" + rpcPort + + '}'; + } - if (!EMPTY.equals(clusterName)) { - builder.append("#").append(clusterName); - } - if (EMPTY.equals(this.clusterName) && EMPTY.equals(this.tenantName)) { - return this.userName; - } - return builder.toString(); - } + public String 
getFullUserName() { + StringBuilder builder = new StringBuilder(); + builder.append(userName); + if (!EMPTY.equals(tenantName)) { + builder.append("@").append(tenantName); + } + + if (!EMPTY.equals(clusterName)) { + builder.append("#").append(clusterName); + } + if (EMPTY.equals(this.clusterName) && EMPTY.equals(this.tenantName)) { + return this.userName; + } + return builder.toString(); + } + + public void setRpcPort(int rpcPort) { + this.rpcPort = rpcPort; + } } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/AbstractInsertTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/AbstractInsertTask.java new file mode 100644 index 00000000..d4f215e1 --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/AbstractInsertTask.java @@ -0,0 +1,127 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.task; + +import java.util.List; +import java.util.Queue; +import java.util.concurrent.TimeUnit; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.AbstractConnHolder; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractInsertTask implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(AbstractInsertTask.class); + protected final long taskId; + protected ConcurrentTableWriterTask writerTask; + protected ConcurrentTableWriterTask.ConcurrentTableWriter writer; + protected Queue> queue; + protected boolean isStop; + protected Configuration config; + protected ServerConnectInfo connInfo; + protected AbstractConnHolder connHolder; + protected long totalCost = 0; + protected long insertCount = 0; + private boolean printCost = Config.DEFAULT_PRINT_COST; + private long costBound = Config.DEFAULT_COST_BOUND; + + public AbstractInsertTask(final long taskId, Queue> recordsQueue, Configuration config, ServerConnectInfo connectInfo, ConcurrentTableWriterTask task, ConcurrentTableWriterTask.ConcurrentTableWriter writer) { + this.taskId = taskId; + this.queue = recordsQueue; + this.config = config; + this.connInfo = connectInfo; + this.isStop = false; + this.printCost = config.getBool(Config.PRINT_COST, Config.DEFAULT_PRINT_COST); + this.costBound = config.getLong(Config.COST_BOUND, Config.DEFAULT_COST_BOUND); + this.writer = writer; + this.writerTask = task; + initConnHolder(); + } + + public AbstractInsertTask(final long taskId, Queue> recordsQueue, Configuration config, ServerConnectInfo connectInfo) { + this.taskId = taskId; + this.queue = recordsQueue; + this.config = config; + this.connInfo = connectInfo; + this.isStop = false; + this.printCost = config.getBool(Config.PRINT_COST, Config.DEFAULT_PRINT_COST); + this.costBound = config.getLong(Config.COST_BOUND, Config.DEFAULT_COST_BOUND); + initConnHolder(); + } + + protected abstract void initConnHolder(); + + public void setWriterTask(ConcurrentTableWriterTask writerTask) { + this.writerTask = writerTask; + } + + public void setWriter(ConcurrentTableWriterTask.ConcurrentTableWriter writer) { + this.writer = writer; + } + + private boolean isStop() { + return isStop; + } + + public void setStop() { + isStop = true; + } + + public AbstractConnHolder getConnHolder() { + return 
connHolder; + } + + public void calStatistic(final long cost) { + writer.increFinishCount(); + insertCount++; + totalCost += cost; + if (this.printCost && cost > this.costBound) { + LOG.info("slow multi insert cost {}ms", cost); + } + } + + @Override + public void run() { + Thread.currentThread().setName(String.format("%d-insertTask-%d", taskId, Thread.currentThread().getId())); + LOG.debug("Task {} start to execute...", taskId); + while (!isStop()) { + try { + List records = queue.poll(); + if (null != records) { + write(records); + } else if (writerTask.isFinished()) { + writerTask.singalTaskFinish(); + LOG.debug("not more task, thread exist ..."); + break; + } else { + TimeUnit.MILLISECONDS.sleep(5); + } + } catch (InterruptedException e) { + LOG.debug("TableWriter is interrupt"); + } catch (Exception e) { + LOG.warn("ERROR UNEXPECTED ", e); + break; + } + } + LOG.debug("Thread exist..."); + } + + protected abstract void write(List records); + + public long getTotalCost() { + return totalCost; + } + + public long getInsertCount() { + return insertCount; + } + + public void destroy() { + if (connHolder != null) { + connHolder.destroy(); + } + } +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java index 8867f3e2..6ab6753a 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java @@ -18,7 +18,9 @@ import com.alibaba.datax.plugin.writer.oceanbasev10writer.part.IObPartCalculator import com.alibaba.datax.plugin.writer.oceanbasev10writer.part.ObPartitionCalculatorV1; import com.alibaba.datax.plugin.writer.oceanbasev10writer.part.ObPartitionCalculatorV2; import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils; + import com.oceanbase.partition.calculator.enums.ObServerMode; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -34,8 +36,10 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import static com.alibaba.datax.plugin.writer.oceanbasev10writer.Config.DEFAULT_SLOW_MEMSTORE_THRESHOLD; import static com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils.LoadMode.FAST; import static com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils.LoadMode.PAUSE; @@ -44,15 +48,15 @@ import static com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUt public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { private static final Logger LOG = LoggerFactory.getLogger(ConcurrentTableWriterTask.class); - // memstore_total 与 memstore_limit 比例的阈值,一旦超过这个值,则暂停写入 - private double memstoreThreshold = Config.DEFAULT_MEMSTORE_THRESHOLD; - // memstore检查的间隔 - private long memstoreCheckIntervalSecond = Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND; - // 最后一次检查 - private long lastCheckMemstoreTime; + // memstore_total 与 memstore_limit 比例的阈值,一旦超过这个值,则暂停写入 + private double memstoreThreshold = Config.DEFAULT_MEMSTORE_THRESHOLD; + // memstore检查的间隔 + private long memstoreCheckIntervalSecond = 
Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND; + // 最后一次检查 + private long lastCheckMemstoreTime; + + private volatile ObWriterUtils.LoadMode loadMode = FAST; - private volatile ObWriterUtils.LoadMode loadMode = FAST; - private static AtomicLong totalTask = new AtomicLong(0); private long taskId = -1; private AtomicBoolean isMemStoreFull = new AtomicBoolean(false); @@ -69,38 +73,41 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { private String obUpdateColumns = null; private String dbName; private int calPartFailedCount = 0; + private boolean directPath; - public ConcurrentTableWriterTask(DataBaseType dataBaseType) { - super(dataBaseType); - taskId = totalTask.getAndIncrement(); - } + public ConcurrentTableWriterTask(DataBaseType dataBaseType) { + super(dataBaseType); + taskId = totalTask.getAndIncrement(); + } - @Override - public void init(Configuration config) { - super.init(config); - // OceanBase 所有操作都是 insert into on duplicate key update 模式 - // writeMode应该使用enum来定义 - this.writeMode = "update"; + @Override + public void init(Configuration config) { + super.init(config); + this.directPath = config.getBool(Config.DIRECT_PATH, false); + // OceanBase 所有操作都是 insert into on duplicate key update 模式 + // writeMode应该使用enum来定义 + this.writeMode = "update"; obWriteMode = config.getString(Config.OB_WRITE_MODE, "update"); - ServerConnectInfo connectInfo = new ServerConnectInfo(jdbcUrl, username, password); - dbName = connectInfo.databaseName; - //init check memstore - this.memstoreThreshold = config.getDouble(Config.MEMSTORE_THRESHOLD, Config.DEFAULT_MEMSTORE_THRESHOLD); - this.memstoreCheckIntervalSecond = config.getLong(Config.MEMSTORE_CHECK_INTERVAL_SECOND, - Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND); + ServerConnectInfo connectInfo = new ServerConnectInfo(jdbcUrl, username, password, config); + connectInfo.setRpcPort(config.getInt(Config.RPC_PORT, 0)); + dbName = connectInfo.databaseName; + //init check memstore + this.memstoreThreshold = config.getDouble(Config.MEMSTORE_THRESHOLD, Config.DEFAULT_MEMSTORE_THRESHOLD); + this.memstoreCheckIntervalSecond = config.getLong(Config.MEMSTORE_CHECK_INTERVAL_SECOND, + Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND); - this.connHolder = new ObClientConnHolder(config, connectInfo.jdbcUrl, - connectInfo.getFullUserName(), connectInfo.password); - this.isOracleCompatibleMode = ObWriterUtils.isOracleMode(); - if (isOracleCompatibleMode) { - connectInfo.databaseName = connectInfo.databaseName.toUpperCase(); - //在转义的情况下不翻译 - if (!(table.startsWith("\"") && table.endsWith("\""))) { - table = table.toUpperCase(); - } + this.connHolder = new ObClientConnHolder(config, connectInfo.jdbcUrl, + connectInfo.getFullUserName(), connectInfo.password); + this.isOracleCompatibleMode = ObWriterUtils.isOracleMode(); + if (isOracleCompatibleMode) { + connectInfo.databaseName = connectInfo.databaseName.toUpperCase(); + //在转义的情况下不翻译 + if (!(table.startsWith("\"") && table.endsWith("\""))) { + table = table.toUpperCase(); + } - LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s", - connectInfo.databaseName, table)); + LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s", + connectInfo.databaseName, table)); } if (config.getBool(Config.USE_PART_CALCULATOR, Config.DEFAULT_USE_PART_CALCULATOR)) { @@ -135,37 +142,37 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { return new ObPartitionCalculatorV1(connectInfo, table, columns); } - public boolean 
isFinished() { - return allTaskInQueue && concurrentWriter.checkFinish(); - } - - public boolean allTaskInQueue() { - return allTaskInQueue; - } - - public void setPutAllTaskInQueue() { - this.allTaskInQueue = true; - LOG.info("ConcurrentTableWriter has put all task in queue, queueSize = {}, total = {}, finished = {}", - concurrentWriter.getTaskQueueSize(), - concurrentWriter.getTotalTaskCount(), - concurrentWriter.getFinishTaskCount()); - } - - private void rewriteSql() { - Connection conn = connHolder.initConnection(); - if (isOracleCompatibleMode && obWriteMode.equalsIgnoreCase("update")) { - // change obWriteMode to insert so the insert statement will be generated. - obWriteMode = "insert"; - } - this.writeRecordSql = ObWriterUtils.buildWriteSql(table, columns, conn, obWriteMode, obUpdateColumns); - LOG.info("writeRecordSql :{}", this.writeRecordSql); - } + public boolean isFinished() { + return allTaskInQueue && concurrentWriter.checkFinish(); + } + + public boolean allTaskInQueue() { + return allTaskInQueue; + } + + public void setPutAllTaskInQueue() { + this.allTaskInQueue = true; + LOG.info("ConcurrentTableWriter has put all task in queue, queueSize = {}, total = {}, finished = {}", + concurrentWriter.getTaskQueueSize(), + concurrentWriter.getTotalTaskCount(), + concurrentWriter.getFinishTaskCount()); + } + + private void rewriteSql() { + Connection conn = connHolder.initConnection(); + if (isOracleCompatibleMode && obWriteMode.equalsIgnoreCase("update")) { + // change obWriteMode to insert so the insert statement will be generated. + obWriteMode = "insert"; + } + this.writeRecordSql = ObWriterUtils.buildWriteSql(table, columns, conn, obWriteMode, obUpdateColumns); + LOG.info("writeRecordSql :{}", this.writeRecordSql); + } @Override - public void prepare(Configuration writerSliceConfig) { - super.prepare(writerSliceConfig); - concurrentWriter.start(); - } + public void prepare(Configuration writerSliceConfig) { + super.prepare(writerSliceConfig); + concurrentWriter.start(); + } @Override public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) { @@ -175,25 +182,25 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { int retryTimes = 0; boolean needRetry = false; do { - try { - if (retryTimes > 0) { - TimeUnit.SECONDS.sleep((1 << retryTimes)); - DBUtil.closeDBResources(null, connection); - connection = DBUtil.getConnection(dataBaseType, jdbcUrl, username, password); - LOG.warn("getColumnMetaData of table {} failed, retry the {} times ...", this.table, retryTimes); - } - ColumnMetaCache.init(connection, this.table, this.columns); - this.resultSetMetaData = ColumnMetaCache.getColumnMeta(); - needRetry = false; - } catch (SQLException e) { - needRetry = true; - ++retryTimes; - e.printStackTrace(); - LOG.warn("fetch column meta of [{}] failed..., retry {} times", this.table, retryTimes); - } catch (InterruptedException e) { - LOG.warn("startWriteWithConnection interrupt, ignored"); - } finally { - } + try { + if (retryTimes > 0) { + TimeUnit.SECONDS.sleep((1 << retryTimes)); + DBUtil.closeDBResources(null, connection); + connection = DBUtil.getConnection(dataBaseType, jdbcUrl, username, password); + LOG.warn("getColumnMetaData of table {} failed, retry the {} times ...", this.table, retryTimes); + } + ColumnMetaCache.init(connection, this.table, this.columns); + this.resultSetMetaData = ColumnMetaCache.getColumnMeta(); + needRetry = false; + } catch (SQLException e) { + needRetry = true; + 
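+                        // Mark this attempt as failed: the outer loop sleeps 2^retryTimes seconds,
+                        // reconnects, and retries the column-meta fetch (at most 100 attempts).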
++retryTimes; + e.printStackTrace(); + LOG.warn("fetch column meta of [{}] failed..., retry {} times", this.table, retryTimes); + } catch (InterruptedException e) { + LOG.warn("startWriteWithConnection interrupt, ignored"); + } finally { + } } while (needRetry && retryTimes < 100); try { @@ -202,8 +209,8 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { while ((record = recordReceiver.getFromReader()) != null) { if (record.getColumnNumber() != this.columnNumber) { // 源头读取字段列数与目的表字段写入列数不相等,直接报错 - LOG.error("column not equal {} != {}, record = {}", - this.columnNumber, record.getColumnNumber(), record.toString()); + LOG.error("column not equal {} != {}, record = {}", + this.columnNumber, record.getColumnNumber(), record.toString()); throw DataXException .asDataXException( DBUtilErrorCode.CONF_ERROR, @@ -223,388 +230,406 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { } } - public PreparedStatement fillStatement(PreparedStatement preparedStatement, Record record) - throws SQLException { - return fillPreparedStatement(preparedStatement, record); - } + public PreparedStatement fillStatement(PreparedStatement preparedStatement, Record record) + throws SQLException { + return fillPreparedStatement(preparedStatement, record); + } - private void addLeftRecords() { - //不需要刷新Cache,已经是最后一批数据了 - for (List groupValues : groupInsertValues.values()) { - if (groupValues.size() > 0 ) { - addRecordsToWriteQueue(groupValues); - } - } - } - - private void addRecordToCache(final Record record) { - Long partId =null; - try { - partId = obPartCalculator == null ? Long.MAX_VALUE : obPartCalculator.calculate(record); - } catch (Exception e1) { - if (calPartFailedCount++ < 10) { - LOG.warn("fail to get partition id: " + e1.getMessage() + ", record: " + record); - } - } + private void addLeftRecords() { + //不需要刷新Cache,已经是最后一批数据了 + for (List groupValues : groupInsertValues.values()) { + if (groupValues.size() > 0) { + addRecordsToWriteQueue(groupValues); + } + } + } + + private void addRecordToCache(final Record record) { + Long partId = null; + try { + partId = obPartCalculator == null ? 
Long.MAX_VALUE : obPartCalculator.calculate(record); + } catch (Exception e1) { + if (calPartFailedCount++ < 10) { + LOG.warn("fail to get partition id: " + e1.getMessage() + ", record: " + record); + } + } if (partId == null) { LOG.debug("fail to calculate parition id, just put into the default buffer."); partId = Long.MAX_VALUE; } - List groupValues = groupInsertValues.computeIfAbsent(partId, k -> new ArrayList(batchSize)); - groupValues.add(record); - if (groupValues.size() >= batchSize) { - groupValues = addRecordsToWriteQueue(groupValues); - groupInsertValues.put(partId, groupValues); - } - } + List groupValues = groupInsertValues.computeIfAbsent(partId, k -> new ArrayList(batchSize)); + groupValues.add(record); + if (groupValues.size() >= batchSize) { + groupValues = addRecordsToWriteQueue(groupValues); + groupInsertValues.put(partId, groupValues); + } + } - /** - * - * @param records - * @return 返回一个新的Cache用于存储接下来的数据 - */ - private List addRecordsToWriteQueue(List records) { - int i = 0; - while (true) { - if (i > 0) { - LOG.info("retry add batch record the {} times", i); - } - try { - concurrentWriter.addBatchRecords(records); - break; - } catch (InterruptedException e) { - i++; - LOG.info("Concurrent table writer is interrupted"); - } - } - return new ArrayList(batchSize); - } - private void checkMemStore() { - Connection checkConn = connHolder.getConn(); - try { - if (checkConn == null || checkConn.isClosed()) { - checkConn = connHolder.reconnect(); - } - }catch (Exception e) { - LOG.warn("Check connection is unusable"); - } + /** + * @param records + * @return 返回一个新的Cache用于存储接下来的数据 + */ + private List addRecordsToWriteQueue(List records) { + int i = 0; + while (true) { + if (i > 0) { + LOG.info("retry add batch record the {} times", i); + } + try { + concurrentWriter.addBatchRecords(records); + break; + } catch (InterruptedException e) { + i++; + LOG.info("Concurrent table writer is interrupted"); + } + } + return new ArrayList(batchSize); + } - long now = System.currentTimeMillis(); - if (now - lastCheckMemstoreTime < 1000 * memstoreCheckIntervalSecond) { - return; - } - double memUsedRatio = ObWriterUtils.queryMemUsedRatio(checkConn); - if (memUsedRatio >= DEFAULT_SLOW_MEMSTORE_THRESHOLD) { - this.loadMode = memUsedRatio >= memstoreThreshold ? PAUSE : SLOW; - LOG.info("Memstore used ration is {}. Load data {}", memUsedRatio, loadMode.name()); - }else { - this.loadMode = FAST; - } - lastCheckMemstoreTime = now; - } - - public boolean isMemStoreFull() { - return isMemStoreFull.get(); - } + private void checkMemStore() { + Connection checkConn = connHolder.getConn(); + try { + if (checkConn == null || checkConn.isClosed()) { + checkConn = connHolder.reconnect(); + } + } catch (Exception e) { + LOG.warn("Check connection is unusable"); + } - public boolean isShouldPause() { - return this.loadMode.equals(PAUSE); - } + long now = System.currentTimeMillis(); + if (now - lastCheckMemstoreTime < 1000 * memstoreCheckIntervalSecond) { + return; + } + double memUsedRatio = ObWriterUtils.queryMemUsedRatio(checkConn); + if (memUsedRatio >= DEFAULT_SLOW_MEMSTORE_THRESHOLD) { + this.loadMode = memUsedRatio >= memstoreThreshold ? PAUSE : SLOW; + LOG.info("Memstore used ration is {}. 
Load data {}", memUsedRatio, loadMode.name()); + } else { + this.loadMode = FAST; + } + lastCheckMemstoreTime = now; + } - public boolean isShouldSlow() { - return this.loadMode.equals(SLOW); - } - - public void print() { - if (LOG.isDebugEnabled()) { - LOG.debug("Statistic total task {}, finished {}, queue Size {}", - concurrentWriter.getTotalTaskCount(), - concurrentWriter.getFinishTaskCount(), - concurrentWriter.getTaskQueueSize()); - concurrentWriter.printStatistics(); - } - } - - public void waitTaskFinish() { - setPutAllTaskInQueue(); - lock.lock(); - try { - while (!concurrentWriter.checkFinish()) { - condition.await(15, TimeUnit.SECONDS); - print(); - checkMemStore(); - } - } catch (InterruptedException e) { - LOG.warn("Concurrent table writer wait task finish interrupt"); - } finally { - lock.unlock(); - } - LOG.debug("wait all InsertTask finished ..."); - } - - public void singalTaskFinish() { - lock.lock(); - condition.signal(); - lock.unlock(); - } - - @Override - public void destroy(Configuration writerSliceConfig) { - if(concurrentWriter!=null) { - concurrentWriter.destory(); - } - // 把本级持有的conn关闭掉 - DBUtil.closeDBResources(null, connHolder.getConn()); - super.destroy(writerSliceConfig); - } - - public class ConcurrentTableWriter { - private BlockingQueue> queue; - private List insertTasks; - private Configuration config; - private ServerConnectInfo connectInfo; - private String rewriteRecordSql; - private AtomicLong totalTaskCount; - private AtomicLong finishTaskCount; - private final int threadCount; + public boolean isMemStoreFull() { + return isMemStoreFull.get(); + } - public ConcurrentTableWriter(Configuration config, ServerConnectInfo connInfo, String rewriteRecordSql) { - threadCount = config.getInt(Config.WRITER_THREAD_COUNT, Config.DEFAULT_WRITER_THREAD_COUNT); - queue = new LinkedBlockingQueue>(threadCount << 1); - insertTasks = new ArrayList(threadCount); - this.config = config; - this.connectInfo = connInfo; - this.rewriteRecordSql = rewriteRecordSql; - this.totalTaskCount = new AtomicLong(0); - this.finishTaskCount = new AtomicLong(0); - } - - public long getTotalTaskCount() { - return totalTaskCount.get(); - } - - public long getFinishTaskCount() { - return finishTaskCount.get(); - } - - public int getTaskQueueSize() { - return queue.size(); - } - - public void increFinishCount() { - finishTaskCount.incrementAndGet(); - } - - //should check after put all the task in the queue - public boolean checkFinish() { - long finishCount = finishTaskCount.get(); - long totalCount = totalTaskCount.get(); - return finishCount == totalCount; - } - - public synchronized void start() { - for (int i = 0; i < threadCount; ++i) { - LOG.info("start {} insert task.", (i+1)); - InsertTask insertTask = new InsertTask(taskId, queue, config, connectInfo, rewriteRecordSql); - insertTask.setWriterTask(ConcurrentTableWriterTask.this); - insertTask.setWriter(this); - insertTasks.add(insertTask); - } - WriterThreadPool.executeBatch(insertTasks); - } - - public void printStatistics() { - long insertTotalCost = 0; - long insertTotalCount = 0; - for (InsertTask task: insertTasks) { - insertTotalCost += task.getTotalCost(); - insertTotalCount += task.getInsertCount(); - } - long avgCost = 0; - if (insertTotalCount != 0) { - avgCost = insertTotalCost / insertTotalCount; - } - ConcurrentTableWriterTask.LOG.debug("Insert {} times, totalCost {} ms, average {} ms", - insertTotalCount, insertTotalCost, avgCost); - } + public boolean isShouldPause() { + return this.loadMode.equals(PAUSE); + } - public 
void addBatchRecords(final List records) throws InterruptedException { - boolean isSucc = false; - while (!isSucc) { - isSucc = queue.offer(records, 5, TimeUnit.MILLISECONDS); - checkMemStore(); - } - totalTaskCount.incrementAndGet(); - } - - public synchronized void destory() { - if (insertTasks != null) { - for(InsertTask task : insertTasks) { - task.setStop(); - } - for(InsertTask task: insertTasks) { - task.destroy(); - } - } - } - } + public boolean isShouldSlow() { + return this.loadMode.equals(SLOW); + } - // 直接使用了两个类变量:columnNumber,resultSetMetaData - protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record) - throws SQLException { - for (int i = 0; i < this.columnNumber; i++) { - int columnSqltype = this.resultSetMetaData.getMiddle().get(i); - String typeName = this.resultSetMetaData.getRight().get(i); - preparedStatement = fillPreparedStatementColumnType(preparedStatement, i, columnSqltype, typeName, record.getColumn(i)); - } + public void print() { + if (LOG.isDebugEnabled()) { + LOG.debug("Statistic total task {}, finished {}, queue Size {}", + concurrentWriter.getTotalTaskCount(), + concurrentWriter.getFinishTaskCount(), + concurrentWriter.getTaskQueueSize()); + concurrentWriter.printStatistics(); + } + } - return preparedStatement; - } + public void waitTaskFinish() { + setPutAllTaskInQueue(); + lock.lock(); + try { + while (!concurrentWriter.checkFinish()) { + condition.await(15, TimeUnit.SECONDS); + print(); + checkMemStore(); + } + concurrentWriter.doCommit(); + } catch (InterruptedException e) { + LOG.warn("Concurrent table writer wait task finish interrupt"); + } finally { + lock.unlock(); + } + LOG.debug("wait all InsertTask finished ..."); + } - protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, - int columnSqltype, String typeName, Column column) throws SQLException { - java.util.Date utilDate; - switch (columnSqltype) { - case Types.CHAR: - case Types.NCHAR: - case Types.CLOB: - case Types.NCLOB: - case Types.VARCHAR: - case Types.LONGVARCHAR: - case Types.NVARCHAR: - case Types.LONGNVARCHAR: - preparedStatement.setString(columnIndex + 1, column - .asString()); - break; + public void singalTaskFinish() { + lock.lock(); + condition.signal(); + lock.unlock(); + } - case Types.SMALLINT: - case Types.INTEGER: - case Types.BIGINT: - case Types.NUMERIC: - case Types.DECIMAL: - case Types.FLOAT: - case Types.REAL: - case Types.DOUBLE: - String strValue = column.asString(); - if (emptyAsNull && "".equals(strValue)) { - preparedStatement.setString(columnIndex + 1, null); - } else { - preparedStatement.setString(columnIndex + 1, strValue); - } - break; + @Override + public void destroy(Configuration writerSliceConfig) { + if (concurrentWriter != null) { + concurrentWriter.destory(); + } + // 把本级持有的conn关闭掉 + DBUtil.closeDBResources(null, connHolder.getConn()); + super.destroy(writerSliceConfig); + } - //tinyint is a little special in some database like mysql {boolean->tinyint(1)} - case Types.TINYINT: - Long longValue = column.asLong(); - if (null == longValue) { - preparedStatement.setString(columnIndex + 1, null); - } else { - preparedStatement.setString(columnIndex + 1, longValue.toString()); - } - break; + public class ConcurrentTableWriter { + private BlockingQueue> queue; + private List abstractInsertTasks; + private Configuration config; + private ServerConnectInfo connectInfo; + private String rewriteRecordSql; + private AtomicLong totalTaskCount; + private 
AtomicLong finishTaskCount; + private final int threadCount; - // for mysql bug, see http://bugs.mysql.com/bug.php?id=35115 - case Types.DATE: - if (typeName == null) { - typeName = this.resultSetMetaData.getRight().get(columnIndex); - } + public ConcurrentTableWriter(Configuration config, ServerConnectInfo connInfo, String rewriteRecordSql) { + threadCount = config.getInt(Config.WRITER_THREAD_COUNT, Config.DEFAULT_WRITER_THREAD_COUNT); + queue = new LinkedBlockingQueue>(threadCount << 1); + abstractInsertTasks = new ArrayList(threadCount); + this.config = config; + this.connectInfo = connInfo; + this.rewriteRecordSql = rewriteRecordSql; + this.totalTaskCount = new AtomicLong(0); + this.finishTaskCount = new AtomicLong(0); + } - if (typeName.equalsIgnoreCase("year")) { - if (column.asBigInteger() == null) { - preparedStatement.setString(columnIndex + 1, null); - } else { - preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); - } - } else { - java.sql.Date sqlDate = null; - try { - utilDate = column.asDate(); - } catch (DataXException e) { - throw new SQLException(String.format( - "Date 类型转换错误:[%s]", column)); - } + public long getTotalTaskCount() { + return totalTaskCount.get(); + } - if (null != utilDate) { - sqlDate = new java.sql.Date(utilDate.getTime()); - } - preparedStatement.setDate(columnIndex + 1, sqlDate); - } - break; + public long getFinishTaskCount() { + return finishTaskCount.get(); + } - case Types.TIME: - java.sql.Time sqlTime = null; - try { - utilDate = column.asDate(); - } catch (DataXException e) { - throw new SQLException(String.format( - "TIME 类型转换错误:[%s]", column)); - } + public int getTaskQueueSize() { + return queue.size(); + } - if (null != utilDate) { - sqlTime = new java.sql.Time(utilDate.getTime()); - } - preparedStatement.setTime(columnIndex + 1, sqlTime); - break; + public void increFinishCount() { + finishTaskCount.incrementAndGet(); + } - case Types.TIMESTAMP: - java.sql.Timestamp sqlTimestamp = null; - try { - utilDate = column.asDate(); - } catch (DataXException e) { - throw new SQLException(String.format( - "TIMESTAMP 类型转换错误:[%s]", column)); - } + //should check after put all the task in the queue + public boolean checkFinish() { + long finishCount = finishTaskCount.get(); + long totalCount = totalTaskCount.get(); + return finishCount == totalCount; + } - if (null != utilDate) { - sqlTimestamp = new java.sql.Timestamp( - utilDate.getTime()); - } - preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); - break; - case Types.VARBINARY: - case Types.BLOB: - case Types.LONGVARBINARY: - preparedStatement.setBytes(columnIndex + 1, column - .asBytes()); - break; - case Types.BINARY: - String isArray = column.getRawData().toString(); - if (isArray.startsWith("[")&&isArray.endsWith("]")){ - preparedStatement.setString(columnIndex + 1, column - .asString()); - }else { - preparedStatement.setBytes(columnIndex + 1, column - .asBytes()); - } - break; - case Types.BOOLEAN: - preparedStatement.setBoolean(columnIndex + 1, column.asBoolean()); - break; + public synchronized void start() { + for (int i = 0; i < threadCount; ++i) { + LOG.info("start {} insert task.", (i + 1)); + AbstractInsertTask insertTask = null; + if (directPath) { + insertTask = new DirectPathInsertTask(taskId, queue, config, connectInfo, ConcurrentTableWriterTask.this, this); + } else { + insertTask = new InsertTask(taskId, queue, config, connectInfo, rewriteRecordSql); + } + insertTask.setWriterTask(ConcurrentTableWriterTask.this); + insertTask.setWriter(this); + 
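+                    // Direct-path tasks funnel rows into one DirectPathConnection shared per table
+                    // (cached by DirectPathConnHolder), while plain InsertTasks execute the
+                    // rewritten insert SQL over JDBC.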
abstractInsertTasks.add(insertTask); + } + WriterThreadPool.executeBatch(abstractInsertTasks); + } - // warn: bit(1) -> Types.BIT 可使用setBoolean - // warn: bit(>1) -> Types.VARBINARY 可使用setBytes - case Types.BIT: - if (this.dataBaseType == DataBaseType.MySql) { - preparedStatement.setBoolean(columnIndex + 1, column.asBoolean()); - } else { - preparedStatement.setString(columnIndex + 1, column.asString()); - } - break; - default: - throw DataXException - .asDataXException( - DBUtilErrorCode.UNSUPPORTED_TYPE, - String.format( - "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.", - this.resultSetMetaData.getLeft() - .get(columnIndex), - this.resultSetMetaData.getMiddle() - .get(columnIndex), - this.resultSetMetaData.getRight() - .get(columnIndex))); - } - return preparedStatement; - } + public void doCommit() { + this.abstractInsertTasks.get(0).getConnHolder().doCommit(); + } + + public int getThreadCount() { + return threadCount; + } + + public void printStatistics() { + long insertTotalCost = 0; + long insertTotalCount = 0; + for (AbstractInsertTask task : abstractInsertTasks) { + insertTotalCost += task.getTotalCost(); + insertTotalCount += task.getInsertCount(); + } + long avgCost = 0; + if (insertTotalCount != 0) { + avgCost = insertTotalCost / insertTotalCount; + } + ConcurrentTableWriterTask.LOG.debug("Insert {} times, totalCost {} ms, average {} ms", + insertTotalCount, insertTotalCost, avgCost); + } + + public void addBatchRecords(final List records) throws InterruptedException { + boolean isSucc = false; + while (!isSucc) { + isSucc = queue.offer(records, 5, TimeUnit.MILLISECONDS); + checkMemStore(); + } + totalTaskCount.incrementAndGet(); + } + + public synchronized void destory() { + if (abstractInsertTasks != null) { + for (AbstractInsertTask task : abstractInsertTasks) { + task.setStop(); + } + for (AbstractInsertTask task : abstractInsertTasks) { + task.destroy(); + } + } + } + } + + public String getTable() { + return table; + } + + // 直接使用了两个类变量:columnNumber,resultSetMetaData + protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record) + throws SQLException { + for (int i = 0; i < this.columnNumber; i++) { + int columnSqltype = this.resultSetMetaData.getMiddle().get(i); + String typeName = this.resultSetMetaData.getRight().get(i); + preparedStatement = fillPreparedStatementColumnType(preparedStatement, i, columnSqltype, typeName, record.getColumn(i)); + } + + return preparedStatement; + } + + protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, + int columnSqltype, String typeName, Column column) throws SQLException { + java.util.Date utilDate; + switch (columnSqltype) { + case Types.CHAR: + case Types.NCHAR: + case Types.CLOB: + case Types.NCLOB: + case Types.VARCHAR: + case Types.LONGVARCHAR: + case Types.NVARCHAR: + case Types.LONGNVARCHAR: + preparedStatement.setString(columnIndex + 1, column + .asString()); + break; + + case Types.SMALLINT: + case Types.INTEGER: + case Types.BIGINT: + case Types.NUMERIC: + case Types.DECIMAL: + case Types.FLOAT: + case Types.REAL: + case Types.DOUBLE: + String strValue = column.asString(); + if (emptyAsNull && "".equals(strValue)) { + preparedStatement.setString(columnIndex + 1, null); + } else { + preparedStatement.setString(columnIndex + 1, strValue); + } + break; + + //tinyint is a little special in some database like mysql {boolean->tinyint(1)} + case Types.TINYINT: + Long longValue = 
column.asLong(); + if (null == longValue) { + preparedStatement.setString(columnIndex + 1, null); + } else { + preparedStatement.setString(columnIndex + 1, longValue.toString()); + } + break; + + // for mysql bug, see http://bugs.mysql.com/bug.php?id=35115 + case Types.DATE: + if (typeName == null) { + typeName = this.resultSetMetaData.getRight().get(columnIndex); + } + + if (typeName.equalsIgnoreCase("year")) { + if (column.asBigInteger() == null) { + preparedStatement.setString(columnIndex + 1, null); + } else { + preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); + } + } else { + java.sql.Date sqlDate = null; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "Date 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlDate = new java.sql.Date(utilDate.getTime()); + } + preparedStatement.setDate(columnIndex + 1, sqlDate); + } + break; + + case Types.TIME: + java.sql.Time sqlTime = null; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "TIME 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlTime = new java.sql.Time(utilDate.getTime()); + } + preparedStatement.setTime(columnIndex + 1, sqlTime); + break; + + case Types.TIMESTAMP: + java.sql.Timestamp sqlTimestamp = null; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "TIMESTAMP 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlTimestamp = new java.sql.Timestamp( + utilDate.getTime()); + } + preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); + break; + case Types.VARBINARY: + case Types.BLOB: + case Types.LONGVARBINARY: + preparedStatement.setBytes(columnIndex + 1, column + .asBytes()); + break; + case Types.BINARY: + String isArray = column.getRawData().toString(); + if (isArray.startsWith("[") && isArray.endsWith("]")) { + preparedStatement.setString(columnIndex + 1, column + .asString()); + } else { + preparedStatement.setBytes(columnIndex + 1, column + .asBytes()); + } + break; + case Types.BOOLEAN: + preparedStatement.setBoolean(columnIndex + 1, column.asBoolean()); + break; + + // warn: bit(1) -> Types.BIT 可使用setBoolean + // warn: bit(>1) -> Types.VARBINARY 可使用setBytes + case Types.BIT: + if (this.dataBaseType == DataBaseType.MySql) { + preparedStatement.setBoolean(columnIndex + 1, column.asBoolean()); + } else { + preparedStatement.setString(columnIndex + 1, column.asString()); + } + break; + default: + throw DataXException + .asDataXException( + DBUtilErrorCode.UNSUPPORTED_TYPE, + String.format( + "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 
请修改表中该字段的类型或者不同步该字段.",
+                                                this.resultSetMetaData.getLeft()
+                                                        .get(columnIndex),
+                                                this.resultSetMetaData.getMiddle()
+                                                        .get(columnIndex),
+                                                this.resultSetMetaData.getRight()
+                                                        .get(columnIndex)));
+        }
+        return preparedStatement;
+    }
 }
diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/DirectPathInsertTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/DirectPathInsertTask.java
new file mode 100644
index 00000000..e39ec0c7
--- /dev/null
+++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/DirectPathInsertTask.java
@@ -0,0 +1,68 @@
+package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
+
+import java.text.MessageFormat;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.writer.oceanbasev10writer.common.Table;
+import com.alibaba.datax.plugin.writer.oceanbasev10writer.common.TableCache;
+import com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath.DirectPathConnection;
+import com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath.DirectPathPreparedStatement;
+import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.DirectPathConnHolder;
+import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DirectPathInsertTask extends AbstractInsertTask {
+    private static final Logger LOG = LoggerFactory.getLogger(DirectPathInsertTask.class);
+
+    public DirectPathInsertTask(long taskId, Queue<List<Record>> recordsQueue, Configuration config, ServerConnectInfo connectInfo, ConcurrentTableWriterTask task, ConcurrentTableWriterTask.ConcurrentTableWriter writer) {
+        super(taskId, recordsQueue, config, connectInfo, task, writer);
+    }
+
+    @Override
+    protected void initConnHolder() {
+        this.connHolder = new DirectPathConnHolder(config, connInfo, writerTask.getTable(), writer.getThreadCount());
+        this.connHolder.initConnection();
+    }
+
+    @Override
+    protected void write(List<Record> records) {
+        Table table = TableCache.getInstance().getTable(connInfo.databaseName, writerTask.getTable());
+        if (Table.Status.FAILURE.equals(table.getStatus())) {
+            return;
+        }
+        DirectPathConnection conn = (DirectPathConnection) connHolder.getConn();
+        if (records != null && !records.isEmpty()) {
+            long startTime = System.currentTimeMillis();
+            try (DirectPathPreparedStatement stmt = conn.createStatement()) {
+                final int columnNumber = records.get(0).getColumnNumber();
+                Object[] values = new Object[columnNumber];
+                for (Record record : records) {
+                    for (int i = 0; i < columnNumber; i++) {
+                        values[i] = record.getColumn(i).getRawData();
+                    }
+                    stmt.addBatch(values);
+                }
+
+                int[] result = stmt.executeBatch();
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[{}] Insert {} rows success", Thread.currentThread().getName(), Arrays.stream(result).sum());
+                }
+                calStatistic(System.currentTimeMillis() - startTime);
+                stmt.clearBatch();
+            } catch (Throwable ex) {
+                String msg = MessageFormat.format("Insert data into table \"{0}\" failed. Error: {1}", writerTask.getTable(), ex.getMessage());
Error: {1}", writerTask.getTable(), ex.getMessage()); + LOG.error(msg, ex); + table.setError(ex); + table.setStatus(Table.Status.FAILURE); + throw new RuntimeException(msg); + } + } + } +} \ No newline at end of file diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/InsertTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/InsertTask.java index df80cf7f..1524d76f 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/InsertTask.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/InsertTask.java @@ -20,7 +20,7 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; -public class InsertTask implements Runnable { +public class InsertTask extends AbstractInsertTask implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(InsertTask.class); @@ -49,6 +49,7 @@ public class InsertTask implements Runnable { Configuration config, ServerConnectInfo connectInfo, String writeRecordSql) { + super(taskId, recordsQueue, config, connectInfo); this.taskId = taskId; this.queue = recordsQueue; this.connInfo = connectInfo; @@ -62,11 +63,15 @@ public class InsertTask implements Runnable { connHolder.initConnection(); } - void setWriterTask(ConcurrentTableWriterTask writerTask) { + protected void initConnHolder() { + + } + + public void setWriterTask(ConcurrentTableWriterTask writerTask) { this.writerTask = writerTask; } - void setWriter(ConcurrentTableWriter writer) { + public void setWriter(ConcurrentTableWriter writer) { this.writer = writer; } @@ -109,6 +114,10 @@ public class InsertTask implements Runnable { LOG.debug("Thread exist..."); } + protected void write(List records) { + + } + public void destroy() { connHolder.destroy(); } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/WriterThreadPool.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/WriterThreadPool.java index 8add5382..90657373 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/WriterThreadPool.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/WriterThreadPool.java @@ -4,6 +4,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.checkerframework.checker.units.qual.A; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,8 +30,8 @@ public class WriterThreadPool { executorService.execute(task); } - public static synchronized void executeBatch(List tasks) { - for (InsertTask task : tasks) { + public static synchronized void executeBatch(List tasks) { + for (AbstractInsertTask task : tasks) { executorService.execute(task); } }