diff --git a/core/src/main/bin/perftrace.py b/core/src/main/bin/perftrace.py index 41a1ecb3..b9c79a43 100755 --- a/core/src/main/bin/perftrace.py +++ b/core/src/main/bin/perftrace.py @@ -174,6 +174,9 @@ def parsePluginName(jdbcUrl, pluginType): db2Regex = re.compile('jdbc:(db2)://.*') if (db2Regex.match(jdbcUrl)): name = 'db2' + kingbaseesRegex = re.compile('jdbc:(kingbase8)://.*') + if (kingbaseesRegex.match(jdbcUrl)): + name = 'kingbasees' return "%s%s" % (name, pluginType) def renderDataXJson(paramsDict, readerOrWriter = 'reader', channel = 1): diff --git a/hbase20xsqlreader/doc/hbase20xsqlreader.md b/hbase20xsqlreader/doc/hbase20xsqlreader.md index 9df020cc..43c42bf7 100644 --- a/hbase20xsqlreader/doc/hbase20xsqlreader.md +++ b/hbase20xsqlreader/doc/hbase20xsqlreader.md @@ -58,7 +58,9 @@ hbase20xsqlreader插件实现了从Phoenix(HBase SQL)读取数据,对应版本 * **queryServerAddress** * 描述:hbase20xsqlreader需要通过Phoenix轻客户端去连接Phoenix QueryServer,因此这里需要填写对应QueryServer地址。 - + 增强版/Lindorm 用户若需透传user, password参数,可以在queryServerAddress后增加对应可选属性. + 格式参考:http://127.0.0.1:8765;user=root;password=root + * 必选:是
* 默认值:无
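+
+	下面给出一个透传 user/password 的示意片段(仅说明格式,地址与账号均为假设值,来源于上文的"格式参考"):
+
+	```
+	"queryServerAddress": "http://127.0.0.1:8765;user=root;password=root"
+	```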
diff --git a/hbase20xsqlreader/pom.xml b/hbase20xsqlreader/pom.xml index ec1c3419..818123f3 100644 --- a/hbase20xsqlreader/pom.xml +++ b/hbase20xsqlreader/pom.xml @@ -14,7 +14,7 @@ jar - 5.1.0-HBase-2.0.0.2 + 5.2.5-HBase-2.x diff --git a/hbase20xsqlwriter/doc/hbase20xsqlwriter.md b/hbase20xsqlwriter/doc/hbase20xsqlwriter.md index 2cc8cb41..e1f4e2f1 100644 --- a/hbase20xsqlwriter/doc/hbase20xsqlwriter.md +++ b/hbase20xsqlwriter/doc/hbase20xsqlwriter.md @@ -120,7 +120,9 @@ HBase20xsqlwriter实现了向hbase中的SQL表(phoenix)批量导入数据的功 * **queryServerAddress** - * 描述:Phoenix QueryServer地址,为必填项,格式:http://${hostName}:${ip},如http://172.16.34.58:8765 + * 描述:Phoenix QueryServer地址,为必填项,格式:http://${hostName}:${ip},如http://172.16.34.58:8765。 + 增强版/Lindorm 用户若需透传user, password参数,可以在queryServerAddress后增加对应可选属性. + 格式参考:http://127.0.0.1:8765;user=root;password=root * 必选:是 * 默认值:无 diff --git a/hbase20xsqlwriter/pom.xml b/hbase20xsqlwriter/pom.xml index 690bc95e..5a2843e1 100644 --- a/hbase20xsqlwriter/pom.xml +++ b/hbase20xsqlwriter/pom.xml @@ -14,7 +14,7 @@ jar - 5.1.0-HBase-2.0.0.2 + 5.2.5-HBase-2.x 1.8 diff --git a/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriterTask.java b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriterTask.java index 43f710b7..481e07df 100644 --- a/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriterTask.java +++ b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriterTask.java @@ -6,12 +6,12 @@ import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.util.Configuration; -import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigDecimal; import java.sql.*; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -154,7 +154,7 @@ public class HBase20xSQLWriterTask { * 从接收器中获取每条记录,写入Phoenix */ private void writeData(RecordReceiver lineReceiver) throws SQLException { - List buffer = Lists.newArrayListWithExpectedSize(batchSize); + List buffer = new ArrayList(batchSize); Record record = null; while ((record = lineReceiver.getFromReader()) != null) { // 校验列数量是否符合预期 diff --git a/kingbaseesreader/doc/kingbaseesreader.md b/kingbaseesreader/doc/kingbaseesreader.md new file mode 100644 index 00000000..ec5495a6 --- /dev/null +++ b/kingbaseesreader/doc/kingbaseesreader.md @@ -0,0 +1,241 @@ + +# KingbaseesReader 插件文档 + + +___ + + +## 1 快速介绍 + +KingbaseesReader插件实现了从KingbaseES读取数据。在底层实现上,KingbaseesReader通过JDBC连接远程KingbaseES数据库,并执行相应的sql语句将数据从KingbaseES库中SELECT出来。 + +## 2 实现原理 + +简而言之,KingbaseesReader通过JDBC连接器连接到远程的KingbaseES数据库,并根据用户配置的信息生成查询SELECT SQL语句并发送到远程KingbaseES数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 + +对于用户配置Table、Column、Where的信息,KingbaseesReader将其拼接为SQL语句发送到KingbaseES数据库;对于用户配置querySql信息,KingbaseesReader直接将其发送到KingbaseES数据库。 + + +## 3 功能说明 + +### 3.1 配置样例 + +* 配置一个从KingbaseES数据库同步抽取数据到本地的作业: + +``` +{ + "job": { + "setting": { + "speed": { + //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它. 
+ "byte": 1048576 + }, + //出错限制 + "errorLimit": { + //出错的record条数上限,当大于该值即报错。 + "record": 0, + //出错的record百分比上限 1.0表示100%,0.02表示2% + "percentage": 0.02 + } + }, + "content": [ + { + "reader": { + "name": "kingbaseesreader", + "parameter": { + // 数据库连接用户名 + "username": "xx", + // 数据库连接密码 + "password": "xx", + "column": [ + "id","name" + ], + //切分主键 + "splitPk": "id", + "connection": [ + { + "table": [ + "table" + ], + "jdbcUrl": [ + "jdbc:kingbase8://host:port/database" + ] + } + ] + } + }, + "writer": { + //writer类型 + "name": "streamwriter", + //是否打印内容 + "parameter": { + "print":true, + } + } + } + ] + } +} + +``` + +* 配置一个自定义SQL的数据库同步任务到本地内容的作业: + +``` +{ + "job": { + "setting": { + "speed": 1048576 + }, + "content": [ + { + "reader": { + "name": "kingbaseesreader", + "parameter": { + "username": "xx", + "password": "xx", + "where": "", + "connection": [ + { + "querySql": [ + "select db_id,on_line_flag from db_info where db_id < 10;" + ], + "jdbcUrl": [ + "jdbc:kingbase8://host:port/database", "jdbc:kingbase8://host:port/database" + ] + } + ] + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "print": false, + "encoding": "UTF-8" + } + } + } + ] + } +} +``` + + +### 3.2 参数说明 + +* **jdbcUrl** + + * 描述:描述的是到对端数据库的JDBC连接信息,使用JSON的数组描述,并支持一个库填写多个连接地址。之所以使用JSON数组描述连接信息,是因为阿里集团内部支持多个IP探测,如果配置了多个,KingbaseesReader可以依次探测ip的可连接性,直到选择一个合法的IP。如果全部连接失败,KingbaseesReader报错。 注意,jdbcUrl必须包含在connection配置单元中。对于阿里集团外部使用情况,JSON数组填写一个JDBC连接即可。 + + jdbcUrl按照KingbaseES官方规范,并可以填写连接附件控制信息。具体请参看[KingbaseES官方文档](https://help.kingbase.com.cn/doc-view-5683.html)。 + + * 必选:是
+ + * 默认值:无
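+
+	为同一个库配置多个连接地址的示意片段(主机、端口与库名均为假设值,KingbaseesReader会依次探测直到选到可连接的地址):
+
+	```
+	"jdbcUrl": [
+	    "jdbc:kingbase8://192.168.0.1:54321/datax",
+	    "jdbc:kingbase8://192.168.0.2:54321/datax"
+	]
+	```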
+ +* **username** + + * 描述:数据源的用户名
+ + * 必选:是
+ + * 默认值:无
+ +* **password** + + * 描述:数据源指定用户名的密码
+ + * 必选:是
+ + * 默认值:无
+ +* **table** + + * 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构,KingbaseesReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。
+ + * 必选:是
+ + * 默认值:无
+
+* **column**
+
+	* 描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用\*代表默认使用所有列配置,例如['\*']。
+
+	支持列裁剪,即列可以挑选部分列进行导出。
+
+	支持列换序,即列可以不按照表schema信息进行导出。
+
+	支持常量配置,用户需要按照KingbaseES语法格式:
+	["id", "'hello'::varchar", "true", "2.5::real", "power(2,3)"]
+	id为普通列名,'hello'::varchar为字符串常量,true为布尔值,2.5为浮点数,power(2,3)为函数。
+
+	**column必须用户显式指定同步的列集合,不允许为空!**
+
+	* 必选:是 <br />
+ + * 默认值:无
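+
+	将上述常量与函数列写进作业配置的示意片段(列名沿用上文示例):
+
+	```
+	"column": ["id", "'hello'::varchar", "true", "2.5::real", "power(2,3)"]
+	```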
+
+* **splitPk**
+
+	* 描述:KingbaseesReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提高数据同步的效能。
+
+	推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。
+
+	目前splitPk仅支持整型数据切分,`不支持浮点、字符串型、日期等其他类型`。如果用户指定其他非支持类型,KingbaseesReader将报错!
+
+	splitPk设置为空,底层将视作用户不允许对单表进行切分,因此使用单通道进行抽取。示例片段见下。
+
+	* 必选:否 <br />
+ + * 默认值:空
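+
+	开启并发切分的示意片段(假设表主键列为 id,实际并发数还取决于 speed/channel 等配置):
+
+	```
+	"splitPk": "id"
+	```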
+ +* **where** + + * 描述:筛选条件,KingbaseesReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate 。注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。
+
+	通过where条件可以有效地进行业务增量同步。where条件不配置或者为空,视作全表同步数据。示例片段见下。
+
+	* 必选:否 <br />
+ + * 默认值:无
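+
+	按天增量同步的示意片段(字段名 gmt_create 沿用上文示例,$bizdate 为调度系统替换的业务日期变量):
+
+	```
+	"where": "gmt_create > $bizdate"
+	```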
+
+* **querySql**
+
+	* 描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置项来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table、column这些配置项,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id。示例片段见下。<br />
+ + `当用户配置querySql时,KingbaseesReader直接忽略table、column、where条件的配置`。 + + * 必选:否
+ + * 默认值:无
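+
+	多表join同步的示意片段(表名与关联字段沿用上文示例):
+
+	```
+	"querySql": [
+	    "select a,b from table_a join table_b on table_a.id = table_b.id"
+	]
+	```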
+
+* **fetchSize**
+
+	* 描述:该配置项定义了插件和数据库服务器端每次批量数据获取条数,该值决定了DataX和服务器端的网络交互次数,能够较大地提升数据抽取性能。示例片段见下。<br />
+
+	`注意,该值过大(>2048)可能造成DataX进程OOM。`
+
+	* 必选:否 <br />
+ + * 默认值:1024
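+
+	调大批量获取条数的示意片段(取值仅为示例,请结合上文OOM提示权衡):
+
+	```
+	"fetchSize": 2048
+	```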
+ + +### 3.3 类型转换 + +目前KingbaseesReader支持大部分KingbaseES类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。 + +下面列出KingbaseesReader针对KingbaseES类型转换列表: + + +| DataX 内部类型| KingbaseES 数据类型 | +| -------- | ----- | +| Long |bigint, bigserial, integer, smallint, serial | +| Double |double precision, money, numeric, real | +| String |varchar, char, text, bit, inet| +| Date |date, time, timestamp | +| Boolean |bool| +| Bytes |bytea| + +请注意: + +* `除上述罗列字段类型外,其他类型均不支持; money,inet,bit需用户使用a_inet::varchar类似的语法转换`。 \ No newline at end of file diff --git a/kingbaseesreader/pom.xml b/kingbaseesreader/pom.xml new file mode 100644 index 00000000..6e844c10 --- /dev/null +++ b/kingbaseesreader/pom.xml @@ -0,0 +1,88 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + kingbaseesreader + kingbaseesreader + jar + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.slf4j + slf4j-api + + + + ch.qos.logback + logback-classic + + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + + com.kingbase8 + kingbase8 + 8.2.0 + system + ${basedir}/src/main/libs/kingbase8-8.2.0.jar + + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + + diff --git a/kingbaseesreader/src/main/assembly/package.xml b/kingbaseesreader/src/main/assembly/package.xml new file mode 100644 index 00000000..e369c5f0 --- /dev/null +++ b/kingbaseesreader/src/main/assembly/package.xml @@ -0,0 +1,42 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/kingbaseesreader + + + target/ + + kingbaseesreader-0.0.1-SNAPSHOT.jar + + plugin/reader/kingbaseesreader + + + src/main/libs + + *.* + + plugin/reader/kingbaseesreader/libs + + + + + + false + plugin/reader/kingbaseesreader/libs + runtime + + + diff --git a/kingbaseesreader/src/main/java/com/alibaba/datax/plugin/reader/kingbaseesreader/Constant.java b/kingbaseesreader/src/main/java/com/alibaba/datax/plugin/reader/kingbaseesreader/Constant.java new file mode 100644 index 00000000..bed4c6e6 --- /dev/null +++ b/kingbaseesreader/src/main/java/com/alibaba/datax/plugin/reader/kingbaseesreader/Constant.java @@ -0,0 +1,7 @@ +package com.alibaba.datax.plugin.reader.kingbaseesreader; + +public class Constant { + + public static final int DEFAULT_FETCH_SIZE = 1000; + +} diff --git a/kingbaseesreader/src/main/java/com/alibaba/datax/plugin/reader/kingbaseesreader/KingbaseesReader.java b/kingbaseesreader/src/main/java/com/alibaba/datax/plugin/reader/kingbaseesreader/KingbaseesReader.java new file mode 100644 index 00000000..9246655f --- /dev/null +++ b/kingbaseesreader/src/main/java/com/alibaba/datax/plugin/reader/kingbaseesreader/KingbaseesReader.java @@ -0,0 +1,86 @@ +package com.alibaba.datax.plugin.reader.kingbaseesreader; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; + +import java.util.List; + +public class KingbaseesReader extends Reader { + + private static final DataBaseType DATABASE_TYPE = DataBaseType.KingbaseES; + + public static class Job extends 
Reader.Job { + + private Configuration originalConfig; + private CommonRdbmsReader.Job commonRdbmsReaderMaster; + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + int fetchSize = this.originalConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE, + Constant.DEFAULT_FETCH_SIZE); + if (fetchSize < 1) { + throw DataXException.asDataXException(DBUtilErrorCode.REQUIRED_VALUE, + String.format("您配置的fetchSize有误,根据DataX的设计,fetchSize : [%d] 设置值不能小于 1.", fetchSize)); + } + this.originalConfig.set(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE, fetchSize); + + this.commonRdbmsReaderMaster = new CommonRdbmsReader.Job(DATABASE_TYPE); + this.commonRdbmsReaderMaster.init(this.originalConfig); + } + + @Override + public List split(int adviceNumber) { + return this.commonRdbmsReaderMaster.split(this.originalConfig, adviceNumber); + } + + @Override + public void post() { + this.commonRdbmsReaderMaster.post(this.originalConfig); + } + + @Override + public void destroy() { + this.commonRdbmsReaderMaster.destroy(this.originalConfig); + } + + } + + public static class Task extends Reader.Task { + + private Configuration readerSliceConfig; + private CommonRdbmsReader.Task commonRdbmsReaderSlave; + + @Override + public void init() { + this.readerSliceConfig = super.getPluginJobConf(); + this.commonRdbmsReaderSlave = new CommonRdbmsReader.Task(DATABASE_TYPE, super.getTaskGroupId(), super.getTaskId()); + this.commonRdbmsReaderSlave.init(this.readerSliceConfig); + } + + @Override + public void startRead(RecordSender recordSender) { + int fetchSize = this.readerSliceConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE); + + this.commonRdbmsReaderSlave.startRead(this.readerSliceConfig, recordSender, + super.getTaskPluginCollector(), fetchSize); + } + + @Override + public void post() { + this.commonRdbmsReaderSlave.post(this.readerSliceConfig); + } + + @Override + public void destroy() { + this.commonRdbmsReaderSlave.destroy(this.readerSliceConfig); + } + + } + +} diff --git a/kingbaseesreader/src/main/libs/kingbase8-8.2.0.jar b/kingbaseesreader/src/main/libs/kingbase8-8.2.0.jar new file mode 100644 index 00000000..0b5ac964 Binary files /dev/null and b/kingbaseesreader/src/main/libs/kingbase8-8.2.0.jar differ diff --git a/kingbaseesreader/src/main/resources/plugin.json b/kingbaseesreader/src/main/resources/plugin.json new file mode 100644 index 00000000..9bc0684b --- /dev/null +++ b/kingbaseesreader/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "kingbaseesreader", + "class": "com.alibaba.datax.plugin.reader.kingbaseesreader.KingbaseesReader", + "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. 
warn: The more you know about the database, the less problems you encounter.", + "developer": "alibaba" +} \ No newline at end of file diff --git a/kingbaseesreader/src/main/resources/plugin_job_template.json b/kingbaseesreader/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..49c07098 --- /dev/null +++ b/kingbaseesreader/src/main/resources/plugin_job_template.json @@ -0,0 +1,13 @@ +{ + "name": "kingbaseesreader", + "parameter": { + "username": "", + "password": "", + "connection": [ + { + "table": [], + "jdbcUrl": [] + } + ] + } +} \ No newline at end of file diff --git a/kingbaseeswriter/doc/kingbaseeswriter.md b/kingbaseeswriter/doc/kingbaseeswriter.md new file mode 100644 index 00000000..96e1a3ac --- /dev/null +++ b/kingbaseeswriter/doc/kingbaseeswriter.md @@ -0,0 +1,208 @@ +# DataX KingbaseesWriter + + +--- + + +## 1 快速介绍 + +KingbaseesWriter插件实现了写入数据到 KingbaseES主库目的表的功能。在底层实现上,KingbaseesWriter通过JDBC连接远程 KingbaseES 数据库,并执行相应的 insert into ... sql 语句将数据写入 KingbaseES,内部会分批次提交入库。 + +KingbaseesWriter面向ETL开发工程师,他们使用KingbaseesWriter从数仓导入数据到KingbaseES。同时 KingbaseesWriter亦可以作为数据迁移工具为DBA等用户提供服务。 + + +## 2 实现原理 + +KingbaseesWriter通过 DataX 框架获取 Reader 生成的协议数据,根据你配置生成相应的SQL插入语句 + + +* `insert into...`(当主键/唯一性索引冲突时会写不进去冲突的行) + +
+ + 注意: + 1. 目的表所在数据库必须是主库才能写入数据;整个任务至少需具备 insert into...的权限,是否需要其他权限,取决于你任务配置中在 preSql 和 postSql 中指定的语句。 + 2. KingbaseesWriter和MysqlWriter不同,不支持配置writeMode参数。 + + +## 3 功能说明 + +### 3.1 配置样例 + +* 这里使用一份从内存产生到 KingbaseesWriter导入的数据。 + +```json +{ + "job": { + "setting": { + "speed": { + "channel": 1 + } + }, + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "column" : [ + { + "value": "DataX", + "type": "string" + }, + { + "value": 19880808, + "type": "long" + }, + { + "value": "1988-08-08 08:08:08", + "type": "date" + }, + { + "value": true, + "type": "bool" + }, + { + "value": "test", + "type": "bytes" + } + ], + "sliceRecordCount": 1000 + } + }, + "writer": { + "name": "kingbaseeswriter", + "parameter": { + "username": "xx", + "password": "xx", + "column": [ + "id", + "name" + ], + "preSql": [ + "delete from test" + ], + "connection": [ + { + "jdbcUrl": "jdbc:kingbase8://127.0.0.1:3002/datax", + "table": [ + "test" + ] + } + ] + } + } + } + ] + } +} + +``` + + +### 3.2 参数说明 + +* **jdbcUrl** + + * 描述:目的数据库的 JDBC 连接信息 ,jdbcUrl必须包含在connection配置单元中。 + + 注意:1、在一个数据库上只能配置一个值。 + 2、jdbcUrl按照KingbaseES官方规范,并可以填写连接附加参数信息。具体请参看KingbaseES官方文档或者咨询对应 DBA。 + + + * 必选:是
+ + * 默认值:无
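+
+	与3.1配置样例一致的示意片段(主机、端口与库名均为假设值):
+
+	```json
+	"jdbcUrl": "jdbc:kingbase8://127.0.0.1:3002/datax"
+	```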
+ +* **username** + + * 描述:目的数据库的用户名
+ + * 必选:是
+ + * 默认值:无
+ +* **password** + + * 描述:目的数据库的密码
+ + * 必选:是
+ + * 默认值:无
+ +* **table** + + * 描述:目的表的表名称。支持写入一个或者多个表。当配置为多张表时,必须确保所有表结构保持一致。 + + 注意:table 和 jdbcUrl 必须包含在 connection 配置单元中 + + * 必选:是
+ + * 默认值:无
+ +* **column** + + * 描述:目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。如果要依次写入全部列,使用\*表示, 例如: "column": ["\*"] + + 注意:1、我们强烈不推荐你这样配置,因为当你目的表字段个数、类型等有改动时,你的任务可能运行不正确或者失败 + 2、此处 column 不能配置任何常量值 + + * 必选:是
+
+	* 默认值:无 <br />
+
+* **preSql**
+
+	* 描述:写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,请使用 `@table` 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00、datax_01 ... datax_98、datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:`"preSql":["delete from @table"]`,效果是:在执行每个表的写入前,会先执行对应的 `delete from 实际表名称`。示例见下。<br />
+ + * 必选:否
+ + * 默认值:无
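+
+	上述分表清理场景的示意配置(@table 会在执行时被替换为实际表名):
+
+	```json
+	"preSql": ["delete from @table"]
+	```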
+ +* **postSql** + + * 描述:写入数据到目的表后,会执行这里的标准语句。(原理同 preSql )
+ + * 必选:否
+ + * 默认值:无
+
+* **batchSize**
+
+	* 描述:一次性批量提交的记录数大小,该值可以极大减少DataX与KingbaseES的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM。示例片段见下。<br />
+ + * 必选:否
+ + * 默认值:1024
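+
+	调大批量提交条数的示意片段(取值仅为示例,需结合单行数据大小与可用内存权衡):
+
+	```json
+	"batchSize": 2048
+	```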
+ +### 3.3 类型转换 + +目前 KingbaseesWriter支持大部分 KingbaseES类型,但也存在部分没有支持的情况,请注意检查你的类型。 + +下面列出 KingbaseesWriter针对 KingbaseES类型转换列表: + +| DataX 内部类型| KingbaseES 数据类型 | +| -------- | ----- | +| Long |bigint, bigserial, integer, smallint, serial | +| Double |double precision, money, numeric, real | +| String |varchar, char, text, bit| +| Date |date, time, timestamp | +| Boolean |bool| +| Bytes |bytea| + + +## FAQ + +*** + +**Q: KingbaseesWriter 执行 postSql 语句报错,那么数据导入到目标数据库了吗?** + +A: DataX 导入过程存在三块逻辑,pre 操作、导入操作、post 操作,其中任意一环报错,DataX 作业报错。由于 DataX 不能保证在同一个事务完成上述几个操作,因此有可能数据已经落入到目标端。 + +*** + +**Q: 按照上述说法,那么有部分脏数据导入数据库,如果影响到线上数据库怎么办?** + +A: 目前有两种解法,第一种配置 pre 语句,该 sql 可以清理当天导入数据, DataX 每次导入时候可以把上次清理干净并导入完整数据。 +第二种,向临时表导入数据,完成后再 rename 到线上表。 + +*** diff --git a/kingbaseeswriter/pom.xml b/kingbaseeswriter/pom.xml new file mode 100644 index 00000000..284c8c5e --- /dev/null +++ b/kingbaseeswriter/pom.xml @@ -0,0 +1,84 @@ + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + kingbaseeswriter + kingbaseeswriter + jar + writer data into kingbasees database + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.slf4j + slf4j-api + + + + ch.qos.logback + logback-classic + + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + + com.kingbase8 + kingbase8 + 8.2.0 + system + ${basedir}/src/main/libs/kingbase8-8.2.0.jar + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + diff --git a/kingbaseeswriter/src/main/assembly/package.xml b/kingbaseeswriter/src/main/assembly/package.xml new file mode 100644 index 00000000..aa78a6ec --- /dev/null +++ b/kingbaseeswriter/src/main/assembly/package.xml @@ -0,0 +1,42 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/kingbaseeswriter + + + target/ + + kingbaseeswriter-0.0.1-SNAPSHOT.jar + + plugin/writer/kingbaseeswriter + + + src/main/libs + + *.* + + plugin/writer/kingbaseeswriter/libs + + + + + + false + plugin/writer/kingbaseeswriter/libs + runtime + + + diff --git a/kingbaseeswriter/src/main/java/com/alibaba/datax/plugin/writer/kingbaseeswriter/KingbaseesWriter.java b/kingbaseeswriter/src/main/java/com/alibaba/datax/plugin/writer/kingbaseeswriter/KingbaseesWriter.java new file mode 100644 index 00000000..dec5ff95 --- /dev/null +++ b/kingbaseeswriter/src/main/java/com/alibaba/datax/plugin/writer/kingbaseeswriter/KingbaseesWriter.java @@ -0,0 +1,100 @@ +package com.alibaba.datax.plugin.writer.kingbaseeswriter; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; +import com.alibaba.datax.plugin.rdbms.writer.Key; + +import java.util.List; + +public class KingbaseesWriter extends Writer { + private static final DataBaseType DATABASE_TYPE = DataBaseType.KingbaseES; + + public static class Job extends Writer.Job { + private Configuration originalConfig = null; + private CommonRdbmsWriter.Job commonRdbmsWriterMaster; + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + + // 
warn:not like mysql, KingbaseES only support insert mode, don't use + String writeMode = this.originalConfig.getString(Key.WRITE_MODE); + if (null != writeMode) { + throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, + String.format("写入模式(writeMode)配置有误. 因为KingbaseES不支持配置参数项 writeMode: %s, KingbaseES仅使用insert sql 插入数据. 请检查您的配置并作出修改.", writeMode)); + } + + this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE); + this.commonRdbmsWriterMaster.init(this.originalConfig); + } + + @Override + public void prepare() { + this.commonRdbmsWriterMaster.prepare(this.originalConfig); + } + + @Override + public List split(int mandatoryNumber) { + return this.commonRdbmsWriterMaster.split(this.originalConfig, mandatoryNumber); + } + + @Override + public void post() { + this.commonRdbmsWriterMaster.post(this.originalConfig); + } + + @Override + public void destroy() { + this.commonRdbmsWriterMaster.destroy(this.originalConfig); + } + + } + + public static class Task extends Writer.Task { + private Configuration writerSliceConfig; + private CommonRdbmsWriter.Task commonRdbmsWriterSlave; + + @Override + public void init() { + this.writerSliceConfig = super.getPluginJobConf(); + this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE){ + @Override + public String calcValueHolder(String columnType){ + if("serial".equalsIgnoreCase(columnType)){ + return "?::int"; + }else if("bit".equalsIgnoreCase(columnType)){ + return "?::bit varying"; + } + return "?::" + columnType; + } + }; + this.commonRdbmsWriterSlave.init(this.writerSliceConfig); + } + + @Override + public void prepare() { + this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig); + } + + public void startWrite(RecordReceiver recordReceiver) { + this.commonRdbmsWriterSlave.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector()); + } + + @Override + public void post() { + this.commonRdbmsWriterSlave.post(this.writerSliceConfig); + } + + @Override + public void destroy() { + this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig); + } + + } + +} diff --git a/kingbaseeswriter/src/main/libs/kingbase8-8.2.0.jar b/kingbaseeswriter/src/main/libs/kingbase8-8.2.0.jar new file mode 100644 index 00000000..0b5ac964 Binary files /dev/null and b/kingbaseeswriter/src/main/libs/kingbase8-8.2.0.jar differ diff --git a/kingbaseeswriter/src/main/resources/plugin.json b/kingbaseeswriter/src/main/resources/plugin.json new file mode 100644 index 00000000..83517760 --- /dev/null +++ b/kingbaseeswriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "kingbaseeswriter", + "class": "com.alibaba.datax.plugin.writer.kingbaseeswriter.KingbaseesWriter", + "description": "useScene: prod. mechanism: Jdbc connection using the database, execute insert sql. 
warn: The more you know about the database, the less problems you encounter.", + "developer": "alibaba" +} \ No newline at end of file diff --git a/kingbaseeswriter/src/main/resources/plugin_job_template.json b/kingbaseeswriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..94b66168 --- /dev/null +++ b/kingbaseeswriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,17 @@ +{ + "name": "kingbaseeswriter", + "parameter": { + "username": "", + "password": "", + "column": [], + "preSql": [], + "connection": [ + { + "jdbcUrl": "", + "table": [] + } + ], + "preSql": [], + "postSql": [] + } +} \ No newline at end of file diff --git a/kuduwriter/README.md b/kuduwriter/README.md new file mode 100644 index 00000000..f53de1b5 --- /dev/null +++ b/kuduwriter/README.md @@ -0,0 +1,6 @@ +# datax-kudu-plugin +datax kudu的writer插件 + + + +仅在kudu11进行过测试 diff --git a/kuduwriter/pom.xml b/kuduwriter/pom.xml new file mode 100644 index 00000000..04b5ef53 --- /dev/null +++ b/kuduwriter/pom.xml @@ -0,0 +1,82 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + kuduwriter + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + org.apache.kudu + kudu-client + 1.11.1 + + + junit + junit + 4.13 + test + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-service-face + + + test + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + \ No newline at end of file diff --git a/kuduwriter/src/main/assembly/package.xml b/kuduwriter/src/main/assembly/package.xml new file mode 100644 index 00000000..5b1a10a7 --- /dev/null +++ b/kuduwriter/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/kudu11xwriter + + + target/ + + kudu11xwriter-0.0.1-SNAPSHOT.jar + + plugin/writer/kudu11xwriter + + + + + + false + plugin/writer/kudu11xwriter/libs + runtime + + + \ No newline at end of file diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/ColumnType.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/ColumnType.java new file mode 100644 index 00000000..ebd6ea79 --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/ColumnType.java @@ -0,0 +1,37 @@ +package com.q1.datax.plugin.writer.kudu11xwriter; + +import com.alibaba.datax.common.exception.DataXException; + +import java.util.Arrays; + +/** + * @author daizihao + * @create 2020-08-31 19:12 + **/ +public enum ColumnType { + INT("int"), + FLOAT("float"), + STRING("string"), + BIGINT("bigint"), + DOUBLE("double"), + BOOLEAN("boolean"), + LONG("long"); + private String mode; + ColumnType(String mode) { + this.mode = mode.toLowerCase(); + } + + public String getMode() { + return mode; + } + + public static ColumnType getByTypeName(String modeName) { + for (ColumnType modeType : values()) { + if (modeType.mode.equalsIgnoreCase(modeName)) { + return modeType; + } + } + throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE, + String.format("Kuduwriter does not support the type:%s, currently supported types are:%s", modeName, Arrays.asList(values()))); + } +} diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Constant.java 
b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Constant.java new file mode 100644 index 00000000..2710e350 --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Constant.java @@ -0,0 +1,21 @@ +package com.q1.datax.plugin.writer.kudu11xwriter; + +/** + * @author daizihao + * @create 2020-08-31 14:42 + **/ +public class Constant { + public static final String DEFAULT_ENCODING = "UTF-8"; +// public static final String DEFAULT_DATA_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + public static final String COMPRESSION = "DEFAULT_COMPRESSION"; + public static final String ENCODING = "AUTO_ENCODING"; + public static final Long ADMIN_TIMEOUTMS = 60000L; + public static final Long SESSION_TIMEOUTMS = 60000L; + + + public static final String INSERT_MODE = "upsert"; + public static final long DEFAULT_WRITE_BATCH_SIZE = 512L; + public static final long DEFAULT_MUTATION_BUFFER_SPACE = 3072L; + +} diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/InsertModeType.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/InsertModeType.java new file mode 100644 index 00000000..754ca4fc --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/InsertModeType.java @@ -0,0 +1,34 @@ +package com.q1.datax.plugin.writer.kudu11xwriter; + +import com.alibaba.datax.common.exception.DataXException; + +import java.util.Arrays; + +/** + * @author daizihao + * @create 2020-08-31 14:47 + **/ +public enum InsertModeType { + Insert("insert"), + Upsert("upsert"), + Update("update"); + private String mode; + + InsertModeType(String mode) { + this.mode = mode.toLowerCase(); + } + + public String getMode() { + return mode; + } + + public static InsertModeType getByTypeName(String modeName) { + for (InsertModeType modeType : values()) { + if (modeType.mode.equalsIgnoreCase(modeName)) { + return modeType; + } + } + throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE, + String.format("Kuduwriter does not support the mode :[%s], currently supported mode types are :%s", modeName, Arrays.asList(values()))); + } +} diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Key.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Key.java new file mode 100644 index 00000000..7e5755aa --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Key.java @@ -0,0 +1,45 @@ +package com.q1.datax.plugin.writer.kudu11xwriter; + +/** + * @author daizihao + * @create 2020-08-31 14:17 + **/ +public class Key { + public final static String KUDU_CONFIG = "kuduConfig"; + public final static String KUDU_MASTER = "kudu.master_addresses"; + public final static String KUDU_ADMIN_TIMEOUT = "timeout"; + public final static String KUDU_SESSION_TIMEOUT = "sessionTimeout"; + + public final static String TABLE = "table"; + public final static String PARTITION = "partition"; + public final static String COLUMN = "column"; + + public static final String NAME = "name"; + public static final String TYPE = "type"; + public static final String INDEX = "index"; + public static final String PRIMARYKEY = "primaryKey"; + public static final String COMPRESSION = "compress"; + public static final String COMMENT = "comment"; + public final static String ENCODING = "encoding"; + + + + public static final String NUM_REPLICAS = "replicaCount"; + public static final String HASH = "hash"; + public static final String HASH_NUM = "number"; + + public static final String RANGE = 
"range"; + public static final String LOWER = "lower"; + public static final String UPPER = "upper"; + + + + public static final String TRUNCATE = "truncate"; + + public static final String INSERT_MODE = "writeMode"; + + public static final String WRITE_BATCH_SIZE = "batchSize"; + + public static final String MUTATION_BUFFER_SPACE = "bufferSize"; + public static final String SKIP_FAIL = "skipFail"; +} diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java new file mode 100644 index 00000000..10568820 --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java @@ -0,0 +1,292 @@ +package com.q1.datax.plugin.writer.kudu11xwriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.fastjson.JSON; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author daizihao + * @create 2020-08-27 18:30 + **/ +public class Kudu11xHelper { + + private static final Logger LOG = LoggerFactory.getLogger(Kudu11xHelper.class); + + public static Map getKuduConfiguration(String kuduConfig) { + if (StringUtils.isBlank(kuduConfig)) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.REQUIRED_VALUE, + "Connection configuration information required."); + } + Map kConfiguration; + try { + kConfiguration = JSON.parseObject(kuduConfig, HashMap.class); + Validate.isTrue(kConfiguration != null, "kuduConfig is null!"); + kConfiguration.put(Key.KUDU_ADMIN_TIMEOUT, kConfiguration.getOrDefault(Key.KUDU_ADMIN_TIMEOUT, Constant.ADMIN_TIMEOUTMS)); + kConfiguration.put(Key.KUDU_SESSION_TIMEOUT, kConfiguration.getOrDefault(Key.KUDU_SESSION_TIMEOUT, Constant.SESSION_TIMEOUTMS)); + } catch (Exception e) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e); + } + + return kConfiguration; + } + + public static KuduClient getKuduClient(String kuduConfig) { + Map conf = Kudu11xHelper.getKuduConfiguration(kuduConfig); + KuduClient kuduClient = null; + try { + String masterAddress = (String)conf.get(Key.KUDU_MASTER); + kuduClient = new KuduClient.KuduClientBuilder(masterAddress) + .defaultAdminOperationTimeoutMs((Long) conf.get(Key.KUDU_ADMIN_TIMEOUT)) + .defaultOperationTimeoutMs((Long)conf.get(Key.KUDU_SESSION_TIMEOUT)) + .build(); + } catch (Exception e) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e); + } + return kuduClient; + } + + public static KuduTable getKuduTable(Configuration configuration, KuduClient kuduClient) { + String tableName = configuration.getString(Key.TABLE); + + KuduTable table = null; + try { + if (kuduClient.tableExists(tableName)) { + table = kuduClient.openTable(tableName); + } else { + synchronized (Kudu11xHelper.class) { + if (!kuduClient.tableExists(tableName)) { + Schema schema = Kudu11xHelper.getSchema(configuration); + CreateTableOptions tableOptions = new CreateTableOptions(); + + Kudu11xHelper.setTablePartition(configuration, tableOptions, schema); + //副本数 + 
Integer numReplicas = configuration.getInt(Key.NUM_REPLICAS, 3); + tableOptions.setNumReplicas(numReplicas); + table = kuduClient.createTable(tableName, schema, tableOptions); + } else { + table = kuduClient.openTable(tableName); + } + } + } + + + } catch (Exception e) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_TABLE_ERROR, e); + } + return table; + } + + public static void createTable(Configuration configuration) { + String tableName = configuration.getString(Key.TABLE); + String kuduConfig = configuration.getString(Key.KUDU_CONFIG); + KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig); + try { + Schema schema = Kudu11xHelper.getSchema(configuration); + CreateTableOptions tableOptions = new CreateTableOptions(); + + Kudu11xHelper.setTablePartition(configuration, tableOptions, schema); + //副本数 + Integer numReplicas = configuration.getInt(Key.NUM_REPLICAS, 3); + tableOptions.setNumReplicas(numReplicas); + kuduClient.createTable(tableName, schema, tableOptions); + } catch (Exception e) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.GREATE_KUDU_TABLE_ERROR, e); + } finally { + AtomicInteger i = new AtomicInteger(5); + while (i.get()>0) { + try { + if (kuduClient.isCreateTableDone(tableName)){ + Kudu11xHelper.closeClient(kuduClient); + LOG.info("Table "+ tableName +" is created!"); + break; + } + i.decrementAndGet(); + LOG.error("timeout!"); + } catch (KuduException e) { + LOG.info("Wait for the table to be created..... "+i); + try { + Thread.sleep(1000L); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + i.decrementAndGet(); + } + } + try { + if (kuduClient != null) { + kuduClient.close(); + } + } catch (KuduException e) { + LOG.info("Kudu client has been shut down!"); + } + } + } + + public static boolean isTableExists(Configuration configuration) { + String tableName = configuration.getString(Key.TABLE); + String kuduConfig = configuration.getString(Key.KUDU_CONFIG); + KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig); + try { + return kuduClient.tableExists(tableName); + } catch (Exception e) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e); + } finally { + Kudu11xHelper.closeClient(kuduClient); + } + } + + public static void closeClient(KuduClient kuduClient) { + try { + if (kuduClient != null) { + kuduClient.close(); + } + } catch (KuduException e) { + LOG.warn("kudu client is not gracefully closed !"); + + } + + } + + public static Schema getSchema(Configuration configuration) { + List columns = configuration.getListConfiguration(Key.COLUMN); + List columnSchemas = new ArrayList<>(); + Schema schema = null; + if (columns == null || columns.isEmpty()) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.REQUIRED_VALUE, "column is not defined,eg:column:[{\"name\": \"cf0:column0\",\"type\": \"string\"},{\"name\": \"cf1:column1\",\"type\": \"long\"}]"); + } + try { + for (Configuration column : columns) { + + String type = "BIGINT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) || + "LONG".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) ? + "INT64" : "INT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase())? 
+ "INT32":column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase(); + String name = column.getNecessaryValue(Key.NAME, Kudu11xWriterErrorcode.REQUIRED_VALUE); + Boolean key = column.getBool(Key.PRIMARYKEY, false); + String encoding = column.getString(Key.ENCODING, Constant.ENCODING).toUpperCase(); + String compression = column.getString(Key.COMPRESSION, Constant.COMPRESSION).toUpperCase(); + String comment = column.getString(Key.COMMENT, ""); + + columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder(name, Type.getTypeForName(type)) + .key(key) + .encoding(ColumnSchema.Encoding.valueOf(encoding)) + .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.valueOf(compression)) + .comment(comment) + .build()); + } + schema = new Schema(columnSchemas); + } catch (Exception e) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.REQUIRED_VALUE, e); + } + return schema; + } + + public static Integer getPrimaryKeyIndexUntil(List columns){ + int i = 0; + while ( i < columns.size() ) { + Configuration col = columns.get(i); + if (!col.getBool(Key.PRIMARYKEY, false)) { + break; + } + i++; + } + return i; + } + + public static void setTablePartition(Configuration configuration, + CreateTableOptions tableOptions, + Schema schema) { + Configuration partition = configuration.getConfiguration(Key.PARTITION); + if (partition == null) { + ColumnSchema columnSchema = schema.getColumns().get(0); + tableOptions.addHashPartitions(Collections.singletonList(columnSchema.getName()), 3); + return; + } + //range分区 + Configuration range = partition.getConfiguration(Key.RANGE); + if (range != null) { + List rangeColums = new ArrayList<>(range.getKeys()); + tableOptions.setRangePartitionColumns(rangeColums); + for (String rangeColum : rangeColums) { + List lowerAndUppers = range.getListConfiguration(rangeColum); + for (Configuration lowerAndUpper : lowerAndUppers) { + PartialRow lower = schema.newPartialRow(); + lower.addString(rangeColum, lowerAndUpper.getNecessaryValue(Key.LOWER, Kudu11xWriterErrorcode.REQUIRED_VALUE)); + PartialRow upper = schema.newPartialRow(); + upper.addString(rangeColum, lowerAndUpper.getNecessaryValue(Key.UPPER, Kudu11xWriterErrorcode.REQUIRED_VALUE)); + tableOptions.addRangePartition(lower, upper); + } + } + LOG.info("Set range partition complete!"); + } + + // 设置Hash分区 + Configuration hash = partition.getConfiguration(Key.HASH); + if (hash != null) { + List hashColums = hash.getList(Key.COLUMN, String.class); + Integer hashPartitionNum = configuration.getInt(Key.HASH_NUM, 3); + tableOptions.addHashPartitions(hashColums, hashPartitionNum); + LOG.info("Set hash partition complete!"); + } + } + + public static void validateParameter(Configuration configuration) { + configuration.getNecessaryValue(Key.KUDU_CONFIG, Kudu11xWriterErrorcode.REQUIRED_VALUE); + configuration.getNecessaryValue(Key.TABLE, Kudu11xWriterErrorcode.REQUIRED_VALUE); + String encoding = configuration.getString(Key.ENCODING, Constant.DEFAULT_ENCODING); + if (!Charset.isSupported(encoding)) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE, + String.format("Encoding is not supported:[%s] .", encoding)); + } + configuration.set(Key.ENCODING, encoding); + String insertMode = configuration.getString(Key.INSERT_MODE, Constant.INSERT_MODE); + try { + InsertModeType.getByTypeName(insertMode); + } catch (Exception e) { + insertMode = Constant.INSERT_MODE; + } + configuration.set(Key.INSERT_MODE, insertMode); + + Long writeBufferSize = 
configuration.getLong(Key.WRITE_BATCH_SIZE, Constant.DEFAULT_WRITE_BATCH_SIZE); + configuration.set(Key.WRITE_BATCH_SIZE, writeBufferSize); + + Long mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE, Constant.DEFAULT_MUTATION_BUFFER_SPACE); + configuration.set(Key.MUTATION_BUFFER_SPACE, mutationBufferSpace); + + Boolean isSkipFail = configuration.getBool(Key.SKIP_FAIL, false); + configuration.set(Key.SKIP_FAIL, isSkipFail); + LOG.info("==validate parameter complete!"); + } + + public static void truncateTable(Configuration configuration) { + String kuduConfig = configuration.getString(Key.KUDU_CONFIG); + String userTable = configuration.getString(Key.TABLE); + LOG.info(String.format("Because you have configured truncate is true,KuduWriter begins to truncate table %s .", userTable)); + KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig); + + try { + if (kuduClient.tableExists(userTable)) { + kuduClient.deleteTable(userTable); + LOG.info(String.format("table %s has been deleted.", userTable)); + } + } catch (KuduException e) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.DELETE_KUDU_ERROR, e); + } finally { + Kudu11xHelper.closeClient(kuduClient); + } + + } +} diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriter.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriter.java new file mode 100644 index 00000000..9447a6c2 --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriter.java @@ -0,0 +1,85 @@ +package com.q1.datax.plugin.writer.kudu11xwriter; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author daizihao + * @create 2020-08-27 16:58 + **/ +public class Kudu11xWriter extends Writer { + public static class Job extends Writer.Job{ + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + private Configuration config = null; + @Override + public void init() { + this.config = this.getPluginJobConf(); + Kudu11xHelper.validateParameter(this.config); + } + + @Override + public void prepare() { + Boolean truncate = config.getBool(Key.TRUNCATE,false); + if(truncate){ + Kudu11xHelper.truncateTable(this.config); + } + + if (!Kudu11xHelper.isTableExists(config)){ + Kudu11xHelper.createTable(config); + } + } + + @Override + public List split(int i) { + List splitResultConfigs = new ArrayList(); + for (int j = 0; j < i; j++) { + splitResultConfigs.add(config.clone()); + } + + return splitResultConfigs; + } + + + + @Override + public void destroy() { + + } + } + + public static class Task extends Writer.Task{ + private Configuration taskConfig; + private KuduWriterTask kuduTaskProxy; + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + @Override + public void init() { + this.taskConfig = super.getPluginJobConf(); + this.kuduTaskProxy = new KuduWriterTask(this.taskConfig); + } + @Override + public void startWrite(RecordReceiver lineReceiver) { + this.kuduTaskProxy.startWriter(lineReceiver,super.getTaskPluginCollector()); + } + + + @Override + public void destroy() { + try { + if (kuduTaskProxy.session != null) { + kuduTaskProxy.session.close(); + } + }catch (Exception e){ + LOG.warn("kudu session is not gracefully closed 
!"); + } + Kudu11xHelper.closeClient(kuduTaskProxy.kuduClient); + + } + } +} diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriterErrorcode.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriterErrorcode.java new file mode 100644 index 00000000..d46bcea3 --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriterErrorcode.java @@ -0,0 +1,39 @@ +package com.q1.datax.plugin.writer.kudu11xwriter; + +import com.alibaba.datax.common.spi.ErrorCode; + +/** + * @author daizihao + * @create 2020-08-27 19:25 + **/ +public enum Kudu11xWriterErrorcode implements ErrorCode { + REQUIRED_VALUE("Kuduwriter-00", "You are missing a required parameter value."), + ILLEGAL_VALUE("Kuduwriter-01", "You fill in the parameter values are not legitimate."), + GET_KUDU_CONNECTION_ERROR("Kuduwriter-02", "Error getting Kudu connection."), + GET_KUDU_TABLE_ERROR("Kuduwriter-03", "Error getting Kudu table."), + CLOSE_KUDU_CONNECTION_ERROR("Kuduwriter-04", "Error closing Kudu connection."), + CLOSE_KUDU_SESSION_ERROR("Kuduwriter-06", "Error closing Kudu table connection."), + PUT_KUDU_ERROR("Kuduwriter-07", "IO exception occurred when writing to Kudu."), + DELETE_KUDU_ERROR("Kuduwriter-08", "An exception occurred while delete Kudu table."), + GREATE_KUDU_TABLE_ERROR("Kuduwriter-09", "Error creating Kudu table."), + PARAMETER_NUM_ERROR("Kuduwriter-10","The number of parameters does not match.") + ; + + private final String code; + private final String description; + + + Kudu11xWriterErrorcode(String code, String description) { + this.code = code; + this.description = description; + } + @Override + public String getCode() { + return code; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java new file mode 100644 index 00000000..127ee0c1 --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java @@ -0,0 +1,187 @@ +package com.q1.datax.plugin.writer.kudu11xwriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.common.util.RetryUtil; +import org.apache.commons.lang3.StringUtils; +import org.apache.kudu.client.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * @author daizihao + * @create 2020-08-31 16:55 + **/ +public class KuduWriterTask { + private final static Logger LOG = LoggerFactory.getLogger(KuduWriterTask.class); + + public List columns; + public String encoding; + public String insertMode; + public Double batchSize; + public long mutationBufferSpace; + public Boolean isUpsert; + public Boolean isSkipFail; + + public KuduClient kuduClient; + public KuduTable table; + public KuduSession session; + private Integer primaryKeyIndexUntil; + + + public 
KuduWriterTask(Configuration configuration) { + this.columns = configuration.getListConfiguration(Key.COLUMN); + this.encoding = configuration.getString(Key.ENCODING); + this.insertMode = configuration.getString(Key.INSERT_MODE); + this.batchSize = configuration.getDouble(Key.WRITE_BATCH_SIZE); + this.mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE); + this.isUpsert = !configuration.getString(Key.INSERT_MODE).equals("insert"); + this.isSkipFail = configuration.getBool(Key.SKIP_FAIL); + + this.kuduClient = Kudu11xHelper.getKuduClient(configuration.getString(Key.KUDU_CONFIG)); + this.table = Kudu11xHelper.getKuduTable(configuration, kuduClient); + this.session = kuduClient.newSession(); + session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); + session.setMutationBufferSpace((int) mutationBufferSpace); + this.primaryKeyIndexUntil = Kudu11xHelper.getPrimaryKeyIndexUntil(columns); +// tableName = configuration.getString(Key.TABLE); + } + + public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) { + LOG.info("==kuduwriter began to write!"); + Record record; + AtomicLong counter = new AtomicLong(0L); + try { + while ((record = lineReceiver.getFromReader()) != null) { + if (record.getColumnNumber() != columns.size()) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.PARAMETER_NUM_ERROR, " number of record fields:" + record.getColumnNumber() + " number of configuration fields:" + columns.size()); + } + boolean isDirtyRecord = false; + + + for (int i = 0; i <= primaryKeyIndexUntil && !isDirtyRecord; i++) { + Column column = record.getColumn(i); + isDirtyRecord = StringUtils.isBlank(column.asString()); + } + + if (isDirtyRecord) { + taskPluginCollector.collectDirtyRecord(record, "primarykey field is null"); + continue; + } + + Upsert upsert = table.newUpsert(); + Insert insert = table.newInsert(); + + for (int i = 0; i < columns.size(); i++) { + PartialRow row; + if (isUpsert) { + //覆盖更新 + row = upsert.getRow(); + } else { + //增量更新 + row = insert.getRow(); + } + Configuration col = columns.get(i); + String name = col.getString(Key.NAME); + ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE)); + Column column = record.getColumn(col.getInt(Key.INDEX, i)); + Object rawData = column.getRawData(); + if (rawData == null) { + row.setNull(name); + continue; + } + switch (type) { + case INT: + row.addInt(name, Integer.parseInt(rawData.toString())); + break; + case LONG: + case BIGINT: + row.addLong(name, Long.parseLong(rawData.toString())); + break; + case FLOAT: + row.addFloat(name, Float.parseFloat(rawData.toString())); + break; + case DOUBLE: + row.addDouble(name, Double.parseDouble(rawData.toString())); + break; + case BOOLEAN: + row.addBoolean(name, Boolean.getBoolean(rawData.toString())); + break; + case STRING: + default: + row.addString(name, rawData.toString()); + } + } + try { + RetryUtil.executeWithRetry(()->{ + if (isUpsert) { + //覆盖更新 + session.apply(upsert); + } else { + //增量更新 + session.apply(insert); + } + //提前写数据,阈值可自定义 + if (counter.incrementAndGet() > batchSize * 0.75) { + session.flush(); + counter.set(0L); + } + return true; + },5,1000L,true); + + } catch (Exception e) { + LOG.error("Data write failed!", e); + if (isSkipFail) { + LOG.warn("Because you have configured skipFail is true,this data will be skipped!"); + taskPluginCollector.collectDirtyRecord(record, e.getMessage()); + }else { + throw e; + } + } + } + } catch (Exception e) { + LOG.error("write failed! 
the task will exit!"); + throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage()); + } + AtomicInteger i = new AtomicInteger(10); + try { + while (i.get() > 0) { + if (session.hasPendingOperations()) { + session.flush(); + break; + } + Thread.sleep(1000L); + i.decrementAndGet(); + } + } catch (Exception e) { + LOG.info("Waiting for data to be inserted...... " + i + "s"); + try { + Thread.sleep(1000L); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + i.decrementAndGet(); + } finally { + try { + session.flush(); + } catch (KuduException e) { + LOG.error("==kuduwriter flush error! the results may not be complete!"); + e.printStackTrace(); + } + } + + } + + +} diff --git a/kuduwriter/src/main/java/com/q1/kudu/conf/KuduConfig.java b/kuduwriter/src/main/java/com/q1/kudu/conf/KuduConfig.java new file mode 100644 index 00000000..f1499a0f --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/kudu/conf/KuduConfig.java @@ -0,0 +1,9 @@ +package com.q1.kudu.conf; + +/** + * @author daizihao + * @create 2020-09-16 11:39 + **/ +public class KuduConfig { + +} diff --git a/kuduwriter/src/main/resources/plugin.json b/kuduwriter/src/main/resources/plugin.json new file mode 100644 index 00000000..948c7e22 --- /dev/null +++ b/kuduwriter/src/main/resources/plugin.json @@ -0,0 +1,7 @@ +{ + "name": "kudu11xwriter", + "class": "com.q1.datax.plugin.writer.kudu11xwriter.Kudu11xWriter", + "description": "use put: prod. mechanism: use kudu java api put data.", + "developer": "com.q1.daizihao" +} + diff --git a/kuduwriter/src/main/resources/plugin_job_template.json b/kuduwriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..d2723098 --- /dev/null +++ b/kuduwriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,59 @@ +{ + "name": "kudu11xwriter", + "parameter": { + "kuduConfig": { + "kudu.master_addresses": "***", + "timeout": 60000, + "sessionTimeout": 60000 + + }, + "table": "", + "replicaCount": 3, + "truncate": false, + "writeMode": "upsert", + "partition": { + "range": { + "column1": [ + { + "lower": "2020-08-25", + "upper": "2020-08-26" + }, + { + "lower": "2020-08-26", + "upper": "2020-08-27" + }, + { + "lower": "2020-08-27", + "upper": "2020-08-28" + } + ] + }, + "hash": { + "column": [ + "column1" + ], + "number": 3 + } + }, + "column": [ + { + "index": 0, + "name": "c1", + "type": "string", + "primaryKey": true + }, + { + "index": 1, + "name": "c2", + "type": "string", + "compress": "DEFAULT_COMPRESSION", + "encoding": "AUTO_ENCODING", + "comment": "注解xxxx" + } + ], + "batchSize": 1024, + "bufferSize": 2048, + "skipFail": false, + "encoding": "UTF-8" + } +} \ No newline at end of file diff --git a/kuduwriter/src/test/java/com/dai/test.java b/kuduwriter/src/test/java/com/dai/test.java new file mode 100644 index 00000000..5fd17beb --- /dev/null +++ b/kuduwriter/src/test/java/com/dai/test.java @@ -0,0 +1,40 @@ +package com.dai; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.RetryUtil; +import com.q1.datax.plugin.writer.kudu11xwriter.*; +import static org.apache.kudu.client.AsyncKuduClient.LOG; + +/** + * @author daizihao + * @create 2020-08-28 11:03 + **/ +public class test { + static boolean isSkipFail; + + + public static void main(String[] args) { + try { + while (true) { + try { + RetryUtil.executeWithRetry(()->{ + throw new RuntimeException(); + },5,1000L,true); + + } catch (Exception e) { + LOG.error("Data write failed!", e); + System.out.println(isSkipFail); + if 
(isSkipFail) { + LOG.warn("Because you have configured skipFail is true,this data will be skipped!"); + }else { + System.out.println("异常抛出"); + throw e; + } + } + } + } catch (Exception e) { + LOG.error("write failed! the task will exit!"); + throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e); + } + } +} diff --git a/package.xml b/package.xml index 6d97b372..2323ce18 100755 --- a/package.xml +++ b/package.xml @@ -74,6 +74,13 @@ datax + + kingbaseesreader/target/datax/ + + **/*.* + + datax + rdbmsreader/target/datax/ @@ -266,6 +273,13 @@ datax + + kingbaseeswriter/target/datax/ + + **/*.* + + datax + rdbmswriter/target/datax/ diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java index ea11d99b..eb82aa44 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java @@ -19,7 +19,8 @@ public enum DataBaseType { RDBMS("rdbms", "com.alibaba.datax.plugin.rdbms.util.DataBaseType"), DB2("db2", "com.ibm.db2.jcc.DB2Driver"), ADS("ads","com.mysql.jdbc.Driver"), - ClickHouse("clickhouse", "ru.yandex.clickhouse.ClickHouseDriver"); + ClickHouse("clickhouse", "ru.yandex.clickhouse.ClickHouseDriver"), + KingbaseES("kingbasees", "com.kingbase8.Driver"); private String typeName; @@ -59,6 +60,8 @@ public enum DataBaseType { break; case RDBMS: break; + case KingbaseES: + break; default: throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type."); } @@ -98,6 +101,8 @@ public enum DataBaseType { break; case RDBMS: break; + case KingbaseES: + break; default: throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type."); } @@ -122,6 +127,7 @@ public enum DataBaseType { break; case DB2: case PostgreSQL: + case KingbaseES: break; default: throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type."); @@ -145,6 +151,7 @@ public enum DataBaseType { break; case DB2: case PostgreSQL: + case KingbaseES: break; default: throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type"); @@ -168,6 +175,8 @@ public enum DataBaseType { break; case PostgreSQL: break; + case KingbaseES: + break; default: throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type"); } diff --git a/pom.xml b/pom.xml index bd270cc5..5b7f8e48 100755 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,7 @@ drdsreader sqlserverreader postgresqlreader + kingbaseesreader oraclereader odpsreader otsreader @@ -80,6 +81,7 @@ oraclewriter sqlserverwriter postgresqlwriter + kingbaseeswriter osswriter mongodbwriter adswriter @@ -100,6 +102,7 @@ plugin-unstructured-storage-util hbase20xsqlreader hbase20xsqlwriter + kuduwriter