diff --git a/databendwriter/doc/databendwriter-CN.md b/databendwriter/doc/databendwriter-CN.md index d6a8f1f3..5b26ed7e 100644 --- a/databendwriter/doc/databendwriter-CN.md +++ b/databendwriter/doc/databendwriter-CN.md @@ -79,6 +79,8 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp, "writer": { "name": "databendwriter", "parameter": { + "writeMode": "replace", + "onConflictColumn": ["id"], "username": "databend", "password": "databend", "column": ["a", "b", "c", "d", "e", "f", "g"], @@ -149,6 +151,16 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp, * 必选: 否 * 默认值: 无 * 示例: ["select count(*) from datax.sample1"] +* writeMode + * 描述:写入模式,支持 insert 和 replace 两种模式,默认为 insert。若为 replace,务必填写 onConflictColumn 参数 + * 必选:否 + * 默认值:insert + * 示例:"replace" +* onConflictColumn + * 描述:on conflict 字段,指定 writeMode 为 replace 后,需要此参数 + * 必选:否 + * 默认值:无 + * 示例:["id","user"] ### 3.3 类型转化 DataX中的数据类型可以转换为databend中的相应数据类型。下表显示了两种类型之间的对应关系。 diff --git a/databendwriter/doc/databendwriter.md b/databendwriter/doc/databendwriter.md index 0b57bf13..1712504f 100644 --- a/databendwriter/doc/databendwriter.md +++ b/databendwriter/doc/databendwriter.md @@ -142,6 +142,16 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp, * Description: A list of SQL statements that will be executed after the write operation. * Required: no * Default: none +* writeMode + * Description:The write mode, support `insert` and `replace` two mode. + * Required:否 + * Default:insert + * Example:"replace" +* onConflictColumn + * Description:On conflict fields list. + * Required:否 + * Default:无 + * Example:["id","user"] ### 3.3 Type Convert Data types in datax can be converted to the corresponding data types in databend. The following table shows the correspondence between the two types. diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java index 10a3e09c..00a7c62a 100644 --- a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java +++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java @@ -19,7 +19,7 @@ public final class DatabendWriterUtil { public static void dealWriteMode(Configuration originalConfig) { List columns = originalConfig.getList(Key.COLUMN, String.class); - String valueHolders = "?"; + List onConflictColumns = originalConfig.getList(Key.ONCONFLICT_COLUMN, String.class); StringBuilder writeDataSqlTemplate = new StringBuilder(); String jdbcUrl = originalConfig.getString(String.format("%s[0].%s", @@ -28,9 +28,13 @@ public final class DatabendWriterUtil { String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT"); LOG.info("write mode is {}", writeMode); if (writeMode.toLowerCase().contains("replace")) { + if (onConflictColumns == null || onConflictColumns.size() == 0) { + LOG.error("Replace mode must has onConflictColumn conf"); + return; + } // for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace (userid)" writeDataSqlTemplate.append("REPLACE INTO %s (") - .append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(writeMode)) + .append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(onConflictColumns)) .append(" VALUES"); LOG.info("Replace data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); @@ -51,11 +55,9 @@ public final class DatabendWriterUtil { } } - public static String onConFlictDoString(String conflict) { - conflict = conflict.replace("replace", ""); - StringBuilder sb = new StringBuilder(); - sb.append(" ON "); - sb.append(conflict); - return sb.toString(); + public static String onConFlictDoString(List conflictColumns) { + return " ON " + + "(" + + StringUtils.join(conflictColumns, ",") + ") "; } } diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Key.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Key.java index 25a2ab52..3c282d5d 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Key.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Key.java @@ -11,6 +11,8 @@ public final class Key { public final static String COLUMN = "column"; + public final static String ONCONFLICT_COLUMN = "onConflictColumn"; + //可选值为:insert,replace,默认为 insert (mysql 支持,oracle 没用 replace 机制,只能 insert,oracle 可以不暴露这个参数) public final static String WRITE_MODE = "writeMode";