From 7bb69f6c85c13ad1fa5fb6b9b569aeb91c19c9b3 Mon Sep 17 00:00:00 2001
From: liaoguangwen
Date: Sat, 26 Oct 2024 01:26:47 +0800
Subject: [PATCH] gen default parquet schema if config not set

HdfsHelper: read Key.PARQUET_SCHEMA with a null default. When it is
null, take the Key.COLUMN list and build the schema text with
generateParquetSchemaFromColumnAndType; throw a DataXException if the
column list is null or empty.

HdfsWriter: when the file type is PARQUET, append the lower-cased
compress value (only if it is not blank) and then ".parquet" to the
generated file suffix.
---
 .../datax/plugin/writer/hdfswriter/HdfsHelper.java | 9 ++++++++-
 .../datax/plugin/writer/hdfswriter/HdfsWriter.java | 6 ++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsHelper.java b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsHelper.java
index 09fd2723..3d12e442 100644
--- a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsHelper.java
+++ b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsHelper.java
@@ -630,7 +630,14 @@ public class HdfsHelper {
         MessageType messageType = null;
         ParquetFileProccessor proccessor = null;
         Path outputPath = new Path(fileName);
-        String schema = config.getString(Key.PARQUET_SCHEMA);
+        String schema = config.getString(Key.PARQUET_SCHEMA, null);
+        if (schema == null) {
+            List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
+            if (columns == null || columns.isEmpty()) {
+                throw DataXException.asDataXException("parquetSchema or column can't be blank!");
+            }
+            schema = HdfsHelper.generateParquetSchemaFromColumnAndType(columns);
+        }
         try {
             messageType = MessageTypeParser.parseMessageType(schema);
         } catch (Exception e) {
diff --git a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java
index e7707461..7535687c 100644
--- a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java
+++ b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java
@@ -228,6 +228,12 @@ public class HdfsWriter extends Writer {
                 String endFullFileName = null;
 
                 fileSuffix = UUID.randomUUID().toString().replace('-', '_');
+                if (fileType.equalsIgnoreCase("PARQUET")) {
+                    if (StringUtils.isNotBlank(this.compress)) {
+                        fileSuffix += "." + this.compress.toLowerCase();
+                    }
+                    fileSuffix += ".parquet";
+                }
 
                 fullFileName = String.format("%s%s%s__%s", defaultFS, storePath, filePrefix, fileSuffix);
                 endFullFileName = String.format("%s%s%s__%s", defaultFS, endStorePath, filePrefix, fileSuffix);