Merge pull request #1280 from hf200012/doriswriter-1

[New Feature] Add support for writing data to Apache Doris
This commit is contained in:
Trafalgar 2022-10-11 11:01:30 +08:00 committed by GitHub
commit dd8310f915
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1590 additions and 2 deletions

View File

@ -41,7 +41,7 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5</version>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>

View File

@ -0,0 +1,181 @@
# DorisWriter Plugin Documentation
## 1 Quick Introduction
DorisWriter supports writing large batches of data into Apache Doris.
## 2 Implementation Principle
DorisWriter imports data through Doris's native Stream Load mechanism: it buffers the records read by the `reader` in memory, concatenates them into CSV or JSON text, and then imports them into Doris in batches.
## 3 Features
### 3.1 Configuration Example
Here is a configuration file that reads data from MySQL and imports it into Doris.
```json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://localhost:3306/demo"],
                                "table": ["employees_1"]
                            }
                        ],
                        "username": "root",
                        "password": "xxxxx",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        "loadUrl": ["172.16.0.13:8030"],
                        "column": ["emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date"],
                        "username": "root",
                        "password": "xxxxxx",
                        "postSql": ["select count(1) from all_employees_info"],
                        "preSql": [],
                        "flushInterval": 30000,
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://172.16.0.13:9030/demo",
                                "selectedDatabase": "demo",
                                "table": ["all_employees_info"]
                            }
                        ],
                        "loadProps": {
                            "format": "json",
                            "strip_outer_array": true
                        }
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
```
### 3.2 Parameter Description
* **jdbcUrl**
    - Description: JDBC connection string of Doris, used to execute the preSql and postSql statements.
    - Required: yes
    - Default: none
* **loadUrl**
    - Description: target of the Stream Load requests, in the format "ip:port", where the IP is a FE node's IP and the port is that FE node's http_port. Multiple addresses can be configured as separate entries of the list, and doriswriter will access them in a round-robin manner.
    - Required: yes
    - Default: none
* **username**
    - Description: username for accessing the Doris database.
    - Required: yes
    - Default: none
* **password**
    - Description: password for accessing the Doris database.
    - Required: no
    - Default: empty
* **connection.selectedDatabase**
    - Description: name of the Doris database to write to.
    - Required: yes
    - Default: none
* **connection.table**
    - Description: name of the Doris table to write to.
    - Required: yes
    - Default: none
* **column**
    - Description: the fields of the destination table **to be written**; they are used as the field names of the generated JSON data. Separate fields with commas, for example: "column": ["id","name","age"].
    - Required: yes
    - Default: none
* **preSql**
    - Description: standard SQL statements executed before writing data to the destination table.
    - Required: no
    - Default: none
* **postSql**
    - Description: standard SQL statements executed after writing data to the destination table.
    - Required: no
    - Default: none
* **maxBatchRows**
    - Description: maximum number of rows per import batch. Together with **maxBatchSize** it controls the size of each batch; a batch is imported as soon as either threshold is reached.
    - Required: no
    - Default: 500000
* **maxBatchSize**
    - Description: maximum number of bytes per import batch. Together with **maxBatchRows** it controls the size of each batch; a batch is imported as soon as either threshold is reached.
    - Required: no
    - Default: 94371840 (90 MB)
* **maxRetries**
    - Description: number of retries after a batch fails to import.
    - Required: no
    - Default: 3
* **labelPrefix**
    - Description: label prefix of each import batch. The final label is composed of `labelPrefix + UUID`, forming a globally unique label that ensures data is not imported twice.
    - Required: no
    - Default: `datax_doris_writer_`
* **loadProps**
    - Description: request parameters of Stream Load. For details see the Stream Load documentation: [Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual)
      This includes the import data `format`, among others. The default format is CSV; JSON is also supported. See the type conversion section below, or the official Stream Load documentation above.
    - Required: no
    - Default: none
### Type Conversion
By default, incoming data is converted to strings, with `\t` as the column separator and `\n` as the row separator, forming a `csv` payload for the Stream Load import.
The default import format is CSV. To change the column separator, just configure `loadProps` accordingly:
```json
"loadProps": {
    "column_separator": "\\x01",
    "row_delimiter": "\\x02"
}
```
To change the import format to `json`, just configure `loadProps` accordingly:
```json
"loadProps": {
    "format": "json",
    "strip_outer_array": true
}
```
For more information, see the Doris documentation: [Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual)

View File

@ -0,0 +1,46 @@
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["k1", "k2", "k3"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://192.168.10.10:3306/db1"],
                                "table": ["t1"]
                            }
                        ],
                        "username": "root",
                        "password": "",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        "loadUrl": ["192.168.1.1:8030"],
                        "loadProps": {},
                        "database": "db1",
                        "column": ["k1", "k2", "k3"],
                        "username": "root",
                        "password": "",
                        "postSql": [],
                        "preSql": [],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.1:9030/",
                                "table": ["xxx"],
                                "selectedDatabase": "xxxx"
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

doriswriter/pom.xml Normal file
View File

@ -0,0 +1,99 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>doriswriter</artifactId>
<name>doriswriter</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.driver.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id/>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/doriswriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>doriswriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/doriswriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/doriswriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,54 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import com.google.common.base.Strings;
import java.io.StringWriter;
public class DelimiterParser {
private static final String HEX_STRING = "0123456789ABCDEF";
public static String parse(String sp, String dSp) throws RuntimeException {
if ( Strings.isNullOrEmpty(sp)) {
return dSp;
}
if (!sp.toUpperCase().startsWith("\\X")) {
return sp;
}
String hexStr = sp.substring(2);
// check hex str
if (hexStr.isEmpty()) {
throw new RuntimeException("Failed to parse delimiter: `Hex str is empty`");
}
if (hexStr.length() % 2 != 0) {
throw new RuntimeException("Failed to parse delimiter: `Hex str length error`");
}
for (char hexChar : hexStr.toUpperCase().toCharArray()) {
if (HEX_STRING.indexOf(hexChar) == -1) {
throw new RuntimeException("Failed to parse delimiter: `Hex str format error`");
}
}
// transform to separator
StringWriter writer = new StringWriter();
for (byte b : hexStrToBytes(hexStr)) {
writer.append((char) b);
}
return writer.toString();
}
private static byte[] hexStrToBytes(String hexStr) {
String upperHexStr = hexStr.toUpperCase();
int length = upperHexStr.length() / 2;
char[] hexChars = upperHexStr.toCharArray();
byte[] bytes = new byte[length];
for (int i = 0; i < length; i++) {
int pos = i * 2;
bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
}
return bytes;
}
private static byte charToByte(char c) {
return (byte) HEX_STRING.indexOf(c);
}
}
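
A quick, illustrative usage sketch of the parser above: plain separators pass through unchanged, `\x`-prefixed hex escapes are decoded byte by byte, and a null or empty separator falls back to the supplied default.

```java
public class DelimiterParserExample {
    public static void main(String[] args) {
        String plain = DelimiterParser.parse(",", "\t");      // "," is returned unchanged
        String hex = DelimiterParser.parse("\\x01", "\t");    // decoded to the single char 0x01
        String fallback = DelimiterParser.parse(null, "\t");  // falls back to "\t"
        System.out.println((int) hex.charAt(0));              // prints 1
        System.out.println(plain + " " + fallback.equals("\t"));
    }
}
```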

View File

@ -0,0 +1,23 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Column;
public class DorisBaseCodec {
protected String convertionField( Column col) {
if (null == col.getRawData() || Column.Type.NULL == col.getType()) {
return null;
}
if ( Column.Type.BOOL == col.getType()) {
return String.valueOf(col.asLong());
}
if ( Column.Type.BYTES == col.getType()) {
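// BYTES columns are rendered as the unsigned big-endian integer value of the raw bytes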
byte[] bts = (byte[])col.getRawData();
long value = 0;
for (int i = 0; i < bts.length; i++) {
value += (bts[bts.length - i - 1] & 0xffL) << (8 * i);
}
return String.valueOf(value);
}
return col.asString();
}
}

View File

@ -0,0 +1,10 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Record;
import java.io.Serializable;
public interface DorisCodec extends Serializable {
String codec( Record row);
}

View File

@ -0,0 +1,19 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import java.util.Map;
public class DorisCodecFactory {
public DorisCodecFactory (){
}
public static DorisCodec createCodec( Keys writerOptions) {
if ( Keys.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
Map<String, Object> props = writerOptions.getLoadProps();
return new DorisCsvCodec (null == props || !props.containsKey("column_separator") ? null : String.valueOf(props.get("column_separator")));
}
if ( Keys.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
return new DorisJsonCodec (writerOptions.getColumns());
}
throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties.");
}
}

View File

@ -0,0 +1,27 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Record;
public class DorisCsvCodec extends DorisBaseCodec implements DorisCodec {
private static final long serialVersionUID = 1L;
private final String columnSeparator;
public DorisCsvCodec ( String sp) {
this.columnSeparator = DelimiterParser.parse(sp, "\t");
}
@Override
public String codec( Record row) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < row.getColumnNumber(); i++) {
String value = convertionField(row.getColumn(i));
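// Doris Stream Load interprets \N as NULL in CSV data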
sb.append(null == value ? "\\N" : value);
if (i < row.getColumnNumber() - 1) {
sb.append(columnSeparator);
}
}
return sb.toString();
}
}

View File

@ -0,0 +1,33 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.fastjson.JSON;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DorisJsonCodec extends DorisBaseCodec implements DorisCodec {
private static final long serialVersionUID = 1L;
private final List<String> fieldNames;
public DorisJsonCodec ( List<String> fieldNames) {
this.fieldNames = fieldNames;
}
@Override
public String codec( Record row) {
if (null == fieldNames) {
return "";
}
Map<String, Object> rowMap = new HashMap<> (fieldNames.size());
int idx = 0;
for (String fieldName : fieldNames) {
rowMap.put(fieldName, convertionField(row.getColumn(idx)));
idx++;
}
return JSON.toJSONString(rowMap);
}
}
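
For intuition, a standalone sketch (not part of the plugin) of what the JSON codec produces: the configured column names are zipped with the row's values and serialized with fastjson, so field order may vary because a HashMap is used.

```java
import com.alibaba.fastjson.JSON;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JsonRowSketch {
    public static void main(String[] args) {
        List<String> fieldNames = Arrays.asList("emp_no", "first_name");
        List<String> values = Arrays.asList("1001", "Ada");
        Map<String, Object> rowMap = new HashMap<>(fieldNames.size());
        for (int i = 0; i < fieldNames.size(); i++) {
            rowMap.put(fieldNames.get(i), values.get(i));
        }
        System.out.println(JSON.toJSONString(rowMap)); // e.g. {"emp_no":"1001","first_name":"Ada"}
    }
}
```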

View File

@ -0,0 +1,233 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.fastjson.JSON;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class DorisStreamLoadObserver {
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadObserver.class);
private Keys options;
private long pos;
private static final String RESULT_FAILED = "Fail";
private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
private static final String LABEL_STATE_VISIBLE = "VISIBLE";
private static final String LABEL_STATE_COMMITTED = "COMMITTED";
private static final String RESULT_LABEL_PREPARE = "PREPARE";
private static final String RESULT_LABEL_ABORTED = "ABORTED";
private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
public DorisStreamLoadObserver ( Keys options){
this.options = options;
}
public void streamLoad(WriterTuple data) throws Exception {
String host = getLoadHost();
if(host == null){
throw new IOException ("load_url cannot be empty, or the host cannot connect.Please check your configuration.");
}
String loadUrl = new StringBuilder(host)
.append("/api/")
.append(options.getDatabase())
.append("/")
.append(options.getTable())
.append("/_stream_load")
.toString();
LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());
Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));
LOG.info("StreamLoad response :{}",JSON.toJSONString(loadResult));
final String keyStatus = "Status";
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
throw new IOException("Unable to flush data to Doris: unknown result status.");
}
LOG.debug("StreamLoad response:{}",JSON.toJSONString(loadResult));
if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
throw new IOException(
new StringBuilder("Failed to flush data to Doris.\n").append(JSON.toJSONString(loadResult)).toString()
);
} else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
LOG.debug("StreamLoad response:{}",JSON.toJSONString(loadResult));
checkStreamLoadState(host, data.getLabel());
}
}
private void checkStreamLoadState(String host, String label) throws IOException {
int idx = 0;
while(true) {
try {
TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
} catch (InterruptedException ex) {
break;
}
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString());
httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
httpGet.setHeader("Connection", "close");
try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
HttpEntity respEntity = getHttpEntity(resp);
if (respEntity == null) {
throw new IOException(String.format("Failed to flush data to Doris, Error " +
"could not get the final state of label[%s].\n", label), null);
}
Map<String, Object> result = (Map<String, Object>)JSON.parse(EntityUtils.toString(respEntity));
String labelState = (String)result.get("state");
if (null == labelState) {
throw new IOException(String.format("Failed to flush data to Doris, Error " +
"could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);
}
LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
switch(labelState) {
case LABEL_STATE_VISIBLE:
case LABEL_STATE_COMMITTED:
return;
case RESULT_LABEL_PREPARE:
continue;
case RESULT_LABEL_ABORTED:
throw new DorisWriterExcetion (String.format("Failed to flush data to Doris, Error " +
"label[%s] state[%s]\n", label, labelState), null, true);
case RESULT_LABEL_UNKNOWN:
default:
throw new IOException(String.format("Failed to flush data to Doris, Error " +
"label[%s] state[%s]\n", label, labelState), null);
}
}
}
}
}
private byte[] addRows(List<byte[]> rows, int totalBytes) {
if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps());
byte[] lineDelimiter = DelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
for (byte[] row : rows) {
bos.put(row);
bos.put(lineDelimiter);
}
return bos.array();
}
if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
bos.put("[".getBytes(StandardCharsets.UTF_8));
byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
boolean isFirstElement = true;
for (byte[] row : rows) {
if (!isFirstElement) {
bos.put(jsonDelimiter);
}
bos.put(row);
isFirstElement = false;
}
bos.put("]".getBytes(StandardCharsets.UTF_8));
return bos.array();
}
throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
}
private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {
LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
.setRedirectStrategy(new DefaultRedirectStrategy () {
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
try ( CloseableHttpClient httpclient = httpClientBuilder.build()) {
HttpPut httpPut = new HttpPut(loadUrl);
List<String> cols = options.getColumns();
if (null != cols && !cols.isEmpty() && Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList())));
}
if (null != options.getLoadProps()) {
for (Map.Entry<String, Object> entry : options.getLoadProps().entrySet()) {
httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
}
}
httpPut.setHeader("Expect", "100-continue");
httpPut.setHeader("label", label);
httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded");
httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
httpPut.setEntity(new ByteArrayEntity (data));
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
try ( CloseableHttpResponse resp = httpclient.execute(httpPut)) {
HttpEntity respEntity = getHttpEntity(resp);
if (respEntity == null)
return null;
return (Map<String, Object>)JSON.parse(EntityUtils.toString(respEntity));
}
}
}
private String getBasicAuthHeader(String username, String password) {
String auth = username + ":" + password;
byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
return new StringBuilder("Basic ").append(new String(encodedAuth)).toString();
}
private HttpEntity getHttpEntity(CloseableHttpResponse resp) {
int code = resp.getStatusLine().getStatusCode();
if (200 != code) {
LOG.warn("Request failed with code:{}", code);
return null;
}
HttpEntity respEntity = resp.getEntity();
if (null == respEntity) {
LOG.warn("Request failed with empty response.");
return null;
}
return respEntity;
}
private String getLoadHost() {
List<String> hostList = options.getLoadUrlList();
long tmp = pos + hostList.size();
for (; pos < tmp; pos++) {
String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString();
if (checkConnection(host)) {
return host;
}
}
return null;
}
private boolean checkConnection(String host) {
try {
URL url = new URL(host);
HttpURLConnection co = (HttpURLConnection) url.openConnection();
co.setConnectTimeout(5000);
co.connect();
co.disconnect();
return true;
} catch (Exception e1) {
e1.printStackTrace();
return false;
}
}
}
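
A hedged sketch of how the observer is driven (DorisWriterManager below does essentially this); all configuration values here are placeholders and a reachable Doris FE is assumed:

```java
import com.alibaba.datax.common.util.Configuration;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class StreamLoadSketch {
    public static void main(String[] args) throws Exception {
        // placeholder job configuration: adjust host, credentials, database and table
        Configuration conf = Configuration.from("{"
                + "\"loadUrl\": [\"127.0.0.1:8030\"],"
                + "\"username\": \"root\", \"password\": \"\","
                + "\"column\": [\"k1\", \"k2\"],"
                + "\"connection\": [{\"jdbcUrl\": \"jdbc:mysql://127.0.0.1:9030/demo\","
                + "\"selectedDatabase\": \"demo\", \"table\": [\"doris_test\"]}]"
                + "}");
        Keys options = new Keys(conf);
        byte[] row = "1\tfoo".getBytes(StandardCharsets.UTF_8); // one CSV row (default \t separator)
        WriterTuple batch = new WriterTuple("example_label_1", (long) row.length, Arrays.asList(row));
        // issues a PUT to http://127.0.0.1:8030/api/demo/doris_test/_stream_load
        new DorisStreamLoadObserver(options).streamLoad(batch);
    }
}
```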

View File

@ -0,0 +1,105 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.druid.sql.parser.ParserException;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* jdbc util
*/
public class DorisUtil {
private static final Logger LOG = LoggerFactory.getLogger(DorisUtil.class);
private DorisUtil() {}
public static List<String> getDorisTableColumns( Connection conn, String databaseName, String tableName) {
String currentSql = String.format("SELECT COLUMN_NAME FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = '%s' AND `TABLE_NAME` = '%s' ORDER BY `ORDINAL_POSITION` ASC;", databaseName, tableName);
List<String> columns = new ArrayList<> ();
ResultSet rs = null;
try {
rs = DBUtil.query(conn, currentSql);
while (DBUtil.asyncResultSetNext(rs)) {
String colName = rs.getString("COLUMN_NAME");
columns.add(colName);
}
return columns;
} catch (Exception e) {
throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
} finally {
DBUtil.closeDBResources(rs, null, null);
}
}
public static List<String> renderPreOrPostSqls(List<String> preOrPostSqls, String tableName) {
if (null == preOrPostSqls) {
return Collections.emptyList();
}
List<String> renderedSqls = new ArrayList<>();
for (String sql : preOrPostSqls) {
if (! Strings.isNullOrEmpty(sql)) {
renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
}
}
return renderedSqls;
}
public static void executeSqls(Connection conn, List<String> sqls) {
Statement stmt = null;
String currentSql = null;
try {
stmt = conn.createStatement();
for (String sql : sqls) {
currentSql = sql;
DBUtil.executeSqlWithoutResultSet(stmt, sql);
}
} catch (Exception e) {
throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
} finally {
DBUtil.closeDBResources(null, stmt, null);
}
}
public static void preCheckPrePareSQL( Keys options) {
String table = options.getTable();
List<String> preSqls = options.getPreSqlList();
List<String> renderedPreSqls = DorisUtil.renderPreOrPostSqls(preSqls, table);
if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
LOG.info("Begin to preCheck preSqls:[{}].", String.join(";", renderedPreSqls));
for (String sql : renderedPreSqls) {
try {
DBUtil.sqlValid(sql, DataBaseType.MySql);
} catch ( ParserException e) {
throw RdbmsException.asPreSQLParserException(DataBaseType.MySql,e,sql);
}
}
}
}
public static void preCheckPostSQL( Keys options) {
String table = options.getTable();
List<String> postSqls = options.getPostSqlList();
List<String> renderedPostSqls = DorisUtil.renderPreOrPostSqls(postSqls, table);
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls));
for(String sql : renderedPostSqls) {
try {
DBUtil.sqlValid(sql, DataBaseType.MySql);
} catch (ParserException e){
throw RdbmsException.asPostSQLParserException(DataBaseType.MySql,e,sql);
}
}
}
}
}
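
A small sketch of the preSql/postSql rendering above; it assumes DataX's `Constant.TABLE_NAME_PLACEHOLDER` is the `@table` token, so a statement such as `select count(1) from @table` would be rewritten against the configured table before execution:

```java
import java.util.Arrays;
import java.util.List;

public class RenderSqlSketch {
    public static void main(String[] args) {
        List<String> postSqls = Arrays.asList("select count(1) from @table", "");
        // blank entries are dropped and the placeholder is replaced with the real table name
        List<String> rendered = DorisUtil.renderPreOrPostSqls(postSqls, "all_employees_info");
        System.out.println(rendered); // [select count(1) from all_employees_info]
    }
}
```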

View File

@ -0,0 +1,164 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
/**
* doris data writer
*/
public class DorisWriter extends Writer {
public static class Job extends Writer.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig = null;
private Keys options;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
options = new Keys (super.getPluginJobConf());
options.doPretreatment();
}
@Override
public void preCheck(){
this.init();
DorisUtil.preCheckPrePareSQL(options);
DorisUtil.preCheckPostSQL(options);
}
@Override
public void prepare() {
String username = options.getUsername();
String password = options.getPassword();
String jdbcUrl = options.getJdbcUrl();
List<String> renderedPreSqls = DorisUtil.renderPreOrPostSqls(options.getPreSqlList(), options.getTable());
if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPreSqls), jdbcUrl);
DorisUtil.executeSqls(conn, renderedPreSqls);
DBUtil.closeDBResources(null, null, conn);
}
}
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
for (int i = 0; i < mandatoryNumber; i++) {
configurations.add(originalConfig);
}
return configurations;
}
@Override
public void post() {
String username = options.getUsername();
String password = options.getPassword();
String jdbcUrl = options.getJdbcUrl();
List<String> renderedPostSqls = DorisUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable());
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
LOG.info("Start to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl);
DorisUtil.executeSqls(conn, renderedPostSqls);
DBUtil.closeDBResources(null, null, conn);
}
}
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private DorisWriterManager writerManager;
private Keys options;
private DorisCodec rowCodec;
@Override
public void init() {
options = new Keys (super.getPluginJobConf());
if (options.isWildcardColumn()) {
Connection conn = DBUtil.getConnection(DataBaseType.MySql, options.getJdbcUrl(), options.getUsername(), options.getPassword());
List<String> columns = DorisUtil.getDorisTableColumns(conn, options.getDatabase(), options.getTable());
options.setInfoCchemaColumns(columns);
}
writerManager = new DorisWriterManager(options);
rowCodec = DorisCodecFactory.createCodec(options);
}
@Override
public void prepare() {
}
public void startWrite(RecordReceiver recordReceiver) {
try {
Record record;
while ((record = recordReceiver.getFromReader()) != null) {
if (record.getColumnNumber() != options.getColumns().size()) {
throw DataXException
.asDataXException(
DBUtilErrorCode.CONF_ERROR,
String.format(
"There is an error in the column configuration information. " +
"This is because you have configured a task where the number of fields to be read from the source:%s " +
"is not equal to the number of fields to be written to the destination table:%s. " +
"Please check your configuration and make changes.",
record.getColumnNumber(),
options.getColumns().size()));
}
writerManager.writeRecord(rowCodec.codec(record));
}
} catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
}
}
@Override
public void post() {
try {
writerManager.close();
} catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
}
}
@Override
public void destroy() {}
@Override
public boolean supportFailOver(){
return false;
}
}
}

View File

@ -0,0 +1,29 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import java.io.IOException;
import java.util.Map;
public class DorisWriterExcetion extends IOException {
private final Map<String, Object> response;
private boolean reCreateLabel;
public DorisWriterExcetion ( String message, Map<String, Object> response) {
super(message);
this.response = response;
}
public DorisWriterExcetion ( String message, Map<String, Object> response, boolean reCreateLabel) {
super(message);
this.response = response;
this.reCreateLabel = reCreateLabel;
}
public Map<String, Object> getFailedResponse() {
return response;
}
public boolean needReCreateLabel() {
return reCreateLabel;
}
}

View File

@ -0,0 +1,192 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import com.google.common.base.Strings;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class DorisWriterManager {
private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);
private final DorisStreamLoadObserver visitor;
private final Keys options;
private final List<byte[]> buffer = new ArrayList<> ();
private int batchCount = 0;
private long batchSize = 0;
private volatile boolean closed = false;
private volatile Exception flushException;
private final LinkedBlockingDeque< WriterTuple > flushQueue;
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> scheduledFuture;
public DorisWriterManager( Keys options) {
this.options = options;
this.visitor = new DorisStreamLoadObserver (options);
flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
this.startScheduler();
this.startAsyncFlushing();
}
public void startScheduler() {
stopScheduler();
this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("Doris-interval-flush").daemon(true).build());
this.scheduledFuture = this.scheduler.schedule(() -> {
synchronized (DorisWriterManager.this) {
if (!closed) {
try {
String label = createBatchLabel();
LOG.info(String.format("Doris interval Sinking triggered: label[%s].", label));
if (batchCount == 0) {
startScheduler();
}
flush(label, false);
} catch (Exception e) {
flushException = e;
}
}
}
}, options.getFlushInterval(), TimeUnit.MILLISECONDS);
}
public void stopScheduler() {
if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}
}
public final synchronized void writeRecord(String record) throws IOException {
checkFlushException();
try {
byte[] bts = record.getBytes(StandardCharsets.UTF_8);
buffer.add(bts);
batchCount++;
batchSize += bts.length;
if (batchCount >= options.getBatchRows() || batchSize >= options.getBatchSize()) {
String label = createBatchLabel();
LOG.debug(String.format("Doris buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));
flush(label, false);
}
} catch (Exception e) {
throw new IOException("Writing records to Doris failed.", e);
}
}
public synchronized void flush(String label, boolean waitUtilDone) throws Exception {
checkFlushException();
if (batchCount == 0) {
if (waitUtilDone) {
waitAsyncFlushingDone();
}
return;
}
flushQueue.put(new WriterTuple (label, batchSize, new ArrayList<>(buffer)));
if (waitUtilDone) {
// wait the last flush
waitAsyncFlushingDone();
}
buffer.clear();
batchCount = 0;
batchSize = 0;
}
public synchronized void close() {
if (!closed) {
closed = true;
try {
String label = createBatchLabel();
if (batchCount > 0) LOG.debug(String.format("Doris Sink is about to close: label[%s].", label));
flush(label, true);
} catch (Exception e) {
throw new RuntimeException("Writing records to Doris failed.", e);
}
}
checkFlushException();
}
public String createBatchLabel() {
StringBuilder sb = new StringBuilder();
if (! Strings.isNullOrEmpty(options.getLabelPrefix())) {
sb.append(options.getLabelPrefix());
}
return sb.append(UUID.randomUUID().toString())
.toString();
}
private void startAsyncFlushing() {
// start flush thread
Thread flushThread = new Thread(new Runnable(){
public void run() {
while(true) {
try {
asyncFlush();
} catch (Exception e) {
flushException = e;
}
}
}
});
flushThread.setDaemon(true);
flushThread.start();
}
private void waitAsyncFlushingDone() throws InterruptedException {
// block by enqueueing empty-label sentinel tuples until the flush thread has drained all previously queued batches
for (int i = 0; i <= options.getFlushQueueLength(); i++) {
flushQueue.put(new WriterTuple ("", 0l, null));
}
checkFlushException();
}
private void asyncFlush() throws Exception {
WriterTuple flushData = flushQueue.take();
if (Strings.isNullOrEmpty(flushData.getLabel())) {
return;
}
stopScheduler();
LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
for (int i = 0; i <= options.getMaxRetries(); i++) {
try {
// flush to Doris with stream load
visitor.streamLoad(flushData);
LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
startScheduler();
break;
} catch (Exception e) {
LOG.warn("Failed to flush batch data to Doris, retry times = {}", i, e);
if (i >= options.getMaxRetries()) {
throw new IOException(e);
}
if (e instanceof DorisWriterExcetion && (( DorisWriterExcetion )e).needReCreateLabel()) {
String newLabel = createBatchLabel();
LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
flushData.setLabel(newLabel);
}
try {
Thread.sleep(1000l * Math.min(i + 1, 10));
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("Unable to flush, interrupted while doing another attempt", e);
}
}
}
}
private void checkFlushException() {
if (flushException != null) {
throw new RuntimeException("Writing records to Doris failed.", flushException);
}
}
}

View File

@ -0,0 +1,177 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class Keys implements Serializable {
private static final long serialVersionUID = 1l;
private static final int MAX_RETRIES = 3;
private static final int BATCH_ROWS = 500000;
private static final long DEFAULT_FLUSH_INTERVAL = 30000;
private static final String LOAD_PROPS_FORMAT = "format";
public enum StreamLoadFormat {
CSV, JSON;
}
private static final String USERNAME = "username";
private static final String PASSWORD = "password";
private static final String DATABASE = "connection[0].selectedDatabase";
private static final String TABLE = "connection[0].table[0]";
private static final String COLUMN = "column";
private static final String PRE_SQL = "preSql";
private static final String POST_SQL = "postSql";
private static final String JDBC_URL = "connection[0].jdbcUrl";
private static final String LABEL_PREFIX = "labelPrefix";
private static final String MAX_BATCH_ROWS = "maxBatchRows";
private static final String MAX_BATCH_SIZE = "maxBatchSize";
private static final String FLUSH_INTERVAL = "flushInterval";
private static final String LOAD_URL = "loadUrl";
private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength";
private static final String LOAD_PROPS = "loadProps";
private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";
private static final long DEFAULT_MAX_BATCH_SIZE = 90 * 1024 * 1024; //default 90M
private final Configuration options;
private List<String> infoSchemaColumns;
private List<String> userSetColumns;
private boolean isWildcardColumn;
public Keys ( Configuration options) {
this.options = options;
this.userSetColumns = options.getList(COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList());
if (1 == options.getList(COLUMN, String.class).size() && "*".trim().equals(options.getList(COLUMN, String.class).get(0))) {
this.isWildcardColumn = true;
}
}
public void doPretreatment() {
validateRequired();
validateStreamLoadUrl();
}
public String getJdbcUrl() {
return options.getString(JDBC_URL);
}
public String getDatabase() {
return options.getString(DATABASE);
}
public String getTable() {
return options.getString(TABLE);
}
public String getUsername() {
return options.getString(USERNAME);
}
public String getPassword() {
return options.getString(PASSWORD);
}
public String getLabelPrefix() {
String label = options.getString(LABEL_PREFIX);
return null == label ? DEFAULT_LABEL_PREFIX : label;
}
public List<String> getLoadUrlList() {
return options.getList(LOAD_URL, String.class);
}
public List<String> getColumns() {
if (isWildcardColumn) {
return this.infoSchemaColumns;
}
return this.userSetColumns;
}
public boolean isWildcardColumn() {
return this.isWildcardColumn;
}
public void setInfoCchemaColumns(List<String> cols) {
this.infoSchemaColumns = cols;
}
public List<String> getPreSqlList() {
return options.getList(PRE_SQL, String.class);
}
public List<String> getPostSqlList() {
return options.getList(POST_SQL, String.class);
}
public Map<String, Object> getLoadProps() {
return options.getMap(LOAD_PROPS);
}
public int getMaxRetries() {
return MAX_RETRIES;
}
public int getBatchRows() {
Integer rows = options.getInt(MAX_BATCH_ROWS);
return null == rows ? BATCH_ROWS : rows;
}
public long getBatchSize() {
Long size = options.getLong(MAX_BATCH_SIZE);
return null == size ? DEFAULT_MAX_BATCH_SIZE : size;
}
public long getFlushInterval() {
Long interval = options.getLong(FLUSH_INTERVAL);
return null == interval ? DEFAULT_FLUSH_INTERVAL : interval;
}
public int getFlushQueueLength() {
Integer len = options.getInt(FLUSH_QUEUE_LENGTH);
return null == len ? 1 : len;
}
public StreamLoadFormat getStreamLoadFormat() {
Map<String, Object> loadProps = getLoadProps();
if (null == loadProps) {
return StreamLoadFormat.CSV;
}
if (loadProps.containsKey(LOAD_PROPS_FORMAT)
&& StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(LOAD_PROPS_FORMAT)))) {
return StreamLoadFormat.JSON;
}
return StreamLoadFormat.CSV;
}
private void validateStreamLoadUrl() {
List<String> urlList = getLoadUrlList();
for (String host : urlList) {
if (host.split(":").length < 2) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
"The format of loadUrl is not correct, please enter:[`fe_ip:fe_http_ip;fe_ip:fe_http_ip`].");
}
}
}
private void validateRequired() {
final String[] requiredOptionKeys = new String[]{
USERNAME,
DATABASE,
TABLE,
COLUMN,
LOAD_URL
};
for (String optionKey : requiredOptionKeys) {
options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
}
}
}
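
A minimal sketch of how `Keys` resolves defaults (all values are placeholders): with nothing else configured, `maxBatchRows`, `maxBatchSize` and `flushInterval` fall back to 500000 rows, 90 MB and 30000 ms, and an absent `loadProps` keeps the CSV stream-load format.

```java
import com.alibaba.datax.common.util.Configuration;

public class KeysDefaultsSketch {
    public static void main(String[] args) {
        Configuration conf = Configuration.from("{"
                + "\"username\": \"root\", \"password\": \"\","
                + "\"column\": [\"k1\", \"k2\"],"
                + "\"loadUrl\": [\"127.0.0.1:8030\"],"
                + "\"connection\": [{\"jdbcUrl\": \"jdbc:mysql://127.0.0.1:9030/demo\","
                + "\"selectedDatabase\": \"demo\", \"table\": [\"doris_test\"]}]"
                + "}");
        Keys keys = new Keys(conf);
        keys.doPretreatment();                           // validates required keys and the loadUrl format
        System.out.println(keys.getBatchRows());         // 500000
        System.out.println(keys.getBatchSize());         // 94371840 (90 MB)
        System.out.println(keys.getFlushInterval());     // 30000
        System.out.println(keys.getStreamLoadFormat());  // CSV
    }
}
```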

View File

@ -0,0 +1,20 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import java.util.List;
public class WriterTuple {
private String label;
private Long bytes;
private List<byte[]> rows;
public WriterTuple ( String label, Long bytes, List<byte[]> rows){
this.label = label;
this.rows = rows;
this.bytes = bytes;
}
public String getLabel() { return label; }
public void setLabel(String label) { this.label = label; }
public Long getBytes() { return bytes; }
public List<byte[]> getRows() { return rows; }
}

View File

@ -0,0 +1,6 @@
{
"name": "doriswriter",
"class": "com.alibaba.datax.plugin.writer.doriswriter.DorisWriter",
"description": "apache doris writer plugin",
"developer": "apche doris"
}

View File

@ -0,0 +1,20 @@
{
"name": "doriswriter",
"parameter": {
"username": "",
"password": "",
"column": [],
"preSql": [],
"postSql": [],
"beLoadUrl": [],
"loadUrl": [],
"loadProps": {},
"connection": [
{
"jdbcUrl": "",
"selectedDatabase": "",
"table": []
}
]
}
}

View File

@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Maps;
import com.alibaba.datax.common.util.Configuration;
import java.io.IOException;
import java.util.Map;
public class TestDorisWriterLoad {
// for test
public static void main(String[] args) throws IOException {
/**
* To run the example below you need a running Doris environment: create a database named demo,
* then create the table with the DDL below. Adjust the IP address in loadUrl as well as
* username and password, then run this class.
* CREATE TABLE `doris_test` (
* `k1` varchar(30) NULL ,
* `k2` varchar(255) NULL,
* `k3` varchar(200)
* ) ENGINE=OLAP
* Duplicate KEY(k1)
* COMMENT "OLAP"
* DISTRIBUTED BY HASH(k1) BUCKETS 1
* PROPERTIES (
* "replication_allocation" = "tag.location.default: 1",
* "in_memory" = "false",
* "storage_format" = "V2"
* )
*/
String json = "{\n" +
" \"feLoadUrl\": [\"127.0.0.1:8030\"],\n" +
" \"column\": [\"k1\", \"k2\", \"k3\"],\n" +
" \"database\": \"demo\",\n" +
" \"jdbcUrl\": \"\",\n" +
" \"loadProps\": {},\n" +
" \"password\": \"12345\",\n" +
" \"postSql\": [],\n" +
" \"preSql\": [],\n" +
" \"table\": \"doris_test\",\n" +
" \"username\": \"root\"\n" +
"}";
Configuration configuration = Configuration.from(json);
Key key = new Key(configuration);
DorisWriterEmitter emitter = new DorisWriterEmitter(key);
DorisFlushBatch flushBatch = new DorisFlushBatch("\n","csv");
flushBatch.setLabel("test4");
Map<String, String> row1 = Maps.newHashMap();
row1.put("k1", "2021-02-02");
row1.put("k2", "2021-02-02 00:00:00");
row1.put("k3", "3");
String rowStr1 = JSON.toJSONString(row1);
System.out.println("rows1: " + rowStr1);
flushBatch.putData(rowStr1);
Map<String, String> row2 = Maps.newHashMap();
row2.put("k1", "2021-02-03");
row2.put("k2", "2021-02-03 00:00:00");
row2.put("k3", "4");
String rowStr2 = JSON.toJSONString(row2);
System.out.println("rows2: " + rowStr2);
flushBatch.putData(rowStr2);
for (int i = 0; i < 50000; ++i) {
flushBatch.putData(rowStr2);
}
emitter.emit (flushBatch);
}
}

View File

@ -266,6 +266,13 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>doriswriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>txtfilewriter/target/datax/</directory>
<includes>

View File

@ -101,7 +101,6 @@
<module>hbase11xwriter</module>
<module>hbase094xwriter</module>
<module>hbase11xsqlwriter</module>
<module>hbase20xsqlwriter</module>
<module>kuduwriter</module>
<module>ftpwriter</module>
<module>hdfswriter</module>
@ -123,6 +122,10 @@
<!-- common support module -->
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>
<module>hbase20xsqlreader</module>
<module>hbase20xsqlwriter</module>
<module>kuduwriter</module>
<module>doriswriter</module>
</modules>
<dependencyManagement>