diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Constant.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Constant.java index 15019fdb..ccf4780e 100755 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Constant.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Constant.java @@ -9,6 +9,6 @@ public class Constant { public static final int DEFAULT_TIMEOUT = 60000; public static final int DEFAULT_MAX_TRAVERSAL_LEVEL = 100; public static final String DEFAULT_FTP_CONNECT_PATTERN = "PASV"; - + public static final boolean DEFAULT_FTP_DELETE_SRC = false; } diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java index f8b3f56f..0982b08c 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java @@ -103,5 +103,16 @@ public abstract class FtpHelper { } return sourceAllFiles; } + + /** + * + * @Title: deleteFile + * @Description: 删除相应的文件 + * @param @param filePath 文件路径 + * @param @return + * @return void + * @throws + */ + public abstract void deleteFile(String filePath); } diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java index c1f20dfd..b9396330 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java @@ -244,6 +244,11 @@ public class FtpReader extends Reader { UnstructuredStorageReaderUtil.readFromStream(inputStream, fileName, this.readerSliceConfig, recordSender, this.getTaskPluginCollector()); recordSender.flush(); + // 如果deletesrc为true,则删除源文件 + if(readerSliceConfig.getBool(Key.DELETESRC,Constant.DEFAULT_FTP_DELETE_SRC)){ + ftpHelper.deleteFile(fileName); + LOG.info(String.format("deleted src file : [%s]", fileName)); + } } LOG.debug("end read source files..."); diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java index cdbd043c..9c77a357 100755 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java @@ -10,4 +10,5 @@ public class Key { public static final String CONNECTPATTERN = "connectPattern"; public static final String PATH = "path"; public static final String MAXTRAVERSALLEVEL = "maxTraversalLevel"; + public static final String DELETESRC ="deletesrc"; } diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java index 6e42e10c..dc7af7e8 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java @@ -244,5 +244,17 @@ public class SftpHelper extends FtpHelper { throw DataXException.asDataXException(FtpReaderErrorCode.OPEN_FILE_ERROR, message); } } + + @Override + public void deleteFile(String filePath) { + + try{ + channelSftp.rm(filePath); + }catch(SftpException e){ + String message = String.format("删除文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filePath, filePath); + LOG.error(message); + throw DataXException.asDataXException(FtpReaderErrorCode.OPEN_FILE_ERROR, message); + } + } } diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java index 79b23f8b..e7a2eed9 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java @@ -225,5 +225,17 @@ public class StandardFtpHelper extends FtpHelper { throw DataXException.asDataXException(FtpReaderErrorCode.OPEN_FILE_ERROR, message); } } + + @Override + public void deleteFile(String filePath) { + + try{ + ftpClient.deleteFile(filePath); + }catch(IOException e){ + String message = String.format("删除文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filePath, filePath); + LOG.error(message); + throw DataXException.asDataXException(FtpReaderErrorCode.OPEN_FILE_ERROR, message); + } + } }