增加skipEmptyDir参数

skipEmptyDir 默认为 true,即读取到空目录时跳过,不抛出异常。
This commit is contained in:
DeleiGuo 2024-05-07 14:25:38 +08:00
parent 29c3bb4c49
commit 3b9c790f2b
4 changed files with 32 additions and 2 deletions

View File

@ -278,6 +278,13 @@ HdfsReader实现了从Hadoop分布式文件系统Hdfs中读取文件数据并转
* 默认值:无 <br />
* **skipEmptyDir**
* 描述:读取到空目录(未找到待读取的文件)时是否跳过。为 true 时仅输出警告并以返回码 0 正常退出,不抛出异常;为 false 时抛出异常。<br />
* 必选:否 <br />
* 默认值:true <br />
常见配置:

View File

@ -1,5 +1,6 @@
package com.alibaba.datax.plugin.reader.hdfsreader;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
@ -186,6 +187,12 @@ public class DFSUtil {
// 获取要读取的文件的根目录
Path listFiles = new Path(path);
if(!hdfs.exists(listFiles)){
String message = String.format("[%s] 路径目录不存在!",path);
LOG.warn(message);
System.exit(0);
}
// If the network disconnected, this method will retry 45 times
// each time the retry interval for 20 seconds
// 获取要读取的文件的根目录的所有二级子文件目录

View File

@ -43,6 +43,7 @@ public class HdfsReader extends Reader {
private List<String> path = null;
private boolean skipEmptyOrcFile = false;
private Integer orcFileEmptySize = null;
private Boolean skipEmptyDir=null;
@Override
public void init() {
@ -58,6 +59,7 @@ public class HdfsReader extends Reader {
public void validate(){
this.readerOriginConfig.getNecessaryValue(Key.DEFAULT_FS,
HdfsReaderErrorCode.DEFAULT_FS_NOT_FIND_ERROR);
skipEmptyDir = this.readerOriginConfig.getBool(Key.SKIP_EMPTY_DIR,true);
// path check
String pathInString = this.readerOriginConfig.getNecessaryValue(Key.PATH, HdfsReaderErrorCode.REQUIRED_VALUE);
@ -193,9 +195,18 @@ public class HdfsReader extends Reader {
// warn:每个slice拖且仅拖一个文件,
// int splitNumber = adviceNumber;
int splitNumber = this.sourceFiles.size();
LOG.info("split number:" + splitNumber);
if (0 == splitNumber) {
throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION,
String.format("未能找到待读取的文件,请确认您的配置项path: %s", this.readerOriginConfig.getString(Key.PATH)));
String message = String.format("未能找到待读取的文件,请确认您的配置项path: %s",
this.readerOriginConfig.getString(Key.PATH));
if(skipEmptyDir){
LOG.warn(message);
LOG.info("Task exited with return code 0");
System.exit(0);
}else {
throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION,message);
}
}
List<List<String>> splitedSourceFiles = this.splitSourceFiles(new ArrayList<String>(this.sourceFiles), splitNumber);

View File

@ -63,4 +63,9 @@ public final class Key {
public static final String CDH_3_X_HIVE_VERSION = "3.1.3-cdh";
public static final String SUPPORT_ADD_MIDDLE_COLUMN = "supportAddMiddleColumn";
/**
* 是否跳过空目录异常
*/
public static final String SKIP_EMPTY_DIR="skipEmptyDir";
}