From a8f70b1f2c69619e911338307e555d011100f869 Mon Sep 17 00:00:00 2001 From: fariel Date: Fri, 30 Apr 2021 18:31:25 +0800 Subject: [PATCH] add row_delimiter parameter --- doriswriter/doc/doriswriter.md | 19 ++++++- .../manager/DorisStreamLoadVisitor.java | 9 ++- .../doriswriter/row/DorisCsvSerializer.java | 49 +---------------- .../doriswriter/row/DorisDelimiterParser.java | 55 +++++++++++++++++++ 4 files changed, 82 insertions(+), 50 deletions(-) create mode 100644 doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisDelimiterParser.java diff --git a/doriswriter/doc/doriswriter.md b/doriswriter/doc/doriswriter.md index 80b2229b..fbe6e4bb 100644 --- a/doriswriter/doc/doriswriter.md +++ b/doriswriter/doc/doriswriter.md @@ -157,6 +157,22 @@ DorisWriter 插件实现了写入数据到 Doris 主库的目的表的功能。 * 默认值:无
+* **maxBatchRows** + + * 描述:单次StreamLoad导入的最大行数
+ + * 必选:否
+ + * 默认值:500000 (50W)
+ +* **maxBatchSize** + + * 描述:单次StreamLoad导入的最大字节数。
+ + * 必选:否
+ + * 默认值:104857600 (100M) + * **loadProps** * 描述:StreamLoad 的请求参数,详情参照StreamLoad介绍页面。
@@ -172,7 +188,8 @@ DorisWriter 插件实现了写入数据到 Doris 主库的目的表的功能。 如需更改列分隔符, 则正确配置 `loadProps` 即可: ```json "loadProps": { - "column_separator": "\\x01" + "column_separator": "\\x01", + "row_delimiter": "\\x02" } ``` diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java index 7ebfced6..1598d8f4 100644 --- a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java +++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java @@ -3,10 +3,12 @@ package com.dorisdb.connector.datax.plugin.writer.doriswriter.manager; import java.io.IOException; import java.net.HttpURLConnection; import java.net.URL; +import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import com.alibaba.fastjson.JSON; import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions; +import com.dorisdb.connector.datax.plugin.writer.doriswriter.row.DorisDelimiterParser; import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpEntity; @@ -93,7 +95,12 @@ public class DorisStreamLoadVisitor { private byte[] joinRows(List rows) { if (DorisWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) { - return String.join("\n", rows).getBytes(StandardCharsets.UTF_8); + Map props = writerOptions.getLoadProps(); + String lineDelimiter = "\n"; + if (null != props && props.containsKey("row_delimiter")) { + lineDelimiter = DorisDelimiterParser.parse(String.valueOf(props.get("row_delimiter")), "\n"); + } + return (String.join(lineDelimiter, rows) + lineDelimiter).getBytes(StandardCharsets.UTF_8); } if (DorisWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) { return new StringBuilder("[").append(String.join(",", rows)).append("]").toString().getBytes(StandardCharsets.UTF_8); diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisCsvSerializer.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisCsvSerializer.java index 866c8d87..862e0b73 100644 --- a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisCsvSerializer.java +++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisCsvSerializer.java @@ -10,12 +10,10 @@ public class DorisCsvSerializer extends DorisBaseSerializer implements DorisISer private static final long serialVersionUID = 1L; - private final String HEX_STRING = "0123456789ABCDEF"; - private final String columnSeparator; public DorisCsvSerializer(String sp) { - this.columnSeparator = parseByteSeparator(sp); + this.columnSeparator = DorisDelimiterParser.parse(sp, "\t"); } @Override @@ -31,49 +29,4 @@ public class DorisCsvSerializer extends DorisBaseSerializer implements DorisISer return sb.toString(); } - private String parseByteSeparator(String sp) { - if (Strings.isNullOrEmpty(sp)) { - // `\t` by default - return "\t"; - } - if (!sp.toUpperCase().startsWith("\\X")) { - return sp; - } - String hexStr = sp.substring(2); - // check hex str - if (hexStr.isEmpty()) { - throw new RuntimeException("Failed to parse column_separator: `Hex str is empty`"); - } - if (hexStr.length() % 2 != 0) { - throw new RuntimeException("Failed to parse column_separator: `Hex str length error`"); - } - for (char hexChar : hexStr.toUpperCase().toCharArray()) { - if (HEX_STRING.indexOf(hexChar) == -1) { - throw new RuntimeException("Failed to parse column_separator: `Hex str format error`"); - } - } - // transform to separator - StringWriter writer = new StringWriter(); - for (byte b : hexStrToBytes(hexStr)) { - writer.append((char) b); - } - return writer.toString(); - } - - private byte[] hexStrToBytes(String hexStr) { - String upperHexStr = hexStr.toUpperCase(); - int length = upperHexStr.length() / 2; - char[] hexChars = upperHexStr.toCharArray(); - byte[] bytes = new byte[length]; - for (int i = 0; i < length; i++) { - int pos = i * 2; - bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1])); - } - return bytes; - } - - private byte charToByte(char c) { - return (byte) HEX_STRING.indexOf(c); - } - } diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisDelimiterParser.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisDelimiterParser.java new file mode 100644 index 00000000..3fd58fa5 --- /dev/null +++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisDelimiterParser.java @@ -0,0 +1,55 @@ +package com.dorisdb.connector.datax.plugin.writer.doriswriter.row; + +import java.io.StringWriter; + +import com.google.common.base.Strings; + +public class DorisDelimiterParser { + + private static final String HEX_STRING = "0123456789ABCDEF"; + + public static String parse(String sp, String dSp) throws RuntimeException { + if (Strings.isNullOrEmpty(sp)) { + return dSp; + } + if (!sp.toUpperCase().startsWith("\\X")) { + return sp; + } + String hexStr = sp.substring(2); + // check hex str + if (hexStr.isEmpty()) { + throw new RuntimeException("Failed to parse delimiter: `Hex str is empty`"); + } + if (hexStr.length() % 2 != 0) { + throw new RuntimeException("Failed to parse delimiter: `Hex str length error`"); + } + for (char hexChar : hexStr.toUpperCase().toCharArray()) { + if (HEX_STRING.indexOf(hexChar) == -1) { + throw new RuntimeException("Failed to parse delimiter: `Hex str format error`"); + } + } + // transform to separator + StringWriter writer = new StringWriter(); + for (byte b : hexStrToBytes(hexStr)) { + writer.append((char) b); + } + return writer.toString(); + } + + private static byte[] hexStrToBytes(String hexStr) { + String upperHexStr = hexStr.toUpperCase(); + int length = upperHexStr.length() / 2; + char[] hexChars = upperHexStr.toCharArray(); + byte[] bytes = new byte[length]; + for (int i = 0; i < length; i++) { + int pos = i * 2; + bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1])); + } + return bytes; + } + + private static byte charToByte(char c) { + return (byte) HEX_STRING.indexOf(c); + } + +}