diff --git a/adbpgwriter/pom.xml b/adbpgwriter/pom.xml
new file mode 100644
index 00000000..094dbd9c
--- /dev/null
+++ b/adbpgwriter/pom.xml
@@ -0,0 +1,113 @@
+
+
+
+ datax-all
+ com.alibaba.datax
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+
+ adbpgwriter
+ adbpgwriter
+ jar
+
+
+ com.alibaba.datax
+ datax-common
+ ${datax-project-version}
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+ mysql
+ mysql-connector-java
+
+
+
+
+ com.alibaba.datax
+ datax-core
+ ${datax-project-version}
+
+
+
+ com.alibaba.datax
+ plugin-rdbms-util
+ ${datax-project-version}
+
+
+ com.alibaba
+ druid
+
+
+
+
+ com.alibaba
+ druid
+ 1.1.17
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.apache.commons
+ commons-exec
+ 1.3
+
+
+ ch.qos.logback
+ logback-classic
+
+
+ commons-configuration
+ commons-configuration
+ 1.10
+
+
+
+ com.alibaba.cloud.analyticdb
+ adb4pgclient
+ 1.0.0
+
+
+
+
+
+
+
+
+ maven-compiler-plugin
+
+ 1.8
+ 1.8
+ ${project-sourceEncoding}
+
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/assembly/package.xml
+
+ datax
+
+
+
+ dwzip
+ package
+
+ single
+
+
+
+
+
+
+
diff --git a/adbpgwriter/src/main/assembly/package.xml b/adbpgwriter/src/main/assembly/package.xml
new file mode 100644
index 00000000..6b5ba1a8
--- /dev/null
+++ b/adbpgwriter/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+
+
+
+ dir
+
+ false
+
+
+ src/main/resources
+
+ plugin.json
+ plugin_job_template.json
+
+ plugin/writer/adbpgwriter
+
+
+ target/
+
+ adbpgwriter-0.0.1-SNAPSHOT.jar
+
+ plugin/writer/adbpgwriter
+
+
+
+
+
+ false
+ plugin/writer/adbpgwriter/libs
+ runtime
+
+
+
diff --git a/adbpgwriter/src/main/doc/adbpgwriter.md b/adbpgwriter/src/main/doc/adbpgwriter.md
new file mode 100644
index 00000000..80427241
--- /dev/null
+++ b/adbpgwriter/src/main/doc/adbpgwriter.md
@@ -0,0 +1,232 @@
+# DataX ADB PG Writer
+
+
+
+---
+
+
+## 1 快速介绍
+AdbpgWriter 插件实现了写入数据到 ABD PG版数据库的功能。在底层实现上,AdbpgWriter 插件会先缓存需要写入的数据,当缓存的
+数据量达到 commitSize 时,插件会通过 JDBC 连接远程 ADB PG版 数据库,并执行 COPY 命令将数据写入 ADB PG 数据库。
+
+AdbpgWriter 可以作为数据迁移工具为用户提供服务。
+
+## 2 实现原理
+
+AdbpgWriter 通过 DataX 框架获取 Reader 生成的协议数据,首先会将数据缓存,当缓存的数据量达到commitSize时,插件根据你配置生成相应的COPY语句,执行
+COPY命令将数据写入ADB PG数据库中。
+
+## 3 功能说明
+
+### 3.1 配置样例
+
+* 这里使用一份从内存产生到 AdbpgWriter导入的数据
+
+```json
+
+{
+ "job": {
+ "setting": {
+ "speed": {
+ "channel": 32
+ }
+ },
+ "content": [
+ {
+ "reader": {
+ "name": "streamreader",
+ "parameter": {
+ "column" : [
+ {
+ "value": "DataX",
+ "type": "string"
+ },
+ {
+ "value": 19880808,
+ "type": "long"
+ },
+ {
+ "value": "1988-08-08 08:08:08",
+ "type": "date"
+ },
+ {
+ "value": true,
+ "type": "bool"
+ },
+ {
+ "value": "test",
+ "type": "bytes"
+ }
+ ]
+ },
+ "sliceRecordCount": 1000
+ },
+
+ "writer": {
+ "name": "adbpgwriter",
+ "parameter": {
+ "username": "username",
+ "password": "password",
+ "host": "host",
+ "port": "1234",
+ "database": "database",
+ "schema": "schema",
+ "table": "table",
+ "preSql": ["delete * from table"],
+ "postSql": ["select * from table"],
+ "column": ["*"]
+ }
+ }
+ }
+ ]
+ }
+}
+```
+
+### 3.2 参数说明
+
+* **name**
+ * 描述:插件名称
+
+ * 必选:是
+
+ * 默认值:无
+
+* **username**
+ * 描述:目的数据库的用户名
+
+ * 必选:是
+
+ * 默认值:无
+
+* **password**
+
+ * 描述:目的数据库的密码
+
+ * 必选:是
+
+ * 默认值:无
+
+* **host**
+
+ * 描述:目的数据库主机名
+
+ * 必选:是
+
+ * 默认值:无
+
+* **port**
+
+ * 描述:目的数据库的端口
+
+ * 必选:是
+
+ * 默认值:无
+* **database**
+
+ * 描述:需要写入的表所属的数据库名称
+
+ * 必选:是
+
+ * 默认值:无
+* **schema**
+
+ * 描述:需要写入的表所属的schema名称
+
+ * 必选:是
+
+ * 默认值:无
+* **table**
+
+ * 描述:需要写入的表名称
+
+ * 必选:是
+
+ * 默认值:无
+* **column**
+
+ * 描述:目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。如果要依次写入全部列,使用*表示, 例如: "column": ["*"]
+
+ 注意:1、我们强烈不推荐你这样配置,因为当你目的表字段个数、类型等有改动时,你的任务可能运行不正确或者失败
+ 2、此处 column 不能配置任何常量值
+
+ * 必选:是
+
+ * 默认值:否
+* **preSql**
+
+ * 描述:写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,可以使用 `@table` 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00,datax01, ... datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:`"preSql":["delete from @table"]`,效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称
+
+ * 必选:否
+
+ * 默认值:否
+
+* **postSql**
+
+ * 描述:写入数据到目的表后,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,可以使用 `@table` 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。
+
+ * 必选:否
+
+ * 默认值:否
+### 3.3 类型转换
+
+目前 AdbpgWriter 支持大部分 ADB PG 数据库的类型,但也存在部分没有支持的情况,请注意检查你的类型。
+
+下面列出 AdbpgWriter 针对 ADB PG 类型转换列表:
+
+| DataX 内部类型| ADB PG 数据类型 |
+| -------- | ----- |
+| Long |bigint, bigserial, integer, smallint, serial |
+| Double |double precision, float, numeric, real |
+| String |varchar, char, text|
+| Date |date, time, timestamp |
+| Boolean |bool|
+
+## 4 性能报告
+
+### 4.1 环境准备
+
+#### 4.1.1 数据特征
+建表语句:
+```sql
+create table schematest.test_datax (
+ t1 int,
+ t2 bigint,
+ t3 bigserial,
+ t4 float,
+ t5 timestamp,
+ t6 varchar
+)distributed by(t1);
+
+```
+
+#### 4.1.2 机器参数
+
+* 执行DataX的机器参数为:
+ 1. cpu: 24核
+ 2. mem: 96GB
+
+
+* ADB PG数据库机器参数为:
+ 1. 平均core数量:4
+ 2. primary segment 数量: 4
+ 3. 计算组数量:2
+### 4.2 测试报告
+
+#### 4.2.1 单表测试报告
+
+| 通道数| commitSize MB | DataX速度(Rec/s)| DataX流量(M/s)
+|--------|--------| --------|--------|
+|1| 10 | 54098 | 15.54 |
+|1| 20 | 55000 | 15.80 |
+|4| 10 | 183333 | 52.66 |
+|4| 20 | 173684 | 49.89 |
+|8| 10 | 330000 | 94.79 |
+|8| 20 | 300000 | 86.17 |
+|16| 10 | 412500 | 118.48 |
+|16| 20 | 366666 | 105.32 |
+|32| 10 | 366666 | 105.32 |
+
+#### 4.2.2 性能测试小结
+1. `channel数对性能影响很大`
+2. `通常不建议写入数据库时,通道个数 > 32`
\ No newline at end of file
diff --git a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/AdbpgWriter.java b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/AdbpgWriter.java
new file mode 100644
index 00000000..9e5b5487
--- /dev/null
+++ b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/AdbpgWriter.java
@@ -0,0 +1,117 @@
+package com.alibaba.datax.plugin.writer.adbpgwriter;
+
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
+import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
+import com.alibaba.datax.plugin.rdbms.writer.Key;
+import com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil;
+import com.alibaba.datax.plugin.writer.adbpgwriter.copy.Adb4pgClientProxy;
+import com.alibaba.datax.plugin.writer.adbpgwriter.util.Adb4pgUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode.*;
+import static com.alibaba.datax.plugin.rdbms.util.DataBaseType.PostgreSQL;
+
+/**
+ * @author yuncheng
+ */
+public class AdbpgWriter extends Writer {
+ private static final DataBaseType DATABASE_TYPE = DataBaseType.PostgreSQL;
+
+ public static class Job extends Writer.Job {
+
+ private Configuration originalConfig;
+ private CommonRdbmsWriter.Job commonRdbmsWriterMaster;
+ private static final Logger LOG = LoggerFactory.getLogger(Writer.Job.class);
+
+ @Override
+ public void init() {
+ this.originalConfig = super.getPluginJobConf();
+ LOG.info("in Job.init(), config is:[\n{}\n]", originalConfig.toJSON());
+ this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
+ //convert to DatabaseConfig, use DatabaseConfig to check user configuration
+ Adb4pgUtil.checkConfig(originalConfig);
+ }
+
+ @Override
+ public void prepare() {
+
+ Adb4pgUtil.prepare(originalConfig);
+ }
+
+ @Override
+ public List split(int adviceNumber) {
+ List splitResult = new ArrayList();
+ for(int i = 0; i < adviceNumber; i++) {
+ splitResult.add(this.originalConfig.clone());
+ }
+ return splitResult;
+ }
+
+ @Override
+ public void post() {
+
+ Adb4pgUtil.post(originalConfig);
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+
+
+
+ }
+
+ public static class Task extends Writer.Task {
+ private Configuration writerSliceConfig;
+ private CommonRdbmsWriter.Task commonRdbmsWriterSlave;
+ private Adb4pgClientProxy adb4pgClientProxy;
+ //Adb4pgClient client;
+ @Override
+ public void init() {
+ this.writerSliceConfig = super.getPluginJobConf();
+ this.adb4pgClientProxy = new Adb4pgClientProxy(writerSliceConfig, super.getTaskPluginCollector());
+ this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE){
+ @Override
+ public String calcValueHolder(String columnType){
+ if("serial".equalsIgnoreCase(columnType)){
+ return "?::int";
+ }else if("bit".equalsIgnoreCase(columnType)){
+ return "?::bit varying";
+ }
+ return "?::" + columnType;
+ }
+ };
+ }
+
+ @Override
+ public void prepare() {
+
+ }
+
+ @Override
+ public void startWrite(RecordReceiver recordReceiver) {
+ this.adb4pgClientProxy.startWriteWithConnection(recordReceiver, Adb4pgUtil.getAdbpgConnect(writerSliceConfig));
+ }
+
+ @Override
+ public void post() {
+
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+
+ }
+}
diff --git a/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/Adb4pgClientProxy.java b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/Adb4pgClientProxy.java
new file mode 100644
index 00000000..49986076
--- /dev/null
+++ b/adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/Adb4pgClientProxy.java
@@ -0,0 +1,182 @@
+package com.alibaba.datax.plugin.writer.adbpgwriter.copy;
+
+import com.alibaba.cloud.analyticdb.adb4pgclient.*;
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.element.StringColumn;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.core.transport.record.DefaultRecord;
+import com.alibaba.datax.plugin.rdbms.util.DBUtil;
+import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
+import com.alibaba.datax.plugin.writer.adbpgwriter.util.Adb4pgUtil;
+import com.alibaba.datax.plugin.writer.adbpgwriter.util.Constant;
+import com.alibaba.datax.plugin.writer.adbpgwriter.util.Key;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+/**
+ * @author yuncheng
+ */
+public class Adb4pgClientProxy implements AdbProxy {
+ private static final Logger LOG = LoggerFactory.getLogger(Adb4pgClientProxy.class);
+
+ private Adb4pgClient adb4pgClient;
+ private String table;
+ private String schema;
+ List columns;
+ private TableInfo tableInfo;
+ private TaskPluginCollector taskPluginCollector;
+ private boolean useRawData[];
+ public Adb4pgClientProxy(Configuration configuration,TaskPluginCollector taskPluginCollector) {
+ this.taskPluginCollector = taskPluginCollector;
+
+ DatabaseConfig databaseConfig = Adb4pgUtil.convertConfiguration(configuration);
+
+ // If the value of column is empty, set null
+ boolean emptyAsNull = configuration.getBool(Key.EMPTY_AS_NULL, false);
+ databaseConfig.setEmptyAsNull(emptyAsNull);
+
+ // 使用insert ignore into方式进行插入
+ boolean ignoreInsert = configuration.getBool(Key.IGNORE_INSERT, false);
+ databaseConfig.setInsertIgnore(ignoreInsert);
+
+ // commit时,写入ADB出现异常时重试的3次
+ int retryTimes = configuration.getInt(Key.RETRY_CONNECTION_TIME, Constant.DEFAULT_RETRY_TIMES);
+ databaseConfig.setRetryTimes(retryTimes);
+
+ // 重试间隔的时间为1s,单位是ms
+ int retryIntervalTime = configuration.getInt(Key.RETRY_INTERVAL_TIME, 1000);
+ databaseConfig.setRetryIntervalTime(retryIntervalTime);
+
+ // 设置自动提交的SQL长度(单位Byte),默认为32KB,一般不建议设置
+ int commitSize = configuration.getInt("commitSize", 10 * 1024 * 1024);
+ databaseConfig.setCommitSize(commitSize);
+
+
+ // 设置写入adb时的并发线程数,默认4,针对配置的所有表
+ int parallelNumber = configuration.getInt("parallelNumber", 4);
+ databaseConfig.setParallelNumber(parallelNumber);
+
+ // 设置client中使用的logger对象,此处使用slf4j.Logger
+ databaseConfig.setLogger(Adb4pgClientProxy.LOG);
+
+ // sdk 默认值为true
+ boolean shareDataSource = configuration.getBool("shareDataSource", true);
+ databaseConfig.setShareDataSource(shareDataSource);
+
+ //List columns = configuration.getList(Key.COLUMN, String.class);
+
+ this.table = configuration.getString(com.alibaba.datax.plugin.rdbms.writer.Key.TABLE);
+ this.schema = configuration.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.SCHEMA);
+ this.adb4pgClient = new Adb4pgClient(databaseConfig);
+ this.columns = databaseConfig.getColumns(table,schema);
+ this.tableInfo = adb4pgClient.getTableInfo(table, schema);
+
+
+ this.useRawData = new boolean[this.columns.size()];
+ List columnInfos = tableInfo.getColumns();
+ for (int i = 0; i < this.columns.size(); i++) {
+ String oriEachColumn = columns.get(i);
+ String eachColumn = oriEachColumn;
+ // 防御性保留字
+ if (eachColumn.startsWith(Constant.COLUMN_QUOTE_CHARACTER)
+ && eachColumn.endsWith(Constant.COLUMN_QUOTE_CHARACTER)) {
+ eachColumn = eachColumn.substring(1, eachColumn.length() - 1);
+ }
+ for (ColumnInfo eachAdsColumn : columnInfos) {
+ if (eachColumn.equals(eachAdsColumn.getName())) {
+
+ int columnSqltype = eachAdsColumn.getDataType().sqlType;
+ switch (columnSqltype) {
+ case Types.DATE:
+ case Types.TIME:
+ case Types.TIMESTAMP:
+ this.useRawData[i] = false;
+ break;
+ default:
+ this.useRawData[i] = true;
+ break;
+ }
+ }
+ }
+ }
+
+ }
+ @Override
+ public void startWriteWithConnection(RecordReceiver recordReceiver, Connection connection) {
+ try {
+ Record record;
+ while ((record = recordReceiver.getFromReader()) != null) {
+ Row row = new Row();
+ List