feat: MongoDBWriter supports ODPS array<double>, array<int>, array<long>, array<bool>, array<bytes>

This commit is contained in:
糖芋 2023-11-09 15:22:21 +08:00
parent 86b7935bb4
commit 9e13ebb882

View File

@ -9,6 +9,7 @@ import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.datax.plugin.writer.mongodbwriter.util.MongoUtil;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONObject;
import com.google.common.base.Strings;
import com.mongodb.*;
@ -154,8 +155,9 @@ public class MongoDBWriter extends Writer{
BasicDBObject data = new BasicDBObject();
for(int i = 0; i < record.getColumnNumber(); i++) {
String type = columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE);
String name = columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME);
//空记录处理
if (Strings.isNullOrEmpty(record.getColumn(i).asString())) {
if (KeyConstant.isArrayType(type.toLowerCase())) {
@ -182,49 +184,35 @@ public class MongoDBWriter extends Writer{
new ObjectId(record.getColumn(i).asString()));
} else if (KeyConstant.isArrayType(type.toLowerCase())) {
String splitter = columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_SPLITTER);
if (Strings.isNullOrEmpty(splitter)) {
throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,
MongoDBWriterErrorCode.ILLEGAL_VALUE.getDescription());
}
String itemType = columnMeta.getJSONObject(i).getString(KeyConstant.ITEM_TYPE);
if (itemType != null && !itemType.isEmpty()) {
//如果数组指定类型不为空将其转换为指定类型
//处理非数组: "g,h,i"
if (splitter != null){
String[] item = record.getColumn(i).asString().split(splitter);
if (itemType.equalsIgnoreCase(Column.Type.DOUBLE.name())) {
ArrayList<Double> list = new ArrayList<Double>();
for (String s : item) {
list.add(Double.parseDouble(s));
}
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Double[0]));
} else if (itemType.equalsIgnoreCase(Column.Type.INT.name())) {
ArrayList<Integer> list = new ArrayList<Integer>();
for (String s : item) {
list.add(Integer.parseInt(s));
}
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Integer[0]));
} else if (itemType.equalsIgnoreCase(Column.Type.LONG.name())) {
ArrayList<Long> list = new ArrayList<Long>();
for (String s : item) {
list.add(Long.parseLong(s));
}
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Long[0]));
} else if (itemType.equalsIgnoreCase(Column.Type.BOOL.name())) {
ArrayList<Boolean> list = new ArrayList<Boolean>();
for (String s : item) {
list.add(Boolean.parseBoolean(s));
}
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Boolean[0]));
} else if (itemType.equalsIgnoreCase(Column.Type.BYTES.name())) {
ArrayList<Byte> list = new ArrayList<Byte>();
for (String s : item) {
list.add(Byte.parseByte(s));
}
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Byte[0]));
if (itemType != null && !itemType.isEmpty()) {
//如果数组指定类型不为空将其转换为指定类型
data.put(name, convertToItemTypeList(itemType, item));
} else {
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString().split(splitter));
data.put(name, item);
}
} else {
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString().split(splitter));
//处理数组: "[\"0.1\",\"0.2\",\"0.3\"]"
try {
Object obj = JSON.parse(record.getColumn(i).asString());
if (obj instanceof JSONArray) {
//如果数组指定类型不为空将其转换为指定类型; 否则转成默认的String类型
if (itemType == null || itemType.isEmpty()) {
itemType = Column.Type.STRING.name();
}
data.put(name, convertToItemTypeList(itemType, (JSONArray)obj));
} else {
throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE,
MongoDBWriterErrorCode.ILLEGAL_VALUE.getDescription() + ". record's [" + i + "] column should be in JSONArray format when type is array and splitter is not configured");
}
} catch (JSONException e) {
throw DataXException.asDataXException(MongoDBWriterErrorCode.JSONCAST_EXCEPTION,
MongoDBWriterErrorCode.JSONCAST_EXCEPTION.getDescription() + ". record's [" + i + "] column should be in JSON format when type is array and splitter is not configured");
}
}
} else if(type.toLowerCase().equalsIgnoreCase("json")) {
//如果是json类型,将其进行转换
@ -313,6 +301,81 @@ public class MongoDBWriter extends Writer{
}
}
private Object[] convertToItemTypeList(String itemType, JSONArray item){
if (itemType.equalsIgnoreCase(Column.Type.DOUBLE.name())) {
Double[] list = new Double[item.size()];
for (int i = 0; i < item.size(); i++) {
list[i] = item.getDouble(i);
}
return list;
} else if (itemType.equalsIgnoreCase(Column.Type.INT.name())) {
Integer[] list = new Integer[item.size()];
for (int i = 0; i < item.size(); i++) {
list[i] = item.getInteger(i);
}
return list;
} else if (itemType.equalsIgnoreCase(Column.Type.LONG.name())) {
Long[] list = new Long[item.size()];
for (int i = 0; i < item.size(); i++) {
list[i] = item.getLong(i);
}
return list;
} else if (itemType.equalsIgnoreCase(Column.Type.BOOL.name())) {
Boolean[] list = new Boolean[item.size()];
for (int i = 0; i < item.size(); i++) {
list[i] = item.getBoolean(i);
}
return list;
} else if (itemType.equalsIgnoreCase(Column.Type.BYTES.name())) {
Byte[] list = new Byte[item.size()];
for (int i = 0; i < item.size(); i++) {
list[i] = item.getByte(i);
}
return list;
} else {
String[] list = new String[item.size()];
for (int i = 0; i < item.size(); i++) {
list[i] = item.getString(i);
}
return list;
}
}
private Object[] convertToItemTypeList(String itemType, String[] item){
if (itemType.equalsIgnoreCase(Column.Type.DOUBLE.name())) {
ArrayList<Double> list = new ArrayList<Double>();
for (String s : item) {
list.add(Double.parseDouble(s));
}
return list.toArray(new Double[0]);
} else if (itemType.equalsIgnoreCase(Column.Type.INT.name())) {
ArrayList<Integer> list = new ArrayList<Integer>();
for (String s : item) {
list.add(Integer.parseInt(s));
}
return list.toArray(new Integer[0]);
} else if (itemType.equalsIgnoreCase(Column.Type.LONG.name())) {
ArrayList<Long> list = new ArrayList<Long>();
for (String s : item) {
list.add(Long.parseLong(s));
}
return list.toArray(new Long[0]);
} else if (itemType.equalsIgnoreCase(Column.Type.BOOL.name())) {
ArrayList<Boolean> list = new ArrayList<Boolean>();
for (String s : item) {
list.add(Boolean.parseBoolean(s));
}
return list.toArray(new Boolean[0]);
} else if (itemType.equalsIgnoreCase(Column.Type.BYTES.name())) {
ArrayList<Byte> list = new ArrayList<Byte>();
for (String s : item) {
list.add(Byte.parseByte(s));
}
return list.toArray(new Byte[0]);
} else {
return item;
}
}
@Override
public void init() {