This commit is contained in:
thinking24k 2025-04-10 16:20:58 +08:00 committed by GitHub
commit fa666eb76d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 126 additions and 119 deletions

View File

@ -10,116 +10,120 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据通过主控的J
#### 3 功能说明 #### 3 功能说明
* 该示例从MongoDB读一份数据到ODPS。 * 该示例从MongoDB读一份数据到ODPS。
{ {
"job": { "job": {
"setting": { "setting": {
"speed": { "speed": {
"channel": 2 "channel": 2
} }
}, },
"content": [ "content": [
{ {
"reader": { "reader": {
"name": "mongodbreader", "name": "mongodbreader",
"parameter": { "parameter": {
"address": ["127.0.0.1:27017"], "address": ["127.0.0.1:27017"],
"userName": "", "userName": "",
"userPassword": "", "userPassword": "",
"dbName": "tag_per_data", "dbName": "tag_per_data",
"collectionName": "tag_data12", "collectionName": "tag_data12",
"column": [ "column": [
{ {
"name": "unique_id", "name": "unique_id",
"type": "string" "type": "string"
}, },
{ {
"name": "sid", "name": "sid",
"type": "string" "type": "string"
}, },
{ {
"name": "user_id", "name": "user_id",
"type": "string" "type": "string"
}, },
{ {
"name": "auction_id", "name": "auction_id",
"type": "string" "type": "string"
}, },
{ {
"name": "content_type", "name": "content_type",
"type": "string" "type": "string"
}, },
{ {
"name": "pool_type", "name": "pool_type",
"type": "string" "type": "string"
}, },
{ {
"name": "frontcat_id", "name": "frontcat_id",
"type": "Array", "type": "Array",
"splitter": "" "splitter": ""
}, },
{ {
"name": "categoryid", "name": "categoryid",
"type": "Array", "type": "Array",
"splitter": "" "splitter": ""
}, },
{ {
"name": "gmt_create", "name": "gmt_create",
"type": "string" "type": "string"
}, },
{ {
"name": "taglist", "name": "taglist",
"type": "Array", "type": "Array",
"splitter": " " "splitter": " "
}, },
{ {
"name": "property", "name": "property",
"type": "string" "type": "string"
}, },
{ {
"name": "scorea", "name": "scorea",
"type": "int" "type": "int"
}, },
{ {
"name": "scoreb", "name": "scoreb",
"type": "int" "type": "int"
}, },
{ {
"name": "scorec", "name": "scorec",
"type": "int" "type": "int"
} },
] {
} "name": "appid_list",
}, "type": "json"
"writer": { }
"name": "odpswriter", ]
"parameter": { }
"project": "tb_ai_recommendation", },
"table": "jianying_tag_datax_read_test01", "writer": {
"column": [ "name": "odpswriter",
"unique_id", "parameter": {
"sid", "project": "tb_ai_recommendation",
"user_id", "table": "jianying_tag_datax_read_test01",
"auction_id", "column": [
"content_type", "unique_id",
"pool_type", "sid",
"frontcat_id", "user_id",
"categoryid", "auction_id",
"gmt_create", "content_type",
"taglist", "pool_type",
"property", "frontcat_id",
"scorea", "categoryid",
"scoreb" "gmt_create",
], "taglist",
"accessId": "**************", "property",
"accessKey": "********************", "scorea",
"truncate": true, "scoreb"
"odpsServer": "xxx/api", ],
"tunnelServer": "xxx" "accessId": "**************",
} "accessKey": "********************",
} "truncate": true,
} "odpsServer": "xxx/api",
] "tunnelServer": "xxx"
} }
}
}
]
}
} }
#### 4 参数说明 #### 4 参数说明
@ -133,17 +137,18 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据通过主控的J
* type:Column的类型。【选填】 * type:Column的类型。【选填】
* splitter:因为MongoDB支持数组类型,但是Datax框架本身不支持数组类型,所以mongoDB读出来的数组类型要通过这个分隔符合并成字符串。【选填】 * splitter:因为MongoDB支持数组类型,但是Datax框架本身不支持数组类型,所以mongoDB读出来的数组类型要通过这个分隔符合并成字符串。【选填】
* query: MongoDB的额外查询条件。【选填】 * query: MongoDB的额外查询条件。【选填】
* json:因为MongoDB支持子文档和数组类型子文档,但是Datax框架本身不支持,所以type为json的列从mongoDB读出来的数据会通过JSON序列化成字符串。
#### 5 类型转换 #### 5 类型转换
| DataX 内部类型| MongoDB 数据类型 | | DataX 内部类型 | MongoDB 数据类型 |
| -------- | ----- | |------------|---------------|
| Long | int, Long | | Long | int, Long |
| Double | double | | Double | double |
| String | string, array | | String | string, array |
| Date | date | | Date | date |
| Boolean | boolean | | Boolean | boolean |
| Bytes | bytes | | Bytes | bytes |
| String | json |
#### 6 性能报告 #### 6 性能报告

View File

@ -173,6 +173,8 @@ public class MongoDBReader extends Reader {
String tempArrayStr = Joiner.on(splitter).join(array); String tempArrayStr = Joiner.on(splitter).join(array);
record.addColumn(new StringColumn(tempArrayStr)); record.addColumn(new StringColumn(tempArrayStr));
} }
}else if ("json".equalsIgnoreCase(column.getString("type"))) {
record.addColumn(new StringColumn(JSON.toJSONString(tempCol)));
} else { } else {
record.addColumn(new StringColumn(tempCol.toString())); record.addColumn(new StringColumn(tempCol.toString()));
} }