mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 21:11:11 +08:00
428 lines
10 KiB
Markdown
428 lines
10 KiB
Markdown
# SelectdbWriter 插件文档
|
||
|
||
## 1 快速介绍
|
||
SelectdbWriter支持将大批量数据写入SELECTDB中。
|
||
|
||
## 2 实现原理
|
||
SelectdbWriter 通过调用selectdb api (/copy/upload),返回一个重定向的S3地址,使用Http向S3地址发送字节流,设置参数达到要求时执行copy into
|
||
|
||
## 3 编译
|
||
|
||
1. 运行 init-env.sh
|
||
|
||
2. 编译 selectdbwriter:
|
||
|
||
i. 单独编译 selectdbwriter 插件:
|
||
|
||
```text
|
||
mvn clean install -pl plugin-rdbms-util,selectdbwriter -DskipTests
|
||
```
|
||
|
||
|
||
ii.编译整个 DataX 项目:
|
||
|
||
```text
|
||
mvn package assembly:assembly -Dmaven.test.skip=true
|
||
```
|
||
产出在 target/datax/datax/.
|
||
hdfsreader, hdfswriter and oscarwriter 这三个插件需要额外的jar包。如果你并不需要这些插件,可以在 DataX/pom.xml 中删除这些插件的模块。
|
||
|
||
|
||
iii.编译错误
|
||
|
||
如遇到如下编译错误:
|
||
```text
|
||
Could not find artifact com.alibaba.datax:datax-all:pom:0.0.1-SNAPSHOT
|
||
```
|
||
|
||
可尝试以下方式解决:
|
||
|
||
a.下载 alibaba-datax-maven-m2-20210928.tar.gz
|
||
|
||
b.解压后,将得到的 alibaba/datax/ 目录,拷贝到所使用的 maven 对应的 .m2/repository/com/alibaba/ 下。
|
||
|
||
c.再次尝试编译。
|
||
|
||
## 3 功能说明
|
||
|
||
### 3.1 配置样例
|
||
|
||
这里是一份从Stream读取数据后导入至selectdb的配置文件。
|
||
|
||
```
|
||
{
|
||
"job":{
|
||
"content":[
|
||
{
|
||
"reader":{
|
||
"name":"streamreader",
|
||
"parameter":{
|
||
"column":[
|
||
{
|
||
"type":"string",
|
||
"random":"0,31"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,31"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,31"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,31"
|
||
},
|
||
{
|
||
"type":"long",
|
||
"random":"0,5"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,10"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,5"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,31"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,31"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,21"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,31"
|
||
},
|
||
{
|
||
"type":"long",
|
||
"random":"0,10"
|
||
},
|
||
{
|
||
"type":"long",
|
||
"random":"0,20"
|
||
},
|
||
{
|
||
"type":"date",
|
||
"random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
|
||
},
|
||
{
|
||
"type":"long",
|
||
"random":"0,10"
|
||
},
|
||
{
|
||
"type":"date",
|
||
"random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,10"
|
||
},
|
||
{
|
||
"type":"long",
|
||
"random":"0,10"
|
||
},
|
||
{
|
||
"type":"date",
|
||
"random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
|
||
},
|
||
{
|
||
"type":"long",
|
||
"random":"0,10"
|
||
},
|
||
{
|
||
"type":"date",
|
||
"random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
|
||
},
|
||
{
|
||
"type":"long",
|
||
"random":"0,10"
|
||
},
|
||
{
|
||
"type":"date",
|
||
"random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
|
||
},
|
||
{
|
||
"type":"long",
|
||
"random":"0,10"
|
||
},
|
||
{
|
||
"type":"date",
|
||
"random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,100"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,1"
|
||
},
|
||
{
|
||
"type":"long",
|
||
"random":"0,1"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,64"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,20"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,31"
|
||
},
|
||
{
|
||
"type":"long",
|
||
"random":"0,3"
|
||
},
|
||
{
|
||
"type":"long",
|
||
"random":"0,3"
|
||
},
|
||
{
|
||
"type":"long",
|
||
"random":"0,19"
|
||
},
|
||
{
|
||
"type":"date",
|
||
"random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
|
||
},
|
||
{
|
||
"type":"string",
|
||
"random":"0,1"
|
||
}
|
||
],
|
||
"sliceRecordCount":10
|
||
}
|
||
},
|
||
"writer":{
|
||
"name":"selectdbwriter",
|
||
"parameter":{
|
||
"loadUrl":[
|
||
"xxx:47150"
|
||
],
|
||
"loadProps":{
|
||
"file.type":"json",
|
||
"file.strip_outer_array":"true"
|
||
},
|
||
"column":[
|
||
"id",
|
||
"table_id",
|
||
"table_no",
|
||
"table_name",
|
||
"table_status",
|
||
"no_disturb",
|
||
"dinner_type",
|
||
"member_id",
|
||
"reserve_bill_no",
|
||
"pre_order_no",
|
||
"queue_num",
|
||
"person_num",
|
||
"open_time",
|
||
"open_time_format",
|
||
"order_time",
|
||
"order_time_format",
|
||
"table_bill_id",
|
||
"offer_time",
|
||
"offer_time_format",
|
||
"confirm_bill_time",
|
||
"confirm_bill_time_format",
|
||
"bill_time",
|
||
"bill_time_format",
|
||
"clear_time",
|
||
"clear_time_format",
|
||
"table_message",
|
||
"bill_close",
|
||
"table_type",
|
||
"pad_mac",
|
||
"company_id",
|
||
"shop_id",
|
||
"is_sync",
|
||
"table_split_no",
|
||
"ts",
|
||
"ts_format",
|
||
"dr"
|
||
],
|
||
"username":"admin",
|
||
"password":"SelectDB2022",
|
||
"postSql":[
|
||
|
||
],
|
||
"preSql":[
|
||
|
||
],
|
||
"connection":[
|
||
{
|
||
"jdbcUrl":"jdbc:mysql://xxx:34142/cl_test",
|
||
"table":[
|
||
"ods_pos_pro_table_dynamic_delta_v4"
|
||
],
|
||
"selectedDatabase":"cl_test"
|
||
}
|
||
],
|
||
"maxBatchRows":1000000,
|
||
"maxBatchByteSize":536870912000
|
||
}
|
||
}
|
||
}
|
||
],
|
||
"setting":{
|
||
"errorLimit":{
|
||
"percentage":0.02,
|
||
"record":0
|
||
},
|
||
"speed":{
|
||
"channel":5
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
```
|
||
|
||
### 3.2 参数说明
|
||
|
||
```text
|
||
**jdbcUrl**
|
||
|
||
- 描述:selectdb 的 JDBC 连接串,用户执行 preSql 或 postSQL。
|
||
- 必选:是
|
||
- 默认值:无
|
||
|
||
* **loadUrl**
|
||
|
||
- 描述:作为 selecdb 的连接目标。格式为 "ip:port"。其中 IP 是 selectdb的private-link,port 是selectdb 集群的 http_port
|
||
- 必选:是
|
||
- 默认值:无
|
||
|
||
* **username**
|
||
|
||
- 描述:访问selectdb数据库的用户名
|
||
- 必选:是
|
||
- 默认值:无
|
||
|
||
* **password**
|
||
|
||
- 描述:访问selectdb数据库的密码
|
||
- 必选:否
|
||
- 默认值:空
|
||
|
||
* **connection.selectedDatabase**
|
||
- 描述:需要写入的selectdb数据库名称。
|
||
- 必选:是
|
||
- 默认值:无
|
||
|
||
* **connection.table**
|
||
- 描述:需要写入的selectdb表名称。
|
||
- 必选:是
|
||
- 默认值:无
|
||
|
||
* **column**
|
||
|
||
- 描述:目的表**需要写入数据**的字段,这些字段将作为生成的 Json 数据的字段名。字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。
|
||
- 必选:是
|
||
- 默认值:否
|
||
|
||
* **preSql**
|
||
|
||
- 描述:写入数据到目的表前,会先执行这里的标准语句。
|
||
- 必选:否
|
||
- 默认值:无
|
||
|
||
* **postSql**
|
||
|
||
- 描述:写入数据到目的表后,会执行这里的标准语句。
|
||
- 必选:否
|
||
- 默认值:无
|
||
|
||
|
||
* **maxBatchRows**
|
||
|
||
- 描述:每批次导入数据的最大行数。和 **batchSize** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。
|
||
- 必选:否
|
||
- 默认值:500000
|
||
|
||
* **batchSize**
|
||
|
||
- 描述:每批次导入数据的最大数据量。和 **maxBatchRows** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。
|
||
- 必选:否
|
||
- 默认值:90M
|
||
|
||
* **maxRetries**
|
||
|
||
- 描述:每批次导入数据失败后的重试次数。
|
||
- 必选:否
|
||
- 默认值:3
|
||
|
||
* **labelPrefix**
|
||
|
||
- 描述:每批次上传文件的 label 前缀。最终的 label 将有 `labelPrefix + UUID` 组成全局唯一的 label,确保数据不会重复导入
|
||
- 必选:否
|
||
- 默认值:`datax_selectdb_writer_`
|
||
|
||
* **loadProps**
|
||
|
||
- 描述:COPY INOT 的请求参数
|
||
|
||
这里包括导入的数据格式:file.type等,导入数据格式默认我们使用csv,支持JSON,具体可以参照下面类型转换部分
|
||
|
||
- 必选:否
|
||
|
||
- 默认值:无
|
||
|
||
* **clusterName**
|
||
|
||
- 描述:selectdb could 集群名称
|
||
|
||
- 必选:否
|
||
|
||
- 默认值:无
|
||
|
||
* **flushQueueLength**
|
||
|
||
- 描述:队列长度
|
||
|
||
- 必选:否
|
||
|
||
- 默认值:1
|
||
|
||
* **flushInterval**
|
||
|
||
- 描述:数据写入批次的时间间隔,如果maxBatchRows 和 batchSize 参数设置的有很大,那么很可能达不到你这设置的数据量大小,会执行导入。
|
||
|
||
- 必选:否
|
||
|
||
- 默认值:30000ms
|
||
```
|
||
|
||
### 类型转换
|
||
|
||
默认传入的数据均会被转为字符串,并以`\t`作为列分隔符,`\n`作为行分隔符,组成`csv`文件进行Selectdb导入操作。
|
||
|
||
默认是csv格式导入,如需更改列分隔符, 则正确配置 `loadProps` 即可:
|
||
|
||
```json
|
||
"loadProps": {
|
||
"file.column_separator": "\\x01",
|
||
"file.line_delimiter": "\\x02"
|
||
}
|
||
```
|
||
|
||
如需更改导入格式为`json`, 则正确配置 `loadProps` 即可:
|
||
```json
|
||
"loadProps": {
|
||
"file.type": "json",
|
||
"file.strip_outer_array": true
|
||
}
|
||
``` |