5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-21 11:21:39 +08:00

SQOOP-2217: Sqoop2: Implement SFTP To Connector Support

(Jonathan Seidman via Abraham Elmahrek)
This commit is contained in:
Abraham Elmahrek 2015-04-27 17:33:55 -07:00
parent 4e05e61864
commit 6284d590f3
20 changed files with 1056 additions and 0 deletions

View File

@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- This module is a child of the shared "connector" parent POM. -->
<parent>
<groupId>org.apache.sqoop</groupId>
<artifactId>connector</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-sftp</artifactId>
<name>Sqoop SFTP Connector</name>
<dependencies>
<!-- testng and connector-sdk declare no version here; presumably their
versions are managed by a parent POM (verify against the parent). -->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>connector-sdk</artifactId>
</dependency>
<!-- JSch supplies the SFTP client functionality used by this connector;
its version is pinned explicitly in this module. -->
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.51</version>
</dependency>
</dependencies>
<build>
<finalName>sqoop</finalName>
<plugins>
<!-- Runs the maven-jar-plugin test-jar goal so that this module's test
classes are packaged as well. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.sftp;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.VersionInfo;
import org.apache.sqoop.connector.sftp.configuration.LinkConfiguration;
import org.apache.sqoop.connector.sftp.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.etl.From;
import org.apache.sqoop.job.etl.To;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.ResourceBundle;
/**
 * Sqoop 2 connector for moving data to an SFTP server.
 *
 * Only the TO direction is implemented here: {@link #getFrom()} returns
 * {@code null} and {@link Direction#TO} is the sole supported direction.
 */
public class SftpConnector extends SqoopConnector {

  /**
   * Initializer, loader and destroyer classes that make up the TO side.
   */
  private static final To TO =
      new To(SftpToInitializer.class, SftpLoader.class, SftpToDestroyer.class);

  /**
   * {@inheritDoc}
   *
   * This is a built-in connector, so it reports the same build version as the
   * rest of the Sqoop code.
   */
  @Override
  public String getVersion() {
    return VersionInfo.getBuildVersion();
  }

  /**
   * Look up the configuration resource bundle of this connector.
   *
   * @param locale Locale the bundle is requested for.
   *
   * @return The resource bundle resolved for the given locale.
   */
  @Override
  public ResourceBundle getBundle(Locale locale) {
    String bundleName = SftpConstants.RESOURCE_BUNDLE_NAME;
    return ResourceBundle.getBundle(bundleName, locale);
  }

  /**
   * @return The class describing the link configuration of this connector.
   */
  @Override
  public Class getLinkConfigurationClass() {
    return LinkConfiguration.class;
  }

  /**
   * Select the job configuration class for a direction.
   *
   * @param direction Direction whose configuration class is requested.
   *
   * @return The TO job configuration class for {@link Direction#TO},
   * {@code null} for any other direction.
   */
  @Override
  public Class getJobConfigurationClass(Direction direction) {
    Class jobConfigurationClass = null;
    switch (direction) {
      case TO:
        jobConfigurationClass = ToJobConfiguration.class;
        break;
      default:
        // Only the TO direction has a job configuration.
        break;
    }
    return jobConfigurationClass;
  }

  /**
   * @return Always {@code null}; this connector defines no FROM side.
   */
  @Override
  public From getFrom() {
    return null;
  }

  /**
   * @return The To object naming the classes used when writing to SFTP.
   */
  @Override
  public To getTo() {
    return TO;
  }

  /**
   * Create the upgrader for this connector's link and job configs.
   *
   * @return A new {@link SftpConnectorUpgrader}.
   */
  @Override
  public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
    ConnectorConfigurableUpgrader upgrader = new SftpConnectorUpgrader();
    return upgrader;
  }

  /**
   * List the directions this connector can be used in.
   *
   * @return A list containing only {@link Direction#TO}.
   */
  public List<Direction> getSupportedDirections() {
    List<Direction> supported = Arrays.asList(Direction.TO);
    return supported;
  }
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.sftp;
import org.apache.sqoop.common.ErrorCode;
/**
 * Error codes raised by the SFTP connector, each paired with a fixed
 * human-readable message.
 */
public enum SftpConnectorError implements ErrorCode {

  SFTP_CONNECTOR_0000("Unknown error occurred."),

  SFTP_CONNECTOR_0001("Error occurred connecting to SFTP server."),

  SFTP_CONNECTOR_0002("Error occurred disconnecting from SFTP server."),

  SFTP_CONNECTOR_0003("Error occurred transferring data to SFTP server."),

  SFTP_CONNECTOR_0004("Unknown job type");

  /**
   * Message text attached to this error code.
   */
  private final String message;

  // Enum constructors are implicitly private.
  SftpConnectorError(String description) {
    message = description;
  }

  /**
   * @return The name of the enum constant, used as the error code.
   */
  public String getCode() {
    return name();
  }

  /**
   * @return The message text associated with this error code.
   */
  public String getMessage() {
    return message;
  }
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.sftp;
import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MToConfig;
/**
 * Upgrader for the link and TO job configs of the SFTP connector.
 *
 * NOTE: All config types follow a similar upgrade path at this point, so both
 * methods hand their configs to the same upgrade utility.
 */
public class SftpConnectorUpgrader extends ConnectorConfigurableUpgrader {

  /**
   * {@inheritDoc}
   */
  @Override
  public void upgradeLinkConfig(MLinkConfig original,
                                MLinkConfig upgradeTarget) {
    ConfigurableUpgradeUtil.doUpgrade(
        original.getConfigs(), upgradeTarget.getConfigs());
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void upgradeToJobConfig(MToConfig original,
                                 MToConfig upgradeTarget) {
    ConfigurableUpgradeUtil.doUpgrade(
        original.getConfigs(), upgradeTarget.getConfigs());
  }
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.sftp;
import org.apache.sqoop.job.Constants;
/**
* Constants for the SFTP connector.
*/
public final class SftpConstants extends Constants {
/**
* Name of the resource bundle holding this connector's configuration
* resources; the connector passes it to ResourceBundle.getBundle().
*/
public static final String RESOURCE_BUNDLE_NAME = "sftp-connector-config";
/**
* Default port for SFTP, used when no port is configured on the link.
*/
public static final int DEFAULT_PORT = 22;
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.sftp;
import org.apache.sqoop.connector.sftp.configuration.LinkConfiguration;
import org.apache.sqoop.connector.sftp.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.sftp.sftpclient.SftpConnectorClient;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import java.util.UUID;
/**
 * Loader that receives records from the FROM side of a job and writes them
 * to a file on an SFTP server.
 */
public class SftpLoader extends Loader<LinkConfiguration, ToJobConfiguration> {

  /**
   * Number of records written by the last call to the load() method.
   */
  private long rowsWritten = 0;

  /**
   * Load data to the target directory on the SFTP server. A uniquely named
   * file is created in the configured output directory and all input records
   * are written to it, so that separate calls to this method with subsets of
   * a dataset do not overwrite each other's data.
   *
   * The connection to the server is always released once it has been
   * established, including when the upload fails.
   *
   * @param context Loader context object.
   * @param linkConfiguration Link configuration.
   * @param toJobConfig Job configuration.
   * @throws Exception Re-thrown from SFTP client code.
   */
  @Override
  public void load(LoaderContext context,
                   LinkConfiguration linkConfiguration,
                   ToJobConfiguration toJobConfig) throws Exception {
    // Start from zero so a failed call never reports the count of an
    // earlier, successful one.
    rowsWritten = 0;

    DataReader reader = context.getDataReader();
    String outputDir = toJobConfig.toJobConfig.outputDirectory;

    // Create a unique filename for writing records.
    String path = outputDir + "/" + UUID.randomUUID() + ".txt";

    SftpConnectorClient client = new SftpConnectorClient();
    // connect() stays outside the try block: if it throws there is no
    // established connection for the finally clause to release.
    client.connect(linkConfiguration.linkConfig.server,
                   linkConfiguration.linkConfig.port,
                   linkConfiguration.linkConfig.username,
                   linkConfiguration.linkConfig.password);
    try {
      rowsWritten = client.upload(reader, path);
    } finally {
      // Release the SSH session even when the upload throws.
      client.disconnect();
    }
  }

  /**
   * Return the number of rows written by the last call to the load() method.
   *
   * @return Number of rows written by call to loader.
   */
  @Override
  public long getRowsWritten() {
    return rowsWritten;
  }
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.sftp;
import org.apache.sqoop.connector.sftp.configuration.LinkConfiguration;
import org.apache.sqoop.connector.sftp.configuration.ToJobConfiguration;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
/**
* Destroyer for the TO side of the SFTP connector. It is the place for any
* clean-up tasks once the Sqoop execution completes; currently there are none.
*/
public class SftpToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> {
/**
* Callback to clean up after job execution. Intentionally a no-op for now.
*
* @param context Destroyer context
* @param linkConfig link configuration object
* @param jobConfig TO job configuration object
*/
@Override
public void destroy(DestroyerContext context, LinkConfiguration linkConfig,
ToJobConfiguration jobConfig) {
// do nothing at this point
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.sftp;
import org.apache.log4j.Logger;
import org.apache.sqoop.connector.sftp.configuration.LinkConfiguration;
import org.apache.sqoop.connector.sftp.configuration.ToJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.utils.ClassUtils;
import java.util.Set;
/**
 * Initializer for the TO side of the SFTP connector. It performs no set-up
 * work yet, but adds the JSch jar to the set of jars reported for the job.
 */
public class SftpToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> {

  private static final Logger LOG = Logger.getLogger(SftpToInitializer.class);

  /**
   * {@inheritDoc}
   */
  @Override
  public void initialize(InitializerContext context,
                         LinkConfiguration linkConfig,
                         ToJobConfiguration jobConfig) {
    // Nothing to initialize at this point; only record that we ran.
    LOG.info("Running SFTP Connector TO initializer.");
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public Set<String> getJars(InitializerContext context,
                             LinkConfiguration linkConfiguration,
                             ToJobConfiguration toJobConfiguration) {
    Set<String> requiredJars =
        super.getJars(context, linkConfiguration, toJobConfiguration);

    // The connector also needs the jar that contains the jsch library.
    String jschJar = ClassUtils.jarForClass("com.jcraft.jsch.JSch");
    requiredJars.add(jschJar);

    return requiredJars;
  }
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.sftp.configuration;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.sftp.sftpclient.SftpConnectorClient;
import org.apache.sqoop.model.ConfigClass;
import org.apache.sqoop.model.Input;
import org.apache.sqoop.model.Validator;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.validators.AbstractValidator;
import org.apache.sqoop.validation.validators.NotEmpty;
/**
 * Link configuration attributes of the SFTP connector: where the server is
 * and which credentials to log in with.
 */
@ConfigClass(validators = {@Validator(LinkConfig.ConfigValidator.class)})
public class LinkConfig {

  /**
   * SFTP server hostname.
   */
  @Input(size = 255, validators = {@Validator(NotEmpty.class)})
  public String server;

  /**
   * SFTP server port. Default is port 22.
   */
  @Input
  public Integer port;

  /**
   * Username for server login.
   */
  @Input(size = 256, validators = {@Validator(NotEmpty.class)})
  public String username;

  /**
   * Password for server login.
   */
  @Input(size = 256, sensitive = true)
  public String password;

  /**
   * Validator that tries to log into the server with the supplied
   * credentials and records a warning when the connection attempt fails.
   */
  public static class ConfigValidator extends AbstractValidator<LinkConfig> {

    /**
     * Connect to and immediately disconnect from the configured server.
     *
     * @param linkConfig Link configuration holding server and credentials.
     */
    @Override
    public void validate(LinkConfig linkConfig) {
      SftpConnectorClient probe = new SftpConnectorClient();
      try {
        probe.connect(linkConfig.server,
                      linkConfig.port,
                      linkConfig.username,
                      linkConfig.password);
        probe.disconnect();
      } catch (SqoopException e) {
        // A failed connection is reported as a warning only.
        String warning = "Can't connect to the SFTP server "
            + linkConfig.server + " error is " + e.getMessage();
        addMessage(Status.WARNING, warning);
      }
    }
  }
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.sftp.configuration;
import org.apache.sqoop.model.Config;
import org.apache.sqoop.model.ConfigurationClass;
/**
 * Configuration class wrapping the link attributes of the SFTP connector.
 */
@ConfigurationClass
public class LinkConfiguration {

  /**
   * Link attributes; never null after construction.
   */
  @Config
  public LinkConfig linkConfig;

  /**
   * Create a configuration holding a fresh, empty {@link LinkConfig}.
   */
  public LinkConfiguration() {
    this.linkConfig = new LinkConfig();
  }
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.sftp.configuration;
import org.apache.sqoop.model.ConfigClass;
import org.apache.sqoop.model.Input;
import org.apache.sqoop.model.Validator;
import org.apache.sqoop.validation.validators.NotEmpty;
/**
* Attributes for SFTP connector TO configuration.
*/
@ConfigClass
public class ToJobConfig {
/**
* Directory on the SFTP server to write data to. Must not be empty.
*/
@Input(size = 260, validators = {@Validator(NotEmpty.class)})
public String outputDirectory;
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.sftp.configuration;
import org.apache.sqoop.model.ConfigurationClass;
import org.apache.sqoop.model.Config;
/**
 * Configuration class wrapping the TO job attributes of the SFTP connector.
 */
@ConfigurationClass
public class ToJobConfiguration {

  /**
   * TO job attributes; never null after construction.
   */
  @Config
  public ToJobConfig toJobConfig;

  /**
   * Create a configuration holding a fresh, empty {@link ToJobConfig}.
   */
  public ToJobConfiguration() {
    this.toJobConfig = new ToJobConfig();
  }
}

View File

@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector.sftp.sftpclient;
import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.JSchException;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.sftp.SftpConstants;
import org.apache.sqoop.connector.sftp.SftpConnectorError;
import org.apache.sqoop.etl.io.DataReader;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Properties;
/**
 * Class encapsulating functionality to interact with an SFTP server. This class
 * uses the JSch library to provide the SFTP functionality. See
 * http://www.jcraft.com/jsch/.
 *
 * A client holds at most one session and one sftp channel at a time. The
 * class does no synchronization of its own.
 */
public class SftpConnectorClient {

  /**
   * Log4j logger.
   */
  private static final Logger LOG = Logger.getLogger(SftpConnectorClient.class);

  /**
   * Charset used to encode records before they are written to the server.
   * Stated explicitly so the bytes written do not depend on the default
   * charset of the JVM running the loader.
   */
  private static final java.nio.charset.Charset RECORD_CHARSET =
      java.nio.charset.Charset.forName("UTF-8");

  /**
   * Bytes written after every record; encoded once and reused.
   */
  private static final byte[] RECORD_DELIMITER = "\n".getBytes(RECORD_CHARSET);

  /**
   * Java secure channel implementation supporting sftp functionality.
   */
  private JSch jsch = null;

  /**
   * jsch session object; null while not connected.
   */
  private Session session = null;

  /**
   * sftp channel object; null while not connected.
   */
  private ChannelSftp channelSftp = null;

  /**
   * Hostname of the sftp server passed to the last connect() call.
   */
  private String sftpServer = null;

  /**
   * Port for sftp server.
   */
  private int sftpPort = SftpConstants.DEFAULT_PORT;

  /**
   * Default constructor. We'll just initialize the secure channel object here.
   */
  public SftpConnectorClient() {
    jsch = new JSch();
  }

  /**
   * Create a jsch session object, and use it to open a channel to the sftp
   * server. Any connection still held from an earlier call is released first,
   * and a partially established connection is released again if a later step
   * fails.
   *
   * @param host Hostname for SFTP server.
   * @param port Port that SFTP server is running on. 22 by default.
   * @param username Username for logging into server.
   * @param password Password for user login.
   *
   * @exception SqoopException thrown if a JSchException is caught indicating
   * an error during connection.
   */
  public void connect(String host, Integer port, String username, String password)
      throws SqoopException {
    // Do not leak a session left over from a previous connect() call.
    disconnect();

    sftpServer = host;
    // Fall back to the default on every call rather than silently reusing
    // the port of an earlier connection.
    sftpPort = (port != null) ? port.intValue() : SftpConstants.DEFAULT_PORT;

    try {
      session = jsch.getSession(username, host, sftpPort);
      session.setPassword(password);

      // Disable StrictHostKeyChecking so we don't get an "UnknownHostKey" error.
      // NOTE(review): this turns off host key verification, so the identity
      // of the server is not checked. Left unchanged to preserve behavior.
      Properties sftpProps = new Properties();
      sftpProps.put("StrictHostKeyChecking", "no");
      session.setConfig(sftpProps);

      LOG.info("connecting to " + host + " port=" + sftpPort);
      session.connect();
      LOG.info("successfully connected to " + host);

      Channel channel = session.openChannel("sftp");
      channel.connect();
      channelSftp = (ChannelSftp) channel;
      LOG.info("successfully created sftp channel for " + host);
    } catch (JSchException e) {
      LOG.error("Caught JSchException: " + e.getMessage(), e);
      // The session may already be up when opening the channel fails.
      disconnect();
      throw new SqoopException(SftpConnectorError.SFTP_CONNECTOR_0001,
          e.getMessage(), e);
    }
  }

  /**
   * Upload records to the SFTP server. Each record is written in UTF-8 and
   * followed by a newline.
   *
   * @param reader a DataReader object containing data passed from source
   * connector.
   * @param path Full path on SFTP server to write files to.
   *
   * @return Number of records written in call to this method.
   *
   * @throws SqoopException thrown if error occurs during interaction with
   * SFTP server.
   */
  public long upload(DataReader reader, String path)
      throws SqoopException {
    long recordsWritten = 0;
    // Tells the finally block whether a transfer error is already propagating.
    boolean transferSucceeded = false;
    // OutputStream for writing records to SFTP server.
    OutputStream out = null;

    try {
      out = channelSftp.put(path, null, ChannelSftp.OVERWRITE, 0);
      LOG.info("Opened OutputStream to path: " + path);

      String record;
      while ((record = reader.readTextRecord()) != null) {
        out.write(record.getBytes(RECORD_CHARSET));
        out.write(RECORD_DELIMITER);
        recordsWritten++;
      }
      transferSucceeded = true;
    } catch (Exception e) {
      LOG.error("Caught Exception writing records to SFTP server: "
          + e.getMessage(), e);
      throw new SqoopException(SftpConnectorError.SFTP_CONNECTOR_0003,
          e.getMessage(), e);
    } finally {
      if (out != null) {
        try {
          out.close();
        } catch (IOException e) {
          LOG.error("Caught IOException closing SFTP output stream: "
              + e.getMessage(), e);
          // Only report the close failure when nothing went wrong before;
          // otherwise it would replace the original transfer exception.
          if (transferSucceeded) {
            throw new SqoopException(SftpConnectorError.SFTP_CONNECTOR_0002,
                "Caught IOException: " + e.getMessage(), e);
          }
        }
      }
    }

    return recordsWritten;
  }

  /**
   * Disconnect from SFTP server. Safe to call when no connection is open,
   * including before connect() or after a failed connect().
   */
  public void disconnect() {
    if (channelSftp != null) {
      channelSftp.disconnect();
      channelSftp = null;
    }
    if (session != null) {
      session.disconnect();
      session = null;
      LOG.info("disconnected from " + sftpServer);
    }
  }
}

View File

@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=DEBUG, A1
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

View File

@ -0,0 +1,51 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SFTP Connector Resources
############################
# Link Config
linkConfig.label = Link configuration
linkConfig.help = Parameters required to connect to an SFTP server.
# SFTP server hostname
linkConfig.server.label = SFTP server hostname
linkConfig.server.help = Hostname for the SFTP server.
# SFTP server port
linkConfig.port.label = SFTP server port (22)
linkConfig.port.help = Port for the SFTP server. 22 by default.
# username string
linkConfig.username.label = Username
linkConfig.username.help = Enter the username to be used for connecting to the \
SFTP server.
# password string
linkConfig.password.label = Password
linkConfig.password.help = Enter the password to be used for connecting to the \
SFTP server.
# To Job Config
#
toJobConfig.label = ToJob configuration
toJobConfig.help = Parameters required to store data on the SFTP server.
toJobConfig.outputDirectory.label = Output directory
toJobConfig.outputDirectory.help = Directory on the SFTP server to write data to.
toJobConfig.ignored.label = Ignored
toJobConfig.ignored.help = This value is ignored.

View File

@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SFTP Connector Properties
org.apache.sqoop.connector.class = org.apache.sqoop.connector.sftp.SftpConnector
org.apache.sqoop.connector.name = sftp-connector

View File

@ -38,6 +38,7 @@ limitations under the License.
<module>connector-hdfs</module>
<module>connector-kite</module>
<module>connector-kafka</module>
<module>connector-sftp</module>
<!-- Uncomment and finish connectors after sqoop framework will become stable
<module>connector-mysql-jdbc</module>
<module>connector-mysql-fastpath</module>

View File

@ -386,3 +386,76 @@ Loader
------
During the *loading* phase, Kafka is written to directly from each loader. The order in which data is loaded into Kafka is not guaranteed.
++++++++++++++
SFTP Connector
++++++++++++++
The SFTP connector supports moving data between a Secure File Transfer Protocol (SFTP) server and other supported Sqoop2 connectors.
Currently only the TO direction is supported to write records to an SFTP server. A FROM connector is pending (SQOOP-2218).
-----
Usage
-----
Before executing a Sqoop2 job with the SFTP connector, set **mapreduce.task.classpath.user.precedence** to true in the Hadoop cluster config, for example::
<property>
<name>mapreduce.task.classpath.user.precedence</name>
<value>true</value>
</property>
This is required since the SFTP connector uses the JSch library (http://www.jcraft.com/jsch/) to provide SFTP functionality. Unfortunately, Hadoop currently ships with an earlier version of this library, which causes an issue with some SFTP servers. Setting this property ensures that the current version of the library packaged with this connector will appear first in the classpath.
To use the SFTP Connector, create a link for the connector and a job that uses the link.
**Link Configuration**
++++++++++++++++++++++
Inputs associated with the link configuration include:
+-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+
| Input | Type | Description | Example |
+=============================+=========+=======================================================================+============================+
| SFTP server hostname | String | Hostname for the SFTP server. | sftp.example.com |
| | | *Required*. | |
+-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+
| SFTP server port | Integer | Port number for the SFTP server. Defaults to 22. | 2220 |
| | | *Optional*. | |
+-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+
| Username | String | The username to provide when connecting to the SFTP server. | sqoop |
| | | *Required*. | |
+-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+
| Password | String | The password to provide when connecting to the SFTP server. | sqoop |
| | | *Required*. | |
+-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+
**Notes**
=========
1. The SFTP connector will attempt to connect to the SFTP server as part of the link validation process. If for some reason a connection cannot be established, you'll see a corresponding error message.
2. Note that during connection, the SFTP connector explicitly disables *StrictHostKeyChecking* to avoid "UnknownHostKey" errors.
**TO Job Configuration**
++++++++++++++++++++++++
Inputs associated with the Job configuration for the TO direction include:
+-----------------------------+---------+-------------------------------------------------------------------------+-----------------------------------+
| Input | Type | Description | Example |
+=============================+=========+=========================================================================+===================================+
| Output directory | String | The location on the SFTP server that the connector will write files to. | uploads |
| | | *Required*. | |
+-----------------------------+---------+-------------------------------------------------------------------------+-----------------------------------+
**Notes**
=========
1. The *output directory* value needs to be an existing directory on the SFTP server.
------
Loader
------
During the *loading* phase, the connector will create uniquely named files in the *output directory* for each partition of data received from the **FROM** connector.

11
pom.xml
View File

@ -306,6 +306,17 @@ limitations under the License.
<type>test-jar</type>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-sftp</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-sftp</artifactId>
<type>test-jar</type>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-kite</artifactId>

View File

@ -93,6 +93,11 @@ limitations under the License.
<artifactId>sqoop-connector-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-sftp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>