From d9f2f4aa0d09c99c6d000ecdca069b16a3338c7e Mon Sep 17 00:00:00 2001 From: "jiye.tjy" Date: Tue, 14 Apr 2020 21:22:23 +0800 Subject: [PATCH] Add Clickhouse Writer --- clickhousewriter/pom.xml | 94 +++++ .../src/main/assembly/package.xml | 35 ++ .../clickhousewriter/ClickhouseWriter.java | 330 ++++++++++++++++++ .../ClickhouseWriterErrorCode.java | 31 ++ .../src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 21 ++ package.xml | 7 + .../datax/plugin/rdbms/util/DataBaseType.java | 7 +- pom.xml | 1 + 9 files changed, 531 insertions(+), 1 deletion(-) create mode 100644 clickhousewriter/pom.xml create mode 100755 clickhousewriter/src/main/assembly/package.xml create mode 100644 clickhousewriter/src/main/java/com/alibaba/datax/plugin/writer/clickhousewriter/ClickhouseWriter.java create mode 100644 clickhousewriter/src/main/java/com/alibaba/datax/plugin/writer/clickhousewriter/ClickhouseWriterErrorCode.java create mode 100755 clickhousewriter/src/main/resources/plugin.json create mode 100644 clickhousewriter/src/main/resources/plugin_job_template.json diff --git a/clickhousewriter/pom.xml b/clickhousewriter/pom.xml new file mode 100644 index 00000000..41b956f4 --- /dev/null +++ b/clickhousewriter/pom.xml @@ -0,0 +1,94 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + + 4.0.0 + clickhousewriter + clickhousewriter + jar + + + + ru.yandex.clickhouse + clickhouse-jdbc + 0.2.4 + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + com.alibaba.datax + simulator + ${datax-project-version} + test + + + org.slf4j + slf4j-api + + + + ch.qos.logback + logback-classic + + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + \ No newline at end of file diff --git a/clickhousewriter/src/main/assembly/package.xml b/clickhousewriter/src/main/assembly/package.xml new file mode 100755 index 00000000..d1128bd1 --- /dev/null +++ b/clickhousewriter/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/clickhousewriter + + + target/ + + clickhousewriter-0.0.1-SNAPSHOT.jar + + plugin/writer/clickhousewriter + + + + + + false + plugin/writer/clickhousewriter/libs + runtime + + + diff --git a/clickhousewriter/src/main/java/com/alibaba/datax/plugin/writer/clickhousewriter/ClickhouseWriter.java b/clickhousewriter/src/main/java/com/alibaba/datax/plugin/writer/clickhousewriter/ClickhouseWriter.java new file mode 100644 index 00000000..6b2de4a3 --- /dev/null +++ b/clickhousewriter/src/main/java/com/alibaba/datax/plugin/writer/clickhousewriter/ClickhouseWriter.java @@ -0,0 +1,330 @@ +package com.alibaba.datax.plugin.writer.clickhousewriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.exception.CommonErrorCode; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import ru.yandex.clickhouse.ClickHouseTuple; + +import java.sql.Array; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.List; +import java.util.regex.Pattern; + +public class ClickhouseWriter extends Writer { + private static final DataBaseType DATABASE_TYPE = DataBaseType.ClickHouse; + + public static class Job extends Writer.Job { + private Configuration originalConfig = null; + private CommonRdbmsWriter.Job commonRdbmsWriterMaster; + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE); + this.commonRdbmsWriterMaster.init(this.originalConfig); + } + + @Override + public void prepare() { + this.commonRdbmsWriterMaster.prepare(this.originalConfig); + } + + @Override + public List split(int mandatoryNumber) { + return this.commonRdbmsWriterMaster.split(this.originalConfig, mandatoryNumber); + } + + @Override + public void post() { + this.commonRdbmsWriterMaster.post(this.originalConfig); + } + + @Override + public void destroy() { + this.commonRdbmsWriterMaster.destroy(this.originalConfig); + } + } + + public static class Task extends Writer.Task { + private Configuration writerSliceConfig; + + private CommonRdbmsWriter.Task commonRdbmsWriterSlave; + + @Override + public void init() { + this.writerSliceConfig = super.getPluginJobConf(); + + this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE) { + @Override + protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, Column column) throws SQLException { + try { + if (column.getRawData() == null) { + preparedStatement.setNull(columnIndex + 1, columnSqltype); + return preparedStatement; + } + + java.util.Date utilDate; + switch (columnSqltype) { + case Types.CHAR: + case Types.NCHAR: + case Types.CLOB: + case Types.NCLOB: + case Types.VARCHAR: + case Types.LONGVARCHAR: + case Types.NVARCHAR: + case Types.LONGNVARCHAR: + preparedStatement.setString(columnIndex + 1, column + .asString()); + break; + + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + case Types.BIGINT: + case Types.DECIMAL: + case Types.FLOAT: + case Types.REAL: + case Types.DOUBLE: + String strValue = column.asString(); + if (emptyAsNull && "".equals(strValue)) { + preparedStatement.setNull(columnIndex + 1, columnSqltype); + } else { + switch (columnSqltype) { + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); + break; + case Types.BIGINT: + preparedStatement.setLong(columnIndex + 1, column.asLong()); + break; + case Types.DECIMAL: + preparedStatement.setBigDecimal(columnIndex + 1, column.asBigDecimal()); + break; + case Types.REAL: + case Types.FLOAT: + preparedStatement.setFloat(columnIndex + 1, column.asDouble().floatValue()); + break; + case Types.DOUBLE: + preparedStatement.setDouble(columnIndex + 1, column.asDouble()); + break; + } + } + break; + + case Types.DATE: + if (this.resultSetMetaData.getRight().get(columnIndex) + .equalsIgnoreCase("year")) { + if (column.asBigInteger() == null) { + preparedStatement.setString(columnIndex + 1, null); + } else { + preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); + } + } else { + java.sql.Date sqlDate = null; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "Date 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlDate = new java.sql.Date(utilDate.getTime()); + } + preparedStatement.setDate(columnIndex + 1, sqlDate); + } + break; + + case Types.TIME: + java.sql.Time sqlTime = null; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "Date 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlTime = new java.sql.Time(utilDate.getTime()); + } + preparedStatement.setTime(columnIndex + 1, sqlTime); + break; + + case Types.TIMESTAMP: + Timestamp sqlTimestamp = null; + if (column instanceof StringColumn && column.asString() != null) { + String timeStampStr = column.asString(); + // JAVA TIMESTAMP 类型入参必须是 "2017-07-12 14:39:00.123566" 格式 + String pattern = "^\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+"; + boolean isMatch = Pattern.matches(pattern, timeStampStr); + if (isMatch) { + sqlTimestamp = Timestamp.valueOf(timeStampStr); + preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); + break; + } + } + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "Date 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlTimestamp = new Timestamp( + utilDate.getTime()); + } + preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); + break; + + case Types.BINARY: + case Types.VARBINARY: + case Types.BLOB: + case Types.LONGVARBINARY: + preparedStatement.setBytes(columnIndex + 1, column + .asBytes()); + break; + + case Types.BOOLEAN: + preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); + break; + + // warn: bit(1) -> Types.BIT 可使用setBoolean + // warn: bit(>1) -> Types.VARBINARY 可使用setBytes + case Types.BIT: + if (this.dataBaseType == DataBaseType.MySql) { + Boolean asBoolean = column.asBoolean(); + if (asBoolean != null) { + preparedStatement.setBoolean(columnIndex + 1, asBoolean); + } else { + preparedStatement.setNull(columnIndex + 1, Types.BIT); + } + } else { + preparedStatement.setString(columnIndex + 1, column.asString()); + } + break; + + default: + boolean isHandled = fillPreparedStatementColumnType4CustomType(preparedStatement, + columnIndex, columnSqltype, column); + if (isHandled) { + break; + } + throw DataXException + .asDataXException( + DBUtilErrorCode.UNSUPPORTED_TYPE, + String.format( + "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.", + this.resultSetMetaData.getLeft() + .get(columnIndex), + this.resultSetMetaData.getMiddle() + .get(columnIndex), + this.resultSetMetaData.getRight() + .get(columnIndex))); + } + return preparedStatement; + } catch (DataXException e) { + // fix类型转换或者溢出失败时,将具体哪一列打印出来 + if (e.getErrorCode() == CommonErrorCode.CONVERT_NOT_SUPPORT || + e.getErrorCode() == CommonErrorCode.CONVERT_OVER_FLOW) { + throw DataXException + .asDataXException( + e.getErrorCode(), + String.format( + "类型转化错误. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.", + this.resultSetMetaData.getLeft() + .get(columnIndex), + this.resultSetMetaData.getMiddle() + .get(columnIndex), + this.resultSetMetaData.getRight() + .get(columnIndex))); + } else { + throw e; + } + } + } + + private Object toJavaArray(Object val) { + if (null == val) { + return null; + } else if (val instanceof JSONArray) { + Object[] valArray = ((JSONArray) val).toArray(); + for (int i = 0; i < valArray.length; i++) { + valArray[i] = this.toJavaArray(valArray[i]); + } + return valArray; + } else { + return val; + } + } + + boolean fillPreparedStatementColumnType4CustomType(PreparedStatement ps, + int columnIndex, int columnSqltype, + Column column) throws SQLException { + switch (columnSqltype) { + case Types.OTHER: + if (this.resultSetMetaData.getRight().get(columnIndex).startsWith("Tuple")) { + throw DataXException + .asDataXException(ClickhouseWriterErrorCode.TUPLE_NOT_SUPPORTED_ERROR, ClickhouseWriterErrorCode.TUPLE_NOT_SUPPORTED_ERROR.getDescription()); + } else { + ps.setString(columnIndex + 1, column.asString()); + } + return true; + + case Types.ARRAY: + Connection conn = ps.getConnection(); + List values = JSON.parseArray(column.asString(), Object.class); + for (int i = 0; i < values.size(); i++) { + values.set(i, this.toJavaArray(values.get(i))); + } + Array array = conn.createArrayOf("String", values.toArray()); + ps.setArray(columnIndex + 1, array); + return true; + + default: + break; + } + + return false; + } + }; + + this.commonRdbmsWriterSlave.init(this.writerSliceConfig); + } + + @Override + public void prepare() { + this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig); + } + + @Override + public void startWrite(RecordReceiver recordReceiver) { + this.commonRdbmsWriterSlave.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector()); + } + + @Override + public void post() { + this.commonRdbmsWriterSlave.post(this.writerSliceConfig); + } + + @Override + public void destroy() { + this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig); + } + } + +} \ No newline at end of file diff --git a/clickhousewriter/src/main/java/com/alibaba/datax/plugin/writer/clickhousewriter/ClickhouseWriterErrorCode.java b/clickhousewriter/src/main/java/com/alibaba/datax/plugin/writer/clickhousewriter/ClickhouseWriterErrorCode.java new file mode 100644 index 00000000..4fc63ae1 --- /dev/null +++ b/clickhousewriter/src/main/java/com/alibaba/datax/plugin/writer/clickhousewriter/ClickhouseWriterErrorCode.java @@ -0,0 +1,31 @@ +package com.alibaba.datax.plugin.writer.clickhousewriter; + +import com.alibaba.datax.common.spi.ErrorCode; + +public enum ClickhouseWriterErrorCode implements ErrorCode { + TUPLE_NOT_SUPPORTED_ERROR("ClickhouseWriter-00", "不支持TUPLE类型导入."), + ; + + private final String code; + private final String description; + + private ClickhouseWriterErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, this.description); + } +} diff --git a/clickhousewriter/src/main/resources/plugin.json b/clickhousewriter/src/main/resources/plugin.json new file mode 100755 index 00000000..ff1acf01 --- /dev/null +++ b/clickhousewriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "clickhousewriter", + "class": "com.alibaba.datax.plugin.writer.clickhousewriter.ClickhouseWriter", + "description": "useScene: prod. mechanism: Jdbc connection using the database, execute insert sql.", + "developer": "jiye.tjy" +} \ No newline at end of file diff --git a/clickhousewriter/src/main/resources/plugin_job_template.json b/clickhousewriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..2e1ceed0 --- /dev/null +++ b/clickhousewriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,21 @@ +{ + "name": "clickhousewriter", + "parameter": { + "username": "username", + "password": "password", + "column": ["col1", "col2", "col3"], + "connection": [ + { + "jdbcUrl": "jdbc:clickhouse://:[/]", + "table": ["table1", "table2"] + } + ], + "preSql": [], + "postSql": [], + + "batchSize": 65536, + "batchByteSize": 134217728, + "dryRun": false, + "writeMode": "insert" + } +} \ No newline at end of file diff --git a/package.xml b/package.xml index 42185f54..6d97b372 100755 --- a/package.xml +++ b/package.xml @@ -357,5 +357,12 @@ datax + + clickhousewriter/target/datax/ + + **/*.* + + datax + diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java index 55d9e47b..ea11d99b 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java @@ -18,7 +18,8 @@ public enum DataBaseType { PostgreSQL("postgresql", "org.postgresql.Driver"), RDBMS("rdbms", "com.alibaba.datax.plugin.rdbms.util.DataBaseType"), DB2("db2", "com.ibm.db2.jcc.DB2Driver"), - ADS("ads","com.mysql.jdbc.Driver"); + ADS("ads","com.mysql.jdbc.Driver"), + ClickHouse("clickhouse", "ru.yandex.clickhouse.ClickHouseDriver"); private String typeName; @@ -54,6 +55,8 @@ public enum DataBaseType { break; case PostgreSQL: break; + case ClickHouse: + break; case RDBMS: break; default: @@ -91,6 +94,8 @@ public enum DataBaseType { break; case PostgreSQL: break; + case ClickHouse: + break; case RDBMS: break; default: diff --git a/pom.xml b/pom.xml index dc006b5a..d720533a 100755 --- a/pom.xml +++ b/pom.xml @@ -93,6 +93,7 @@ adbpgwriter gdbwriter cassandrawriter + clickhousewriter plugin-rdbms-util plugin-unstructured-storage-util