diff --git a/databendwriter/doc/databendwriter-CN.md b/databendwriter/doc/databendwriter-CN.md index d6a8f1f3..5b26ed7e 100644 --- a/databendwriter/doc/databendwriter-CN.md +++ b/databendwriter/doc/databendwriter-CN.md @@ -79,6 +79,8 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp, "writer": { "name": "databendwriter", "parameter": { + "writeMode": "replace", + "onConflictColumn": ["id"], "username": "databend", "password": "databend", "column": ["a", "b", "c", "d", "e", "f", "g"], @@ -149,6 +151,16 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp, * 必选: 否 * 默认值: 无 * 示例: ["select count(*) from datax.sample1"] +* writeMode + * 描述:写入模式,支持 insert 和 replace 两种模式,默认为 insert。若为 replace,务必填写 onConflictColumn 参数 + * 必选:否 + * 默认值:insert + * 示例:"replace" +* onConflictColumn + * 描述:on conflict 字段,指定 writeMode 为 replace 后,需要此参数 + * 必选:否 + * 默认值:无 + * 示例:["id","user"] ### 3.3 类型转化 DataX中的数据类型可以转换为databend中的相应数据类型。下表显示了两种类型之间的对应关系。 diff --git a/databendwriter/doc/databendwriter.md b/databendwriter/doc/databendwriter.md index 0b57bf13..c92d6387 100644 --- a/databendwriter/doc/databendwriter.md +++ b/databendwriter/doc/databendwriter.md @@ -142,6 +142,16 @@ create table if not exsits datax.sample1(a string, b int64, c date, d timestamp, * Description: A list of SQL statements that will be executed after the write operation. * Required: no * Default: none +* writeMode + * Description:The write mode, support `insert` and `replace` two mode. + * Required:no + * Default:insert + * Example:"replace" +* onConflictColumn + * Description:On conflict fields list. + * Required:no + * Default:none + * Example:["id","user"] ### 3.3 Type Convert Data types in datax can be converted to the corresponding data types in databend. The following table shows the correspondence between the two types. diff --git a/databendwriter/pom.xml b/databendwriter/pom.xml index 9ddc735c..b99ca5d8 100644 --- a/databendwriter/pom.xml +++ b/databendwriter/pom.xml @@ -17,7 +17,7 @@ com.databend databend-jdbc - 0.0.7 + 0.1.0 com.alibaba.datax diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java index a4222f08..ddb8fc9a 100644 --- a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java +++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriter.java @@ -17,20 +17,17 @@ import java.sql.*; import java.util.List; import java.util.regex.Pattern; -public class DatabendWriter extends Writer -{ +public class DatabendWriter extends Writer { private static final DataBaseType DATABASE_TYPE = DataBaseType.Databend; public static class Job - extends Writer.Job - { + extends Writer.Job { private static final Logger LOG = LoggerFactory.getLogger(Job.class); private Configuration originalConfig; private CommonRdbmsWriter.Job commonRdbmsWriterMaster; @Override - public void init() - { + public void init() throws DataXException { this.originalConfig = super.getPluginJobConf(); this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE); this.commonRdbmsWriterMaster.init(this.originalConfig); @@ -39,8 +36,7 @@ public class DatabendWriter extends Writer } @Override - public void preCheck() - { + public void preCheck() { this.init(); this.commonRdbmsWriterMaster.writerPreCheck(this.originalConfig, DATABASE_TYPE); } @@ -67,8 +63,7 @@ public class DatabendWriter extends Writer } - public static class Task extends Writer.Task - { + public static class Task extends Writer.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private Configuration writerSliceConfig; @@ -76,11 +71,10 @@ public class DatabendWriter extends Writer private CommonRdbmsWriter.Task commonRdbmsWriterSlave; @Override - public void init() - { + public void init() { this.writerSliceConfig = super.getPluginJobConf(); - this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DataBaseType.Databend){ + this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DataBaseType.Databend) { @Override protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, String typeName, Column column) throws SQLException { try { @@ -177,8 +171,8 @@ public class DatabendWriter extends Writer case Types.BOOLEAN: - // warn: bit(1) -> Types.BIT 可使用setBoolean - // warn: bit(>1) -> Types.VARBINARY 可使用setBytes + // warn: bit(1) -> Types.BIT 可使用setBoolean + // warn: bit(>1) -> Types.VARBINARY 可使用setBytes case Types.BIT: if (this.dataBaseType == DataBaseType.MySql) { Boolean asBoolean = column.asBoolean(); @@ -224,8 +218,7 @@ public class DatabendWriter extends Writer } @Override - public void destroy() - { + public void destroy() { this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig); } @@ -238,9 +231,9 @@ public class DatabendWriter extends Writer public void post() { this.commonRdbmsWriterSlave.post(this.writerSliceConfig); } + @Override - public void startWrite(RecordReceiver lineReceiver) - { + public void startWrite(RecordReceiver lineReceiver) { this.commonRdbmsWriterSlave.startWrite(lineReceiver, this.writerSliceConfig, this.getTaskPluginCollector()); } diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriterErrorCode.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriterErrorCode.java new file mode 100644 index 00000000..21cbf428 --- /dev/null +++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/DatabendWriterErrorCode.java @@ -0,0 +1,33 @@ +package com.alibaba.datax.plugin.writer.databendwriter; + +import com.alibaba.datax.common.spi.ErrorCode; + + +public enum DatabendWriterErrorCode implements ErrorCode { + CONF_ERROR("DatabendWriter-00", "配置错误."), + WRITE_DATA_ERROR("DatabendWriter-01", "写入数据时失败."), + ; + + private final String code; + private final String description; + + private DatabendWriterErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, this.description); + } +} \ No newline at end of file diff --git a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java index a862e920..516a75eb 100644 --- a/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java +++ b/databendwriter/src/main/java/com/alibaba/datax/plugin/writer/databendwriter/util/DatabendWriterUtil.java @@ -1,40 +1,72 @@ package com.alibaba.datax.plugin.writer.databendwriter.util; + +import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.writer.Constant; import com.alibaba.datax.plugin.rdbms.writer.Key; +import com.alibaba.datax.plugin.writer.databendwriter.DatabendWriterErrorCode; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.xml.crypto.Data; import java.util.List; import java.util.StringJoiner; -public final class DatabendWriterUtil -{ +public final class DatabendWriterUtil { private static final Logger LOG = LoggerFactory.getLogger(DatabendWriterUtil.class); - private DatabendWriterUtil() {} - public static void dealWriteMode(Configuration originalConfig) - { + private DatabendWriterUtil() { + } + + public static void dealWriteMode(Configuration originalConfig) throws DataXException { List columns = originalConfig.getList(Key.COLUMN, String.class); + List onConflictColumns = originalConfig.getList(Key.ONCONFLICT_COLUMN, String.class); + StringBuilder writeDataSqlTemplate = new StringBuilder(); String jdbcUrl = originalConfig.getString(String.format("%s[0].%s", Constant.CONN_MARK, Key.JDBC_URL, String.class)); String writeMode = originalConfig.getString(Key.WRITE_MODE, "INSERT"); + LOG.info("write mode is {}", writeMode); + if (writeMode.toLowerCase().contains("replace")) { + if (onConflictColumns == null || onConflictColumns.size() == 0) { + throw DataXException + .asDataXException( + DatabendWriterErrorCode.CONF_ERROR, + String.format( + "Replace mode must has onConflictColumn config." + )); + } - StringBuilder writeDataSqlTemplate = new StringBuilder(); - writeDataSqlTemplate.append("INSERT INTO %s"); - StringJoiner columnString = new StringJoiner(","); + // for databend if you want to use replace mode, the writeMode should be: "writeMode": "replace" + writeDataSqlTemplate.append("REPLACE INTO %s (") + .append(StringUtils.join(columns, ",")).append(") ").append(onConFlictDoString(onConflictColumns)) + .append(" VALUES"); - for (String column : columns) { - columnString.add(column); + LOG.info("Replace data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); + originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate); + } else { + writeDataSqlTemplate.append("INSERT INTO %s"); + StringJoiner columnString = new StringJoiner(","); + + for (String column : columns) { + columnString.add(column); + } + writeDataSqlTemplate.append(String.format("(%s)", columnString)); + writeDataSqlTemplate.append(" VALUES"); + + LOG.info("Insert data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); + + originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate); } - writeDataSqlTemplate.append(String.format("(%s)", columnString)); - writeDataSqlTemplate.append(" VALUES"); - LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); - - originalConfig.set(Constant.INSERT_OR_REPLACE_TEMPLATE_MARK, writeDataSqlTemplate); } -} \ No newline at end of file + + public static String onConFlictDoString(List conflictColumns) { + return " ON " + + "(" + + StringUtils.join(conflictColumns, ",") + ") "; + } +} diff --git a/oceanbasev10reader/src/main/libs/oceanbase-client-1.1.10.jar b/oceanbasev10reader/src/main/libs/oceanbase-client-1.1.10.jar new file mode 100644 index 00000000..38162912 Binary files /dev/null and b/oceanbasev10reader/src/main/libs/oceanbase-client-1.1.10.jar differ diff --git a/oceanbasev10writer/src/main/libs/oceanbase-client-1.1.10.jar b/oceanbasev10writer/src/main/libs/oceanbase-client-1.1.10.jar new file mode 100644 index 00000000..38162912 Binary files /dev/null and b/oceanbasev10writer/src/main/libs/oceanbase-client-1.1.10.jar differ diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Key.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Key.java index 25a2ab52..3c282d5d 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Key.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Key.java @@ -11,6 +11,8 @@ public final class Key { public final static String COLUMN = "column"; + public final static String ONCONFLICT_COLUMN = "onConflictColumn"; + //可选值为:insert,replace,默认为 insert (mysql 支持,oracle 没用 replace 机制,只能 insert,oracle 可以不暴露这个参数) public final static String WRITE_MODE = "writeMode";