Write AnalyticDB by adbclient

This commit is contained in:
chase.wc 2018-12-13 15:33:44 +08:00
parent d4d1ea6a15
commit a57425d558
8 changed files with 339 additions and 59 deletions

View File

@ -39,6 +39,22 @@
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.cloud.analyticdb</groupId>
<artifactId>adbclient</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.12</version>
</dependency>
<dependency>

View File

@ -10,8 +10,10 @@ import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
import com.alibaba.datax.plugin.writer.adswriter.ads.ColumnInfo;
import com.alibaba.datax.plugin.writer.adswriter.ads.TableInfo;
import com.alibaba.datax.plugin.writer.adswriter.insert.AdsClientProxy;
import com.alibaba.datax.plugin.writer.adswriter.insert.AdsInsertProxy;
import com.alibaba.datax.plugin.writer.adswriter.insert.AdsInsertUtil;
import com.alibaba.datax.plugin.writer.adswriter.insert.AdsProxy;
import com.alibaba.datax.plugin.writer.adswriter.load.AdsHelper;
import com.alibaba.datax.plugin.writer.adswriter.load.TableMetaHelper;
import com.alibaba.datax.plugin.writer.adswriter.load.TransferProjectConf;
@ -168,8 +170,9 @@ public class AdsWriter extends Writer {
userConfiguredPartitions = Collections.emptyList();
}
if(userConfiguredPartitions.size() > 1)
if(userConfiguredPartitions.size() > 1) {
throw DataXException.asDataXException(AdsWriterErrorCode.ODPS_PARTITION_FAILED, "");
}
if(userConfiguredPartitions.size() == 0) {
loadAdsData(adsHelper, odpsTableName,null);
@ -312,6 +315,9 @@ public class AdsWriter extends Writer {
// warn: 只有在insert, stream模式才有, 对于load模式表明为odps临时表了
private TableInfo tableInfo;
private String writeProxy;
AdsProxy proxy = null;
@Override
public void init() {
writerSliceConfig = super.getPluginJobConf();
@ -323,6 +329,14 @@ public class AdsWriter extends Writer {
odpsWriterTaskProxy.setPluginJobConf(writerSliceConfig);
odpsWriterTaskProxy.init();
} else if(Constant.INSERTMODE.equalsIgnoreCase(this.writeMode) || Constant.STREAMMODE.equalsIgnoreCase(this.writeMode)) {
if (Constant.STREAMMODE.equalsIgnoreCase(this.writeMode)) {
this.writeProxy = "datax";
} else {
this.writeProxy = this.writerSliceConfig.getString("writeProxy", "adbClient");
}
this.writerSliceConfig.set("writeProxy", this.writeProxy);
try {
this.tableInfo = AdsUtil.createAdsHelper(this.writerSliceConfig).getTableInfo(this.table);
} catch (AdsException e) {
@ -361,7 +375,12 @@ public class AdsWriter extends Writer {
List<String> columns = writerSliceConfig.getList(Key.COLUMN, String.class);
Connection connection = AdsUtil.getAdsConnect(this.writerSliceConfig);
TaskPluginCollector taskPluginCollector = super.getTaskPluginCollector();
AdsInsertProxy proxy = new AdsInsertProxy(schema + "." + table, columns, writerSliceConfig, taskPluginCollector, this.tableInfo);
if (StringUtils.equalsIgnoreCase(this.writeProxy, "adbClient")) {
this.proxy = new AdsClientProxy(table, columns, writerSliceConfig, taskPluginCollector, this.tableInfo);
} else {
this.proxy = new AdsInsertProxy(schema + "." + table, columns, writerSliceConfig, taskPluginCollector, this.tableInfo);
}
proxy.startWriteWithConnection(recordReceiver, connection, columnNumber);
}
}
@ -381,6 +400,9 @@ public class AdsWriter extends Writer {
odpsWriterTaskProxy.destroy();
} else {
//do noting until now
if (null != this.proxy) {
this.proxy.closeResource();
}
}
}
}

View File

@ -0,0 +1,222 @@
package com.alibaba.datax.plugin.writer.adswriter.insert;
import com.alibaba.cloud.analyticdb.adbclient.AdbClient;
import com.alibaba.cloud.analyticdb.adbclient.AdbClientException;
import com.alibaba.cloud.analyticdb.adbclient.DatabaseConfig;
import com.alibaba.cloud.analyticdb.adbclient.Row;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.record.DefaultRecord;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.writer.adswriter.AdsWriterErrorCode;
import com.alibaba.datax.plugin.writer.adswriter.ads.TableInfo;
import com.alibaba.datax.plugin.writer.adswriter.util.Constant;
import com.alibaba.datax.plugin.writer.adswriter.util.Key;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.Types;
import java.util.*;
public class AdsClientProxy implements AdsProxy {
private static final Logger LOG = LoggerFactory.getLogger(AdsClientProxy.class);
private String table;
private TaskPluginCollector taskPluginCollector;
public Configuration configuration;
// columnName: <java sql type, ads type name>
private Map<String, Pair<Integer, String>> adsTableColumnsMetaData;
private Map<String, Pair<Integer, String>> userConfigColumnsMetaData;
private boolean useRawData[];
private AdbClient adbClient;
/**
* warn: not support columns as *
*/
public AdsClientProxy(String table, List<String> columns, Configuration configuration,
TaskPluginCollector taskPluginCollector, TableInfo tableInfo) {
this.configuration = configuration;
this.taskPluginCollector = taskPluginCollector;
this.adsTableColumnsMetaData = AdsInsertUtil.getColumnMetaData(tableInfo, columns);
this.userConfigColumnsMetaData = new HashMap<String, Pair<Integer, String>>();
List<String> adsColumnsNames = tableInfo.getColumnsNames();
// 要使用用户配置的column顺序
this.useRawData = new boolean[columns.size()];
for (int i = 0; i < columns.size(); i++) {
String oriEachColumn = columns.get(i);
String eachColumn = oriEachColumn;
// 防御性保留字
if (eachColumn.startsWith(Constant.ADS_QUOTE_CHARACTER)
&& eachColumn.endsWith(Constant.ADS_QUOTE_CHARACTER)) {
eachColumn = eachColumn.substring(1, eachColumn.length() - 1);
}
for (String eachAdsColumn : adsColumnsNames) {
if (eachColumn.equalsIgnoreCase(eachAdsColumn)) {
Pair<Integer, String> eachMeta = this.adsTableColumnsMetaData.get(eachAdsColumn);
this.userConfigColumnsMetaData.put(oriEachColumn, eachMeta);
int columnSqltype = eachMeta.getLeft();
switch (columnSqltype) {
case Types.DATE:
case Types.TIME:
case Types.TIMESTAMP:
this.useRawData[i] = false;
break;
default:
this.useRawData[i] = true;
break;
}
}
}
}
DatabaseConfig databaseConfig = new DatabaseConfig();
String url = configuration.getString(Key.ADS_URL);
String[] hostAndPort = StringUtils.split(url, ":");
if (hostAndPort.length != 2) {
throw DataXException.asDataXException(AdsWriterErrorCode.INVALID_CONFIG_VALUE,
"url should be in host:port format!");
}
databaseConfig.setHost(hostAndPort[0]);
databaseConfig.setPort(Integer.parseInt(hostAndPort[1]));
databaseConfig.setUser(configuration.getString(Key.USERNAME));
databaseConfig.setPassword(configuration.getString(Key.PASSWORD));
databaseConfig.setDatabase(configuration.getString(Key.SCHEMA));
databaseConfig.setTable(Collections.singletonList(table));
this.table = table;
databaseConfig.setColumns(table, columns);
// 如果出现insert失败是否跳过
boolean ignoreInsertError = configuration.getBool("ignoreInsertError", false);
databaseConfig.setIgnoreInsertError(ignoreInsertError);
// If the value of column is empty, set null
boolean emptyAsNull = configuration.getBool(Key.EMPTY_AS_NULL, false);
databaseConfig.setEmptyAsNull(emptyAsNull);
// 使用insert ignore into方式进行插入
boolean ignoreInsert = configuration.getBool(Key.IGNORE_INSERT, false);
databaseConfig.setInsertIgnore(ignoreInsert);
// commit时写入ADB出现异常时重试的3次
int retryTimes = configuration.getInt(Key.RETRY_CONNECTION_TIME, Constant.DEFAULT_RETRY_TIMES);
databaseConfig.setRetryTimes(retryTimes);
// 重试间隔的时间为1s单位是ms
int retryIntervalTime = configuration.getInt(Key.RETRY_INTERVAL_TIME, 1000);
databaseConfig.setRetryIntervalTime(retryIntervalTime);
// 设置自动提交的SQL长度单位Byte默认为32KB一般不建议设置
int commitSize = configuration.getInt("commitSize", 32768);
databaseConfig.setCommitSize(commitSize);
// sdk默认为true
boolean partitionBatch = configuration.getBool("partitionBatch", true);
databaseConfig.setPartitionBatch(partitionBatch);
// 设置写入adb时的并发线程数默认4针对配置的所有表
int parallelNumber = configuration.getInt("parallelNumber", 4);
databaseConfig.setParallelNumber(parallelNumber);
// 设置client中使用的logger对象此处使用slf4j.Logger
databaseConfig.setLogger(AdsClientProxy.LOG);
// 设置在拼接insert sql时是否需要带上字段名默认为true
boolean insertWithColumnName = configuration.getBool("insertWithColumnName", true);
databaseConfig.setInsertWithColumnName(insertWithColumnName);
// sdk 默认值为true
boolean shareDataSource = configuration.getBool("shareDataSource", true);
databaseConfig.setShareDataSource(shareDataSource);
String password = databaseConfig.getPassword();
databaseConfig.setPassword(password.replaceAll(".", "*"));
// 避免敏感信息直接打印
LOG.info("Adb database config is : {}", JSON.toJSONString(databaseConfig));
databaseConfig.setPassword(password);
// Initialize AdbClient初始化实例之后databaseConfig的配置信息不能再修改
this.adbClient = new AdbClient(databaseConfig);
}
@Override
public void startWriteWithConnection(RecordReceiver recordReceiver, Connection connection, int columnNumber) {
try {
Record record;
while ((record = recordReceiver.getFromReader()) != null) {
Row row = new Row();
List<Object> values = new ArrayList<Object>();
this.prepareColumnTypeValue(record, values);
row.setColumnValues(values);
try {
this.adbClient.addRow(this.table, row);
} catch (AdbClientException e) {
if (101 == e.getCode()) {
for (String each : e.getErrData()) {
Record dirtyData = new DefaultRecord();
dirtyData.addColumn(new StringColumn(each));
this.taskPluginCollector.collectDirtyRecord(dirtyData, e.getMessage());
}
} else {
throw e;
}
}
}
try {
this.adbClient.commit();
} catch (AdbClientException e) {
if (101 == e.getCode()) {
for (String each : e.getErrData()) {
Record dirtyData = new DefaultRecord();
dirtyData.addColumn(new StringColumn(each));
this.taskPluginCollector.collectDirtyRecord(dirtyData, e.getMessage());
}
} else {
throw e;
}
}
} catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
} finally {
DBUtil.closeDBResources(null, null, connection);
}
}
private void prepareColumnTypeValue(Record record, List<Object> values) {
for (int i = 0; i < this.useRawData.length; i++) {
Column column = record.getColumn(i);
if (this.useRawData[i]) {
values.add(column.getRawData());
} else {
values.add(column.asString());
}
}
}
@Override
public void closeResource() {
try {
LOG.info("stop the adbClient");
this.adbClient.stop();
} catch (Exception e) {
LOG.warn("stop adbClient meet a exception, ignore it: {}", e.getMessage(), e);
}
}
}

View File

@ -34,7 +34,7 @@ import java.util.zip.CRC32;
import java.util.zip.Checksum;
public class AdsInsertProxy {
public class AdsInsertProxy implements AdsProxy {
private static final Logger LOG = LoggerFactory
.getLogger(AdsInsertProxy.class);
@ -222,7 +222,7 @@ public class AdsInsertProxy {
* @param batchSize datax向ads系统一次发送数据条数
* @param buffer datax缓冲区
* @param mode 实时表模式insert 或者 stream
* */
*/
private void doBatchRecordWithPartitionSort(List<Record> buffer, String mode, int bufferSize, int batchSize) throws SQLException {
//warn: 排序会影响数据插入顺序, 如果源头没有数据约束, 排序可能造成数据不一致, 快速排序是一种不稳定的排序算法
//warn: 不明确配置bufferSize或者小于batchSize的情况下不要进行排序;如果缓冲区实际内容条数少于batchSize也不排序了最后一次的余量
@ -628,4 +628,8 @@ public class AdsInsertProxy {
checksum.update(bytes, 0, bytes.length);
return checksum.getValue();
}
@Override
public void closeResource() {
}
}

View File

@ -0,0 +1,12 @@
package com.alibaba.datax.plugin.writer.adswriter.insert;
import com.alibaba.datax.common.plugin.RecordReceiver;
import java.sql.Connection;
public interface AdsProxy {
public abstract void startWriteWithConnection(RecordReceiver recordReceiver, Connection connection,
int columnNumber);
public void closeResource();
}

View File

@ -121,7 +121,7 @@ public class AdsHelper {
connection = DriverManager.getConnection(url, connectionProps);
statement = connection.createStatement();
// ads 表名schema名不区分大小写, 提高用户易用性, 注意列顺序性
String columnMetaSql = String.format("select ordinal_position,column_name,data_type,type_name,column_comment from information_schema.columns where lower(table_schema) = `'%s'` and lower(table_name) = `'%s'` order by ordinal_position", schema.toLowerCase(), table.toLowerCase());
String columnMetaSql = String.format("select ordinal_position,column_name,data_type,type_name,column_comment from information_schema.columns where table_schema = `'%s'` and table_name = `'%s'` order by ordinal_position", schema.toLowerCase(), table.toLowerCase());
LOG.info(String.format("检查列信息sql语句:%s", columnMetaSql));
rs = statement.executeQuery(columnMetaSql);
@ -145,7 +145,7 @@ public class AdsHelper {
tableInfo.setTableName(table);
DBUtil.closeDBResources(rs, statement, null);
String tableMetaSql = String.format("select update_type, partition_type, partition_column, partition_count, primary_key_columns from information_schema.tables where lower(table_schema) = `'%s'` and lower(table_name) = `'%s'`", schema.toLowerCase(), table.toLowerCase());
String tableMetaSql = String.format("select update_type, partition_type, partition_column, partition_count, primary_key_columns from information_schema.tables where table_schema = `'%s'` and table_name = `'%s'`", schema.toLowerCase(), table.toLowerCase());
LOG.info(String.format("检查表信息sql语句:%s", tableMetaSql));
statement = connection.createStatement();
rs = statement.executeQuery(tableMetaSql);

View File

@ -16,7 +16,7 @@ public class Constant {
public static final long DEFAULT_SOCKET_TIMEOUT = 3600000L;
public static final int DEFAULT_RETRY_TIMES = 2;
public static final int DEFAULT_RETRY_TIMES = 3;
public static final String INSERT_TEMPLATE = "insert into %s ( %s ) values ";

View File

@ -30,6 +30,8 @@ public final class Key {
public final static String BUFFER_SIZE = "bufferSize";
public final static String IGNORE_INSERT = "ignoreInsert";
public final static String PRE_SQL = "preSql";
public final static String POST_SQL = "postSql";
@ -38,6 +40,8 @@ public final class Key {
public final static String RETRY_CONNECTION_TIME = "retryTimes";
public final static String RETRY_INTERVAL_TIME = "retryIntervalTime";
public final static String JDBC_URL_SUFFIX = "urlSuffix";
/**