mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 17:40:28 +08:00
remove doriswriter
This commit is contained in:
parent
835e5deb74
commit
0582da63a5
@ -1,141 +0,0 @@
|
|||||||
package com.starrocks.connector.datax.plugin.writer.starrockswriter;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.Record;
|
|
||||||
import com.alibaba.datax.common.exception.DataXException;
|
|
||||||
import com.alibaba.datax.common.plugin.RecordReceiver;
|
|
||||||
import com.alibaba.datax.common.spi.Writer;
|
|
||||||
import com.alibaba.datax.common.util.Configuration;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
|
||||||
import com.starrocks.connector.datax.plugin.writer.starrockswriter.manager.StarRocksWriterManager;
|
|
||||||
import com.starrocks.connector.datax.plugin.writer.starrockswriter.row.StarRocksISerializer;
|
|
||||||
import com.starrocks.connector.datax.plugin.writer.starrockswriter.row.StarRocksSerializerFactory;
|
|
||||||
import com.starrocks.connector.datax.plugin.writer.starrockswriter.util.StarRocksWriterUtil;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.sql.Connection;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
public class StarRocksWriter extends Writer {
|
|
||||||
|
|
||||||
public static class Job extends Writer.Job {
|
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
|
|
||||||
private Configuration originalConfig = null;
|
|
||||||
private StarRocksWriterOptions options;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void init() {
|
|
||||||
this.originalConfig = super.getPluginJobConf();
|
|
||||||
options = new StarRocksWriterOptions(super.getPluginJobConf());
|
|
||||||
options.doPretreatment();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void preCheck(){
|
|
||||||
this.init();
|
|
||||||
StarRocksWriterUtil.preCheckPrePareSQL(options);
|
|
||||||
StarRocksWriterUtil.preCheckPostSQL(options);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void prepare() {
|
|
||||||
String username = options.getUsername();
|
|
||||||
String password = options.getPassword();
|
|
||||||
String jdbcUrl = options.getJdbcUrl();
|
|
||||||
List<String> renderedPreSqls = StarRocksWriterUtil.renderPreOrPostSqls(options.getPreSqlList(), options.getTable());
|
|
||||||
if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
|
|
||||||
Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
|
|
||||||
LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPreSqls), jdbcUrl);
|
|
||||||
StarRocksWriterUtil.executeSqls(conn, renderedPreSqls);
|
|
||||||
DBUtil.closeDBResources(null, null, conn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<Configuration> split(int mandatoryNumber) {
|
|
||||||
List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
|
|
||||||
for (int i = 0; i < mandatoryNumber; i++) {
|
|
||||||
configurations.add(originalConfig);
|
|
||||||
}
|
|
||||||
return configurations;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void post() {
|
|
||||||
String username = options.getUsername();
|
|
||||||
String password = options.getPassword();
|
|
||||||
String jdbcUrl = options.getJdbcUrl();
|
|
||||||
List<String> renderedPostSqls = StarRocksWriterUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable());
|
|
||||||
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
|
|
||||||
Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
|
|
||||||
LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl);
|
|
||||||
StarRocksWriterUtil.executeSqls(conn, renderedPostSqls);
|
|
||||||
DBUtil.closeDBResources(null, null, conn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void destroy() {
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class Task extends Writer.Task {
|
|
||||||
private StarRocksWriterManager writerManager;
|
|
||||||
private StarRocksWriterOptions options;
|
|
||||||
private StarRocksISerializer rowSerializer;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void init() {
|
|
||||||
options = new StarRocksWriterOptions(super.getPluginJobConf());
|
|
||||||
writerManager = new StarRocksWriterManager(options);
|
|
||||||
rowSerializer = StarRocksSerializerFactory.createSerializer(options);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void prepare() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public void startWrite(RecordReceiver recordReceiver) {
|
|
||||||
try {
|
|
||||||
Record record;
|
|
||||||
while ((record = recordReceiver.getFromReader()) != null) {
|
|
||||||
if (record.getColumnNumber() != options.getColumns().size()) {
|
|
||||||
throw DataXException
|
|
||||||
.asDataXException(
|
|
||||||
DBUtilErrorCode.CONF_ERROR,
|
|
||||||
String.format(
|
|
||||||
"列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
|
|
||||||
record.getColumnNumber(),
|
|
||||||
options.getColumns().size()));
|
|
||||||
}
|
|
||||||
writerManager.writeRecord(rowSerializer.serialize(record));
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void post() {
|
|
||||||
try {
|
|
||||||
writerManager.close();
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void destroy() {}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean supportFailOver(){
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,146 +0,0 @@
|
|||||||
package com.starrocks.connector.datax.plugin.writer.starrockswriter;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.exception.DataXException;
|
|
||||||
import com.alibaba.datax.common.util.Configuration;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
public class StarRocksWriterOptions implements Serializable {
|
|
||||||
|
|
||||||
private static final long serialVersionUID = 1l;
|
|
||||||
private static final long KILO_BYTES_SCALE = 1024l;
|
|
||||||
private static final long MEGA_BYTES_SCALE = KILO_BYTES_SCALE * KILO_BYTES_SCALE;
|
|
||||||
private static final int MAX_RETRIES = 1;
|
|
||||||
private static final int BATCH_ROWS = 500000;
|
|
||||||
private static final long BATCH_BYTES = 90 * MEGA_BYTES_SCALE;
|
|
||||||
|
|
||||||
private static final String KEY_LOAD_PROPS_FORMAT = "format";
|
|
||||||
public enum StreamLoadFormat {
|
|
||||||
CSV, JSON;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final String KEY_USERNAME = "username";
|
|
||||||
private static final String KEY_PASSWORD = "password";
|
|
||||||
private static final String KEY_DATABASE = "database";
|
|
||||||
private static final String KEY_TABLE = "table";
|
|
||||||
private static final String KEY_COLUMN = "column";
|
|
||||||
private static final String KEY_PRE_SQL = "preSql";
|
|
||||||
private static final String KEY_POST_SQL = "postSql";
|
|
||||||
private static final String KEY_JDBC_URL = "jdbcUrl";
|
|
||||||
private static final String KEY_MAX_BATCH_ROWS = "maxBatchRows";
|
|
||||||
private static final String KEY_MAX_BATCH_SIZE = "maxBatchSize";
|
|
||||||
private static final String KEY_LOAD_URL = "loadUrl";
|
|
||||||
private static final String KEY_FLUSH_QUEUE_LENGTH = "flushQueueLength";
|
|
||||||
private static final String KEY_LOAD_PROPS = "loadProps";
|
|
||||||
|
|
||||||
private final Configuration options;
|
|
||||||
|
|
||||||
public StarRocksWriterOptions(Configuration options) {
|
|
||||||
this.options = options;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void doPretreatment() {
|
|
||||||
validateRequired();
|
|
||||||
validateStreamLoadUrl();
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getJdbcUrl() {
|
|
||||||
return options.getString(KEY_JDBC_URL);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getDatabase() {
|
|
||||||
return options.getString(KEY_DATABASE);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getTable() {
|
|
||||||
return options.getString(KEY_TABLE);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getUsername() {
|
|
||||||
return options.getString(KEY_USERNAME);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getPassword() {
|
|
||||||
return options.getString(KEY_PASSWORD);
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<String> getLoadUrlList() {
|
|
||||||
return options.getList(KEY_LOAD_URL, String.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<String> getColumns() {
|
|
||||||
return options.getList(KEY_COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<String> getPreSqlList() {
|
|
||||||
return options.getList(KEY_PRE_SQL, String.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<String> getPostSqlList() {
|
|
||||||
return options.getList(KEY_POST_SQL, String.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Map<String, Object> getLoadProps() {
|
|
||||||
return options.getMap(KEY_LOAD_PROPS);
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getMaxRetries() {
|
|
||||||
return MAX_RETRIES;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getBatchRows() {
|
|
||||||
Integer rows = options.getInt(KEY_MAX_BATCH_ROWS);
|
|
||||||
return null == rows ? BATCH_ROWS : rows;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getBatchSize() {
|
|
||||||
Long size = options.getLong(KEY_MAX_BATCH_SIZE);
|
|
||||||
return null == size ? BATCH_BYTES : size;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getFlushQueueLength() {
|
|
||||||
Integer len = options.getInt(KEY_FLUSH_QUEUE_LENGTH);
|
|
||||||
return null == len ? 1 : len;
|
|
||||||
}
|
|
||||||
|
|
||||||
public StreamLoadFormat getStreamLoadFormat() {
|
|
||||||
Map<String, Object> loadProps = getLoadProps();
|
|
||||||
if (null == loadProps) {
|
|
||||||
return StreamLoadFormat.CSV;
|
|
||||||
}
|
|
||||||
if (loadProps.containsKey(KEY_LOAD_PROPS_FORMAT)
|
|
||||||
&& StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(KEY_LOAD_PROPS_FORMAT)))) {
|
|
||||||
return StreamLoadFormat.JSON;
|
|
||||||
}
|
|
||||||
return StreamLoadFormat.CSV;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void validateStreamLoadUrl() {
|
|
||||||
List<String> urlList = getLoadUrlList();
|
|
||||||
for (String host : urlList) {
|
|
||||||
if (host.split(":").length < 2) {
|
|
||||||
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
|
|
||||||
"loadUrl的格式不正确,请输入 `fe_ip:fe_http_ip;fe_ip:fe_http_ip`。");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void validateRequired() {
|
|
||||||
final String[] requiredOptionKeys = new String[]{
|
|
||||||
KEY_USERNAME,
|
|
||||||
KEY_PASSWORD,
|
|
||||||
KEY_DATABASE,
|
|
||||||
KEY_TABLE,
|
|
||||||
KEY_COLUMN,
|
|
||||||
KEY_LOAD_URL
|
|
||||||
};
|
|
||||||
for (String optionKey : requiredOptionKeys) {
|
|
||||||
options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,20 +0,0 @@
|
|||||||
package com.starrocks.connector.datax.plugin.writer.starrockswriter.manager;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
public class StarRocksFlushTuple {
|
|
||||||
|
|
||||||
private String label;
|
|
||||||
private Long bytes;
|
|
||||||
private List<String> rows;
|
|
||||||
|
|
||||||
public StarRocksFlushTuple(String label, Long bytes, List<String> rows) {
|
|
||||||
this.label = label;
|
|
||||||
this.bytes = bytes;
|
|
||||||
this.rows = rows;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getLabel() { return label; }
|
|
||||||
public Long getBytes() { return bytes; }
|
|
||||||
public List<String> getRows() { return rows; }
|
|
||||||
}
|
|
@ -1,175 +0,0 @@
|
|||||||
package com.starrocks.connector.datax.plugin.writer.starrockswriter.manager;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.HttpURLConnection;
|
|
||||||
import java.net.URL;
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
|
|
||||||
import com.alibaba.fastjson.JSON;
|
|
||||||
import com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriterOptions;
|
|
||||||
import com.starrocks.connector.datax.plugin.writer.starrockswriter.row.StarRocksDelimiterParser;
|
|
||||||
|
|
||||||
import org.apache.commons.codec.binary.Base64;
|
|
||||||
import org.apache.http.HttpEntity;
|
|
||||||
import org.apache.http.client.config.RequestConfig;
|
|
||||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
|
||||||
import org.apache.http.client.methods.HttpPut;
|
|
||||||
import org.apache.http.entity.ByteArrayEntity;
|
|
||||||
import org.apache.http.impl.client.CloseableHttpClient;
|
|
||||||
import org.apache.http.impl.client.DefaultRedirectStrategy;
|
|
||||||
import org.apache.http.impl.client.HttpClientBuilder;
|
|
||||||
import org.apache.http.impl.client.HttpClients;
|
|
||||||
import org.apache.http.util.EntityUtils;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
|
|
||||||
public class StarRocksStreamLoadVisitor {
|
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class);
|
|
||||||
|
|
||||||
private final StarRocksWriterOptions writerOptions;
|
|
||||||
private int pos;
|
|
||||||
|
|
||||||
public StarRocksStreamLoadVisitor(StarRocksWriterOptions writerOptions) {
|
|
||||||
this.writerOptions = writerOptions;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void doStreamLoad(StarRocksFlushTuple flushData) throws IOException {
|
|
||||||
String host = getAvailableHost();
|
|
||||||
if (null == host) {
|
|
||||||
throw new IOException("None of the host in `load_url` could be connected.");
|
|
||||||
}
|
|
||||||
String loadUrl = new StringBuilder(host)
|
|
||||||
.append("/api/")
|
|
||||||
.append(writerOptions.getDatabase())
|
|
||||||
.append("/")
|
|
||||||
.append(writerOptions.getTable())
|
|
||||||
.append("/_stream_load")
|
|
||||||
.toString();
|
|
||||||
LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
|
|
||||||
Map<String, Object> loadResult = doHttpPut(loadUrl, flushData.getLabel(), joinRows(flushData.getRows(), flushData.getBytes().intValue()));
|
|
||||||
final String keyStatus = "Status";
|
|
||||||
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
|
|
||||||
throw new IOException("Unable to flush data to StarRocks: unknown result status.");
|
|
||||||
}
|
|
||||||
LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString());
|
|
||||||
if (loadResult.get(keyStatus).equals("Fail")) {
|
|
||||||
throw new IOException(
|
|
||||||
new StringBuilder("Failed to flush data to StarRocks.\n").append(JSON.toJSONString(loadResult)).toString()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getAvailableHost() {
|
|
||||||
List<String> hostList = writerOptions.getLoadUrlList();
|
|
||||||
if (pos >= hostList.size()) {
|
|
||||||
pos = 0;
|
|
||||||
}
|
|
||||||
for (; pos < hostList.size(); pos++) {
|
|
||||||
String host = new StringBuilder("http://").append(hostList.get(pos)).toString();
|
|
||||||
if (tryHttpConnection(host)) {
|
|
||||||
return host;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean tryHttpConnection(String host) {
|
|
||||||
try {
|
|
||||||
URL url = new URL(host);
|
|
||||||
HttpURLConnection co = (HttpURLConnection) url.openConnection();
|
|
||||||
co.setConnectTimeout(1000);
|
|
||||||
co.connect();
|
|
||||||
co.disconnect();
|
|
||||||
return true;
|
|
||||||
} catch (Exception e1) {
|
|
||||||
LOG.warn("Failed to connect to address:{}", host, e1);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private byte[] joinRows(List<String> rows, int totalBytes) {
|
|
||||||
if (StarRocksWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
|
|
||||||
Map<String, Object> props = writerOptions.getLoadProps();
|
|
||||||
byte[] lineDelimiter = StarRocksDelimiterParser.parse(String.valueOf(props.get("row_delimiter")), "\n").getBytes(StandardCharsets.UTF_8);
|
|
||||||
ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
|
|
||||||
for (String row : rows) {
|
|
||||||
bos.put(row.getBytes(StandardCharsets.UTF_8));
|
|
||||||
bos.put(lineDelimiter);
|
|
||||||
}
|
|
||||||
return bos.array();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (StarRocksWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
|
|
||||||
ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
|
|
||||||
bos.put("[".getBytes(StandardCharsets.UTF_8));
|
|
||||||
byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
|
|
||||||
boolean isFirstElement = true;
|
|
||||||
for (String row : rows) {
|
|
||||||
if (!isFirstElement) {
|
|
||||||
bos.put(jsonDelimiter);
|
|
||||||
}
|
|
||||||
bos.put(row.getBytes(StandardCharsets.UTF_8));
|
|
||||||
isFirstElement = false;
|
|
||||||
}
|
|
||||||
bos.put("]".getBytes(StandardCharsets.UTF_8));
|
|
||||||
return bos.array();
|
|
||||||
}
|
|
||||||
throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private Map<String, Object> doHttpPut(String loadUrl, String label, byte[] data) throws IOException {
|
|
||||||
LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
|
|
||||||
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
|
|
||||||
.setRedirectStrategy(new DefaultRedirectStrategy() {
|
|
||||||
@Override
|
|
||||||
protected boolean isRedirectable(String method) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
|
|
||||||
HttpPut httpPut = new HttpPut(loadUrl);
|
|
||||||
List<String> cols = writerOptions.getColumns();
|
|
||||||
if (null != cols && !cols.isEmpty()) {
|
|
||||||
httpPut.setHeader("columns", String.join(",", cols));
|
|
||||||
}
|
|
||||||
if (null != writerOptions.getLoadProps()) {
|
|
||||||
for (Map.Entry<String, Object> entry : writerOptions.getLoadProps().entrySet()) {
|
|
||||||
httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
httpPut.setHeader("Expect", "100-continue");
|
|
||||||
httpPut.setHeader("label", label);
|
|
||||||
httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded");
|
|
||||||
httpPut.setHeader("Authorization", getBasicAuthHeader(writerOptions.getUsername(), writerOptions.getPassword()));
|
|
||||||
httpPut.setEntity(new ByteArrayEntity(data));
|
|
||||||
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
|
|
||||||
try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
|
|
||||||
int code = resp.getStatusLine().getStatusCode();
|
|
||||||
if (200 != code) {
|
|
||||||
LOG.warn("Request failed with code:{}", code);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
HttpEntity respEntity = resp.getEntity();
|
|
||||||
if (null == respEntity) {
|
|
||||||
LOG.warn("Request failed with empty response.");
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return (Map<String, Object>)JSON.parse(EntityUtils.toString(respEntity));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getBasicAuthHeader(String username, String password) {
|
|
||||||
String auth = username + ":" + password;
|
|
||||||
byte[] encodedAuth = Base64.encodeBase64(auth.getBytes());
|
|
||||||
return new StringBuilder("Basic ").append(new String(encodedAuth)).toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,144 +0,0 @@
|
|||||||
package com.starrocks.connector.datax.plugin.writer.starrockswriter.manager;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.LinkedBlockingDeque;
|
|
||||||
|
|
||||||
import com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriterOptions;
|
|
||||||
import com.google.common.base.Strings;
|
|
||||||
|
|
||||||
public class StarRocksWriterManager {
|
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriterManager.class);
|
|
||||||
|
|
||||||
private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
|
|
||||||
private final StarRocksWriterOptions writerOptions;
|
|
||||||
|
|
||||||
private final List<String> buffer = new ArrayList<>();
|
|
||||||
private int batchCount = 0;
|
|
||||||
private long batchSize = 0;
|
|
||||||
private volatile boolean closed = false;
|
|
||||||
private volatile Exception flushException;
|
|
||||||
private final LinkedBlockingDeque<StarRocksFlushTuple> flushQueue;
|
|
||||||
|
|
||||||
public StarRocksWriterManager(StarRocksWriterOptions writerOptions) {
|
|
||||||
this.writerOptions = writerOptions;
|
|
||||||
this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(writerOptions);
|
|
||||||
flushQueue = new LinkedBlockingDeque<>(writerOptions.getFlushQueueLength());
|
|
||||||
this.startAsyncFlushing();
|
|
||||||
}
|
|
||||||
|
|
||||||
public final synchronized void writeRecord(String record) throws IOException {
|
|
||||||
checkFlushException();
|
|
||||||
try {
|
|
||||||
buffer.add(record);
|
|
||||||
batchCount++;
|
|
||||||
batchSize += record.getBytes().length;
|
|
||||||
if (batchCount >= writerOptions.getBatchRows() || batchSize >= writerOptions.getBatchSize()) {
|
|
||||||
String label = createBatchLabel();
|
|
||||||
LOG.debug(String.format("StarRocks buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));
|
|
||||||
flush(label, false);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new IOException("Writing records to StarRocks failed.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void flush(String label, boolean waitUtilDone) throws Exception {
|
|
||||||
checkFlushException();
|
|
||||||
if (batchCount == 0) {
|
|
||||||
if (waitUtilDone) {
|
|
||||||
waitAsyncFlushingDone();
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
flushQueue.put(new StarRocksFlushTuple(label, batchSize, new ArrayList<>(buffer)));
|
|
||||||
if (waitUtilDone) {
|
|
||||||
// wait the last flush
|
|
||||||
waitAsyncFlushingDone();
|
|
||||||
}
|
|
||||||
buffer.clear();
|
|
||||||
batchCount = 0;
|
|
||||||
batchSize = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void close() {
|
|
||||||
if (!closed) {
|
|
||||||
closed = true;
|
|
||||||
try {
|
|
||||||
String label = createBatchLabel();
|
|
||||||
if (batchCount > 0) LOG.debug(String.format("StarRocks Sink is about to close: label[%s].", label));
|
|
||||||
flush(label, true);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException("Writing records to StarRocks failed.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
checkFlushException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public String createBatchLabel() {
|
|
||||||
return UUID.randomUUID().toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void startAsyncFlushing() {
|
|
||||||
// start flush thread
|
|
||||||
Thread flushThread = new Thread(new Runnable(){
|
|
||||||
public void run() {
|
|
||||||
while(true) {
|
|
||||||
try {
|
|
||||||
asyncFlush();
|
|
||||||
} catch (Exception e) {
|
|
||||||
flushException = e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
flushThread.setDaemon(true);
|
|
||||||
flushThread.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void waitAsyncFlushingDone() throws InterruptedException {
|
|
||||||
// wait previous flushings
|
|
||||||
for (int i = 0; i <= writerOptions.getFlushQueueLength(); i++) {
|
|
||||||
flushQueue.put(new StarRocksFlushTuple("", 0l, null));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void asyncFlush() throws Exception {
|
|
||||||
StarRocksFlushTuple flushData = flushQueue.take();
|
|
||||||
if (Strings.isNullOrEmpty(flushData.getLabel())) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
|
|
||||||
for (int i = 0; i <= writerOptions.getMaxRetries(); i++) {
|
|
||||||
try {
|
|
||||||
// flush to StarRocks with stream load
|
|
||||||
starrocksStreamLoadVisitor.doStreamLoad(flushData);
|
|
||||||
LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
|
|
||||||
break;
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.warn("Failed to flush batch data to StarRocks, retry times = {}", i, e);
|
|
||||||
if (i >= writerOptions.getMaxRetries()) {
|
|
||||||
throw new IOException(e);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
Thread.sleep(1000l * (i + 1));
|
|
||||||
} catch (InterruptedException ex) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new IOException("Unable to flush, interrupted while doing another attempt", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void checkFlushException() {
|
|
||||||
if (flushException != null) {
|
|
||||||
throw new RuntimeException("Writing records to StarRocks failed.", flushException);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,18 +0,0 @@
|
|||||||
package com.starrocks.connector.datax.plugin.writer.starrockswriter.row;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.Column;
|
|
||||||
import com.alibaba.datax.common.element.Column.Type;
|
|
||||||
|
|
||||||
public class StarRocksBaseSerializer {
|
|
||||||
|
|
||||||
protected String fieldConvertion(Column col) {
|
|
||||||
if (null == col.getRawData()) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
if (Type.BOOL == col.getType()) {
|
|
||||||
return String.valueOf(col.asLong());
|
|
||||||
}
|
|
||||||
return col.asString();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,32 +0,0 @@
|
|||||||
package com.starrocks.connector.datax.plugin.writer.starrockswriter.row;
|
|
||||||
|
|
||||||
import java.io.StringWriter;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.Record;
|
|
||||||
|
|
||||||
import com.google.common.base.Strings;
|
|
||||||
|
|
||||||
public class StarRocksCsvSerializer extends StarRocksBaseSerializer implements StarRocksISerializer {
|
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
|
||||||
|
|
||||||
private final String columnSeparator;
|
|
||||||
|
|
||||||
public StarRocksCsvSerializer(String sp) {
|
|
||||||
this.columnSeparator = StarRocksDelimiterParser.parse(sp, "\t");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String serialize(Record row) {
|
|
||||||
StringBuilder sb = new StringBuilder();
|
|
||||||
for (int i = 0; i < row.getColumnNumber(); i++) {
|
|
||||||
String value = fieldConvertion(row.getColumn(i));
|
|
||||||
sb.append(null == value ? "\\N" : value);
|
|
||||||
if (i < row.getColumnNumber() - 1) {
|
|
||||||
sb.append(columnSeparator);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return sb.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,55 +0,0 @@
|
|||||||
package com.starrocks.connector.datax.plugin.writer.starrockswriter.row;
|
|
||||||
|
|
||||||
import java.io.StringWriter;
|
|
||||||
|
|
||||||
import com.google.common.base.Strings;
|
|
||||||
|
|
||||||
public class StarRocksDelimiterParser {
|
|
||||||
|
|
||||||
private static final String HEX_STRING = "0123456789ABCDEF";
|
|
||||||
|
|
||||||
public static String parse(String sp, String dSp) throws RuntimeException {
|
|
||||||
if (Strings.isNullOrEmpty(sp)) {
|
|
||||||
return dSp;
|
|
||||||
}
|
|
||||||
if (!sp.toUpperCase().startsWith("\\X")) {
|
|
||||||
return sp;
|
|
||||||
}
|
|
||||||
String hexStr = sp.substring(2);
|
|
||||||
// check hex str
|
|
||||||
if (hexStr.isEmpty()) {
|
|
||||||
throw new RuntimeException("Failed to parse delimiter: `Hex str is empty`");
|
|
||||||
}
|
|
||||||
if (hexStr.length() % 2 != 0) {
|
|
||||||
throw new RuntimeException("Failed to parse delimiter: `Hex str length error`");
|
|
||||||
}
|
|
||||||
for (char hexChar : hexStr.toUpperCase().toCharArray()) {
|
|
||||||
if (HEX_STRING.indexOf(hexChar) == -1) {
|
|
||||||
throw new RuntimeException("Failed to parse delimiter: `Hex str format error`");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// transform to separator
|
|
||||||
StringWriter writer = new StringWriter();
|
|
||||||
for (byte b : hexStrToBytes(hexStr)) {
|
|
||||||
writer.append((char) b);
|
|
||||||
}
|
|
||||||
return writer.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static byte[] hexStrToBytes(String hexStr) {
|
|
||||||
String upperHexStr = hexStr.toUpperCase();
|
|
||||||
int length = upperHexStr.length() / 2;
|
|
||||||
char[] hexChars = upperHexStr.toCharArray();
|
|
||||||
byte[] bytes = new byte[length];
|
|
||||||
for (int i = 0; i < length; i++) {
|
|
||||||
int pos = i * 2;
|
|
||||||
bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
|
|
||||||
}
|
|
||||||
return bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static byte charToByte(char c) {
|
|
||||||
return (byte) HEX_STRING.indexOf(c);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,11 +0,0 @@
|
|||||||
package com.starrocks.connector.datax.plugin.writer.starrockswriter.row;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.Record;
|
|
||||||
|
|
||||||
public interface StarRocksISerializer extends Serializable {
|
|
||||||
|
|
||||||
String serialize(Record row);
|
|
||||||
|
|
||||||
}
|
|
@ -1,34 +0,0 @@
|
|||||||
package com.starrocks.connector.datax.plugin.writer.starrockswriter.row;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.Record;
|
|
||||||
import com.alibaba.fastjson.JSON;
|
|
||||||
|
|
||||||
public class StarRocksJsonSerializer extends StarRocksBaseSerializer implements StarRocksISerializer {
|
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
|
||||||
|
|
||||||
private final List<String> fieldNames;
|
|
||||||
|
|
||||||
public StarRocksJsonSerializer(List<String> fieldNames) {
|
|
||||||
this.fieldNames = fieldNames;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String serialize(Record row) {
|
|
||||||
if (null == fieldNames) {
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
Map<String, Object> rowMap = new HashMap<>(fieldNames.size());
|
|
||||||
int idx = 0;
|
|
||||||
for (String fieldName : fieldNames) {
|
|
||||||
rowMap.put(fieldName, fieldConvertion(row.getColumn(idx)));
|
|
||||||
idx++;
|
|
||||||
}
|
|
||||||
return JSON.toJSONString(rowMap);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,22 +0,0 @@
|
|||||||
package com.starrocks.connector.datax.plugin.writer.starrockswriter.row;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriterOptions;
|
|
||||||
|
|
||||||
public class StarRocksSerializerFactory {
|
|
||||||
|
|
||||||
private StarRocksSerializerFactory() {}
|
|
||||||
|
|
||||||
public static StarRocksISerializer createSerializer(StarRocksWriterOptions writerOptions) {
|
|
||||||
if (StarRocksWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
|
|
||||||
Map<String, Object> props = writerOptions.getLoadProps();
|
|
||||||
return new StarRocksCsvSerializer(null == props || !props.containsKey("column_separator") ? null : String.valueOf(props.get("column_separator")));
|
|
||||||
}
|
|
||||||
if (StarRocksWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
|
|
||||||
return new StarRocksJsonSerializer(writerOptions.getColumns());
|
|
||||||
}
|
|
||||||
throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties.");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,83 +0,0 @@
|
|||||||
package com.starrocks.connector.datax.plugin.writer.starrockswriter.util;
|
|
||||||
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.writer.Constant;
|
|
||||||
import com.alibaba.druid.sql.parser.ParserException;
|
|
||||||
import com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriterOptions;
|
|
||||||
import com.google.common.base.Strings;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.sql.Connection;
|
|
||||||
import java.sql.Statement;
|
|
||||||
import java.util.*;
|
|
||||||
|
|
||||||
public final class StarRocksWriterUtil {
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriterUtil.class);
|
|
||||||
|
|
||||||
private StarRocksWriterUtil() {}
|
|
||||||
|
|
||||||
public static List<String> renderPreOrPostSqls(List<String> preOrPostSqls, String tableName) {
|
|
||||||
if (null == preOrPostSqls) {
|
|
||||||
return Collections.emptyList();
|
|
||||||
}
|
|
||||||
List<String> renderedSqls = new ArrayList<>();
|
|
||||||
for (String sql : preOrPostSqls) {
|
|
||||||
if (!Strings.isNullOrEmpty(sql)) {
|
|
||||||
renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return renderedSqls;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void executeSqls(Connection conn, List<String> sqls) {
|
|
||||||
Statement stmt = null;
|
|
||||||
String currentSql = null;
|
|
||||||
try {
|
|
||||||
stmt = conn.createStatement();
|
|
||||||
for (String sql : sqls) {
|
|
||||||
currentSql = sql;
|
|
||||||
DBUtil.executeSqlWithoutResultSet(stmt, sql);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
|
|
||||||
} finally {
|
|
||||||
DBUtil.closeDBResources(null, stmt, null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void preCheckPrePareSQL(StarRocksWriterOptions options) {
|
|
||||||
String table = options.getTable();
|
|
||||||
List<String> preSqls = options.getPreSqlList();
|
|
||||||
List<String> renderedPreSqls = StarRocksWriterUtil.renderPreOrPostSqls(preSqls, table);
|
|
||||||
if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
|
|
||||||
LOG.info("Begin to preCheck preSqls:[{}].", String.join(";", renderedPreSqls));
|
|
||||||
for (String sql : renderedPreSqls) {
|
|
||||||
try {
|
|
||||||
DBUtil.sqlValid(sql, DataBaseType.MySql);
|
|
||||||
} catch (ParserException e) {
|
|
||||||
throw RdbmsException.asPreSQLParserException(DataBaseType.MySql,e,sql);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void preCheckPostSQL(StarRocksWriterOptions options) {
|
|
||||||
String table = options.getTable();
|
|
||||||
List<String> postSqls = options.getPostSqlList();
|
|
||||||
List<String> renderedPostSqls = StarRocksWriterUtil.renderPreOrPostSqls(postSqls, table);
|
|
||||||
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
|
|
||||||
LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls));
|
|
||||||
for(String sql : renderedPostSqls) {
|
|
||||||
try {
|
|
||||||
DBUtil.sqlValid(sql, DataBaseType.MySql);
|
|
||||||
} catch (ParserException e){
|
|
||||||
throw RdbmsException.asPostSQLParserException(DataBaseType.MySql,e,sql);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user