feature: oceanbase plugin: add direct path support

This commit is contained in:
ranyu.zyh 2025-04-03 13:25:32 +08:00
parent 129924c79e
commit 8478f7a212
22 changed files with 3045 additions and 630 deletions

View File

@@ -115,6 +115,11 @@
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>obkv-table-client</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>obkv-hbase-client</artifactId>

View File

@@ -2,62 +2,86 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer;
/**
 * Configuration keys and default values for the OceanBase v1.0 writer plugin.
 *
 * String constants are the user-facing configuration key names; the matching
 * DEFAULT_* constants are the values used when a key is absent.
 *
 * NOTE: the source diff rendered each declaration twice (old/new columns
 * interleaved); duplicate field declarations do not compile, so each constant
 * is declared exactly once here.
 */
public interface Config {

    // ---- memstore throttling ----
    String MEMSTORE_THRESHOLD = "memstoreThreshold";
    double DEFAULT_MEMSTORE_THRESHOLD = 0.9d;
    double DEFAULT_SLOW_MEMSTORE_THRESHOLD = 0.75d;
    String MEMSTORE_CHECK_INTERVAL_SECOND = "memstoreCheckIntervalSecond";
    long DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND = 30;

    // ---- batching and retry ----
    int DEFAULT_BATCH_SIZE = 100;
    int MAX_BATCH_SIZE = 4096;
    String FAIL_TRY_COUNT = "failTryCount";
    int DEFAULT_FAIL_TRY_COUNT = 10000;

    // ---- writer concurrency ----
    String WRITER_THREAD_COUNT = "writerThreadCount";
    int DEFAULT_WRITER_THREAD_COUNT = 1;
    String CONCURRENT_WRITE = "concurrentWrite";
    boolean DEFAULT_CONCURRENT_WRITE = true;

    String OB_VERSION = "obVersion";
    String TIMEOUT = "timeout";

    // ---- cost diagnostics ----
    String PRINT_COST = "printCost";
    boolean DEFAULT_PRINT_COST = false;
    String COST_BOUND = "costBound";
    long DEFAULT_COST_BOUND = 20;

    // ---- connection / sub-task sizing ----
    String MAX_ACTIVE_CONNECTION = "maxActiveConnection";
    int DEFAULT_MAX_ACTIVE_CONNECTION = 2000;
    String WRITER_SUB_TASK_COUNT = "writerSubTaskCount";
    int DEFAULT_WRITER_SUB_TASK_COUNT = 1;
    int MAX_WRITER_SUB_TASK_COUNT = 4096;

    String OB_WRITE_MODE = "obWriteMode";
    String OB_COMPATIBLE_MODE = "obCompatibilityMode";
    String OB_COMPATIBLE_MODE_ORACLE = "ORACLE";
    String OB_COMPATIBLE_MODE_MYSQL = "MYSQL";

    // ---- OCJ client timeouts ----
    String OCJ_GET_CONNECT_TIMEOUT = "ocjGetConnectTimeout";
    int DEFAULT_OCJ_GET_CONNECT_TIMEOUT = 5000; // 5s
    String OCJ_PROXY_CONNECT_TIMEOUT = "ocjProxyConnectTimeout";
    int DEFAULT_OCJ_PROXY_CONNECT_TIMEOUT = 5000; // 5s
    String OCJ_CREATE_RESOURCE_TIMEOUT = "ocjCreateResourceTimeout";
    int DEFAULT_OCJ_CREATE_RESOURCE_TIMEOUT = 60000; // 60s

    String OB_UPDATE_COLUMNS = "obUpdateColumns";
    String USE_PART_CALCULATOR = "usePartCalculator";
    boolean DEFAULT_USE_PART_CALCULATOR = false;

    // ---- direct path (bypass) import, new in this commit ----
    String BLOCKS_COUNT = "blocksCount";
    String DIRECT_PATH = "directPath";
    String RPC_PORT = "rpcPort";
    // Unlike recordLimit, this limit applies per table: one table exceeding its
    // max error count does not affect other tables. Only used for direct path import.
    String MAX_ERRORS = "maxErrors";
}

View File

@@ -0,0 +1,88 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.common;
import java.util.Objects;
public class Table {
private String tableName;
private String dbName;
private Throwable error;
private Status status;
public Table(String dbName, String tableName) {
this.dbName = dbName;
this.tableName = tableName;
this.status = Status.INITIAL;
}
public Throwable getError() {
return error;
}
public void setError(Throwable error) {
this.error = error;
}
public Status getStatus() {
return status;
}
public void setStatus(Status status) {
this.status = status;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Table table = (Table) o;
return tableName.equals(table.tableName) && dbName.equals(table.dbName);
}
@Override
public int hashCode() {
return Objects.hash(tableName, dbName);
}
public enum Status {
/**
*
*/
INITIAL(0),
/**
*
*/
RUNNING(1),
/**
*
*/
FAILURE(2),
/**
*
*/
SUCCESS(3);
private int code;
/**
* @param code
*/
private Status(int code) {
this.code = code;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
}
}

View File

@@ -0,0 +1,21 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.common;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Process-wide singleton cache of {@link Table} status objects, so that all
 * writer tasks loading the same table share one status/error record.
 */
public class TableCache {

    private static final TableCache INSTANCE = new TableCache();

    // Keyed by the (dbName, tableName) pair itself. Table implements
    // equals/hashCode over both fields, which avoids the ambiguity of the
    // previous "db-table" string key (String.join("-", "a-b", "c") and
    // String.join("-", "a", "b-c") produce the same key and would have
    // merged two distinct tables into one cache entry).
    private final ConcurrentHashMap<Table, Table> tableCache;

    private TableCache() {
        tableCache = new ConcurrentHashMap<>();
    }

    public static TableCache getInstance() {
        return INSTANCE;
    }

    /**
     * Returns the cached Table for (dbName, tableName), creating and caching
     * a fresh one atomically on first access.
     */
    public Table getTable(String dbName, String tableName) {
        return tableCache.computeIfAbsent(new Table(dbName, tableName), (k) -> k);
    }
}

View File

@@ -0,0 +1,257 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
import java.sql.Array;
import java.sql.Blob;
import java.sql.CallableStatement;
import java.sql.Clob;
import java.sql.DatabaseMetaData;
import java.sql.NClob;
import java.sql.PreparedStatement;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
 * A restricted {@link java.sql.Connection} skeleton for the direct path
 * (bypass) load channel: every JDBC feature this writer does not need is
 * stubbed to throw {@link UnsupportedOperationException}, so accidental use
 * fails fast instead of silently doing the wrong thing.
 *
 * Concrete subclasses implement the remaining Connection methods (e.g.
 * close/commit/rollback — not declared here) that the writer actually uses.
 *
 * NOTE(review): the stubs throw UnsupportedOperationException (unchecked)
 * rather than java.sql.SQLFeatureNotSupportedException, so callers catching
 * SQLException will not see these as SQL errors — confirm this is intentional.
 */
public abstract class AbstractRestrictedConnection implements java.sql.Connection {

    // ---- statement / call creation: unsupported ----

    @Override
    public CallableStatement prepareCall(String sql) throws SQLException {
        throw new UnsupportedOperationException("prepareCall(String) is unsupported");
    }

    @Override
    public String nativeSQL(String sql) throws SQLException {
        throw new UnsupportedOperationException("nativeSQL(String) is unsupported");
    }

    // ---- transaction control: unsupported ----

    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
        throw new UnsupportedOperationException("setAutoCommit(boolean) is unsupported");
    }

    @Override
    public boolean getAutoCommit() throws SQLException {
        throw new UnsupportedOperationException("getAutoCommit is unsupported");
    }

    // ---- lifecycle / timeouts: unsupported ----

    @Override
    public void abort(Executor executor) throws SQLException {
        throw new UnsupportedOperationException("abort(Executor) is unsupported");
    }

    @Override
    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
        throw new UnsupportedOperationException("setNetworkTimeout(Executor, int) is unsupported");
    }

    @Override
    public int getNetworkTimeout() throws SQLException {
        throw new UnsupportedOperationException("getNetworkTimeout is unsupported");
    }

    // ---- metadata and connection attributes: unsupported ----

    @Override
    public DatabaseMetaData getMetaData() throws SQLException {
        throw new UnsupportedOperationException("getMetaData is unsupported");
    }

    @Override
    public void setReadOnly(boolean readOnly) throws SQLException {
        throw new UnsupportedOperationException("setReadOnly(boolean) is unsupported");
    }

    @Override
    public boolean isReadOnly() throws SQLException {
        throw new UnsupportedOperationException("isReadOnly is unsupported");
    }

    @Override
    public void setCatalog(String catalog) throws SQLException {
        throw new UnsupportedOperationException("setCatalog(String) is unsupported");
    }

    @Override
    public String getCatalog() throws SQLException {
        throw new UnsupportedOperationException("getCatalog is unsupported");
    }

    @Override
    public void setTransactionIsolation(int level) throws SQLException {
        throw new UnsupportedOperationException("setTransactionIsolation(int) is unsupported");
    }

    @Override
    public int getTransactionIsolation() throws SQLException {
        throw new UnsupportedOperationException("getTransactionIsolation is unsupported");
    }

    @Override
    public SQLWarning getWarnings() throws SQLException {
        throw new UnsupportedOperationException("getWarnings is unsupported");
    }

    @Override
    public void clearWarnings() throws SQLException {
        throw new UnsupportedOperationException("clearWarnings is unsupported");
    }

    // ---- parameterised statement factories: unsupported ----

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
        throw new UnsupportedOperationException("createStatement(int, int) is unsupported");
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        throw new UnsupportedOperationException("prepareStatement(String, int, int) is unsupported");
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        throw new UnsupportedOperationException("prepareCall(String, int, int) is unsupported");
    }

    @Override
    public Map<String, Class<?>> getTypeMap() throws SQLException {
        throw new UnsupportedOperationException("getTypeMap is unsupported");
    }

    @Override
    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
        throw new UnsupportedOperationException("setTypeMap(Map<String, Class<?>>) is unsupported");
    }

    @Override
    public void setHoldability(int holdability) throws SQLException {
        throw new UnsupportedOperationException("setHoldability is unsupported");
    }

    @Override
    public int getHoldability() throws SQLException {
        throw new UnsupportedOperationException("getHoldability is unsupported");
    }

    // ---- savepoints: unsupported ----

    @Override
    public Savepoint setSavepoint() throws SQLException {
        throw new UnsupportedOperationException("setSavepoint is unsupported");
    }

    @Override
    public Savepoint setSavepoint(String name) throws SQLException {
        throw new UnsupportedOperationException("setSavepoint(String) is unsupported");
    }

    @Override
    public void rollback(Savepoint savepoint) throws SQLException {
        throw new UnsupportedOperationException("rollback(Savepoint) is unsupported");
    }

    @Override
    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
        throw new UnsupportedOperationException("releaseSavepoint(Savepoint) is unsupported");
    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        throw new UnsupportedOperationException("createStatement(int, int, int) is unsupported");
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        throw new UnsupportedOperationException("prepareStatement(String, int, int, int) is unsupported");
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        throw new UnsupportedOperationException("prepareCall(String, int, int, int) is unsupported");
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
        throw new UnsupportedOperationException("prepareStatement(String, int) is unsupported");
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
        throw new UnsupportedOperationException("prepareStatement(String, int[]) is unsupported");
    }

    @Override
    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
        throw new UnsupportedOperationException("prepareStatement(String, String[]) is unsupported");
    }

    // ---- LOB / SQLXML factories: unsupported ----

    @Override
    public Clob createClob() throws SQLException {
        throw new UnsupportedOperationException("createClob is unsupported");
    }

    @Override
    public Blob createBlob() throws SQLException {
        throw new UnsupportedOperationException("createBlob is unsupported");
    }

    @Override
    public NClob createNClob() throws SQLException {
        throw new UnsupportedOperationException("createNClob is unsupported");
    }

    @Override
    public SQLXML createSQLXML() throws SQLException {
        throw new UnsupportedOperationException("createSQLXML is unsupported");
    }

    @Override
    public boolean isValid(int timeout) throws SQLException {
        throw new UnsupportedOperationException("isValid(int) is unsupported");
    }

    // ---- client info / schema: unsupported ----

    @Override
    public void setClientInfo(String name, String value) throws SQLClientInfoException {
        throw new UnsupportedOperationException("setClientInfo(String, String) is unsupported");
    }

    @Override
    public void setClientInfo(Properties properties) throws SQLClientInfoException {
        throw new UnsupportedOperationException("setClientInfo(Properties) is unsupported");
    }

    @Override
    public String getClientInfo(String name) throws SQLException {
        throw new UnsupportedOperationException("getClientInfo(String) is unsupported");
    }

    @Override
    public Properties getClientInfo() throws SQLException {
        throw new UnsupportedOperationException("getClientInfo is unsupported");
    }

    @Override
    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
        throw new UnsupportedOperationException("createArrayOf(String, Object[]) is unsupported");
    }

    @Override
    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
        throw new UnsupportedOperationException("createStruct(String, Object[]) is unsupported");
    }

    @Override
    public void setSchema(String schema) throws SQLException {
        throw new UnsupportedOperationException("setSchema(String) is unsupported");
    }

    // ---- wrapper interface: unsupported ----

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        throw new UnsupportedOperationException("unwrap(Class<T>) is unsupported");
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        throw new UnsupportedOperationException("isWrapperFor(Class<?>) is unsupported");
    }
}

View File

@@ -0,0 +1,663 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URL;
import java.nio.charset.Charset;
import java.sql.Array;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
import java.sql.NClob;
import java.sql.ParameterMetaData;
import java.sql.Ref;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.RowId;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZonedDateTime;
import java.util.Calendar;
import java.util.List;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjType;
import com.alipay.oceanbase.rpc.util.ObVString;
import org.apache.commons.io.IOUtils;
public abstract class AbstractRestrictedPreparedStatement implements java.sql.PreparedStatement {
private boolean closed;
@Override
public void setNull(int parameterIndex, int sqlType) throws SQLException {
this.setParameter(parameterIndex, createObObj(null));
}
@Override
public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
throw new UnsupportedOperationException("setNull(int, int, String) is unsupported");
}
@Override
public void setBoolean(int parameterIndex, boolean x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setByte(int parameterIndex, byte x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setShort(int parameterIndex, short x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setInt(int parameterIndex, int x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setLong(int parameterIndex, long x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setFloat(int parameterIndex, float x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setDouble(int parameterIndex, double x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setString(int parameterIndex, String x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setBytes(int parameterIndex, byte[] x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setDate(int parameterIndex, Date x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
throw new UnsupportedOperationException("setDate(int, Date, Calendar) is unsupported");
}
@Override
public void setTime(int parameterIndex, Time x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
throw new UnsupportedOperationException("setTime(int, Time, Calendar) is unsupported");
}
@Override
public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
throw new UnsupportedOperationException("setTimestamp(int, Timestamp, Calendar) is unsupported");
}
@Override
public void setObject(int parameterIndex, Object x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
throw new UnsupportedOperationException("setObject(int, Object, int) is unsupported");
}
@Override
public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
throw new UnsupportedOperationException("setObject(int, Object, int, int) is unsupported");
}
@Override
public void setRef(int parameterIndex, Ref x) throws SQLException {
throw new UnsupportedOperationException("setRef(int, Ref) is unsupported");
}
@Override
public void setArray(int parameterIndex, Array x) throws SQLException {
throw new UnsupportedOperationException("setArray(int, Array) is unsupported");
}
@Override
public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
throw new UnsupportedOperationException("setSQLXML(int, SQLXML) is unsupported");
}
@Override
public void setURL(int parameterIndex, URL x) throws SQLException {
// if (x == null) {
// this.setParameter(parameterIndex, createObObj(x));
// } else {
// // TODO If need BackslashEscapes and character encoding ?
// this.setParameter(parameterIndex, createObObj(x.toString()));
// }
throw new UnsupportedOperationException("setURL(int, URL) is unsupported");
}
@Override
public void setRowId(int parameterIndex, RowId x) throws SQLException {
throw new UnsupportedOperationException("setRowId(int, RowId) is unsupported");
}
@Override
public void setNString(int parameterIndex, String value) throws SQLException {
this.setParameter(parameterIndex, createObObj(value));
}
@Override
public void setBlob(int parameterIndex, Blob x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setBlob(int parameterIndex, InputStream x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setBlob(int parameterIndex, InputStream x, long length) throws SQLException {
throw new UnsupportedOperationException("setBlob(int, InputStream, length) is unsupported");
}
@Override
public void setClob(int parameterIndex, Clob x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setClob(int parameterIndex, Reader x) throws SQLException {
this.setCharacterStream(parameterIndex, x);
}
@Override
public void setClob(int parameterIndex, Reader x, long length) throws SQLException {
throw new UnsupportedOperationException("setClob(int, Reader, length) is unsupported");
}
@Override
public void setNClob(int parameterIndex, NClob x) throws SQLException {
this.setClob(parameterIndex, (Clob) (x));
}
@Override
public void setNClob(int parameterIndex, Reader x) throws SQLException {
this.setClob(parameterIndex, x);
}
@Override
public void setNClob(int parameterIndex, Reader x, long length) throws SQLException {
throw new UnsupportedOperationException("setNClob(int, Reader, length) is unsupported");
}
@Override
public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Deprecated
@Override
public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
throw new UnsupportedOperationException("setAsciiStream(int, InputStream, length) is unsupported");
}
@Override
public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
throw new UnsupportedOperationException("setAsciiStream(int, InputStream, length) is unsupported");
}
@Override
public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
throw new UnsupportedOperationException("setBinaryStream(int, InputStream, length) is unsupported");
}
@Override
public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
throw new UnsupportedOperationException("setBinaryStream(int, InputStream, length) is unsupported");
}
@Override
public void setCharacterStream(int parameterIndex, Reader x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setCharacterStream(int parameterIndex, Reader x, int length) throws SQLException {
throw new UnsupportedOperationException("setCharacterStream(int, InputStream, length) is unsupported");
}
@Override
public void setCharacterStream(int parameterIndex, Reader x, long length) throws SQLException {
throw new UnsupportedOperationException("setCharacterStream(int, InputStream, length) is unsupported");
}
@Override
public void setNCharacterStream(int parameterIndex, Reader x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setNCharacterStream(int parameterIndex, Reader x, long length) throws SQLException {
throw new UnsupportedOperationException("setNCharacterStream(int, InputStream, length) is unsupported");
}
/**
* @return boolean
*/
protected abstract boolean isOracleMode();
/**
* Set parameter to the target position.
*
* @param parameterIndex
* @param obObj
* @throws SQLException
*/
protected abstract void setParameter(int parameterIndex, ObObj obObj) throws SQLException;
/**
 * Close the current prepared statement.
 *
 * Only flips the local {@code closed} flag; this class itself holds no
 * server-side or network resources to release.
 *
 * @throws SQLException never thrown by this implementation
 */
@Override
public void close() throws SQLException {
this.closed = true;
}
/**
 * Reports whether {@link #close()} has been called on this statement.
 *
 * @return true once close() has been invoked, false otherwise
 * @throws SQLException never thrown by this implementation
 */
@Override
public boolean isClosed() throws SQLException {
return this.closed;
}
/**
 * Converts one row of raw values into a {@link ObObj } array, one element
 * per column, preserving column order.
 *
 * @param values Original row value; may be null
 * @return ObObj[] converted row, or null when the input is null
 */
public ObObj[] createObObjArray(Object[] values) {
    if (values == null) {
        return null;
    }
    final ObObj[] converted = new ObObj[values.length];
    int pos = 0;
    for (Object cell : values) {
        converted[pos++] = createObObj(cell);
    }
    return converted;
}
/**
 * List-based overload: converts one row of raw values into a
 * {@link ObObj } array, one element per column, preserving order.
 *
 * @param values Original row value; may be null
 * @return ObObj[] converted row, or null when the input is null
 */
public ObObj[] createObObjArray(List<Object> values) {
    if (values == null) {
        return null;
    }
    final int columnCount = values.size();
    final ObObj[] converted = new ObObj[columnCount];
    for (int idx = 0; idx < columnCount; idx++) {
        converted[idx] = createObObj(values.get(idx));
    }
    return converted;
}
/**
 * Wraps a single column value in a {@link ObObj }, first normalising it via
 * {@link #convertValue(Object)} to a type the OBKV client accepts.
 *
 * @param value Original column value; null is passed through unconverted
 * @return ObObj the wrapped value with its inferred meta
 * @throws IllegalArgumentException if conversion or meta inference fails
 */
public ObObj createObObj(Object value) {
    try {
        // Null skips conversion; everything else is normalised first.
        final Object normalised = (value == null) ? null : convertValue(value);
        return new ObObj(ObObjType.defaultObjMeta(normalised), normalised);
    } catch (Exception ex) {
        throw new IllegalArgumentException(ex);
    }
}
/**
 * Some values with data type is unsupported by ObObjType#valueOfType.
 * We should convert the input value to supported value data type.
 *
 * Mapping performed here: BigDecimal/BigInteger become Strings (preserving
 * full precision); java.time and java.sql.Time values are normalised to
 * java.sql.Timestamp/Date; streams, Blobs, Readers and Clobs are fully
 * materialised into memory (ObVString/String) and closed/freed; anything
 * else is returned unchanged. Branch order matters: Reader is checked
 * before Clob, and several java.time types before their fallbacks.
 *
 * @param value original column value (callers pass non-null; null falls to the final else)
 * @return Object a value of a type accepted by ObObjType.defaultObjMeta
 * @throws Exception if reading a stream/Blob/Clob fails
 */
public static Object convertValue(Object value) throws Exception {
if (value instanceof BigDecimal) {
// String form keeps full precision (no double round-trip).
return value.toString();
} else if (value instanceof BigInteger) {
return value.toString();
} else if (value instanceof Instant) {
return Timestamp.from(((Instant) value));
} else if (value instanceof LocalDate) {
// Warn: java.sql.Date.valueOf() is deprecated. As local zone is used.
return Date.valueOf(((LocalDate) value));
} else if (value instanceof LocalTime) {
// Warn: java.sql.Time.valueOf() is deprecated.
// Time.valueOf drops fractional seconds — TODO confirm acceptable.
Time t = Time.valueOf((LocalTime) value);
return new Timestamp(t.getTime());
} else if (value instanceof LocalDateTime) {
return Timestamp.valueOf(((LocalDateTime) value));
} else if (value instanceof OffsetDateTime) {
return Timestamp.from(((OffsetDateTime) value).toInstant());
} else if (value instanceof Time) {
return new Timestamp(((Time) value).getTime());
} else if (value instanceof ZonedDateTime) {
// Note: Be care of time zone!!!
return Timestamp.from(((ZonedDateTime) value).toInstant());
} else if (value instanceof OffsetTime) {
LocalTime lt = ((OffsetTime) value).toLocalTime();
// Warn: java.sql.Time.valueOf() is deprecated.
return new Timestamp(Time.valueOf(lt).getTime());
} else if (value instanceof InputStream) {
// Entire stream is buffered in memory; stream is closed afterwards.
try (InputStream is = ((InputStream) value)) {
// Note: Be care of character set!!!
return new ObVString(IOUtils.toString(is, Charset.defaultCharset()));
}
} else if (value instanceof Blob) {
Blob b = (Blob) value;
try (InputStream is = b.getBinaryStream()) {
if (is == null) {
return null;
}
// Note: Be care of character set!!!
return new ObVString(IOUtils.toString(is, Charset.defaultCharset()));
} finally {
b.free();
}
} else if (value instanceof Reader) {
try (Reader r = ((Reader) value)) {
return IOUtils.toString(r);
}
} else if (value instanceof Clob) {
Clob c = (Clob) value;
try (Reader r = c.getCharacterStream()) {
return r == null ? null : IOUtils.toString(r);
} finally {
c.free();
}
} else {
// Already a supported type (String, primitives wrappers, byte[],
// Timestamp, ...): pass through unchanged.
return value;
}
}
// *********************************************************************************** //
@Override
public boolean getMoreResults(int current) throws SQLException {
throw new UnsupportedOperationException("getMoreResults(int) is unsupported");
}
@Override
public ResultSet getGeneratedKeys() throws SQLException {
throw new UnsupportedOperationException("getGeneratedKeys is unsupported");
}
@Override
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
throw new UnsupportedOperationException("executeUpdate(String, int) is unsupported");
}
@Override
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
throw new UnsupportedOperationException("executeUpdate(String, int[]) is unsupported");
}
@Override
public int executeUpdate(String sql, String[] columnNames) throws SQLException {
throw new UnsupportedOperationException("executeUpdate(String, String[]) is unsupported");
}
@Override
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
throw new UnsupportedOperationException("execute(String, int) is unsupported");
}
@Override
public boolean execute(String sql, int[] columnIndexes) throws SQLException {
throw new UnsupportedOperationException("execute(String, int[]) is unsupported");
}
@Override
public boolean execute(String sql, String[] columnNames) throws SQLException {
throw new UnsupportedOperationException("execute(String, String[]) is unsupported");
}
@Override
public int getResultSetHoldability() throws SQLException {
throw new UnsupportedOperationException("getResultSetHoldability is unsupported");
}
@Override
public void setPoolable(boolean poolable) throws SQLException {
throw new UnsupportedOperationException("setPoolable(boolean) is unsupported");
}
@Override
public boolean isPoolable() throws SQLException {
throw new UnsupportedOperationException("isPoolable is unsupported");
}
@Override
public void closeOnCompletion() throws SQLException {
throw new UnsupportedOperationException("closeOnCompletion is unsupported");
}
@Override
public boolean isCloseOnCompletion() throws SQLException {
throw new UnsupportedOperationException("isCloseOnCompletion is unsupported");
}
@Override
public ResultSet executeQuery(String sql) throws SQLException {
throw new UnsupportedOperationException("executeQuery(String) is unsupported");
}
@Override
public int executeUpdate(String sql) throws SQLException {
throw new UnsupportedOperationException("executeUpdate(String) is unsupported");
}
@Override
public int getMaxFieldSize() throws SQLException {
throw new UnsupportedOperationException("getMaxFieldSize is unsupported");
}
// --- Unsupported JDBC Statement/PreparedStatement operations ---
// This restricted statement implementation appears to support only the
// batch-oriented direct-path write path; every standard JDBC operation
// below deliberately throws UnsupportedOperationException so that misuse
// fails fast instead of silently doing nothing.
// NOTE(review): behavior of the supported subset is defined elsewhere in
// this class (outside this span) — confirm against the full file.
@Override
public void setMaxFieldSize(int max) throws SQLException {
throw new UnsupportedOperationException("setMaxFieldSize(int) is unsupported");
}
@Override
public int getMaxRows() throws SQLException {
throw new UnsupportedOperationException("getMaxRows is unsupported");
}
@Override
public void setMaxRows(int max) throws SQLException {
throw new UnsupportedOperationException("setMaxRows(int) is unsupported");
}
@Override
public void setEscapeProcessing(boolean enable) throws SQLException {
throw new UnsupportedOperationException("setEscapeProcessing(boolean) is unsupported");
}
@Override
public int getQueryTimeout() throws SQLException {
throw new UnsupportedOperationException("getQueryTimeout is unsupported");
}
@Override
public void setQueryTimeout(int seconds) throws SQLException {
throw new UnsupportedOperationException("setQueryTimeout(int) is unsupported");
}
@Override
public void cancel() throws SQLException {
throw new UnsupportedOperationException("cancel is unsupported");
}
@Override
public SQLWarning getWarnings() throws SQLException {
throw new UnsupportedOperationException("getWarnings is unsupported");
}
@Override
public void clearWarnings() throws SQLException {
throw new UnsupportedOperationException("clearWarnings is unsupported");
}
@Override
public void setCursorName(String name) throws SQLException {
throw new UnsupportedOperationException("setCursorName(String) is unsupported");
}
@Override
public boolean execute(String sql) throws SQLException {
throw new UnsupportedOperationException("execute(String) is unsupported");
}
@Override
public ResultSet getResultSet() throws SQLException {
throw new UnsupportedOperationException("getResultSet is unsupported");
}
@Override
public int getUpdateCount() throws SQLException {
throw new UnsupportedOperationException("getUpdateCount is unsupported");
}
@Override
public boolean getMoreResults() throws SQLException {
throw new UnsupportedOperationException("getMoreResults is unsupported");
}
@Override
public void setFetchDirection(int direction) throws SQLException {
throw new UnsupportedOperationException("setFetchDirection(int) is unsupported");
}
@Override
public int getFetchDirection() throws SQLException {
throw new UnsupportedOperationException("getFetchDirection is unsupported");
}
@Override
public void setFetchSize(int rows) throws SQLException {
throw new UnsupportedOperationException("setFetchSize(int) is unsupported");
}
@Override
public int getFetchSize() throws SQLException {
throw new UnsupportedOperationException("getFetchSize is unsupported");
}
@Override
public int getResultSetConcurrency() throws SQLException {
throw new UnsupportedOperationException("getResultSetConcurrency is unsupported");
}
@Override
public int getResultSetType() throws SQLException {
throw new UnsupportedOperationException("getResultSetType is unsupported");
}
@Override
public void addBatch(String sql) throws SQLException {
throw new UnsupportedOperationException("addBatch(String) is unsupported");
}
@Override
public ResultSet executeQuery() throws SQLException {
throw new UnsupportedOperationException("executeQuery is unsupported");
}
@Override
public int executeUpdate() throws SQLException {
throw new UnsupportedOperationException("executeUpdate is unsupported");
}
@Override
public boolean execute() throws SQLException {
throw new UnsupportedOperationException("execute is unsupported");
}
@Override
public ParameterMetaData getParameterMetaData() throws SQLException {
throw new UnsupportedOperationException("getParameterMetaData is unsupported");
}
@Override
public ResultSetMetaData getMetaData() throws SQLException {
throw new UnsupportedOperationException("getMetaData is unsupported");
}
/**
 * Unwrapping is not supported by this restricted implementation.
 *
 * @param iface the interface to unwrap to (ignored)
 * @throws UnsupportedOperationException always
 */
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
    // Fix copy/paste bug: the message previously referenced isWrapperFor.
    throw new UnsupportedOperationException("unwrap(Class<T>) is unsupported");
}
/**
 * Wrapper inspection is not supported by this restricted implementation.
 *
 * @param iface the interface to check (ignored)
 * @throws UnsupportedOperationException always
 */
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
throw new UnsupportedOperationException("isWrapperFor(Class<?>) is unsupported");
}
}

View File

@ -0,0 +1,170 @@
/*
* Copyright 2024 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
import java.io.Serializable;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadConnection;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadManager;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadStatement;
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException;
import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObLoadDupActionType;
import org.apache.commons.lang.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The builder for {@link ObTableDirectLoad}.
 *
 * <p>Collects server, login and load parameters, then {@link #build()} opens an
 * {@link ObDirectLoadConnection} and an {@link ObDirectLoadStatement} and wraps
 * them in an {@link ObTableDirectLoad}.
 */
public class DirectLoaderBuilder implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final Logger log = LoggerFactory.getLogger(DirectLoaderBuilder.class);

    /** Observer host reachable over the direct-load RPC port. */
    private String host;
    private int port;
    /** Bare user name (no tenant/cluster suffix, see {@link #user(String)}). */
    private String user;
    private String tenant;
    private String password;
    private String schema;
    private String table;
    /**
     * Server-side parallelism.
     */
    private int parallel;
    /** Maximum number of error rows tolerated by the server before failing. */
    private long maxErrorCount;
    private ObLoadDupActionType duplicateKeyAction;
    /**
     * The overall timeout of the direct load task.
     */
    private Long timeout;
    private Long heartBeatTimeout;
    private Long heartBeatInterval;

    public DirectLoaderBuilder host(String host) {
        this.host = host;
        return this;
    }

    public DirectLoaderBuilder port(int port) {
        this.port = port;
        return this;
    }

    public DirectLoaderBuilder user(String user) {
        // The obkv client 1.4.0 expects the bare user name only; the tenant and
        // cluster parts must not be included (e.g. "user@tenant#cluster" -> "user").
        int indexOf = user.indexOf("@");
        this.user = user;
        if (indexOf > 0) {
            this.user = user.substring(0, indexOf);
        }
        return this;
    }

    public DirectLoaderBuilder tenant(String tenant) {
        this.tenant = tenant;
        return this;
    }

    public DirectLoaderBuilder password(String password) {
        this.password = password;
        return this;
    }

    public DirectLoaderBuilder schema(String schema) {
        this.schema = schema;
        return this;
    }

    public DirectLoaderBuilder table(String table) {
        this.table = table;
        return this;
    }

    public DirectLoaderBuilder parallel(int parallel) {
        this.parallel = parallel;
        return this;
    }

    public DirectLoaderBuilder maxErrorCount(long maxErrorCount) {
        this.maxErrorCount = maxErrorCount;
        return this;
    }

    public DirectLoaderBuilder duplicateKeyAction(ObLoadDupActionType duplicateKeyAction) {
        this.duplicateKeyAction = duplicateKeyAction;
        return this;
    }

    public DirectLoaderBuilder timeout(long timeout) {
        this.timeout = timeout;
        return this;
    }

    public DirectLoaderBuilder heartBeatTimeout(Long heartBeatTimeout) {
        this.heartBeatTimeout = heartBeatTimeout;
        return this;
    }

    public DirectLoaderBuilder heartBeatInterval(Long heartBeatInterval) {
        this.heartBeatInterval = heartBeatInterval;
        return this;
    }

    /**
     * Open the direct-load connection and statement and wrap them.
     *
     * @return a ready-to-use {@link ObTableDirectLoad}
     * @throws ObTableException if the underlying direct-load setup fails
     */
    public ObTableDirectLoad build() {
        try {
            ObDirectLoadConnection obDirectLoadConnection = buildConnection(parallel);
            ObDirectLoadStatement obDirectLoadStatement = buildStatement(obDirectLoadConnection);
            return new ObTableDirectLoad(schema, table, obDirectLoadStatement, obDirectLoadConnection);
        } catch (ObDirectLoadException e) {
            throw new ObTableException(e.getMessage(), e);
        }
    }

    /**
     * Build the low-level direct-load connection.
     *
     * @param writeThreadNum number of parallel client write threads
     * @throws IllegalArgumentException if heartbeat settings are missing
     */
    private ObDirectLoadConnection buildConnection(int writeThreadNum) throws ObDirectLoadException {
        if (heartBeatTimeout == null || heartBeatInterval == null) {
            throw new IllegalArgumentException("heartBeatTimeout and heartBeatInterval must not be null");
        }
        ObDirectLoadConnection build = ObDirectLoadManager.getConnectionBuilder()
                .setServerInfo(host, port)
                .setLoginInfo(tenant, user, password, schema)
                .setHeartBeatInfo(heartBeatTimeout, heartBeatInterval)
                .enableParallelWrite(writeThreadNum)
                .build();
        // SLF4J formats the argument itself (and renders null safely), so the
        // deprecated commons-lang ObjectUtils.toString is unnecessary here.
        log.info("ObDirectLoadConnection value is:{}", build);
        return build;
    }

    /**
     * Build the direct-load statement on the given connection using the
     * configured table, parallelism, timeout and duplicate-key action.
     */
    private ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection) throws ObDirectLoadException {
        ObDirectLoadStatement build = connection.getStatementBuilder()
                .setTableName(table)
                .setParallel(parallel)
                .setQueryTimeout(timeout)
                .setDupAction(duplicateKeyAction)
                .setMaxErrorRowCount(maxErrorCount)
                .build();
        log.info("ObDirectLoadStatement value is:{}", build);
        return build;
    }
}

View File

@ -0,0 +1,398 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
import java.sql.SQLException;
import java.util.Arrays;
import com.alibaba.datax.common.util.Configuration;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadBucket;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObLoadDupActionType;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
/**
 * A restricted {@code Connection} facade over an OceanBase direct-path load task.
 *
 * <p>State machine: {@code BEGIN -> FINISHED -> CLOSED}. The load is committed
 * only once all {@code blocks} writers have called {@link #commit()}.
 * All state transitions are guarded by an internal lock.
 */
public class DirectPathConnection extends AbstractRestrictedConnection {

    private static final int OB_DIRECT_PATH_DEFAULT_BLOCKS = 1;
    private static final long OB_DIRECT_PATH_HEART_BEAT_TIMEOUT = 60000;
    private static final long OB_DIRECT_PATH_HEART_BEAT_INTERVAL = 10000;
    private static final int DEFAULT_BUFFERSIZE = 1048576;

    private static final Logger log = LoggerFactory.getLogger(DirectPathConnection.class);

    private final Configuration configuration;
    /** Current lifecycle state; null until {@link #begin()} is first called. */
    private State state;
    /** Number of writers that have called {@link #commit()} so far. */
    private int committers;
    /** Number of client writer blocks expected to commit before the real commit. */
    private final int blocks;
    private final ObTableDirectLoad load;
    private final Object lock = new Object();

    /**
     * Construct a new instance.
     *
     * @param load the underlying direct-load wrapper
     * @param blocks number of client writer blocks sharing this connection
     * @param configuration writer configuration (may carry buffer-size overrides)
     */
    private DirectPathConnection(ObTableDirectLoad load, int blocks, Configuration configuration) {
        this.configuration = configuration;
        this.load = load;
        this.blocks = blocks;
    }

    /**
     * Begin a new {@link DirectPathConnection }.
     *
     * @return this connection, now in BEGIN state
     * @throws SQLException if the underlying load fails to begin
     * @throws IllegalStateException if the connection is already begun
     */
    public DirectPathConnection begin() throws SQLException {
        synchronized (lock) {
            if (state == null || state == State.CLOSED) {
                try {
                    this.load.begin();
                    this.state = State.BEGIN;
                } catch (Exception ex) {
                    throw new SQLException(ex);
                }
            } else {
                throw new IllegalStateException("Begin transaction failed as connection state is already BEGIN");
            }
        }
        return this;
    }

    /**
     * Commit buffered data. The real server-side commit happens only when the
     * last of the {@code blocks} writers commits.
     *
     * @throws SQLException if the server-side commit fails
     * @throws IllegalStateException if not in BEGIN state or too many commits
     */
    @Override
    public void commit() throws SQLException {
        synchronized (lock) {
            if (state == State.BEGIN) {
                this.committers++;
                if (committers == blocks) {
                    try {
                        this.load.commit();
                        state = State.FINISHED;
                    } catch (Exception ex) {
                        throw new SQLException(ex);
                    }
                } else if (committers > blocks) {
                    throw new IllegalStateException("Your commit have exceed the limit. (" + committers + ">" + blocks + ")");
                }
            } else {
                throw new IllegalStateException("Commit transaction failed as connection state is not BEGIN");
            }
        }
    }

    /**
     * Rollback if an error occurred.
     *
     * @throws SQLException if the rollback (close) fails
     * @throws IllegalStateException if not in BEGIN state
     */
    @Override
    public void rollback() throws SQLException {
        synchronized (lock) {
            if (state == State.BEGIN) {
                try {
                    // close() of obkv-table-client (>= 2.1.0) includes the rollback logic.
                    this.load.close();
                    // The load is closed now; record it so a later close() does
                    // not close the underlying load a second time.
                    this.state = State.CLOSED;
                } catch (Exception ex) {
                    throw new SQLException(ex);
                }
            } else {
                throw new IllegalStateException("Rollback transaction failed as connection state is not BEGIN");
            }
        }
    }

    /**
     * Close this connection. Idempotent: a second close is a no-op.
     */
    @Override
    public void close() {
        synchronized (lock) {
            if (state != State.CLOSED) {
                this.load.close();
                this.state = State.CLOSED;
            }
        }
    }

    /**
     * @return DirectPathPreparedStatement
     */
    @Override
    public DirectPathPreparedStatement createStatement() throws SQLException {
        return this.prepareStatement(null);
    }

    /**
     * A new batch need create a new {@link DirectPathPreparedStatement }.
     * The {@link DirectPathPreparedStatement } can not be reuse, otherwise it may cause duplicate records.
     *
     * @param sql ignored (direct path does not execute SQL text)
     * @return DirectPathStatement
     * @throws IllegalStateException if not in BEGIN state
     */
    @Override
    public DirectPathPreparedStatement prepareStatement(String sql) throws SQLException {
        if (state == State.BEGIN) {
            Integer bufferSize = configuration.getInt(DirectPathConstants.BUFFERSIZE, DEFAULT_BUFFERSIZE);
            log.info("The current bufferSize is {}", bufferSize);
            return new DirectPathPreparedStatement(this, bufferSize);
        } else {
            throw new IllegalStateException("Create statement failed as connection state is not BEGIN");
        }
    }

    /**
     * Return the schema name of this connection instance.
     *
     * @return String
     */
    @Override
    public String getSchema() {
        if (state == State.BEGIN) {
            return this.load.getTable().getDatabase();
        } else {
            throw new IllegalStateException("Get schema failed as connection state is not BEGIN");
        }
    }

    /**
     * Return the table name of this connection instance.
     *
     * @return String
     */
    public String getTableName() {
        if (state == State.BEGIN) {
            return this.load.getTableName();
        } else {
            throw new IllegalStateException("Get table failed as connection state is not BEGIN");
        }
    }

    /**
     * Return whether this connection is closed.
     *
     * @return boolean
     */
    @Override
    public boolean isClosed() {
        synchronized (lock) {
            return this.state == State.CLOSED;
        }
    }

    /**
     * @return true once the final commit has completed. Null-safe even before
     *         {@link #begin()} (the previous equals()-based check could NPE).
     */
    public boolean isFinished() {
        return this.state == State.FINISHED;
    }

    /**
     * Insert bucket data into buffer.
     *
     * @param bucket rows to hand over to the direct-load statement
     * @return int[] one "1" entry per row written, mimicking JDBC batch results
     * @throws SQLException if the underlying write fails
     */
    int[] insert(ObDirectLoadBucket bucket) throws SQLException {
        try {
            this.load.write(bucket);
            int[] result = new int[bucket.getRowNum()];
            Arrays.fill(result, 1);
            return result;
        } catch (Exception ex) {
            throw new SQLException(ex);
        }
    }

    /**
     * Indicates the state of {@link DirectPathConnection }
     */
    enum State {
        /**
         * Begin transaction
         */
        BEGIN,
        /**
         * Transaction is finished, ready to close.
         */
        FINISHED,
        /**
         * Transaction is closed.
         */
        CLOSED;
    }

    /**
     * This builder used to build a new {@link DirectPathConnection }
     */
    public static class Builder {
        private String host;
        private int port;
        private String user;
        private String tenant;
        private String password;
        private String schema;
        private String table;
        /**
         * Client job count.
         */
        private int blocks = OB_DIRECT_PATH_DEFAULT_BLOCKS;
        /**
         * Server threads used to sort.
         */
        private int parallel;
        private long maxErrorCount;
        private ObLoadDupActionType duplicateKeyAction;
        // Used for load data
        private long serverTimeout;
        private Configuration configuration;

        public Builder host(String host) {
            this.host = host;
            return this;
        }

        public Builder port(int port) {
            this.port = port;
            return this;
        }

        public Builder user(String user) {
            this.user = user;
            return this;
        }

        public Builder tenant(String tenant) {
            this.tenant = tenant;
            return this;
        }

        public Builder password(String password) {
            this.password = password;
            return this;
        }

        public Builder schema(String schema) {
            this.schema = schema;
            return this;
        }

        public Builder table(String table) {
            this.table = table;
            return this;
        }

        public Builder blocks(int blocks) {
            this.blocks = blocks;
            return this;
        }

        public Builder parallel(int parallel) {
            this.parallel = parallel;
            return this;
        }

        public Builder maxErrorCount(long maxErrorCount) {
            this.maxErrorCount = maxErrorCount;
            return this;
        }

        public Builder duplicateKeyAction(ObLoadDupActionType duplicateKeyAction) {
            this.duplicateKeyAction = duplicateKeyAction;
            return this;
        }

        public Builder serverTimeout(long serverTimeout) {
            this.serverTimeout = serverTimeout;
            return this;
        }

        public Builder configuration(Configuration configuration) {
            this.configuration = configuration;
            return this;
        }

        /**
         * Build a new {@link DirectPathConnection } and begin it.
         *
         * @return DirectPathConnection
         */
        public DirectPathConnection build() throws Exception {
            return createConnection(host, port, user, tenant, password, schema, table, //
                    blocks, parallel, maxErrorCount, duplicateKeyAction, serverTimeout).begin();
        }

        /**
         * Create a new {@link DirectPathConnection }.
         *
         * <p>The previous signature took the duplicate-key action twice; the
         * redundant parameter has been removed (this method is package-private
         * and only called from {@link #build()}).
         *
         * @param host observer host
         * @param port observer RPC port (1..65535)
         * @param user user name
         * @param tenant tenant name
         * @param password password
         * @param schema schema (database) name
         * @param table table name
         * @param blocks client writer block count
         * @param parallel server-side parallelism
         * @param maxErrorCount tolerated error-row count (>= 0)
         * @param action duplicate-key action
         * @param serverTimeout server-side task timeout
         * @return DirectPathConnection
         * @throws Exception on invalid arguments or connection failure
         */
        DirectPathConnection createConnection(String host, int port, String user, String tenant, String password, String schema, String table, //
                                              int blocks, int parallel, long maxErrorCount, ObLoadDupActionType action, long serverTimeout) throws Exception {
            checkArgument(StringUtils.isNotBlank(host), "Host is null.(host=%s)", host);
            // 65535 is a valid TCP port; the previous check excluded it.
            checkArgument((port > 0 && port <= 65535), "Port is invalid.(port=%s)", port);
            checkArgument(StringUtils.isNotBlank(user), "User Name is null.(user=%s)", user);
            checkArgument(StringUtils.isNotBlank(tenant), "Tenant Name is null.(tenant=%s)", tenant);
            checkArgument(StringUtils.isNotBlank(schema), "Schema Name is null.(schema=%s)", schema);
            checkArgument(StringUtils.isNotBlank(table), "Table Name is null.(table=%s)", table);
            checkArgument(blocks > 0, "Client Blocks is invalid.(blocks=%s)", blocks);
            checkArgument(parallel > 0, "Server Parallel is invalid.(parallel=%s)", parallel);
            checkArgument(maxErrorCount > -1, "MaxErrorCount is invalid.(maxErrorCount=%s)", maxErrorCount);
            checkArgument(action != null, "ObLoadDupActionType is null.(obLoadDupActionType=%s)", action);
            checkArgument(serverTimeout > 0, "Server timeout is invalid.(timeout=%s)", serverTimeout);
            // Fall back to the documented defaults instead of 0 when no
            // configuration is supplied (0 would be an invalid heartbeat).
            Long heartBeatTimeout = OB_DIRECT_PATH_HEART_BEAT_TIMEOUT;
            Long heartBeatInterval = OB_DIRECT_PATH_HEART_BEAT_INTERVAL;
            if (configuration != null) {
                heartBeatTimeout = configuration.getLong(DirectPathConstants.HEART_BEAT_TIMEOUT, OB_DIRECT_PATH_HEART_BEAT_TIMEOUT);
                heartBeatInterval = configuration.getLong(DirectPathConstants.HEART_BEAT_INTERVAL, OB_DIRECT_PATH_HEART_BEAT_INTERVAL);
                parallel = configuration.getInt(DirectPathConstants.PARALLEL, parallel);
            }
            DirectLoaderBuilder builder = new DirectLoaderBuilder()
                    .host(host).port(port)
                    .user(user)
                    .tenant(tenant)
                    .password(password)
                    .schema(schema)
                    .table(table)
                    .parallel(parallel)
                    .maxErrorCount(maxErrorCount)
                    .timeout(serverTimeout)
                    .duplicateKeyAction(action)
                    .heartBeatTimeout(heartBeatTimeout)
                    .heartBeatInterval(heartBeatInterval);
            ObTableDirectLoad directLoad = builder.build();
            return new DirectPathConnection(directLoad, blocks, configuration);
        }
    }
}

View File

@ -0,0 +1,12 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
/**
 * Configuration keys used by the direct-path load feature.
 *
 * <p>These keys are read from the writer {@code Configuration} in
 * {@code DirectPathConnection}.
 */
public class DirectPathConstants {

    /** Overrides the heartbeat timeout (ms) of the direct-load connection. */
    public static final String HEART_BEAT_TIMEOUT = "heartBeatTimeout";
    /** Overrides the heartbeat interval (ms) of the direct-load connection. */
    public static final String HEART_BEAT_INTERVAL = "heartBeatInterval";
    /** Overrides the server-side parallelism of the direct-load task. */
    public static final String PARALLEL = "parallel";
    /** Overrides the client-side row-bucket buffer size in bytes. */
    public static final String BUFFERSIZE = "bufferSize";

    /** Constants holder — not instantiable. */
    private DirectPathConstants() {
    }
}

View File

@ -0,0 +1,164 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadBucket;
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import static com.google.common.base.Preconditions.checkArgument;
/**
 * A restricted {@code PreparedStatement} that buffers rows into an
 * {@link ObDirectLoadBucket} and hands them to a {@link DirectPathConnection}
 * on {@link #executeBatch()}.
 *
 * <p>Not thread-safe; each writer thread must use its own instance.
 */
public class DirectPathPreparedStatement extends AbstractRestrictedPreparedStatement {

    private static final int DEFAULT_BUFFERSIZE = 1048576;
    public static final int[] EMPTY_ARRAY = new int[0];

    /** Row buffer; recreated by {@link #clearBatch()}, never reused across batches. */
    private ObDirectLoadBucket bucket;
    private final DirectPathConnection conn;
    /** Pending row values keyed by 1-based parameter index. */
    private final Map<Integer, ObObj> parameters;
    private final Integer bufferSize;

    /**
     * Construct a new {@link DirectPathConnection } instance with the default
     * buffer size.
     *
     * @param conn the owning direct-path connection
     */
    public DirectPathPreparedStatement(DirectPathConnection conn) {
        // Chain to the main constructor instead of duplicating its body.
        // Note: the bucket is now sized with DEFAULT_BUFFERSIZE up front, which
        // matches what clearBatch() already did for this constructor.
        this(conn, DEFAULT_BUFFERSIZE);
    }

    /**
     * Construct a new instance with an explicit bucket buffer size.
     *
     * @param conn the owning direct-path connection
     * @param bufferSize bucket buffer size in bytes
     */
    public DirectPathPreparedStatement(DirectPathConnection conn, Integer bufferSize) {
        this.conn = conn;
        this.parameters = new HashMap<>();
        this.bufferSize = bufferSize;
        this.bucket = new ObDirectLoadBucket(bufferSize);
    }

    /**
     * Return current direct path connection.
     *
     * @return DirectPathConnection
     * @throws SQLException never thrown; declared for interface compatibility
     */
    @Override
    public DirectPathConnection getConnection() throws SQLException {
        return this.conn;
    }

    /**
     * Copy a new row data avoid overwrite.
     *
     * @throws SQLException on parameter-index/value-count mismatch
     */
    @Override
    public void addBatch() throws SQLException {
        checkRange();
        ObObj[] objObjArray = new ObObj[parameters.size()];
        for (Map.Entry<Integer, ObObj> entry : parameters.entrySet()) {
            // Parameter indices are 1-based; the array is 0-based.
            objObjArray[entry.getKey() - 1] = entry.getValue();
        }
        this.addBatch(objObjArray);
    }

    /**
     * Add a new row into buffer with input original value list.
     *
     * @param values One original row data.
     */
    public void addBatch(List<Object> values) {
        this.addBatch(createObObjArray(values));
    }

    /**
     * Add a new row into buffer with input original value array.
     *
     * @param values One original row data.
     */
    public void addBatch(Object[] values) {
        this.addBatch(createObObjArray(values));
    }

    /**
     * Add a new row into buffer with input ObObj array.
     *
     * @param arr One row data described as ObObj.
     * @throws RuntimeException wrapping ObDirectLoadException on buffer failure
     */
    private void addBatch(ObObj[] arr) {
        checkArgument(arr != null && arr.length > 0, "Input values is null");
        try {
            this.bucket.addRow(arr);
        } catch (ObDirectLoadException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Buffered the row data in memory. (defined in the bucket)
     * You must invoke {@code ObDirectLoadBucket.clearBatch } after executeBatch.
     *
     * @return int[]
     * @throws SQLException if the underlying insert fails
     */
    @Override
    public int[] executeBatch() throws SQLException {
        return this.bucket.isEmpty() ? EMPTY_ARRAY : this.conn.insert(bucket);
    }

    /**
     * Clear batch is always recreate a new {@link ObDirectLoadBucket}
     */
    @Override
    public void clearBatch() {
        this.parameters.clear();
        this.bucket = new ObDirectLoadBucket(bufferSize);
    }

    /**
     * Clear the holder parameters.
     *
     * @throws SQLException never thrown; declared for interface compatibility
     */
    @Override
    public void clearParameters() throws SQLException {
        this.parameters.clear();
    }

    /**
     * @return boolean always false — direct path here targets MySQL mode
     */
    @Override
    public boolean isOracleMode() {
        return false;
    }

    /**
     * Set parameter to the target position.
     *
     * @param parameterIndex Start From 1
     * @param obObj Convert original value to {@link ObObj }
     * @throws SQLException never thrown; declared for interface compatibility
     */
    @Override
    protected void setParameter(int parameterIndex, ObObj obObj) throws SQLException {
        checkArgument(parameterIndex > 0, "Parameter index should start from 1");
        this.parameters.put(parameterIndex, obObj);
    }

    /**
     * Avoid range exception:
     * <p>
     * Map.put(1, "abc");
     * Map.put(5, "def"); // Error: parameter index is 5, but 2 values exists.
     */
    private void checkRange() {
        OptionalInt optionalInt = parameters.keySet().stream().mapToInt(e -> e).max();
        int parameterIndex = optionalInt.orElseThrow(() -> new IllegalArgumentException("No parameter index found"));
        checkArgument(parameterIndex == parameters.size(), "Parameter index(%s) is unmatched with value list(%s)", parameterIndex, parameters.size());
    }
}

View File

@ -0,0 +1,154 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
import java.sql.SQLException;
import java.util.Objects;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadBucket;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadConnection;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadStatement;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadTraceId;
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException;
import com.alipay.oceanbase.rpc.direct_load.protocol.payload.ObTableLoadClientStatus;
import com.alipay.oceanbase.rpc.table.ObTable;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Wrapper of the direct-load API for OceanBase.
 *
 * <p>Owns an {@link ObDirectLoadStatement} and its {@link ObDirectLoadConnection};
 * both are released by {@link #close()}.
 */
public class ObTableDirectLoad implements AutoCloseable {

    private static final Logger LOG = LoggerFactory.getLogger(ObTableDirectLoad.class);

    private final String tableName;
    /** "schema.table", used only for log/exception messages. */
    private final String schemaTableName;
    private final ObDirectLoadStatement statement;
    private final ObDirectLoadConnection connection;

    public ObTableDirectLoad(String schemaName, String tableName, ObDirectLoadStatement statement, ObDirectLoadConnection connection) {
        Objects.requireNonNull(schemaName, "schemaName must not be null");
        Objects.requireNonNull(tableName, "tableName must not be null");
        Objects.requireNonNull(statement, "statement must not be null");
        Objects.requireNonNull(connection, "connection must not be null");
        this.tableName = tableName;
        this.schemaTableName = String.format("%s.%s", schemaName, tableName);
        this.statement = statement;
        this.connection = connection;
    }

    /**
     * Begin the direct load operation.
     *
     * @throws ObDirectLoadException if an error occurs during the operation.
     */
    public void begin() throws ObDirectLoadException {
        statement.begin();
    }

    /**
     * Write data into the direct load operation.
     *
     * @param bucket The data bucket to write.
     * @throws IllegalArgumentException if the bucket is null or empty.
     * @throws SQLException if writing fails.
     */
    public void write(ObDirectLoadBucket bucket) throws SQLException {
        // Validate outside the try so an argument error is not confused with a
        // server-side write failure.
        if (bucket == null || bucket.isEmpty()) {
            throw new IllegalArgumentException("Bucket must not be null or empty.");
        }
        try {
            LOG.info("Writing {} rows to table: {}", bucket.getRowNum(), schemaTableName);
            statement.write(bucket);
            LOG.info("Successfully wrote bucket data to table: {}", schemaTableName);
        } catch (ObDirectLoadException e) {
            LOG.error("Failed to write to table: {}", schemaTableName, e);
            throw new SQLException(String.format("Failed to write to table: %s", schemaTableName), e);
        }
    }

    /**
     * Commit the current direct load operation.
     *
     * @throws SQLException if commit fails.
     */
    public void commit() throws SQLException {
        try {
            LOG.info("Committing direct load for table: {}", schemaTableName);
            statement.commit();
            LOG.info("Successfully committed direct load for table: {}", schemaTableName);
        } catch (ObDirectLoadException e) {
            LOG.error("Failed to commit for table: {}", schemaTableName, e);
            throw new SQLException(String.format("Failed to commit for table: %s", schemaTableName), e);
        }
    }

    /**
     * Close the direct load operation.
     *
     * <p>The connection is released even if closing the statement throws, so a
     * statement-close failure no longer leaks the connection.
     */
    public void close() {
        LOG.info("Closing direct load for table: {}", schemaTableName);
        try {
            statement.close();
        } finally {
            connection.close();
        }
        LOG.info("Direct load closed for table: {}", schemaTableName);
    }

    /**
     * Gets the status from the current connection based on the traceId.
     *
     * @throws SQLException if the traceId or the reported status is null
     */
    public ObTableLoadClientStatus getStatus() throws SQLException {
        ObDirectLoadTraceId traceId = statement.getTraceId();
        // Check if traceId is null and throw an exception with a clear message
        if (traceId == null) {
            throw new SQLException("traceId is null.");
        }
        // Retrieve the status using the traceId
        ObTableLoadClientStatus status = statement.getConnection().getProtocol().getHeartBeatRpc(traceId).getStatus();
        if (status == null) {
            LOG.info("Direct load connect protocol heartBeatRpc for table is null: {}", schemaTableName);
            throw new SQLException("status is null.");
        }
        // Status is known to be non-null here.
        return status;
    }

    /**
     * Gets the control table of the current statement.
     *
     * @throws RuntimeException wrapping ObDirectLoadException on failure
     */
    public ObTable getTable() {
        try {
            return this.statement.getObTablePool().getControlObTable();
        } catch (ObDirectLoadException e) {
            throw new RuntimeException(e);
        }
    }

    public String getTableName() {
        if (StringUtils.isBlank(tableName)) {
            throw new IllegalArgumentException("tableName is blank.");
        }
        return tableName;
    }

    /**
     * Inserts data into the direct load operation.
     *
     * @param bucket The data bucket containing rows to insert.
     * @throws IllegalArgumentException if the bucket is null or empty.
     * @throws SQLException if an error occurs during the insert operation.
     */
    public void insert(ObDirectLoadBucket bucket) throws SQLException {
        // Validate (including null — the old code would NPE in the log call)
        // before touching the bucket.
        if (bucket == null || bucket.isEmpty()) {
            LOG.warn("Parameter 'bucket' must not be empty.");
            throw new IllegalArgumentException("Parameter 'bucket' must not be empty.");
        }
        LOG.info("Inserting {} rows to table: {}", bucket.getRowNum(), schemaTableName);
        try {
            // Perform the insertion into the load operation
            statement.write(bucket);
            LOG.info("Successfully inserted data into table: {}", schemaTableName);
        } catch (Exception ex) {
            LOG.error("Unexpected error during insert operation for table: {}", schemaTableName, ex);
            throw new SQLException("Unexpected error during insert operation.", ex);
        }
    }
}

View File

@ -14,6 +14,17 @@ public abstract class AbstractConnHolder {
protected final Configuration config;
protected Connection conn;
protected String jdbcUrl;
protected String userName;
protected String password;
protected AbstractConnHolder(Configuration config, String jdbcUrl, String userName, String password) {
this.config = config;
this.jdbcUrl = jdbcUrl;
this.userName = userName;
this.password = password;
}
public AbstractConnHolder(Configuration config) {
this.config = config;
}
@ -45,4 +56,6 @@ public abstract class AbstractConnHolder {
public abstract String getUserName();
public abstract void destroy();
public abstract void doCommit();
}

View File

@ -0,0 +1,61 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
import java.sql.Connection;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base connection holder for direct-path load connections: caches a JDBC
 * {@code Connection}, reconnecting transparently when it is found closed.
 */
public abstract class DirectPathAbstractConnHolder {

    // Fixed: the logger was previously created with AbstractConnHolder.class,
    // so messages from this class were attributed to the wrong logger.
    private static final Logger LOG = LoggerFactory.getLogger(DirectPathAbstractConnHolder.class);

    protected Configuration config;
    protected String jdbcUrl;
    protected String userName;
    protected String password;
    protected Connection conn;

    protected DirectPathAbstractConnHolder(Configuration config, String jdbcUrl, String userName, String password) {
        this.config = config;
        this.jdbcUrl = jdbcUrl;
        this.userName = userName;
        this.password = password;
    }

    /**
     * Close the held connection (if any) and open a fresh one.
     *
     * @return the new connection
     */
    public Connection reconnect() {
        DBUtil.closeDBResources(null, conn);
        return initConnection();
    }

    /**
     * Return the cached connection, creating or reconnecting as needed.
     *
     * @return a usable connection (best effort — if liveness cannot be
     *         determined, the cached connection is reused)
     */
    public Connection getConn() {
        if (conn == null) {
            return initConnection();
        } else {
            try {
                if (conn.isClosed()) {
                    return reconnect();
                }
                return conn;
            } catch (Exception e) {
                LOG.debug("can not judge whether the hold connection is closed or not, just reuse the hold connection");
                return conn;
            }
        }
    }

    public String getJdbcUrl() {
        return jdbcUrl;
    }

    public Configuration getConfig() {
        return config;
    }

    /** No-op by default; subclasses commit where it is meaningful. */
    public void doCommit() {}

    /** Release all resources held by this holder. */
    public abstract void destroy();

    /** Open (and cache) a new connection. */
    public abstract Connection initConnection();
}

View File

@ -0,0 +1,115 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.common.Table;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath.DirectPathConnection;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObLoadDupActionType;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Connection holder that serves {@link DirectPathConnection}s, one shared
 * connection per (database, table), cached process-wide.
 */
public class DirectPathConnHolder extends AbstractConnHolder {

    private static final Logger LOG = LoggerFactory.getLogger(DirectPathConnHolder.class);

    /**
     * The server side timeout.
     */
    private static final long SERVER_TIMEOUT = 24L * 60 * 60 * 1000 * 1000;

    /** One shared direct-path connection per (database, table). */
    private static final ConcurrentHashMap<Table, DirectPathConnection> cache = new ConcurrentHashMap<>();

    private final String tableName;
    private final String host;
    private final int rpcPort;
    private final String tenantName;
    private final String databaseName;
    private final int blocks;
    private final int threads;
    private final int maxErrors;
    private final ObLoadDupActionType duplicateKeyAction;

    public DirectPathConnHolder(Configuration config, ServerConnectInfo connectInfo, String tableName, int threadsPerChannel) {
        super(config, connectInfo.jdbcUrl, connectInfo.userName, connectInfo.password);
        // direct path:
        // publicCloud & odp - single or full
        // publicCloud & observer - not support
        // !publicCloud & odp - full
        // !publicCloud & observer - single
        this.userName = connectInfo.getFullUserName();
        this.host = connectInfo.host;
        this.rpcPort = connectInfo.rpcPort;
        this.tenantName = connectInfo.tenantName;
        if (!connectInfo.publicCloud && StringUtils.isEmpty(tenantName)) {
            throw new IllegalStateException("tenant name is needed when using direct path load in private cloud.");
        }
        this.databaseName = connectInfo.databaseName;
        this.tableName = tableName;
        this.blocks = config.getInt(Config.BLOCKS_COUNT, 1);
        // Server-side parallelism: per-channel threads times block count, capped at 32 blocks.
        this.threads = threadsPerChannel * Math.min(blocks, 32);
        this.maxErrors = config.getInt(Config.MAX_ERRORS, 0);
        // "insert" -> IGNORE duplicates; anything else -> REPLACE.
        this.duplicateKeyAction = "insert".equalsIgnoreCase(config.getString(Config.OB_WRITE_MODE)) ? ObLoadDupActionType.IGNORE : ObLoadDupActionType.REPLACE;
    }

    /**
     * Get or create the shared direct-path connection for this (db, table).
     */
    @Override
    public Connection initConnection() {
        synchronized (cache) {
            conn = cache.computeIfAbsent(new Table(databaseName, tableName), e -> {
                try {
                    return new DirectPathConnection.Builder().host(host) //
                            .port(rpcPort) //
                            .tenant(tenantName) //
                            .user(userName) //
                            .password(Optional.ofNullable(password).orElse("")) //
                            .schema(databaseName) //
                            .table(tableName) //
                            .blocks(blocks) //
                            .parallel(threads) //
                            .maxErrorCount(maxErrors) //
                            .duplicateKeyAction(duplicateKeyAction) //
                            .serverTimeout(SERVER_TIMEOUT) //
                            .configuration(config)
                            .build();
                } catch (Exception ex) {
                    throw DataXException.asDataXException(DBUtilErrorCode.CONN_DB_ERROR, ex);
                }
            });
        }
        return conn;
    }

    // NOTE(review): returns "" rather than the stored jdbcUrl — presumably
    // because direct path bypasses JDBC entirely; confirm callers expect this.
    @Override
    public String getJdbcUrl() {
        return "";
    }

    // NOTE(review): returns "" rather than the stored userName — confirm
    // callers expect this for direct-path holders.
    @Override
    public String getUserName() {
        return "";
    }

    @Override
    public void destroy() {
        // Only release once the shared load has fully committed.
        if (conn != null && ((DirectPathConnection) conn).isFinished()) {
            DBUtil.closeDBResources(null, conn);
        }
    }

    @Override
    public void doCommit() {
        try {
            if (conn != null) {
                conn.commit();
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

View File

@ -1,42 +1,54 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
import java.sql.Connection;
import java.sql.SQLException;
import com.alibaba.datax.common.util.Configuration;
/**
 * wrap oceanbase java client
 *
 * Holds a pooled connection obtained from {@code OBDataSourceV10}; the data
 * source is keyed by full user name + database name.
 *
 * NOTE(review): the original region contained every member twice (an unmerged
 * diff artifact that cannot compile); this version keeps a single copy of each.
 *
 * @author oceanbase
 */
public class OCJConnHolder extends AbstractConnHolder {
    private final ServerConnectInfo connectInfo;
    private final String dataSourceKey;

    public OCJConnHolder(Configuration config, ServerConnectInfo connInfo) {
        super(config);
        this.connectInfo = connInfo;
        this.dataSourceKey = OBDataSourceV10.genKey(connectInfo.getFullUserName(), connectInfo.databaseName);
        // Eagerly register the shared data source for this user/database pair.
        OBDataSourceV10.init(config, connectInfo.getFullUserName(), connectInfo.password, connectInfo.databaseName);
    }

    @Override
    public Connection initConnection() {
        conn = OBDataSourceV10.getConnection(dataSourceKey);
        return conn;
    }

    @Override
    public String getJdbcUrl() {
        return connectInfo.jdbcUrl;
    }

    @Override
    public String getUserName() {
        return connectInfo.userName;
    }

    @Override
    public void destroy() {
        // "destory" is the (misspelled) public API of OBDataSourceV10; kept as-is.
        OBDataSourceV10.destory(this.dataSourceKey);
    }

    @Override
    public void doCommit() {
        try {
            if (conn != null) {
                conn.commit();
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

View File

@ -1,6 +1,7 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
@ -14,50 +15,60 @@ import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
* 数据库连接代理对象,负责创建连接重新连接
*
* @author oceanbase
*
*/
/**
 * Connection holder that reaches OceanBase through obclient/obproxy over plain
 * JDBC; responsible for creating and re-creating the connection.
 *
 * NOTE(review): the original region contained every member twice (an unmerged
 * diff artifact that cannot compile); this version keeps a single copy of each.
 */
public class ObClientConnHolder extends AbstractConnHolder {
    private final String jdbcUrl;
    private final String userName;
    private final String password;

    public ObClientConnHolder(Configuration config, String jdbcUrl, String userName, String password) {
        super(config);
        this.jdbcUrl = jdbcUrl;
        this.userName = userName;
        this.password = password;
    }

    /**
     * Connect to ob with obclient and obproxy. In Oracle-compatible mode, NLS
     * session formats are appended so timestamp columns can be written as text.
     */
    @Override
    public Connection initConnection() {
        String BASIC_MESSAGE = String.format("jdbcUrl:[%s]", this.jdbcUrl);
        DataBaseType dbType = DataBaseType.OceanBase;
        if (ObWriterUtils.isOracleMode()) {
            // set up for writing timestamp columns
            List<String> sessionConfig = config.getList(Key.SESSION, new ArrayList<String>(), String.class);
            sessionConfig.add("ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'");
            sessionConfig.add("ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS.FF'");
            sessionConfig.add("ALTER SESSION SET NLS_TIMESTAMP_TZ_FORMAT='YYYY-MM-DD HH24:MI:SS.FF TZR TZD'");
            config.set(Key.SESSION, sessionConfig);
        }
        conn = DBUtil.getConnection(dbType, jdbcUrl, userName, password);
        DBUtil.dealWithSessionConfig(conn, config, dbType, BASIC_MESSAGE);
        return conn;
    }

    @Override
    public String getJdbcUrl() {
        return jdbcUrl;
    }

    @Override
    public String getUserName() {
        return userName;
    }

    @Override
    public void destroy() {
        // Only the connection is released; the holder holds no other resources.
        DBUtil.closeDBResources(null, conn);
    }

    @Override
    public void doCommit() {
        try {
            if (conn != null) {
                conn.commit();
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

View File

@ -5,95 +5,112 @@ import static org.apache.commons.lang3.StringUtils.EMPTY;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.alibaba.datax.common.util.Configuration;
public class ServerConnectInfo {
public String clusterName;
public String tenantName;
public String userName;
public String password;
public String databaseName;
public String ipPort;
public String jdbcUrl;
public boolean publicCloud;
/**
*
* @param jdbcUrl format is jdbc:oceanbase//ip:port
* @param username format is cluster:tenant:username or username@tenant#cluster or user@tenant or user
* @param password
*/
public ServerConnectInfo(final String jdbcUrl, final String username, final String password) {
this.jdbcUrl = jdbcUrl;
this.password = password;
parseJdbcUrl(jdbcUrl);
parseFullUserName(username);
}
public String clusterName;
public String tenantName;
// userName doesn't contain tenantName or clusterName
public String userName;
public String password;
public String databaseName;
public String ipPort;
public String jdbcUrl;
public String host;
public String port;
public boolean publicCloud;
public int rpcPort;
public Configuration config;
private void parseJdbcUrl(final String jdbcUrl) {
Pattern pattern = Pattern.compile("//([\\w\\.\\-]+:\\d+)/([\\w-]+)\\?");
Matcher matcher = pattern.matcher(jdbcUrl);
if (matcher.find()) {
String ipPort = matcher.group(1);
String dbName = matcher.group(2);
this.ipPort = ipPort;
this.databaseName = dbName;
this.publicCloud = ipPort.split(":")[0].endsWith("aliyuncs.com");
} else {
throw new RuntimeException("Invalid argument:" + jdbcUrl);
}
}
public ServerConnectInfo(final String jdbcUrl, final String username, final String password, Configuration config) {
this.jdbcUrl = jdbcUrl;
this.password = password;
this.config = config;
parseJdbcUrl(jdbcUrl);
parseFullUserName(username);
}
private void parseFullUserName(final String fullUserName) {
int tenantIndex = fullUserName.indexOf("@");
int clusterIndex = fullUserName.indexOf("#");
if (fullUserName.contains(":") && tenantIndex < 0) {
String[] names = fullUserName.split(":");
if (names.length != 3) {
throw new RuntimeException("invalid argument: " + fullUserName);
} else {
this.clusterName = names[0];
this.tenantName = names[1];
this.userName = names[2];
}
} else if (!publicCloud || tenantIndex < 0) {
this.userName = tenantIndex < 0 ? fullUserName : fullUserName.substring(0, tenantIndex);
this.clusterName = clusterIndex < 0 ? EMPTY : fullUserName.substring(clusterIndex + 1);
// Avoid reporting errors when users do not write #
this.tenantName = tenantIndex < 0 ? EMPTY : fullUserName.substring(tenantIndex + 1, clusterIndex < 0 ? fullUserName.length() : clusterIndex);
} else {
// If in public cloud, the username with format user@tenant#cluster should be parsed, otherwise, connection can't be created.
this.userName = fullUserName.substring(0, tenantIndex);
if (clusterIndex > tenantIndex) {
this.tenantName = fullUserName.substring(tenantIndex + 1, clusterIndex);
this.clusterName = fullUserName.substring(clusterIndex + 1);
} else {
this.tenantName = fullUserName.substring(tenantIndex + 1);
this.clusterName = EMPTY;
}
}
}
private void parseJdbcUrl(final String jdbcUrl) {
Pattern pattern = Pattern.compile("//([\\w\\.\\-]+:\\d+)/([\\w]+)\\?");
Matcher matcher = pattern.matcher(jdbcUrl);
if (matcher.find()) {
String ipPort = matcher.group(1);
String dbName = matcher.group(2);
this.ipPort = ipPort;
this.host = ipPort.split(":")[0];
this.port = ipPort.split(":")[1];
this.databaseName = dbName;
this.publicCloud = host.endsWith("aliyuncs.com");
} else {
throw new RuntimeException("Invalid argument:" + jdbcUrl);
}
}
@Override
public String toString() {
StringBuffer strBuffer = new StringBuffer();
return strBuffer.append("clusterName:").append(clusterName).append(", tenantName:").append(tenantName)
.append(", userName:").append(userName).append(", databaseName:").append(databaseName)
.append(", ipPort:").append(ipPort).append(", jdbcUrl:").append(jdbcUrl).toString();
}
protected void parseFullUserName(final String fullUserName) {
int tenantIndex = fullUserName.indexOf("@");
int clusterIndex = fullUserName.indexOf("#");
// 适用于jdbcUrl以||_dsc_ob10_dsc_开头的场景
if (fullUserName.contains(":") && tenantIndex < 0) {
String[] names = fullUserName.split(":");
if (names.length != 3) {
throw new RuntimeException("invalid argument: " + fullUserName);
} else {
this.clusterName = names[0];
this.tenantName = names[1];
this.userName = names[2];
}
} else if (tenantIndex < 0) {
// 适用于short jdbcUrl且username中不含租户名主要是公有云场景此场景下不计算分区
this.userName = fullUserName;
this.clusterName = EMPTY;
this.tenantName = EMPTY;
} else {
// 适用于short jdbcUrl且username中含租户名
this.userName = fullUserName.substring(0, tenantIndex);
if (clusterIndex < 0) {
this.clusterName = EMPTY;
this.tenantName = fullUserName.substring(tenantIndex + 1);
} else {
this.clusterName = fullUserName.substring(clusterIndex + 1);
this.tenantName = fullUserName.substring(tenantIndex + 1, clusterIndex);
}
}
}
public String getFullUserName() {
StringBuilder builder = new StringBuilder();
builder.append(userName);
if (!EMPTY.equals(tenantName)) {
builder.append("@").append(tenantName);
}
@Override
public String toString() {
return "ServerConnectInfo{" +
"clusterName='" + clusterName + '\'' +
", tenantName='" + tenantName + '\'' +
", userName='" + userName + '\'' +
", password='" + password + '\'' +
", databaseName='" + databaseName + '\'' +
", ipPort='" + ipPort + '\'' +
", jdbcUrl='" + jdbcUrl + '\'' +
", host='" + host + '\'' +
", publicCloud=" + publicCloud +
", rpcPort=" + rpcPort +
'}';
}
if (!EMPTY.equals(clusterName)) {
builder.append("#").append(clusterName);
}
if (EMPTY.equals(this.clusterName) && EMPTY.equals(this.tenantName)) {
return this.userName;
}
return builder.toString();
}
public String getFullUserName() {
StringBuilder builder = new StringBuilder();
builder.append(userName);
if (!EMPTY.equals(tenantName)) {
builder.append("@").append(tenantName);
}
if (!EMPTY.equals(clusterName)) {
builder.append("#").append(clusterName);
}
if (EMPTY.equals(this.clusterName) && EMPTY.equals(this.tenantName)) {
return this.userName;
}
return builder.toString();
}
public void setRpcPort(int rpcPort) {
this.rpcPort = rpcPort;
}
}

View File

@ -0,0 +1,127 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.AbstractConnHolder;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base worker that drains record batches from a shared queue and writes them to
 * OceanBase; subclasses supply the connection holder and the actual write.
 */
public abstract class AbstractInsertTask implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractInsertTask.class);

    protected final long taskId;
    protected ConcurrentTableWriterTask writerTask;
    protected ConcurrentTableWriterTask.ConcurrentTableWriter writer;
    protected Queue<List<Record>> queue;
    // volatile: written by setStop() from a controlling thread, read by run() on the worker.
    protected volatile boolean isStop;
    protected Configuration config;
    protected ServerConnectInfo connInfo;
    protected AbstractConnHolder connHolder;

    protected long totalCost = 0;
    protected long insertCount = 0;
    private boolean printCost = Config.DEFAULT_PRINT_COST;
    private long costBound = Config.DEFAULT_COST_BOUND;

    public AbstractInsertTask(final long taskId, Queue<List<Record>> recordsQueue, Configuration config,
            ServerConnectInfo connectInfo, ConcurrentTableWriterTask task,
            ConcurrentTableWriterTask.ConcurrentTableWriter writer) {
        this.taskId = taskId;
        initCommon(recordsQueue, config, connectInfo);
        this.writer = writer;
        this.writerTask = task;
        // NOTE(review): initConnHolder() is overridable and runs inside the constructor;
        // subclass fields are not yet initialized when it executes.
        initConnHolder();
    }

    public AbstractInsertTask(final long taskId, Queue<List<Record>> recordsQueue, Configuration config,
            ServerConnectInfo connectInfo) {
        this.taskId = taskId;
        initCommon(recordsQueue, config, connectInfo);
        initConnHolder();
    }

    /** Field initialization shared by both constructors. */
    private void initCommon(Queue<List<Record>> recordsQueue, Configuration config, ServerConnectInfo connectInfo) {
        this.queue = recordsQueue;
        this.config = config;
        this.connInfo = connectInfo;
        this.isStop = false;
        this.printCost = config.getBool(Config.PRINT_COST, Config.DEFAULT_PRINT_COST);
        this.costBound = config.getLong(Config.COST_BOUND, Config.DEFAULT_COST_BOUND);
    }

    /** Creates {@link #connHolder}; invoked from the constructors. */
    protected abstract void initConnHolder();

    public void setWriterTask(ConcurrentTableWriterTask writerTask) {
        this.writerTask = writerTask;
    }

    public void setWriter(ConcurrentTableWriterTask.ConcurrentTableWriter writer) {
        this.writer = writer;
    }

    private boolean isStop() {
        return isStop;
    }

    /** Asks the worker loop in {@link #run()} to exit before its next iteration. */
    public void setStop() {
        isStop = true;
    }

    public AbstractConnHolder getConnHolder() {
        return connHolder;
    }

    /**
     * Records one finished batch: bumps the writer's finish counter and the local
     * cost statistics, logging batches slower than the configured bound.
     *
     * @param cost elapsed time of the batch in milliseconds
     */
    public void calStatistic(final long cost) {
        writer.increFinishCount();
        insertCount++;
        totalCost += cost;
        if (this.printCost && cost > this.costBound) {
            LOG.info("slow multi insert cost {}ms", cost);
        }
    }

    /**
     * Worker loop: polls batches and writes them until stopped, interrupted, or
     * the producer reports that all work has been queued and finished.
     */
    @Override
    public void run() {
        Thread.currentThread().setName(String.format("%d-insertTask-%d", taskId, Thread.currentThread().getId()));
        LOG.debug("Task {} start to execute...", taskId);
        while (!isStop()) {
            try {
                List<Record> records = queue.poll();
                if (null != records) {
                    write(records);
                } else if (writerTask.isFinished()) {
                    writerTask.singalTaskFinish();
                    LOG.debug("not more task, thread exist ...");
                    break;
                } else {
                    TimeUnit.MILLISECONDS.sleep(5);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop; swallowing the interrupt would
                // make this worker uncancellable by its executor.
                Thread.currentThread().interrupt();
                LOG.debug("TableWriter is interrupt");
                break;
            } catch (Exception e) {
                LOG.warn("ERROR UNEXPECTED ", e);
                break;
            }
        }
        LOG.debug("Thread exist...");
    }

    /** Writes one batch of records; implemented by subclasses. */
    protected abstract void write(List<Record> records);

    public long getTotalCost() {
        return totalCost;
    }

    public long getInsertCount() {
        return insertCount;
    }

    /** Releases the connection holder, if one was created. */
    public void destroy() {
        if (connHolder != null) {
            connHolder.destroy();
        }
    }
}

View File

@ -18,7 +18,9 @@ import com.alibaba.datax.plugin.writer.oceanbasev10writer.part.IObPartCalculator
import com.alibaba.datax.plugin.writer.oceanbasev10writer.part.ObPartitionCalculatorV1;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.part.ObPartitionCalculatorV2;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
import com.oceanbase.partition.calculator.enums.ObServerMode;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@ -34,8 +36,10 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.alibaba.datax.plugin.writer.oceanbasev10writer.Config.DEFAULT_SLOW_MEMSTORE_THRESHOLD;
import static com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils.LoadMode.FAST;
import static com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils.LoadMode.PAUSE;
@ -44,15 +48,15 @@ import static com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUt
public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentTableWriterTask.class);
// memstore_total memstore_limit 比例的阈值,一旦超过这个值,则暂停写入
private double memstoreThreshold = Config.DEFAULT_MEMSTORE_THRESHOLD;
// memstore检查的间隔
private long memstoreCheckIntervalSecond = Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND;
// 最后一次检查
private long lastCheckMemstoreTime;
// memstore_total memstore_limit 比例的阈值,一旦超过这个值,则暂停写入
private double memstoreThreshold = Config.DEFAULT_MEMSTORE_THRESHOLD;
// memstore检查的间隔
private long memstoreCheckIntervalSecond = Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND;
// 最后一次检查
private long lastCheckMemstoreTime;
private volatile ObWriterUtils.LoadMode loadMode = FAST;
private volatile ObWriterUtils.LoadMode loadMode = FAST;
private static AtomicLong totalTask = new AtomicLong(0);
private long taskId = -1;
private AtomicBoolean isMemStoreFull = new AtomicBoolean(false);
@ -69,38 +73,41 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
private String obUpdateColumns = null;
private String dbName;
private int calPartFailedCount = 0;
private boolean directPath;
public ConcurrentTableWriterTask(DataBaseType dataBaseType) {
super(dataBaseType);
taskId = totalTask.getAndIncrement();
}
public ConcurrentTableWriterTask(DataBaseType dataBaseType) {
super(dataBaseType);
taskId = totalTask.getAndIncrement();
}
@Override
public void init(Configuration config) {
super.init(config);
// OceanBase 所有操作都是 insert into on duplicate key update 模式
// writeMode应该使用enum来定义
this.writeMode = "update";
@Override
public void init(Configuration config) {
super.init(config);
this.directPath = config.getBool(Config.DIRECT_PATH, false);
// OceanBase 所有操作都是 insert into on duplicate key update 模式
// writeMode应该使用enum来定义
this.writeMode = "update";
obWriteMode = config.getString(Config.OB_WRITE_MODE, "update");
ServerConnectInfo connectInfo = new ServerConnectInfo(jdbcUrl, username, password);
dbName = connectInfo.databaseName;
//init check memstore
this.memstoreThreshold = config.getDouble(Config.MEMSTORE_THRESHOLD, Config.DEFAULT_MEMSTORE_THRESHOLD);
this.memstoreCheckIntervalSecond = config.getLong(Config.MEMSTORE_CHECK_INTERVAL_SECOND,
Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND);
ServerConnectInfo connectInfo = new ServerConnectInfo(jdbcUrl, username, password, config);
connectInfo.setRpcPort(config.getInt(Config.RPC_PORT, 0));
dbName = connectInfo.databaseName;
//init check memstore
this.memstoreThreshold = config.getDouble(Config.MEMSTORE_THRESHOLD, Config.DEFAULT_MEMSTORE_THRESHOLD);
this.memstoreCheckIntervalSecond = config.getLong(Config.MEMSTORE_CHECK_INTERVAL_SECOND,
Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND);
this.connHolder = new ObClientConnHolder(config, connectInfo.jdbcUrl,
connectInfo.getFullUserName(), connectInfo.password);
this.isOracleCompatibleMode = ObWriterUtils.isOracleMode();
if (isOracleCompatibleMode) {
connectInfo.databaseName = connectInfo.databaseName.toUpperCase();
//在转义的情况下不翻译
if (!(table.startsWith("\"") && table.endsWith("\""))) {
table = table.toUpperCase();
}
this.connHolder = new ObClientConnHolder(config, connectInfo.jdbcUrl,
connectInfo.getFullUserName(), connectInfo.password);
this.isOracleCompatibleMode = ObWriterUtils.isOracleMode();
if (isOracleCompatibleMode) {
connectInfo.databaseName = connectInfo.databaseName.toUpperCase();
//在转义的情况下不翻译
if (!(table.startsWith("\"") && table.endsWith("\""))) {
table = table.toUpperCase();
}
LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s",
connectInfo.databaseName, table));
LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s",
connectInfo.databaseName, table));
}
if (config.getBool(Config.USE_PART_CALCULATOR, Config.DEFAULT_USE_PART_CALCULATOR)) {
@ -135,37 +142,37 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
return new ObPartitionCalculatorV1(connectInfo, table, columns);
}
public boolean isFinished() {
return allTaskInQueue && concurrentWriter.checkFinish();
}
public boolean allTaskInQueue() {
return allTaskInQueue;
}
public void setPutAllTaskInQueue() {
this.allTaskInQueue = true;
LOG.info("ConcurrentTableWriter has put all task in queue, queueSize = {}, total = {}, finished = {}",
concurrentWriter.getTaskQueueSize(),
concurrentWriter.getTotalTaskCount(),
concurrentWriter.getFinishTaskCount());
}
private void rewriteSql() {
Connection conn = connHolder.initConnection();
if (isOracleCompatibleMode && obWriteMode.equalsIgnoreCase("update")) {
// change obWriteMode to insert so the insert statement will be generated.
obWriteMode = "insert";
}
this.writeRecordSql = ObWriterUtils.buildWriteSql(table, columns, conn, obWriteMode, obUpdateColumns);
LOG.info("writeRecordSql :{}", this.writeRecordSql);
}
public boolean isFinished() {
return allTaskInQueue && concurrentWriter.checkFinish();
}
public boolean allTaskInQueue() {
return allTaskInQueue;
}
public void setPutAllTaskInQueue() {
this.allTaskInQueue = true;
LOG.info("ConcurrentTableWriter has put all task in queue, queueSize = {}, total = {}, finished = {}",
concurrentWriter.getTaskQueueSize(),
concurrentWriter.getTotalTaskCount(),
concurrentWriter.getFinishTaskCount());
}
private void rewriteSql() {
Connection conn = connHolder.initConnection();
if (isOracleCompatibleMode && obWriteMode.equalsIgnoreCase("update")) {
// change obWriteMode to insert so the insert statement will be generated.
obWriteMode = "insert";
}
this.writeRecordSql = ObWriterUtils.buildWriteSql(table, columns, conn, obWriteMode, obUpdateColumns);
LOG.info("writeRecordSql :{}", this.writeRecordSql);
}
@Override
public void prepare(Configuration writerSliceConfig) {
super.prepare(writerSliceConfig);
concurrentWriter.start();
}
public void prepare(Configuration writerSliceConfig) {
super.prepare(writerSliceConfig);
concurrentWriter.start();
}
@Override
public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
@ -175,25 +182,25 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
int retryTimes = 0;
boolean needRetry = false;
do {
try {
if (retryTimes > 0) {
TimeUnit.SECONDS.sleep((1 << retryTimes));
DBUtil.closeDBResources(null, connection);
connection = DBUtil.getConnection(dataBaseType, jdbcUrl, username, password);
LOG.warn("getColumnMetaData of table {} failed, retry the {} times ...", this.table, retryTimes);
}
ColumnMetaCache.init(connection, this.table, this.columns);
this.resultSetMetaData = ColumnMetaCache.getColumnMeta();
needRetry = false;
} catch (SQLException e) {
needRetry = true;
++retryTimes;
e.printStackTrace();
LOG.warn("fetch column meta of [{}] failed..., retry {} times", this.table, retryTimes);
} catch (InterruptedException e) {
LOG.warn("startWriteWithConnection interrupt, ignored");
} finally {
}
try {
if (retryTimes > 0) {
TimeUnit.SECONDS.sleep((1 << retryTimes));
DBUtil.closeDBResources(null, connection);
connection = DBUtil.getConnection(dataBaseType, jdbcUrl, username, password);
LOG.warn("getColumnMetaData of table {} failed, retry the {} times ...", this.table, retryTimes);
}
ColumnMetaCache.init(connection, this.table, this.columns);
this.resultSetMetaData = ColumnMetaCache.getColumnMeta();
needRetry = false;
} catch (SQLException e) {
needRetry = true;
++retryTimes;
e.printStackTrace();
LOG.warn("fetch column meta of [{}] failed..., retry {} times", this.table, retryTimes);
} catch (InterruptedException e) {
LOG.warn("startWriteWithConnection interrupt, ignored");
} finally {
}
} while (needRetry && retryTimes < 100);
try {
@ -202,8 +209,8 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
while ((record = recordReceiver.getFromReader()) != null) {
if (record.getColumnNumber() != this.columnNumber) {
// 源头读取字段列数与目的表字段写入列数不相等直接报错
LOG.error("column not equal {} != {}, record = {}",
this.columnNumber, record.getColumnNumber(), record.toString());
LOG.error("column not equal {} != {}, record = {}",
this.columnNumber, record.getColumnNumber(), record.toString());
throw DataXException
.asDataXException(
DBUtilErrorCode.CONF_ERROR,
@ -223,388 +230,406 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
}
}
public PreparedStatement fillStatement(PreparedStatement preparedStatement, Record record)
throws SQLException {
return fillPreparedStatement(preparedStatement, record);
}
public PreparedStatement fillStatement(PreparedStatement preparedStatement, Record record)
throws SQLException {
return fillPreparedStatement(preparedStatement, record);
}
private void addLeftRecords() {
//不需要刷新Cache已经是最后一批数据了
for (List<Record> groupValues : groupInsertValues.values()) {
if (groupValues.size() > 0 ) {
addRecordsToWriteQueue(groupValues);
}
}
}
private void addRecordToCache(final Record record) {
Long partId =null;
try {
partId = obPartCalculator == null ? Long.MAX_VALUE : obPartCalculator.calculate(record);
} catch (Exception e1) {
if (calPartFailedCount++ < 10) {
LOG.warn("fail to get partition id: " + e1.getMessage() + ", record: " + record);
}
}
private void addLeftRecords() {
//不需要刷新Cache已经是最后一批数据了
for (List<Record> groupValues : groupInsertValues.values()) {
if (groupValues.size() > 0) {
addRecordsToWriteQueue(groupValues);
}
}
}
private void addRecordToCache(final Record record) {
Long partId = null;
try {
partId = obPartCalculator == null ? Long.MAX_VALUE : obPartCalculator.calculate(record);
} catch (Exception e1) {
if (calPartFailedCount++ < 10) {
LOG.warn("fail to get partition id: " + e1.getMessage() + ", record: " + record);
}
}
if (partId == null) {
LOG.debug("fail to calculate parition id, just put into the default buffer.");
partId = Long.MAX_VALUE;
}
List<Record> groupValues = groupInsertValues.computeIfAbsent(partId, k -> new ArrayList<Record>(batchSize));
groupValues.add(record);
if (groupValues.size() >= batchSize) {
groupValues = addRecordsToWriteQueue(groupValues);
groupInsertValues.put(partId, groupValues);
}
}
List<Record> groupValues = groupInsertValues.computeIfAbsent(partId, k -> new ArrayList<Record>(batchSize));
groupValues.add(record);
if (groupValues.size() >= batchSize) {
groupValues = addRecordsToWriteQueue(groupValues);
groupInsertValues.put(partId, groupValues);
}
}
/**
*
* @param records
* @return 返回一个新的Cache用于存储接下来的数据
*/
private List<Record> addRecordsToWriteQueue(List<Record> records) {
int i = 0;
while (true) {
if (i > 0) {
LOG.info("retry add batch record the {} times", i);
}
try {
concurrentWriter.addBatchRecords(records);
break;
} catch (InterruptedException e) {
i++;
LOG.info("Concurrent table writer is interrupted");
}
}
return new ArrayList<Record>(batchSize);
}
private void checkMemStore() {
Connection checkConn = connHolder.getConn();
try {
if (checkConn == null || checkConn.isClosed()) {
checkConn = connHolder.reconnect();
}
}catch (Exception e) {
LOG.warn("Check connection is unusable");
}
/**
* @param records
* @return 返回一个新的Cache用于存储接下来的数据
*/
private List<Record> addRecordsToWriteQueue(List<Record> records) {
int i = 0;
while (true) {
if (i > 0) {
LOG.info("retry add batch record the {} times", i);
}
try {
concurrentWriter.addBatchRecords(records);
break;
} catch (InterruptedException e) {
i++;
LOG.info("Concurrent table writer is interrupted");
}
}
return new ArrayList<Record>(batchSize);
}
long now = System.currentTimeMillis();
if (now - lastCheckMemstoreTime < 1000 * memstoreCheckIntervalSecond) {
return;
}
double memUsedRatio = ObWriterUtils.queryMemUsedRatio(checkConn);
if (memUsedRatio >= DEFAULT_SLOW_MEMSTORE_THRESHOLD) {
this.loadMode = memUsedRatio >= memstoreThreshold ? PAUSE : SLOW;
LOG.info("Memstore used ration is {}. Load data {}", memUsedRatio, loadMode.name());
}else {
this.loadMode = FAST;
}
lastCheckMemstoreTime = now;
}
public boolean isMemStoreFull() {
return isMemStoreFull.get();
}
private void checkMemStore() {
Connection checkConn = connHolder.getConn();
try {
if (checkConn == null || checkConn.isClosed()) {
checkConn = connHolder.reconnect();
}
} catch (Exception e) {
LOG.warn("Check connection is unusable");
}
public boolean isShouldPause() {
return this.loadMode.equals(PAUSE);
}
long now = System.currentTimeMillis();
if (now - lastCheckMemstoreTime < 1000 * memstoreCheckIntervalSecond) {
return;
}
double memUsedRatio = ObWriterUtils.queryMemUsedRatio(checkConn);
if (memUsedRatio >= DEFAULT_SLOW_MEMSTORE_THRESHOLD) {
this.loadMode = memUsedRatio >= memstoreThreshold ? PAUSE : SLOW;
LOG.info("Memstore used ration is {}. Load data {}", memUsedRatio, loadMode.name());
} else {
this.loadMode = FAST;
}
lastCheckMemstoreTime = now;
}
public boolean isShouldSlow() {
return this.loadMode.equals(SLOW);
}
public void print() {
if (LOG.isDebugEnabled()) {
LOG.debug("Statistic total task {}, finished {}, queue Size {}",
concurrentWriter.getTotalTaskCount(),
concurrentWriter.getFinishTaskCount(),
concurrentWriter.getTaskQueueSize());
concurrentWriter.printStatistics();
}
}
public void waitTaskFinish() {
setPutAllTaskInQueue();
lock.lock();
try {
while (!concurrentWriter.checkFinish()) {
condition.await(15, TimeUnit.SECONDS);
print();
checkMemStore();
}
} catch (InterruptedException e) {
LOG.warn("Concurrent table writer wait task finish interrupt");
} finally {
lock.unlock();
}
LOG.debug("wait all InsertTask finished ...");
}
public void singalTaskFinish() {
lock.lock();
condition.signal();
lock.unlock();
}
@Override
public void destroy(Configuration writerSliceConfig) {
    // Stop all insert workers first, then release the connection held at this level.
    if (concurrentWriter != null) {
        concurrentWriter.destory();
    }
    DBUtil.closeDBResources(null, connHolder.getConn());
    super.destroy(writerSliceConfig);
}
public class ConcurrentTableWriter {
private BlockingQueue<List<Record>> queue;
private List<InsertTask> insertTasks;
private Configuration config;
private ServerConnectInfo connectInfo;
private String rewriteRecordSql;
private AtomicLong totalTaskCount;
private AtomicLong finishTaskCount;
private final int threadCount;
public boolean isMemStoreFull() {
return isMemStoreFull.get();
}
public ConcurrentTableWriter(Configuration config, ServerConnectInfo connInfo, String rewriteRecordSql) {
threadCount = config.getInt(Config.WRITER_THREAD_COUNT, Config.DEFAULT_WRITER_THREAD_COUNT);
queue = new LinkedBlockingQueue<List<Record>>(threadCount << 1);
insertTasks = new ArrayList<InsertTask>(threadCount);
this.config = config;
this.connectInfo = connInfo;
this.rewriteRecordSql = rewriteRecordSql;
this.totalTaskCount = new AtomicLong(0);
this.finishTaskCount = new AtomicLong(0);
}
public long getTotalTaskCount() {
return totalTaskCount.get();
}
public long getFinishTaskCount() {
return finishTaskCount.get();
}
public int getTaskQueueSize() {
return queue.size();
}
public void increFinishCount() {
finishTaskCount.incrementAndGet();
}
//should check after put all the task in the queue
public boolean checkFinish() {
long finishCount = finishTaskCount.get();
long totalCount = totalTaskCount.get();
return finishCount == totalCount;
}
public synchronized void start() {
for (int i = 0; i < threadCount; ++i) {
LOG.info("start {} insert task.", (i+1));
InsertTask insertTask = new InsertTask(taskId, queue, config, connectInfo, rewriteRecordSql);
insertTask.setWriterTask(ConcurrentTableWriterTask.this);
insertTask.setWriter(this);
insertTasks.add(insertTask);
}
WriterThreadPool.executeBatch(insertTasks);
}
public void printStatistics() {
long insertTotalCost = 0;
long insertTotalCount = 0;
for (InsertTask task: insertTasks) {
insertTotalCost += task.getTotalCost();
insertTotalCount += task.getInsertCount();
}
long avgCost = 0;
if (insertTotalCount != 0) {
avgCost = insertTotalCost / insertTotalCount;
}
ConcurrentTableWriterTask.LOG.debug("Insert {} times, totalCost {} ms, average {} ms",
insertTotalCount, insertTotalCost, avgCost);
}
public boolean isShouldPause() {
return this.loadMode.equals(PAUSE);
}
public void addBatchRecords(final List<Record> records) throws InterruptedException {
boolean isSucc = false;
while (!isSucc) {
isSucc = queue.offer(records, 5, TimeUnit.MILLISECONDS);
checkMemStore();
}
totalTaskCount.incrementAndGet();
}
// NOTE(review): the name is a typo of "destroy" but is part of the caller-facing
// API, so it is kept. Two passes: first flag every task to stop so they wind
// down concurrently, then release each task's resources.
public synchronized void destory() {
if (insertTasks != null) {
for(InsertTask task : insertTasks) {
task.setStop();
}
for(InsertTask task: insertTasks) {
task.destroy();
}
}
}
}
// True when the current load mode is SLOW (writing should be throttled).
public boolean isShouldSlow() {
return this.loadMode.equals(SLOW);
}
// 直接使用了两个类变量columnNumber,resultSetMetaData
protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record)
throws SQLException {
for (int i = 0; i < this.columnNumber; i++) {
int columnSqltype = this.resultSetMetaData.getMiddle().get(i);
String typeName = this.resultSetMetaData.getRight().get(i);
preparedStatement = fillPreparedStatementColumnType(preparedStatement, i, columnSqltype, typeName, record.getColumn(i));
}
// Dumps queue/progress statistics at DEBUG level; cheap no-op otherwise.
public void print() {
if (LOG.isDebugEnabled()) {
LOG.debug("Statistic total task {}, finished {}, queue Size {}",
concurrentWriter.getTotalTaskCount(),
concurrentWriter.getFinishTaskCount(),
concurrentWriter.getTaskQueueSize());
concurrentWriter.printStatistics();
}
}
return preparedStatement;
}
/**
 * Blocks until every queued batch has been processed by the insert tasks,
 * then commits through the concurrent writer.
 *
 * Marks the queue as fully populated first so checkFinish() becomes reliable,
 * then waits on the condition (woken by singalTaskFinish() or after 15 s)
 * while printing progress and monitoring memstore pressure.
 */
public void waitTaskFinish() {
    setPutAllTaskInQueue();
    lock.lock();
    try {
        while (!concurrentWriter.checkFinish()) {
            condition.await(15, TimeUnit.SECONDS);
            print();
            checkMemStore();
        }
        concurrentWriter.doCommit();
    } catch (InterruptedException e) {
        LOG.warn("Concurrent table writer wait task finish interrupt");
        // Restore the interrupt flag so callers up the stack can observe it.
        Thread.currentThread().interrupt();
    } finally {
        lock.unlock();
    }
    LOG.debug("wait all InsertTask finished ...");
}
protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex,
int columnSqltype, String typeName, Column column) throws SQLException {
java.util.Date utilDate;
switch (columnSqltype) {
case Types.CHAR:
case Types.NCHAR:
case Types.CLOB:
case Types.NCLOB:
case Types.VARCHAR:
case Types.LONGVARCHAR:
case Types.NVARCHAR:
case Types.LONGNVARCHAR:
preparedStatement.setString(columnIndex + 1, column
.asString());
break;
/**
 * Wakes one thread blocked in waitTaskFinish(). (The historical "singal" typo
 * is kept because callers reference this name.)
 */
public void singalTaskFinish() {
    lock.lock();
    try {
        condition.signal();
    } finally {
        // Always release the lock, even if signal() throws.
        lock.unlock();
    }
}
case Types.SMALLINT:
case Types.INTEGER:
case Types.BIGINT:
case Types.NUMERIC:
case Types.DECIMAL:
case Types.FLOAT:
case Types.REAL:
case Types.DOUBLE:
String strValue = column.asString();
if (emptyAsNull && "".equals(strValue)) {
preparedStatement.setString(columnIndex + 1, null);
} else {
preparedStatement.setString(columnIndex + 1, strValue);
}
break;
/**
 * Releases all writer resources: stops and destroys the per-thread insert
 * tasks, then closes the JDBC connection held by this task.
 *
 * @param writerSliceConfig slice configuration passed through to the parent cleanup
 */
@Override
public void destroy(Configuration writerSliceConfig) {
    if (concurrentWriter != null) {
        concurrentWriter.destory();
    }
    // Close the connection held at this level (把本级持有的conn关闭掉);
    // guard against a failed init where the holder was never created.
    if (connHolder != null) {
        DBUtil.closeDBResources(null, connHolder.getConn());
    }
    super.destroy(writerSliceConfig);
}
//tinyint is a little special in some database like mysql {boolean->tinyint(1)}
case Types.TINYINT:
Long longValue = column.asLong();
if (null == longValue) {
preparedStatement.setString(columnIndex + 1, null);
} else {
preparedStatement.setString(columnIndex + 1, longValue.toString());
}
break;
public class ConcurrentTableWriter {
private BlockingQueue<List<Record>> queue;
private List<AbstractInsertTask> abstractInsertTasks;
private Configuration config;
private ServerConnectInfo connectInfo;
private String rewriteRecordSql;
private AtomicLong totalTaskCount;
private AtomicLong finishTaskCount;
private final int threadCount;
// for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
case Types.DATE:
if (typeName == null) {
typeName = this.resultSetMetaData.getRight().get(columnIndex);
}
public ConcurrentTableWriter(Configuration config, ServerConnectInfo connInfo, String rewriteRecordSql) {
threadCount = config.getInt(Config.WRITER_THREAD_COUNT, Config.DEFAULT_WRITER_THREAD_COUNT);
queue = new LinkedBlockingQueue<List<Record>>(threadCount << 1);
abstractInsertTasks = new ArrayList<AbstractInsertTask>(threadCount);
this.config = config;
this.connectInfo = connInfo;
this.rewriteRecordSql = rewriteRecordSql;
this.totalTaskCount = new AtomicLong(0);
this.finishTaskCount = new AtomicLong(0);
}
if (typeName.equalsIgnoreCase("year")) {
if (column.asBigInteger() == null) {
preparedStatement.setString(columnIndex + 1, null);
} else {
preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue());
}
} else {
java.sql.Date sqlDate = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date 类型转换错误:[%s]", column));
}
// Total number of batches enqueued so far.
public long getTotalTaskCount() {
return totalTaskCount.get();
}
if (null != utilDate) {
sqlDate = new java.sql.Date(utilDate.getTime());
}
preparedStatement.setDate(columnIndex + 1, sqlDate);
}
break;
// Number of batches fully processed by the insert tasks.
public long getFinishTaskCount() {
return finishTaskCount.get();
}
case Types.TIME:
java.sql.Time sqlTime = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"TIME 类型转换错误:[%s]", column));
}
// Number of batches currently waiting in the queue.
public int getTaskQueueSize() {
return queue.size();
}
if (null != utilDate) {
sqlTime = new java.sql.Time(utilDate.getTime());
}
preparedStatement.setTime(columnIndex + 1, sqlTime);
break;
// Called by an insert task after it finishes one batch.
public void increFinishCount() {
finishTaskCount.incrementAndGet();
}
case Types.TIMESTAMP:
java.sql.Timestamp sqlTimestamp = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"TIMESTAMP 类型转换错误:[%s]", column));
}
// Should only be checked after ALL tasks have been put in the queue: the two
// counters are read separately (not atomically), so the comparison is only
// reliable once no more batches will be enqueued.
public boolean checkFinish() {
long finishCount = finishTaskCount.get();
long totalCount = totalTaskCount.get();
return finishCount == totalCount;
}
if (null != utilDate) {
sqlTimestamp = new java.sql.Timestamp(
utilDate.getTime());
}
preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
break;
case Types.VARBINARY:
case Types.BLOB:
case Types.LONGVARBINARY:
preparedStatement.setBytes(columnIndex + 1, column
.asBytes());
break;
case Types.BINARY:
String isArray = column.getRawData().toString();
if (isArray.startsWith("[")&&isArray.endsWith("]")){
preparedStatement.setString(columnIndex + 1, column
.asString());
}else {
preparedStatement.setBytes(columnIndex + 1, column
.asBytes());
}
break;
case Types.BOOLEAN:
preparedStatement.setBoolean(columnIndex + 1, column.asBoolean());
break;
/**
 * Launches the configured number of insert workers. Depending on the
 * directPath flag each worker is either a DirectPathInsertTask (OceanBase
 * direct-path load) or a classic SQL InsertTask; all are submitted to the
 * shared WriterThreadPool in one batch.
 */
public synchronized void start() {
    for (int workerIndex = 0; workerIndex < threadCount; ++workerIndex) {
        LOG.info("start {} insert task.", (workerIndex + 1));
        final AbstractInsertTask insertTask;
        if (directPath) {
            insertTask = new DirectPathInsertTask(taskId, queue, config, connectInfo, ConcurrentTableWriterTask.this, this);
        } else {
            insertTask = new InsertTask(taskId, queue, config, connectInfo, rewriteRecordSql);
        }
        insertTask.setWriterTask(ConcurrentTableWriterTask.this);
        insertTask.setWriter(this);
        abstractInsertTasks.add(insertTask);
    }
    WriterThreadPool.executeBatch(abstractInsertTasks);
}
// warn: bit(1) -> Types.BIT 可使用setBoolean
// warn: bit(>1) -> Types.VARBINARY 可使用setBytes
case Types.BIT:
if (this.dataBaseType == DataBaseType.MySql) {
preparedStatement.setBoolean(columnIndex + 1, column.asBoolean());
} else {
preparedStatement.setString(columnIndex + 1, column.asString());
}
break;
default:
throw DataXException
.asDataXException(
DBUtilErrorCode.UNSUPPORTED_TYPE,
String.format(
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",
this.resultSetMetaData.getLeft()
.get(columnIndex),
this.resultSetMetaData.getMiddle()
.get(columnIndex),
this.resultSetMetaData.getRight()
.get(columnIndex)));
}
return preparedStatement;
}
/**
 * Commits through the connection holder of the first insert task.
 */
public void doCommit() {
    // Guard: start() may never have run (e.g. zero records), leaving the list
    // empty; get(0) would then throw IndexOutOfBoundsException.
    if (!this.abstractInsertTasks.isEmpty()) {
        this.abstractInsertTasks.get(0).getConnHolder().doCommit();
    }
}
// Configured number of writer threads for this table writer.
public int getThreadCount() {
return threadCount;
}
/**
 * Logs aggregate insert statistics — total invocations, total cost (ms) and
 * average cost per invocation — accumulated across all insert tasks.
 */
public void printStatistics() {
    long totalCostMs = 0;
    long totalInserts = 0;
    for (AbstractInsertTask task : abstractInsertTasks) {
        totalCostMs += task.getTotalCost();
        totalInserts += task.getInsertCount();
    }
    // Avoid divide-by-zero when nothing has been inserted yet.
    long avgCostMs = (totalInserts == 0) ? 0 : totalCostMs / totalInserts;
    ConcurrentTableWriterTask.LOG.debug("Insert {} times, totalCost {} ms, average {} ms",
            totalInserts, totalCostMs, avgCostMs);
}
/**
 * Enqueues one batch, blocking until the bounded queue accepts it. After each
 * 5 ms offer attempt (including the successful one) the server memstore
 * pressure is re-checked, so back-pressure from OceanBase is honored.
 *
 * @throws InterruptedException if interrupted while offering to the queue
 */
public void addBatchRecords(final List<Record> records) throws InterruptedException {
    boolean accepted;
    do {
        accepted = queue.offer(records, 5, TimeUnit.MILLISECONDS);
        checkMemStore();
    } while (!accepted);
    // Count the batch only after it is actually enqueued.
    totalTaskCount.incrementAndGet();
}
/**
 * Stops and tears down all insert workers. (The historical "destory" typo is
 * kept because callers reference this name.)
 */
public synchronized void destory() {
    if (abstractInsertTasks == null) {
        return;
    }
    // Two passes: signal all stops first so the tasks wind down concurrently,
    // then release each task's resources.
    for (AbstractInsertTask task : abstractInsertTasks) {
        task.setStop();
    }
    for (AbstractInsertTask task : abstractInsertTasks) {
        task.destroy();
    }
}
}
// Target table name for this writer task.
public String getTable() {
return table;
}
// Binds every column of the record onto the statement, delegating per-type
// conversion to fillPreparedStatementColumnType. Relies directly on the two
// class fields columnNumber and resultSetMetaData.
protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record)
        throws SQLException {
    PreparedStatement stmt = preparedStatement;
    for (int columnIndex = 0; columnIndex < this.columnNumber; columnIndex++) {
        final int sqlType = this.resultSetMetaData.getMiddle().get(columnIndex);
        final String sqlTypeName = this.resultSetMetaData.getRight().get(columnIndex);
        stmt = fillPreparedStatementColumnType(stmt, columnIndex, sqlType, sqlTypeName, record.getColumn(columnIndex));
    }
    return stmt;
}
/**
 * Binds one column value onto the statement at JDBC position
 * {@code columnIndex + 1}, converting the DataX {@code Column} according to
 * the declared SQL type. Numeric types are bound as strings so the server
 * performs the final parse; DATE special-cases MySQL YEAR columns; BIT is
 * bound as boolean only for MySQL.
 *
 * @param preparedStatement statement to bind into (returned for chaining)
 * @param columnIndex       zero-based column index
 * @param columnSqltype     {@code java.sql.Types} constant of the target column
 * @param typeName          database-reported type name (may be null; re-read
 *                          from metadata for DATE)
 * @param column            value to bind
 * @throws SQLException on conversion failure or an unsupported type
 */
protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex,
        int columnSqltype, String typeName, Column column) throws SQLException {
    java.util.Date utilDate;
    switch (columnSqltype) {
        case Types.CHAR:
        case Types.NCHAR:
        case Types.CLOB:
        case Types.NCLOB:
        case Types.VARCHAR:
        case Types.LONGVARCHAR:
        case Types.NVARCHAR:
        case Types.LONGNVARCHAR:
            preparedStatement.setString(columnIndex + 1, column.asString());
            break;
        case Types.SMALLINT:
        case Types.INTEGER:
        case Types.BIGINT:
        case Types.NUMERIC:
        case Types.DECIMAL:
        case Types.FLOAT:
        case Types.REAL:
        case Types.DOUBLE:
            // Numbers are bound as strings; optionally map "" to NULL.
            String strValue = column.asString();
            if (emptyAsNull && "".equals(strValue)) {
                preparedStatement.setString(columnIndex + 1, null);
            } else {
                preparedStatement.setString(columnIndex + 1, strValue);
            }
            break;
        // tinyint is a little special in some databases like mysql {boolean->tinyint(1)}
        case Types.TINYINT:
            Long longValue = column.asLong();
            if (null == longValue) {
                preparedStatement.setString(columnIndex + 1, null);
            } else {
                preparedStatement.setString(columnIndex + 1, longValue.toString());
            }
            break;
        // for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
        case Types.DATE:
            if (typeName == null) {
                typeName = this.resultSetMetaData.getRight().get(columnIndex);
            }
            if (typeName.equalsIgnoreCase("year")) {
                if (column.asBigInteger() == null) {
                    preparedStatement.setString(columnIndex + 1, null);
                } else {
                    preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue());
                }
            } else {
                java.sql.Date sqlDate = null;
                try {
                    utilDate = column.asDate();
                } catch (DataXException e) {
                    // Preserve the original cause for diagnostics.
                    throw new SQLException(String.format(
                            "Date 类型转换错误:[%s]", column), e);
                }
                if (null != utilDate) {
                    sqlDate = new java.sql.Date(utilDate.getTime());
                }
                preparedStatement.setDate(columnIndex + 1, sqlDate);
            }
            break;
        case Types.TIME:
            java.sql.Time sqlTime = null;
            try {
                utilDate = column.asDate();
            } catch (DataXException e) {
                throw new SQLException(String.format(
                        "TIME 类型转换错误:[%s]", column), e);
            }
            if (null != utilDate) {
                sqlTime = new java.sql.Time(utilDate.getTime());
            }
            preparedStatement.setTime(columnIndex + 1, sqlTime);
            break;
        case Types.TIMESTAMP:
            java.sql.Timestamp sqlTimestamp = null;
            try {
                utilDate = column.asDate();
            } catch (DataXException e) {
                throw new SQLException(String.format(
                        "TIMESTAMP 类型转换错误:[%s]", column), e);
            }
            if (null != utilDate) {
                sqlTimestamp = new java.sql.Timestamp(utilDate.getTime());
            }
            preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
            break;
        case Types.VARBINARY:
        case Types.BLOB:
        case Types.LONGVARBINARY:
            preparedStatement.setBytes(columnIndex + 1, column.asBytes());
            break;
        case Types.BINARY:
            // A null value would NPE on getRawData().toString(); bind SQL NULL instead.
            Object rawData = column.getRawData();
            if (rawData == null) {
                preparedStatement.setBytes(columnIndex + 1, null);
                break;
            }
            // Heuristic: a "[...]"-shaped value is treated as an array literal and bound as text.
            String isArray = rawData.toString();
            if (isArray.startsWith("[") && isArray.endsWith("]")) {
                preparedStatement.setString(columnIndex + 1, column.asString());
            } else {
                preparedStatement.setBytes(columnIndex + 1, column.asBytes());
            }
            break;
        case Types.BOOLEAN:
            preparedStatement.setBoolean(columnIndex + 1, column.asBoolean());
            break;
        // warn: bit(1) -> Types.BIT 可使用setBoolean
        // warn: bit(>1) -> Types.VARBINARY 可使用setBytes
        case Types.BIT:
            if (this.dataBaseType == DataBaseType.MySql) {
                preparedStatement.setBoolean(columnIndex + 1, column.asBoolean());
            } else {
                preparedStatement.setString(columnIndex + 1, column.asString());
            }
            break;
        default:
            throw DataXException
                    .asDataXException(
                            DBUtilErrorCode.UNSUPPORTED_TYPE,
                            String.format(
                                    "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",
                                    this.resultSetMetaData.getLeft()
                                            .get(columnIndex),
                                    this.resultSetMetaData.getMiddle()
                                            .get(columnIndex),
                                    this.resultSetMetaData.getRight()
                                            .get(columnIndex)));
    }
    return preparedStatement;
}
}

View File

@ -0,0 +1,68 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.common.Table;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.common.TableCache;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath.DirectPathConnection;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath.DirectPathPreparedStatement;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.DirectPathConnHolder;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Insert task that loads records through the OceanBase direct-path interface
 * instead of SQL INSERTs. One instance runs per writer thread; all instances
 * of one table share the table's load status via {@link TableCache}.
 */
public class DirectPathInsertTask extends AbstractInsertTask {

    private static final Logger LOG = LoggerFactory.getLogger(DirectPathInsertTask.class);

    public DirectPathInsertTask(long taskId, Queue<List<Record>> recordsQueue, Configuration config, ServerConnectInfo connectInfo, ConcurrentTableWriterTask task, ConcurrentTableWriterTask.ConcurrentTableWriter writer) {
        super(taskId, recordsQueue, config, connectInfo, task, writer);
    }

    /** Creates the shared direct-path connection holder for this table. */
    @Override
    protected void initConnHolder() {
        this.connHolder = new DirectPathConnHolder(config, connInfo, writerTask.getTable(), writer.getThreadCount());
        this.connHolder.initConnection();
    }

    /**
     * Writes one batch of records via a direct-path batch statement. On any
     * failure the table is marked FAILURE so sibling tasks stop loading, and
     * the error is rethrown with its cause preserved.
     */
    @Override
    protected void write(List<Record> records) {
        Table table = TableCache.getInstance().getTable(connInfo.databaseName, writerTask.getTable());
        // A previous batch already failed the table load; skip further work.
        if (Table.Status.FAILURE.equals(table.getStatus())) {
            return;
        }
        DirectPathConnection conn = (DirectPathConnection) connHolder.getConn();
        if (records != null && !records.isEmpty()) {
            long startTime = System.currentTimeMillis();
            try (DirectPathPreparedStatement stmt = conn.createStatement()) {
                final int columnNumber = records.get(0).getColumnNumber();
                // NOTE(review): the values array is reused for every record —
                // assumes addBatch copies its argument; confirm in DirectPathPreparedStatement.
                Object[] values = new Object[columnNumber];
                for (Record record : records) {
                    for (int i = 0; i < columnNumber; i++) {
                        values[i] = record.getColumn(i).getRawData();
                    }
                    stmt.addBatch(values);
                }
                int[] result = stmt.executeBatch();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[{}] Insert {} rows success", Thread.currentThread().getName(), Arrays.stream(result).sum());
                }
                calStatistic(System.currentTimeMillis() - startTime);
                stmt.clearBatch();
            } catch (Throwable ex) {
                String msg = MessageFormat.format("Insert data into table \"{0}\" failed. Error: {1}", writerTask.getTable(), ex.getMessage());
                LOG.error(msg, ex);
                table.setError(ex);
                table.setStatus(Table.Status.FAILURE);
                // Preserve the original cause so upstream handlers can inspect it.
                throw new RuntimeException(msg, ex);
            }
        }
    }
}

View File

@ -20,7 +20,7 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class InsertTask implements Runnable {
public class InsertTask extends AbstractInsertTask implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(InsertTask.class);
@ -49,6 +49,7 @@ public class InsertTask implements Runnable {
Configuration config,
ServerConnectInfo connectInfo,
String writeRecordSql) {
super(taskId, recordsQueue, config, connectInfo);
this.taskId = taskId;
this.queue = recordsQueue;
this.connInfo = connectInfo;
@ -62,11 +63,15 @@ public class InsertTask implements Runnable {
connHolder.initConnection();
}
void setWriterTask(ConcurrentTableWriterTask writerTask) {
protected void initConnHolder() {
}
public void setWriterTask(ConcurrentTableWriterTask writerTask) {
this.writerTask = writerTask;
}
void setWriter(ConcurrentTableWriter writer) {
public void setWriter(ConcurrentTableWriter writer) {
this.writer = writer;
}
@ -109,6 +114,10 @@ public class InsertTask implements Runnable {
LOG.debug("Thread exist...");
}
// NOTE(review): intentionally empty here — presumably this task type performs
// its writing elsewhere or subclasses override it; confirm against the full class.
protected void write(List<Record> records) {
}
// Releases the resources held by this task's connection holder.
public void destroy() {
connHolder.destroy();
}

View File

@ -4,6 +4,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.checkerframework.checker.units.qual.A;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,8 +30,8 @@ public class WriterThreadPool {
executorService.execute(task);
}
public static synchronized void executeBatch(List<InsertTask> tasks) {
for (InsertTask task : tasks) {
// Submits every prepared insert task to the shared executor; synchronized so
// one batch is submitted without interleaving with other submitters.
public static synchronized void executeBatch(List<AbstractInsertTask> tasks) {
for (AbstractInsertTask task : tasks) {
executorService.execute(task);
}
}