Merge branch 'master' into example_demo

This commit is contained in:
FuYouJ 2023-08-12 09:30:27 +08:00
commit 99d57a880d
9 changed files with 118 additions and 36 deletions

View File

@ -79,6 +79,8 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp,
"writer": { "writer": {
"name": "databendwriter", "name": "databendwriter",
"parameter": { "parameter": {
"writeMode": "replace",
"onConflictColumn": ["id"],
"username": "databend", "username": "databend",
"password": "databend", "password": "databend",
"column": ["a", "b", "c", "d", "e", "f", "g"], "column": ["a", "b", "c", "d", "e", "f", "g"],
@ -149,6 +151,16 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp,
* 必选: 否 * 必选: 否
* 默认值: 无 * 默认值: 无
* 示例: ["select count(*) from datax.sample1"] * 示例: ["select count(*) from datax.sample1"]
* writeMode
* 描述:写入模式,支持 insert 和 replace 两种模式,默认为 insert。若为 replace务必填写 onConflictColumn 参数
* 必选:否
* 默认值insert
* 示例:"replace"
* onConflictColumn
* 描述on conflict 字段,指定 writeMode 为 replace 后,需要此参数
* 必选:否
* 默认值:无
* 示例:["id","user"]
### 3.3 类型转化 ### 3.3 类型转化
DataX中的数据类型可以转换为databend中的相应数据类型。下表显示了两种类型之间的对应关系。 DataX中的数据类型可以转换为databend中的相应数据类型。下表显示了两种类型之间的对应关系。

View File

@ -142,6 +142,16 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp,
* Description: A list of SQL statements that will be executed after the write operation. * Description: A list of SQL statements that will be executed after the write operation.
* Required: no * Required: no
* Default: none * Default: none
* writeMode
* DescriptionThe write mode, support `insert` and `replace` two mode.
* Requiredno
* Defaultinsert
* Example"replace"
* onConflictColumn
* DescriptionOn conflict fields list.
* Requiredno
* Defaultnone
* Example["id","user"]
### 3.3 Type Convert ### 3.3 Type Convert
Data types in datax can be converted to the corresponding data types in databend. The following table shows the correspondence between the two types. Data types in datax can be converted to the corresponding data types in databend. The following table shows the correspondence between the two types.

View File

@ -17,7 +17,7 @@
<dependency> <dependency>
<groupId>com.databend</groupId> <groupId>com.databend</groupId>
<artifactId>databend-jdbc</artifactId> <artifactId>databend-jdbc</artifactId>
<version>0.0.7</version> <version>0.1.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.alibaba.datax</groupId> <groupId>com.alibaba.datax</groupId>

View File

@ -17,20 +17,17 @@ import java.sql.*;
import java.util.List; import java.util.List;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public class DatabendWriter extends Writer public class DatabendWriter extends Writer {
{
private static final DataBaseType DATABASE_TYPE = DataBaseType.Databend; private static final DataBaseType DATABASE_TYPE = DataBaseType.Databend;
public static class Job public static class Job
extends Writer.Job extends Writer.Job {
{
private static final Logger LOG = LoggerFactory.getLogger(Job.class); private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig; private Configuration originalConfig;
private CommonRdbmsWriter.Job commonRdbmsWriterMaster; private CommonRdbmsWriter.Job commonRdbmsWriterMaster;
@Override @Override
public void init() public void init() throws DataXException {
{
this.originalConfig = super.getPluginJobConf(); this.originalConfig = super.getPluginJobConf();
this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE); this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
this.commonRdbmsWriterMaster.init(this.originalConfig); this.commonRdbmsWriterMaster.init(this.originalConfig);
@ -39,8 +36,7 @@ public class DatabendWriter extends Writer
} }
@Override @Override
public void preCheck() public void preCheck() {
{
this.init(); this.init();
this.commonRdbmsWriterMaster.writerPreCheck(this.originalConfig, DATABASE_TYPE); this.commonRdbmsWriterMaster.writerPreCheck(this.originalConfig, DATABASE_TYPE);
} }
@ -67,8 +63,7 @@ public class DatabendWriter extends Writer
} }
public static class Task extends Writer.Task public static class Task extends Writer.Task {
{
private static final Logger LOG = LoggerFactory.getLogger(Task.class); private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration writerSliceConfig; private Configuration writerSliceConfig;
@ -76,8 +71,7 @@ public class DatabendWriter extends Writer
private CommonRdbmsWriter.Task commonRdbmsWriterSlave; private CommonRdbmsWriter.Task commonRdbmsWriterSlave;
@Override @Override
public void init() public void init() {
{
this.writerSliceConfig = super.getPluginJobConf(); this.writerSliceConfig = super.getPluginJobConf();
this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DataBaseType.Databend) { this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DataBaseType.Databend) {
@ -224,8 +218,7 @@ public class DatabendWriter extends Writer
} }
@Override @Override
public void destroy() public void destroy() {
{
this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig); this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);
} }
@ -238,9 +231,9 @@ public class DatabendWriter extends Writer
public void post() { public void post() {
this.commonRdbmsWriterSlave.post(this.writerSliceConfig); this.commonRdbmsWriterSlave.post(this.writerSliceConfig);
} }
@Override @Override
public void startWrite(RecordReceiver lineReceiver) public void startWrite(RecordReceiver lineReceiver) {
{
this.commonRdbmsWriterSlave.startWrite(lineReceiver, this.writerSliceConfig, this.getTaskPluginCollector()); this.commonRdbmsWriterSlave.startWrite(lineReceiver, this.writerSliceConfig, this.getTaskPluginCollector());
} }

View File

@ -0,0 +1,33 @@
package com.alibaba.datax.plugin.writer.databendwriter;
import com.alibaba.datax.common.spi.ErrorCode;
public enum DatabendWriterErrorCode implements ErrorCode {
CONF_ERROR("DatabendWriter-00", "配置错误."),
WRITE_DATA_ERROR("DatabendWriter-01", "写入数据时失败."),
;
private final String code;
private final String description;
private DatabendWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code, this.description);
}
}

View File

@ -1,29 +1,53 @@
package com.alibaba.datax.plugin.writer.databendwriter.util; package com.alibaba.datax.plugin.writer.databendwriter.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.writer.Constant; import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key; import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.datax.plugin.writer.databendwriter.DatabendWriterErrorCode;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.xml.crypto.Data;
import java.util.List; import java.util.List;
import java.util.StringJoiner; import java.util.StringJoiner;
public final class DatabendWriterUtil public final class DatabendWriterUtil {
{
private static final Logger LOG = LoggerFactory.getLogger(DatabendWriterUtil.class); private static final Logger LOG = LoggerFactory.getLogger(DatabendWriterUtil.class);
private DatabendWriterUtil() {} private DatabendWriterUtil() {
public static void dealWriteMode(Configuration originalConfig) }
{
public static void dealWriteMode(Configuration originalConfig) throws DataXException {
List<String> columns = originalConfig.getList(Key.COLUMN, String.class); List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
List<String> onConflictColumns = originalConfig.getList(Key.ONCONFLICT_COLUMN, String.class);
StringBuilder writeDataSqlTemplate = new StringBuilder();
String jdbcUrl = originalConfig.getString(String.format("%s[0].%s", String jdbcUrl = originalConfig.getString(String.format("%s[0].%s",
Constant.CONN_MARK, Key.JDBC_URL, String.class)); Constant.CONN_MARK, Key.JDBC_URL, String.class));
String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT"); String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT");
LOG.info("write mode is {}", writeMode);
if (writeMode.toLowerCase().contains("replace")) {
if (onConflictColumns == null || onConflictColumns.size() == 0) {
throw DataXException
.asDataXException(
DatabendWriterErrorCode.CONF_ERROR,
String.format(
"Replace mode must has onConflictColumn config."
));
}
StringBuilder writeDataSqlTemplate = new StringBuilder(); // for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace"
writeDataSqlTemplate.append("REPLACE INTO %s (")
.append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(onConflictColumns))
.append(" VALUES");
LOG.info("Replace data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
} else {
writeDataSqlTemplate.append("INSERT INTO %s"); writeDataSqlTemplate.append("INSERT INTO %s");
StringJoiner columnString = new StringJoiner(","); StringJoiner columnString = new StringJoiner(",");
@ -33,8 +57,16 @@ public final class DatabendWriterUtil
writeDataSqlTemplate.append(String.format("(%s)", columnString)); writeDataSqlTemplate.append(String.format("(%s)", columnString));
writeDataSqlTemplate.append(" VALUES"); writeDataSqlTemplate.append(" VALUES");
LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); LOG.info("Insert data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate); originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
} }
}
public static String onConFlictDoString(List<String> conflictColumns) {
return " ON " +
"(" +
StringUtils.join(conflictColumns, ",") + ") ";
}
} }

View File

@ -11,6 +11,8 @@ public final class Key {
public final static String COLUMN = "column"; public final static String COLUMN = "column";
public final static String ONCONFLICT_COLUMN = "onConflictColumn";
//可选值为insert,replace默认为 insert mysql 支持oracle 没用 replace 机制只能 insert,oracle 可以不暴露这个参数 //可选值为insert,replace默认为 insert mysql 支持oracle 没用 replace 机制只能 insert,oracle 可以不暴露这个参数
public final static String WRITE_MODE = "writeMode"; public final static String WRITE_MODE = "writeMode";