doris writer write error

doris writer write error
This commit is contained in:
jiafeng.zhang 2022-11-15 12:30:34 +08:00
parent b72c57e7ac
commit 21ad5d0bf0
2 changed files with 28 additions and 25 deletions

View File

@ -72,9 +72,9 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据 DorisWriter
* **jdbcUrl** * **jdbcUrl**
- 描述Doris 的 JDBC 连接串,用户执行 preSql 或 postSQL。 - 描述Doris 的 JDBC 连接串,用户执行 preSql 或 postSQL。
- 必选:是 - 必选:是
- 默认值:无 - 默认值:无
* **loadUrl** * **loadUrl**
@ -84,20 +84,20 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据 DorisWriter
* **username** * **username**
- 描述访问Doris数据库的用户名 - 描述访问Doris数据库的用户名
- 必选:是 - 必选:是
- 默认值:无 - 默认值:无
* **password** * **password**
- 描述访问Doris数据库的密码 - 描述访问Doris数据库的密码
- 必选:否 - 必选:否
- 默认值:空 - 默认值:空
* **connection.selectedDatabase** * **connection.selectedDatabase**
- 描述需要写入的Doris数据库名称。 - 描述需要写入的Doris数据库名称。
- 必选:是 - 必选:是
- 默认值:无 - 默认值:无
* **connection.table** * **connection.table**
- 描述需要写入的Doris表名称。 - 描述需要写入的Doris表名称。
@ -106,9 +106,9 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据 DorisWriter
* **column** * **column**
- 描述:目的表**需要写入数据**的字段,这些字段将作为生成的 Json 数据的字段名。字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。 - 描述:目的表**需要写入数据**的字段,这些字段将作为生成的 Json 数据的字段名。字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。
- 必选:是 - 必选:是
- 默认值:否 - 默认值:否
* **preSql** * **preSql**
@ -165,16 +165,16 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据 DorisWriter
```json ```json
"loadProps": { "loadProps": {
"column_separator": "\\x01", "column_separator": "\\x01",
"row_delimiter": "\\x02" "line_delimiter": "\\x02"
} }
``` ```
如需更改导入格式为`json` 则正确配置 `loadProps` 即可: 如需更改导入格式为`json` 则正确配置 `loadProps` 即可:
```json ```json
"loadProps": { "loadProps": {
"format": "json", "format": "json",
"strip_outer_array": true "strip_outer_array": true
} }
``` ```

View File

@ -3,6 +3,7 @@ package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity; import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig; import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpGet;
@ -161,6 +162,8 @@ public class DorisStreamLoadObserver {
}); });
try ( CloseableHttpClient httpclient = httpClientBuilder.build()) { try ( CloseableHttpClient httpclient = httpClientBuilder.build()) {
HttpPut httpPut = new HttpPut(loadUrl); HttpPut httpPut = new HttpPut(loadUrl);
httpPut.removeHeaders(HttpHeaders.CONTENT_LENGTH);
httpPut.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
List<String> cols = options.getColumns(); List<String> cols = options.getColumns();
if (null != cols && !cols.isEmpty() && Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) { if (null != cols && !cols.isEmpty() && Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList()))); httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList())));
@ -172,9 +175,9 @@ public class DorisStreamLoadObserver {
} }
httpPut.setHeader("Expect", "100-continue"); httpPut.setHeader("Expect", "100-continue");
httpPut.setHeader("label", label); httpPut.setHeader("label", label);
httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded"); httpPut.setHeader("two_phase_commit", "false");
httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword())); httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
httpPut.setEntity(new ByteArrayEntity (data)); httpPut.setEntity(new ByteArrayEntity(data));
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
try ( CloseableHttpResponse resp = httpclient.execute(httpPut)) { try ( CloseableHttpResponse resp = httpclient.execute(httpPut)) {
HttpEntity respEntity = getHttpEntity(resp); HttpEntity respEntity = getHttpEntity(resp);