From bfabe29127fde9d1867b61673505d321cc072e3d Mon Sep 17 00:00:00 2001 From: Zhang Xian Date: Thu, 9 Sep 2021 22:38:13 +0800 Subject: [PATCH 01/10] Update Constant.java --- .../com/alibaba/datax/plugin/reader/ftpreader/Constant.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Constant.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Constant.java index 15019fdb..ccf4780e 100755 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Constant.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Constant.java @@ -9,6 +9,6 @@ public class Constant { public static final int DEFAULT_TIMEOUT = 60000; public static final int DEFAULT_MAX_TRAVERSAL_LEVEL = 100; public static final String DEFAULT_FTP_CONNECT_PATTERN = "PASV"; - + public static final boolean DEFAULT_FTP_DELETE_SRC = false; } From 980fe38fe440dd1337f1b4aa0031739ae9a5b00e Mon Sep 17 00:00:00 2001 From: Zhang Xian Date: Thu, 9 Sep 2021 22:40:51 +0800 Subject: [PATCH 02/10] Update FtpHelper.java --- .../datax/plugin/reader/ftpreader/FtpHelper.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java index f8b3f56f..57201999 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java @@ -103,5 +103,16 @@ public abstract class FtpHelper { } return sourceAllFiles; } + + /** + * + * @Title: deleteFile + * @Description: 删除相应的文件 + * @param @param filePath 文件路径 + * @param @return + * @return boolean + * @throws + */ + public abstract boolean deleteFile(String filePath); } From fae4214e45bb3428023c64ca5697986eff0c6bad Mon Sep 17 00:00:00 2001 From: Zhang Xian Date: Thu, 9 Sep 2021 22:44:56 +0800 Subject: [PATCH 03/10] Update StandardFtpHelper.java --- .../plugin/reader/ftpreader/StandardFtpHelper.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java index 79b23f8b..e7a2eed9 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java @@ -225,5 +225,17 @@ public class StandardFtpHelper extends FtpHelper { throw DataXException.asDataXException(FtpReaderErrorCode.OPEN_FILE_ERROR, message); } } + + @Override + public void deleteFile(String filePath) { + + try{ + ftpClient.deleteFile(filePath); + }catch(IOException e){ + String message = String.format("删除文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filePath, filePath); + LOG.error(message); + throw DataXException.asDataXException(FtpReaderErrorCode.OPEN_FILE_ERROR, message); + } + } } From 1a362b350a54f77618ee34b0ab607125564df09d Mon Sep 17 00:00:00 2001 From: Zhang Xian Date: Thu, 9 Sep 2021 22:47:03 +0800 Subject: [PATCH 04/10] Update FtpHelper.java --- .../com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java index 57201999..0982b08c 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java @@ -110,9 +110,9 @@ public abstract class FtpHelper { * @Description: 删除相应的文件 * @param @param filePath 文件路径 * @param @return - * @return boolean + * @return void * @throws */ - public abstract boolean deleteFile(String filePath); + public abstract void deleteFile(String filePath); } From fbec1932410e60bc0b31494d2c3634779aae0e43 Mon Sep 17 00:00:00 2001 From: Zhang Xian Date: Thu, 9 Sep 2021 22:49:18 +0800 Subject: [PATCH 05/10] Update StandardFtpHelper.java --- .../plugin/reader/ftpreader/StandardFtpHelper.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java index e7a2eed9..c876a1f2 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java @@ -237,5 +237,17 @@ public class StandardFtpHelper extends FtpHelper { throw DataXException.asDataXException(FtpReaderErrorCode.OPEN_FILE_ERROR, message); } } + + @Override + public void deleteFile(String filePath) { + + try{ + channelSftp.rm(filePath); + }catch(SftpException e){ + String message = String.format("删除文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filePath, filePath); + LOG.error(message); + throw DataXException.asDataXException(FtpReaderErrorCode.OPEN_FILE_ERROR, message); + } + } } From 9ed3fbc74d78356a561b438ab5f065d583104408 Mon Sep 17 00:00:00 2001 From: Zhang Xian Date: Thu, 9 Sep 2021 22:50:07 +0800 Subject: [PATCH 06/10] Update Key.java --- .../main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java | 1 + 1 file changed, 1 insertion(+) diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java index cdbd043c..9c77a357 100755 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java @@ -10,4 +10,5 @@ public class Key { public static final String CONNECTPATTERN = "connectPattern"; public static final String PATH = "path"; public static final String MAXTRAVERSALLEVEL = "maxTraversalLevel"; + public static final String DELETESRC ="deletesrc"; } From 40aed187f9fa4833a03abfe99170c572c5ee079c Mon Sep 17 00:00:00 2001 From: Zhang Xian Date: Thu, 9 Sep 2021 22:51:05 +0800 Subject: [PATCH 07/10] Update FtpReader.java --- .../com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java index c1f20dfd..8fe4e484 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java @@ -244,6 +244,11 @@ public class FtpReader extends Reader { UnstructuredStorageReaderUtil.readFromStream(inputStream, fileName, this.readerSliceConfig, recordSender, this.getTaskPluginCollector()); recordSender.flush(); + // 如果deletesrc为true,则删除源文件 + if(readerSliceConfig.getBool(Key.DELETESRC,Constant.DEFAULT_FTP_DELETE_SRC)){ + ftpHelper.deleteFile(fileName); + LOG.info(String.format("deleted file : [%s]", fileName)); + } } LOG.debug("end read source files..."); From 908d0fda44ec270c984c7b47067caeb7bc7a3ebe Mon Sep 17 00:00:00 2001 From: Zhang Xian Date: Thu, 9 Sep 2021 22:58:52 +0800 Subject: [PATCH 08/10] Update StandardFtpHelper.java --- .../plugin/reader/ftpreader/StandardFtpHelper.java | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java index c876a1f2..e7a2eed9 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java @@ -237,17 +237,5 @@ public class StandardFtpHelper extends FtpHelper { throw DataXException.asDataXException(FtpReaderErrorCode.OPEN_FILE_ERROR, message); } } - - @Override - public void deleteFile(String filePath) { - - try{ - channelSftp.rm(filePath); - }catch(SftpException e){ - String message = String.format("删除文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filePath, filePath); - LOG.error(message); - throw DataXException.asDataXException(FtpReaderErrorCode.OPEN_FILE_ERROR, message); - } - } } From 42f125aabfff101ac16e029685dffd556c41d6f5 Mon Sep 17 00:00:00 2001 From: Zhang Xian Date: Thu, 9 Sep 2021 22:59:14 +0800 Subject: [PATCH 09/10] Update SftpHelper.java --- .../datax/plugin/reader/ftpreader/SftpHelper.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java index d25b040c..48d994b8 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java @@ -242,5 +242,17 @@ public class SftpHelper extends FtpHelper { throw DataXException.asDataXException(FtpReaderErrorCode.OPEN_FILE_ERROR, message); } } + + @Override + public void deleteFile(String filePath) { + + try{ + channelSftp.rm(filePath); + }catch(SftpException e){ + String message = String.format("删除文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filePath, filePath); + LOG.error(message); + throw DataXException.asDataXException(FtpReaderErrorCode.OPEN_FILE_ERROR, message); + } + } } From b9bb8cc29ff9a0a47106c8fd88b92c2980424beb Mon Sep 17 00:00:00 2001 From: Zhang Xian Date: Thu, 9 Sep 2021 23:02:05 +0800 Subject: [PATCH 10/10] Update FtpReader.java --- .../com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java index 8fe4e484..b9396330 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java @@ -247,7 +247,7 @@ public class FtpReader extends Reader { // 如果deletesrc为true,则删除源文件 if(readerSliceConfig.getBool(Key.DELETESRC,Constant.DEFAULT_FTP_DELETE_SRC)){ ftpHelper.deleteFile(fileName); - LOG.info(String.format("deleted file : [%s]", fileName)); + LOG.info(String.format("deleted src file : [%s]", fileName)); } }