diff --git a/oceanbasev10reader/pom.xml b/oceanbasev10reader/pom.xml index 49477241..66965320 100644 --- a/oceanbasev10reader/pom.xml +++ b/oceanbasev10reader/pom.xml @@ -39,7 +39,7 @@ mysql mysql-connector-java - 5.1.40 + 8.0.28 log4j diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/Constant.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/Constant.java index 57977ca4..7988bc99 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/Constant.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/Constant.java @@ -8,4 +8,6 @@ public class Constant { public static String WEAK_READ_QUERY_SQL_TEMPLATE_WITHOUT_WHERE = "select /*+read_consistency(weak)*/ %s from %s "; public static String WEAK_READ_QUERY_SQL_TEMPLATE = "select /*+read_consistency(weak)*/ %s from %s where (%s)"; + + public static final int ORACLE_DEFAULT_FETCH_SIZE = 1024; } diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java index a43dcebd..652f0887 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java @@ -1,7 +1,14 @@ package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext; +import com.alibaba.datax.common.element.BoolColumn; +import com.alibaba.datax.common.element.BytesColumn; import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.DateColumn; +import com.alibaba.datax.common.element.DoubleColumn; +import com.alibaba.datax.common.element.LongColumn; import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.statistics.PerfRecord; @@ -11,6 +18,7 @@ import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader; import com.alibaba.datax.plugin.rdbms.reader.Constant; import com.alibaba.datax.plugin.rdbms.reader.Key; import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; import com.alibaba.datax.plugin.rdbms.util.RdbmsException; import com.alibaba.datax.plugin.reader.oceanbasev10reader.Config; import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils; @@ -22,6 +30,9 @@ import org.slf4j.LoggerFactory; import java.sql.*; import java.util.ArrayList; import java.util.List; +import java.util.Objects; + +import static com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.Constant.ORACLE_DEFAULT_FETCH_SIZE; public class ReaderTask extends CommonRdbmsReader.Task { private static final Logger LOG = LoggerFactory.getLogger(ReaderTask.class); @@ -36,6 +47,7 @@ public class ReaderTask extends CommonRdbmsReader.Task { private int readBatchSize; private int retryLimit = 0; private String compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_MYSQL; + private static final boolean IS_DEBUG = LOG.isDebugEnabled(); private boolean reuseConn = false; public ReaderTask(int taskGroupId, int taskId) { @@ -247,7 +259,14 @@ public class ReaderTask extends CommonRdbmsReader.Task { } } // 打开流式接口 - ps.setFetchSize(context.getFetchSize()); + //oceanbase的oracle模式FetchSize同oracle插件的设置,否则会报错。invalid fetch size. in Oracle mode, + // 具体请看com.oceanbase.jdbc.OceanBaseStatement#setFetchSize + if (context.getFetchSize()==Integer.MIN_VALUE + && Objects.equals(compatibleMode, ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE)){ + ps.setFetchSize(ORACLE_DEFAULT_FETCH_SIZE); + }else { + ps.setFetchSize(context.getFetchSize()); + } rs = ps.executeQuery(); ResultSetMetaData metaData = rs.getMetaData(); int columnNumber = metaData.getColumnCount(); @@ -294,4 +313,119 @@ public class ReaderTask extends CommonRdbmsReader.Task { } } } + + //重写方法支持array类型 + protected Record buildRecord(RecordSender recordSender,ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, + TaskPluginCollector taskPluginCollector) { + Record record = recordSender.createRecord(); + + try { + for (int i = 1; i <= columnNumber; i++) { + switch (metaData.getColumnType(i)) { + + case Types.CHAR: + case Types.NCHAR: + case Types.VARCHAR: + case Types.LONGVARCHAR: + case Types.NVARCHAR: + case Types.LONGNVARCHAR: + String rawData; + if(StringUtils.isBlank(mandatoryEncoding)){ + rawData = rs.getString(i); + }else{ + rawData = new String((rs.getBytes(i) == null ? EMPTY_CHAR_ARRAY : + rs.getBytes(i)), mandatoryEncoding); + } + record.addColumn(new StringColumn(rawData)); + break; + + case Types.CLOB: + case Types.NCLOB: + record.addColumn(new StringColumn(rs.getString(i))); + break; + + case Types.SMALLINT: + case Types.TINYINT: + case Types.INTEGER: + case Types.BIGINT: + record.addColumn(new LongColumn(rs.getString(i))); + break; + + case Types.NUMERIC: + case Types.DECIMAL: + case Types.FLOAT: + case Types.REAL: + case Types.DOUBLE: + record.addColumn(new DoubleColumn(rs.getString(i))); + break; + + case Types.TIME: + record.addColumn(new DateColumn(rs.getTime(i))); + break; + + // for mysql bug, see http://bugs.mysql.com/bug.php?id=35115 + case Types.DATE: + if (metaData.getColumnTypeName(i).equalsIgnoreCase("year")) { + record.addColumn(new LongColumn(rs.getInt(i))); + } else { + record.addColumn(new DateColumn(rs.getDate(i))); + } + break; + + case Types.TIMESTAMP: + record.addColumn(new DateColumn(rs.getTimestamp(i))); + break; + + case Types.VARBINARY: + case Types.BLOB: + case Types.LONGVARBINARY: + record.addColumn(new BytesColumn(rs.getBytes(i))); + break; + case Types.BINARY: + String isArray = rs.getString(i); + if (isArray.startsWith("[")&& isArray.endsWith("]")){ + record.addColumn(new StringColumn(rs.getString(i))); + }else { + record.addColumn(new BytesColumn(rs.getBytes(i))); + } + break; + // warn: bit(1) -> Types.BIT 可使用BoolColumn + // warn: bit(>1) -> Types.VARBINARY 可使用BytesColumn + case Types.BOOLEAN: + case Types.BIT: + record.addColumn(new BoolColumn(rs.getBoolean(i))); + break; + + case Types.NULL: + String stringData = null; + if(rs.getObject(i) != null) { + stringData = rs.getObject(i).toString(); + } + record.addColumn(new StringColumn(stringData)); + break; + + default: + throw DataXException + .asDataXException( + DBUtilErrorCode.UNSUPPORTED_TYPE, + String.format( + "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段名称:[%s], 字段Java类型:[%s]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .", + metaData.getColumnName(i), + metaData.getColumnType(i), + metaData.getColumnClassName(i))); + } + } + } catch (Exception e) { + if (IS_DEBUG) { + LOG.debug("read data " + record.toString() + + " occur exception:", e); + } + //TODO 这里识别为脏数据靠谱吗? + taskPluginCollector.collectDirtyRecord(record, e); + if (e instanceof DataXException) { + throw (DataXException) e; + } + } + return record; + } } diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java index cca2f66c..d6dd7cf5 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java @@ -731,7 +731,8 @@ public class ObReaderUtils { } else if (c instanceof DateColumn) { ps.setTimestamp(i + 1, new Timestamp(((DateColumn) c).asDate().getTime())); } else if (c instanceof DoubleColumn) { - ps.setDouble(i + 1, ((DoubleColumn) c).asDouble()); + // If the byte length is larger than the size that the double can bear, use BigDecimal to ensure accuracy + ps.setBigDecimal(i + 1, ((DoubleColumn) c).asBigDecimal()); } else if (c instanceof LongColumn) { ps.setLong(i + 1, ((LongColumn) c).asLong()); } else if (c instanceof StringColumn) { diff --git a/oceanbasev10writer/pom.xml b/oceanbasev10writer/pom.xml index 4f9cbf52..22b64c58 100644 --- a/oceanbasev10writer/pom.xml +++ b/oceanbasev10writer/pom.xml @@ -117,8 +117,8 @@ com.oceanbase - obkv-table-client - 1.2.6 + obkv-hbase-client + 2.1.0 com.alibaba diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ServerConnectInfo.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ServerConnectInfo.java index fe8889e1..e7489dda 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ServerConnectInfo.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ServerConnectInfo.java @@ -58,7 +58,8 @@ public class ServerConnectInfo { } else if (!publicCloud || tenantIndex < 0) { this.userName = tenantIndex < 0 ? fullUserName : fullUserName.substring(0, tenantIndex); this.clusterName = clusterIndex < 0 ? EMPTY : fullUserName.substring(clusterIndex + 1); - this.tenantName = tenantIndex < 0 ? EMPTY : fullUserName.substring(tenantIndex + 1, clusterIndex); + // Avoid reporting errors when users do not write # + this.tenantName = tenantIndex < 0 ? EMPTY : fullUserName.substring(tenantIndex + 1, clusterIndex < 0 ? fullUserName.length() : clusterIndex); } else { // If in public cloud, the username with format user@tenant#cluster should be parsed, otherwise, connection can't be created. this.userName = fullUserName.substring(0, tenantIndex); diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java index 0ad3a1ed..8867f3e2 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java @@ -1,5 +1,6 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer.task; +import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; @@ -21,6 +22,7 @@ import com.oceanbase.partition.calculator.enums.ObServerMode; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Types; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -448,4 +450,161 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { } } } + + // 直接使用了两个类变量:columnNumber,resultSetMetaData + protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record) + throws SQLException { + for (int i = 0; i < this.columnNumber; i++) { + int columnSqltype = this.resultSetMetaData.getMiddle().get(i); + String typeName = this.resultSetMetaData.getRight().get(i); + preparedStatement = fillPreparedStatementColumnType(preparedStatement, i, columnSqltype, typeName, record.getColumn(i)); + } + + return preparedStatement; + } + + protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, + int columnSqltype, String typeName, Column column) throws SQLException { + java.util.Date utilDate; + switch (columnSqltype) { + case Types.CHAR: + case Types.NCHAR: + case Types.CLOB: + case Types.NCLOB: + case Types.VARCHAR: + case Types.LONGVARCHAR: + case Types.NVARCHAR: + case Types.LONGNVARCHAR: + preparedStatement.setString(columnIndex + 1, column + .asString()); + break; + + case Types.SMALLINT: + case Types.INTEGER: + case Types.BIGINT: + case Types.NUMERIC: + case Types.DECIMAL: + case Types.FLOAT: + case Types.REAL: + case Types.DOUBLE: + String strValue = column.asString(); + if (emptyAsNull && "".equals(strValue)) { + preparedStatement.setString(columnIndex + 1, null); + } else { + preparedStatement.setString(columnIndex + 1, strValue); + } + break; + + //tinyint is a little special in some database like mysql {boolean->tinyint(1)} + case Types.TINYINT: + Long longValue = column.asLong(); + if (null == longValue) { + preparedStatement.setString(columnIndex + 1, null); + } else { + preparedStatement.setString(columnIndex + 1, longValue.toString()); + } + break; + + // for mysql bug, see http://bugs.mysql.com/bug.php?id=35115 + case Types.DATE: + if (typeName == null) { + typeName = this.resultSetMetaData.getRight().get(columnIndex); + } + + if (typeName.equalsIgnoreCase("year")) { + if (column.asBigInteger() == null) { + preparedStatement.setString(columnIndex + 1, null); + } else { + preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); + } + } else { + java.sql.Date sqlDate = null; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "Date 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlDate = new java.sql.Date(utilDate.getTime()); + } + preparedStatement.setDate(columnIndex + 1, sqlDate); + } + break; + + case Types.TIME: + java.sql.Time sqlTime = null; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "TIME 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlTime = new java.sql.Time(utilDate.getTime()); + } + preparedStatement.setTime(columnIndex + 1, sqlTime); + break; + + case Types.TIMESTAMP: + java.sql.Timestamp sqlTimestamp = null; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "TIMESTAMP 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlTimestamp = new java.sql.Timestamp( + utilDate.getTime()); + } + preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); + break; + case Types.VARBINARY: + case Types.BLOB: + case Types.LONGVARBINARY: + preparedStatement.setBytes(columnIndex + 1, column + .asBytes()); + break; + case Types.BINARY: + String isArray = column.getRawData().toString(); + if (isArray.startsWith("[")&&isArray.endsWith("]")){ + preparedStatement.setString(columnIndex + 1, column + .asString()); + }else { + preparedStatement.setBytes(columnIndex + 1, column + .asBytes()); + } + break; + case Types.BOOLEAN: + preparedStatement.setBoolean(columnIndex + 1, column.asBoolean()); + break; + + // warn: bit(1) -> Types.BIT 可使用setBoolean + // warn: bit(>1) -> Types.VARBINARY 可使用setBytes + case Types.BIT: + if (this.dataBaseType == DataBaseType.MySql) { + preparedStatement.setBoolean(columnIndex + 1, column.asBoolean()); + } else { + preparedStatement.setString(columnIndex + 1, column.asString()); + } + break; + default: + throw DataXException + .asDataXException( + DBUtilErrorCode.UNSUPPORTED_TYPE, + String.format( + "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.", + this.resultSetMetaData.getLeft() + .get(columnIndex), + this.resultSetMetaData.getMiddle() + .get(columnIndex), + this.resultSetMetaData.getRight() + .get(columnIndex))); + } + return preparedStatement; + } }