From 30125cec996bb0f1067a9e1d6b6026dced45a879 Mon Sep 17 00:00:00 2001 From: hantmac Date: Tue, 1 Aug 2023 18:44:28 +0800 Subject: [PATCH] feat: databendWriter support replace mode --- databendwriter/pom.xml | 2 +- .../util/DatabendWriterUtil.java | 55 +++++++++++++------ 2 files changed, 39 insertions(+), 18 deletions(-) diff --git a/databendwriter/pom.xml b/databendwriter/pom.xml index 9ddc735c..b99ca5d8 100644 --- a/databendwriter/pom.xml +++ b/databendwriter/pom.xml @@ -17,7 +17,7 @@ com.databend databend-jdbc - 0.0.7 + 0.1.0 com.alibaba.datax diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java index a862e920..10a3e09c 100644 --- a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java +++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java @@ -1,40 +1,61 @@ package com.alibaba.datax.plugin.writer.databendwriter.util; + import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.writer.Constant; import com.alibaba.datax.plugin.rdbms.writer.Key; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.StringJoiner; -public final class DatabendWriterUtil -{ +public final class DatabendWriterUtil { private static final Logger LOG = LoggerFactory.getLogger(DatabendWriterUtil.class); - private DatabendWriterUtil() {} - public static void dealWriteMode(Configuration originalConfig) - { + private DatabendWriterUtil() { + } + + public static void dealWriteMode(Configuration originalConfig) { List columns = originalConfig.getList(Key.COLUMN, String.class); + String valueHolders = "?"; + StringBuilder writeDataSqlTemplate = new StringBuilder(); String jdbcUrl = originalConfig.getString(String.format("%s[0].%s", Constant.CONN_MARK, Key.JDBC_URL, String.class)); String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT"); + LOG.info("write mode is {}", writeMode); + if (writeMode.toLowerCase().contains("replace")) { + // for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace (userid)" + writeDataSqlTemplate.append("REPLACE INTO %s (") + .append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(writeMode)) + .append(" VALUES"); - StringBuilder writeDataSqlTemplate = new StringBuilder(); - writeDataSqlTemplate.append("INSERT INTO %s"); - StringJoiner columnString = new StringJoiner(","); + LOG.info("Replace data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); + originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate); + } else { + writeDataSqlTemplate.append("INSERT INTO %s"); + StringJoiner columnString = new StringJoiner(","); - for (String column : columns) { - columnString.add(column); + for (String column : columns) { + columnString.add(column); + } + writeDataSqlTemplate.append(String.format("(%s)", columnString)); + writeDataSqlTemplate.append(" VALUES"); + + LOG.info("Insert data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); + + originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate); } - writeDataSqlTemplate.append(String.format("(%s)", columnString)); - writeDataSqlTemplate.append(" VALUES"); - - LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); - - originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate); } -} \ No newline at end of file + + public static String onConFlictDoString(String conflict) { + conflict = conflict.replace("replace", ""); + StringBuilder sb = new StringBuilder(); + sb.append(" ON "); + sb.append(conflict); + return sb.toString(); + } +}