From 8059bba623fa0ee7b525510be85eee5fdf6d707e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=B5=E6=88=90?= Date: Thu, 25 Jul 2019 11:12:52 +0800 Subject: [PATCH 1/5] add adbpgwriter plugin --- adbpgwriter/duplicate | 0 adbpgwriter/pom.xml | 113 +++++++++ adbpgwriter/src/main/assembly/package.xml | 35 +++ adbpgwriter/src/main/doc/adbpgwriter.md | 216 ++++++++++++++++++ .../AdbpgWriter.java | 117 ++++++++++ .../package-info.java | 8 + .../adbpgwriter/copy/Adb4pgClientProxy.java | 182 +++++++++++++++ .../writer/adbpgwriter/copy/AdbProxy.java | 13 ++ .../writer/adbpgwriter/util/Adb4pgUtil.java | 146 ++++++++++++ .../writer/adbpgwriter/util/Constant.java | 12 + .../plugin/writer/adbpgwriter/util/Key.java | 26 +++ adbpgwriter/src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 13 ++ package.xml | 7 + pom.xml | 3 +- 15 files changed, 896 insertions(+), 1 deletion(-) create mode 100644 adbpgwriter/duplicate create mode 100644 adbpgwriter/pom.xml create mode 100644 adbpgwriter/src/main/assembly/package.xml create mode 100644 adbpgwriter/src/main/doc/adbpgwriter.md create mode 100644 adbpgwriter/src/main/java/com.alibaba.datax.plugin.writer.adbpgwriter/AdbpgWriter.java create mode 100644 adbpgwriter/src/main/java/com.alibaba.datax.plugin.writer.adbpgwriter/package-info.java create mode 100644 adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/Adb4pgClientProxy.java create mode 100644 adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/AdbProxy.java create mode 100644 adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Adb4pgUtil.java create mode 100644 adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Constant.java create mode 100644 adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Key.java create mode 100644 adbpgwriter/src/main/resources/plugin.json create mode 100644 adbpgwriter/src/main/resources/plugin_job_template.json 
diff --git a/adbpgwriter/duplicate b/adbpgwriter/duplicate new file mode 100644 index 00000000..e69de29b diff --git a/adbpgwriter/pom.xml b/adbpgwriter/pom.xml new file mode 100644 index 00000000..2b78aaf0 --- /dev/null +++ b/adbpgwriter/pom.xml @@ -0,0 +1,113 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + adbpgwriter + adbpgwriter + jar + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + mysql + mysql-connector-java + + + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + com.alibaba + druid + + + + + com.alibaba + druid + 1.1.17 + + + + org.slf4j + slf4j-api + + + org.apache.commons + commons-exec + 1.3 + + + ch.qos.logback + logback-classic + + + commons-configuration + commons-configuration + 1.10 + + + + com.alibaba.cloud.analyticdb + adb4pgclient + 1.0.0-SNAPSHOT + + + + + + + + + maven-compiler-plugin + + 1.8 + 1.8 + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + \ No newline at end of file diff --git a/adbpgwriter/src/main/assembly/package.xml b/adbpgwriter/src/main/assembly/package.xml new file mode 100644 index 00000000..6b5ba1a8 --- /dev/null +++ b/adbpgwriter/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/adbpgwriter + + + target/ + + adbpgwriter-0.0.1-SNAPSHOT.jar + + plugin/writer/adbpgwriter + + + + + + false + plugin/writer/adbpgwriter/libs + runtime + + + diff --git a/adbpgwriter/src/main/doc/adbpgwriter.md b/adbpgwriter/src/main/doc/adbpgwriter.md new file mode 100644 index 00000000..800e5619 --- /dev/null +++ b/adbpgwriter/src/main/doc/adbpgwriter.md @@ -0,0 +1,216 @@ +# DataX ADB PG Writer + + + +--- + + +## 1 快速介绍 +AdbpgWriter 插件实现了写入数据到 ABD PG版数据库的功能。在底层实现上,AdbpgWriter 
插件会先缓存需要写入的数据,当缓存的 +数据量达到 commitSize 时,插件会通过 JDBC 连接远程 ADB PG版 数据库,并执行 COPY 命令将数据写入 ADB PG 数据库。 + +AdbpgWriter 可以作为数据迁移工具为用户提供服务。 + +## 2 实现原理 + +AdbpgWriter 通过 DataX 框架获取 Reader 生成的协议数据,首先会将数据缓存,当缓存的数据量达到commitSize时,插件根据你配置生成相应的COPY语句,执行 +COPY命令将数据写入ADB PG数据库中。 + +## 3 功能说明 + +### 3.1 配置样例 + +* 这里使用一份从内存产生到 AdbpgWriter导入的数据 + +```json + +{ + "job": { + "setting": { + "speed": { + "channel": 32 + } + }, + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "column" : [ + { + "value": "DataX", + "type": "string" + }, + { + "value": 19880808, + "type": "long" + }, + { + "value": "1988-08-08 08:08:08", + "type": "date" + }, + { + "value": true, + "type": "bool" + }, + { + "value": "test", + "type": "bytes" + } + ] + }, + "sliceRecordCount": 1000 + }, + + "writer": { + "name": "adbpgwriter", + "parameter": { + "username": "username", + "password": "password", + "host": "host", + "port": "1234", + "database": "database", + "schema": "schema", + "table": "table", + "column": ["*"] + } + } + } + ] + } +} +``` + +### 3.2 参数说明 + +* **name** + * 描述:插件名称
+ + * 必选:是
+ + * 默认值:无
+ +* **username** + * 描述:目的数据库的用户名
+ + * 必选:是
+ + * 默认值:无
+ +* **password** + + * 描述:目的数据库的密码
+ + * 必选:是
+ + * 默认值:无
+ +* **host** + + * 描述:目的数据库主机名
+ + * 必选:是
+ + * 默认值:无
+ +* **port** + + * 描述:目的数据库的端口
+ + * 必选:是
+ + * 默认值:无
+* **database** + + * 描述:需要写入的表所属的数据库名称
+ + * 必选:是
+ + * 默认值:无
+* **schema** + + * 描述:需要写入的表所属的schema名称
+ + * 必选:是
+ + * 默认值:无
+* **table** + + * 描述:需要写入的表名称
+ + * 必选:是
+ + * 默认值:无
+* **column** + + * 描述:目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。如果要依次写入全部列,使用*表示, 例如: "column": ["*"] + + 注意:1、我们强烈不推荐你这样配置,因为当你目的表字段个数、类型等有改动时,你的任务可能运行不正确或者失败 + 2、此处 column 不能配置任何常量值 + + * 必选:是
+ + * 默认值:否
+ +### 3.3 类型转换 + +目前 AdbpgWriter 支持大部分 ADB PG 数据库的类型,但也存在部分没有支持的情况,请注意检查你的类型。 + +下面列出 AdbpgWriter 针对 ADB PG 类型转换列表: + +| DataX 内部类型| ADB PG 数据类型 | +| -------- | ----- | +| Long |bigint, bigserial, integer, smallint, serial | +| Double |double precision, float, numeric, real | +| String |varchar, char, text| +| Date |date, time, timestamp | +| Boolean |bool| + +## 4 性能报告 + +### 4.1 环境准备 + +#### 4.1.1 数据特征 +建表语句: +```sql +create table schematest.test_datax ( + t1 int, + t2 bigint, + t3 bigserial, + t4 float, + t5 timestamp, + t6 varchar +)distributed by(t1); + +``` + +#### 4.1.2 机器参数 + +* 执行DataX的机器参数为: + 1. cpu: 24核 + 2. mem: 96GB + + +* ADB PG数据库机器参数为: + 1. 平均core数量:4 + 2. primary segment 数量: 4 + 3. 计算组数量:2 +### 4.2 测试报告 + +#### 4.2.1 单表测试报告 + +| 通道数| commitSize MB | DataX速度(Rec/s)| DataX流量(M/s) +|--------|--------| --------|--------| +|1| 10 | 54098 | 15.54 | +|1| 20 | 55000 | 15.80 | +|4| 10 | 183333 | 52.66 | +|4| 20 | 173684 | 49.89 | +|8| 10 | 330000 | 94.79 | +|8| 20 | 300000 | 86.17 | +|16| 10 | 412500 | 118.48 | +|16| 20 | 366666 | 105.32 | +|32| 10 | 366666 | 105.32 | + +#### 4.2.2 性能测试小结 +1. `channel数对性能影响很大` +2. 
`通常不建议写入数据库时,通道个数 > 32` \ No newline at end of file diff --git a/adbpgwriter/src/main/java/com.alibaba.datax.plugin.writer.adbpgwriter/AdbpgWriter.java b/adbpgwriter/src/main/java/com.alibaba.datax.plugin.writer.adbpgwriter/AdbpgWriter.java new file mode 100644 index 00000000..7c1c6379 --- /dev/null +++ b/adbpgwriter/src/main/java/com.alibaba.datax.plugin.writer.adbpgwriter/AdbpgWriter.java @@ -0,0 +1,117 @@ +package com.alibaba.datax.plugin.writer.adbpgwriter; + +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; + +import java.util.ArrayList; +import java.util.List; + +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; +import com.alibaba.datax.plugin.rdbms.writer.Key; +import com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil; +import com.alibaba.datax.plugin.writer.adbpgwriter.copy.Adb4pgClientProxy; +import com.alibaba.datax.plugin.writer.adbpgwriter.util.Adb4pgUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode.*; +import static com.alibaba.datax.plugin.rdbms.util.DataBaseType.PostgreSQL; + +/** + * Created by yuncheng on 07/13/2019. 
+ */ +public class AdbpgWriter extends Writer { + private static final DataBaseType DATABASE_TYPE = DataBaseType.PostgreSQL; + + public static class Job extends Writer.Job { + + private Configuration originalConfig; + private CommonRdbmsWriter.Job commonRdbmsWriterMaster; + private static final Logger LOG = LoggerFactory.getLogger(Writer.Job.class); + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + LOG.info("in Job.init(), config is:[\n{}\n]", originalConfig.toJSON()); + this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE); + //convert to DatabaseConfig, use DatabaseConfig to check user configuration + Adb4pgUtil.checkConfig(originalConfig); + } + + @Override + public void prepare() { + + Adb4pgUtil.prepare(originalConfig); + } + + @Override + public List split(int adviceNumber) { + List splitResult = new ArrayList(); + for(int i = 0; i < adviceNumber; i++) { + splitResult.add(this.originalConfig.clone()); + } + return splitResult; + } + + @Override + public void post() { + + Adb4pgUtil.post(originalConfig); + } + + @Override + public void destroy() { + + } + + + + } + + public static class Task extends Writer.Task { + private Configuration writerSliceConfig; + private CommonRdbmsWriter.Task commonRdbmsWriterSlave; + private Adb4pgClientProxy adb4pgClientProxy; + //Adb4pgClient client; + @Override + public void init() { + this.writerSliceConfig = super.getPluginJobConf(); + this.adb4pgClientProxy = new Adb4pgClientProxy(writerSliceConfig, super.getTaskPluginCollector()); + this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE){ + @Override + public String calcValueHolder(String columnType){ + if("serial".equalsIgnoreCase(columnType)){ + return "?::int"; + }else if("bit".equalsIgnoreCase(columnType)){ + return "?::bit varying"; + } + return "?::" + columnType; + } + }; + } + + @Override + public void prepare() { + + } + + @Override + public void startWrite(RecordReceiver recordReceiver) { + 
this.adb4pgClientProxy.startWriteWithConnection(recordReceiver, Adb4pgUtil.getAdbpgConnect(writerSliceConfig)); + } + + @Override + public void post() { + + } + + @Override + public void destroy() { + + } + + } +} diff --git a/adbpgwriter/src/main/java/com.alibaba.datax.plugin.writer.adbpgwriter/package-info.java b/adbpgwriter/src/main/java/com.alibaba.datax.plugin.writer.adbpgwriter/package-info.java new file mode 100644 index 00000000..b32e989d --- /dev/null +++ b/adbpgwriter/src/main/java/com.alibaba.datax.plugin.writer.adbpgwriter/package-info.java @@ -0,0 +1,8 @@ +/** + * Greenplum Writer. + * + * @since 0.0.1 + */ +package com.alibaba.datax.plugin.writer.adbpgwriter; + + diff --git a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/Adb4pgClientProxy.java b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/Adb4pgClientProxy.java new file mode 100644 index 00000000..5a09aeb0 --- /dev/null +++ b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/Adb4pgClientProxy.java @@ -0,0 +1,182 @@ +package com.alibaba.datax.plugin.writer.adbpgwriter.copy; + +import com.alibaba.cloud.analyticdb.adb4pgclient.*; +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.transport.record.DefaultRecord; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.writer.adbpgwriter.util.Adb4pgUtil; +import com.alibaba.datax.plugin.writer.adbpgwriter.util.Constant; +import com.alibaba.datax.plugin.writer.adbpgwriter.util.Key; +import org.apache.commons.lang3.tuple.Pair; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; +/** + * Created by yuncheng on 07/15/2019. + */ +public class Adb4pgClientProxy implements AdbProxy { + private static final Logger LOG = LoggerFactory.getLogger(Adb4pgClientProxy.class); + + private Adb4pgClient adb4pgClient; + private String table; + private String schema; + List columns; + private TableInfo tableInfo; + private TaskPluginCollector taskPluginCollector; + private boolean useRawData[]; + public Adb4pgClientProxy(Configuration configuration,TaskPluginCollector taskPluginCollector) { + this.taskPluginCollector = taskPluginCollector; + + DatabaseConfig databaseConfig = Adb4pgUtil.convertConfiguration(configuration); + + // If the value of column is empty, set null + boolean emptyAsNull = configuration.getBool(Key.EMPTY_AS_NULL, false); + databaseConfig.setEmptyAsNull(emptyAsNull); + + // 使用insert ignore into方式进行插入 + boolean ignoreInsert = configuration.getBool(Key.IGNORE_INSERT, false); + databaseConfig.setInsertIgnore(ignoreInsert); + + // commit时,写入ADB出现异常时重试的3次 + int retryTimes = configuration.getInt(Key.RETRY_CONNECTION_TIME, Constant.DEFAULT_RETRY_TIMES); + databaseConfig.setRetryTimes(retryTimes); + + // 重试间隔的时间为1s,单位是ms + int retryIntervalTime = configuration.getInt(Key.RETRY_INTERVAL_TIME, 1000); + databaseConfig.setRetryIntervalTime(retryIntervalTime); + + // 设置自动提交的SQL长度(单位Byte),默认为32KB,一般不建议设置 + int commitSize = configuration.getInt("commitSize", 10 * 1024 * 1024); + databaseConfig.setCommitSize(commitSize); + + + // 设置写入adb时的并发线程数,默认4,针对配置的所有表 + int parallelNumber = configuration.getInt("parallelNumber", 4); + databaseConfig.setParallelNumber(parallelNumber); + + // 设置client中使用的logger对象,此处使用slf4j.Logger + databaseConfig.setLogger(Adb4pgClientProxy.LOG); + + // sdk 默认值为true + boolean shareDataSource = configuration.getBool("shareDataSource", true); + 
databaseConfig.setShareDataSource(shareDataSource); + + //List columns = configuration.getList(Key.COLUMN, String.class); + + this.table = configuration.getString(com.alibaba.datax.plugin.rdbms.writer.Key.TABLE); + this.schema = configuration.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.SCHEMA); + this.adb4pgClient = new Adb4pgClient(databaseConfig); + this.columns = databaseConfig.getColumns(table,schema); + this.tableInfo = adb4pgClient.getTableInfo(table, schema); + + + this.useRawData = new boolean[this.columns.size()]; + List columnInfos = tableInfo.getColumns(); + for (int i = 0; i < this.columns.size(); i++) { + String oriEachColumn = columns.get(i); + String eachColumn = oriEachColumn; + // 防御性保留字 + if (eachColumn.startsWith(Constant.COLUMN_QUOTE_CHARACTER) + && eachColumn.endsWith(Constant.COLUMN_QUOTE_CHARACTER)) { + eachColumn = eachColumn.substring(1, eachColumn.length() - 1); + } + for (ColumnInfo eachAdsColumn : columnInfos) { + if (eachColumn.equals(eachAdsColumn.getName())) { + + int columnSqltype = eachAdsColumn.getDataType().sqlType; + switch (columnSqltype) { + case Types.DATE: + case Types.TIME: + case Types.TIMESTAMP: + this.useRawData[i] = false; + break; + default: + this.useRawData[i] = true; + break; + } + } + } + } + + } + @Override + public void startWriteWithConnection(RecordReceiver recordReceiver, Connection connection) { + try { + Record record; + while ((record = recordReceiver.getFromReader()) != null) { + Row row = new Row(); + List values = new ArrayList(); + this.prepareColumnTypeValue(record, values); + row.setColumnValues(values); + + try { + this.adb4pgClient.addRow(row,this.table, this.schema); + } catch (Adb4pgClientException e) { + if (101 == e.getCode()) { + for (String each : e.getErrData()) { + Record dirtyData = new DefaultRecord(); + dirtyData.addColumn(new StringColumn(each)); + this.taskPluginCollector.collectDirtyRecord(dirtyData, e.getMessage()); + } + } else { + throw e; + } + } + + } + + try { + 
this.adb4pgClient.commit(); + } catch (Adb4pgClientException e) { + if (101 == e.getCode()) { + for (String each : e.getErrData()) { + Record dirtyData = new DefaultRecord(); + dirtyData.addColumn(new StringColumn(each)); + this.taskPluginCollector.collectDirtyRecord(dirtyData, e.getMessage()); + } + } else { + throw e; + } + } + + }catch (Exception e) { + throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e); + }finally { + DBUtil.closeDBResources(null, null, connection); + } + return; + } + + private void prepareColumnTypeValue(Record record, List values) { + for (int i = 0; i < this.columns.size(); i++) { + Column column = record.getColumn(i); + if (this.useRawData[i]) { + values.add(column.getRawData()); + } else { + values.add(column.asString()); + } + + } + } + + @Override + public void closeResource() { + try { + LOG.info("stop the adb4pgClient"); + this.adb4pgClient.stop(); + } catch (Exception e) { + LOG.warn("stop adbClient meet a exception, ignore it: {}", e.getMessage(), e); + } + } +} diff --git a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/AdbProxy.java b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/AdbProxy.java new file mode 100644 index 00000000..7b66678a --- /dev/null +++ b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/AdbProxy.java @@ -0,0 +1,13 @@ +package com.alibaba.datax.plugin.writer.adbpgwriter.copy; + +import com.alibaba.datax.common.plugin.RecordReceiver; + +import java.sql.Connection; +/** + * Created by yuncheng on 07/15/2019. 
+ */ +public interface AdbProxy { + public abstract void startWriteWithConnection(RecordReceiver recordReceiver, Connection connection); + + public void closeResource(); +} diff --git a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Adb4pgUtil.java b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Adb4pgUtil.java new file mode 100644 index 00000000..3e1efb04 --- /dev/null +++ b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Adb4pgUtil.java @@ -0,0 +1,146 @@ +package com.alibaba.datax.plugin.writer.adbpgwriter.util; + +import com.alibaba.cloud.analyticdb.adb4pgclient.Adb4pgClient; +import com.alibaba.cloud.analyticdb.adb4pgclient.Adb4pgClientException; +import com.alibaba.cloud.analyticdb.adb4pgclient.DatabaseConfig; +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.spi.ErrorCode; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.rdbms.writer.Constant; +import com.alibaba.datax.plugin.rdbms.writer.Key; +import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.util.*; + +import static com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode.COLUMN_SPLIT_ERROR; + +/** + * Created by yuncheng on 07/13/2019. 
+ */ +public class Adb4pgUtil { + + private static final Logger LOG = LoggerFactory.getLogger(Adb4pgUtil.class); + private static final DataBaseType DATABASE_TYPE = DataBaseType.PostgreSQL; + public static void checkConfig(Configuration originalConfig) { + try { + + DatabaseConfig databaseConfig = convertConfiguration(originalConfig); + + Adb4pgClient testConfigClient = new Adb4pgClient(databaseConfig); + } catch (Exception e) { + throw new Adb4pgClientException(Adb4pgClientException.CONFIG_ERROR, "Check config exception: " + e.getMessage(), null); + } + } + + public static DatabaseConfig convertConfiguration(Configuration originalConfig) { + originalConfig.getNecessaryValue(Key.USERNAME, COLUMN_SPLIT_ERROR); + originalConfig.getNecessaryValue(Key.PASSWORD, COLUMN_SPLIT_ERROR); + + + String userName = originalConfig.getString(Key.USERNAME); + String passWord = originalConfig.getString(Key.PASSWORD); + String tableName = originalConfig.getString(Key.TABLE); + String schemaName = originalConfig.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.SCHEMA); + String host = originalConfig.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.HOST); + String port = originalConfig.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.PORT); + String databseName = originalConfig.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.DATABASE); + + List columns = originalConfig.getList(Key.COLUMN, String.class); + DatabaseConfig databaseConfig = new DatabaseConfig(); + databaseConfig.setHost(host); + databaseConfig.setPort(Integer.valueOf(port)); + databaseConfig.setDatabase(databseName); + + databaseConfig.setUser(userName); + + databaseConfig.setPassword(passWord); + databaseConfig.setLogger(LOG); + + databaseConfig.setInsertIgnore(originalConfig.getBool(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.IS_INSERTINGORE, true)); + databaseConfig.addTable(Collections.singletonList(tableName), schemaName); + 
databaseConfig.setColumns(columns, tableName, schemaName); + + return databaseConfig; + } + + private static Map> splitBySchemaName(List tables) { + HashMap> res = new HashMap>(16); + + for (String schemaNameTableName: tables) { + String[] s = schemaNameTableName.split("\\."); + if (!res.containsKey(s[0])) { + res.put(s[0], new ArrayList()); + } + res.get(s[0]).add(s[1]); + + } + + return res; + } + + public static Connection getAdbpgConnect(Configuration conf) { + String userName = conf.getString(Key.USERNAME); + String passWord = conf.getString(Key.PASSWORD); + + return DBUtil.getConnection(DataBaseType.PostgreSQL, generateJdbcUrl(conf), userName, passWord); + + } + + private static String generateJdbcUrl(Configuration configuration) { + String host = configuration.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.HOST); + String port = configuration.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.PORT); + String databseName = configuration.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.DATABASE); + String jdbcUrl = "jdbc:postgresql://" + host + ":" + port + "/" + databseName; + return jdbcUrl; + + } + public static void prepare(Configuration originalConfig) { + List preSqls = originalConfig.getList(Key.PRE_SQL, + String.class); + + String tableName = originalConfig.getString(Key.TABLE); + List renderedPreSqls = WriterUtil.renderPreOrPostSqls( + preSqls, tableName); + + if (renderedPreSqls.size() == 0) { + return; + } + + originalConfig.remove(Key.PRE_SQL); + + Connection conn = getAdbpgConnect(originalConfig); + WriterUtil.executeSqls(conn, renderedPreSqls, generateJdbcUrl(originalConfig), DATABASE_TYPE); + DBUtil.closeDBResources(null, null, conn); + + + } + + public static void post(Configuration configuration) { + List postSqls = configuration.getList(Key.POST_SQL, + String.class); + String tableName = configuration.getString(Key.TABLE); + List renderedPostSqls = WriterUtil.renderPreOrPostSqls( + postSqls, 
tableName); + + if (renderedPostSqls.size() == 0) { + return; + } + + configuration.remove(Key.POST_SQL); + + Connection conn = getAdbpgConnect(configuration); + + WriterUtil.executeSqls(conn, renderedPostSqls, generateJdbcUrl(configuration), DATABASE_TYPE); + DBUtil.closeDBResources(null, null, conn); + } + + +} diff --git a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Constant.java b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Constant.java new file mode 100644 index 00000000..66840297 --- /dev/null +++ b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Constant.java @@ -0,0 +1,12 @@ +package com.alibaba.datax.plugin.writer.adbpgwriter.util; +/** + * Created by yuncheng on 07/13/2019. + */ +public class Constant { + public static final int DEFAULT_RETRY_TIMES = 3; + + public static final String COLUMN_QUOTE_CHARACTER = "\""; + + + +} diff --git a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Key.java b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Key.java new file mode 100644 index 00000000..d398188d --- /dev/null +++ b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Key.java @@ -0,0 +1,26 @@ +package com.alibaba.datax.plugin.writer.adbpgwriter.util; +/** + * Created by yuncheng on 07/13/2019. 
+ */ +public class Key { + + public final static String COLUMN = "column"; + public final static String IS_INSERTINGORE = "insertIgnore"; + public final static String HOST = "host"; + public final static String PORT = "port"; + public final static String DATABASE = "database"; + public final static String SCHEMA = "schema"; + public final static String EMPTY_AS_NULL = "emptyAsNull"; + + public final static String IGNORE_INSERT = "ignoreInsert"; + + public final static String RETRY_CONNECTION_TIME = "retryTimes"; + + public final static String RETRY_INTERVAL_TIME = "retryIntervalTime"; + + public final static String COMMIT_SIZE = "commitSize"; + + public final static String PARALLEL_NUMBER = "parallelNumber"; + + public final static String SHARED_DATASOURCE = "shareDataSource"; +} diff --git a/adbpgwriter/src/main/resources/plugin.json b/adbpgwriter/src/main/resources/plugin.json new file mode 100644 index 00000000..f4fb54f7 --- /dev/null +++ b/adbpgwriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "adbpgwriter", + "class": "com.alibaba.datax.plugin.writer.adbpgwriter.AdbpgWriter", + "description": "", + "developer": "alibaba" +} \ No newline at end of file diff --git a/adbpgwriter/src/main/resources/plugin_job_template.json b/adbpgwriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..cc93f08d --- /dev/null +++ b/adbpgwriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,13 @@ +{ + "name": "adbpgwriter", + "parameter": { + "username": "", + "password": "", + "host": "", + "port": "", + "database": "", + "schema": "", + "table": "", + "column": ["*"] + } +} \ No newline at end of file diff --git a/package.xml b/package.xml index 1ff6391d..8b7c72f7 100755 --- a/package.xml +++ b/package.xml @@ -336,5 +336,12 @@ datax + + adbpgwriter/target/datax/ + + **/*.* + + datax + diff --git a/pom.xml b/pom.xml index 6e68418e..79865473 100755 --- a/pom.xml +++ b/pom.xml @@ -87,7 +87,8 @@ hbase11xsqlreader 
elasticsearchwriter tsdbwriter - + adbpgwriter + plugin-rdbms-util plugin-unstructured-storage-util From 6f798436c504d9ef4954546d46aa9a038799720e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=B5=E6=88=90?= Date: Thu, 25 Jul 2019 11:30:58 +0800 Subject: [PATCH 2/5] add adbpgwriter plugin --- adbpgwriter/src/main/doc/adbpgwriter.md | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/adbpgwriter/src/main/doc/adbpgwriter.md b/adbpgwriter/src/main/doc/adbpgwriter.md index 800e5619..80427241 100644 --- a/adbpgwriter/src/main/doc/adbpgwriter.md +++ b/adbpgwriter/src/main/doc/adbpgwriter.md @@ -72,6 +72,8 @@ COPY命令将数据写入ADB PG数据库中。 "database": "database", "schema": "schema", "table": "table", + "preSql": ["delete from table"], + "postSql": ["select * from table"], "column": ["*"] } } @@ -151,7 +153,21 @@ COPY命令将数据写入ADB PG数据库中。 * 必选:是
* 默认值:否
- +* **preSql** + + * 描述:写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,可以使用 `@table` 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00,datax_01, ... datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:`"preSql":["delete from @table"]`,效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称
+ + * 必选:否
+ + * 默认值:无
+ +* **postSql** + + * 描述:写入数据到目的表后,会执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,可以使用 `@table` 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。
+ + * 必选:否
+ + * 默认值:无
### 3.3 类型转换 目前 AdbpgWriter 支持大部分 ADB PG 数据库的类型,但也存在部分没有支持的情况,请注意检查你的类型。 From 864ce4d482d3b091c338a3ca08189d2124b5f232 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=B5=E6=88=90?= Date: Thu, 1 Aug 2019 19:36:26 +0800 Subject: [PATCH 3/5] change package structure --- .../alibaba/datax/plugin/writer/adbpgwriter}/AdbpgWriter.java | 2 +- .../datax/plugin/writer/adbpgwriter/copy/Adb4pgClientProxy.java | 2 +- .../alibaba/datax/plugin/writer/adbpgwriter/copy/AdbProxy.java | 2 +- .../alibaba/datax/plugin/writer/adbpgwriter}/package-info.java | 0 .../datax/plugin/writer/adbpgwriter/util/Adb4pgUtil.java | 2 +- .../alibaba/datax/plugin/writer/adbpgwriter/util/Constant.java | 2 +- .../com/alibaba/datax/plugin/writer/adbpgwriter/util/Key.java | 2 +- 7 files changed, 6 insertions(+), 6 deletions(-) rename adbpgwriter/src/main/java/{com.alibaba.datax.plugin.writer.adbpgwriter => com/alibaba/datax/plugin/writer/adbpgwriter}/AdbpgWriter.java (98%) rename adbpgwriter/src/main/java/{com.alibaba.datax.plugin.writer.adbpgwriter => com/alibaba/datax/plugin/writer/adbpgwriter}/package-info.java (100%) diff --git a/adbpgwriter/src/main/java/com.alibaba.datax.plugin.writer.adbpgwriter/AdbpgWriter.java b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/AdbpgWriter.java similarity index 98% rename from adbpgwriter/src/main/java/com.alibaba.datax.plugin.writer.adbpgwriter/AdbpgWriter.java rename to adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/AdbpgWriter.java index 7c1c6379..9e5b5487 100644 --- a/adbpgwriter/src/main/java/com.alibaba.datax.plugin.writer.adbpgwriter/AdbpgWriter.java +++ b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/AdbpgWriter.java @@ -21,7 +21,7 @@ import static com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode.*; import static com.alibaba.datax.plugin.rdbms.util.DataBaseType.PostgreSQL; /** - * Created by yuncheng on 07/13/2019. 
+ * @author yuncheng */ public class AdbpgWriter extends Writer { private static final DataBaseType DATABASE_TYPE = DataBaseType.PostgreSQL; diff --git a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/Adb4pgClientProxy.java b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/Adb4pgClientProxy.java index 5a09aeb0..49986076 100644 --- a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/Adb4pgClientProxy.java +++ b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/Adb4pgClientProxy.java @@ -23,7 +23,7 @@ import java.sql.Types; import java.util.ArrayList; import java.util.List; /** - * Created by yuncheng on 07/15/2019. + * @author yuncheng */ public class Adb4pgClientProxy implements AdbProxy { private static final Logger LOG = LoggerFactory.getLogger(Adb4pgClientProxy.class); diff --git a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/AdbProxy.java b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/AdbProxy.java index 7b66678a..0edf5436 100644 --- a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/AdbProxy.java +++ b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/AdbProxy.java @@ -4,7 +4,7 @@ import com.alibaba.datax.common.plugin.RecordReceiver; import java.sql.Connection; /** - * Created by yuncheng on 07/15/2019. 
+ * @author yuncheng */ public interface AdbProxy { public abstract void startWriteWithConnection(RecordReceiver recordReceiver, Connection connection); diff --git a/adbpgwriter/src/main/java/com.alibaba.datax.plugin.writer.adbpgwriter/package-info.java b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/package-info.java similarity index 100% rename from adbpgwriter/src/main/java/com.alibaba.datax.plugin.writer.adbpgwriter/package-info.java rename to adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/package-info.java diff --git a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Adb4pgUtil.java b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Adb4pgUtil.java index 3e1efb04..944d951c 100644 --- a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Adb4pgUtil.java +++ b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Adb4pgUtil.java @@ -23,7 +23,7 @@ import java.util.*; import static com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode.COLUMN_SPLIT_ERROR; /** - * Created by yuncheng on 07/13/2019. + * @author yuncheng */ public class Adb4pgUtil { diff --git a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Constant.java b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Constant.java index 66840297..3184d0da 100644 --- a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Constant.java +++ b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Constant.java @@ -1,6 +1,6 @@ package com.alibaba.datax.plugin.writer.adbpgwriter.util; /** - * Created by yuncheng on 07/13/2019. 
+ * @author yuncheng */ public class Constant { public static final int DEFAULT_RETRY_TIMES = 3; diff --git a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Key.java b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Key.java index d398188d..ec5017ae 100644 --- a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Key.java +++ b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Key.java @@ -1,6 +1,6 @@ package com.alibaba.datax.plugin.writer.adbpgwriter.util; /** - * Created by yuncheng on 07/13/2019. + * @author yuncheng */ public class Key { From 256c65b392d60bad4a9a71615e31d726788417ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=B5=E6=88=90?= Date: Mon, 5 Aug 2019 21:03:55 +0800 Subject: [PATCH 4/5] delete the unused file e 'duplicate' --- adbpgwriter/duplicate | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 adbpgwriter/duplicate diff --git a/adbpgwriter/duplicate b/adbpgwriter/duplicate deleted file mode 100644 index e69de29b..00000000 From 469a59fe19f773a5c9c2a23d0172f34baf04e8ee Mon Sep 17 00:00:00 2001 From: Chase Date: Tue, 6 Aug 2019 11:32:12 +0800 Subject: [PATCH 5/5] Update pom.xml use adb4pgclient 1.0.0 --- adbpgwriter/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/adbpgwriter/pom.xml b/adbpgwriter/pom.xml index 2b78aaf0..094dbd9c 100644 --- a/adbpgwriter/pom.xml +++ b/adbpgwriter/pom.xml @@ -73,7 +73,7 @@ com.alibaba.cloud.analyticdb adb4pgclient - 1.0.0-SNAPSHOT + 1.0.0 @@ -110,4 +110,4 @@ - \ No newline at end of file +