mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 17:59:12 +08:00
fix orcFileStartRead
This commit is contained in:
parent
d4d1ea6a15
commit
e2dcbcd532
@ -331,26 +331,30 @@ public class DFSUtil {
|
|||||||
//If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
|
//If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
|
||||||
//Each file as a split
|
//Each file as a split
|
||||||
//TODO multy threads
|
//TODO multy threads
|
||||||
InputSplit[] splits = in.getSplits(conf, 1);
|
// OrcInputFormat getSplits params numSplits not used, splits size = block numbers
|
||||||
|
InputSplit[] splits = in.getSplits(conf, -1);
|
||||||
|
for (InputSplit split : splits) {
|
||||||
|
{
|
||||||
|
RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
|
||||||
|
Object key = reader.createKey();
|
||||||
|
Object value = reader.createValue();
|
||||||
|
// 获取列信息
|
||||||
|
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
|
||||||
|
|
||||||
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
|
List<Object> recordFields;
|
||||||
Object key = reader.createKey();
|
while (reader.next(key, value)) {
|
||||||
Object value = reader.createValue();
|
recordFields = new ArrayList<Object>();
|
||||||
// 获取列信息
|
|
||||||
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
|
|
||||||
|
|
||||||
List<Object> recordFields;
|
for (int i = 0; i <= columnIndexMax; i++) {
|
||||||
while (reader.next(key, value)) {
|
Object field = inspector.getStructFieldData(value, fields.get(i));
|
||||||
recordFields = new ArrayList<Object>();
|
recordFields.add(field);
|
||||||
|
}
|
||||||
for (int i = 0; i <= columnIndexMax; i++) {
|
transportOneRecord(column, recordFields, recordSender,
|
||||||
Object field = inspector.getStructFieldData(value, fields.get(i));
|
taskPluginCollector, isReadAllColumns, nullFormat);
|
||||||
recordFields.add(field);
|
}
|
||||||
|
reader.close();
|
||||||
}
|
}
|
||||||
transportOneRecord(column, recordFields, recordSender,
|
|
||||||
taskPluginCollector, isReadAllColumns, nullFormat);
|
|
||||||
}
|
}
|
||||||
reader.close();
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String message = String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。"
|
String message = String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。"
|
||||||
, sourceOrcFilePath);
|
, sourceOrcFilePath);
|
||||||
|
Loading…
Reference in New Issue
Block a user