Merge pull request #644 from tujiye/datax_clickhouse_writer

Add Clickhouse Writer
This commit is contained in:
Trafalgar 2020-04-20 15:00:47 +08:00 committed by GitHub
commit 74b776cdc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 531 additions and 1 deletions

94
clickhousewriter/pom.xml Normal file
View File

@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>clickhousewriter</artifactId>
<name>clickhousewriter</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>simulator</artifactId>
<version>${datax-project-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/clickhousewriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>clickhousewriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/clickhousewriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/clickhousewriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,330 @@
package com.alibaba.datax.plugin.writer.clickhousewriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.CommonErrorCode;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import ru.yandex.clickhouse.ClickHouseTuple;
import java.sql.Array;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.List;
import java.util.regex.Pattern;
public class ClickhouseWriter extends Writer {
private static final DataBaseType DATABASE_TYPE = DataBaseType.ClickHouse;
public static class Job extends Writer.Job {
private Configuration originalConfig = null;
private CommonRdbmsWriter.Job commonRdbmsWriterMaster;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
this.commonRdbmsWriterMaster.init(this.originalConfig);
}
@Override
public void prepare() {
this.commonRdbmsWriterMaster.prepare(this.originalConfig);
}
@Override
public List<Configuration> split(int mandatoryNumber) {
return this.commonRdbmsWriterMaster.split(this.originalConfig, mandatoryNumber);
}
@Override
public void post() {
this.commonRdbmsWriterMaster.post(this.originalConfig);
}
@Override
public void destroy() {
this.commonRdbmsWriterMaster.destroy(this.originalConfig);
}
}
public static class Task extends Writer.Task {
private Configuration writerSliceConfig;
private CommonRdbmsWriter.Task commonRdbmsWriterSlave;
@Override
public void init() {
this.writerSliceConfig = super.getPluginJobConf();
this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE) {
@Override
protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, Column column) throws SQLException {
try {
if (column.getRawData() == null) {
preparedStatement.setNull(columnIndex + 1, columnSqltype);
return preparedStatement;
}
java.util.Date utilDate;
switch (columnSqltype) {
case Types.CHAR:
case Types.NCHAR:
case Types.CLOB:
case Types.NCLOB:
case Types.VARCHAR:
case Types.LONGVARCHAR:
case Types.NVARCHAR:
case Types.LONGNVARCHAR:
preparedStatement.setString(columnIndex + 1, column
.asString());
break;
case Types.TINYINT:
case Types.SMALLINT:
case Types.INTEGER:
case Types.BIGINT:
case Types.DECIMAL:
case Types.FLOAT:
case Types.REAL:
case Types.DOUBLE:
String strValue = column.asString();
if (emptyAsNull && "".equals(strValue)) {
preparedStatement.setNull(columnIndex + 1, columnSqltype);
} else {
switch (columnSqltype) {
case Types.TINYINT:
case Types.SMALLINT:
case Types.INTEGER:
preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue());
break;
case Types.BIGINT:
preparedStatement.setLong(columnIndex + 1, column.asLong());
break;
case Types.DECIMAL:
preparedStatement.setBigDecimal(columnIndex + 1, column.asBigDecimal());
break;
case Types.REAL:
case Types.FLOAT:
preparedStatement.setFloat(columnIndex + 1, column.asDouble().floatValue());
break;
case Types.DOUBLE:
preparedStatement.setDouble(columnIndex + 1, column.asDouble());
break;
}
}
break;
case Types.DATE:
if (this.resultSetMetaData.getRight().get(columnIndex)
.equalsIgnoreCase("year")) {
if (column.asBigInteger() == null) {
preparedStatement.setString(columnIndex + 1, null);
} else {
preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue());
}
} else {
java.sql.Date sqlDate = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date 类型转换错误:[%s]", column));
}
if (null != utilDate) {
sqlDate = new java.sql.Date(utilDate.getTime());
}
preparedStatement.setDate(columnIndex + 1, sqlDate);
}
break;
case Types.TIME:
java.sql.Time sqlTime = null;
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date 类型转换错误:[%s]", column));
}
if (null != utilDate) {
sqlTime = new java.sql.Time(utilDate.getTime());
}
preparedStatement.setTime(columnIndex + 1, sqlTime);
break;
case Types.TIMESTAMP:
Timestamp sqlTimestamp = null;
if (column instanceof StringColumn && column.asString() != null) {
String timeStampStr = column.asString();
// JAVA TIMESTAMP 类型入参必须是 "2017-07-12 14:39:00.123566" 格式
String pattern = "^\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+";
boolean isMatch = Pattern.matches(pattern, timeStampStr);
if (isMatch) {
sqlTimestamp = Timestamp.valueOf(timeStampStr);
preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
break;
}
}
try {
utilDate = column.asDate();
} catch (DataXException e) {
throw new SQLException(String.format(
"Date 类型转换错误:[%s]", column));
}
if (null != utilDate) {
sqlTimestamp = new Timestamp(
utilDate.getTime());
}
preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
break;
case Types.BINARY:
case Types.VARBINARY:
case Types.BLOB:
case Types.LONGVARBINARY:
preparedStatement.setBytes(columnIndex + 1, column
.asBytes());
break;
case Types.BOOLEAN:
preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue());
break;
// warn: bit(1) -> Types.BIT 可使用setBoolean
// warn: bit(>1) -> Types.VARBINARY 可使用setBytes
case Types.BIT:
if (this.dataBaseType == DataBaseType.MySql) {
Boolean asBoolean = column.asBoolean();
if (asBoolean != null) {
preparedStatement.setBoolean(columnIndex + 1, asBoolean);
} else {
preparedStatement.setNull(columnIndex + 1, Types.BIT);
}
} else {
preparedStatement.setString(columnIndex + 1, column.asString());
}
break;
default:
boolean isHandled = fillPreparedStatementColumnType4CustomType(preparedStatement,
columnIndex, columnSqltype, column);
if (isHandled) {
break;
}
throw DataXException
.asDataXException(
DBUtilErrorCode.UNSUPPORTED_TYPE,
String.format(
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",
this.resultSetMetaData.getLeft()
.get(columnIndex),
this.resultSetMetaData.getMiddle()
.get(columnIndex),
this.resultSetMetaData.getRight()
.get(columnIndex)));
}
return preparedStatement;
} catch (DataXException e) {
// fix类型转换或者溢出失败时将具体哪一列打印出来
if (e.getErrorCode() == CommonErrorCode.CONVERT_NOT_SUPPORT ||
e.getErrorCode() == CommonErrorCode.CONVERT_OVER_FLOW) {
throw DataXException
.asDataXException(
e.getErrorCode(),
String.format(
"类型转化错误. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",
this.resultSetMetaData.getLeft()
.get(columnIndex),
this.resultSetMetaData.getMiddle()
.get(columnIndex),
this.resultSetMetaData.getRight()
.get(columnIndex)));
} else {
throw e;
}
}
}
private Object toJavaArray(Object val) {
if (null == val) {
return null;
} else if (val instanceof JSONArray) {
Object[] valArray = ((JSONArray) val).toArray();
for (int i = 0; i < valArray.length; i++) {
valArray[i] = this.toJavaArray(valArray[i]);
}
return valArray;
} else {
return val;
}
}
boolean fillPreparedStatementColumnType4CustomType(PreparedStatement ps,
int columnIndex, int columnSqltype,
Column column) throws SQLException {
switch (columnSqltype) {
case Types.OTHER:
if (this.resultSetMetaData.getRight().get(columnIndex).startsWith("Tuple")) {
throw DataXException
.asDataXException(ClickhouseWriterErrorCode.TUPLE_NOT_SUPPORTED_ERROR, ClickhouseWriterErrorCode.TUPLE_NOT_SUPPORTED_ERROR.getDescription());
} else {
ps.setString(columnIndex + 1, column.asString());
}
return true;
case Types.ARRAY:
Connection conn = ps.getConnection();
List<Object> values = JSON.parseArray(column.asString(), Object.class);
for (int i = 0; i < values.size(); i++) {
values.set(i, this.toJavaArray(values.get(i)));
}
Array array = conn.createArrayOf("String", values.toArray());
ps.setArray(columnIndex + 1, array);
return true;
default:
break;
}
return false;
}
};
this.commonRdbmsWriterSlave.init(this.writerSliceConfig);
}
@Override
public void prepare() {
this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig);
}
@Override
public void startWrite(RecordReceiver recordReceiver) {
this.commonRdbmsWriterSlave.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector());
}
@Override
public void post() {
this.commonRdbmsWriterSlave.post(this.writerSliceConfig);
}
@Override
public void destroy() {
this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);
}
}
}

View File

@ -0,0 +1,31 @@
package com.alibaba.datax.plugin.writer.clickhousewriter;
import com.alibaba.datax.common.spi.ErrorCode;
public enum ClickhouseWriterErrorCode implements ErrorCode {
TUPLE_NOT_SUPPORTED_ERROR("ClickhouseWriter-00", "不支持TUPLE类型导入."),
;
private final String code;
private final String description;
private ClickhouseWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code, this.description);
}
}

View File

@ -0,0 +1,6 @@
{
"name": "clickhousewriter",
"class": "com.alibaba.datax.plugin.writer.clickhousewriter.ClickhouseWriter",
"description": "useScene: prod. mechanism: Jdbc connection using the database, execute insert sql.",
"developer": "jiye.tjy"
}

View File

@ -0,0 +1,21 @@
{
"name": "clickhousewriter",
"parameter": {
"username": "username",
"password": "password",
"column": ["col1", "col2", "col3"],
"connection": [
{
"jdbcUrl": "jdbc:clickhouse://<host>:<port>[/<database>]",
"table": ["table1", "table2"]
}
],
"preSql": [],
"postSql": [],
"batchSize": 65536,
"batchByteSize": 134217728,
"dryRun": false,
"writeMode": "insert"
}
}

View File

@ -357,5 +357,12 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>clickhousewriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
</fileSets>
</assembly>

View File

@ -18,7 +18,8 @@ public enum DataBaseType {
PostgreSQL("postgresql", "org.postgresql.Driver"),
RDBMS("rdbms", "com.alibaba.datax.plugin.rdbms.util.DataBaseType"),
DB2("db2", "com.ibm.db2.jcc.DB2Driver"),
ADS("ads","com.mysql.jdbc.Driver");
ADS("ads","com.mysql.jdbc.Driver"),
ClickHouse("clickhouse", "ru.yandex.clickhouse.ClickHouseDriver");
private String typeName;
@ -54,6 +55,8 @@ public enum DataBaseType {
break;
case PostgreSQL:
break;
case ClickHouse:
break;
case RDBMS:
break;
default:
@ -91,6 +94,8 @@ public enum DataBaseType {
break;
case PostgreSQL:
break;
case ClickHouse:
break;
case RDBMS:
break;
default:

View File

@ -93,6 +93,7 @@
<module>adbpgwriter</module>
<module>gdbwriter</module>
<module>cassandrawriter</module>
<module>clickhousewriter</module>
<!-- common support module -->
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>