mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 05:11:42 +08:00
Merge 25ca2bec12
into 0824b45c5e
This commit is contained in:
commit
35e3170511
@ -219,20 +219,36 @@ public class DFSUtil {
|
|||||||
|
|
||||||
// 根据用户指定的文件类型,将指定的文件类型的路径加入sourceHDFSAllFilesList
|
// 根据用户指定的文件类型,将指定的文件类型的路径加入sourceHDFSAllFilesList
|
||||||
private void addSourceFileByType(String filePath) {
|
private void addSourceFileByType(String filePath) {
|
||||||
// 检查file的类型和用户配置的fileType类型是否一致
|
Path file = new Path(filePath);
|
||||||
boolean isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType);
|
|
||||||
|
|
||||||
if (isMatchedFileType) {
|
try {
|
||||||
LOG.info(String.format("[%s]是[%s]类型的文件, 将该文件加入source files列表", filePath, this.specifiedFileType));
|
FileSystem fs = FileSystem.get(hadoopConf);
|
||||||
sourceHDFSAllFilesList.add(filePath);
|
FSDataInputStream in = fs.open(file);
|
||||||
} else {
|
long size = fs.getFileStatus(file).getLen();
|
||||||
String message = String.format("文件[%s]的类型与用户配置的fileType类型不一致," +
|
//在进入addSourceFileByType方法时进行判断,避免在文件大小为0时,程序报错退出
|
||||||
|
if(size > 0){
|
||||||
|
// 检查file的类型和用户配置的fileType类型是否一致
|
||||||
|
boolean isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType);
|
||||||
|
|
||||||
|
if (isMatchedFileType) {
|
||||||
|
LOG.info(String.format("[%s]是[%s]类型的文件, 将该文件加入source files列表", filePath, this.specifiedFileType));
|
||||||
|
sourceHDFSAllFilesList.add(filePath);
|
||||||
|
} else {
|
||||||
|
String message = String.format("文件[%s]的类型与用户配置的fileType类型不一致," +
|
||||||
"请确认您配置的目录下面所有文件的类型均为[%s]"
|
"请确认您配置的目录下面所有文件的类型均为[%s]"
|
||||||
, filePath, this.specifiedFileType);
|
, filePath, this.specifiedFileType);
|
||||||
LOG.error(message);
|
LOG.error(message);
|
||||||
throw DataXException.asDataXException(
|
throw DataXException.asDataXException(
|
||||||
HdfsReaderErrorCode.FILE_TYPE_UNSUPPORT, message);
|
HdfsReaderErrorCode.FILE_TYPE_UNSUPPORT, message);
|
||||||
}
|
}
|
||||||
|
}else{
|
||||||
|
//文件大小为0时,记录日志
|
||||||
|
String message = String.format("文件[%s]长度为0,将会跳过不作处理!", filePath);
|
||||||
|
LOG.info(message);
|
||||||
|
}
|
||||||
|
}catch(Exception e){
|
||||||
|
LOG.warn(String.format("未知错误, %s", e.getMessage()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public InputStream getInputStream(String filepath) {
|
public InputStream getInputStream(String filepath) {
|
||||||
|
Loading…
Reference in New Issue
Block a user