diff --git a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
index f61b9838..95bc0ecb 100644
--- a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
+++ b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
@@ -57,6 +57,12 @@ public static List listSequentialDatasets(String pdsName, Configuration
     String dsName = pdsName;
     String fileName = "";
     MainframeDatasetPath p = null;
+    String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
+    if (dsType == null) {
+      // default dataset type to partitioned dataset
+      conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED);
+      dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
+    }
     try {
       p = new MainframeDatasetPath(dsName,conf);
     } catch (Exception e) {
@@ -64,8 +70,6 @@ public static List listSequentialDatasets(String pdsName, Configuration
       LOG.error("MainframeDatasetPath helper class incorrectly initialised");
       e.printStackTrace();
     }
-    String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
-    boolean isTape = Boolean.parseBoolean(conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE));
     boolean isSequentialDs = false;
     boolean isGDG = false;
     if (dsType != null && p != null) {
@@ -80,7 +84,8 @@ public static List listSequentialDatasets(String pdsName, Configuration
     if (ftp != null) {
       ftp.changeWorkingDirectory("'" + pdsName + "'");
       FTPFile[] ftpFiles = null;
-      if (isTape) {
+      if (!MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED.equals(dsType)) {
+        // for anything other than a partitioned dataset, use the MainframeFTPFileEntryParser; the default parser doesn't match larger datasets
         FTPListParseEngine parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME, "");
         List listing = new ArrayList();
         while(parser.hasNext()) {
@@ -102,7 +107,11 @@ public static List listSequentialDatasets(String pdsName, Configuration
           }
         }
       }
-      else { ftpFiles = ftp.listFiles(); }
+      else {
+        // partitioned datasets have a different FTP listing structure
+        LOG.info("Dataset is a partitioned dataset, using default FTP list parsing");
+        ftpFiles = ftp.listFiles();
+      }
       if (!isGDG) {
         for (FTPFile f : ftpFiles) {
           LOG.info(String.format("Name: %s Type: %s",f.getName(), f.getType()));
diff --git a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
index d87c75df..90a85194 100644
--- a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
+++ b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
@@ -21,12 +21,14 @@
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.List;
 
 import org.apache.commons.net.ftp.FTPClient;
 import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPListParseEngine;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sqoop.mapreduce.JobBase;
@@ -42,12 +44,14 @@ public class TestMainframeFTPClientUtils {
   private JobConf conf;
 
   private FTPClient mockFTPClient;
+  private FTPListParseEngine mockFTPListParseEngine;
 
   @Before
   public void setUp() {
     conf = new JobConf();
     mockFTPClient = mock(FTPClient.class);
     when(mockFTPClient.getReplyString()).thenReturn("");
+    mockFTPListParseEngine = mock(FTPListParseEngine.class);
     MainframeFTPClientUtils.setMockFTPClient(mockFTPClient);
   }
 
@@ -173,7 +177,9 @@ public void testListSequentialDatasets() {
       FTPFile file2 = new FTPFile();
       file2.setName("blah2");
       file2.setType(FTPFile.FILE_TYPE);
-      when(mockFTPClient.listFiles()).thenReturn(new FTPFile[] {file1,file2});
+      when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
+      when(mockFTPListParseEngine.hasNext()).thenReturn(true,false);
+      when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
     } catch (IOException e) {
       fail("No IOException should be thrown!");
     }
@@ -210,7 +216,9 @@ public void testListEmptySequentialDatasets() {
       FTPFile file2 = new FTPFile();
       file2.setName("blah2");
       file2.setType(FTPFile.FILE_TYPE);
-      when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
+      when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
+      when(mockFTPListParseEngine.hasNext()).thenReturn(true,false);
+      when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
     } catch (IOException e) {
       fail("No IOException should be thrown!");
     }
@@ -248,7 +256,9 @@ public void testTrailingDotSequentialDatasets() {
       FTPFile file2 = new FTPFile();
       file2.setName("blah2");
       file2.setType(FTPFile.FILE_TYPE);
-      when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
+      when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
+      when(mockFTPListParseEngine.hasNext()).thenReturn(true,true,false);
+      when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
     } catch (IOException e) {
       fail("No IOException should be thrown!");
     }
@@ -287,7 +297,9 @@ public void testGdgGetLatest() {
       FTPFile file2 = new FTPFile();
       file2.setName("G0101V00");
       file2.setType(FTPFile.FILE_TYPE);
-      when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
+      when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
+      when(mockFTPListParseEngine.hasNext()).thenReturn(true,true,false);
+      when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
     } catch (IOException e) {
       fail("No IOException should be thrown!");
     }
@@ -312,4 +324,42 @@ public void testGdgGetLatest() {
       Assert.assertEquals("Could not list datasets from a.b.c.blah1:java.io.IOException: Folder not found",ioeString);
     }
   }
+
+  @Test
+  public void testPartitionedDatasetsShouldReturnAllFiles() {
+    try {
+      when(mockFTPClient.login("user", "pssword")).thenReturn(true);
+      when(mockFTPClient.logout()).thenReturn(true);
+      when(mockFTPClient.isConnected()).thenReturn(false);
+      when(mockFTPClient.getReplyCode()).thenReturn(200);
+      when(mockFTPClient.changeWorkingDirectory("a.b.c.blah1")).thenReturn(true);
+      FTPFile file1 = new FTPFile();
+      file1.setName("blah1");
+      file1.setType(FTPFile.FILE_TYPE);
+      FTPFile file2 = new FTPFile();
+      file2.setName("blah2");
+      file2.setType(FTPFile.FILE_TYPE);
+      // initiateListParsing should not be called here as this is a partitioned dataset, which defaults to listFiles()
+      when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
+    } catch (IOException e) {
+      fail("No IOException should be thrown!");
+    }
+    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
+    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
+    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"p");
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
+    // set the password in the secure credentials object
+    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
+    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, "pssword".getBytes());
+    try {
+      String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
+      List files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf);
+      Assert.assertTrue(files != null && files.size() == 2);
+      verify(mockFTPClient).listFiles();
+    } catch (IOException ioe) {
+      String ioeString = ioe.getMessage();
+      Assert.fail(ioeString);
+    }
+  }
 }
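For context, a minimal caller-side sketch of the behaviour this patch introduces (not part of the patch itself): when MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE is left unset, listSequentialDatasets() now defaults it to the partitioned type and lists via FTPClient.listFiles(); any other type value is listed through MainframeFTPFileEntryParser instead. The "p" value, property names, and placeholder connection settings are taken from the diff above; the import paths for DBConfiguration and MainframeConfiguration are assumed, and the class/main wrapper is purely illustrative.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
import org.apache.sqoop.util.MainframeFTPClientUtils;

public class ListPartitionedDatasetExample {
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf();
    // placeholder connection settings, mirroring the values used in the tests above
    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
    conf.getCredentials().addSecretKey(
        new Text(DBConfiguration.PASSWORD_PROPERTY), "pssword".getBytes());
    // "p" selects the partitioned-dataset path (plain listFiles());
    // leaving MAINFRAME_INPUT_DATASET_TYPE unset now defaults to the same partitioned type,
    // while any other value goes through initiateListParsing() with MainframeFTPFileEntryParser
    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE, "p");
    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME, "a.b.c.blah1");

    List files = MainframeFTPClientUtils.listSequentialDatasets(
        conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME), conf);
    System.out.println("Datasets listed: " + files.size());
  }
}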