增加对PostgreSQL 数组和Json格式

This commit is contained in:
sunkang 2022-11-03 17:17:50 +08:00
parent 1e9247821b
commit f6fdd6eeee
4 changed files with 132 additions and 13 deletions

View File

@ -0,0 +1,85 @@
package com.alibaba.datax.common.element;
import com.alibaba.datax.common.exception.CommonErrorCode;
import com.alibaba.datax.common.exception.DataXException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Date;
import static com.alibaba.datax.common.exception.DataXException.asDataXException;
/**
* Created by SunKang on 22-11-3.
*/
public class ArrayColumn extends Column {
public ArrayColumn(Object[] array) {
super(array, Type.ARRAY, 1);
}
public ArrayColumn() {
super(null, Type.ARRAY, 1);
}
@Override
public Long asLong() {
throw DataXException.asDataXException(
CommonErrorCode.CONVERT_NOT_SUPPORT, String.format(
"数组类型 [\"%s\"] 不能转为Long.", this.asString()));
}
@Override
public Double asDouble() {
throw DataXException.asDataXException(
CommonErrorCode.CONVERT_NOT_SUPPORT, String.format(
"数组类型 [\"%s\"] 不能转为Double.", this.asString()));
}
@Override
public String asString() {
return this.toString();
}
@Override
public Date asDate() {
throw DataXException.asDataXException(
CommonErrorCode.CONVERT_NOT_SUPPORT, String.format(
"数组类型 [\"%s\"] 不能转为Date.", this.asString()));
}
@Override
public Date asDate(String dateFormat) {
throw DataXException.asDataXException(
CommonErrorCode.CONVERT_NOT_SUPPORT, String.format(
"数组类型 [\"%s\"] 不能转为Date.", this.asString()));
}
@Override
public byte[] asBytes() {
throw DataXException.asDataXException(
CommonErrorCode.CONVERT_NOT_SUPPORT, String.format(
"数组类型 [\"%s\"] 不能转为Bytes.", this.asString()));
}
@Override
public Boolean asBoolean() {
throw DataXException.asDataXException(
CommonErrorCode.CONVERT_NOT_SUPPORT, String.format(
"数组类型 [\"%s\"] 不能转为Boolean.", this.asString()));
}
@Override
public BigDecimal asBigDecimal() {
throw DataXException.asDataXException(
CommonErrorCode.CONVERT_NOT_SUPPORT, String.format(
"数组类型 [\"%s\"] 不能转为BigDecimal.", this.asString()));
}
@Override
public BigInteger asBigInteger() {
throw DataXException.asDataXException(
CommonErrorCode.CONVERT_NOT_SUPPORT, String.format(
"数组类型 [\"%s\"] 不能转为BigInteger.", this.asString()));
}
}

View File

@ -72,6 +72,6 @@ public abstract class Column {
}
public enum Type {
BAD, NULL, INT, LONG, DOUBLE, STRING, BOOL, DATE, BYTES
BAD, NULL, INT, LONG, DOUBLE, STRING, BOOL, DATE, BYTES, ARRAY
}
}

View File

@ -1,12 +1,6 @@
package com.alibaba.datax.plugin.rdbms.reader;
import com.alibaba.datax.common.element.BoolColumn;
import com.alibaba.datax.common.element.BytesColumn;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.DoubleColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
@ -27,10 +21,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Types;
import java.sql.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -315,7 +306,23 @@ public class CommonRdbmsReader {
case Types.BIT:
record.addColumn(new BoolColumn(rs.getBoolean(i)));
break;
case Types.ARRAY:
Array array = rs.getArray(i);
if (array != null) {
ArrayColumn arrayColumn = new ArrayColumn((Object[]) array.getArray());
record.addColumn(arrayColumn);
} else {
record.addColumn(new ArrayColumn(null));
}
break;
case Types.OTHER:
Object object = rs.getObject(i);
if (object == null) {
record.addColumn(new StringColumn(null));
} else {
record.addColumn(new StringColumn(object.toString()));
}
break;
case Types.NULL:
String stringData = null;
if(rs.getObject(i) != null) {

View File

@ -546,6 +546,33 @@ public class CommonRdbmsWriter {
preparedStatement.setString(columnIndex + 1, column.asString());
}
break;
case Types.OTHER:
preparedStatement.setString(columnIndex + 1, column.asString());
break;
case Types.ARRAY:
Object rawData = column.getRawData();
if (rawData == null) {
preparedStatement.setArray(columnIndex + 1,null);
break;
}
String wType = this.resultSetMetaData.getRight()
.get(columnIndex);
Array data;
switch (wType) {
case "_varchar":
data = preparedStatement.getConnection().createArrayOf("VARCHAR", (String[])rawData);
break;
case "_int8":
data = preparedStatement.getConnection().createArrayOf("LONG", (Long[])rawData);
break;
case "_int4":
data = preparedStatement.getConnection().createArrayOf("INTEGER", (Integer[])rawData);
break;
default:
data = preparedStatement.getConnection().createArrayOf("VARCHAR", (String[])rawData);
}
preparedStatement.setArray(columnIndex + 1,data);
break;
default:
throw DataXException
.asDataXException(