From 4a22691f45d7d66157ff6dfaa8fca5581e0a8955 Mon Sep 17 00:00:00 2001 From: Szabolcs Vasas Date: Thu, 13 Dec 2018 15:19:16 +0100 Subject: [PATCH] SQOOP-3237: Mainframe FTP transfer option to insert custom FTP commands prior to transfer (Chris Teoh via Szabolcs Vasas) --- src/docs/user/import-mainframe.txt | 14 ++ src/java/org/apache/sqoop/SqoopOptions.java | 13 ++ .../mainframe/MainframeConfiguration.java | 2 + .../mainframe/MainframeImportJob.java | 6 + .../sqoop/tool/MainframeImportTool.java | 8 ++ .../sqoop/util/MainframeFTPClientUtils.java | 26 ++++ .../sqoop/tool/TestMainframeImportTool.java | 13 ++ .../util/TestMainframeFTPClientUtils.java | 133 ++++++++++++------ 8 files changed, 171 insertions(+), 44 deletions(-) diff --git a/src/docs/user/import-mainframe.txt b/src/docs/user/import-mainframe.txt index 3ecfb7e4..a994f8b5 100644 --- a/src/docs/user/import-mainframe.txt +++ b/src/docs/user/import-mainframe.txt @@ -214,6 +214,20 @@ buffer size specified in bytes. By default, --buffersize is set to 32760 bytes. will alter the number of records Sqoop reports to have imported. This is because it reads the binary dataset in chunks specified by buffersize. Larger buffer size means lower number of records. +Use the +\--ftp-commands+ with a comma separated list of commands to send custom FTP commands prior to +file retrieval. This is useful for letting the mainframe know to embed data into the binary files +like Record Descriptor Words for variable length records so downstream processes can separate each +record. The mainframe will otherwise discard this metadata in the file transmission. + +NOTE: The responses from the mainframe of these commands are logged ONLY. It is up to the user to check +for errors responses from the mainframe. + +---- +$ sqoop import-mainframe -D hadoop.security.credential.provider.path=jceks://file/my/folder/mainframe.jceks \ + --connect --username user1 --password-alias alias1 --dataset SomeDS --tape true \ + --as-binaryfile --datasettype g --ftp-commands "SITE RDW,SITE RDW READTAPEFORMAT=V" +---- + Additional Import Configuration Properties ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ There are some additional properties which can be configured by modifying diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index f06872f9..99eb8e68 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -369,6 +369,9 @@ public String toString() { // Buffer size to use when using binary FTP transfer mode @StoredAsProperty("mainframe.ftp.buffersize") private Integer bufferSize; + // custom FTP commands to be sent to mainframe + @StoredAsProperty("mainframe.ftp.commands") + private String customFtpCommands; // Accumulo home directory private String accumuloHome; // not serialized to metastore. // Zookeeper home directory @@ -2528,6 +2531,16 @@ public void setBufferSize(int buf) { bufferSize = buf; } + // sets the custom FTP commands + public void setFtpCommands(String ftpCmds) { + customFtpCommands = ftpCmds; + } + + // gets the custom FTP commands issued + public String getFtpCommands() { + return customFtpCommands; + } + public static String getAccumuloHomeDefault() { // Set this with $ACCUMULO_HOME, but -Daccumulo.home can override. String accumuloHome = System.getenv("ACCUMULO_HOME"); diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java index 9842daa6..2ea21f75 100644 --- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java @@ -44,4 +44,6 @@ public class MainframeConfiguration public static final Integer MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE = 32760; public static final String MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE = "mainframe.ftp.buffersize"; + + public static final String MAINFRAME_FTP_CUSTOM_COMMANDS = "mainframe.ftp.commands"; } diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java index 90dc2ddd..622667d4 100644 --- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -74,6 +75,11 @@ protected void configureInputFormat(Job job, String tableName, job.getConfiguration().set( MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE, options.getMainframeInputDatasetTape().toString()); + if (!StringUtils.isBlank(options.getFtpCommands())) { + job.getConfiguration().set( + MainframeConfiguration.MAINFRAME_FTP_CUSTOM_COMMANDS, + options.getFtpCommands()); + } if (SqoopOptions.FileLayout.BinaryFile == options.getFileLayout()) { job.getConfiguration().set( MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE, diff --git a/src/java/org/apache/sqoop/tool/MainframeImportTool.java b/src/java/org/apache/sqoop/tool/MainframeImportTool.java index fbc8c3db..cbaaf653 100644 --- a/src/java/org/apache/sqoop/tool/MainframeImportTool.java +++ b/src/java/org/apache/sqoop/tool/MainframeImportTool.java @@ -42,6 +42,7 @@ public class MainframeImportTool extends ImportTool { public static final String DS_TYPE_ARG = "datasettype"; public static final String DS_TAPE_ARG = "tape"; public static final String BUFFERSIZE_ARG = "buffersize"; + public static final String FTP_COMMANDS = "ftp-commands"; public MainframeImportTool() { super("import-mainframe", false); @@ -92,6 +93,10 @@ protected RelatedOptions getImportOptions() { .hasArg().withDescription("Sets buffer size for binary import in bytes (default=32kB)") .withLongOpt(BUFFERSIZE_ARG) .create()); + importOpts.addOption(OptionBuilder.withArgName("Comma separated FTP commands issued before FTP transfer") + .hasArg().withDescription("Additional FTP commands issued before transfer") + .withLongOpt(FTP_COMMANDS) + .create()); importOpts.addOption(OptionBuilder.withArgName("n") .hasArg().withDescription("Use 'n' map tasks to import in parallel") .withLongOpt(NUM_MAPPERS_ARG) @@ -200,6 +205,9 @@ public void applyOptions(CommandLine in, SqoopOptions out) // set the default buffer size to 32kB out.setBufferSize(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE); } + if (in.hasOption(FTP_COMMANDS)) { + out.setFtpCommands(in.getOptionValue(FTP_COMMANDS)); + } } @Override diff --git a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java index e7c48a6b..a80aad9e 100644 --- a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java +++ b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.commons.lang3.StringUtils; @@ -226,6 +227,7 @@ public static FTPClient getFTPConnection(Configuration conf) LOG.info("Defaulting FTP transfer mode to ascii"); ftp.setFileTransferMode(FTP.ASCII_FILE_TYPE); } + applyFtpCmds(ftp,conf); // Use passive mode as default. ftp.enterLocalPassiveMode(); LOG.info("System type detected: " + ftp.getSystemType()); @@ -271,4 +273,28 @@ public static void setMockFTPClient(FTPClient FTPClient) { mockFTPClient = FTPClient; } + public static List applyFtpCmds(FTPClient ftp, Configuration conf) throws IOException { + String ftpCmds = conf.get(MainframeConfiguration.MAINFRAME_FTP_CUSTOM_COMMANDS); + String[] ftpCmdList = parseFtpCommands(ftpCmds); + List results = new ArrayList(); + for (String ftpCommand : ftpCmdList) { + LOG.info("Issuing command: "+ftpCommand); + int res = ftp.sendCommand(ftpCommand); + String result = ftp.getReplyString(); + results.add(result); + LOG.info("ReplyCode: "+res + " ReplyString: "+result); + } + return results; + } + + // splits out the concatenated FTP commands + public static String[] parseFtpCommands(String ftpCmds) { + if (StringUtils.isBlank(ftpCmds)) { + return new String[] {}; + } + return Arrays.stream(ftpCmds.split(",")) + .map(String::trim) + .filter(StringUtils::isNotEmpty) + .toArray(String[]::new); + } } diff --git a/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java index 00e57bd0..9c4ac483 100644 --- a/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java +++ b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java @@ -228,6 +228,19 @@ public void testInvalidBufferSizeThrowsNumberFormatException() throws ParseExcep configureAndValidateOptions(args); } + @Test + public void testFtpTransferCommands() throws ParseException, InvalidOptionsException { + String expectedCmds = "quote SITE RDW,quote SITE RDW READTAPEFORMAT=V"; + String[] args = new String[] { "--dataset", "mydatasetname", "--ftp-commands", expectedCmds}; + ToolOptions toolOptions = new ToolOptions(); + SqoopOptions sqoopOption = new SqoopOptions(); + mfImportTool.configureOptions(toolOptions); + sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false); + mfImportTool.validateImportOptions(sqoopOption); + String ftpcmds = sqoopOption.getFtpCommands(); + assertEquals(ftpcmds,expectedCmds); + } + private void configureAndValidateOptions(String[] args) throws ParseException, InvalidOptionsException { mfImportTool.configureOptions(toolOptions); sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false); diff --git a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java index fc6e56d6..7a842ec5 100644 --- a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java +++ b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java @@ -18,6 +18,8 @@ package org.apache.sqoop.util; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -26,6 +28,7 @@ import java.io.IOException; import java.util.List; + import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPFile; import org.apache.commons.net.ftp.FTPListParseEngine; @@ -48,6 +51,8 @@ public class TestMainframeFTPClientUtils { private FTPClient mockFTPClient; private FTPListParseEngine mockFTPListParseEngine; + private static final String DEFAULT_FTP_USERNAME="user"; + private static final String DEFAULT_FTP_PASSWORD="pssword"; @Before public void setUp() { @@ -119,9 +124,8 @@ public void testWrongUsername() { } FTPClient ftp = null; - conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111"); - conf.set(DBConfiguration.USERNAME_PROPERTY, "userr"); - conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + setupDefaultConfiguration(); + conf.set(DBConfiguration.USERNAME_PROPERTY, "invalidusername"); // set the password in the secure credentials object Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, @@ -148,13 +152,8 @@ public void testNotListDatasets() { fail("No IOException should be thrown!"); } - conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111"); - conf.set(DBConfiguration.USERNAME_PROPERTY, "userr"); - conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); - // set the password in the secure credentials object - Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); - conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, - "pssword".getBytes()); + setupDefaultConfiguration(); + conf.set(DBConfiguration.USERNAME_PROPERTY, "invalidusername"); try { MainframeFTPClientUtils.listSequentialDatasets("pdsName", conf); @@ -187,15 +186,9 @@ public void testListSequentialDatasets() { fail("No IOException should be thrown!"); } - conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111"); - conf.set(DBConfiguration.USERNAME_PROPERTY, "user"); - conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + setupDefaultConfiguration(); conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s"); conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1"); - // set the password in the secure credentials object - Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); - conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, - "pssword".getBytes()); try { List files = MainframeFTPClientUtils.listSequentialDatasets("a.b.c.blah1", conf); @@ -226,15 +219,9 @@ public void testListEmptySequentialDatasets() { fail("No IOException should be thrown!"); } - conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111"); - conf.set(DBConfiguration.USERNAME_PROPERTY, "user"); - conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + setupDefaultConfiguration(); conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s"); conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1"); - // set the password in the secure credentials object - Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); - conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, - "pssword".getBytes()); try { String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME); @@ -266,15 +253,9 @@ public void testTrailingDotSequentialDatasets() { fail("No IOException should be thrown!"); } - conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111"); - conf.set(DBConfiguration.USERNAME_PROPERTY, "user"); - conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + setupDefaultConfiguration(); conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s"); conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1."); - // set the password in the secure credentials object - Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); - conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, - "pssword".getBytes()); try { String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME); @@ -307,15 +288,9 @@ public void testGdgGetLatest() { fail("No IOException should be thrown!"); } - conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111"); - conf.set(DBConfiguration.USERNAME_PROPERTY, "user"); - conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + setupDefaultConfiguration(); conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"g"); conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.d"); - // set the password in the secure credentials object - Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); - conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, - "pssword".getBytes()); try { String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME); @@ -347,14 +322,9 @@ public void testPartitionedDatasetsShouldReturnAllFiles() { } catch (IOException e) { fail("No IOException should be thrown!"); } - conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111"); - conf.set(DBConfiguration.USERNAME_PROPERTY, "user"); - conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword"); + setupDefaultConfiguration(); conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"p"); conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1"); - // set the password in the secure credentials object - Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); - conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, "pssword".getBytes()); try { String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME); List files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf); @@ -365,4 +335,79 @@ public void testPartitionedDatasetsShouldReturnAllFiles() { Assert.fail(ioeString); } } + + @Test + public void testFtpCommandExecutes() throws IOException { + final String EXPECTED_RESPONSE = "200 OK"; + final int EXPECTED_RESPONSE_CODE = 200; + String ftpcmds = "quote SITE RDW,quote SITE RDW READTAPEFORMAT=V"; + when(mockFTPClient.login("user", "pssword")).thenReturn(true); + when(mockFTPClient.logout()).thenReturn(true); + when(mockFTPClient.isConnected()).thenReturn(false); + when(mockFTPClient.getReplyCode()).thenReturn(EXPECTED_RESPONSE_CODE); + when(mockFTPClient.getReplyString()).thenReturn(EXPECTED_RESPONSE); + setupDefaultConfiguration(); + conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"g"); + conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.d"); + conf.set(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY); + conf.set(MainframeConfiguration.MAINFRAME_FTP_CUSTOM_COMMANDS, ftpcmds); + MainframeFTPClientUtils.setMockFTPClient(mockFTPClient); + FTPClient ftp = MainframeFTPClientUtils.getFTPConnection(conf); + verify(mockFTPClient).sendCommand("quote SITE RDW"); + verify(mockFTPClient).sendCommand("quote SITE RDW READTAPEFORMAT=V"); + } + + @Test + public void testFtpCommandsOneCommand() { + String inputString = "quote SITE RDW READTAPEFORMAT=V"; + String [] expected = new String [] {"quote SITE RDW READTAPEFORMAT=V"}; + String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString); + assertArrayEquals(cmds,expected); + } + + @Test + public void testFtpCommandsOneCommandWithComma() { + String inputString = ",quote SITE RDW READTAPEFORMAT=V"; + String [] expected = new String [] {"quote SITE RDW READTAPEFORMAT=V"}; + String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString); + assertArrayEquals(cmds,expected); + } + + @Test + public void testFtpCommandsOneCommandWithCommas() { + String inputString = ",quote SITE RDW READTAPEFORMAT=V,"; + String [] expected = new String [] {"quote SITE RDW READTAPEFORMAT=V"}; + String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString); + assertArrayEquals(cmds,expected); + } + + @Test + public void testFtpCommandsTwoCommandWithComma() { + String inputString = "quote SITE RDW,quote SITE RDW READTAPEFORMAT=V"; + String [] expected = new String [] {"quote SITE RDW","quote SITE RDW READTAPEFORMAT=V"}; + String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString); + assertArrayEquals(cmds,expected); + } + + @Test + public void testFtpCommandsNullCommand() { + String inputString = null; + String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString); + assertEquals(0, cmds.length); + } + + @Test + public void testFtpCommandsEmptyCommands() { + String inputString = ",,,"; + String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString); + assertEquals(0, cmds.length); + } + + private void setupDefaultConfiguration() { + conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111"); + conf.set(DBConfiguration.USERNAME_PROPERTY, DEFAULT_FTP_USERNAME); + Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY); + conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, + DEFAULT_FTP_PASSWORD.getBytes()); + } }