diff --git a/doriswriter/doc/doriswriter.md b/doriswriter/doc/doriswriter.md
new file mode 100644
index 00000000..715afa8c
--- /dev/null
+++ b/doriswriter/doc/doriswriter.md
@@ -0,0 +1,170 @@
+# DataX DorisWriter
+
+
+---
+
+
+## 1 快速介绍
+
+DorisWriter 插件实现了写入数据到 Doris 主库的目的表的功能。在底层实现上, DorisWriter 通过Streamload以csv格式导入数据至Doris。
+
+
+## 2 实现原理
+
+ DorisWriter 通过Streamload以csv格式导入数据至Doris, 内部将`reader`读取的数据进行缓存后批量导入至Doris,以提高写入性能。
+
+
+## 3 功能说明
+
+### 3.1 配置样例
+
+* 这里使用一份从内存Mysql读取数据后导入至Doris。
+
+```json
+{
+ "job": {
+ "setting": {
+ "speed": {
+ "channel": 1
+ },
+ "errorLimit": {
+ "record": 0,
+ "percentage": 0
+ }
+ },
+ "content": [
+ {
+ "reader": {
+ "name": "mysqlreader",
+ "parameter": {
+ "username": "xxxx",
+ "password": "xxxx",
+ "column": [ "k1", "k2", "v1", "v2" ],
+ "connection": [
+ {
+ "table": [ "table1", "table2" ],
+ "jdbcUrl": [
+ "jdbc:mysql://127.0.0.1:3306/datax_test1"
+ ]
+ },
+ {
+ "table": [ "table3", "table4" ],
+ "jdbcUrl": [
+ "jdbc:mysql://127.0.0.1:3306/datax_test2"
+ ]
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "doriswriter",
+ "parameter": {
+ "username": "xxxx",
+ "password": "xxxx",
+ "database": "xxxx",
+ "table": "xxxx",
+ "column": ["k1", "k2", "v1", "v2"],
+ "preSql": [],
+ "postSql": [],
+ "jdbcUrl": "jdbc:mysql://172.28.17.100:9030/",
+ "loadUrl": ["172.28.17.100:8030", "172.28.17.100:8030"]
+ }
+ }
+ }
+ ]
+ }
+}
+
+```
+
+
+### 3.2 参数说明
+
+* **username**
+
+ * 描述:Doris数据库的用户名
+
+ * 必选:是
+
+ * 默认值:无
+
+* **password**
+
+ * 描述:Doris数据库的密码
+
+ * 必选:是
+
+ * 默认值:无
+
+* **database**
+
+ * 描述:Doris表的数据库名称。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **table**
+
+ * 描述:Doris表的表名称。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **loadUrl**
+
+ * 描述:Doris FE的地址用于Streamload,可以为多个fe地址,`fe_ip:fe_http_port`。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **column**
+
+ * 描述:目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。
+
+ **column配置项必须指定,不能留空!**
+
+ 注意:我们强烈不推荐你这样配置,因为当你目的表字段个数、类型等有改动时,你的任务可能运行不正确或者失败
+
+ * 必选:是
+
+ * 默认值:否
+
+* **preSql**
+
+ * 描述:写入数据到目的表前,会先执行这里的标准语句。
+
+ * 必选:否
+
+ * 默认值:无
+
+* **postSql**
+
+ * 描述:写入数据到目的表后,会执行这里的标准语句。
+
+ * 必选:否
+
+ * 默认值:无
+
+* **jdbcUrl**
+
+ * 描述:目的数据库的 JDBC 连接信息,用于执行`preSql`及`postSql`。
+
+ * 必选:否
+
+ * 默认值:无
+
+
+### 3.3 类型转换
+
+传入的数据均会被转为字符串,并以`\t`作为列分隔符,`\n`作为行分隔符,组成`csv`文件进行StreamLoad导入操作。
+
+## 4 性能报告
+
+
+## 5 约束限制
+
+
+## FAQ
diff --git a/doriswriter/pom.xml b/doriswriter/pom.xml
new file mode 100755
index 00000000..2bbf1d22
--- /dev/null
+++ b/doriswriter/pom.xml
@@ -0,0 +1,155 @@
+
+ 4.0.0
+
+ com.alibaba.datax
+ datax-all
+ 0.0.1-SNAPSHOT
+
+ doriswriter
+ doriswriter
+ jar
+
+
+
+ com.alibaba.datax
+ datax-common
+ ${datax-project-version}
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ ch.qos.logback
+ logback-classic
+
+
+
+ com.alibaba.datax
+ plugin-rdbms-util
+ ${datax-project-version}
+
+
+ commons-codec
+ commons-codec
+ 1.9
+
+
+ commons-logging
+ commons-logging
+ 1.1.1
+
+
+ org.apache.httpcomponents
+ httpcore
+ 4.4.6
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.3
+
+
+ com.alibaba
+ fastjson
+ 1.2.75
+
+
+ mysql
+ mysql-connector-java
+ 5.1.34
+
+
+
+
+
+
+
+ maven-compiler-plugin
+
+ ${jdk-version}
+ ${jdk-version}
+ ${project-sourceEncoding}
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.0.0
+
+
+
+ package
+
+ shade
+
+
+ true
+
+
+ com.alibaba.fastjson
+ com.dorisdb.shade.com.alibaba.fastjson
+
+
+ org.apache.http
+ com.dorisdb.shade.org.apache.http
+
+
+ org.apache.commons
+ com.dorisdb.shade.org.apache.commons
+
+
+
+
+ commons-codec:commons-codec
+ commons-logging:*
+ org.apache.httpcomponents:httpclient
+ org.apache.httpcomponents:httpcore
+ com.alibaba:fastjson
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/assembly/package.xml
+
+ datax
+
+
+
+ dwzip
+ package
+
+ single
+
+
+
+
+
+
+
diff --git a/doriswriter/src/main/assembly/package.xml b/doriswriter/src/main/assembly/package.xml
new file mode 100755
index 00000000..2e0880e1
--- /dev/null
+++ b/doriswriter/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+
+
+
+ dir
+
+ false
+
+
+ src/main/resources
+
+ plugin.json
+ plugin_job_template.json
+
+ plugin/writer/doriswriter
+
+
+ target/
+
+ doriswriter-0.0.1-SNAPSHOT.jar
+
+ plugin/writer/doriswriter
+
+
+
+
+
+ false
+ plugin/writer/doriswriter/libs
+ runtime
+
+
+
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriter.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriter.java
new file mode 100755
index 00000000..0cbe579c
--- /dev/null
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriter.java
@@ -0,0 +1,144 @@
+package com.dorisdb.connector.datax.plugin.writer.doriswriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.util.DBUtil;
+import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
+import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
+import com.dorisdb.connector.datax.plugin.writer.doriswriter.manager.DorisWriterManager;
+import com.dorisdb.connector.datax.plugin.writer.doriswriter.util.DorisWriterUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DorisWriter extends Writer {
+
+ public static class Job extends Writer.Job {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Job.class);
+ private Configuration originalConfig = null;
+ private DorisWriterOptions options;
+
+ @Override
+ public void init() {
+ this.originalConfig = super.getPluginJobConf();
+ options = new DorisWriterOptions(super.getPluginJobConf());
+ options.doPretreatment();
+ }
+
+ @Override
+ public void preCheck(){
+ this.init();
+ DorisWriterUtil.preCheckPrePareSQL(options);
+ DorisWriterUtil.preCheckPostSQL(options);
+ }
+
+ @Override
+ public void prepare() {
+ String username = options.getUsername();
+ String password = options.getPassword();
+ String jdbcUrl = options.getJdbcUrl();
+ List renderedPreSqls = DorisWriterUtil.renderPreOrPostSqls(options.getPreSqlList(), options.getTable());
+ if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
+ Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
+ LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPreSqls), jdbcUrl);
+ DorisWriterUtil.executeSqls(conn, renderedPreSqls);
+ DBUtil.closeDBResources(null, null, conn);
+ }
+ }
+
+ @Override
+ public List split(int mandatoryNumber) {
+ List configurations = new ArrayList<>(mandatoryNumber);
+ for (int i = 0; i < mandatoryNumber; i++) {
+ configurations.add(originalConfig);
+ }
+ return configurations;
+ }
+
+ @Override
+ public void post() {
+ String username = options.getUsername();
+ String password = options.getPassword();
+ String jdbcUrl = options.getJdbcUrl();
+ List renderedPostSqls = DorisWriterUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable());
+ if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
+ Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
+ LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl);
+ DorisWriterUtil.executeSqls(conn, renderedPostSqls);
+ DBUtil.closeDBResources(null, null, conn);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ }
+
+ }
+
+ public static class Task extends Writer.Task {
+ private DorisWriterManager writerManager;
+ private DorisWriterOptions options;
+
+ @Override
+ public void init() {
+ options = new DorisWriterOptions(super.getPluginJobConf());
+ writerManager = new DorisWriterManager(options);
+ }
+
+ @Override
+ public void prepare() {
+ }
+
+ public void startWrite(RecordReceiver recordReceiver) {
+ try {
+ Record record;
+ while ((record = recordReceiver.getFromReader()) != null) {
+ if (record.getColumnNumber() != options.getColumns().size()) {
+ throw DataXException
+ .asDataXException(
+ DBUtilErrorCode.CONF_ERROR,
+ String.format(
+ "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
+ record.getColumnNumber(),
+ options.getColumns().size()));
+ }
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < record.getColumnNumber(); i++) {
+ sb.append(record.getColumn(i).getRawData().toString());
+ if (i < record.getColumnNumber() - 1) {
+ sb.append("\t");
+ }
+ }
+ writerManager.writeRecord(sb.toString());
+ }
+ } catch (Exception e) {
+ throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
+ }
+ }
+
+ @Override
+ public void post() {
+ try {
+ writerManager.flush(writerManager.createBatchLabel());
+ } catch (Exception e) {
+ throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
+ }
+ }
+
+ @Override
+ public void destroy() {}
+
+ @Override
+ public boolean supportFailOver(){
+ return false;
+ }
+ }
+}
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriterOptions.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriterOptions.java
new file mode 100644
index 00000000..1d9cb2be
--- /dev/null
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriterOptions.java
@@ -0,0 +1,111 @@
+package com.dorisdb.connector.datax.plugin.writer.doriswriter;
+
+import java.io.Serializable;
+
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
+
+import java.util.List;
+
+public class DorisWriterOptions implements Serializable {
+
+ private static final long serialVersionUID = 1l;
+ private static final long KILO_BYTES_SCALE = 1024l;
+ private static final long MEGA_BYTES_SCALE = KILO_BYTES_SCALE * KILO_BYTES_SCALE;
+ private static final int MAX_RETRIES = 1;
+ private static final int BATCH_ROWS = 500000;
+ private static final long BATCH_BYTES = 100 * MEGA_BYTES_SCALE;
+
+ private static final String KEY_USERNAME = "username";
+ private static final String KEY_PASSWORD = "password";
+ private static final String KEY_DATABASE = "database";
+ private static final String KEY_TABLE = "table";
+ private static final String KEY_COLUMN = "column";
+ private static final String KEY_PRE_SQL = "preSql";
+ private static final String KEY_POST_SQL = "postSql";
+ private static final String KEY_JDBC_URL = "jdbcUrl";
+ private static final String KEY_LOAD_URL = "loadUrl";
+
+ private final Configuration options;
+
+ public DorisWriterOptions(Configuration options) {
+ this.options = options;
+ }
+
+ public void doPretreatment() {
+ validateRequired();
+ validateStreamLoadUrl();
+ }
+
+ public String getJdbcUrl() {
+ return options.getString(KEY_JDBC_URL);
+ }
+
+ public String getDatabase() {
+ return options.getString(KEY_DATABASE);
+ }
+
+ public String getTable() {
+ return options.getString(KEY_TABLE);
+ }
+
+ public String getUsername() {
+ return options.getString(KEY_USERNAME);
+ }
+
+ public String getPassword() {
+ return options.getString(KEY_PASSWORD);
+ }
+
+ public List getLoadUrlList() {
+ return options.getList(KEY_LOAD_URL, String.class);
+ }
+
+ public List getColumns() {
+ return options.getList(KEY_COLUMN, String.class);
+ }
+
+ public List getPreSqlList() {
+ return options.getList(KEY_PRE_SQL, String.class);
+ }
+
+ public List getPostSqlList() {
+ return options.getList(KEY_POST_SQL, String.class);
+ }
+
+ public int getMaxRetries() {
+ return MAX_RETRIES;
+ }
+
+ public int getBatchRows() {
+ return BATCH_ROWS;
+ }
+
+ public long getBatchSize() {
+ return BATCH_BYTES;
+ }
+
+ private void validateStreamLoadUrl() {
+ List urlList = getLoadUrlList();
+ for (String host : urlList) {
+ if (host.split(":").length < 2) {
+ throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
+ "loadUrl的格式不正确,请输入 `fe_ip:fe_http_ip;fe_ip:fe_http_ip`。");
+ }
+ }
+ }
+
+ private void validateRequired() {
+ final String[] requiredOptionKeys = new String[]{
+ KEY_USERNAME,
+ KEY_PASSWORD,
+ KEY_DATABASE,
+ KEY_TABLE,
+ KEY_LOAD_URL
+ };
+ for (String optionKey : requiredOptionKeys) {
+ options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
+ }
+ }
+}
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java
new file mode 100644
index 00000000..a01b906b
--- /dev/null
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java
@@ -0,0 +1,143 @@
+package com.dorisdb.connector.datax.plugin.writer.doriswriter.manager;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+
+import com.alibaba.fastjson.JSON;
+import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+
+public class DorisStreamLoadVisitor implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadVisitor.class);
+
+ private final DorisWriterOptions writerOptions;
+ private int pos;
+
+ public DorisStreamLoadVisitor(DorisWriterOptions writerOptions) {
+ this.writerOptions = writerOptions;
+ }
+
+ public void doStreamLoad(String label, List labeledRows) throws IOException {
+ String host = getAvailableHost();
+ if (null == host) {
+ throw new IOException("None of the host in `load_url` could be connected.");
+ }
+ String loadUrl = new StringBuilder(host)
+ .append("/api/")
+ .append(writerOptions.getDatabase())
+ .append("/")
+ .append(writerOptions.getTable())
+ .append("/_stream_load")
+ .toString();
+ Map loadResult = doHttpPut(loadUrl, label, joinRows(labeledRows));
+ final String keyStatus = "Status";
+ if (null == loadResult || !loadResult.containsKey(keyStatus)) {
+ throw new IOException("Unable to flush data to doris: unknown result status.");
+ }
+ if (loadResult.get(keyStatus).equals("Fail")) {
+ throw new IOException(
+ new StringBuilder("Failed to flush data to doris.").append(loadResult.get("Message").toString()).toString()
+ );
+ }
+ }
+
+ private String getAvailableHost() {
+ List hostList = writerOptions.getLoadUrlList();
+ if (pos >= hostList.size()) {
+ pos = 0;
+ }
+ for (; pos < hostList.size(); pos++) {
+ String host = new StringBuilder("http://").append(hostList.get(pos)).toString();
+ if (tryHttpConnection(host)) {
+ return host;
+ }
+ }
+ return null;
+ }
+
+ private boolean tryHttpConnection(String host) {
+ try {
+ URL url = new URL(host);
+ HttpURLConnection co = (HttpURLConnection) url.openConnection();
+ co.setConnectTimeout(1000);
+ co.connect();
+ co.disconnect();
+ return true;
+ } catch (Exception e1) {
+ LOG.warn("Failed to connect to address:{}", host, e1);
+ return false;
+ }
+ }
+
+ private byte[] joinRows(List rows) {
+ return String.join("\n", rows).getBytes(StandardCharsets.UTF_8);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map doHttpPut(String loadUrl, String label, byte[] data) throws IOException {
+ LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
+ final HttpClientBuilder httpClientBuilder = HttpClients.custom()
+ .setRedirectStrategy(new DefaultRedirectStrategy() {
+ @Override
+ protected boolean isRedirectable(String method) {
+ return true;
+ }
+ });
+ try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
+ HttpPut httpPut = new HttpPut(loadUrl);
+ List cols = writerOptions.getColumns();
+ if (null != cols && !cols.isEmpty()) {
+ httpPut.setHeader("columns", String.join(",", cols));
+ }
+ httpPut.setHeader("Expect", "100-continue");
+ httpPut.setHeader("label", label);
+ httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded");
+ httpPut.setHeader("Authorization", getBasicAuthHeader(writerOptions.getUsername(), writerOptions.getPassword()));
+ httpPut.setEntity(new ByteArrayEntity(data));
+ httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
+ try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
+ int code = resp.getStatusLine().getStatusCode();
+ if (200 != code) {
+ LOG.warn("Request failed with code:{}", code);
+ return null;
+ }
+ HttpEntity respEntity = resp.getEntity();
+ if (null == respEntity) {
+ LOG.warn("Request failed with empty response.");
+ return null;
+ }
+ return (Map)JSON.parse(EntityUtils.toString(respEntity));
+ }
+ }
+ }
+
+ private String getBasicAuthHeader(String username, String password) {
+ String auth = username + ":" + password;
+ byte[] encodedAuth = Base64.encodeBase64(auth.getBytes());
+ return new StringBuilder("Basic ").append(new String(encodedAuth)).toString();
+ }
+
+}
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisWriterManager.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisWriterManager.java
new file mode 100644
index 00000000..86d370a5
--- /dev/null
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisWriterManager.java
@@ -0,0 +1,113 @@
+package com.dorisdb.connector.datax.plugin.writer.doriswriter.manager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions;
+
+public class DorisWriterManager implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);
+
+ private final DorisStreamLoadVisitor dorisStreamLoadVisitor;
+ private final DorisWriterOptions writerOptions;
+
+ private final List buffer = new ArrayList<>();
+ private int batchCount = 0;
+ private long batchSize = 0;
+ private volatile boolean closed = false;
+ private volatile Exception flushException;
+
+ public DorisWriterManager(DorisWriterOptions writerOptions) {
+ this.writerOptions = writerOptions;
+ this.dorisStreamLoadVisitor = new DorisStreamLoadVisitor(writerOptions);
+ }
+
+ public final synchronized void writeRecord(String record) throws IOException {
+ checkFlushException();
+ try {
+ buffer.add(record);
+ batchCount++;
+ batchSize += record.length();
+ if (batchCount >= writerOptions.getBatchRows() || batchSize >= writerOptions.getBatchSize()) {
+ flush(createBatchLabel());
+ }
+ } catch (Exception e) {
+ throw new IOException("Writing records to Doris failed.", e);
+ }
+ }
+
+ public synchronized void flush(String label) throws IOException {
+ checkFlushException();
+ if (batchCount == 0) {
+ return;
+ }
+ for (int i = 0; i <= writerOptions.getMaxRetries(); i++) {
+ try {
+ tryToFlush(label);
+ buffer.clear();
+ batchCount = 0;
+ batchSize = 0;
+ break;
+ } catch (IOException e) {
+ LOG.warn("Failed to flush batch data to doris, retry times = {}", i, e);
+ if (i >= writerOptions.getMaxRetries()) {
+ throw new IOException(e);
+ }
+ try {
+ Thread.sleep(1000l * (i + 1));
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Unable to flush, interrupted while doing another attempt", e);
+ }
+ }
+ }
+ }
+
+ public synchronized void close() {
+ if (!closed) {
+ closed = true;
+
+ if (batchCount > 0) {
+ try {
+ flush(createBatchLabel());
+ } catch (Exception e) {
+ throw new RuntimeException("Writing records to Doris failed.", e);
+ }
+ }
+ }
+ checkFlushException();
+ }
+
+ public String createBatchLabel() {
+ return UUID.randomUUID().toString();
+ }
+
+ public List getBufferedBatchList() {
+ return buffer;
+ }
+
+ public void setBufferedBatchList(List buffer) {
+ this.buffer.clear();
+ this.buffer.addAll(buffer);
+ }
+
+ private void tryToFlush(String label) throws IOException {
+ // flush to Doris with stream load
+ dorisStreamLoadVisitor.doStreamLoad(label, buffer);
+ }
+
+ private void checkFlushException() {
+ if (flushException != null) {
+ throw new RuntimeException("Writing records to Doris failed.", flushException);
+ }
+ }
+}
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/util/DorisWriterUtil.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/util/DorisWriterUtil.java
new file mode 100755
index 00000000..348e519c
--- /dev/null
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/util/DorisWriterUtil.java
@@ -0,0 +1,83 @@
+package com.dorisdb.connector.datax.plugin.writer.doriswriter.util;
+
+import com.alibaba.datax.plugin.rdbms.util.DBUtil;
+import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
+import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
+import com.alibaba.datax.plugin.rdbms.writer.Constant;
+import com.alibaba.druid.sql.parser.ParserException;
+import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions;
+import com.google.common.base.Strings;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.*;
+
+public final class DorisWriterUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(DorisWriterUtil.class);
+
+ private DorisWriterUtil() {}
+
+ public static List renderPreOrPostSqls(List preOrPostSqls, String tableName) {
+ if (null == preOrPostSqls) {
+ return Collections.emptyList();
+ }
+ List renderedSqls = new ArrayList<>();
+ for (String sql : preOrPostSqls) {
+ if (!Strings.isNullOrEmpty(sql)) {
+ renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
+ }
+ }
+ return renderedSqls;
+ }
+
+ public static void executeSqls(Connection conn, List sqls) {
+ Statement stmt = null;
+ String currentSql = null;
+ try {
+ stmt = conn.createStatement();
+ for (String sql : sqls) {
+ currentSql = sql;
+ DBUtil.executeSqlWithoutResultSet(stmt, sql);
+ }
+ } catch (Exception e) {
+ throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
+ } finally {
+ DBUtil.closeDBResources(null, stmt, null);
+ }
+ }
+
+ public static void preCheckPrePareSQL(DorisWriterOptions options) {
+ String table = options.getTable();
+ List preSqls = options.getPreSqlList();
+ List renderedPreSqls = DorisWriterUtil.renderPreOrPostSqls(preSqls, table);
+ if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
+ LOG.info("Begin to preCheck preSqls:[{}].", String.join(";", renderedPreSqls));
+ for (String sql : renderedPreSqls) {
+ try {
+ DBUtil.sqlValid(sql, DataBaseType.MySql);
+ } catch (ParserException e) {
+ throw RdbmsException.asPreSQLParserException(DataBaseType.MySql,e,sql);
+ }
+ }
+ }
+ }
+
+ public static void preCheckPostSQL(DorisWriterOptions options) {
+ String table = options.getTable();
+ List postSqls = options.getPostSqlList();
+ List renderedPostSqls = DorisWriterUtil.renderPreOrPostSqls(postSqls, table);
+ if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
+ LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls));
+ for(String sql : renderedPostSqls) {
+ try {
+ DBUtil.sqlValid(sql, DataBaseType.MySql);
+ } catch (ParserException e){
+ throw RdbmsException.asPostSQLParserException(DataBaseType.MySql,e,sql);
+ }
+ }
+ }
+ }
+}
diff --git a/doriswriter/src/main/resources/plugin.json b/doriswriter/src/main/resources/plugin.json
new file mode 100755
index 00000000..081ddace
--- /dev/null
+++ b/doriswriter/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "doriswriter",
+ "class": "com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriter",
+ "description": "useScene: prod. mechanism: DorisStreamLoad. warn: The more you know about the database, the less problems you encounter.",
+ "developer": "dorisdb"
+}
\ No newline at end of file
diff --git a/doriswriter/src/main/resources/plugin_job_template.json b/doriswriter/src/main/resources/plugin_job_template.json
new file mode 100644
index 00000000..ee1744c8
--- /dev/null
+++ b/doriswriter/src/main/resources/plugin_job_template.json
@@ -0,0 +1,14 @@
+{
+ "name": "doriswriter",
+ "parameter": {
+ "username": "",
+ "password": "",
+ "database": "",
+ "table": "",
+ "column": [],
+ "preSql": [],
+ "postSql": [],
+ "jdbcUrl": "",
+ "loadUrl": []
+ }
+}
\ No newline at end of file
diff --git a/package.xml b/package.xml
index 49e3c4ec..347ee81a 100755
--- a/package.xml
+++ b/package.xml
@@ -189,6 +189,13 @@
datax
+
+ doriswriter/target/datax/
+
+ **/*.*
+
+ datax
+
drdswriter/target/datax/
diff --git a/pom.xml b/pom.xml
index 8f3b827e..96ad4333 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,6 +71,7 @@
mysqlwriter
+ doriswriter
drdswriter
odpswriter
txtfilewriter
@@ -97,7 +98,7 @@
gdbwriter
cassandrawriter
clickhousewriter
- oscarwriter
+
plugin-rdbms-util
plugin-unstructured-storage-util