From 9f5d9a6317367b28a4257bfc3654e469064275e2 Mon Sep 17 00:00:00 2001 From: fariel Date: Wed, 10 Mar 2021 13:34:02 +0800 Subject: [PATCH] add doriswriter --- doriswriter/doc/doriswriter.md | 170 ++++++++++++++++++ doriswriter/pom.xml | 155 ++++++++++++++++ doriswriter/src/main/assembly/package.xml | 35 ++++ .../writer/doriswriter/DorisWriter.java | 144 +++++++++++++++ .../doriswriter/DorisWriterOptions.java | 111 ++++++++++++ .../manager/DorisStreamLoadVisitor.java | 143 +++++++++++++++ .../manager/DorisWriterManager.java | 113 ++++++++++++ .../doriswriter/util/DorisWriterUtil.java | 83 +++++++++ doriswriter/src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 14 ++ package.xml | 7 + pom.xml | 3 +- 12 files changed, 983 insertions(+), 1 deletion(-) create mode 100644 doriswriter/doc/doriswriter.md create mode 100755 doriswriter/pom.xml create mode 100755 doriswriter/src/main/assembly/package.xml create mode 100755 doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriter.java create mode 100644 doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriterOptions.java create mode 100644 doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java create mode 100644 doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisWriterManager.java create mode 100755 doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/util/DorisWriterUtil.java create mode 100755 doriswriter/src/main/resources/plugin.json create mode 100644 doriswriter/src/main/resources/plugin_job_template.json diff --git a/doriswriter/doc/doriswriter.md b/doriswriter/doc/doriswriter.md new file mode 100644 index 00000000..715afa8c --- /dev/null +++ b/doriswriter/doc/doriswriter.md @@ -0,0 +1,170 @@ +# DataX DorisWriter + + +--- + + +## 1 快速介绍 + +DorisWriter 插件实现了写入数据到 Doris 主库的目的表的功能。在底层实现上, 
DorisWriter 通过Streamload以csv格式导入数据至Doris。 + + +## 2 实现原理 + + DorisWriter 通过Streamload以csv格式导入数据至Doris, 内部将`reader`读取的数据进行缓存后批量导入至Doris,以提高写入性能。 + + +## 3 功能说明 + +### 3.1 配置样例 + +* 这里给出一份从 MySQL 读取数据后导入至 Doris 的配置样例。 + +```json +{ + "job": { + "setting": { + "speed": { + "channel": 1 + }, + "errorLimit": { + "record": 0, + "percentage": 0 + } + }, + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "username": "xxxx", + "password": "xxxx", + "column": [ "k1", "k2", "v1", "v2" ], + "connection": [ + { + "table": [ "table1", "table2" ], + "jdbcUrl": [ + "jdbc:mysql://127.0.0.1:3306/datax_test1" + ] + }, + { + "table": [ "table3", "table4" ], + "jdbcUrl": [ + "jdbc:mysql://127.0.0.1:3306/datax_test2" + ] + } + ] + } + }, + "writer": { + "name": "doriswriter", + "parameter": { + "username": "xxxx", + "password": "xxxx", + "database": "xxxx", + "table": "xxxx", + "column": ["k1", "k2", "v1", "v2"], + "preSql": [], + "postSql": [], + "jdbcUrl": "jdbc:mysql://172.28.17.100:9030/", + "loadUrl": ["172.28.17.100:8030", "172.28.17.100:8030"] + } + } + } + ] + } +} + +``` + + +### 3.2 参数说明 + +* **username** + + * 描述:Doris数据库的用户名
+ + * 必选:是
+ + * 默认值:无
+ +* **password** + + * 描述:Doris数据库的密码
+ + * 必选:是
+ + * 默认值:无
+ +* **database** + + * 描述:Doris表的数据库名称。 + + * 必选:是
+ + * 默认值:无
+ +* **table** + + * 描述:Doris表的表名称。 + + * 必选:是
+ + * 默认值:无
+ +* **loadUrl** + + * 描述:Doris FE的地址用于Streamload,可以为多个fe地址,`fe_ip:fe_http_port`。 + + * 必选:是
+ + * 默认值:无
+ +* **column** + + * 描述:目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。 + + **column配置项必须指定,不能留空!** + + 注意:column 中字段的个数与顺序必须与 reader 读取的字段一一对应;当目的表字段个数、类型等有改动时,请同步修改该配置,否则任务可能运行不正确或者失败。 + + * 必选:是
+ + * 默认值:无
+ +* **preSql** + + * 描述:写入数据到目的表前,会先执行这里的标准语句。
+ + * 必选:否
+ + * 默认值:无
+ +* **postSql** + + * 描述:写入数据到目的表后,会执行这里的标准语句。
+ + * 必选:否
+ + * 默认值:无
+ +* **jdbcUrl** + + * 描述:目的数据库的 JDBC 连接信息,用于执行`preSql`及`postSql`。
+ + * 必选:否
+ + * 默认值:无
+ + +### 3.3 类型转换 + +传入的数据均会被转为字符串,并以`\t`作为列分隔符,`\n`作为行分隔符,组成`csv`文件进行StreamLoad导入操作。 + +## 4 性能报告 + + +## 5 约束限制 + + +## FAQ diff --git a/doriswriter/pom.xml b/doriswriter/pom.xml new file mode 100755 index 00000000..2bbf1d22 --- /dev/null +++ b/doriswriter/pom.xml @@ -0,0 +1,155 @@ + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + doriswriter + doriswriter + jar + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + commons-codec + commons-codec + 1.9 + + + commons-logging + commons-logging + 1.1.1 + + + org.apache.httpcomponents + httpcore + 4.4.6 + + + org.apache.httpcomponents + httpclient + 4.5.3 + + + com.alibaba + fastjson + 1.2.75 + + + mysql + mysql-connector-java + 5.1.34 + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + org.apache.maven.plugins + maven-shade-plugin + 3.0.0 + + + + package + + shade + + + true + + + com.alibaba.fastjson + com.dorisdb.shade.com.alibaba.fastjson + + + org.apache.http + com.dorisdb.shade.org.apache.http + + + org.apache.commons + com.dorisdb.shade.org.apache.commons + + + + + commons-codec:commons-codec + commons-logging:* + org.apache.httpcomponents:httpclient + org.apache.httpcomponents:httpcore + com.alibaba:fastjson + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + diff --git a/doriswriter/src/main/assembly/package.xml b/doriswriter/src/main/assembly/package.xml new file mode 100755 index 00000000..2e0880e1 --- /dev/null +++ b/doriswriter/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/doriswriter 
+ + + target/ + + doriswriter-0.0.1-SNAPSHOT.jar + + plugin/writer/doriswriter + + + + + + false + plugin/writer/doriswriter/libs + runtime + + +
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriter.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriter.java
new file mode 100755
index 00000000..0cbe579c
--- /dev/null
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriter.java
@@ -0,0 +1,183 @@
+package com.dorisdb.connector.datax.plugin.writer.doriswriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.util.DBUtil;
+import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
+import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
+import com.dorisdb.connector.datax.plugin.writer.doriswriter.manager.DorisWriterManager;
+import com.dorisdb.connector.datax.plugin.writer.doriswriter.util.DorisWriterUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * DataX writer plugin that buffers records and loads them into Doris through Stream Load.
+ */
+public class DorisWriter extends Writer {
+
+    /**
+     * Job-level part of the plugin: validates the configuration and runs the configured
+     * preSql / postSql statements over JDBC.
+     */
+    public static class Job extends Writer.Job {
+
+        private static final Logger LOG = LoggerFactory.getLogger(Job.class);
+        private Configuration originalConfig = null;
+        private DorisWriterOptions options;
+
+        /** Reads the plugin configuration and runs its validation. */
+        @Override
+        public void init() {
+            this.originalConfig = super.getPluginJobConf();
+            options = new DorisWriterOptions(super.getPluginJobConf());
+            options.doPretreatment();
+        }
+
+        /** Re-runs {@link #init()} and validates the rendered preSql / postSql statements. */
+        @Override
+        public void preCheck() {
+            this.init();
+            DorisWriterUtil.preCheckPrePareSQL(options);
+            DorisWriterUtil.preCheckPostSQL(options);
+        }
+
+        /** Executes the rendered preSql statements, if any. */
+        @Override
+        public void prepare() {
+            executeRenderedSqls("preSqls", options.getPreSqlList());
+        }
+
+        /**
+         * Builds the task configurations.
+         *
+         * @param mandatoryNumber number of task configurations to return
+         * @return a list holding the job configuration {@code mandatoryNumber} times
+         */
+        @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
+            for (int i = 0; i < mandatoryNumber; i++) {
+                configurations.add(originalConfig);
+            }
+            return configurations;
+        }
+
+        /** Executes the rendered postSql statements, if any. */
+        @Override
+        public void post() {
+            executeRenderedSqls("postSqls", options.getPostSqlList());
+        }
+
+        @Override
+        public void destroy() {
+        }
+
+        /**
+         * Renders the given SQL templates against the target table and executes them on a
+         * JDBC connection that is always handed back to DBUtil.closeDBResources afterwards.
+         *
+         * @param kind label used in the log line ("preSqls" or "postSqls")
+         * @param sqlTemplates configured statements; may be {@code null}
+         */
+        private void executeRenderedSqls(String kind, List<String> sqlTemplates) {
+            List<String> renderedSqls = DorisWriterUtil.renderPreOrPostSqls(sqlTemplates, options.getTable());
+            if (null == renderedSqls || renderedSqls.isEmpty()) {
+                return;
+            }
+            String jdbcUrl = options.getJdbcUrl();
+            Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, options.getUsername(), options.getPassword());
+            try {
+                LOG.info("Begin to execute {}:[{}]. context info:{}.", kind, String.join(";", renderedSqls), jdbcUrl);
+                DorisWriterUtil.executeSqls(conn, renderedSqls);
+            } finally {
+                // release the connection even when a statement fails
+                DBUtil.closeDBResources(null, null, conn);
+            }
+        }
+
+    }
+
+    /**
+     * Task-level part of the plugin: serializes each record as one tab-separated row and
+     * hands it to the {@link DorisWriterManager}.
+     */
+    public static class Task extends Writer.Task {
+        /** Written for a column whose raw data is null; Doris Stream Load reads \N as NULL in CSV. */
+        private static final String NULL_VALUE = "\\N";
+
+        private DorisWriterManager writerManager;
+        private DorisWriterOptions options;
+
+        @Override
+        public void init() {
+            options = new DorisWriterOptions(super.getPluginJobConf());
+            writerManager = new DorisWriterManager(options);
+        }
+
+        @Override
+        public void prepare() {
+        }
+
+        /**
+         * Reads records until the reader returns {@code null} and buffers them for Stream Load.
+         *
+         * @param recordReceiver source of the records to write
+         * @throws DataXException if a record's column count differs from the configured
+         *         column count, or if buffering / flushing fails
+         */
+        public void startWrite(RecordReceiver recordReceiver) {
+            try {
+                Record record;
+                while ((record = recordReceiver.getFromReader()) != null) {
+                    if (record.getColumnNumber() != options.getColumns().size()) {
+                        throw DataXException
+                                .asDataXException(
+                                        DBUtilErrorCode.CONF_ERROR,
+                                        String.format(
+                                                "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
+                                                record.getColumnNumber(),
+                                                options.getColumns().size()));
+                    }
+                    StringBuilder sb = new StringBuilder();
+                    for (int i = 0; i < record.getColumnNumber(); i++) {
+                        Object rawData = record.getColumn(i).getRawData();
+                        // calling toString() directly on a null value raised a NullPointerException
+                        sb.append(null == rawData ? NULL_VALUE : rawData.toString());
+                        if (i < record.getColumnNumber() - 1) {
+                            sb.append("\t");
+                        }
+                    }
+                    writerManager.writeRecord(sb.toString());
+                }
+            } catch (Exception e) {
+                throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
+            }
+        }
+
+        /** Flushes the rows that are still buffered. */
+        @Override
+        public void post() {
+            try {
+                writerManager.flush(writerManager.createBatchLabel());
+            } catch (Exception e) {
+                throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
+            }
+        }
+
+        @Override
+        public void destroy() {}
+
+        @Override
+        public boolean supportFailOver(){
+            return false;
+        }
+    }
+}
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriterOptions.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriterOptions.java
new file mode 100644
index 00000000..1d9cb2be
--- /dev/null
+++ 
b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriterOptions.java
@@ -0,0 +1,126 @@
+package com.dorisdb.connector.datax.plugin.writer.doriswriter;
+
+import java.io.Serializable;
+
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
+
+import java.util.List;
+
+/**
+ * Typed accessors for the doriswriter plugin configuration, plus its validation.
+ */
+public class DorisWriterOptions implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private static final long KILO_BYTES_SCALE = 1024L;
+    private static final long MEGA_BYTES_SCALE = KILO_BYTES_SCALE * KILO_BYTES_SCALE;
+    private static final int MAX_RETRIES = 1;
+    private static final int BATCH_ROWS = 500000;
+    private static final long BATCH_BYTES = 100 * MEGA_BYTES_SCALE;
+
+    private static final String KEY_USERNAME = "username";
+    private static final String KEY_PASSWORD = "password";
+    private static final String KEY_DATABASE = "database";
+    private static final String KEY_TABLE = "table";
+    private static final String KEY_COLUMN = "column";
+    private static final String KEY_PRE_SQL = "preSql";
+    private static final String KEY_POST_SQL = "postSql";
+    private static final String KEY_JDBC_URL = "jdbcUrl";
+    private static final String KEY_LOAD_URL = "loadUrl";
+
+    private final Configuration options;
+
+    public DorisWriterOptions(Configuration options) {
+        this.options = options;
+    }
+
+    /**
+     * Checks the required options and the format of every loadUrl entry.
+     *
+     * @throws DataXException if a loadUrl entry is not of the form {@code host:port}
+     */
+    public void doPretreatment() {
+        validateRequired();
+        validateStreamLoadUrl();
+    }
+
+    public String getJdbcUrl() {
+        return options.getString(KEY_JDBC_URL);
+    }
+
+    public String getDatabase() {
+        return options.getString(KEY_DATABASE);
+    }
+
+    public String getTable() {
+        return options.getString(KEY_TABLE);
+    }
+
+    public String getUsername() {
+        return options.getString(KEY_USERNAME);
+    }
+
+    public String getPassword() {
+        return options.getString(KEY_PASSWORD);
+    }
+
+    /** Returns the configured FE addresses, each expected as {@code fe_ip:fe_http_port}. */
+    public List<String> getLoadUrlList() {
+        return options.getList(KEY_LOAD_URL, String.class);
+    }
+
+    public List<String> getColumns() {
+        return options.getList(KEY_COLUMN, String.class);
+    }
+
+    public List<String> getPreSqlList() {
+        return options.getList(KEY_PRE_SQL, String.class);
+    }
+
+    public List<String> getPostSqlList() {
+        return options.getList(KEY_POST_SQL, String.class);
+    }
+
+    /** Returns how many times a failed flush is retried. */
+    public int getMaxRetries() {
+        return MAX_RETRIES;
+    }
+
+    /** Returns the number of buffered rows that triggers a flush. */
+    public int getBatchRows() {
+        return BATCH_ROWS;
+    }
+
+    /** Returns the buffered size, in bytes, that triggers a flush. */
+    public long getBatchSize() {
+        return BATCH_BYTES;
+    }
+
+    /** Rejects loadUrl entries that do not split into at least two parts on {@code :}. */
+    private void validateStreamLoadUrl() {
+        List<String> urlList = getLoadUrlList();
+        for (String host : urlList) {
+            if (host.split(":").length < 2) {
+                throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
+                    "loadUrl的格式不正确,请输入 `fe_ip:fe_http_port;fe_ip:fe_http_port`。");
+            }
+        }
+    }
+
+    /** Asks the configuration for every mandatory option, {@code column} included. */
+    private void validateRequired() {
+        final String[] requiredOptionKeys = new String[]{
+            KEY_USERNAME,
+            KEY_PASSWORD,
+            KEY_DATABASE,
+            KEY_TABLE,
+            KEY_COLUMN,
+            KEY_LOAD_URL
+        };
+        for (String optionKey : requiredOptionKeys) {
+            options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
+        }
+    }
+}
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java
new file mode 100644
index 00000000..a01b906b
--- /dev/null
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java
@@ -0,0 +1,178 @@
+package com.dorisdb.connector.datax.plugin.writer.doriswriter.manager;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+
+import com.alibaba.fastjson.JSON;
+import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Sends buffered rows to Doris with an HTTP PUT against the Stream Load endpoint of one
+ * of the configured loadUrl addresses.
+ */
+public class DorisStreamLoadVisitor implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadVisitor.class);
+
+    private final DorisWriterOptions writerOptions;
+    /** Index into the loadUrl list of the host that is probed first. */
+    private int pos;
+
+    public DorisStreamLoadVisitor(DorisWriterOptions writerOptions) {
+        this.writerOptions = writerOptions;
+    }
+
+    /**
+     * Loads the given rows into the configured table in a single Stream Load request.
+     *
+     * @param label label sent with the request
+     * @param labeledRows rows to load, one serialized row per element
+     * @throws IOException if no host is reachable, the response carries no status, or
+     *         the status is {@code Fail}
+     */
+    public void doStreamLoad(String label, List<String> labeledRows) throws IOException {
+        String host = getAvailableHost();
+        if (null == host) {
+            throw new IOException("None of the host in `load_url` could be connected.");
+        }
+        String loadUrl = new StringBuilder(host)
+            .append("/api/")
+            .append(writerOptions.getDatabase())
+            .append("/")
+            .append(writerOptions.getTable())
+            .append("/_stream_load")
+            .toString();
+        Map<String, Object> loadResult = doHttpPut(loadUrl, label, joinRows(labeledRows));
+        final String keyStatus = "Status";
+        if (null == loadResult || !loadResult.containsKey(keyStatus)) {
+            throw new IOException("Unable to flush data to doris: unknown result status.");
+        }
+        if ("Fail".equals(loadResult.get(keyStatus))) {
+            // plain concatenation: a response without "Message" must not hide the failure behind an NPE
+            throw new IOException("Failed to flush data to doris." + loadResult.get("Message"));
+        }
+    }
+
+    /**
+     * Probes the configured hosts, starting at index {@code pos}, and returns the first
+     * one that accepts a connection.
+     *
+     * @return the host prefixed with {@code http://}, or {@code null} if none is reachable
+     */
+    private String getAvailableHost() {
+        List<String> hostList = writerOptions.getLoadUrlList();
+        if (null == hostList || hostList.isEmpty()) {
+            return null;
+        }
+        if (pos >= hostList.size()) {
+            pos = 0;
+        }
+        // wrap around so that hosts listed before pos are probed as well
+        for (int i = 0; i < hostList.size(); i++) {
+            int candidate = (pos + i) % hostList.size();
+            String host = new StringBuilder("http://").append(hostList.get(candidate)).toString();
+            if (tryHttpConnection(host)) {
+                pos = candidate;
+                return host;
+            }
+        }
+        return null;
+    }
+
+    /** Returns whether a connection to {@code host} opens, using a 1000 ms connect timeout. */
+    private boolean tryHttpConnection(String host) {
+        try {
+            URL url = new URL(host);
+            HttpURLConnection co = (HttpURLConnection) url.openConnection();
+            co.setConnectTimeout(1000);
+            co.connect();
+            co.disconnect();
+            return true;
+        } catch (Exception e1) {
+            LOG.warn("Failed to connect to address:{}", host, e1);
+            return false;
+        }
+    }
+
+    /** Joins the rows with {@code \n} and encodes the result as UTF-8. */
+    private byte[] joinRows(List<String> rows) {
+        return String.join("\n", rows).getBytes(StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Executes the Stream Load request.
+     *
+     * @return the parsed JSON response, or {@code null} if the status code is not 200 or
+     *         the response has no body
+     */
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> doHttpPut(String loadUrl, String label, byte[] data) throws IOException {
+        LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
+        final HttpClientBuilder httpClientBuilder = HttpClients.custom()
+            .setRedirectStrategy(new DefaultRedirectStrategy() {
+                @Override
+                protected boolean isRedirectable(String method) {
+                    // treat every HTTP method, PUT included, as redirectable
+                    return true;
+                }
+            });
+        try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
+            HttpPut httpPut = new HttpPut(loadUrl);
+            List<String> cols = writerOptions.getColumns();
+            if (null != cols && !cols.isEmpty()) {
+                httpPut.setHeader("columns", String.join(",", cols));
+            }
+            httpPut.setHeader("Expect", "100-continue");
+            httpPut.setHeader("label", label);
+            httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded");
+            httpPut.setHeader("Authorization", getBasicAuthHeader(writerOptions.getUsername(), writerOptions.getPassword()));
+            httpPut.setEntity(new ByteArrayEntity(data));
+            httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
+            try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
+                int code = resp.getStatusLine().getStatusCode();
+                if (200 != code) {
+                    LOG.warn("Request failed with code:{}", code);
+                    return null;
+                }
+                HttpEntity respEntity = resp.getEntity();
+                if (null == respEntity) {
+                    LOG.warn("Request failed with empty response.");
+                    return null;
+                }
+                return (Map<String, Object>) JSON.parse(EntityUtils.toString(respEntity));
+            }
+        }
+    }
+
+    /** Builds the HTTP Basic {@code Authorization} header value for the given credentials. */
+    private String getBasicAuthHeader(String username, String password) {
+        String auth = username + ":" + password;
+        // explicit charset so the header does not depend on the platform default
+        byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
+        return new StringBuilder("Basic ").append(new String(encodedAuth, StandardCharsets.UTF_8)).toString();
+    }
+
+}
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisWriterManager.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisWriterManager.java
new file mode 100644
index 00000000..86d370a5
--- /dev/null
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisWriterManager.java
@@ -0,0 +1,135 @@
+package com.dorisdb.connector.datax.plugin.writer.doriswriter.manager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions;
+
+/**
+ * Buffers serialized rows and flushes them to Doris once the configured row count or
+ * byte size is reached.
+ */
+public class DorisWriterManager implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);
+
+    private final DorisStreamLoadVisitor dorisStreamLoadVisitor;
+    private final DorisWriterOptions writerOptions;
+
+    private final List<String> buffer = new ArrayList<>();
+    private int batchCount = 0;
+    private long batchSize = 0;
+    private volatile boolean closed = false;
+    // NOTE(review): nothing in this class assigns flushException, so checkFlushException() never throws
+    private volatile Exception flushException;
+
+    public DorisWriterManager(DorisWriterOptions writerOptions) {
+        this.writerOptions = writerOptions;
+        this.dorisStreamLoadVisitor = new DorisStreamLoadVisitor(writerOptions);
+    }
+
+    /**
+     * Buffers one row and flushes the buffer when a batch limit is reached.
+     *
+     * @param record serialized row
+     * @throws IOException if buffering or flushing fails
+     */
+    public final synchronized void writeRecord(String record) throws IOException {
+        checkFlushException();
+        try {
+            buffer.add(record);
+            batchCount++;
+            // the limit is expressed in bytes, so count encoded bytes rather than chars
+            batchSize += record.getBytes(StandardCharsets.UTF_8).length;
+            if (batchCount >= writerOptions.getBatchRows() || batchSize >= writerOptions.getBatchSize()) {
+                flush(createBatchLabel());
+            }
+        } catch (Exception e) {
+            throw new IOException("Writing records to Doris failed.", e);
+        }
+    }
+
+    /**
+     * Sends the buffered rows under the given label, retrying up to
+     * {@code getMaxRetries()} times with a growing pause, and clears the buffer on success.
+     *
+     * @param label Stream Load label used for every attempt
+     * @throws IOException if all attempts fail or the thread is interrupted while waiting
+     */
+    public synchronized void flush(String label) throws IOException {
+        checkFlushException();
+        if (batchCount == 0) {
+            return;
+        }
+        for (int i = 0; i <= writerOptions.getMaxRetries(); i++) {
+            try {
+                tryToFlush(label);
+                buffer.clear();
+                batchCount = 0;
+                batchSize = 0;
+                break;
+            } catch (IOException e) {
+                LOG.warn("Failed to flush batch data to doris, retry times = {}", i, e);
+                if (i >= writerOptions.getMaxRetries()) {
+                    throw new IOException(e);
+                }
+                try {
+                    Thread.sleep(1000L * (i + 1));
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException("Unable to flush, interrupted while doing another attempt", e);
+                }
+            }
+        }
+    }
+
+    /** Flushes any remaining rows; only the first call attempts the flush. */
+    public synchronized void close() {
+        if (!closed) {
+            closed = true;
+
+            if (batchCount > 0) {
+                try {
+                    flush(createBatchLabel());
+                } catch (Exception e) {
+                    throw new RuntimeException("Writing records to Doris failed.", e);
+                }
+            }
+        }
+        checkFlushException();
+    }
+
+    /** Returns a random UUID string to use as a Stream Load label. */
+    public String createBatchLabel() {
+        return UUID.randomUUID().toString();
+    }
+
+    public List<String> getBufferedBatchList() {
+        return buffer;
+    }
+
+    public void setBufferedBatchList(List<String> buffer) {
+        this.buffer.clear();
+        this.buffer.addAll(buffer);
+    }
+
+    private void tryToFlush(String label) throws IOException {
+        // flush to Doris with stream load
+        dorisStreamLoadVisitor.doStreamLoad(label, buffer);
+    }
+
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to Doris failed.", flushException);
+        }
+    }
+}
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/util/DorisWriterUtil.java 
b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/util/DorisWriterUtil.java
new file mode 100755
index 00000000..348e519c
--- /dev/null
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/util/DorisWriterUtil.java
@@ -0,0 +1,99 @@
+package com.dorisdb.connector.datax.plugin.writer.doriswriter.util;
+
+import com.alibaba.datax.plugin.rdbms.util.DBUtil;
+import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
+import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
+import com.alibaba.datax.plugin.rdbms.writer.Constant;
+import com.alibaba.druid.sql.parser.ParserException;
+import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions;
+import com.google.common.base.Strings;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.*;
+
+/**
+ * Static helpers for rendering, validating and executing the preSql / postSql statements.
+ */
+public final class DorisWriterUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisWriterUtil.class);
+
+    private DorisWriterUtil() {}
+
+    /**
+     * Replaces the table-name placeholder in every non-empty statement.
+     *
+     * @param preOrPostSqls configured statements; may be {@code null}
+     * @param tableName value substituted for the placeholder
+     * @return the rendered statements; empty if {@code preOrPostSqls} is {@code null}
+     */
+    public static List<String> renderPreOrPostSqls(List<String> preOrPostSqls, String tableName) {
+        if (null == preOrPostSqls) {
+            return Collections.emptyList();
+        }
+        List<String> renderedSqls = new ArrayList<>();
+        for (String sql : preOrPostSqls) {
+            if (!Strings.isNullOrEmpty(sql)) {
+                renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
+            }
+        }
+        return renderedSqls;
+    }
+
+    /**
+     * Executes the statements in order on a single {@link Statement}. Only the statement is
+     * handed to DBUtil.closeDBResources; the connection is left for the caller to release.
+     */
+    public static void executeSqls(Connection conn, List<String> sqls) {
+        Statement stmt = null;
+        String currentSql = null;
+        try {
+            stmt = conn.createStatement();
+            for (String sql : sqls) {
+                currentSql = sql;
+                DBUtil.executeSqlWithoutResultSet(stmt, sql);
+            }
+        } catch (Exception e) {
+            throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
+        } finally {
+            DBUtil.closeDBResources(null, stmt, null);
+        }
+    }
+
+    /** Runs DBUtil.sqlValid over every rendered preSql statement. */
+    public static void preCheckPrePareSQL(DorisWriterOptions options) {
+        String table = options.getTable();
+        List<String> preSqls = options.getPreSqlList();
+        List<String> renderedPreSqls = DorisWriterUtil.renderPreOrPostSqls(preSqls, table);
+        if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
+            LOG.info("Begin to preCheck preSqls:[{}].", String.join(";", renderedPreSqls));
+            for (String sql : renderedPreSqls) {
+                try {
+                    DBUtil.sqlValid(sql, DataBaseType.MySql);
+                } catch (ParserException e) {
+                    throw RdbmsException.asPreSQLParserException(DataBaseType.MySql,e,sql);
+                }
+            }
+        }
+    }
+
+    /** Runs DBUtil.sqlValid over every rendered postSql statement. */
+    public static void preCheckPostSQL(DorisWriterOptions options) {
+        String table = options.getTable();
+        List<String> postSqls = options.getPostSqlList();
+        List<String> renderedPostSqls = DorisWriterUtil.renderPreOrPostSqls(postSqls, table);
+        if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
+            LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls));
+            for(String sql : renderedPostSqls) {
+                try {
+                    DBUtil.sqlValid(sql, DataBaseType.MySql);
+                } catch (ParserException e){
+                    throw RdbmsException.asPostSQLParserException(DataBaseType.MySql,e,sql);
+                }
+            }
+        }
+    }
+}
diff --git a/doriswriter/src/main/resources/plugin.json b/doriswriter/src/main/resources/plugin.json
new file mode 100755
index 00000000..081ddace
--- /dev/null
+++ b/doriswriter/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+    "name": "doriswriter",
+    "class": "com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriter",
+    "description": "useScene: prod. mechanism: DorisStreamLoad. 
warn: The more you know about the database, the less problems you encounter.", + "developer": "dorisdb" +} \ No newline at end of file diff --git a/doriswriter/src/main/resources/plugin_job_template.json b/doriswriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..ee1744c8 --- /dev/null +++ b/doriswriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,14 @@ +{ + "name": "doriswriter", + "parameter": { + "username": "", + "password": "", + "database": "", + "table": "", + "column": [], + "preSql": [], + "postSql": [], + "jdbcUrl": "", + "loadUrl": [] + } +} \ No newline at end of file diff --git a/package.xml b/package.xml index 49e3c4ec..347ee81a 100755 --- a/package.xml +++ b/package.xml @@ -189,6 +189,13 @@ datax + + doriswriter/target/datax/ + + **/*.* + + datax + drdswriter/target/datax/ diff --git a/pom.xml b/pom.xml index 8f3b827e..96ad4333 100644 --- a/pom.xml +++ b/pom.xml @@ -71,6 +71,7 @@ mysqlwriter + doriswriter drdswriter odpswriter txtfilewriter @@ -97,7 +98,7 @@ gdbwriter cassandrawriter clickhousewriter - oscarwriter + plugin-rdbms-util plugin-unstructured-storage-util