test case and Performance test report

test case and Performance test report
This commit is contained in:
jiafeng.zhang 2022-01-25 23:37:20 +08:00
parent 7ac4301fc5
commit 9e3e08987d
2 changed files with 166 additions and 14 deletions

View File

@ -9,12 +9,60 @@ DorisWriter是一个支持将大批量数据写入Doris中的一个插件
Doris是完全支持Mysql协议的所以如果你需要读取Doris的数据可以使用mysqlreader插件完成这里不单独提供Doris数据读取插件。
## 2 实现原理
## 2.支持版本
DorisWriter目前支持的Doris版本如下
| Doris版本号 | 说明 |
| -------------------- | ----------------------------------------------- |
| Apahce Doris 0.13.0 | 包括百度发行版palo 0.13.15 |
| Apache Doris 0.14.0 | 包括百度发行版palo 0.14.7、0.14.12.1、0.14.13.1 |
| Apache Doris 0.15.0 | 包括百度发行版palo 0.15.1 RC09 |
| Apahce Doris后续版本 | |
大家在使用过程中如果遇到什么问题可以通过邮件或者在Doris的[Issues · apache/incubator-doris](https://github.com/apache/incubator-doris/issues)上提问我们会及时解决或者可以给Doris的开发组邮箱发送邮件dev@doris.apache.org我们也会及时查看回复及修复。
## 3 实现原理
DorisWriter 通过Doris原生支持Stream load方式导入数据 DorisWriter会将`reader`读取的数据进行缓存在内存中拼接成Json文本然后批量导入至Doris。
## 3 功能说明
Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Stream load 同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。
### 3.1 配置样例
Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据
Stream Load的数据导入流程如下
```text
^ +
| |
| | 1A. User submit load to FE
| |
| +--v-----------+
| | FE |
5. Return result to user | +--+-----------+
| |
| | 2. Redirect to BE
| |
| +--v-----------+
+---+Coordinator BE| 1B. User submit load to BE
+-+-----+----+-+
| | |
+-----+ | +-----+
| | | 3. Distrbute data
| | |
+-v-+ +-v-+ +-v-+
|BE | |BE | |BE |
+---+ +---+ +---+
```
Stream load 中Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。
用户通过 HTTP 协议提交导入命令。如果提交到 FE则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。
导入的最终结果由 Coordinator BE 返回给用户。
## 4 功能说明
### 4.1 配置样例
这里是一份从Stream读取数据后导入至Doris的配置文件。
@ -83,7 +131,7 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据 DorisWriter
}
```
### 3.2 参数说明
### 4.2 参数说明
* **jdbcUrl**
@ -147,37 +195,53 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据 DorisWriter
* **maxBatchRows**
- 描述:每批次导入数据的最大行数。和 **maxBatchByteSize** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。
- 必选:否
- 默认值500000
* **maxBatchByteSize**
- 描述:每批次导入数据的最大数据量。和 ** maxBatchRows** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。
- 必选:否
- 默认值104857600
* **labelPrefix**
- 描述:每批次导入任务的 label 前缀。最终的 label 将有 `labelPrefix + UUID + 序号` 组成
- 必选:否
- 默认值:`datax_doris_writer_`
* **lineDelimiter**
- 描述:每批次数据包含多行,每行为 Json 格式,每行的的分隔符即为 lineDelimiter。支持多个字节, 例如'\x02\x03'。
- 必选:否
- 默认值:`\n`
* **loadProps**
- 描述StreamLoad 的请求参数详情参照StreamLoad介绍页面。
- 必选:否
- 默认值:无
* **connectTimeout**
- 描述StreamLoad单次请求的超时时间, 单位毫秒(ms)。
- 必选:否
- 默认值:-1
### 4.3 doriswriter插件的约束与限制
DorisWriter是借助于Apache Doris提供的Stream Load方式来实现数据导入为了避免频繁的数据插入引发Doris写失败建议批到的方式具体参照[常见报错 | Apache Doris](https://doris.apache.org/zh-CN/faq/error.html#e3-tablet-writer-write-failed-tablet-id-27306172-txn-id-28573520-err-235-or-215-or-238),建议将参数列表中的下列参数设大,下面给出建议值:
1. maxBatchRows10000表示每10000条提交导入一次如果你的数据量没那么可以适当调小
2. maxBatchByteSize这个参数表示你每个批到导入数据量大的大小具体值=maxBatchRows * 单条记录的大小如果一个批次导入的数据量大小超过这个值将被阻塞导入导入数据格式是JSON格式所以这个值可以适当放大通过上面的导入记录数来控制每个批次导入的数据量就可以了
3. column这个要和你在Doris里建表的字段顺序一致。
## 4.性能测试
下面是通过读取Mysql数据表的数据插入到Doris进行的性能测试结果仅供参考
```
2022-01-25 23:32:53.638 [job-0] INFO JobContainer - PerfTrace not enable!
2022-01-25 23:32:53.638 [job-0] INFO StandAloneJobContainerCommunicator - Total 2000000 records, 80888896 bytes | Speed 3.86MB/s, 100000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 14.270s | All Task WaitReaderTime 0.147s | Percentage 100.00%
2022-01-25 23:32:53.639 [job-0] INFO JobContainer -
任务启动时刻 : 2022-01-25 23:32:33
任务结束时刻 : 2022-01-25 23:32:53
任务总计耗时 : 20s
任务平均流量 : 3.86MB/s
记录写入速度 : 100000rec/s
读出记录总数 : 2000000
读写失败总数 : 0
```

View File

@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Maps;
import com.alibaba.datax.common.util.Configuration;
import java.io.IOException;
import java.util.Map;
public class TestDorisWriterLoad {
// for test
public static void main(String[] args) throws IOException {
/**
* 下面示例使用的建表语句要首先有一套Ddoris的环境创建数据库demo然后使用下面的建表语句创建表
* 修改feLoadUrl中的IP地址usernamepassword然后运行
* CREATE TABLE `doris_test` (
* `k1` varchar(30) NULL ,
* `k2` varchar(255) NULL,
* `k3` varchar(200)
* ) ENGINE=OLAP
* Duplicate KEY(k1)
* COMMENT "OLAP"
* DISTRIBUTED BY HASH(k1) BUCKETS 1
* PROPERTIES (
* "replication_allocation" = "tag.location.default: 1",
* "in_memory" = "false",
* "storage_format" = "V2"
* )
*/
String json = "{\n" +
" \"feLoadUrl\": [\"127.0.0.1:8030\"],\n" +
" \"column\": [\"k1\", \"k2\", \"k3\"],\n" +
" \"database\": \"demo\",\n" +
" \"jdbcUrl\": \"\",\n" +
" \"loadProps\": {},\n" +
" \"password\": \"12345\",\n" +
" \"postSql\": [],\n" +
" \"preSql\": [],\n" +
" \"table\": \"doris_test\",\n" +
" \"username\": \"root\"\n" +
"}";
Configuration configuration = Configuration.from(json);
Key key = new Key(configuration);
DorisWriterEmitter emitter = new DorisWriterEmitter(key);
DorisFlushBatch flushBatch = new DorisFlushBatch("\n");
flushBatch.setLabel("test4");
Map<String, String> row1 = Maps.newHashMap();
row1.put("k1", "2021-02-02");
row1.put("k2", "2021-02-02 00:00:00");
row1.put("k3", "3");
String rowStr1 = JSON.toJSONString(row1);
System.out.println("rows1: " + rowStr1);
flushBatch.putData(rowStr1);
Map<String, String> row2 = Maps.newHashMap();
row2.put("k1", "2021-02-03");
row2.put("k2", "2021-02-03 00:00:00");
row2.put("k3", "4");
String rowStr2 = JSON.toJSONString(row2);
System.out.println("rows2: " + rowStr2);
flushBatch.putData(rowStr2);
for (int i = 0; i < 50000; ++i) {
flushBatch.putData(rowStr2);
}
emitter.doStreamLoad(flushBatch);
}
}