Merge pull request #1866 from hantmac/feat/support-replace-mode

feat: databendWriter support replace mode
This commit is contained in:
dingxiaobo 2023-08-09 15:27:13 +08:00 committed by GitHub
commit 74ec2d1240
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 118 additions and 36 deletions

View File

@ -79,6 +79,8 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp,
"writer": {
"name": "databendwriter",
"parameter": {
"writeMode": "replace",
"onConflictColumn": ["id"],
"username": "databend",
"password": "databend",
"column": ["a", "b", "c", "d", "e", "f", "g"],
@ -149,6 +151,16 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp,
* 必选: 否
* 默认值: 无
* 示例: ["select count(*) from datax.sample1"]
* writeMode
* 描述:写入模式,支持 insert 和 replace 两种模式,默认为 insert。若为 replace务必填写 onConflictColumn 参数
* 必选:否
* 默认值insert
* 示例:"replace"
* onConflictColumn
* 描述on conflict 字段,指定 writeMode 为 replace 后,需要此参数
* 必选:否
* 默认值:无
* 示例:["id","user"]
### 3.3 类型转化
DataX中的数据类型可以转换为databend中的相应数据类型。下表显示了两种类型之间的对应关系。

View File

@ -142,6 +142,16 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp,
* Description: A list of SQL statements that will be executed after the write operation.
* Required: no
* Default: none
* writeMode
* DescriptionThe write mode, support `insert` and `replace` two mode.
* Requiredno
* Defaultinsert
* Example"replace"
* onConflictColumn
* DescriptionOn conflict fields list.
* Requiredno
* Defaultnone
* Example["id","user"]
### 3.3 Type Convert
Data types in datax can be converted to the corresponding data types in databend. The following table shows the correspondence between the two types.

View File

@ -17,7 +17,7 @@
<dependency>
<groupId>com.databend</groupId>
<artifactId>databend-jdbc</artifactId>
<version>0.0.7</version>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>

View File

@ -17,20 +17,17 @@ import java.sql.*;
import java.util.List;
import java.util.regex.Pattern;
public class DatabendWriter extends Writer
{
public class DatabendWriter extends Writer {
private static final DataBaseType DATABASE_TYPE = DataBaseType.Databend;
public static class Job
extends Writer.Job
{
extends Writer.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig;
private CommonRdbmsWriter.Job commonRdbmsWriterMaster;
@Override
public void init()
{
public void init() throws DataXException {
this.originalConfig = super.getPluginJobConf();
this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
this.commonRdbmsWriterMaster.init(this.originalConfig);
@ -39,8 +36,7 @@ public class DatabendWriter extends Writer
}
@Override
public void preCheck()
{
public void preCheck() {
this.init();
this.commonRdbmsWriterMaster.writerPreCheck(this.originalConfig, DATABASE_TYPE);
}
@ -67,8 +63,7 @@ public class DatabendWriter extends Writer
}
public static class Task extends Writer.Task
{
public static class Task extends Writer.Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration writerSliceConfig;
@ -76,11 +71,10 @@ public class DatabendWriter extends Writer
private CommonRdbmsWriter.Task commonRdbmsWriterSlave;
@Override
public void init()
{
public void init() {
this.writerSliceConfig = super.getPluginJobConf();
this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DataBaseType.Databend){
this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DataBaseType.Databend) {
@Override
protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, String typeName, Column column) throws SQLException {
try {
@ -177,8 +171,8 @@ public class DatabendWriter extends Writer
case Types.BOOLEAN:
// warn: bit(1) -> Types.BIT 可使用setBoolean
// warn: bit(>1) -> Types.VARBINARY 可使用setBytes
// warn: bit(1) -> Types.BIT 可使用setBoolean
// warn: bit(>1) -> Types.VARBINARY 可使用setBytes
case Types.BIT:
if (this.dataBaseType == DataBaseType.MySql) {
Boolean asBoolean = column.asBoolean();
@ -224,8 +218,7 @@ public class DatabendWriter extends Writer
}
@Override
public void destroy()
{
public void destroy() {
this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);
}
@ -238,9 +231,9 @@ public class DatabendWriter extends Writer
public void post() {
this.commonRdbmsWriterSlave.post(this.writerSliceConfig);
}
@Override
public void startWrite(RecordReceiver lineReceiver)
{
public void startWrite(RecordReceiver lineReceiver) {
this.commonRdbmsWriterSlave.startWrite(lineReceiver, this.writerSliceConfig, this.getTaskPluginCollector());
}

View File

@ -0,0 +1,33 @@
package com.alibaba.datax.plugin.writer.databendwriter;
import com.alibaba.datax.common.spi.ErrorCode;
public enum DatabendWriterErrorCode implements ErrorCode {
CONF_ERROR("DatabendWriter-00", "配置错误."),
WRITE_DATA_ERROR("DatabendWriter-01", "写入数据时失败."),
;
private final String code;
private final String description;
private DatabendWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code, this.description);
}
}

View File

@ -1,40 +1,72 @@
package com.alibaba.datax.plugin.writer.databendwriter.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.datax.plugin.writer.databendwriter.DatabendWriterErrorCode;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.crypto.Data;
import java.util.List;
import java.util.StringJoiner;
public final class DatabendWriterUtil
{
public final class DatabendWriterUtil {
private static final Logger LOG = LoggerFactory.getLogger(DatabendWriterUtil.class);
private DatabendWriterUtil() {}
public static void dealWriteMode(Configuration originalConfig)
{
private DatabendWriterUtil() {
}
public static void dealWriteMode(Configuration originalConfig) throws DataXException {
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
List<String> onConflictColumns = originalConfig.getList(Key.ONCONFLICT_COLUMN, String.class);
StringBuilder writeDataSqlTemplate = new StringBuilder();
String jdbcUrl = originalConfig.getString(String.format("%s[0].%s",
Constant.CONN_MARK, Key.JDBC_URL, String.class));
String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT");
LOG.info("write mode is {}", writeMode);
if (writeMode.toLowerCase().contains("replace")) {
if (onConflictColumns == null || onConflictColumns.size() == 0) {
throw DataXException
.asDataXException(
DatabendWriterErrorCode.CONF_ERROR,
String.format(
"Replace mode must has onConflictColumn config."
));
}
StringBuilder writeDataSqlTemplate = new StringBuilder();
writeDataSqlTemplate.append("INSERT INTO %s");
StringJoiner columnString = new StringJoiner(",");
// for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace"
writeDataSqlTemplate.append("REPLACE INTO %s (")
.append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(onConflictColumns))
.append(" VALUES");
for (String column : columns) {
columnString.add(column);
LOG.info("Replace data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
} else {
writeDataSqlTemplate.append("INSERT INTO %s");
StringJoiner columnString = new StringJoiner(",");
for (String column : columns) {
columnString.add(column);
}
writeDataSqlTemplate.append(String.format("(%s)", columnString));
writeDataSqlTemplate.append(" VALUES");
LOG.info("Insert data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
}
writeDataSqlTemplate.append(String.format("(%s)", columnString));
writeDataSqlTemplate.append(" VALUES");
LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
}
}
public static String onConFlictDoString(List<String> conflictColumns) {
return " ON " +
"(" +
StringUtils.join(conflictColumns, ",") + ") ";
}
}

View File

@ -11,6 +11,8 @@ public final class Key {
public final static String COLUMN = "column";
public final static String ONCONFLICT_COLUMN = "onConflictColumn";
//可选值为insert,replace默认为 insert mysql 支持oracle 没用 replace 机制只能 insert,oracle 可以不暴露这个参数
public final static String WRITE_MODE = "writeMode";