From 9e13ebb88258ba678912b4ed9e4e5d97ff4e7d6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=B3=96=E8=8A=8B?= Date: Thu, 9 Nov 2023 15:22:21 +0800 Subject: [PATCH] feat: MongoDBWriter supports ODPS array, array, array, array, array --- .../writer/mongodbwriter/MongoDBWriter.java | 141 +++++++++++++----- 1 file changed, 102 insertions(+), 39 deletions(-) diff --git a/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/MongoDBWriter.java b/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/MongoDBWriter.java index 76f35a40..7605699d 100644 --- a/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/MongoDBWriter.java +++ b/mongodbwriter/src/main/java/com/alibaba/datax/plugin/writer/mongodbwriter/MongoDBWriter.java @@ -9,6 +9,7 @@ import com.alibaba.datax.plugin.rdbms.writer.Key; import com.alibaba.datax.plugin.writer.mongodbwriter.util.MongoUtil; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONException; import com.alibaba.fastjson2.JSONObject; import com.google.common.base.Strings; import com.mongodb.*; @@ -154,8 +155,9 @@ public class MongoDBWriter extends Writer{ BasicDBObject data = new BasicDBObject(); for(int i = 0; i < record.getColumnNumber(); i++) { - String type = columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE); + String name = columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME); + //空记录处理 if (Strings.isNullOrEmpty(record.getColumn(i).asString())) { if (KeyConstant.isArrayType(type.toLowerCase())) { @@ -182,49 +184,35 @@ public class MongoDBWriter extends Writer{ new ObjectId(record.getColumn(i).asString())); } else if (KeyConstant.isArrayType(type.toLowerCase())) { String splitter = columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_SPLITTER); - if (Strings.isNullOrEmpty(splitter)) { - throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE, - 
MongoDBWriterErrorCode.ILLEGAL_VALUE.getDescription()); - } String itemType = columnMeta.getJSONObject(i).getString(KeyConstant.ITEM_TYPE); - if (itemType != null && !itemType.isEmpty()) { - //如果数组指定类型不为空,将其转换为指定类型 + + // Delimited string (not a JSON array), e.g. "g,h,i"; an empty splitter is treated as not configured + if (!Strings.isNullOrEmpty(splitter)) { String[] item = record.getColumn(i).asString().split(splitter); - if (itemType.equalsIgnoreCase(Column.Type.DOUBLE.name())) { - ArrayList list = new ArrayList(); - for (String s : item) { - list.add(Double.parseDouble(s)); - } - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Double[0])); - } else if (itemType.equalsIgnoreCase(Column.Type.INT.name())) { - ArrayList list = new ArrayList(); - for (String s : item) { - list.add(Integer.parseInt(s)); - } - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Integer[0])); - } else if (itemType.equalsIgnoreCase(Column.Type.LONG.name())) { - ArrayList list = new ArrayList(); - for (String s : item) { - list.add(Long.parseLong(s)); - } - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Long[0])); - } else if (itemType.equalsIgnoreCase(Column.Type.BOOL.name())) { - ArrayList list = new ArrayList(); - for (String s : item) { - list.add(Boolean.parseBoolean(s)); - } - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Boolean[0])); - } else if (itemType.equalsIgnoreCase(Column.Type.BYTES.name())) { - ArrayList list = new ArrayList(); - for (String s : item) { - list.add(Byte.parseByte(s)); - } - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), list.toArray(new Byte[0])); + if (itemType != null && !itemType.isEmpty()) { + // An item type is configured: convert every element to that type + data.put(name, convertToItemTypeList(itemType, item)); } else { - data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString().split(splitter)); + data.put(name, item); } } else { - 
data.put(columnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME), record.getColumn(i).asString().split(splitter)); + // JSON array text, e.g. "[\"0.1\",\"0.2\",\"0.3\"]" + try { + Object obj = JSON.parse(record.getColumn(i).asString()); + if (obj instanceof JSONArray) { + // Convert to the configured item type; default to String when none is configured + if (itemType == null || itemType.isEmpty()) { + itemType = Column.Type.STRING.name(); + } + data.put(name, convertToItemTypeList(itemType, (JSONArray)obj)); + } else { + throw DataXException.asDataXException(MongoDBWriterErrorCode.ILLEGAL_VALUE, + MongoDBWriterErrorCode.ILLEGAL_VALUE.getDescription() + ". record's [" + i + "] column should be in JSONArray format when type is array and splitter is not configured"); + } + } catch (JSONException e) { + throw DataXException.asDataXException(MongoDBWriterErrorCode.JSONCAST_EXCEPTION, + MongoDBWriterErrorCode.JSONCAST_EXCEPTION.getDescription() + ". record's [" + i + "] column should be in JSON format when type is array and splitter is not configured"); + } } } else if(type.toLowerCase().equalsIgnoreCase("json")) { //如果是json类型,将其进行转换 @@ -313,6 +301,81 @@ public class MongoDBWriter extends Writer{ } } + /** Converts a parsed JSON array to a typed array: itemType DOUBLE, INT, LONG, BOOL or BYTES (matched case-insensitively) selects Double[], Integer[], Long[], Boolean[] or Byte[]; any other itemType yields String[]. */ private Object[] convertToItemTypeList(String itemType, JSONArray item){ + if (itemType.equalsIgnoreCase(Column.Type.DOUBLE.name())) { + Double[] list = new Double[item.size()]; + for (int i = 0; i < item.size(); i++) { + list[i] = item.getDouble(i); + } + return list; + } else if (itemType.equalsIgnoreCase(Column.Type.INT.name())) { + Integer[] list = new Integer[item.size()]; + for (int i = 0; i < item.size(); i++) { + list[i] = item.getInteger(i); + } + return list; + } else if (itemType.equalsIgnoreCase(Column.Type.LONG.name())) { + Long[] list = new Long[item.size()]; + for (int i = 0; i < item.size(); i++) { + list[i] = item.getLong(i); + } + return list; + } else if (itemType.equalsIgnoreCase(Column.Type.BOOL.name())) { + Boolean[] list = new Boolean[item.size()]; + for (int i = 0; i < item.size(); i++) { + 
list[i] = item.getBoolean(i); + } + return list; + } else if (itemType.equalsIgnoreCase(Column.Type.BYTES.name())) { + Byte[] list = new Byte[item.size()]; + for (int i = 0; i < item.size(); i++) { + list[i] = item.getByte(i); + } + return list; + } else { + String[] list = new String[item.size()]; + for (int i = 0; i < item.size(); i++) { + list[i] = item.getString(i); + } + return list; + } + } + + /** Converts already-split tokens to a typed array: itemType DOUBLE, INT, LONG, BOOL or BYTES (matched case-insensitively) parses each token with the matching parseXxx method; any other itemType returns item unchanged. */ private Object[] convertToItemTypeList(String itemType, String[] item){ + if (itemType.equalsIgnoreCase(Column.Type.DOUBLE.name())) { + ArrayList list = new ArrayList(); + for (String s : item) { + list.add(Double.parseDouble(s)); + } + return list.toArray(new Double[0]); + } else if (itemType.equalsIgnoreCase(Column.Type.INT.name())) { + ArrayList list = new ArrayList(); + for (String s : item) { + list.add(Integer.parseInt(s)); + } + return list.toArray(new Integer[0]); + } else if (itemType.equalsIgnoreCase(Column.Type.LONG.name())) { + ArrayList list = new ArrayList(); + for (String s : item) { + list.add(Long.parseLong(s)); + } + return list.toArray(new Long[0]); + } else if (itemType.equalsIgnoreCase(Column.Type.BOOL.name())) { + ArrayList list = new ArrayList(); + for (String s : item) { + list.add(Boolean.parseBoolean(s)); + } + return list.toArray(new Boolean[0]); + } else if (itemType.equalsIgnoreCase(Column.Type.BYTES.name())) { + ArrayList list = new ArrayList(); + for (String s : item) { + list.add(Byte.parseByte(s)); + } + return list.toArray(new Byte[0]); + } else { + return item; + } + } @Override public void init() {