mirror of
https://github.com/apache/sqoop.git
synced 2025-05-02 23:40:04 +08:00
SQOOP-3225: Mainframe module FTP listing parser should cater for larger datasets on disk
(Chris Teoh via Boglarka Egyed)
This commit is contained in:
parent
1c1905b08f
commit
e8588e243e
@ -57,6 +57,12 @@ public static List<String> listSequentialDatasets(String pdsName, Configuration
|
||||
String dsName = pdsName;
|
||||
String fileName = "";
|
||||
MainframeDatasetPath p = null;
|
||||
String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
|
||||
if (dsType == null) {
|
||||
// default dataset type to partitioned dataset
|
||||
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED);
|
||||
dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
|
||||
}
|
||||
try {
|
||||
p = new MainframeDatasetPath(dsName,conf);
|
||||
} catch (Exception e) {
|
||||
@ -64,8 +70,6 @@ public static List<String> listSequentialDatasets(String pdsName, Configuration
|
||||
LOG.error("MainframeDatasetPath helper class incorrectly initialised");
|
||||
e.printStackTrace();
|
||||
}
|
||||
String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
|
||||
boolean isTape = Boolean.parseBoolean(conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE));
|
||||
boolean isSequentialDs = false;
|
||||
boolean isGDG = false;
|
||||
if (dsType != null && p != null) {
|
||||
@ -80,7 +84,8 @@ public static List<String> listSequentialDatasets(String pdsName, Configuration
|
||||
if (ftp != null) {
|
||||
ftp.changeWorkingDirectory("'" + pdsName + "'");
|
||||
FTPFile[] ftpFiles = null;
|
||||
if (isTape) {
|
||||
if (!MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED.equals(dsType)) {
|
||||
// For every dataset type except partitioned, use the MainframeFTPFileEntryParser: the default parser does not match listings of larger datasets
|
||||
FTPListParseEngine parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME, "");
|
||||
List<FTPFile> listing = new ArrayList<FTPFile>();
|
||||
while(parser.hasNext()) {
|
||||
@ -102,7 +107,11 @@ public static List<String> listSequentialDatasets(String pdsName, Configuration
|
||||
}
|
||||
}
|
||||
}
|
||||
else { ftpFiles = ftp.listFiles(); }
|
||||
else {
|
||||
// partitioned datasets have a different FTP listing structure
|
||||
LOG.info("Dataset is a partitioned dataset, using default FTP list parsing");
|
||||
ftpFiles = ftp.listFiles();
|
||||
}
|
||||
if (!isGDG) {
|
||||
for (FTPFile f : ftpFiles) {
|
||||
LOG.info(String.format("Name: %s Type: %s",f.getName(), f.getType()));
|
||||
|
@ -21,12 +21,14 @@
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.List;
|
||||
import org.apache.commons.net.ftp.FTPClient;
|
||||
import org.apache.commons.net.ftp.FTPFile;
|
||||
import org.apache.commons.net.ftp.FTPListParseEngine;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.sqoop.mapreduce.JobBase;
|
||||
@ -42,12 +44,14 @@ public class TestMainframeFTPClientUtils {
|
||||
private JobConf conf;
|
||||
|
||||
private FTPClient mockFTPClient;
|
||||
private FTPListParseEngine mockFTPListParseEngine;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
conf = new JobConf();
|
||||
mockFTPClient = mock(FTPClient.class);
|
||||
when(mockFTPClient.getReplyString()).thenReturn("");
|
||||
mockFTPListParseEngine = mock(FTPListParseEngine.class);
|
||||
MainframeFTPClientUtils.setMockFTPClient(mockFTPClient);
|
||||
}
|
||||
|
||||
@ -173,7 +177,9 @@ public void testListSequentialDatasets() {
|
||||
FTPFile file2 = new FTPFile();
|
||||
file2.setName("blah2");
|
||||
file2.setType(FTPFile.FILE_TYPE);
|
||||
when(mockFTPClient.listFiles()).thenReturn(new FTPFile[] {file1,file2});
|
||||
when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
|
||||
when(mockFTPListParseEngine.hasNext()).thenReturn(true,false);
|
||||
when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
|
||||
} catch (IOException e) {
|
||||
fail("No IOException should be thrown!");
|
||||
}
|
||||
@ -210,7 +216,9 @@ public void testListEmptySequentialDatasets() {
|
||||
FTPFile file2 = new FTPFile();
|
||||
file2.setName("blah2");
|
||||
file2.setType(FTPFile.FILE_TYPE);
|
||||
when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
|
||||
when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
|
||||
when(mockFTPListParseEngine.hasNext()).thenReturn(true,false);
|
||||
when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
|
||||
} catch (IOException e) {
|
||||
fail("No IOException should be thrown!");
|
||||
}
|
||||
@ -248,7 +256,9 @@ public void testTrailingDotSequentialDatasets() {
|
||||
FTPFile file2 = new FTPFile();
|
||||
file2.setName("blah2");
|
||||
file2.setType(FTPFile.FILE_TYPE);
|
||||
when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
|
||||
when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
|
||||
when(mockFTPListParseEngine.hasNext()).thenReturn(true,true,false);
|
||||
when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
|
||||
} catch (IOException e) {
|
||||
fail("No IOException should be thrown!");
|
||||
}
|
||||
@ -287,7 +297,9 @@ public void testGdgGetLatest() {
|
||||
FTPFile file2 = new FTPFile();
|
||||
file2.setName("G0101V00");
|
||||
file2.setType(FTPFile.FILE_TYPE);
|
||||
when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
|
||||
when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
|
||||
when(mockFTPListParseEngine.hasNext()).thenReturn(true,true,false);
|
||||
when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
|
||||
} catch (IOException e) {
|
||||
fail("No IOException should be thrown!");
|
||||
}
|
||||
@ -312,4 +324,42 @@ public void testGdgGetLatest() {
|
||||
Assert.assertEquals("Could not list datasets from a.b.c.blah1:java.io.IOException: Folder not found",ioeString);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPartitionedDatasetsShouldReturnAllFiles() {
|
||||
try {
|
||||
when(mockFTPClient.login("user", "pssword")).thenReturn(true);
|
||||
when(mockFTPClient.logout()).thenReturn(true);
|
||||
when(mockFTPClient.isConnected()).thenReturn(false);
|
||||
when(mockFTPClient.getReplyCode()).thenReturn(200);
|
||||
when(mockFTPClient.changeWorkingDirectory("a.b.c.blah1")).thenReturn(true);
|
||||
FTPFile file1 = new FTPFile();
|
||||
file1.setName("blah1");
|
||||
file1.setType(FTPFile.FILE_TYPE);
|
||||
FTPFile file2 = new FTPFile();
|
||||
file2.setName("blah2");
|
||||
file2.setType(FTPFile.FILE_TYPE);
|
||||
// initiateListParsing should not be called here as it is a partitioned dataset and default to listFiles()
|
||||
when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
|
||||
} catch (IOException e) {
|
||||
fail("No IOException should be thrown!");
|
||||
}
|
||||
conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
|
||||
conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
|
||||
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
|
||||
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"p");
|
||||
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
|
||||
// set the password in the secure credentials object
|
||||
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
|
||||
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, "pssword".getBytes());
|
||||
try {
|
||||
String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
|
||||
List<String> files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf);
|
||||
Assert.assertTrue(files != null && files.size() == 2);
|
||||
verify(mockFTPClient).listFiles();
|
||||
} catch (IOException ioe) {
|
||||
String ioeString = ioe.getMessage();
|
||||
Assert.fail(ioeString);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user