diff --git a/adswriter/doc/adswriter.md b/adswriter/doc/adswriter.md
index f80229bb..4a0fd961 100644
--- a/adswriter/doc/adswriter.md
+++ b/adswriter/doc/adswriter.md
@@ -38,7 +38,7 @@ DataX 将数据导入ADS为当前导入任务分配的ADS项目表,随后DataX

-1. CDP底层得到明文的 jdbc://host:port/dbname + username + password + table, 以此连接ADS, 执行show grants; 前置检查该用户是否有ADS中目标表的Load Data或者更高的权限。注意,此时ADSWriter使用用户填写的ADS用户名+密码信息完成登录鉴权工作。
+1. DataX底层得到明文的 jdbc://host:port/dbname + username + password + table, 以此连接ADS, 执行show grants; 前置检查该用户是否有ADS中目标表的Load Data或者更高的权限。注意,此时ADSWriter使用用户填写的ADS用户名+密码信息完成登录鉴权工作。
2. 检查通过后,通过ADS中目标表的元数据反向生成ODPS DDL,在ODPS中间project中,以ADSWriter的账户建立ODPS表(非分区表,生命周期设为1-2Day), 并调用ODPSWriter把数据源的数据写入该ODPS表中。
diff --git a/adswriter/pom.xml b/adswriter/pom.xml
index de407dfe..90be43e8 100644
--- a/adswriter/pom.xml
+++ b/adswriter/pom.xml
@@ -39,6 +39,22 @@
com.alibaba.datax
plugin-rdbms-util
${datax-project-version}
+
+
+ com.alibaba
+ druid
+
+
+
+
+ com.alibaba.cloud.analyticdb
+ adbclient
+ 1.0.2
+
+
+ com.alibaba
+ druid
+ 1.1.12
diff --git a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/AdsWriter.java b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/AdsWriter.java
index 7e04c844..866b93d0 100644
--- a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/AdsWriter.java
+++ b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/AdsWriter.java
@@ -10,8 +10,10 @@ import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
import com.alibaba.datax.plugin.writer.adswriter.ads.ColumnInfo;
import com.alibaba.datax.plugin.writer.adswriter.ads.TableInfo;
+import com.alibaba.datax.plugin.writer.adswriter.insert.AdsClientProxy;
import com.alibaba.datax.plugin.writer.adswriter.insert.AdsInsertProxy;
import com.alibaba.datax.plugin.writer.adswriter.insert.AdsInsertUtil;
+import com.alibaba.datax.plugin.writer.adswriter.insert.AdsProxy;
import com.alibaba.datax.plugin.writer.adswriter.load.AdsHelper;
import com.alibaba.datax.plugin.writer.adswriter.load.TableMetaHelper;
import com.alibaba.datax.plugin.writer.adswriter.load.TransferProjectConf;
@@ -168,8 +170,9 @@ public class AdsWriter extends Writer {
userConfiguredPartitions = Collections.emptyList();
}
- if(userConfiguredPartitions.size() > 1)
+ if(userConfiguredPartitions.size() > 1) {
throw DataXException.asDataXException(AdsWriterErrorCode.ODPS_PARTITION_FAILED, "");
+ }
if(userConfiguredPartitions.size() == 0) {
loadAdsData(adsHelper, odpsTableName,null);
@@ -304,7 +307,7 @@ public class AdsWriter extends Writer {
private Configuration writerSliceConfig;
private OdpsWriter.Task odpsWriterTaskProxy = new OdpsWriter.Task();
-
+
private String writeMode;
private String schema;
private String table;
@@ -312,17 +315,28 @@ public class AdsWriter extends Writer {
// warn: 只有在insert, stream模式才有, 对于load模式表明为odps临时表了
private TableInfo tableInfo;
+ private String writeProxy;
+ AdsProxy proxy = null;
+
@Override
public void init() {
writerSliceConfig = super.getPluginJobConf();
this.writeMode = this.writerSliceConfig.getString(Key.WRITE_MODE);
this.schema = writerSliceConfig.getString(Key.SCHEMA);
this.table = writerSliceConfig.getString(Key.ADS_TABLE);
-
+
if(Constant.LOADMODE.equalsIgnoreCase(this.writeMode)) {
odpsWriterTaskProxy.setPluginJobConf(writerSliceConfig);
odpsWriterTaskProxy.init();
} else if(Constant.INSERTMODE.equalsIgnoreCase(this.writeMode) || Constant.STREAMMODE.equalsIgnoreCase(this.writeMode)) {
+
+ if (Constant.STREAMMODE.equalsIgnoreCase(this.writeMode)) {
+ this.writeProxy = "datax";
+ } else {
+ this.writeProxy = this.writerSliceConfig.getString("writeProxy", "adbClient");
+ }
+ this.writerSliceConfig.set("writeProxy", this.writeProxy);
+
try {
this.tableInfo = AdsUtil.createAdsHelper(this.writerSliceConfig).getTableInfo(this.table);
} catch (AdsException e) {
@@ -361,7 +375,12 @@ public class AdsWriter extends Writer {
List columns = writerSliceConfig.getList(Key.COLUMN, String.class);
Connection connection = AdsUtil.getAdsConnect(this.writerSliceConfig);
TaskPluginCollector taskPluginCollector = super.getTaskPluginCollector();
- AdsInsertProxy proxy = new AdsInsertProxy(schema + "." + table, columns, writerSliceConfig, taskPluginCollector, this.tableInfo);
+
+ if (StringUtils.equalsIgnoreCase(this.writeProxy, "adbClient")) {
+ this.proxy = new AdsClientProxy(table, columns, writerSliceConfig, taskPluginCollector, this.tableInfo);
+ } else {
+ this.proxy = new AdsInsertProxy(schema + "." + table, columns, writerSliceConfig, taskPluginCollector, this.tableInfo);
+ }
proxy.startWriteWithConnection(recordReceiver, connection, columnNumber);
}
}
@@ -381,6 +400,9 @@ public class AdsWriter extends Writer {
odpsWriterTaskProxy.destroy();
} else {
//do noting until now
+ if (null != this.proxy) {
+ this.proxy.closeResource();
+ }
}
}
}
diff --git a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsClientProxy.java b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsClientProxy.java
new file mode 100644
index 00000000..8fdc70d6
--- /dev/null
+++ b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsClientProxy.java
@@ -0,0 +1,222 @@
+package com.alibaba.datax.plugin.writer.adswriter.insert;
+
+import com.alibaba.cloud.analyticdb.adbclient.AdbClient;
+import com.alibaba.cloud.analyticdb.adbclient.AdbClientException;
+import com.alibaba.cloud.analyticdb.adbclient.DatabaseConfig;
+import com.alibaba.cloud.analyticdb.adbclient.Row;
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.element.StringColumn;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.core.transport.record.DefaultRecord;
+import com.alibaba.datax.plugin.rdbms.util.DBUtil;
+import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
+import com.alibaba.datax.plugin.writer.adswriter.AdsWriterErrorCode;
+import com.alibaba.datax.plugin.writer.adswriter.ads.TableInfo;
+import com.alibaba.datax.plugin.writer.adswriter.util.Constant;
+import com.alibaba.datax.plugin.writer.adswriter.util.Key;
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Types;
+import java.util.*;
+
+public class AdsClientProxy implements AdsProxy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AdsClientProxy.class);
+
+ private String table;
+ private TaskPluginCollector taskPluginCollector;
+ public Configuration configuration;
+
+ // columnName:
+ private Map> adsTableColumnsMetaData;
+ private Map> userConfigColumnsMetaData;
+ private boolean useRawData[];
+
+ private AdbClient adbClient;
+
+ /**
+ * warn: not support columns as *
+ */
+ public AdsClientProxy(String table, List columns, Configuration configuration,
+ TaskPluginCollector taskPluginCollector, TableInfo tableInfo) {
+ this.configuration = configuration;
+ this.taskPluginCollector = taskPluginCollector;
+
+ this.adsTableColumnsMetaData = AdsInsertUtil.getColumnMetaData(tableInfo, columns);
+ this.userConfigColumnsMetaData = new HashMap>();
+ List adsColumnsNames = tableInfo.getColumnsNames();
+ // 要使用用户配置的column顺序
+ this.useRawData = new boolean[columns.size()];
+ for (int i = 0; i < columns.size(); i++) {
+ String oriEachColumn = columns.get(i);
+ String eachColumn = oriEachColumn;
+ // 防御性保留字
+ if (eachColumn.startsWith(Constant.ADS_QUOTE_CHARACTER)
+ && eachColumn.endsWith(Constant.ADS_QUOTE_CHARACTER)) {
+ eachColumn = eachColumn.substring(1, eachColumn.length() - 1);
+ }
+ for (String eachAdsColumn : adsColumnsNames) {
+ if (eachColumn.equalsIgnoreCase(eachAdsColumn)) {
+ Pair eachMeta = this.adsTableColumnsMetaData.get(eachAdsColumn);
+ this.userConfigColumnsMetaData.put(oriEachColumn, eachMeta);
+ int columnSqltype = eachMeta.getLeft();
+ switch (columnSqltype) {
+ case Types.DATE:
+ case Types.TIME:
+ case Types.TIMESTAMP:
+ this.useRawData[i] = false;
+ break;
+ default:
+ this.useRawData[i] = true;
+ break;
+ }
+ }
+ }
+ }
+
+ DatabaseConfig databaseConfig = new DatabaseConfig();
+ String url = configuration.getString(Key.ADS_URL);
+ String[] hostAndPort = StringUtils.split(url, ":");
+ if (hostAndPort.length != 2) {
+ throw DataXException.asDataXException(AdsWriterErrorCode.INVALID_CONFIG_VALUE,
+ "url should be in host:port format!");
+ }
+ this.table = table.toLowerCase();
+ databaseConfig.setHost(hostAndPort[0]);
+ databaseConfig.setPort(Integer.parseInt(hostAndPort[1]));
+ databaseConfig.setUser(configuration.getString(Key.USERNAME));
+ databaseConfig.setPassword(configuration.getString(Key.PASSWORD));
+ databaseConfig.setDatabase(configuration.getString(Key.SCHEMA));
+ databaseConfig.setTable(Collections.singletonList(this.table));
+ databaseConfig.setColumns(this.table, columns);
+
+ // 如果出现insert失败,是否跳过
+ boolean ignoreInsertError = configuration.getBool("ignoreInsertError", false);
+ databaseConfig.setIgnoreInsertError(ignoreInsertError);
+
+ // If the value of column is empty, set null
+ boolean emptyAsNull = configuration.getBool(Key.EMPTY_AS_NULL, false);
+ databaseConfig.setEmptyAsNull(emptyAsNull);
+
+ // 使用insert ignore into方式进行插入
+ boolean ignoreInsert = configuration.getBool(Key.IGNORE_INSERT, false);
+ databaseConfig.setInsertIgnore(ignoreInsert);
+
+ // commit时,写入ADB出现异常时重试的3次
+ int retryTimes = configuration.getInt(Key.RETRY_CONNECTION_TIME, Constant.DEFAULT_RETRY_TIMES);
+ databaseConfig.setRetryTimes(retryTimes);
+
+ // 重试间隔的时间为1s,单位是ms
+ int retryIntervalTime = configuration.getInt(Key.RETRY_INTERVAL_TIME, 1000);
+ databaseConfig.setRetryIntervalTime(retryIntervalTime);
+
+ // 设置自动提交的SQL长度(单位Byte),默认为32KB,一般不建议设置
+ int commitSize = configuration.getInt("commitSize", 32768);
+ databaseConfig.setCommitSize(commitSize);
+
+ // sdk默认为true
+ boolean partitionBatch = configuration.getBool("partitionBatch", true);
+ databaseConfig.setPartitionBatch(partitionBatch);
+
+ // 设置写入adb时的并发线程数,默认4,针对配置的所有表
+ int parallelNumber = configuration.getInt("parallelNumber", 4);
+ databaseConfig.setParallelNumber(parallelNumber);
+
+ // 设置client中使用的logger对象,此处使用slf4j.Logger
+ databaseConfig.setLogger(AdsClientProxy.LOG);
+
+ // 设置在拼接insert sql时是否需要带上字段名,默认为true
+ boolean insertWithColumnName = configuration.getBool("insertWithColumnName", true);
+ databaseConfig.setInsertWithColumnName(insertWithColumnName);
+
+ // sdk 默认值为true
+ boolean shareDataSource = configuration.getBool("shareDataSource", true);
+ databaseConfig.setShareDataSource(shareDataSource);
+
+ String password = databaseConfig.getPassword();
+ databaseConfig.setPassword(password.replaceAll(".", "*"));
+ // 避免敏感信息直接打印
+ LOG.info("Adb database config is : {}", JSON.toJSONString(databaseConfig));
+ databaseConfig.setPassword(password);
+
+ // Initialize AdbClient,初始化实例之后,databaseConfig的配置信息不能再修改
+ this.adbClient = new AdbClient(databaseConfig);
+ }
+
+ @Override
+ public void startWriteWithConnection(RecordReceiver recordReceiver, Connection connection, int columnNumber) {
+ try {
+ Record record;
+ while ((record = recordReceiver.getFromReader()) != null) {
+
+ Row row = new Row();
+ List