feat: databendWriter support replace mode

This commit is contained in:
hantmac 2023-08-01 18:44:28 +08:00
parent 82680c4c63
commit 30125cec99
2 changed files with 39 additions and 18 deletions

View File

@ -17,7 +17,7 @@
<dependency>
<groupId>com.databend</groupId>
<artifactId>databend-jdbc</artifactId>
<version>0.0.7</version>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>

View File

@ -1,40 +1,61 @@
package com.alibaba.datax.plugin.writer.databendwriter.util;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.StringJoiner;
public final class DatabendWriterUtil
{
public final class DatabendWriterUtil {
private static final Logger LOG = LoggerFactory.getLogger(DatabendWriterUtil.class);
private DatabendWriterUtil() {}
public static void dealWriteMode(Configuration originalConfig)
{
private DatabendWriterUtil() {
}
public static void dealWriteMode(Configuration originalConfig) {
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
String valueHolders = "?";
StringBuilder writeDataSqlTemplate = new StringBuilder();
String jdbcUrl = originalConfig.getString(String.format("%s[0].%s",
Constant.CONN_MARK, Key.JDBC_URL, String.class));
String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT");
LOG.info("write mode is {}", writeMode);
if (writeMode.toLowerCase().contains("replace")) {
// for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace (userid)"
writeDataSqlTemplate.append("REPLACE INTO %s (")
.append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(writeMode))
.append(" VALUES");
StringBuilder writeDataSqlTemplate = new StringBuilder();
writeDataSqlTemplate.append("INSERT INTO %s");
StringJoiner columnString = new StringJoiner(",");
LOG.info("Replace data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
} else {
writeDataSqlTemplate.append("INSERT INTO %s");
StringJoiner columnString = new StringJoiner(",");
for (String column : columns) {
columnString.add(column);
for (String column : columns) {
columnString.add(column);
}
writeDataSqlTemplate.append(String.format("(%s)", columnString));
writeDataSqlTemplate.append(" VALUES");
LOG.info("Insert data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
}
writeDataSqlTemplate.append(String.format("(%s)", columnString));
writeDataSqlTemplate.append(" VALUES");
LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);
originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate);
}
}
public static String onConFlictDoString(String conflict) {
conflict = conflict.replace("replace", "");
StringBuilder sb = new StringBuilder();
sb.append(" ON ");
sb.append(conflict);
return sb.toString();
}
}