This commit is contained in:
sunkang 2025-04-10 16:21:20 +08:00 committed by GitHub
commit 10b360c268
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 401 additions and 28 deletions

View File

@ -0,0 +1,85 @@
package com.alibaba.datax.common.element;
import com.alibaba.datax.common.exception.CommonErrorCode;
import com.alibaba.datax.common.exception.DataXException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Date;
import static com.alibaba.datax.common.exception.DataXException.asDataXException;
/**
 * Column implementation that carries an array value (column type {@code Type.ARRAY}).
 *
 * <p>Only {@link #asString()} is supported; every other {@code asXxx} conversion throws a
 * {@code DataXException} with {@code CommonErrorCode.CONVERT_NOT_SUPPORT}.
 *
 * Created by SunKang on 22-11-3.
 */
public class ArrayColumn extends Column {

    /**
     * Wraps the given array as the column's raw data.
     *
     * @param array the array value; may be {@code null}
     */
    public ArrayColumn(Object[] array) {
        // The third argument is a fixed 1 regardless of the array's length
        // (presumably the column's byte size -- confirm against Column's constructor).
        super(array, Type.ARRAY, 1);
    }

    /** Creates an array column whose raw data is {@code null}. */
    public ArrayColumn() {
        this((Object[]) null);
    }

    /**
     * Builds the "conversion not supported" exception shared by all unsupported conversions.
     *
     * @param messageTemplate format string with a single {@code %s} placeholder that receives
     *                        this column's {@link #asString()} value
     * @return the exception for the caller to throw
     */
    private DataXException conversionNotSupported(String messageTemplate) {
        String message = String.format(messageTemplate, this.asString());
        return DataXException.asDataXException(CommonErrorCode.CONVERT_NOT_SUPPORT, message);
    }

    /** Not supported for arrays; always throws. */
    @Override
    public Long asLong() {
        throw conversionNotSupported("数组类型 [\"%s\"] 不能转为Long.");
    }

    /** Not supported for arrays; always throws. */
    @Override
    public Double asDouble() {
        throw conversionNotSupported("数组类型 [\"%s\"] 不能转为Double.");
    }

    /**
     * Returns the string form of this column, which is whatever {@link #toString()} yields.
     */
    @Override
    public String asString() {
        return this.toString();
    }

    /** Not supported for arrays; always throws. */
    @Override
    public Date asDate() {
        throw conversionNotSupported("数组类型 [\"%s\"] 不能转为Date.");
    }

    /** Not supported for arrays; always throws. The {@code dateFormat} argument is ignored. */
    @Override
    public Date asDate(String dateFormat) {
        throw conversionNotSupported("数组类型 [\"%s\"] 不能转为Date.");
    }

    /** Not supported for arrays; always throws. */
    @Override
    public byte[] asBytes() {
        throw conversionNotSupported("数组类型 [\"%s\"] 不能转为Bytes.");
    }

    /** Not supported for arrays; always throws. */
    @Override
    public Boolean asBoolean() {
        throw conversionNotSupported("数组类型 [\"%s\"] 不能转为Boolean.");
    }

    /** Not supported for arrays; always throws. */
    @Override
    public BigDecimal asBigDecimal() {
        throw conversionNotSupported("数组类型 [\"%s\"] 不能转为BigDecimal.");
    }

    /** Not supported for arrays; always throws. */
    @Override
    public BigInteger asBigInteger() {
        throw conversionNotSupported("数组类型 [\"%s\"] 不能转为BigInteger.");
    }
}

View File

@ -72,6 +72,6 @@ public abstract class Column {
}
public enum Type {
BAD, NULL, INT, LONG, DOUBLE, STRING, BOOL, DATE, BYTES
BAD, NULL, INT, LONG, DOUBLE, STRING, BOOL, DATE, BYTES, ARRAY
}
}

View File

@ -1,12 +1,6 @@
package com.alibaba.datax.plugin.rdbms.reader;
import com.alibaba.datax.common.element.BoolColumn;
import com.alibaba.datax.common.element.BytesColumn;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.DoubleColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
@ -27,10 +21,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Types;
import java.sql.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -315,7 +306,23 @@ public class CommonRdbmsReader {
case Types.BIT:
record.addColumn(new BoolColumn(rs.getBoolean(i)));
break;
case Types.ARRAY:
Array array = rs.getArray(i);
if (array != null) {
ArrayColumn arrayColumn = new ArrayColumn((Object[]) array.getArray());
record.addColumn(arrayColumn);
} else {
record.addColumn(new ArrayColumn(null));
}
break;
case Types.OTHER:
Object object = rs.getObject(i);
if (object == null) {
record.addColumn(new StringColumn(null));
} else {
record.addColumn(new StringColumn(object.toString()));
}
break;
case Types.NULL:
String stringData = null;
if(rs.getObject(i) != null) {

View File

@ -18,10 +18,7 @@ import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
@ -183,6 +180,7 @@ public class CommonRdbmsWriter {
protected String jdbcUrl;
protected String table;
protected List<String> columns;
protected List<String> primaryColumns;
protected List<String> preSqls;
protected List<String> postSqls;
protected int batchSize;
@ -269,6 +267,23 @@ public class CommonRdbmsWriter {
// 用于写入数据的时候的类型根据目的表字段类型转换
this.resultSetMetaData = DBUtil.getColumnMetaData(connection,
this.table, StringUtils.join(this.columns, ","));
//PostgreSQL采用update时需要获取主键
if (dataBaseType.equals(DataBaseType.PostgreSQL) && writeMode.trim().toLowerCase().startsWith("update") ) {
this.primaryColumns = new ArrayList<>();
String[] schemaAndTable = this.table.split("\\.");
try(ResultSet primaryKeys = connection.getMetaData().getPrimaryKeys(null, schemaAndTable[0], schemaAndTable[1])) {
while(primaryKeys.next()) {
String pkColumnName = primaryKeys.getString("COLUMN_NAME");
this.primaryColumns.add(pkColumnName);
}
} catch (Exception e) {
throw DataXException.asDataXException(
DBUtilErrorCode.WRITE_DATA_ERROR, e);
}
}
// 写数据库的SQL语句
calcWriteRecordSql();
@ -542,6 +557,33 @@ public class CommonRdbmsWriter {
preparedStatement.setString(columnIndex + 1, column.asString());
}
break;
case Types.OTHER:
preparedStatement.setString(columnIndex + 1, column.asString());
break;
case Types.ARRAY:
Object rawData = column.getRawData();
if (rawData == null) {
preparedStatement.setArray(columnIndex + 1,null);
break;
}
String wType = this.resultSetMetaData.getRight()
.get(columnIndex);
Array data;
switch (wType) {
case "_varchar":
data = preparedStatement.getConnection().createArrayOf("VARCHAR", (String[])rawData);
break;
case "_int8":
data = preparedStatement.getConnection().createArrayOf("LONG", (Long[])rawData);
break;
case "_int4":
data = preparedStatement.getConnection().createArrayOf("INTEGER", (Integer[])rawData);
break;
default:
data = preparedStatement.getConnection().createArrayOf("VARCHAR", (String[])rawData);
}
preparedStatement.setArray(columnIndex + 1,data);
break;
default:
throw DataXException
.asDataXException(
@ -572,7 +614,7 @@ public class CommonRdbmsWriter {
forceUseUpdate = true;
}
INSERT_OR_REPLACE_TEMPLATE = WriterUtil.getWriteTemplate(columns, valueHolders, writeMode, dataBaseType, forceUseUpdate);
INSERT_OR_REPLACE_TEMPLATE = WriterUtil.getWriteTemplate(columns, valueHolders, writeMode, dataBaseType, forceUseUpdate,this.primaryColumns);
writeRecordSql = String.format(INSERT_OR_REPLACE_TEMPLATE, this.table);
}
}

View File

@ -167,7 +167,7 @@ public final class OriginalConfPretreatmentUtil {
forceUseUpdate = true;
}
String writeDataSqlTemplate = WriterUtil.getWriteTemplate(columns, valueHolders, writeMode,dataBaseType, forceUseUpdate);
String writeDataSqlTemplate = WriterUtil.getWriteTemplate(columns, valueHolders, writeMode,dataBaseType, forceUseUpdate,null);
LOG.info("Write data [\n{}\n], which jdbcUrl like:[{}]", writeDataSqlTemplate, jdbcUrl);

View File

@ -108,7 +108,7 @@ public final class WriterUtil {
}
}
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate, List<String> primaryColumns) {
boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert")
|| writeMode.trim().toLowerCase().startsWith("replace")
|| writeMode.trim().toLowerCase().startsWith("update");
@ -130,6 +130,23 @@ public final class WriterUtil {
.append(")")
.append(onDuplicateKeyUpdateString(columnHolders))
.toString();
} else if (dataBaseType == DataBaseType.PostgreSQL && primaryColumns != null && !primaryColumns.isEmpty() && writeMode.trim().toLowerCase().startsWith("update")) {
List<String> columnAndValueHolder = new ArrayList<>();
for (int i = 0; i < columnHolders.size(); i++) {
String columnHolder = columnHolders.get(i);
String valueHolder = valueHolders.get(i);
columnAndValueHolder.add(columnHolder + "=" + valueHolder);
}
writeDataSqlTemplate = new StringBuilder()
.append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ","))
.append(") VALUES(").append(StringUtils.join(valueHolders, ","))
.append(")")
.append(" ON CONFLICT (")
.append(StringUtils.join(primaryColumns, ","))
.append(" ) DO UPDATE SET ")
.append(StringUtils.join(columnAndValueHolder, ","))
.toString();
} else {
//这里是保护,如果其他错误的使用了update,需要更换为replace

View File

@ -1,5 +1,6 @@
package com.alibaba.datax.plugin.writer.postgresqlwriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
@ -9,6 +10,10 @@ import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import java.sql.Array;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
public class PostgresqlWriter extends Writer {
@ -21,14 +26,6 @@ public class PostgresqlWriter extends Writer {
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
// warnnot like mysql, PostgreSQL only support insert mode, don't use
String writeMode = this.originalConfig.getString(Key.WRITE_MODE);
if (null != writeMode) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
String.format("写入模式(writeMode)配置有误. 因为PostgreSQL不支持配置参数项 writeMode: %s, PostgreSQL仅使用insert sql 插入数据. 请检查您的配置并作出修改.", writeMode));
}
this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
this.commonRdbmsWriterMaster.init(this.originalConfig);
}
@ -74,6 +71,231 @@ public class PostgresqlWriter extends Writer {
}
return "?::" + columnType;
}
/**
 * Binds the value of one record column to {@code preparedStatement}, choosing the JDBC
 * setter from the target column's {@link java.sql.Types} code.
 *
 * <p>When {@code writeMode} starts with "update", every value is bound twice: at
 * {@code columnIndex + 1} and again at {@code columnIndex + 1 + columnNumber}. Presumably
 * the second set of parameters feeds the {@code ON CONFLICT ... DO UPDATE SET col=?}
 * part of the upsert statement, which repeats one placeholder per column.
 * NOTE(review): if the statement in use has no second set of placeholders (e.g. no primary
 * key was found for the table), the second binding will be rejected by the driver -- confirm.
 *
 * @param preparedStatement statement to bind into
 * @param columnIndex       zero-based position of the column within the record
 * @param columnSqltype     {@link java.sql.Types} code of the target column
 * @param typeName          database type name of the target column; for DATE columns it is
 *                          read from {@code resultSetMetaData} when {@code null}
 * @param column            source value
 * @return the same {@code preparedStatement}, for chaining
 * @throws SQLException if a date/time conversion fails or the driver rejects a value;
 *                      a {@code DataXException} is thrown for unsupported column types
 */
@Override
public PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex,
        int columnSqltype, String typeName, Column column) throws SQLException {
    java.util.Date utilDate;
    boolean forceUseUpdate = this.writeMode.trim().toLowerCase().startsWith("update");
    // JDBC parameters are 1-based; the update copy sits one full column set further on.
    final int insertIndex = columnIndex + 1;
    final int updateIndex = insertIndex + columnNumber;
    switch (columnSqltype) {
        case Types.CHAR:
        case Types.NCHAR:
        case Types.CLOB:
        case Types.NCLOB:
        case Types.VARCHAR:
        case Types.LONGVARCHAR:
        case Types.NVARCHAR:
        case Types.LONGNVARCHAR: {
            String textValue = column.asString();
            preparedStatement.setString(insertIndex, textValue);
            if (forceUseUpdate) {
                preparedStatement.setString(updateIndex, textValue);
            }
            break;
        }

        case Types.SMALLINT:
        case Types.INTEGER:
        case Types.BIGINT:
        case Types.NUMERIC:
        case Types.DECIMAL:
        case Types.FLOAT:
        case Types.REAL:
        case Types.DOUBLE: {
            // Numbers are sent as text; an empty string becomes NULL when emptyAsNull is set.
            String numericText = column.asString();
            if (emptyAsNull && "".equals(numericText)) {
                numericText = null;
            }
            preparedStatement.setString(insertIndex, numericText);
            if (forceUseUpdate) {
                preparedStatement.setString(updateIndex, numericText);
            }
            break;
        }

        //tinyint is a little special in some database like mysql {boolean->tinyint(1)}
        case Types.TINYINT: {
            Long longValue = column.asLong();
            String tinyText = (null == longValue) ? null : longValue.toString();
            preparedStatement.setString(insertIndex, tinyText);
            if (forceUseUpdate) {
                preparedStatement.setString(updateIndex, tinyText);
            }
            break;
        }

        // for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
        case Types.DATE:
            if (typeName == null) {
                typeName = this.resultSetMetaData.getRight().get(columnIndex);
            }

            if (typeName.equalsIgnoreCase("year")) {
                java.math.BigInteger yearValue = column.asBigInteger();
                if (yearValue == null) {
                    preparedStatement.setString(insertIndex, null);
                    if (forceUseUpdate) {
                        preparedStatement.setString(updateIndex, null);
                    }
                } else {
                    preparedStatement.setInt(insertIndex, yearValue.intValue());
                    if (forceUseUpdate) {
                        preparedStatement.setInt(updateIndex, yearValue.intValue());
                    }
                }
            } else {
                java.sql.Date sqlDate = null;
                try {
                    utilDate = column.asDate();
                } catch (DataXException e) {
                    // Keep the conversion failure as the cause so it is not lost.
                    throw new SQLException(String.format(
                            "Date 类型转换错误:[%s]", column), e);
                }

                if (null != utilDate) {
                    sqlDate = new java.sql.Date(utilDate.getTime());
                }
                preparedStatement.setDate(insertIndex, sqlDate);
                if (forceUseUpdate) {
                    preparedStatement.setDate(updateIndex, sqlDate);
                }
            }
            break;

        case Types.TIME: {
            java.sql.Time sqlTime = null;
            try {
                utilDate = column.asDate();
            } catch (DataXException e) {
                throw new SQLException(String.format(
                        "TIME 类型转换错误:[%s]", column), e);
            }

            if (null != utilDate) {
                sqlTime = new java.sql.Time(utilDate.getTime());
            }
            preparedStatement.setTime(insertIndex, sqlTime);
            if (forceUseUpdate) {
                preparedStatement.setTime(updateIndex, sqlTime);
            }
            break;
        }

        case Types.TIMESTAMP: {
            java.sql.Timestamp sqlTimestamp = null;
            try {
                utilDate = column.asDate();
            } catch (DataXException e) {
                throw new SQLException(String.format(
                        "TIMESTAMP 类型转换错误:[%s]", column), e);
            }

            if (null != utilDate) {
                sqlTimestamp = new java.sql.Timestamp(
                        utilDate.getTime());
            }
            preparedStatement.setTimestamp(insertIndex, sqlTimestamp);
            if (forceUseUpdate) {
                preparedStatement.setTimestamp(updateIndex, sqlTimestamp);
            }
            break;
        }

        case Types.BINARY:
        case Types.VARBINARY:
        case Types.BLOB:
        case Types.LONGVARBINARY: {
            byte[] bytesValue = column.asBytes();
            preparedStatement.setBytes(insertIndex, bytesValue);
            if (forceUseUpdate) {
                preparedStatement.setBytes(updateIndex, bytesValue);
            }
            break;
        }

        case Types.BOOLEAN: {
            // asBoolean() returns a boxed Boolean; unboxing null into setBoolean would
            // throw NullPointerException, so a null value is bound as SQL NULL instead.
            Boolean boolValue = column.asBoolean();
            if (boolValue == null) {
                preparedStatement.setNull(insertIndex, Types.BOOLEAN);
                if (forceUseUpdate) {
                    preparedStatement.setNull(updateIndex, Types.BOOLEAN);
                }
            } else {
                preparedStatement.setBoolean(insertIndex, boolValue);
                if (forceUseUpdate) {
                    preparedStatement.setBoolean(updateIndex, boolValue);
                }
            }
            break;
        }

        // warn: bit(1) -> Types.BIT can be written with setBoolean
        // warn: bit(>1) -> Types.VARBINARY can be written with setBytes
        case Types.BIT:
            if (this.dataBaseType == DataBaseType.MySql) {
                Boolean bitValue = column.asBoolean();
                if (bitValue == null) {
                    // Same null guard as for BOOLEAN: avoid unboxing null.
                    preparedStatement.setNull(insertIndex, Types.BIT);
                    if (forceUseUpdate) {
                        preparedStatement.setNull(updateIndex, Types.BIT);
                    }
                } else {
                    preparedStatement.setBoolean(insertIndex, bitValue);
                    if (forceUseUpdate) {
                        preparedStatement.setBoolean(updateIndex, bitValue);
                    }
                }
            } else {
                String bitText = column.asString();
                preparedStatement.setString(insertIndex, bitText);
                if (forceUseUpdate) {
                    preparedStatement.setString(updateIndex, bitText);
                }
            }
            break;

        case Types.OTHER: {
            String otherText = column.asString();
            preparedStatement.setString(insertIndex, otherText);
            if (forceUseUpdate) {
                preparedStatement.setString(updateIndex, otherText);
            }
            break;
        }

        case Types.ARRAY: {
            Object rawData = column.getRawData();
            if (rawData == null) {
                preparedStatement.setArray(insertIndex, null);
                if (forceUseUpdate) {
                    preparedStatement.setArray(updateIndex, null);
                }
                break;
            }
            // Type name of the target column as reported by the metadata, e.g. "_int4".
            String wType = this.resultSetMetaData.getRight()
                    .get(columnIndex);
            // Element type name handed to createArrayOf. PostgreSQL's native names are
            // used; "LONG" is not a PostgreSQL type, so 8-byte integers map to "int8".
            String elementType;
            switch (wType) {
                case "_int8":
                    elementType = "int8";
                    break;
                case "_int4":
                    elementType = "int4";
                    break;
                case "_varchar":
                default:
                    // Unrecognised array types fall back to varchar elements.
                    elementType = "varchar";
            }
            // createArrayOf accepts Object[]; casting to a specific element array type
            // would fail with ClassCastException when the source element type differs.
            Array data = preparedStatement.getConnection()
                    .createArrayOf(elementType, (Object[]) rawData);
            preparedStatement.setArray(insertIndex, data);
            if (forceUseUpdate) {
                preparedStatement.setArray(updateIndex, data);
            }
            break;
        }

        default:
            throw DataXException
                    .asDataXException(
                            DBUtilErrorCode.UNSUPPORTED_TYPE,
                            String.format(
                                    "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",
                                    this.resultSetMetaData.getLeft()
                                            .get(columnIndex),
                                    this.resultSetMetaData.getMiddle()
                                            .get(columnIndex),
                                    this.resultSetMetaData.getRight()
                                            .get(columnIndex)));
    }
    return preparedStatement;
}
};
this.commonRdbmsWriterSlave.init(this.writerSliceConfig);
}