From a57425d5581c3fd6e92a8067535d8b13851a4f83 Mon Sep 17 00:00:00 2001 From: "chase.wc" Date: Thu, 13 Dec 2018 15:33:44 +0800 Subject: [PATCH 1/3] Write AnalyticDB by adbclient --- adswriter/pom.xml | 16 ++ .../plugin/writer/adswriter/AdsWriter.java | 30 ++- .../adswriter/insert/AdsClientProxy.java | 222 ++++++++++++++++++ .../adswriter/insert/AdsInsertProxy.java | 104 ++++---- .../writer/adswriter/insert/AdsProxy.java | 12 + .../writer/adswriter/load/AdsHelper.java | 4 +- .../writer/adswriter/util/Constant.java | 2 +- .../plugin/writer/adswriter/util/Key.java | 8 +- 8 files changed, 339 insertions(+), 59 deletions(-) create mode 100644 adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsClientProxy.java create mode 100644 adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsProxy.java diff --git a/adswriter/pom.xml b/adswriter/pom.xml index de407dfe..90be43e8 100644 --- a/adswriter/pom.xml +++ b/adswriter/pom.xml @@ -39,6 +39,22 @@ com.alibaba.datax plugin-rdbms-util ${datax-project-version} + + + com.alibaba + druid + + + + + com.alibaba.cloud.analyticdb + adbclient + 1.0.2 + + + com.alibaba + druid + 1.1.12 diff --git a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/AdsWriter.java b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/AdsWriter.java index 7e04c844..866b93d0 100644 --- a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/AdsWriter.java +++ b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/AdsWriter.java @@ -10,8 +10,10 @@ import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil; import com.alibaba.datax.plugin.writer.adswriter.ads.ColumnInfo; import com.alibaba.datax.plugin.writer.adswriter.ads.TableInfo; +import com.alibaba.datax.plugin.writer.adswriter.insert.AdsClientProxy; import com.alibaba.datax.plugin.writer.adswriter.insert.AdsInsertProxy; import com.alibaba.datax.plugin.writer.adswriter.insert.AdsInsertUtil; +import com.alibaba.datax.plugin.writer.adswriter.insert.AdsProxy; import com.alibaba.datax.plugin.writer.adswriter.load.AdsHelper; import com.alibaba.datax.plugin.writer.adswriter.load.TableMetaHelper; import com.alibaba.datax.plugin.writer.adswriter.load.TransferProjectConf; @@ -168,8 +170,9 @@ public class AdsWriter extends Writer { userConfiguredPartitions = Collections.emptyList(); } - if(userConfiguredPartitions.size() > 1) + if(userConfiguredPartitions.size() > 1) { throw DataXException.asDataXException(AdsWriterErrorCode.ODPS_PARTITION_FAILED, ""); + } if(userConfiguredPartitions.size() == 0) { loadAdsData(adsHelper, odpsTableName,null); @@ -304,7 +307,7 @@ public class AdsWriter extends Writer { private Configuration writerSliceConfig; private OdpsWriter.Task odpsWriterTaskProxy = new OdpsWriter.Task(); - + private String writeMode; private String schema; private String table; @@ -312,17 +315,28 @@ public class AdsWriter extends Writer { // warn: 只有在insert, stream模式才有, 对于load模式表明为odps临时表了 private TableInfo tableInfo; + private String writeProxy; + AdsProxy proxy = null; + @Override public void init() { writerSliceConfig = super.getPluginJobConf(); this.writeMode = this.writerSliceConfig.getString(Key.WRITE_MODE); this.schema = writerSliceConfig.getString(Key.SCHEMA); this.table = writerSliceConfig.getString(Key.ADS_TABLE); - + if(Constant.LOADMODE.equalsIgnoreCase(this.writeMode)) { odpsWriterTaskProxy.setPluginJobConf(writerSliceConfig); odpsWriterTaskProxy.init(); } else if(Constant.INSERTMODE.equalsIgnoreCase(this.writeMode) || Constant.STREAMMODE.equalsIgnoreCase(this.writeMode)) { + + if (Constant.STREAMMODE.equalsIgnoreCase(this.writeMode)) { + this.writeProxy = "datax"; + } else { + this.writeProxy = this.writerSliceConfig.getString("writeProxy", "adbClient"); + } + this.writerSliceConfig.set("writeProxy", this.writeProxy); + try { this.tableInfo = AdsUtil.createAdsHelper(this.writerSliceConfig).getTableInfo(this.table); } catch (AdsException e) { @@ -361,7 +375,12 @@ public class AdsWriter extends Writer { List columns = writerSliceConfig.getList(Key.COLUMN, String.class); Connection connection = AdsUtil.getAdsConnect(this.writerSliceConfig); TaskPluginCollector taskPluginCollector = super.getTaskPluginCollector(); - AdsInsertProxy proxy = new AdsInsertProxy(schema + "." + table, columns, writerSliceConfig, taskPluginCollector, this.tableInfo); + + if (StringUtils.equalsIgnoreCase(this.writeProxy, "adbClient")) { + this.proxy = new AdsClientProxy(table, columns, writerSliceConfig, taskPluginCollector, this.tableInfo); + } else { + this.proxy = new AdsInsertProxy(schema + "." + table, columns, writerSliceConfig, taskPluginCollector, this.tableInfo); + } proxy.startWriteWithConnection(recordReceiver, connection, columnNumber); } } @@ -381,6 +400,9 @@ public class AdsWriter extends Writer { odpsWriterTaskProxy.destroy(); } else { //do noting until now + if (null != this.proxy) { + this.proxy.closeResource(); + } } } } diff --git a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsClientProxy.java b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsClientProxy.java new file mode 100644 index 00000000..bc6414d8 --- /dev/null +++ b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsClientProxy.java @@ -0,0 +1,222 @@ +package com.alibaba.datax.plugin.writer.adswriter.insert; + +import com.alibaba.cloud.analyticdb.adbclient.AdbClient; +import com.alibaba.cloud.analyticdb.adbclient.AdbClientException; +import com.alibaba.cloud.analyticdb.adbclient.DatabaseConfig; +import com.alibaba.cloud.analyticdb.adbclient.Row; +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.transport.record.DefaultRecord; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.writer.adswriter.AdsWriterErrorCode; +import com.alibaba.datax.plugin.writer.adswriter.ads.TableInfo; +import com.alibaba.datax.plugin.writer.adswriter.util.Constant; +import com.alibaba.datax.plugin.writer.adswriter.util.Key; +import com.alibaba.fastjson.JSON; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.Types; +import java.util.*; + +public class AdsClientProxy implements AdsProxy { + + private static final Logger LOG = LoggerFactory.getLogger(AdsClientProxy.class); + + private String table; + private TaskPluginCollector taskPluginCollector; + public Configuration configuration; + + // columnName: + private Map> adsTableColumnsMetaData; + private Map> userConfigColumnsMetaData; + private boolean useRawData[]; + + private AdbClient adbClient; + + /** + * warn: not support columns as * + */ + public AdsClientProxy(String table, List columns, Configuration configuration, + TaskPluginCollector taskPluginCollector, TableInfo tableInfo) { + this.configuration = configuration; + this.taskPluginCollector = taskPluginCollector; + + this.adsTableColumnsMetaData = AdsInsertUtil.getColumnMetaData(tableInfo, columns); + this.userConfigColumnsMetaData = new HashMap>(); + List adsColumnsNames = tableInfo.getColumnsNames(); + // 要使用用户配置的column顺序 + this.useRawData = new boolean[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + String oriEachColumn = columns.get(i); + String eachColumn = oriEachColumn; + // 防御性保留字 + if (eachColumn.startsWith(Constant.ADS_QUOTE_CHARACTER) + && eachColumn.endsWith(Constant.ADS_QUOTE_CHARACTER)) { + eachColumn = eachColumn.substring(1, eachColumn.length() - 1); + } + for (String eachAdsColumn : adsColumnsNames) { + if (eachColumn.equalsIgnoreCase(eachAdsColumn)) { + Pair eachMeta = this.adsTableColumnsMetaData.get(eachAdsColumn); + this.userConfigColumnsMetaData.put(oriEachColumn, eachMeta); + int columnSqltype = eachMeta.getLeft(); + switch (columnSqltype) { + case Types.DATE: + case Types.TIME: + case Types.TIMESTAMP: + this.useRawData[i] = false; + break; + default: + this.useRawData[i] = true; + break; + } + } + } + } + + DatabaseConfig databaseConfig = new DatabaseConfig(); + String url = configuration.getString(Key.ADS_URL); + String[] hostAndPort = StringUtils.split(url, ":"); + if (hostAndPort.length != 2) { + throw DataXException.asDataXException(AdsWriterErrorCode.INVALID_CONFIG_VALUE, + "url should be in host:port format!"); + } + databaseConfig.setHost(hostAndPort[0]); + databaseConfig.setPort(Integer.parseInt(hostAndPort[1])); + databaseConfig.setUser(configuration.getString(Key.USERNAME)); + databaseConfig.setPassword(configuration.getString(Key.PASSWORD)); + databaseConfig.setDatabase(configuration.getString(Key.SCHEMA)); + databaseConfig.setTable(Collections.singletonList(table)); + this.table = table; + databaseConfig.setColumns(table, columns); + + // 如果出现insert失败,是否跳过 + boolean ignoreInsertError = configuration.getBool("ignoreInsertError", false); + databaseConfig.setIgnoreInsertError(ignoreInsertError); + + // If the value of column is empty, set null + boolean emptyAsNull = configuration.getBool(Key.EMPTY_AS_NULL, false); + databaseConfig.setEmptyAsNull(emptyAsNull); + + // 使用insert ignore into方式进行插入 + boolean ignoreInsert = configuration.getBool(Key.IGNORE_INSERT, false); + databaseConfig.setInsertIgnore(ignoreInsert); + + // commit时,写入ADB出现异常时重试的3次 + int retryTimes = configuration.getInt(Key.RETRY_CONNECTION_TIME, Constant.DEFAULT_RETRY_TIMES); + databaseConfig.setRetryTimes(retryTimes); + + // 重试间隔的时间为1s,单位是ms + int retryIntervalTime = configuration.getInt(Key.RETRY_INTERVAL_TIME, 1000); + databaseConfig.setRetryIntervalTime(retryIntervalTime); + + // 设置自动提交的SQL长度(单位Byte),默认为32KB,一般不建议设置 + int commitSize = configuration.getInt("commitSize", 32768); + databaseConfig.setCommitSize(commitSize); + + // sdk默认为true + boolean partitionBatch = configuration.getBool("partitionBatch", true); + databaseConfig.setPartitionBatch(partitionBatch); + + // 设置写入adb时的并发线程数,默认4,针对配置的所有表 + int parallelNumber = configuration.getInt("parallelNumber", 4); + databaseConfig.setParallelNumber(parallelNumber); + + // 设置client中使用的logger对象,此处使用slf4j.Logger + databaseConfig.setLogger(AdsClientProxy.LOG); + + // 设置在拼接insert sql时是否需要带上字段名,默认为true + boolean insertWithColumnName = configuration.getBool("insertWithColumnName", true); + databaseConfig.setInsertWithColumnName(insertWithColumnName); + + // sdk 默认值为true + boolean shareDataSource = configuration.getBool("shareDataSource", true); + databaseConfig.setShareDataSource(shareDataSource); + + String password = databaseConfig.getPassword(); + databaseConfig.setPassword(password.replaceAll(".", "*")); + // 避免敏感信息直接打印 + LOG.info("Adb database config is : {}", JSON.toJSONString(databaseConfig)); + databaseConfig.setPassword(password); + + // Initialize AdbClient,初始化实例之后,databaseConfig的配置信息不能再修改 + this.adbClient = new AdbClient(databaseConfig); + } + + @Override + public void startWriteWithConnection(RecordReceiver recordReceiver, Connection connection, int columnNumber) { + try { + Record record; + while ((record = recordReceiver.getFromReader()) != null) { + + Row row = new Row(); + List values = new ArrayList(); + this.prepareColumnTypeValue(record, values); + row.setColumnValues(values); + + try { + this.adbClient.addRow(this.table, row); + } catch (AdbClientException e) { + if (101 == e.getCode()) { + for (String each : e.getErrData()) { + Record dirtyData = new DefaultRecord(); + dirtyData.addColumn(new StringColumn(each)); + this.taskPluginCollector.collectDirtyRecord(dirtyData, e.getMessage()); + } + } else { + throw e; + } + } + } + + try { + this.adbClient.commit(); + } catch (AdbClientException e) { + if (101 == e.getCode()) { + for (String each : e.getErrData()) { + Record dirtyData = new DefaultRecord(); + dirtyData.addColumn(new StringColumn(each)); + this.taskPluginCollector.collectDirtyRecord(dirtyData, e.getMessage()); + } + } else { + throw e; + } + } + + } catch (Exception e) { + throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e); + } finally { + DBUtil.closeDBResources(null, null, connection); + } + } + + private void prepareColumnTypeValue(Record record, List values) { + for (int i = 0; i < this.useRawData.length; i++) { + Column column = record.getColumn(i); + if (this.useRawData[i]) { + values.add(column.getRawData()); + } else { + values.add(column.asString()); + } + } + } + + @Override + public void closeResource() { + try { + LOG.info("stop the adbClient"); + this.adbClient.stop(); + } catch (Exception e) { + LOG.warn("stop adbClient meet a exception, ignore it: {}", e.getMessage(), e); + } + } +} diff --git a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsInsertProxy.java b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsInsertProxy.java index 7211fb97..49abda40 100644 --- a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsInsertProxy.java +++ b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsInsertProxy.java @@ -34,7 +34,7 @@ import java.util.zip.CRC32; import java.util.zip.Checksum; -public class AdsInsertProxy { +public class AdsInsertProxy implements AdsProxy { private static final Logger LOG = LoggerFactory .getLogger(AdsInsertProxy.class); @@ -46,9 +46,9 @@ public class AdsInsertProxy { private TaskPluginCollector taskPluginCollector; private Configuration configuration; private Boolean emptyAsNull; - + private String writeMode; - + private String insertSqlPrefix; private String deleteSqlPrefix; private int opColumnIndex; @@ -58,10 +58,10 @@ public class AdsInsertProxy { private Map> userConfigColumnsMetaData; // columnName: index @ ads column private Map primaryKeyNameIndexMap; - + private int retryTimeUpperLimit; private Connection currentConnection; - + private String partitionColumn; private int partitionColumnIndex = -1; private int partitionCount; @@ -80,14 +80,14 @@ public class AdsInsertProxy { Key.RETRY_CONNECTION_TIME, Constant.DEFAULT_RETRY_TIMES); this.partitionCount = tableInfo.getPartitionCount(); this.partitionColumn = tableInfo.getPartitionColumn(); - + //目前ads新建的表如果未插入数据不能通过select colums from table where 1=2,获取列信息,需要读取ads数据字典 //not this: this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(this.columns, ",")); //no retry here(fetch meta data) 注意实时表列换序的可能 this.adsTableColumnsMetaData = AdsInsertUtil.getColumnMetaData(tableInfo, this.columns); this.userConfigColumnsMetaData = new HashMap>(); - - List primaryKeyColumnName = tableInfo.getPrimaryKeyColumns(); + + List primaryKeyColumnName = tableInfo.getPrimaryKeyColumns(); List adsColumnsNames = tableInfo.getColumnsNames(); this.primaryKeyNameIndexMap = new HashMap(); //warn: 要使用用户配置的column顺序, 不要使用从ads元数据获取的column顺序, 原来复用load列顺序其实有问题的 @@ -108,7 +108,7 @@ public class AdsInsertProxy { this.userConfigColumnsMetaData.put(oriEachColumn, this.adsTableColumnsMetaData.get(eachAdsColumn)); } } - + // 根据第几个column分区列排序,ads实时表只有一级分区、最多256个分区 if (eachColumn.equalsIgnoreCase(this.partitionColumn)) { this.partitionColumnIndex = i; @@ -117,8 +117,8 @@ public class AdsInsertProxy { } public void startWriteWithConnection(RecordReceiver recordReceiver, - Connection connection, - int columnNumber) { + Connection connection, + int columnNumber) { this.currentConnection = connection; int batchSize = this.configuration.getInt(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE); // 默认情况下bufferSize需要和batchSize一致 @@ -164,13 +164,13 @@ public class AdsInsertProxy { OperationType operationType = OperationType.asOperationType(optionColumnValue); if (operationType.isInsertTemplate()) { writeBuffer.add(record); - if (this.lastDmlMode == null || this.lastDmlMode == Constant.INSERTMODE ) { + if (this.lastDmlMode == null || this.lastDmlMode == Constant.INSERTMODE) { this.lastDmlMode = Constant.INSERTMODE; if (writeBuffer.size() >= bufferSize) { this.doBatchRecordWithPartitionSort(writeBuffer, Constant.INSERTMODE, bufferSize, batchSize); writeBuffer.clear(); } - } else { + } else { this.lastDmlMode = Constant.INSERTMODE; // 模式变换触发一次提交ads delete, 并进入insert模式 this.doBatchRecordWithPartitionSort(deleteBuffer, Constant.DELETEMODE, bufferSize, batchSize); @@ -178,7 +178,7 @@ public class AdsInsertProxy { } } else if (operationType.isDeleteTemplate()) { deleteBuffer.add(record); - if (this.lastDmlMode == null || this.lastDmlMode == Constant.DELETEMODE ) { + if (this.lastDmlMode == null || this.lastDmlMode == Constant.DELETEMODE) { this.lastDmlMode = Constant.DELETEMODE; if (deleteBuffer.size() >= bufferSize) { this.doBatchRecordWithPartitionSort(deleteBuffer, Constant.DELETEMODE, bufferSize, batchSize); @@ -196,14 +196,14 @@ public class AdsInsertProxy { } } } - + if (!writeBuffer.isEmpty()) { //doOneRecord(writeBuffer, Constant.INSERTMODE); this.doBatchRecordWithPartitionSort(writeBuffer, Constant.INSERTMODE, bufferSize, batchSize); writeBuffer.clear(); } // 2个缓冲最多一个不为空同时 - if (null!= deleteBuffer && !deleteBuffer.isEmpty()) { + if (null != deleteBuffer && !deleteBuffer.isEmpty()) { //doOneRecord(deleteBuffer, Constant.DELETEMODE); this.doBatchRecordWithPartitionSort(deleteBuffer, Constant.DELETEMODE, bufferSize, batchSize); deleteBuffer.clear(); @@ -216,14 +216,14 @@ public class AdsInsertProxy { DBUtil.closeDBResources(null, null, connection); } } - + /** * @param bufferSize datax缓冲记录条数 - * @param batchSize datax向ads系统一次发送数据条数 - * @param buffer datax缓冲区 - * @param mode 实时表模式insert 或者 stream - * */ - private void doBatchRecordWithPartitionSort(List buffer, String mode, int bufferSize, int batchSize) throws SQLException{ + * @param batchSize datax向ads系统一次发送数据条数 + * @param buffer datax缓冲区 + * @param mode 实时表模式insert 或者 stream + */ + private void doBatchRecordWithPartitionSort(List buffer, String mode, int bufferSize, int batchSize) throws SQLException { //warn: 排序会影响数据插入顺序, 如果源头没有数据约束, 排序可能造成数据不一致, 快速排序是一种不稳定的排序算法 //warn: 不明确配置bufferSize或者小于batchSize的情况下,不要进行排序;如果缓冲区实际内容条数少于batchSize也不排序了,最后一次的余量 int recordBufferedNumber = buffer.size(); @@ -261,7 +261,7 @@ public class AdsInsertProxy { return true; } }, this.retryTimeUpperLimit, 2000L, true, retryExceptionClasss); - }catch (SQLException e) { + } catch (SQLException e) { LOG.warn(String.format("after retry %s times, doBatchRecord meet a exception: ", this.retryTimeUpperLimit), e); LOG.info("try to re execute for each record..."); doOneRecord(buffer, mode); @@ -274,7 +274,7 @@ public class AdsInsertProxy { DBUtilErrorCode.WRITE_DATA_ERROR, e); } } - + //warn: ADS 无法支持事物roll back都是不管用 @SuppressWarnings("resource") private void doBatchRecordDml(List buffer, String mode) throws Exception { @@ -313,7 +313,7 @@ public class AdsInsertProxy { } else { try { Throwable causeThrowable = eachException.getCause(); - eachException = causeThrowable == null ? null : (Exception)causeThrowable; + eachException = causeThrowable == null ? null : (Exception) causeThrowable; } catch (Exception castException) { LOG.warn("doBatchRecordDml meet a no! retry exception: " + e.getMessage()); throw e; @@ -330,7 +330,7 @@ public class AdsInsertProxy { DBUtil.closeDBResources(statement, null); } } - + private void doOneRecord(List buffer, final String mode) { List> retryExceptionClasss = new ArrayList>(); retryExceptionClasss.add(com.mysql.jdbc.exceptions.jdbc4.CommunicationsException.class); @@ -350,7 +350,7 @@ public class AdsInsertProxy { } } } - + @SuppressWarnings("resource") private void doOneRecordDml(Record record, String mode) throws Exception { Statement statement = null; @@ -380,7 +380,7 @@ public class AdsInsertProxy { } else { try { Throwable causeThrowable = eachException.getCause(); - eachException = causeThrowable == null ? null : (Exception)causeThrowable; + eachException = causeThrowable == null ? null : (Exception) causeThrowable; } catch (Exception castException) { LOG.warn("doOneDml meet a no! retry exception: " + e.getMessage()); throw e; @@ -397,7 +397,7 @@ public class AdsInsertProxy { DBUtil.closeDBResources(statement, null); } } - + private boolean isRetryable(Throwable e) { Class meetExceptionClass = e.getClass(); if (meetExceptionClass == com.mysql.jdbc.exceptions.jdbc4.CommunicationsException.class) { @@ -408,7 +408,7 @@ public class AdsInsertProxy { } return false; } - + private String generateDmlSql(Connection connection, Record record, String mode) throws SQLException { String sql = null; StringBuilder sqlSb = new StringBuilder(); @@ -417,7 +417,7 @@ public class AdsInsertProxy { sqlSb.append("("); int columnsSize = this.columns.size(); for (int i = 0; i < columnsSize; i++) { - if((i + 1) != columnsSize) { + if ((i + 1) != columnsSize) { sqlSb.append("?,"); } else { sqlSb.append("?"); @@ -431,7 +431,7 @@ public class AdsInsertProxy { if (Constant.STREAMMODE.equalsIgnoreCase(this.writeMode)) { if (preparedParamsIndex >= this.opColumnIndex) { preparedParamsIndex = i + 1; - } + } } String columnName = this.columns.get(i); int columnSqltype = this.userConfigColumnsMetaData.get(columnName).getLeft(); @@ -446,7 +446,7 @@ public class AdsInsertProxy { int entrySetSize = primaryEntrySet.size(); int i = 0; for (Entry eachEntry : primaryEntrySet) { - if((i + 1) != entrySetSize) { + if ((i + 1) != entrySetSize) { sqlSb.append(String.format(" (%s = ?) and ", eachEntry.getKey())); } else { sqlSb.append(String.format(" (%s = ?) ", eachEntry.getKey())); @@ -463,7 +463,7 @@ public class AdsInsertProxy { int columnSqlType = this.userConfigColumnsMetaData.get(columnName).getLeft(); int primaryKeyInUserConfigIndex = this.primaryKeyNameIndexMap.get(columnName); if (primaryKeyInUserConfigIndex >= this.opColumnIndex) { - primaryKeyInUserConfigIndex ++; + primaryKeyInUserConfigIndex++; } prepareColumnTypeValue(statement, columnSqlType, record.getColumn(primaryKeyInUserConfigIndex), i, columnName); i++; @@ -473,8 +473,8 @@ public class AdsInsertProxy { } return sql; } - - private void appendDmlSqlValues(Connection connection, Record record, StringBuilder sqlSb, String mode) throws SQLException { + + private void appendDmlSqlValues(Connection connection, Record record, StringBuilder sqlSb, String mode) throws SQLException { String sqlResult = this.generateDmlSql(connection, record, mode); if (mode.equalsIgnoreCase(Constant.INSERTMODE)) { sqlSb.append(","); @@ -508,21 +508,21 @@ public class AdsInsertProxy { case Types.DECIMAL: case Types.REAL: String numValue = column.asString(); - if(emptyAsNull && "".equals(numValue) || numValue == null){ + if (emptyAsNull && "".equals(numValue) || numValue == null) { //statement.setObject(preparedPatamIndex + 1, null); statement.setNull(preparedPatamIndex + 1, Types.BIGINT); - } else{ + } else { statement.setLong(preparedPatamIndex + 1, column.asLong()); } break; - + case Types.FLOAT: case Types.DOUBLE: String floatValue = column.asString(); - if(emptyAsNull && "".equals(floatValue) || floatValue == null){ + if (emptyAsNull && "".equals(floatValue) || floatValue == null) { //statement.setObject(preparedPatamIndex + 1, null); statement.setNull(preparedPatamIndex + 1, Types.DOUBLE); - } else{ + } else { statement.setDouble(preparedPatamIndex + 1, column.asDouble()); } break; @@ -535,13 +535,13 @@ public class AdsInsertProxy { } else { statement.setLong(preparedPatamIndex + 1, longValue); } - + break; case Types.DATE: java.sql.Date sqlDate = null; try { - if("".equals(column.getRawData())) { + if ("".equals(column.getRawData())) { utilDate = null; } else { utilDate = column.asDate(); @@ -550,17 +550,17 @@ public class AdsInsertProxy { throw new SQLException(String.format( "Date 类型转换错误:[%s]", column)); } - + if (null != utilDate) { sqlDate = new java.sql.Date(utilDate.getTime()); - } + } statement.setDate(preparedPatamIndex + 1, sqlDate); break; case Types.TIME: java.sql.Time sqlTime = null; try { - if("".equals(column.getRawData())) { + if ("".equals(column.getRawData())) { utilDate = null; } else { utilDate = column.asDate(); @@ -579,7 +579,7 @@ public class AdsInsertProxy { case Types.TIMESTAMP: java.sql.Timestamp sqlTimestamp = null; try { - if("".equals(column.getRawData())) { + if ("".equals(column.getRawData())) { utilDate = null; } else { utilDate = column.asDate(); @@ -597,14 +597,14 @@ public class AdsInsertProxy { break; case Types.BOOLEAN: - //case Types.BIT: ads 没有bit + //case Types.BIT: ads 没有bit Boolean booleanValue = column.asBoolean(); if (null == booleanValue) { statement.setNull(preparedPatamIndex + 1, Types.BOOLEAN); } else { statement.setBoolean(preparedPatamIndex + 1, booleanValue); } - + break; default: Pair columnMetaPair = this.userConfigColumnsMetaData.get(columnName); @@ -616,7 +616,7 @@ public class AdsInsertProxy { columnName, columnMetaPair.getRight(), columnMetaPair.getLeft())); } } - + private static int getHashPartition(String value, int totalHashPartitionNum) { long crc32 = (value == null ? getCRC32("-1") : getCRC32(value)); return (int) (crc32 % totalHashPartitionNum); @@ -628,4 +628,8 @@ public class AdsInsertProxy { checksum.update(bytes, 0, bytes.length); return checksum.getValue(); } + + @Override + public void closeResource() { + } } diff --git a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsProxy.java b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsProxy.java new file mode 100644 index 00000000..93a27c23 --- /dev/null +++ b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsProxy.java @@ -0,0 +1,12 @@ +package com.alibaba.datax.plugin.writer.adswriter.insert; + +import com.alibaba.datax.common.plugin.RecordReceiver; + +import java.sql.Connection; + +public interface AdsProxy { + public abstract void startWriteWithConnection(RecordReceiver recordReceiver, Connection connection, + int columnNumber); + + public void closeResource(); +} diff --git a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/load/AdsHelper.java b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/load/AdsHelper.java index 924f6fcb..bebcd186 100644 --- a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/load/AdsHelper.java +++ b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/load/AdsHelper.java @@ -121,7 +121,7 @@ public class AdsHelper { connection = DriverManager.getConnection(url, connectionProps); statement = connection.createStatement(); // ads 表名、schema名不区分大小写, 提高用户易用性, 注意列顺序性 - String columnMetaSql = String.format("select ordinal_position,column_name,data_type,type_name,column_comment from information_schema.columns where lower(table_schema) = `'%s'` and lower(table_name) = `'%s'` order by ordinal_position", schema.toLowerCase(), table.toLowerCase()); + String columnMetaSql = String.format("select ordinal_position,column_name,data_type,type_name,column_comment from information_schema.columns where table_schema = `'%s'` and table_name = `'%s'` order by ordinal_position", schema.toLowerCase(), table.toLowerCase()); LOG.info(String.format("检查列信息sql语句:%s", columnMetaSql)); rs = statement.executeQuery(columnMetaSql); @@ -145,7 +145,7 @@ public class AdsHelper { tableInfo.setTableName(table); DBUtil.closeDBResources(rs, statement, null); - String tableMetaSql = String.format("select update_type, partition_type, partition_column, partition_count, primary_key_columns from information_schema.tables where lower(table_schema) = `'%s'` and lower(table_name) = `'%s'`", schema.toLowerCase(), table.toLowerCase()); + String tableMetaSql = String.format("select update_type, partition_type, partition_column, partition_count, primary_key_columns from information_schema.tables where table_schema = `'%s'` and table_name = `'%s'`", schema.toLowerCase(), table.toLowerCase()); LOG.info(String.format("检查表信息sql语句:%s", tableMetaSql)); statement = connection.createStatement(); rs = statement.executeQuery(tableMetaSql); diff --git a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/util/Constant.java b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/util/Constant.java index f0ab71ec..8f37bdb8 100644 --- a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/util/Constant.java +++ b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/util/Constant.java @@ -16,7 +16,7 @@ public class Constant { public static final long DEFAULT_SOCKET_TIMEOUT = 3600000L; - public static final int DEFAULT_RETRY_TIMES = 2; + public static final int DEFAULT_RETRY_TIMES = 3; public static final String INSERT_TEMPLATE = "insert into %s ( %s ) values "; diff --git a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/util/Key.java b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/util/Key.java index 3d31c818..c210515c 100644 --- a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/util/Key.java +++ b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/util/Key.java @@ -29,7 +29,9 @@ public final class Key { public final static String BATCH_SIZE = "batchSize"; public final static String BUFFER_SIZE = "bufferSize"; - + + public final static String IGNORE_INSERT = "ignoreInsert"; + public final static String PRE_SQL = "preSql"; public final static String POST_SQL = "postSql"; @@ -37,7 +39,9 @@ public final class Key { public final static String SOCKET_TIMEOUT = "socketTimeout"; public final static String RETRY_CONNECTION_TIME = "retryTimes"; - + + public final static String RETRY_INTERVAL_TIME = "retryIntervalTime"; + public final static String JDBC_URL_SUFFIX = "urlSuffix"; /** From 8b538d0a8dc51675ffe253b3a3dd84f908de8661 Mon Sep 17 00:00:00 2001 From: "chase.wc" Date: Mon, 17 Dec 2018 11:04:17 +0800 Subject: [PATCH 2/3] Ensure table name to lower --- .../plugin/writer/adswriter/insert/AdsClientProxy.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsClientProxy.java b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsClientProxy.java index bc6414d8..8fdc70d6 100644 --- a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsClientProxy.java +++ b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/insert/AdsClientProxy.java @@ -90,14 +90,14 @@ public class AdsClientProxy implements AdsProxy { throw DataXException.asDataXException(AdsWriterErrorCode.INVALID_CONFIG_VALUE, "url should be in host:port format!"); } + this.table = table.toLowerCase(); databaseConfig.setHost(hostAndPort[0]); databaseConfig.setPort(Integer.parseInt(hostAndPort[1])); databaseConfig.setUser(configuration.getString(Key.USERNAME)); databaseConfig.setPassword(configuration.getString(Key.PASSWORD)); databaseConfig.setDatabase(configuration.getString(Key.SCHEMA)); - databaseConfig.setTable(Collections.singletonList(table)); - this.table = table; - databaseConfig.setColumns(table, columns); + databaseConfig.setTable(Collections.singletonList(this.table)); + databaseConfig.setColumns(this.table, columns); // 如果出现insert失败,是否跳过 boolean ignoreInsertError = configuration.getBool("ignoreInsertError", false); From 92328083f35a9bf247b09b5819d2a7e9dabe1465 Mon Sep 17 00:00:00 2001 From: Chase Date: Wed, 26 Dec 2018 10:18:48 +0800 Subject: [PATCH 3/3] update adswriter.md --- adswriter/doc/adswriter.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adswriter/doc/adswriter.md b/adswriter/doc/adswriter.md index f80229bb..4a0fd961 100644 --- a/adswriter/doc/adswriter.md +++ b/adswriter/doc/adswriter.md @@ -38,7 +38,7 @@ DataX 将数据导入ADS为当前导入任务分配的ADS项目表,随后DataX ![中转导入](http://aligitlab.oss-cn-hangzhou-zmf.aliyuncs.com/uploads/cdp/cdp/f805dea46b/_____2015-04-10___12.06.21.png) -1. CDP底层得到明文的 jdbc://host:port/dbname + username + password + table, 以此连接ADS, 执行show grants; 前置检查该用户是否有ADS中目标表的Load Data或者更高的权限。注意,此时ADSWriter使用用户填写的ADS用户名+密码信息完成登录鉴权工作。 +1. DataX底层得到明文的 jdbc://host:port/dbname + username + password + table, 以此连接ADS, 执行show grants; 前置检查该用户是否有ADS中目标表的Load Data或者更高的权限。注意,此时ADSWriter使用用户填写的ADS用户名+密码信息完成登录鉴权工作。 2. 检查通过后,通过ADS中目标表的元数据反向生成ODPS DDL,在ODPS中间project中,以ADSWriter的账户建立ODPS表(非分区表,生命周期设为1-2Day), 并调用ODPSWriter把数据源的数据写入该ODPS表中。