mirror of
https://github.com/apache/sqoop.git
synced 2025-05-02 17:59:21 +08:00
SQOOP-3237: Mainframe FTP transfer option to insert custom FTP commands prior to transfer
(Chris Teoh via Szabolcs Vasas)
This commit is contained in:
parent
b94a0bd948
commit
4a22691f45
@ -214,6 +214,20 @@ buffer size specified in bytes. By default, --buffersize is set to 32760 bytes.
|
||||
will alter the number of records Sqoop reports to have imported. This is because it reads the
|
||||
binary dataset in chunks specified by buffersize. Larger buffer size means lower number of records.
|
||||
|
||||
Use the +\--ftp-commands+ option with a comma-separated list of commands to send custom FTP commands prior to
|
||||
file retrieval. This is useful for letting the mainframe know to embed data into the binary files
|
||||
like Record Descriptor Words for variable length records so downstream processes can separate each
|
||||
record. The mainframe will otherwise discard this metadata in the file transmission.
|
||||
|
||||
NOTE: The responses from the mainframe to these commands are only logged. It is up to the user to check
|
||||
for error responses from the mainframe.
|
||||
|
||||
----
|
||||
$ sqoop import-mainframe -D hadoop.security.credential.provider.path=jceks://file/my/folder/mainframe.jceks \
|
||||
--connect <host> --username user1 --password-alias alias1 --dataset SomeDS --tape true \
|
||||
--as-binaryfile --datasettype g --ftp-commands "SITE RDW,SITE RDW READTAPEFORMAT=V"
|
||||
----
|
||||
|
||||
Additional Import Configuration Properties
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
There are some additional properties which can be configured by modifying
|
||||
|
@ -369,6 +369,9 @@ public String toString() {
|
||||
// Buffer size to use when using binary FTP transfer mode
|
||||
@StoredAsProperty("mainframe.ftp.buffersize")
|
||||
private Integer bufferSize;
|
||||
// Custom FTP commands to be sent to the mainframe, kept as the single string
// handed to setFtpCommands(); stored under the "mainframe.ftp.commands" property.
|
||||
@StoredAsProperty("mainframe.ftp.commands")
|
||||
private String customFtpCommands;
|
||||
// Accumulo home directory
|
||||
private String accumuloHome; // not serialized to metastore.
|
||||
// Zookeeper home directory
|
||||
@ -2528,6 +2531,16 @@ public void setBufferSize(int buf) {
|
||||
bufferSize = buf;
|
||||
}
|
||||
|
||||
// sets the custom FTP commands
|
||||
public void setFtpCommands(String ftpCmds) {
|
||||
customFtpCommands = ftpCmds;
|
||||
}
|
||||
|
||||
// gets the custom FTP commands issued
|
||||
public String getFtpCommands() {
|
||||
return customFtpCommands;
|
||||
}
|
||||
|
||||
public static String getAccumuloHomeDefault() {
|
||||
// Set this with $ACCUMULO_HOME, but -Daccumulo.home can override.
|
||||
String accumuloHome = System.getenv("ACCUMULO_HOME");
|
||||
|
@ -44,4 +44,6 @@ public class MainframeConfiguration
|
||||
public static final Integer MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE = 32760;
|
||||
|
||||
public static final String MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE = "mainframe.ftp.buffersize";
|
||||
|
||||
// Configuration property that carries the custom FTP commands string.
public static final String MAINFRAME_FTP_CUSTOM_COMMANDS = "mainframe.ftp.commands";
|
||||
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -74,6 +75,11 @@ protected void configureInputFormat(Job job, String tableName,
|
||||
job.getConfiguration().set(
|
||||
MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE,
|
||||
options.getMainframeInputDatasetTape().toString());
|
||||
if (!StringUtils.isBlank(options.getFtpCommands())) {
|
||||
job.getConfiguration().set(
|
||||
MainframeConfiguration.MAINFRAME_FTP_CUSTOM_COMMANDS,
|
||||
options.getFtpCommands());
|
||||
}
|
||||
if (SqoopOptions.FileLayout.BinaryFile == options.getFileLayout()) {
|
||||
job.getConfiguration().set(
|
||||
MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,
|
||||
|
@ -42,6 +42,7 @@ public class MainframeImportTool extends ImportTool {
|
||||
public static final String DS_TYPE_ARG = "datasettype";
|
||||
public static final String DS_TAPE_ARG = "tape";
|
||||
public static final String BUFFERSIZE_ARG = "buffersize";
|
||||
// Long option name (--ftp-commands) for the custom FTP commands argument.
public static final String FTP_COMMANDS = "ftp-commands";
|
||||
|
||||
public MainframeImportTool() {
|
||||
super("import-mainframe", false);
|
||||
@ -92,6 +93,10 @@ protected RelatedOptions getImportOptions() {
|
||||
.hasArg().withDescription("Sets buffer size for binary import in bytes (default=32kB)")
|
||||
.withLongOpt(BUFFERSIZE_ARG)
|
||||
.create());
|
||||
importOpts.addOption(OptionBuilder.withArgName("Comma separated FTP commands issued before FTP transfer")
|
||||
.hasArg().withDescription("Additional FTP commands issued before transfer")
|
||||
.withLongOpt(FTP_COMMANDS)
|
||||
.create());
|
||||
importOpts.addOption(OptionBuilder.withArgName("n")
|
||||
.hasArg().withDescription("Use 'n' map tasks to import in parallel")
|
||||
.withLongOpt(NUM_MAPPERS_ARG)
|
||||
@ -200,6 +205,9 @@ public void applyOptions(CommandLine in, SqoopOptions out)
|
||||
// set the default buffer size to 32kB
|
||||
out.setBufferSize(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
|
||||
}
|
||||
if (in.hasOption(FTP_COMMANDS)) {
|
||||
out.setFtpCommands(in.getOptionValue(FTP_COMMANDS));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -21,6 +21,7 @@
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@ -226,6 +227,7 @@ public static FTPClient getFTPConnection(Configuration conf)
|
||||
LOG.info("Defaulting FTP transfer mode to ascii");
|
||||
ftp.setFileTransferMode(FTP.ASCII_FILE_TYPE);
|
||||
}
|
||||
applyFtpCmds(ftp,conf);
|
||||
// Use passive mode as default.
|
||||
ftp.enterLocalPassiveMode();
|
||||
LOG.info("System type detected: " + ftp.getSystemType());
|
||||
@ -271,4 +273,28 @@ public static void setMockFTPClient(FTPClient FTPClient) {
|
||||
mockFTPClient = FTPClient;
|
||||
}
|
||||
|
||||
public static List<String> applyFtpCmds(FTPClient ftp, Configuration conf) throws IOException {
|
||||
String ftpCmds = conf.get(MainframeConfiguration.MAINFRAME_FTP_CUSTOM_COMMANDS);
|
||||
String[] ftpCmdList = parseFtpCommands(ftpCmds);
|
||||
List<String> results = new ArrayList<String>();
|
||||
for (String ftpCommand : ftpCmdList) {
|
||||
LOG.info("Issuing command: "+ftpCommand);
|
||||
int res = ftp.sendCommand(ftpCommand);
|
||||
String result = ftp.getReplyString();
|
||||
results.add(result);
|
||||
LOG.info("ReplyCode: "+res + " ReplyString: "+result);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
// splits out the concatenated FTP commands
|
||||
public static String[] parseFtpCommands(String ftpCmds) {
|
||||
if (StringUtils.isBlank(ftpCmds)) {
|
||||
return new String[] {};
|
||||
}
|
||||
return Arrays.stream(ftpCmds.split(","))
|
||||
.map(String::trim)
|
||||
.filter(StringUtils::isNotEmpty)
|
||||
.toArray(String[]::new);
|
||||
}
|
||||
}
|
||||
|
@ -228,6 +228,19 @@ public void testInvalidBufferSizeThrowsNumberFormatException() throws ParseExcep
|
||||
configureAndValidateOptions(args);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFtpTransferCommands() throws ParseException, InvalidOptionsException {
|
||||
String expectedCmds = "quote SITE RDW,quote SITE RDW READTAPEFORMAT=V";
|
||||
String[] args = new String[] { "--dataset", "mydatasetname", "--ftp-commands", expectedCmds};
|
||||
ToolOptions toolOptions = new ToolOptions();
|
||||
SqoopOptions sqoopOption = new SqoopOptions();
|
||||
mfImportTool.configureOptions(toolOptions);
|
||||
sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
|
||||
mfImportTool.validateImportOptions(sqoopOption);
|
||||
String ftpcmds = sqoopOption.getFtpCommands();
|
||||
assertEquals(ftpcmds,expectedCmds);
|
||||
}
|
||||
|
||||
private void configureAndValidateOptions(String[] args) throws ParseException, InvalidOptionsException {
|
||||
mfImportTool.configureOptions(toolOptions);
|
||||
sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
|
||||
|
@ -18,6 +18,8 @@
|
||||
|
||||
package org.apache.sqoop.util;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
@ -26,6 +28,7 @@
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.net.ftp.FTPClient;
|
||||
import org.apache.commons.net.ftp.FTPFile;
|
||||
import org.apache.commons.net.ftp.FTPListParseEngine;
|
||||
@ -48,6 +51,8 @@ public class TestMainframeFTPClientUtils {
|
||||
|
||||
private FTPClient mockFTPClient;
|
||||
private FTPListParseEngine mockFTPListParseEngine;
|
||||
private static final String DEFAULT_FTP_USERNAME="user";
|
||||
private static final String DEFAULT_FTP_PASSWORD="pssword";
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
@ -119,9 +124,8 @@ public void testWrongUsername() {
|
||||
}
|
||||
|
||||
FTPClient ftp = null;
|
||||
conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
|
||||
conf.set(DBConfiguration.USERNAME_PROPERTY, "userr");
|
||||
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
|
||||
setupDefaultConfiguration();
|
||||
conf.set(DBConfiguration.USERNAME_PROPERTY, "invalidusername");
|
||||
// set the password in the secure credentials object
|
||||
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
|
||||
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
|
||||
@ -148,13 +152,8 @@ public void testNotListDatasets() {
|
||||
fail("No IOException should be thrown!");
|
||||
}
|
||||
|
||||
conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
|
||||
conf.set(DBConfiguration.USERNAME_PROPERTY, "userr");
|
||||
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
|
||||
// set the password in the secure credentials object
|
||||
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
|
||||
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
|
||||
"pssword".getBytes());
|
||||
setupDefaultConfiguration();
|
||||
conf.set(DBConfiguration.USERNAME_PROPERTY, "invalidusername");
|
||||
|
||||
try {
|
||||
MainframeFTPClientUtils.listSequentialDatasets("pdsName", conf);
|
||||
@ -187,15 +186,9 @@ public void testListSequentialDatasets() {
|
||||
fail("No IOException should be thrown!");
|
||||
}
|
||||
|
||||
conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
|
||||
conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
|
||||
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
|
||||
setupDefaultConfiguration();
|
||||
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s");
|
||||
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
|
||||
// set the password in the secure credentials object
|
||||
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
|
||||
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
|
||||
"pssword".getBytes());
|
||||
|
||||
try {
|
||||
List<String> files = MainframeFTPClientUtils.listSequentialDatasets("a.b.c.blah1", conf);
|
||||
@ -226,15 +219,9 @@ public void testListEmptySequentialDatasets() {
|
||||
fail("No IOException should be thrown!");
|
||||
}
|
||||
|
||||
conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
|
||||
conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
|
||||
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
|
||||
setupDefaultConfiguration();
|
||||
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s");
|
||||
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
|
||||
// set the password in the secure credentials object
|
||||
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
|
||||
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
|
||||
"pssword".getBytes());
|
||||
|
||||
try {
|
||||
String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
|
||||
@ -266,15 +253,9 @@ public void testTrailingDotSequentialDatasets() {
|
||||
fail("No IOException should be thrown!");
|
||||
}
|
||||
|
||||
conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
|
||||
conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
|
||||
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
|
||||
setupDefaultConfiguration();
|
||||
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s");
|
||||
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1.");
|
||||
// set the password in the secure credentials object
|
||||
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
|
||||
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
|
||||
"pssword".getBytes());
|
||||
|
||||
try {
|
||||
String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
|
||||
@ -307,15 +288,9 @@ public void testGdgGetLatest() {
|
||||
fail("No IOException should be thrown!");
|
||||
}
|
||||
|
||||
conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
|
||||
conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
|
||||
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
|
||||
setupDefaultConfiguration();
|
||||
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"g");
|
||||
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.d");
|
||||
// set the password in the secure credentials object
|
||||
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
|
||||
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
|
||||
"pssword".getBytes());
|
||||
|
||||
try {
|
||||
String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
|
||||
@ -347,14 +322,9 @@ public void testPartitionedDatasetsShouldReturnAllFiles() {
|
||||
} catch (IOException e) {
|
||||
fail("No IOException should be thrown!");
|
||||
}
|
||||
conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
|
||||
conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
|
||||
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
|
||||
setupDefaultConfiguration();
|
||||
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"p");
|
||||
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
|
||||
// set the password in the secure credentials object
|
||||
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
|
||||
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, "pssword".getBytes());
|
||||
try {
|
||||
String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
|
||||
List<String> files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf);
|
||||
@ -365,4 +335,79 @@ public void testPartitionedDatasetsShouldReturnAllFiles() {
|
||||
Assert.fail(ioeString);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFtpCommandExecutes() throws IOException {
|
||||
final String EXPECTED_RESPONSE = "200 OK";
|
||||
final int EXPECTED_RESPONSE_CODE = 200;
|
||||
String ftpcmds = "quote SITE RDW,quote SITE RDW READTAPEFORMAT=V";
|
||||
when(mockFTPClient.login("user", "pssword")).thenReturn(true);
|
||||
when(mockFTPClient.logout()).thenReturn(true);
|
||||
when(mockFTPClient.isConnected()).thenReturn(false);
|
||||
when(mockFTPClient.getReplyCode()).thenReturn(EXPECTED_RESPONSE_CODE);
|
||||
when(mockFTPClient.getReplyString()).thenReturn(EXPECTED_RESPONSE);
|
||||
setupDefaultConfiguration();
|
||||
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"g");
|
||||
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.d");
|
||||
conf.set(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY);
|
||||
conf.set(MainframeConfiguration.MAINFRAME_FTP_CUSTOM_COMMANDS, ftpcmds);
|
||||
MainframeFTPClientUtils.setMockFTPClient(mockFTPClient);
|
||||
FTPClient ftp = MainframeFTPClientUtils.getFTPConnection(conf);
|
||||
verify(mockFTPClient).sendCommand("quote SITE RDW");
|
||||
verify(mockFTPClient).sendCommand("quote SITE RDW READTAPEFORMAT=V");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFtpCommandsOneCommand() {
|
||||
String inputString = "quote SITE RDW READTAPEFORMAT=V";
|
||||
String [] expected = new String [] {"quote SITE RDW READTAPEFORMAT=V"};
|
||||
String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString);
|
||||
assertArrayEquals(cmds,expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFtpCommandsOneCommandWithComma() {
|
||||
String inputString = ",quote SITE RDW READTAPEFORMAT=V";
|
||||
String [] expected = new String [] {"quote SITE RDW READTAPEFORMAT=V"};
|
||||
String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString);
|
||||
assertArrayEquals(cmds,expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFtpCommandsOneCommandWithCommas() {
|
||||
String inputString = ",quote SITE RDW READTAPEFORMAT=V,";
|
||||
String [] expected = new String [] {"quote SITE RDW READTAPEFORMAT=V"};
|
||||
String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString);
|
||||
assertArrayEquals(cmds,expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFtpCommandsTwoCommandWithComma() {
|
||||
String inputString = "quote SITE RDW,quote SITE RDW READTAPEFORMAT=V";
|
||||
String [] expected = new String [] {"quote SITE RDW","quote SITE RDW READTAPEFORMAT=V"};
|
||||
String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString);
|
||||
assertArrayEquals(cmds,expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFtpCommandsNullCommand() {
|
||||
String inputString = null;
|
||||
String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString);
|
||||
assertEquals(0, cmds.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFtpCommandsEmptyCommands() {
|
||||
String inputString = ",,,";
|
||||
String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString);
|
||||
assertEquals(0, cmds.length);
|
||||
}
|
||||
|
||||
private void setupDefaultConfiguration() {
|
||||
conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
|
||||
conf.set(DBConfiguration.USERNAME_PROPERTY, DEFAULT_FTP_USERNAME);
|
||||
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
|
||||
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
|
||||
DEFAULT_FTP_PASSWORD.getBytes());
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user