Merge pull request #1490 from dingxiaobo/starrocks_plugin

Merge StarRocks plugin
Trafalgar 2022-08-26 14:18:30 +08:00 committed by GitHub
commit ced5a454b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1538 additions and 2 deletions

View File

@@ -231,6 +231,13 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>starrockswriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>drdswriter/target/datax/</directory>
<includes>

View File

@@ -80,6 +80,7 @@
<!-- writer -->
<module>mysqlwriter</module>
<module>starrockswriter</module>
<module>drdswriter</module>
<module>oraclewriter</module>
<module>sqlserverwriter</module>
@@ -87,8 +88,6 @@
<module>kingbaseeswriter</module>
<module>adswriter</module>
<module>oceanbasev10writer</module>
<module>cassandrawriter</module>
<module>clickhousewriter</module>
<module>adbpgwriter</module>
<module>hologresjdbcwriter</module>
<module>rdbmswriter</module>
@@ -114,6 +113,8 @@
<module>tsdbwriter</module>
<module>gdbwriter</module>
<module>oscarwriter</module>
<module>cassandrawriter</module>
<module>clickhousewriter</module>
<!-- common support module -->
<module>plugin-rdbms-util</module>

View File

@@ -0,0 +1,218 @@
# DataX StarRocksWriter
---
## 1 Quick Introduction
The StarRocksWriter plugin writes data into a target table in a StarRocks database. Under the hood, StarRocksWriter imports the data into StarRocks via Stream Load in CSV format.
## 2 How It Works
StarRocksWriter imports data into StarRocks via Stream Load in CSV format. Internally it buffers the records produced by the `reader` and loads them into StarRocks in batches to improve write performance.
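To make the mechanism concrete, the toy snippet below (invented sample values, not part of the plugin) builds the kind of CSV payload the writer assembles before issuing the Stream Load request; in the plugin itself this work is done by `StarRocksCsvSerializer` and `StarRocksStreamLoadVisitor`.
```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Toy illustration only: build the CSV body that would be PUT to
// http://fe_ip:fe_http_port/api/{database}/{table}/_stream_load
public class CsvPayloadSketch {
    public static void main(String[] args) {
        // One inner list per record; the values here are invented sample data.
        List<List<String>> records = Arrays.asList(
                Arrays.asList("1", "alice", null, "2022-08-26"),
                Arrays.asList("2", "bob", "beijing", "2022-08-26"));
        String body = records.stream()
                .map(cols -> cols.stream()
                        .map(c -> c == null ? "\\N" : c)          // nulls are written as \N
                        .collect(Collectors.joining("\t")))       // default column separator
                .collect(Collectors.joining("\n"));               // default row delimiter
        System.out.println(body);
    }
}
```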
## 3 Features
### 3.1 Sample Configuration
* The job below reads data from MySQL and loads it into StarRocks.
```json
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "xxxx",
"password": "xxxx",
"column": [ "k1", "k2", "v1", "v2" ],
"connection": [
{
"table": [ "table1", "table2" ],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/datax_test1"
]
},
{
"table": [ "table3", "table4" ],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/datax_test2"
]
}
]
}
},
"writer": {
"name": "starrockswriter",
"parameter": {
"username": "xxxx",
"password": "xxxx",
"database": "xxxx",
"table": "xxxx",
"column": ["k1", "k2", "v1", "v2"],
"preSql": [],
"postSql": [],
"jdbcUrl": "jdbc:mysql://172.28.17.100:9030/",
"loadUrl": ["172.28.17.100:8030", "172.28.17.100:8030"],
"loadProps": {}
}
}
}
]
}
}
```
### 3.2 Parameter Description
* **username**
  * Description: username for the StarRocks database. <br />
  * Required: yes <br />
  * Default: none <br />
* **password**
  * Description: password for the StarRocks database. <br />
  * Required: yes <br />
  * Default: none <br />
* **database**
  * Description: name of the database that contains the StarRocks table.
  * Required: yes <br />
  * Default: none <br />
* **table**
  * Description: name of the StarRocks table to write to.
  * Required: yes <br />
  * Default: none <br />
* **loadUrl**
  * Description: StarRocks FE address(es) used for Stream Load; multiple FE addresses may be listed, each in the form `fe_ip:fe_http_port`.
  * Required: yes <br />
  * Default: none <br />
* **column**
  * Description: columns of the target table to write, separated by commas, e.g. "column": ["id","name","age"].
  **The column option must be specified and must not be left empty!**
  Note: we strongly discourage configuring it as `*`, because your job may run incorrectly or fail once the target table's column count or types change.
  * Required: yes <br />
  * Default: none <br />
* **preSql**
  * Description: standard SQL statements executed before data is written to the target table. <br />
  * Required: no <br />
  * Default: none <br />
* **postSql**
  * Description: standard SQL statements executed after data has been written to the target table. <br />
  * Required: no <br />
  * Default: none <br />
* **jdbcUrl**
  * Description: JDBC connection string of the target database, used to execute `preSql` and `postSql`. <br />
  * Required: no <br />
  * Default: none <br />
* **maxBatchRows**
  * Description: maximum number of rows per Stream Load request. <br />
  * Required: no <br />
  * Default: 500000 <br />
* **maxBatchSize**
  * Description: maximum number of bytes per Stream Load request. <br />
  * Required: no <br />
  * Default: 104857600 (100M)
* **flushInterval**
  * Description: interval, in milliseconds, between the end of one Stream Load and the start of the next (see the sketch after this parameter list). <br />
  * Required: no <br />
  * Default: 300000 (ms)
* **loadProps**
  * Description: request properties passed to Stream Load; see the Stream Load documentation for details. <br />
  * Required: no <br />
  * Default: none <br />
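The batching parameters above are easiest to understand from the trigger logic. The fragment below is a simplified paraphrase of what `StarRocksWriterManager` in this PR does, not the actual class: a flush is queued whenever the row or the byte threshold is hit, and a background timer forces one every `flushInterval` milliseconds otherwise.
```java
import java.util.ArrayList;
import java.util.List;

// Simplified paraphrase of the flush triggers in StarRocksWriterManager; not the actual class.
class FlushTriggerSketch {
    private final List<byte[]> buffer = new ArrayList<>();
    private long rows = 0;
    private long bytes = 0;
    private final long maxBatchRows;   // corresponds to the maxBatchRows job parameter
    private final long maxBatchSize;   // corresponds to the maxBatchSize job parameter

    FlushTriggerSketch(long maxBatchRows, long maxBatchSize) {
        this.maxBatchRows = maxBatchRows;
        this.maxBatchSize = maxBatchSize;
    }

    // Called once per serialized record; a daemon timer additionally forces a
    // flush every flushInterval milliseconds even if neither threshold is reached.
    void write(byte[] record) {
        buffer.add(record);
        rows++;
        bytes += record.length;
        if (rows >= maxBatchRows || bytes >= maxBatchSize) {
            flush();
        }
    }

    private void flush() {
        // In the real plugin the buffered rows are handed to StarRocksStreamLoadVisitor here.
        buffer.clear();
        rows = 0;
        bytes = 0;
    }
}
```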
### 3.3 Type Conversion
By default every incoming value is converted to a string; columns are joined with `\t` and rows with `\n`, forming a `csv` payload that is imported via Stream Load.
To change the column or row separators, simply set them in `loadProps`:
```json
"loadProps": {
"column_separator": "\\x01",
"row_delimiter": "\\x02"
}
```
To switch the import format to `json`, configure `loadProps` accordingly:
```json
"loadProps": {
"format": "json",
"strip_outer_array": true
}
```
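The `\x01` / `\x02` values above are hex escapes: the plugin's `StarRocksDelimiterParser` converts them into the corresponding raw bytes before building the CSV payload. A minimal re-implementation of that parsing, for illustration only:
```java
public class DelimiterSketch {
    // Mirrors StarRocksDelimiterParser.parse: "\x01" becomes the single byte 0x01,
    // any other string is used literally, and null/empty falls back to the default.
    static String parse(String sp, String defaultSp) {
        if (sp == null || sp.isEmpty()) {
            return defaultSp;
        }
        if (!sp.toUpperCase().startsWith("\\X")) {
            return sp;
        }
        String hex = sp.substring(2);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < hex.length(); i += 2) {
            sb.append((char) Integer.parseInt(hex.substring(i, i + 2), 16));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println((int) parse("\\x01", "\t").charAt(0)); // prints 1
        System.out.println((int) parse(null, "\t").charAt(0));    // prints 9 (tab)
    }
}
```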
## 4 Performance Report
## 5 Constraints and Limitations
## FAQ

starrockswriter/pom.xml Executable file
View File

@@ -0,0 +1,162 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>starrockswriter</artifactId>
<name>starrockswriter</name>
<version>release</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.34</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<relocations>
<relocation>
<pattern>com.alibaba.fastjson</pattern>
<shadedPattern>com.starrocks.shade.com.alibaba.fastjson</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.http</pattern>
<shadedPattern>com.starrocks.shade.org.apache.http</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>com.starrocks.shade.org.apache.commons</shadedPattern>
</relocation>
</relocations>
<artifactSet>
<includes>
<include>org.apache.commons:commons-lang3</include>
<include>commons-codec:commons-codec</include>
<include>commons-logging:*</include>
<include>org.apache.httpcomponents:httpclient</include>
<include>org.apache.httpcomponents:httpcore</include>
<include>com.alibaba:fastjson</include>
</includes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/starrockswriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>starrockswriter-release.jar</include>
</includes>
<outputDirectory>plugin/writer/starrockswriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/starrockswriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@@ -0,0 +1,146 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.starrocks.connector.datax.plugin.writer.starrockswriter.manager.StarRocksWriterManager;
import com.starrocks.connector.datax.plugin.writer.starrockswriter.row.StarRocksISerializer;
import com.starrocks.connector.datax.plugin.writer.starrockswriter.row.StarRocksSerializerFactory;
import com.starrocks.connector.datax.plugin.writer.starrockswriter.util.StarRocksWriterUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
public class StarRocksWriter extends Writer {
public static class Job extends Writer.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig = null;
private StarRocksWriterOptions options;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
options = new StarRocksWriterOptions(super.getPluginJobConf());
options.doPretreatment();
}
@Override
public void preCheck(){
this.init();
StarRocksWriterUtil.preCheckPrePareSQL(options);
StarRocksWriterUtil.preCheckPostSQL(options);
}
@Override
public void prepare() {
String username = options.getUsername();
String password = options.getPassword();
String jdbcUrl = options.getJdbcUrl();
List<String> renderedPreSqls = StarRocksWriterUtil.renderPreOrPostSqls(options.getPreSqlList(), options.getTable());
if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPreSqls), jdbcUrl);
StarRocksWriterUtil.executeSqls(conn, renderedPreSqls);
DBUtil.closeDBResources(null, null, conn);
}
}
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
for (int i = 0; i < mandatoryNumber; i++) {
configurations.add(originalConfig);
}
return configurations;
}
@Override
public void post() {
String username = options.getUsername();
String password = options.getPassword();
String jdbcUrl = options.getJdbcUrl();
List<String> renderedPostSqls = StarRocksWriterUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable());
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl);
StarRocksWriterUtil.executeSqls(conn, renderedPostSqls);
DBUtil.closeDBResources(null, null, conn);
}
}
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private StarRocksWriterManager writerManager;
private StarRocksWriterOptions options;
private StarRocksISerializer rowSerializer;
@Override
public void init() {
options = new StarRocksWriterOptions(super.getPluginJobConf());
if (options.isWildcardColumn()) {
Connection conn = DBUtil.getConnection(DataBaseType.MySql, options.getJdbcUrl(), options.getUsername(), options.getPassword());
List<String> columns = StarRocksWriterUtil.getStarRocksColumns(conn, options.getDatabase(), options.getTable());
options.setInfoCchemaColumns(columns);
}
writerManager = new StarRocksWriterManager(options);
rowSerializer = StarRocksSerializerFactory.createSerializer(options);
}
@Override
public void prepare() {
}
public void startWrite(RecordReceiver recordReceiver) {
try {
Record record;
while ((record = recordReceiver.getFromReader()) != null) {
if (record.getColumnNumber() != options.getColumns().size()) {
throw DataXException
.asDataXException(
DBUtilErrorCode.CONF_ERROR,
String.format(
"列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
record.getColumnNumber(),
options.getColumns().size()));
}
writerManager.writeRecord(rowSerializer.serialize(record));
}
} catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
}
}
@Override
public void post() {
try {
writerManager.close();
} catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
}
}
@Override
public void destroy() {}
@Override
public boolean supportFailOver(){
return false;
}
}
}

View File

@@ -0,0 +1,175 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter;
import java.io.Serializable;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class StarRocksWriterOptions implements Serializable {
private static final long serialVersionUID = 1l;
private static final long KILO_BYTES_SCALE = 1024l;
private static final long MEGA_BYTES_SCALE = KILO_BYTES_SCALE * KILO_BYTES_SCALE;
private static final int MAX_RETRIES = 3;
private static final int BATCH_ROWS = 500000;
private static final long BATCH_BYTES = 90 * MEGA_BYTES_SCALE;
private static final long FLUSH_INTERVAL = 300000;
private static final String KEY_LOAD_PROPS_FORMAT = "format";
public enum StreamLoadFormat {
CSV, JSON;
}
private static final String KEY_USERNAME = "username";
private static final String KEY_PASSWORD = "password";
private static final String KEY_DATABASE = "database";
private static final String KEY_TABLE = "table";
private static final String KEY_COLUMN = "column";
private static final String KEY_PRE_SQL = "preSql";
private static final String KEY_POST_SQL = "postSql";
private static final String KEY_JDBC_URL = "jdbcUrl";
private static final String KEY_LABEL_PREFIX = "labelPrefix";
private static final String KEY_MAX_BATCH_ROWS = "maxBatchRows";
private static final String KEY_MAX_BATCH_SIZE = "maxBatchSize";
private static final String KEY_FLUSH_INTERVAL = "flushInterval";
private static final String KEY_LOAD_URL = "loadUrl";
private static final String KEY_FLUSH_QUEUE_LENGTH = "flushQueueLength";
private static final String KEY_LOAD_PROPS = "loadProps";
private final Configuration options;
private List<String> infoCchemaColumns;
private List<String> userSetColumns;
private boolean isWildcardColumn;
public StarRocksWriterOptions(Configuration options) {
this.options = options;
this.userSetColumns = options.getList(KEY_COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList());
if (1 == options.getList(KEY_COLUMN, String.class).size() && "*".trim().equals(options.getList(KEY_COLUMN, String.class).get(0))) {
this.isWildcardColumn = true;
}
}
public void doPretreatment() {
validateRequired();
validateStreamLoadUrl();
}
public String getJdbcUrl() {
return options.getString(KEY_JDBC_URL);
}
public String getDatabase() {
return options.getString(KEY_DATABASE);
}
public String getTable() {
return options.getString(KEY_TABLE);
}
public String getUsername() {
return options.getString(KEY_USERNAME);
}
public String getPassword() {
return options.getString(KEY_PASSWORD);
}
public String getLabelPrefix() {
return options.getString(KEY_LABEL_PREFIX);
}
public List<String> getLoadUrlList() {
return options.getList(KEY_LOAD_URL, String.class);
}
public List<String> getColumns() {
if (isWildcardColumn) {
return this.infoCchemaColumns;
}
return this.userSetColumns;
}
public boolean isWildcardColumn() {
return this.isWildcardColumn;
}
public void setInfoCchemaColumns(List<String> cols) {
this.infoCchemaColumns = cols;
}
public List<String> getPreSqlList() {
return options.getList(KEY_PRE_SQL, String.class);
}
public List<String> getPostSqlList() {
return options.getList(KEY_POST_SQL, String.class);
}
public Map<String, Object> getLoadProps() {
return options.getMap(KEY_LOAD_PROPS);
}
public int getMaxRetries() {
return MAX_RETRIES;
}
public int getBatchRows() {
Integer rows = options.getInt(KEY_MAX_BATCH_ROWS);
return null == rows ? BATCH_ROWS : rows;
}
public long getBatchSize() {
Long size = options.getLong(KEY_MAX_BATCH_SIZE);
return null == size ? BATCH_BYTES : size;
}
public long getFlushInterval() {
Long interval = options.getLong(KEY_FLUSH_INTERVAL);
return null == interval ? FLUSH_INTERVAL : interval;
}
public int getFlushQueueLength() {
Integer len = options.getInt(KEY_FLUSH_QUEUE_LENGTH);
return null == len ? 1 : len;
}
public StreamLoadFormat getStreamLoadFormat() {
Map<String, Object> loadProps = getLoadProps();
if (null == loadProps) {
return StreamLoadFormat.CSV;
}
if (loadProps.containsKey(KEY_LOAD_PROPS_FORMAT)
&& StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(KEY_LOAD_PROPS_FORMAT)))) {
return StreamLoadFormat.JSON;
}
return StreamLoadFormat.CSV;
}
private void validateStreamLoadUrl() {
List<String> urlList = getLoadUrlList();
for (String host : urlList) {
if (host.split(":").length < 2) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
"loadUrl的格式不正确请输入 `fe_ip:fe_http_ip;fe_ip:fe_http_ip`。");
}
}
}
private void validateRequired() {
final String[] requiredOptionKeys = new String[]{
KEY_USERNAME,
KEY_DATABASE,
KEY_TABLE,
KEY_COLUMN,
KEY_LOAD_URL
};
for (String optionKey : requiredOptionKeys) {
options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
}
}
}

View File

@@ -0,0 +1,21 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter.manager;
import java.util.List;
public class StarRocksFlushTuple {
private String label;
private Long bytes;
private List<byte[]> rows;
public StarRocksFlushTuple(String label, Long bytes, List<byte[]> rows) {
this.label = label;
this.bytes = bytes;
this.rows = rows;
}
public String getLabel() { return label; }
public void setLabel(String label) { this.label = label; }
public Long getBytes() { return bytes; }
public List<byte[]> getRows() { return rows; }
}

View File

@@ -0,0 +1,33 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter.manager;
import java.io.IOException;
import java.util.Map;
public class StarRocksStreamLoadFailedException extends IOException {
static final long serialVersionUID = 1L;
private final Map<String, Object> response;
private boolean reCreateLabel;
public StarRocksStreamLoadFailedException(String message, Map<String, Object> response) {
super(message);
this.response = response;
}
public StarRocksStreamLoadFailedException(String message, Map<String, Object> response, boolean reCreateLabel) {
super(message);
this.response = response;
this.reCreateLabel = reCreateLabel;
}
public Map<String, Object> getFailedResponse() {
return response;
}
public boolean needReCreateLabel() {
return reCreateLabel;
}
}

View File

@@ -0,0 +1,241 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter.manager;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import com.alibaba.fastjson.JSON;
import com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriterOptions;
import com.starrocks.connector.datax.plugin.writer.starrockswriter.row.StarRocksDelimiterParser;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class StarRocksStreamLoadVisitor {
private static final Logger LOG = LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class);
private final StarRocksWriterOptions writerOptions;
private long pos;
private static final String RESULT_FAILED = "Fail";
private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
private static final String RESULT_LABEL_PREPARE = "PREPARE";
private static final String RESULT_LABEL_ABORTED = "ABORTED";
private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
public StarRocksStreamLoadVisitor(StarRocksWriterOptions writerOptions) {
this.writerOptions = writerOptions;
}
public void doStreamLoad(StarRocksFlushTuple flushData) throws IOException {
String host = getAvailableHost();
if (null == host) {
throw new IOException("None of the host in `load_url` could be connected.");
}
String loadUrl = new StringBuilder(host)
.append("/api/")
.append(writerOptions.getDatabase())
.append("/")
.append(writerOptions.getTable())
.append("/_stream_load")
.toString();
LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
Map<String, Object> loadResult = doHttpPut(loadUrl, flushData.getLabel(), joinRows(flushData.getRows(), flushData.getBytes().intValue()));
final String keyStatus = "Status";
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
throw new IOException("Unable to flush data to StarRocks: unknown result status.");
}
LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString());
if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
throw new IOException(
new StringBuilder("Failed to flush data to StarRocks.\n").append(JSON.toJSONString(loadResult)).toString()
);
} else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString());
// has to block-checking the state to get the final result
checkLabelState(host, flushData.getLabel());
}
}
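// Round-robin over the configured loadUrl hosts, starting from the last working position,
// and return the first FE node that accepts an HTTP connection (or null if none does).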
private String getAvailableHost() {
List<String> hostList = writerOptions.getLoadUrlList();
long tmp = pos + hostList.size();
for (; pos < tmp; pos++) {
String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString();
if (tryHttpConnection(host)) {
return host;
}
}
return null;
}
private boolean tryHttpConnection(String host) {
try {
URL url = new URL(host);
HttpURLConnection co = (HttpURLConnection) url.openConnection();
co.setConnectTimeout(1000);
co.connect();
co.disconnect();
return true;
} catch (Exception e1) {
LOG.warn("Failed to connect to address:{}", host, e1);
return false;
}
}
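// Concatenate the buffered rows into one Stream Load payload: CSV rows are joined with the
// configured row delimiter, JSON rows are comma-separated and wrapped in a top-level array.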
private byte[] joinRows(List<byte[]> rows, int totalBytes) {
if (StarRocksWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
Map<String, Object> props = (writerOptions.getLoadProps() == null ? new HashMap<>() : writerOptions.getLoadProps());
byte[] lineDelimiter = StarRocksDelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
for (byte[] row : rows) {
bos.put(row);
bos.put(lineDelimiter);
}
return bos.array();
}
if (StarRocksWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
bos.put("[".getBytes(StandardCharsets.UTF_8));
byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
boolean isFirstElement = true;
for (byte[] row : rows) {
if (!isFirstElement) {
bos.put(jsonDelimiter);
}
bos.put(row);
isFirstElement = false;
}
bos.put("]".getBytes(StandardCharsets.UTF_8));
return bos.array();
}
throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
}
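// When Stream Load answers "Label Already Exists", poll get_load_state until the label reaches
// a terminal state: VISIBLE/COMMITTED mean success, ABORTED asks the caller to retry with a new label.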
@SuppressWarnings("unchecked")
private void checkLabelState(String host, String label) throws IOException {
int idx = 0;
while(true) {
try {
TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
} catch (InterruptedException ex) {
break;
}
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(writerOptions.getDatabase()).append("/get_load_state?label=").append(label).toString());
httpGet.setHeader("Authorization", getBasicAuthHeader(writerOptions.getUsername(), writerOptions.getPassword()));
httpGet.setHeader("Connection", "close");
try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
HttpEntity respEntity = getHttpEntity(resp);
if (respEntity == null) {
throw new IOException(String.format("Failed to flush data to StarRocks, Error " +
"could not get the final state of label[%s].\n", label), null);
}
Map<String, Object> result = (Map<String, Object>)JSON.parse(EntityUtils.toString(respEntity));
String labelState = (String)result.get("state");
if (null == labelState) {
throw new IOException(String.format("Failed to flush data to StarRocks, Error " +
"could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);
}
LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
switch(labelState) {
case LAEBL_STATE_VISIBLE:
case LAEBL_STATE_COMMITTED:
return;
case RESULT_LABEL_PREPARE:
continue;
case RESULT_LABEL_ABORTED:
throw new StarRocksStreamLoadFailedException(String.format("Failed to flush data to StarRocks, Error " +
"label[%s] state[%s]\n", label, labelState), null, true);
case RESULT_LABEL_UNKNOWN:
default:
throw new IOException(String.format("Failed to flush data to StarRocks, Error " +
"label[%s] state[%s]\n", label, labelState), null);
}
}
}
}
}
@SuppressWarnings("unchecked")
private Map<String, Object> doHttpPut(String loadUrl, String label, byte[] data) throws IOException {
LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
HttpPut httpPut = new HttpPut(loadUrl);
List<String> cols = writerOptions.getColumns();
if (null != cols && !cols.isEmpty() && StarRocksWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList())));
}
if (null != writerOptions.getLoadProps()) {
for (Map.Entry<String, Object> entry : writerOptions.getLoadProps().entrySet()) {
httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
}
}
httpPut.setHeader("Expect", "100-continue");
httpPut.setHeader("label", label);
httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded");
httpPut.setHeader("Authorization", getBasicAuthHeader(writerOptions.getUsername(), writerOptions.getPassword()));
httpPut.setEntity(new ByteArrayEntity(data));
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
HttpEntity respEntity = getHttpEntity(resp);
if (respEntity == null)
return null;
return (Map<String, Object>)JSON.parse(EntityUtils.toString(respEntity));
}
}
}
private String getBasicAuthHeader(String username, String password) {
String auth = username + ":" + password;
byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
return new StringBuilder("Basic ").append(new String(encodedAuth)).toString();
}
private HttpEntity getHttpEntity(CloseableHttpResponse resp) {
int code = resp.getStatusLine().getStatusCode();
if (200 != code) {
LOG.warn("Request failed with code:{}", code);
return null;
}
HttpEntity respEntity = resp.getEntity();
if (null == respEntity) {
LOG.warn("Request failed with empty response.");
return null;
}
return respEntity;
}
}

View File

@@ -0,0 +1,195 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter.manager;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Strings;
import com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriterOptions;
public class StarRocksWriterManager {
private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriterManager.class);
private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
private final StarRocksWriterOptions writerOptions;
private final List<byte[]> buffer = new ArrayList<>();
private int batchCount = 0;
private long batchSize = 0;
private volatile boolean closed = false;
private volatile Exception flushException;
private final LinkedBlockingDeque<StarRocksFlushTuple> flushQueue;
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> scheduledFuture;
public StarRocksWriterManager(StarRocksWriterOptions writerOptions) {
this.writerOptions = writerOptions;
this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(writerOptions);
flushQueue = new LinkedBlockingDeque<>(writerOptions.getFlushQueueLength());
this.startScheduler();
this.startAsyncFlushing();
}
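// (Re)arm the interval flush: a daemon timer thread that flushes whatever is buffered
// once flushInterval milliseconds have passed, even if no size threshold was reached.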
public void startScheduler() {
stopScheduler();
this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("starrocks-interval-flush").daemon(true).build());
this.scheduledFuture = this.scheduler.schedule(() -> {
synchronized (StarRocksWriterManager.this) {
if (!closed) {
try {
String label = createBatchLabel();
LOG.info(String.format("StarRocks interval Sinking triggered: label[%s].", label));
if (batchCount == 0) {
startScheduler();
}
flush(label, false);
} catch (Exception e) {
flushException = e;
}
}
}
}, writerOptions.getFlushInterval(), TimeUnit.MILLISECONDS);
}
public void stopScheduler() {
if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}
}
public final synchronized void writeRecord(String record) throws IOException {
checkFlushException();
try {
byte[] bts = record.getBytes(StandardCharsets.UTF_8);
buffer.add(bts);
batchCount++;
batchSize += bts.length;
if (batchCount >= writerOptions.getBatchRows() || batchSize >= writerOptions.getBatchSize()) {
String label = createBatchLabel();
LOG.debug(String.format("StarRocks buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));
flush(label, false);
}
} catch (Exception e) {
throw new IOException("Writing records to StarRocks failed.", e);
}
}
public synchronized void flush(String label, boolean waitUtilDone) throws Exception {
checkFlushException();
if (batchCount == 0) {
if (waitUtilDone) {
waitAsyncFlushingDone();
}
return;
}
flushQueue.put(new StarRocksFlushTuple(label, batchSize, new ArrayList<>(buffer)));
if (waitUtilDone) {
// wait the last flush
waitAsyncFlushingDone();
}
buffer.clear();
batchCount = 0;
batchSize = 0;
}
public synchronized void close() {
if (!closed) {
closed = true;
try {
String label = createBatchLabel();
if (batchCount > 0) LOG.debug(String.format("StarRocks Sink is about to close: label[%s].", label));
flush(label, true);
} catch (Exception e) {
throw new RuntimeException("Writing records to StarRocks failed.", e);
}
}
checkFlushException();
}
public String createBatchLabel() {
StringBuilder sb = new StringBuilder();
if (!Strings.isNullOrEmpty(writerOptions.getLabelPrefix())) {
sb.append(writerOptions.getLabelPrefix());
}
return sb.append(UUID.randomUUID().toString())
.toString();
}
private void startAsyncFlushing() {
// start flush thread
Thread flushThread = new Thread(new Runnable(){
public void run() {
while(true) {
try {
asyncFlush();
} catch (Exception e) {
flushException = e;
}
}
}
});
flushThread.setDaemon(true);
flushThread.start();
}
private void waitAsyncFlushingDone() throws InterruptedException {
// wait previous flushings
for (int i = 0; i <= writerOptions.getFlushQueueLength(); i++) {
flushQueue.put(new StarRocksFlushTuple("", 0l, null));
}
checkFlushException();
}
private void asyncFlush() throws Exception {
StarRocksFlushTuple flushData = flushQueue.take();
if (Strings.isNullOrEmpty(flushData.getLabel())) {
return;
}
stopScheduler();
LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
for (int i = 0; i <= writerOptions.getMaxRetries(); i++) {
try {
// flush to StarRocks with stream load
starrocksStreamLoadVisitor.doStreamLoad(flushData);
LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
startScheduler();
break;
} catch (Exception e) {
LOG.warn("Failed to flush batch data to StarRocks, retry times = {}", i, e);
if (i >= writerOptions.getMaxRetries()) {
throw new IOException(e);
}
if (e instanceof StarRocksStreamLoadFailedException && ((StarRocksStreamLoadFailedException)e).needReCreateLabel()) {
String newLabel = createBatchLabel();
LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
flushData.setLabel(newLabel);
}
try {
Thread.sleep(1000l * Math.min(i + 1, 10));
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("Unable to flush, interrupted while doing another attempt", e);
}
}
}
}
private void checkFlushException() {
if (flushException != null) {
throw new RuntimeException("Writing records to StarRocks failed.", flushException);
}
}
}

View File

@@ -0,0 +1,26 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter.row;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Column.Type;
public class StarRocksBaseSerializer {
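// Render a DataX Column as text: NULL columns map to null, BOOL to 0/1 via asLong(),
// BYTES to the big-endian integer value of the array, everything else to asString().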
protected String fieldConvertion(Column col) {
if (null == col.getRawData() || Type.NULL == col.getType()) {
return null;
}
if (Type.BOOL == col.getType()) {
return String.valueOf(col.asLong());
}
if (Type.BYTES == col.getType()) {
byte[] bts = (byte[])col.getRawData();
long value = 0;
for (int i = 0; i < bts.length; i++) {
value += (bts[bts.length - i - 1] & 0xffL) << (8 * i);
}
return String.valueOf(value);
}
return col.asString();
}
}

View File

@@ -0,0 +1,32 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter.row;
import java.io.StringWriter;
import com.alibaba.datax.common.element.Record;
import com.google.common.base.Strings;
public class StarRocksCsvSerializer extends StarRocksBaseSerializer implements StarRocksISerializer {
private static final long serialVersionUID = 1L;
private final String columnSeparator;
public StarRocksCsvSerializer(String sp) {
this.columnSeparator = StarRocksDelimiterParser.parse(sp, "\t");
}
@Override
public String serialize(Record row) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < row.getColumnNumber(); i++) {
String value = fieldConvertion(row.getColumn(i));
sb.append(null == value ? "\\N" : value);
if (i < row.getColumnNumber() - 1) {
sb.append(columnSeparator);
}
}
return sb.toString();
}
}

View File

@@ -0,0 +1,55 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter.row;
import java.io.StringWriter;
import com.google.common.base.Strings;
public class StarRocksDelimiterParser {
private static final String HEX_STRING = "0123456789ABCDEF";
public static String parse(String sp, String dSp) throws RuntimeException {
if (Strings.isNullOrEmpty(sp)) {
return dSp;
}
if (!sp.toUpperCase().startsWith("\\X")) {
return sp;
}
String hexStr = sp.substring(2);
// check hex str
if (hexStr.isEmpty()) {
throw new RuntimeException("Failed to parse delimiter: `Hex str is empty`");
}
if (hexStr.length() % 2 != 0) {
throw new RuntimeException("Failed to parse delimiter: `Hex str length error`");
}
for (char hexChar : hexStr.toUpperCase().toCharArray()) {
if (HEX_STRING.indexOf(hexChar) == -1) {
throw new RuntimeException("Failed to parse delimiter: `Hex str format error`");
}
}
// transform to separator
StringWriter writer = new StringWriter();
for (byte b : hexStrToBytes(hexStr)) {
writer.append((char) b);
}
return writer.toString();
}
private static byte[] hexStrToBytes(String hexStr) {
String upperHexStr = hexStr.toUpperCase();
int length = upperHexStr.length() / 2;
char[] hexChars = upperHexStr.toCharArray();
byte[] bytes = new byte[length];
for (int i = 0; i < length; i++) {
int pos = i * 2;
bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
}
return bytes;
}
private static byte charToByte(char c) {
return (byte) HEX_STRING.indexOf(c);
}
}

View File

@@ -0,0 +1,11 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter.row;
import java.io.Serializable;
import com.alibaba.datax.common.element.Record;
public interface StarRocksISerializer extends Serializable {
String serialize(Record row);
}

View File

@@ -0,0 +1,34 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter.row;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.alibaba.datax.common.element.Record;
import com.alibaba.fastjson.JSON;
public class StarRocksJsonSerializer extends StarRocksBaseSerializer implements StarRocksISerializer {
private static final long serialVersionUID = 1L;
private final List<String> fieldNames;
public StarRocksJsonSerializer(List<String> fieldNames) {
this.fieldNames = fieldNames;
}
@Override
public String serialize(Record row) {
if (null == fieldNames) {
return "";
}
Map<String, Object> rowMap = new HashMap<>(fieldNames.size());
int idx = 0;
for (String fieldName : fieldNames) {
rowMap.put(fieldName, fieldConvertion(row.getColumn(idx)));
idx++;
}
return JSON.toJSONString(rowMap);
}
}

View File

@@ -0,0 +1,22 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter.row;
import java.util.Map;
import com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriterOptions;
public class StarRocksSerializerFactory {
private StarRocksSerializerFactory() {}
public static StarRocksISerializer createSerializer(StarRocksWriterOptions writerOptions) {
if (StarRocksWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
Map<String, Object> props = writerOptions.getLoadProps();
return new StarRocksCsvSerializer(null == props || !props.containsKey("column_separator") ? null : String.valueOf(props.get("column_separator")));
}
if (StarRocksWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
return new StarRocksJsonSerializer(writerOptions.getColumns());
}
throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties.");
}
}

View File

@@ -0,0 +1,102 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter.util;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.druid.sql.parser.ParserException;
import com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriterOptions;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.*;
public final class StarRocksWriterUtil {
private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriterUtil.class);
private StarRocksWriterUtil() {}
public static List<String> getStarRocksColumns(Connection conn, String databaseName, String tableName) {
String currentSql = String.format("SELECT COLUMN_NAME FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = '%s' AND `TABLE_NAME` = '%s' ORDER BY `ORDINAL_POSITION` ASC;", databaseName, tableName);
List<String> columns = new ArrayList<>();
ResultSet rs = null;
try {
rs = DBUtil.query(conn, currentSql);
while (DBUtil.asyncResultSetNext(rs)) {
String colName = rs.getString("COLUMN_NAME");
columns.add(colName);
}
return columns;
} catch (Exception e) {
throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
} finally {
DBUtil.closeDBResources(rs, null, null);
}
}
public static List<String> renderPreOrPostSqls(List<String> preOrPostSqls, String tableName) {
if (null == preOrPostSqls) {
return Collections.emptyList();
}
List<String> renderedSqls = new ArrayList<>();
for (String sql : preOrPostSqls) {
if (!Strings.isNullOrEmpty(sql)) {
renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
}
}
return renderedSqls;
}
public static void executeSqls(Connection conn, List<String> sqls) {
Statement stmt = null;
String currentSql = null;
try {
stmt = conn.createStatement();
for (String sql : sqls) {
currentSql = sql;
DBUtil.executeSqlWithoutResultSet(stmt, sql);
}
} catch (Exception e) {
throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
} finally {
DBUtil.closeDBResources(null, stmt, null);
}
}
public static void preCheckPrePareSQL(StarRocksWriterOptions options) {
String table = options.getTable();
List<String> preSqls = options.getPreSqlList();
List<String> renderedPreSqls = StarRocksWriterUtil.renderPreOrPostSqls(preSqls, table);
if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
LOG.info("Begin to preCheck preSqls:[{}].", String.join(";", renderedPreSqls));
for (String sql : renderedPreSqls) {
try {
DBUtil.sqlValid(sql, DataBaseType.MySql);
} catch (ParserException e) {
throw RdbmsException.asPreSQLParserException(DataBaseType.MySql,e,sql);
}
}
}
}
public static void preCheckPostSQL(StarRocksWriterOptions options) {
String table = options.getTable();
List<String> postSqls = options.getPostSqlList();
List<String> renderedPostSqls = StarRocksWriterUtil.renderPreOrPostSqls(postSqls, table);
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls));
for(String sql : renderedPostSqls) {
try {
DBUtil.sqlValid(sql, DataBaseType.MySql);
} catch (ParserException e){
throw RdbmsException.asPostSQLParserException(DataBaseType.MySql,e,sql);
}
}
}
}
}

View File

@@ -0,0 +1,6 @@
{
"name": "starrockswriter",
"class": "com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriter",
"description": "useScene: prod. mechanism: StarRocksStreamLoad. warn: The more you know about the database, the less problems you encounter.",
"developer": "starrocks"
}

View File

@@ -0,0 +1,14 @@
{
"name": "starrockswriter",
"parameter": {
"username": "",
"password": "",
"database": "",
"table": "",
"column": [],
"preSql": [],
"postSql": [],
"jdbcUrl": "",
"loadUrl": []
}
}