mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 10:21:33 +08:00
add doriswriter
This commit is contained in:
parent
33e6ace661
commit
9f5d9a6317
170
doriswriter/doc/doriswriter.md
Normal file
170
doriswriter/doc/doriswriter.md
Normal file
@ -0,0 +1,170 @@
|
||||
# DataX DorisWriter
|
||||
|
||||
|
||||
---
|
||||
|
||||
|
||||
## 1 快速介绍
|
||||
|
||||
DorisWriter 插件实现了写入数据到 Doris 主库的目的表的功能。在底层实现上, DorisWriter 通过Streamload以csv格式导入数据至Doris。
|
||||
|
||||
|
||||
## 2 实现原理
|
||||
|
||||
DorisWriter 通过Streamload以csv格式导入数据至Doris, 内部将`reader`读取的数据进行缓存后批量导入至Doris,以提高写入性能。
|
||||
|
||||
|
||||
## 3 功能说明
|
||||
|
||||
### 3.1 配置样例
|
||||
|
||||
* 这里提供一份从 MySQL 读取数据后导入至 Doris 的配置样例。
|
||||
|
||||
```json
|
||||
{
|
||||
"job": {
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 1
|
||||
},
|
||||
"errorLimit": {
|
||||
"record": 0,
|
||||
"percentage": 0
|
||||
}
|
||||
},
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "mysqlreader",
|
||||
"parameter": {
|
||||
"username": "xxxx",
|
||||
"password": "xxxx",
|
||||
"column": [ "k1", "k2", "v1", "v2" ],
|
||||
"connection": [
|
||||
{
|
||||
"table": [ "table1", "table2" ],
|
||||
"jdbcUrl": [
|
||||
"jdbc:mysql://127.0.0.1:3306/datax_test1"
|
||||
]
|
||||
},
|
||||
{
|
||||
"table": [ "table3", "table4" ],
|
||||
"jdbcUrl": [
|
||||
"jdbc:mysql://127.0.0.1:3306/datax_test2"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "doriswriter",
|
||||
"parameter": {
|
||||
"username": "xxxx",
|
||||
"password": "xxxx",
|
||||
"database": "xxxx",
|
||||
"table": "xxxx",
|
||||
"column": ["k1", "k2", "v1", "v2"],
|
||||
"preSql": [],
|
||||
"postSql": [],
|
||||
"jdbcUrl": "jdbc:mysql://172.28.17.100:9030/",
|
||||
"loadUrl": ["172.28.17.100:8030", "172.28.17.100:8030"]
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
|
||||
### 3.2 参数说明
|
||||
|
||||
* **username**
|
||||
|
||||
* 描述:Doris数据库的用户名 <br />
|
||||
|
||||
* 必选:是 <br />
|
||||
|
||||
* 默认值:无 <br />
|
||||
|
||||
* **password**
|
||||
|
||||
* 描述:Doris数据库的密码 <br />
|
||||
|
||||
* 必选:是 <br />
|
||||
|
||||
* 默认值:无 <br />
|
||||
|
||||
* **database**
|
||||
|
||||
* 描述:Doris表的数据库名称。
|
||||
|
||||
* 必选:是 <br />
|
||||
|
||||
* 默认值:无 <br />
|
||||
|
||||
* **table**
|
||||
|
||||
* 描述:Doris表的表名称。
|
||||
|
||||
* 必选:是 <br />
|
||||
|
||||
* 默认值:无 <br />
|
||||
|
||||
* **loadUrl**
|
||||
|
||||
* 描述:Doris FE的地址用于Streamload,可以为多个fe地址,`fe_ip:fe_http_port`。
|
||||
|
||||
* 必选:是 <br />
|
||||
|
||||
* 默认值:无 <br />
|
||||
|
||||
* **column**
|
||||
|
||||
* 描述:目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。
|
||||
|
||||
**column配置项必须指定,不能留空!**
|
||||
|
||||
注意:`column` 中配置的字段个数必须与 reader 读取的字段个数一致,否则任务会因列配置错误而失败。
|
||||
|
||||
* 必选:是 <br />
|
||||
|
||||
* 默认值:无 <br />
|
||||
|
||||
* **preSql**
|
||||
|
||||
* 描述:写入数据到目的表前,会先执行这里的标准语句。 <br />
|
||||
|
||||
* 必选:否 <br />
|
||||
|
||||
* 默认值:无 <br />
|
||||
|
||||
* **postSql**
|
||||
|
||||
* 描述:写入数据到目的表后,会执行这里的标准语句。 <br />
|
||||
|
||||
* 必选:否 <br />
|
||||
|
||||
* 默认值:无 <br />
|
||||
|
||||
* **jdbcUrl**
|
||||
|
||||
* 描述:目的数据库的 JDBC 连接信息,用于执行`preSql`及`postSql`。 <br />
|
||||
|
||||
* 必选:否(配置了 `preSql` 或 `postSql` 时必须提供) <br />
|
||||
|
||||
* 默认值:无 <br />
|
||||
|
||||
|
||||
### 3.3 类型转换
|
||||
|
||||
传入的数据均会被转为字符串,并以`\t`作为列分隔符,`\n`作为行分隔符,组成`csv`文件进行StreamLoad导入操作。
|
||||
|
||||
## 4 性能报告
|
||||
|
||||
|
||||
## 5 约束限制
|
||||
|
||||
|
||||
## FAQ
|
155
doriswriter/pom.xml
Executable file
155
doriswriter/pom.xml
Executable file
@ -0,0 +1,155 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<artifactId>datax-all</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>doriswriter</artifactId>
|
||||
<name>doriswriter</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<artifactId>datax-common</artifactId>
|
||||
<version>${datax-project-version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<groupId>org.slf4j</groupId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.alibaba.datax</groupId>
|
||||
<artifactId>plugin-rdbms-util</artifactId>
|
||||
<version>${datax-project-version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-codec</groupId>
|
||||
<artifactId>commons-codec</artifactId>
|
||||
<version>1.9</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-logging</groupId>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
<version>1.1.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.httpcomponents</groupId>
|
||||
<artifactId>httpcore</artifactId>
|
||||
<version>4.4.6</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.httpcomponents</groupId>
|
||||
<artifactId>httpclient</artifactId>
|
||||
<version>4.5.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba</groupId>
|
||||
<artifactId>fastjson</artifactId>
|
||||
<version>1.2.75</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>mysql</groupId>
|
||||
<artifactId>mysql-connector-java</artifactId>
|
||||
<version>5.1.34</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<!-- compiler plugin -->
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<source>${jdk-version}</source>
|
||||
<target>${jdk-version}</target>
|
||||
<encoding>${project-sourceEncoding}</encoding>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>3.0.0</version>
|
||||
<executions>
|
||||
<!-- Run shade goal on package phase -->
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<createDependencyReducedPom>true</createDependencyReducedPom>
|
||||
<relocations>
|
||||
<relocation>
|
||||
<pattern>com.alibaba.fastjson</pattern>
|
||||
<shadedPattern>com.dorisdb.shade.com.alibaba.fastjson</shadedPattern>
|
||||
</relocation>
|
||||
<relocation>
|
||||
<pattern>org.apache.http</pattern>
|
||||
<shadedPattern>com.dorisdb.shade.org.apache.http</shadedPattern>
|
||||
</relocation>
|
||||
<relocation>
|
||||
<pattern>org.apache.commons</pattern>
|
||||
<shadedPattern>com.dorisdb.shade.org.apache.commons</shadedPattern>
|
||||
</relocation>
|
||||
</relocations>
|
||||
<artifactSet>
|
||||
<includes>
|
||||
<include>commons-codec:commons-codec</include>
|
||||
<include>commons-logging:*</include>
|
||||
<include>org.apache.httpcomponents:httpclient</include>
|
||||
<include>org.apache.httpcomponents:httpcore</include>
|
||||
<include>com.alibaba:fastjson</include>
|
||||
</includes>
|
||||
</artifactSet>
|
||||
<filters>
|
||||
<filter>
|
||||
<!-- Do not copy the signatures in the META-INF folder.
|
||||
Otherwise, this might cause SecurityExceptions when using the JAR. -->
|
||||
<artifact>*:*</artifact>
|
||||
<excludes>
|
||||
<exclude>META-INF/*.SF</exclude>
|
||||
<exclude>META-INF/*.DSA</exclude>
|
||||
<exclude>META-INF/*.RSA</exclude>
|
||||
</excludes>
|
||||
</filter>
|
||||
</filters>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<!-- assembly plugin -->
|
||||
<plugin>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<configuration>
|
||||
<descriptors>
|
||||
<descriptor>src/main/assembly/package.xml</descriptor>
|
||||
</descriptors>
|
||||
<finalName>datax</finalName>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>dwzip</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
35
doriswriter/src/main/assembly/package.xml
Executable file
35
doriswriter/src/main/assembly/package.xml
Executable file
@ -0,0 +1,35 @@
|
||||
<assembly
|
||||
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
|
||||
<id></id>
|
||||
<formats>
|
||||
<format>dir</format>
|
||||
</formats>
|
||||
<includeBaseDirectory>false</includeBaseDirectory>
|
||||
<fileSets>
|
||||
<fileSet>
|
||||
<directory>src/main/resources</directory>
|
||||
<includes>
|
||||
<include>plugin.json</include>
|
||||
<include>plugin_job_template.json</include>
|
||||
</includes>
|
||||
<outputDirectory>plugin/writer/doriswriter</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>target/</directory>
|
||||
<includes>
|
||||
<include>doriswriter-0.0.1-SNAPSHOT.jar</include>
|
||||
</includes>
|
||||
<outputDirectory>plugin/writer/doriswriter</outputDirectory>
|
||||
</fileSet>
|
||||
</fileSets>
|
||||
|
||||
<dependencySets>
|
||||
<dependencySet>
|
||||
<useProjectArtifact>false</useProjectArtifact>
|
||||
<outputDirectory>plugin/writer/doriswriter/libs</outputDirectory>
|
||||
<scope>runtime</scope>
|
||||
</dependencySet>
|
||||
</dependencySets>
|
||||
</assembly>
|
@ -0,0 +1,144 @@
|
||||
package com.dorisdb.connector.datax.plugin.writer.doriswriter;
|
||||
|
||||
import com.alibaba.datax.common.element.Record;
|
||||
import com.alibaba.datax.common.exception.DataXException;
|
||||
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||
import com.alibaba.datax.common.spi.Writer;
|
||||
import com.alibaba.datax.common.util.Configuration;
|
||||
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
||||
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
||||
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
||||
import com.dorisdb.connector.datax.plugin.writer.doriswriter.manager.DorisWriterManager;
|
||||
import com.dorisdb.connector.datax.plugin.writer.doriswriter.util.DorisWriterUtil;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class DorisWriter extends Writer {
|
||||
|
||||
public static class Job extends Writer.Job {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
|
||||
private Configuration originalConfig = null;
|
||||
private DorisWriterOptions options;
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
this.originalConfig = super.getPluginJobConf();
|
||||
options = new DorisWriterOptions(super.getPluginJobConf());
|
||||
options.doPretreatment();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preCheck(){
|
||||
this.init();
|
||||
DorisWriterUtil.preCheckPrePareSQL(options);
|
||||
DorisWriterUtil.preCheckPostSQL(options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare() {
|
||||
String username = options.getUsername();
|
||||
String password = options.getPassword();
|
||||
String jdbcUrl = options.getJdbcUrl();
|
||||
List<String> renderedPreSqls = DorisWriterUtil.renderPreOrPostSqls(options.getPreSqlList(), options.getTable());
|
||||
if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
|
||||
Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
|
||||
LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPreSqls), jdbcUrl);
|
||||
DorisWriterUtil.executeSqls(conn, renderedPreSqls);
|
||||
DBUtil.closeDBResources(null, null, conn);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Configuration> split(int mandatoryNumber) {
|
||||
List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
|
||||
for (int i = 0; i < mandatoryNumber; i++) {
|
||||
configurations.add(originalConfig);
|
||||
}
|
||||
return configurations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void post() {
|
||||
String username = options.getUsername();
|
||||
String password = options.getPassword();
|
||||
String jdbcUrl = options.getJdbcUrl();
|
||||
List<String> renderedPostSqls = DorisWriterUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable());
|
||||
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
|
||||
Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
|
||||
LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl);
|
||||
DorisWriterUtil.executeSqls(conn, renderedPostSqls);
|
||||
DBUtil.closeDBResources(null, null, conn);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Task extends Writer.Task {
|
||||
private DorisWriterManager writerManager;
|
||||
private DorisWriterOptions options;
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
options = new DorisWriterOptions(super.getPluginJobConf());
|
||||
writerManager = new DorisWriterManager(options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare() {
|
||||
}
|
||||
|
||||
public void startWrite(RecordReceiver recordReceiver) {
|
||||
try {
|
||||
Record record;
|
||||
while ((record = recordReceiver.getFromReader()) != null) {
|
||||
if (record.getColumnNumber() != options.getColumns().size()) {
|
||||
throw DataXException
|
||||
.asDataXException(
|
||||
DBUtilErrorCode.CONF_ERROR,
|
||||
String.format(
|
||||
"列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
|
||||
record.getColumnNumber(),
|
||||
options.getColumns().size()));
|
||||
}
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (int i = 0; i < record.getColumnNumber(); i++) {
|
||||
sb.append(record.getColumn(i).getRawData().toString());
|
||||
if (i < record.getColumnNumber() - 1) {
|
||||
sb.append("\t");
|
||||
}
|
||||
}
|
||||
writerManager.writeRecord(sb.toString());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void post() {
|
||||
try {
|
||||
writerManager.flush(writerManager.createBatchLabel());
|
||||
} catch (Exception e) {
|
||||
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {}
|
||||
|
||||
@Override
|
||||
public boolean supportFailOver(){
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,111 @@
|
||||
package com.dorisdb.connector.datax.plugin.writer.doriswriter;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import com.alibaba.datax.common.exception.DataXException;
|
||||
import com.alibaba.datax.common.util.Configuration;
|
||||
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class DorisWriterOptions implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1l;
|
||||
private static final long KILO_BYTES_SCALE = 1024l;
|
||||
private static final long MEGA_BYTES_SCALE = KILO_BYTES_SCALE * KILO_BYTES_SCALE;
|
||||
private static final int MAX_RETRIES = 1;
|
||||
private static final int BATCH_ROWS = 500000;
|
||||
private static final long BATCH_BYTES = 100 * MEGA_BYTES_SCALE;
|
||||
|
||||
private static final String KEY_USERNAME = "username";
|
||||
private static final String KEY_PASSWORD = "password";
|
||||
private static final String KEY_DATABASE = "database";
|
||||
private static final String KEY_TABLE = "table";
|
||||
private static final String KEY_COLUMN = "column";
|
||||
private static final String KEY_PRE_SQL = "preSql";
|
||||
private static final String KEY_POST_SQL = "postSql";
|
||||
private static final String KEY_JDBC_URL = "jdbcUrl";
|
||||
private static final String KEY_LOAD_URL = "loadUrl";
|
||||
|
||||
private final Configuration options;
|
||||
|
||||
public DorisWriterOptions(Configuration options) {
|
||||
this.options = options;
|
||||
}
|
||||
|
||||
public void doPretreatment() {
|
||||
validateRequired();
|
||||
validateStreamLoadUrl();
|
||||
}
|
||||
|
||||
public String getJdbcUrl() {
|
||||
return options.getString(KEY_JDBC_URL);
|
||||
}
|
||||
|
||||
public String getDatabase() {
|
||||
return options.getString(KEY_DATABASE);
|
||||
}
|
||||
|
||||
public String getTable() {
|
||||
return options.getString(KEY_TABLE);
|
||||
}
|
||||
|
||||
public String getUsername() {
|
||||
return options.getString(KEY_USERNAME);
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
return options.getString(KEY_PASSWORD);
|
||||
}
|
||||
|
||||
public List<String> getLoadUrlList() {
|
||||
return options.getList(KEY_LOAD_URL, String.class);
|
||||
}
|
||||
|
||||
public List<String> getColumns() {
|
||||
return options.getList(KEY_COLUMN, String.class);
|
||||
}
|
||||
|
||||
public List<String> getPreSqlList() {
|
||||
return options.getList(KEY_PRE_SQL, String.class);
|
||||
}
|
||||
|
||||
public List<String> getPostSqlList() {
|
||||
return options.getList(KEY_POST_SQL, String.class);
|
||||
}
|
||||
|
||||
public int getMaxRetries() {
|
||||
return MAX_RETRIES;
|
||||
}
|
||||
|
||||
public int getBatchRows() {
|
||||
return BATCH_ROWS;
|
||||
}
|
||||
|
||||
public long getBatchSize() {
|
||||
return BATCH_BYTES;
|
||||
}
|
||||
|
||||
private void validateStreamLoadUrl() {
|
||||
List<String> urlList = getLoadUrlList();
|
||||
for (String host : urlList) {
|
||||
if (host.split(":").length < 2) {
|
||||
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
|
||||
"loadUrl的格式不正确,请输入 `fe_ip:fe_http_ip;fe_ip:fe_http_ip`。");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void validateRequired() {
|
||||
final String[] requiredOptionKeys = new String[]{
|
||||
KEY_USERNAME,
|
||||
KEY_PASSWORD,
|
||||
KEY_DATABASE,
|
||||
KEY_TABLE,
|
||||
KEY_LOAD_URL
|
||||
};
|
||||
for (String optionKey : requiredOptionKeys) {
|
||||
options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,143 @@
|
||||
package com.dorisdb.connector.datax.plugin.writer.doriswriter.manager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions;
|
||||
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.client.config.RequestConfig;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpPut;
|
||||
import org.apache.http.entity.ByteArrayEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.DefaultRedirectStrategy;
|
||||
import org.apache.http.impl.client.HttpClientBuilder;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
public class DorisStreamLoadVisitor implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadVisitor.class);
|
||||
|
||||
private final DorisWriterOptions writerOptions;
|
||||
private int pos;
|
||||
|
||||
public DorisStreamLoadVisitor(DorisWriterOptions writerOptions) {
|
||||
this.writerOptions = writerOptions;
|
||||
}
|
||||
|
||||
public void doStreamLoad(String label, List<String> labeledRows) throws IOException {
|
||||
String host = getAvailableHost();
|
||||
if (null == host) {
|
||||
throw new IOException("None of the host in `load_url` could be connected.");
|
||||
}
|
||||
String loadUrl = new StringBuilder(host)
|
||||
.append("/api/")
|
||||
.append(writerOptions.getDatabase())
|
||||
.append("/")
|
||||
.append(writerOptions.getTable())
|
||||
.append("/_stream_load")
|
||||
.toString();
|
||||
Map<String, Object> loadResult = doHttpPut(loadUrl, label, joinRows(labeledRows));
|
||||
final String keyStatus = "Status";
|
||||
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
|
||||
throw new IOException("Unable to flush data to doris: unknown result status.");
|
||||
}
|
||||
if (loadResult.get(keyStatus).equals("Fail")) {
|
||||
throw new IOException(
|
||||
new StringBuilder("Failed to flush data to doris.").append(loadResult.get("Message").toString()).toString()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private String getAvailableHost() {
|
||||
List<String> hostList = writerOptions.getLoadUrlList();
|
||||
if (pos >= hostList.size()) {
|
||||
pos = 0;
|
||||
}
|
||||
for (; pos < hostList.size(); pos++) {
|
||||
String host = new StringBuilder("http://").append(hostList.get(pos)).toString();
|
||||
if (tryHttpConnection(host)) {
|
||||
return host;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private boolean tryHttpConnection(String host) {
|
||||
try {
|
||||
URL url = new URL(host);
|
||||
HttpURLConnection co = (HttpURLConnection) url.openConnection();
|
||||
co.setConnectTimeout(1000);
|
||||
co.connect();
|
||||
co.disconnect();
|
||||
return true;
|
||||
} catch (Exception e1) {
|
||||
LOG.warn("Failed to connect to address:{}", host, e1);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private byte[] joinRows(List<String> rows) {
|
||||
return String.join("\n", rows).getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Map<String, Object> doHttpPut(String loadUrl, String label, byte[] data) throws IOException {
|
||||
LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
|
||||
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
|
||||
.setRedirectStrategy(new DefaultRedirectStrategy() {
|
||||
@Override
|
||||
protected boolean isRedirectable(String method) {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
|
||||
HttpPut httpPut = new HttpPut(loadUrl);
|
||||
List<String> cols = writerOptions.getColumns();
|
||||
if (null != cols && !cols.isEmpty()) {
|
||||
httpPut.setHeader("columns", String.join(",", cols));
|
||||
}
|
||||
httpPut.setHeader("Expect", "100-continue");
|
||||
httpPut.setHeader("label", label);
|
||||
httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded");
|
||||
httpPut.setHeader("Authorization", getBasicAuthHeader(writerOptions.getUsername(), writerOptions.getPassword()));
|
||||
httpPut.setEntity(new ByteArrayEntity(data));
|
||||
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
|
||||
try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
|
||||
int code = resp.getStatusLine().getStatusCode();
|
||||
if (200 != code) {
|
||||
LOG.warn("Request failed with code:{}", code);
|
||||
return null;
|
||||
}
|
||||
HttpEntity respEntity = resp.getEntity();
|
||||
if (null == respEntity) {
|
||||
LOG.warn("Request failed with empty response.");
|
||||
return null;
|
||||
}
|
||||
return (Map<String, Object>)JSON.parse(EntityUtils.toString(respEntity));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String getBasicAuthHeader(String username, String password) {
|
||||
String auth = username + ":" + password;
|
||||
byte[] encodedAuth = Base64.encodeBase64(auth.getBytes());
|
||||
return new StringBuilder("Basic ").append(new String(encodedAuth)).toString();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,113 @@
|
||||
package com.dorisdb.connector.datax.plugin.writer.doriswriter.manager;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions;
|
||||
|
||||
public class DorisWriterManager implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);
|
||||
|
||||
private final DorisStreamLoadVisitor dorisStreamLoadVisitor;
|
||||
private final DorisWriterOptions writerOptions;
|
||||
|
||||
private final List<String> buffer = new ArrayList<>();
|
||||
private int batchCount = 0;
|
||||
private long batchSize = 0;
|
||||
private volatile boolean closed = false;
|
||||
private volatile Exception flushException;
|
||||
|
||||
public DorisWriterManager(DorisWriterOptions writerOptions) {
|
||||
this.writerOptions = writerOptions;
|
||||
this.dorisStreamLoadVisitor = new DorisStreamLoadVisitor(writerOptions);
|
||||
}
|
||||
|
||||
public final synchronized void writeRecord(String record) throws IOException {
|
||||
checkFlushException();
|
||||
try {
|
||||
buffer.add(record);
|
||||
batchCount++;
|
||||
batchSize += record.length();
|
||||
if (batchCount >= writerOptions.getBatchRows() || batchSize >= writerOptions.getBatchSize()) {
|
||||
flush(createBatchLabel());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Writing records to Doris failed.", e);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void flush(String label) throws IOException {
|
||||
checkFlushException();
|
||||
if (batchCount == 0) {
|
||||
return;
|
||||
}
|
||||
for (int i = 0; i <= writerOptions.getMaxRetries(); i++) {
|
||||
try {
|
||||
tryToFlush(label);
|
||||
buffer.clear();
|
||||
batchCount = 0;
|
||||
batchSize = 0;
|
||||
break;
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to flush batch data to doris, retry times = {}", i, e);
|
||||
if (i >= writerOptions.getMaxRetries()) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(1000l * (i + 1));
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IOException("Unable to flush, interrupted while doing another attempt", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void close() {
|
||||
if (!closed) {
|
||||
closed = true;
|
||||
|
||||
if (batchCount > 0) {
|
||||
try {
|
||||
flush(createBatchLabel());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Writing records to Doris failed.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
checkFlushException();
|
||||
}
|
||||
|
||||
public String createBatchLabel() {
|
||||
return UUID.randomUUID().toString();
|
||||
}
|
||||
|
||||
public List<String> getBufferedBatchList() {
|
||||
return buffer;
|
||||
}
|
||||
|
||||
public void setBufferedBatchList(List<String> buffer) {
|
||||
this.buffer.clear();
|
||||
this.buffer.addAll(buffer);
|
||||
}
|
||||
|
||||
private void tryToFlush(String label) throws IOException {
|
||||
// flush to Doris with stream load
|
||||
dorisStreamLoadVisitor.doStreamLoad(label, buffer);
|
||||
}
|
||||
|
||||
private void checkFlushException() {
|
||||
if (flushException != null) {
|
||||
throw new RuntimeException("Writing records to Doris failed.", flushException);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,83 @@
|
||||
package com.dorisdb.connector.datax.plugin.writer.doriswriter.util;
|
||||
|
||||
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
||||
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
||||
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
|
||||
import com.alibaba.datax.plugin.rdbms.writer.Constant;
|
||||
import com.alibaba.druid.sql.parser.ParserException;
|
||||
import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions;
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.Statement;
|
||||
import java.util.*;
|
||||
|
||||
public final class DorisWriterUtil {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DorisWriterUtil.class);
|
||||
|
||||
private DorisWriterUtil() {}
|
||||
|
||||
public static List<String> renderPreOrPostSqls(List<String> preOrPostSqls, String tableName) {
|
||||
if (null == preOrPostSqls) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
List<String> renderedSqls = new ArrayList<>();
|
||||
for (String sql : preOrPostSqls) {
|
||||
if (!Strings.isNullOrEmpty(sql)) {
|
||||
renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
|
||||
}
|
||||
}
|
||||
return renderedSqls;
|
||||
}
|
||||
|
||||
public static void executeSqls(Connection conn, List<String> sqls) {
|
||||
Statement stmt = null;
|
||||
String currentSql = null;
|
||||
try {
|
||||
stmt = conn.createStatement();
|
||||
for (String sql : sqls) {
|
||||
currentSql = sql;
|
||||
DBUtil.executeSqlWithoutResultSet(stmt, sql);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
|
||||
} finally {
|
||||
DBUtil.closeDBResources(null, stmt, null);
|
||||
}
|
||||
}
|
||||
|
||||
public static void preCheckPrePareSQL(DorisWriterOptions options) {
|
||||
String table = options.getTable();
|
||||
List<String> preSqls = options.getPreSqlList();
|
||||
List<String> renderedPreSqls = DorisWriterUtil.renderPreOrPostSqls(preSqls, table);
|
||||
if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
|
||||
LOG.info("Begin to preCheck preSqls:[{}].", String.join(";", renderedPreSqls));
|
||||
for (String sql : renderedPreSqls) {
|
||||
try {
|
||||
DBUtil.sqlValid(sql, DataBaseType.MySql);
|
||||
} catch (ParserException e) {
|
||||
throw RdbmsException.asPreSQLParserException(DataBaseType.MySql,e,sql);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void preCheckPostSQL(DorisWriterOptions options) {
|
||||
String table = options.getTable();
|
||||
List<String> postSqls = options.getPostSqlList();
|
||||
List<String> renderedPostSqls = DorisWriterUtil.renderPreOrPostSqls(postSqls, table);
|
||||
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
|
||||
LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls));
|
||||
for(String sql : renderedPostSqls) {
|
||||
try {
|
||||
DBUtil.sqlValid(sql, DataBaseType.MySql);
|
||||
} catch (ParserException e){
|
||||
throw RdbmsException.asPostSQLParserException(DataBaseType.MySql,e,sql);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
6
doriswriter/src/main/resources/plugin.json
Executable file
6
doriswriter/src/main/resources/plugin.json
Executable file
@ -0,0 +1,6 @@
|
||||
{
|
||||
"name": "doriswriter",
|
||||
"class": "com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriter",
|
||||
"description": "useScene: prod. mechanism: DorisStreamLoad. warn: The more you know about the database, the less problems you encounter.",
|
||||
"developer": "dorisdb"
|
||||
}
|
14
doriswriter/src/main/resources/plugin_job_template.json
Normal file
14
doriswriter/src/main/resources/plugin_job_template.json
Normal file
@ -0,0 +1,14 @@
|
||||
{
|
||||
"name": "doriswriter",
|
||||
"parameter": {
|
||||
"username": "",
|
||||
"password": "",
|
||||
"database": "",
|
||||
"table": "",
|
||||
"column": [],
|
||||
"preSql": [],
|
||||
"postSql": [],
|
||||
"jdbcUrl": "",
|
||||
"loadUrl": []
|
||||
}
|
||||
}
|
@ -189,6 +189,13 @@
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>doriswriter/target/datax/</directory>
|
||||
<includes>
|
||||
<include>**/*.*</include>
|
||||
</includes>
|
||||
<outputDirectory>datax</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>drdswriter/target/datax/</directory>
|
||||
<includes>
|
||||
|
3
pom.xml
3
pom.xml
@ -71,6 +71,7 @@
|
||||
|
||||
<!-- writer -->
|
||||
<module>mysqlwriter</module>
|
||||
<module>doriswriter</module>
|
||||
<module>drdswriter</module>
|
||||
<module>odpswriter</module>
|
||||
<module>txtfilewriter</module>
|
||||
@ -97,7 +98,7 @@
|
||||
<module>gdbwriter</module>
|
||||
<module>cassandrawriter</module>
|
||||
<module>clickhousewriter</module>
|
||||
<module>oscarwriter</module>
|
||||
<!-- <module>oscarwriter</module> -->
|
||||
<!-- common support module -->
|
||||
<module>plugin-rdbms-util</module>
|
||||
<module>plugin-unstructured-storage-util</module>
|
||||
|
Loading…
Reference in New Issue
Block a user