From 21ad5d0bf05af0845f89833827458ce82d7e5ae4 Mon Sep 17 00:00:00 2001 From: "jiafeng.zhang" Date: Tue, 15 Nov 2022 12:30:34 +0800 Subject: [PATCH 1/2] doris writer write error doris writer write error --- doriswriter/doc/doriswriter.md | 46 +++++++++---------- .../doriswriter/DorisStreamLoadObserver.java | 7 ++- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/doriswriter/doc/doriswriter.md b/doriswriter/doc/doriswriter.md index ef5bbe37..cff7da1c 100644 --- a/doriswriter/doc/doriswriter.md +++ b/doriswriter/doc/doriswriter.md @@ -72,9 +72,9 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter * **jdbcUrl** - - 描述:Doris 的 JDBC 连接串,用户执行 preSql 或 postSQL。 - - 必选:是 - - 默认值:无 + - 描述:Doris 的 JDBC 连接串,用于执行 preSql 或 postSQL。 + - 必选:是 + - 默认值:无 * **loadUrl** @@ -84,31 +84,31 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter * **username** - - 描述:访问Doris数据库的用户名 - - 必选:是 - - 默认值:无 - + - 描述:访问Doris数据库的用户名 + - 必选:是 + - 默认值:无 + * **password** - - - 描述:访问Doris数据库的密码 - - 必选:否 - - 默认值:空 + + - 描述:访问Doris数据库的密码 + - 必选:否 + - 默认值:空 * **connection.selectedDatabase** - - 描述:需要写入的Doris数据库名称。 - - 必选:是 - - 默认值:无 - + - 描述:需要写入的Doris数据库名称。 + - 必选:是 + - 默认值:无 + * **connection.table** - 描述:需要写入的Doris表名称。 - 必选:是 - 默认值:无 - + * **column** - - 描述:目的表**需要写入数据**的字段,这些字段将作为生成的 Json 数据的字段名。字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。 - - 必选:是 - - 默认值:否 + - 描述:目的表**需要写入数据**的字段,这些字段将作为生成的 Json 数据的字段名。字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。 + - 必选:是 + - 默认值:否 * **preSql** @@ -165,16 +165,16 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter ```json "loadProps": { - "column_separator": "\\x01", - "row_delimiter": "\\x02" + "column_separator": "\\x01", + "line_delimiter": "\\x02" } ``` 如需更改导入格式为`json`, 则正确配置 `loadProps` 即可: ```json "loadProps": { - "format": "json", - "strip_outer_array": true + "format": "json", + "strip_outer_array": true } ``` diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java 
b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java index 3e4db6cf..0d9ea92f 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java @@ -3,6 +3,7 @@ package com.alibaba.datax.plugin.writer.doriswriter; import com.alibaba.fastjson.JSON; import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpEntity; +import org.apache.http.HttpHeaders; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; @@ -161,6 +162,8 @@ public class DorisStreamLoadObserver { }); try ( CloseableHttpClient httpclient = httpClientBuilder.build()) { HttpPut httpPut = new HttpPut(loadUrl); + httpPut.removeHeaders(HttpHeaders.CONTENT_LENGTH); + httpPut.removeHeaders(HttpHeaders.TRANSFER_ENCODING); List cols = options.getColumns(); if (null != cols && !cols.isEmpty() && Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) { httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList()))); @@ -172,9 +175,9 @@ public class DorisStreamLoadObserver { } httpPut.setHeader("Expect", "100-continue"); httpPut.setHeader("label", label); - httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded"); + httpPut.setHeader("two_phase_commit", "false"); httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword())); - httpPut.setEntity(new ByteArrayEntity (data)); + httpPut.setEntity(new ByteArrayEntity(data)); httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); try ( CloseableHttpResponse resp = httpclient.execute(httpPut)) { HttpEntity respEntity = getHttpEntity(resp); From 1b3eb9c016ca65ec1213bdb0907456a4e5f4928b Mon Sep 17 00:00:00 2001 
From: "jiafeng.zhang" Date: Tue, 15 Nov 2022 14:32:57 +0800 Subject: [PATCH 2/2] doriswriter: read line_delimiter instead of row_delimiter in addRows --- .../plugin/writer/doriswriter/DorisStreamLoadObserver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java index 0d9ea92f..efb3d9db 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java @@ -125,7 +125,7 @@ public class DorisStreamLoadObserver { private byte[] addRows(List rows, int totalBytes) { if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) { Map props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps()); - byte[] lineDelimiter = DelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8); + byte[] lineDelimiter = DelimiterParser.parse((String)props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8); ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length); for (byte[] row : rows) { bos.put(row);