diff --git a/build.xml b/build.xml
index cd2e9e29..f3975317 100644
--- a/build.xml
+++ b/build.xml
@@ -262,7 +262,9 @@
-
+
+
+
@@ -912,6 +914,9 @@
+
+
+
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
index 9d6a2fe7..9842daa6 100644
--- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
@@ -31,7 +31,7 @@ public class MainframeConfiguration
   public static final String MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED = "p";
   public static final String MAINFRAME_INPUT_DATASET_TAPE = "mainframe.input.dataset.tape";
-
+  public static final String MAINFRAME_FTP_FILE_GDG_ENTRY_PARSER_CLASSNAME = MainframeFTPFileGdgEntryParser.class.getName();
   public static final String MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME = "org.apache.sqoop.mapreduce.mainframe.MainframeFTPFileEntryParser";
   public static final String MAINFRAME_FTP_TRANSFER_MODE = "mainframe.ftp.transfermode";
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileGdgEntryParser.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileGdgEntryParser.java
new file mode 100644
index 00000000..8467afa7
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileGdgEntryParser.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.mainframe;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.ftp.FTPClientConfig;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.parser.ConfigurableFTPFileEntryParserImpl;
+
+/**
+ * FTP list-entry parser that keeps only GDG generation entries from a
+ * mainframe dataset listing and rejects everything else.
+ *
+ * A line is accepted when it ends with a whitespace-delimited token of the
+ * form {@code GnnnnVnn} (G, four digits, V, two digits); the header row and
+ * every other line make {@link #parseFTPEntry(String)} return {@code null}.
+ */
+public class MainframeFTPFileGdgEntryParser extends ConfigurableFTPFileEntryParserImpl {
+  /*
+   * Sample FTP listing:
+   * Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname
+   * H19761 Tape G0034V00
+   * H81751 Tape G0035V00
+   * H73545 Tape G0036V00
+   * G10987 Tape G0037V00
+   * SHT331 3390 **NONE** 1 15 VB 114 27998 PS DUMMY
+   * SHT337 3390 **NONE** 1 15 VB 114 27998 PS G0035V00.COPY
+   * SHT33A 3390 **NONE** 1 15 VB 114 27998 PS HELLO
+   *
+   * And what we need to get back from parsing are the following entries:
+   * H19761 Tape G0034V00
+   * H81751 Tape G0035V00
+   * H73545 Tape G0036V00
+   * G10987 Tape G0037V00
+   */
+
+  /** Date format handed to the default {@link FTPClientConfig}. */
+  private static final String DEFAULT_DATE_FORMAT = "yyyy/MM/dd HH:mm";
+  /** Prefix that identifies the column-header row of the listing. */
+  private static final String HEADER = "Volume Unit ";
+  /** Pattern whose only group captures a trailing GnnnnVnn dataset name. */
+  private static final String GDG_REGEX = "^\\S+\\s+.*?\\s+(G\\d{4}V\\d{2})$";
+  private static final Log LOG = LogFactory.getLog(MainframeFTPFileGdgEntryParser.class);
+
+  /** Creates a parser whose entry pattern is {@code GDG_REGEX}. */
+  public MainframeFTPFileGdgEntryParser() {
+    super(GDG_REGEX);
+    LOG.debug("MainframeFTPFileGdgEntryParser default constructor");
+  }
+
+  /**
+   * Parses one line of the FTP listing.
+   *
+   * @param entry raw listing line; may be {@code null}
+   * @return an {@link FTPFile} named after the captured dataset name, or {@code null}
+   *         when the line is {@code null}, the header row, or does not match the pattern
+   */
+  @Override
+  public FTPFile parseFTPEntry(String entry) {
+    // Debug level: this runs once per listing line, so info would flood the logs.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("parseFTPEntry: " + entry);
+    }
+    if (entry == null || isFtpListingHeader(entry)) {
+      return null;
+    }
+    if (matches(entry)) {
+      return createFtpFile(entry, group(1));
+    }
+    return null;
+  }
+
+  /**
+   * Builds the {@link FTPFile} returned for an accepted listing line.
+   *
+   * @param entry raw listing line, stored as the file's raw listing
+   * @param dsName dataset name, stored as the file's name
+   * @return a new {@link FTPFile} of type {@link FTPFile#FILE_TYPE}
+   */
+  protected FTPFile createFtpFile(String entry, String dsName) {
+    FTPFile file = new FTPFile();
+    file.setRawListing(entry);
+    file.setName(dsName);
+    file.setType(FTPFile.FILE_TYPE);
+    return file;
+  }
+
+  /**
+   * Tells whether a listing line is the column-header row.
+   *
+   * @param entry raw listing line; may be {@code null}
+   * @return {@code true} when the line starts with the header prefix
+   *         {@code "Volume Unit "}; {@code false} otherwise, including for {@code null}
+   */
+  protected Boolean isFtpListingHeader(String entry) {
+    return entry != null && entry.startsWith(HEADER);
+  }
+
+  /** Returns the default configuration: MVS system type with {@code DEFAULT_DATE_FORMAT}. */
+  @Override
+  protected FTPClientConfig getDefaultConfiguration() {
+    return new FTPClientConfig(FTPClientConfig.SYST_MVS,
+        DEFAULT_DATE_FORMAT, null, null, null, null);
+  }
+
+}
diff --git a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
index 654721e3..e7c48a6b 100644
--- a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
+++ b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
@@ -86,8 +86,14 @@ public static List listSequentialDatasets(String pdsName, Configuration
         ftp.changeWorkingDirectory("'" + pdsName + "'");
         FTPFile[] ftpFiles = null;
         if (!MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED.equals(dsType)) {
-          // excepting partitioned datasets, use the MainframeFTPFileEntryParser, default doesn't match larger datasets
-          FTPListParseEngine parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME, "");
+          FTPListParseEngine parser = null;
+          if (MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG.equals(dsType)) {
+            // use GDG specific parser to filter out non GDG datasets
+            parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_GDG_ENTRY_PARSER_CLASSNAME, "");
+          } else {
+            // excepting partitioned datasets, use the MainframeFTPFileEntryParser, default doesn't match larger datasets
+            parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME, "");
+          }
           List listing = new ArrayList();
           while(parser.hasNext()) {
             FTPFile[] files = parser.getNext(25);
diff --git
a/src/scripts/thirdpartytest/docker-compose/sqoop-thirdpartytest-db-services.yml b/src/scripts/thirdpartytest/docker-compose/sqoop-thirdpartytest-db-services.yml
index 4648f545..b4cf4886 100644
--- a/src/scripts/thirdpartytest/docker-compose/sqoop-thirdpartytest-db-services.yml
+++ b/src/scripts/thirdpartytest/docker-compose/sqoop-thirdpartytest-db-services.yml
@@ -112,7 +112,7 @@ services:
       timeout: 10s
       retries: 20
   mainframe:
-    image: cntroversycubed/sqoopgdg:afdf57b15d8e71eb77d24d606b77e185ef39ceb3
+    image: cntroversycubed/sqoopgdg:42e6c3a1229a6cdf346eb3976bd7298091ea11e2
     container_name: sqoop_mainframe_gdg_container
     ports:
       - 2121:2121
diff --git a/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java b/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java
index 3b8ed236..af5c7541 100644
--- a/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java
+++ b/src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java
@@ -149,6 +149,14 @@ public void testImportSequentialBinaryWithBufferSize() throws IOException {
     doImportAndVerify(MainframeTestUtil.SEQ_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files, "--as-binaryfile", "--buffersize", "64000");
   }
 
+  /** Imports the mixed GDG test dataset as a binary file with a 64000 buffer size; the expected file name and MD5 go to doImportAndVerify. */
+  @Test
+  public void testImportMixedBinaryWithBufferSize() throws IOException {
+    HashMap<String, String> files = new HashMap<>();
+    files.put(MainframeTestUtil.MIXED_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_MIXED_BINARY_DATASET_MD5);
+    doImportAndVerify(MainframeTestUtil.MIXED_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files, "--as-binaryfile", "--buffersize", "64000");
+  }
+
   private String [] getArgv(String datasetName, String datasetType, String ...
extraArgs) {
     ArrayList args = new ArrayList();
 
diff --git a/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java b/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java
index 9f86f6cd..a65aea63 100644
--- a/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java
+++ b/src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java
@@ -68,4 +68,13 @@ public class MainframeTestUtil {
   public static final String EXPECTED_SEQ_BINARY_DATASET_MD5 = System.getProperty(
       "sqoop.test.mainframe.ftp.binary.dataset.seq.md5",
       "1591c0fcc718fda7e9c1f3561d232b2b");
+  public static final String MIXED_BINARY_DATASET_NAME = System.getProperty(
+      "sqoop.test.mainframe.ftp.binary.dataset.mixed",
+      "TSODIQ1.MIXED");
+  public static final String MIXED_BINARY_DATASET_FILENAME = System.getProperty(
+      "sqoop.test.mainframe.ftp.binary.dataset.mixed.filename",
+      "G0039V00");
+  public static final String EXPECTED_MIXED_BINARY_DATASET_MD5 = System.getProperty(
+      "sqoop.test.mainframe.ftp.binary.dataset.mixed.md5",
+      "5e7f4ec7cbeae8e0e0b4d88346eb9349");
 }
diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileGdgEntryParser.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileGdgEntryParser.java
new file mode 100644
index 00000000..521a0426
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileGdgEntryParser.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.mainframe;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import org.apache.commons.net.ftp.FTPFile;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Unit tests for {@link MainframeFTPFileGdgEntryParser}. */
+public class TestMainframeFTPFileGdgEntryParser {
+  /*
+   * Sample FTP listing:
+   * Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname
+   * H19761 Tape G0034V00
+   * H81751 Tape G0035V00
+   * H73545 Tape G0036V00
+   * G10987 Tape G0037V00
+   * SHT331 3390 **NONE** 1 15 VB 114 27998 PS DUMMY
+   * SHT337 3390 **NONE** 1 15 VB 114 27998 PS G0035V00.COPY
+   * SHT33A 3390 **NONE** 1 15 VB 114 27998 PS HELLO
+   *
+   * And what we need to get back from parsing are the following entries:
+   * H19761 Tape G0034V00
+   * H81751 Tape G0035V00
+   * H73545 Tape G0036V00
+   * G10987 Tape G0037V00
+   */
+  private static final String FTP_LIST_HEADER = "Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname";
+  private static final String DSNAME = "G0034V00";
+  private static final String ENTRY = String.format("H19761 Tape %s", DSNAME);
+  /** Dataset names expected to survive parsing of the sample listing, in listing order. */
+  private static final List<String> EXPECTED_DSNAMES =
+      Arrays.asList(DSNAME, "G0035V00", "G0036V00", "G0037V00");
+
+  private List<String> listing;
+  private MainframeFTPFileGdgEntryParser parser;
+
+  /** Builds a fresh parser and the sample listing: header, four GDG lines, three other lines. */
+  @Before
+  public void setUpBefore() throws Exception {
+    parser = new MainframeFTPFileGdgEntryParser();
+    listing = new ArrayList<>();
+    listing.add(FTP_LIST_HEADER);
+    listing.add(ENTRY);
+    listing.add("H81751 Tape G0035V00");
+    listing.add("H73545 Tape G0036V00");
+    listing.add("G10987 Tape G0037V00");
+    listing.add("SHT331 3390 **NONE** 1 15 VB 114 27998 PS DUMMY");
+    listing.add("SHT337 3390 **NONE** 1 15 VB 114 27998 PS G0035V00.COPY");
+    listing.add("SHT33A 3390 **NONE** 1 15 VB 114 27998 PS HELLO");
+  }
+
+  /** The header row is recognised as a header; a data row is not. */
+  @Test
+  public void testIsHeader() {
+    assertTrue(parser.isFtpListingHeader(FTP_LIST_HEADER));
+    assertFalse(parser.isFtpListingHeader(ENTRY));
+  }
+
+  /** The created file carries the raw line, the dataset name and the plain-file type. */
+  @Test
+  public void testCreateFtpFile() {
+    FTPFile file = parser.createFtpFile(ENTRY, DSNAME);
+    assertEquals(ENTRY, file.getRawListing());
+    assertEquals(DSNAME, file.getName());
+    assertEquals(FTPFile.FILE_TYPE, file.getType());
+  }
+
+  /** Only the four GDG lines of the sample listing survive parsing, in listing order. */
+  @Test
+  public void testParseFTPEntry() {
+    List<String> parsedNames = listing.stream()
+        .map(parser::parseFTPEntry)
+        .filter(Objects::nonNull)
+        .map(FTPFile::getName)
+        .collect(Collectors.toList());
+    // Comparing names rather than a bare count proves the right entries were kept.
+    assertEquals(EXPECTED_DSNAMES, parsedNames);
+  }
+}
diff --git a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
index 90a85194..0714bdcf 100644
--- a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
+++ b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
@@ -297,7 +297,7 @@ public void testGdgGetLatest() {
       FTPFile file2 = new FTPFile();
       file2.setName("G0101V00");
       file2.setType(FTPFile.FILE_TYPE);
-      when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
+      when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_GDG_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
       when(mockFTPListParseEngine.hasNext()).thenReturn(true,true,false);
       when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
     } catch (IOException e) {