Update DFSUtil.java

在进入addSourceFileByType方法时进行判断,避免在文件大小为0时,程序报错退出
This commit is contained in:
cangos2012 2019-08-26 16:27:53 +08:00 committed by GitHub
parent 05a000db44
commit 25ca2bec12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -180,20 +180,36 @@ public class DFSUtil {
// 根据用户指定的文件类型将指定的文件类型的路径加入sourceHDFSAllFilesList // 根据用户指定的文件类型将指定的文件类型的路径加入sourceHDFSAllFilesList
private void addSourceFileByType(String filePath) { private void addSourceFileByType(String filePath) {
// 检查file的类型和用户配置的fileType类型是否一致 Path file = new Path(filePath);
boolean isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType);
if (isMatchedFileType) { try {
LOG.info(String.format("[%s]是[%s]类型的文件, 将该文件加入source files列表", filePath, this.specifiedFileType)); FileSystem fs = FileSystem.get(hadoopConf);
sourceHDFSAllFilesList.add(filePath); FSDataInputStream in = fs.open(file);
} else { long size = fs.getFileStatus(file).getLen();
String message = String.format("文件[%s]的类型与用户配置的fileType类型不一致" + //在进入addSourceFileByType方法时进行判断避免在文件大小为0时程序报错退出
if(size > 0){
// 检查file的类型和用户配置的fileType类型是否一致
boolean isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType);
if (isMatchedFileType) {
LOG.info(String.format("[%s]是[%s]类型的文件, 将该文件加入source files列表", filePath, this.specifiedFileType));
sourceHDFSAllFilesList.add(filePath);
} else {
String message = String.format("文件[%s]的类型与用户配置的fileType类型不一致" +
"请确认您配置的目录下面所有文件的类型均为[%s]" "请确认您配置的目录下面所有文件的类型均为[%s]"
, filePath, this.specifiedFileType); , filePath, this.specifiedFileType);
LOG.error(message); LOG.error(message);
throw DataXException.asDataXException( throw DataXException.asDataXException(
HdfsReaderErrorCode.FILE_TYPE_UNSUPPORT, message); HdfsReaderErrorCode.FILE_TYPE_UNSUPPORT, message);
} }
}else{
//文件大小为0时记录日志
String message = String.format("文件[%s]长度为0将会跳过不作处理", filePath);
LOG.info(message);
}
}catch(Exception e){
LOG.warn(String.format("未知错误, %s", e.getMessage()));
}
} }
public InputStream getInputStream(String filepath) { public InputStream getInputStream(String filepath) {