增加column的type为json的支持,将object列转化为json字符串。

This commit is contained in:
linzm 2021-11-16 17:06:48 +08:00
parent 01e0723d65
commit 06e4b84bf0

View File

@ -128,6 +128,8 @@ public class MongoDBReader extends Reader {
while (columnItera.hasNext()) { while (columnItera.hasNext()) {
JSONObject column = (JSONObject)columnItera.next(); JSONObject column = (JSONObject)columnItera.next();
Object tempCol = item.get(column.getString(KeyConstant.COLUMN_NAME)); Object tempCol = item.get(column.getString(KeyConstant.COLUMN_NAME));
String columnName = column.getString(KeyConstant.COLUMN_NAME);
String columnType = column.getString(KeyConstant.COLUMN_TYPE);
if (tempCol == null) { if (tempCol == null) {
if (KeyConstant.isDocumentType(column.getString(KeyConstant.COLUMN_TYPE))) { if (KeyConstant.isDocumentType(column.getString(KeyConstant.COLUMN_TYPE))) {
String[] name = column.getString(KeyConstant.COLUMN_NAME).split("\\."); String[] name = column.getString(KeyConstant.COLUMN_NAME).split("\\.");
@ -151,6 +153,8 @@ public class MongoDBReader extends Reader {
if (tempCol == null) { if (tempCol == null) {
//continue; 这个不能直接continue会导致record到目的端错位 //continue; 这个不能直接continue会导致record到目的端错位
record.addColumn(new StringColumn(null)); record.addColumn(new StringColumn(null));
} else if (columnType.equalsIgnoreCase("json")) {
record.addColumn(new StringColumn(JSON.toJSONString(tempCol)));
} else if (tempCol instanceof Double) { } else if (tempCol instanceof Double) {
//TODO deal with Double.isNaN() //TODO deal with Double.isNaN()
record.addColumn(new DoubleColumn((Double) tempCol)); record.addColumn(new DoubleColumn((Double) tempCol));