Merge pull request #411 from wuchase/master

support adb pg writer
This commit is contained in:
Trafalgar 2019-08-16 17:05:51 +08:00 committed by GitHub
commit 05a000db44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 912 additions and 1 deletions

113
adbpgwriter/pom.xml Normal file
View File

@ -0,0 +1,113 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>adbpgwriter</artifactId>
<name>adbpgwriter</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud.analyticdb</groupId>
<artifactId>adb4pgclient</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/adbpgwriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>adbpgwriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/adbpgwriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/adbpgwriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,232 @@
# DataX ADB PG Writer
---
## 1 快速介绍
AdbpgWriter 插件实现了写入数据到 ABD PG版数据库的功能。在底层实现上AdbpgWriter 插件会先缓存需要写入的数据,当缓存的
数据量达到 commitSize 时,插件会通过 JDBC 连接远程 ADB PG版 数据库,并执行 COPY 命令将数据写入 ADB PG 数据库。
AdbpgWriter 可以作为数据迁移工具为用户提供服务。
## 2 实现原理
AdbpgWriter 通过 DataX 框架获取 Reader 生成的协议数据首先会将数据缓存当缓存的数据量达到commitSize时插件根据你配置生成相应的COPY语句执行
COPY命令将数据写入ADB PG数据库中。
## 3 功能说明
### 3.1 配置样例
* 这里使用一份从内存产生到 AdbpgWriter导入的数据
```json
{
"job": {
"setting": {
"speed": {
"channel": 32
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
{
"value": "DataX",
"type": "string"
},
{
"value": 19880808,
"type": "long"
},
{
"value": "1988-08-08 08:08:08",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
]
},
"sliceRecordCount": 1000
},
"writer": {
"name": "adbpgwriter",
"parameter": {
"username": "username",
"password": "password",
"host": "host",
"port": "1234",
"database": "database",
"schema": "schema",
"table": "table",
"preSql": ["delete * from table"],
"postSql": ["select * from table"],
"column": ["*"]
}
}
}
]
}
}
```
### 3.2 参数说明
* **name**
* 描述:插件名称 <br />
* 必选:是 <br />
* 默认值:无 <br />
* **username**
* 描述:目的数据库的用户名 <br />
* 必选:是 <br />
* 默认值:无 <br />
* **password**
* 描述:目的数据库的密码 <br />
* 必选:是 <br />
* 默认值:无 <br />
* **host**
* 描述:目的数据库主机名 <br />
* 必选:是 <br />
* 默认值:无 <br />
* **port**
* 描述:目的数据库的端口 <br />
* 必选:是 <br />
* 默认值:无 <br />
* **database**
* 描述:需要写入的表所属的数据库名称 <br />
* 必选:是 <br />
* 默认值:无 <br />
* **schema**
* 描述需要写入的表所属的schema名称 <br />
* 必选:是 <br />
* 默认值:无 <br />
* **table**
* 描述:需要写入的表名称 <br />
* 必选:是 <br />
* 默认值:无 <br />
* **column**
* 描述:目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。如果要依次写入全部列,使用*表示, 例如: "column": ["*"]
注意1、我们强烈不推荐你这样配置因为当你目的表字段个数、类型等有改动时你的任务可能运行不正确或者失败
2、此处 column 不能配置任何常量值
* 必选:是 <br />
* 默认值:否 <br />
* **preSql**
* 描述:写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,可以使用 `@table` 表示,这样在实际执行 Sql 语句时会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00,datax01, ... datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:`"preSql":["delete from @table"]`,效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称 <br />
* 必选:否 <br />
* 默认值:否 <br />
* **postSql**
* 描述:写入数据到目的表后,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,可以使用 `@table` 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。 <br />
* 必选:否 <br />
* 默认值:否 <br />
### 3.3 类型转换
目前 AdbpgWriter 支持大部分 ADB PG 数据库的类型,但也存在部分没有支持的情况,请注意检查你的类型。
下面列出 AdbpgWriter 针对 ADB PG 类型转换列表:
| DataX 内部类型| ADB PG 数据类型 |
| -------- | ----- |
| Long |bigint, bigserial, integer, smallint, serial |
| Double |double precision, float, numeric, real |
| String |varchar, char, text|
| Date |date, time, timestamp |
| Boolean |bool|
## 4 性能报告
### 4.1 环境准备
#### 4.1.1 数据特征
建表语句:
```sql
create table schematest.test_datax (
t1 int,
t2 bigint,
t3 bigserial,
t4 float,
t5 timestamp,
t6 varchar
)distributed by(t1);
```
#### 4.1.2 机器参数
* 执行DataX的机器参数为:
1. cpu: 24核
2. mem: 96GB
* ADB PG数据库机器参数为:
1. 平均core数量:4
2. primary segment 数量: 4
3. 计算组数量:2
### 4.2 测试报告
#### 4.2.1 单表测试报告
| 通道数| commitSize MB | DataX速度(Rec/s)| DataX流量(M/s)
|--------|--------| --------|--------|
|1| 10 | 54098 | 15.54 |
|1| 20 | 55000 | 15.80 |
|4| 10 | 183333 | 52.66 |
|4| 20 | 173684 | 49.89 |
|8| 10 | 330000 | 94.79 |
|8| 20 | 300000 | 86.17 |
|16| 10 | 412500 | 118.48 |
|16| 20 | 366666 | 105.32 |
|32| 10 | 366666 | 105.32 |
#### 4.2.2 性能测试小结
1. `channel数对性能影响很大`
2. `通常不建议写入数据库时,通道个数 > 32`

View File

@ -0,0 +1,117 @@
package com.alibaba.datax.plugin.writer.adbpgwriter;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil;
import com.alibaba.datax.plugin.writer.adbpgwriter.copy.Adb4pgClientProxy;
import com.alibaba.datax.plugin.writer.adbpgwriter.util.Adb4pgUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode.*;
import static com.alibaba.datax.plugin.rdbms.util.DataBaseType.PostgreSQL;
/**
* @author yuncheng
*/
public class AdbpgWriter extends Writer {
private static final DataBaseType DATABASE_TYPE = DataBaseType.PostgreSQL;
public static class Job extends Writer.Job {
private Configuration originalConfig;
private CommonRdbmsWriter.Job commonRdbmsWriterMaster;
private static final Logger LOG = LoggerFactory.getLogger(Writer.Job.class);
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
LOG.info("in Job.init(), config is:[\n{}\n]", originalConfig.toJSON());
this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
//convert to DatabaseConfig, use DatabaseConfig to check user configuration
Adb4pgUtil.checkConfig(originalConfig);
}
@Override
public void prepare() {
Adb4pgUtil.prepare(originalConfig);
}
@Override
public List<Configuration> split(int adviceNumber) {
List<Configuration> splitResult = new ArrayList<Configuration>();
for(int i = 0; i < adviceNumber; i++) {
splitResult.add(this.originalConfig.clone());
}
return splitResult;
}
@Override
public void post() {
Adb4pgUtil.post(originalConfig);
}
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private Configuration writerSliceConfig;
private CommonRdbmsWriter.Task commonRdbmsWriterSlave;
private Adb4pgClientProxy adb4pgClientProxy;
//Adb4pgClient client;
@Override
public void init() {
this.writerSliceConfig = super.getPluginJobConf();
this.adb4pgClientProxy = new Adb4pgClientProxy(writerSliceConfig, super.getTaskPluginCollector());
this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE){
@Override
public String calcValueHolder(String columnType){
if("serial".equalsIgnoreCase(columnType)){
return "?::int";
}else if("bit".equalsIgnoreCase(columnType)){
return "?::bit varying";
}
return "?::" + columnType;
}
};
}
@Override
public void prepare() {
}
@Override
public void startWrite(RecordReceiver recordReceiver) {
this.adb4pgClientProxy.startWriteWithConnection(recordReceiver, Adb4pgUtil.getAdbpgConnect(writerSliceConfig));
}
@Override
public void post() {
}
@Override
public void destroy() {
}
}
}

View File

@ -0,0 +1,182 @@
package com.alibaba.datax.plugin.writer.adbpgwriter.copy;
import com.alibaba.cloud.analyticdb.adb4pgclient.*;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.record.DefaultRecord;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.writer.adbpgwriter.util.Adb4pgUtil;
import com.alibaba.datax.plugin.writer.adbpgwriter.util.Constant;
import com.alibaba.datax.plugin.writer.adbpgwriter.util.Key;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
/**
* @author yuncheng
*/
public class Adb4pgClientProxy implements AdbProxy {
private static final Logger LOG = LoggerFactory.getLogger(Adb4pgClientProxy.class);
private Adb4pgClient adb4pgClient;
private String table;
private String schema;
List<String> columns;
private TableInfo tableInfo;
private TaskPluginCollector taskPluginCollector;
private boolean useRawData[];
public Adb4pgClientProxy(Configuration configuration,TaskPluginCollector taskPluginCollector) {
this.taskPluginCollector = taskPluginCollector;
DatabaseConfig databaseConfig = Adb4pgUtil.convertConfiguration(configuration);
// If the value of column is empty, set null
boolean emptyAsNull = configuration.getBool(Key.EMPTY_AS_NULL, false);
databaseConfig.setEmptyAsNull(emptyAsNull);
// 使用insert ignore into方式进行插入
boolean ignoreInsert = configuration.getBool(Key.IGNORE_INSERT, false);
databaseConfig.setInsertIgnore(ignoreInsert);
// commit时写入ADB出现异常时重试的3次
int retryTimes = configuration.getInt(Key.RETRY_CONNECTION_TIME, Constant.DEFAULT_RETRY_TIMES);
databaseConfig.setRetryTimes(retryTimes);
// 重试间隔的时间为1s单位是ms
int retryIntervalTime = configuration.getInt(Key.RETRY_INTERVAL_TIME, 1000);
databaseConfig.setRetryIntervalTime(retryIntervalTime);
// 设置自动提交的SQL长度单位Byte默认为32KB一般不建议设置
int commitSize = configuration.getInt("commitSize", 10 * 1024 * 1024);
databaseConfig.setCommitSize(commitSize);
// 设置写入adb时的并发线程数默认4针对配置的所有表
int parallelNumber = configuration.getInt("parallelNumber", 4);
databaseConfig.setParallelNumber(parallelNumber);
// 设置client中使用的logger对象此处使用slf4j.Logger
databaseConfig.setLogger(Adb4pgClientProxy.LOG);
// sdk 默认值为true
boolean shareDataSource = configuration.getBool("shareDataSource", true);
databaseConfig.setShareDataSource(shareDataSource);
//List<String> columns = configuration.getList(Key.COLUMN, String.class);
this.table = configuration.getString(com.alibaba.datax.plugin.rdbms.writer.Key.TABLE);
this.schema = configuration.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.SCHEMA);
this.adb4pgClient = new Adb4pgClient(databaseConfig);
this.columns = databaseConfig.getColumns(table,schema);
this.tableInfo = adb4pgClient.getTableInfo(table, schema);
this.useRawData = new boolean[this.columns.size()];
List<ColumnInfo> columnInfos = tableInfo.getColumns();
for (int i = 0; i < this.columns.size(); i++) {
String oriEachColumn = columns.get(i);
String eachColumn = oriEachColumn;
// 防御性保留字
if (eachColumn.startsWith(Constant.COLUMN_QUOTE_CHARACTER)
&& eachColumn.endsWith(Constant.COLUMN_QUOTE_CHARACTER)) {
eachColumn = eachColumn.substring(1, eachColumn.length() - 1);
}
for (ColumnInfo eachAdsColumn : columnInfos) {
if (eachColumn.equals(eachAdsColumn.getName())) {
int columnSqltype = eachAdsColumn.getDataType().sqlType;
switch (columnSqltype) {
case Types.DATE:
case Types.TIME:
case Types.TIMESTAMP:
this.useRawData[i] = false;
break;
default:
this.useRawData[i] = true;
break;
}
}
}
}
}
@Override
public void startWriteWithConnection(RecordReceiver recordReceiver, Connection connection) {
try {
Record record;
while ((record = recordReceiver.getFromReader()) != null) {
Row row = new Row();
List<Object> values = new ArrayList<Object>();
this.prepareColumnTypeValue(record, values);
row.setColumnValues(values);
try {
this.adb4pgClient.addRow(row,this.table, this.schema);
} catch (Adb4pgClientException e) {
if (101 == e.getCode()) {
for (String each : e.getErrData()) {
Record dirtyData = new DefaultRecord();
dirtyData.addColumn(new StringColumn(each));
this.taskPluginCollector.collectDirtyRecord(dirtyData, e.getMessage());
}
} else {
throw e;
}
}
}
try {
this.adb4pgClient.commit();
} catch (Adb4pgClientException e) {
if (101 == e.getCode()) {
for (String each : e.getErrData()) {
Record dirtyData = new DefaultRecord();
dirtyData.addColumn(new StringColumn(each));
this.taskPluginCollector.collectDirtyRecord(dirtyData, e.getMessage());
}
} else {
throw e;
}
}
}catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
}finally {
DBUtil.closeDBResources(null, null, connection);
}
return;
}
private void prepareColumnTypeValue(Record record, List<Object> values) {
for (int i = 0; i < this.columns.size(); i++) {
Column column = record.getColumn(i);
if (this.useRawData[i]) {
values.add(column.getRawData());
} else {
values.add(column.asString());
}
}
}
@Override
public void closeResource() {
try {
LOG.info("stop the adb4pgClient");
this.adb4pgClient.stop();
} catch (Exception e) {
LOG.warn("stop adbClient meet a exception, ignore it: {}", e.getMessage(), e);
}
}
}

View File

@ -0,0 +1,13 @@
package com.alibaba.datax.plugin.writer.adbpgwriter.copy;
import com.alibaba.datax.common.plugin.RecordReceiver;
import java.sql.Connection;
/**
* @author yuncheng
*/
public interface AdbProxy {
public abstract void startWriteWithConnection(RecordReceiver recordReceiver, Connection connection);
public void closeResource();
}

View File

@ -0,0 +1,8 @@
/**
* Greenplum Writer.
*
* @since 0.0.1
*/
package com.alibaba.datax.plugin.writer.adbpgwriter;

View File

@ -0,0 +1,146 @@
package com.alibaba.datax.plugin.writer.adbpgwriter.util;
import com.alibaba.cloud.analyticdb.adb4pgclient.Adb4pgClient;
import com.alibaba.cloud.analyticdb.adb4pgclient.Adb4pgClientException;
import com.alibaba.cloud.analyticdb.adb4pgclient.DatabaseConfig;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.spi.ErrorCode;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.util.*;
import static com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode.COLUMN_SPLIT_ERROR;
/**
* @author yuncheng
*/
public class Adb4pgUtil {
private static final Logger LOG = LoggerFactory.getLogger(Adb4pgUtil.class);
private static final DataBaseType DATABASE_TYPE = DataBaseType.PostgreSQL;
public static void checkConfig(Configuration originalConfig) {
try {
DatabaseConfig databaseConfig = convertConfiguration(originalConfig);
Adb4pgClient testConfigClient = new Adb4pgClient(databaseConfig);
} catch (Exception e) {
throw new Adb4pgClientException(Adb4pgClientException.CONFIG_ERROR, "Check config exception: " + e.getMessage(), null);
}
}
public static DatabaseConfig convertConfiguration(Configuration originalConfig) {
originalConfig.getNecessaryValue(Key.USERNAME, COLUMN_SPLIT_ERROR);
originalConfig.getNecessaryValue(Key.PASSWORD, COLUMN_SPLIT_ERROR);
String userName = originalConfig.getString(Key.USERNAME);
String passWord = originalConfig.getString(Key.PASSWORD);
String tableName = originalConfig.getString(Key.TABLE);
String schemaName = originalConfig.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.SCHEMA);
String host = originalConfig.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.HOST);
String port = originalConfig.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.PORT);
String databseName = originalConfig.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.DATABASE);
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
DatabaseConfig databaseConfig = new DatabaseConfig();
databaseConfig.setHost(host);
databaseConfig.setPort(Integer.valueOf(port));
databaseConfig.setDatabase(databseName);
databaseConfig.setUser(userName);
databaseConfig.setPassword(passWord);
databaseConfig.setLogger(LOG);
databaseConfig.setInsertIgnore(originalConfig.getBool(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.IS_INSERTINGORE, true));
databaseConfig.addTable(Collections.singletonList(tableName), schemaName);
databaseConfig.setColumns(columns, tableName, schemaName);
return databaseConfig;
}
private static Map<String, List<String>> splitBySchemaName(List<String> tables) {
HashMap<String, List<String>> res = new HashMap<String, List<String>>(16);
for (String schemaNameTableName: tables) {
String[] s = schemaNameTableName.split("\\.");
if (!res.containsKey(s[0])) {
res.put(s[0], new ArrayList<String>());
}
res.get(s[0]).add(s[1]);
}
return res;
}
public static Connection getAdbpgConnect(Configuration conf) {
String userName = conf.getString(Key.USERNAME);
String passWord = conf.getString(Key.PASSWORD);
return DBUtil.getConnection(DataBaseType.PostgreSQL, generateJdbcUrl(conf), userName, passWord);
}
private static String generateJdbcUrl(Configuration configuration) {
String host = configuration.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.HOST);
String port = configuration.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.PORT);
String databseName = configuration.getString(com.alibaba.datax.plugin.writer.adbpgwriter.util.Key.DATABASE);
String jdbcUrl = "jdbc:postgresql://" + host + ":" + port + "/" + databseName;
return jdbcUrl;
}
public static void prepare(Configuration originalConfig) {
List<String> preSqls = originalConfig.getList(Key.PRE_SQL,
String.class);
String tableName = originalConfig.getString(Key.TABLE);
List<String> renderedPreSqls = WriterUtil.renderPreOrPostSqls(
preSqls, tableName);
if (renderedPreSqls.size() == 0) {
return;
}
originalConfig.remove(Key.PRE_SQL);
Connection conn = getAdbpgConnect(originalConfig);
WriterUtil.executeSqls(conn, renderedPreSqls, generateJdbcUrl(originalConfig), DATABASE_TYPE);
DBUtil.closeDBResources(null, null, conn);
}
public static void post(Configuration configuration) {
List<String> postSqls = configuration.getList(Key.POST_SQL,
String.class);
String tableName = configuration.getString(Key.TABLE);
List<String> renderedPostSqls = WriterUtil.renderPreOrPostSqls(
postSqls, tableName);
if (renderedPostSqls.size() == 0) {
return;
}
configuration.remove(Key.POST_SQL);
Connection conn = getAdbpgConnect(configuration);
WriterUtil.executeSqls(conn, renderedPostSqls, generateJdbcUrl(configuration), DATABASE_TYPE);
DBUtil.closeDBResources(null, null, conn);
}
}

View File

@ -0,0 +1,12 @@
package com.alibaba.datax.plugin.writer.adbpgwriter.util;
/**
* @author yuncheng
*/
public class Constant {
public static final int DEFAULT_RETRY_TIMES = 3;
public static final String COLUMN_QUOTE_CHARACTER = "\"";
}

View File

@ -0,0 +1,26 @@
package com.alibaba.datax.plugin.writer.adbpgwriter.util;
/**
* @author yuncheng
*/
public class Key {
public final static String COLUMN = "column";
public final static String IS_INSERTINGORE = "insertIgnore";
public final static String HOST = "host";
public final static String PORT = "port";
public final static String DATABASE = "database";
public final static String SCHEMA = "schema";
public final static String EMPTY_AS_NULL = "emptyAsNull";
public final static String IGNORE_INSERT = "ignoreInsert";
public final static String RETRY_CONNECTION_TIME = "retryTimes";
public final static String RETRY_INTERVAL_TIME = "retryIntervalTime";
public final static String COMMIT_SIZE = "commitSize";
public final static String PARALLEL_NUMBER = "parallelNumber";
public final static String SHARED_DATASOURCE = "shareDataSource";
}

View File

@ -0,0 +1,6 @@
{
"name": "adbpgwriter",
"class": "com.alibaba.datax.plugin.writer.adbpgwriter.AdbpgWriter",
"description": "",
"developer": "alibaba"
}

View File

@ -0,0 +1,13 @@
{
"name": "adbpgwriter",
"parameter": {
"username": "",
"password": "",
"host": "",
"port": "",
"database": "",
"schema": "",
"table": "",
"column": ["*"]
}
}

View File

@ -336,5 +336,12 @@
</includes> </includes>
<outputDirectory>datax</outputDirectory> <outputDirectory>datax</outputDirectory>
</fileSet> </fileSet>
<fileSet>
<directory>adbpgwriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
</fileSets> </fileSets>
</assembly> </assembly>

View File

@ -87,6 +87,7 @@
<module>hbase11xsqlreader</module> <module>hbase11xsqlreader</module>
<module>elasticsearchwriter</module> <module>elasticsearchwriter</module>
<module>tsdbwriter</module> <module>tsdbwriter</module>
<module>adbpgwriter</module>
<!-- common support module --> <!-- common support module -->
<module>plugin-rdbms-util</module> <module>plugin-rdbms-util</module>