From f6fdd6eeee8d554b51c259dccca56ad33edf996e Mon Sep 17 00:00:00 2001 From: sunkang Date: Thu, 3 Nov 2022 17:17:50 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AF=B9PostgreSQL=20?= =?UTF-8?q?=E6=95=B0=E7=BB=84=E5=92=8CJson=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datax/common/element/ArrayColumn.java | 85 +++++++++++++++++++ .../alibaba/datax/common/element/Column.java | 2 +- .../rdbms/reader/CommonRdbmsReader.java | 31 ++++--- .../rdbms/writer/CommonRdbmsWriter.java | 27 ++++++ 4 files changed, 132 insertions(+), 13 deletions(-) create mode 100644 common/src/main/java/com/alibaba/datax/common/element/ArrayColumn.java diff --git a/common/src/main/java/com/alibaba/datax/common/element/ArrayColumn.java b/common/src/main/java/com/alibaba/datax/common/element/ArrayColumn.java new file mode 100644 index 00000000..8b374208 --- /dev/null +++ b/common/src/main/java/com/alibaba/datax/common/element/ArrayColumn.java @@ -0,0 +1,85 @@ +package com.alibaba.datax.common.element; + +import com.alibaba.datax.common.exception.CommonErrorCode; +import com.alibaba.datax.common.exception.DataXException; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Date; + +import static com.alibaba.datax.common.exception.DataXException.asDataXException; + +/** + * Created by SunKang on 22-11-3. + */ +public class ArrayColumn extends Column { + + public ArrayColumn(Object[] array) { + super(array, Type.ARRAY, 1); + } + + public ArrayColumn() { + super(null, Type.ARRAY, 1); + } + + @Override + public Long asLong() { + throw DataXException.asDataXException( + CommonErrorCode.CONVERT_NOT_SUPPORT, String.format( + "数组类型 [\"%s\"] 不能转为Long.", this.asString())); + } + + @Override + public Double asDouble() { + throw DataXException.asDataXException( + CommonErrorCode.CONVERT_NOT_SUPPORT, String.format( + "数组类型 [\"%s\"] 不能转为Double.", this.asString())); + } + + @Override + public String asString() { + return this.toString(); + } + + @Override + public Date asDate() { + throw DataXException.asDataXException( + CommonErrorCode.CONVERT_NOT_SUPPORT, String.format( + "数组类型 [\"%s\"] 不能转为Date.", this.asString())); + } + + @Override + public Date asDate(String dateFormat) { + throw DataXException.asDataXException( + CommonErrorCode.CONVERT_NOT_SUPPORT, String.format( + "数组类型 [\"%s\"] 不能转为Date.", this.asString())); + } + + @Override + public byte[] asBytes() { + throw DataXException.asDataXException( + CommonErrorCode.CONVERT_NOT_SUPPORT, String.format( + "数组类型 [\"%s\"] 不能转为Bytes.", this.asString())); + } + + @Override + public Boolean asBoolean() { + throw DataXException.asDataXException( + CommonErrorCode.CONVERT_NOT_SUPPORT, String.format( + "数组类型 [\"%s\"] 不能转为Boolean.", this.asString())); + } + + @Override + public BigDecimal asBigDecimal() { + throw DataXException.asDataXException( + CommonErrorCode.CONVERT_NOT_SUPPORT, String.format( + "数组类型 [\"%s\"] 不能转为BigDecimal.", this.asString())); + } + + @Override + public BigInteger asBigInteger() { + throw DataXException.asDataXException( + CommonErrorCode.CONVERT_NOT_SUPPORT, String.format( + "数组类型 [\"%s\"] 不能转为BigInteger.", this.asString())); + } +} diff --git a/common/src/main/java/com/alibaba/datax/common/element/Column.java b/common/src/main/java/com/alibaba/datax/common/element/Column.java index 2e093a7a..eefeb515 100755 --- a/common/src/main/java/com/alibaba/datax/common/element/Column.java +++ b/common/src/main/java/com/alibaba/datax/common/element/Column.java @@ -72,6 +72,6 @@ public abstract class Column { } public enum Type { - BAD, NULL, INT, LONG, DOUBLE, STRING, BOOL, DATE, BYTES + BAD, NULL, INT, LONG, DOUBLE, STRING, BOOL, DATE, BYTES, ARRAY } } diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java index f3180402..7d18dbf1 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java @@ -1,12 +1,6 @@ package com.alibaba.datax.plugin.rdbms.reader; -import com.alibaba.datax.common.element.BoolColumn; -import com.alibaba.datax.common.element.BytesColumn; -import com.alibaba.datax.common.element.DateColumn; -import com.alibaba.datax.common.element.DoubleColumn; -import com.alibaba.datax.common.element.LongColumn; -import com.alibaba.datax.common.element.Record; -import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.element.*; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.plugin.TaskPluginCollector; @@ -27,10 +21,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.Types; +import java.sql.*; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -315,7 +306,23 @@ public class CommonRdbmsReader { case Types.BIT: record.addColumn(new BoolColumn(rs.getBoolean(i))); break; - + case Types.ARRAY: + Array array = rs.getArray(i); + if (array != null) { + ArrayColumn arrayColumn = new ArrayColumn((Object[]) array.getArray()); + record.addColumn(arrayColumn); + } else { + record.addColumn(new ArrayColumn(null)); + } + break; + case Types.OTHER: + Object object = rs.getObject(i); + if (object == null) { + record.addColumn(new StringColumn(null)); + } else { + record.addColumn(new StringColumn(object.toString())); + } + break; case Types.NULL: String stringData = null; if(rs.getObject(i) != null) { diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java index ab0b8485..81fc7f08 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java @@ -546,6 +546,33 @@ public class CommonRdbmsWriter { preparedStatement.setString(columnIndex + 1, column.asString()); } break; + case Types.OTHER: + preparedStatement.setString(columnIndex + 1, column.asString()); + break; + case Types.ARRAY: + Object rawData = column.getRawData(); + if (rawData == null) { + preparedStatement.setArray(columnIndex + 1,null); + break; + } + String wType = this.resultSetMetaData.getRight() + .get(columnIndex); + Array data; + switch (wType) { + case "_varchar": + data = preparedStatement.getConnection().createArrayOf("VARCHAR", (String[])rawData); + break; + case "_int8": + data = preparedStatement.getConnection().createArrayOf("LONG", (Long[])rawData); + break; + case "_int4": + data = preparedStatement.getConnection().createArrayOf("INTEGER", (Integer[])rawData); + break; + default: + data = preparedStatement.getConnection().createArrayOf("VARCHAR", (String[])rawData); + } + preparedStatement.setArray(columnIndex + 1,data); + break; default: throw DataXException .asDataXException(