From 30125cec996bb0f1067a9e1d6b6026dced45a879 Mon Sep 17 00:00:00 2001 From: hantmac Date: Tue, 1 Aug 2023 18:44:28 +0800 Subject: [PATCH 1/6] feat: databendWriter support replace mode --- databendwriter/pom.xml | 2 +- .../util/DatabendWriterUtil.java | 55 +++++++++++++------ 2 files changed, 39 insertions(+), 18 deletions(-) diff --git a/databendwriter/pom.xml b/databendwriter/pom.xml index 9ddc735c..b99ca5d8 100644 --- a/databendwriter/pom.xml +++ b/databendwriter/pom.xml @@ -17,7 +17,7 @@ com.databend databend-jdbc - 0.0.7 + 0.1.0 com.alibaba.datax diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java index a862e920..10a3e09c 100644 --- a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java +++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java @@ -1,40 +1,61 @@ package com.alibaba.datax.plugin.writer.databendwriter.util; + import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.writer.Constant; import com.alibaba.datax.plugin.rdbms.writer.Key; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.StringJoiner; -public final class DatabendWriterUtil -{ +public final class DatabendWriterUtil { private static final Logger LOG = LoggerFactory.getLogger(DatabendWriterUtil.class); - private DatabendWriterUtil() {} - public static void dealWriteMode(Configuration originalConfig) - { + private DatabendWriterUtil() { + } + + public static void dealWriteMode(Configuration originalConfig) { List columns = originalConfig.getList(Key.COLUMN, String.class); + String valueHolders = "?"; + StringBuilder writeDataSqlTemplate = new StringBuilder(); String jdbcUrl = originalConfig.getString(String.format("%s[0].%s", Constant.CONN_MARK, Key.JDBC_URL, String.class)); String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT"); + LOG.info("write mode is {}", writeMode); + if (writeMode.toLowerCase().contains("replace")) { + // for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace (userid)" + writeDataSqlTemplate.append("REPLACE INTO %s (") + .append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(writeMode)) + .append(" VALUES"); - StringBuilder writeDataSqlTemplate = new StringBuilder(); - writeDataSqlTemplate.append("INSERT INTO %s"); - StringJoiner columnString = new StringJoiner(","); + LOG.info("Replace data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); + originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate); + } else { + writeDataSqlTemplate.append("INSERT INTO %s"); + StringJoiner columnString = new StringJoiner(","); - for (String column : columns) { - columnString.add(column); + for (String column : columns) { + columnString.add(column); + } + writeDataSqlTemplate.append(String.format("(%s)", columnString)); + writeDataSqlTemplate.append(" VALUES"); + + LOG.info("Insert data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); + + originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate); } - writeDataSqlTemplate.append(String.format("(%s)", columnString)); - writeDataSqlTemplate.append(" VALUES"); - - LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); - - originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate); } -} \ No newline at end of file + + public static String onConFlictDoString(String conflict) { + conflict = conflict.replace("replace", ""); + StringBuilder sb = new StringBuilder(); + sb.append(" ON "); + sb.append(conflict); + return sb.toString(); + } +} From 51b1afeeebf44b76a210abdd9f2f1615657a6d32 Mon Sep 17 00:00:00 2001 From: hantmac Date: Wed, 2 Aug 2023 16:38:39 +0800 Subject: [PATCH 2/6] fix --- databendwriter/doc/databendwriter-CN.md | 12 ++++++++++++ databendwriter/doc/databendwriter.md | 10 ++++++++++ .../util/DatabendWriterUtil.java | 18 ++++++++++-------- .../alibaba/datax/plugin/rdbms/writer/Key.java | 2 ++ 4 files changed, 34 insertions(+), 8 deletions(-) diff --git a/databendwriter/doc/databendwriter-CN.md b/databendwriter/doc/databendwriter-CN.md index d6a8f1f3..5b26ed7e 100644 --- a/databendwriter/doc/databendwriter-CN.md +++ b/databendwriter/doc/databendwriter-CN.md @@ -79,6 +79,8 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp, "writer": { "name": "databendwriter", "parameter": { + "writeMode": "replace", + "onConflictColumn": ["id"], "username": "databend", "password": "databend", "column": ["a", "b", "c", "d", "e", "f", "g"], @@ -149,6 +151,16 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp, * 必选: 否 * 默认值: 无 * 示例: ["select count(*) from datax.sample1"] +* writeMode + * 描述:写入模式,支持 insert 和 replace 两种模式,默认为 insert。若为 replace,务必填写 onConflictColumn 参数 + * 必选:否 + * 默认值:insert + * 示例:"replace" +* onConflictColumn + * 描述:on conflict 字段,指定 writeMode 为 replace 后,需要此参数 + * 必选:否 + * 默认值:无 + * 示例:["id","user"] ### 3.3 类型转化 DataX中的数据类型可以转换为databend中的相应数据类型。下表显示了两种类型之间的对应关系。 diff --git a/databendwriter/doc/databendwriter.md b/databendwriter/doc/databendwriter.md index 0b57bf13..1712504f 100644 --- a/databendwriter/doc/databendwriter.md +++ b/databendwriter/doc/databendwriter.md @@ -142,6 +142,16 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp, * Description: A list of SQL statements that will be executed after the write operation. * Required: no * Default: none +* writeMode + * Description:The write mode, support `insert` and `replace` two mode. + * Required:否 + * Default:insert + * Example:"replace" +* onConflictColumn + * Description:On conflict fields list. + * Required:否 + * Default:无 + * Example:["id","user"] ### 3.3 Type Convert Data types in datax can be converted to the corresponding data types in databend. The following table shows the correspondence between the two types. diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java index 10a3e09c..00a7c62a 100644 --- a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java +++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java @@ -19,7 +19,7 @@ public final class DatabendWriterUtil { public static void dealWriteMode(Configuration originalConfig) { List columns = originalConfig.getList(Key.COLUMN, String.class); - String valueHolders = "?"; + List onConflictColumns = originalConfig.getList(Key.ONCONFLICT_COLUMN, String.class); StringBuilder writeDataSqlTemplate = new StringBuilder(); String jdbcUrl = originalConfig.getString(String.format("%s[0].%s", @@ -28,9 +28,13 @@ public final class DatabendWriterUtil { String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT"); LOG.info("write mode is {}", writeMode); if (writeMode.toLowerCase().contains("replace")) { + if (onConflictColumns == null || onConflictColumns.size() == 0) { + LOG.error("Replace mode must has onConflictColumn conf"); + return; + } // for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace (userid)" writeDataSqlTemplate.append("REPLACE INTO %s (") - .append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(writeMode)) + .append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(onConflictColumns)) .append(" VALUES"); LOG.info("Replace data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); @@ -51,11 +55,9 @@ public final class DatabendWriterUtil { } } - public static String onConFlictDoString(String conflict) { - conflict = conflict.replace("replace", ""); - StringBuilder sb = new StringBuilder(); - sb.append(" ON "); - sb.append(conflict); - return sb.toString(); + public static String onConFlictDoString(List conflictColumns) { + return " ON " + + "(" + + StringUtils.join(conflictColumns, ",") + ") "; } } diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Key.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Key.java index 25a2ab52..3c282d5d 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Key.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Key.java @@ -11,6 +11,8 @@ public final class Key { public final static String COLUMN = "column"; + public final static String ONCONFLICT_COLUMN = "onConflictColumn"; + //可选值为:insert,replace,默认为 insert (mysql 支持,oracle 没用 replace 机制,只能 insert,oracle 可以不暴露这个参数) public final static String WRITE_MODE = "writeMode"; From 404b6946c755bf9a661640461f1537f0d3b5c9c4 Mon Sep 17 00:00:00 2001 From: hantmac Date: Wed, 2 Aug 2023 16:41:20 +0800 Subject: [PATCH 3/6] fix --- databendwriter/doc/databendwriter.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/databendwriter/doc/databendwriter.md b/databendwriter/doc/databendwriter.md index 1712504f..c92d6387 100644 --- a/databendwriter/doc/databendwriter.md +++ b/databendwriter/doc/databendwriter.md @@ -144,13 +144,13 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp, * Default: none * writeMode * Description:The write mode, support `insert` and `replace` two mode. - * Required:否 + * Required:no * Default:insert * Example:"replace" * onConflictColumn * Description:On conflict fields list. - * Required:否 - * Default:无 + * Required:no + * Default:none * Example:["id","user"] ### 3.3 Type Convert From 75f2ebc0956e9426ce628a86062273969536a946 Mon Sep 17 00:00:00 2001 From: hantmac Date: Wed, 2 Aug 2023 16:57:00 +0800 Subject: [PATCH 4/6] fix --- .../plugin/writer/databendwriter/util/DatabendWriterUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java index 00a7c62a..036baf69 100644 --- a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java +++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java @@ -32,7 +32,7 @@ public final class DatabendWriterUtil { LOG.error("Replace mode must has onConflictColumn conf"); return; } - // for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace (userid)" + // for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace" writeDataSqlTemplate.append("REPLACE INTO %s (") .append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(onConflictColumns)) .append(" VALUES"); From 8bfe3a590cbe787bdde11f6dda72b1ef103c4c7e Mon Sep 17 00:00:00 2001 From: hantmac Date: Thu, 3 Aug 2023 10:04:08 +0800 Subject: [PATCH 5/6] throw exception --- .../writer/databendwriter/DatabendWriter.java | 37 +++++++++---------- .../util/DatabendWriterUtil.java | 5 +-- 2 files changed, 19 insertions(+), 23 deletions(-) diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java index a4222f08..27e9463d 100644 --- a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java +++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java @@ -17,30 +17,30 @@ import java.sql.*; import java.util.List; import java.util.regex.Pattern; -public class DatabendWriter extends Writer -{ +public class DatabendWriter extends Writer { private static final DataBaseType DATABASE_TYPE = DataBaseType.Databend; public static class Job - extends Writer.Job - { + extends Writer.Job { private static final Logger LOG = LoggerFactory.getLogger(Job.class); private Configuration originalConfig; private CommonRdbmsWriter.Job commonRdbmsWriterMaster; @Override - public void init() - { + public void init() { this.originalConfig = super.getPluginJobConf(); this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE); this.commonRdbmsWriterMaster.init(this.originalConfig); // placeholder currently not supported by databend driver, needs special treatment - DatabendWriterUtil.dealWriteMode(this.originalConfig); + try { + DatabendWriterUtil.dealWriteMode(this.originalConfig); + } catch (Exception e) { + LOG.error(e.toString()); + } } @Override - public void preCheck() - { + public void preCheck() { this.init(); this.commonRdbmsWriterMaster.writerPreCheck(this.originalConfig, DATABASE_TYPE); } @@ -67,8 +67,7 @@ public class DatabendWriter extends Writer } - public static class Task extends Writer.Task - { + public static class Task extends Writer.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private Configuration writerSliceConfig; @@ -76,11 +75,10 @@ public class DatabendWriter extends Writer private CommonRdbmsWriter.Task commonRdbmsWriterSlave; @Override - public void init() - { + public void init() { this.writerSliceConfig = super.getPluginJobConf(); - this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DataBaseType.Databend){ + this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DataBaseType.Databend) { @Override protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, String typeName, Column column) throws SQLException { try { @@ -177,8 +175,8 @@ public class DatabendWriter extends Writer case Types.BOOLEAN: - // warn: bit(1) -> Types.BIT 可使用setBoolean - // warn: bit(>1) -> Types.VARBINARY 可使用setBytes + // warn: bit(1) -> Types.BIT 可使用setBoolean + // warn: bit(>1) -> Types.VARBINARY 可使用setBytes case Types.BIT: if (this.dataBaseType == DataBaseType.MySql) { Boolean asBoolean = column.asBoolean(); @@ -224,8 +222,7 @@ public class DatabendWriter extends Writer } @Override - public void destroy() - { + public void destroy() { this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig); } @@ -238,9 +235,9 @@ public class DatabendWriter extends Writer public void post() { this.commonRdbmsWriterSlave.post(this.writerSliceConfig); } + @Override - public void startWrite(RecordReceiver lineReceiver) - { + public void startWrite(RecordReceiver lineReceiver) { this.commonRdbmsWriterSlave.startWrite(lineReceiver, this.writerSliceConfig, this.getTaskPluginCollector()); } diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java index 036baf69..35071461 100644 --- a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java +++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java @@ -17,7 +17,7 @@ public final class DatabendWriterUtil { private DatabendWriterUtil() { } - public static void dealWriteMode(Configuration originalConfig) { + public static void dealWriteMode(Configuration originalConfig) throws Exception { List columns = originalConfig.getList(Key.COLUMN, String.class); List onConflictColumns = originalConfig.getList(Key.ONCONFLICT_COLUMN, String.class); StringBuilder writeDataSqlTemplate = new StringBuilder(); @@ -29,8 +29,7 @@ public final class DatabendWriterUtil { LOG.info("write mode is {}", writeMode); if (writeMode.toLowerCase().contains("replace")) { if (onConflictColumns == null || onConflictColumns.size() == 0) { - LOG.error("Replace mode must has onConflictColumn conf"); - return; + throw new Exception("Replace mode must has onConflictColumn config"); } // for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace" writeDataSqlTemplate.append("REPLACE INTO %s (") From ae6e6630efbc177727e8b13035671330938ad4e3 Mon Sep 17 00:00:00 2001 From: hantmac Date: Thu, 3 Aug 2023 10:59:26 +0800 Subject: [PATCH 6/6] fix --- .../writer/databendwriter/DatabendWriter.java | 8 ++--- .../DatabendWriterErrorCode.java | 33 +++++++++++++++++++ .../util/DatabendWriterUtil.java | 14 ++++++-- 3 files changed, 47 insertions(+), 8 deletions(-) create mode 100644 databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriterErrorCode.java diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java index 27e9463d..ddb8fc9a 100644 --- a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java +++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java @@ -27,16 +27,12 @@ public class DatabendWriter extends Writer { private CommonRdbmsWriter.Job commonRdbmsWriterMaster; @Override - public void init() { + public void init() throws DataXException { this.originalConfig = super.getPluginJobConf(); this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE); this.commonRdbmsWriterMaster.init(this.originalConfig); // placeholder currently not supported by databend driver, needs special treatment - try { - DatabendWriterUtil.dealWriteMode(this.originalConfig); - } catch (Exception e) { - LOG.error(e.toString()); - } + DatabendWriterUtil.dealWriteMode(this.originalConfig); } @Override diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriterErrorCode.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriterErrorCode.java new file mode 100644 index 00000000..21cbf428 --- /dev/null +++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriterErrorCode.java @@ -0,0 +1,33 @@ +package com.alibaba.datax.plugin.writer.databendwriter; + +import com.alibaba.datax.common.spi.ErrorCode; + + +public enum DatabendWriterErrorCode implements ErrorCode { + CONF_ERROR("DatabendWriter-00", "配置错误."), + WRITE_DATA_ERROR("DatabendWriter-01", "写入数据时失败."), + ; + + private final String code; + private final String description; + + private DatabendWriterErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, this.description); + } +} \ No newline at end of file diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java index 35071461..516a75eb 100644 --- a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java +++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java @@ -1,13 +1,16 @@ package com.alibaba.datax.plugin.writer.databendwriter.util; +import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.writer.Constant; import com.alibaba.datax.plugin.rdbms.writer.Key; +import com.alibaba.datax.plugin.writer.databendwriter.DatabendWriterErrorCode; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.xml.crypto.Data; import java.util.List; import java.util.StringJoiner; @@ -17,7 +20,7 @@ public final class DatabendWriterUtil { private DatabendWriterUtil() { } - public static void dealWriteMode(Configuration originalConfig) throws Exception { + public static void dealWriteMode(Configuration originalConfig) throws DataXException { List columns = originalConfig.getList(Key.COLUMN, String.class); List onConflictColumns = originalConfig.getList(Key.ONCONFLICT_COLUMN, String.class); StringBuilder writeDataSqlTemplate = new StringBuilder(); @@ -29,8 +32,14 @@ public final class DatabendWriterUtil { LOG.info("write mode is {}", writeMode); if (writeMode.toLowerCase().contains("replace")) { if (onConflictColumns == null || onConflictColumns.size() == 0) { - throw new Exception("Replace mode must has onConflictColumn config"); + throw DataXException + .asDataXException( + DatabendWriterErrorCode.CONF_ERROR, + String.format( + "Replace mode must has onConflictColumn config." + )); } + // for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace" writeDataSqlTemplate.append("REPLACE INTO %s (") .append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(onConflictColumns)) @@ -52,6 +61,7 @@ public final class DatabendWriterUtil { originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate); } + } public static String onConFlictDoString(List conflictColumns) {