This commit is contained in:
Zhang Xian 2025-04-10 16:22:00 +08:00 committed by GitHub
commit 4730a573c1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 42 additions and 1 deletions

View File

@ -9,6 +9,6 @@ public class Constant {
public static final int DEFAULT_TIMEOUT = 60000; public static final int DEFAULT_TIMEOUT = 60000;
public static final int DEFAULT_MAX_TRAVERSAL_LEVEL = 100; public static final int DEFAULT_MAX_TRAVERSAL_LEVEL = 100;
public static final String DEFAULT_FTP_CONNECT_PATTERN = "PASV"; public static final String DEFAULT_FTP_CONNECT_PATTERN = "PASV";
public static final boolean DEFAULT_FTP_DELETE_SRC = false;
} }

View File

@ -104,4 +104,15 @@ public abstract class FtpHelper {
return sourceAllFiles; return sourceAllFiles;
} }
/**
*
* @Title: deleteFile
* @Description: 删除相应的文件
* @param @param filePath 文件路径
* @param @return
* @return void
* @throws
*/
public abstract void deleteFile(String filePath);
} }

View File

@ -244,6 +244,11 @@ public class FtpReader extends Reader {
UnstructuredStorageReaderUtil.readFromStream(inputStream, fileName, this.readerSliceConfig, UnstructuredStorageReaderUtil.readFromStream(inputStream, fileName, this.readerSliceConfig,
recordSender, this.getTaskPluginCollector()); recordSender, this.getTaskPluginCollector());
recordSender.flush(); recordSender.flush();
// 如果deletesrc为true则删除源文件
if(readerSliceConfig.getBool(Key.DELETESRC,Constant.DEFAULT_FTP_DELETE_SRC)){
ftpHelper.deleteFile(fileName);
LOG.info(String.format("deleted src file : [%s]", fileName));
}
} }
LOG.debug("end read source files..."); LOG.debug("end read source files...");

View File

@ -10,4 +10,5 @@ public class Key {
public static final String CONNECTPATTERN = "connectPattern"; public static final String CONNECTPATTERN = "connectPattern";
public static final String PATH = "path"; public static final String PATH = "path";
public static final String MAXTRAVERSALLEVEL = "maxTraversalLevel"; public static final String MAXTRAVERSALLEVEL = "maxTraversalLevel";
public static final String DELETESRC ="deletesrc";
} }

View File

@ -245,4 +245,16 @@ public class SftpHelper extends FtpHelper {
} }
} }
@Override
public void deleteFile(String filePath) {
try{
channelSftp.rm(filePath);
}catch(SftpException e){
String message = String.format("删除文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filePath, filePath);
LOG.error(message);
throw DataXException.asDataXException(FtpReaderErrorCode.OPEN_FILE_ERROR, message);
}
}
} }

View File

@ -226,4 +226,16 @@ public class StandardFtpHelper extends FtpHelper {
} }
} }
@Override
public void deleteFile(String filePath) {
try{
ftpClient.deleteFile(filePath);
}catch(IOException e){
String message = String.format("删除文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filePath, filePath);
LOG.error(message);
throw DataXException.asDataXException(FtpReaderErrorCode.OPEN_FILE_ERROR, message);
}
}
} }