mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 23:52:00 +08:00
Merge 3b9c790f2b
into 0824b45c5e
This commit is contained in:
commit
489af24794
@ -278,6 +278,13 @@ HdfsReader实现了从Hadoop分布式文件系统Hdfs中读取文件数据并转
|
|||||||
|
|
||||||
* 默认值:无 <br />
|
* 默认值:无 <br />
|
||||||
|
|
||||||
|
* **skipEmptyDir**
|
||||||
|
|
||||||
|
* 描述:读取空目录时,跳过不抛出异常,默认跳过。<br />
|
||||||
|
|
||||||
|
* 必选:否 <br />
|
||||||
|
|
||||||
|
* 默认值:true <br />
|
||||||
|
|
||||||
常见配置:
|
常见配置:
|
||||||
|
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package com.alibaba.datax.plugin.reader.hdfsreader;
|
package com.alibaba.datax.plugin.reader.hdfsreader;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Record;
|
||||||
import com.alibaba.datax.common.element.*;
|
import com.alibaba.datax.common.element.*;
|
||||||
import com.alibaba.datax.common.exception.DataXException;
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
import com.alibaba.datax.common.plugin.RecordSender;
|
import com.alibaba.datax.common.plugin.RecordSender;
|
||||||
@ -186,6 +187,12 @@ public class DFSUtil {
|
|||||||
// 获取要读取的文件的根目录
|
// 获取要读取的文件的根目录
|
||||||
Path listFiles = new Path(path);
|
Path listFiles = new Path(path);
|
||||||
|
|
||||||
|
if(!hdfs.exists(listFiles)){
|
||||||
|
String message = String.format("[%s] 路径目录不存在!",path);
|
||||||
|
LOG.warn(message);
|
||||||
|
System.exit(0);
|
||||||
|
}
|
||||||
|
|
||||||
// If the network disconnected, this method will retry 45 times
|
// If the network disconnected, this method will retry 45 times
|
||||||
// each time the retry interval for 20 seconds
|
// each time the retry interval for 20 seconds
|
||||||
// 获取要读取的文件的根目录的所有二级子文件目录
|
// 获取要读取的文件的根目录的所有二级子文件目录
|
||||||
|
@ -43,6 +43,7 @@ public class HdfsReader extends Reader {
|
|||||||
private List<String> path = null;
|
private List<String> path = null;
|
||||||
private boolean skipEmptyOrcFile = false;
|
private boolean skipEmptyOrcFile = false;
|
||||||
private Integer orcFileEmptySize = null;
|
private Integer orcFileEmptySize = null;
|
||||||
|
private Boolean skipEmptyDir=null;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init() {
|
public void init() {
|
||||||
@ -58,6 +59,7 @@ public class HdfsReader extends Reader {
|
|||||||
public void validate(){
|
public void validate(){
|
||||||
this.readerOriginConfig.getNecessaryValue(Key.DEFAULT_FS,
|
this.readerOriginConfig.getNecessaryValue(Key.DEFAULT_FS,
|
||||||
HdfsReaderErrorCode.DEFAULT_FS_NOT_FIND_ERROR);
|
HdfsReaderErrorCode.DEFAULT_FS_NOT_FIND_ERROR);
|
||||||
|
skipEmptyDir = this.readerOriginConfig.getBool(Key.SKIP_EMPTY_DIR,true);
|
||||||
|
|
||||||
// path check
|
// path check
|
||||||
String pathInString = this.readerOriginConfig.getNecessaryValue(Key.PATH, HdfsReaderErrorCode.REQUIRED_VALUE);
|
String pathInString = this.readerOriginConfig.getNecessaryValue(Key.PATH, HdfsReaderErrorCode.REQUIRED_VALUE);
|
||||||
@ -193,9 +195,18 @@ public class HdfsReader extends Reader {
|
|||||||
// warn:每个slice拖且仅拖一个文件,
|
// warn:每个slice拖且仅拖一个文件,
|
||||||
// int splitNumber = adviceNumber;
|
// int splitNumber = adviceNumber;
|
||||||
int splitNumber = this.sourceFiles.size();
|
int splitNumber = this.sourceFiles.size();
|
||||||
|
LOG.info("split number:" + splitNumber);
|
||||||
|
|
||||||
if (0 == splitNumber) {
|
if (0 == splitNumber) {
|
||||||
throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION,
|
String message = String.format("未能找到待读取的文件,请确认您的配置项path: %s",
|
||||||
String.format("未能找到待读取的文件,请确认您的配置项path: %s", this.readerOriginConfig.getString(Key.PATH)));
|
this.readerOriginConfig.getString(Key.PATH));
|
||||||
|
if(skipEmptyDir){
|
||||||
|
LOG.warn(message);
|
||||||
|
LOG.info("Task exited with return code 0");
|
||||||
|
System.exit(0);
|
||||||
|
}else {
|
||||||
|
throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION,message);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
List<List<String>> splitedSourceFiles = this.splitSourceFiles(new ArrayList<String>(this.sourceFiles), splitNumber);
|
List<List<String>> splitedSourceFiles = this.splitSourceFiles(new ArrayList<String>(this.sourceFiles), splitNumber);
|
||||||
|
@ -63,4 +63,9 @@ public final class Key {
|
|||||||
public static final String CDH_3_X_HIVE_VERSION = "3.1.3-cdh";
|
public static final String CDH_3_X_HIVE_VERSION = "3.1.3-cdh";
|
||||||
|
|
||||||
public static final String SUPPORT_ADD_MIDDLE_COLUMN = "supportAddMiddleColumn";
|
public static final String SUPPORT_ADD_MIDDLE_COLUMN = "supportAddMiddleColumn";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 是否跳过空目录异常
|
||||||
|
*/
|
||||||
|
public static final String SKIP_EMPTY_DIR="skipEmptyDir";
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user