Add selectdbwriter plugin

This commit is contained in:
caoliang 2022-12-02 18:33:53 +08:00
parent 66a7ef416c
commit 54d5d792b9
26 changed files with 2061 additions and 0 deletions

View File

@ -483,5 +483,12 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>selectdbwriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
</fileSets>
</assembly>

View File

@ -120,6 +120,7 @@
<module>cassandrawriter</module>
<module>clickhousewriter</module>
<module>doriswriter</module>
<module>selectdbwriter</module>
<module>adbmysqlwriter</module>
<!-- common support module -->

View File

@ -0,0 +1,428 @@
# SelectdbWriter Plugin Documentation
## 1 Quick Introduction
SelectdbWriter supports writing large batches of data into SELECTDB.
## 2 Implementation Principle
SelectdbWriter calls the selectdb API `/copy/upload`, which returns a redirected S3 address. It then sends the byte stream to that S3 address over HTTP, and executes `copy into` once the configured batch thresholds are reached.
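The flow can be sketched with plain `HttpURLConnection` — a minimal illustration under assumptions, not the plugin's actual code (the plugin uses Apache HttpClient; see `SelectdbCopyIntoObserver` below). Host, port, credentials and the file name are placeholders:
```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class CopyFlowSketch {
    public static void main(String[] args) throws Exception {
        String auth = Base64.getEncoder()
                .encodeToString("admin:password".getBytes(StandardCharsets.UTF_8));

        // Step 1: PUT /copy/upload with a fileName header; the server answers
        // 307 and carries a presigned S3 URL in the Location header.
        HttpURLConnection upload = (HttpURLConnection)
                new URL("http://selectdb-host:47150/copy/upload").openConnection();
        upload.setRequestMethod("PUT");
        upload.setInstanceFollowRedirects(false); // we want the Location header itself
        upload.setRequestProperty("Authorization", "Basic " + auth);
        upload.setRequestProperty("fileName", "datax_selectdb_writer_demo_0001");
        upload.setDoOutput(true);
        upload.getOutputStream().close();         // empty body
        String s3Url = upload.getHeaderField("Location");

        // Step 2: PUT the serialized batch (csv or json bytes) to the S3 URL.
        HttpURLConnection put = (HttpURLConnection) new URL(s3Url).openConnection();
        put.setRequestMethod("PUT");
        put.setDoOutput(true);
        try (OutputStream os = put.getOutputStream()) {
            os.write("1\tfoo\n2\tbar\n".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("upload status: " + put.getResponseCode());

        // Step 3: POST {"sql": "COPY INTO db.tbl FROM @~('<fileName>') ..."}
        // to /copy/query to commit the uploaded file (omitted here).
    }
}
```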
## 3 Compilation
1. Run init-env.sh
2. Compile selectdbwriter
i. Compile the selectdbwriter plugin on its own:
```text
mvn clean install -pl plugin-rdbms-util,selectdbwriter -DskipTests
```
ii. Compile the whole DataX project:
```text
mvn package assembly:assembly -Dmaven.test.skip=true
```
The output is placed under target/datax/datax/.
The hdfsreader, hdfswriter and oscarwriter plugins require additional jar packages. If you do not need them, you can delete their modules from DataX/pom.xml.
iii. Compilation errors
If you run into the following error:
```text
Could not find artifact com.alibaba.datax:datax-all:pom:0.0.1-SNAPSHOT
```
you can try the following:
a. Download alibaba-datax-maven-m2-20210928.tar.gz.
b. After extracting it, copy the resulting alibaba/datax/ directory into the .m2/repository/com/alibaba/ directory of the maven installation you use.
c. Try the compilation again.
## 4 Features
### 4.1 Sample Configuration
Here is a configuration file that reads data from Stream and writes it into selectdb.
```json
{
"job":{
"content":[
{
"reader":{
"name":"streamreader",
"parameter":{
"column":[
{
"type":"string",
"random":"0,31"
},
{
"type":"string",
"random":"0,31"
},
{
"type":"string",
"random":"0,31"
},
{
"type":"string",
"random":"0,31"
},
{
"type":"long",
"random":"0,5"
},
{
"type":"string",
"random":"0,10"
},
{
"type":"string",
"random":"0,5"
},
{
"type":"string",
"random":"0,31"
},
{
"type":"string",
"random":"0,31"
},
{
"type":"string",
"random":"0,21"
},
{
"type":"string",
"random":"0,31"
},
{
"type":"long",
"random":"0,10"
},
{
"type":"long",
"random":"0,20"
},
{
"type":"date",
"random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
},
{
"type":"long",
"random":"0,10"
},
{
"type":"date",
"random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
},
{
"type":"string",
"random":"0,10"
},
{
"type":"long",
"random":"0,10"
},
{
"type":"date",
"random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
},
{
"type":"long",
"random":"0,10"
},
{
"type":"date",
"random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
},
{
"type":"long",
"random":"0,10"
},
{
"type":"date",
"random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
},
{
"type":"long",
"random":"0,10"
},
{
"type":"date",
"random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
},
{
"type":"string",
"random":"0,100"
},
{
"type":"string",
"random":"0,1"
},
{
"type":"long",
"random":"0,1"
},
{
"type":"string",
"random":"0,64"
},
{
"type":"string",
"random":"0,20"
},
{
"type":"string",
"random":"0,31"
},
{
"type":"long",
"random":"0,3"
},
{
"type":"long",
"random":"0,3"
},
{
"type":"long",
"random":"0,19"
},
{
"type":"date",
"random":"2022-01-01 12:00:00,2023-01-01 12:00:00"
},
{
"type":"string",
"random":"0,1"
}
],
"sliceRecordCount":10
}
},
"writer":{
"name":"selectdbwriter",
"parameter":{
"loadUrl":[
"xxx:47150"
],
"loadProps":{
"file.type":"json",
"file.strip_outer_array":"true"
},
"column":[
"id",
"table_id",
"table_no",
"table_name",
"table_status",
"no_disturb",
"dinner_type",
"member_id",
"reserve_bill_no",
"pre_order_no",
"queue_num",
"person_num",
"open_time",
"open_time_format",
"order_time",
"order_time_format",
"table_bill_id",
"offer_time",
"offer_time_format",
"confirm_bill_time",
"confirm_bill_time_format",
"bill_time",
"bill_time_format",
"clear_time",
"clear_time_format",
"table_message",
"bill_close",
"table_type",
"pad_mac",
"company_id",
"shop_id",
"is_sync",
"table_split_no",
"ts",
"ts_format",
"dr"
],
"username":"admin",
"password":"SelectDB2022",
"postSql":[
],
"preSql":[
],
"connection":[
{
"jdbcUrl":"jdbc:mysql://xxx:34142/cl_test",
"table":[
"ods_pos_pro_table_dynamic_delta_v4"
],
"selectedDatabase":"cl_test"
}
],
"maxBatchRows":1000000,
"maxBatchByteSize":536870912000
}
}
}
],
"setting":{
"errorLimit":{
"percentage":0.02,
"record":0
},
"speed":{
"channel":5
}
}
}
}
```
### 4.2 Parameter Description

* **jdbcUrl**
  - Description: JDBC connection string for selectdb, used to execute preSql and postSql.
  - Required: yes
  - Default: none
* **loadUrl**
  - Description: connection target of selectdb, in the format "ip:port", where ip is the selectdb private-link address and port is the http_port of the selectdb cluster.
  - Required: yes
  - Default: none
* **username**
  - Description: user name for accessing the selectdb database.
  - Required: yes
  - Default: none
* **password**
  - Description: password for accessing the selectdb database.
  - Required: no
  - Default: empty
* **connection.selectedDatabase**
  - Description: name of the selectdb database to write to.
  - Required: yes
  - Default: none
* **connection.table**
  - Description: name of the selectdb table to write to.
  - Required: yes
  - Default: none
* **column**
  - Description: the fields of the destination table **that data will be written to**; these fields are used as the field names of the generated JSON data. Separate fields with commas, e.g. "column": ["id","name","age"].
  - Required: yes
  - Default: none
* **preSql**
  - Description: standard SQL statements executed before writing data to the destination table.
  - Required: no
  - Default: none
* **postSql**
  - Description: standard SQL statements executed after writing data to the destination table.
  - Required: no
  - Default: none
* **maxBatchRows**
  - Description: maximum number of rows per import batch. Together with **batchSize** it controls the size of each batch; a batch is imported as soon as either threshold is reached.
  - Required: no
  - Default: 500000
* **batchSize**
  - Description: maximum number of bytes per import batch. Together with **maxBatchRows** it controls the size of each batch; a batch is imported as soon as either threshold is reached.
  - Required: no
  - Default: 90M
* **maxRetries**
  - Description: number of retries after a batch fails to import.
  - Required: no
  - Default: 3
* **labelPrefix**
  - Description: label prefix of each uploaded file. The final label is composed of `labelPrefix + UUID`, forming a globally unique label that ensures data is not imported twice.
  - Required: no
  - Default: `datax_selectdb_writer_`
* **loadProps**
  - Description: request parameters for COPY INTO, including the import format (`file.type`, etc.). CSV is used by default; JSON is also supported. See the type conversion section below.
  - Required: no
  - Default: none
* **clusterName**
  - Description: name of the selectdb cloud cluster.
  - Required: no
  - Default: none
* **flushQueueLength**
  - Description: length of the flush queue.
  - Required: no
  - Default: 1
* **flushInterval**
  - Description: interval at which buffered data is flushed. If maxBatchRows and batchSize are set very large, the buffer may never reach those thresholds, and the import is triggered by this interval instead.
  - Required: no
  - Default: 30000ms
### 4.3 Type Conversion
By default, all incoming data is converted to strings, with `\t` as the column separator and `\n` as the row separator, forming a `csv` file for the Selectdb import.
CSV is the default import format. To change the column separator, simply configure `loadProps` accordingly:
```json
"loadProps": {
"file.column_separator": "\\x01",
"file.line_delimiter": "\\x02"
}
```
To change the import format to `json`, simply configure `loadProps` accordingly:
```json
"loadProps": {
"file.type": "json",
"file.strip_outer_array": true
}
```
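For illustration, a minimal sketch of how the `\x01`-style escapes above are decoded, using the `DelimiterParser` class shipped in this commit (same package assumed):
```java
package com.alibaba.datax.plugin.writer.selectdbwriter;

public class DelimiterDemo {
    public static void main(String[] args) {
        // "\\x01" in the job JSON reaches the plugin as the 4-char string \x01,
        // which DelimiterParser decodes into the single byte 0x01.
        String sep = DelimiterParser.parse("\\x01", "\t");
        System.out.println((int) sep.charAt(0));                             // 1
        // Null/empty input falls back to the given default separator.
        System.out.println("\t".equals(DelimiterParser.parse(null, "\t")));  // true
        // Strings not starting with \x are returned unchanged.
        System.out.println(DelimiterParser.parse(",", "\t"));                // ,
    }
}
```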

View File

@ -0,0 +1,93 @@
{
"core":{
"transport":{
"channel":{
"speed":{
"byte":10485760
}
}
}
},
"job":{
"content":[
{
"reader":{
"name":"streamreader",
"parameter":{
"column":[
{
"type":"string",
"value":"DataX"
},
{
"type":"int",
"value":19890604
},
{
"type":"date",
"value":"1989-06-04 00:00:00"
},
{
"type":"bool",
"value":true
},
{
"type":"string",
"value":"test"
}
],
"sliceRecordCount":1000000
}
},
"writer":{
"name":"selectdbwriter",
"parameter":{
"loadUrl":[
"xxx:35871"
],
"loadProps":{
"file.type":"json",
"file.strip_outer_array":"true"
},
"database":"db1",
"column":[
"k1",
"k2",
"k3",
"k4",
"k5"
],
"username":"admin",
"password":"SelectDB2022",
"postSql":[
],
"preSql":[
],
"connection":[
{
"jdbcUrl":"jdbc:mysql://xxx:32386/cl_test",
"table":[
"test_selectdb"
],
"selectedDatabase":"cl_test"
}
],
"maxBatchRows":200000,
"maxBatchByteSize":53687091200
}
}
}
],
"setting":{
"errorLimit":{
"percentage":0.02,
"record":0
},
"speed":{
"byte":10485760
}
}
}
}

selectdbwriter/pom.xml
View File

@ -0,0 +1,96 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>selectdbwriter</artifactId>
<name>selectdbwriter</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.driver.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id/>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/selectdbwriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>selectdbwriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/selectdbwriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/selectdbwriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,23 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown = true)
public class BaseResponse<T> {
private int code;
private String msg;
private T data;
private int count;
public int getCode() {
return code;
}
public String getMsg() {
return msg;
}
public T getData(){
return data;
}
}

View File

@ -0,0 +1,26 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.util.Map;
@JsonIgnoreProperties(ignoreUnknown = true)
public class CopyIntoResp extends BaseResponse{
private String code;
private String exception;
private Map<String,String> result;
public String getDataCode() {
return code;
}
public String getException() {
return exception;
}
public Map<String, String> getResult() {
return result;
}
}

View File

@ -0,0 +1,40 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;
import java.util.Map;
import java.util.StringJoiner;
public class CopySQLBuilder {
private final static String COPY_SYNC = "copy.async";
private final String fileName;
private final Keys options;
private Map<String, Object> properties;
public CopySQLBuilder(Keys options, String fileName) {
this.options=options;
this.fileName=fileName;
this.properties=options.getLoadProps();
}
public String buildCopySQL(){
StringBuilder sb = new StringBuilder();
sb.append("COPY INTO ")
.append(options.getDatabase() + "." + options.getTable())
.append(" FROM @~('").append(fileName).append("') ")
.append("PROPERTIES (");
// copy into must run synchronously, so force copy.async = false
properties.put(COPY_SYNC, false);
StringJoiner props = new StringJoiner(",");
for(Map.Entry<String,Object> entry : properties.entrySet()){
String key = String.valueOf(entry.getKey());
String value = String.valueOf(entry.getValue());
String prop = String.format("'%s'='%s'",key,value);
props.add(prop);
}
sb.append(props).append(" )");
return sb.toString();
}
}
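A hypothetical usage sketch (same package assumed; the configuration values mirror the sample job above, and the file name is made up) showing the SQL the builder emits:
```java
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.alibaba.datax.common.util.Configuration;

public class CopySqlDemo {
    public static void main(String[] args) {
        Configuration conf = Configuration.from("{"
                + "\"username\":\"admin\",\"column\":[\"k1\",\"k2\"],"
                + "\"loadUrl\":[\"host:47150\"],"
                + "\"loadProps\":{\"file.type\":\"json\",\"file.strip_outer_array\":\"true\"},"
                + "\"connection\":[{\"selectedDatabase\":\"cl_test\",\"table\":[\"t1\"]}]}");
        Keys options = new Keys(conf);
        String sql = new CopySQLBuilder(options, "datax_selectdb_writer_demo").buildCopySQL();
        System.out.println(sql);
        // e.g.: COPY INTO cl_test.t1 FROM @~('datax_selectdb_writer_demo')
        //       PROPERTIES ('file.strip_outer_array'='true','file.type'='json','copy.async'='false')
    }
}
```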

View File

@ -0,0 +1,54 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;
import com.google.common.base.Strings;
import java.io.StringWriter;
public class DelimiterParser {
private static final String HEX_STRING = "0123456789ABCDEF";
public static String parse(String sp, String dSp) throws RuntimeException {
if ( Strings.isNullOrEmpty(sp)) {
return dSp;
}
if (!sp.toUpperCase().startsWith("\\X")) {
return sp;
}
String hexStr = sp.substring(2);
// check hex str
if (hexStr.isEmpty()) {
throw new RuntimeException("Failed to parse delimiter: `Hex str is empty`");
}
if (hexStr.length() % 2 != 0) {
throw new RuntimeException("Failed to parse delimiter: `Hex str length error`");
}
for (char hexChar : hexStr.toUpperCase().toCharArray()) {
if (HEX_STRING.indexOf(hexChar) == -1) {
throw new RuntimeException("Failed to parse delimiter: `Hex str format error`");
}
}
// transform to separator
StringWriter writer = new StringWriter();
for (byte b : hexStrToBytes(hexStr)) {
writer.append((char) b);
}
return writer.toString();
}
private static byte[] hexStrToBytes(String hexStr) {
String upperHexStr = hexStr.toUpperCase();
int length = upperHexStr.length() / 2;
char[] hexChars = upperHexStr.toCharArray();
byte[] bytes = new byte[length];
for (int i = 0; i < length; i++) {
int pos = i * 2;
bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
}
return bytes;
}
private static byte charToByte(char c) {
return (byte) HEX_STRING.indexOf(c);
}
}

View File

@ -0,0 +1,51 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.HttpPost;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class HttpPostBuilder {
String url;
Map<String, String> header;
HttpEntity httpEntity;
public HttpPostBuilder() {
header = new HashMap<>();
}
public HttpPostBuilder setUrl(String url) {
this.url = url;
return this;
}
public HttpPostBuilder addCommonHeader() {
header.put(HttpHeaders.EXPECT, "100-continue");
return this;
}
public HttpPostBuilder baseAuth(String user, String password) {
final String authInfo = user + ":" + password;
byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded));
return this;
}
public HttpPostBuilder setEntity(HttpEntity httpEntity) {
this.httpEntity = httpEntity;
return this;
}
public HttpPost build() {
SelectdbUtil.checkNotNull(url);
SelectdbUtil.checkNotNull(httpEntity);
HttpPost put = new HttpPost(url);
header.forEach(put::setHeader);
put.setEntity(httpEntity);
return put;
}
}

View File

@ -0,0 +1,65 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class HttpPutBuilder {
String url;
Map<String, String> header;
HttpEntity httpEntity;
public HttpPutBuilder() {
header = new HashMap<>();
}
public HttpPutBuilder setUrl(String url) {
this.url = url;
return this;
}
public HttpPutBuilder addFileName(String fileName){
header.put("fileName", fileName);
return this;
}
public HttpPutBuilder setEmptyEntity() {
try {
this.httpEntity = new StringEntity("");
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
return this;
}
public HttpPutBuilder addCommonHeader() {
header.put(HttpHeaders.EXPECT, "100-continue");
return this;
}
public HttpPutBuilder baseAuth(String user, String password) {
final String authInfo = user + ":" + password;
byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded));
return this;
}
public HttpPutBuilder setEntity(HttpEntity httpEntity) {
this.httpEntity = httpEntity;
return this;
}
public HttpPut build() {
SelectdbUtil.checkNotNull(url);
SelectdbUtil.checkNotNull(httpEntity);
HttpPut put = new HttpPut(url);
header.forEach(put::setHeader);
put.setEntity(httpEntity);
return put;
}
}

View File

@ -0,0 +1,186 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class Keys implements Serializable {
private static final long serialVersionUID = 1L;
private static final int DEFAULT_MAX_RETRIES = 3;
private static final int BATCH_ROWS = 500000;
private static final long DEFAULT_FLUSH_INTERVAL = 30000;
private static final String LOAD_PROPS_FORMAT = "file.type";
public enum StreamLoadFormat {
CSV, JSON;
}
private static final String USERNAME = "username";
private static final String PASSWORD = "password";
private static final String DATABASE = "connection[0].selectedDatabase";
private static final String TABLE = "connection[0].table[0]";
private static final String COLUMN = "column";
private static final String PRE_SQL = "preSql";
private static final String POST_SQL = "postSql";
private static final String JDBC_URL = "connection[0].jdbcUrl";
private static final String LABEL_PREFIX = "labelPrefix";
private static final String MAX_BATCH_ROWS = "maxBatchRows";
private static final String MAX_BATCH_SIZE = "batchSize";
private static final String FLUSH_INTERVAL = "flushInterval";
private static final String LOAD_URL = "loadUrl";
private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength";
private static final String LOAD_PROPS = "loadProps";
private static final String DEFAULT_LABEL_PREFIX = "datax_selectdb_writer_";
private static final long DEFAULT_MAX_BATCH_SIZE = 90 * 1024 * 1024; //default 90M
private static final String CLUSTER_NAME = "clusterName";
private static final String MAX_RETRIES = "maxRetries";
private final Configuration options;
private List<String> infoSchemaColumns;
private List<String> userSetColumns;
private boolean isWildcardColumn;
public Keys ( Configuration options) {
this.options = options;
this.userSetColumns = options.getList(COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList());
if (1 == userSetColumns.size() && "*".equals(userSetColumns.get(0).trim())) {
this.isWildcardColumn = true;
}
}
public void doPretreatment() {
validateRequired();
validateStreamLoadUrl();
}
public String getJdbcUrl() {
return options.getString(JDBC_URL);
}
public String getDatabase() {
return options.getString(DATABASE);
}
public String getTable() {
return options.getString(TABLE);
}
public String getUsername() {
return options.getString(USERNAME);
}
public String getPassword() {
return options.getString(PASSWORD);
}
public String getClusterName(){
return options.getString(CLUSTER_NAME);
}
public String getLabelPrefix() {
String label = options.getString(LABEL_PREFIX);
return null == label ? DEFAULT_LABEL_PREFIX : label;
}
public List<String> getLoadUrlList() {
return options.getList(LOAD_URL, String.class);
}
public List<String> getColumns() {
if (isWildcardColumn) {
return this.infoSchemaColumns;
}
return this.userSetColumns;
}
public boolean isWildcardColumn() {
return this.isWildcardColumn;
}
public void setInfoSchemaColumns(List<String> cols) {
this.infoSchemaColumns = cols;
}
public List<String> getPreSqlList() {
return options.getList(PRE_SQL, String.class);
}
public List<String> getPostSqlList() {
return options.getList(POST_SQL, String.class);
}
public Map<String, Object> getLoadProps() {
return options.getMap(LOAD_PROPS);
}
public int getMaxRetries() {
Integer retries = options.getInt(MAX_RETRIES);
return null == retries ? DEFAULT_MAX_RETRIES : retries;
}
public int getBatchRows() {
Integer rows = options.getInt(MAX_BATCH_ROWS);
return null == rows ? BATCH_ROWS : rows;
}
public long getBatchSize() {
Long size = options.getLong(MAX_BATCH_SIZE);
return null == size ? DEFAULT_MAX_BATCH_SIZE : size;
}
public long getFlushInterval() {
Long interval = options.getLong(FLUSH_INTERVAL);
return null == interval ? DEFAULT_FLUSH_INTERVAL : interval;
}
public int getFlushQueueLength() {
Integer len = options.getInt(FLUSH_QUEUE_LENGTH);
return null == len ? 1 : len;
}
public StreamLoadFormat getStreamLoadFormat() {
Map<String, Object> loadProps = getLoadProps();
if (null == loadProps) {
return StreamLoadFormat.CSV;
}
if (loadProps.containsKey(LOAD_PROPS_FORMAT)
&& StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(LOAD_PROPS_FORMAT)))) {
return StreamLoadFormat.JSON;
}
return StreamLoadFormat.CSV;
}
private void validateStreamLoadUrl() {
List<String> urlList = getLoadUrlList();
for (String host : urlList) {
if (host.split(":").length < 2) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
"The format of loadUrl is not correct, please enter:[`fe_ip:fe_http_ip;fe_ip:fe_http_ip`].");
}
}
}
private void validateRequired() {
final String[] requiredOptionKeys = new String[]{
USERNAME,
DATABASE,
TABLE,
COLUMN,
LOAD_URL
};
for (String optionKey : requiredOptionKeys) {
options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
}
}
}
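A small sketch (same package assumed) of how unset knobs resolve to the defaults documented above:
```java
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.alibaba.datax.common.util.Configuration;

public class KeysDefaultsDemo {
    public static void main(String[] args) {
        Configuration conf = Configuration.from("{"
                + "\"username\":\"admin\",\"column\":[\"*\"],"
                + "\"loadUrl\":[\"host:47150\"],"
                + "\"connection\":[{\"selectedDatabase\":\"db1\",\"table\":[\"t1\"]}]}");
        Keys keys = new Keys(conf);
        System.out.println(keys.getBatchRows());         // 500000
        System.out.println(keys.getBatchSize());         // 94371840 (90M)
        System.out.println(keys.getMaxRetries());        // 3
        System.out.println(keys.getLabelPrefix());       // datax_selectdb_writer_
        System.out.println(keys.isWildcardColumn());     // true, since column is ["*"]
        System.out.println(keys.getStreamLoadFormat());  // CSV when loadProps is absent
    }
}
```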

View File

@ -0,0 +1,23 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;
import com.alibaba.datax.common.element.Column;
public class SelectdbBaseCodec {
protected String convertionField( Column col) {
if (null == col.getRawData() || Column.Type.NULL == col.getType()) {
return null;
}
if ( Column.Type.BOOL == col.getType()) {
return String.valueOf(col.asLong());
}
if ( Column.Type.BYTES == col.getType()) {
byte[] bts = (byte[])col.getRawData();
long value = 0;
for (int i = 0; i < bts.length; i++) {
value += (bts[bts.length - i - 1] & 0xffL) << (8 * i);
}
return String.valueOf(value);
}
return col.asString();
}
}
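A short sketch (same package assumed; `BoolColumn`/`BytesColumn` come from datax-common) of the two non-trivial conversions above:
```java
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.alibaba.datax.common.element.BoolColumn;
import com.alibaba.datax.common.element.BytesColumn;

public class BaseCodecDemo extends SelectdbBaseCodec {
    public static void main(String[] args) {
        BaseCodecDemo demo = new BaseCodecDemo();
        // BOOL columns are rendered as 0/1 rather than true/false.
        System.out.println(demo.convertionField(new BoolColumn(true)));  // 1
        // BYTES columns are folded big-endian into an unsigned long:
        // {0x01, 0x02} -> (0x01 << 8) | 0x02 = 258
        System.out.println(demo.convertionField(new BytesColumn(new byte[]{1, 2}))); // 258
    }
}
```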

View File

@ -0,0 +1,10 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;
import com.alibaba.datax.common.element.Record;
import java.io.Serializable;
public interface SelectdbCodec extends Serializable {
String codec( Record row);
}

View File

@ -0,0 +1,19 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;
import java.util.Map;
public class SelectdbCodecFactory {
public SelectdbCodecFactory (){
}
public static SelectdbCodec createCodec( Keys writerOptions) {
if ( Keys.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
Map<String, Object> props = writerOptions.getLoadProps();
return new SelectdbCsvCodec (null == props || !props.containsKey("file.column_separator") ? null : String.valueOf(props.get("file.column_separator")));
}
if ( Keys.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
return new SelectdbJsonCodec (writerOptions.getColumns());
}
throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties.");
}
}

View File

@ -0,0 +1,233 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
public class SelectdbCopyIntoObserver {
private static final Logger LOG = LoggerFactory.getLogger(SelectdbCopyIntoObserver.class);
private Keys options;
private long pos;
public static final int SUCCESS = 0;
public static final String FAIL = "1";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final HttpClientBuilder httpClientBuilder = HttpClients
.custom()
.disableRedirectHandling();
private CloseableHttpClient httpClient;
private static final String UPLOAD_URL_PATTERN = "%s/copy/upload";
private static final String COMMIT_PATTERN = "%s/copy/query";
private static final Pattern COMMITTED_PATTERN = Pattern.compile("errCode = 2, detailMessage = No files can be copied, matched (\\d+) files, " + "filtered (\\d+) files because files may be loading or loaded");
public SelectdbCopyIntoObserver(Keys options) {
this.options = options;
this.httpClient = httpClientBuilder.build();
}
public void streamLoad(WriterTuple data) throws Exception {
String host = getLoadHost();
if (host == null) {
throw new RuntimeException("load_url cannot be empty, or the host cannot connect.Please check your configuration.");
}
String loadUrl = String.format(UPLOAD_URL_PATTERN, host);
String uploadAddress = getUploadAddress(loadUrl, data.getLabel());
put(uploadAddress, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));
executeCopy(host,data.getLabel());
}
private String getUploadAddress(String loadUrl, String fileName) throws IOException {
HttpPutBuilder putBuilder = new HttpPutBuilder();
putBuilder.setUrl(loadUrl)
.addFileName(fileName)
.addCommonHeader()
.setEmptyEntity()
.baseAuth(options.getUsername(), options.getPassword());
CloseableHttpResponse execute = httpClientBuilder.build().execute(putBuilder.build());
int statusCode = execute.getStatusLine().getStatusCode();
String reason = execute.getStatusLine().getReasonPhrase();
if (statusCode == 307) {
Header location = execute.getFirstHeader("location");
String uploadAddress = location.getValue();
LOG.info("redirect to s3:{}", uploadAddress);
return uploadAddress;
} else {
HttpEntity entity = execute.getEntity();
String result = entity == null ? null : EntityUtils.toString(entity);
LOG.error("Failed get the redirected address, status {}, reason {}, response {}", statusCode, reason, result);
throw new RuntimeException("Could not get the redirected address.");
}
}
private byte[] addRows(List<byte[]> rows, int totalBytes) {
if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());
byte[] lineDelimiter = DelimiterParser.parse((String) props.get("file.line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
for (byte[] row : rows) {
bos.put(row);
bos.put(lineDelimiter);
}
return bos.array();
}
if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
bos.put("[".getBytes(StandardCharsets.UTF_8));
byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
boolean isFirstElement = true;
for (byte[] row : rows) {
if (!isFirstElement) {
bos.put(jsonDelimiter);
}
bos.put(row);
isFirstElement = false;
}
bos.put("]".getBytes(StandardCharsets.UTF_8));
return bos.array();
}
throw new RuntimeException("Failed to join rows data, unsupported `file.type` from copy into properties:");
}
public void put(String loadUrl, String fileName, byte[] data) throws IOException {
LOG.info(String.format("Executing upload file to: '%s', size: '%s'", loadUrl, data.length));
HttpPutBuilder putBuilder = new HttpPutBuilder();
putBuilder.setUrl(loadUrl)
.addCommonHeader()
.setEntity(new InputStreamEntity(new ByteArrayInputStream(data)));
CloseableHttpResponse response = httpClient.execute(putBuilder.build());
final int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200) {
String result = response.getEntity() == null ? null : EntityUtils.toString(response.getEntity());
LOG.error("upload file {} error, response {}", fileName, result);
throw new SelectdbWriterException("upload file error: " + fileName,true);
}
}
private String getLoadHost() {
List<String> hostList = options.getLoadUrlList();
long tmp = pos + hostList.size();
for (; pos < tmp; pos++) {
String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString();
if (checkConnection(host)) {
return host;
}
}
return null;
}
private boolean checkConnection(String host) {
try {
URL url = new URL(host);
HttpURLConnection co = (HttpURLConnection) url.openConnection();
co.setConnectTimeout(5000);
co.connect();
co.disconnect();
return true;
} catch (Exception e1) {
e1.printStackTrace();
return false;
}
}
/**
* execute copy into
*/
public void executeCopy(String hostPort, String fileName) throws IOException{
long start = System.currentTimeMillis();
CopySQLBuilder copySQLBuilder = new CopySQLBuilder(options, fileName);
String copySQL = copySQLBuilder.buildCopySQL();
LOG.info("build copy SQL is {}", copySQL);
Map<String,String> params = new HashMap<>();
params.put("sql", copySQL);
if(StringUtils.isNotBlank(options.getClusterName())){
params.put("cluster",options.getClusterName());
}
HttpPostBuilder postBuilder = new HttpPostBuilder();
postBuilder.setUrl(String.format(COMMIT_PATTERN, hostPort))
.baseAuth(options.getUsername(), options.getPassword())
.setEntity(new StringEntity(OBJECT_MAPPER.writeValueAsString(params)));
CloseableHttpResponse response = httpClient.execute(postBuilder.build());
final int statusCode = response.getStatusLine().getStatusCode();
final String reasonPhrase = response.getStatusLine().getReasonPhrase();
String loadResult = "";
if (statusCode != 200) {
LOG.warn("commit failed with status {} {}, reason {}", statusCode, hostPort, reasonPhrase);
throw new SelectdbWriterException("commit error with file: " + fileName,true);
} else if (response.getEntity() != null){
loadResult = EntityUtils.toString(response.getEntity());
boolean success = handleCommitResponse(loadResult);
if(success){
LOG.info("commit success cost {}ms, response is {}", System.currentTimeMillis() - start, loadResult);
}else{
throw new SelectdbWriterException("commit fail",true);
}
}
}
public boolean handleCommitResponse(String loadResult) throws IOException {
BaseResponse<CopyIntoResp> baseResponse = OBJECT_MAPPER.readValue(loadResult, new TypeReference<BaseResponse<CopyIntoResp>>(){});
if(baseResponse.getCode() == SUCCESS){
CopyIntoResp dataResp = baseResponse.getData();
if(FAIL.equals(dataResp.getDataCode())){
LOG.error("copy into execute failed, reason:{}", loadResult);
return false;
}else{
Map<String, String> result = dataResp.getResult();
if(!result.get("state").equals("FINISHED") && !isCommitted(result.get("msg"))){
LOG.error("copy into load failed, reason:{}", loadResult);
return false;
}else{
return true;
}
}
}else{
LOG.error("commit failed, reason:{}", loadResult);
return false;
}
}
public static boolean isCommitted(String msg) {
return COMMITTED_PATTERN.matcher(msg).matches();
}
public void close() throws IOException {
if (null != httpClient) {
try {
httpClient.close();
} catch (IOException e) {
LOG.error("Closing httpClient failed.", e);
throw new RuntimeException("Closing httpClient failed.", e);
}
}
}
}
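For reference, a hypothetical commit response that `handleCommitResponse` would accept, inferred solely from the fields the code above reads (the real service payload may carry more fields):
```java
package com.alibaba.datax.plugin.writer.selectdbwriter;

public class CommitResponseDemo {
    public static void main(String[] args) throws Exception {
        // Keys is only stored by the constructor, so null suffices for this demo.
        SelectdbCopyIntoObserver observer = new SelectdbCopyIntoObserver(null);
        // code == 0 (SUCCESS), inner data code != "1" (FAIL), state == FINISHED.
        String loadResult = "{\"code\":0,\"msg\":\"success\","
                + "\"data\":{\"code\":\"0\",\"exception\":\"\","
                + "\"result\":{\"state\":\"FINISHED\",\"msg\":\"\"}}}";
        System.out.println(observer.handleCommitResponse(loadResult)); // true
    }
}
```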

View File

@ -0,0 +1,27 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;
import com.alibaba.datax.common.element.Record;
public class SelectdbCsvCodec extends SelectdbBaseCodec implements SelectdbCodec {
private static final long serialVersionUID = 1L;
private final String columnSeparator;
public SelectdbCsvCodec ( String sp) {
this.columnSeparator = DelimiterParser.parse(sp, "\t");
}
@Override
public String codec( Record row) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < row.getColumnNumber(); i++) {
String value = convertionField(row.getColumn(i));
sb.append(null == value ? "\\N" : value);
if (i < row.getColumnNumber() - 1) {
sb.append(columnSeparator);
}
}
return sb.toString();
}
}

View File

@ -0,0 +1,33 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.fastjson.JSON;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SelectdbJsonCodec extends SelectdbBaseCodec implements SelectdbCodec {
private static final long serialVersionUID = 1L;
private final List<String> fieldNames;
public SelectdbJsonCodec ( List<String> fieldNames) {
this.fieldNames = fieldNames;
}
@Override
public String codec( Record row) {
if (null == fieldNames) {
return "";
}
Map<String, Object> rowMap = new HashMap<> (fieldNames.size());
int idx = 0;
for (String fieldName : fieldNames) {
rowMap.put(fieldName, convertionField(row.getColumn(idx)));
idx++;
}
return JSON.toJSONString(rowMap);
}
}

View File

@ -0,0 +1,130 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.alibaba.datax.plugin.writer.selectdbwriter;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.druid.sql.parser.ParserException;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* jdbc util
*/
public class SelectdbUtil {
private static final Logger LOG = LoggerFactory.getLogger(SelectdbUtil.class);
private SelectdbUtil() {}
public static List<String> getDorisTableColumns( Connection conn, String databaseName, String tableName) {
String currentSql = String.format("SELECT COLUMN_NAME FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = '%s' AND `TABLE_NAME` = '%s' ORDER BY `ORDINAL_POSITION` ASC;", databaseName, tableName);
List<String> columns = new ArrayList<> ();
ResultSet rs = null;
try {
rs = DBUtil.query(conn, currentSql);
while (DBUtil.asyncResultSetNext(rs)) {
String colName = rs.getString("COLUMN_NAME");
columns.add(colName);
}
return columns;
} catch (Exception e) {
throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
} finally {
DBUtil.closeDBResources(rs, null, null);
}
}
public static List<String> renderPreOrPostSqls(List<String> preOrPostSqls, String tableName) {
if (null == preOrPostSqls) {
return Collections.emptyList();
}
List<String> renderedSqls = new ArrayList<>();
for (String sql : preOrPostSqls) {
if (! Strings.isNullOrEmpty(sql)) {
renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
}
}
return renderedSqls;
}
public static void executeSqls(Connection conn, List<String> sqls) {
Statement stmt = null;
String currentSql = null;
try {
stmt = conn.createStatement();
for (String sql : sqls) {
currentSql = sql;
DBUtil.executeSqlWithoutResultSet(stmt, sql);
}
} catch (Exception e) {
throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
} finally {
DBUtil.closeDBResources(null, stmt, null);
}
}
public static void preCheckPrePareSQL( Keys options) {
String table = options.getTable();
List<String> preSqls = options.getPreSqlList();
List<String> renderedPreSqls = SelectdbUtil.renderPreOrPostSqls(preSqls, table);
if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
LOG.info("Begin to preCheck preSqls:[{}].", String.join(";", renderedPreSqls));
for (String sql : renderedPreSqls) {
try {
DBUtil.sqlValid(sql, DataBaseType.MySql);
} catch ( ParserException e) {
throw RdbmsException.asPreSQLParserException(DataBaseType.MySql,e,sql);
}
}
}
}
public static void preCheckPostSQL( Keys options) {
String table = options.getTable();
List<String> postSqls = options.getPostSqlList();
List<String> renderedPostSqls = SelectdbUtil.renderPreOrPostSqls(postSqls, table);
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls));
for(String sql : renderedPostSqls) {
try {
DBUtil.sqlValid(sql, DataBaseType.MySql);
} catch (ParserException e){
throw RdbmsException.asPostSQLParserException(DataBaseType.MySql,e,sql);
}
}
}
}
public static <T> T checkNotNull(T reference) {
if (reference == null) {
throw new NullPointerException();
} else {
return reference;
}
}
}
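A small sketch (same package assumed) of the pre/post SQL rendering; it assumes the rdbms-util table placeholder is `@table`, which `renderPreOrPostSqls` substitutes with the target table:
```java
package com.alibaba.datax.plugin.writer.selectdbwriter;

import java.util.Arrays;
import java.util.List;

public class RenderSqlDemo {
    public static void main(String[] args) {
        List<String> rendered = SelectdbUtil.renderPreOrPostSqls(
                Arrays.asList("truncate table @table", ""), "test_selectdb");
        // Empty entries are dropped; the placeholder is replaced.
        System.out.println(rendered); // [truncate table test_selectdb]
    }
}
```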

View File

@ -0,0 +1,149 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
/**
* selectdb data writer
*/
public class SelectdbWriter extends Writer {
public static class Job extends Writer.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig = null;
private Keys options;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
options = new Keys (super.getPluginJobConf());
options.doPretreatment();
}
@Override
public void preCheck(){
this.init();
SelectdbUtil.preCheckPrePareSQL(options);
SelectdbUtil.preCheckPostSQL(options);
}
@Override
public void prepare() {
String username = options.getUsername();
String password = options.getPassword();
String jdbcUrl = options.getJdbcUrl();
List<String> renderedPreSqls = SelectdbUtil.renderPreOrPostSqls(options.getPreSqlList(), options.getTable());
if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPreSqls), jdbcUrl);
SelectdbUtil.executeSqls(conn, renderedPreSqls);
DBUtil.closeDBResources(null, null, conn);
}
}
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
for (int i = 0; i < mandatoryNumber; i++) {
configurations.add(originalConfig);
}
return configurations;
}
@Override
public void post() {
String username = options.getUsername();
String password = options.getPassword();
String jdbcUrl = options.getJdbcUrl();
List<String> renderedPostSqls = SelectdbUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable());
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
LOG.info("Start to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl);
SelectdbUtil.executeSqls(conn, renderedPostSqls);
DBUtil.closeDBResources(null, null, conn);
}
}
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private SelectdbWriterManager writerManager;
private Keys options;
private SelectdbCodec rowCodec;
@Override
public void init() {
options = new Keys (super.getPluginJobConf());
if (options.isWildcardColumn()) {
Connection conn = DBUtil.getConnection(DataBaseType.MySql, options.getJdbcUrl(), options.getUsername(), options.getPassword());
List<String> columns = SelectdbUtil.getDorisTableColumns(conn, options.getDatabase(), options.getTable());
options.setInfoSchemaColumns(columns);
}
writerManager = new SelectdbWriterManager(options);
rowCodec = SelectdbCodecFactory.createCodec(options);
}
@Override
public void prepare() {
}
public void startWrite(RecordReceiver recordReceiver) {
try {
Record record;
while ((record = recordReceiver.getFromReader()) != null) {
if (record.getColumnNumber() != options.getColumns().size()) {
throw DataXException
.asDataXException(
DBUtilErrorCode.CONF_ERROR,
String.format(
"There is an error in the column configuration information. " +
"This is because you have configured a task where the number of fields to be read from the source:%s " +
"is not equal to the number of fields to be written to the destination table:%s. " +
"Please check your configuration and make changes.",
record.getColumnNumber(),
options.getColumns().size()));
}
writerManager.writeRecord(rowCodec.codec(record));
}
} catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
}
}
@Override
public void post() {
try {
writerManager.close();
} catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
}
}
@Override
public void destroy() {}
@Override
public boolean supportFailOver(){
return false;
}
}
}

View File

@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.alibaba.datax.plugin.writer.selectdbwriter;
import java.io.IOException;
import java.util.Map;
public class SelectdbWriterException extends RuntimeException {
private boolean reCreateLabel;
public SelectdbWriterException() {
super();
}
public SelectdbWriterException(String message) {
super(message);
}
public SelectdbWriterException(String message, boolean reCreateLabel) {
super(message);
this.reCreateLabel = reCreateLabel;
}
public SelectdbWriterException(String message, Throwable cause) {
super(message, cause);
}
public SelectdbWriterException(Throwable cause) {
super(cause);
}
protected SelectdbWriterException(String message, Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
public boolean needReCreateLabel() {
return reCreateLabel;
}
}

View File

@ -0,0 +1,210 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.alibaba.datax.plugin.writer.selectdbwriter;
import com.google.common.base.Strings;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class SelectdbWriterManager {
private static final Logger LOG = LoggerFactory.getLogger(SelectdbWriterManager.class);
private final SelectdbCopyIntoObserver visitor;
private final Keys options;
private final List<byte[]> buffer = new ArrayList<>();
private int batchCount = 0;
private long batchSize = 0;
private volatile boolean closed = false;
private volatile Exception flushException;
private final LinkedBlockingDeque<WriterTuple> flushQueue;
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> scheduledFuture;
public SelectdbWriterManager(Keys options) {
this.options = options;
this.visitor = new SelectdbCopyIntoObserver(options);
flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
this.startScheduler();
this.startAsyncFlushing();
}
public void startScheduler() {
stopScheduler();
this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("Selectdb-interval-flush").daemon(true).build());
this.scheduledFuture = this.scheduler.schedule(() -> {
synchronized (SelectdbWriterManager.this) {
if (!closed) {
try {
String label = createBatchLabel();
LOG.info(String.format("Selectdb interval Sinking triggered: label[%s].", label));
if (batchCount == 0) {
startScheduler();
}
flush(label, false);
} catch (Exception e) {
flushException = e;
}
}
}
}, options.getFlushInterval(), TimeUnit.MILLISECONDS);
}
public void stopScheduler() {
if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}
}
public final synchronized void writeRecord(String record) throws IOException {
checkFlushException();
try {
byte[] bts = record.getBytes(StandardCharsets.UTF_8);
buffer.add(bts);
batchCount++;
batchSize += bts.length;
if (batchCount >= options.getBatchRows() || batchSize >= options.getBatchSize()) {
String label = createBatchLabel();
LOG.debug(String.format("buffer Sinking triggered: rows[%d] label [%s].", batchCount, label));
flush(label, false);
}
} catch (Exception e) {
throw new SelectdbWriterException("Writing records to Doris failed.", e);
}
}
public synchronized void flush(String label, boolean waitUtilDone) throws Exception {
checkFlushException();
if (batchCount == 0) {
if (waitUtilDone) {
waitAsyncFlushingDone();
}
return;
}
flushQueue.put(new WriterTuple(label, batchSize, new ArrayList<>(buffer)));
if (waitUtilDone) {
// wait the last flush
waitAsyncFlushingDone();
}
buffer.clear();
batchCount = 0;
batchSize = 0;
}
public synchronized void close() throws IOException {
if (!closed) {
closed = true;
try {
String label = createBatchLabel();
if (batchCount > 0) {
LOG.debug(String.format("Selectdb Sink is about to close: label[%s].", label));
}
flush(label, true);
} catch (Exception e) {
throw new RuntimeException("Writing records to Selectdb failed.", e);
} finally {
this.visitor.close();
}
}
checkFlushException();
}
public String createBatchLabel() {
StringBuilder sb = new StringBuilder();
if (!Strings.isNullOrEmpty(options.getLabelPrefix())) {
sb.append(options.getLabelPrefix());
}
return sb.append(UUID.randomUUID().toString())
.toString();
}
private void startAsyncFlushing() {
// start flush thread
Thread flushThread = new Thread(new Runnable() {
public void run() {
while (true) {
try {
asyncFlush();
} catch (Exception e) {
flushException = e;
}
}
}
});
flushThread.setDaemon(true);
flushThread.start();
}
private void waitAsyncFlushingDone() throws InterruptedException {
// wait previous flushings
for (int i = 0; i <= options.getFlushQueueLength(); i++) {
flushQueue.put(new WriterTuple("", 0l, null));
}
checkFlushException();
}
private void asyncFlush() throws Exception {
WriterTuple flushData = flushQueue.take();
if (Strings.isNullOrEmpty(flushData.getLabel())) {
return;
}
stopScheduler();
for (int i = 0; i <= options.getMaxRetries(); i++) {
try {
// copy into
visitor.streamLoad(flushData);
startScheduler();
break;
} catch (Exception e) {
LOG.warn("Failed to flush batch data to Doris, retry times = {}", i, e);
if (i >= options.getMaxRetries()) {
throw new RuntimeException(e);
}
if (e instanceof SelectdbWriterException && ((SelectdbWriterException)e).needReCreateLabel()) {
String newLabel = createBatchLabel();
LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
flushData.setLabel(newLabel);
}
try {
Thread.sleep(1000l * Math.min(i + 1, 100));
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new RuntimeException("Unable to flush, interrupted while doing another attempt", e);
}
}
}
}
private void checkFlushException() {
if (flushException != null) {
throw new RuntimeException("Writing records to selectdb failed.", flushException);
}
}
}
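A minimal lifecycle sketch (same package assumed; `options` is a `Keys` built from a valid job configuration, as in the earlier demos):
```java
package com.alibaba.datax.plugin.writer.selectdbwriter;

public class ManagerDemo {
    public static void demo(Keys options) throws Exception {
        SelectdbWriterManager manager = new SelectdbWriterManager(options);
        // Records are buffered; a flush is queued once batchRows, batchSize,
        // or flushInterval is hit, and a daemon thread drains the queue.
        manager.writeRecord("{\"k1\":\"v1\",\"k2\":\"v2\"}");
        // close() flushes the remaining buffer and waits for async flushing to finish.
        manager.close();
    }
}
```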

View File

@ -0,0 +1,22 @@
package com.alibaba.datax.plugin.writer.selectdbwriter;
import java.util.List;
public class WriterTuple {
private String label;
private Long bytes;
private List<byte[]> rows;
public WriterTuple ( String label, Long bytes, List<byte[]> rows){
this.label = label;
this.rows = rows;
this.bytes = bytes;
}
public String getLabel() { return label; }
public void setLabel(String label) { this.label = label; }
public Long getBytes() { return bytes; }
public List<byte[]> getRows() { return rows; }
}

View File

@ -0,0 +1,6 @@
{
"name": "selectdbwriter",
"class": "com.alibaba.datax.plugin.writer.selectdbwriter.SelectdbWriter",
"description": "selectdb writer plugin",
"developer": "selectdb"
}

View File

@ -0,0 +1,19 @@
{
"name": "selectdbwriter",
"parameter": {
"username": "",
"password": "",
"column": [],
"preSql": [],
"postSql": [],
"loadUrl": [],
"loadProps": {},
"connection": [
{
"jdbcUrl": "",
"selectedDatabase": "",
"table": []
}
]
}
}