Merge pull request #245 from wuchase/master

upgrade ads writer by adbclient sdk
This commit is contained in:
binaryWorld 2019-01-29 17:10:28 +08:00 committed by GitHub
commit 27cd1cf720
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 340 additions and 60 deletions

View File

@ -38,7 +38,7 @@ DataX 将数据导入ADS为当前导入任务分配的ADS项目表随后DataX
![中转导入](http://aligitlab.oss-cn-hangzhou-zmf.aliyuncs.com/uploads/cdp/cdp/f805dea46b/_____2015-04-10___12.06.21.png)
1. CDP底层得到明文的 jdbc://host:port/dbname + username + password + table 以此连接ADS 执行show grants; 前置检查该用户是否有ADS中目标表的Load Data或者更高的权限。注意此时ADSWriter使用用户填写的ADS用户名+密码信息完成登录鉴权工作。
1. DataX底层得到明文的 jdbc://host:port/dbname + username + password + table 以此连接ADS 执行show grants; 前置检查该用户是否有ADS中目标表的Load Data或者更高的权限。注意此时ADSWriter使用用户填写的ADS用户名+密码信息完成登录鉴权工作。
2. 检查通过后通过ADS中目标表的元数据反向生成ODPS DDL在ODPS中间project中以ADSWriter的账户建立ODPS表非分区表生命周期设为1-2Day), 并调用ODPSWriter把数据源的数据写入该ODPS表中。

View File

@ -39,6 +39,22 @@
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.cloud.analyticdb</groupId>
<artifactId>adbclient</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.12</version>
</dependency>
<dependency>

View File

@ -10,8 +10,10 @@ import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
import com.alibaba.datax.plugin.writer.adswriter.ads.ColumnInfo;
import com.alibaba.datax.plugin.writer.adswriter.ads.TableInfo;
import com.alibaba.datax.plugin.writer.adswriter.insert.AdsClientProxy;
import com.alibaba.datax.plugin.writer.adswriter.insert.AdsInsertProxy;
import com.alibaba.datax.plugin.writer.adswriter.insert.AdsInsertUtil;
import com.alibaba.datax.plugin.writer.adswriter.insert.AdsProxy;
import com.alibaba.datax.plugin.writer.adswriter.load.AdsHelper;
import com.alibaba.datax.plugin.writer.adswriter.load.TableMetaHelper;
import com.alibaba.datax.plugin.writer.adswriter.load.TransferProjectConf;
@ -168,8 +170,9 @@ public class AdsWriter extends Writer {
userConfiguredPartitions = Collections.emptyList();
}
if(userConfiguredPartitions.size() > 1)
if(userConfiguredPartitions.size() > 1) {
throw DataXException.asDataXException(AdsWriterErrorCode.ODPS_PARTITION_FAILED, "");
}
if(userConfiguredPartitions.size() == 0) {
loadAdsData(adsHelper, odpsTableName,null);
@ -304,7 +307,7 @@ public class AdsWriter extends Writer {
private Configuration writerSliceConfig;
private OdpsWriter.Task odpsWriterTaskProxy = new OdpsWriter.Task();
private String writeMode;
private String schema;
private String table;
@ -312,17 +315,28 @@ public class AdsWriter extends Writer {
// warn: 只有在insert, stream模式才有, 对于load模式表明为odps临时表了
private TableInfo tableInfo;
private String writeProxy;
AdsProxy proxy = null;
@Override
public void init() {
writerSliceConfig = super.getPluginJobConf();
this.writeMode = this.writerSliceConfig.getString(Key.WRITE_MODE);
this.schema = writerSliceConfig.getString(Key.SCHEMA);
this.table = writerSliceConfig.getString(Key.ADS_TABLE);
if(Constant.LOADMODE.equalsIgnoreCase(this.writeMode)) {
odpsWriterTaskProxy.setPluginJobConf(writerSliceConfig);
odpsWriterTaskProxy.init();
} else if(Constant.INSERTMODE.equalsIgnoreCase(this.writeMode) || Constant.STREAMMODE.equalsIgnoreCase(this.writeMode)) {
if (Constant.STREAMMODE.equalsIgnoreCase(this.writeMode)) {
this.writeProxy = "datax";
} else {
this.writeProxy = this.writerSliceConfig.getString("writeProxy", "adbClient");
}
this.writerSliceConfig.set("writeProxy", this.writeProxy);
try {
this.tableInfo = AdsUtil.createAdsHelper(this.writerSliceConfig).getTableInfo(this.table);
} catch (AdsException e) {
@ -361,7 +375,12 @@ public class AdsWriter extends Writer {
List<String> columns = writerSliceConfig.getList(Key.COLUMN, String.class);
Connection connection = AdsUtil.getAdsConnect(this.writerSliceConfig);
TaskPluginCollector taskPluginCollector = super.getTaskPluginCollector();
AdsInsertProxy proxy = new AdsInsertProxy(schema + "." + table, columns, writerSliceConfig, taskPluginCollector, this.tableInfo);
if (StringUtils.equalsIgnoreCase(this.writeProxy, "adbClient")) {
this.proxy = new AdsClientProxy(table, columns, writerSliceConfig, taskPluginCollector, this.tableInfo);
} else {
this.proxy = new AdsInsertProxy(schema + "." + table, columns, writerSliceConfig, taskPluginCollector, this.tableInfo);
}
proxy.startWriteWithConnection(recordReceiver, connection, columnNumber);
}
}
@ -381,6 +400,9 @@ public class AdsWriter extends Writer {
odpsWriterTaskProxy.destroy();
} else {
//do noting until now
if (null != this.proxy) {
this.proxy.closeResource();
}
}
}
}

View File

@ -0,0 +1,222 @@
package com.alibaba.datax.plugin.writer.adswriter.insert;
import com.alibaba.cloud.analyticdb.adbclient.AdbClient;
import com.alibaba.cloud.analyticdb.adbclient.AdbClientException;
import com.alibaba.cloud.analyticdb.adbclient.DatabaseConfig;
import com.alibaba.cloud.analyticdb.adbclient.Row;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.record.DefaultRecord;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.writer.adswriter.AdsWriterErrorCode;
import com.alibaba.datax.plugin.writer.adswriter.ads.TableInfo;
import com.alibaba.datax.plugin.writer.adswriter.util.Constant;
import com.alibaba.datax.plugin.writer.adswriter.util.Key;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.Types;
import java.util.*;
public class AdsClientProxy implements AdsProxy {
private static final Logger LOG = LoggerFactory.getLogger(AdsClientProxy.class);
private String table;
private TaskPluginCollector taskPluginCollector;
public Configuration configuration;
// columnName: <java sql type, ads type name>
private Map<String, Pair<Integer, String>> adsTableColumnsMetaData;
private Map<String, Pair<Integer, String>> userConfigColumnsMetaData;
private boolean useRawData[];
private AdbClient adbClient;
/**
* warn: not support columns as *
*/
public AdsClientProxy(String table, List<String> columns, Configuration configuration,
TaskPluginCollector taskPluginCollector, TableInfo tableInfo) {
this.configuration = configuration;
this.taskPluginCollector = taskPluginCollector;
this.adsTableColumnsMetaData = AdsInsertUtil.getColumnMetaData(tableInfo, columns);
this.userConfigColumnsMetaData = new HashMap<String, Pair<Integer, String>>();
List<String> adsColumnsNames = tableInfo.getColumnsNames();
// 要使用用户配置的column顺序
this.useRawData = new boolean[columns.size()];
for (int i = 0; i < columns.size(); i++) {
String oriEachColumn = columns.get(i);
String eachColumn = oriEachColumn;
// 防御性保留字
if (eachColumn.startsWith(Constant.ADS_QUOTE_CHARACTER)
&& eachColumn.endsWith(Constant.ADS_QUOTE_CHARACTER)) {
eachColumn = eachColumn.substring(1, eachColumn.length() - 1);
}
for (String eachAdsColumn : adsColumnsNames) {
if (eachColumn.equalsIgnoreCase(eachAdsColumn)) {
Pair<Integer, String> eachMeta = this.adsTableColumnsMetaData.get(eachAdsColumn);
this.userConfigColumnsMetaData.put(oriEachColumn, eachMeta);
int columnSqltype = eachMeta.getLeft();
switch (columnSqltype) {
case Types.DATE:
case Types.TIME:
case Types.TIMESTAMP:
this.useRawData[i] = false;
break;
default:
this.useRawData[i] = true;
break;
}
}
}
}
DatabaseConfig databaseConfig = new DatabaseConfig();
String url = configuration.getString(Key.ADS_URL);
String[] hostAndPort = StringUtils.split(url, ":");
if (hostAndPort.length != 2) {
throw DataXException.asDataXException(AdsWriterErrorCode.INVALID_CONFIG_VALUE,
"url should be in host:port format!");
}
this.table = table.toLowerCase();
databaseConfig.setHost(hostAndPort[0]);
databaseConfig.setPort(Integer.parseInt(hostAndPort[1]));
databaseConfig.setUser(configuration.getString(Key.USERNAME));
databaseConfig.setPassword(configuration.getString(Key.PASSWORD));
databaseConfig.setDatabase(configuration.getString(Key.SCHEMA));
databaseConfig.setTable(Collections.singletonList(this.table));
databaseConfig.setColumns(this.table, columns);
// 如果出现insert失败是否跳过
boolean ignoreInsertError = configuration.getBool("ignoreInsertError", false);
databaseConfig.setIgnoreInsertError(ignoreInsertError);
// If the value of column is empty, set null
boolean emptyAsNull = configuration.getBool(Key.EMPTY_AS_NULL, false);
databaseConfig.setEmptyAsNull(emptyAsNull);
// 使用insert ignore into方式进行插入
boolean ignoreInsert = configuration.getBool(Key.IGNORE_INSERT, false);
databaseConfig.setInsertIgnore(ignoreInsert);
// commit时写入ADB出现异常时重试的3次
int retryTimes = configuration.getInt(Key.RETRY_CONNECTION_TIME, Constant.DEFAULT_RETRY_TIMES);
databaseConfig.setRetryTimes(retryTimes);
// 重试间隔的时间为1s单位是ms
int retryIntervalTime = configuration.getInt(Key.RETRY_INTERVAL_TIME, 1000);
databaseConfig.setRetryIntervalTime(retryIntervalTime);
// 设置自动提交的SQL长度单位Byte默认为32KB一般不建议设置
int commitSize = configuration.getInt("commitSize", 32768);
databaseConfig.setCommitSize(commitSize);
// sdk默认为true
boolean partitionBatch = configuration.getBool("partitionBatch", true);
databaseConfig.setPartitionBatch(partitionBatch);
// 设置写入adb时的并发线程数默认4针对配置的所有表
int parallelNumber = configuration.getInt("parallelNumber", 4);
databaseConfig.setParallelNumber(parallelNumber);
// 设置client中使用的logger对象此处使用slf4j.Logger
databaseConfig.setLogger(AdsClientProxy.LOG);
// 设置在拼接insert sql时是否需要带上字段名默认为true
boolean insertWithColumnName = configuration.getBool("insertWithColumnName", true);
databaseConfig.setInsertWithColumnName(insertWithColumnName);
// sdk 默认值为true
boolean shareDataSource = configuration.getBool("shareDataSource", true);
databaseConfig.setShareDataSource(shareDataSource);
String password = databaseConfig.getPassword();
databaseConfig.setPassword(password.replaceAll(".", "*"));
// 避免敏感信息直接打印
LOG.info("Adb database config is : {}", JSON.toJSONString(databaseConfig));
databaseConfig.setPassword(password);
// Initialize AdbClient初始化实例之后databaseConfig的配置信息不能再修改
this.adbClient = new AdbClient(databaseConfig);
}
@Override
public void startWriteWithConnection(RecordReceiver recordReceiver, Connection connection, int columnNumber) {
try {
Record record;
while ((record = recordReceiver.getFromReader()) != null) {
Row row = new Row();
List<Object> values = new ArrayList<Object>();
this.prepareColumnTypeValue(record, values);
row.setColumnValues(values);
try {
this.adbClient.addRow(this.table, row);
} catch (AdbClientException e) {
if (101 == e.getCode()) {
for (String each : e.getErrData()) {
Record dirtyData = new DefaultRecord();
dirtyData.addColumn(new StringColumn(each));
this.taskPluginCollector.collectDirtyRecord(dirtyData, e.getMessage());
}
} else {
throw e;
}
}
}
try {
this.adbClient.commit();
} catch (AdbClientException e) {
if (101 == e.getCode()) {
for (String each : e.getErrData()) {
Record dirtyData = new DefaultRecord();
dirtyData.addColumn(new StringColumn(each));
this.taskPluginCollector.collectDirtyRecord(dirtyData, e.getMessage());
}
} else {
throw e;
}
}
} catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
} finally {
DBUtil.closeDBResources(null, null, connection);
}
}
private void prepareColumnTypeValue(Record record, List<Object> values) {
for (int i = 0; i < this.useRawData.length; i++) {
Column column = record.getColumn(i);
if (this.useRawData[i]) {
values.add(column.getRawData());
} else {
values.add(column.asString());
}
}
}
@Override
public void closeResource() {
try {
LOG.info("stop the adbClient");
this.adbClient.stop();
} catch (Exception e) {
LOG.warn("stop adbClient meet a exception, ignore it: {}", e.getMessage(), e);
}
}
}

View File

@ -34,7 +34,7 @@ import java.util.zip.CRC32;
import java.util.zip.Checksum;
public class AdsInsertProxy {
public class AdsInsertProxy implements AdsProxy {
private static final Logger LOG = LoggerFactory
.getLogger(AdsInsertProxy.class);
@ -46,9 +46,9 @@ public class AdsInsertProxy {
private TaskPluginCollector taskPluginCollector;
private Configuration configuration;
private Boolean emptyAsNull;
private String writeMode;
private String insertSqlPrefix;
private String deleteSqlPrefix;
private int opColumnIndex;
@ -58,10 +58,10 @@ public class AdsInsertProxy {
private Map<String, Pair<Integer, String>> userConfigColumnsMetaData;
// columnName: index @ ads column
private Map<String, Integer> primaryKeyNameIndexMap;
private int retryTimeUpperLimit;
private Connection currentConnection;
private String partitionColumn;
private int partitionColumnIndex = -1;
private int partitionCount;
@ -80,14 +80,14 @@ public class AdsInsertProxy {
Key.RETRY_CONNECTION_TIME, Constant.DEFAULT_RETRY_TIMES);
this.partitionCount = tableInfo.getPartitionCount();
this.partitionColumn = tableInfo.getPartitionColumn();
//目前ads新建的表如果未插入数据不能通过select colums from table where 1=2获取列信息需要读取ads数据字典
//not this: this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(this.columns, ","));
//no retry here(fetch meta data) 注意实时表列换序的可能
this.adsTableColumnsMetaData = AdsInsertUtil.getColumnMetaData(tableInfo, this.columns);
this.userConfigColumnsMetaData = new HashMap<String, Pair<Integer, String>>();
List<String> primaryKeyColumnName = tableInfo.getPrimaryKeyColumns();
List<String> primaryKeyColumnName = tableInfo.getPrimaryKeyColumns();
List<String> adsColumnsNames = tableInfo.getColumnsNames();
this.primaryKeyNameIndexMap = new HashMap<String, Integer>();
//warn: 要使用用户配置的column顺序, 不要使用从ads元数据获取的column顺序, 原来复用load列顺序其实有问题的
@ -108,7 +108,7 @@ public class AdsInsertProxy {
this.userConfigColumnsMetaData.put(oriEachColumn, this.adsTableColumnsMetaData.get(eachAdsColumn));
}
}
// 根据第几个column分区列排序ads实时表只有一级分区最多256个分区
if (eachColumn.equalsIgnoreCase(this.partitionColumn)) {
this.partitionColumnIndex = i;
@ -117,8 +117,8 @@ public class AdsInsertProxy {
}
public void startWriteWithConnection(RecordReceiver recordReceiver,
Connection connection,
int columnNumber) {
Connection connection,
int columnNumber) {
this.currentConnection = connection;
int batchSize = this.configuration.getInt(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE);
// 默认情况下bufferSize需要和batchSize一致
@ -164,13 +164,13 @@ public class AdsInsertProxy {
OperationType operationType = OperationType.asOperationType(optionColumnValue);
if (operationType.isInsertTemplate()) {
writeBuffer.add(record);
if (this.lastDmlMode == null || this.lastDmlMode == Constant.INSERTMODE ) {
if (this.lastDmlMode == null || this.lastDmlMode == Constant.INSERTMODE) {
this.lastDmlMode = Constant.INSERTMODE;
if (writeBuffer.size() >= bufferSize) {
this.doBatchRecordWithPartitionSort(writeBuffer, Constant.INSERTMODE, bufferSize, batchSize);
writeBuffer.clear();
}
} else {
} else {
this.lastDmlMode = Constant.INSERTMODE;
// 模式变换触发一次提交ads delete, 并进入insert模式
this.doBatchRecordWithPartitionSort(deleteBuffer, Constant.DELETEMODE, bufferSize, batchSize);
@ -178,7 +178,7 @@ public class AdsInsertProxy {
}
} else if (operationType.isDeleteTemplate()) {
deleteBuffer.add(record);
if (this.lastDmlMode == null || this.lastDmlMode == Constant.DELETEMODE ) {
if (this.lastDmlMode == null || this.lastDmlMode == Constant.DELETEMODE) {
this.lastDmlMode = Constant.DELETEMODE;
if (deleteBuffer.size() >= bufferSize) {
this.doBatchRecordWithPartitionSort(deleteBuffer, Constant.DELETEMODE, bufferSize, batchSize);
@ -196,14 +196,14 @@ public class AdsInsertProxy {
}
}
}
if (!writeBuffer.isEmpty()) {
//doOneRecord(writeBuffer, Constant.INSERTMODE);
this.doBatchRecordWithPartitionSort(writeBuffer, Constant.INSERTMODE, bufferSize, batchSize);
writeBuffer.clear();
}
// 2个缓冲最多一个不为空同时
if (null!= deleteBuffer && !deleteBuffer.isEmpty()) {
if (null != deleteBuffer && !deleteBuffer.isEmpty()) {
//doOneRecord(deleteBuffer, Constant.DELETEMODE);
this.doBatchRecordWithPartitionSort(deleteBuffer, Constant.DELETEMODE, bufferSize, batchSize);
deleteBuffer.clear();
@ -216,14 +216,14 @@ public class AdsInsertProxy {
DBUtil.closeDBResources(null, null, connection);
}
}
/**
* @param bufferSize datax缓冲记录条数
* @param batchSize datax向ads系统一次发送数据条数
* @param buffer datax缓冲区
* @param mode 实时表模式insert 或者 stream
* */
private void doBatchRecordWithPartitionSort(List<Record> buffer, String mode, int bufferSize, int batchSize) throws SQLException{
* @param batchSize datax向ads系统一次发送数据条数
* @param buffer datax缓冲区
* @param mode 实时表模式insert 或者 stream
*/
private void doBatchRecordWithPartitionSort(List<Record> buffer, String mode, int bufferSize, int batchSize) throws SQLException {
//warn: 排序会影响数据插入顺序, 如果源头没有数据约束, 排序可能造成数据不一致, 快速排序是一种不稳定的排序算法
//warn: 不明确配置bufferSize或者小于batchSize的情况下不要进行排序;如果缓冲区实际内容条数少于batchSize也不排序了最后一次的余量
int recordBufferedNumber = buffer.size();
@ -261,7 +261,7 @@ public class AdsInsertProxy {
return true;
}
}, this.retryTimeUpperLimit, 2000L, true, retryExceptionClasss);
}catch (SQLException e) {
} catch (SQLException e) {
LOG.warn(String.format("after retry %s times, doBatchRecord meet a exception: ", this.retryTimeUpperLimit), e);
LOG.info("try to re execute for each record...");
doOneRecord(buffer, mode);
@ -274,7 +274,7 @@ public class AdsInsertProxy {
DBUtilErrorCode.WRITE_DATA_ERROR, e);
}
}
//warn: ADS 无法支持事物roll back都是不管用
@SuppressWarnings("resource")
private void doBatchRecordDml(List<Record> buffer, String mode) throws Exception {
@ -313,7 +313,7 @@ public class AdsInsertProxy {
} else {
try {
Throwable causeThrowable = eachException.getCause();
eachException = causeThrowable == null ? null : (Exception)causeThrowable;
eachException = causeThrowable == null ? null : (Exception) causeThrowable;
} catch (Exception castException) {
LOG.warn("doBatchRecordDml meet a no! retry exception: " + e.getMessage());
throw e;
@ -330,7 +330,7 @@ public class AdsInsertProxy {
DBUtil.closeDBResources(statement, null);
}
}
private void doOneRecord(List<Record> buffer, final String mode) {
List<Class<?>> retryExceptionClasss = new ArrayList<Class<?>>();
retryExceptionClasss.add(com.mysql.jdbc.exceptions.jdbc4.CommunicationsException.class);
@ -350,7 +350,7 @@ public class AdsInsertProxy {
}
}
}
@SuppressWarnings("resource")
private void doOneRecordDml(Record record, String mode) throws Exception {
Statement statement = null;
@ -380,7 +380,7 @@ public class AdsInsertProxy {
} else {
try {
Throwable causeThrowable = eachException.getCause();
eachException = causeThrowable == null ? null : (Exception)causeThrowable;
eachException = causeThrowable == null ? null : (Exception) causeThrowable;
} catch (Exception castException) {
LOG.warn("doOneDml meet a no! retry exception: " + e.getMessage());
throw e;
@ -397,7 +397,7 @@ public class AdsInsertProxy {
DBUtil.closeDBResources(statement, null);
}
}
private boolean isRetryable(Throwable e) {
Class<?> meetExceptionClass = e.getClass();
if (meetExceptionClass == com.mysql.jdbc.exceptions.jdbc4.CommunicationsException.class) {
@ -408,7 +408,7 @@ public class AdsInsertProxy {
}
return false;
}
private String generateDmlSql(Connection connection, Record record, String mode) throws SQLException {
String sql = null;
StringBuilder sqlSb = new StringBuilder();
@ -417,7 +417,7 @@ public class AdsInsertProxy {
sqlSb.append("(");
int columnsSize = this.columns.size();
for (int i = 0; i < columnsSize; i++) {
if((i + 1) != columnsSize) {
if ((i + 1) != columnsSize) {
sqlSb.append("?,");
} else {
sqlSb.append("?");
@ -431,7 +431,7 @@ public class AdsInsertProxy {
if (Constant.STREAMMODE.equalsIgnoreCase(this.writeMode)) {
if (preparedParamsIndex >= this.opColumnIndex) {
preparedParamsIndex = i + 1;
}
}
}
String columnName = this.columns.get(i);
int columnSqltype = this.userConfigColumnsMetaData.get(columnName).getLeft();
@ -446,7 +446,7 @@ public class AdsInsertProxy {
int entrySetSize = primaryEntrySet.size();
int i = 0;
for (Entry<String, Integer> eachEntry : primaryEntrySet) {
if((i + 1) != entrySetSize) {
if ((i + 1) != entrySetSize) {
sqlSb.append(String.format(" (%s = ?) and ", eachEntry.getKey()));
} else {
sqlSb.append(String.format(" (%s = ?) ", eachEntry.getKey()));
@ -463,7 +463,7 @@ public class AdsInsertProxy {
int columnSqlType = this.userConfigColumnsMetaData.get(columnName).getLeft();
int primaryKeyInUserConfigIndex = this.primaryKeyNameIndexMap.get(columnName);
if (primaryKeyInUserConfigIndex >= this.opColumnIndex) {
primaryKeyInUserConfigIndex ++;
primaryKeyInUserConfigIndex++;
}
prepareColumnTypeValue(statement, columnSqlType, record.getColumn(primaryKeyInUserConfigIndex), i, columnName);
i++;
@ -473,8 +473,8 @@ public class AdsInsertProxy {
}
return sql;
}
private void appendDmlSqlValues(Connection connection, Record record, StringBuilder sqlSb, String mode) throws SQLException {
private void appendDmlSqlValues(Connection connection, Record record, StringBuilder sqlSb, String mode) throws SQLException {
String sqlResult = this.generateDmlSql(connection, record, mode);
if (mode.equalsIgnoreCase(Constant.INSERTMODE)) {
sqlSb.append(",");
@ -508,21 +508,21 @@ public class AdsInsertProxy {
case Types.DECIMAL:
case Types.REAL:
String numValue = column.asString();
if(emptyAsNull && "".equals(numValue) || numValue == null){
if (emptyAsNull && "".equals(numValue) || numValue == null) {
//statement.setObject(preparedPatamIndex + 1, null);
statement.setNull(preparedPatamIndex + 1, Types.BIGINT);
} else{
} else {
statement.setLong(preparedPatamIndex + 1, column.asLong());
}
break;
case Types.FLOAT:
case Types.DOUBLE:
String floatValue = column.asString();
if(emptyAsNull && "".equals(floatValue) || floatValue == null){
if (emptyAsNull && "".equals(floatValue) || floatValue == null) {
//statement.setObject(preparedPatamIndex + 1, null);
statement.setNull(preparedPatamIndex + 1, Types.DOUBLE);
} else{
} else {
statement.setDouble(preparedPatamIndex + 1, column.asDouble());
}
break;
@ -535,13 +535,13 @@ public class AdsInsertProxy {
} else {
statement.setLong(preparedPatamIndex + 1, longValue);
}
break;
case Types.DATE:
java.sql.Date sqlDate = null;
try {
if("".equals(column.getRawData())) {
if ("".equals(column.getRawData())) {
utilDate = null;
} else {
utilDate = column.asDate();
@ -550,17 +550,17 @@ public class AdsInsertProxy {
throw new SQLException(String.format(
"Date 类型转换错误:[%s]", column));
}
if (null != utilDate) {
sqlDate = new java.sql.Date(utilDate.getTime());
}
}
statement.setDate(preparedPatamIndex + 1, sqlDate);
break;
case Types.TIME:
java.sql.Time sqlTime = null;
try {
if("".equals(column.getRawData())) {
if ("".equals(column.getRawData())) {
utilDate = null;
} else {
utilDate = column.asDate();
@ -579,7 +579,7 @@ public class AdsInsertProxy {
case Types.TIMESTAMP:
java.sql.Timestamp sqlTimestamp = null;
try {
if("".equals(column.getRawData())) {
if ("".equals(column.getRawData())) {
utilDate = null;
} else {
utilDate = column.asDate();
@ -597,14 +597,14 @@ public class AdsInsertProxy {
break;
case Types.BOOLEAN:
//case Types.BIT: ads 没有bit
//case Types.BIT: ads 没有bit
Boolean booleanValue = column.asBoolean();
if (null == booleanValue) {
statement.setNull(preparedPatamIndex + 1, Types.BOOLEAN);
} else {
statement.setBoolean(preparedPatamIndex + 1, booleanValue);
}
break;
default:
Pair<Integer, String> columnMetaPair = this.userConfigColumnsMetaData.get(columnName);
@ -616,7 +616,7 @@ public class AdsInsertProxy {
columnName, columnMetaPair.getRight(), columnMetaPair.getLeft()));
}
}
private static int getHashPartition(String value, int totalHashPartitionNum) {
long crc32 = (value == null ? getCRC32("-1") : getCRC32(value));
return (int) (crc32 % totalHashPartitionNum);
@ -628,4 +628,8 @@ public class AdsInsertProxy {
checksum.update(bytes, 0, bytes.length);
return checksum.getValue();
}
@Override
public void closeResource() {
}
}

View File

@ -0,0 +1,12 @@
package com.alibaba.datax.plugin.writer.adswriter.insert;
import com.alibaba.datax.common.plugin.RecordReceiver;
import java.sql.Connection;
public interface AdsProxy {
public abstract void startWriteWithConnection(RecordReceiver recordReceiver, Connection connection,
int columnNumber);
public void closeResource();
}

View File

@ -121,7 +121,7 @@ public class AdsHelper {
connection = DriverManager.getConnection(url, connectionProps);
statement = connection.createStatement();
// ads 表名schema名不区分大小写, 提高用户易用性, 注意列顺序性
String columnMetaSql = String.format("select ordinal_position,column_name,data_type,type_name,column_comment from information_schema.columns where lower(table_schema) = `'%s'` and lower(table_name) = `'%s'` order by ordinal_position", schema.toLowerCase(), table.toLowerCase());
String columnMetaSql = String.format("select ordinal_position,column_name,data_type,type_name,column_comment from information_schema.columns where table_schema = `'%s'` and table_name = `'%s'` order by ordinal_position", schema.toLowerCase(), table.toLowerCase());
LOG.info(String.format("检查列信息sql语句:%s", columnMetaSql));
rs = statement.executeQuery(columnMetaSql);
@ -145,7 +145,7 @@ public class AdsHelper {
tableInfo.setTableName(table);
DBUtil.closeDBResources(rs, statement, null);
String tableMetaSql = String.format("select update_type, partition_type, partition_column, partition_count, primary_key_columns from information_schema.tables where lower(table_schema) = `'%s'` and lower(table_name) = `'%s'`", schema.toLowerCase(), table.toLowerCase());
String tableMetaSql = String.format("select update_type, partition_type, partition_column, partition_count, primary_key_columns from information_schema.tables where table_schema = `'%s'` and table_name = `'%s'`", schema.toLowerCase(), table.toLowerCase());
LOG.info(String.format("检查表信息sql语句:%s", tableMetaSql));
statement = connection.createStatement();
rs = statement.executeQuery(tableMetaSql);

View File

@ -16,7 +16,7 @@ public class Constant {
public static final long DEFAULT_SOCKET_TIMEOUT = 3600000L;
public static final int DEFAULT_RETRY_TIMES = 2;
public static final int DEFAULT_RETRY_TIMES = 3;
public static final String INSERT_TEMPLATE = "insert into %s ( %s ) values ";

View File

@ -29,7 +29,9 @@ public final class Key {
public final static String BATCH_SIZE = "batchSize";
public final static String BUFFER_SIZE = "bufferSize";
public final static String IGNORE_INSERT = "ignoreInsert";
public final static String PRE_SQL = "preSql";
public final static String POST_SQL = "postSql";
@ -37,7 +39,9 @@ public final class Key {
public final static String SOCKET_TIMEOUT = "socketTimeout";
public final static String RETRY_CONNECTION_TIME = "retryTimes";
public final static String RETRY_INTERVAL_TIME = "retryIntervalTime";
public final static String JDBC_URL_SUFFIX = "urlSuffix";
/**