diff --git a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/HdfsReader.java b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/HdfsReader.java
index 1d9e90a0..08c630fc 100644
--- a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/HdfsReader.java
+++ b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/HdfsReader.java
@@ -83,9 +83,10 @@ public class HdfsReader extends Reader {
                     !specifiedFileType.equalsIgnoreCase(Constant.TEXT) &&
                     !specifiedFileType.equalsIgnoreCase(Constant.CSV) &&
                     !specifiedFileType.equalsIgnoreCase(Constant.SEQ) &&
-                    !specifiedFileType.equalsIgnoreCase(Constant.RC)){
-                String message = "The HdfsReader plugin currently supports five file formats: ORC, TEXT, CSV, SEQUENCE, and RC. " +
-                        "Please set the fileType option to ORC, TEXT, CSV, SEQUENCE, or RC.";
+                    !specifiedFileType.equalsIgnoreCase(Constant.RC) &&
+                    !specifiedFileType.equalsIgnoreCase(Constant.PARQUET)){
+                String message = "The HdfsReader plugin currently supports six file formats: ORC, TEXT, CSV, SEQUENCE, RC, and PARQUET. " +
+                        "Please set the fileType option to ORC, TEXT, CSV, SEQUENCE, RC, or PARQUET.";
                 throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_ERROR, message);
             }
 
diff --git a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java
index 4f8c505a..e7707461 100644
--- a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java
+++ b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java
@@ -53,8 +53,8 @@ public class HdfsWriter extends Writer {
             this.defaultFS = this.writerSliceConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsWriterErrorCode.REQUIRED_VALUE);
             //fileType check
             this.fileType = this.writerSliceConfig.getNecessaryValue(Key.FILE_TYPE, HdfsWriterErrorCode.REQUIRED_VALUE);
-            if( !fileType.equalsIgnoreCase("ORC") && !fileType.equalsIgnoreCase("TEXT")){
-                String message = "The HdfsWriter plugin currently supports only the ORC and TEXT formats. Please set the filetype option to ORC or TEXT.";
+            if (!fileType.equalsIgnoreCase("ORC") && !fileType.equalsIgnoreCase("TEXT") && !fileType.equalsIgnoreCase("PARQUET")) {
+                String message = "The HdfsWriter plugin currently supports only the ORC, TEXT, and PARQUET formats. Please set the filetype option to ORC, TEXT, or PARQUET.";
                 throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message);
             }
             //path
diff --git a/osswriter/doc/osswriter.md b/osswriter/doc/osswriter.md
index 1a3d3e47..0c23e698 100644
--- a/osswriter/doc/osswriter.md
+++ b/osswriter/doc/osswriter.md
@@ -18,7 +18,7 @@ OSSWriter writes one or more CSV-like table files to OSS.
 OSSWriter converts the DataX transport protocol into TXT files in OSS. Since OSS itself is unstructured storage, OSSWriter adds support in the following areas:
 
-1. Writing TXT files, and only TXT files, whose schema must form a two-dimensional table.
+1. Writing TXT files whose schema must form a two-dimensional table.
 
 2. CSV-like files with a custom delimiter.
 
@@ -28,6 +28,8 @@ OSSWriter converts the DataX transport protocol into TXT files in OSS.
 7. File rolling: when a file exceeds a given size or row count, it rolls over to a new file. [Not yet supported]
 
+8. Writing PARQUET and ORC files.
+
 What we cannot do:
 
 1. A single file cannot be written to concurrently.
 
@@ -37,7 +39,7 @@
 ### 3.1 Sample configuration
 
-
+Sample for writing a TXT file
 ```json
 {
     "job": {
         "setting": {},
@@ -65,7 +67,90 @@
         }
     }
 }
 ```
+Sample for writing an ORC file
+```json
+{
+    "job": {
+        "setting": {},
+        "content": [
+            {
+                "reader": {},
+                "writer": {
+                    "name": "osswriter",
+                    "parameter": {
+                        "endpoint": "http://oss.aliyuncs.com",
+                        "accessId": "",
+                        "accessKey": "",
+                        "bucket": "myBucket",
+                        "fileName": "test",
+                        "encoding": "UTF-8",
+                        "column": [
+                            {
+                                "name": "col1",
+                                "type": "BIGINT"
+                            },
+                            {
+                                "name": "col2",
+                                "type": "DOUBLE"
+                            },
+                            {
+                                "name": "col3",
+                                "type": "STRING"
+                            }
+                        ],
+                        "fileFormat": "orc",
+                        "path": "/tests/case61",
+                        "writeMode": "append"
+                    }
+                }
+            }
+        ]
+    }
+}
+```
+Sample for writing a Parquet file
+```json
+{
+    "job": {
+        "setting": {},
+        "content": [
+            {
+                "reader": {},
+                "writer": {
+                    "name": "osswriter",
+                    "parameter": {
+                        "endpoint": "http://oss.aliyuncs.com",
+                        "accessId": "",
+                        "accessKey": "",
+                        "bucket": "myBucket",
+                        "fileName": "test",
+                        "encoding": "UTF-8",
+                        "column": [
+                            {
+                                "name": "col1",
+                                "type": "BIGINT"
+                            },
+                            {
+                                "name": "col2",
+                                "type": "DOUBLE"
+                            },
+                            {
+                                "name": "col3",
+                                "type": "STRING"
+                            }
+                        ],
+                        "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\n repeated group list {\n required binary element (UTF8);\n }\n}\nrequired group params_struct {\n required int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\n repeated group list {\n required group element {\n required int64 id;\n required binary name (UTF8);\n}\n }\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\n required int64 id;\n required binary name (UTF8);\n }\n}\n}\nrequired group params_struct_complex {\n required int64 id;\n required group detail {\n required int64 id;\n required binary name (UTF8);\n }\n }\n}",
+                        "fileFormat": "parquet",
+                        "path": "/tests/case61",
+                        "writeMode": "append"
+                    }
+                }
+            }
+        ]
+    }
+}
+```
 ### 3.2 Parameter description
 
 * **endpoint**
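A note on the `parquetSchema` value in the Parquet sample above: it uses parquet-mr's message-type grammar, and a malformed schema string is the most likely misconfiguration. Below is a minimal sketch, not plugin code, that parses such a string up front; it assumes parquet-mr (`org.apache.parquet:parquet-column`) is on the classpath, as it is for the HDFS plugins, and the class name `ParquetSchemaCheck` and the shortened schema text are ours.

```java
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.Type;

public class ParquetSchemaCheck {
    public static void main(String[] args) {
        // A shortened schema in the same grammar as the "parquetSchema"
        // value in the Parquet sample config above (illustrative only).
        String schemaText =
                "message test {\n"
                + "  required int64 int64_col;\n"
                + "  required binary str_col (UTF8);\n"
                + "}";

        // parseMessageType rejects malformed schemas with an exception,
        // so a bad config value fails fast instead of failing mid-write.
        MessageType schema = MessageTypeParser.parseMessageType(schemaText);

        // Print the declared fields, e.g. "required int64 int64_col".
        for (Type field : schema.getFields()) {
            System.out.println(field);
        }
    }
}
```

Running the same check against the full `parquetSchema` string from the sample verifies that its nested MAP, LIST, and struct groups parse as well.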
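On the Java side, both hunks grow a chain of `equalsIgnoreCase` calls plus a hand-written message that must be kept in sync by hand. A minimal sketch of an equivalent whitelist check follows; the names `SUPPORTED` and `checkFileType` are hypothetical, and it throws `IllegalArgumentException` where the plugins throw `DataXException`.

```java
import java.util.Arrays;
import java.util.List;

public class FileTypeCheck {
    // The six types HdfsReader accepts after this change; HdfsWriter
    // would carry its own three-entry list (ORC, TEXT, PARQUET).
    private static final List<String> SUPPORTED =
            Arrays.asList("ORC", "TEXT", "CSV", "SEQ", "RC", "PARQUET");

    static void checkFileType(String specifiedFileType) {
        boolean supported = SUPPORTED.stream()
                .anyMatch(t -> t.equalsIgnoreCase(specifiedFileType));
        if (!supported) {
            // The error message derives from the same list the check uses,
            // so adding a format touches exactly one place.
            throw new IllegalArgumentException("Unsupported fileType '"
                    + specifiedFileType + "'; expected one of " + SUPPORTED);
        }
    }

    public static void main(String[] args) {
        checkFileType("parquet"); // passes: comparison is case-insensitive
        checkFileType("avro");    // throws IllegalArgumentException
    }
}
```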