From 9e3e08987dceeae8d5b825404f2eb15d21bf3527 Mon Sep 17 00:00:00 2001 From: "jiafeng.zhang" Date: Tue, 25 Jan 2022 23:37:20 +0800 Subject: [PATCH] test case and Performance test report test case and Performance test report --- doriswriter/doc/doriswriter.md | 92 ++++++++++++++++--- .../doriswriter/TestDorisWriterLoad.java | 88 ++++++++++++++++++ 2 files changed, 166 insertions(+), 14 deletions(-) create mode 100644 doriswriter/src/test/java/com/alibaba/datax/plugin/writer/doriswriter/TestDorisWriterLoad.java diff --git a/doriswriter/doc/doriswriter.md b/doriswriter/doc/doriswriter.md index 7a072620..fedc2171 100644 --- a/doriswriter/doc/doriswriter.md +++ b/doriswriter/doc/doriswriter.md @@ -9,12 +9,60 @@ DorisWriter是一个支持将大批量数据写入Doris中的一个插件,可 Doris是完全支持Mysql协议的,所以如果你需要读取Doris的数据,可以使用mysqlreader插件完成,这里不单独提供Doris数据读取插件。 -## 2 实现原理 +## 2.支持版本 + +DorisWriter目前支持的Doris版本如下: + +| Doris版本号 | 说明 | +| -------------------- | ----------------------------------------------- | +| Apahce Doris 0.13.0 | 包括百度发行版palo 0.13.15 | +| Apache Doris 0.14.0 | 包括百度发行版palo 0.14.7、0.14.12.1、0.14.13.1 | +| Apache Doris 0.15.0 | 包括百度发行版palo 0.15.1 RC09 | +| Apahce Doris后续版本 | | + +大家在使用过程中如果遇到什么问题,可以通过邮件或者在Doris的[Issues · apache/incubator-doris](https://github.com/apache/incubator-doris/issues)上提问,我们会及时解决,或者可以给Doris的开发组邮箱发送邮件:dev@doris.apache.org,我们也会及时查看回复及修复。 + +## 3 实现原理 DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter会将`reader`读取的数据进行缓存在内存中,拼接成Json文本,然后批量导入至Doris。 -## 3 功能说明 +Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Stream load 同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。 -### 3.1 配置样例 +Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据 + +Stream Load的数据导入流程如下: + +```text + ^ + + | | + | | 1A. User submit load to FE + | | + | +--v-----------+ + | | FE | +5. Return result to user | +--+-----------+ + | | + | | 2. Redirect to BE + | | + | +--v-----------+ + +---+Coordinator BE| 1B. User submit load to BE + +-+-----+----+-+ + | | | + +-----+ | +-----+ + | | | 3. Distrbute data + | | | + +-v-+ +-v-+ +-v-+ + |BE | |BE | |BE | + +---+ +---+ +---+ +``` + +Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。 + +用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。 + +导入的最终结果由 Coordinator BE 返回给用户。 + +## 4 功能说明 + +### 4.1 配置样例 这里是一份从Stream读取数据后导入至Doris的配置文件。 @@ -83,7 +131,7 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter } ``` -### 3.2 参数说明 +### 4.2 参数说明 * **jdbcUrl** @@ -147,37 +195,53 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter * **maxBatchRows** - - 描述:每批次导入数据的最大行数。和 **maxBatchByteSize** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。 - 必选:否 - 默认值:500000 - * **maxBatchByteSize** - - 描述:每批次导入数据的最大数据量。和 ** maxBatchRows** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。 - 必选:否 - 默认值:104857600 - * **labelPrefix** - - 描述:每批次导入任务的 label 前缀。最终的 label 将有 `labelPrefix + UUID + 序号` 组成 - 必选:否 - 默认值:`datax_doris_writer_` - * **lineDelimiter** - - 描述:每批次数据包含多行,每行为 Json 格式,每行的的分隔符即为 lineDelimiter。支持多个字节, 例如'\x02\x03'。 - 必选:否 - 默认值:`\n` - * **loadProps** - - 描述:StreamLoad 的请求参数,详情参照StreamLoad介绍页面。 - 必选:否 - 默认值:无 - * **connectTimeout** - 描述:StreamLoad单次请求的超时时间, 单位毫秒(ms)。 - 必选:否 - 默认值:-1 + +### 4.3 doriswriter插件的约束与限制 + +DorisWriter是借助于Apache Doris提供的Stream Load方式来实现数据导入,为了避免频繁的数据插入引发Doris写失败,建议批到的方式,具体参照[常见报错 | Apache Doris](https://doris.apache.org/zh-CN/faq/error.html#e3-tablet-writer-write-failed-tablet-id-27306172-txn-id-28573520-err-235-or-215-or-238),建议将参数列表中的下列参数设大,下面给出建议值: + +1. maxBatchRows:10000,表示每10000条提交导入一次,如果你的数据量没那么可以适当调小 +2. maxBatchByteSize:这个参数表示你每个批到导入数据量大的大小,具体值=maxBatchRows * 单条记录的大小,如果一个批次导入的数据量大小超过这个值将被阻塞导入,导入数据格式是JSON格式所以这个值可以适当放大,通过上面的导入记录数来控制每个批次导入的数据量就可以了 +3. column:这个要和你在Doris里建表的字段顺序一致。 + +## 4.性能测试 + +下面是通过读取Mysql数据表的数据,插入到Doris进行的性能测试结果,仅供参考 + +``` +2022-01-25 23:32:53.638 [job-0] INFO JobContainer - PerfTrace not enable! +2022-01-25 23:32:53.638 [job-0] INFO StandAloneJobContainerCommunicator - Total 2000000 records, 80888896 bytes | Speed 3.86MB/s, 100000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 14.270s | All Task WaitReaderTime 0.147s | Percentage 100.00% +2022-01-25 23:32:53.639 [job-0] INFO JobContainer - +任务启动时刻 : 2022-01-25 23:32:33 +任务结束时刻 : 2022-01-25 23:32:53 +任务总计耗时 : 20s +任务平均流量 : 3.86MB/s +记录写入速度 : 100000rec/s +读出记录总数 : 2000000 +读写失败总数 : 0 +``` + diff --git a/doriswriter/src/test/java/com/alibaba/datax/plugin/writer/doriswriter/TestDorisWriterLoad.java b/doriswriter/src/test/java/com/alibaba/datax/plugin/writer/doriswriter/TestDorisWriterLoad.java new file mode 100644 index 00000000..35b6e3a4 --- /dev/null +++ b/doriswriter/src/test/java/com/alibaba/datax/plugin/writer/doriswriter/TestDorisWriterLoad.java @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.fastjson.JSON; +import com.google.common.collect.Maps; +import com.alibaba.datax.common.util.Configuration; + +import java.io.IOException; +import java.util.Map; + +public class TestDorisWriterLoad { + + + // for test + public static void main(String[] args) throws IOException { + /** + * 下面示例使用的建表语句,要首先有一套Ddoris的环境,创建数据库demo,然后使用下面的建表语句创建表 + * 修改feLoadUrl中的IP地址,username,password,然后运行 + * CREATE TABLE `doris_test` ( + * `k1` varchar(30) NULL , + * `k2` varchar(255) NULL, + * `k3` varchar(200) + * ) ENGINE=OLAP + * Duplicate KEY(k1) + * COMMENT "OLAP" + * DISTRIBUTED BY HASH(k1) BUCKETS 1 + * PROPERTIES ( + * "replication_allocation" = "tag.location.default: 1", + * "in_memory" = "false", + * "storage_format" = "V2" + * ) + */ + String json = "{\n" + + " \"feLoadUrl\": [\"127.0.0.1:8030\"],\n" + + " \"column\": [\"k1\", \"k2\", \"k3\"],\n" + + " \"database\": \"demo\",\n" + + " \"jdbcUrl\": \"\",\n" + + " \"loadProps\": {},\n" + + " \"password\": \"12345\",\n" + + " \"postSql\": [],\n" + + " \"preSql\": [],\n" + + " \"table\": \"doris_test\",\n" + + " \"username\": \"root\"\n" + + "}"; + Configuration configuration = Configuration.from(json); + Key key = new Key(configuration); + + DorisWriterEmitter emitter = new DorisWriterEmitter(key); + DorisFlushBatch flushBatch = new DorisFlushBatch("\n"); + flushBatch.setLabel("test4"); + Map row1 = Maps.newHashMap(); + row1.put("k1", "2021-02-02"); + row1.put("k2", "2021-02-02 00:00:00"); + row1.put("k3", "3"); + String rowStr1 = JSON.toJSONString(row1); + System.out.println("rows1: " + rowStr1); + flushBatch.putData(rowStr1); + + Map row2 = Maps.newHashMap(); + row2.put("k1", "2021-02-03"); + row2.put("k2", "2021-02-03 00:00:00"); + row2.put("k3", "4"); + String rowStr2 = JSON.toJSONString(row2); + System.out.println("rows2: " + rowStr2); + flushBatch.putData(rowStr2); + + for (int i = 0; i < 50000; ++i) { + flushBatch.putData(rowStr2); + } + emitter.doStreamLoad(flushBatch); + } +}