Unify configuration naming: rename maxBatchSize to batchSize
jiafeng.zhang 2022-10-11 11:57:51 +08:00
parent fb8627ba96
commit b72c57e7ac
3 changed files with 2 additions and 90 deletions


@ -129,7 +129,7 @@ DorisWriter imports data into Doris through Doris's native Stream Load support. DorisWriter
- Required: No
- Default: 500000
* **maxBatchSize**
* **batchSize**
- Description: the maximum amount of data to import per batch. Together with **maxBatchRows** it controls the size of each batch; as soon as either threshold is reached, that batch of data starts to be imported.
- Required: No
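For illustration only (not part of the patched documentation), here is a minimal sketch of how a writer configuration using the renamed key could be loaded through the DataX Configuration class that appears later in this commit; the values and class name are placeholders, and it assumes the usual Configuration getters such as getLong:

    import com.alibaba.datax.common.util.Configuration;

    public class BatchSizeConfigExample {
        public static void main(String[] args) {
            // Placeholder writer parameters; only the two batch thresholds matter here.
            String json = "{\n" +
                    " \"maxBatchRows\": 500000,\n" +
                    " \"batchSize\": 104857600\n" +
                    "}";
            Configuration conf = Configuration.from(json);
            // A batch is flushed as soon as either threshold is reached.
            System.out.println("batchSize = " + conf.getLong("batchSize"));
            System.out.println("maxBatchRows = " + conf.getLong("maxBatchRows"));
        }
    }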


@ -31,7 +31,7 @@ public class Keys implements Serializable {
private static final String JDBC_URL = "connection[0].jdbcUrl";
private static final String LABEL_PREFIX = "labelPrefix";
private static final String MAX_BATCH_ROWS = "maxBatchRows";
private static final String MAX_BATCH_SIZE = "maxBatchSize";
private static final String MAX_BATCH_SIZE = "batchSize";
private static final String FLUSH_INTERVAL = "flushInterval";
private static final String LOAD_URL = "loadUrl";
private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength";
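Note that the Java constant keeps its old name, MAX_BATCH_SIZE, while the JSON key it resolves becomes "batchSize". As a hedged sketch (assumed, not taken from this commit), it could be read with a Configuration getter and a default; the getter, class name, and fallback value are illustrative only:

    import com.alibaba.datax.common.util.Configuration;

    public class KeysSketch {
        private static final String MAX_BATCH_SIZE = "batchSize";

        private final Configuration options;

        public KeysSketch(Configuration options) {
            this.options = options;
        }

        // Hypothetical getter: falls back to a placeholder default when the job
        // configuration omits batchSize.
        public long getBatchSize() {
            return options.getLong(MAX_BATCH_SIZE, 90 * 1024 * 1024);
        }
    }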


@ -1,88 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.alibaba.datax.plugin.writer.doriswriter;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Maps;
import com.alibaba.datax.common.util.Configuration;

import java.io.IOException;
import java.util.Map;

public class TestDorisWriterLoad {

    // for test
    public static void main(String[] args) throws IOException {
        /**
         * The CREATE TABLE statement below assumes an existing Doris environment:
         * first create the database demo, then create the table with the statement below,
         * update the IP address in feLoadUrl together with username and password, and run.
         * CREATE TABLE `doris_test` (
         *     `k1` varchar(30) NULL,
         *     `k2` varchar(255) NULL,
         *     `k3` varchar(200)
         * ) ENGINE=OLAP
         * Duplicate KEY(k1)
         * COMMENT "OLAP"
         * DISTRIBUTED BY HASH(k1) BUCKETS 1
         * PROPERTIES (
         *     "replication_allocation" = "tag.location.default: 1",
         *     "in_memory" = "false",
         *     "storage_format" = "V2"
         * )
         */
        String json = "{\n" +
                " \"feLoadUrl\": [\"127.0.0.1:8030\"],\n" +
                " \"column\": [\"k1\", \"k2\", \"k3\"],\n" +
                " \"database\": \"demo\",\n" +
                " \"jdbcUrl\": \"\",\n" +
                " \"loadProps\": {},\n" +
                " \"password\": \"12345\",\n" +
                " \"postSql\": [],\n" +
                " \"preSql\": [],\n" +
                " \"table\": \"doris_test\",\n" +
                " \"username\": \"root\"\n" +
                "}";
        Configuration configuration = Configuration.from(json);
        Key key = new Key(configuration);
        DorisWriterEmitter emitter = new DorisWriterEmitter(key);

        // Assemble a batch ("\n" line delimiter, csv format) and label it.
        DorisFlushBatch flushBatch = new DorisFlushBatch("\n", "csv");
        flushBatch.setLabel("test4");

        Map<String, String> row1 = Maps.newHashMap();
        row1.put("k1", "2021-02-02");
        row1.put("k2", "2021-02-02 00:00:00");
        row1.put("k3", "3");
        String rowStr1 = JSON.toJSONString(row1);
        System.out.println("rows1: " + rowStr1);
        flushBatch.putData(rowStr1);

        Map<String, String> row2 = Maps.newHashMap();
        row2.put("k1", "2021-02-03");
        row2.put("k2", "2021-02-03 00:00:00");
        row2.put("k3", "4");
        String rowStr2 = JSON.toJSONString(row2);
        System.out.println("rows2: " + rowStr2);
        flushBatch.putData(rowStr2);

        // Pad the batch with repeated rows to exercise a larger flush.
        for (int i = 0; i < 50000; ++i) {
            flushBatch.putData(rowStr2);
        }

        // Send the batch to Doris via Stream Load.
        emitter.emit(flushBatch);
    }
}