From b302d89faebd3f21cdbddcf1e1d74bf8ebd8035d Mon Sep 17 00:00:00 2001 From: Kate Ting Date: Mon, 6 Jun 2016 17:31:17 -0700 Subject: [PATCH] SQOOP-2938: Mainframe import module extension to support data sets on tape (Chris Teoh via Kate Ting) --- src/docs/man/mainframe-connection-args.txt | 6 + src/java/org/apache/sqoop/SqoopOptions.java | 35 ++++ .../mainframe/MainframeConfiguration.java | 11 ++ .../MainframeDatasetFTPRecordReader.java | 18 +- .../MainframeDatasetInputFormat.java | 6 +- .../mainframe/MainframeDatasetPath.java | 117 +++++++++++++ .../mainframe/MainframeDatasetType.java | 19 +++ .../MainframeFTPFileEntryParser.java | 93 +++++++++++ .../mainframe/MainframeImportJob.java | 6 + .../sqoop/tool/MainframeImportTool.java | 38 +++++ .../sqoop/util/MainframeFTPClientUtils.java | 79 +++++++-- .../mainframe/TestMainframeDatasetPath.java | 86 ++++++++++ .../TestMainframeFTPFileEntryParser.java | 64 +++++++ .../mainframe/TestMainframeImportJob.java | 2 + .../sqoop/tool/TestMainframeImportTool.java | 77 +++++++++ .../util/TestMainframeFTPClientUtils.java | 157 ++++++++++++++++++ 16 files changed, 799 insertions(+), 15 deletions(-) create mode 100644 src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetPath.java create mode 100644 src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetType.java create mode 100644 src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileEntryParser.java create mode 100644 src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetPath.java create mode 100644 src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileEntryParser.java diff --git a/src/docs/man/mainframe-connection-args.txt b/src/docs/man/mainframe-connection-args.txt index 04e33c7d..4dd1f711 100644 --- a/src/docs/man/mainframe-connection-args.txt +++ b/src/docs/man/mainframe-connection-args.txt @@ -23,3 +23,9 @@ --dataset (partitioned dataset name):: Specify a partitioned dataset name + +--datasettype (data set type):: + Specify a dataset type. "s" for sequential, "p" for partitioned dataset (default), "g" for generational data group + +--tape (dataset on tape):: + Specify whether a dataset is on tape or not (true|false) \ No newline at end of file diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index ff962807..e14a0b7f 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.sqoop.accumulo.AccumuloConstants; +import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration; import org.apache.sqoop.util.CredentialsUtil; import org.apache.sqoop.util.LoggingUtils; import org.apache.sqoop.util.SqoopJsonUtil; @@ -282,6 +283,14 @@ public String toString() { @StoredAsProperty("mainframe.input.dataset.name") private String mainframeInputDatasetName; + // Dataset type for mainframe import tool + @StoredAsProperty("mainframe.input.dataset.type") + private String mainframeInputDatasetType; + + // Indicates if the data set is on tape to use different FTP parser + @StoredAsProperty("mainframe.input.dataset.tape") + private String mainframeInputDatasetTape; + // Accumulo home directory private String accumuloHome; // not serialized to metastore. 
// Zookeeper home directory @@ -1023,6 +1032,9 @@ private void initDefaults(Configuration baseConfiguration) { // Relaxed isolation will not enabled by default which is the behavior // of sqoop until now. this.relaxedIsolation = false; + + // set default mainframe data set type to partitioned data set + this.mainframeInputDatasetType = MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED; } /** @@ -2276,6 +2288,11 @@ public String getMergeKeyCol() { public void setMainframeInputDatasetName(String name) { mainframeInputDatasetName = name; tableName = name; + // may need to set this in the conf variable otherwise it gets lost. + } + + public void setMainframeInputDatasetType(String name) { + mainframeInputDatasetType = name; } /** @@ -2285,6 +2302,24 @@ public String getMainframeInputDatasetName() { return mainframeInputDatasetName; } + /* + * Return the mainframe dataset type + */ + public String getMainframeInputDatasetType() { + return mainframeInputDatasetType; + } + + // return whether the dataset is on tape + public Boolean getMainframeInputDatasetTape() { + if (mainframeInputDatasetTape == null) { return false; } + return Boolean.parseBoolean(mainframeInputDatasetTape); + } + + // sets whether the dataset is on tape + public void setMainframeInputDatasetTape(String txtIsFromTape) { + mainframeInputDatasetTape = Boolean.valueOf(Boolean.parseBoolean(txtIsFromTape)).toString(); + } + public static String getAccumuloHomeDefault() { // Set this with $ACCUMULO_HOME, but -Daccumulo.home can override. String accumuloHome = System.getenv("ACCUMULO_HOME"); diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java index f8894353..ea54b07f 100644 --- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java @@ -22,4 +22,15 @@ public class MainframeConfiguration public static final String MAINFRAME_INPUT_DATASET_NAME = "mapreduce.mainframe.input.dataset.name"; + public static final String MAINFRAME_INPUT_DATASET_TYPE + = "mapreduce.mainframe.input.dataset.type"; + public static final String MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL + = "s"; + public static final String MAINFRAME_INPUT_DATASET_TYPE_GDG + = "g"; + public static final String MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED + = "p"; + public static final String MAINFRAME_INPUT_DATASET_TAPE = "mainframe.input.dataset.tape"; + + public static final String MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME = "org.apache.sqoop.mapreduce.mainframe.MainframeFTPFileEntryParser"; } diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java index 7c368428..1f78384b 100644 --- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java @@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; - import org.apache.sqoop.lib.SqoopRecord; import org.apache.sqoop.util.MainframeFTPClientUtils; @@ -52,9 +51,20 @@ public void initialize(InputSplit inputSplit, Configuration conf = getConfiguration(); ftp = MainframeFTPClientUtils.getFTPConnection(conf); if (ftp != null) { - String dsName - = 
conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME); - ftp.changeWorkingDirectory("'" + dsName + "'"); + String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME); + String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE); + MainframeDatasetPath p = null; + try { + p = new MainframeDatasetPath(dsName,conf); + } catch (Exception e) { + LOG.error(e.getMessage()); + LOG.error("MainframeDatasetPath helper class incorrectly initialised"); + e.printStackTrace(); + } + if (dsType != null && p != null) { + dsName = p.getMainframeDatasetFolder(); + } + ftp.changeWorkingDirectory("'" + dsName + "'"); } } diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetInputFormat.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetInputFormat.java index 045bbd21..55c0fdcf 100644 --- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetInputFormat.java +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetInputFormat.java @@ -60,9 +60,13 @@ public List getSplits(JobContext job) throws IOException { String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME); LOG.info("Datasets to transfer from: " + dsName); + String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE); + LOG.info("Dataset type: " + dsType); + String dsTape = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE); + LOG.info("Dataset on tape?: " + dsTape); List datasets = retrieveDatasets(dsName, conf); if (datasets.isEmpty()) { - throw new IOException ("No sequential datasets retrieved from " + dsName); + throw new IOException ("No datasets retrieved from " + dsName); } else { int count = datasets.size(); int chunks = Math.min(count, ConfigurationHelper.getJobNumMaps(job)); diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetPath.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetPath.java new file mode 100644 index 00000000..56c70d60 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetPath.java @@ -0,0 +1,117 @@ +/** + * + */ +package org.apache.sqoop.mapreduce.mainframe; + +import java.text.ParseException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +/** + * This class resolves FTP folder paths + * given the data set type and the data set name + */ + +public class MainframeDatasetPath { + + private static final Log LOG = + LogFactory.getLog(MainframeDatasetPath.class); + private String datasetName; + private MainframeDatasetType datasetType; + private String dsFolderName = null; + private String dsFileName = null; + + // default constructor + public MainframeDatasetPath(){} + + // constructor that takes the dataset name and job configuration + public MainframeDatasetPath(String dsName, Configuration conf) throws Exception { + String inputName + = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME); + // this should always be true + assert(inputName.equals(dsName)); + LOG.info("Datasets to transfer from: " + dsName); + this.datasetName = dsName; + // initialise dataset type + String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE); + this.setMainframeDatasetType(dsType); + initialisePaths(); + } + + public MainframeDatasetPath(String dsName, MainframeDatasetType dsType) { + this.setMainframeDatasetName(dsName); + this.setMainframeDatasetType(dsType); + initialisePaths(); + } + + public
MainframeDatasetPath(String dsName, String dsType) throws ParseException { + this.setMainframeDatasetName(dsName); + this.setMainframeDatasetType(dsType); + initialisePaths(); + } + + public void initialisePaths() throws IllegalStateException { + if (this.datasetName == null || this.datasetType == null) { + throw new IllegalStateException("Please set data set name and type first."); + } + boolean isSequentialDs = this.datasetType.equals(MainframeDatasetType.SEQUENTIAL); + boolean isGDG = this.datasetType.equals(MainframeDatasetType.GDG); + LOG.info(String.format("dsName: %s", this.datasetName)); + LOG.info(String.format("isSequentialDs: %s isGDG: %s", isSequentialDs, isGDG)); + if (isSequentialDs) + { + // truncate the tailing string until the dot + // usually dataset qualifiers are dots. eg blah1.blah2.blah3 + // so in this case, we should return blah1.blah2 + int lastDotIndex = this.datasetName.lastIndexOf("."); + // if not found, it is probably in the root + if (lastDotIndex == -1) { this.datasetName = ""; } else { + // if found, return the truncated name + dsFolderName = this.datasetName.substring(0, lastDotIndex); + if (lastDotIndex + 1 < this.datasetName.length()) { + dsFileName = this.datasetName.substring(lastDotIndex + 1); + } + } + } else { + // GDG or PDS + dsFolderName = this.datasetName; + dsFileName = null; // handle parentheses parsing later + } + } + + // getters and setters + public MainframeDatasetType getMainframeDatasetType() { + return this.datasetType; + } + + public void setMainframeDatasetType(MainframeDatasetType dsType) { + this.datasetType = dsType; + } + + // overloaded setter to parse string + public void setMainframeDatasetType(String dsType) throws ParseException { + if (dsType.equals("s")) { this.datasetType = MainframeDatasetType.SEQUENTIAL; } + else if (dsType.equals("p")) { this.datasetType = MainframeDatasetType.PARTITIONED; } + else if (dsType.equals("g")) { this.datasetType = MainframeDatasetType.GDG; } + else { throw new ParseException(String.format("Invalid data set type specified: %s",dsType), 0); } + } + + public String getMainframeDatasetName() { + return this.datasetName; + } + + public void setMainframeDatasetName(String dsName) { + this.datasetName = dsName; + } + + public String getMainframeDatasetFolder() { + return this.dsFolderName; + } + + public String getMainframeDatasetFileName() { + // returns filename in the folder and null if it is a GDG as this requires a file listing + return dsFileName; + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetType.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetType.java new file mode 100644 index 00000000..bcaa56d1 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetType.java @@ -0,0 +1,19 @@ +package org.apache.sqoop.mapreduce.mainframe; + +/******** + * Basic enumeration for Mainframe data set types + ********/ + +public enum MainframeDatasetType { + GDG, PARTITIONED, SEQUENTIAL; + + @Override + public String toString() { + switch(this) { + case GDG: return MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG; + case PARTITIONED: return MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED; + case SEQUENTIAL: return MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL; + default: throw new IllegalArgumentException(); + } + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileEntryParser.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileEntryParser.java 
new file mode 100644 index 00000000..661b1cca --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileEntryParser.java @@ -0,0 +1,93 @@ +package org.apache.sqoop.mapreduce.mainframe; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.net.ftp.FTPClientConfig; +import org.apache.commons.net.ftp.FTPFile; +import org.apache.commons.net.ftp.parser.ConfigurableFTPFileEntryParserImpl; + +public class MainframeFTPFileEntryParser extends ConfigurableFTPFileEntryParserImpl { + + /* Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname + * xxx300 3390 2016/05/25 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOADED + * x31167 Tape UNLOAD.EDH.UNLOADT + * xxx305 3390 2016/05/23 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOAD1 + */ + + // match Unit and Dsname + private static String REGEX = "^\\S+\\s+(\\S+)\\s+.*?\\s+(\\S+)$"; + // match Unit, BlkSz, Dsorg, DsName + private static String NON_TAPE_REGEX = "^\\S+\\s+(\\S+)\\s+.*?(\\d+)\\s+(\\S+)\\s+(\\S+)$"; + static final String DEFAULT_DATE_FORMAT = "yyyy/MM/dd HH:mm"; + //= "MMM d yyyy"; //Nov 9 2001 + + static final String DEFAULT_RECENT_DATE_FORMAT = "MMM d HH:mm"; //Nov 9 20:06 + + private static String tapeUnitString = "Tape"; + private static String unitHeaderString = "Unit"; + private static String dsNameHeaderString = "Dsname"; + private static String dsOrgPDSString = "PO"; + private static String dsOrgPDSExtendedString = "PO-E"; + private static String dsOrgSeqString = "PS"; + private static Pattern nonTapePattern = Pattern.compile(NON_TAPE_REGEX); + + private static final Log LOG = LogFactory.getLog(MainframeFTPFileEntryParser.class.getName()); + + public MainframeFTPFileEntryParser() { + super(REGEX); + LOG.info("MainframeFTPFileEntryParser constructor"); + } + + public MainframeFTPFileEntryParser(String regex) { + super(REGEX); + } + + public FTPFile parseFTPEntry(String entry) { + LOG.info("parseFTPEntry: "+entry); + + if (matches(entry)) { + String unit = group(1); + String dsName = group(2); + LOG.info("parseFTPEntry match: "+group(1)+" "+group(2)); + if (unit.equals(unitHeaderString) && dsName.equals(dsNameHeaderString)) { + return null; + } + FTPFile file = new FTPFile(); + file.setRawListing(entry); + file.setName(dsName); + // match non tape values + if (!unit.equals("Tape")) { + Matcher m = nonTapePattern.matcher(entry); + // get sizes + if (m.matches()) { + // PO/PO-E = PDS = directory + // PS = Sequential data set = file + String size = m.group(2); + String dsOrg = m.group(3); + file.setSize(Long.parseLong(size)); + LOG.info(String.format("Non tape match: %s, %s, %s", file.getName(), file.getSize(), dsOrg)); + if (dsOrg.equals(dsOrgPDSString) || dsOrg.equals(dsOrgPDSExtendedString)) { + file.setType(FTPFile.DIRECTORY_TYPE); + } + if (dsOrg.equals(dsOrgSeqString)) { + file.setType(FTPFile.FILE_TYPE); + } + } + } else { + LOG.info(String.format("Tape match: %s, %s", file.getName(), unit)); + file.setType(FTPFile.FILE_TYPE); + } + return file; + } + return null; + } + + @Override + protected FTPClientConfig getDefaultConfiguration() { + return new FTPClientConfig(FTPClientConfig.SYST_MVS, + DEFAULT_DATE_FORMAT, null, null, null, null); + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java index 16c2f75c..f222dc8f 100644 --- 
a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java @@ -59,6 +59,12 @@ protected void configureInputFormat(Job job, String tableName, job.getConfiguration().set( MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME, options.getMainframeInputDatasetName()); + job.getConfiguration().set( + MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE, + options.getMainframeInputDatasetType()); + job.getConfiguration().set( + MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE, + options.getMainframeInputDatasetTape().toString()); } @Override diff --git a/src/java/org/apache/sqoop/tool/MainframeImportTool.java b/src/java/org/apache/sqoop/tool/MainframeImportTool.java index bc4ae6c0..0cb91dbc 100644 --- a/src/java/org/apache/sqoop/tool/MainframeImportTool.java +++ b/src/java/org/apache/sqoop/tool/MainframeImportTool.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.ToolRunner; +import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration; import com.cloudera.sqoop.SqoopOptions; import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException; @@ -36,6 +37,8 @@ public class MainframeImportTool extends ImportTool { private static final Log LOG = LogFactory.getLog(MainframeImportTool.class.getName()); public static final String DS_ARG = "dataset"; + public static final String DS_TYPE_ARG = "datasettype"; + public static final String DS_TAPE_ARG = "tape"; public MainframeImportTool() { super("import-mainframe", false); @@ -59,6 +62,14 @@ protected RelatedOptions getImportOptions() { .hasArg().withDescription("HDFS plain file destination") .withLongOpt(TARGET_DIR_ARG) .create()); + importOpts.addOption(OptionBuilder.withArgName("Dataset type") + .hasArg().withDescription("Dataset type (p=partitioned data set|s=sequential data set|g=GDG)") + .withLongOpt(DS_TYPE_ARG) + .create()); + importOpts.addOption(OptionBuilder.withArgName("Dataset is on tape") + .hasArg().withDescription("Dataset is on tape (true|false)") + .withLongOpt(DS_TAPE_ARG) + .create()); addValidationOpts(importOpts); @@ -142,6 +153,20 @@ public void applyOptions(CommandLine in, SqoopOptions out) if (in.hasOption(DS_ARG)) { out.setMainframeInputDatasetName(in.getOptionValue(DS_ARG)); } + + if (in.hasOption(DS_TYPE_ARG)) { + out.setMainframeInputDatasetType(in.getOptionValue(DS_TYPE_ARG)); + } else { + // set default data set type to partitioned + out.setMainframeInputDatasetType(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED); + } + + if (in.hasOption(DS_TAPE_ARG)) { + out.setMainframeInputDatasetTape(in.getOptionValue(DS_TAPE_ARG)); + } else { + // set default tape value to false + out.setMainframeInputDatasetTape("false"); + } } @Override @@ -151,6 +176,19 @@ protected void validateImportOptions(SqoopOptions options) throw new InvalidOptionsException( "--" + DS_ARG + " is required for mainframe import. " + HELP_STR); } + String dsType = options.getMainframeInputDatasetType(); + LOG.info("Dataset type: "+dsType); + if (!dsType.equals(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED) + && !dsType.equals(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL) + && !dsType.equals(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG)) { + throw new InvalidOptionsException( + "--" + DS_TYPE_ARG + " specified is invalid. 
" + HELP_STR); + } + Boolean dsTape = options.getMainframeInputDatasetTape(); + if (dsTape == null && dsTape != true && dsTape != false) { + throw new InvalidOptionsException( + "--" + DS_TAPE_ARG + " specified is invalid. " + HELP_STR); + } super.validateImportOptions(options); } } diff --git a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java index eae7a633..f61b9838 100644 --- a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java +++ b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java @@ -29,16 +29,16 @@ import org.apache.commons.net.ftp.FTPClientConfig; import org.apache.commons.net.ftp.FTPConnectionClosedException; import org.apache.commons.net.ftp.FTPFile; +import org.apache.commons.net.ftp.FTPListParseEngine; import org.apache.commons.net.ftp.FTPReply; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; - import org.apache.sqoop.mapreduce.JobBase; import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration; +import org.apache.sqoop.mapreduce.mainframe.MainframeDatasetPath; /** * Utility methods used when accessing a mainframe server through FTP client. @@ -52,20 +52,78 @@ public final class MainframeFTPClientUtils { private MainframeFTPClientUtils() { } - public static List listSequentialDatasets( - String pdsName, Configuration conf) throws IOException { + public static List listSequentialDatasets(String pdsName, Configuration conf) throws IOException { List datasets = new ArrayList(); + String dsName = pdsName; + String fileName = ""; + MainframeDatasetPath p = null; + try { + p = new MainframeDatasetPath(dsName,conf); + } catch (Exception e) { + LOG.error(e.getMessage()); + LOG.error("MainframeDatasetPath helper class incorrectly initialised"); + e.printStackTrace(); + } + String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE); + boolean isTape = Boolean.parseBoolean(conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE)); + boolean isSequentialDs = false; + boolean isGDG = false; + if (dsType != null && p != null) { + isSequentialDs = p.getMainframeDatasetType().toString().equals(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL); + isGDG = p.getMainframeDatasetType().toString().equals(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG); + pdsName = p.getMainframeDatasetFolder(); + fileName = p.getMainframeDatasetFileName(); + } FTPClient ftp = null; try { ftp = getFTPConnection(conf); if (ftp != null) { ftp.changeWorkingDirectory("'" + pdsName + "'"); - FTPFile[] ftpFiles = ftp.listFiles(); - for (FTPFile f : ftpFiles) { - if (f.getType() == FTPFile.FILE_TYPE) { - datasets.add(f.getName()); - } + FTPFile[] ftpFiles = null; + if (isTape) { + FTPListParseEngine parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME, ""); + List listing = new ArrayList(); + while(parser.hasNext()) { + FTPFile[] files = parser.getNext(25); + for (FTPFile file : files) { + if (file != null) { + listing.add(file); + LOG.info(String.format("Name: %s Type: %s", file.getName(), file.getType())); + } + // skip nulls returned from parser + } + ftpFiles = new FTPFile[listing.size()]; + for (int i = 0;i < listing.size(); i++) { + ftpFiles[i] = listing.get(i); + } + LOG.info("Files returned from mainframe parser:-"); + for (FTPFile f : ftpFiles) { + 
LOG.info(String.format("Name: %s, Type: %s",f.getName(),f.getType())); + } + } } + else { ftpFiles = ftp.listFiles(); } + if (!isGDG) { + for (FTPFile f : ftpFiles) { + LOG.info(String.format("Name: %s Type: %s",f.getName(), f.getType())); + if (f.getType() == FTPFile.FILE_TYPE) { + // only add datasets if default behaviour of partitioned data sets + // or if it is a sequential data set, only add if the file name matches exactly + if (!isSequentialDs || isSequentialDs && f.getName().equals(fileName) && !fileName.equals("")) { + datasets.add(f.getName()); + } + } + } + } else { + LOG.info("GDG branch. File list:-"); + for (FTPFile f : ftpFiles) { + LOG.info(String.format("Name: %s Type: %s",f.getName(), f.getType())); + } + if (ftpFiles.length > 0 && ftpFiles[ftpFiles.length-1].getType() == FTPFile.FILE_TYPE) { + // for GDG - add the last file in the collection + datasets.add(ftpFiles[ftpFiles.length-1].getName()); + } + } } } catch(IOException ioe) { throw new IOException ("Could not list datasets from " + pdsName + ":" @@ -144,6 +202,7 @@ public static FTPClient getFTPConnection(Configuration conf) ftp.setFileType(FTP.ASCII_FILE_TYPE); // Use passive mode as default. ftp.enterLocalPassiveMode(); + LOG.info("System type detected: " + ftp.getSystemType()); } catch(IOException ioe) { if (ftp != null && ftp.isConnected()) { try { diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetPath.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetPath.java new file mode 100644 index 00000000..3492fee0 --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetPath.java @@ -0,0 +1,86 @@ +package org.apache.sqoop.mapreduce.mainframe; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.text.ParseException; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +public class TestMainframeDatasetPath { + + @Test + public void testCanGetFileNameOnSequential() throws Exception { + String dsName = "a.b.c.d"; + String expectedFileName = "d"; + MainframeDatasetPath p = new MainframeDatasetPath(dsName,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL); + assertEquals(expectedFileName,p.getMainframeDatasetFileName()); + } + + @Test + public void testCanGetFolderNameOnSequential() throws Exception { + String dsName = "a.b.c.d"; + String expectedFolderName = "a.b.c"; + MainframeDatasetPath p = new MainframeDatasetPath(dsName,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL); + assertEquals(expectedFolderName,p.getMainframeDatasetFolder()); + } + + @Test + public void testCanGetFolderNameOnGDG() throws Exception { + String dsName = "a.b.c.d"; + String expectedFolderName = "a.b.c.d"; + MainframeDatasetPath p = new MainframeDatasetPath(dsName,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG); + assertEquals(expectedFolderName,p.getMainframeDatasetFolder()); + } + + @Test + public void testFileNameIsNullOnGDG() throws Exception { + String dsName = "a.b.c.d"; + MainframeDatasetPath p = new MainframeDatasetPath(dsName,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG); + assertEquals(null,p.getMainframeDatasetFileName()); + } + + @Test + public void testConstructor1() throws Exception { + String dsName = "a.b.c.d"; + String expectedFolderName = "a.b.c"; + String expectedFileName = "d"; + Configuration conf = new Configuration(); + conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME, dsName); + 
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL); + MainframeDatasetPath p = new MainframeDatasetPath(dsName,conf); + assertEquals(dsName,p.getMainframeDatasetName()); + assertEquals(MainframeDatasetType.SEQUENTIAL.toString(),p.getMainframeDatasetType().toString()); + assertEquals(expectedFolderName, p.getMainframeDatasetFolder()); + assertEquals(expectedFileName, p.getMainframeDatasetFileName()); + } + + @Test + public void testConstructor2() throws Exception { + String dsName = "a.b.c.d"; + String expectedFolderName = "a.b.c.d"; + String dsType = "p"; + MainframeDatasetPath p = new MainframeDatasetPath(dsName,dsType); + assertEquals(expectedFolderName,p.getMainframeDatasetFolder()); + } + + @Test + public void testConstructor3() { + String dsName = "a.b.c.d"; + String expectedFolderName = "a.b.c.d"; + MainframeDatasetPath p = new MainframeDatasetPath(dsName,MainframeDatasetType.GDG); + assertEquals(expectedFolderName, p.getMainframeDatasetFolder()); + } + + @Test + public void testUninitialisedThrowsException() { + MainframeDatasetPath p = new MainframeDatasetPath(); + try { + p.initialisePaths(); + } catch (IllegalStateException e) { + assertNotNull(e); + assertEquals("Please set data set name and type first.",e.getMessage()); + } + } +} diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileEntryParser.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileEntryParser.java new file mode 100644 index 00000000..8926bb91 --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileEntryParser.java @@ -0,0 +1,64 @@ +package org.apache.sqoop.mapreduce.mainframe; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.net.ftp.FTPFile; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestMainframeFTPFileEntryParser { + static List listing; + static MainframeFTPFileEntryParser parser2; + @BeforeClass + public static void setUpBeforeClass() throws Exception { + /* Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname + * xxx300 3390 2016/05/25 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOADED +x31167 Tape UNLOAD.EDH.UNLOADT +xxx305 3390 2016/05/23 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOAD1 +xxx305 3390 2016/05/25 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOAD2 +xxx305 3390 2016/05/25 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOAD3 + */ + listing = new ArrayList(); + listing.add("Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname"); + listing.add("xxx300 3390 2016/05/25 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOADED"); + listing.add("x31167 Tape UNLOAD.EDH.UNLOADT"); + listing.add("xxx305 3390 2016/05/23 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOAD1"); + listing.add("xxx305 3390 2016/05/25 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOAD2"); + listing.add("xxx305 3390 2016/05/25 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOAD3"); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testMainframeFTPFileEntryParserString() { + MainframeFTPFileEntryParser parser = new MainframeFTPFileEntryParser(); + assert(parser != null); + } + + @Test + public void testParseFTPEntry() { + parser2 = new MainframeFTPFileEntryParser(); + int i = 0; + for (String j : listing) { + FTPFile file = 
parser2.parseFTPEntry(j); + if (file != null) { + i++; + } + } + assert(i == listing.size()-1); + } +} diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java index ecaa8d50..4c8f5841 100644 --- a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java +++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java @@ -23,7 +23,9 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.junit.Before; import org.junit.Test; diff --git a/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java index d91bbbef..3e502d0e 100644 --- a/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java +++ b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java @@ -24,7 +24,9 @@ import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.sqoop.Sqoop; import org.apache.sqoop.cli.RelatedOptions; +import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -75,6 +77,7 @@ public void testGetImportOptions() throws SecurityException, assertTrue(rOptions.hasOption(MainframeImportTool.MAPREDUCE_JOB_NAME)); assertTrue(rOptions.hasOption(MainframeImportTool.COMPRESS_ARG)); assertTrue(rOptions.hasOption(MainframeImportTool.COMPRESSION_CODEC_ARG)); + assertTrue(rOptions.hasOption(MainframeImportTool.DS_TYPE_ARG)); } @Test @@ -101,4 +104,78 @@ public void testNotApplyOptions() throws ParseException, assertEquals(sqoopOption.getConnManagerClassName(), "dummy_ClassName"); assertNull(sqoopOption.getTableName()); } + + @Test + public void testDataSetTypeOptionIsSet() throws ParseException, InvalidOptionsException { + String[] args = new String[] { "--datasettype", MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG }; + ToolOptions toolOptions = new ToolOptions(); + SqoopOptions sqoopOption = new SqoopOptions(); + mfImportTool.configureOptions(toolOptions); + sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false); + assertEquals(sqoopOption.getMainframeInputDatasetType(), MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG); + } + + @Test + public void testDefaultDataSetTypeOptionIsSet() throws ParseException, InvalidOptionsException { + String[] args = new String[] {}; + ToolOptions toolOptions = new ToolOptions(); + SqoopOptions sqoopOption = new SqoopOptions(); + mfImportTool.configureOptions(toolOptions); + sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false); + assertEquals(sqoopOption.getMainframeInputDatasetType(), MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED); + } + + @Test + public void testInvalidDataSetTypeOptionThrowsException() { + // validateImportOptions gets called from Sqoop.run() function + String[] args = new String[] { "--dataset", "mydataset","--datasettype", "fjfdksjjf" }; + ToolOptions toolOptions = new ToolOptions(); + SqoopOptions sqoopOption = new SqoopOptions(); + mfImportTool.configureOptions(toolOptions); + try { + sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false); + mfImportTool.validateImportOptions(sqoopOption); + String 
dsType = sqoopOption.getMainframeInputDatasetType(); + System.out.println("dsType: "+dsType); + } catch (InvalidOptionsException e) { + String errorMessage = "--datasettype specified is invalid."; + e.printStackTrace(); + assert(e.getMessage().contains(errorMessage)); + } catch (ParseException e) { + e.printStackTrace(); + assertFalse(e != null); + } + } + + @Test + public void testTapeOptionIsSet() throws ParseException, InvalidOptionsException { + String[] args = new String[] { "--tape", "true" }; + ToolOptions toolOptions = new ToolOptions(); + SqoopOptions sqoopOption = new SqoopOptions(); + mfImportTool.configureOptions(toolOptions); + sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false); + Boolean isTape = sqoopOption.getMainframeInputDatasetTape(); + assert(isTape != null && isTape.toString().equals("true")); + } + @Test + public void testTapeOptionDefaultIsSet() throws ParseException, InvalidOptionsException { + String[] args = new String[] { "--datasettype", MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG }; + ToolOptions toolOptions = new ToolOptions(); + SqoopOptions sqoopOption = new SqoopOptions(); + mfImportTool.configureOptions(toolOptions); + sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false); + Boolean isTape = sqoopOption.getMainframeInputDatasetTape(); + assert(isTape != null && isTape.toString().equals("false")); + } + @Test + public void testTapeOptionInvalidReturnsFalse() throws ParseException, InvalidOptionsException { + String[] args = new String[] { "--dataset", "mydatasetname", "--tape", "invalidvalue" }; + ToolOptions toolOptions = new ToolOptions(); + SqoopOptions sqoopOption = new SqoopOptions(); + mfImportTool.configureOptions(toolOptions); + sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false); + mfImportTool.validateImportOptions(sqoopOption); + Boolean isTape = sqoopOption.getMainframeInputDatasetTape(); + assert(isTape != null && isTape.toString().equals("false")); + } } diff --git a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java index 6b895021..d87c75df 100644 --- a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java +++ b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java @@ -24,11 +24,14 @@ import java.io.IOException; +import java.util.List; import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.sqoop.mapreduce.JobBase; import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -155,4 +158,158 @@ public void testNotListDatasets() { ioe.toString()); } } + + @Test + public void testListSequentialDatasets() { + try { + when(mockFTPClient.login("user", "pssword")).thenReturn(true); + when(mockFTPClient.logout()).thenReturn(true); + when(mockFTPClient.isConnected()).thenReturn(false); + when(mockFTPClient.getReplyCode()).thenReturn(200); + when(mockFTPClient.changeWorkingDirectory("a.b.c")).thenReturn(true); + FTPFile file1 = new FTPFile(); + file1.setName("blah1"); + file1.setType(FTPFile.FILE_TYPE); + FTPFile file2 = new FTPFile(); + file2.setName("blah2"); + file2.setType(FTPFile.FILE_TYPE); + when(mockFTPClient.listFiles()).thenReturn(new FTPFile[] {file1,file2}); + } catch (IOException e) { + fail("No IOException 
should be thrown!"); + } + + conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111"); + conf.set(DBConfiguration.USERNAME_PROPERTY, "user"); + conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s"); + conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1"); + // set the password in the secure credentials object + Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); + conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, + "pssword".getBytes()); + + try { + List files = MainframeFTPClientUtils.listSequentialDatasets("a.b.c.blah1", conf); + Assert.assertEquals(1,files.size()); + } catch (IOException ioe) { + fail("No IOException should be thrown!"); + } + } + + @Test + public void testListEmptySequentialDatasets() { + try { + when(mockFTPClient.login("user", "pssword")).thenReturn(true); + when(mockFTPClient.logout()).thenReturn(true); + when(mockFTPClient.isConnected()).thenReturn(false); + when(mockFTPClient.getReplyCode()).thenReturn(200); + when(mockFTPClient.changeWorkingDirectory("a.b.c")).thenReturn(true); + FTPFile file1 = new FTPFile(); + file1.setName("blah0"); + file1.setType(FTPFile.FILE_TYPE); + FTPFile file2 = new FTPFile(); + file2.setName("blah2"); + file2.setType(FTPFile.FILE_TYPE); + when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2}); + } catch (IOException e) { + fail("No IOException should be thrown!"); + } + + conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111"); + conf.set(DBConfiguration.USERNAME_PROPERTY, "user"); + conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s"); + conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1"); + // set the password in the secure credentials object + Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); + conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, + "pssword".getBytes()); + + try { + String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME); + List files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf); + Assert.assertEquals(0,files.size()); + } catch (IOException ioe) { + fail("No IOException should be thrown!"); + } + } + + @Test + public void testTrailingDotSequentialDatasets() { + try { + when(mockFTPClient.login("user", "pssword")).thenReturn(true); + when(mockFTPClient.logout()).thenReturn(true); + when(mockFTPClient.isConnected()).thenReturn(false); + when(mockFTPClient.getReplyCode()).thenReturn(200); + when(mockFTPClient.changeWorkingDirectory("'a.b.c.blah1'")).thenThrow(new IOException("Folder not found")); + FTPFile file1 = new FTPFile(); + file1.setName("blah1"); + file1.setType(FTPFile.FILE_TYPE); + FTPFile file2 = new FTPFile(); + file2.setName("blah2"); + file2.setType(FTPFile.FILE_TYPE); + when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2}); + } catch (IOException e) { + fail("No IOException should be thrown!"); + } + + conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111"); + conf.set(DBConfiguration.USERNAME_PROPERTY, "user"); + conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s"); + conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1."); + // set the password in the secure credentials object + Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); + 
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, + "pssword".getBytes()); + + try { + String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME); + List files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf); + Assert.assertEquals(0,files.size()); + } catch (IOException ioe) { + String ioeString = ioe.getMessage(); + Assert.assertEquals("Could not list datasets from a.b.c.blah1:java.io.IOException: Folder not found",ioeString); + } + } + + @Test + public void testGdgGetLatest() { + try { + when(mockFTPClient.login("user", "pssword")).thenReturn(true); + when(mockFTPClient.logout()).thenReturn(true); + when(mockFTPClient.isConnected()).thenReturn(false); + when(mockFTPClient.getReplyCode()).thenReturn(200); + when(mockFTPClient.changeWorkingDirectory("'a.b.c'")).thenReturn(true); + FTPFile file1 = new FTPFile(); + file1.setName("G0100V00"); + file1.setType(FTPFile.FILE_TYPE); + FTPFile file2 = new FTPFile(); + file2.setName("G0101V00"); + file2.setType(FTPFile.FILE_TYPE); + when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2}); + } catch (IOException e) { + fail("No IOException should be thrown!"); + } + + conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111"); + conf.set(DBConfiguration.USERNAME_PROPERTY, "user"); + conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"g"); + conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.d"); + // set the password in the secure credentials object + Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); + conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, + "pssword".getBytes()); + + try { + String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME); + List files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf); + Assert.assertEquals("G0101V00", files.get(0)); + Assert.assertEquals(1,files.size()); + } catch (IOException ioe) { + String ioeString = ioe.getMessage(); + Assert.assertEquals("Could not list datasets from a.b.c.blah1:java.io.IOException: Folder not found",ioeString); + } + } }
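A minimal usage sketch of the new MainframeDatasetPath helper, showing how the FTP folder and file name are resolved for each --datasettype value ("s", "p", "g"), mirroring TestMainframeDatasetPath above. This is illustrative only and not part of the patch; the MainframeDatasetPathDemo class name is hypothetical, and it assumes the patched Sqoop classes are on the classpath.

// Illustrative sketch only -- not part of this patch.
import org.apache.sqoop.mapreduce.mainframe.MainframeDatasetPath;

public class MainframeDatasetPathDemo {
  public static void main(String[] args) throws Exception {
    // "s" (sequential): last qualifier is the file, the rest is the FTP folder.
    MainframeDatasetPath seq = new MainframeDatasetPath("a.b.c.d", "s");
    System.out.println(seq.getMainframeDatasetFolder());   // prints a.b.c
    System.out.println(seq.getMainframeDatasetFileName()); // prints d

    // "p" (partitioned, the default): the whole name is the folder (PDS).
    MainframeDatasetPath pds = new MainframeDatasetPath("a.b.c.d", "p");
    System.out.println(pds.getMainframeDatasetFolder());   // prints a.b.c.d

    // "g" (GDG): the whole name is the folder; the file name stays null until
    // the FTP listing selects the latest generation.
    MainframeDatasetPath gdg = new MainframeDatasetPath("a.b.c.d", "g");
    System.out.println(gdg.getMainframeDatasetFileName()); // prints null
  }
}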