From 54d5d792b9f1118a0a4650fa246c3343ba2af642 Mon Sep 17 00:00:00 2001 From: caoliang <245623257@qq.com> Date: Fri, 2 Dec 2022 18:33:53 +0800 Subject: [PATCH] Add selectdbwriter plugin --- package.xml | 7 + pom.xml | 1 + selectdbwriter/doc/selectdbwriter.md | 428 ++++++++++++++++++ selectdbwriter/doc/stream2selectdb.json | 93 ++++ selectdbwriter/pom.xml | 96 ++++ selectdbwriter/src/main/assembly/package.xml | 52 +++ .../writer/selectdbwriter/BaseResponse.java | 23 + .../writer/selectdbwriter/CopyIntoResp.java | 26 ++ .../writer/selectdbwriter/CopySQLBuilder.java | 40 ++ .../selectdbwriter/DelimiterParser.java | 54 +++ .../selectdbwriter/HttpPostBuilder.java | 51 +++ .../writer/selectdbwriter/HttpPutBuilder.java | 65 +++ .../plugin/writer/selectdbwriter/Keys.java | 186 ++++++++ .../selectdbwriter/SelectdbBaseCodec.java | 23 + .../writer/selectdbwriter/SelectdbCodec.java | 10 + .../selectdbwriter/SelectdbCodecFactory.java | 19 + .../SelectdbCopyIntoObserver.java | 233 ++++++++++ .../selectdbwriter/SelectdbCsvCodec.java | 27 ++ .../selectdbwriter/SelectdbJsonCodec.java | 33 ++ .../writer/selectdbwriter/SelectdbUtil.java | 130 ++++++ .../writer/selectdbwriter/SelectdbWriter.java | 149 ++++++ .../SelectdbWriterException.java | 58 +++ .../selectdbwriter/SelectdbWriterManager.java | 210 +++++++++ .../writer/selectdbwriter/WriterTuple.java | 22 + selectdbwriter/src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 19 + 26 files changed, 2061 insertions(+) create mode 100644 selectdbwriter/doc/selectdbwriter.md create mode 100644 selectdbwriter/doc/stream2selectdb.json create mode 100644 selectdbwriter/pom.xml create mode 100644 selectdbwriter/src/main/assembly/package.xml create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/BaseResponse.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopyIntoResp.java create mode 100644 
selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopySQLBuilder.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/DelimiterParser.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/HttpPostBuilder.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/HttpPutBuilder.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/Keys.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbBaseCodec.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCodec.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCodecFactory.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCopyIntoObserver.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCsvCodec.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbJsonCodec.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbUtil.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbWriter.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbWriterException.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbWriterManager.java create mode 100644 selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/WriterTuple.java create mode 100644 selectdbwriter/src/main/resources/plugin.json create mode 100644 selectdbwriter/src/main/resources/plugin_job_template.json diff 
--git a/package.xml b/package.xml index 96b52c30..6d0d1f51 100755 --- a/package.xml +++ b/package.xml @@ -483,5 +483,12 @@ datax + + selectdbwriter/target/datax/ + + **/*.* + + datax + diff --git a/pom.xml b/pom.xml index 228e850e..161f96cb 100644 --- a/pom.xml +++ b/pom.xml @@ -120,6 +120,7 @@ cassandrawriter clickhousewriter doriswriter + selectdbwriter adbmysqlwriter diff --git a/selectdbwriter/doc/selectdbwriter.md b/selectdbwriter/doc/selectdbwriter.md new file mode 100644 index 00000000..ce4c1f23 --- /dev/null +++ b/selectdbwriter/doc/selectdbwriter.md @@ -0,0 +1,428 @@ +# SelectdbWriter 插件文档 + +## 1 快速介绍 +SelectdbWriter支持将大批量数据写入SELECTDB中。 + +## 2 实现原理 +SelectdbWriter 通过调用selectdb api (/copy/upload),返回一个重定向的S3地址,使用Http向S3地址发送字节流,设置参数达到要求时执行copy into + +## 3 编译 + +1. 运行 init-env.sh + +2. 编译 selectdbwriter: + +i. 单独编译 selectdbwriter 插件: + + ```text + mvn clean install -pl plugin-rdbms-util,selectdbwriter -DskipTests + ``` + + +ii.编译整个 DataX 项目: + + ```text + mvn package assembly:assembly -Dmaven.test.skip=true + ``` +产出在 target/datax/datax/. 
+hdfsreader, hdfswriter and oscarwriter 这三个插件需要额外的jar包。如果你并不需要这些插件,可以在 DataX/pom.xml 中删除这些插件的模块。 + + +iii.编译错误 + +如遇到如下编译错误: + ```text + Could not find artifact com.alibaba.datax:datax-all:pom:0.0.1-SNAPSHOT + ``` + +可尝试以下方式解决: + +a.下载 alibaba-datax-maven-m2-20210928.tar.gz + +b.解压后,将得到的 alibaba/datax/ 目录,拷贝到所使用的 maven 对应的 .m2/repository/com/alibaba/ 下。 + +c.再次尝试编译。 + +## 3 功能说明 + +### 3.1 配置样例 + +这里是一份从Stream读取数据后导入至selectdb的配置文件。 + +``` +{ + "job":{ + "content":[ + { + "reader":{ + "name":"streamreader", + "parameter":{ + "column":[ + { + "type":"string", + "random":"0,31" + }, + { + "type":"string", + "random":"0,31" + }, + { + "type":"string", + "random":"0,31" + }, + { + "type":"string", + "random":"0,31" + }, + { + "type":"long", + "random":"0,5" + }, + { + "type":"string", + "random":"0,10" + }, + { + "type":"string", + "random":"0,5" + }, + { + "type":"string", + "random":"0,31" + }, + { + "type":"string", + "random":"0,31" + }, + { + "type":"string", + "random":"0,21" + }, + { + "type":"string", + "random":"0,31" + }, + { + "type":"long", + "random":"0,10" + }, + { + "type":"long", + "random":"0,20" + }, + { + "type":"date", + "random":"2022-01-01 12:00:00,2023-01-01 12:00:00" + }, + { + "type":"long", + "random":"0,10" + }, + { + "type":"date", + "random":"2022-01-01 12:00:00,2023-01-01 12:00:00" + }, + { + "type":"string", + "random":"0,10" + }, + { + "type":"long", + "random":"0,10" + }, + { + "type":"date", + "random":"2022-01-01 12:00:00,2023-01-01 12:00:00" + }, + { + "type":"long", + "random":"0,10" + }, + { + "type":"date", + "random":"2022-01-01 12:00:00,2023-01-01 12:00:00" + }, + { + "type":"long", + "random":"0,10" + }, + { + "type":"date", + "random":"2022-01-01 12:00:00,2023-01-01 12:00:00" + }, + { + "type":"long", + "random":"0,10" + }, + { + "type":"date", + "random":"2022-01-01 12:00:00,2023-01-01 12:00:00" + }, + { + "type":"string", + "random":"0,100" + }, + { + "type":"string", + "random":"0,1" + }, + { + "type":"long", + 
"random":"0,1" + }, + { + "type":"string", + "random":"0,64" + }, + { + "type":"string", + "random":"0,20" + }, + { + "type":"string", + "random":"0,31" + }, + { + "type":"long", + "random":"0,3" + }, + { + "type":"long", + "random":"0,3" + }, + { + "type":"long", + "random":"0,19" + }, + { + "type":"date", + "random":"2022-01-01 12:00:00,2023-01-01 12:00:00" + }, + { + "type":"string", + "random":"0,1" + } + ], + "sliceRecordCount":10 + } + }, + "writer":{ + "name":"selectdbwriter", + "parameter":{ + "loadUrl":[ + "xxx:47150" + ], + "loadProps":{ + "file.type":"json", + "file.strip_outer_array":"true" + }, + "column":[ + "id", + "table_id", + "table_no", + "table_name", + "table_status", + "no_disturb", + "dinner_type", + "member_id", + "reserve_bill_no", + "pre_order_no", + "queue_num", + "person_num", + "open_time", + "open_time_format", + "order_time", + "order_time_format", + "table_bill_id", + "offer_time", + "offer_time_format", + "confirm_bill_time", + "confirm_bill_time_format", + "bill_time", + "bill_time_format", + "clear_time", + "clear_time_format", + "table_message", + "bill_close", + "table_type", + "pad_mac", + "company_id", + "shop_id", + "is_sync", + "table_split_no", + "ts", + "ts_format", + "dr" + ], + "username":"admin", + "password":"SelectDB2022", + "postSql":[ + + ], + "preSql":[ + + ], + "connection":[ + { + "jdbcUrl":"jdbc:mysql://xxx:34142/cl_test", + "table":[ + "ods_pos_pro_table_dynamic_delta_v4" + ], + "selectedDatabase":"cl_test" + } + ], + "maxBatchRows":1000000, + "maxBatchByteSize":536870912000 + } + } + } + ], + "setting":{ + "errorLimit":{ + "percentage":0.02, + "record":0 + }, + "speed":{ + "channel":5 + } + } + } +} + +``` + +### 3.2 参数说明 + +```text + **jdbcUrl** + + - 描述:selectdb 的 JDBC 连接串,用户执行 preSql 或 postSQL。 + - 必选:是 + - 默认值:无 + +* **loadUrl** + + - 描述:作为 selecdb 的连接目标。格式为 "ip:port"。其中 IP 是 selectdb的private-link,port 是selectdb 集群的 http_port + - 必选:是 + - 默认值:无 + +* **username** + + - 描述:访问selectdb数据库的用户名 + - 必选:是 + - 
默认值:无 + +* **password** + + - 描述:访问selectdb数据库的密码 + - 必选:否 + - 默认值:空 + +* **connection.selectedDatabase** + - 描述:需要写入的selectdb数据库名称。 + - 必选:是 + - 默认值:无 + +* **connection.table** + - 描述:需要写入的selectdb表名称。 + - 必选:是 + - 默认值:无 + +* **column** + + - 描述:目的表**需要写入数据**的字段,这些字段将作为生成的 Json 数据的字段名。字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。 + - 必选:是 + - 默认值:否 + +* **preSql** + + - 描述:写入数据到目的表前,会先执行这里的标准语句。 + - 必选:否 + - 默认值:无 + +* **postSql** + + - 描述:写入数据到目的表后,会执行这里的标准语句。 + - 必选:否 + - 默认值:无 + + +* **maxBatchRows** + + - 描述:每批次导入数据的最大行数。和 **maxBatchSize** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。 + - 必选:否 + - 默认值:500000 + +* **batchSize** + + - 描述:每批次导入数据的最大数据量。和 **maxBatchRows** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。 + - 必选:否 + - 默认值:90M + +* **maxRetries** + + - 描述:每批次导入数据失败后的重试次数。 + - 必选:否 + - 默认值:3 + +* **labelPrefix** + + - 描述:每批次上传文件的 label 前缀。最终的 label 将有 `labelPrefix + UUID` 组成全局唯一的 label,确保数据不会重复导入 + - 必选:否 + - 默认值:`datax_selectdb_writer_` + +* **loadProps** + + - 描述:COPY INOT 的请求参数 + + 这里包括导入的数据格式:file.type等,导入数据格式默认我们使用csv,支持JSON,具体可以参照下面类型转换部分 + + - 必选:否 + + - 默认值:无 + +* **clusterName** + + - 描述:selectdb could 集群名称 + + - 必选:否 + + - 默认值:无 + +* **flushQueueLength** + + - 描述:队列长度 + + - 必选:否 + + - 默认值:1 + +* **flushInterval** + + - 描述:数据写入批次的时间间隔,如果maxBatchRows 和 batchSize 参数设置的有很大,那么很可能达不到你这设置的数据量大小,会执行导入。 + + - 必选:否 + + - 默认值:30000ms +``` + +### 类型转换 + +默认传入的数据均会被转为字符串,并以`\t`作为列分隔符,`\n`作为行分隔符,组成`csv`文件进行Selectdb导入操作。 + +默认是csv格式导入,如需更改列分隔符, 则正确配置 `loadProps` 即可: + +```json +"loadProps": { + "file.column_separator": "\\x01", + "file.line_delimiter": "\\x02" +} +``` + +如需更改导入格式为`json`, 则正确配置 `loadProps` 即可: +```json +"loadProps": { + "file.type": "json", + "file.strip_outer_array": true +} +``` \ No newline at end of file diff --git a/selectdbwriter/doc/stream2selectdb.json b/selectdbwriter/doc/stream2selectdb.json new file mode 100644 index 00000000..94038810 --- /dev/null +++ b/selectdbwriter/doc/stream2selectdb.json @@ -0,0 +1,93 @@ +{ + "core":{ + "transport":{ 
+ "channel":{ + "speed":{ + "byte":10485760 + } + } + } + }, + "job":{ + "content":[ + { + "reader":{ + "name":"streamreader", + "parameter":{ + "column":[ + { + "type":"string", + "value":"DataX" + }, + { + "type":"int", + "value":19890604 + }, + { + "type":"date", + "value":"1989-06-04 00:00:00" + }, + { + "type":"bool", + "value":true + }, + { + "type":"string", + "value":"test" + } + ], + "sliceRecordCount":1000000 + } + }, + "writer":{ + "name":"selectdbwriter", + "parameter":{ + "loadUrl":[ + "xxx:35871" + ], + "loadProps":{ + "file.type":"json", + "file.strip_outer_array":"true" + }, + "database":"db1", + "column":[ + "k1", + "k2", + "k3", + "k4", + "k5" + ], + "username":"admin", + "password":"SelectDB2022", + "postSql":[ + + ], + "preSql":[ + + ], + "connection":[ + { + "jdbcUrl":"jdbc:mysql://xxx:32386/cl_test", + "table":[ + "test_selectdb" + ], + "selectedDatabase":"cl_test" + } + ], + "maxBatchRows":200000, + "maxBatchByteSize":53687091200 + } + } + } + ], + "setting":{ + "errorLimit":{ + "percentage":0.02, + "record":0 + }, + "speed":{ + "byte":10485760 + } + } + } +} \ No newline at end of file diff --git a/selectdbwriter/pom.xml b/selectdbwriter/pom.xml new file mode 100644 index 00000000..fd2a19f7 --- /dev/null +++ b/selectdbwriter/pom.xml @@ -0,0 +1,96 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + selectdbwriter + selectdbwriter + jar + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + mysql + mysql-connector-java + ${mysql.driver.version} + + + org.apache.httpcomponents + httpclient + 4.5.13 + + + com.fasterxml.jackson.core + jackson-annotations + 2.13.3 + + + com.fasterxml.jackson.core + jackson-core + 2.13.3 + + + com.fasterxml.jackson.core + jackson-databind + 2.13.3 + + + + + + + maven-compiler-plugin + + ${jdk-version} + 
${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + diff --git a/selectdbwriter/src/main/assembly/package.xml b/selectdbwriter/src/main/assembly/package.xml new file mode 100644 index 00000000..a6084837 --- /dev/null +++ b/selectdbwriter/src/main/assembly/package.xml @@ -0,0 +1,52 @@ + + + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/selectdbwriter + + + target/ + + selectdbwriter-0.0.1-SNAPSHOT.jar + + plugin/writer/selectdbwriter + + + + + false + plugin/writer/selectdbwriter/libs + runtime + + + diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/BaseResponse.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/BaseResponse.java new file mode 100644 index 00000000..c02f725f --- /dev/null +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/BaseResponse.java @@ -0,0 +1,23 @@ +package com.alibaba.datax.plugin.writer.selectdbwriter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class BaseResponse { + private int code; + private String msg; + private T data; + private int count; + + public int getCode() { + return code; + } + + public String getMsg() { + return msg; + } + + public T getData(){ + return data; + } +} diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopyIntoResp.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopyIntoResp.java new file mode 100644 index 00000000..4da002ac --- /dev/null +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopyIntoResp.java @@ -0,0 +1,26 @@ +package com.alibaba.datax.plugin.writer.selectdbwriter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import 
java.util.Map; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class CopyIntoResp extends BaseResponse{ + private String code; + private String exception; + + private Map result; + + public String getDataCode() { + return code; + } + + public String getException() { + return exception; + } + + public Map getResult() { + return result; + } + +} diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopySQLBuilder.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopySQLBuilder.java new file mode 100644 index 00000000..62910d5d --- /dev/null +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopySQLBuilder.java @@ -0,0 +1,40 @@ +package com.alibaba.datax.plugin.writer.selectdbwriter; + + +import java.util.Map; +import java.util.StringJoiner; + +public class CopySQLBuilder { + private final static String COPY_SYNC = "copy.async"; + private final String fileName; + private final Keys options; + private Map properties; + + + + public CopySQLBuilder(Keys options, String fileName) { + this.options=options; + this.fileName=fileName; + this.properties=options.getLoadProps(); + } + + public String buildCopySQL(){ + StringBuilder sb = new StringBuilder(); + sb.append("COPY INTO ") + .append(options.getDatabase() + "." 
+ options.getTable()) + .append(" FROM @~('").append(fileName).append("') ") + .append("PROPERTIES ("); + + //copy into must be sync + properties.put(COPY_SYNC,false); + StringJoiner props = new StringJoiner(","); + for(Map.Entry entry : properties.entrySet()){ + String key = String.valueOf(entry.getKey()); + String value = String.valueOf(entry.getValue()); + String prop = String.format("'%s'='%s'",key,value); + props.add(prop); + } + sb.append(props).append(" )"); + return sb.toString(); + } +} diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/DelimiterParser.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/DelimiterParser.java new file mode 100644 index 00000000..10572eae --- /dev/null +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/DelimiterParser.java @@ -0,0 +1,54 @@ +package com.alibaba.datax.plugin.writer.selectdbwriter; + +import com.google.common.base.Strings; + +import java.io.StringWriter; + +public class DelimiterParser { + + private static final String HEX_STRING = "0123456789ABCDEF"; + + public static String parse(String sp, String dSp) throws RuntimeException { + if ( Strings.isNullOrEmpty(sp)) { + return dSp; + } + if (!sp.toUpperCase().startsWith("\\X")) { + return sp; + } + String hexStr = sp.substring(2); + // check hex str + if (hexStr.isEmpty()) { + throw new RuntimeException("Failed to parse delimiter: `Hex str is empty`"); + } + if (hexStr.length() % 2 != 0) { + throw new RuntimeException("Failed to parse delimiter: `Hex str length error`"); + } + for (char hexChar : hexStr.toUpperCase().toCharArray()) { + if (HEX_STRING.indexOf(hexChar) == -1) { + throw new RuntimeException("Failed to parse delimiter: `Hex str format error`"); + } + } + // transform to separator + StringWriter writer = new StringWriter(); + for (byte b : hexStrToBytes(hexStr)) { + writer.append((char) b); + } + return writer.toString(); + } + + private static byte[] 
hexStrToBytes(String hexStr) { + String upperHexStr = hexStr.toUpperCase(); + int length = upperHexStr.length() / 2; + char[] hexChars = upperHexStr.toCharArray(); + byte[] bytes = new byte[length]; + for (int i = 0; i < length; i++) { + int pos = i * 2; + bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1])); + } + return bytes; + } + + private static byte charToByte(char c) { + return (byte) HEX_STRING.indexOf(c); + } +} diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/HttpPostBuilder.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/HttpPostBuilder.java new file mode 100644 index 00000000..9471debb --- /dev/null +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/HttpPostBuilder.java @@ -0,0 +1,51 @@ +package com.alibaba.datax.plugin.writer.selectdbwriter; + +import org.apache.commons.codec.binary.Base64; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.HttpPost; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + + +public class HttpPostBuilder { + String url; + Map header; + HttpEntity httpEntity; + public HttpPostBuilder() { + header = new HashMap<>(); + } + + public HttpPostBuilder setUrl(String url) { + this.url = url; + return this; + } + + public HttpPostBuilder addCommonHeader() { + header.put(HttpHeaders.EXPECT, "100-continue"); + return this; + } + + public HttpPostBuilder baseAuth(String user, String password) { + final String authInfo = user + ":" + password; + byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8)); + header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded)); + return this; + } + + public HttpPostBuilder setEntity(HttpEntity httpEntity) { + this.httpEntity = httpEntity; + return this; + } + + public HttpPost build() { + SelectdbUtil.checkNotNull(url); + 
SelectdbUtil.checkNotNull(httpEntity); + HttpPost put = new HttpPost(url); + header.forEach(put::setHeader); + put.setEntity(httpEntity); + return put; + } +} diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/HttpPutBuilder.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/HttpPutBuilder.java new file mode 100644 index 00000000..59d7dbca --- /dev/null +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/HttpPutBuilder.java @@ -0,0 +1,65 @@ +package com.alibaba.datax.plugin.writer.selectdbwriter; + +import org.apache.commons.codec.binary.Base64; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.StringEntity; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +public class HttpPutBuilder { + String url; + Map header; + HttpEntity httpEntity; + public HttpPutBuilder() { + header = new HashMap<>(); + } + + public HttpPutBuilder setUrl(String url) { + this.url = url; + return this; + } + + public HttpPutBuilder addFileName(String fileName){ + header.put("fileName", fileName); + return this; + } + + public HttpPutBuilder setEmptyEntity() { + try { + this.httpEntity = new StringEntity(""); + } catch (Exception e) { + throw new IllegalArgumentException(e); + } + return this; + } + + public HttpPutBuilder addCommonHeader() { + header.put(HttpHeaders.EXPECT, "100-continue"); + return this; + } + + public HttpPutBuilder baseAuth(String user, String password) { + final String authInfo = user + ":" + password; + byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8)); + header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded)); + return this; + } + + public HttpPutBuilder setEntity(HttpEntity httpEntity) { + this.httpEntity = httpEntity; + return this; + } + + public HttpPut build() { + 
SelectdbUtil.checkNotNull(url); + SelectdbUtil.checkNotNull(httpEntity); + HttpPut put = new HttpPut(url); + header.forEach(put::setHeader); + put.setEntity(httpEntity); + return put; + } +} diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/Keys.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/Keys.java new file mode 100644 index 00000000..6c767d93 --- /dev/null +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/Keys.java @@ -0,0 +1,186 @@ +package com.alibaba.datax.plugin.writer.selectdbwriter; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class Keys implements Serializable { + + private static final long serialVersionUID = 1l; + private static final int DEFAULT_MAX_RETRIES = 3; + private static final int BATCH_ROWS = 500000; + private static final long DEFAULT_FLUSH_INTERVAL = 30000; + + private static final String LOAD_PROPS_FORMAT = "file.type"; + public enum StreamLoadFormat { + CSV, JSON; + } + + private static final String USERNAME = "username"; + private static final String PASSWORD = "password"; + private static final String DATABASE = "connection[0].selectedDatabase"; + private static final String TABLE = "connection[0].table[0]"; + private static final String COLUMN = "column"; + private static final String PRE_SQL = "preSql"; + private static final String POST_SQL = "postSql"; + private static final String JDBC_URL = "connection[0].jdbcUrl"; + private static final String LABEL_PREFIX = "labelPrefix"; + private static final String MAX_BATCH_ROWS = "maxBatchRows"; + private static final String MAX_BATCH_SIZE = "batchSize"; + private static final String FLUSH_INTERVAL = "flushInterval"; + private static 
final String LOAD_URL = "loadUrl"; + private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength"; + private static final String LOAD_PROPS = "loadProps"; + + private static final String DEFAULT_LABEL_PREFIX = "datax_selectdb_writer_"; + + private static final long DEFAULT_MAX_BATCH_SIZE = 90 * 1024 * 1024; //default 90M + + private static final String CLUSTER_NAME = "clusterName"; + + private static final String MAX_RETRIES = "maxRetries"; + private final Configuration options; + + private List infoSchemaColumns; + private List userSetColumns; + private boolean isWildcardColumn; + + public Keys ( Configuration options) { + this.options = options; + this.userSetColumns = options.getList(COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList()); + if (1 == options.getList(COLUMN, String.class).size() && "*".trim().equals(options.getList(COLUMN, String.class).get(0))) { + this.isWildcardColumn = true; + } + } + + public void doPretreatment() { + validateRequired(); + validateStreamLoadUrl(); + } + + public String getJdbcUrl() { + return options.getString(JDBC_URL); + } + + public String getDatabase() { + return options.getString(DATABASE); + } + + public String getTable() { + return options.getString(TABLE); + } + + public String getUsername() { + return options.getString(USERNAME); + } + + public String getPassword() { + return options.getString(PASSWORD); + } + + public String getClusterName(){ + return options.getString(CLUSTER_NAME); + } + + public String getLabelPrefix() { + String label = options.getString(LABEL_PREFIX); + return null == label ? 
DEFAULT_LABEL_PREFIX : label; + } + + public List getLoadUrlList() { + return options.getList(LOAD_URL, String.class); + } + + public List getColumns() { + if (isWildcardColumn) { + return this.infoSchemaColumns; + } + return this.userSetColumns; + } + + public boolean isWildcardColumn() { + return this.isWildcardColumn; + } + + public void setInfoCchemaColumns(List cols) { + this.infoSchemaColumns = cols; + } + + public List getPreSqlList() { + return options.getList(PRE_SQL, String.class); + } + + public List getPostSqlList() { + return options.getList(POST_SQL, String.class); + } + + public Map getLoadProps() { + return options.getMap(LOAD_PROPS); + } + + public int getMaxRetries() { + Integer retries = options.getInt(MAX_RETRIES); + return null == retries ? DEFAULT_MAX_RETRIES : retries; + } + + public int getBatchRows() { + Integer rows = options.getInt(MAX_BATCH_ROWS); + return null == rows ? BATCH_ROWS : rows; + } + + public long getBatchSize() { + Long size = options.getLong(MAX_BATCH_SIZE); + return null == size ? DEFAULT_MAX_BATCH_SIZE : size; + } + + public long getFlushInterval() { + Long interval = options.getLong(FLUSH_INTERVAL); + return null == interval ? DEFAULT_FLUSH_INTERVAL : interval; + } + + public int getFlushQueueLength() { + Integer len = options.getInt(FLUSH_QUEUE_LENGTH); + return null == len ? 
1 : len; + } + + + public StreamLoadFormat getStreamLoadFormat() { + Map loadProps = getLoadProps(); + if (null == loadProps) { + return StreamLoadFormat.CSV; + } + if (loadProps.containsKey(LOAD_PROPS_FORMAT) + && StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(LOAD_PROPS_FORMAT)))) { + return StreamLoadFormat.JSON; + } + return StreamLoadFormat.CSV; + } + + private void validateStreamLoadUrl() { + List urlList = getLoadUrlList(); + for (String host : urlList) { + if (host.split(":").length < 2) { + throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, + "The format of loadUrl is not correct, please enter:[`fe_ip:fe_http_ip;fe_ip:fe_http_ip`]."); + } + } + } + + private void validateRequired() { + final String[] requiredOptionKeys = new String[]{ + USERNAME, + DATABASE, + TABLE, + COLUMN, + LOAD_URL + }; + for (String optionKey : requiredOptionKeys) { + options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE); + } + } +} diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbBaseCodec.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbBaseCodec.java new file mode 100644 index 00000000..d2fc1224 --- /dev/null +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbBaseCodec.java @@ -0,0 +1,23 @@ +package com.alibaba.datax.plugin.writer.selectdbwriter; + +import com.alibaba.datax.common.element.Column; + +public class SelectdbBaseCodec { + protected String convertionField( Column col) { + if (null == col.getRawData() || Column.Type.NULL == col.getType()) { + return null; + } + if ( Column.Type.BOOL == col.getType()) { + return String.valueOf(col.asLong()); + } + if ( Column.Type.BYTES == col.getType()) { + byte[] bts = (byte[])col.getRawData(); + long value = 0; + for (int i = 0; i < bts.length; i++) { + value += (bts[bts.length - i - 1] & 0xffL) << (8 * i); + } + return String.valueOf(value); + } + 
return col.asString(); + } +} diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCodec.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCodec.java new file mode 100644 index 00000000..b7e9d6ae --- /dev/null +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCodec.java @@ -0,0 +1,10 @@ +package com.alibaba.datax.plugin.writer.selectdbwriter; + +import com.alibaba.datax.common.element.Record; + +import java.io.Serializable; + +public interface SelectdbCodec extends Serializable { + + String codec( Record row); +} diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCodecFactory.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCodecFactory.java new file mode 100644 index 00000000..567f4c0b --- /dev/null +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCodecFactory.java @@ -0,0 +1,19 @@ +package com.alibaba.datax.plugin.writer.selectdbwriter; + +import java.util.Map; + +public class SelectdbCodecFactory { + public SelectdbCodecFactory (){ + + } + public static SelectdbCodec createCodec( Keys writerOptions) { + if ( Keys.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) { + Map props = writerOptions.getLoadProps(); + return new SelectdbCsvCodec (null == props || !props.containsKey("file.column_separator") ? 
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * Performs the two-phase SelectDB load for one batch:
 * <ol>
 *   <li>PUT the batch payload to {@code {host}/copy/upload}, following the
 *       307 redirect to object storage;</li>
 *   <li>POST a {@code COPY INTO} statement to {@code {host}/copy/query} to
 *       commit the uploaded file.</li>
 * </ol>
 * Not thread-safe; each {@link SelectdbWriterManager} owns one instance and
 * calls it from a single flush thread.
 */
public class SelectdbCopyIntoObserver {
    private static final Logger LOG = LoggerFactory.getLogger(SelectdbCopyIntoObserver.class);

    private final Keys options;
    // Round-robin cursor over the configured load hosts; see getLoadHost().
    private long pos;
    public static final int SUCCESS = 0;
    // dataCode value reported by SelectDB when the copy-into statement failed.
    public static final String FAIL = "1";
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    // Redirect handling is disabled on purpose: we need the 307 Location
    // header from /copy/upload to upload directly to object storage.
    private final HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .disableRedirectHandling();
    private final CloseableHttpClient httpClient;
    private static final String UPLOAD_URL_PATTERN = "%s/copy/upload";
    private static final String COMMIT_PATTERN = "%s/copy/query";
    // Matches the server message meaning "this label was already loaded" so a
    // retried commit of the same file is treated as success (idempotency).
    private static final Pattern COMMITTED_PATTERN = Pattern.compile(
            "errCode = 2, detailMessage = No files can be copied, matched (\\d+) files, "
                    + "filtered (\\d+) files because files may be loading or loaded");

    public SelectdbCopyIntoObserver(Keys options) {
        this.options = options;
        this.httpClient = httpClientBuilder.build();
    }

    /**
     * Uploads the batch (label doubles as the stage file name) and commits it
     * with COPY INTO.
     *
     * @throws Exception on connection, upload or commit failure
     */
    public void streamLoad(WriterTuple data) throws Exception {
        String host = getLoadHost();
        if (host == null) {
            throw new RuntimeException("load_url cannot be empty, or the host cannot connect.Please check your configuration.");
        }
        String loadUrl = String.format(UPLOAD_URL_PATTERN, host);
        String uploadAddress = getUploadAddress(loadUrl, data.getLabel());
        put(uploadAddress, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));
        executeCopy(host, data.getLabel());
    }

    /**
     * Asks {@code /copy/upload} for the object-storage address of the stage
     * file. The server answers with a 307 redirect whose Location header is
     * the pre-signed upload URL.
     */
    private String getUploadAddress(String loadUrl, String fileName) throws IOException {
        HttpPutBuilder putBuilder = new HttpPutBuilder();
        putBuilder.setUrl(loadUrl)
                .addFileName(fileName)
                .addCommonHeader()
                .setEmptyEntity()
                .baseAuth(options.getUsername(), options.getPassword());
        // Fix: reuse the shared client (the previous code built a brand-new
        // CloseableHttpClient per call and never closed it) and release the
        // response via try-with-resources.
        try (CloseableHttpResponse execute = httpClient.execute(putBuilder.build())) {
            int statusCode = execute.getStatusLine().getStatusCode();
            String reason = execute.getStatusLine().getReasonPhrase();
            if (statusCode == 307) {
                Header location = execute.getFirstHeader("location");
                String uploadAddress = location.getValue();
                LOG.info("redirect to s3:{}", uploadAddress);
                return uploadAddress;
            } else {
                HttpEntity entity = execute.getEntity();
                String result = entity == null ? null : EntityUtils.toString(entity);
                LOG.error("Failed get the redirected address, status {}, reason {}, response {}", statusCode, reason, result);
                throw new RuntimeException("Could not get the redirected address.");
            }
        }
    }

    /**
     * Joins the encoded rows into one payload: newline-delimited for CSV, a
     * JSON array for JSON. {@code totalBytes} is the sum of row lengths, so
     * the buffer is sized exactly (rows plus delimiters/brackets).
     */
    private byte[] addRows(List<byte[]> rows, int totalBytes) {
        if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
            Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());
            byte[] lineDelimiter = DelimiterParser.parse((String) props.get("file.line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
            ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
            for (byte[] row : rows) {
                bos.put(row);
                bos.put(lineDelimiter);
            }
            return bos.array();
        }

        if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
            // "[" + rows joined by "," + "]"; empty batch still needs "[]".
            ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
            bos.put("[".getBytes(StandardCharsets.UTF_8));
            byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
            boolean isFirstElement = true;
            for (byte[] row : rows) {
                if (!isFirstElement) {
                    bos.put(jsonDelimiter);
                }
                bos.put(row);
                isFirstElement = false;
            }
            bos.put("]".getBytes(StandardCharsets.UTF_8));
            return bos.array();
        }
        throw new RuntimeException("Failed to join rows data, unsupported `file.type` from copy into properties:");
    }

    /**
     * Uploads {@code data} to the pre-signed object-storage URL.
     *
     * @throws SelectdbWriterException (recreate-label) on a non-200 response
     */
    public void put(String loadUrl, String fileName, byte[] data) throws IOException {
        LOG.info("Executing upload file to: '{}', size: '{}'", loadUrl, data.length);
        HttpPutBuilder putBuilder = new HttpPutBuilder();
        putBuilder.setUrl(loadUrl)
                .addCommonHeader()
                .setEntity(new InputStreamEntity(new ByteArrayInputStream(data)));
        // Fix: close the response (previously leaked).
        try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
            final int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode != 200) {
                String result = response.getEntity() == null ? null : EntityUtils.toString(response.getEntity());
                LOG.error("upload file {} error, response {}", fileName, result);
                throw new SelectdbWriterException("upload file error: " + fileName, true);
            }
        }
    }

    /**
     * Returns the first reachable configured load host (with "http://"
     * prefix), round-robining across calls, or null if none respond.
     */
    private String getLoadHost() {
        List<String> hostList = options.getLoadUrlList();
        long tmp = pos + hostList.size();
        for (; pos < tmp; pos++) {
            String host = "http://" + hostList.get((int) (pos % hostList.size()));
            if (checkConnection(host)) {
                return host;
            }
        }
        return null;
    }

    /** Cheap reachability probe: TCP/HTTP connect with a 5s timeout. */
    private boolean checkConnection(String host) {
        try {
            URL url = new URL(host);
            HttpURLConnection co = (HttpURLConnection) url.openConnection();
            co.setConnectTimeout(5000);
            co.connect();
            co.disconnect();
            return true;
        } catch (Exception e) {
            // Fix: log through the class logger instead of printStackTrace().
            LOG.warn("Failed to connect to host: {}", host, e);
            return false;
        }
    }

    /**
     * Commits the uploaded stage file by POSTing a COPY INTO statement to
     * {@code /copy/query}.
     *
     * @throws SelectdbWriterException (recreate-label) if the commit fails
     */
    public void executeCopy(String hostPort, String fileName) throws IOException {
        long start = System.currentTimeMillis();
        CopySQLBuilder copySQLBuilder = new CopySQLBuilder(options, fileName);
        String copySQL = copySQLBuilder.buildCopySQL();
        LOG.info("build copy SQL is {}", copySQL);
        Map<String, String> params = new HashMap<>();
        params.put("sql", copySQL);
        if (StringUtils.isNotBlank(options.getClusterName())) {
            params.put("cluster", options.getClusterName());
        }
        HttpPostBuilder postBuilder = new HttpPostBuilder();
        postBuilder.setUrl(String.format(COMMIT_PATTERN, hostPort))
                .baseAuth(options.getUsername(), options.getPassword())
                .setEntity(new StringEntity(OBJECT_MAPPER.writeValueAsString(params)));

        // Fix: close the response (previously leaked).
        try (CloseableHttpResponse response = httpClient.execute(postBuilder.build())) {
            final int statusCode = response.getStatusLine().getStatusCode();
            final String reasonPhrase = response.getStatusLine().getReasonPhrase();
            if (statusCode != 200) {
                LOG.warn("commit failed with status {} {}, reason {}", statusCode, hostPort, reasonPhrase);
                throw new SelectdbWriterException("commit error with file: " + fileName, true);
            } else if (response.getEntity() != null) {
                String loadResult = EntityUtils.toString(response.getEntity());
                boolean success = handleCommitResponse(loadResult);
                if (success) {
                    LOG.info("commit success cost {}ms, response is {}", System.currentTimeMillis() - start, loadResult);
                } else {
                    throw new SelectdbWriterException("commit fail", true);
                }
            }
        }
    }

    /**
     * Parses the commit response. Success requires code==SUCCESS, a non-FAIL
     * dataCode and either state FINISHED or the "already committed" message
     * (see COMMITTED_PATTERN).
     */
    public boolean handleCommitResponse(String loadResult) throws IOException {
        BaseResponse<CopyIntoResp> baseResponse = OBJECT_MAPPER.readValue(loadResult, new TypeReference<BaseResponse<CopyIntoResp>>() {});
        if (baseResponse.getCode() == SUCCESS) {
            CopyIntoResp dataResp = baseResponse.getData();
            if (FAIL.equals(dataResp.getDataCode())) {
                LOG.error("copy into execute failed, reason:{}", loadResult);
                return false;
            } else {
                Map result = dataResp.getResult();
                if (!result.get("state").equals("FINISHED") && !isCommitted((String) result.get("msg"))) {
                    LOG.error("copy into load failed, reason:{}", loadResult);
                    return false;
                } else {
                    return true;
                }
            }
        } else {
            LOG.error("commit failed, reason:{}", loadResult);
            return false;
        }
    }

    /** True if the server message says this label was already loaded. */
    public static boolean isCommitted(String msg) {
        return COMMITTED_PATTERN.matcher(msg).matches();
    }

    /** Releases the shared HTTP client. */
    public void close() throws IOException {
        if (null != httpClient) {
            try {
                httpClient.close();
            } catch (IOException e) {
                LOG.error("Closing httpClient failed.", e);
                throw new RuntimeException("Closing httpClient failed.", e);
            }
        }
    }
}
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.alibaba.datax.common.element.Record;

/**
 * Serialises a DataX {@link Record} as one CSV line: fields are joined with
 * the configured column separator and SQL NULL columns are written as
 * {@code \N}.
 */
public class SelectdbCsvCodec extends SelectdbBaseCodec implements SelectdbCodec {

    private static final long serialVersionUID = 1L;

    /** Separator placed between adjacent fields (default: tab). */
    private final String columnSeparator;

    public SelectdbCsvCodec(String sp) {
        this.columnSeparator = DelimiterParser.parse(sp, "\t");
    }

    @Override
    public String codec(Record row) {
        StringBuilder line = new StringBuilder();
        int columnCount = row.getColumnNumber();
        for (int i = 0; i < columnCount; i++) {
            if (i > 0) {
                line.append(columnSeparator);
            }
            String value = convertionField(row.getColumn(i));
            line.append(value == null ? "\\N" : value);
        }
        return line.toString();
    }
}

package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.alibaba.datax.common.element.Record;
import com.alibaba.fastjson.JSON;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Serialises a DataX {@link Record} as a single JSON object whose keys are
 * the configured column names, in positional correspondence with the record's
 * columns. Returns "" when no column list is configured.
 */
public class SelectdbJsonCodec extends SelectdbBaseCodec implements SelectdbCodec {

    private static final long serialVersionUID = 1L;

    /** Destination column names; index i maps to record column i. */
    private final List<String> fieldNames;

    public SelectdbJsonCodec(List<String> fieldNames) {
        this.fieldNames = fieldNames;
    }

    @Override
    public String codec(Record row) {
        if (null == fieldNames) {
            return "";
        }
        Map<String, Object> rowMap = new HashMap<>(fieldNames.size());
        for (int idx = 0; idx < fieldNames.size(); idx++) {
            rowMap.put(fieldNames.get(idx), convertionField(row.getColumn(idx)));
        }
        return JSON.toJSONString(rowMap);
    }
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.druid.sql.parser.ParserException;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
 * JDBC helper utilities for the SelectDB writer: column discovery from
 * information_schema, pre/post SQL rendering, execution and validation.
 */
public class SelectdbUtil {
    private static final Logger LOG = LoggerFactory.getLogger(SelectdbUtil.class);

    private SelectdbUtil() {}

    /**
     * Reads the ordered column names of {@code databaseName.tableName} from
     * information_schema.
     *
     * <p>NOTE(review): database/table names come from the job configuration
     * and are interpolated into the SQL text; they are assumed to be trusted
     * identifiers — confirm upstream validation if that ever changes.
     *
     * @throws RdbmsException if the metadata query fails
     */
    public static List<String> getDorisTableColumns(Connection conn, String databaseName, String tableName) {
        String currentSql = String.format("SELECT COLUMN_NAME FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = '%s' AND `TABLE_NAME` = '%s' ORDER BY `ORDINAL_POSITION` ASC;", databaseName, tableName);
        List<String> columns = new ArrayList<>();
        ResultSet rs = null;
        try {
            rs = DBUtil.query(conn, currentSql);
            while (DBUtil.asyncResultSetNext(rs)) {
                String colName = rs.getString("COLUMN_NAME");
                columns.add(colName);
            }
            return columns;
        } catch (Exception e) {
            throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
        } finally {
            DBUtil.closeDBResources(rs, null, null);
        }
    }

    /**
     * Substitutes the table-name placeholder into each non-empty pre/post SQL
     * statement. Returns an empty list for a null input.
     */
    public static List<String> renderPreOrPostSqls(List<String> preOrPostSqls, String tableName) {
        if (null == preOrPostSqls) {
            return Collections.emptyList();
        }
        List<String> renderedSqls = new ArrayList<>();
        for (String sql : preOrPostSqls) {
            if (!Strings.isNullOrEmpty(sql)) {
                renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
            }
        }
        return renderedSqls;
    }

    /**
     * Executes the given statements sequentially on {@code conn}.
     *
     * @throws RdbmsException wrapping the first failing statement
     */
    public static void executeSqls(Connection conn, List<String> sqls) {
        Statement stmt = null;
        String currentSql = null;
        try {
            stmt = conn.createStatement();
            for (String sql : sqls) {
                currentSql = sql;
                DBUtil.executeSqlWithoutResultSet(stmt, sql);
            }
        } catch (Exception e) {
            throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
        } finally {
            DBUtil.closeDBResources(null, stmt, null);
        }
    }

    /**
     * Validates (parses, does not execute) the configured preSql statements.
     *
     * @throws RdbmsException if any statement fails to parse
     */
    public static void preCheckPrePareSQL(Keys options) {
        String table = options.getTable();
        List<String> preSqls = options.getPreSqlList();
        List<String> renderedPreSqls = SelectdbUtil.renderPreOrPostSqls(preSqls, table);
        if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
            LOG.info("Begin to preCheck preSqls:[{}].", String.join(";", renderedPreSqls));
            for (String sql : renderedPreSqls) {
                try {
                    DBUtil.sqlValid(sql, DataBaseType.MySql);
                } catch (ParserException e) {
                    throw RdbmsException.asPreSQLParserException(DataBaseType.MySql, e, sql);
                }
            }
        }
    }

    /**
     * Validates (parses, does not execute) the configured postSql statements.
     *
     * @throws RdbmsException if any statement fails to parse
     */
    public static void preCheckPostSQL(Keys options) {
        String table = options.getTable();
        List<String> postSqls = options.getPostSqlList();
        List<String> renderedPostSqls = SelectdbUtil.renderPreOrPostSqls(postSqls, table);
        if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
            LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls));
            for (String sql : renderedPostSqls) {
                try {
                    DBUtil.sqlValid(sql, DataBaseType.MySql);
                } catch (ParserException e) {
                    throw RdbmsException.asPostSQLParserException(DataBaseType.MySql, e, sql);
                }
            }
        }
    }

    /**
     * Null-check helper kept for source compatibility; delegates to the
     * standard {@link Objects#requireNonNull(Object)} instead of hand-rolling
     * the check.
     *
     * @throws NullPointerException if {@code reference} is null
     */
    public static <T> T checkNotNull(T reference) {
        return Objects.requireNonNull(reference);
    }
}
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;

/**
 * SelectDB data writer: the Job runs pre/post SQL and splits configuration,
 * the Task encodes records (CSV/JSON) and hands them to
 * {@link SelectdbWriterManager} for asynchronous copy-into loading.
 */
public class SelectdbWriter extends Writer {

    public static class Job extends Writer.Job {

        private static final Logger LOG = LoggerFactory.getLogger(Job.class);
        private Configuration originalConfig = null;
        private Keys options;

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
            options = new Keys(super.getPluginJobConf());
            options.doPretreatment();
        }

        @Override
        public void preCheck() {
            this.init();
            SelectdbUtil.preCheckPrePareSQL(options);
            SelectdbUtil.preCheckPostSQL(options);
        }

        /** Executes configured preSql statements before any task starts. */
        @Override
        public void prepare() {
            String username = options.getUsername();
            String password = options.getPassword();
            String jdbcUrl = options.getJdbcUrl();
            List<String> renderedPreSqls = SelectdbUtil.renderPreOrPostSqls(options.getPreSqlList(), options.getTable());
            if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
                Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
                LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPreSqls), jdbcUrl);
                SelectdbUtil.executeSqls(conn, renderedPreSqls);
                DBUtil.closeDBResources(null, null, conn);
            }
        }

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
            for (int i = 0; i < mandatoryNumber; i++) {
                // Fix: clone so each task owns an independent configuration
                // instead of sharing the same mutable instance.
                configurations.add(originalConfig.clone());
            }
            return configurations;
        }

        /** Executes configured postSql statements after all tasks finish. */
        @Override
        public void post() {
            String username = options.getUsername();
            String password = options.getPassword();
            String jdbcUrl = options.getJdbcUrl();
            List<String> renderedPostSqls = SelectdbUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable());
            if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
                Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
                // Fix: log message previously said "preSqls" here.
                LOG.info("Begin to execute postSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl);
                SelectdbUtil.executeSqls(conn, renderedPostSqls);
                DBUtil.closeDBResources(null, null, conn);
            }
        }

        @Override
        public void destroy() {
        }

    }

    public static class Task extends Writer.Task {
        private SelectdbWriterManager writerManager;
        private Keys options;
        private SelectdbCodec rowCodec;

        @Override
        public void init() {
            options = new Keys(super.getPluginJobConf());
            if (options.isWildcardColumn()) {
                // Resolve "*" into the concrete column list from
                // information_schema. Fix: close the probe connection (it was
                // previously leaked).
                Connection conn = DBUtil.getConnection(DataBaseType.MySql, options.getJdbcUrl(), options.getUsername(), options.getPassword());
                try {
                    List<String> columns = SelectdbUtil.getDorisTableColumns(conn, options.getDatabase(), options.getTable());
                    options.setInfoCchemaColumns(columns);
                } finally {
                    DBUtil.closeDBResources(null, null, conn);
                }
            }
            writerManager = new SelectdbWriterManager(options);
            rowCodec = SelectdbCodecFactory.createCodec(options);
        }

        @Override
        public void prepare() {
        }

        /**
         * Pulls records from the reader, validates the column count against
         * the configured columns, encodes each record and buffers it in the
         * writer manager.
         */
        public void startWrite(RecordReceiver recordReceiver) {
            try {
                Record record;
                while ((record = recordReceiver.getFromReader()) != null) {
                    if (record.getColumnNumber() != options.getColumns().size()) {
                        throw DataXException
                                .asDataXException(
                                        DBUtilErrorCode.CONF_ERROR,
                                        String.format(
                                                "There is an error in the column configuration information. "
                                                        + "This is because you have configured a task where the number of fields to be read from the source:%s "
                                                        + "is not equal to the number of fields to be written to the destination table:%s. "
                                                        + "Please check your configuration and make changes.",
                                                record.getColumnNumber(),
                                                options.getColumns().size()));
                    }
                    writerManager.writeRecord(rowCodec.codec(record));
                }
            } catch (Exception e) {
                throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
            }
        }

        /** Flushes any buffered rows and shuts the writer manager down. */
        @Override
        public void post() {
            try {
                writerManager.close();
            } catch (Exception e) {
                throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
            }
        }

        @Override
        public void destroy() {}

        @Override
        public boolean supportFailOver() {
            return false;
        }
    }

}
+ +package com.alibaba.datax.plugin.writer.selectdbwriter; + +import java.io.IOException; +import java.util.Map; + +public class SelectdbWriterException extends RuntimeException { + + private boolean reCreateLabel; + + + public SelectdbWriterException() { + super(); + } + + public SelectdbWriterException(String message) { + super(message); + } + + public SelectdbWriterException(String message, boolean reCreateLabel) { + super(message); + this.reCreateLabel = reCreateLabel; + } + + public SelectdbWriterException(String message, Throwable cause) { + super(message, cause); + } + + public SelectdbWriterException(Throwable cause) { + super(cause); + } + + protected SelectdbWriterException(String message, Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public boolean needReCreateLabel() { + return reCreateLabel; + } +} diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbWriterManager.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbWriterManager.java new file mode 100644 index 00000000..bbb41702 --- /dev/null +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbWriterManager.java @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
package com.alibaba.datax.plugin.writer.selectdbwriter;

import com.google.common.base.Strings;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Buffers encoded rows and flushes them to SelectDB asynchronously.
 *
 * <p>Threading model: the DataX task thread calls {@link #writeRecord(String)}
 * and {@link #close()}; full batches are handed through {@code flushQueue} to
 * a single daemon flush thread that performs upload + copy-into via
 * {@link SelectdbCopyIntoObserver}. A one-shot scheduler forces a flush after
 * {@code flushInterval} ms of inactivity. An error on the flush thread is
 * stored in {@code flushException} and rethrown to the writer thread on its
 * next call.
 */
public class SelectdbWriterManager {

    private static final Logger LOG = LoggerFactory.getLogger(SelectdbWriterManager.class);

    private final SelectdbCopyIntoObserver visitor;
    private final Keys options;
    // Rows of the batch currently being built (UTF-8 encoded records).
    private final List<byte[]> buffer = new ArrayList<>();
    private int batchCount = 0;  // rows currently buffered
    private long batchSize = 0;  // bytes currently buffered
    private volatile boolean closed = false;
    // First exception raised on the flush thread; checked on every write/flush.
    private volatile Exception flushException;
    // Hand-off channel between writer thread and flush thread (bounded).
    private final LinkedBlockingDeque<WriterTuple> flushQueue;
    private ScheduledExecutorService scheduler;
    private ScheduledFuture<?> scheduledFuture;

    public SelectdbWriterManager(Keys options) {
        this.options = options;
        this.visitor = new SelectdbCopyIntoObserver(options);
        flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
        this.startScheduler();
        this.startAsyncFlushing();
    }

    /**
     * (Re)arms the one-shot interval flush. When the timer fires with an
     * empty buffer it simply re-arms itself; otherwise the buffer is flushed.
     * The timer is also re-armed by the flush thread after each successful
     * load (see {@link #asyncFlush()}).
     */
    public void startScheduler() {
        stopScheduler();
        this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("Doris-interval-flush").daemon(true).build());
        this.scheduledFuture = this.scheduler.schedule(() -> {
            synchronized (SelectdbWriterManager.this) {
                if (!closed) {
                    try {
                        String label = createBatchLabel();
                        LOG.info(String.format("Selectdb interval Sinking triggered: label[%s].", label));
                        if (batchCount == 0) {
                            // Nothing buffered: just re-arm the timer.
                            startScheduler();
                        }
                        flush(label, false);
                    } catch (Exception e) {
                        flushException = e;
                    }
                }
            }
        }, options.getFlushInterval(), TimeUnit.MILLISECONDS);
    }

    /** Cancels the pending interval flush (running task is not interrupted). */
    public void stopScheduler() {
        if (this.scheduledFuture != null) {
            scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
    }

    /**
     * Buffers one encoded record; triggers an async flush once the configured
     * row-count or byte-size threshold is reached.
     *
     * @throws IOException if a previous async flush failed
     */
    public final synchronized void writeRecord(String record) throws IOException {
        checkFlushException();
        try {
            byte[] bts = record.getBytes(StandardCharsets.UTF_8);
            buffer.add(bts);
            batchCount++;
            batchSize += bts.length;
            if (batchCount >= options.getBatchRows() || batchSize >= options.getBatchSize()) {
                String label = createBatchLabel();
                LOG.debug(String.format("buffer Sinking triggered: rows[%d] label [%s].", batchCount, label));
                flush(label, false);
            }
        } catch (Exception e) {
            throw new SelectdbWriterException("Writing records to Doris failed.", e);
        }
    }

    /**
     * Enqueues the current buffer (as a snapshot) for the flush thread under
     * {@code label}, then resets the buffer. With {@code waitUtilDone} it
     * blocks until all previously queued batches have been loaded.
     */
    public synchronized void flush(String label, boolean waitUtilDone) throws Exception {
        checkFlushException();
        if (batchCount == 0) {
            if (waitUtilDone) {
                waitAsyncFlushingDone();
            }
            return;
        }
        flushQueue.put(new WriterTuple(label, batchSize, new ArrayList<>(buffer)));
        if (waitUtilDone) {
            // wait the last flush
            waitAsyncFlushingDone();
        }
        buffer.clear();
        batchCount = 0;
        batchSize = 0;
    }

    /**
     * Flushes any remaining rows synchronously and releases the observer.
     * flush() is called unconditionally (not only when batchCount > 0) so
     * that the wait-until-done drain always runs.
     */
    public synchronized void close() throws IOException {
        if (!closed) {
            closed = true;
            try {
                String label = createBatchLabel();
                if (batchCount > 0)
                    LOG.debug(String.format("Selectdb Sink is about to close: label[%s].", label));
                flush(label, true);
            } catch (Exception e) {
                throw new RuntimeException("Writing records to Selectdb failed.", e);
            } finally {
                this.visitor.close();
            }
        }
        checkFlushException();
    }

    /** Builds a unique batch label: optional configured prefix + random UUID. */
    public String createBatchLabel() {
        StringBuilder sb = new StringBuilder();
        if (!Strings.isNullOrEmpty(options.getLabelPrefix())) {
            sb.append(options.getLabelPrefix());
        }
        return sb.append(UUID.randomUUID().toString())
                .toString();
    }

    /** Starts the single daemon thread that drains {@code flushQueue}. */
    private void startAsyncFlushing() {
        // start flush thread
        Thread flushThread = new Thread(new Runnable() {
            public void run() {
                while (true) {
                    try {
                        asyncFlush();
                    } catch (Exception e) {
                        flushException = e;
                    }
                }
            }
        });
        flushThread.setDaemon(true);
        flushThread.start();
    }

    /**
     * Blocks until every real batch queued so far has been consumed, by
     * pushing queueLength + 1 empty-label "poison pill" tuples (the flush
     * thread skips them); once all pills fit in the bounded queue, all prior
     * real batches must have been taken.
     */
    private void waitAsyncFlushingDone() throws InterruptedException {
        // wait previous flushings
        for (int i = 0; i <= options.getFlushQueueLength(); i++) {
            flushQueue.put(new WriterTuple("", 0l, null));
        }
        checkFlushException();
    }

    /**
     * Takes one batch off the queue and loads it, retrying up to maxRetries
     * times with a linear backoff (capped at 100 s). A recreate-label failure
     * gets a fresh label before the next attempt. Empty-label tuples are
     * poison pills from waitAsyncFlushingDone() and are ignored.
     */
    private void asyncFlush() throws Exception {
        WriterTuple flushData = flushQueue.take();
        if (Strings.isNullOrEmpty(flushData.getLabel())) {
            return;
        }
        // Suspend the interval timer while a load is in progress.
        stopScheduler();
        for (int i = 0; i <= options.getMaxRetries(); i++) {
            try {
                // copy into
                visitor.streamLoad(flushData);
                startScheduler();
                break;
            } catch (Exception e) {
                LOG.warn("Failed to flush batch data to Doris, retry times = {}", i, e);
                if (i >= options.getMaxRetries()) {
                    throw new RuntimeException(e);
                }
                if (e instanceof SelectdbWriterException && ((SelectdbWriterException) e).needReCreateLabel()) {
                    String newLabel = createBatchLabel();
                    LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
                    flushData.setLabel(newLabel);
                }
                try {
                    Thread.sleep(1000l * Math.min(i + 1, 100));
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Unable to flush, interrupted while doing another attempt", e);
                }
            }
        }
    }

    /** Rethrows (wrapped) any error recorded by the flush thread. */
    private void checkFlushException() {
        if (flushException != null) {
            throw new RuntimeException("Writing records to selectdb failed.", flushException);
        }
    }
}
package com.alibaba.datax.plugin.writer.selectdbwriter;

import java.util.List;

/**
 * Carrier for one flush batch handed from the writer thread to the flush
 * thread: the copy-into label, the total payload size in bytes, and the
 * encoded rows. The label is mutable because a failed batch may be retried
 * under a freshly generated label.
 */
public class WriterTuple {

    private String label;            // batch label; may be replaced on retry
    private final Long bytes;        // total payload size in bytes
    private final List<byte[]> rows; // encoded records of this batch

    public WriterTuple(String label, Long bytes, List<byte[]> rows) {
        this.label = label;
        this.rows = rows;
        this.bytes = bytes;
    }

    public String getLabel() {
        return label;
    }

    public void setLabel(String label) {
        this.label = label;
    }

    public Long getBytes() {
        return bytes;
    }

    public List<byte[]> getRows() {
        return rows;
    }

}
newline at end of file