SQOOP-3326: Mainframe FTP listing for GDG should filter out non-GDG datasets in a heterogeneous listing
(Chris Teoh via Szabolcs Vasas)
commit 18212becec
parent 973629912c
@@ -262,7 +262,9 @@
  <property name="sqoop.test.mainframe.ftp.binary.dataset.seq" value="TSODIQ1.FOLDER.FOLDERTXT" />
  <property name="sqoop.test.mainframe.ftp.binary.dataset.seq.filename" value="FOLDERTXT" />
  <property name="sqoop.test.mainframe.ftp.binary.dataset.seq.md5" value="1591c0fcc718fda7e9c1f3561d232b2b" />
  <property name="sqoop.test.mainframe.ftp.binary.dataset.mixed" value="TSODIQ1.MIXED" />
  <property name="sqoop.test.mainframe.ftp.binary.dataset.mixed.filename" value="G0039V00" />
  <property name="sqoop.test.mainframe.ftp.binary.dataset.mixed.md5" value="5e7f4ec7cbeae8e0e0b4d88346eb9349" />
  <property name="s3.bucket.url" value="" />
  <property name="s3.generator.command" value="" />
@@ -912,6 +914,9 @@
  <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.seq" value="${sqoop.test.mainframe.ftp.binary.dataset.seq}" />
  <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.seq.filename" value="${sqoop.test.mainframe.ftp.binary.dataset.seq.filename}" />
  <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.seq.md5" value="${sqoop.test.mainframe.ftp.binary.dataset.seq.md5}" />
  <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.mixed" value="${sqoop.test.mainframe.ftp.binary.dataset.mixed}" />
  <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.mixed.filename" value="${sqoop.test.mainframe.ftp.binary.dataset.mixed.filename}" />
  <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.mixed.md5" value="${sqoop.test.mainframe.ftp.binary.dataset.mixed.md5}" />
  <!-- Location of Hive logs -->
  <!--<sysproperty key="hive.log.dir"
      value="${test.build.data}/sqoop/logs"/> -->
@@ -31,7 +31,7 @@ public class MainframeConfiguration
  public static final String MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED
      = "p";
  public static final String MAINFRAME_INPUT_DATASET_TAPE = "mainframe.input.dataset.tape";

  public static final String MAINFRAME_FTP_FILE_GDG_ENTRY_PARSER_CLASSNAME = MainframeFTPFileGdgEntryParser.class.getName();
  public static final String MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME = "org.apache.sqoop.mapreduce.mainframe.MainframeFTPFileEntryParser";

  public static final String MAINFRAME_FTP_TRANSFER_MODE = "mainframe.ftp.transfermode";
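Worth noting: unlike the hard-coded string used for the existing parser, the new GDG constant is derived from the class itself, so both constants carry the same kind of fully qualified name. A quick standalone check (ConstantCheck is a hypothetical class, not part of the patch):

package org.apache.sqoop.mapreduce.mainframe;

// Hypothetical check, not part of the patch: the class-derived constant carries the
// same style of fully qualified name as the hard-coded MainframeFTPFileEntryParser string.
public class ConstantCheck {
  public static void main(String[] args) {
    System.out.println(MainframeFTPFileGdgEntryParser.class.getName());
    // prints: org.apache.sqoop.mapreduce.mainframe.MainframeFTPFileGdgEntryParser
  }
}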
@@ -0,0 +1,86 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.mapreduce.mainframe;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTPClientConfig;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.parser.ConfigurableFTPFileEntryParserImpl;

public class MainframeFTPFileGdgEntryParser extends ConfigurableFTPFileEntryParserImpl {
  /* Sample FTP listing
  Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname
  H19761 Tape G0034V00
  H81751 Tape G0035V00
  H73545 Tape G0036V00
  G10987 Tape G0037V00
  SHT331 3390 **NONE** 1 15 VB 114 27998 PS DUMMY
  SHT337 3390 **NONE** 1 15 VB 114 27998 PS G0035V00.COPY
  SHT33A 3390 **NONE** 1 15 VB 114 27998 PS HELLO

  * And what we need to get back from parsing are the following entries:-
  H19761 Tape G0034V00
  H81751 Tape G0035V00
  H73545 Tape G0036V00
  G10987 Tape G0037V00
  */

  private static final String DEFAULT_DATE_FORMAT = "yyyy/MM/dd HH:mm";
  private static final String HEADER = "Volume Unit ";
  private static String GDG_REGEX = "^\\S+\\s+.*?\\s+(G\\d{4}V\\d{2})$";
  private static final Log LOG = LogFactory.getLog(MainframeFTPFileGdgEntryParser.class.getName());

  public MainframeFTPFileGdgEntryParser() {
    super(GDG_REGEX);
    LOG.info("MainframeFTPFileGdgEntryParser default constructor");
  }

  @Override
  public FTPFile parseFTPEntry(String entry) {
    LOG.info("parseFTPEntry: "+entry);
    if (isFtpListingHeader(entry)) {
      return null;
    }
    if (matches(entry)) {
      String dsName = group(1);
      return createFtpFile(entry,dsName);
    }
    return null;
  }

  protected FTPFile createFtpFile(String entry, String dsName) {
    FTPFile file = new FTPFile();
    file.setRawListing(entry);
    file.setName(dsName);
    file.setType(FTPFile.FILE_TYPE);
    return file;
  }

  protected Boolean isFtpListingHeader(String entry) {
    return entry.startsWith(HEADER);
  }

  @Override
  protected FTPClientConfig getDefaultConfiguration() {
    return new FTPClientConfig(FTPClientConfig.SYST_MVS,
      DEFAULT_DATE_FORMAT, null, null, null, null);
  }

}
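The parser keys on the GddddVdd generation suffix at the end of each listing line. As a quick illustration, here is a standalone sketch (not part of the patch; GdgRegexSketch is a hypothetical demo class) that runs the same regular expression over a few of the sample lines from the class comment above and prints only the row that carries a generation name:

import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo, not part of the patch: exercise the same GDG_REGEX by hand.
public class GdgRegexSketch {
  public static void main(String[] args) {
    Pattern gdg = Pattern.compile("^\\S+\\s+.*?\\s+(G\\d{4}V\\d{2})$");
    List<String> listing = Arrays.asList(
        "Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname",  // header, no match
        "H19761 Tape G0034V00",                                          // GDG generation, matches
        "SHT337 3390 **NONE** 1 15 VB 114 27998 PS G0035V00.COPY",       // ends in .COPY, no match
        "SHT33A 3390 **NONE** 1 15 VB 114 27998 PS HELLO");              // ordinary dataset, no match
    for (String entry : listing) {
      Matcher m = gdg.matcher(entry);
      if (m.matches()) {
        System.out.println("GDG generation: " + m.group(1));             // prints G0034V00 only
      }
    }
  }
}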
@@ -86,8 +86,14 @@ public static List<String> listSequentialDatasets(String pdsName, Configuration
    ftp.changeWorkingDirectory("'" + pdsName + "'");
    FTPFile[] ftpFiles = null;
    if (!MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED.equals(dsType)) {
      // excepting partitioned datasets, use the MainframeFTPFileEntryParser, default doesn't match larger datasets
      FTPListParseEngine parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME, "");
      FTPListParseEngine parser = null;
      if (MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG.equals(dsType)) {
        // use GDG specific parser to filter out non GDG datasets
        parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_GDG_ENTRY_PARSER_CLASSNAME, "");
      } else {
        // excepting partitioned datasets, use the MainframeFTPFileEntryParser, default doesn't match larger datasets
        parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME, "");
      }
      List<FTPFile> listing = new ArrayList<FTPFile>();
      while(parser.hasNext()) {
        FTPFile[] files = parser.getNext(25);
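For context on the initiateListParsing() calls above: in Apache Commons Net, the parser key may be a fully qualified class name, which the default parser factory instantiates reflectively; entries that the parser's parseFTPEntry() rejects are not returned as usable FTPFile objects (depending on the commons-net version they are omitted or surface as null, hence the guard below). A minimal standalone sketch under those assumptions; the host, port, credentials, and the GdgListingSketch class are hypothetical:

import java.io.IOException;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPListParseEngine;

// Hypothetical demo, not part of the patch: list a GDG base with the new parser.
public class GdgListingSketch {
  public static void main(String[] args) throws IOException {
    FTPClient ftp = new FTPClient();
    ftp.connect("mainframe.example.com", 2121);      // hypothetical host and port
    ftp.login("TSODIQ1", "password");                // hypothetical credentials
    ftp.changeWorkingDirectory("'TSODIQ1.MIXED'");   // GDG base used by the new tests
    // The key is a class name; commons-net's default parser factory loads it reflectively.
    FTPListParseEngine engine = ftp.initiateListParsing(
        "org.apache.sqoop.mapreduce.mainframe.MainframeFTPFileGdgEntryParser", "");
    while (engine.hasNext()) {
      for (FTPFile f : engine.getNext(25)) {         // page through in blocks of 25, as the patch does
        if (f != null && f.getType() == FTPFile.FILE_TYPE) {
          System.out.println(f.getName());           // only GddddVdd generation entries remain
        }
      }
    }
    ftp.logout();
    ftp.disconnect();
  }
}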
@@ -112,7 +112,7 @@ services:
      timeout: 10s
      retries: 20
  mainframe:
    image: cntroversycubed/sqoopgdg:afdf57b15d8e71eb77d24d606b77e185ef39ceb3
    image: cntroversycubed/sqoopgdg:42e6c3a1229a6cdf346eb3976bd7298091ea11e2
    container_name: sqoop_mainframe_gdg_container
    ports:
      - 2121:2121
@@ -149,6 +149,13 @@ public void testImportSequentialBinaryWithBufferSize() throws IOException {
    doImportAndVerify(MainframeTestUtil.SEQ_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files, "--as-binaryfile", "--buffersize", "64000");
  }

  @Test
  public void testImportMixedBinaryWithBufferSize() throws IOException {
    HashMap<String,String> files = new HashMap<String,String>();
    files.put(MainframeTestUtil.MIXED_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_MIXED_BINARY_DATASET_MD5);
    doImportAndVerify(MainframeTestUtil.MIXED_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files, "--as-binaryfile", "--buffersize", "64000");
  }

  private String [] getArgv(String datasetName, String datasetType, String ... extraArgs) {
    ArrayList<String> args = new ArrayList<String>();
@@ -68,4 +68,13 @@ public class MainframeTestUtil {
  public static final String EXPECTED_SEQ_BINARY_DATASET_MD5 = System.getProperty(
      "sqoop.test.mainframe.ftp.binary.dataset.seq.md5",
      "1591c0fcc718fda7e9c1f3561d232b2b");
  public static final String MIXED_BINARY_DATASET_NAME = System.getProperty(
      "sqoop.test.mainframe.ftp.binary.dataset.mixed",
      "TSODIQ1.MIXED");
  public static final String MIXED_BINARY_DATASET_FILENAME = System.getProperty(
      "sqoop.test.mainframe.ftp.binary.dataset.mixed.filename",
      "G0039V00");
  public static final String EXPECTED_MIXED_BINARY_DATASET_MD5 = System.getProperty(
      "sqoop.test.mainframe.ftp.binary.dataset.mixed.md5",
      "5e7f4ec7cbeae8e0e0b4d88346eb9349");
}
@@ -0,0 +1,88 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.mapreduce.mainframe;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.apache.commons.net.ftp.FTPFile;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class TestMainframeFTPFileGdgEntryParser {
  /* Sample FTP listing
  Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname
  H19761 Tape G0034V00
  H81751 Tape G0035V00
  H73545 Tape G0036V00
  G10987 Tape G0037V00
  SHT331 3390 **NONE** 1 15 VB 114 27998 PS DUMMY
  SHT337 3390 **NONE** 1 15 VB 114 27998 PS G0035V00.COPY
  SHT33A 3390 **NONE** 1 15 VB 114 27998 PS HELLO

  * And what we need to get back from parsing are the following entries:-
  H19761 Tape G0034V00
  H81751 Tape G0035V00
  H73545 Tape G0036V00
  G10987 Tape G0037V00
  */
  private final static String FTP_LIST_HEADER = "Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname";
  private final String DSNAME = "G0034V00";
  private final String ENTRY = String.format("H19761 Tape %s",DSNAME);
  private List<String> listing;
  private MainframeFTPFileGdgEntryParser parser;

  @Before
  public void setUpBefore() throws Exception {
    parser = new MainframeFTPFileGdgEntryParser();
    listing = new ArrayList<>();
    listing.add("Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname");
    listing.add(ENTRY);
    listing.add("H81751 Tape G0035V00");
    listing.add("H73545 Tape G0036V00");
    listing.add("G10987 Tape G0037V00");
    listing.add("SHT331 3390 **NONE** 1 15 VB 114 27998 PS DUMMY");
    listing.add("SHT337 3390 **NONE** 1 15 VB 114 27998 PS G0035V00.COPY");
    listing.add("SHT33A 3390 **NONE** 1 15 VB 114 27998 PS HELLO");
  }

  @Test
  public void testIsHeader() {
    assertTrue(parser.isFtpListingHeader(FTP_LIST_HEADER));
  }

  @Test
  public void testCreateFtpFile() {
    FTPFile file = parser.createFtpFile(ENTRY, DSNAME);
    assertEquals(ENTRY,file.getRawListing());
    assertEquals(DSNAME,file.getName());
  }

  @Test
  public void testParseFTPEntry() {
    final int EXPECTED_RECORD_COUNT=4;
    long i = listing.stream()
      .map(parser::parseFTPEntry)
      .filter(Objects::nonNull)
      .count();
    assertEquals(EXPECTED_RECORD_COUNT,i);
  }
}
@@ -297,7 +297,7 @@ public void testGdgGetLatest() {
      FTPFile file2 = new FTPFile();
      file2.setName("G0101V00");
      file2.setType(FTPFile.FILE_TYPE);
      when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
      when(mockFTPClient.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_GDG_ENTRY_PARSER_CLASSNAME,"")).thenReturn(mockFTPListParseEngine);
      when(mockFTPListParseEngine.hasNext()).thenReturn(true,true,false);
      when(mockFTPListParseEngine.getNext(25)).thenReturn(new FTPFile[] {file1,file2});
    } catch (IOException e) {