diff --git a/core/pom.xml b/core/pom.xml index 970f95a6..7685001b 100755 --- a/core/pom.xml +++ b/core/pom.xml @@ -41,7 +41,7 @@ org.apache.httpcomponents httpclient - 4.5 + 4.5.13 org.apache.httpcomponents diff --git a/doriswriter/doc/doriswriter.md b/doriswriter/doc/doriswriter.md new file mode 100644 index 00000000..973d4bcf --- /dev/null +++ b/doriswriter/doc/doriswriter.md @@ -0,0 +1,181 @@ +# DorisWriter 插件文档 + +## 1 快速介绍 +DorisWriter支持将大批量数据写入Doris中。 + +## 2 实现原理 +DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter会将`reader`读取的数据进行缓存在内存中,拼接成Json文本,然后批量导入至Doris。 + +## 3 功能说明 + +### 3.1 配置样例 + +这里是一份从Stream读取数据后导入至Doris的配置文件。 + +``` +{ + "job": { + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "column": ["emp_no", "birth_date", "first_name","last_name","gender","hire_date"], + "connection": [ + { + "jdbcUrl": ["jdbc:mysql://localhost:3306/demo"], + "table": ["employees_1"] + } + ], + "username": "root", + "password": "xxxxx", + "where": "" + } + }, + "writer": { + "name": "doriswriter", + "parameter": { + "loadUrl": ["172.16.0.13:8030"], + "loadProps": { + }, + "column": ["emp_no", "birth_date", "first_name","last_name","gender","hire_date"], + "username": "root", + "password": "xxxxxx", + "postSql": ["select count(1) from all_employees_info"], + "preSql": [], + "flushInterval":30000, + "connection": [ + { + "jdbcUrl": "jdbc:mysql://172.16.0.13:9030/demo", + "selectedDatabase": "demo", + "table": ["all_employees_info"] + } + ], + "loadProps": { + "format": "json", + "strip_outer_array": true + } + } + } + } + ], + "setting": { + "speed": { + "channel": "1" + } + } + } +} +``` + +### 3.2 参数说明 + +* **jdbcUrl** + + - 描述:Doris 的 JDBC 连接串,用户执行 preSql 或 postSQL。 + - 必选:是 + - 默认值:无 + +* **loadUrl** + + - 描述:作为 Stream Load 的连接目标。格式为 "ip:port"。其中 IP 是 FE 节点 IP,port 是 FE 节点的 http_port。可以填写多个,多个之间使用英文状态的分号隔开:`;`,doriswriter 将以轮询的方式访问。 + - 必选:是 + - 默认值:无 + +* **username** + + - 描述:访问Doris数据库的用户名 + - 必选:是 + - 默认值:无 + +* **password** + + - 描述:访问Doris数据库的密码 + - 必选:否 + - 默认值:空 + +* **connection.selectedDatabase** + - 描述:需要写入的Doris数据库名称。 + - 必选:是 + - 默认值:无 + +* **connection.table** + - 描述:需要写入的Doris表名称。 + - 必选:是 + - 默认值:无 + +* **column** + + - 描述:目的表**需要写入数据**的字段,这些字段将作为生成的 Json 数据的字段名。字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。 + - 必选:是 + - 默认值:否 + +* **preSql** + + - 描述:写入数据到目的表前,会先执行这里的标准语句。 + - 必选:否 + - 默认值:无 + +* **postSql** + + - 描述:写入数据到目的表后,会执行这里的标准语句。 + - 必选:否 + - 默认值:无 + + +* **maxBatchRows** + + - 描述:每批次导入数据的最大行数。和 **maxBatchSize** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。 + - 必选:否 + - 默认值:500000 + +* **maxBatchSize** + + - 描述:每批次导入数据的最大数据量。和 **maxBatchRows** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。 + - 必选:否 + - 默认值:104857600 + +* **maxRetries** + + - 描述:每批次导入数据失败后的重试次数。 + - 必选:否 + - 默认值:0 + +* **labelPrefix** + + - 描述:每批次导入任务的 label 前缀。最终的 label 将有 `labelPrefix + UUID` 组成全局唯一的 label,确保数据不会重复导入 + - 必选:否 + - 默认值:`datax_doris_writer_` + +* **loadProps** + + - 描述:StreamLoad 的请求参数,详情参照StreamLoad介绍页面。[Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual) + + 这里包括导入的数据格式:format等,导入数据格式默认我们使用csv,支持JSON,具体可以参照下面类型转换部分,也可以参照上面Stream load 官方信息 + + - 必选:否 + + - 默认值:无 + +### 类型转换 + +默认传入的数据均会被转为字符串,并以`\t`作为列分隔符,`\n`作为行分隔符,组成`csv`文件进行StreamLoad导入操作。 + +默认是csv格式导入,如需更改列分隔符, 则正确配置 `loadProps` 即可: + +```json +"loadProps": { + "column_separator": "\\x01", + "row_delimiter": "\\x02" +} +``` + +如需更改导入格式为`json`, 则正确配置 `loadProps` 即可: +```json +"loadProps": { + "format": "json", + "strip_outer_array": true +} +``` + 
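+Under the hood, each batch is submitted as a single Stream Load HTTP request to one of the FE hosts in `loadUrl`, and every entry of `loadProps` is passed along as a request header. A rough hand-written equivalent of what the plugin sends for the sample job in 3.1 (the label and the data file `rows.json` are placeholders, not plugin options):
+
+```bash
+curl --location-trusted -u root:xxxxxx \
+    -H "label:datax_doris_writer_example" \
+    -H "format: json" \
+    -H "strip_outer_array: true" \
+    -T rows.json \
+    http://172.16.0.13:8030/api/demo/all_employees_info/_stream_load
+```
+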
+更多信息请参照 Doris 官网:[Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual) \ No newline at end of file diff --git a/doriswriter/doc/mysql2doris.json b/doriswriter/doc/mysql2doris.json new file mode 100644 index 00000000..6992a2be --- /dev/null +++ b/doriswriter/doc/mysql2doris.json @@ -0,0 +1,46 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "column": ["k1", "k2", "k3"], + "connection": [ + { + "jdbcUrl": ["jdbc:mysql://192.168.10.10:3306/db1"], + "table": ["t1"] + } + ], + "username": "root", + "password": "", + "where": "" + } + }, + "writer": { + "name": "doriswriter", + "parameter": { + "loadUrl": ["192.168.1.1:8030"], + "loadProps": {}, + "database": "db1", + "column": ["k1", "k2", "k3"], + "username": "root", + "password": "", + "postSql": [], + "preSql": [], + "connection": [ + "jdbcUrl":"jdbc:mysql://192.168.1.1:9030/", + "table":["xxx"], + "selectedDatabase":"xxxx" + ] + } + } + } + ], + "setting": { + "speed": { + "channel": "1" + } + } + } +} diff --git a/doriswriter/pom.xml b/doriswriter/pom.xml new file mode 100644 index 00000000..aa1e6ff0 --- /dev/null +++ b/doriswriter/pom.xml @@ -0,0 +1,99 @@ + + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + doriswriter + doriswriter + jar + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + mysql + mysql-connector-java + ${mysql.driver.version} + + + org.apache.httpcomponents + httpclient + 4.5.13 + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + diff --git a/doriswriter/src/main/assembly/package.xml b/doriswriter/src/main/assembly/package.xml new file mode 100644 index 00000000..71596332 --- /dev/null +++ b/doriswriter/src/main/assembly/package.xml @@ -0,0 +1,52 @@ + + + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/doriswriter + + + target/ + + doriswriter-0.0.1-SNAPSHOT.jar + + plugin/writer/doriswriter + + + + + false + plugin/writer/doriswriter/libs + runtime + + + diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DelimiterParser.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DelimiterParser.java new file mode 100644 index 00000000..e84bd7dd --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DelimiterParser.java @@ -0,0 +1,54 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.google.common.base.Strings; + +import java.io.StringWriter; + +public class DelimiterParser { + + private static final String HEX_STRING = "0123456789ABCDEF"; + + public static String parse(String sp, String dSp) throws RuntimeException { + if ( Strings.isNullOrEmpty(sp)) { + return dSp; + } + if (!sp.toUpperCase().startsWith("\\X")) { + return sp; + } + String hexStr = sp.substring(2); + // check hex str + if (hexStr.isEmpty()) { + throw new RuntimeException("Failed to parse delimiter: `Hex str is empty`"); + } + if (hexStr.length() % 2 != 0) { + throw new RuntimeException("Failed to parse delimiter: `Hex str length error`"); + } + for (char hexChar : hexStr.toUpperCase().toCharArray()) { + if 
(HEX_STRING.indexOf(hexChar) == -1) { + throw new RuntimeException("Failed to parse delimiter: `Hex str format error`"); + } + } + // transform to separator + StringWriter writer = new StringWriter(); + for (byte b : hexStrToBytes(hexStr)) { + writer.append((char) b); + } + return writer.toString(); + } + + private static byte[] hexStrToBytes(String hexStr) { + String upperHexStr = hexStr.toUpperCase(); + int length = upperHexStr.length() / 2; + char[] hexChars = upperHexStr.toCharArray(); + byte[] bytes = new byte[length]; + for (int i = 0; i < length; i++) { + int pos = i * 2; + bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1])); + } + return bytes; + } + + private static byte charToByte(char c) { + return (byte) HEX_STRING.indexOf(c); + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisBaseCodec.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisBaseCodec.java new file mode 100644 index 00000000..ee7ded56 --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisBaseCodec.java @@ -0,0 +1,23 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.common.element.Column; + +public class DorisBaseCodec { + protected String convertionField( Column col) { + if (null == col.getRawData() || Column.Type.NULL == col.getType()) { + return null; + } + if ( Column.Type.BOOL == col.getType()) { + return String.valueOf(col.asLong()); + } + if ( Column.Type.BYTES == col.getType()) { + byte[] bts = (byte[])col.getRawData(); + long value = 0; + for (int i = 0; i < bts.length; i++) { + value += (bts[bts.length - i - 1] & 0xffL) << (8 * i); + } + return String.valueOf(value); + } + return col.asString(); + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java new file mode 100644 index 00000000..a2437a1c --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java @@ -0,0 +1,10 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.common.element.Record; + +import java.io.Serializable; + +public interface DorisCodec extends Serializable { + + String codec( Record row); +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodecFactory.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodecFactory.java new file mode 100644 index 00000000..22c4b409 --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodecFactory.java @@ -0,0 +1,19 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import java.util.Map; + +public class DorisCodecFactory { + public DorisCodecFactory (){ + + } + public static DorisCodec createCodec( Keys writerOptions) { + if ( Keys.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) { + Map props = writerOptions.getLoadProps(); + return new DorisCsvCodec (null == props || !props.containsKey("column_separator") ? 
null : String.valueOf(props.get("column_separator"))); + } + if ( Keys.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) { + return new DorisJsonCodec (writerOptions.getColumns()); + } + throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties."); + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java new file mode 100644 index 00000000..518aa304 --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java @@ -0,0 +1,27 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.common.element.Record; + +public class DorisCsvCodec extends DorisBaseCodec implements DorisCodec { + + private static final long serialVersionUID = 1L; + + private final String columnSeparator; + + public DorisCsvCodec ( String sp) { + this.columnSeparator = DelimiterParser.parse(sp, "\t"); + } + + @Override + public String codec( Record row) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < row.getColumnNumber(); i++) { + String value = convertionField(row.getColumn(i)); + sb.append(null == value ? "\\N" : value); + if (i < row.getColumnNumber() - 1) { + sb.append(columnSeparator); + } + } + return sb.toString(); + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java new file mode 100644 index 00000000..e6c05733 --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java @@ -0,0 +1,33 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.fastjson.JSON; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DorisJsonCodec extends DorisBaseCodec implements DorisCodec { + + private static final long serialVersionUID = 1L; + + private final List fieldNames; + + public DorisJsonCodec ( List fieldNames) { + this.fieldNames = fieldNames; + } + + @Override + public String codec( Record row) { + if (null == fieldNames) { + return ""; + } + Map rowMap = new HashMap<> (fieldNames.size()); + int idx = 0; + for (String fieldName : fieldNames) { + rowMap.put(fieldName, convertionField(row.getColumn(idx))); + idx++; + } + return JSON.toJSONString(rowMap); + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java new file mode 100644 index 00000000..3e4db6cf --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java @@ -0,0 +1,233 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.fastjson.JSON; +import org.apache.commons.codec.binary.Base64; +import org.apache.http.HttpEntity; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import 
org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class DorisStreamLoadObserver { + private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadObserver.class); + + private Keys options; + + private long pos; + private static final String RESULT_FAILED = "Fail"; + private static final String RESULT_LABEL_EXISTED = "Label Already Exists"; + private static final String LAEBL_STATE_VISIBLE = "VISIBLE"; + private static final String LAEBL_STATE_COMMITTED = "COMMITTED"; + private static final String RESULT_LABEL_PREPARE = "PREPARE"; + private static final String RESULT_LABEL_ABORTED = "ABORTED"; + private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN"; + + + public DorisStreamLoadObserver ( Keys options){ + this.options = options; + } + + public void streamLoad(WriterTuple data) throws Exception { + String host = getLoadHost(); + if(host == null){ + throw new IOException ("load_url cannot be empty, or the host cannot connect.Please check your configuration."); + } + String loadUrl = new StringBuilder(host) + .append("/api/") + .append(options.getDatabase()) + .append("/") + .append(options.getTable()) + .append("/_stream_load") + .toString(); + LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel()); + Map loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue())); + LOG.info("StreamLoad response :{}",JSON.toJSONString(loadResult)); + final String keyStatus = "Status"; + if (null == loadResult || !loadResult.containsKey(keyStatus)) { + throw new IOException("Unable to flush data to Doris: unknown result status."); + } + LOG.debug("StreamLoad response:{}",JSON.toJSONString(loadResult)); + if (RESULT_FAILED.equals(loadResult.get(keyStatus))) { + throw new IOException( + new StringBuilder("Failed to flush data to Doris.\n").append(JSON.toJSONString(loadResult)).toString() + ); + } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) { + LOG.debug("StreamLoad response:{}",JSON.toJSONString(loadResult)); + checkStreamLoadState(host, data.getLabel()); + } + } + + private void checkStreamLoadState(String host, String label) throws IOException { + int idx = 0; + while(true) { + try { + TimeUnit.SECONDS.sleep(Math.min(++idx, 5)); + } catch (InterruptedException ex) { + break; + } + try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString()); + httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword())); + httpGet.setHeader("Connection", "close"); + + try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { + HttpEntity respEntity = getHttpEntity(resp); + if (respEntity == null) { + throw new IOException(String.format("Failed to flush data to Doris, Error " + + "could not get the final state of label[%s].\n", label), null); + } + Map result = (Map)JSON.parse(EntityUtils.toString(respEntity)); + 
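// The get_load_state response reports the label's transaction status in its "state" field; keep polling while it is still PREPARE. +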
String labelState = (String)result.get("state"); + if (null == labelState) { + throw new IOException(String.format("Failed to flush data to Doris, Error " + + "could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null); + } + LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState)); + switch(labelState) { + case LAEBL_STATE_VISIBLE: + case LAEBL_STATE_COMMITTED: + return; + case RESULT_LABEL_PREPARE: + continue; + case RESULT_LABEL_ABORTED: + throw new DorisWriterExcetion (String.format("Failed to flush data to Doris, Error " + + "label[%s] state[%s]\n", label, labelState), null, true); + case RESULT_LABEL_UNKNOWN: + default: + throw new IOException(String.format("Failed to flush data to Doris, Error " + + "label[%s] state[%s]\n", label, labelState), null); + } + } + } + } + } + + private byte[] addRows(List rows, int totalBytes) { + if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) { + Map props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps()); + byte[] lineDelimiter = DelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8); + ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length); + for (byte[] row : rows) { + bos.put(row); + bos.put(lineDelimiter); + } + return bos.array(); + } + + if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) { + ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1)); + bos.put("[".getBytes(StandardCharsets.UTF_8)); + byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8); + boolean isFirstElement = true; + for (byte[] row : rows) { + if (!isFirstElement) { + bos.put(jsonDelimiter); + } + bos.put(row); + isFirstElement = false; + } + bos.put("]".getBytes(StandardCharsets.UTF_8)); + return bos.array(); + } + throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:"); + } + private Map put(String loadUrl, String label, byte[] data) throws IOException { + LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length)); + final HttpClientBuilder httpClientBuilder = HttpClients.custom() + .setRedirectStrategy(new DefaultRedirectStrategy () { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }); + try ( CloseableHttpClient httpclient = httpClientBuilder.build()) { + HttpPut httpPut = new HttpPut(loadUrl); + List cols = options.getColumns(); + if (null != cols && !cols.isEmpty() && Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) { + httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList()))); + } + if (null != options.getLoadProps()) { + for (Map.Entry entry : options.getLoadProps().entrySet()) { + httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue())); + } + } + httpPut.setHeader("Expect", "100-continue"); + httpPut.setHeader("label", label); + httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded"); + httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword())); + httpPut.setEntity(new ByteArrayEntity (data)); + httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); + try ( CloseableHttpResponse resp = httpclient.execute(httpPut)) { + HttpEntity respEntity = getHttpEntity(resp); + if (respEntity == null) + return null; + return 
(Map)JSON.parse(EntityUtils.toString(respEntity)); + } + } + } + + private String getBasicAuthHeader(String username, String password) { + String auth = username + ":" + password; + byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8)); + return new StringBuilder("Basic ").append(new String(encodedAuth)).toString(); + } + + private HttpEntity getHttpEntity(CloseableHttpResponse resp) { + int code = resp.getStatusLine().getStatusCode(); + if (200 != code) { + LOG.warn("Request failed with code:{}", code); + return null; + } + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + LOG.warn("Request failed with empty response."); + return null; + } + return respEntity; + } + + private String getLoadHost() { + List hostList = options.getLoadUrlList(); + long tmp = pos + hostList.size(); + for (; pos < tmp; pos++) { + String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString(); + if (checkConnection(host)) { + return host; + } + } + return null; + } + + private boolean checkConnection(String host) { + try { + URL url = new URL(host); + HttpURLConnection co = (HttpURLConnection) url.openConnection(); + co.setConnectTimeout(5000); + co.connect(); + co.disconnect(); + return true; + } catch (Exception e1) { + e1.printStackTrace(); + return false; + } + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java new file mode 100644 index 00000000..5f5a6f34 --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java @@ -0,0 +1,105 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.rdbms.util.RdbmsException; +import com.alibaba.datax.plugin.rdbms.writer.Constant; +import com.alibaba.druid.sql.parser.ParserException; +import com.google.common.base.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * jdbc util + */ +public class DorisUtil { + private static final Logger LOG = LoggerFactory.getLogger(DorisUtil.class); + + private DorisUtil() {} + + public static List getDorisTableColumns( Connection conn, String databaseName, String tableName) { + String currentSql = String.format("SELECT COLUMN_NAME FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = '%s' AND `TABLE_NAME` = '%s' ORDER BY `ORDINAL_POSITION` ASC;", databaseName, tableName); + List columns = new ArrayList<> (); + ResultSet rs = null; + try { + rs = DBUtil.query(conn, currentSql); + while (DBUtil.asyncResultSetNext(rs)) { + String colName = rs.getString("COLUMN_NAME"); + columns.add(colName); + } + return columns; + } catch (Exception e) { + throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null); + } finally { + DBUtil.closeDBResources(rs, null, null); + } + } + + public static List renderPreOrPostSqls(List preOrPostSqls, String tableName) { + if (null == preOrPostSqls) { + return Collections.emptyList(); + } + List renderedSqls = new ArrayList<>(); + for (String sql : preOrPostSqls) { + if (! 
Strings.isNullOrEmpty(sql)) { + renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName)); + } + } + return renderedSqls; + } + + public static void executeSqls(Connection conn, List sqls) { + Statement stmt = null; + String currentSql = null; + try { + stmt = conn.createStatement(); + for (String sql : sqls) { + currentSql = sql; + DBUtil.executeSqlWithoutResultSet(stmt, sql); + } + } catch (Exception e) { + throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null); + } finally { + DBUtil.closeDBResources(null, stmt, null); + } + } + + public static void preCheckPrePareSQL( Keys options) { + String table = options.getTable(); + List preSqls = options.getPreSqlList(); + List renderedPreSqls = DorisUtil.renderPreOrPostSqls(preSqls, table); + if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) { + LOG.info("Begin to preCheck preSqls:[{}].", String.join(";", renderedPreSqls)); + for (String sql : renderedPreSqls) { + try { + DBUtil.sqlValid(sql, DataBaseType.MySql); + } catch ( ParserException e) { + throw RdbmsException.asPreSQLParserException(DataBaseType.MySql,e,sql); + } + } + } + } + + public static void preCheckPostSQL( Keys options) { + String table = options.getTable(); + List postSqls = options.getPostSqlList(); + List renderedPostSqls = DorisUtil.renderPreOrPostSqls(postSqls, table); + if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) { + LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls)); + for(String sql : renderedPostSqls) { + try { + DBUtil.sqlValid(sql, DataBaseType.MySql); + } catch (ParserException e){ + throw RdbmsException.asPostSQLParserException(DataBaseType.MySql,e,sql); + } + } + } + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java new file mode 100644 index 00000000..b44d5440 --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
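+// Plugin entry point. Job validates the config (Keys), executes preSql before and postSql after the run over the
+// FE's MySQL-protocol connection (jdbcUrl), and gives every channel a copy of the config in split(). Task resolves
+// "column": ["*"] against information_schema, encodes each record with the configured CSV/JSON codec, and hands it
+// to DorisWriterManager, which buffers rows and flushes them to Doris via Stream Load.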
+ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.List; + +/** + * doris data writer + */ +public class DorisWriter extends Writer { + + public static class Job extends Writer.Job { + + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + private Configuration originalConfig = null; + private Keys options; + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + options = new Keys (super.getPluginJobConf()); + options.doPretreatment(); + } + + @Override + public void preCheck(){ + this.init(); + DorisUtil.preCheckPrePareSQL(options); + DorisUtil.preCheckPostSQL(options); + } + + @Override + public void prepare() { + String username = options.getUsername(); + String password = options.getPassword(); + String jdbcUrl = options.getJdbcUrl(); + List renderedPreSqls = DorisUtil.renderPreOrPostSqls(options.getPreSqlList(), options.getTable()); + if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) { + Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password); + LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPreSqls), jdbcUrl); + DorisUtil.executeSqls(conn, renderedPreSqls); + DBUtil.closeDBResources(null, null, conn); + } + } + + @Override + public List split(int mandatoryNumber) { + List configurations = new ArrayList<>(mandatoryNumber); + for (int i = 0; i < mandatoryNumber; i++) { + configurations.add(originalConfig); + } + return configurations; + } + + @Override + public void post() { + String username = options.getUsername(); + String password = options.getPassword(); + String jdbcUrl = options.getJdbcUrl(); + List renderedPostSqls = DorisUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable()); + if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) { + Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password); + LOG.info("Start to execute preSqls:[{}]. 
context info:{}.", String.join(";", renderedPostSqls), jdbcUrl); + DorisUtil.executeSqls(conn, renderedPostSqls); + DBUtil.closeDBResources(null, null, conn); + } + } + + @Override + public void destroy() { + } + + } + + public static class Task extends Writer.Task { + private DorisWriterManager writerManager; + private Keys options; + private DorisCodec rowCodec; + + @Override + public void init() { + options = new Keys (super.getPluginJobConf()); + if (options.isWildcardColumn()) { + Connection conn = DBUtil.getConnection(DataBaseType.MySql, options.getJdbcUrl(), options.getUsername(), options.getPassword()); + List columns = DorisUtil.getDorisTableColumns(conn, options.getDatabase(), options.getTable()); + options.setInfoCchemaColumns(columns); + } + writerManager = new DorisWriterManager(options); + rowCodec = DorisCodecFactory.createCodec(options); + } + + @Override + public void prepare() { + } + + public void startWrite(RecordReceiver recordReceiver) { + try { + Record record; + while ((record = recordReceiver.getFromReader()) != null) { + if (record.getColumnNumber() != options.getColumns().size()) { + throw DataXException + .asDataXException( + DBUtilErrorCode.CONF_ERROR, + String.format( + "There is an error in the column configuration information. " + + "This is because you have configured a task where the number of fields to be read from the source:%s " + + "is not equal to the number of fields to be written to the destination table:%s. " + + "Please check your configuration and make changes.", + record.getColumnNumber(), + options.getColumns().size())); + } + writerManager.writeRecord(rowCodec.codec(record)); + } + } catch (Exception e) { + throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e); + } + } + + @Override + public void post() { + try { + writerManager.close(); + } catch (Exception e) { + throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e); + } + } + + @Override + public void destroy() {} + + @Override + public boolean supportFailOver(){ + return false; + } + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java new file mode 100644 index 00000000..7797d79f --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java @@ -0,0 +1,29 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import java.io.IOException; +import java.util.Map; + +public class DorisWriterExcetion extends IOException { + + private final Map response; + private boolean reCreateLabel; + + public DorisWriterExcetion ( String message, Map response) { + super(message); + this.response = response; + } + + public DorisWriterExcetion ( String message, Map response, boolean reCreateLabel) { + super(message); + this.response = response; + this.reCreateLabel = reCreateLabel; + } + + public Map getFailedResponse() { + return response; + } + + public boolean needReCreateLabel() { + return reCreateLabel; + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java new file mode 100644 index 00000000..f0ba6b52 --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java @@ -0,0 +1,192 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import 
com.google.common.base.Strings; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class DorisWriterManager { + + private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class); + + private final DorisStreamLoadObserver visitor; + private final Keys options; + private final List buffer = new ArrayList<> (); + private int batchCount = 0; + private long batchSize = 0; + private volatile boolean closed = false; + private volatile Exception flushException; + private final LinkedBlockingDeque< WriterTuple > flushQueue; + private ScheduledExecutorService scheduler; + private ScheduledFuture scheduledFuture; + + public DorisWriterManager( Keys options) { + this.options = options; + this.visitor = new DorisStreamLoadObserver (options); + flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength()); + this.startScheduler(); + this.startAsyncFlushing(); + } + + public void startScheduler() { + stopScheduler(); + this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("Doris-interval-flush").daemon(true).build()); + this.scheduledFuture = this.scheduler.schedule(() -> { + synchronized (DorisWriterManager.this) { + if (!closed) { + try { + String label = createBatchLabel(); + LOG.info(String.format("Doris interval Sinking triggered: label[%s].", label)); + if (batchCount == 0) { + startScheduler(); + } + flush(label, false); + } catch (Exception e) { + flushException = e; + } + } + } + }, options.getFlushInterval(), TimeUnit.MILLISECONDS); + } + + public void stopScheduler() { + if (this.scheduledFuture != null) { + scheduledFuture.cancel(false); + this.scheduler.shutdown(); + } + } + + public final synchronized void writeRecord(String record) throws IOException { + checkFlushException(); + try { + byte[] bts = record.getBytes(StandardCharsets.UTF_8); + buffer.add(bts); + batchCount++; + batchSize += bts.length; + if (batchCount >= options.getBatchRows() || batchSize >= options.getBatchSize()) { + String label = createBatchLabel(); + LOG.debug(String.format("Doris buffer Sinking triggered: rows[%d] label[%s].", batchCount, label)); + flush(label, false); + } + } catch (Exception e) { + throw new IOException("Writing records to Doris failed.", e); + } + } + + public synchronized void flush(String label, boolean waitUtilDone) throws Exception { + checkFlushException(); + if (batchCount == 0) { + if (waitUtilDone) { + waitAsyncFlushingDone(); + } + return; + } + flushQueue.put(new WriterTuple (label, batchSize, new ArrayList<>(buffer))); + if (waitUtilDone) { + // wait the last flush + waitAsyncFlushingDone(); + } + buffer.clear(); + batchCount = 0; + batchSize = 0; + } + + public synchronized void close() { + if (!closed) { + closed = true; + try { + String label = createBatchLabel(); + if (batchCount > 0) LOG.debug(String.format("Doris Sink is about to close: label[%s].", label)); + flush(label, true); + } catch (Exception e) { + throw new RuntimeException("Writing records to Doris failed.", e); + } + } + checkFlushException(); + } + + public String createBatchLabel() { + 
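// The label is labelPrefix plus a random UUID; the globally unique label lets Doris reject a re-sent batch instead of loading it twice. +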
StringBuilder sb = new StringBuilder(); + if (! Strings.isNullOrEmpty(options.getLabelPrefix())) { + sb.append(options.getLabelPrefix()); + } + return sb.append(UUID.randomUUID().toString()) + .toString(); + } + + private void startAsyncFlushing() { + // start flush thread + Thread flushThread = new Thread(new Runnable(){ + public void run() { + while(true) { + try { + asyncFlush(); + } catch (Exception e) { + flushException = e; + } + } + } + }); + flushThread.setDaemon(true); + flushThread.start(); + } + + private void waitAsyncFlushingDone() throws InterruptedException { + // wait previous flushings + for (int i = 0; i <= options.getFlushQueueLength(); i++) { + flushQueue.put(new WriterTuple ("", 0l, null)); + } + checkFlushException(); + } + + private void asyncFlush() throws Exception { + WriterTuple flushData = flushQueue.take(); + if (Strings.isNullOrEmpty(flushData.getLabel())) { + return; + } + stopScheduler(); + LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel())); + for (int i = 0; i <= options.getMaxRetries(); i++) { + try { + // flush to Doris with stream load + visitor.streamLoad(flushData); + LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel())); + startScheduler(); + break; + } catch (Exception e) { + LOG.warn("Failed to flush batch data to Doris, retry times = {}", i, e); + if (i >= options.getMaxRetries()) { + throw new IOException(e); + } + if (e instanceof DorisWriterExcetion && (( DorisWriterExcetion )e).needReCreateLabel()) { + String newLabel = createBatchLabel(); + LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel)); + flushData.setLabel(newLabel); + } + try { + Thread.sleep(1000l * Math.min(i + 1, 10)); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IOException("Unable to flush, interrupted while doing another attempt", e); + } + } + } + } + + private void checkFlushException() { + if (flushException != null) { + throw new RuntimeException("Writing records to Doris failed.", flushException); + } + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java new file mode 100644 index 00000000..01c0e3c6 --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java @@ -0,0 +1,177 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class Keys implements Serializable { + + private static final long serialVersionUID = 1l; + private static final int MAX_RETRIES = 3; + private static final int BATCH_ROWS = 500000; + private static final long DEFAULT_FLUSH_INTERVAL = 30000; + + private static final String LOAD_PROPS_FORMAT = "format"; + public enum StreamLoadFormat { + CSV, JSON; + } + + private static final String USERNAME = "username"; + private static final String PASSWORD = "password"; + private static final String DATABASE = "connection[0].selectedDatabase"; + private static final String TABLE = "connection[0].table[0]"; + private static final String COLUMN = "column"; + private static final String 
PRE_SQL = "preSql"; + private static final String POST_SQL = "postSql"; + private static final String JDBC_URL = "connection[0].jdbcUrl"; + private static final String LABEL_PREFIX = "labelPrefix"; + private static final String MAX_BATCH_ROWS = "maxBatchRows"; + private static final String MAX_BATCH_SIZE = "maxBatchSize"; + private static final String FLUSH_INTERVAL = "flushInterval"; + private static final String LOAD_URL = "loadUrl"; + private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength"; + private static final String LOAD_PROPS = "loadProps"; + + private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_"; + + private static final long DEFAULT_MAX_BATCH_SIZE = 90 * 1024 * 1024; //default 90M + + private final Configuration options; + + private List infoSchemaColumns; + private List userSetColumns; + private boolean isWildcardColumn; + + public Keys ( Configuration options) { + this.options = options; + this.userSetColumns = options.getList(COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList()); + if (1 == options.getList(COLUMN, String.class).size() && "*".trim().equals(options.getList(COLUMN, String.class).get(0))) { + this.isWildcardColumn = true; + } + } + + public void doPretreatment() { + validateRequired(); + validateStreamLoadUrl(); + } + + public String getJdbcUrl() { + return options.getString(JDBC_URL); + } + + public String getDatabase() { + return options.getString(DATABASE); + } + + public String getTable() { + return options.getString(TABLE); + } + + public String getUsername() { + return options.getString(USERNAME); + } + + public String getPassword() { + return options.getString(PASSWORD); + } + + public String getLabelPrefix() { + String label = options.getString(LABEL_PREFIX); + return null == label ? DEFAULT_LABEL_PREFIX : label; + } + + public List getLoadUrlList() { + return options.getList(LOAD_URL, String.class); + } + + public List getColumns() { + if (isWildcardColumn) { + return this.infoSchemaColumns; + } + return this.userSetColumns; + } + + public boolean isWildcardColumn() { + return this.isWildcardColumn; + } + + public void setInfoCchemaColumns(List cols) { + this.infoSchemaColumns = cols; + } + + public List getPreSqlList() { + return options.getList(PRE_SQL, String.class); + } + + public List getPostSqlList() { + return options.getList(POST_SQL, String.class); + } + + public Map getLoadProps() { + return options.getMap(LOAD_PROPS); + } + + public int getMaxRetries() { + return MAX_RETRIES; + } + + public int getBatchRows() { + Integer rows = options.getInt(MAX_BATCH_ROWS); + return null == rows ? BATCH_ROWS : rows; + } + + public long getBatchSize() { + Long size = options.getLong(MAX_BATCH_SIZE); + return null == size ? DEFAULT_MAX_BATCH_SIZE : size; + } + + public long getFlushInterval() { + Long interval = options.getLong(FLUSH_INTERVAL); + return null == interval ? DEFAULT_FLUSH_INTERVAL : interval; + } + + public int getFlushQueueLength() { + Integer len = options.getInt(FLUSH_QUEUE_LENGTH); + return null == len ? 
1 : len; + } + + public StreamLoadFormat getStreamLoadFormat() { + Map loadProps = getLoadProps(); + if (null == loadProps) { + return StreamLoadFormat.CSV; + } + if (loadProps.containsKey(LOAD_PROPS_FORMAT) + && StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(LOAD_PROPS_FORMAT)))) { + return StreamLoadFormat.JSON; + } + return StreamLoadFormat.CSV; + } + + private void validateStreamLoadUrl() { + List urlList = getLoadUrlList(); + for (String host : urlList) { + if (host.split(":").length < 2) { + throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, + "The format of loadUrl is not correct, please enter:[`fe_ip:fe_http_ip;fe_ip:fe_http_ip`]."); + } + } + } + + private void validateRequired() { + final String[] requiredOptionKeys = new String[]{ + USERNAME, + DATABASE, + TABLE, + COLUMN, + LOAD_URL + }; + for (String optionKey : requiredOptionKeys) { + options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE); + } + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/WriterTuple.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/WriterTuple.java new file mode 100644 index 00000000..32e0b341 --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/WriterTuple.java @@ -0,0 +1,20 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import java.util.List; + +public class WriterTuple { + private String label; + private Long bytes; + private List rows; + + public WriterTuple ( String label, Long bytes, List rows){ + this.label = label; + this.rows = rows; + this.bytes = bytes; + } + + public String getLabel() { return label; } + public void setLabel(String label) { this.label = label; } + public Long getBytes() { return bytes; } + public List getRows() { return rows; } +} diff --git a/doriswriter/src/main/resources/plugin.json b/doriswriter/src/main/resources/plugin.json new file mode 100644 index 00000000..69dc31a2 --- /dev/null +++ b/doriswriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "doriswriter", + "class": "com.alibaba.datax.plugin.writer.doriswriter.DorisWriter", + "description": "apache doris writer plugin", + "developer": "apche doris" +} diff --git a/doriswriter/src/main/resources/plugin_job_template.json b/doriswriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..0187e539 --- /dev/null +++ b/doriswriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,20 @@ +{ + "name": "doriswriter", + "parameter": { + "username": "", + "password": "", + "column": [], + "preSql": [], + "postSql": [], + "beLoadUrl": [], + "loadUrl": [], + "loadProps": {}, + "connection": [ + { + "jdbcUrl": "", + "selectedDatabase": "", + "table": [] + } + ] + } +} \ No newline at end of file diff --git a/doriswriter/src/test/java/com/alibaba/datax/plugin/writer/doriswriter/TestDorisWriterLoad.java b/doriswriter/src/test/java/com/alibaba/datax/plugin/writer/doriswriter/TestDorisWriterLoad.java new file mode 100644 index 00000000..bbb60a0e --- /dev/null +++ b/doriswriter/src/test/java/com/alibaba/datax/plugin/writer/doriswriter/TestDorisWriterLoad.java @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.fastjson.JSON; +import com.google.common.collect.Maps; +import com.alibaba.datax.common.util.Configuration; + +import java.io.IOException; +import java.util.Map; + +public class TestDorisWriterLoad { + + + // for test + public static void main(String[] args) throws IOException { + /** + * 下面示例使用的建表语句,要首先有一套Ddoris的环境,创建数据库demo,然后使用下面的建表语句创建表 + * 修改feLoadUrl中的IP地址,username,password,然后运行 + * CREATE TABLE `doris_test` ( + * `k1` varchar(30) NULL , + * `k2` varchar(255) NULL, + * `k3` varchar(200) + * ) ENGINE=OLAP + * Duplicate KEY(k1) + * COMMENT "OLAP" + * DISTRIBUTED BY HASH(k1) BUCKETS 1 + * PROPERTIES ( + * "replication_allocation" = "tag.location.default: 1", + * "in_memory" = "false", + * "storage_format" = "V2" + * ) + */ + String json = "{\n" + + " \"feLoadUrl\": [\"127.0.0.1:8030\"],\n" + + " \"column\": [\"k1\", \"k2\", \"k3\"],\n" + + " \"database\": \"demo\",\n" + + " \"jdbcUrl\": \"\",\n" + + " \"loadProps\": {},\n" + + " \"password\": \"12345\",\n" + + " \"postSql\": [],\n" + + " \"preSql\": [],\n" + + " \"table\": \"doris_test\",\n" + + " \"username\": \"root\"\n" + + "}"; + Configuration configuration = Configuration.from(json); + Key key = new Key(configuration); + + DorisWriterEmitter emitter = new DorisWriterEmitter(key); + DorisFlushBatch flushBatch = new DorisFlushBatch("\n","csv"); + flushBatch.setLabel("test4"); + Map row1 = Maps.newHashMap(); + row1.put("k1", "2021-02-02"); + row1.put("k2", "2021-02-02 00:00:00"); + row1.put("k3", "3"); + String rowStr1 = JSON.toJSONString(row1); + System.out.println("rows1: " + rowStr1); + flushBatch.putData(rowStr1); + + Map row2 = Maps.newHashMap(); + row2.put("k1", "2021-02-03"); + row2.put("k2", "2021-02-03 00:00:00"); + row2.put("k3", "4"); + String rowStr2 = JSON.toJSONString(row2); + System.out.println("rows2: " + rowStr2); + flushBatch.putData(rowStr2); + + for (int i = 0; i < 50000; ++i) { + flushBatch.putData(rowStr2); + } + emitter.emit (flushBatch); + } +} diff --git a/package.xml b/package.xml index acf8f79e..96b52c30 100755 --- a/package.xml +++ b/package.xml @@ -266,6 +266,13 @@ datax + + doriswriter/target/datax/ + + **/*.* + + datax + txtfilewriter/target/datax/ diff --git a/pom.xml b/pom.xml index 727357b1..72442b1e 100644 --- a/pom.xml +++ b/pom.xml @@ -101,7 +101,6 @@ hbase11xwriter hbase094xwriter hbase11xsqlwriter - hbase20xsqlwriter kuduwriter ftpwriter hdfswriter @@ -123,6 +122,10 @@ plugin-rdbms-util plugin-unstructured-storage-util + hbase20xsqlreader + hbase20xsqlwriter + kuduwriter + doriswriter