mirror of https://github.com/alibaba/DataX.git (synced 2025-05-02 04:59:51 +08:00)

Merge pull request #1624 from caoliang-web/caoliang: Add selectdbwriter plugin

commit 500d96303d
package.xml
@ -490,5 +490,12 @@
            </includes>
            <outputDirectory>datax</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>selectdbwriter/target/datax/</directory>
            <includes>
                <include>**/*.*</include>
            </includes>
            <outputDirectory>datax</outputDirectory>
        </fileSet>
    </fileSets>
</assembly>
pom.xml (1 line changed)
@ -121,6 +121,7 @@
        <module>cassandrawriter</module>
        <module>clickhousewriter</module>
        <module>doriswriter</module>
        <module>selectdbwriter</module>
        <module>adbmysqlwriter</module>

        <!-- common support module -->
selectdbwriter/doc/selectdbwriter.md (new file, 428 lines)
@ -0,0 +1,428 @@
# SelectdbWriter Plugin Documentation

## 1 Quick Introduction

SelectdbWriter supports writing large batches of data into SelectDB.

## 2 How It Works

SelectdbWriter calls the selectdb API (/copy/upload), which returns a redirected S3 address. It then sends the byte stream to that S3 address over HTTP, and executes COPY INTO once the configured batch thresholds are reached. A minimal sketch of this three-step flow is shown below.
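For orientation, here is a hypothetical standalone sketch of that flow, using the same Apache HttpClient and commons-codec dependencies the plugin ships with. The host, port, credentials, table, label, and row bytes are placeholder values, not taken from this PR; the production implementation is SelectdbCopyIntoObserver further down in this commit.

```java
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

import java.nio.charset.StandardCharsets;

public class CopyFlowSketch {
    public static void main(String[] args) throws Exception {
        String host = "http://127.0.0.1:8030";   // placeholder: selectdb private-link + http_port
        String auth = "Basic " + new String(Base64.encodeBase64(
                "admin:password".getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8);
        String label = "datax_selectdb_writer_example";

        // Redirect handling must stay disabled so the 307 Location header can be read directly.
        try (CloseableHttpClient client = HttpClients.custom().disableRedirectHandling().build()) {
            // Step 1: /copy/upload answers 307 with a pre-signed S3 URL for this label.
            HttpPut ask = new HttpPut(host + "/copy/upload");
            ask.setHeader("fileName", label);
            ask.setHeader(HttpHeaders.AUTHORIZATION, auth);
            String s3Url;
            try (CloseableHttpResponse r = client.execute(ask)) {
                s3Url = r.getFirstHeader("location").getValue();
            }

            // Step 2: PUT the serialized batch (csv or json bytes) to the S3 address.
            HttpPut upload = new HttpPut(s3Url);
            upload.setEntity(new ByteArrayEntity("1\tDataX\n".getBytes(StandardCharsets.UTF_8)));
            client.execute(upload).close();

            // Step 3: once a batch threshold is reached, trigger COPY INTO via /copy/query.
            HttpPost commit = new HttpPost(host + "/copy/query");
            commit.setHeader(HttpHeaders.AUTHORIZATION, auth);
            commit.setEntity(new StringEntity(
                    "{\"sql\":\"COPY INTO db.tbl FROM @~('" + label
                            + "') PROPERTIES ('copy.async'='false')\"}",
                    StandardCharsets.UTF_8));
            client.execute(commit).close();
        }
    }
}
```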
## 3 Building

1. Run init-env.sh.

2. Build selectdbwriter:

i. Build the selectdbwriter plugin alone:

```text
mvn clean install -pl plugin-rdbms-util,selectdbwriter -DskipTests
```

ii. Build the whole DataX project:

```text
mvn package assembly:assembly -Dmaven.test.skip=true
```

The output is placed under target/datax/datax/.

The hdfsreader, hdfswriter and oscarwriter plugins need extra jar packages. If you do not need these plugins, you can remove their modules from DataX/pom.xml.

iii. Build errors

If you run into the following build error:

```text
Could not find artifact com.alibaba.datax:datax-all:pom:0.0.1-SNAPSHOT
```

you can try the following fix:

a. Download alibaba-datax-maven-m2-20210928.tar.gz.

b. Extract it and copy the resulting alibaba/datax/ directory into .m2/repository/com/alibaba/ of the Maven installation you are using.

c. Build again.

## 4 Feature Description

### 4.1 Sample Configuration

Here is a configuration file that reads data from Stream and loads it into selectdb.
```json
{
    "job":{
        "content":[
            {
                "reader":{
                    "name":"streamreader",
                    "parameter":{
                        "column":[
                            {"type":"string","random":"0,31"},
                            {"type":"string","random":"0,31"},
                            {"type":"string","random":"0,31"},
                            {"type":"string","random":"0,31"},
                            {"type":"long","random":"0,5"},
                            {"type":"string","random":"0,10"},
                            {"type":"string","random":"0,5"},
                            {"type":"string","random":"0,31"},
                            {"type":"string","random":"0,31"},
                            {"type":"string","random":"0,21"},
                            {"type":"string","random":"0,31"},
                            {"type":"long","random":"0,10"},
                            {"type":"long","random":"0,20"},
                            {"type":"date","random":"2022-01-01 12:00:00,2023-01-01 12:00:00"},
                            {"type":"long","random":"0,10"},
                            {"type":"date","random":"2022-01-01 12:00:00,2023-01-01 12:00:00"},
                            {"type":"string","random":"0,10"},
                            {"type":"long","random":"0,10"},
                            {"type":"date","random":"2022-01-01 12:00:00,2023-01-01 12:00:00"},
                            {"type":"long","random":"0,10"},
                            {"type":"date","random":"2022-01-01 12:00:00,2023-01-01 12:00:00"},
                            {"type":"long","random":"0,10"},
                            {"type":"date","random":"2022-01-01 12:00:00,2023-01-01 12:00:00"},
                            {"type":"long","random":"0,10"},
                            {"type":"date","random":"2022-01-01 12:00:00,2023-01-01 12:00:00"},
                            {"type":"string","random":"0,100"},
                            {"type":"string","random":"0,1"},
                            {"type":"long","random":"0,1"},
                            {"type":"string","random":"0,64"},
                            {"type":"string","random":"0,20"},
                            {"type":"string","random":"0,31"},
                            {"type":"long","random":"0,3"},
                            {"type":"long","random":"0,3"},
                            {"type":"long","random":"0,19"},
                            {"type":"date","random":"2022-01-01 12:00:00,2023-01-01 12:00:00"},
                            {"type":"string","random":"0,1"}
                        ],
                        "sliceRecordCount":10
                    }
                },
                "writer":{
                    "name":"selectdbwriter",
                    "parameter":{
                        "loadUrl":["xxx:47150"],
                        "loadProps":{
                            "file.type":"json",
                            "file.strip_outer_array":"true"
                        },
                        "column":[
                            "id","table_id","table_no","table_name","table_status",
                            "no_disturb","dinner_type","member_id","reserve_bill_no",
                            "pre_order_no","queue_num","person_num","open_time",
                            "open_time_format","order_time","order_time_format",
                            "table_bill_id","offer_time","offer_time_format",
                            "confirm_bill_time","confirm_bill_time_format","bill_time",
                            "bill_time_format","clear_time","clear_time_format",
                            "table_message","bill_close","table_type","pad_mac",
                            "company_id","shop_id","is_sync","table_split_no",
                            "ts","ts_format","dr"
                        ],
                        "username":"admin",
                        "password":"SelectDB2022",
                        "postSql":[],
                        "preSql":[],
                        "connection":[
                            {
                                "jdbcUrl":"jdbc:mysql://xxx:34142/cl_test",
                                "table":["ods_pos_pro_table_dynamic_delta_v4"],
                                "selectedDatabase":"cl_test"
                            }
                        ],
                        "maxBatchRows":1000000,
                        "maxBatchByteSize":536870912000
                    }
                }
            }
        ],
        "setting":{
            "errorLimit":{
                "percentage":0.02,
                "record":0
            },
            "speed":{
                "channel":5
            }
        }
    }
}
```
### 4.2 Parameter Description

* **jdbcUrl**

  - Description: JDBC connection string for selectdb, used to execute preSql and postSql.
  - Required: yes
  - Default: none

* **loadUrl**

  - Description: The selectdb connection target, in the format "ip:port", where ip is the selectdb private-link and port is the http_port of the selectdb cluster.
  - Required: yes
  - Default: none

* **username**

  - Description: Username for accessing the selectdb database.
  - Required: yes
  - Default: none

* **password**

  - Description: Password for accessing the selectdb database.
  - Required: no
  - Default: empty

* **connection.selectedDatabase**

  - Description: Name of the selectdb database to write to.
  - Required: yes
  - Default: none

* **connection.table**

  - Description: Name of the selectdb table to write to.
  - Required: yes
  - Default: none

* **column**

  - Description: The fields of the destination table that **data is written into**; they are used as the field names of the generated JSON data. Separate fields with commas, for example: "column": ["id","name","age"].
  - Required: yes
  - Default: none

* **preSql**

  - Description: Standard SQL statements executed before data is written to the destination table.
  - Required: no
  - Default: none

* **postSql**

  - Description: Standard SQL statements executed after data has been written to the destination table.
  - Required: no
  - Default: none

* **maxBatchRows**

  - Description: Maximum number of rows per import batch. Together with **batchSize**, it controls the size of each batch; a batch is imported as soon as either threshold is reached.
  - Required: no
  - Default: 500000

* **batchSize**

  - Description: Maximum data volume per import batch. Together with **maxBatchRows**, it controls the size of each batch; a batch is imported as soon as either threshold is reached.
  - Required: no
  - Default: 90M

* **maxRetries**

  - Description: Number of retries after a batch import fails.
  - Required: no
  - Default: 3

* **labelPrefix**

  - Description: Label prefix for each batch of uploaded files. The final label is composed of `labelPrefix + UUID`, forming a globally unique label that ensures data is not imported twice.
  - Required: no
  - Default: `datax_selectdb_writer_`

* **loadProps**

  - Description: Request parameters for COPY INTO. These include the import data format (file.type and so on); the import format defaults to csv, and JSON is also supported. See the type conversion section below.
  - Required: no
  - Default: none

* **clusterName**

  - Description: SelectDB Cloud cluster name.
  - Required: no
  - Default: none

* **flushQueueLength**

  - Description: Length of the flush queue.
  - Required: no
  - Default: 1

* **flushInterval**

  - Description: Time interval at which buffered data is written out. If maxBatchRows and batchSize are set very large, the buffer may never reach those thresholds; the import is then triggered after this interval instead.
  - Required: no
  - Default: 30000ms
### Type Conversion

By default all incoming data is converted to strings, with `\t` as the column separator and `\n` as the row separator, forming a `csv` file for the SelectDB import.

The default import format is csv. To change the column separator, configure `loadProps` accordingly:

```json
"loadProps": {
    "file.column_separator": "\\x01",
    "file.line_delimiter": "\\x02"
}
```

To switch the import format to `json`, configure `loadProps` accordingly:

```json
"loadProps": {
    "file.type": "json",
    "file.strip_outer_array": true
}
```
selectdbwriter/doc/stream2selectdb.json (new file, 93 lines)
@ -0,0 +1,93 @@
{
    "core":{
        "transport":{
            "channel":{
                "speed":{
                    "byte":10485760
                }
            }
        }
    },
    "job":{
        "content":[
            {
                "reader":{
                    "name":"streamreader",
                    "parameter":{
                        "column":[
                            {"type":"string","value":"DataX"},
                            {"type":"int","value":19890604},
                            {"type":"date","value":"1989-06-04 00:00:00"},
                            {"type":"bool","value":true},
                            {"type":"string","value":"test"}
                        ],
                        "sliceRecordCount":1000000
                    }
                },
                "writer":{
                    "name":"selectdbwriter",
                    "parameter":{
                        "loadUrl":["xxx:35871"],
                        "loadProps":{
                            "file.type":"json",
                            "file.strip_outer_array":"true"
                        },
                        "database":"db1",
                        "column":["k1","k2","k3","k4","k5"],
                        "username":"admin",
                        "password":"SelectDB2022",
                        "postSql":[],
                        "preSql":[],
                        "connection":[
                            {
                                "jdbcUrl":"jdbc:mysql://xxx:32386/cl_test",
                                "table":["test_selectdb"],
                                "selectedDatabase":"cl_test"
                            }
                        ],
                        "maxBatchRows":200000,
                        "batchSize":53687091200
                    }
                }
            }
        ],
        "setting":{
            "errorLimit":{
                "percentage":0.02,
                "record":0
            },
            "speed":{
                "byte":10485760
            }
        }
    }
}
selectdbwriter/pom.xml (new file, 96 lines)
@ -0,0 +1,96 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>datax-all</artifactId>
        <groupId>com.alibaba.datax</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>selectdbwriter</artifactId>
    <name>selectdbwriter</name>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>plugin-rdbms-util</artifactId>
            <version>${datax-project-version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.driver.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.13</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.13.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.13.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${jdk-version}</source>
                    <target>${jdk-version}</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <!-- assembly plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
selectdbwriter/src/main/assembly/package.xml (new file, 34 lines)
@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id/>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/writer/selectdbwriter</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>selectdbwriter-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/writer/selectdbwriter</outputDirectory>
        </fileSet>
    </fileSets>
    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/writer/selectdbwriter/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/BaseResponse.java (new file)
@ -0,0 +1,23 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public class BaseResponse<T> {
    private int code;
    private String msg;
    private T data;
    private int count;

    public int getCode() {
        return code;
    }

    public String getMsg() {
        return msg;
    }

    public T getData() {
        return data;
    }
}
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopyIntoResp.java (new file)
@ -0,0 +1,26 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import java.util.Map;

@JsonIgnoreProperties(ignoreUnknown = true)
public class CopyIntoResp extends BaseResponse {
    private String code;
    private String exception;

    private Map<String, String> result;

    public String getDataCode() {
        return code;
    }

    public String getException() {
        return exception;
    }

    public Map<String, String> getResult() {
        return result;
    }
}
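For reference, a /copy/query commit response that these two classes are shaped to deserialize plausibly looks like the hypothetical payload below. The structure is inferred from the field names here and from handleCommitResponse() in SelectdbCopyIntoObserver further down, not from SelectDB documentation.

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ResponseShapeDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical commit response: an outer code/msg envelope plus a data
        // object whose result map carries the copy-into state.
        String body = "{\"code\":0,\"msg\":\"success\",\"data\":{"
                + "\"result\":{\"state\":\"FINISHED\",\"msg\":\"\"}},\"count\":0}";
        BaseResponse<CopyIntoResp> resp = new ObjectMapper()
                .readValue(body, new TypeReference<BaseResponse<CopyIntoResp>>() {});
        System.out.println(resp.getCode());                          // 0
        System.out.println(resp.getData().getResult().get("state")); // FINISHED
    }
}
```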
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopySQLBuilder.java (new file)
@ -0,0 +1,40 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import java.util.Map;
import java.util.StringJoiner;

public class CopySQLBuilder {
    private final static String COPY_SYNC = "copy.async";
    private final String fileName;
    private final Keys options;
    private Map<String, Object> properties;

    public CopySQLBuilder(Keys options, String fileName) {
        this.options = options;
        this.fileName = fileName;
        this.properties = options.getLoadProps();
    }

    public String buildCopySQL() {
        StringBuilder sb = new StringBuilder();
        sb.append("COPY INTO ")
                .append(options.getDatabase() + "." + options.getTable())
                .append(" FROM @~('").append(fileName).append("') ")
                .append("PROPERTIES (");

        // copy into must be sync
        properties.put(COPY_SYNC, false);
        StringJoiner props = new StringJoiner(",");
        for (Map.Entry<String, Object> entry : properties.entrySet()) {
            String key = String.valueOf(entry.getKey());
            String value = String.valueOf(entry.getValue());
            String prop = String.format("'%s'='%s'", key, value);
            props.add(prop);
        }
        sb.append(props).append(" )");
        return sb.toString();
    }
}
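A sketch of what the builder emits, assuming a minimal hand-built configuration (all values below are hypothetical); Configuration.from(String) is the standard DataX helper used throughout the framework:

```java
import com.alibaba.datax.common.util.Configuration;

public class CopySqlDemo {
    public static void main(String[] args) {
        // Only the keys that Keys and buildCopySQL() actually read.
        Configuration conf = Configuration.from(
                "{\"username\":\"admin\",\"loadUrl\":[\"127.0.0.1:8030\"],"
                        + "\"column\":[\"k1\"],"
                        + "\"connection\":[{\"selectedDatabase\":\"db1\",\"table\":[\"tbl1\"]}],"
                        + "\"loadProps\":{\"file.type\":\"json\"}}");
        String sql = new CopySQLBuilder(new Keys(conf), "datax_selectdb_writer_demo").buildCopySQL();
        System.out.println(sql);
        // e.g. COPY INTO db1.tbl1 FROM @~('datax_selectdb_writer_demo') PROPERTIES ('file.type'='json','copy.async'='false' )
        // (property order depends on map iteration order)
    }
}
```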
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/DelimiterParser.java (new file)
@ -0,0 +1,54 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.google.common.base.Strings;

import java.io.StringWriter;

public class DelimiterParser {

    private static final String HEX_STRING = "0123456789ABCDEF";

    public static String parse(String sp, String dSp) throws RuntimeException {
        if (Strings.isNullOrEmpty(sp)) {
            return dSp;
        }
        if (!sp.toUpperCase().startsWith("\\X")) {
            return sp;
        }
        String hexStr = sp.substring(2);
        // check hex str
        if (hexStr.isEmpty()) {
            throw new RuntimeException("Failed to parse delimiter: Hex str is empty");
        }
        if (hexStr.length() % 2 != 0) {
            throw new RuntimeException("Failed to parse delimiter: Hex str length error");
        }
        for (char hexChar : hexStr.toUpperCase().toCharArray()) {
            if (HEX_STRING.indexOf(hexChar) == -1) {
                throw new RuntimeException("Failed to parse delimiter: Hex str format error");
            }
        }
        // transform to separator
        StringWriter writer = new StringWriter();
        for (byte b : hexStrToBytes(hexStr)) {
            writer.append((char) b);
        }
        return writer.toString();
    }

    private static byte[] hexStrToBytes(String hexStr) {
        String upperHexStr = hexStr.toUpperCase();
        int length = upperHexStr.length() / 2;
        char[] hexChars = upperHexStr.toCharArray();
        byte[] bytes = new byte[length];
        for (int i = 0; i < length; i++) {
            int pos = i * 2;
            bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
        }
        return bytes;
    }

    private static byte charToByte(char c) {
        return (byte) HEX_STRING.indexOf(c);
    }
}
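A quick, hypothetical standalone check of the escape handling: `\xNN` sequences become raw bytes, null falls back to the default delimiter, and plain strings pass through unchanged.

```java
public class DelimiterParserDemo {
    public static void main(String[] args) {
        System.out.println((int) DelimiterParser.parse("\\x01", "\t").charAt(0)); // 1 ("\x01" -> byte 0x01)
        System.out.println("\t".equals(DelimiterParser.parse(null, "\t")));      // true (default applies)
        System.out.println(DelimiterParser.parse(",", "\t"));                    // ,   (non-hex passes through)
    }
}
```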
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/HttpPostBuilder.java (new file)
@ -0,0 +1,51 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.HttpPost;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HttpPostBuilder {
    String url;
    Map<String, String> header;
    HttpEntity httpEntity;

    public HttpPostBuilder() {
        header = new HashMap<>();
    }

    public HttpPostBuilder setUrl(String url) {
        this.url = url;
        return this;
    }

    public HttpPostBuilder addCommonHeader() {
        header.put(HttpHeaders.EXPECT, "100-continue");
        return this;
    }

    public HttpPostBuilder baseAuth(String user, String password) {
        final String authInfo = user + ":" + password;
        byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
        header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded));
        return this;
    }

    public HttpPostBuilder setEntity(HttpEntity httpEntity) {
        this.httpEntity = httpEntity;
        return this;
    }

    public HttpPost build() {
        SelectdbUtil.checkNotNull(url);
        SelectdbUtil.checkNotNull(httpEntity);
        HttpPost put = new HttpPost(url);
        header.forEach(put::setHeader);
        put.setEntity(httpEntity);
        return put;
    }
}
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/HttpPutBuilder.java (new file)
@ -0,0 +1,65 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HttpPutBuilder {
    String url;
    Map<String, String> header;
    HttpEntity httpEntity;

    public HttpPutBuilder() {
        header = new HashMap<>();
    }

    public HttpPutBuilder setUrl(String url) {
        this.url = url;
        return this;
    }

    public HttpPutBuilder addFileName(String fileName) {
        header.put("fileName", fileName);
        return this;
    }

    public HttpPutBuilder setEmptyEntity() {
        try {
            this.httpEntity = new StringEntity("");
        } catch (Exception e) {
            throw new IllegalArgumentException(e);
        }
        return this;
    }

    public HttpPutBuilder addCommonHeader() {
        header.put(HttpHeaders.EXPECT, "100-continue");
        return this;
    }

    public HttpPutBuilder baseAuth(String user, String password) {
        final String authInfo = user + ":" + password;
        byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
        header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded));
        return this;
    }

    public HttpPutBuilder setEntity(HttpEntity httpEntity) {
        this.httpEntity = httpEntity;
        return this;
    }

    public HttpPut build() {
        SelectdbUtil.checkNotNull(url);
        SelectdbUtil.checkNotNull(httpEntity);
        HttpPut put = new HttpPut(url);
        header.forEach(put::setHeader);
        put.setEntity(httpEntity);
        return put;
    }
}
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/Keys.java (new file)
@ -0,0 +1,186 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Keys implements Serializable {

    private static final long serialVersionUID = 1L;
    private static final int DEFAULT_MAX_RETRIES = 3;
    private static final int BATCH_ROWS = 500000;
    private static final long DEFAULT_FLUSH_INTERVAL = 30000;

    private static final String LOAD_PROPS_FORMAT = "file.type";

    public enum StreamLoadFormat {
        CSV, JSON;
    }

    private static final String USERNAME = "username";
    private static final String PASSWORD = "password";
    private static final String DATABASE = "connection[0].selectedDatabase";
    private static final String TABLE = "connection[0].table[0]";
    private static final String COLUMN = "column";
    private static final String PRE_SQL = "preSql";
    private static final String POST_SQL = "postSql";
    private static final String JDBC_URL = "connection[0].jdbcUrl";
    private static final String LABEL_PREFIX = "labelPrefix";
    private static final String MAX_BATCH_ROWS = "maxBatchRows";
    private static final String MAX_BATCH_SIZE = "batchSize";
    private static final String FLUSH_INTERVAL = "flushInterval";
    private static final String LOAD_URL = "loadUrl";
    private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength";
    private static final String LOAD_PROPS = "loadProps";

    private static final String DEFAULT_LABEL_PREFIX = "datax_selectdb_writer_";

    private static final long DEFAULT_MAX_BATCH_SIZE = 90 * 1024 * 1024; // default 90M

    private static final String CLUSTER_NAME = "clusterName";

    private static final String MAX_RETRIES = "maxRetries";
    private final Configuration options;

    private List<String> infoSchemaColumns;
    private List<String> userSetColumns;
    private boolean isWildcardColumn;

    public Keys(Configuration options) {
        this.options = options;
        this.userSetColumns = options.getList(COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList());
        if (1 == options.getList(COLUMN, String.class).size() && "*".trim().equals(options.getList(COLUMN, String.class).get(0))) {
            this.isWildcardColumn = true;
        }
    }

    public void doPretreatment() {
        validateRequired();
        validateStreamLoadUrl();
    }

    public String getJdbcUrl() {
        return options.getString(JDBC_URL);
    }

    public String getDatabase() {
        return options.getString(DATABASE);
    }

    public String getTable() {
        return options.getString(TABLE);
    }

    public String getUsername() {
        return options.getString(USERNAME);
    }

    public String getPassword() {
        return options.getString(PASSWORD);
    }

    public String getClusterName() {
        return options.getString(CLUSTER_NAME);
    }

    public String getLabelPrefix() {
        String label = options.getString(LABEL_PREFIX);
        return null == label ? DEFAULT_LABEL_PREFIX : label;
    }

    public List<String> getLoadUrlList() {
        return options.getList(LOAD_URL, String.class);
    }

    public List<String> getColumns() {
        if (isWildcardColumn) {
            return this.infoSchemaColumns;
        }
        return this.userSetColumns;
    }

    public boolean isWildcardColumn() {
        return this.isWildcardColumn;
    }

    public void setInfoSchemaColumns(List<String> cols) {
        this.infoSchemaColumns = cols;
    }

    public List<String> getPreSqlList() {
        return options.getList(PRE_SQL, String.class);
    }

    public List<String> getPostSqlList() {
        return options.getList(POST_SQL, String.class);
    }

    public Map<String, Object> getLoadProps() {
        return options.getMap(LOAD_PROPS);
    }

    public int getMaxRetries() {
        Integer retries = options.getInt(MAX_RETRIES);
        return null == retries ? DEFAULT_MAX_RETRIES : retries;
    }

    public int getBatchRows() {
        Integer rows = options.getInt(MAX_BATCH_ROWS);
        return null == rows ? BATCH_ROWS : rows;
    }

    public long getBatchSize() {
        Long size = options.getLong(MAX_BATCH_SIZE);
        return null == size ? DEFAULT_MAX_BATCH_SIZE : size;
    }

    public long getFlushInterval() {
        Long interval = options.getLong(FLUSH_INTERVAL);
        return null == interval ? DEFAULT_FLUSH_INTERVAL : interval;
    }

    public int getFlushQueueLength() {
        Integer len = options.getInt(FLUSH_QUEUE_LENGTH);
        return null == len ? 1 : len;
    }

    public StreamLoadFormat getStreamLoadFormat() {
        Map<String, Object> loadProps = getLoadProps();
        if (null == loadProps) {
            return StreamLoadFormat.CSV;
        }
        if (loadProps.containsKey(LOAD_PROPS_FORMAT)
                && StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(LOAD_PROPS_FORMAT)))) {
            return StreamLoadFormat.JSON;
        }
        return StreamLoadFormat.CSV;
    }

    private void validateStreamLoadUrl() {
        List<String> urlList = getLoadUrlList();
        for (String host : urlList) {
            if (host.split(":").length < 2) {
                throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
                        "The format of loadUrl is not correct, please enter:[`fe_ip:fe_http_port;fe_ip:fe_http_port`].");
            }
        }
    }

    private void validateRequired() {
        final String[] requiredOptionKeys = new String[]{
                USERNAME,
                DATABASE,
                TABLE,
                COLUMN,
                LOAD_URL
        };
        for (String optionKey : requiredOptionKeys) {
            options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
        }
    }
}
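The `connection[0].*` constants above lean on DataX's bracketed path syntax for lists. A hypothetical minimal snippet showing how those paths resolve against the job JSON:

```java
import com.alibaba.datax.common.util.Configuration;

public class KeysPathDemo {
    public static void main(String[] args) {
        // Hand-built configuration with only a connection block (placeholder values).
        Configuration conf = Configuration.from(
                "{\"connection\":[{\"selectedDatabase\":\"cl_test\","
                        + "\"table\":[\"test_selectdb\"],\"jdbcUrl\":\"jdbc:mysql://xxx:3306/cl_test\"}]}");
        System.out.println(conf.getString("connection[0].selectedDatabase")); // cl_test
        System.out.println(conf.getString("connection[0].table[0]"));         // test_selectdb
    }
}
```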
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbBaseCodec.java (new file)
@ -0,0 +1,23 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.alibaba.datax.common.element.Column;

public class SelectdbBaseCodec {
    protected String convertionField(Column col) {
        if (null == col.getRawData() || Column.Type.NULL == col.getType()) {
            return null;
        }
        if (Column.Type.BOOL == col.getType()) {
            return String.valueOf(col.asLong());
        }
        if (Column.Type.BYTES == col.getType()) {
            byte[] bts = (byte[]) col.getRawData();
            long value = 0;
            for (int i = 0; i < bts.length; i++) {
                value += (bts[bts.length - i - 1] & 0xffL) << (8 * i);
            }
            return String.valueOf(value);
        }
        return col.asString();
    }
}
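A tiny standalone check of the BYTES branch above: the bytes are folded big-endian into an unsigned long before being rendered as a string (hypothetical demo, not part of the plugin).

```java
public class BytesFoldDemo {
    public static void main(String[] args) {
        byte[] bts = {0x01, 0x00};          // big-endian 0x0100
        long value = 0;
        for (int i = 0; i < bts.length; i++) {
            value += (bts[bts.length - i - 1] & 0xffL) << (8 * i);
        }
        System.out.println(value);          // 256
    }
}
```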
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCodec.java (new file)
@ -0,0 +1,10 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.alibaba.datax.common.element.Record;

import java.io.Serializable;

public interface SelectdbCodec extends Serializable {

    String codec(Record row);
}
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCodecFactory.java (new file)
@ -0,0 +1,19 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import java.util.Map;

public class SelectdbCodecFactory {
    public SelectdbCodecFactory() {

    }

    public static SelectdbCodec createCodec(Keys writerOptions) {
        if (Keys.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
            Map<String, Object> props = writerOptions.getLoadProps();
            return new SelectdbCsvCodec(null == props || !props.containsKey("file.column_separator") ? null : String.valueOf(props.get("file.column_separator")));
        }
        if (Keys.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
            return new SelectdbJsonCodec(writerOptions.getColumns());
        }
        throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties.");
    }
}
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCopyIntoObserver.java (new file)
@ -0,0 +1,233 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class SelectdbCopyIntoObserver {
    private static final Logger LOG = LoggerFactory.getLogger(SelectdbCopyIntoObserver.class);

    private Keys options;
    private long pos;
    public static final int SUCCESS = 0;
    public static final String FAIL = "1";
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .disableRedirectHandling();
    private CloseableHttpClient httpClient;
    private static final String UPLOAD_URL_PATTERN = "%s/copy/upload";
    private static final String COMMIT_PATTERN = "%s/copy/query";
    private static final Pattern COMMITTED_PATTERN = Pattern.compile("errCode = 2, detailMessage = No files can be copied, matched (\\d+) files, " + "filtered (\\d+) files because files may be loading or loaded");

    public SelectdbCopyIntoObserver(Keys options) {
        this.options = options;
        this.httpClient = httpClientBuilder.build();
    }

    public void streamLoad(WriterTuple data) throws Exception {
        String host = getLoadHost();
        if (host == null) {
            throw new RuntimeException("load_url cannot be empty, or the host cannot connect. Please check your configuration.");
        }
        String loadUrl = String.format(UPLOAD_URL_PATTERN, host);
        String uploadAddress = getUploadAddress(loadUrl, data.getLabel());
        put(uploadAddress, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));
        executeCopy(host, data.getLabel());
    }

    private String getUploadAddress(String loadUrl, String fileName) throws IOException {
        HttpPutBuilder putBuilder = new HttpPutBuilder();
        putBuilder.setUrl(loadUrl)
                .addFileName(fileName)
                .addCommonHeader()
                .setEmptyEntity()
                .baseAuth(options.getUsername(), options.getPassword());
        CloseableHttpResponse execute = httpClientBuilder.build().execute(putBuilder.build());
        int statusCode = execute.getStatusLine().getStatusCode();
        String reason = execute.getStatusLine().getReasonPhrase();
        if (statusCode == 307) {
            Header location = execute.getFirstHeader("location");
            String uploadAddress = location.getValue();
            LOG.info("redirect to s3:{}", uploadAddress);
            return uploadAddress;
        } else {
            HttpEntity entity = execute.getEntity();
            String result = entity == null ? null : EntityUtils.toString(entity);
            LOG.error("Failed to get the redirected address, status {}, reason {}, response {}", statusCode, reason, result);
            throw new RuntimeException("Could not get the redirected address.");
        }
    }

    private byte[] addRows(List<byte[]> rows, int totalBytes) {
        if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
            Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());
            byte[] lineDelimiter = DelimiterParser.parse((String) props.get("file.line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
            ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
            for (byte[] row : rows) {
                bos.put(row);
                bos.put(lineDelimiter);
            }
            return bos.array();
        }

        if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
            ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
            bos.put("[".getBytes(StandardCharsets.UTF_8));
            byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
            boolean isFirstElement = true;
            for (byte[] row : rows) {
                if (!isFirstElement) {
                    bos.put(jsonDelimiter);
                }
                bos.put(row);
                isFirstElement = false;
            }
            bos.put("]".getBytes(StandardCharsets.UTF_8));
            return bos.array();
        }
        throw new RuntimeException("Failed to join rows data, unsupported `file.type` from copy into properties.");
    }

    public void put(String loadUrl, String fileName, byte[] data) throws IOException {
        LOG.info(String.format("Executing upload file to: '%s', size: '%s'", loadUrl, data.length));
        HttpPutBuilder putBuilder = new HttpPutBuilder();
        putBuilder.setUrl(loadUrl)
                .addCommonHeader()
                .setEntity(new InputStreamEntity(new ByteArrayInputStream(data)));
        CloseableHttpResponse response = httpClient.execute(putBuilder.build());
        final int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode != 200) {
            String result = response.getEntity() == null ? null : EntityUtils.toString(response.getEntity());
            LOG.error("upload file {} error, response {}", fileName, result);
            throw new SelectdbWriterException("upload file error: " + fileName, true);
        }
    }

    private String getLoadHost() {
        List<String> hostList = options.getLoadUrlList();
        long tmp = pos + hostList.size();
        for (; pos < tmp; pos++) {
            String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString();
            if (checkConnection(host)) {
                return host;
            }
        }
        return null;
    }

    private boolean checkConnection(String host) {
        try {
            URL url = new URL(host);
            HttpURLConnection co = (HttpURLConnection) url.openConnection();
            co.setConnectTimeout(5000);
            co.connect();
            co.disconnect();
            return true;
        } catch (Exception e1) {
            e1.printStackTrace();
            return false;
        }
    }

    /**
     * execute copy into
     */
    public void executeCopy(String hostPort, String fileName) throws IOException {
        long start = System.currentTimeMillis();
        CopySQLBuilder copySQLBuilder = new CopySQLBuilder(options, fileName);
        String copySQL = copySQLBuilder.buildCopySQL();
        LOG.info("build copy SQL is {}", copySQL);
        Map<String, String> params = new HashMap<>();
        params.put("sql", copySQL);
        if (StringUtils.isNotBlank(options.getClusterName())) {
            params.put("cluster", options.getClusterName());
        }
        HttpPostBuilder postBuilder = new HttpPostBuilder();
        postBuilder.setUrl(String.format(COMMIT_PATTERN, hostPort))
                .baseAuth(options.getUsername(), options.getPassword())
                .setEntity(new StringEntity(OBJECT_MAPPER.writeValueAsString(params)));

        CloseableHttpResponse response = httpClient.execute(postBuilder.build());
        final int statusCode = response.getStatusLine().getStatusCode();
        final String reasonPhrase = response.getStatusLine().getReasonPhrase();
        String loadResult = "";
        if (statusCode != 200) {
            LOG.warn("commit failed with status {} {}, reason {}", statusCode, hostPort, reasonPhrase);
            throw new SelectdbWriterException("commit error with file: " + fileName, true);
        } else if (response.getEntity() != null) {
            loadResult = EntityUtils.toString(response.getEntity());
            boolean success = handleCommitResponse(loadResult);
            if (success) {
                LOG.info("commit success cost {}ms, response is {}", System.currentTimeMillis() - start, loadResult);
            } else {
                throw new SelectdbWriterException("commit fail", true);
            }
        }
    }

    public boolean handleCommitResponse(String loadResult) throws IOException {
        BaseResponse<CopyIntoResp> baseResponse = OBJECT_MAPPER.readValue(loadResult, new TypeReference<BaseResponse<CopyIntoResp>>() {});
        if (baseResponse.getCode() == SUCCESS) {
            CopyIntoResp dataResp = baseResponse.getData();
            if (FAIL.equals(dataResp.getDataCode())) {
                LOG.error("copy into execute failed, reason:{}", loadResult);
                return false;
            } else {
                Map<String, String> result = dataResp.getResult();
                if (!result.get("state").equals("FINISHED") && !isCommitted(result.get("msg"))) {
                    LOG.error("copy into load failed, reason:{}", loadResult);
                    return false;
                } else {
                    return true;
                }
            }
        } else {
            LOG.error("commit failed, reason:{}", loadResult);
            return false;
        }
    }

    public static boolean isCommitted(String msg) {
        return COMMITTED_PATTERN.matcher(msg).matches();
    }

    public void close() throws IOException {
        if (null != httpClient) {
            try {
                httpClient.close();
            } catch (IOException e) {
                LOG.error("Closing httpClient failed.", e);
                throw new RuntimeException("Closing httpClient failed.", e);
            }
        }
    }
}
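A note on isCommitted(): presumably, if a retried COPY INTO finds that its files were already consumed, SelectDB reports "No files can be copied", which indicates an earlier attempt already committed the batch, so the retry can be treated as success. A quick hypothetical check of the pattern:

```java
public class CommittedPatternDemo {
    public static void main(String[] args) {
        String msg = "errCode = 2, detailMessage = No files can be copied, matched 1 files, "
                + "filtered 1 files because files may be loading or loaded";
        System.out.println(SelectdbCopyIntoObserver.isCommitted(msg)); // true
    }
}
```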
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCsvCodec.java (new file)
@ -0,0 +1,27 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.alibaba.datax.common.element.Record;

public class SelectdbCsvCodec extends SelectdbBaseCodec implements SelectdbCodec {

    private static final long serialVersionUID = 1L;

    private final String columnSeparator;

    public SelectdbCsvCodec(String sp) {
        this.columnSeparator = DelimiterParser.parse(sp, "\t");
    }

    @Override
    public String codec(Record row) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < row.getColumnNumber(); i++) {
            String value = convertionField(row.getColumn(i));
            sb.append(null == value ? "\\N" : value);
            if (i < row.getColumnNumber() - 1) {
                sb.append(columnSeparator);
            }
        }
        return sb.toString();
    }
}
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbJsonCodec.java (new file)
@ -0,0 +1,33 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.alibaba.datax.common.element.Record;
import com.alibaba.fastjson.JSON;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SelectdbJsonCodec extends SelectdbBaseCodec implements SelectdbCodec {

    private static final long serialVersionUID = 1L;

    private final List<String> fieldNames;

    public SelectdbJsonCodec(List<String> fieldNames) {
        this.fieldNames = fieldNames;
    }

    @Override
    public String codec(Record row) {
        if (null == fieldNames) {
            return "";
        }
        Map<String, Object> rowMap = new HashMap<>(fieldNames.size());
        int idx = 0;
        for (String fieldName : fieldNames) {
            rowMap.put(fieldName, convertionField(row.getColumn(idx)));
            idx++;
        }
        return JSON.toJSONString(rowMap);
    }
}
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbUtil.java (new file)
@ -0,0 +1,113 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.druid.sql.parser.ParserException;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * jdbc util
 */
public class SelectdbUtil {
    private static final Logger LOG = LoggerFactory.getLogger(SelectdbUtil.class);

    private SelectdbUtil() {}

    public static List<String> getDorisTableColumns(Connection conn, String databaseName, String tableName) {
        String currentSql = String.format("SELECT COLUMN_NAME FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = '%s' AND `TABLE_NAME` = '%s' ORDER BY `ORDINAL_POSITION` ASC;", databaseName, tableName);
        List<String> columns = new ArrayList<>();
        ResultSet rs = null;
        try {
            rs = DBUtil.query(conn, currentSql);
            while (DBUtil.asyncResultSetNext(rs)) {
                String colName = rs.getString("COLUMN_NAME");
                columns.add(colName);
            }
            return columns;
        } catch (Exception e) {
            throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
        } finally {
            DBUtil.closeDBResources(rs, null, null);
        }
    }

    public static List<String> renderPreOrPostSqls(List<String> preOrPostSqls, String tableName) {
        if (null == preOrPostSqls) {
            return Collections.emptyList();
        }
        List<String> renderedSqls = new ArrayList<>();
        for (String sql : preOrPostSqls) {
            if (!Strings.isNullOrEmpty(sql)) {
                renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
            }
        }
        return renderedSqls;
    }

    public static void executeSqls(Connection conn, List<String> sqls) {
        Statement stmt = null;
        String currentSql = null;
        try {
            stmt = conn.createStatement();
            for (String sql : sqls) {
                currentSql = sql;
                DBUtil.executeSqlWithoutResultSet(stmt, sql);
            }
        } catch (Exception e) {
            throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
        } finally {
            DBUtil.closeDBResources(null, stmt, null);
        }
    }

    public static void preCheckPrePareSQL(Keys options) {
        String table = options.getTable();
        List<String> preSqls = options.getPreSqlList();
        List<String> renderedPreSqls = SelectdbUtil.renderPreOrPostSqls(preSqls, table);
        if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
            LOG.info("Begin to preCheck preSqls:[{}].", String.join(";", renderedPreSqls));
            for (String sql : renderedPreSqls) {
                try {
                    DBUtil.sqlValid(sql, DataBaseType.MySql);
                } catch (ParserException e) {
                    throw RdbmsException.asPreSQLParserException(DataBaseType.MySql, e, sql);
                }
            }
        }
    }

    public static void preCheckPostSQL(Keys options) {
        String table = options.getTable();
        List<String> postSqls = options.getPostSqlList();
        List<String> renderedPostSqls = SelectdbUtil.renderPreOrPostSqls(postSqls, table);
        if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
            LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls));
            for (String sql : renderedPostSqls) {
                try {
                    DBUtil.sqlValid(sql, DataBaseType.MySql);
                } catch (ParserException e) {
                    throw RdbmsException.asPostSQLParserException(DataBaseType.MySql, e, sql);
                }
            }
        }
    }

    public static <T> T checkNotNull(T reference) {
        if (reference == null) {
            throw new NullPointerException();
        } else {
            return reference;
        }
    }
}
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbWriter.java (new file)
@ -0,0 +1,149 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;

/**
 * selectdb data writer
 */
public class SelectdbWriter extends Writer {

    public static class Job extends Writer.Job {

        private static final Logger LOG = LoggerFactory.getLogger(Job.class);
        private Configuration originalConfig = null;
        private Keys options;

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
            options = new Keys(super.getPluginJobConf());
            options.doPretreatment();
        }

        @Override
        public void preCheck() {
            this.init();
            SelectdbUtil.preCheckPrePareSQL(options);
            SelectdbUtil.preCheckPostSQL(options);
        }

        @Override
        public void prepare() {
            String username = options.getUsername();
            String password = options.getPassword();
            String jdbcUrl = options.getJdbcUrl();
            List<String> renderedPreSqls = SelectdbUtil.renderPreOrPostSqls(options.getPreSqlList(), options.getTable());
            if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
                Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
                LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPreSqls), jdbcUrl);
                SelectdbUtil.executeSqls(conn, renderedPreSqls);
                DBUtil.closeDBResources(null, null, conn);
            }
        }

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
            for (int i = 0; i < mandatoryNumber; i++) {
                configurations.add(originalConfig);
            }
            return configurations;
        }

        @Override
        public void post() {
            String username = options.getUsername();
            String password = options.getPassword();
            String jdbcUrl = options.getJdbcUrl();
            List<String> renderedPostSqls = SelectdbUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable());
            if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
                Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
                LOG.info("Start to execute postSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl);
                SelectdbUtil.executeSqls(conn, renderedPostSqls);
                DBUtil.closeDBResources(null, null, conn);
            }
        }

        @Override
        public void destroy() {
        }

    }

    public static class Task extends Writer.Task {
        private SelectdbWriterManager writerManager;
        private Keys options;
        private SelectdbCodec rowCodec;

        @Override
        public void init() {
            options = new Keys(super.getPluginJobConf());
            if (options.isWildcardColumn()) {
                Connection conn = DBUtil.getConnection(DataBaseType.MySql, options.getJdbcUrl(), options.getUsername(), options.getPassword());
                List<String> columns = SelectdbUtil.getDorisTableColumns(conn, options.getDatabase(), options.getTable());
                options.setInfoSchemaColumns(columns);
            }
            writerManager = new SelectdbWriterManager(options);
            rowCodec = SelectdbCodecFactory.createCodec(options);
        }

        @Override
        public void prepare() {
        }

        public void startWrite(RecordReceiver recordReceiver) {
            try {
                Record record;
                while ((record = recordReceiver.getFromReader()) != null) {
                    if (record.getColumnNumber() != options.getColumns().size()) {
                        throw DataXException
                                .asDataXException(
                                        DBUtilErrorCode.CONF_ERROR,
                                        String.format(
                                                "There is an error in the column configuration information. " +
                                                        "This is because you have configured a task where the number of fields to be read from the source:%s " +
                                                        "is not equal to the number of fields to be written to the destination table:%s. " +
                                                        "Please check your configuration and make changes.",
                                                record.getColumnNumber(),
                                                options.getColumns().size()));
                    }
                    writerManager.writeRecord(rowCodec.codec(record));
                }
            } catch (Exception e) {
                throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
            }
        }

        @Override
        public void post() {
            try {
                writerManager.close();
            } catch (Exception e) {
                throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
            }
        }

        @Override
        public void destroy() {}

        @Override
        public boolean supportFailOver() {
            return false;
        }
    }
}
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbWriterException.java (new file)
@ -0,0 +1,39 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

public class SelectdbWriterException extends RuntimeException {

    private boolean reCreateLabel;

    public SelectdbWriterException() {
        super();
    }

    public SelectdbWriterException(String message) {
        super(message);
    }

    public SelectdbWriterException(String message, boolean reCreateLabel) {
        super(message);
        this.reCreateLabel = reCreateLabel;
    }

    public SelectdbWriterException(String message, Throwable cause) {
        super(message, cause);
    }

    public SelectdbWriterException(Throwable cause) {
        super(cause);
    }

    protected SelectdbWriterException(String message, Throwable cause,
                                      boolean enableSuppression,
                                      boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }

    public boolean needReCreateLabel() {
        return reCreateLabel;
    }
}
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbWriterManager.java (new file)
@ -0,0 +1,196 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.google.common.base.Strings;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class SelectdbWriterManager {

    private static final Logger LOG = LoggerFactory.getLogger(SelectdbWriterManager.class);

    private final SelectdbCopyIntoObserver visitor;
    private final Keys options;
    private final List<byte[]> buffer = new ArrayList<>();
    private int batchCount = 0;
    private long batchSize = 0;
    private volatile boolean closed = false;
    private volatile Exception flushException;
    private final LinkedBlockingDeque<WriterTuple> flushQueue;
    private ScheduledExecutorService scheduler;
    private ScheduledFuture<?> scheduledFuture;

    public SelectdbWriterManager(Keys options) {
        this.options = options;
        this.visitor = new SelectdbCopyIntoObserver(options);
        flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
        this.startScheduler();
        this.startAsyncFlushing();
    }

    public void startScheduler() {
        stopScheduler();
        this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("Doris-interval-flush").daemon(true).build());
        this.scheduledFuture = this.scheduler.schedule(() -> {
            synchronized (SelectdbWriterManager.this) {
                if (!closed) {
                    try {
                        String label = createBatchLabel();
                        LOG.info(String.format("Selectdb interval Sinking triggered: label[%s].", label));
                        if (batchCount == 0) {
                            startScheduler();
                        }
                        flush(label, false);
                    } catch (Exception e) {
                        flushException = e;
                    }
                }
            }
        }, options.getFlushInterval(), TimeUnit.MILLISECONDS);
    }

    public void stopScheduler() {
        if (this.scheduledFuture != null) {
            scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
    }

    public final synchronized void writeRecord(String record) throws IOException {
        checkFlushException();
        try {
            byte[] bts = record.getBytes(StandardCharsets.UTF_8);
            buffer.add(bts);
            batchCount++;
            batchSize += bts.length;
            if (batchCount >= options.getBatchRows() || batchSize >= options.getBatchSize()) {
                String label = createBatchLabel();
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("buffer Sinking triggered: rows[%d] label [%s].", batchCount, label));
                }
                flush(label, false);
            }
        } catch (Exception e) {
            throw new SelectdbWriterException("Writing records to selectdb failed.", e);
        }
    }

    public synchronized void flush(String label, boolean waitUtilDone) throws Exception {
        checkFlushException();
        if (batchCount == 0) {
            if (waitUtilDone) {
                waitAsyncFlushingDone();
            }
            return;
        }
        flushQueue.put(new WriterTuple(label, batchSize, new ArrayList<>(buffer)));
        if (waitUtilDone) {
            // wait the last flush
            waitAsyncFlushingDone();
        }
        buffer.clear();
        batchCount = 0;
        batchSize = 0;
    }

    public synchronized void close() throws IOException {
        if (!closed) {
            closed = true;
            try {
                String label = createBatchLabel();
                if (batchCount > 0) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(String.format("Selectdb Sink is about to close: label[%s].", label));
                    }
                }
                flush(label, true);
            } catch (Exception e) {
                throw new RuntimeException("Writing records to Selectdb failed.", e);
            }
        }
        checkFlushException();
    }

    public String createBatchLabel() {
        StringBuilder sb = new StringBuilder();
        if (!Strings.isNullOrEmpty(options.getLabelPrefix())) {
            sb.append(options.getLabelPrefix());
        }
        return sb.append(UUID.randomUUID().toString())
                .toString();
    }

    private void startAsyncFlushing() {
        // start flush thread
        Thread flushThread = new Thread(new Runnable() {
            public void run() {
                while (true) {
                    try {
                        asyncFlush();
                    } catch (Exception e) {
                        flushException = e;
                    }
                }
            }
        });
        flushThread.setDaemon(true);
        flushThread.start();
    }

    private void waitAsyncFlushingDone() throws InterruptedException {
        // wait previous flushings
        for (int i = 0; i <= options.getFlushQueueLength(); i++) {
            flushQueue.put(new WriterTuple("", 0L, null));
        }
        checkFlushException();
    }

    private void asyncFlush() throws Exception {
        WriterTuple flushData = flushQueue.take();
        if (Strings.isNullOrEmpty(flushData.getLabel())) {
            return;
        }
        stopScheduler();
        for (int i = 0; i <= options.getMaxRetries(); i++) {
            try {
                // copy into
                visitor.streamLoad(flushData);
                startScheduler();
                break;
            } catch (Exception e) {
                LOG.warn("Failed to flush batch data to selectdb, retry times = {}", i, e);
                if (i >= options.getMaxRetries()) {
                    throw new RuntimeException(e);
                }
                if (e instanceof SelectdbWriterException && ((SelectdbWriterException) e).needReCreateLabel()) {
                    String newLabel = createBatchLabel();
                    LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
                    flushData.setLabel(newLabel);
                }
                try {
                    Thread.sleep(1000L * Math.min(i + 1, 100));
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Unable to flush, interrupted while doing another attempt", e);
                }
            }
        }
    }

    private void checkFlushException() {
        if (flushException != null) {
            throw new RuntimeException("Writing records to selectdb failed.", flushException);
        }
    }
}
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/WriterTuple.java (new file)
@ -0,0 +1,22 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;

import java.util.List;

public class WriterTuple {
    private String label;
    private Long bytes;
    private List<byte[]> rows;

    public WriterTuple(String label, Long bytes, List<byte[]> rows) {
        this.label = label;
        this.rows = rows;
        this.bytes = bytes;
    }

    public String getLabel() { return label; }
    public void setLabel(String label) { this.label = label; }
    public Long getBytes() { return bytes; }
    public List<byte[]> getRows() { return rows; }
}
selectdbwriter/src/main/resources/plugin.json (new file, 6 lines)
@ -0,0 +1,6 @@
{
    "name": "selectdbwriter",
    "class": "com.alibaba.datax.plugin.writer.selectdbwriter.SelectdbWriter",
    "description": "selectdb writer plugin",
    "developer": "selectdb"
}
selectdbwriter/src/main/resources/plugin_job_template.json (new file, 19 lines)
@ -0,0 +1,19 @@
{
    "name": "selectdbwriter",
    "parameter": {
        "username": "",
        "password": "",
        "column": [],
        "preSql": [],
        "postSql": [],
        "loadUrl": [],
        "loadProps": {},
        "connection": [
            {
                "jdbcUrl": "",
                "selectedDatabase": "",
                "table": []
            }
        ]
    }
}