diff --git a/doriswriter/pom.xml b/doriswriter/pom.xml
index aa1e6ff0..52374d30 100644
--- a/doriswriter/pom.xml
+++ b/doriswriter/pom.xml
@@ -63,6 +63,21 @@ under the License.
httpclient
4.5.13
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ 2.13.3
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ 2.13.3
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.13.3
+
diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/BaseResponse.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/BaseResponse.java
new file mode 100644
index 00000000..15fe26bf
--- /dev/null
+++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/BaseResponse.java
@@ -0,0 +1,23 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BaseResponse {
+ private int code;
+ private String msg;
+ private T data;
+ private int count;
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public T getData(){
+ return data;
+ }
+}
diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopyIntoResp.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopyIntoResp.java
new file mode 100644
index 00000000..83ca128d
--- /dev/null
+++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopyIntoResp.java
@@ -0,0 +1,26 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+import java.util.Map;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyIntoResp extends BaseResponse{
+ private String code;
+ private String exception;
+
+ private Map result;
+
+ public String getDataCode() {
+ return code;
+ }
+
+ public String getException() {
+ return exception;
+ }
+
+ public Map getResult() {
+ return result;
+ }
+
+}
diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopySQLBuilder.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopySQLBuilder.java
new file mode 100644
index 00000000..abbb7d19
--- /dev/null
+++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopySQLBuilder.java
@@ -0,0 +1,83 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+
+public class CopySQLBuilder {
+ private final static String COPY_SYNC = "copy.async";
+ private final static String FIELD_DELIMITER_KEY = "column_separator";
+ private final static String COPY_FIELD_DELIMITER_KEY = "file.column_separator";
+ private final static String FIELD_DELIMITER_DEFAULT = "\t";
+ private final static String LINE_DELIMITER_KEY = "line_delimiter";
+ private final static String COPY_LINE_DELIMITER_KEY = "file.line_delimiter";
+ private final static String LINE_DELIMITER_DEFAULT = "\n";
+ private final static String COLUMNS = "columns";
+ private static final String STRIP_OUT_ARRAY = "strip_outer_array";
+ private final String fileName;
+ private final Keys options;
+ private Map properties;
+
+
+
+ public CopySQLBuilder(Keys options, String fileName) {
+ this.options=options;
+ this.fileName=fileName;
+ this.properties=options.getLoadProps();
+ }
+
+ public String buildCopySQL(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("COPY INTO ")
+ .append(options.getDatabase() + "." + options.getTable());
+
+ if (properties.get(COLUMNS) != null && !properties.get(COLUMNS).equals("")) {
+ sb.append(" FROM ( SELECT ").append(properties.get(COLUMNS))
+ .append(" FROM @~('").append(fileName).append("') ) ")
+ .append("PROPERTIES (");
+ } else {
+ sb.append(" FROM @~('").append(fileName).append("') ")
+ .append("PROPERTIES (");
+ }
+
+ //copy into must be sync
+ properties.put(COPY_SYNC,false);
+ StringJoiner props = new StringJoiner(",");
+ for(Map.Entry entry : properties.entrySet()){
+ String key = concatPropPrefix(String.valueOf(entry.getKey()));
+ String value = "";
+ switch (key){
+ case COPY_FIELD_DELIMITER_KEY:
+ value = DelimiterParser.parse(String.valueOf(entry.getValue()),FIELD_DELIMITER_DEFAULT);
+ break;
+ case COPY_LINE_DELIMITER_KEY:
+ value = DelimiterParser.parse(String.valueOf(entry.getValue()),LINE_DELIMITER_DEFAULT);
+ break;
+ default:
+ value = String.valueOf(entry.getValue());
+ break;
+ }
+ if(!key.equals(COLUMNS)){
+ String prop = String.format("'%s'='%s'", key, value);
+ props.add(prop);
+ }
+ }
+ sb.append(props).append(" )");
+ return sb.toString();
+ }
+
+ static final List PREFIX_LIST =
+ Arrays.asList(FIELD_DELIMITER_KEY, LINE_DELIMITER_KEY, STRIP_OUT_ARRAY);
+
+ private String concatPropPrefix(String key) {
+ if (PREFIX_LIST.contains(key)) {
+ return "file." + key;
+ }
+ if ("format".equalsIgnoreCase(key)) {
+ return "file.type";
+ }
+ return key;
+ }
+}
diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCopyIntoObserver.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCopyIntoObserver.java
new file mode 100644
index 00000000..95685260
--- /dev/null
+++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCopyIntoObserver.java
@@ -0,0 +1,172 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+public class DorisCopyIntoObserver {
+ private static final Logger LOG = LoggerFactory.getLogger(DorisCopyIntoObserver.class);
+
+ private Keys options;
+ private long pos;
+ public static final int SUCCESS = 0;
+ public static final String FAIL = "1";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private final HttpClientBuilder httpClientBuilder = HttpClients
+ .custom()
+ .disableRedirectHandling();
+ private CloseableHttpClient httpClient;
+ private static final String UPLOAD_URL_PATTERN = "%s/copy/upload";
+ private static final String COMMIT_PATTERN = "%s/copy/query";
+ private static final Pattern COMMITTED_PATTERN = Pattern.compile("errCode = 2, detailMessage = No files can be copied.*");
+
+ public DorisCopyIntoObserver(Keys options) {
+ this.options = options;
+ this.httpClient = httpClientBuilder.build();
+
+ }
+
+ public void streamLoad(WriterTuple data) throws Exception {
+ String host = DorisUtil.getLoadHost(options);
+ String loadUrl = String.format(UPLOAD_URL_PATTERN, host);
+ String uploadAddress = getUploadAddress(loadUrl, data.getLabel());
+ put(uploadAddress, data.getLabel(), DorisUtil.addRows(options, data.getRows(), data.getBytes().intValue()));
+ executeCopy(host, data.getLabel());
+
+ }
+
+ private String getUploadAddress(String loadUrl, String fileName) throws IOException {
+ HttpPutBuilder putBuilder = new HttpPutBuilder();
+ putBuilder.setUrl(loadUrl)
+ .addFileName(fileName)
+ .addCommonHeader()
+ .setEmptyEntity()
+ .baseAuth(options.getUsername(), options.getPassword());
+ CloseableHttpResponse execute = httpClientBuilder.build().execute(putBuilder.build());
+ int statusCode = execute.getStatusLine().getStatusCode();
+ String reason = execute.getStatusLine().getReasonPhrase();
+ if (statusCode == 307) {
+ Header location = execute.getFirstHeader("location");
+ String uploadAddress = location.getValue();
+ LOG.info("redirect to s3:{}", uploadAddress);
+ return uploadAddress;
+ } else {
+ HttpEntity entity = execute.getEntity();
+ String result = entity == null ? null : EntityUtils.toString(entity);
+ LOG.error("Failed get the redirected address, status {}, reason {}, response {}", statusCode, reason, result);
+ throw new RuntimeException("Could not get the redirected address.");
+ }
+
+ }
+
+
+ public void put(String loadUrl, String fileName, byte[] data) throws IOException {
+ LOG.info(String.format("Executing upload file to: '%s', size: '%s'", loadUrl, data.length));
+ HttpPutBuilder putBuilder = new HttpPutBuilder();
+ putBuilder.setUrl(loadUrl)
+ .addCommonHeader()
+ .setEntity(new ByteArrayEntity(data));
+ CloseableHttpResponse response = httpClient.execute(putBuilder.build());
+ final int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode != 200) {
+ String result = response.getEntity() == null ? null : EntityUtils.toString(response.getEntity());
+ LOG.error("upload file {} error, response {}", fileName, result);
+ throw new DorisWriterExcetion("upload file error: " + fileName, true);
+ }
+ }
+
+
+ /**
+ * execute copy into
+ */
+ public void executeCopy(String hostPort, String fileName) throws IOException {
+ long start = System.currentTimeMillis();
+ CopySQLBuilder copySQLBuilder = new CopySQLBuilder(options, fileName);
+ String copySQL = copySQLBuilder.buildCopySQL();
+ LOG.info("build copy SQL is {}", copySQL);
+ Map params = new HashMap<>();
+ params.put("sql", copySQL);
+ if (StringUtils.isNotBlank(options.getClusterName())) {
+ params.put("cluster", options.getClusterName());
+ }
+ HttpPostBuilder postBuilder = new HttpPostBuilder();
+ postBuilder.setUrl(String.format(COMMIT_PATTERN, hostPort))
+ .baseAuth(options.getUsername(), options.getPassword())
+ .setEntity(new StringEntity(OBJECT_MAPPER.writeValueAsString(params)));
+
+ CloseableHttpResponse response = httpClient.execute(postBuilder.build());
+ final int statusCode = response.getStatusLine().getStatusCode();
+ final String reasonPhrase = response.getStatusLine().getReasonPhrase();
+ String loadResult = "";
+ if (statusCode != 200) {
+ LOG.warn("commit failed with status {} {}, reason {}", statusCode, hostPort, reasonPhrase);
+ throw new DorisWriterExcetion("commit error with file: " + fileName, true);
+ } else if (response.getEntity() != null) {
+ loadResult = EntityUtils.toString(response.getEntity());
+ boolean success = handleCommitResponse(loadResult);
+ if (success) {
+ LOG.info("commit success cost {}ms, response is {}", System.currentTimeMillis() - start, loadResult);
+ } else {
+ LOG.error("commit error with status {}, reason {}, response {}", statusCode, reasonPhrase, loadResult);
+ String copyErrMsg = String.format("commit error, status: %d, reason: %s, response: %s, copySQL: %s",
+ statusCode, reasonPhrase, loadResult, copySQL);
+ throw new DorisWriterExcetion(copyErrMsg, true);
+ }
+ }
+ }
+
+ public boolean handleCommitResponse(String loadResult) throws IOException {
+ BaseResponse baseResponse = OBJECT_MAPPER.readValue(loadResult, new TypeReference() {
+ });
+ if (baseResponse.getCode() == SUCCESS) {
+ CopyIntoResp dataResp = OBJECT_MAPPER.convertValue(baseResponse.getData(), CopyIntoResp.class);
+ if (FAIL.equals(dataResp.getDataCode())) {
+ LOG.error("copy into execute failed, reason:{}", loadResult);
+ return false;
+ } else {
+ Map result = dataResp.getResult();
+ if (DorisUtil.isNullOrEmpty(result) || !result.get("state").equals("FINISHED") && !isCommitted(result.get("msg"))) {
+ LOG.error("copy into load failed, reason:{}", loadResult);
+ return false;
+ } else {
+ return true;
+ }
+ }
+ } else {
+ LOG.error("commit failed, reason:{}", loadResult);
+ return false;
+ }
+ }
+
+ public static boolean isCommitted(String msg) {
+ return COMMITTED_PATTERN.matcher(msg).matches();
+ }
+
+
+ public void close() throws IOException {
+ if (null != httpClient) {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ LOG.error("Closing httpClient failed.", e);
+ throw new RuntimeException("Closing httpClient failed.", e);
+ }
+ }
+ }
+}
diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java
index e1f6e0ee..34a6d83a 100644
--- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java
+++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java
@@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.net.URLDecoder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
@@ -48,11 +49,20 @@ public class DorisStreamLoadObserver {
this.options = options;
}
- public void streamLoad(WriterTuple data) throws Exception {
- String host = getLoadHost();
- if(host == null){
- throw new IOException ("load_url cannot be empty, or the host cannot connect.Please check your configuration.");
+ public String urlDecode(String outBuffer) {
+ String data = outBuffer;
+ try {
+ data = data.replaceAll("%(?![0-9a-fA-F]{2})", "%25");
+ data = data.replaceAll("\\+", "%2B");
+ data = URLDecoder.decode(data, "utf-8");
+ } catch (Exception e) {
+ e.printStackTrace();
}
+ return data;
+ }
+
+ public void streamLoad(WriterTuple data) throws Exception {
+ String host = DorisUtil.getLoadHost(options);
String loadUrl = new StringBuilder(host)
.append("/api/")
.append(options.getDatabase())
@@ -61,7 +71,8 @@ public class DorisStreamLoadObserver {
.append("/_stream_load")
.toString();
LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());
- Map loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));
+ loadUrl = urlDecode(loadUrl);
+ Map loadResult = put(loadUrl, data.getLabel(), DorisUtil.addRows(options,data.getRows(), data.getBytes().intValue()));
LOG.info("StreamLoad response :{}",JSON.toJSONString(loadResult));
final String keyStatus = "Status";
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
@@ -123,35 +134,7 @@ public class DorisStreamLoadObserver {
}
}
- private byte[] addRows(List rows, int totalBytes) {
- if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
- Map props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps());
- byte[] lineDelimiter = DelimiterParser.parse((String)props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
- ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
- for (byte[] row : rows) {
- bos.put(row);
- bos.put(lineDelimiter);
- }
- return bos.array();
- }
- if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
- ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
- bos.put("[".getBytes(StandardCharsets.UTF_8));
- byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
- boolean isFirstElement = true;
- for (byte[] row : rows) {
- if (!isFirstElement) {
- bos.put(jsonDelimiter);
- }
- bos.put(row);
- isFirstElement = false;
- }
- bos.put("]".getBytes(StandardCharsets.UTF_8));
- return bos.array();
- }
- throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
- }
private Map put(String loadUrl, String label, byte[] data) throws IOException {
LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
@@ -208,28 +191,4 @@ public class DorisStreamLoadObserver {
}
return respEntity;
}
-
- private String getLoadHost() {
- List hostList = options.getLoadUrlList();
- Collections.shuffle(hostList);
- String host = new StringBuilder("http://").append(hostList.get((0))).toString();
- if (checkConnection(host)){
- return host;
- }
- return null;
- }
-
- private boolean checkConnection(String host) {
- try {
- URL url = new URL(host);
- HttpURLConnection co = (HttpURLConnection) url.openConnection();
- co.setConnectTimeout(5000);
- co.connect();
- co.disconnect();
- return true;
- } catch (Exception e1) {
- e1.printStackTrace();
- return false;
- }
- }
}
diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java
index 5f5a6f34..7c5ba5b3 100644
--- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java
+++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java
@@ -5,16 +5,26 @@ import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.druid.sql.parser.ParserException;
+import com.alibaba.fastjson2.JSON;
import com.google.common.base.Strings;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
/**
* jdbc util
@@ -102,4 +112,96 @@ public class DorisUtil {
}
}
}
+
+ public static T checkNotNull(T reference) {
+ if (reference == null) {
+ throw new NullPointerException();
+ } else {
+ return reference;
+ }
+ }
+
+ public static String getLoadHost(Keys options) throws IOException {
+ List hostList = options.getLoadUrlList();
+ for (int i = 0; i < hostList.size(); i++) {
+ String host = new StringBuilder("http://").append(hostList.get((i))).toString();
+ if (checkConnection(host)) {
+ return host;
+ }
+ continue;
+ }
+ throw new IOException ("load_url cannot be empty, or the host cannot connect.Please check your configuration.");
+ }
+
+
+ private static boolean checkConnection(String host) {
+ try {
+ URL url = new URL(host);
+ HttpURLConnection co = (HttpURLConnection) url.openConnection();
+ co.setConnectTimeout(5000);
+ co.connect();
+ co.disconnect();
+ return true;
+ } catch (Exception e1) {
+ LOG.error("The connection failed, host is {}", host);
+ return false;
+ }
+ }
+
+
+ public static boolean checkIsStreamLoad(Keys options) {
+ final HttpClientBuilder httpClientBuilder = HttpClients
+ .custom()
+ .disableRedirectHandling();
+ try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
+ String url = getLoadHost(options) + "/copy/query";
+ HttpPost httpPost = new HttpPost(url);
+ try (CloseableHttpResponse resp = httpclient.execute(httpPost)) {
+ if (resp.getStatusLine().getStatusCode() == 200) {
+ Map result = (Map) JSON.parse(EntityUtils.toString(resp.getEntity()));
+ if (result != null && (int) result.get("code") == 401) {
+ return false;
+ }
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return true;
+ }
+
+
+ public static byte[] addRows(Keys options, List rows, int totalBytes) {
+ if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
+ Map props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());
+ byte[] lineDelimiter = DelimiterParser.parse((String) props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
+ ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
+ for (byte[] row : rows) {
+ bos.put(row);
+ bos.put(lineDelimiter);
+ }
+ return bos.array();
+ }
+
+ if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
+ ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
+ bos.put("[".getBytes(StandardCharsets.UTF_8));
+ byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
+ boolean isFirstElement = true;
+ for (byte[] row : rows) {
+ if (!isFirstElement) {
+ bos.put(jsonDelimiter);
+ }
+ bos.put(row);
+ isFirstElement = false;
+ }
+ bos.put("]".getBytes(StandardCharsets.UTF_8));
+ return bos.array();
+ }
+ throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
+ }
+
+ public static boolean isNullOrEmpty(Map, ?> map) {
+ return map == null || map.isEmpty();
+ }
}
diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java
index 7797d79f..eae39a71 100644
--- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java
+++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java
@@ -5,15 +5,20 @@ import java.util.Map;
public class DorisWriterExcetion extends IOException {
- private final Map response;
+ private Map response;
private boolean reCreateLabel;
- public DorisWriterExcetion ( String message, Map response) {
+ public DorisWriterExcetion(String message, Map response) {
super(message);
this.response = response;
}
- public DorisWriterExcetion ( String message, Map response, boolean reCreateLabel) {
+ public DorisWriterExcetion(String message, boolean reCreateLabel) {
+ super(message);
+ this.reCreateLabel = reCreateLabel;
+ }
+
+ public DorisWriterExcetion(String message, Map response, boolean reCreateLabel) {
super(message);
this.response = response;
this.reCreateLabel = reCreateLabel;
diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java
index f0ba6b52..c7696425 100644
--- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java
+++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java
@@ -20,7 +20,8 @@ public class DorisWriterManager {
private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);
- private final DorisStreamLoadObserver visitor;
+ private final DorisStreamLoadObserver streamLoadObserver;
+ private final DorisCopyIntoObserver copyIntoObserver;
private final Keys options;
private final List buffer = new ArrayList<> ();
private int batchCount = 0;
@@ -33,7 +34,8 @@ public class DorisWriterManager {
public DorisWriterManager( Keys options) {
this.options = options;
- this.visitor = new DorisStreamLoadObserver (options);
+ this.streamLoadObserver = new DorisStreamLoadObserver (options);
+ this.copyIntoObserver = new DorisCopyIntoObserver(options);
flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
this.startScheduler();
this.startAsyncFlushing();
@@ -160,7 +162,11 @@ public class DorisWriterManager {
for (int i = 0; i <= options.getMaxRetries(); i++) {
try {
// flush to Doris with stream load
- visitor.streamLoad(flushData);
+ if (DorisUtil.checkIsStreamLoad(options)) {
+ streamLoadObserver.streamLoad(flushData);
+ } else {
+ copyIntoObserver.streamLoad(flushData);
+ }
LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
startScheduler();
break;
diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPostBuilder.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPostBuilder.java
new file mode 100644
index 00000000..10e32f1a
--- /dev/null
+++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPostBuilder.java
@@ -0,0 +1,51 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.HttpPost;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class HttpPostBuilder {
+ String url;
+ Map header;
+ HttpEntity httpEntity;
+ public HttpPostBuilder() {
+ header = new HashMap<>();
+ }
+
+ public HttpPostBuilder setUrl(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public HttpPostBuilder addCommonHeader() {
+ header.put(HttpHeaders.EXPECT, "100-continue");
+ return this;
+ }
+
+ public HttpPostBuilder baseAuth(String user, String password) {
+ final String authInfo = user + ":" + password;
+ byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
+ header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded));
+ return this;
+ }
+
+ public HttpPostBuilder setEntity(HttpEntity httpEntity) {
+ this.httpEntity = httpEntity;
+ return this;
+ }
+
+ public HttpPost build() {
+ DorisUtil.checkNotNull(url);
+ DorisUtil.checkNotNull(httpEntity);
+ HttpPost put = new HttpPost(url);
+ header.forEach(put::setHeader);
+ put.setEntity(httpEntity);
+ return put;
+ }
+}
diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPutBuilder.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPutBuilder.java
new file mode 100644
index 00000000..546429bc
--- /dev/null
+++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPutBuilder.java
@@ -0,0 +1,65 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HttpPutBuilder {
+ String url;
+ Map header;
+ HttpEntity httpEntity;
+ public HttpPutBuilder() {
+ header = new HashMap<>();
+ }
+
+ public HttpPutBuilder setUrl(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public HttpPutBuilder addFileName(String fileName){
+ header.put("fileName", fileName);
+ return this;
+ }
+
+ public HttpPutBuilder setEmptyEntity() {
+ try {
+ this.httpEntity = new StringEntity("");
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e);
+ }
+ return this;
+ }
+
+ public HttpPutBuilder addCommonHeader() {
+ header.put(HttpHeaders.EXPECT, "100-continue");
+ return this;
+ }
+
+ public HttpPutBuilder baseAuth(String user, String password) {
+ final String authInfo = user + ":" + password;
+ byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
+ header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded));
+ return this;
+ }
+
+ public HttpPutBuilder setEntity(HttpEntity httpEntity) {
+ this.httpEntity = httpEntity;
+ return this;
+ }
+
+ public HttpPut build() {
+ DorisUtil.checkNotNull(url);
+ DorisUtil.checkNotNull(httpEntity);
+ HttpPut put = new HttpPut(url);
+ header.forEach(put::setHeader);
+ put.setEntity(httpEntity);
+ return put;
+ }
+}
diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java
index e460e76b..ec2533ea 100644
--- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java
+++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java
@@ -36,6 +36,8 @@ public class Keys implements Serializable {
private static final String LOAD_URL = "loadUrl";
private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength";
private static final String LOAD_PROPS = "loadProps";
+ private static final String CLUSTER_NAME = "clusterName";
+
private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";
@@ -89,6 +91,10 @@ public class Keys implements Serializable {
return options.getList(LOAD_URL, String.class);
}
+ public String getClusterName() {
+ return options.getString(CLUSTER_NAME);
+ }
+
public List getColumns() {
if (isWildcardColumn) {
return this.infoSchemaColumns;