diff --git a/doriswriter/doc/doriswriter.md b/doriswriter/doc/doriswriter.md
index 80b2229b..fbe6e4bb 100644
--- a/doriswriter/doc/doriswriter.md
+++ b/doriswriter/doc/doriswriter.md
@@ -157,6 +157,22 @@ DorisWriter 插件实现了写入数据到 Doris 主库的目的表的功能。
* 默认值:无
+* **maxBatchRows**
+
+ * 描述:单次 StreamLoad 导入的最大行数。
+
+ * 必选:否
+
+ * 默认值:500000 (50 万)
+
+* **maxBatchSize**
+
+ * 描述:单次 StreamLoad 导入的最大字节数。
+
+ * 必选:否
+
+ * 默认值:104857600 (100 MB)
+
* **loadProps**
* 描述:StreamLoad 的请求参数,详情参照StreamLoad介绍页面。
@@ -172,7 +188,8 @@ DorisWriter 插件实现了写入数据到 Doris 主库的目的表的功能。
如需更改列分隔符, 则正确配置 `loadProps` 即可:
```json
"loadProps": {
- "column_separator": "\\x01"
+ "column_separator": "\\x01",
+ "row_delimiter": "\\x02"
}
```
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java
index 7ebfced6..1598d8f4 100644
--- a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java
@@ -3,10 +3,12 @@ package com.dorisdb.connector.datax.plugin.writer.doriswriter.manager;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import com.alibaba.fastjson.JSON;
import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions;
+import com.dorisdb.connector.datax.plugin.writer.doriswriter.row.DorisDelimiterParser;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
@@ -93,7 +95,12 @@ public class DorisStreamLoadVisitor {
private byte[] joinRows(List rows) {
if (DorisWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
- return String.join("\n", rows).getBytes(StandardCharsets.UTF_8);
+ Map props = writerOptions.getLoadProps();
+ String lineDelimiter = "\n";
+ if (null != props && props.containsKey("row_delimiter")) {
+ lineDelimiter = DorisDelimiterParser.parse(String.valueOf(props.get("row_delimiter")), "\n");
+ }
+ return (String.join(lineDelimiter, rows) + lineDelimiter).getBytes(StandardCharsets.UTF_8);
}
if (DorisWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
return new StringBuilder("[").append(String.join(",", rows)).append("]").toString().getBytes(StandardCharsets.UTF_8);
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisCsvSerializer.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisCsvSerializer.java
index 866c8d87..862e0b73 100644
--- a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisCsvSerializer.java
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisCsvSerializer.java
@@ -10,12 +10,10 @@ public class DorisCsvSerializer extends DorisBaseSerializer implements DorisISer
private static final long serialVersionUID = 1L;
- private final String HEX_STRING = "0123456789ABCDEF";
-
private final String columnSeparator;
public DorisCsvSerializer(String sp) {
- this.columnSeparator = parseByteSeparator(sp);
+ this.columnSeparator = DorisDelimiterParser.parse(sp, "\t");
}
@Override
@@ -31,49 +29,4 @@ public class DorisCsvSerializer extends DorisBaseSerializer implements DorisISer
return sb.toString();
}
- private String parseByteSeparator(String sp) {
- if (Strings.isNullOrEmpty(sp)) {
- // `\t` by default
- return "\t";
- }
- if (!sp.toUpperCase().startsWith("\\X")) {
- return sp;
- }
- String hexStr = sp.substring(2);
- // check hex str
- if (hexStr.isEmpty()) {
- throw new RuntimeException("Failed to parse column_separator: `Hex str is empty`");
- }
- if (hexStr.length() % 2 != 0) {
- throw new RuntimeException("Failed to parse column_separator: `Hex str length error`");
- }
- for (char hexChar : hexStr.toUpperCase().toCharArray()) {
- if (HEX_STRING.indexOf(hexChar) == -1) {
- throw new RuntimeException("Failed to parse column_separator: `Hex str format error`");
- }
- }
- // transform to separator
- StringWriter writer = new StringWriter();
- for (byte b : hexStrToBytes(hexStr)) {
- writer.append((char) b);
- }
- return writer.toString();
- }
-
- private byte[] hexStrToBytes(String hexStr) {
- String upperHexStr = hexStr.toUpperCase();
- int length = upperHexStr.length() / 2;
- char[] hexChars = upperHexStr.toCharArray();
- byte[] bytes = new byte[length];
- for (int i = 0; i < length; i++) {
- int pos = i * 2;
- bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
- }
- return bytes;
- }
-
- private byte charToByte(char c) {
- return (byte) HEX_STRING.indexOf(c);
- }
-
}
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisDelimiterParser.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisDelimiterParser.java
new file mode 100644
index 00000000..3fd58fa5
--- /dev/null
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisDelimiterParser.java
@@ -0,0 +1,55 @@
+package com.dorisdb.connector.datax.plugin.writer.doriswriter.row;
+
+import java.io.StringWriter;
+
+import com.google.common.base.Strings;
+
+public class DorisDelimiterParser {
+    private static final String HEX_STRING = "0123456789ABCDEF";
+
+    /**
+     * Resolves a configured delimiter to its literal string form.
+     * A value starting with {@code \x} (either case) is read as hex-encoded bytes, two
+     * digits per byte; each byte becomes the char with the same unsigned value (0x00-0xFF).
+     * @param sp  configured delimiter; may be null or empty
+     * @param dSp value returned when {@code sp} is null or empty
+     * @return {@code dSp}, or {@code sp} as-is when it has no hex prefix, or the decoded chars
+     * @throws IllegalArgumentException if the hex part is empty, odd-length, or not hex
+     */
+    public static String parse(String sp, String dSp) throws RuntimeException {
+        if (sp == null || sp.isEmpty()) {
+            return dSp;
+        }
+        // Case-insensitive prefix test that does not depend on the default locale.
+        if (!sp.regionMatches(true, 0, "\\x", 0, 2)) {
+            return sp;
+        }
+        String hexStr = sp.substring(2);
+        if (hexStr.isEmpty()) {
+            throw new IllegalArgumentException("Failed to parse delimiter: `Hex str is empty`");
+        }
+        if (hexStr.length() % 2 != 0) {
+            throw new IllegalArgumentException("Failed to parse delimiter: `Hex str length error`");
+        }
+        StringWriter writer = new StringWriter();
+        for (byte b : hexStrToBytes(hexStr)) {
+            // Mask before widening: a bare (char) cast sign-extends 0x80-0xFF to U+FF80-U+FFFF.
+            writer.append((char) (b & 0xFF));
+        }
+        return writer.toString();
+    }
+
+    /** Converts an even-length hex string to bytes; throws on any non-hex character. */
+    private static byte[] hexStrToBytes(String hexStr) {
+        byte[] bytes = new byte[hexStr.length() / 2];
+        for (int i = 0; i < bytes.length; i++) {
+            int hi = HEX_STRING.indexOf(Character.toUpperCase(hexStr.charAt(2 * i)));
+            int lo = HEX_STRING.indexOf(Character.toUpperCase(hexStr.charAt(2 * i + 1)));
+            if (hi < 0 || lo < 0) {
+                throw new IllegalArgumentException("Failed to parse delimiter: `Hex str format error`");
+            }
+            bytes[i] = (byte) (hi << 4 | lo);
+        }
+        return bytes;
+    }
+}