Merge pull request #2283 from xxsc0529/master

[FEATURE]:oceanbase plugin add direct path support
This commit is contained in:
dingxiaobo 2025-04-03 15:08:03 +08:00 committed by GitHub
commit d82a5aeae6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 3641 additions and 630 deletions


@ -0,0 +1,231 @@
## 1 Quick Introduction
The OceanbaseV10Reader plugin reads data from OceanBase V1.0. Under the hood, it connects to a remote OceanBase 1.0 database via the Java client (JDBC) and executes SQL statements to SELECT data out of the database.
Note: oceanbasev10reader is the reader for OB 1.0 and later versions only.
## 2 How It Works
In short, OceanbaseV10Reader connects to the remote OceanBase database through the Java client connector, generates a SELECT SQL statement from the user's configuration, sends it to the remote OceanBase (v1.0 and later) database for execution, assembles the returned result into an abstract dataset using DataX's custom data types, and hands it to the downstream Writer.<br />When the user configures table, column, and where, OceanbaseV10Reader assembles them into a SQL statement and sends it to the OceanBase database; when the user configures querySql, OceanbaseV10Reader sends it to the OceanBase database as-is.
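For illustration, a hedged sketch of the statement assembled from a table/column/where configuration (all names are placeholders):
```sql
-- With "column": ["id", "name"], "table": ["table"] and "where": "gmt_create > $bizdate",
-- the reader effectively issues:
SELECT id, name FROM table WHERE gmt_create > $bizdate;
```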
## 3 Features
### 3.1 Configuration Examples
- A job that extracts data from an OceanBase database and synchronizes it to local output:
```
{
    "job": {
        "setting": {
            "speed": {
                // Transfer rate limit in bytes/s. DataX tries to approach this rate without exceeding it.
                "byte": 1048576
            },
            // Error limits
            "errorLimit": {
                // Maximum number of error records; the job fails once this is exceeded.
                "record": 0,
                // Maximum ratio of error records: 1.0 means 100%, 0.02 means 2%.
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "oceanbasev10reader",
                    "parameter": {
                        "where": "",
                        "timeout": 5,
                        "readBatchSize": 50000,
                        "column": [
                            "id", "name"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": ["||_dsc_ob10_dsc_||clusterName:tenantName||_dsc_ob10_dsc_||jdbc:mysql://obproxyIp:obproxyPort/dbName"],
                                "table": [
                                    "table"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    // Writer type
                    "name": "streamwriter",
                    "parameter": {
                        // Whether to print records
                        "print": true
                    }
                }
            }
        ]
    }
}
```
```
{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "oceanbasev10reader",
                    "parameter": {
                        "where": "",
                        "timeout": 5,
                        "fetchSize": 500,
                        "column": [
                            "id",
                            "name"
                        ],
                        "splitPk": "pk",
                        "connection": [
                            {
                                "jdbcUrl": ["||_dsc_ob10_dsc_||clusterName:tenantName||_dsc_ob10_dsc_||jdbc:mysql://obproxyIp:obproxyPort/dbName"],
                                "table": [
                                    "table"
                                ]
                            }
                        ],
                        "username": "xxx",
                        "password": "xxx"
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true
                    }
                }
            }
        ]
    }
}
```
- A job that uses a custom SQL query to synchronize data to local output:
```
{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            }
        },
        "content": [
            {
                "reader": {
                    "name": "oceanbasev10reader",
                    "parameter": {
                        "timeout": 5,
                        "fetchSize": 500,
                        "splitPk": "pk",
                        "connection": [
                            {
                                "jdbcUrl": ["||_dsc_ob10_dsc_||clusterName:tenantName||_dsc_ob10_dsc_||jdbc:mysql://obproxyIp:obproxyPort/dbName"],
                                "querySql": [
                                    "select db_id,on_line_flag from db_info where db_id < 10;"
                                ]
                            }
                        ],
                        "username": "xxx",
                        "password": "xxx"
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false,
                        "encoding": "UTF-8"
                    }
                }
            }
        ]
    }
}
```
### 3.2 Parameters
- **jdbcUrl**
    - Description: the JDBC URL used to connect to OB. Two formats are supported:
        - ||_dsc_ob10_dsc_||clusterName:tenantName||_dsc_ob10_dsc_||jdbc:mysql://obproxyIp:obproxyPort/db
            - With this format, username is the bare user name; the three-part form is not needed. A sketch of the two pairings follows this entry.
        - jdbc:mysql://ip:port/db
            - With this format, username must use the three-part form.
    - Required: yes
    - Default: none
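For illustration, a hedged sketch of how username pairs with each URL style (cluster, tenant, and user names are placeholders; the three-part form user@tenant#cluster is the common OceanBase convention, so verify it against your deployment):
```
// Style 1: cluster and tenant encoded in the URL; username is the bare user name
"jdbcUrl": ["||_dsc_ob10_dsc_||myCluster:myTenant||_dsc_ob10_dsc_||jdbc:mysql://obproxyIp:obproxyPort/dbName"],
"username": "myUser"

// Style 2: plain MySQL URL; username carries all three parts
"jdbcUrl": ["jdbc:mysql://obproxyIp:obproxyPort/dbName"],
"username": "myUser@myTenant#myCluster"
```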
- **table**
    - Description: the table(s) to synchronize, given as a JSON array, so multiple tables can be extracted at once. When multiple tables are configured, you must ensure they share the same schema; OceanbaseReader does not check whether they form one logical table. Note that table must be placed inside the connection element.
    - Required: yes
    - Default: none
- **column**
    - Description: the set of columns to synchronize from the configured table, described as a JSON array of field names.
    - Column pruning is supported, i.e. you may export only a subset of columns.
    ```
    Column reordering is supported, i.e. columns need not follow the table schema order. The wildcard * is also supported; double-check the column list before using it.
    ```
    - Required: yes
    - Default: none
- **where**
    - Description: the filter condition. OceanbaseReader assembles a SQL statement from the configured column, table, and where and uses it to extract data. A common pattern is to synchronize only the current day's data by setting where to gmt_create > $bizdate. Note that gmt_create here must not be an indexed column, nor the first column of a composite index.<br /><br />A where condition enables effective incremental synchronization. If where is omitted (either the key or the value is missing), DataX synchronizes the full table.
    - Required: no
    - Default: none
- **splitPk**
    - Description: if splitPk is specified, OBReader shards the data on that column and DataX launches concurrent tasks, which can greatly improve synchronization throughput. A sketch of the resulting range queries follows this list.
    - We recommend using the table's primary key as splitPk, since primary keys are usually evenly distributed and the resulting shards are less prone to data hot spots.
    - Currently splitPk only supports sharding on integer columns; `other types are not supported`. Specifying an unsupported type raises an error.<br />If splitPk is omitted, the table is not split and OBReader synchronizes the full data over a single channel.
    - Required: no
    - Default: empty
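For illustration, a hedged sketch of the range queries that sharding on splitPk typically produces (pk values and the shard count are hypothetical; the actual split SQL is generated internally by DataX):
```sql
-- probe the key range first, roughly:
SELECT MIN(pk), MAX(pk) FROM table;
-- then issue one query per shard, e.g. with three shards over [0, 300]:
SELECT id, name FROM table WHERE pk >= 0   AND pk < 100;
SELECT id, name FROM table WHERE pk >= 100 AND pk < 200;
SELECT id, name FROM table WHERE (pk >= 200 AND pk <= 300) OR pk IS NULL;
```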
- **querySql**
    - Description: in some scenarios where is not expressive enough; this option lets you provide a custom filter SQL. When it is configured, DataX ignores the table and column options and selects data using this SQL directly.
    - `When querySql is configured, OceanbaseReader ignores the table, column, and where options`; querySql takes precedence over table, column, and where.
    - Required: no
    - Default: none
- **timeout**
    - Description: SQL execution timeout, in minutes.
    - Required: no
    - Default: 5
- **username**
    - Description: user name for accessing OceanBase.
    - Required: yes
    - Default: none
- **password**
    - Description: password for accessing OceanBase.
    - Required: yes
    - Default: none
- **readByPartition**
    - Description: for partitioned tables, whether to split tasks by partition.
    - Required: no
    - Default: false
- **readBatchSize**
    - Description: the number of rows read at a time; reduce it if you run into memory pressure.
    - Required: no
    - Default: 10000
### 3.3 Type Conversion
The type conversions OceanbaseReader performs for OceanBase types are listed below:
| DataX internal type | OceanBase data type |
| --- | --- |
| Long | int |
| Double | numeric |
| String | varchar |
| Date | timestamp |
| Boolean | bool |
## 4 Performance Testing
### 4.1 Test Report
The dominant factor in throughput is the number of channels, which is bounded by the number of sharded tables or by the number of data shards in a single table.<br />For a single-table export, you can check the shard count by running the following in idb: `select/*+query_timeout(150000000)*/ s.tablet_count from __all_table t,__table_stat s where t.table_id = s.table_id and t.table_name = 'tableName'`
| Channels | DataX speed (Rec/s) | DataX throughput (MB/s) |
| --- | --- | --- |
| 1 | 15001 | 4.7 |
| 2 | 28169 | 11.66 |
| 3 | 37076 | 14.77 |
| 4 | 55862 | 17.60 |
| 5 | 70860 | 22.31 |


@ -0,0 +1,363 @@
## 1 Quick Introduction
The OceanBaseV10Writer plugin writes data into destination tables of OceanBase V1.0 and later databases. Under the hood, OceanbaseV10Writer connects through obproxy to the remote OceanBase database via a Java client (MySQL JDBC or the OceanBase client underneath) and executes `insert ... on duplicate key update` statements to write data into OceanBase, committing in batches internally.
Oceanbasev10Writer targets ETL engineers, who use it to load data from a data warehouse into OceanBase. It can also serve DBAs and other users as a data migration tool.
Note: oceanbasewriter is the writer for OB 0.5; oceanbasev10writer is the writer for OB 1.0 and later.
## 2 How It Works
Oceanbasev10Writer takes the protocol data produced by the Reader through the DataX framework and generates `insert ... on duplicate key update` statements, which update every column of the row on primary key or unique key conflict. This is currently the only write behavior; write modes such as insert-only (no update) and updating a specified subset of columns are not supported yet. For performance, writes are batched: a write request is issued only once the accumulated row count reaches a preset threshold.
The plugin connects to OB with the MySQL/OceanBase JDBC driver, through obproxy.
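For illustration, a hedged sketch of the statement shape the writer generates for a two-column table (table and column names are placeholders; the real statement is built from the column configuration):
```sql
INSERT INTO test (id, name)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE id = VALUES(id), name = VALUES(name);
-- one parameter row per record; records accumulate and are committed in batches of batchSize
```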
## 3 Features
### 3.1 Configuration Examples
- A job that loads data generated in memory into OceanBase:
```
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {
                                "value": "DataX",
                                "type": "string"
                            },
                            {
                                "value": 19880808,
                                "type": "long"
                            },
                            {
                                "value": "1988-08-08 08:08:08",
                                "type": "date"
                            },
                            {
                                "value": true,
                                "type": "bool"
                            },
                            {
                                "value": "test",
                                "type": "bytes"
                            }
                        ],
                        "sliceRecordCount": 1000
                    }
                },
                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "update",
                        "column": [
                            "id",
                            "name"
                        ],
                        "preSql": [
                            "delete from test"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||clusterName:tenantName||_dsc_ob10_dsc_||jdbc:mysql://obproxyIp:obproxyPort/dbName",
                                "table": [
                                    "test"
                                ]
                            }
                        ],
                        "username": "xxx",
                        "password": "xxx",
                        "batchSize": 256,
                        "memstoreThreshold": "0.9"
                    }
                }
            }
        ]
    }
}
```
- A job that loads data generated in memory into OceanBase via direct path (bypass) import:
```
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {
                                "value": "DataX",
                                "type": "string"
                            },
                            {
                                "value": 19880808,
                                "type": "long"
                            },
                            {
                                "value": "1988-08-08 08:08:08",
                                "type": "date"
                            },
                            {
                                "value": true,
                                "type": "bool"
                            },
                            {
                                "value": "test",
                                "type": "bytes"
                            }
                        ],
                        "sliceRecordCount": 1000
                    }
                },
                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "update",
                        "column": [
                            "id",
                            "name"
                        ],
                        "preSql": [
                            "delete from test"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||clusterName:tenantName||_dsc_ob10_dsc_||jdbc:mysql://obproxyIp:obproxyPort/dbName",
                                "table": [
                                    "test"
                                ]
                            }
                        ],
                        "username": "xxx",
                        "password": "xxx",
                        "batchSize": 256,
                        "directPath": true,
                        "rpcPort": 2882,
                        "parallel": 8,
                        "heartBeatInterval": 1000,
                        "heartBeatTimeout": 6000,
                        "bufferSize": 1048576,
                        "memstoreThreshold": "0.9"
                    }
                }
            }
        ]
    }
}
```
### 3.2 Parameters
- **jdbcUrl**
    - Description: the JDBC URL used to connect to OB. Two formats are supported:
        - ||_dsc_ob10_dsc_||clusterName:tenantName||_dsc_ob10_dsc_||jdbc:mysql://obproxyIp:obproxyPort/db
            - With this format, username is the bare user name; the three-part form is not needed.
        - jdbc:mysql://ip:port/db
            - With this format, username must use the three-part form.
    - Required: yes
    - Default: none
- **table**
    - Description: the name of the destination table. The open-source obwriter plugin supports writing to a single table only. The table name normally does not include the database name.
    - Required: yes
    - Default: none
- **column**
    - Description: the destination table columns to write into, separated by commas, e.g. "column": ["id","name","age"].
    ```
    **The column option is required and must not be left empty.**
    Note: 1. We strongly discourage a loose configuration (such as the wildcard *), because the job may run incorrectly or fail when the destination table's columns or types change.
          2. column must not contain any constant values.
    ```
    - Required: yes
    - Default: none
- **preSql**
    - Description: standard SQL statements executed before writing to the destination table. If a statement references the destination table, write the name as `@table`; when the statement is executed, the variable is replaced with the actual table name. For example, if the job writes to 100 destination tables with identical schemas (named datax_00, datax_01, ..., datax_98, datax_99) and you want to clear each table before loading, configure `"preSql": ["delete from @table"]`; before writing to each table, the corresponding `delete from <tableName>` statement is executed first. Only delete statements are supported.
    - Required: no
    - Default: none
- **batchSize**
    - Description: the number of records committed per batch. A larger value greatly reduces the number of network round trips between DataX and OceanBase and improves overall throughput, but setting it too high may cause the DataX process to run out of memory.
    - Required: no
    - Default: 1000
- **memstoreThreshold**
    - Description: the OB tenant's memstore usage ratio. When this threshold is reached, the import pauses and resumes once memory has been released, preventing tenant memory exhaustion. A hedged sketch of inspecting memstore usage manually follows this entry.
    - Required: no
    - Default: 0.9
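For reference, a hedged sketch of inspecting memstore usage by hand (column names vary across OB versions, so treat this as an assumption to verify; the grant example in section 4.2 shows the plugin needs SELECT on oceanbase.gv$memstore):
```sql
SELECT tenant_id, ip, active, total, freeze_trigger, mem_limit
FROM oceanbase.gv$memstore;
-- one plausible usage ratio: total / mem_limit, compared against memstoreThreshold
```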
- **username**
    - Description: user name for accessing OceanBase. Note: when jdbcUrl uses the ||_dsc_ob10_dsc_||clusterName:tenantName||_dsc_ob10_dsc_|| format, do not include the OB cluster and tenant names here; otherwise, the three-part form is required.
    - Required: yes
    - Default: none
- **password**
    - Description: password for accessing OceanBase.
    - Required: yes
    - Default: none
- **writerThreadCount**
    - Description: the number of writer threads used per channel.
    - Required: no
    - Default: 1
- **directPath**
    - Description: enable direct path (bypass) import.
    - Required: no
    - Default: false
- **rpcPort**
    - Description: the RPC port of OceanBase.
    - Required: no
    - Default: none
- **parallel**
    - Description: the number of threads used by the direct path import.
    - Required: no
    - Default: 1
- **bufferSize**
    - Description: the chunk size used to split data for the direct path import.
    - Required: no
    - Default: 1048576
- **heartBeatInterval**
    - Description: the heartbeat interval of the direct path import.
    - Required: no
    - Default: 1000
- **heartBeatTimeout**
    - Description: the heartbeat timeout of the direct path import.
    - Required: no
    - Default: 6000
```
**When direct path import is enabled (directPath: true):**
Note: 1. rpcPort becomes required.
2. When setting parallel, take the current load on OceanBase into account.
3. heartBeatTimeout must be at least 6000 and heartBeatInterval at least 1000.
When both are set, heartBeatTimeout - heartBeatInterval must be at least 4000; for example, heartBeatTimeout = 6000 with heartBeatInterval = 1000 is valid (difference 5000).
4. bufferSize is measured in bytes; the default is 1 MB, i.e. 1048576.
```
## 4 Common Issues
### 4.1 Write failures caused by dropped connections
A DataX job writing to OB fails, and the log shows the connection was dropped during the write:
```
2018-12-14 05:40:48.586 [18705170-3-17-writer] WARN CommonRdbmsWriter$Task - 遇到OB异常,回滚此次写入, 休眠 1秒,采用逐条写入提交,SQLState:S1000
java.sql.SQLException: Could not retrieve transation read-only status server
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:897) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:886) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:860) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:877) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:873) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3603) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3572) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1225) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:958) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.alibaba.datax.plugin.writer.oceanbasev10writer.task.MultiTableWriterTask.write(MultiTableWriterTask.java:357) [oceanbasev10writer-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.plugin.writer.oceanbasev10writer.task.MultiTableWriterTask.calcRuleAndDoBatchInsert(MultiTableWriterTask.java:338) [oceanbasev10writer-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.plugin.writer.oceanbasev10writer.task.MultiTableWriterTask.startWrite(MultiTableWriterTask.java:227) [oceanbasev10writer-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.plugin.writer.oceanbasev10writer.OceanBaseV10Writer$Task.startWrite(OceanBaseV10Writer.java:360) [oceanbasev10writer-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:62) [datax-core-0.0.1-SNAPSHOT.jar:na]
at java.lang.Thread.run(Thread.java:834) [na:1.8.0_112]
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet successfully received from the server was 5 milliseconds ago. The last packet sent successfully to the server was 4 milliseconds ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_112]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_112]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_112]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_112]
at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:989) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3556) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3456) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3897) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2524) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2677) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2545) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2503) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1369) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3597) ~[mysql-connector-java-5.1.40.jar:5.1.40]
... 9 common frames omitted
Caused by: java.io.EOFException: Can not read response from server. Expected to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:3008) ~[mysql-connector-java-5.1.40.jar:5.1.40]
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3466) ~[mysql-connector-java-5.1.40.jar:5.1.40]
... 17 common frames omitted
```
Keywords: could not retrieve transation status from read-only status server, communication link failure
Inspecting the machine that ran the DataX job shows that obproxy restarted several times while the job was running:
![](https://cdn.nlark.com/lark/0/2018/png/97504/1544760936504-948a2699-e21b-4970-ad76-25b6ac1cd89d.png#height=156&id=wutJw&originHeight=156&originWidth=507&originalType=binary&ratio=1&rotation=0&showTitle=false&status=done&style=none&title=&width=507)
The first obproxy exit log reveals the cause:
```
[2018-12-14 05:40:47.611683] ERROR [PROXY] do_monitor_mem (ob_proxy_main.cpp:889) [7262][Y0-7F4480213880] [AL=47391-47390-29] obproxy's memroy is out of limit, will be going to commit suicide(mem_limited=838860800, OTHER_MEMORY_SIZE=73400320, is_out_of_mem_limit=true, cur_pos=9) BACKTRACE:0x49db91 0x47fdc9 0x43b115 0x43ee5d 0xa6e623 0xe401b2 0xe3f497 0x4f674c 0x7f4487ace77d 0x7f44865ed9ad
[2018-12-14 05:40:47.612334] ERROR [PROXY] do_monitor_mem (ob_proxy_main.cpp:891) [7262][Y0-7F4480213880] [AL=47392-47391-651] history memory size, history_mem_size[0]=765460480 BACKTRACE:0x49db91 0x47fdc9 0x48717a 0x43f121 0xa6e623 0xe401b2 0xe3f497 0x4f674c 0x7f4487ace77d 0x7f44865ed9ad
[2018-12-14 05:40:47.612934] ERROR [PROXY] do_monitor_mem (ob_proxy_main.cpp:891) [7262][Y0-7F4480213880] [AL=47393-47392-600] history memory size, history_mem_size[1]=765460480 BACKTRACE:0x49db91 0x47fdc9 0x48717a 0x43f121 0xa6e623 0xe401b2 0xe3f497 0x4f674c 0x7f4487ace77d 0x7f44865ed9ad
[2018-12-14 05:40:47.613530] ERROR [PROXY] do_monitor_mem (ob_proxy_main.cpp:891) [7262][Y0-7F4480213880] [AL=47394-47393-596] history memory size, history_mem_size[2]=765460480 BACKTRACE:0x49db91 0x47fdc9 0x48717a 0x43f121 0xa6e623 0xe401b2 0xe3f497 0x4f674c 0x7f4487ace77d 0x7f44865ed9ad
[2018-12-14 05:40:47.614121] ERROR [PROXY] do_monitor_mem (ob_proxy_main.cpp:891) [7262][Y0-7F4480213880] [AL=47395-47394-591] history memory size, history_mem_size[3]=765460480 BACKTRACE:0x49db91 0x47fdc9 0x48717a 0x43f121 0xa6e623 0xe401b2 0xe3f497 0x4f674c 0x7f4487ace77d 0x7f44865ed9ad
[2018-12-14 05:40:47.614717] ERROR [PROXY] do_monitor_mem (ob_proxy_main.cpp:891) [7262][Y0-7F4480213880] [AL=47396-47395-596] history memory size, history_mem_size[4]=765460480 BACKTRACE:0x49db91 0x47fdc9 0x48717a 0x43f121 0xa6e623 0xe401b2 0xe3f497 0x4f674c 0x7f4487ace77d 0x7f44865ed9ad
[2018-12-14 05:40:47.615307] ERROR [PROXY] do_monitor_mem (ob_proxy_main.cpp:891) [7262][Y0-7F4480213880] [AL=47397-47396-590] history memory size, history_mem_size[5]=765460480 BACKTRACE:0x49db91 0x47fdc9 0x48717a 0x43f121 0xa6e623 0xe401b2 0xe3f497 0x4f674c 0x7f4487ace77d 0x7f44865ed9ad
```
Keywords: obproxy's memroy is out of limit, will be going to commit suicide
obproxy exited because it ran out of memory.
#### Solution
obproxy can be started with a memory limit, 800 MB by default. In some situations, e.g. with many connections (the failed job wrote to 100 sharded tables at a concurrency of 32, i.e. 3200 connections), obproxy may run out of memory. To fix this, either lower the job's concurrency or raise obproxy's memory limit, e.g. to 2 GB.
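For reference, a hedged sketch of raising the limit (proxy_mem_limited is the usual obproxy configuration item for this; verify the name and syntax against your obproxy version):
```sql
-- connect to obproxy with the mysql client, then:
ALTER proxyconfig SET proxy_mem_limited = '2G';
```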
### 4.2 Session interrupted
When writing to a single table with the OB 1.0 writer, the following error occurs:
```
2019-01-03 19:37:27.197 [0-insertTask-73] WARN InsertTask - Insert fatal error SqlState =HY000, errorCode = 5066, java.sql.SQLException: Session interrupted, server ip:port[11.145.28.93:2881]
```
Keywords: fatal, Session interrupted, server ip:port
The job log also contains entries like the following:
```
2019-08-09 11:56:56.758 [2-insertTask-82] ERROR StdoutPluginCollector -
java.sql.SQLException: Session interrupted, server ip:port[11.232.58.16:2881]
at com.alipay.oceanbase.obproxy.connection.ObGroupConnection.checkAndThrowException(ObGroupConnection.java:431) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.statement.ObStatement.doExecute(ObStatement.java:598) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.statement.ObStatement.execute(ObStatement.java:456) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.statement.ObPreparedStatement.execute(ObPreparedStatement.java:148) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter$Task.doOneInsert(CommonRdbmsWriter.java:430) ~[plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.plugin.writer.oceanbasev10writer.task.InsertTask.doMultiInsert(InsertTask.java:196) [oceanbasev10writer-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.plugin.writer.oceanbasev10writer.task.InsertTask.run(InsertTask.java:85) [oceanbasev10writer-0.0.1-SNAPSHOT.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) [na:1.8.0_112]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) [na:1.8.0_112]
at java.lang.Thread.run(Thread.java:834) [na:1.8.0_112]
Caused by: com.alipay.oceanbase.obproxy.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: INSERT command denied to user 'dwexp'@'%' for table 'mobile_product_version_info'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_112]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_112]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_112]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_112]
at com.alipay.oceanbase.obproxy.mysql.jdbc.Util.handleNewInstance(Util.java:409) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.mysql.jdbc.Util.getInstance(Util.java:384) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.mysql.jdbc.SQLError.createSQLException(SQLError.java:1052) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4403) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4275) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2706) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2867) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2843) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2085) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1310) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.druid.pool.DruidPooledPreparedStatement.execute(DruidPooledPreparedStatement.java:493) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.statement.ObPreparedStatement.executeOnConnection(ObPreparedStatement.java:121) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.statement.ObStatement.doExecuteOnConnection(ObStatement.java:677) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
at com.alipay.oceanbase.obproxy.statement.ObStatement.doExecute(ObStatement.java:558) ~[oceanbase-connector-java-2.0.8.20180730.jar:na]
... 8 common frames omitted
```
The exception is caused by a missing insert privilege: INSERT command denied to user 'dwexp'@'%' for table.
Keywords: INSERT command denied to user 'dwexp'@'%'
Because the error comes from missing write privileges, neither the observer log nor the obproxy log contains any related information.
#### Solution
Grant the required privileges to the user in OB and retry the job.
Reference grant statements:
```sql
grant select, insert, update on dbName.tableName to dwexp;
grant select on oceanbase.gv$memstore to dwexp;
```


@ -115,6 +115,11 @@
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>obkv-table-client</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>obkv-hbase-client</artifactId>


@ -2,62 +2,86 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer;
public interface Config {
String MEMSTORE_THRESHOLD = "memstoreThreshold";
double DEFAULT_MEMSTORE_THRESHOLD = 0.9d;
double DEFAULT_SLOW_MEMSTORE_THRESHOLD = 0.75d;
String MEMSTORE_CHECK_INTERVAL_SECOND = "memstoreCheckIntervalSecond";
long DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND = 30;
int DEFAULT_BATCH_SIZE = 100;
int MAX_BATCH_SIZE = 4096;
String FAIL_TRY_COUNT = "failTryCount";
int DEFAULT_FAIL_TRY_COUNT = 10000;
String WRITER_THREAD_COUNT = "writerThreadCount";
int DEFAULT_WRITER_THREAD_COUNT = 1;
String CONCURRENT_WRITE = "concurrentWrite";
boolean DEFAULT_CONCURRENT_WRITE = true;
String OB_VERSION = "obVersion";
String TIMEOUT = "timeout";
String PRINT_COST = "printCost";
boolean DEFAULT_PRINT_COST = false;
String COST_BOUND = "costBound";
long DEFAULT_COST_BOUND = 20;
String MAX_ACTIVE_CONNECTION = "maxActiveConnection";
int DEFAULT_MAX_ACTIVE_CONNECTION = 2000;
String WRITER_SUB_TASK_COUNT = "writerSubTaskCount";
int DEFAULT_WRITER_SUB_TASK_COUNT = 1;
int MAX_WRITER_SUB_TASK_COUNT = 4096;
String OB_WRITE_MODE = "obWriteMode";
String OB_COMPATIBLE_MODE = "obCompatibilityMode";
String OB_COMPATIBLE_MODE_ORACLE = "ORACLE";
String OB_COMPATIBLE_MODE_MYSQL = "MYSQL";
String OCJ_GET_CONNECT_TIMEOUT = "ocjGetConnectTimeout";
int DEFAULT_OCJ_GET_CONNECT_TIMEOUT = 5000; // 5s
String OCJ_PROXY_CONNECT_TIMEOUT = "ocjProxyConnectTimeout";
int DEFAULT_OCJ_PROXY_CONNECT_TIMEOUT = 5000; // 5s
String OCJ_CREATE_RESOURCE_TIMEOUT = "ocjCreateResourceTimeout";
int DEFAULT_OCJ_CREATE_RESOURCE_TIMEOUT = 60000; // 60s
String OB_UPDATE_COLUMNS = "obUpdateColumns";
String USE_PART_CALCULATOR = "usePartCalculator";
boolean DEFAULT_USE_PART_CALCULATOR = false;
String BLOCKS_COUNT = "blocksCount";
String DIRECT_PATH = "directPath";
String RPC_PORT = "rpcPort";
// Unlike recordLimit, this parameter applies only to a single table: exceeding the
// maximum error count on one table does not affect other tables. Direct path import only.
String MAX_ERRORS = "maxErrors";
}


@ -0,0 +1,88 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.common;
import java.util.Objects;
public class Table {
private String tableName;
private String dbName;
private Throwable error;
private Status status;
public Table(String dbName, String tableName) {
this.dbName = dbName;
this.tableName = tableName;
this.status = Status.INITIAL;
}
public Throwable getError() {
return error;
}
public void setError(Throwable error) {
this.error = error;
}
public Status getStatus() {
return status;
}
public void setStatus(Status status) {
this.status = status;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Table table = (Table) o;
return tableName.equals(table.tableName) && dbName.equals(table.dbName);
}
@Override
public int hashCode() {
return Objects.hash(tableName, dbName);
}
public enum Status {
/**
 * Not started yet.
 */
INITIAL(0),
/**
 * Write in progress.
 */
RUNNING(1),
/**
 * Write failed.
 */
FAILURE(2),
/**
 * Write finished successfully.
 */
SUCCESS(3);
private int code;
/**
 * @param code numeric code of this status
 */
private Status(int code) {
this.code = code;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
}
}


@ -0,0 +1,21 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.common;
import java.util.concurrent.ConcurrentHashMap;
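/**
 * Process-wide cache of {@link Table} objects, keyed by "dbName-tableName".
 * computeIfAbsent guarantees a single shared Table instance per physical table,
 * so the status and error recorded on it are visible to every writer task.
 */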
public class TableCache {
private static final TableCache INSTANCE = new TableCache();
private final ConcurrentHashMap<String, Table> TABLE_CACHE;
private TableCache() {
TABLE_CACHE = new ConcurrentHashMap<>();
}
public static TableCache getInstance() {
return INSTANCE;
}
public Table getTable(String dbName, String tableName) {
String fullTableName = String.join("-", dbName, tableName);
return TABLE_CACHE.computeIfAbsent(fullTableName, (k) -> new Table(dbName, tableName));
}
}


@ -0,0 +1,257 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
import java.sql.Array;
import java.sql.Blob;
import java.sql.CallableStatement;
import java.sql.Clob;
import java.sql.DatabaseMetaData;
import java.sql.NClob;
import java.sql.PreparedStatement;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
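/**
 * A restricted {@link java.sql.Connection} facade for the direct path write path:
 * every JDBC operation listed below fails fast with UnsupportedOperationException,
 * so only the methods a concrete subclass explicitly implements are usable.
 */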
public abstract class AbstractRestrictedConnection implements java.sql.Connection {
@Override
public CallableStatement prepareCall(String sql) throws SQLException {
throw new UnsupportedOperationException("prepareCall(String) is unsupported");
}
@Override
public String nativeSQL(String sql) throws SQLException {
throw new UnsupportedOperationException("nativeSQL(String) is unsupported");
}
@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
throw new UnsupportedOperationException("setAutoCommit(boolean) is unsupported");
}
@Override
public boolean getAutoCommit() throws SQLException {
throw new UnsupportedOperationException("getAutoCommit is unsupported");
}
@Override
public void abort(Executor executor) throws SQLException {
throw new UnsupportedOperationException("abort(Executor) is unsupported");
}
@Override
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
throw new UnsupportedOperationException("setNetworkTimeout(Executor, int) is unsupported");
}
@Override
public int getNetworkTimeout() throws SQLException {
throw new UnsupportedOperationException("getNetworkTimeout is unsupported");
}
@Override
public DatabaseMetaData getMetaData() throws SQLException {
throw new UnsupportedOperationException("getMetaData is unsupported");
}
@Override
public void setReadOnly(boolean readOnly) throws SQLException {
throw new UnsupportedOperationException("setReadOnly(boolean) is unsupported");
}
@Override
public boolean isReadOnly() throws SQLException {
throw new UnsupportedOperationException("isReadOnly is unsupported");
}
@Override
public void setCatalog(String catalog) throws SQLException {
throw new UnsupportedOperationException("setCatalog(String) is unsupported");
}
@Override
public String getCatalog() throws SQLException {
throw new UnsupportedOperationException("getCatalog is unsupported");
}
@Override
public void setTransactionIsolation(int level) throws SQLException {
throw new UnsupportedOperationException("setTransactionIsolation(int) is unsupported");
}
@Override
public int getTransactionIsolation() throws SQLException {
throw new UnsupportedOperationException("getTransactionIsolation is unsupported");
}
@Override
public SQLWarning getWarnings() throws SQLException {
throw new UnsupportedOperationException("getWarnings is unsupported");
}
@Override
public void clearWarnings() throws SQLException {
throw new UnsupportedOperationException("clearWarnings is unsupported");
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
throw new UnsupportedOperationException("createStatement(int, int) is unsupported");
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
throw new UnsupportedOperationException("prepareStatement(String, int, int) is unsupported");
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
throw new UnsupportedOperationException("prepareCall(String, int, int) is unsupported");
}
@Override
public Map<String, Class<?>> getTypeMap() throws SQLException {
throw new UnsupportedOperationException("getTypeMap is unsupported");
}
@Override
public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
throw new UnsupportedOperationException("setTypeMap(Map<String, Class<?>>) is unsupported");
}
@Override
public void setHoldability(int holdability) throws SQLException {
throw new UnsupportedOperationException("setHoldability is unsupported");
}
@Override
public int getHoldability() throws SQLException {
throw new UnsupportedOperationException("getHoldability is unsupported");
}
@Override
public Savepoint setSavepoint() throws SQLException {
throw new UnsupportedOperationException("setSavepoint is unsupported");
}
@Override
public Savepoint setSavepoint(String name) throws SQLException {
throw new UnsupportedOperationException("setSavepoint(String) is unsupported");
}
@Override
public void rollback(Savepoint savepoint) throws SQLException {
throw new UnsupportedOperationException("rollback(Savepoint) is unsupported");
}
@Override
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
throw new UnsupportedOperationException("releaseSavepoint(Savepoint) is unsupported");
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
throw new UnsupportedOperationException("createStatement(int, int, int) is unsupported");
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
throw new UnsupportedOperationException("prepareStatement(String, int, int, int) is unsupported");
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
throw new UnsupportedOperationException("prepareCall(String, int, int, int) is unsupported");
}
@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
throw new UnsupportedOperationException("prepareStatement(String, int) is unsupported");
}
@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
throw new UnsupportedOperationException("prepareStatement(String, int[]) is unsupported");
}
@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
throw new UnsupportedOperationException("prepareStatement(String, String[]) is unsupported");
}
@Override
public Clob createClob() throws SQLException {
throw new UnsupportedOperationException("createClob is unsupported");
}
@Override
public Blob createBlob() throws SQLException {
throw new UnsupportedOperationException("createBlob is unsupported");
}
@Override
public NClob createNClob() throws SQLException {
throw new UnsupportedOperationException("createNClob is unsupported");
}
@Override
public SQLXML createSQLXML() throws SQLException {
throw new UnsupportedOperationException("createSQLXML is unsupported");
}
@Override
public boolean isValid(int timeout) throws SQLException {
throw new UnsupportedOperationException("isValid(int) is unsupported");
}
@Override
public void setClientInfo(String name, String value) throws SQLClientInfoException {
throw new UnsupportedOperationException("setClientInfo(String, String) is unsupported");
}
@Override
public void setClientInfo(Properties properties) throws SQLClientInfoException {
throw new UnsupportedOperationException("setClientInfo(Properties) is unsupported");
}
@Override
public String getClientInfo(String name) throws SQLException {
throw new UnsupportedOperationException("getClientInfo(String) is unsupported");
}
@Override
public Properties getClientInfo() throws SQLException {
throw new UnsupportedOperationException("getClientInfo is unsupported");
}
@Override
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
throw new UnsupportedOperationException("createArrayOf(String, Object[]) is unsupported");
}
@Override
public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
throw new UnsupportedOperationException("createStruct(String, Object[]) is unsupported");
}
@Override
public void setSchema(String schema) throws SQLException {
throw new UnsupportedOperationException("setSchema(String) is unsupported");
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
throw new UnsupportedOperationException("unwrap(Class<T>) is unsupported");
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
throw new UnsupportedOperationException("isWrapperFor(Class<?>) is unsupported");
}
}


@ -0,0 +1,663 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URL;
import java.nio.charset.Charset;
import java.sql.Array;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
import java.sql.NClob;
import java.sql.ParameterMetaData;
import java.sql.Ref;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.RowId;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZonedDateTime;
import java.util.Calendar;
import java.util.List;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjType;
import com.alipay.oceanbase.rpc.util.ObVString;
import org.apache.commons.io.IOUtils;
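/**
 * A restricted {@link java.sql.PreparedStatement} for the direct path write path:
 * the supported setXxx calls convert their arguments into OceanBase {@link ObObj}
 * parameters via createObObj, while statement-level JDBC operations (execute,
 * fetch, metadata, ...) fail fast with UnsupportedOperationException.
 */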
public abstract class AbstractRestrictedPreparedStatement implements java.sql.PreparedStatement {
private boolean closed;
@Override
public void setNull(int parameterIndex, int sqlType) throws SQLException {
this.setParameter(parameterIndex, createObObj(null));
}
@Override
public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
throw new UnsupportedOperationException("setNull(int, int, String) is unsupported");
}
@Override
public void setBoolean(int parameterIndex, boolean x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setByte(int parameterIndex, byte x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setShort(int parameterIndex, short x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setInt(int parameterIndex, int x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setLong(int parameterIndex, long x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setFloat(int parameterIndex, float x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setDouble(int parameterIndex, double x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setString(int parameterIndex, String x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setBytes(int parameterIndex, byte[] x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setDate(int parameterIndex, Date x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
throw new UnsupportedOperationException("setDate(int, Date, Calendar) is unsupported");
}
@Override
public void setTime(int parameterIndex, Time x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
throw new UnsupportedOperationException("setTime(int, Time, Calendar) is unsupported");
}
@Override
public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
throw new UnsupportedOperationException("setTimestamp(int, Timestamp, Calendar) is unsupported");
}
@Override
public void setObject(int parameterIndex, Object x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
throw new UnsupportedOperationException("setObject(int, Object, int) is unsupported");
}
@Override
public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
throw new UnsupportedOperationException("setObject(int, Object, int, int) is unsupported");
}
@Override
public void setRef(int parameterIndex, Ref x) throws SQLException {
throw new UnsupportedOperationException("setRef(int, Ref) is unsupported");
}
@Override
public void setArray(int parameterIndex, Array x) throws SQLException {
throw new UnsupportedOperationException("setArray(int, Array) is unsupported");
}
@Override
public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
throw new UnsupportedOperationException("setSQLXML(int, SQLXML) is unsupported");
}
@Override
public void setURL(int parameterIndex, URL x) throws SQLException {
// if (x == null) {
// this.setParameter(parameterIndex, createObObj(x));
// } else {
// // TODO If need BackslashEscapes and character encoding ?
// this.setParameter(parameterIndex, createObObj(x.toString()));
// }
throw new UnsupportedOperationException("setURL(int, URL) is unsupported");
}
@Override
public void setRowId(int parameterIndex, RowId x) throws SQLException {
throw new UnsupportedOperationException("setRowId(int, RowId) is unsupported");
}
@Override
public void setNString(int parameterIndex, String value) throws SQLException {
this.setParameter(parameterIndex, createObObj(value));
}
@Override
public void setBlob(int parameterIndex, Blob x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setBlob(int parameterIndex, InputStream x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setBlob(int parameterIndex, InputStream x, long length) throws SQLException {
throw new UnsupportedOperationException("setBlob(int, InputStream, length) is unsupported");
}
@Override
public void setClob(int parameterIndex, Clob x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setClob(int parameterIndex, Reader x) throws SQLException {
this.setCharacterStream(parameterIndex, x);
}
@Override
public void setClob(int parameterIndex, Reader x, long length) throws SQLException {
throw new UnsupportedOperationException("setClob(int, Reader, length) is unsupported");
}
@Override
public void setNClob(int parameterIndex, NClob x) throws SQLException {
this.setClob(parameterIndex, (Clob) (x));
}
@Override
public void setNClob(int parameterIndex, Reader x) throws SQLException {
this.setClob(parameterIndex, x);
}
@Override
public void setNClob(int parameterIndex, Reader x, long length) throws SQLException {
throw new UnsupportedOperationException("setNClob(int, Reader, length) is unsupported");
}
@Override
public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Deprecated
@Override
public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
throw new UnsupportedOperationException("setAsciiStream(int, InputStream, length) is unsupported");
}
@Override
public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
throw new UnsupportedOperationException("setAsciiStream(int, InputStream, length) is unsupported");
}
@Override
public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
throw new UnsupportedOperationException("setBinaryStream(int, InputStream, length) is unsupported");
}
@Override
public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
throw new UnsupportedOperationException("setBinaryStream(int, InputStream, length) is unsupported");
}
@Override
public void setCharacterStream(int parameterIndex, Reader x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setCharacterStream(int parameterIndex, Reader x, int length) throws SQLException {
throw new UnsupportedOperationException("setCharacterStream(int, Reader, int) is unsupported");
}
@Override
public void setCharacterStream(int parameterIndex, Reader x, long length) throws SQLException {
throw new UnsupportedOperationException("setCharacterStream(int, Reader, long) is unsupported");
}
@Override
public void setNCharacterStream(int parameterIndex, Reader x) throws SQLException {
this.setParameter(parameterIndex, createObObj(x));
}
@Override
public void setNCharacterStream(int parameterIndex, Reader x, long length) throws SQLException {
throw new UnsupportedOperationException("setNCharacterStream(int, Reader, long) is unsupported");
}
/**
* @return boolean
*/
protected abstract boolean isOracleMode();
/**
* Set parameter to the target position.
*
* @param parameterIndex
* @param obObj
* @throws SQLException
*/
protected abstract void setParameter(int parameterIndex, ObObj obObj) throws SQLException;
/**
* Close the current prepared statement.
*
* @throws SQLException
*/
@Override
public void close() throws SQLException {
this.closed = true;
}
/**
* Return whether the current prepared statement is closed?
*
* @return boolean
* @throws SQLException
*/
@Override
public boolean isClosed() throws SQLException {
return this.closed;
}
/**
* Create a {@link ObObj } array with input values.
*
* @param values Original row value
* @return ObObj[]
*/
public ObObj[] createObObjArray(Object[] values) {
if (values == null) {
return null;
}
ObObj[] array = new ObObj[values.length];
for (int i = 0; i < values.length; i++) {
array[i] = createObObj(values[i]);
}
return array;
}
/**
* Create a {@link ObObj } array with input values.
*
* @param values Original row value
* @return ObObj[]
*/
public ObObj[] createObObjArray(List<Object> values) {
if (values == null) {
return null;
}
ObObj[] array = new ObObj[values.size()];
for (int i = 0; i < values.size(); i++) {
array[i] = createObObj(values.get(i));
}
return array;
}
/**
* Create a {@link ObObj } instance.
*
* @param value Original column value
* @return ObObj
*/
public ObObj createObObj(Object value) {
try {
// Only used for strongly typed declared variables
Object convertedValue = value == null ? null : convertValue(value);
return new ObObj(ObObjType.defaultObjMeta(convertedValue), convertedValue);
} catch (Exception ex) {
throw new IllegalArgumentException(ex);
}
}
/**
* Some values with data type is unsupported by ObObjType#valueOfType.
* We should convert the input value to supported value data type.
*
* @param value
* @return Object
* @throws Exception
*/
public static Object convertValue(Object value) throws Exception {
if (value instanceof BigDecimal) {
return value.toString();
} else if (value instanceof BigInteger) {
return value.toString();
} else if (value instanceof Instant) {
return Timestamp.from(((Instant) value));
} else if (value instanceof LocalDate) {
// Warn: java.sql.Date.valueOf() is deprecated. As local zone is used.
return Date.valueOf(((LocalDate) value));
} else if (value instanceof LocalTime) {
// Warn: java.sql.Time.valueOf() is deprecated.
Time t = Time.valueOf((LocalTime) value);
return new Timestamp(t.getTime());
} else if (value instanceof LocalDateTime) {
return Timestamp.valueOf(((LocalDateTime) value));
} else if (value instanceof OffsetDateTime) {
return Timestamp.from(((OffsetDateTime) value).toInstant());
} else if (value instanceof Time) {
return new Timestamp(((Time) value).getTime());
} else if (value instanceof ZonedDateTime) {
// Note: Be care of time zone!!!
return Timestamp.from(((ZonedDateTime) value).toInstant());
} else if (value instanceof OffsetTime) {
LocalTime lt = ((OffsetTime) value).toLocalTime();
// Warn: java.sql.Time.valueOf() is deprecated.
return new Timestamp(Time.valueOf(lt).getTime());
} else if (value instanceof InputStream) {
try (InputStream is = ((InputStream) value)) {
// Note: Be care of character set!!!
return new ObVString(IOUtils.toString(is, Charset.defaultCharset()));
}
} else if (value instanceof Blob) {
Blob b = (Blob) value;
try (InputStream is = b.getBinaryStream()) {
if (is == null) {
return null;
}
// Note: Be care of character set!!!
return new ObVString(IOUtils.toString(is, Charset.defaultCharset()));
} finally {
b.free();
}
} else if (value instanceof Reader) {
try (Reader r = ((Reader) value)) {
return IOUtils.toString(r);
}
} else if (value instanceof Clob) {
Clob c = (Clob) value;
try (Reader r = c.getCharacterStream()) {
return r == null ? null : IOUtils.toString(r);
} finally {
c.free();
}
} else {
return value;
}
}
// *********************************************************************************** //
@Override
public boolean getMoreResults(int current) throws SQLException {
throw new UnsupportedOperationException("getMoreResults(int) is unsupported");
}
@Override
public ResultSet getGeneratedKeys() throws SQLException {
throw new UnsupportedOperationException("getGeneratedKeys is unsupported");
}
@Override
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
throw new UnsupportedOperationException("executeUpdate(String, int) is unsupported");
}
@Override
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
throw new UnsupportedOperationException("executeUpdate(String, int[]) is unsupported");
}
@Override
public int executeUpdate(String sql, String[] columnNames) throws SQLException {
throw new UnsupportedOperationException("executeUpdate(String, String[]) is unsupported");
}
@Override
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
throw new UnsupportedOperationException("execute(String, int) is unsupported");
}
@Override
public boolean execute(String sql, int[] columnIndexes) throws SQLException {
throw new UnsupportedOperationException("execute(String, int[]) is unsupported");
}
@Override
public boolean execute(String sql, String[] columnNames) throws SQLException {
throw new UnsupportedOperationException("execute(String, String[]) is unsupported");
}
@Override
public int getResultSetHoldability() throws SQLException {
throw new UnsupportedOperationException("getResultSetHoldability is unsupported");
}
@Override
public void setPoolable(boolean poolable) throws SQLException {
throw new UnsupportedOperationException("setPoolable(boolean) is unsupported");
}
@Override
public boolean isPoolable() throws SQLException {
throw new UnsupportedOperationException("isPoolable is unsupported");
}
@Override
public void closeOnCompletion() throws SQLException {
throw new UnsupportedOperationException("closeOnCompletion is unsupported");
}
@Override
public boolean isCloseOnCompletion() throws SQLException {
throw new UnsupportedOperationException("isCloseOnCompletion is unsupported");
}
@Override
public ResultSet executeQuery(String sql) throws SQLException {
throw new UnsupportedOperationException("executeQuery(String) is unsupported");
}
@Override
public int executeUpdate(String sql) throws SQLException {
throw new UnsupportedOperationException("executeUpdate(String) is unsupported");
}
@Override
public int getMaxFieldSize() throws SQLException {
throw new UnsupportedOperationException("getMaxFieldSize is unsupported");
}
@Override
public void setMaxFieldSize(int max) throws SQLException {
throw new UnsupportedOperationException("setMaxFieldSize(int) is unsupported");
}
@Override
public int getMaxRows() throws SQLException {
throw new UnsupportedOperationException("getMaxRows is unsupported");
}
@Override
public void setMaxRows(int max) throws SQLException {
throw new UnsupportedOperationException("setMaxRows(int) is unsupported");
}
@Override
public void setEscapeProcessing(boolean enable) throws SQLException {
throw new UnsupportedOperationException("setEscapeProcessing(boolean) is unsupported");
}
@Override
public int getQueryTimeout() throws SQLException {
throw new UnsupportedOperationException("getQueryTimeout is unsupported");
}
@Override
public void setQueryTimeout(int seconds) throws SQLException {
throw new UnsupportedOperationException("setQueryTimeout(int) is unsupported");
}
@Override
public void cancel() throws SQLException {
throw new UnsupportedOperationException("cancel is unsupported");
}
@Override
public SQLWarning getWarnings() throws SQLException {
throw new UnsupportedOperationException("getWarnings is unsupported");
}
@Override
public void clearWarnings() throws SQLException {
throw new UnsupportedOperationException("clearWarnings is unsupported");
}
@Override
public void setCursorName(String name) throws SQLException {
throw new UnsupportedOperationException("setCursorName(String) is unsupported");
}
@Override
public boolean execute(String sql) throws SQLException {
throw new UnsupportedOperationException("execute(String) is unsupported");
}
@Override
public ResultSet getResultSet() throws SQLException {
throw new UnsupportedOperationException("getResultSet is unsupported");
}
@Override
public int getUpdateCount() throws SQLException {
throw new UnsupportedOperationException("getUpdateCount is unsupported");
}
@Override
public boolean getMoreResults() throws SQLException {
throw new UnsupportedOperationException("getMoreResults is unsupported");
}
@Override
public void setFetchDirection(int direction) throws SQLException {
throw new UnsupportedOperationException("setFetchDirection(int) is unsupported");
}
@Override
public int getFetchDirection() throws SQLException {
throw new UnsupportedOperationException("getFetchDirection is unsupported");
}
@Override
public void setFetchSize(int rows) throws SQLException {
throw new UnsupportedOperationException("setFetchSize(int) is unsupported");
}
@Override
public int getFetchSize() throws SQLException {
throw new UnsupportedOperationException("getFetchSize is unsupported");
}
@Override
public int getResultSetConcurrency() throws SQLException {
throw new UnsupportedOperationException("getResultSetConcurrency is unsupported");
}
@Override
public int getResultSetType() throws SQLException {
throw new UnsupportedOperationException("getResultSetType is unsupported");
}
@Override
public void addBatch(String sql) throws SQLException {
throw new UnsupportedOperationException("addBatch(String) is unsupported");
}
@Override
public ResultSet executeQuery() throws SQLException {
throw new UnsupportedOperationException("executeQuery is unsupported");
}
@Override
public int executeUpdate() throws SQLException {
throw new UnsupportedOperationException("executeUpdate is unsupported");
}
@Override
public boolean execute() throws SQLException {
throw new UnsupportedOperationException("execute is unsupported");
}
@Override
public ParameterMetaData getParameterMetaData() throws SQLException {
throw new UnsupportedOperationException("getParameterMetaData is unsupported");
}
@Override
public ResultSetMetaData getMetaData() throws SQLException {
throw new UnsupportedOperationException("getMetaData is unsupported");
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
throw new UnsupportedOperationException("unwrap(Class<T>) is unsupported");
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
throw new UnsupportedOperationException("isWrapperFor(Class<?>) is unsupported");
}
}


@ -0,0 +1,170 @@
/*
* Copyright 2024 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
import java.io.Serializable;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadConnection;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadManager;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadStatement;
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException;
import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObLoadDupActionType;
import org.apache.commons.lang.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The builder for {@link ObTableDirectLoad}.
*/
public class DirectLoaderBuilder implements Serializable {
private static final Logger log = LoggerFactory.getLogger(DirectLoaderBuilder.class);
private String host;
private int port;
private String user;
private String tenant;
private String password;
private String schema;
private String table;
/**
* Server-side parallelism.
*/
private int parallel;
private long maxErrorCount;
private ObLoadDupActionType duplicateKeyAction;
/**
* The overall timeout of the direct load task
*/
private Long timeout;
private Long heartBeatTimeout;
private Long heartBeatInterval;
public DirectLoaderBuilder host(String host) {
this.host = host;
return this;
}
public DirectLoaderBuilder port(int port) {
this.port = port;
return this;
}
public DirectLoaderBuilder user(String user) {
// obkv client 1.4.0 expects the bare user name, without tenant or cluster information
int indexOf = user.indexOf("@");
this.user = user;
if (indexOf > 0) {
this.user = user.substring(0, indexOf);
}
return this;
}
public DirectLoaderBuilder tenant(String tenant) {
this.tenant = tenant;
return this;
}
public DirectLoaderBuilder password(String password) {
this.password = password;
return this;
}
public DirectLoaderBuilder schema(String schema) {
this.schema = schema;
return this;
}
public DirectLoaderBuilder table(String table) {
this.table = table;
return this;
}
public DirectLoaderBuilder parallel(int parallel) {
this.parallel = parallel;
return this;
}
public DirectLoaderBuilder maxErrorCount(long maxErrorCount) {
this.maxErrorCount = maxErrorCount;
return this;
}
public DirectLoaderBuilder duplicateKeyAction(ObLoadDupActionType duplicateKeyAction) {
this.duplicateKeyAction = duplicateKeyAction;
return this;
}
public DirectLoaderBuilder timeout(long timeout) {
this.timeout = timeout;
return this;
}
public DirectLoaderBuilder heartBeatTimeout(Long heartBeatTimeout) {
this.heartBeatTimeout = heartBeatTimeout;
return this;
}
public DirectLoaderBuilder heartBeatInterval(Long heartBeatInterval) {
this.heartBeatInterval = heartBeatInterval;
return this;
}
public ObTableDirectLoad build() {
try {
ObDirectLoadConnection obDirectLoadConnection = buildConnection(parallel);
ObDirectLoadStatement obDirectLoadStatement = buildStatement(obDirectLoadConnection);
return new ObTableDirectLoad(schema, table, obDirectLoadStatement, obDirectLoadConnection);
} catch (ObDirectLoadException e) {
throw new ObTableException(e.getMessage(), e);
}
}
private ObDirectLoadConnection buildConnection(int writeThreadNum) throws ObDirectLoadException {
if (heartBeatTimeout == null || heartBeatInterval == null) {
throw new IllegalArgumentException("heartBeatTimeout and heartBeatInterval must not be null");
}
ObDirectLoadConnection build = ObDirectLoadManager.getConnectionBuilder()
.setServerInfo(host, port)
.setLoginInfo(tenant, user, password, schema)
.setHeartBeatInfo(heartBeatTimeout, heartBeatInterval)
.enableParallelWrite(writeThreadNum)
.build();
log.info("ObDirectLoadConnection value is:{}", ObjectUtils.toString(build));
return build;
}
private ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection) throws ObDirectLoadException {
ObDirectLoadStatement build = connection.getStatementBuilder()
.setTableName(table)
.setParallel(parallel)
.setQueryTimeout(timeout)
.setDupAction(duplicateKeyAction)
.setMaxErrorRowCount(maxErrorCount)
.build();
log.info("ObDirectLoadStatement value is:{}", ObjectUtils.toString(build));
return build;
}
}

View File

@ -0,0 +1,398 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
import java.sql.SQLException;
import java.util.Arrays;
import com.alibaba.datax.common.util.Configuration;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadBucket;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObLoadDupActionType;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
public class DirectPathConnection extends AbstractRestrictedConnection {
private static final int OB_DIRECT_PATH_DEFAULT_BLOCKS = 1;
private static final long OB_DIRECT_PATH_HEART_BEAT_TIMEOUT = 60000;
private static final long OB_DIRECT_PATH_HEART_BEAT_INTERVAL = 10000;
private static final int DEFAULT_BUFFERSIZE = 1048576;
private final Configuration configuration;
private State state;
private int committers;
private final int blocks;
private final ObTableDirectLoad load;
private final Object lock = new Object();
private static final Logger log = LoggerFactory.getLogger(DirectPathConnection.class);
/**
* Construct a new instance.
*
* @param load
* @param blocks
*/
private DirectPathConnection(ObTableDirectLoad load, int blocks, Configuration configuration) {
this.configuration = configuration;
this.load = load;
this.blocks = blocks;
}
/**
* Begin a new {@link DirectPathConnection }
*
* @return DirectPathConnection
* @throws SQLException
*/
public DirectPathConnection begin() throws SQLException {
synchronized (lock) {
if (state == null || state == State.CLOSED) {
try {
this.load.begin();
this.state = State.BEGIN;
} catch (Exception ex) {
throw new SQLException(ex);
}
} else {
throw new IllegalStateException("Begin transaction failed as connection state is already BEGIN");
}
}
return this;
}
/**
* Commit buffered data with MAXIMUM timeout.
*
* @throws SQLException
*/
@Override
public void commit() throws SQLException {
synchronized (lock) {
if (state == State.BEGIN) {
this.committers++;
if (committers == blocks) {
try {
this.load.commit();
state = State.FINISHED;
} catch (Exception ex) {
throw new SQLException(ex);
}
} else if (committers > blocks) {
throw new IllegalStateException("Commit count exceeded the limit. (" + committers + " > " + blocks + ")");
}
} else {
throw new IllegalStateException("Commit transaction failed as connection state is not BEGIN");
}
}
}
/**
* Rollback if error occurred.
*
* @throws SQLException
*/
@Override
public void rollback() throws SQLException {
synchronized (lock) {
if (state == State.BEGIN) {
try {
//the close() method of obkv-table-client 2.1.0 includes the rollback logic
this.load.close();
} catch (Exception ex) {
throw new SQLException(ex);
}
} else {
throw new IllegalStateException("Rollback transaction failed as connection state is not BEGIN");
}
}
}
/**
* Close this connection.
*/
@Override
public void close() {
synchronized (lock) {
// Close the underlying load unconditionally and mark the connection CLOSED
this.load.close();
this.state = State.CLOSED;
}
}
/**
* @return DirectPathPreparedStatement
*/
@Override
public DirectPathPreparedStatement createStatement() throws SQLException {
return this.prepareStatement(null);
}
/**
* A new batch needs to create a new {@link DirectPathPreparedStatement }.
* The {@link DirectPathPreparedStatement } cannot be reused, otherwise it may cause duplicate records.
*
* @return DirectPathStatement
*/
@Override
public DirectPathPreparedStatement prepareStatement(String sql) throws SQLException {
if (state == State.BEGIN) {
Integer bufferSize = configuration.getInt(DirectPathConstants.BUFFERSIZE, DEFAULT_BUFFERSIZE);
log.info("The current bufferSize size is{}", bufferSize);
return new DirectPathPreparedStatement(this, bufferSize);
} else {
throw new IllegalStateException("Create statement failed as connection state is not BEGIN");
}
}
/**
* Return the schema name of this connection instance.
*
* @return String
*/
@Override
public String getSchema() {
if (state == State.BEGIN) {
return this.load.getTable().getDatabase();
} else {
throw new IllegalStateException("Get schema failed as connection state is not BEGIN");
}
}
/**
* Return the table name of this connection instance.
*
* @return String
*/
public String getTableName() {
if (state == State.BEGIN) {
return this.load.getTableName();
} else {
throw new IllegalStateException("Get table failed as connection state is not BEGIN");
}
}
/**
* Return whether this connection is closed.
*
* @return boolean
*/
@Override
public boolean isClosed() {
synchronized (lock) {
return this.state == State.CLOSED;
}
}
public boolean isFinished() {
return this.state == State.FINISHED;
}
/**
* Insert bucket data into buffer.
*
* @param bucket
* @return int[]
* @throws SQLException
*/
int[] insert(ObDirectLoadBucket bucket) throws SQLException {
try {
this.load.write(bucket);
int[] result = new int[bucket.getRowNum()];
Arrays.fill(result, 1);
return result;
} catch (Exception ex) {
throw new SQLException(ex);
}
}
/**
* Indicates the state of {@link DirectPathConnection }
*/
enum State {
/**
* Begin transaction
*/
BEGIN,
/**
* Transaction is finished, ready to close.
*/
FINISHED,
/**
* Transaction is closed.
*/
CLOSED;
}
/**
* This builder used to build a new {@link DirectPathConnection }
*/
public static class Builder {
private String host;
private int port;
private String user;
private String tenant;
private String password;
private String schema;
private String table;
/**
* Client job count.
*/
private int blocks = OB_DIRECT_PATH_DEFAULT_BLOCKS;
/**
* Server threads used to sort.
*/
private int parallel;
private long maxErrorCount;
private ObLoadDupActionType duplicateKeyAction;
// Used for load data
private long serverTimeout;
private Configuration configuration;
public Builder host(String host) {
this.host = host;
return this;
}
public Builder port(int port) {
this.port = port;
return this;
}
public Builder user(String user) {
this.user = user;
return this;
}
public Builder tenant(String tenant) {
this.tenant = tenant;
return this;
}
public Builder password(String password) {
this.password = password;
return this;
}
public Builder schema(String schema) {
this.schema = schema;
return this;
}
public Builder table(String table) {
this.table = table;
return this;
}
public Builder blocks(int blocks) {
this.blocks = blocks;
return this;
}
public Builder parallel(int parallel) {
this.parallel = parallel;
return this;
}
public Builder maxErrorCount(long maxErrorCount) {
this.maxErrorCount = maxErrorCount;
return this;
}
public Builder duplicateKeyAction(ObLoadDupActionType duplicateKeyAction) {
this.duplicateKeyAction = duplicateKeyAction;
return this;
}
public Builder serverTimeout(long serverTimeout) {
this.serverTimeout = serverTimeout;
return this;
}
public Builder configuration(Configuration configuration) {
this.configuration = configuration;
return this;
}
/**
* Build a new {@link DirectPathConnection }
*
* @return DirectPathConnection
*/
public DirectPathConnection build() throws Exception {
return createConnection(host, port, user, tenant, password, schema, table, //
blocks, parallel, maxErrorCount, duplicateKeyAction, serverTimeout).begin();
}
/**
* Create a new {@link DirectPathConnection }
*
* @param host
* @param port
* @param user
* @param tenant
* @param password
* @param schema
* @param table
* @param blocks
* @param parallel
* @param maxErrorCount
* @param action
* @param serverTimeout
* @return DirectPathConnection
* @throws Exception
*/
DirectPathConnection createConnection(String host, int port, String user, String tenant, String password, String schema, String table, //
int blocks, int parallel, long maxErrorCount, ObLoadDupActionType action, long serverTimeout) throws Exception {
checkArgument(StringUtils.isNotBlank(host), "Host is null.(host=%s)", host);
checkArgument((port > 0 && port <= 65535), "Port is invalid.(port=%s)", port);
checkArgument(StringUtils.isNotBlank(user), "User Name is null.(user=%s)", user);
checkArgument(StringUtils.isNotBlank(tenant), "Tenant Name is null.(tenant=%s)", tenant);
checkArgument(StringUtils.isNotBlank(schema), "Schema Name is null.(schema=%s)", schema);
checkArgument(StringUtils.isNotBlank(table), "Table Name is null.(table=%s)", table);
checkArgument(blocks > 0, "Client Blocks is invalid.(blocks=%s)", blocks);
checkArgument(parallel > 0, "Server Parallel is invalid.(parallel=%s)", parallel);
checkArgument(maxErrorCount > -1, "MaxErrorCount is invalid.(maxErrorCount=%s)", maxErrorCount);
checkArgument(action != null, "ObLoadDupActionType is null.(obLoadDupActionType=%s)", action);
checkArgument(serverTimeout > 0, "Server timeout is invalid.(timeout=%s)", serverTimeout);
Long heartBeatTimeout = OB_DIRECT_PATH_HEART_BEAT_TIMEOUT;
Long heartBeatInterval = OB_DIRECT_PATH_HEART_BEAT_INTERVAL;
if (configuration != null) {
heartBeatTimeout = configuration.getLong(DirectPathConstants.HEART_BEAT_TIMEOUT, OB_DIRECT_PATH_HEART_BEAT_TIMEOUT);
heartBeatInterval = configuration.getLong(DirectPathConstants.HEART_BEAT_INTERVAL, OB_DIRECT_PATH_HEART_BEAT_INTERVAL);
parallel = configuration.getInt(DirectPathConstants.PARALLEL, parallel);
}
DirectLoaderBuilder builder = new DirectLoaderBuilder()
.host(host).port(port)
.user(user)
.tenant(tenant)
.password(password)
.schema(schema)
.table(table)
.parallel(parallel)
.maxErrorCount(maxErrorCount)
.timeout(serverTimeout)
.duplicateKeyAction(action)
.heartBeatTimeout(heartBeatTimeout)
.heartBeatInterval(heartBeatInterval);
ObTableDirectLoad directLoad = builder.build();
return new DirectPathConnection(directLoad, blocks, configuration);
}
}
}

View File

@ -0,0 +1,12 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
public class DirectPathConstants {
// The following constants are consumed in DirectPathConnection
public static final String HEART_BEAT_TIMEOUT = "heartBeatTimeout";
public static final String HEART_BEAT_INTERVAL = "heartBeatInterval";
public static final String PARALLEL = "parallel";
public static final String BUFFERSIZE = "bufferSize";
}

View File

@ -0,0 +1,164 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadBucket;
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import static com.google.common.base.Preconditions.checkArgument;
public class DirectPathPreparedStatement extends AbstractRestrictedPreparedStatement {
private ObDirectLoadBucket bucket;
private final DirectPathConnection conn;
private final Map<Integer, ObObj> parameters;
private final Integer bufferSize;
private static final int DEFAULT_BUFFERSIZE = 1048576;
public static final int[] EMPTY_ARRAY = new int[0];
/**
* Construct a new {@link DirectPathPreparedStatement } instance.
*
* @param conn
*/
public DirectPathPreparedStatement(DirectPathConnection conn) {
this.conn = conn;
this.parameters = new HashMap<>();
this.bufferSize = DEFAULT_BUFFERSIZE;
this.bucket = new ObDirectLoadBucket();
}
public DirectPathPreparedStatement(DirectPathConnection conn, Integer bufferSize) {
this.conn = conn;
this.parameters = new HashMap<>();
this.bufferSize = bufferSize;
this.bucket = new ObDirectLoadBucket(bufferSize);
}
/**
* Return current direct path connection.
*
* @return DirectPathConnection
* @throws SQLException
*/
@Override
public DirectPathConnection getConnection() throws SQLException {
return this.conn;
}
/**
* Copy the row data into a new array to avoid overwriting it later.
*
* @throws SQLException
*/
@Override
public void addBatch() throws SQLException {
checkRange();
ObObj[] objObjArray = new ObObj[parameters.size()];
for (Map.Entry<Integer, ObObj> entry : parameters.entrySet()) {
objObjArray[entry.getKey() - 1] = entry.getValue();
}
this.addBatch(objObjArray);
}
/**
* Add a new row into buffer with input original value list.
*
* @param values One original row data.
*/
public void addBatch(List<Object> values) {
this.addBatch(createObObjArray(values));
}
/**
* Add a new row into buffer with input original value array.
*
* @param values One original row data.
*/
public void addBatch(Object[] values) {
this.addBatch(createObObjArray(values));
}
/**
* Add a new row into buffer with input ObObj array.
*
* @param arr One row data described as ObObj.
*/
private void addBatch(ObObj[] arr) {
checkArgument(arr != null && arr.length > 0, "Input values must not be null or empty");
try {
this.bucket.addRow(arr);
} catch (ObDirectLoadException e) {
throw new RuntimeException(e);
}
}
/**
* Buffer the row data in memory (held in the bucket).
* You must invoke {@code clearBatch() } after executeBatch, since the bucket is not reusable.
*
* @return int[]
* @throws SQLException
*/
@Override
public int[] executeBatch() throws SQLException {
return this.bucket.isEmpty() ? EMPTY_ARRAY : this.conn.insert(bucket);
}
/**
* Clearing the batch always recreates a new {@link ObDirectLoadBucket}
*/
@Override
public void clearBatch() {
this.parameters.clear();
this.bucket = new ObDirectLoadBucket(bufferSize);
}
/**
* Clear the holder parameters.
*
* @throws SQLException
*/
@Override
public void clearParameters() throws SQLException {
this.parameters.clear();
}
/**
* @return boolean
*/
@Override
public boolean isOracleMode() {
return false;
}
/**
* Set parameter to the target position.
*
* @param parameterIndex Start From 1
* @param obObj Convert original value to {@link ObObj }
* @throws SQLException
*/
@Override
protected void setParameter(int parameterIndex, ObObj obObj) throws SQLException {
checkArgument(parameterIndex > 0, "Parameter index should start from 1");
this.parameters.put(parameterIndex, obObj);
}
/**
* Avoid range exception:
* <p>
* Map.put(1, "abc");
* Map.put(5, "def"); // Error: parameter index is 5, but 2 values exists.
*/
private void checkRange() {
OptionalInt optionalInt = parameters.keySet().stream().mapToInt(e -> e).max();
int parameterIndex = optionalInt.orElseThrow(() -> new IllegalArgumentException("No parameter index found"));
checkArgument(parameterIndex == parameters.size(), "Parameter index(%s) is unmatched with value list(%s)", parameterIndex, parameters.size());
}
}

View File

@ -0,0 +1,154 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath;
import java.sql.SQLException;
import java.util.Objects;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadBucket;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadConnection;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadStatement;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadTraceId;
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException;
import com.alipay.oceanbase.rpc.direct_load.protocol.payload.ObTableLoadClientStatus;
import com.alipay.oceanbase.rpc.table.ObTable;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Wrapper of the direct-load API for OceanBase.
*/
public class ObTableDirectLoad implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ObTableDirectLoad.class);
private final String tableName;
private final String schemaTableName;
private final ObDirectLoadStatement statement;
private final ObDirectLoadConnection connection;
public ObTableDirectLoad(String schemaName, String tableName, ObDirectLoadStatement statement, ObDirectLoadConnection connection) {
Objects.requireNonNull(schemaName, "schemaName must not be null");
Objects.requireNonNull(tableName, "tableName must not be null");
Objects.requireNonNull(statement, "statement must not be null");
Objects.requireNonNull(connection, "connection must not be null");
this.tableName = tableName;
this.schemaTableName = String.format("%s.%s", schemaName, tableName);
this.statement = statement;
this.connection = connection;
}
/**
* Begin the direct load operation.
*
* @throws ObDirectLoadException if an error occurs during the operation.
*/
public void begin() throws ObDirectLoadException {
statement.begin();
}
/**
* Write data into the direct load operation.
*
* @param bucket The data bucket to write.
* @throws SQLException if writing fails.
*/
public void write(ObDirectLoadBucket bucket) throws SQLException {
try {
if (bucket == null || bucket.isEmpty()) {
throw new IllegalArgumentException("Bucket must not be null or empty.");
}
LOG.info("Writing {} rows to table: {}", bucket.getRowNum(), schemaTableName);
statement.write(bucket);
LOG.info("Successfully wrote bucket data to table: {}", schemaTableName);
} catch (ObDirectLoadException e) {
LOG.error("Failed to write to table: {}", schemaTableName, e);
throw new SQLException(String.format("Failed to write to table: %s", schemaTableName), e);
}
}
/**
* Commit the current direct load operation.
*
* @throws SQLException if commit fails.
*/
public void commit() throws SQLException {
try {
LOG.info("Committing direct load for table: {}", schemaTableName);
statement.commit();
LOG.info("Successfully committed direct load for table: {}", schemaTableName);
} catch (ObDirectLoadException e) {
LOG.error("Failed to commit for table: {}", schemaTableName, e);
throw new SQLException(String.format("Failed to commit for table: %s", schemaTableName), e);
}
}
/**
* Close the direct load operation.
*/
public void close() {
LOG.info("Closing direct load for table: {}", schemaTableName);
statement.close();
connection.close();
LOG.info("Direct load closed for table: {}", schemaTableName);
}
/**
* Gets the status from the current connection based on the traceId
*/
public ObTableLoadClientStatus getStatus() throws SQLException {
ObDirectLoadTraceId traceId = statement.getTraceId();
// Check if traceId is null and throw an exception with a clear message
if (traceId == null) {
throw new SQLException("traceId is null.");
}
// Retrieve the status using the traceId
ObTableLoadClientStatus status = statement.getConnection().getProtocol().getHeartBeatRpc(traceId).getStatus();
if (status == null) {
LOG.warn("Heartbeat status for table {} is null", schemaTableName);
throw new SQLException("status is null.");
}
// Return the non-null status
return status;
}
/**
* Gets the current table
*/
public ObTable getTable() {
try {
return this.statement.getObTablePool().getControlObTable();
} catch (ObDirectLoadException e) {
throw new RuntimeException(e);
}
}
public String getTableName() {
if (StringUtils.isBlank(tableName)) {
throw new IllegalArgumentException("tableName is blank.");
}
return tableName;
}
/**
* Inserts data into the direct load operation.
*
* @param bucket The data bucket containing rows to insert.
* @throws SQLException if an error occurs during the insert operation.
*/
public void insert(ObDirectLoadBucket bucket) throws SQLException {
LOG.info("Inserting {} rows to table: {}", bucket.getRowNum(), schemaTableName);
if (bucket.isEmpty()) {
LOG.warn("Parameter 'bucket' must not be empty.");
throw new IllegalArgumentException("Parameter 'bucket' must not be empty.");
}
try {
// Perform the insertion into the load operation
statement.write(bucket);
LOG.info("Successfully inserted data into table: {}", schemaTableName);
} catch (Exception ex) {
LOG.error("Unexpected error during insert operation for table: {}", schemaTableName, ex);
throw new SQLException("Unexpected error during insert operation.", ex);
}
}
}

View File

@ -14,6 +14,17 @@ public abstract class AbstractConnHolder {
protected final Configuration config;
protected Connection conn;
protected String jdbcUrl;
protected String userName;
protected String password;
protected AbstractConnHolder(Configuration config, String jdbcUrl, String userName, String password) {
this.config = config;
this.jdbcUrl = jdbcUrl;
this.userName = userName;
this.password = password;
}
public AbstractConnHolder(Configuration config) {
this.config = config;
}
@ -45,4 +56,6 @@ public abstract class AbstractConnHolder {
public abstract String getUserName();
public abstract void destroy();
public abstract void doCommit();
}

View File

@ -0,0 +1,61 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
import java.sql.Connection;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class DirectPathAbstractConnHolder {
private static final Logger LOG = LoggerFactory.getLogger(DirectPathAbstractConnHolder.class);
protected Configuration config;
protected String jdbcUrl;
protected String userName;
protected String password;
protected Connection conn;
protected DirectPathAbstractConnHolder(Configuration config, String jdbcUrl, String userName, String password) {
this.config = config;
this.jdbcUrl = jdbcUrl;
this.userName = userName;
this.password = password;
}
public Connection reconnect() {
DBUtil.closeDBResources(null, conn);
return initConnection();
}
public Connection getConn() {
if (conn == null) {
return initConnection();
} else {
try {
if (conn.isClosed()) {
return reconnect();
}
return conn;
} catch (Exception e) {
LOG.debug("can not judge whether the hold connection is closed or not, just reuse the hold connection");
return conn;
}
}
}
public String getJdbcUrl() {
return jdbcUrl;
}
public Configuration getConfig() {
return config;
}
public void doCommit() {}
public abstract void destroy();
public abstract Connection initConnection();
}

View File

@ -0,0 +1,115 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.common.Table;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath.DirectPathConnection;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObLoadDupActionType;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DirectPathConnHolder extends AbstractConnHolder {
private static final Logger LOG = LoggerFactory.getLogger(DirectPathConnHolder.class);
/**
* The server-side timeout: 24 hours (24L * 60 * 60 * 1000 * 1000).
*/
private static final long SERVER_TIMEOUT = 24L * 60 * 60 * 1000 * 1000;
private static final ConcurrentHashMap<Table, DirectPathConnection> cache = new ConcurrentHashMap<>();
private String tableName;
private String host;
private int rpcPort;
private String tenantName;
private String databaseName;
private int blocks;
private int threads;
private int maxErrors;
private ObLoadDupActionType duplicateKeyAction;
public DirectPathConnHolder(Configuration config, ServerConnectInfo connectInfo, String tableName, int threadsPerChannel) {
super(config, connectInfo.jdbcUrl, connectInfo.userName, connectInfo.password);
// direct path:
// publicCloud & odp - single or full
// publicCloud & observer - not support
// !publicCloud & odp - full
// !publicCloud & observer - single
this.userName = connectInfo.getFullUserName();
this.host = connectInfo.host;
this.rpcPort = connectInfo.rpcPort;
this.tenantName = connectInfo.tenantName;
if (!connectInfo.publicCloud && StringUtils.isEmpty(tenantName)) {
throw new IllegalStateException("tenant name is needed when using direct path load in private cloud.");
}
this.databaseName = connectInfo.databaseName;
this.tableName = tableName;
this.blocks = config.getInt(Config.BLOCKS_COUNT, 1);
this.threads = threadsPerChannel * Math.min(blocks, 32);
this.maxErrors = config.getInt(Config.MAX_ERRORS, 0);
this.duplicateKeyAction = "insert".equalsIgnoreCase(config.getString(Config.OB_WRITE_MODE)) ? ObLoadDupActionType.IGNORE : ObLoadDupActionType.REPLACE;
}
@Override
public Connection initConnection() {
synchronized (cache) {
conn = cache.computeIfAbsent(new Table(databaseName, tableName), e -> {
try {
return new DirectPathConnection.Builder().host(host) //
.port(rpcPort) //
.tenant(tenantName) //
.user(userName) //
.password(Optional.ofNullable(password).orElse("")) //
.schema(databaseName) //
.table(tableName) //
.blocks(blocks) //
.parallel(threads) //
.maxErrorCount(maxErrors) //
.duplicateKeyAction(duplicateKeyAction) //
.serverTimeout(SERVER_TIMEOUT) //
.configuration(config)
.build();
} catch (Exception ex) {
throw DataXException.asDataXException(DBUtilErrorCode.CONN_DB_ERROR, ex);
}
});
}
return conn;
}
@Override
public String getJdbcUrl() {
return "";
}
@Override
public String getUserName() {
return "";
}
@Override
public void destroy() {
if (conn != null && ((DirectPathConnection) conn).isFinished()) {
DBUtil.closeDBResources(null, conn);
}
}
@Override
public void doCommit() {
try {
if (conn != null) {
conn.commit();
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,42 +1,54 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
import java.sql.Connection;
import java.sql.SQLException;
import com.alibaba.datax.common.util.Configuration;
/**
* wrap oceanbase java client
*
* @author oceanbase
*/
public class OCJConnHolder extends AbstractConnHolder {
private ServerConnectInfo connectInfo;
private String dataSourceKey;
public OCJConnHolder(Configuration config, ServerConnectInfo connInfo) {
super(config);
this.connectInfo = connInfo;
this.dataSourceKey = OBDataSourceV10.genKey(connectInfo.getFullUserName(), connectInfo.databaseName);
OBDataSourceV10.init(config, connectInfo.getFullUserName(), connectInfo.password, connectInfo.databaseName);
}
@Override
public Connection initConnection() {
conn = OBDataSourceV10.getConnection(dataSourceKey);
return conn;
}
@Override
public String getJdbcUrl() {
return connectInfo.jdbcUrl;
}
@Override
public String getUserName() {
return connectInfo.userName;
}
public void destroy() {
OBDataSourceV10.destory(this.dataSourceKey);
}
public void doCommit() {
try {
if (conn != null) {
conn.commit();
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,6 +1,7 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
@ -14,50 +15,60 @@ import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
* Database connection proxy object, responsible for creating and re-establishing connections
*
* @author oceanbase
*
*/
public class ObClientConnHolder extends AbstractConnHolder {
private final String jdbcUrl;
private final String userName;
private final String password;
public ObClientConnHolder(Configuration config, String jdbcUrl, String userName, String password) {
super(config);
this.jdbcUrl = jdbcUrl;
this.userName = userName;
this.password = password;
}
// Connect to ob with obclient and obproxy
@Override
public Connection initConnection() {
String BASIC_MESSAGE = String.format("jdbcUrl:[%s]", this.jdbcUrl);
DataBaseType dbType = DataBaseType.OceanBase;
if (ObWriterUtils.isOracleMode()) {
// set up for writing timestamp columns
List<String> sessionConfig = config.getList(Key.SESSION, new ArrayList<String>(), String.class);
sessionConfig.add("ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'");
sessionConfig.add("ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS.FF'");
sessionConfig.add("ALTER SESSION SET NLS_TIMESTAMP_TZ_FORMAT='YYYY-MM-DD HH24:MI:SS.FF TZR TZD'");
config.set(Key.SESSION, sessionConfig);
}
conn = DBUtil.getConnection(dbType, jdbcUrl, userName, password);
DBUtil.dealWithSessionConfig(conn, config, dbType, BASIC_MESSAGE);
return conn;
}
@Override
public String getJdbcUrl() {
return jdbcUrl;
}
@Override
public String getUserName() {
return userName;
}
@Override
public void destroy() {
DBUtil.closeDBResources(null, conn);
}
@Override
public void doCommit() {
try {
if (conn != null) {
conn.commit();
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -5,95 +5,112 @@ import static org.apache.commons.lang3.StringUtils.EMPTY;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.alibaba.datax.common.util.Configuration;
public class ServerConnectInfo {
public String clusterName;
public String tenantName;
// userName doesn't contain tenantName or clusterName
public String userName;
public String password;
public String databaseName;
public String ipPort;
public String jdbcUrl;
public String host;
public String port;
public boolean publicCloud;
public int rpcPort;
public Configuration config;
/**
* @param jdbcUrl format is jdbc:oceanbase//ip:port
* @param username format is cluster:tenant:username or username@tenant#cluster or user@tenant or user
* @param password
*/
public ServerConnectInfo(final String jdbcUrl, final String username, final String password, Configuration config) {
this.jdbcUrl = jdbcUrl;
this.password = password;
this.config = config;
parseJdbcUrl(jdbcUrl);
parseFullUserName(username);
}
private void parseJdbcUrl(final String jdbcUrl) {
Pattern pattern = Pattern.compile("//([\\w\\.\\-]+:\\d+)/([\\w]+)\\?");
Matcher matcher = pattern.matcher(jdbcUrl);
if (matcher.find()) {
String ipPort = matcher.group(1);
String dbName = matcher.group(2);
this.ipPort = ipPort;
this.host = ipPort.split(":")[0];
this.port = ipPort.split(":")[1];
this.databaseName = dbName;
this.publicCloud = host.endsWith("aliyuncs.com");
} else {
throw new RuntimeException("Invalid argument:" + jdbcUrl);
}
}
protected void parseFullUserName(final String fullUserName) {
int tenantIndex = fullUserName.indexOf("@");
int clusterIndex = fullUserName.indexOf("#");
// Applies when the jdbcUrl starts with ||_dsc_ob10_dsc_
if (fullUserName.contains(":") && tenantIndex < 0) {
String[] names = fullUserName.split(":");
if (names.length != 3) {
throw new RuntimeException("invalid argument: " + fullUserName);
} else {
this.clusterName = names[0];
this.tenantName = names[1];
this.userName = names[2];
}
} else if (tenantIndex < 0) {
// Applies to a short jdbcUrl whose username carries no tenant name, mainly the public cloud case; partitions are not calculated here
this.userName = fullUserName;
this.clusterName = EMPTY;
this.tenantName = EMPTY;
} else {
// Applies to a short jdbcUrl whose username carries a tenant name
this.userName = fullUserName.substring(0, tenantIndex);
if (clusterIndex < 0) {
this.clusterName = EMPTY;
this.tenantName = fullUserName.substring(tenantIndex + 1);
} else {
this.clusterName = fullUserName.substring(clusterIndex + 1);
this.tenantName = fullUserName.substring(tenantIndex + 1, clusterIndex);
}
}
}
@Override
public String toString() {
return "ServerConnectInfo{" +
"clusterName='" + clusterName + '\'' +
", tenantName='" + tenantName + '\'' +
", userName='" + userName + '\'' +
", password='" + password + '\'' +
", databaseName='" + databaseName + '\'' +
", ipPort='" + ipPort + '\'' +
", jdbcUrl='" + jdbcUrl + '\'' +
", host='" + host + '\'' +
", publicCloud=" + publicCloud +
", rpcPort=" + rpcPort +
'}';
}
public String getFullUserName() {
StringBuilder builder = new StringBuilder();
builder.append(userName);
if (!EMPTY.equals(tenantName)) {
builder.append("@").append(tenantName);
}
if (!EMPTY.equals(clusterName)) {
builder.append("#").append(clusterName);
}
if (EMPTY.equals(this.clusterName) && EMPTY.equals(this.tenantName)) {
return this.userName;
}
return builder.toString();
}
public void setRpcPort(int rpcPort) {
this.rpcPort = rpcPort;
}
}

View File

@ -0,0 +1,127 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.AbstractConnHolder;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractInsertTask implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractInsertTask.class);
protected final long taskId;
protected ConcurrentTableWriterTask writerTask;
protected ConcurrentTableWriterTask.ConcurrentTableWriter writer;
protected Queue<List<Record>> queue;
protected boolean isStop;
protected Configuration config;
protected ServerConnectInfo connInfo;
protected AbstractConnHolder connHolder;
protected long totalCost = 0;
protected long insertCount = 0;
private boolean printCost = Config.DEFAULT_PRINT_COST;
private long costBound = Config.DEFAULT_COST_BOUND;
public AbstractInsertTask(final long taskId, Queue<List<Record>> recordsQueue, Configuration config, ServerConnectInfo connectInfo, ConcurrentTableWriterTask task, ConcurrentTableWriterTask.ConcurrentTableWriter writer) {
this.taskId = taskId;
this.queue = recordsQueue;
this.config = config;
this.connInfo = connectInfo;
this.isStop = false;
this.printCost = config.getBool(Config.PRINT_COST, Config.DEFAULT_PRINT_COST);
this.costBound = config.getLong(Config.COST_BOUND, Config.DEFAULT_COST_BOUND);
this.writer = writer;
this.writerTask = task;
initConnHolder();
}
public AbstractInsertTask(final long taskId, Queue<List<Record>> recordsQueue, Configuration config, ServerConnectInfo connectInfo) {
this.taskId = taskId;
this.queue = recordsQueue;
this.config = config;
this.connInfo = connectInfo;
this.isStop = false;
this.printCost = config.getBool(Config.PRINT_COST, Config.DEFAULT_PRINT_COST);
this.costBound = config.getLong(Config.COST_BOUND, Config.DEFAULT_COST_BOUND);
initConnHolder();
}
protected abstract void initConnHolder();
public void setWriterTask(ConcurrentTableWriterTask writerTask) {
this.writerTask = writerTask;
}
public void setWriter(ConcurrentTableWriterTask.ConcurrentTableWriter writer) {
this.writer = writer;
}
private boolean isStop() {
return isStop;
}
public void setStop() {
isStop = true;
}
public AbstractConnHolder getConnHolder() {
return connHolder;
}
public void calStatistic(final long cost) {
writer.increFinishCount();
insertCount++;
totalCost += cost;
if (this.printCost && cost > this.costBound) {
LOG.info("slow multi insert cost {}ms", cost);
}
}
@Override
public void run() {
Thread.currentThread().setName(String.format("%d-insertTask-%d", taskId, Thread.currentThread().getId()));
LOG.debug("Task {} start to execute...", taskId);
while (!isStop()) {
try {
List<Record> records = queue.poll();
if (null != records) {
write(records);
} else if (writerTask.isFinished()) {
writerTask.singalTaskFinish();
LOG.debug("not more task, thread exist ...");
break;
} else {
TimeUnit.MILLISECONDS.sleep(5);
}
} catch (InterruptedException e) {
LOG.debug("TableWriter is interrupt");
} catch (Exception e) {
LOG.warn("ERROR UNEXPECTED ", e);
break;
}
}
LOG.debug("Thread exist...");
}
protected abstract void write(List<Record> records);
public long getTotalCost() {
return totalCost;
}
public long getInsertCount() {
return insertCount;
}
public void destroy() {
if (connHolder != null) {
connHolder.destroy();
}
}
}

View File

@ -18,7 +18,9 @@ import com.alibaba.datax.plugin.writer.oceanbasev10writer.part.IObPartCalculator
import com.alibaba.datax.plugin.writer.oceanbasev10writer.part.ObPartitionCalculatorV1;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.part.ObPartitionCalculatorV2;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
import com.oceanbase.partition.calculator.enums.ObServerMode;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@ -34,8 +36,10 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.alibaba.datax.plugin.writer.oceanbasev10writer.Config.DEFAULT_SLOW_MEMSTORE_THRESHOLD;
import static com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils.LoadMode.FAST;
import static com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils.LoadMode.PAUSE;
@ -44,15 +48,15 @@ import static com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUt
public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentTableWriterTask.class);
// Threshold of memstore_total / memstore_limit; once exceeded, writing is paused
private double memstoreThreshold = Config.DEFAULT_MEMSTORE_THRESHOLD;
// Interval between memstore checks
private long memstoreCheckIntervalSecond = Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND;
// Time of the last check
private long lastCheckMemstoreTime;
private volatile ObWriterUtils.LoadMode loadMode = FAST;
private static AtomicLong totalTask = new AtomicLong(0);
private long taskId = -1;
private AtomicBoolean isMemStoreFull = new AtomicBoolean(false);
@ -69,38 +73,41 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
private String obUpdateColumns = null;
private String dbName;
private int calPartFailedCount = 0;
private boolean directPath;
public ConcurrentTableWriterTask(DataBaseType dataBaseType) {
super(dataBaseType);
taskId = totalTask.getAndIncrement();
}
@Override
public void init(Configuration config) {
super.init(config);
this.directPath = config.getBool(Config.DIRECT_PATH, false);
// All OceanBase operations follow the insert into ... on duplicate key update pattern
// writeMode should really be defined as an enum
this.writeMode = "update";
obWriteMode = config.getString(Config.OB_WRITE_MODE, "update");
ServerConnectInfo connectInfo = new ServerConnectInfo(jdbcUrl, username, password, config);
connectInfo.setRpcPort(config.getInt(Config.RPC_PORT, 0));
dbName = connectInfo.databaseName;
//init check memstore
this.memstoreThreshold = config.getDouble(Config.MEMSTORE_THRESHOLD, Config.DEFAULT_MEMSTORE_THRESHOLD);
this.memstoreCheckIntervalSecond = config.getLong(Config.MEMSTORE_CHECK_INTERVAL_SECOND,
Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND);
this.connHolder = new ObClientConnHolder(config, connectInfo.jdbcUrl,
connectInfo.getFullUserName(), connectInfo.password);
this.isOracleCompatibleMode = ObWriterUtils.isOracleMode();
if (isOracleCompatibleMode) {
connectInfo.databaseName = connectInfo.databaseName.toUpperCase();
// Keep the table name untouched when it is quoted (escaped)
if (!(table.startsWith("\"") && table.endsWith("\""))) {
table = table.toUpperCase();
}
LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s",
connectInfo.databaseName, table));
}
if (config.getBool(Config.USE_PART_CALCULATOR, Config.DEFAULT_USE_PART_CALCULATOR)) {
@ -135,37 +142,37 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
return new ObPartitionCalculatorV1(connectInfo, table, columns);
}
public boolean isFinished() {
return allTaskInQueue && concurrentWriter.checkFinish();
}
public boolean allTaskInQueue() {
return allTaskInQueue;
}
public void setPutAllTaskInQueue() {
this.allTaskInQueue = true;
LOG.info("ConcurrentTableWriter has put all task in queue, queueSize = {}, total = {}, finished = {}",
concurrentWriter.getTaskQueueSize(),
concurrentWriter.getTotalTaskCount(),
concurrentWriter.getFinishTaskCount());
}
private void rewriteSql() {
Connection conn = connHolder.initConnection();
if (isOracleCompatibleMode && obWriteMode.equalsIgnoreCase("update")) {
// change obWriteMode to insert so the insert statement will be generated.
obWriteMode = "insert";
}
this.writeRecordSql = ObWriterUtils.buildWriteSql(table, columns, conn, obWriteMode, obUpdateColumns);
LOG.info("writeRecordSql :{}", this.writeRecordSql);
}
@Override
public void prepare(Configuration writerSliceConfig) {
super.prepare(writerSliceConfig);
concurrentWriter.start();
}
@Override
public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
@ -175,25 +182,25 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
int retryTimes = 0;
boolean needRetry = false;
do {
try {
if (retryTimes > 0) {
TimeUnit.SECONDS.sleep((1 << retryTimes));
DBUtil.closeDBResources(null, connection);
connection = DBUtil.getConnection(dataBaseType, jdbcUrl, username, password);
LOG.warn("getColumnMetaData of table {} failed, retry the {} times ...", this.table, retryTimes);
}
ColumnMetaCache.init(connection, this.table, this.columns);
this.resultSetMetaData = ColumnMetaCache.getColumnMeta();
needRetry = false;
} catch (SQLException e) {
needRetry = true;
++retryTimes;
e.printStackTrace();
LOG.warn("fetch column meta of [{}] failed..., retry {} times", this.table, retryTimes);
} catch (InterruptedException e) {
LOG.warn("startWriteWithConnection interrupt, ignored");
}
} while (needRetry && retryTimes < 100);
try {
@ -202,8 +209,8 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
while ((record = recordReceiver.getFromReader()) != null) {
if (record.getColumnNumber() != this.columnNumber) {
// 源头读取字段列数与目的表字段写入列数不相等直接报错
LOG.error("column not equal {} != {}, record = {}",
this.columnNumber, record.getColumnNumber(), record.toString());
LOG.error("column not equal {} != {}, record = {}",
this.columnNumber, record.getColumnNumber(), record.toString());
throw DataXException
.asDataXException(
DBUtilErrorCode.CONF_ERROR,
@ -223,388 +230,408 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
}
}
public PreparedStatement fillStatement(PreparedStatement preparedStatement, Record record)
throws SQLException {
return fillPreparedStatement(preparedStatement, record);
}
private void addLeftRecords() {
// No need to flush the cache; this is already the last batch of data
for (List<Record> groupValues : groupInsertValues.values()) {
if (groupValues.size() > 0) {
addRecordsToWriteQueue(groupValues);
}
}
}
private void addRecordToCache(final Record record) {
Long partId = null;
try {
partId = obPartCalculator == null ? Long.MAX_VALUE : obPartCalculator.calculate(record);
} catch (Exception e1) {
if (calPartFailedCount++ < 10) {
LOG.warn("fail to get partition id: " + e1.getMessage() + ", record: " + record);
}
}
if (partId == null) {
LOG.debug("fail to calculate partition id, just put into the default buffer.");
partId = Long.MAX_VALUE;
}
List<Record> groupValues = groupInsertValues.computeIfAbsent(partId, k -> new ArrayList<Record>(batchSize));
groupValues.add(record);
if (groupValues.size() >= batchSize) {
groupValues = addRecordsToWriteQueue(groupValues);
groupInsertValues.put(partId, groupValues);
}
}
/**
* @param records
* @return a new cache list used to hold the records that follow
*/
private List<Record> addRecordsToWriteQueue(List<Record> records) {
int i = 0;
while (true) {
if (i > 0) {
LOG.info("retry add batch record the {} times", i);
}
try {
concurrentWriter.addBatchRecords(records);
break;
} catch (InterruptedException e) {
i++;
LOG.info("Concurrent table writer is interrupted");
}
}
return new ArrayList<Record>(batchSize);
}
private void checkMemStore() {
Connection checkConn = connHolder.getConn();
try {
if (checkConn == null || checkConn.isClosed()) {
checkConn = connHolder.reconnect();
}
} catch (Exception e) {
LOG.warn("Check connection is unusable");
}
long now = System.currentTimeMillis();
if (now - lastCheckMemstoreTime < 1000 * memstoreCheckIntervalSecond) {
return;
}
double memUsedRatio = ObWriterUtils.queryMemUsedRatio(checkConn);
if (memUsedRatio >= DEFAULT_SLOW_MEMSTORE_THRESHOLD) {
this.loadMode = memUsedRatio >= memstoreThreshold ? PAUSE : SLOW;
LOG.info("Memstore used ratio is {}. Load data {}", memUsedRatio, loadMode.name());
} else {
this.loadMode = FAST;
}
lastCheckMemstoreTime = now;
}
public boolean isMemStoreFull() {
return isMemStoreFull.get();
}
public boolean isShouldPause() {
return this.loadMode.equals(PAUSE);
}
public boolean isShouldSlow() {
return this.loadMode.equals(SLOW);
}
public void print() {
if (LOG.isDebugEnabled()) {
LOG.debug("Statistic total task {}, finished {}, queue Size {}",
concurrentWriter.getTotalTaskCount(),
concurrentWriter.getFinishTaskCount(),
concurrentWriter.getTaskQueueSize());
concurrentWriter.printStatistics();
}
}
public void waitTaskFinish() {
setPutAllTaskInQueue();
lock.lock();
try {
while (!concurrentWriter.checkFinish()) {
condition.await(15, TimeUnit.SECONDS);
print();
checkMemStore();
}
if (directPath) {
concurrentWriter.doCommit();
}
} catch (InterruptedException e) {
LOG.warn("Concurrent table writer interrupted while waiting for tasks to finish");
} finally {
lock.unlock();
}
LOG.debug("wait all InsertTask finished ...");
}
public void singalTaskFinish() {
lock.lock();
condition.signal();
lock.unlock();
}
@Override
public void destroy(Configuration writerSliceConfig) {
if(concurrentWriter!=null) {
concurrentWriter.destory();
}
// close the connection held by this task
DBUtil.closeDBResources(null, connHolder.getConn());
super.destroy(writerSliceConfig);
}
public class ConcurrentTableWriter {
private BlockingQueue<List<Record>> queue;
private List<AbstractInsertTask> abstractInsertTasks;
private Configuration config;
private ServerConnectInfo connectInfo;
private String rewriteRecordSql;
private AtomicLong totalTaskCount;
private AtomicLong finishTaskCount;
private final int threadCount;
public ConcurrentTableWriter(Configuration config, ServerConnectInfo connInfo, String rewriteRecordSql) {
threadCount = config.getInt(Config.WRITER_THREAD_COUNT, Config.DEFAULT_WRITER_THREAD_COUNT);
queue = new LinkedBlockingQueue<List<Record>>(threadCount << 1);
abstractInsertTasks = new ArrayList<AbstractInsertTask>(threadCount);
this.config = config;
this.connectInfo = connInfo;
this.rewriteRecordSql = rewriteRecordSql;
this.totalTaskCount = new AtomicLong(0);
this.finishTaskCount = new AtomicLong(0);
}
if (typeName.equalsIgnoreCase("year")) {
if (column.asBigInteger() == null) {
preparedStatement.setString(columnIndex + 1, null);
} else {
preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue());
}
} else {
java.sql.Date sqlDate = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date 类型转换错误:[%s]", column));
}
public long getTotalTaskCount() {
return totalTaskCount.get();
}
public long getFinishTaskCount() {
return finishTaskCount.get();
}
public int getTaskQueueSize() {
return queue.size();
}
public void increFinishCount() {
finishTaskCount.incrementAndGet();
}
// should only be checked after all tasks have been put into the queue
public boolean checkFinish() {
long finishCount = finishTaskCount.get();
long totalCount = totalTaskCount.get();
return finishCount == totalCount;
}
public synchronized void start() {
for (int i = 0; i < threadCount; ++i) {
LOG.info("start {} insert task.", (i + 1));
AbstractInsertTask insertTask = null;
if (directPath) {
insertTask = new DirectPathInsertTask(taskId, queue, config, connectInfo, ConcurrentTableWriterTask.this, this);
} else {
insertTask = new InsertTask(taskId, queue, config, connectInfo, rewriteRecordSql);
}
insertTask.setWriterTask(ConcurrentTableWriterTask.this);
insertTask.setWriter(this);
abstractInsertTasks.add(insertTask);
}
WriterThreadPool.executeBatch(abstractInsertTasks);
}
public void doCommit() {
this.abstractInsertTasks.get(0).getConnHolder().doCommit();
}
public int getThreadCount() {
return threadCount;
}
public void printStatistics() {
long insertTotalCost = 0;
long insertTotalCount = 0;
for (AbstractInsertTask task : abstractInsertTasks) {
insertTotalCost += task.getTotalCost();
insertTotalCount += task.getInsertCount();
}
long avgCost = 0;
if (insertTotalCount != 0) {
avgCost = insertTotalCost / insertTotalCount;
}
ConcurrentTableWriterTask.LOG.debug("Insert {} times, totalCost {} ms, average {} ms",
insertTotalCount, insertTotalCost, avgCost);
}
public void addBatchRecords(final List<Record> records) throws InterruptedException {
boolean isSucc = false;
while (!isSucc) {
isSucc = queue.offer(records, 5, TimeUnit.MILLISECONDS);
checkMemStore();
}
totalTaskCount.incrementAndGet();
}
public synchronized void destory() {
if (abstractInsertTasks != null) {
for (AbstractInsertTask task : abstractInsertTasks) {
task.setStop();
}
for (AbstractInsertTask task : abstractInsertTasks) {
task.destroy();
}
}
}
}
public String getTable() {
return table;
}
// relies on two instance fields: columnNumber and resultSetMetaData
protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record)
throws SQLException {
for (int i = 0; i < this.columnNumber; i++) {
int columnSqltype = this.resultSetMetaData.getMiddle().get(i);
String typeName = this.resultSetMetaData.getRight().get(i);
preparedStatement = fillPreparedStatementColumnType(preparedStatement, i, columnSqltype, typeName, record.getColumn(i));
}
return preparedStatement;
}
protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex,
int columnSqltype, String typeName, Column column) throws SQLException {
java.util.Date utilDate;
switch (columnSqltype) {
case Types.CHAR:
case Types.NCHAR:
case Types.CLOB:
case Types.NCLOB:
case Types.VARCHAR:
case Types.LONGVARCHAR:
case Types.NVARCHAR:
case Types.LONGNVARCHAR:
preparedStatement.setString(columnIndex + 1, column
.asString());
break;
case Types.SMALLINT:
case Types.INTEGER:
case Types.BIGINT:
case Types.NUMERIC:
case Types.DECIMAL:
case Types.FLOAT:
case Types.REAL:
case Types.DOUBLE:
String strValue = column.asString();
if (emptyAsNull && "".equals(strValue)) {
preparedStatement.setString(columnIndex + 1, null);
} else {
preparedStatement.setString(columnIndex + 1, strValue);
}
break;
//tinyint is a little special in some database like mysql {boolean->tinyint(1)}
case Types.TINYINT:
Long longValue = column.asLong();
if (null == longValue) {
preparedStatement.setString(columnIndex + 1, null);
} else {
preparedStatement.setString(columnIndex + 1, longValue.toString());
}
break;
// for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
case Types.DATE:
if (typeName == null) {
typeName = this.resultSetMetaData.getRight().get(columnIndex);
}
if (typeName.equalsIgnoreCase("year")) {
if (column.asBigInteger() == null) {
preparedStatement.setString(columnIndex + 1, null);
} else {
preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue());
}
} else {
java.sql.Date sqlDate = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date 类型转换错误:[%s]", column));
}
if (null != utilDate) {
sqlDate = new java.sql.Date(utilDate.getTime());
}
preparedStatement.setDate(columnIndex + 1, sqlDate);
}
break;
case Types.TIME:
java.sql.Time sqlTime = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"TIME 类型转换错误:[%s]", column));
}
if (null != utilDate) {
sqlTime = new java.sql.Time(utilDate.getTime());
}
preparedStatement.setTime(columnIndex + 1, sqlTime);
break;
case Types.TIMESTAMP:
java.sql.Timestamp sqlTimestamp = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"TIMESTAMP 类型转换错误:[%s]", column));
}
if (null != utilDate) {
sqlTimestamp = new java.sql.Timestamp(
utilDate.getTime());
}
preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
break;
case Types.VARBINARY:
case Types.BLOB:
case Types.LONGVARBINARY:
preparedStatement.setBytes(columnIndex + 1, column
.asBytes());
break;
case Types.BINARY:
String isArray = column.getRawData().toString();
if (isArray.startsWith("[") && isArray.endsWith("]")) {
preparedStatement.setString(columnIndex + 1, column
.asString());
} else {
preparedStatement.setBytes(columnIndex + 1, column
.asBytes());
}
break;
case Types.BOOLEAN:
preparedStatement.setBoolean(columnIndex + 1, column.asBoolean());
break;
// warn: bit(1) -> Types.BIT, setBoolean can be used
// warn: bit(>1) -> Types.VARBINARY, setBytes can be used
case Types.BIT:
if (this.dataBaseType == DataBaseType.MySql) {
preparedStatement.setBoolean(columnIndex + 1, column.asBoolean());
} else {
preparedStatement.setString(columnIndex + 1, column.asString());
}
break;
default:
throw DataXException
.asDataXException(
DBUtilErrorCode.UNSUPPORTED_TYPE,
String.format(
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",
this.resultSetMetaData.getLeft()
.get(columnIndex),
this.resultSetMetaData.getMiddle()
.get(columnIndex),
this.resultSetMetaData.getRight()
.get(columnIndex)));
}
return preparedStatement;
}
}
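
Taken together, checkMemStore(), isShouldPause() and isShouldSlow() form a three-state flow-control protocol around OceanBase's memstore usage: FAST below the soft threshold (DEFAULT_SLOW_MEMSTORE_THRESHOLD), SLOW between the soft threshold and memstoreThreshold, PAUSE above it. A minimal sketch of a worker loop honoring these states follows; `stopped`, `pollRecords()`, `doWrite()` and the sleep intervals are illustrative assumptions, not code from this commit:

```
// Sketch only: a worker loop that respects the writer's load modes.
// In the plugin, checkMemStore() is driven from addBatchRecords() and
// waitTaskFinish(), which keeps loadMode current while this loop runs.
while (!stopped) {
    if (writerTask.isShouldPause()) {
        Thread.sleep(100L);          // hard threshold reached: write nothing
        continue;
    }
    List<Record> batch = pollRecords();  // hypothetical queue drain
    if (batch != null) {
        doWrite(batch);                  // hypothetical write step
        if (writerTask.isShouldSlow()) {
            Thread.sleep(100L);          // soft threshold reached: throttle
        }
    }
}
```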

View File

@ -0,0 +1,68 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.common.Table;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.common.TableCache;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath.DirectPathConnection;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.directPath.DirectPathPreparedStatement;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.DirectPathConnHolder;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DirectPathInsertTask extends AbstractInsertTask {
private static final Logger LOG = LoggerFactory.getLogger(DirectPathInsertTask.class);
public DirectPathInsertTask(long taskId, Queue<List<Record>> recordsQueue, Configuration config, ServerConnectInfo connectInfo, ConcurrentTableWriterTask task, ConcurrentTableWriterTask.ConcurrentTableWriter writer) {
super(taskId, recordsQueue, config, connectInfo, task, writer);
}
@Override
protected void initConnHolder() {
this.connHolder = new DirectPathConnHolder(config, connInfo, writerTask.getTable(), writer.getThreadCount());
this.connHolder.initConnection();
}
@Override
protected void write(List<Record> records) {
Table table = TableCache.getInstance().getTable(connInfo.databaseName, writerTask.getTable());
if (Table.Status.FAILURE.equals(table.getStatus())) {
return;
}
DirectPathConnection conn = (DirectPathConnection) connHolder.getConn();
if (records != null && !records.isEmpty()) {
long startTime = System.currentTimeMillis();
try (DirectPathPreparedStatement stmt = conn.createStatement()) {
final int columnNumber = records.get(0).getColumnNumber();
// one row buffer is reused across addBatch calls; DirectPathPreparedStatement is expected to copy it
Object[] values = new Object[columnNumber];
for (Record record : records) {
for (int i = 0; i < columnNumber; i++) {
values[i] = record.getColumn(i).getRawData();
}
stmt.addBatch(values);
}
int[] result = stmt.executeBatch();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Insert {} rows success", Thread.currentThread().getName(), Arrays.stream(result).sum());
}
calStatistic(System.currentTimeMillis() - startTime);
stmt.clearBatch();
} catch (Throwable ex) {
String msg = MessageFormat.format("Insert data into table \"{0}\" failed. Error: {1}", writerTask.getTable(), ex.getMessage());
LOG.error(msg, ex);
table.setError(ex);
table.setStatus(Table.Status.FAILURE);
throw new RuntimeException(msg);
}
}
}
}
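
A detail worth noting in write() above: a single Object[] buffer is reused for every stmt.addBatch(values) call, which is only safe if DirectPathPreparedStatement copies the row it receives. If it did not, each row would need its own array; a defensive sketch under that assumption:

```
// Defensive variant (sketch): allocate a fresh row per addBatch call in case
// the statement implementation keeps a reference to the array it is given.
for (Record record : records) {
    Object[] row = new Object[columnNumber];
    for (int i = 0; i < columnNumber; i++) {
        row[i] = record.getColumn(i).getRawData();
    }
    stmt.addBatch(row);
}
```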

View File

@ -20,7 +20,7 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class InsertTask implements Runnable {
public class InsertTask extends AbstractInsertTask implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(InsertTask.class);
@ -49,6 +49,7 @@ public class InsertTask implements Runnable {
Configuration config,
ServerConnectInfo connectInfo,
String writeRecordSql) {
super(taskId, recordsQueue, config, connectInfo);
this.taskId = taskId;
this.queue = recordsQueue;
this.connInfo = connectInfo;
@ -62,11 +63,15 @@ public class InsertTask implements Runnable {
connHolder.initConnection();
}
void setWriterTask(ConcurrentTableWriterTask writerTask) {
protected void initConnHolder() {
}
public void setWriterTask(ConcurrentTableWriterTask writerTask) {
this.writerTask = writerTask;
}
void setWriter(ConcurrentTableWriter writer) {
public void setWriter(ConcurrentTableWriter writer) {
this.writer = writer;
}
@ -109,6 +114,10 @@ public class InsertTask implements Runnable {
LOG.debug("Thread exist...");
}
protected void write(List<Record> records) {
}
public void destroy() {
connHolder.destroy();
}

View File

@ -4,6 +4,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.checkerframework.checker.units.qual.A;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,8 +30,8 @@ public class WriterThreadPool {
executorService.execute(task);
}
public static synchronized void executeBatch(List<InsertTask> tasks) {
for (InsertTask task : tasks) {
public static synchronized void executeBatch(List<AbstractInsertTask> tasks) {
for (AbstractInsertTask task : tasks) {
executorService.execute(task);
}
}
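
The new AbstractInsertTask base class itself does not appear in this view. From the call sites above (the two constructor forms, the setWriterTask/setWriter hooks, and the statistics and lifecycle methods used by ConcurrentTableWriter), its approximate shape can be reconstructed as follows; field and type names are inferred, not taken from the actual source:

```
// Reconstruction for orientation only; inferred from call sites in this commit.
public abstract class AbstractInsertTask implements Runnable {
    protected long taskId;
    protected Queue<List<Record>> queue;
    protected Configuration config;
    protected ServerConnectInfo connInfo;
    protected ConcurrentTableWriterTask writerTask;
    protected ConcurrentTableWriterTask.ConcurrentTableWriter writer;
    protected AbstractConnHolder connHolder; // concrete holder type is assumed

    protected AbstractInsertTask(long taskId, Queue<List<Record>> queue,
                                 Configuration config, ServerConnectInfo connInfo) {
        this.taskId = taskId;
        this.queue = queue;
        this.config = config;
        this.connInfo = connInfo;
    }

    protected AbstractInsertTask(long taskId, Queue<List<Record>> queue,
                                 Configuration config, ServerConnectInfo connInfo,
                                 ConcurrentTableWriterTask task,
                                 ConcurrentTableWriterTask.ConcurrentTableWriter writer) {
        this(taskId, queue, config, connInfo);
        this.writerTask = task;
        this.writer = writer;
        initConnHolder();
    }

    public void setWriterTask(ConcurrentTableWriterTask writerTask) { this.writerTask = writerTask; }
    public void setWriter(ConcurrentTableWriterTask.ConcurrentTableWriter writer) { this.writer = writer; }
    public AbstractConnHolder getConnHolder() { return connHolder; }

    protected abstract void initConnHolder();
    protected abstract void write(List<Record> records);

    // getTotalCost(), getInsertCount(), calStatistic(), setStop() and destroy()
    // are also referenced above; their bodies are omitted here.
}
```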