mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 04:59:51 +08:00
Merge 80b9e5f73f
into 0824b45c5e
This commit is contained in:
commit
a2b16be5aa
@ -63,6 +63,21 @@ under the License.
|
|||||||
<artifactId>httpclient</artifactId>
|
<artifactId>httpclient</artifactId>
|
||||||
<version>4.5.13</version>
|
<version>4.5.13</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
<artifactId>jackson-annotations</artifactId>
|
||||||
|
<version>2.13.3</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
<artifactId>jackson-core</artifactId>
|
||||||
|
<version>2.13.3</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
<artifactId>jackson-databind</artifactId>
|
||||||
|
<version>2.13.3</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
<build>
|
<build>
|
||||||
<plugins>
|
<plugins>
|
||||||
|
@ -0,0 +1,23 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||||
|
|
||||||
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
|
public class BaseResponse<T> {
|
||||||
|
private int code;
|
||||||
|
private String msg;
|
||||||
|
private T data;
|
||||||
|
private int count;
|
||||||
|
|
||||||
|
public int getCode() {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMsg() {
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
public T getData(){
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,26 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
|
public class CopyIntoResp extends BaseResponse{
|
||||||
|
private String code;
|
||||||
|
private String exception;
|
||||||
|
|
||||||
|
private Map<String,String> result;
|
||||||
|
|
||||||
|
public String getDataCode() {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getException() {
|
||||||
|
return exception;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, String> getResult() {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,83 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||||
|
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.StringJoiner;
|
||||||
|
|
||||||
|
public class CopySQLBuilder {
|
||||||
|
private final static String COPY_SYNC = "copy.async";
|
||||||
|
private final static String FIELD_DELIMITER_KEY = "column_separator";
|
||||||
|
private final static String COPY_FIELD_DELIMITER_KEY = "file.column_separator";
|
||||||
|
private final static String FIELD_DELIMITER_DEFAULT = "\t";
|
||||||
|
private final static String LINE_DELIMITER_KEY = "line_delimiter";
|
||||||
|
private final static String COPY_LINE_DELIMITER_KEY = "file.line_delimiter";
|
||||||
|
private final static String LINE_DELIMITER_DEFAULT = "\n";
|
||||||
|
private final static String COLUMNS = "columns";
|
||||||
|
private static final String STRIP_OUT_ARRAY = "strip_outer_array";
|
||||||
|
private final String fileName;
|
||||||
|
private final Keys options;
|
||||||
|
private Map<String, Object> properties;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public CopySQLBuilder(Keys options, String fileName) {
|
||||||
|
this.options=options;
|
||||||
|
this.fileName=fileName;
|
||||||
|
this.properties=options.getLoadProps();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String buildCopySQL(){
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append("COPY INTO ")
|
||||||
|
.append(options.getDatabase() + "." + options.getTable());
|
||||||
|
|
||||||
|
if (properties.get(COLUMNS) != null && !properties.get(COLUMNS).equals("")) {
|
||||||
|
sb.append(" FROM ( SELECT ").append(properties.get(COLUMNS))
|
||||||
|
.append(" FROM @~('").append(fileName).append("') ) ")
|
||||||
|
.append("PROPERTIES (");
|
||||||
|
} else {
|
||||||
|
sb.append(" FROM @~('").append(fileName).append("') ")
|
||||||
|
.append("PROPERTIES (");
|
||||||
|
}
|
||||||
|
|
||||||
|
//copy into must be sync
|
||||||
|
properties.put(COPY_SYNC,false);
|
||||||
|
StringJoiner props = new StringJoiner(",");
|
||||||
|
for(Map.Entry<String,Object> entry : properties.entrySet()){
|
||||||
|
String key = concatPropPrefix(String.valueOf(entry.getKey()));
|
||||||
|
String value = "";
|
||||||
|
switch (key){
|
||||||
|
case COPY_FIELD_DELIMITER_KEY:
|
||||||
|
value = DelimiterParser.parse(String.valueOf(entry.getValue()),FIELD_DELIMITER_DEFAULT);
|
||||||
|
break;
|
||||||
|
case COPY_LINE_DELIMITER_KEY:
|
||||||
|
value = DelimiterParser.parse(String.valueOf(entry.getValue()),LINE_DELIMITER_DEFAULT);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
value = String.valueOf(entry.getValue());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if(!key.equals(COLUMNS)){
|
||||||
|
String prop = String.format("'%s'='%s'", key, value);
|
||||||
|
props.add(prop);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sb.append(props).append(" )");
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
static final List<String> PREFIX_LIST =
|
||||||
|
Arrays.asList(FIELD_DELIMITER_KEY, LINE_DELIMITER_KEY, STRIP_OUT_ARRAY);
|
||||||
|
|
||||||
|
private String concatPropPrefix(String key) {
|
||||||
|
if (PREFIX_LIST.contains(key)) {
|
||||||
|
return "file." + key;
|
||||||
|
}
|
||||||
|
if ("format".equalsIgnoreCase(key)) {
|
||||||
|
return "file.type";
|
||||||
|
}
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,172 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.http.Header;
|
||||||
|
import org.apache.http.HttpEntity;
|
||||||
|
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||||
|
import org.apache.http.entity.ByteArrayEntity;
|
||||||
|
import org.apache.http.entity.StringEntity;
|
||||||
|
import org.apache.http.impl.client.CloseableHttpClient;
|
||||||
|
import org.apache.http.impl.client.HttpClientBuilder;
|
||||||
|
import org.apache.http.impl.client.HttpClients;
|
||||||
|
import org.apache.http.util.EntityUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
public class DorisCopyIntoObserver {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(DorisCopyIntoObserver.class);
|
||||||
|
|
||||||
|
private Keys options;
|
||||||
|
private long pos;
|
||||||
|
public static final int SUCCESS = 0;
|
||||||
|
public static final String FAIL = "1";
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
private final HttpClientBuilder httpClientBuilder = HttpClients
|
||||||
|
.custom()
|
||||||
|
.disableRedirectHandling();
|
||||||
|
private CloseableHttpClient httpClient;
|
||||||
|
private static final String UPLOAD_URL_PATTERN = "%s/copy/upload";
|
||||||
|
private static final String COMMIT_PATTERN = "%s/copy/query";
|
||||||
|
private static final Pattern COMMITTED_PATTERN = Pattern.compile("errCode = 2, detailMessage = No files can be copied.*");
|
||||||
|
|
||||||
|
public DorisCopyIntoObserver(Keys options) {
|
||||||
|
this.options = options;
|
||||||
|
this.httpClient = httpClientBuilder.build();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void streamLoad(WriterTuple data) throws Exception {
|
||||||
|
String host = DorisUtil.getLoadHost(options);
|
||||||
|
String loadUrl = String.format(UPLOAD_URL_PATTERN, host);
|
||||||
|
String uploadAddress = getUploadAddress(loadUrl, data.getLabel());
|
||||||
|
put(uploadAddress, data.getLabel(), DorisUtil.addRows(options, data.getRows(), data.getBytes().intValue()));
|
||||||
|
executeCopy(host, data.getLabel());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getUploadAddress(String loadUrl, String fileName) throws IOException {
|
||||||
|
HttpPutBuilder putBuilder = new HttpPutBuilder();
|
||||||
|
putBuilder.setUrl(loadUrl)
|
||||||
|
.addFileName(fileName)
|
||||||
|
.addCommonHeader()
|
||||||
|
.setEmptyEntity()
|
||||||
|
.baseAuth(options.getUsername(), options.getPassword());
|
||||||
|
CloseableHttpResponse execute = httpClientBuilder.build().execute(putBuilder.build());
|
||||||
|
int statusCode = execute.getStatusLine().getStatusCode();
|
||||||
|
String reason = execute.getStatusLine().getReasonPhrase();
|
||||||
|
if (statusCode == 307) {
|
||||||
|
Header location = execute.getFirstHeader("location");
|
||||||
|
String uploadAddress = location.getValue();
|
||||||
|
LOG.info("redirect to s3:{}", uploadAddress);
|
||||||
|
return uploadAddress;
|
||||||
|
} else {
|
||||||
|
HttpEntity entity = execute.getEntity();
|
||||||
|
String result = entity == null ? null : EntityUtils.toString(entity);
|
||||||
|
LOG.error("Failed get the redirected address, status {}, reason {}, response {}", statusCode, reason, result);
|
||||||
|
throw new RuntimeException("Could not get the redirected address.");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void put(String loadUrl, String fileName, byte[] data) throws IOException {
|
||||||
|
LOG.info(String.format("Executing upload file to: '%s', size: '%s'", loadUrl, data.length));
|
||||||
|
HttpPutBuilder putBuilder = new HttpPutBuilder();
|
||||||
|
putBuilder.setUrl(loadUrl)
|
||||||
|
.addCommonHeader()
|
||||||
|
.setEntity(new ByteArrayEntity(data));
|
||||||
|
CloseableHttpResponse response = httpClient.execute(putBuilder.build());
|
||||||
|
final int statusCode = response.getStatusLine().getStatusCode();
|
||||||
|
if (statusCode != 200) {
|
||||||
|
String result = response.getEntity() == null ? null : EntityUtils.toString(response.getEntity());
|
||||||
|
LOG.error("upload file {} error, response {}", fileName, result);
|
||||||
|
throw new DorisWriterExcetion("upload file error: " + fileName, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* execute copy into
|
||||||
|
*/
|
||||||
|
public void executeCopy(String hostPort, String fileName) throws IOException {
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
CopySQLBuilder copySQLBuilder = new CopySQLBuilder(options, fileName);
|
||||||
|
String copySQL = copySQLBuilder.buildCopySQL();
|
||||||
|
LOG.info("build copy SQL is {}", copySQL);
|
||||||
|
Map<String, String> params = new HashMap<>();
|
||||||
|
params.put("sql", copySQL);
|
||||||
|
if (StringUtils.isNotBlank(options.getClusterName())) {
|
||||||
|
params.put("cluster", options.getClusterName());
|
||||||
|
}
|
||||||
|
HttpPostBuilder postBuilder = new HttpPostBuilder();
|
||||||
|
postBuilder.setUrl(String.format(COMMIT_PATTERN, hostPort))
|
||||||
|
.baseAuth(options.getUsername(), options.getPassword())
|
||||||
|
.setEntity(new StringEntity(OBJECT_MAPPER.writeValueAsString(params)));
|
||||||
|
|
||||||
|
CloseableHttpResponse response = httpClient.execute(postBuilder.build());
|
||||||
|
final int statusCode = response.getStatusLine().getStatusCode();
|
||||||
|
final String reasonPhrase = response.getStatusLine().getReasonPhrase();
|
||||||
|
String loadResult = "";
|
||||||
|
if (statusCode != 200) {
|
||||||
|
LOG.warn("commit failed with status {} {}, reason {}", statusCode, hostPort, reasonPhrase);
|
||||||
|
throw new DorisWriterExcetion("commit error with file: " + fileName, true);
|
||||||
|
} else if (response.getEntity() != null) {
|
||||||
|
loadResult = EntityUtils.toString(response.getEntity());
|
||||||
|
boolean success = handleCommitResponse(loadResult);
|
||||||
|
if (success) {
|
||||||
|
LOG.info("commit success cost {}ms, response is {}", System.currentTimeMillis() - start, loadResult);
|
||||||
|
} else {
|
||||||
|
LOG.error("commit error with status {}, reason {}, response {}", statusCode, reasonPhrase, loadResult);
|
||||||
|
String copyErrMsg = String.format("commit error, status: %d, reason: %s, response: %s, copySQL: %s",
|
||||||
|
statusCode, reasonPhrase, loadResult, copySQL);
|
||||||
|
throw new DorisWriterExcetion(copyErrMsg, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean handleCommitResponse(String loadResult) throws IOException {
|
||||||
|
BaseResponse baseResponse = OBJECT_MAPPER.readValue(loadResult, new TypeReference<BaseResponse>() {
|
||||||
|
});
|
||||||
|
if (baseResponse.getCode() == SUCCESS) {
|
||||||
|
CopyIntoResp dataResp = OBJECT_MAPPER.convertValue(baseResponse.getData(), CopyIntoResp.class);
|
||||||
|
if (FAIL.equals(dataResp.getDataCode())) {
|
||||||
|
LOG.error("copy into execute failed, reason:{}", loadResult);
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
Map<String, String> result = dataResp.getResult();
|
||||||
|
if (DorisUtil.isNullOrEmpty(result) || !result.get("state").equals("FINISHED") && !isCommitted(result.get("msg"))) {
|
||||||
|
LOG.error("copy into load failed, reason:{}", loadResult);
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.error("commit failed, reason:{}", loadResult);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isCommitted(String msg) {
|
||||||
|
return COMMITTED_PATTERN.matcher(msg).matches();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (null != httpClient) {
|
||||||
|
try {
|
||||||
|
httpClient.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Closing httpClient failed.", e);
|
||||||
|
throw new RuntimeException("Closing httpClient failed.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
import java.net.URLDecoder;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@ -48,11 +49,20 @@ public class DorisStreamLoadObserver {
|
|||||||
this.options = options;
|
this.options = options;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void streamLoad(WriterTuple data) throws Exception {
|
public String urlDecode(String outBuffer) {
|
||||||
String host = getLoadHost();
|
String data = outBuffer;
|
||||||
if(host == null){
|
try {
|
||||||
throw new IOException ("load_url cannot be empty, or the host cannot connect.Please check your configuration.");
|
data = data.replaceAll("%(?![0-9a-fA-F]{2})", "%25");
|
||||||
|
data = data.replaceAll("\\+", "%2B");
|
||||||
|
data = URLDecoder.decode(data, "utf-8");
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void streamLoad(WriterTuple data) throws Exception {
|
||||||
|
String host = DorisUtil.getLoadHost(options);
|
||||||
String loadUrl = new StringBuilder(host)
|
String loadUrl = new StringBuilder(host)
|
||||||
.append("/api/")
|
.append("/api/")
|
||||||
.append(options.getDatabase())
|
.append(options.getDatabase())
|
||||||
@ -61,7 +71,8 @@ public class DorisStreamLoadObserver {
|
|||||||
.append("/_stream_load")
|
.append("/_stream_load")
|
||||||
.toString();
|
.toString();
|
||||||
LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());
|
LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());
|
||||||
Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));
|
loadUrl = urlDecode(loadUrl);
|
||||||
|
Map<String, Object> loadResult = put(loadUrl, data.getLabel(), DorisUtil.addRows(options,data.getRows(), data.getBytes().intValue()));
|
||||||
LOG.info("StreamLoad response :{}",JSON.toJSONString(loadResult));
|
LOG.info("StreamLoad response :{}",JSON.toJSONString(loadResult));
|
||||||
final String keyStatus = "Status";
|
final String keyStatus = "Status";
|
||||||
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
|
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
|
||||||
@ -123,35 +134,7 @@ public class DorisStreamLoadObserver {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private byte[] addRows(List<byte[]> rows, int totalBytes) {
|
|
||||||
if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
|
|
||||||
Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps());
|
|
||||||
byte[] lineDelimiter = DelimiterParser.parse((String)props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
|
|
||||||
ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
|
|
||||||
for (byte[] row : rows) {
|
|
||||||
bos.put(row);
|
|
||||||
bos.put(lineDelimiter);
|
|
||||||
}
|
|
||||||
return bos.array();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
|
|
||||||
ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
|
|
||||||
bos.put("[".getBytes(StandardCharsets.UTF_8));
|
|
||||||
byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
|
|
||||||
boolean isFirstElement = true;
|
|
||||||
for (byte[] row : rows) {
|
|
||||||
if (!isFirstElement) {
|
|
||||||
bos.put(jsonDelimiter);
|
|
||||||
}
|
|
||||||
bos.put(row);
|
|
||||||
isFirstElement = false;
|
|
||||||
}
|
|
||||||
bos.put("]".getBytes(StandardCharsets.UTF_8));
|
|
||||||
return bos.array();
|
|
||||||
}
|
|
||||||
throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
|
|
||||||
}
|
|
||||||
private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {
|
private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {
|
||||||
LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
|
LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
|
||||||
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
|
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
|
||||||
@ -208,28 +191,4 @@ public class DorisStreamLoadObserver {
|
|||||||
}
|
}
|
||||||
return respEntity;
|
return respEntity;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getLoadHost() {
|
|
||||||
List<String> hostList = options.getLoadUrlList();
|
|
||||||
Collections.shuffle(hostList);
|
|
||||||
String host = new StringBuilder("http://").append(hostList.get((0))).toString();
|
|
||||||
if (checkConnection(host)){
|
|
||||||
return host;
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean checkConnection(String host) {
|
|
||||||
try {
|
|
||||||
URL url = new URL(host);
|
|
||||||
HttpURLConnection co = (HttpURLConnection) url.openConnection();
|
|
||||||
co.setConnectTimeout(5000);
|
|
||||||
co.connect();
|
|
||||||
co.disconnect();
|
|
||||||
return true;
|
|
||||||
} catch (Exception e1) {
|
|
||||||
e1.printStackTrace();
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -5,16 +5,26 @@ import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
|||||||
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
|
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
|
||||||
import com.alibaba.datax.plugin.rdbms.writer.Constant;
|
import com.alibaba.datax.plugin.rdbms.writer.Constant;
|
||||||
import com.alibaba.druid.sql.parser.ParserException;
|
import com.alibaba.druid.sql.parser.ParserException;
|
||||||
|
import com.alibaba.fastjson2.JSON;
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
|
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||||
|
import org.apache.http.client.methods.HttpPost;
|
||||||
|
import org.apache.http.impl.client.CloseableHttpClient;
|
||||||
|
import org.apache.http.impl.client.HttpClientBuilder;
|
||||||
|
import org.apache.http.impl.client.HttpClients;
|
||||||
|
import org.apache.http.util.EntityUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.HttpURLConnection;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.ResultSet;
|
import java.sql.ResultSet;
|
||||||
import java.sql.Statement;
|
import java.sql.Statement;
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* jdbc util
|
* jdbc util
|
||||||
@ -102,4 +112,96 @@ public class DorisUtil {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static <T> T checkNotNull(T reference) {
|
||||||
|
if (reference == null) {
|
||||||
|
throw new NullPointerException();
|
||||||
|
} else {
|
||||||
|
return reference;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getLoadHost(Keys options) throws IOException {
|
||||||
|
List<String> hostList = options.getLoadUrlList();
|
||||||
|
for (int i = 0; i < hostList.size(); i++) {
|
||||||
|
String host = new StringBuilder("http://").append(hostList.get((i))).toString();
|
||||||
|
if (checkConnection(host)) {
|
||||||
|
return host;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
throw new IOException ("load_url cannot be empty, or the host cannot connect.Please check your configuration.");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static boolean checkConnection(String host) {
|
||||||
|
try {
|
||||||
|
URL url = new URL(host);
|
||||||
|
HttpURLConnection co = (HttpURLConnection) url.openConnection();
|
||||||
|
co.setConnectTimeout(5000);
|
||||||
|
co.connect();
|
||||||
|
co.disconnect();
|
||||||
|
return true;
|
||||||
|
} catch (Exception e1) {
|
||||||
|
LOG.error("The connection failed, host is {}", host);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static boolean checkIsStreamLoad(Keys options) {
|
||||||
|
final HttpClientBuilder httpClientBuilder = HttpClients
|
||||||
|
.custom()
|
||||||
|
.disableRedirectHandling();
|
||||||
|
try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
|
||||||
|
String url = getLoadHost(options) + "/copy/query";
|
||||||
|
HttpPost httpPost = new HttpPost(url);
|
||||||
|
try (CloseableHttpResponse resp = httpclient.execute(httpPost)) {
|
||||||
|
if (resp.getStatusLine().getStatusCode() == 200) {
|
||||||
|
Map<String, Object> result = (Map<String, Object>) JSON.parse(EntityUtils.toString(resp.getEntity()));
|
||||||
|
if (result != null && (int) result.get("code") == 401) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static byte[] addRows(Keys options, List<byte[]> rows, int totalBytes) {
|
||||||
|
if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
|
||||||
|
Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());
|
||||||
|
byte[] lineDelimiter = DelimiterParser.parse((String) props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
|
||||||
|
ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
|
||||||
|
for (byte[] row : rows) {
|
||||||
|
bos.put(row);
|
||||||
|
bos.put(lineDelimiter);
|
||||||
|
}
|
||||||
|
return bos.array();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
|
||||||
|
ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
|
||||||
|
bos.put("[".getBytes(StandardCharsets.UTF_8));
|
||||||
|
byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
|
||||||
|
boolean isFirstElement = true;
|
||||||
|
for (byte[] row : rows) {
|
||||||
|
if (!isFirstElement) {
|
||||||
|
bos.put(jsonDelimiter);
|
||||||
|
}
|
||||||
|
bos.put(row);
|
||||||
|
isFirstElement = false;
|
||||||
|
}
|
||||||
|
bos.put("]".getBytes(StandardCharsets.UTF_8));
|
||||||
|
return bos.array();
|
||||||
|
}
|
||||||
|
throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isNullOrEmpty(Map<?, ?> map) {
|
||||||
|
return map == null || map.isEmpty();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,15 +5,20 @@ import java.util.Map;
|
|||||||
|
|
||||||
public class DorisWriterExcetion extends IOException {
|
public class DorisWriterExcetion extends IOException {
|
||||||
|
|
||||||
private final Map<String, Object> response;
|
private Map<String, Object> response;
|
||||||
private boolean reCreateLabel;
|
private boolean reCreateLabel;
|
||||||
|
|
||||||
public DorisWriterExcetion ( String message, Map<String, Object> response) {
|
public DorisWriterExcetion(String message, Map<String, Object> response) {
|
||||||
super(message);
|
super(message);
|
||||||
this.response = response;
|
this.response = response;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DorisWriterExcetion ( String message, Map<String, Object> response, boolean reCreateLabel) {
|
public DorisWriterExcetion(String message, boolean reCreateLabel) {
|
||||||
|
super(message);
|
||||||
|
this.reCreateLabel = reCreateLabel;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DorisWriterExcetion(String message, Map<String, Object> response, boolean reCreateLabel) {
|
||||||
super(message);
|
super(message);
|
||||||
this.response = response;
|
this.response = response;
|
||||||
this.reCreateLabel = reCreateLabel;
|
this.reCreateLabel = reCreateLabel;
|
||||||
|
@ -20,7 +20,8 @@ public class DorisWriterManager {
|
|||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);
|
private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);
|
||||||
|
|
||||||
private final DorisStreamLoadObserver visitor;
|
private final DorisStreamLoadObserver streamLoadObserver;
|
||||||
|
private final DorisCopyIntoObserver copyIntoObserver;
|
||||||
private final Keys options;
|
private final Keys options;
|
||||||
private final List<byte[]> buffer = new ArrayList<> ();
|
private final List<byte[]> buffer = new ArrayList<> ();
|
||||||
private int batchCount = 0;
|
private int batchCount = 0;
|
||||||
@ -33,7 +34,8 @@ public class DorisWriterManager {
|
|||||||
|
|
||||||
public DorisWriterManager( Keys options) {
|
public DorisWriterManager( Keys options) {
|
||||||
this.options = options;
|
this.options = options;
|
||||||
this.visitor = new DorisStreamLoadObserver (options);
|
this.streamLoadObserver = new DorisStreamLoadObserver (options);
|
||||||
|
this.copyIntoObserver = new DorisCopyIntoObserver(options);
|
||||||
flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
|
flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
|
||||||
this.startScheduler();
|
this.startScheduler();
|
||||||
this.startAsyncFlushing();
|
this.startAsyncFlushing();
|
||||||
@ -160,7 +162,11 @@ public class DorisWriterManager {
|
|||||||
for (int i = 0; i <= options.getMaxRetries(); i++) {
|
for (int i = 0; i <= options.getMaxRetries(); i++) {
|
||||||
try {
|
try {
|
||||||
// flush to Doris with stream load
|
// flush to Doris with stream load
|
||||||
visitor.streamLoad(flushData);
|
if (DorisUtil.checkIsStreamLoad(options)) {
|
||||||
|
streamLoadObserver.streamLoad(flushData);
|
||||||
|
} else {
|
||||||
|
copyIntoObserver.streamLoad(flushData);
|
||||||
|
}
|
||||||
LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
|
LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
|
||||||
startScheduler();
|
startScheduler();
|
||||||
break;
|
break;
|
||||||
|
@ -0,0 +1,51 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||||
|
|
||||||
|
import org.apache.commons.codec.binary.Base64;
|
||||||
|
import org.apache.http.HttpEntity;
|
||||||
|
import org.apache.http.HttpHeaders;
|
||||||
|
import org.apache.http.client.methods.HttpPost;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
||||||
|
public class HttpPostBuilder {
|
||||||
|
String url;
|
||||||
|
Map<String, String> header;
|
||||||
|
HttpEntity httpEntity;
|
||||||
|
public HttpPostBuilder() {
|
||||||
|
header = new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpPostBuilder setUrl(String url) {
|
||||||
|
this.url = url;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpPostBuilder addCommonHeader() {
|
||||||
|
header.put(HttpHeaders.EXPECT, "100-continue");
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpPostBuilder baseAuth(String user, String password) {
|
||||||
|
final String authInfo = user + ":" + password;
|
||||||
|
byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
|
||||||
|
header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpPostBuilder setEntity(HttpEntity httpEntity) {
|
||||||
|
this.httpEntity = httpEntity;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpPost build() {
|
||||||
|
DorisUtil.checkNotNull(url);
|
||||||
|
DorisUtil.checkNotNull(httpEntity);
|
||||||
|
HttpPost put = new HttpPost(url);
|
||||||
|
header.forEach(put::setHeader);
|
||||||
|
put.setEntity(httpEntity);
|
||||||
|
return put;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,65 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||||
|
|
||||||
|
import org.apache.commons.codec.binary.Base64;
|
||||||
|
import org.apache.http.HttpEntity;
|
||||||
|
import org.apache.http.HttpHeaders;
|
||||||
|
import org.apache.http.client.methods.HttpPut;
|
||||||
|
import org.apache.http.entity.StringEntity;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class HttpPutBuilder {
|
||||||
|
String url;
|
||||||
|
Map<String, String> header;
|
||||||
|
HttpEntity httpEntity;
|
||||||
|
public HttpPutBuilder() {
|
||||||
|
header = new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpPutBuilder setUrl(String url) {
|
||||||
|
this.url = url;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpPutBuilder addFileName(String fileName){
|
||||||
|
header.put("fileName", fileName);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpPutBuilder setEmptyEntity() {
|
||||||
|
try {
|
||||||
|
this.httpEntity = new StringEntity("");
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IllegalArgumentException(e);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpPutBuilder addCommonHeader() {
|
||||||
|
header.put(HttpHeaders.EXPECT, "100-continue");
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpPutBuilder baseAuth(String user, String password) {
|
||||||
|
final String authInfo = user + ":" + password;
|
||||||
|
byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
|
||||||
|
header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpPutBuilder setEntity(HttpEntity httpEntity) {
|
||||||
|
this.httpEntity = httpEntity;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpPut build() {
|
||||||
|
DorisUtil.checkNotNull(url);
|
||||||
|
DorisUtil.checkNotNull(httpEntity);
|
||||||
|
HttpPut put = new HttpPut(url);
|
||||||
|
header.forEach(put::setHeader);
|
||||||
|
put.setEntity(httpEntity);
|
||||||
|
return put;
|
||||||
|
}
|
||||||
|
}
|
@ -36,6 +36,8 @@ public class Keys implements Serializable {
|
|||||||
private static final String LOAD_URL = "loadUrl";
|
private static final String LOAD_URL = "loadUrl";
|
||||||
private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength";
|
private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength";
|
||||||
private static final String LOAD_PROPS = "loadProps";
|
private static final String LOAD_PROPS = "loadProps";
|
||||||
|
private static final String CLUSTER_NAME = "clusterName";
|
||||||
|
|
||||||
|
|
||||||
private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";
|
private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";
|
||||||
|
|
||||||
@ -89,6 +91,10 @@ public class Keys implements Serializable {
|
|||||||
return options.getList(LOAD_URL, String.class);
|
return options.getList(LOAD_URL, String.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getClusterName() {
|
||||||
|
return options.getString(CLUSTER_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
public List<String> getColumns() {
|
public List<String> getColumns() {
|
||||||
if (isWildcardColumn) {
|
if (isWildcardColumn) {
|
||||||
return this.infoSchemaColumns;
|
return this.infoSchemaColumns;
|
||||||
|
Loading…
Reference in New Issue
Block a user