update oss parquet writer

jt-chen 2023-09-14 17:24:52 +08:00
parent fd80754feb
commit d187c4657e
3 changed files with 93 additions and 7 deletions

View File

@@ -83,9 +83,10 @@ public class HdfsReader extends Reader {
!specifiedFileType.equalsIgnoreCase(Constant.TEXT) &&
!specifiedFileType.equalsIgnoreCase(Constant.CSV) &&
!specifiedFileType.equalsIgnoreCase(Constant.SEQ) &&
!specifiedFileType.equalsIgnoreCase(Constant.RC)){
String message = "HdfsReader插件目前支持ORC, TEXT, CSV, SEQUENCE, RC五种格式的文件," +
"请将fileType选项的值配置为ORC, TEXT, CSV, SEQUENCE 或者 RC";
!specifiedFileType.equalsIgnoreCase(Constant.RC) &&
!specifiedFileType.equalsIgnoreCase(Constant.PARQUET)){
String message = "HdfsReader插件目前支持ORC, TEXT, CSV, SEQUENCE, RC, PARQUET 六种格式的文件," +
"请将fileType选项的值配置为ORC, TEXT, CSV, SEQUENCE,RC 和 PARQUET";
throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_ERROR, message);
}
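With PARQUET added to the allowed values, a reader job can declare the new file type. The snippet below is only an illustrative sketch of an HdfsReader parameter block, not part of this commit; the defaultFS, path, and column values are placeholders, and the check is case-insensitive, so "parquet" passes:

```json
{
    "reader": {
        "name": "hdfsreader",
        "parameter": {
            "defaultFS": "hdfs://localhost:9000",
            "path": "/user/hive/warehouse/demo_table/*",
            "fileType": "parquet",
            "column": ["*"],
            "encoding": "UTF-8"
        }
    }
}
```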

View File

@@ -53,8 +53,8 @@ public class HdfsWriter extends Writer {
this.defaultFS = this.writerSliceConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsWriterErrorCode.REQUIRED_VALUE);
//fileType check
this.fileType = this.writerSliceConfig.getNecessaryValue(Key.FILE_TYPE, HdfsWriterErrorCode.REQUIRED_VALUE);
if( !fileType.equalsIgnoreCase("ORC") && !fileType.equalsIgnoreCase("TEXT")){
String message = "HdfsWriter插件目前只支持ORC和TEXT两种格式的文件,请将filetype选项的值配置为ORC或者TEXT";
if (!fileType.equalsIgnoreCase("ORC") && !fileType.equalsIgnoreCase("TEXT") && !fileType.equalsIgnoreCase("PARQUET")) {
String message = "HdfsWriter插件目前只支持ORC、TEXT、PARQUET三种格式的文件,请将filetype选项的值配置为ORC、TEXT或PARQUET";
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message);
}
//path
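Likewise, with the writer-side check extended, a job can now set fileType to PARQUET. This is a hypothetical sketch following the plugin's existing ORC/TEXT keys; all values are placeholders and any parquet-specific options are omitted:

```json
{
    "writer": {
        "name": "hdfswriter",
        "parameter": {
            "defaultFS": "hdfs://localhost:9000",
            "path": "/user/hive/warehouse/demo_table",
            "fileName": "demo",
            "fileType": "PARQUET",
            "column": [
                { "name": "col1", "type": "BIGINT" },
                { "name": "col2", "type": "STRING" }
            ],
            "writeMode": "append"
        }
    }
}
```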

View File

@@ -18,7 +18,7 @@ OSSWriter provides writing of one or more CSV-like table files to OSS.
OSSWriter implements converting data from the DataX protocol into TXT files on OSS. OSS itself is unstructured data storage, so OSSWriter needs to add support in the following areas:
1. Supports, and only supports, writing TXT files, and requires the schema in the TXT file to be a two-dimensional table.
1. Supports writing TXT files, and requires the schema in the TXT file to be a two-dimensional table.
2. Supports custom delimiters for CSV-like files.
@@ -28,6 +28,8 @@ OSSWriter implements converting data from the DataX protocol into TXT files on OSS. OSS itself is…
7. Supports file rolling: when a file grows beyond a certain size or row count, it needs to switch to a new file. [Not supported yet]
8. Supports writing PARQUET and ORC files.
What we cannot do:
1. A single file does not support concurrent writes.
@@ -37,7 +39,7 @@ OSSWriter implements converting data from the DataX protocol into TXT files on OSS. OSS itself is…
### 3.1 Configuration Examples
Sample configuration for writing a txt file
```json
{
"job": {
@@ -65,7 +67,90 @@ OSSWriter implements converting data from the DataX protocol into TXT files on OSS. OSS itself is…
}
}
```
Sample configuration for writing an orc file
```json
{
"job": {
"setting": {},
"content": [
{
"reader": {},
"writer": {
"name": "osswriter",
"parameter": {
"endpoint": "http://oss.aliyuncs.com",
"accessId": "",
"accessKey": "",
"bucket": "myBucket",
"fileName": "test",
"encoding": "UTF-8",
"column": [
{
"name": "col1",
"type": "BIGINT"
},
{
"name": "col2",
"type": "DOUBLE"
},
{
"name": "col3",
"type": "STRING"
}
],
"fileFormat": "orc",
"path": "/tests/case61",
"writeMode": "append"
}
}
}
]
}
}
```
Sample configuration for writing a parquet file
```json
{
"job": {
"setting": {},
"content": [
{
"reader": {},
"writer": {
"name": "osswriter",
"parameter": {
"endpoint": "http://oss.aliyuncs.com",
"accessId": "",
"accessKey": "",
"bucket": "myBucket",
"fileName": "test",
"encoding": "UTF-8",
"column": [
{
"name": "col1",
"type": "BIGINT"
},
{
"name": "col2",
"type": "DOUBLE"
},
{
"name": "col3",
"type": "STRING"
}
],
"parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\n repeated group list {\n required binary element (UTF8);\n }\n}\nrequired group params_struct {\n required int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\n repeated group list {\n required group element {\n required int64 id;\n required binary name (UTF8);\n}\n }\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\n required int64 id;\n required binary name (UTF8);\n }\n}\n}\nrequired group params_struct_complex {\n required int64 id;\n required group detail {\n required int64 id;\n required binary name (UTF8);\n }\n }\n}",
"fileFormat": "parquet",
"path": "/tests/case61",
"writeMode": "append"
}
}
}
]
}
}
```
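For readability, the parquetSchema string in the sample above, with its \n escapes expanded and indentation normalized, corresponds to the following Parquet message definition, covering primitive columns plus MAP, LIST, and nested struct groups:

```
message test {
  required int64 int64_col;
  required binary str_col (UTF8);
  required group params (MAP) {
    repeated group key_value {
      required binary key (UTF8);
      required binary value (UTF8);
    }
  }
  required group params_arr (LIST) {
    repeated group list {
      required binary element (UTF8);
    }
  }
  required group params_struct {
    required int64 id;
    required binary name (UTF8);
  }
  required group params_arr_complex (LIST) {
    repeated group list {
      required group element {
        required int64 id;
        required binary name (UTF8);
      }
    }
  }
  required group params_complex (MAP) {
    repeated group key_value {
      required binary key (UTF8);
      required group value {
        required int64 id;
        required binary name (UTF8);
      }
    }
  }
  required group params_struct_complex {
    required int64 id;
    required group detail {
      required int64 id;
      required binary name (UTF8);
    }
  }
}
```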
### 3.2 Parameter Description
* **endpoint**