From 285a46cd655b1c55fd0c7704e2c46f6660a5964a Mon Sep 17 00:00:00 2001 From: dengyixiang Date: Tue, 4 Jul 2023 15:10:43 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=94=AF=E6=8C=81json=E7=B1=BB=E5=9E=8B?= =?UTF-8?q?=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datax/plugin/reader/mongodbreader/MongoDBReader.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java index 4d129a5a..14d4f179 100644 --- a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java +++ b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java @@ -173,6 +173,8 @@ public class MongoDBReader extends Reader { String tempArrayStr = Joiner.on(splitter).join(array); record.addColumn(new StringColumn(tempArrayStr)); } + }else if ("json".equalsIgnoreCase(column.getString("type"))) { + record.addColumn(new StringColumn(JSON.toJSONString(tempCol))); } else { record.addColumn(new StringColumn(tempCol.toString())); } From 129837e7203aebe462a8eedcf600064a9bd3b64a Mon Sep 17 00:00:00 2001 From: dengyixiang Date: Tue, 4 Jul 2023 15:50:58 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=94=AF=E6=8C=81json=E7=B1=BB=E5=9E=8B?= =?UTF-8?q?=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mongodbreader/doc/mongodbreader.md | 243 +++++++++++++++-------------- 1 file changed, 124 insertions(+), 119 deletions(-) diff --git a/mongodbreader/doc/mongodbreader.md b/mongodbreader/doc/mongodbreader.md index 297e598c..21805ab2 100644 --- a/mongodbreader/doc/mongodbreader.md +++ b/mongodbreader/doc/mongodbreader.md @@ -10,116 +10,120 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的J #### 3 功能说明 * 该示例从MongoDB读一份数据到ODPS。 - { - "job": { - "setting": 
{ - "speed": { - "channel": 2 - } - }, - "content": [ - { - "reader": { - "name": "mongodbreader", - "parameter": { - "address": ["127.0.0.1:27017"], - "userName": "", - "userPassword": "", - "dbName": "tag_per_data", - "collectionName": "tag_data12", - "column": [ - { - "name": "unique_id", - "type": "string" - }, - { - "name": "sid", - "type": "string" - }, - { - "name": "user_id", - "type": "string" - }, - { - "name": "auction_id", - "type": "string" - }, - { - "name": "content_type", - "type": "string" - }, - { - "name": "pool_type", - "type": "string" - }, - { - "name": "frontcat_id", - "type": "Array", - "spliter": "" - }, - { - "name": "categoryid", - "type": "Array", - "spliter": "" - }, - { - "name": "gmt_create", - "type": "string" - }, - { - "name": "taglist", - "type": "Array", - "spliter": " " - }, - { - "name": "property", - "type": "string" - }, - { - "name": "scorea", - "type": "int" - }, - { - "name": "scoreb", - "type": "int" - }, - { - "name": "scorec", - "type": "int" - } - ] - } - }, - "writer": { - "name": "odpswriter", - "parameter": { - "project": "tb_ai_recommendation", - "table": "jianying_tag_datax_read_test01", - "column": [ - "unique_id", - "sid", - "user_id", - "auction_id", - "content_type", - "pool_type", - "frontcat_id", - "categoryid", - "gmt_create", - "taglist", - "property", - "scorea", - "scoreb" - ], - "accessId": "**************", - "accessKey": "********************", - "truncate": true, - "odpsServer": "xxx/api", - "tunnelServer": "xxx" - } - } - } - ] - } + { + "job": { + "setting": { + "speed": { + "channel": 2 + } + }, + "content": [ + { + "reader": { + "name": "mongodbreader", + "parameter": { + "address": ["127.0.0.1:27017"], + "userName": "", + "userPassword": "", + "dbName": "tag_per_data", + "collectionName": "tag_data12", + "column": [ + { + "name": "unique_id", + "type": "string" + }, + { + "name": "sid", + "type": "string" + }, + { + "name": "user_id", + "type": "string" + }, + { + "name": "auction_id", + "type": 
"string" + }, + { + "name": "content_type", + "type": "string" + }, + { + "name": "pool_type", + "type": "string" + }, + { + "name": "frontcat_id", + "type": "Array", + "spliter": "" + }, + { + "name": "categoryid", + "type": "Array", + "spliter": "" + }, + { + "name": "gmt_create", + "type": "string" + }, + { + "name": "taglist", + "type": "Array", + "spliter": " " + }, + { + "name": "property", + "type": "string" + }, + { + "name": "scorea", + "type": "int" + }, + { + "name": "scoreb", + "type": "int" + }, + { + "name": "scorec", + "type": "int" + }, + { + "name": "appid_list", + "type": "json" + } + ] + } + }, + "writer": { + "name": "odpswriter", + "parameter": { + "project": "tb_ai_recommendation", + "table": "jianying_tag_datax_read_test01", + "column": [ + "unique_id", + "sid", + "user_id", + "auction_id", + "content_type", + "pool_type", + "frontcat_id", + "categoryid", + "gmt_create", + "taglist", + "property", + "scorea", + "scoreb" + ], + "accessId": "**************", + "accessKey": "********************", + "truncate": true, + "odpsServer": "xxx/api", + "tunnelServer": "xxx" + } + } + } + ] + } } #### 4 参数说明 @@ -133,17 +137,18 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的J * type:Column的类型。【选填】 * splitter:因为MongoDB支持数组类型,但是Datax框架本身不支持数组类型,所以mongoDB读出来的数组类型要通过这个分隔符合并成字符串。【选填】 * query: MongoDB的额外查询条件。【选填】 - +* json:type 的可选取值之一。因为MongoDB支持子文档和数组类型子文档,但是Datax框架本身不支持这些类型,所以当 type 配置为 json 时,mongoDB读出来的数据会通过JSON序列化成字符串。【选填】 #### 5 类型转换 -| DataX 内部类型| MongoDB 数据类型 | -| -------- | ----- | -| Long | int, Long | -| Double | double | -| String | string, array | -| Date | date | -| Boolean | boolean | -| Bytes | bytes | +| DataX 内部类型 | MongoDB 数据类型 | +|------------|---------------| +| Long | int, Long | +| Double | double | +| String | string, array | +| Date | date | +| Boolean | boolean | +| Bytes | bytes | +| String | json(序列化为JSON字符串) | #### 6 性能报告