Merge pull request #2232 from 7owen/master

gen default parquet schema if config not set
This commit is contained in:
jtchen-study 2025-03-31 19:10:19 +08:00 committed by GitHub
commit 129924c79e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 14 additions and 1 deletions

View File

@ -637,7 +637,14 @@ public class HdfsHelper {
MessageType messageType = null; MessageType messageType = null;
ParquetFileProccessor proccessor = null; ParquetFileProccessor proccessor = null;
Path outputPath = new Path(fileName); Path outputPath = new Path(fileName);
String schema = config.getString(Key.PARQUET_SCHEMA); String schema = config.getString(Key.PARQUET_SCHEMA, null);
if (schema == null) {
List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
if (columns == null || columns.isEmpty()) {
throw DataXException.asDataXException("parquetSchema or column can't be blank!");
}
schema = HdfsHelper.generateParquetSchemaFromColumnAndType(columns);
}
try { try {
messageType = MessageTypeParser.parseMessageType(schema); messageType = MessageTypeParser.parseMessageType(schema);
} catch (Exception e) { } catch (Exception e) {

View File

@ -228,6 +228,12 @@ public class HdfsWriter extends Writer {
String endFullFileName = null; String endFullFileName = null;
fileSuffix = UUID.randomUUID().toString().replace('-', '_'); fileSuffix = UUID.randomUUID().toString().replace('-', '_');
if (fileType.equalsIgnoreCase("PARQUET")) {
if (StringUtils.isNotBlank(this.compress)) {
fileSuffix += "." + this.compress.toLowerCase();
}
fileSuffix += ".parquet";
}
fullFileName = String.format("%s%s%s__%s", defaultFS, storePath, filePrefix, fileSuffix); fullFileName = String.format("%s%s%s__%s", defaultFS, storePath, filePrefix, fileSuffix);
endFullFileName = String.format("%s%s%s__%s", defaultFS, endStorePath, filePrefix, fileSuffix); endFullFileName = String.format("%s%s%s__%s", defaultFS, endStorePath, filePrefix, fileSuffix);