Merge pull request #262 from wangchuande/hotfix/hdfs-reader-split

fix orcFileStartRead
This commit is contained in:
dingxiaobo 2023-03-23 13:46:57 +08:00 committed by GitHub
commit 430f08b16a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@@ -331,26 +331,30 @@ public class DFSUtil {
//If the network is disconnected, it will retry 45 times, with a 20-second interval between retries
//Each file as a split
//TODO multiple threads
InputSplit[] splits = in.getSplits(conf, 1);
// OrcInputFormat#getSplits ignores the numSplits parameter; the number of splits equals the number of blocks
InputSplit[] splits = in.getSplits(conf, -1);
for (InputSplit split : splits) {
{
RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
Object key = reader.createKey();
Object value = reader.createValue();
// 获取列信息
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
Object key = reader.createKey();
Object value = reader.createValue();
// 获取列信息
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
List<Object> recordFields;
while (reader.next(key, value)) {
recordFields = new ArrayList<Object>();
List<Object> recordFields;
while (reader.next(key, value)) {
recordFields = new ArrayList<Object>();
for (int i = 0; i <= columnIndexMax; i++) {
Object field = inspector.getStructFieldData(value, fields.get(i));
recordFields.add(field);
for (int i = 0; i <= columnIndexMax; i++) {
Object field = inspector.getStructFieldData(value, fields.get(i));
recordFields.add(field);
}
transportOneRecord(column, recordFields, recordSender,
taskPluginCollector, isReadAllColumns, nullFormat);
}
reader.close();
}
transportOneRecord(column, recordFields, recordSender,
taskPluginCollector, isReadAllColumns, nullFormat);
}
reader.close();
} catch (Exception e) {
String message = String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。"
, sourceOrcFilePath);