This commit is contained in:
hantmac 2023-08-02 16:38:39 +08:00
parent 30125cec99
commit 51b1afeeeb
4 changed files with 34 additions and 8 deletions

View File

@ -79,6 +79,8 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp,
"writer": {
"name": "databendwriter",
"parameter": {
"writeMode": "replace",
"onConflictColumn": ["id"],
"username": "databend",
"password": "databend",
"column": ["a", "b", "c", "d", "e", "f", "g"],
@ -149,6 +151,16 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp,
* 必选: 否
* 默认值: 无
* 示例: ["select count(*) from datax.sample1"]
* writeMode
* 描述:写入模式,支持 insert 和 replace 两种模式,默认为 insert。若为 replace务必填写 onConflictColumn 参数
* 必选:否
* 默认值insert
* 示例:"replace"
* onConflictColumn
* 描述on conflict 字段,指定 writeMode 为 replace 后,需要此参数
* 必选:否
* 默认值:无
* 示例:["id","user"]
### 3.3 类型转化
DataX中的数据类型可以转换为databend中的相应数据类型。下表显示了两种类型之间的对应关系。

View File

@ -142,6 +142,16 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp,
* Description: A list of SQL statements that will be executed after the write operation.
* Required: no
* Default: none
* writeMode
* DescriptionThe write mode, support `insert` and `replace` two mode.
* Required
* Defaultinsert
* Example"replace"
* onConflictColumn
* DescriptionOn conflict fields list.
* Required
* Default
* Example["id","user"]
### 3.3 Type Convert
Data types in datax can be converted to the corresponding data types in databend. The following table shows the correspondence between the two types.

View File

@ -19,7 +19,7 @@ public final class DatabendWriterUtil {
public static void dealWriteMode(Configuration originalConfig) {
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
String valueHolders = "?";
List<String> onConflictColumns = originalConfig.getList(Key.ONCONFLICT_COLUMN, String.class);
StringBuilder writeDataSqlTemplate = new StringBuilder();
String jdbcUrl = originalConfig.getString(String.format("%s[0].%s",
@ -28,9 +28,13 @@ public final class DatabendWriterUtil {
String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT");
LOG.info("write mode is {}", writeMode);
if (writeMode.toLowerCase().contains("replace")) {
if (onConflictColumns == null || onConflictColumns.size() == 0) {
LOG.error("Replace mode must has onConflictColumn conf");
return;
}
// for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace (userid)"
writeDataSqlTemplate.append("REPLACE INTO %s (")
.append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(writeMode))
.append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(onConflictColumns))
.append(" VALUES");
LOG.info("Replace data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
@ -51,11 +55,9 @@ public final class DatabendWriterUtil {
}
}
public static String onConFlictDoString(String conflict) {
conflict = conflict.replace("replace", "");
StringBuilder sb = new StringBuilder();
sb.append(" ON ");
sb.append(conflict);
return sb.toString();
public static String onConFlictDoString(List<String> conflictColumns) {
return " ON " +
"(" +
StringUtils.join(conflictColumns, ",") + ") ";
}
}

View File

@ -11,6 +11,8 @@ public final class Key {
public final static String COLUMN = "column";
public final static String ONCONFLICT_COLUMN = "onConflictColumn";
//可选值为insert,replace默认为 insert mysql 支持oracle 没用 replace 机制只能 insert,oracle 可以不暴露这个参数
public final static String WRITE_MODE = "writeMode";