mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-03 06:21:01 +08:00
Merge pull request #1603 from hf200012/doriswriter-doc-fix
doris writer write error
This commit is contained in:
commit
3e13d6a788
@ -166,7 +166,7 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
|
|||||||
```json
|
```json
|
||||||
"loadProps": {
|
"loadProps": {
|
||||||
"column_separator": "\\x01",
|
"column_separator": "\\x01",
|
||||||
"row_delimiter": "\\x02"
|
"line_delimiter": "\\x02"
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -3,6 +3,7 @@ package com.alibaba.datax.plugin.writer.doriswriter;
|
|||||||
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSON;
|
||||||
import org.apache.commons.codec.binary.Base64;
|
import org.apache.commons.codec.binary.Base64;
|
||||||
import org.apache.http.HttpEntity;
|
import org.apache.http.HttpEntity;
|
||||||
|
import org.apache.http.HttpHeaders;
|
||||||
import org.apache.http.client.config.RequestConfig;
|
import org.apache.http.client.config.RequestConfig;
|
||||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||||
import org.apache.http.client.methods.HttpGet;
|
import org.apache.http.client.methods.HttpGet;
|
||||||
@ -124,7 +125,7 @@ public class DorisStreamLoadObserver {
|
|||||||
private byte[] addRows(List<byte[]> rows, int totalBytes) {
|
private byte[] addRows(List<byte[]> rows, int totalBytes) {
|
||||||
if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
|
if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
|
||||||
Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps());
|
Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps());
|
||||||
byte[] lineDelimiter = DelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
|
byte[] lineDelimiter = DelimiterParser.parse((String)props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
|
||||||
ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
|
ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
|
||||||
for (byte[] row : rows) {
|
for (byte[] row : rows) {
|
||||||
bos.put(row);
|
bos.put(row);
|
||||||
@ -161,6 +162,8 @@ public class DorisStreamLoadObserver {
|
|||||||
});
|
});
|
||||||
try ( CloseableHttpClient httpclient = httpClientBuilder.build()) {
|
try ( CloseableHttpClient httpclient = httpClientBuilder.build()) {
|
||||||
HttpPut httpPut = new HttpPut(loadUrl);
|
HttpPut httpPut = new HttpPut(loadUrl);
|
||||||
|
httpPut.removeHeaders(HttpHeaders.CONTENT_LENGTH);
|
||||||
|
httpPut.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
|
||||||
List<String> cols = options.getColumns();
|
List<String> cols = options.getColumns();
|
||||||
if (null != cols && !cols.isEmpty() && Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
|
if (null != cols && !cols.isEmpty() && Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
|
||||||
httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList())));
|
httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList())));
|
||||||
@ -172,9 +175,9 @@ public class DorisStreamLoadObserver {
|
|||||||
}
|
}
|
||||||
httpPut.setHeader("Expect", "100-continue");
|
httpPut.setHeader("Expect", "100-continue");
|
||||||
httpPut.setHeader("label", label);
|
httpPut.setHeader("label", label);
|
||||||
httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded");
|
httpPut.setHeader("two_phase_commit", "false");
|
||||||
httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
|
httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
|
||||||
httpPut.setEntity(new ByteArrayEntity (data));
|
httpPut.setEntity(new ByteArrayEntity(data));
|
||||||
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
|
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
|
||||||
try ( CloseableHttpResponse resp = httpclient.execute(httpPut)) {
|
try ( CloseableHttpResponse resp = httpclient.execute(httpPut)) {
|
||||||
HttpEntity respEntity = getHttpEntity(resp);
|
HttpEntity respEntity = getHttpEntity(resp);
|
||||||
|
Loading…
Reference in New Issue
Block a user