diff --git a/databendwriter/pom.xml b/databendwriter/pom.xml
index 9ddc735c..b99ca5d8 100644
--- a/databendwriter/pom.xml
+++ b/databendwriter/pom.xml
@@ -17,7 +17,7 @@
com.databend
databend-jdbc
- 0.0.7
+ 0.1.0
com.alibaba.datax
diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java
index a862e920..10a3e09c 100644
--- a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java
+++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java
@@ -1,40 +1,61 @@
package com.alibaba.datax.plugin.writer.databendwriter.util;
+
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.StringJoiner;
-public final class DatabendWriterUtil
-{
+public final class DatabendWriterUtil {
private static final Logger LOG = LoggerFactory.getLogger(DatabendWriterUtil.class);
- private DatabendWriterUtil() {}
- public static void dealWriteMode(Configuration originalConfig)
- {
+ private DatabendWriterUtil() {
+ }
+
+ public static void dealWriteMode(Configuration originalConfig) {
List columns = originalConfig.getList(Key.COLUMN, String.class);
+ String valueHolders = "?";
+ StringBuilder writeDataSqlTemplate = new StringBuilder();
String jdbcUrl = originalConfig.getString(String.format("%s[0].%s",
Constant.CONN_MARK, Key.JDBC_URL, String.class));
String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT");
+ LOG.info("write mode is {}", writeMode);
+ if (writeMode.toLowerCase().contains("replace")) {
+ // for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace (userid)"
+ writeDataSqlTemplate.append("REPLACE INTO %s (")
+ .append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(writeMode))
+ .append(" VALUES");
- StringBuilder writeDataSqlTemplate = new StringBuilder();
- writeDataSqlTemplate.append("INSERT INTO %s");
- StringJoiner columnString = new StringJoiner(",");
+ LOG.info("Replace data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
+ originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
+ } else {
+ writeDataSqlTemplate.append("INSERT INTO %s");
+ StringJoiner columnString = new StringJoiner(",");
- for (String column : columns) {
- columnString.add(column);
+ for (String column : columns) {
+ columnString.add(column);
+ }
+ writeDataSqlTemplate.append(String.format("(%s)", columnString));
+ writeDataSqlTemplate.append(" VALUES");
+
+ LOG.info("Insert data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
+
+ originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
}
- writeDataSqlTemplate.append(String.format("(%s)", columnString));
- writeDataSqlTemplate.append(" VALUES");
-
- LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
-
- originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
}
-}
\ No newline at end of file
+
+ public static String onConFlictDoString(String conflict) {
+ conflict = conflict.replace("replace", "");
+ StringBuilder sb = new StringBuilder();
+ sb.append(" ON ");
+ sb.append(conflict);
+ return sb.toString();
+ }
+}