
SQOOP-2938: Mainframe import module extension to support data sets on tape (Chris Teoh via Kate Ting)

Kate Ting committed 2016-06-06 17:31:17 -07:00
parent 28bbe4d461
commit b302d89fae
16 changed files with 799 additions and 15 deletions

View File

@ -23,3 +23,9 @@
--dataset (partitioned dataset name)::
Specify a partitioned dataset name
--datasettype (data set type)::
Specify a data set type: "s" for a sequential data set, "p" for a partitioned data set (default), "g" for a generation data group (GDG)
--tape (dataset on tape)::
Specify whether the data set is stored on tape (true|false)
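For example, importing a sequential data set stored on tape might look like the following (the host, user, data set name, and target directory here are hypothetical):

$ sqoop import-mainframe --connect zos.example.com --username myuser -P \
    --dataset UNLOAD.EDH.MYDATA --datasettype s --tape true \
    --target-dir /data/unload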

View File

@ -33,6 +33,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.accumulo.AccumuloConstants;
import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
import org.apache.sqoop.util.CredentialsUtil;
import org.apache.sqoop.util.LoggingUtils;
import org.apache.sqoop.util.SqoopJsonUtil;
@ -282,6 +283,14 @@ public String toString() {
@StoredAsProperty("mainframe.input.dataset.name")
private String mainframeInputDatasetName;
// Dataset type for mainframe import tool
@StoredAsProperty("mainframe.input.dataset.type")
private String mainframeInputDatasetType;
// Indicates whether the data set is on tape, so that a different FTP listing parser can be used
@StoredAsProperty("mainframe.input.dataset.tape")
private String mainframeInputDatasetTape;
// Accumulo home directory
private String accumuloHome; // not serialized to metastore.
// Zookeeper home directory
@ -1023,6 +1032,9 @@ private void initDefaults(Configuration baseConfiguration) {
// Relaxed isolation is not enabled by default, which has been the
// behavior of Sqoop until now.
this.relaxedIsolation = false;
// set default mainframe data set type to partitioned data set
this.mainframeInputDatasetType = MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED;
}
/**
@ -2276,6 +2288,11 @@ public String getMergeKeyCol() {
public void setMainframeInputDatasetName(String name) {
mainframeInputDatasetName = name;
tableName = name;
// may need to set this in the conf variable, otherwise it gets lost.
}
public void setMainframeInputDatasetType(String name) {
mainframeInputDatasetType = name;
}
/**
@ -2285,6 +2302,24 @@ public String getMainframeInputDatasetName() {
return mainframeInputDatasetName;
}
/*
* Return the mainframe dataset type
*/
public String getMainframeInputDatasetType() {
return mainframeInputDatasetType;
}
// return whether the dataset is on tape
public Boolean getMainframeInputDatasetTape() {
if (mainframeInputDatasetTape == null) { return false; }
return Boolean.parseBoolean(mainframeInputDatasetTape);
}
// sets whether the dataset is on tape
public void setMainframeInputDatasetTape(String txtIsFromTape) {
mainframeInputDatasetTape = Boolean.toString(Boolean.parseBoolean(txtIsFromTape));
}
public static String getAccumuloHomeDefault() {
// Set this with $ACCUMULO_HOME, but -Daccumulo.home can override.
String accumuloHome = System.getenv("ACCUMULO_HOME");

View File

@ -22,4 +22,15 @@ public class MainframeConfiguration
public static final String MAINFRAME_INPUT_DATASET_NAME
= "mapreduce.mainframe.input.dataset.name";
public static final String MAINFRAME_INPUT_DATASET_TYPE
= "mapreduce.mainframe.input.dataset.type";
public static final String MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL
= "s";
public static final String MAINFRAME_INPUT_DATASET_TYPE_GDG
= "g";
public static final String MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED
= "p";
public static final String MAINFRAME_INPUT_DATASET_TAPE = "mainframe.input.dataset.tape";
public static final String MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME = "org.apache.sqoop.mapreduce.mainframe.MainframeFTPFileEntryParser";
}

View File

@ -28,7 +28,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.util.MainframeFTPClientUtils;
@ -52,9 +51,20 @@ public void initialize(InputSplit inputSplit,
Configuration conf = getConfiguration();
ftp = MainframeFTPClientUtils.getFTPConnection(conf);
if (ftp != null) {
String dsName
= conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
ftp.changeWorkingDirectory("'" + dsName + "'");
String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
MainframeDatasetPath p = null;
try {
p = new MainframeDatasetPath(dsName,conf);
} catch (Exception e) {
LOG.error(e.getMessage());
LOG.error("MainframeDatasetPath helper class incorrectly initialised");
e.printStackTrace();
}
if (dsType != null && p != null) {
dsName = p.getMainframeDatasetFolder();
}
ftp.changeWorkingDirectory("'" + dsName + "'");
}
}

View File

@ -60,9 +60,13 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
String dsName
= conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
LOG.info("Datasets to transfer from: " + dsName);
String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
LOG.info("Dataset type: " + dsType);
String dsTape = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE);
LOG.info("Dataset on tape?: " + dsTape);
List<String> datasets = retrieveDatasets(dsName, conf);
if (datasets.isEmpty()) {
throw new IOException ("No sequential datasets retrieved from " + dsName);
throw new IOException ("No datasets retrieved from " + dsName);
} else {
int count = datasets.size();
int chunks = Math.min(count, ConfigurationHelper.getJobNumMaps(job));

View File

@ -0,0 +1,117 @@
/**
*
*/
package org.apache.sqoop.mapreduce.mainframe;
import java.text.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
/**
This class resolves FTP folder paths
* given the data set type and the data set name
*/
public class MainframeDatasetPath {
private static final Log LOG =
LogFactory.getLog(MainframeDatasetPath.class);
private String datasetName;
private MainframeDatasetType datasetType;
private String dsFolderName = null;
private String dsFileName = null;
// default constructor
public MainframeDatasetPath(){}
// constructor that takes dataset name job configuration
public MainframeDatasetPath(String dsName, Configuration conf) throws Exception {
String inputName
= conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
// this should always be true
assert(inputName.equals(dsName));
LOG.info("Datasets to transfer from: " + dsName);
this.datasetName = dsName;
// initialise dataset type
String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
this.setMainframeDatasetType(dsType);
initialisePaths();
}
public MainframeDatasetPath(String dsName, MainframeDatasetType dsType) {
this.setMainframeDatasetName(dsName);
this.setMainframeDatasetType(dsType);
initialisePaths();
}
public MainframeDatasetPath(String dsName, String dsType) throws ParseException {
this.setMainframeDatasetName(dsName);
this.setMainframeDatasetType(dsType);
initialisePaths();
}
public void initialisePaths() throws IllegalStateException {
if (this.datasetName == null || this.datasetType == null) {
throw new IllegalStateException("Please set data set name and type first.");
}
boolean isSequentialDs = this.datasetType.equals(MainframeDatasetType.SEQUENTIAL);
boolean isGDG = this.datasetType.equals(MainframeDatasetType.GDG);
LOG.info(String.format("dsName: %s", this.datasetName));
LOG.info(String.format("isSequentialDs: %s isGDG: %s", isSequentialDs, isGDG));
if (isSequentialDs)
{
// truncate the trailing qualifier after the last dot
// data set qualifiers are separated by dots, e.g. blah1.blah2.blah3
// so in this case the folder is blah1.blah2 and the file is blah3
int lastDotIndex = this.datasetName.lastIndexOf(".");
// if not found, it is probably in the root
if (lastDotIndex == -1) { this.datasetName = ""; } else {
// if found, return the truncated name
dsFolderName = this.datasetName.substring(0, lastDotIndex);
if (lastDotIndex + 1 < this.datasetName.length()) {
dsFileName = this.datasetName.substring(lastDotIndex + 1);
}
}
} else {
// GDG or PDS
dsFolderName = this.datasetName;
dsFileName = null; // handle parentheses parsing later
}
}
// getters and setters
public MainframeDatasetType getMainframeDatasetType() {
return this.datasetType;
}
public void setMainframeDatasetType(MainframeDatasetType dsType) {
this.datasetType = dsType;
}
// overloaded setter to parse string
public void setMainframeDatasetType(String dsType) throws ParseException {
if (dsType.equals("s")) { this.datasetType = MainframeDatasetType.SEQUENTIAL; }
else if (dsType.equals("p")) { this.datasetType = MainframeDatasetType.PARTITIONED; }
else if (dsType.equals("g")) { this.datasetType = MainframeDatasetType.GDG; }
else { throw new ParseException(String.format("Invalid data set type specified: %s",dsType), 0); }
}
public String getMainframeDatasetName() {
return this.datasetName;
}
public void setMainframeDatasetName(String dsName) {
this.datasetName = dsName;
}
public String getMainframeDatasetFolder() {
return this.dsFolderName;
}
public String getMainframeDatasetFileName() {
// returns the file name within the folder; null for a GDG, since resolving it requires a file listing
return dsFileName;
}
}
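To make the path resolution above concrete, here is a minimal usage sketch (not part of the commit; the demo class name is hypothetical) that exercises the constructors and getters defined above on a sample data set name:

import org.apache.sqoop.mapreduce.mainframe.MainframeDatasetPath;
import org.apache.sqoop.mapreduce.mainframe.MainframeDatasetType;

public class MainframeDatasetPathDemo {
  public static void main(String[] args) {
    // Sequential data set: the last qualifier is the file, the rest is the folder.
    MainframeDatasetPath seq = new MainframeDatasetPath("A.B.C.D", MainframeDatasetType.SEQUENTIAL);
    System.out.println(seq.getMainframeDatasetFolder());   // A.B.C
    System.out.println(seq.getMainframeDatasetFileName()); // D

    // Partitioned data set or GDG: the full name is the folder; the file is resolved later.
    MainframeDatasetPath gdg = new MainframeDatasetPath("A.B.C.D", MainframeDatasetType.GDG);
    System.out.println(gdg.getMainframeDatasetFolder());   // A.B.C.D
    System.out.println(gdg.getMainframeDatasetFileName()); // null
  }
}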

View File

@ -0,0 +1,19 @@
package org.apache.sqoop.mapreduce.mainframe;
/********
* Basic enumeration for Mainframe data set types
********/
public enum MainframeDatasetType {
GDG, PARTITIONED, SEQUENTIAL;
@Override
public String toString() {
switch(this) {
case GDG: return MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG;
case PARTITIONED: return MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED;
case SEQUENTIAL: return MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL;
default: throw new IllegalArgumentException();
}
}
}

View File

@ -0,0 +1,93 @@
package org.apache.sqoop.mapreduce.mainframe;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTPClientConfig;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.parser.ConfigurableFTPFileEntryParserImpl;
public class MainframeFTPFileEntryParser extends ConfigurableFTPFileEntryParserImpl {
/* Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname
* xxx300 3390 2016/05/25 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOADED
* x31167 Tape UNLOAD.EDH.UNLOADT
* xxx305 3390 2016/05/23 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOAD1
*/
// match Unit and Dsname
private static String REGEX = "^\\S+\\s+(\\S+)\\s+.*?\\s+(\\S+)$";
// match Unit, BlkSz, Dsorg, DsName
private static String NON_TAPE_REGEX = "^\\S+\\s+(\\S+)\\s+.*?(\\d+)\\s+(\\S+)\\s+(\\S+)$";
static final String DEFAULT_DATE_FORMAT = "yyyy/MM/dd HH:mm";
//= "MMM d yyyy"; //Nov 9 2001
static final String DEFAULT_RECENT_DATE_FORMAT = "MMM d HH:mm"; //Nov 9 20:06
private static String tapeUnitString = "Tape";
private static String unitHeaderString = "Unit";
private static String dsNameHeaderString = "Dsname";
private static String dsOrgPDSString = "PO";
private static String dsOrgPDSExtendedString = "PO-E";
private static String dsOrgSeqString = "PS";
private static Pattern nonTapePattern = Pattern.compile(NON_TAPE_REGEX);
private static final Log LOG = LogFactory.getLog(MainframeFTPFileEntryParser.class.getName());
public MainframeFTPFileEntryParser() {
super(REGEX);
LOG.info("MainframeFTPFileEntryParser constructor");
}
public MainframeFTPFileEntryParser(String regex) {
super(REGEX);
}
public FTPFile parseFTPEntry(String entry) {
LOG.info("parseFTPEntry: "+entry);
if (matches(entry)) {
String unit = group(1);
String dsName = group(2);
LOG.info("parseFTPEntry match: "+group(1)+" "+group(2));
if (unit.equals(unitHeaderString) && dsName.equals(dsNameHeaderString)) {
return null;
}
FTPFile file = new FTPFile();
file.setRawListing(entry);
file.setName(dsName);
// match non tape values
if (!unit.equals("Tape")) {
Matcher m = nonTapePattern.matcher(entry);
// get sizes
if (m.matches()) {
// PO/PO-E = PDS = directory
// PS = Sequential data set = file
String size = m.group(2);
String dsOrg = m.group(3);
file.setSize(Long.parseLong(size));
LOG.info(String.format("Non tape match: %s, %s, %s", file.getName(), file.getSize(), dsOrg));
if (dsOrg.equals(dsOrgPDSString) || dsOrg.equals(dsOrgPDSExtendedString)) {
file.setType(FTPFile.DIRECTORY_TYPE);
}
if (dsOrg.equals(dsOrgSeqString)) {
file.setType(FTPFile.FILE_TYPE);
}
}
} else {
LOG.info(String.format("Tape match: %s, %s", file.getName(), unit));
file.setType(FTPFile.FILE_TYPE);
}
return file;
}
return null;
}
@Override
protected FTPClientConfig getDefaultConfiguration() {
return new FTPClientConfig(FTPClientConfig.SYST_MVS,
DEFAULT_DATE_FORMAT, null, null, null, null);
}
}
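A minimal sketch (not part of the commit; the demo class name is hypothetical) of how the parser above classifies a non-tape PS entry versus a tape entry from the sample listing in the class comment:

import org.apache.commons.net.ftp.FTPFile;
import org.apache.sqoop.mapreduce.mainframe.MainframeFTPFileEntryParser;

public class MainframeFTPFileEntryParserDemo {
  public static void main(String[] args) {
    MainframeFTPFileEntryParser parser = new MainframeFTPFileEntryParser();
    // PS (sequential) entry on a 3390 unit: becomes a plain file, with BlkSz used as the size.
    FTPFile ps = parser.parseFTPEntry(
        "xxx300 3390   2016/05/25  1   45  VB    2349 27998  PS  UNLOAD.EDH.UNLOADED");
    System.out.println(ps.getName() + " type=" + ps.getType() + " size=" + ps.getSize());
    // Tape entry: only Unit and Dsname are present, so it is treated as a file with no size.
    FTPFile tape = parser.parseFTPEntry(
        "x31167 Tape                                             UNLOAD.EDH.UNLOADT");
    System.out.println(tape.getName() + " type=" + tape.getType());
    // The header row ("Volume Unit ... Dsname") would return null and be skipped.
  }
}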

View File

@ -59,6 +59,12 @@ protected void configureInputFormat(Job job, String tableName,
job.getConfiguration().set(
MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,
options.getMainframeInputDatasetName());
job.getConfiguration().set(
MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,
options.getMainframeInputDatasetType());
job.getConfiguration().set(
MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE,
options.getMainframeInputDatasetTape().toString());
}
@Override

View File

@ -23,6 +23,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.ToolRunner;
import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
@ -36,6 +37,8 @@ public class MainframeImportTool extends ImportTool {
private static final Log LOG
= LogFactory.getLog(MainframeImportTool.class.getName());
public static final String DS_ARG = "dataset";
public static final String DS_TYPE_ARG = "datasettype";
public static final String DS_TAPE_ARG = "tape";
public MainframeImportTool() {
super("import-mainframe", false);
@ -59,6 +62,14 @@ protected RelatedOptions getImportOptions() {
.hasArg().withDescription("HDFS plain file destination")
.withLongOpt(TARGET_DIR_ARG)
.create());
importOpts.addOption(OptionBuilder.withArgName("Dataset type")
.hasArg().withDescription("Dataset type (p=partitioned data set|s=sequential data set|g=GDG)")
.withLongOpt(DS_TYPE_ARG)
.create());
importOpts.addOption(OptionBuilder.withArgName("Dataset is on tape")
.hasArg().withDescription("Dataset is on tape (true|false)")
.withLongOpt(DS_TAPE_ARG)
.create());
addValidationOpts(importOpts);
@ -142,6 +153,20 @@ public void applyOptions(CommandLine in, SqoopOptions out)
if (in.hasOption(DS_ARG)) {
out.setMainframeInputDatasetName(in.getOptionValue(DS_ARG));
}
if (in.hasOption(DS_TYPE_ARG)) {
out.setMainframeInputDatasetType(in.getOptionValue(DS_TYPE_ARG));
} else {
// set default data set type to partitioned
out.setMainframeInputDatasetType(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED);
}
if (in.hasOption(DS_TAPE_ARG)) {
out.setMainframeInputDatasetTape(in.getOptionValue(DS_TAPE_ARG));
} else {
// set default tape value to false
out.setMainframeInputDatasetTape("false");
}
}
@Override
@ -151,6 +176,19 @@ protected void validateImportOptions(SqoopOptions options)
throw new InvalidOptionsException(
"--" + DS_ARG + " is required for mainframe import. " + HELP_STR);
}
String dsType = options.getMainframeInputDatasetType();
LOG.info("Dataset type: "+dsType);
if (!dsType.equals(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED)
&& !dsType.equals(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL)
&& !dsType.equals(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG)) {
throw new InvalidOptionsException(
"--" + DS_TYPE_ARG + " specified is invalid. " + HELP_STR);
}
Boolean dsTape = options.getMainframeInputDatasetTape();
if (dsTape == null) {
throw new InvalidOptionsException(
"--" + DS_TAPE_ARG + " specified is invalid. " + HELP_STR);
}
super.validateImportOptions(options);
}
}

View File

@ -29,16 +29,16 @@
import org.apache.commons.net.ftp.FTPClientConfig;
import org.apache.commons.net.ftp.FTPConnectionClosedException;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPListParseEngine;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sqoop.mapreduce.JobBase;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
import org.apache.sqoop.mapreduce.mainframe.MainframeDatasetPath;
/**
* Utility methods used when accessing a mainframe server through FTP client.
@ -52,20 +52,78 @@ public final class MainframeFTPClientUtils {
private MainframeFTPClientUtils() {
}
public static List<String> listSequentialDatasets(
String pdsName, Configuration conf) throws IOException {
public static List<String> listSequentialDatasets(String pdsName, Configuration conf) throws IOException {
List<String> datasets = new ArrayList<String>();
String dsName = pdsName;
String fileName = "";
MainframeDatasetPath p = null;
try {
p = new MainframeDatasetPath(dsName,conf);
} catch (Exception e) {
LOG.error(e.getMessage());
LOG.error("MainframeDatasetPath helper class incorrectly initialised");
e.printStackTrace();
}
String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
boolean isTape = Boolean.parseBoolean(conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE));
boolean isSequentialDs = false;
boolean isGDG = false;
if (dsType != null && p != null) {
isSequentialDs = p.getMainframeDatasetType().toString().equals(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL);
isGDG = p.getMainframeDatasetType().toString().equals(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG);
pdsName = p.getMainframeDatasetFolder();
fileName = p.getMainframeDatasetFileName();
}
FTPClient ftp = null;
try {
ftp = getFTPConnection(conf);
if (ftp != null) {
ftp.changeWorkingDirectory("'" + pdsName + "'");
FTPFile[] ftpFiles = ftp.listFiles();
for (FTPFile f : ftpFiles) {
if (f.getType() == FTPFile.FILE_TYPE) {
datasets.add(f.getName());
}
FTPFile[] ftpFiles = null;
if (isTape) {
FTPListParseEngine parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME, "");
List<FTPFile> listing = new ArrayList<FTPFile>();
while(parser.hasNext()) {
FTPFile[] files = parser.getNext(25);
for (FTPFile file : files) {
if (file != null) {
listing.add(file);
LOG.info(String.format("Name: %s Type: %s", file.getName(), file.getType()));
}
// skip nulls returned from parser
}
ftpFiles = new FTPFile[listing.size()];
for (int i = 0;i < listing.size(); i++) {
ftpFiles[i] = listing.get(i);
}
LOG.info("Files returned from mainframe parser:-");
for (FTPFile f : ftpFiles) {
LOG.info(String.format("Name: %s, Type: %s",f.getName(),f.getType()));
}
}
}
else { ftpFiles = ftp.listFiles(); }
if (!isGDG) {
for (FTPFile f : ftpFiles) {
LOG.info(String.format("Name: %s Type: %s",f.getName(), f.getType()));
if (f.getType() == FTPFile.FILE_TYPE) {
// by default (partitioned data set) add every file;
// for a sequential data set, only add the file whose name matches exactly
if (!isSequentialDs || (f.getName().equals(fileName) && !fileName.equals(""))) {
datasets.add(f.getName());
}
}
}
} else {
LOG.info("GDG branch. File list:-");
for (FTPFile f : ftpFiles) {
LOG.info(String.format("Name: %s Type: %s",f.getName(), f.getType()));
}
if (ftpFiles.length > 0 && ftpFiles[ftpFiles.length-1].getType() == FTPFile.FILE_TYPE) {
// for GDG - add the last file in the collection
datasets.add(ftpFiles[ftpFiles.length-1].getName());
}
}
}
} catch(IOException ioe) {
throw new IOException ("Could not list datasets from " + pdsName + ":"
@ -144,6 +202,7 @@ public static FTPClient getFTPConnection(Configuration conf)
ftp.setFileType(FTP.ASCII_FILE_TYPE);
// Use passive mode as default.
ftp.enterLocalPassiveMode();
LOG.info("System type detected: " + ftp.getSystemType());
} catch(IOException ioe) {
if (ftp != null && ftp.isConnected()) {
try {

View File

@ -0,0 +1,86 @@
package org.apache.sqoop.mapreduce.mainframe;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.text.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
public class TestMainframeDatasetPath {
@Test
public void testCanGetFileNameOnSequential() throws Exception {
String dsName = "a.b.c.d";
String expectedFileName = "d";
MainframeDatasetPath p = new MainframeDatasetPath(dsName,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL);
assertEquals(expectedFileName,p.getMainframeDatasetFileName());
}
@Test
public void testCanGetFolderNameOnSequential() throws Exception {
String dsName = "a.b.c.d";
String expectedFolderName = "a.b.c";
MainframeDatasetPath p = new MainframeDatasetPath(dsName,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL);
assertEquals(expectedFolderName,p.getMainframeDatasetFolder());
}
@Test
public void testCanGetFolderNameOnGDG() throws Exception {
String dsName = "a.b.c.d";
String expectedFolderName = "a.b.c.d";
MainframeDatasetPath p = new MainframeDatasetPath(dsName,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG);
assertEquals(expectedFolderName,p.getMainframeDatasetFolder());
}
@Test
public void testFileNameIsNullOnGDG() throws Exception {
String dsName = "a.b.c.d";
MainframeDatasetPath p = new MainframeDatasetPath(dsName,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG);
assertEquals(null,p.getMainframeDatasetFileName());
}
@Test
public void testConstructor1() throws Exception {
String dsName = "a.b.c.d";
String expectedFolderName = "a.b.c";
String expectedFileName = "d";
Configuration conf = new Configuration();
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME, dsName);
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL);
MainframeDatasetPath p = new MainframeDatasetPath(dsName,conf);
assertEquals(dsName,p.getMainframeDatasetName());
assertEquals(MainframeDatasetType.SEQUENTIAL.toString(),p.getMainframeDatasetType().toString());
assertEquals(expectedFolderName, p.getMainframeDatasetFolder());
assertEquals(expectedFileName, p.getMainframeDatasetFileName());
}
@Test
public void testConstructor2() throws Exception {
String dsName = "a.b.c.d";
String expectedFolderName = "a.b.c.d";
String dsType = "p";
MainframeDatasetPath p = new MainframeDatasetPath(dsName,dsType);
assertEquals(expectedFolderName,p.getMainframeDatasetFolder());
}
@Test
public void testConstructor3() {
String dsName = "a.b.c.d";
String expectedFolderName = "a.b.c.d";
MainframeDatasetPath p = new MainframeDatasetPath(dsName,MainframeDatasetType.GDG);
assertEquals(expectedFolderName, p.getMainframeDatasetFolder());
}
@Test
public void testUninitialisedThrowsException() {
MainframeDatasetPath p = new MainframeDatasetPath();
try {
p.initialisePaths();
} catch (IllegalStateException e) {
assertNotNull(e);
assertEquals("Please set data set name and type first.",e.getMessage());
}
}
}

View File

@ -0,0 +1,64 @@
package org.apache.sqoop.mapreduce.mainframe;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.net.ftp.FTPFile;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestMainframeFTPFileEntryParser {
static List<String> listing;
static MainframeFTPFileEntryParser parser2;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
/* Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname
* xxx300 3390 2016/05/25 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOADED
x31167 Tape UNLOAD.EDH.UNLOADT
xxx305 3390 2016/05/23 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOAD1
xxx305 3390 2016/05/25 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOAD2
xxx305 3390 2016/05/25 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOAD3
*/
listing = new ArrayList<String>();
listing.add("Volume Unit Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname");
listing.add("xxx300 3390 2016/05/25 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOADED");
listing.add("x31167 Tape UNLOAD.EDH.UNLOADT");
listing.add("xxx305 3390 2016/05/23 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOAD1");
listing.add("xxx305 3390 2016/05/25 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOAD2");
listing.add("xxx305 3390 2016/05/25 1 45 VB 2349 27998 PS UNLOAD.EDH.UNLOAD3");
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
}
@Before
public void setUp() throws Exception {
}
@After
public void tearDown() throws Exception {
}
@Test
public void testMainframeFTPFileEntryParserString() {
MainframeFTPFileEntryParser parser = new MainframeFTPFileEntryParser();
assert(parser != null);
}
@Test
public void testParseFTPEntry() {
parser2 = new MainframeFTPFileEntryParser();
int i = 0;
for (String j : listing) {
FTPFile file = parser2.parseFTPEntry(j);
if (file != null) {
i++;
}
}
assert(i == listing.size()-1);
}
}

View File

@ -23,7 +23,9 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.junit.Before;
import org.junit.Test;

View File

@ -24,7 +24,9 @@
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.cli.RelatedOptions;
import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -75,6 +77,7 @@ public void testGetImportOptions() throws SecurityException,
assertTrue(rOptions.hasOption(MainframeImportTool.MAPREDUCE_JOB_NAME));
assertTrue(rOptions.hasOption(MainframeImportTool.COMPRESS_ARG));
assertTrue(rOptions.hasOption(MainframeImportTool.COMPRESSION_CODEC_ARG));
assertTrue(rOptions.hasOption(MainframeImportTool.DS_TYPE_ARG));
}
@Test
@ -101,4 +104,78 @@ public void testNotApplyOptions() throws ParseException,
assertEquals(sqoopOption.getConnManagerClassName(), "dummy_ClassName");
assertNull(sqoopOption.getTableName());
}
@Test
public void testDataSetTypeOptionIsSet() throws ParseException, InvalidOptionsException {
String[] args = new String[] { "--datasettype", MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG };
ToolOptions toolOptions = new ToolOptions();
SqoopOptions sqoopOption = new SqoopOptions();
mfImportTool.configureOptions(toolOptions);
sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
assertEquals(sqoopOption.getMainframeInputDatasetType(), MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG);
}
@Test
public void testDefaultDataSetTypeOptionIsSet() throws ParseException, InvalidOptionsException {
String[] args = new String[] {};
ToolOptions toolOptions = new ToolOptions();
SqoopOptions sqoopOption = new SqoopOptions();
mfImportTool.configureOptions(toolOptions);
sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
assertEquals(sqoopOption.getMainframeInputDatasetType(), MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED);
}
@Test
public void testInvalidDataSetTypeOptionThrowsException() {
// validateImportOptions gets called from Sqoop.run() function
String[] args = new String[] { "--dataset", "mydataset","--datasettype", "fjfdksjjf" };
ToolOptions toolOptions = new ToolOptions();
SqoopOptions sqoopOption = new SqoopOptions();
mfImportTool.configureOptions(toolOptions);
try {
sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
mfImportTool.validateImportOptions(sqoopOption);
String dsType = sqoopOption.getMainframeInputDatasetType();
System.out.println("dsType: "+dsType);
} catch (InvalidOptionsException e) {
String errorMessage = "--datasettype specified is invalid.";
e.printStackTrace();
assert(e.getMessage().contains(errorMessage));
} catch (ParseException e) {
e.printStackTrace();
assertFalse(e != null);
}
}
@Test
public void testTapeOptionIsSet() throws ParseException, InvalidOptionsException {
String[] args = new String[] { "--tape", "true" };
ToolOptions toolOptions = new ToolOptions();
SqoopOptions sqoopOption = new SqoopOptions();
mfImportTool.configureOptions(toolOptions);
sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
Boolean isTape = sqoopOption.getMainframeInputDatasetTape();
assert(isTape != null && isTape.toString().equals("true"));
}
@Test
public void testTapeOptionDefaultIsSet() throws ParseException, InvalidOptionsException {
String[] args = new String[] { "--datasettype", MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG };
ToolOptions toolOptions = new ToolOptions();
SqoopOptions sqoopOption = new SqoopOptions();
mfImportTool.configureOptions(toolOptions);
sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
Boolean isTape = sqoopOption.getMainframeInputDatasetTape();
assert(isTape != null && isTape.toString().equals("false"));
}
@Test
public void testTapeOptionInvalidReturnsFalse() throws ParseException, InvalidOptionsException {
String[] args = new String[] { "--dataset", "mydatasetname", "--tape", "invalidvalue" };
ToolOptions toolOptions = new ToolOptions();
SqoopOptions sqoopOption = new SqoopOptions();
mfImportTool.configureOptions(toolOptions);
sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
mfImportTool.validateImportOptions(sqoopOption);
Boolean isTape = sqoopOption.getMainframeInputDatasetTape();
assert(isTape != null && isTape.toString().equals("false"));
}
}

View File

@ -24,11 +24,14 @@
import java.io.IOException;
import java.util.List;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sqoop.mapreduce.JobBase;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -155,4 +158,158 @@ public void testNotListDatasets() {
ioe.toString());
}
}
@Test
public void testListSequentialDatasets() {
try {
when(mockFTPClient.login("user", "pssword")).thenReturn(true);
when(mockFTPClient.logout()).thenReturn(true);
when(mockFTPClient.isConnected()).thenReturn(false);
when(mockFTPClient.getReplyCode()).thenReturn(200);
when(mockFTPClient.changeWorkingDirectory("a.b.c")).thenReturn(true);
FTPFile file1 = new FTPFile();
file1.setName("blah1");
file1.setType(FTPFile.FILE_TYPE);
FTPFile file2 = new FTPFile();
file2.setName("blah2");
file2.setType(FTPFile.FILE_TYPE);
when(mockFTPClient.listFiles()).thenReturn(new FTPFile[] {file1,file2});
} catch (IOException e) {
fail("No IOException should be thrown!");
}
conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s");
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
// set the password in the secure credentials object
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
"pssword".getBytes());
try {
List<String> files = MainframeFTPClientUtils.listSequentialDatasets("a.b.c.blah1", conf);
Assert.assertEquals(1,files.size());
} catch (IOException ioe) {
fail("No IOException should be thrown!");
}
}
@Test
public void testListEmptySequentialDatasets() {
try {
when(mockFTPClient.login("user", "pssword")).thenReturn(true);
when(mockFTPClient.logout()).thenReturn(true);
when(mockFTPClient.isConnected()).thenReturn(false);
when(mockFTPClient.getReplyCode()).thenReturn(200);
when(mockFTPClient.changeWorkingDirectory("a.b.c")).thenReturn(true);
FTPFile file1 = new FTPFile();
file1.setName("blah0");
file1.setType(FTPFile.FILE_TYPE);
FTPFile file2 = new FTPFile();
file2.setName("blah2");
file2.setType(FTPFile.FILE_TYPE);
when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
} catch (IOException e) {
fail("No IOException should be thrown!");
}
conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s");
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
// set the password in the secure credentials object
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
"pssword".getBytes());
try {
String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
List<String> files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf);
Assert.assertEquals(0,files.size());
} catch (IOException ioe) {
fail("No IOException should be thrown!");
}
}
@Test
public void testTrailingDotSequentialDatasets() {
try {
when(mockFTPClient.login("user", "pssword")).thenReturn(true);
when(mockFTPClient.logout()).thenReturn(true);
when(mockFTPClient.isConnected()).thenReturn(false);
when(mockFTPClient.getReplyCode()).thenReturn(200);
when(mockFTPClient.changeWorkingDirectory("'a.b.c.blah1'")).thenThrow(new IOException("Folder not found"));
FTPFile file1 = new FTPFile();
file1.setName("blah1");
file1.setType(FTPFile.FILE_TYPE);
FTPFile file2 = new FTPFile();
file2.setName("blah2");
file2.setType(FTPFile.FILE_TYPE);
when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
} catch (IOException e) {
fail("No IOException should be thrown!");
}
conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s");
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1.");
// set the password in the secure credentials object
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
"pssword".getBytes());
try {
String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
List<String> files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf);
Assert.assertEquals(0,files.size());
} catch (IOException ioe) {
String ioeString = ioe.getMessage();
Assert.assertEquals("Could not list datasets from a.b.c.blah1:java.io.IOException: Folder not found",ioeString);
}
}
@Test
public void testGdgGetLatest() {
try {
when(mockFTPClient.login("user", "pssword")).thenReturn(true);
when(mockFTPClient.logout()).thenReturn(true);
when(mockFTPClient.isConnected()).thenReturn(false);
when(mockFTPClient.getReplyCode()).thenReturn(200);
when(mockFTPClient.changeWorkingDirectory("'a.b.c'")).thenReturn(true);
FTPFile file1 = new FTPFile();
file1.setName("G0100V00");
file1.setType(FTPFile.FILE_TYPE);
FTPFile file2 = new FTPFile();
file2.setName("G0101V00");
file2.setType(FTPFile.FILE_TYPE);
when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
} catch (IOException e) {
fail("No IOException should be thrown!");
}
conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"g");
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.d");
// set the password in the secure credentials object
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
"pssword".getBytes());
try {
String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
List<String> files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf);
Assert.assertEquals("G0101V00", files.get(0));
Assert.assertEquals(1,files.size());
} catch (IOException ioe) {
String ioeString = ioe.getMessage();
Assert.assertEquals("Could not list datasets from a.b.c.blah1:java.io.IOException: Folder not found",ioeString);
}
}
}