add row_delimiter parameter

This commit is contained in:
fariel 2021-04-30 18:31:25 +08:00
parent 342c044aaa
commit a8f70b1f2c
4 changed files with 82 additions and 50 deletions

View File

@ -157,6 +157,22 @@ DorisWriter 插件实现了写入数据到 Doris 主库的目的表的功能。
* 默认值:无 <br />
* **maxBatchRows**
* 描述单次StreamLoad导入的最大行数 <br />
* 必选:否 <br />
* 默认值500000 (50W) <br />
* **maxBatchSize**
* 描述单次StreamLoad导入的最大字节数。 <br />
* 必选:否 <br />
* 默认值104857600 (100M)
* **loadProps**
* 描述StreamLoad 的请求参数详情参照StreamLoad介绍页面。 <br />
@ -172,7 +188,8 @@ DorisWriter 插件实现了写入数据到 Doris 主库的目的表的功能。
如需更改列分隔符, 则正确配置 `loadProps` 即可:
```json
"loadProps": {
"column_separator": "\\x01"
"column_separator": "\\x01",
"row_delimiter": "\\x02"
}
```

View File

@ -3,10 +3,12 @@ package com.dorisdb.connector.datax.plugin.writer.doriswriter.manager;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import com.alibaba.fastjson.JSON;
import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions;
import com.dorisdb.connector.datax.plugin.writer.doriswriter.row.DorisDelimiterParser;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
@ -93,7 +95,12 @@ public class DorisStreamLoadVisitor {
private byte[] joinRows(List<String> rows) {
if (DorisWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
return String.join("\n", rows).getBytes(StandardCharsets.UTF_8);
Map<String, Object> props = writerOptions.getLoadProps();
String lineDelimiter = "\n";
if (null != props && props.containsKey("row_delimiter")) {
lineDelimiter = DorisDelimiterParser.parse(String.valueOf(props.get("row_delimiter")), "\n");
}
return (String.join(lineDelimiter, rows) + lineDelimiter).getBytes(StandardCharsets.UTF_8);
}
if (DorisWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
return new StringBuilder("[").append(String.join(",", rows)).append("]").toString().getBytes(StandardCharsets.UTF_8);

View File

@ -10,12 +10,10 @@ public class DorisCsvSerializer extends DorisBaseSerializer implements DorisISer
private static final long serialVersionUID = 1L;
private final String HEX_STRING = "0123456789ABCDEF";
private final String columnSeparator;
public DorisCsvSerializer(String sp) {
this.columnSeparator = parseByteSeparator(sp);
this.columnSeparator = DorisDelimiterParser.parse(sp, "\t");
}
@Override
@ -31,49 +29,4 @@ public class DorisCsvSerializer extends DorisBaseSerializer implements DorisISer
return sb.toString();
}
private String parseByteSeparator(String sp) {
if (Strings.isNullOrEmpty(sp)) {
// `\t` by default
return "\t";
}
if (!sp.toUpperCase().startsWith("\\X")) {
return sp;
}
String hexStr = sp.substring(2);
// check hex str
if (hexStr.isEmpty()) {
throw new RuntimeException("Failed to parse column_separator: `Hex str is empty`");
}
if (hexStr.length() % 2 != 0) {
throw new RuntimeException("Failed to parse column_separator: `Hex str length error`");
}
for (char hexChar : hexStr.toUpperCase().toCharArray()) {
if (HEX_STRING.indexOf(hexChar) == -1) {
throw new RuntimeException("Failed to parse column_separator: `Hex str format error`");
}
}
// transform to separator
StringWriter writer = new StringWriter();
for (byte b : hexStrToBytes(hexStr)) {
writer.append((char) b);
}
return writer.toString();
}
private byte[] hexStrToBytes(String hexStr) {
String upperHexStr = hexStr.toUpperCase();
int length = upperHexStr.length() / 2;
char[] hexChars = upperHexStr.toCharArray();
byte[] bytes = new byte[length];
for (int i = 0; i < length; i++) {
int pos = i * 2;
bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
}
return bytes;
}
private byte charToByte(char c) {
return (byte) HEX_STRING.indexOf(c);
}
}

View File

@ -0,0 +1,55 @@
package com.dorisdb.connector.datax.plugin.writer.doriswriter.row;
import java.io.StringWriter;
import com.google.common.base.Strings;
public class DorisDelimiterParser {
private static final String HEX_STRING = "0123456789ABCDEF";
public static String parse(String sp, String dSp) throws RuntimeException {
if (Strings.isNullOrEmpty(sp)) {
return dSp;
}
if (!sp.toUpperCase().startsWith("\\X")) {
return sp;
}
String hexStr = sp.substring(2);
// check hex str
if (hexStr.isEmpty()) {
throw new RuntimeException("Failed to parse delimiter: `Hex str is empty`");
}
if (hexStr.length() % 2 != 0) {
throw new RuntimeException("Failed to parse delimiter: `Hex str length error`");
}
for (char hexChar : hexStr.toUpperCase().toCharArray()) {
if (HEX_STRING.indexOf(hexChar) == -1) {
throw new RuntimeException("Failed to parse delimiter: `Hex str format error`");
}
}
// transform to separator
StringWriter writer = new StringWriter();
for (byte b : hexStrToBytes(hexStr)) {
writer.append((char) b);
}
return writer.toString();
}
private static byte[] hexStrToBytes(String hexStr) {
String upperHexStr = hexStr.toUpperCase();
int length = upperHexStr.length() / 2;
char[] hexChars = upperHexStr.toCharArray();
byte[] bytes = new byte[length];
for (int i = 0; i < length; i++) {
int pos = i * 2;
bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
}
return bytes;
}
private static byte charToByte(char c) {
return (byte) HEX_STRING.indexOf(c);
}
}