From 1e9247821bc8209db756033be721811fc755d302 Mon Sep 17 00:00:00 2001 From: sunkang Date: Thu, 3 Nov 2022 16:39:59 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AF=B9PostgreSQL=20update?= =?UTF-8?q?=E5=86=99=E5=85=A5=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rdbms/writer/CommonRdbmsWriter.java | 25 +- .../util/OriginalConfPretreatmentUtil.java | 2 +- .../plugin/rdbms/writer/util/WriterUtil.java | 19 +- .../postgresqlwriter/PostgresqlWriter.java | 238 +++++++++++++++++- 4 files changed, 269 insertions(+), 15 deletions(-) diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java index bec3c683..ab0b8485 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java @@ -17,10 +17,7 @@ import org.apache.commons.lang3.tuple.Triple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Types; +import java.sql.*; import java.util.ArrayList; import java.util.List; @@ -182,6 +179,7 @@ public class CommonRdbmsWriter { protected String jdbcUrl; protected String table; protected List columns; + protected List primaryColumns; protected List preSqls; protected List postSqls; protected int batchSize; @@ -265,6 +263,23 @@ public class CommonRdbmsWriter { // 用于写入数据的时候的类型根据目的表字段类型转换 this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(this.columns, ",")); + + + //PostgreSQL采用update时需要获取主键 + if (dataBaseType.equals(DataBaseType.PostgreSQL) && writeMode.trim().toLowerCase().startsWith("update") ) { + this.primaryColumns = new ArrayList<>(); + String[] schemaAndTable = 
this.table.split("\\."); + try(ResultSet primaryKeys = connection.getMetaData().getPrimaryKeys(null, schemaAndTable[0], schemaAndTable[1])) { + while(primaryKeys.next()) { + String pkColumnName = primaryKeys.getString("COLUMN_NAME"); + this.primaryColumns.add(pkColumnName); + } + } catch (Exception e) { + throw DataXException.asDataXException( + DBUtilErrorCode.WRITE_DATA_ERROR, e); + } + } + // 写数据库的SQL语句 calcWriteRecordSql(); @@ -561,7 +576,7 @@ public class CommonRdbmsWriter { forceUseUpdate = true; } - INSERT_OR_REPLACE_TEMPLATE = WriterUtil.getWriteTemplate(columns, valueHolders, writeMode, dataBaseType, forceUseUpdate); + INSERT_OR_REPLACE_TEMPLATE = WriterUtil.getWriteTemplate(columns, valueHolders, writeMode, dataBaseType, forceUseUpdate,this.primaryColumns); writeRecordSql = String.format(INSERT_OR_REPLACE_TEMPLATE, this.table); } } diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java index 34d1b3af..a22ad420 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java @@ -160,7 +160,7 @@ public final class OriginalConfPretreatmentUtil { forceUseUpdate = true; } - String writeDataSqlTemplate = WriterUtil.getWriteTemplate(columns, valueHolders, writeMode,dataBaseType, forceUseUpdate); + String writeDataSqlTemplate = WriterUtil.getWriteTemplate(columns, valueHolders, writeMode,dataBaseType, forceUseUpdate,null); LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl); diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/WriterUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/WriterUtil.java index 
5f5f0d51..7eeaed31 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/WriterUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/WriterUtil.java @@ -108,7 +108,7 @@ public final class WriterUtil { } } - public static String getWriteTemplate(List columnHolders, List valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) { + public static String getWriteTemplate(List columnHolders, List valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate, List primaryColumns) { boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert") || writeMode.trim().toLowerCase().startsWith("replace") || writeMode.trim().toLowerCase().startsWith("update"); @@ -130,6 +130,23 @@ public final class WriterUtil { .append(")") .append(onDuplicateKeyUpdateString(columnHolders)) .toString(); + } else if (dataBaseType == DataBaseType.PostgreSQL && primaryColumns != null && !primaryColumns.isEmpty() && writeMode.trim().toLowerCase().startsWith("update")) { + + List columnAndValueHolder = new ArrayList<>(); + for (int i = 0; i < columnHolders.size(); i++) { + String columnHolder = columnHolders.get(i); + String valueHolder = valueHolders.get(i); + columnAndValueHolder.add(columnHolder + "=" + valueHolder); + } + writeDataSqlTemplate = new StringBuilder() + .append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ",")) + .append(") VALUES(").append(StringUtils.join(valueHolders, ",")) + .append(")") + .append(" ON CONFLICT (") + .append(StringUtils.join(primaryColumns, ",")) + .append(" ) DO UPDATE SET ") + .append(StringUtils.join(columnAndValueHolder, ",")) + .toString(); } else { //这里是保护,如果其他错误的使用了update,需要更换为replace diff --git a/postgresqlwriter/src/main/java/com/alibaba/datax/plugin/writer/postgresqlwriter/PostgresqlWriter.java b/postgresqlwriter/src/main/java/com/alibaba/datax/plugin/writer/postgresqlwriter/PostgresqlWriter.java index 
2d38db35..ceb643be 100755 --- a/postgresqlwriter/src/main/java/com/alibaba/datax/plugin/writer/postgresqlwriter/PostgresqlWriter.java +++ b/postgresqlwriter/src/main/java/com/alibaba/datax/plugin/writer/postgresqlwriter/PostgresqlWriter.java @@ -1,5 +1,6 @@ package com.alibaba.datax.plugin.writer.postgresqlwriter; +import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.spi.Writer; @@ -9,6 +10,10 @@ import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; import com.alibaba.datax.plugin.rdbms.writer.Key; +import java.sql.Array; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; import java.util.List; public class PostgresqlWriter extends Writer { @@ -21,14 +26,6 @@ public class PostgresqlWriter extends Writer { @Override public void init() { this.originalConfig = super.getPluginJobConf(); - - // warn:not like mysql, PostgreSQL only support insert mode, don't use - String writeMode = this.originalConfig.getString(Key.WRITE_MODE); - if (null != writeMode) { - throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, - String.format("写入模式(writeMode)配置有误. 因为PostgreSQL不支持配置参数项 writeMode: %s, PostgreSQL仅使用insert sql 插入数据. 
请检查您的配置并作出修改.", writeMode));
-            }
-
             this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
             this.commonRdbmsWriterMaster.init(this.originalConfig);
         }
@@ -74,6 +71,204 @@ public class PostgresqlWriter extends Writer {
                     }
                     return "?::" + columnType;
                 }
+
+                /**
+                 * Binds the value of one record column to the PostgreSQL write statement.
+                 *
+                 * <p>In upsert mode the statement is
+                 * {@code INSERT ... VALUES(...) ON CONFLICT (...) DO UPDATE SET col=?, ...}, so each
+                 * column owns two placeholders: {@code columnIndex + 1} in the VALUES list and
+                 * {@code columnIndex + 1 + columnNumber} in the SET list. Both get the same value.
+                 *
+                 * @param preparedStatement statement to bind into
+                 * @param columnIndex       zero-based index of the column being bound
+                 * @param columnSqltype     {@link Types} code of the target column
+                 * @param typeName          type name of the target column; read from the table metadata
+                 *                          when {@code null} (only consulted for DATE columns)
+                 * @param column            source value
+                 * @return the same {@code preparedStatement}
+                 * @throws SQLException if binding fails or a date/time value cannot be converted
+                 */
+                @Override
+                public PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex,
+                        int columnSqltype, String typeName, Column column) throws SQLException {
+                    // The ON CONFLICT template is only generated when primary keys were found, so the
+                    // second group of placeholders must only be bound in that case.
+                    boolean upsert = this.writeMode.trim().toLowerCase().startsWith("update")
+                            && this.primaryColumns != null && !this.primaryColumns.isEmpty();
+                    int[] targets = upsert
+                            ? new int[]{columnIndex + 1, columnIndex + 1 + columnNumber}
+                            : new int[]{columnIndex + 1};
+
+                    switch (columnSqltype) {
+                        case Types.CHAR:
+                        case Types.NCHAR:
+                        case Types.CLOB:
+                        case Types.NCLOB:
+                        case Types.VARCHAR:
+                        case Types.LONGVARCHAR:
+                        case Types.NVARCHAR:
+                        case Types.LONGNVARCHAR:
+                        case Types.OTHER:
+                            pgBindString(preparedStatement, targets, column.asString());
+                            break;
+
+                        case Types.SMALLINT:
+                        case Types.INTEGER:
+                        case Types.BIGINT:
+                        case Types.NUMERIC:
+                        case Types.DECIMAL:
+                        case Types.FLOAT:
+                        case Types.REAL:
+                        case Types.DOUBLE: {
+                            String strValue = column.asString();
+                            boolean treatAsNull = emptyAsNull && "".equals(strValue);
+                            pgBindString(preparedStatement, targets, treatAsNull ? null : strValue);
+                            break;
+                        }
+
+                        // tinyint is a little special in some databases, e.g. mysql {boolean->tinyint(1)}
+                        case Types.TINYINT: {
+                            Long longValue = column.asLong();
+                            pgBindString(preparedStatement, targets, longValue == null ? null : longValue.toString());
+                            break;
+                        }
+
+                        // for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
+                        case Types.DATE: {
+                            if (typeName == null) {
+                                typeName = this.resultSetMetaData.getRight().get(columnIndex);
+                            }
+                            if (typeName.equalsIgnoreCase("year")) {
+                                java.math.BigInteger year = column.asBigInteger();
+                                for (int target : targets) {
+                                    if (year == null) {
+                                        preparedStatement.setString(target, null);
+                                    } else {
+                                        preparedStatement.setInt(target, year.intValue());
+                                    }
+                                }
+                            } else {
+                                java.util.Date utilDate = pgAsDate(column, "Date");
+                                java.sql.Date sqlDate = utilDate == null ? null : new java.sql.Date(utilDate.getTime());
+                                for (int target : targets) {
+                                    preparedStatement.setDate(target, sqlDate);
+                                }
+                            }
+                            break;
+                        }
+
+                        case Types.TIME: {
+                            java.util.Date utilDate = pgAsDate(column, "TIME");
+                            java.sql.Time sqlTime = utilDate == null ? null : new java.sql.Time(utilDate.getTime());
+                            for (int target : targets) {
+                                preparedStatement.setTime(target, sqlTime);
+                            }
+                            break;
+                        }
+
+                        case Types.TIMESTAMP: {
+                            java.util.Date utilDate = pgAsDate(column, "TIMESTAMP");
+                            java.sql.Timestamp sqlTimestamp =
+                                    utilDate == null ? null : new java.sql.Timestamp(utilDate.getTime());
+                            for (int target : targets) {
+                                preparedStatement.setTimestamp(target, sqlTimestamp);
+                            }
+                            break;
+                        }
+
+                        case Types.BINARY:
+                        case Types.VARBINARY:
+                        case Types.BLOB:
+                        case Types.LONGVARBINARY: {
+                            byte[] bytes = column.asBytes();
+                            for (int target : targets) {
+                                preparedStatement.setBytes(target, bytes);
+                            }
+                            break;
+                        }
+
+                        case Types.BOOLEAN:
+                            pgBindBoolean(preparedStatement, targets, column.asBoolean());
+                            break;
+
+                        // warn: bit(1) -> Types.BIT can be written with setBoolean
+                        // warn: bit(>1) -> Types.VARBINARY can be written with setBytes
+                        case Types.BIT:
+                            if (this.dataBaseType == DataBaseType.MySql) {
+                                pgBindBoolean(preparedStatement, targets, column.asBoolean());
+                            } else {
+                                pgBindString(preparedStatement, targets, column.asString());
+                            }
+                            break;
+
+                        case Types.ARRAY: {
+                            Object rawData = column.getRawData();
+                            Array data = null;
+                            if (rawData != null) {
+                                String wType = this.resultSetMetaData.getRight().get(columnIndex);
+                                switch (wType) {
+                                    case "_int8":
+                                        // "LONG" is not a PostgreSQL type name; the element type of _int8 is int8.
+                                        data = preparedStatement.getConnection().createArrayOf("int8", (Long[]) rawData);
+                                        break;
+                                    case "_int4":
+                                        data = preparedStatement.getConnection().createArrayOf("INTEGER", (Integer[]) rawData);
+                                        break;
+                                    default:
+                                        // "_varchar" and every other array type are sent as text elements.
+                                        data = preparedStatement.getConnection().createArrayOf("VARCHAR", (String[]) rawData);
+                                        break;
+                                }
+                            }
+                            for (int target : targets) {
+                                preparedStatement.setArray(target, data);
+                            }
+                            break;
+                        }
+
+                        default:
+                            throw DataXException
+                                    .asDataXException(
+                                            DBUtilErrorCode.UNSUPPORTED_TYPE,
+                                            String.format(
+                                                    "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",
+                                                    this.resultSetMetaData.getLeft().get(columnIndex),
+                                                    this.resultSetMetaData.getMiddle().get(columnIndex),
+                                                    this.resultSetMetaData.getRight().get(columnIndex)));
+                    }
+                    return preparedStatement;
+                }
+
+                /** Calls {@code setString} with the same value for every index in {@code targets}. */
+                private void pgBindString(PreparedStatement statement, int[] targets, String value)
+                        throws SQLException {
+                    for (int target : targets) {
+                        statement.setString(target, value);
+                    }
+                }
+
+                /** Binds a boolean to every index in {@code targets}; {@code null} is bound as SQL NULL. */
+                private void pgBindBoolean(PreparedStatement statement, int[] targets, Boolean value)
+                        throws SQLException {
+                    for (int target : targets) {
+                        if (value == null) {
+                            statement.setNull(target, Types.BOOLEAN);
+                        } else {
+                            statement.setBoolean(target, value);
+                        }
+                    }
+                }
+
+                /** Reads the column as a date; a conversion failure is rethrown as SQLException with its cause. */
+                private java.util.Date pgAsDate(Column column, String label) throws SQLException {
+                    try {
+                        return column.asDate();
+                    } catch (DataXException e) {
+                        throw new SQLException(String.format("%s 类型转换错误:[%s]", label, column), e);
+                    }
+                }
+
             };
             this.commonRdbmsWriterSlave.init(this.writerSliceConfig);
         }