mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-03 03:59:07 +08:00
245 lines
6.0 KiB
Markdown
245 lines
6.0 KiB
Markdown
# DataX ElasticSearchWriter
|
||
|
||
|
||
---
|
||
|
||
## 1 快速介绍
|
||
|
||
数据导入elasticsearch的插件
|
||
|
||
## 2 实现原理
|
||
|
||
使用elasticsearch的rest api接口, 批量把从reader读入的数据写入elasticsearch
|
||
|
||
## 3 功能说明
|
||
|
||
### 3.1 配置样例
|
||
|
||
#### job.json
|
||
|
||
```
|
||
{
|
||
"job": {
|
||
"setting": {
|
||
"speed": {
|
||
"channel": 1
|
||
}
|
||
},
|
||
"content": [
|
||
{
|
||
"reader": {
|
||
...
|
||
},
|
||
"writer": {
|
||
"name": "elasticsearchwriter",
|
||
"parameter": {
|
||
"endpoint": "http://xxx:9999",
|
||
"accessId": "xxxx",
|
||
"accessKey": "xxxx",
|
||
"index": "test-1",
|
||
"type": "default",
|
||
"cleanup": true,
|
||
"settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
|
||
"discovery": false,
|
||
"batchSize": 1000,
|
||
"splitter": ",",
|
||
"column": [
|
||
{"name": "pk", "type": "id"},
|
||
{ "name": "col_ip","type": "ip" },
|
||
{ "name": "col_double","type": "double" },
|
||
{ "name": "col_long","type": "long" },
|
||
{ "name": "col_integer","type": "integer" },
|
||
{ "name": "col_keyword", "type": "keyword" },
|
||
{ "name": "col_text", "type": "text", "analyzer": "ik_max_word"},
|
||
{ "name": "col_geo_point", "type": "geo_point" },
|
||
{ "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
|
||
{ "name": "col_nested1", "type": "nested" },
|
||
{ "name": "col_nested2", "type": "nested" },
|
||
{ "name": "col_object1", "type": "object" },
|
||
{ "name": "col_object2", "type": "object" },
|
||
{ "name": "col_integer_array", "type":"integer", "array":true},
|
||
{ "name": "col_geo_shape", "type":"geo_shape", "tree": "quadtree", "precision": "10m"}
|
||
]
|
||
}
|
||
}
|
||
}
|
||
]
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 3.2 参数说明
|
||
|
||
* endpoint
|
||
* 描述:ElasticSearch的连接地址
|
||
* 必选:是
|
||
* 默认值:无
|
||
|
||
* accessId
|
||
* 描述:http auth中的user
|
||
* 必选:否
|
||
* 默认值:空
|
||
|
||
* accessKey
|
||
* 描述:http auth中的password
|
||
* 必选:否
|
||
* 默认值:空
|
||
|
||
* index
|
||
* 描述:elasticsearch中的index名
|
||
* 必选:是
|
||
* 默认值:无
|
||
|
||
* type
|
||
* 描述:elasticsearch中index的type名
|
||
* 必选:否
|
||
* 默认值:index名
|
||
|
||
* cleanup
|
||
* 描述:是否删除原表
|
||
* 必选:否
|
||
* 默认值:false
|
||
|
||
* batchSize
|
||
* 描述:每次批量数据的条数
|
||
* 必选:否
|
||
* 默认值:1000
|
||
|
||
* trySize
|
||
* 描述:失败后重试的次数
|
||
* 必选:否
|
||
* 默认值:30
|
||
|
||
* timeout
|
||
* 描述:客户端超时时间
|
||
* 必选:否
|
||
* 默认值:600000
|
||
|
||
* discovery
|
||
* 描述:启用节点发现将(轮询)并定期更新客户机中的服务器列表。
|
||
* 必选:否
|
||
* 默认值:false
|
||
|
||
* compression
|
||
* 描述:http请求,开启压缩
|
||
* 必选:否
|
||
* 默认值:true
|
||
|
||
* multiThread
|
||
* 描述:http请求,是否有多线程
|
||
* 必选:否
|
||
* 默认值:true
|
||
|
||
* ignoreWriteError
|
||
* 描述:忽略写入错误,不重试,继续写入
|
||
* 必选:否
|
||
* 默认值:false
|
||
|
||
* ignoreParseError
|
||
* 描述:忽略解析数据格式错误,继续写入
|
||
* 必选:否
|
||
* 默认值:true
|
||
|
||
* alias
|
||
* 描述:数据导入完成后写入别名
|
||
* 必选:否
|
||
* 默认值:无
|
||
|
||
* aliasMode
|
||
* 描述:数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个)
|
||
* 必选:否
|
||
* 默认值:append
|
||
|
||
* settings
|
||
* 描述:创建index时候的settings, 与elasticsearch官方相同
|
||
* 必选:否
|
||
* 默认值:无
|
||
|
||
* splitter
|
||
* 描述:如果插入数据是array,就使用指定分隔符
|
||
* 必选:否
|
||
* 默认值:-,-
|
||
|
||
* column
|
||
* 描述:elasticsearch所支持的字段类型,样例中包含了全部
|
||
* 必选:是
|
||
|
||
* dynamic
|
||
* 描述: 不使用datax的mappings,使用es自己的自动mappings
|
||
* 必选: 否
|
||
* 默认值: false
|
||
|
||
|
||
|
||
## 4 性能报告
|
||
|
||
### 4.1 环境准备
|
||
|
||
* 总数据量 1kw条数据, 每条0.1kb
|
||
* 1个shard, 0个replica
|
||
* 不加id,这样默认是append_only模式,不检查版本,插入速度会有20%左右的提升
|
||
|
||
#### 4.1.1 输入数据类型(streamreader)
|
||
|
||
```
|
||
{"value": "1.1.1.1", "type": "string"},
|
||
{"value": 19890604.0, "type": "double"},
|
||
{"value": 19890604, "type": "long"},
|
||
{"value": 19890604, "type": "long"},
|
||
{"value": "hello world", "type": "string"},
|
||
{"value": "hello world", "type": "string"},
|
||
{"value": "41.12,-71.34", "type": "string"},
|
||
{"value": "2017-05-25", "type": "string"},
|
||
```
|
||
|
||
#### 4.1.2 输出数据类型(eswriter)
|
||
|
||
```
|
||
{ "name": "col_ip","type": "ip" },
|
||
{ "name": "col_double","type": "double" },
|
||
{ "name": "col_long","type": "long" },
|
||
{ "name": "col_integer","type": "integer" },
|
||
{ "name": "col_keyword", "type": "keyword" },
|
||
{ "name": "col_text", "type": "text"},
|
||
{ "name": "col_geo_point", "type": "geo_point" },
|
||
{ "name": "col_date", "type": "date"}
|
||
```
|
||
|
||
#### 4.1.2 机器参数
|
||
|
||
1. cpu: 32 Intel(R) Xeon(R) CPU E5-2650 v2 @ 2.60GHz
|
||
2. mem: 128G
|
||
3. net: 千兆双网卡
|
||
|
||
#### 4.1.3 DataX jvm 参数
|
||
|
||
-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
|
||
|
||
### 4.2 测试报告
|
||
|
||
| 通道数| 批量提交行数| DataX速度(Rec/s)|DataX流量(MB/s)|
|
||
|--------|--------| --------|--------|
|
||
| 4| 256| 11013| 0.828|
|
||
| 4| 1024| 19417| 1.43|
|
||
| 4| 4096| 23923| 1.76|
|
||
| 4| 8172| 24449| 1.80|
|
||
| 8| 256| 21459| 1.58|
|
||
| 8| 1024| 37037| 2.72|
|
||
| 8| 4096| 45454| 3.34|
|
||
| 8| 8172| 45871| 3.37|
|
||
| 16| 1024| 67567| 4.96|
|
||
| 16| 4096| 78125| 5.74|
|
||
| 16| 8172| 77519| 5.69|
|
||
| 32| 1024| 94339| 6.93|
|
||
| 32| 4096| 96153| 7.06|
|
||
| 64| 1024| 91743| 6.74|
|
||
|
||
### 4.3 测试总结
|
||
|
||
* 最好的结果是32通道,每次传4096,如果单条数据很大, 请适当减少批量数,防止oom
|
||
* 当然这个很容易水平扩展,而且es也是分布式的,多设置几个shard也可以水平扩展
|
||
|
||
## 5 约束限制
|
||
|
||
* 如果导入id,这样数据导入失败也会重试,重新导入也仅仅是覆盖,保证数据一致性
|
||
* 如果不导入id,就是append_only模式,elasticsearch自动生成id,速度会提升20%左右,但数据无法修复,适合日志型数据(对数据精度要求不高的) |