Merge pull request #2271 from xxsc0529/master

[FIX]:The defects of the OceanBase plugin are fixed
This commit is contained in:
dingxiaobo 2025-02-18 13:53:28 +08:00 committed by GitHub
commit b612516472
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 303 additions and 6 deletions

View File

@ -39,7 +39,7 @@
<dependency> <dependency>
<groupId>mysql</groupId> <groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId> <artifactId>mysql-connector-java</artifactId>
<version>5.1.40</version> <version>8.0.28</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>log4j</groupId> <groupId>log4j</groupId>

View File

@ -8,4 +8,6 @@ public class Constant {
public static String WEAK_READ_QUERY_SQL_TEMPLATE_WITHOUT_WHERE = "select /*+read_consistency(weak)*/ %s from %s "; public static String WEAK_READ_QUERY_SQL_TEMPLATE_WITHOUT_WHERE = "select /*+read_consistency(weak)*/ %s from %s ";
public static String WEAK_READ_QUERY_SQL_TEMPLATE = "select /*+read_consistency(weak)*/ %s from %s where (%s)"; public static String WEAK_READ_QUERY_SQL_TEMPLATE = "select /*+read_consistency(weak)*/ %s from %s where (%s)";
public static final int ORACLE_DEFAULT_FETCH_SIZE = 1024;
} }

View File

@ -1,7 +1,14 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext; package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;
import com.alibaba.datax.common.element.BoolColumn;
import com.alibaba.datax.common.element.BytesColumn;
import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.DoubleColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.statistics.PerfRecord; import com.alibaba.datax.common.statistics.PerfRecord;
@ -11,6 +18,7 @@ import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.plugin.rdbms.reader.Constant; import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.rdbms.reader.Key; import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.RdbmsException; import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.Config; import com.alibaba.datax.plugin.reader.oceanbasev10reader.Config;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils; import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
@ -22,6 +30,9 @@ import org.slf4j.LoggerFactory;
import java.sql.*; import java.sql.*;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects;
import static com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.Constant.ORACLE_DEFAULT_FETCH_SIZE;
public class ReaderTask extends CommonRdbmsReader.Task { public class ReaderTask extends CommonRdbmsReader.Task {
private static final Logger LOG = LoggerFactory.getLogger(ReaderTask.class); private static final Logger LOG = LoggerFactory.getLogger(ReaderTask.class);
@ -36,6 +47,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
private int readBatchSize; private int readBatchSize;
private int retryLimit = 0; private int retryLimit = 0;
private String compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_MYSQL; private String compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_MYSQL;
private static final boolean IS_DEBUG = LOG.isDebugEnabled();
private boolean reuseConn = false; private boolean reuseConn = false;
public ReaderTask(int taskGroupId, int taskId) { public ReaderTask(int taskGroupId, int taskId) {
@ -247,7 +259,14 @@ public class ReaderTask extends CommonRdbmsReader.Task {
} }
} }
// 打开流式接口 // 打开流式接口
//oceanbase的oracle模式FetchSize同oracle插件的设置否则会报错invalid fetch size. in Oracle mode,
// 具体请看com.oceanbase.jdbc.OceanBaseStatement#setFetchSize
if (context.getFetchSize()==Integer.MIN_VALUE
&& Objects.equals(compatibleMode, ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE)){
ps.setFetchSize(ORACLE_DEFAULT_FETCH_SIZE);
}else {
ps.setFetchSize(context.getFetchSize()); ps.setFetchSize(context.getFetchSize());
}
rs = ps.executeQuery(); rs = ps.executeQuery();
ResultSetMetaData metaData = rs.getMetaData(); ResultSetMetaData metaData = rs.getMetaData();
int columnNumber = metaData.getColumnCount(); int columnNumber = metaData.getColumnCount();
@ -294,4 +313,119 @@ public class ReaderTask extends CommonRdbmsReader.Task {
} }
} }
} }
//重写方法支持array类型
protected Record buildRecord(RecordSender recordSender,ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding,
TaskPluginCollector taskPluginCollector) {
Record record = recordSender.createRecord();
try {
for (int i = 1; i <= columnNumber; i++) {
switch (metaData.getColumnType(i)) {
case Types.CHAR:
case Types.NCHAR:
case Types.VARCHAR:
case Types.LONGVARCHAR:
case Types.NVARCHAR:
case Types.LONGNVARCHAR:
String rawData;
if(StringUtils.isBlank(mandatoryEncoding)){
rawData = rs.getString(i);
}else{
rawData = new String((rs.getBytes(i) == null ? EMPTY_CHAR_ARRAY :
rs.getBytes(i)), mandatoryEncoding);
}
record.addColumn(new StringColumn(rawData));
break;
case Types.CLOB:
case Types.NCLOB:
record.addColumn(new StringColumn(rs.getString(i)));
break;
case Types.SMALLINT:
case Types.TINYINT:
case Types.INTEGER:
case Types.BIGINT:
record.addColumn(new LongColumn(rs.getString(i)));
break;
case Types.NUMERIC:
case Types.DECIMAL:
case Types.FLOAT:
case Types.REAL:
case Types.DOUBLE:
record.addColumn(new DoubleColumn(rs.getString(i)));
break;
case Types.TIME:
record.addColumn(new DateColumn(rs.getTime(i)));
break;
// for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
case Types.DATE:
if (metaData.getColumnTypeName(i).equalsIgnoreCase("year")) {
record.addColumn(new LongColumn(rs.getInt(i)));
} else {
record.addColumn(new DateColumn(rs.getDate(i)));
}
break;
case Types.TIMESTAMP:
record.addColumn(new DateColumn(rs.getTimestamp(i)));
break;
case Types.VARBINARY:
case Types.BLOB:
case Types.LONGVARBINARY:
record.addColumn(new BytesColumn(rs.getBytes(i)));
break;
case Types.BINARY:
String isArray = rs.getString(i);
if (isArray.startsWith("[")&& isArray.endsWith("]")){
record.addColumn(new StringColumn(rs.getString(i)));
}else {
record.addColumn(new BytesColumn(rs.getBytes(i)));
}
break;
// warn: bit(1) -> Types.BIT 可使用BoolColumn
// warn: bit(>1) -> Types.VARBINARY 可使用BytesColumn
case Types.BOOLEAN:
case Types.BIT:
record.addColumn(new BoolColumn(rs.getBoolean(i)));
break;
case Types.NULL:
String stringData = null;
if(rs.getObject(i) != null) {
stringData = rs.getObject(i).toString();
}
record.addColumn(new StringColumn(stringData));
break;
default:
throw DataXException
.asDataXException(
DBUtilErrorCode.UNSUPPORTED_TYPE,
String.format(
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段名称:[%s], 字段Java类型:[%s]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .",
metaData.getColumnName(i),
metaData.getColumnType(i),
metaData.getColumnClassName(i)));
}
}
} catch (Exception e) {
if (IS_DEBUG) {
LOG.debug("read data " + record.toString()
+ " occur exception:", e);
}
//TODO 这里识别为脏数据靠谱吗
taskPluginCollector.collectDirtyRecord(record, e);
if (e instanceof DataXException) {
throw (DataXException) e;
}
}
return record;
}
} }

View File

@ -731,7 +731,8 @@ public class ObReaderUtils {
} else if (c instanceof DateColumn) { } else if (c instanceof DateColumn) {
ps.setTimestamp(i + 1, new Timestamp(((DateColumn) c).asDate().getTime())); ps.setTimestamp(i + 1, new Timestamp(((DateColumn) c).asDate().getTime()));
} else if (c instanceof DoubleColumn) { } else if (c instanceof DoubleColumn) {
ps.setDouble(i + 1, ((DoubleColumn) c).asDouble()); // If the byte length is larger than the size that the double can bear, use BigDecimal to ensure accuracy
ps.setBigDecimal(i + 1, ((DoubleColumn) c).asBigDecimal());
} else if (c instanceof LongColumn) { } else if (c instanceof LongColumn) {
ps.setLong(i + 1, ((LongColumn) c).asLong()); ps.setLong(i + 1, ((LongColumn) c).asLong());
} else if (c instanceof StringColumn) { } else if (c instanceof StringColumn) {

View File

@ -117,8 +117,8 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.oceanbase</groupId> <groupId>com.oceanbase</groupId>
<artifactId>obkv-table-client</artifactId> <artifactId>obkv-hbase-client</artifactId>
<version>1.2.6</version> <version>2.1.0</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>

View File

@ -58,7 +58,8 @@ public class ServerConnectInfo {
} else if (!publicCloud || tenantIndex < 0) { } else if (!publicCloud || tenantIndex < 0) {
this.userName = tenantIndex < 0 ? fullUserName : fullUserName.substring(0, tenantIndex); this.userName = tenantIndex < 0 ? fullUserName : fullUserName.substring(0, tenantIndex);
this.clusterName = clusterIndex < 0 ? EMPTY : fullUserName.substring(clusterIndex + 1); this.clusterName = clusterIndex < 0 ? EMPTY : fullUserName.substring(clusterIndex + 1);
this.tenantName = tenantIndex < 0 ? EMPTY : fullUserName.substring(tenantIndex + 1, clusterIndex); // Avoid reporting errors when users do not write #
this.tenantName = tenantIndex < 0 ? EMPTY : fullUserName.substring(tenantIndex + 1, clusterIndex < 0 ? fullUserName.length() : clusterIndex);
} else { } else {
// If in public cloud, the username with format user@tenant#cluster should be parsed, otherwise, connection can't be created. // If in public cloud, the username with format user@tenant#cluster should be parsed, otherwise, connection can't be created.
this.userName = fullUserName.substring(0, tenantIndex); this.userName = fullUserName.substring(0, tenantIndex);

View File

@ -1,5 +1,6 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.task; package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.plugin.RecordReceiver;
@ -21,6 +22,7 @@ import com.oceanbase.partition.calculator.enums.ObServerMode;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -448,4 +450,161 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
} }
} }
} }
// 直接使用了两个类变量columnNumber,resultSetMetaData
protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record)
throws SQLException {
for (int i = 0; i < this.columnNumber; i++) {
int columnSqltype = this.resultSetMetaData.getMiddle().get(i);
String typeName = this.resultSetMetaData.getRight().get(i);
preparedStatement = fillPreparedStatementColumnType(preparedStatement, i, columnSqltype, typeName, record.getColumn(i));
}
return preparedStatement;
}
protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex,
int columnSqltype, String typeName, Column column) throws SQLException {
java.util.Date utilDate;
switch (columnSqltype) {
case Types.CHAR:
case Types.NCHAR:
case Types.CLOB:
case Types.NCLOB:
case Types.VARCHAR:
case Types.LONGVARCHAR:
case Types.NVARCHAR:
case Types.LONGNVARCHAR:
preparedStatement.setString(columnIndex + 1, column
.asString());
break;
case Types.SMALLINT:
case Types.INTEGER:
case Types.BIGINT:
case Types.NUMERIC:
case Types.DECIMAL:
case Types.FLOAT:
case Types.REAL:
case Types.DOUBLE:
String strValue = column.asString();
if (emptyAsNull && "".equals(strValue)) {
preparedStatement.setString(columnIndex + 1, null);
} else {
preparedStatement.setString(columnIndex + 1, strValue);
}
break;
//tinyint is a little special in some database like mysql {boolean->tinyint(1)}
case Types.TINYINT:
Long longValue = column.asLong();
if (null == longValue) {
preparedStatement.setString(columnIndex + 1, null);
} else {
preparedStatement.setString(columnIndex + 1, longValue.toString());
}
break;
// for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
case Types.DATE:
if (typeName == null) {
typeName = this.resultSetMetaData.getRight().get(columnIndex);
}
if (typeName.equalsIgnoreCase("year")) {
if (column.asBigInteger() == null) {
preparedStatement.setString(columnIndex + 1, null);
} else {
preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue());
}
} else {
java.sql.Date sqlDate = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date 类型转换错误:[%s]", column));
}
if (null != utilDate) {
sqlDate = new java.sql.Date(utilDate.getTime());
}
preparedStatement.setDate(columnIndex + 1, sqlDate);
}
break;
case Types.TIME:
java.sql.Time sqlTime = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"TIME 类型转换错误:[%s]", column));
}
if (null != utilDate) {
sqlTime = new java.sql.Time(utilDate.getTime());
}
preparedStatement.setTime(columnIndex + 1, sqlTime);
break;
case Types.TIMESTAMP:
java.sql.Timestamp sqlTimestamp = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"TIMESTAMP 类型转换错误:[%s]", column));
}
if (null != utilDate) {
sqlTimestamp = new java.sql.Timestamp(
utilDate.getTime());
}
preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
break;
case Types.VARBINARY:
case Types.BLOB:
case Types.LONGVARBINARY:
preparedStatement.setBytes(columnIndex + 1, column
.asBytes());
break;
case Types.BINARY:
String isArray = column.getRawData().toString();
if (isArray.startsWith("[")&&isArray.endsWith("]")){
preparedStatement.setString(columnIndex + 1, column
.asString());
}else {
preparedStatement.setBytes(columnIndex + 1, column
.asBytes());
}
break;
case Types.BOOLEAN:
preparedStatement.setBoolean(columnIndex + 1, column.asBoolean());
break;
// warn: bit(1) -> Types.BIT 可使用setBoolean
// warn: bit(>1) -> Types.VARBINARY 可使用setBytes
case Types.BIT:
if (this.dataBaseType == DataBaseType.MySql) {
preparedStatement.setBoolean(columnIndex + 1, column.asBoolean());
} else {
preparedStatement.setString(columnIndex + 1, column.asString());
}
break;
default:
throw DataXException
.asDataXException(
DBUtilErrorCode.UNSUPPORTED_TYPE,
String.format(
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",
this.resultSetMetaData.getLeft()
.get(columnIndex),
this.resultSetMetaData.getMiddle()
.get(columnIndex),
this.resultSetMetaData.getRight()
.get(columnIndex)));
}
return preparedStatement;
}
} }