diff --git a/connector/connector-sftp/pom.xml b/connector/connector-sftp/pom.xml new file mode 100644 index 00000000..44e64b1e --- /dev/null +++ b/connector/connector-sftp/pom.xml @@ -0,0 +1,69 @@ + + + + + 4.0.0 + + + org.apache.sqoop + connector + 2.0.0-SNAPSHOT + + + org.apache.sqoop.connector + sqoop-connector-sftp + Sqoop SFTP Connector + + + + org.testng + testng + + + + org.apache.sqoop + connector-sdk + + + + com.jcraft + jsch + 0.1.51 + + + + + sqoop + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + + diff --git a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpConnector.java b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpConnector.java new file mode 100644 index 00000000..c2ce8da4 --- /dev/null +++ b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpConnector.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.connector.sftp; + +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.common.VersionInfo; +import org.apache.sqoop.connector.sftp.configuration.LinkConfiguration; +import org.apache.sqoop.connector.sftp.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.job.etl.From; +import org.apache.sqoop.job.etl.To; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.ResourceBundle; + +/** + * Implementation of a Sqoop 2 connector to support data movement to/from an + * SFTP server. + */ +public class SftpConnector extends SqoopConnector { + + /** + * Define the TO instance. + */ + private static final To TO = new To(SftpToInitializer.class, + SftpLoader.class, + SftpToDestroyer.class); + + /** + * {@inheritDoc} + * + * Since this is a built-in connector it will return the same version as the + * rest of the Sqoop code. + */ + @Override + public String getVersion() { + return VersionInfo.getBuildVersion(); + } + + /** + * Return the configuration resource bundle for this connector. + * + * @param locale The Locale object. + * + * @return The resource bundle associated with the input locale. + */ + @Override + public ResourceBundle getBundle(Locale locale) { + return ResourceBundle.getBundle(SftpConstants.RESOURCE_BUNDLE_NAME, locale); + } + + /** + * Get the class encapsulating link configuration for this connector. + * + * @return The link configuration class for this connector. + */ + @Override + public Class getLinkConfigurationClass() { + return LinkConfiguration.class; + } + + /** + * Get the appropriate job configuration class for the input direction. + * + * @param direction Whether to return TO or FROM configuration class. + * + * @return Job configuration class for given direction. + */ + @Override + public Class getJobConfigurationClass(Direction direction) { + switch (direction) { + case TO: + return ToJobConfiguration.class; + default: + return null; + } + } + + /** + * Get the object which defines classes for performing import jobs. + * + * @return the From object defining classes for performing import. + */ + @Override + public From getFrom() { + return null; + } + + /** + * Get the object which defines classes for performing export jobs. + * + * @return the To object defining classes for performing export. + */ + @Override + public To getTo() { + return TO; + } + + /** + * Returns an {@linkplain org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader} + * object that can upgrade the connection and job configs. + * + * @return configurable upgrader object + */ + @Override + public ConnectorConfigurableUpgrader getConfigurableUpgrader() { + return new SftpConnectorUpgrader(); + } + + /** + * Return a List of directions supported by this connector. + * + * @return list of enums representing supported directions. + */ + public List getSupportedDirections() { + return Arrays.asList(Direction.TO); + } +} diff --git a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpConnectorError.java b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpConnectorError.java new file mode 100644 index 00000000..e1bb96d3 --- /dev/null +++ b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpConnectorError.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.sftp; + +import org.apache.sqoop.common.ErrorCode; + +/** + * Error messages for SFTP connector. + */ +public enum SftpConnectorError implements ErrorCode { + SFTP_CONNECTOR_0000("Unknown error occurred."), + SFTP_CONNECTOR_0001("Error occurred connecting to SFTP server."), + SFTP_CONNECTOR_0002("Error occurred disconnecting from SFTP server."), + SFTP_CONNECTOR_0003("Error occurred transferring data to SFTP server."), + SFTP_CONNECTOR_0004("Unknown job type") + ; + + private final String message; + + private SftpConnectorError(String message) { + this.message = message; + } + + public String getCode() { + return name(); + } + + public String getMessage() { + return message; + } + +} diff --git a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpConnectorUpgrader.java b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpConnectorUpgrader.java new file mode 100644 index 00000000..697dc145 --- /dev/null +++ b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpConnectorUpgrader.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.sftp; + +import org.apache.sqoop.configurable.ConfigurableUpgradeUtil; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; +import org.apache.sqoop.model.MLinkConfig; +import org.apache.sqoop.model.MToConfig; + +//NOTE: All config types have the similar upgrade path at this point +public class SftpConnectorUpgrader extends ConnectorConfigurableUpgrader { + + /** + * {@inheritDoc} + */ + @Override + public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), + upgradeTarget.getConfigs()); + } + + /** + * {@inheritDoc} + */ + @Override + public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), + upgradeTarget.getConfigs()); + } +} diff --git a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpConstants.java b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpConstants.java new file mode 100644 index 00000000..b0548c67 --- /dev/null +++ b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpConstants.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.connector.sftp; + +import org.apache.sqoop.job.Constants; + +/** + * Constants for FTP connector. + */ +public final class SftpConstants extends Constants { + + /** + * Name of resource bundle for configuring this connector. + */ + public static final String RESOURCE_BUNDLE_NAME = "sftp-connector-config"; + + /** + * Default port for FTP. + */ + public static final int DEFAULT_PORT = 22; +} diff --git a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpLoader.java b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpLoader.java new file mode 100644 index 00000000..87734e3b --- /dev/null +++ b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpLoader.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.sftp; + +import org.apache.sqoop.connector.sftp.configuration.LinkConfiguration; +import org.apache.sqoop.connector.sftp.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.sftp.sftpclient.SftpConnectorClient; +import org.apache.sqoop.etl.io.DataReader; +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.etl.LoaderContext; + +import java.util.UUID; + +/** + * Class to receive data from a From instance and load to a To instance. + */ +public class SftpLoader extends Loader { + + /** + * Number of records written by last call to load() method. + */ + private long rowsWritten = 0; + + /** + * Load data to target directory on SFTP server. This will create a uniquely + * named file in the specified destination directory and write the input + * records to that file. This ensures that multiple calls to this method with + * subsets of a dataset won't overwrite previous data. + * + * @param context Loader context object. + * @param linkConfiguration Link configuration. + * @param toJobConfig Job configuration. + * @throws Exception Re-thrown from SFTP client code. + */ + @Override + public void load(LoaderContext context, + LinkConfiguration linkConfiguration, + ToJobConfiguration toJobConfig) throws Exception { + DataReader reader = context.getDataReader(); + String outputDir = toJobConfig.toJobConfig.outputDirectory; + // Create a unique filename for writing records. + String path = outputDir + "/" + UUID.randomUUID() + ".txt"; + SftpConnectorClient client = + new SftpConnectorClient(); + client.connect(linkConfiguration.linkConfig.server, + linkConfiguration.linkConfig.port, + linkConfiguration.linkConfig.username, + linkConfiguration.linkConfig.password); + rowsWritten = client.upload(reader, path); + client.disconnect(); + } + + /** + * Return the number of rows witten by the last call to load() method. + * + * @return Number of rows written by call to loader. + */ + @Override + public long getRowsWritten() { + return rowsWritten; + } +} diff --git a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpToDestroyer.java b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpToDestroyer.java new file mode 100644 index 00000000..c7ba1bee --- /dev/null +++ b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpToDestroyer.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.sftp; + +import org.apache.sqoop.connector.sftp.configuration.LinkConfiguration; +import org.apache.sqoop.connector.sftp.configuration.ToJobConfiguration; +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.job.etl.DestroyerContext; + +/** + * Perform any clean up, etc. tasks when the Sqoop execution completes. + */ +public class SftpToDestroyer extends Destroyer { + /** + * Callback to clean up after job execution. + * + * @param context Destroyer context + * @param linkConfig link configuration object + * @param jobConfig TO job configuration object + */ + @Override + public void destroy(DestroyerContext context, LinkConfiguration linkConfig, + ToJobConfiguration jobConfig) { + // do nothing at this point + } +} diff --git a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpToInitializer.java b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpToInitializer.java new file mode 100644 index 00000000..bfb51ac9 --- /dev/null +++ b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/SftpToInitializer.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.sftp; + +import org.apache.log4j.Logger; + +import org.apache.sqoop.connector.sftp.configuration.LinkConfiguration; +import org.apache.sqoop.connector.sftp.configuration.ToJobConfiguration; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.InitializerContext; +import org.apache.sqoop.utils.ClassUtils; + +import java.util.Set; + +/** + * Perform any required initialization before execution of job. + */ +public class SftpToInitializer extends Initializer { + + private static final Logger LOG = Logger.getLogger(SftpToInitializer.class); + + /** + * {@inheritDoc} + */ + @Override + public void initialize(InitializerContext context, LinkConfiguration linkConfig, + ToJobConfiguration jobConfig) { + LOG.info("Running SFTP Connector TO initializer."); + // do nothing at this point + } + + /** + * {@inheritDoc} + */ + @Override + public Set getJars(InitializerContext context, + LinkConfiguration linkConfiguration, + ToJobConfiguration toJobConfiguration) { + Set jars = + super.getJars(context, linkConfiguration, toJobConfiguration); + // Jar for jsch library: + jars.add(ClassUtils.jarForClass("com.jcraft.jsch.JSch")); + return jars; + } + +} diff --git a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/configuration/LinkConfig.java b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/configuration/LinkConfig.java new file mode 100644 index 00000000..1868bba6 --- /dev/null +++ b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/configuration/LinkConfig.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.sftp.configuration; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.sftp.sftpclient.SftpConnectorClient; +import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.Input; +import org.apache.sqoop.model.Validator; +import org.apache.sqoop.validation.Status; +import org.apache.sqoop.validation.validators.AbstractValidator; +import org.apache.sqoop.validation.validators.NotEmpty; + +/** + * Attributes for SFTP connector link configuration. + */ +@ConfigClass(validators = {@Validator(LinkConfig.ConfigValidator.class)}) +public class LinkConfig { + + /** + * FTP server hostname. + */ + @Input(size = 255, validators = {@Validator(NotEmpty.class)}) + public String server; + + /** + * FTP server port. Default is port 22. + */ + @Input + public Integer port; + + /** + * Username for server login. + */ + @Input(size = 256, validators = {@Validator(NotEmpty.class)}) + public String username; + + /** + * Password for server login. + */ + @Input(size = 256, sensitive = true) + public String password; + + /** + * Validate that we can log into the server using the supplied credentials. + */ + public static class ConfigValidator extends AbstractValidator { + @Override + public void validate(LinkConfig linkConfig) { + try { + SftpConnectorClient client = + new SftpConnectorClient(); + client.connect(linkConfig.server, linkConfig.port, + linkConfig.username, linkConfig.password); + client.disconnect(); + } catch (SqoopException e) { + addMessage(Status.WARNING, "Can't connect to the SFTP server " + + linkConfig.server + " error is " + e.getMessage()); + } + } + } +} diff --git a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/configuration/LinkConfiguration.java b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/configuration/LinkConfiguration.java new file mode 100644 index 00000000..1e1c9014 --- /dev/null +++ b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/configuration/LinkConfiguration.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.sftp.configuration; + +import org.apache.sqoop.model.Config; +import org.apache.sqoop.model.ConfigurationClass; + +/** + * Class to encapsulate link attributes for FTP connector. + */ +@ConfigurationClass +public class LinkConfiguration { + @Config + public LinkConfig linkConfig; + + public LinkConfiguration() { + linkConfig = new LinkConfig(); + } +} diff --git a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/configuration/ToJobConfig.java b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/configuration/ToJobConfig.java new file mode 100644 index 00000000..1140673c --- /dev/null +++ b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/configuration/ToJobConfig.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.sftp.configuration; + +import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.Input; +import org.apache.sqoop.model.Validator; +import org.apache.sqoop.validation.validators.NotEmpty; + +/** + * Attributes for SFTP connector TO configuration. + */ +@ConfigClass +public class ToJobConfig { + + /** + * Directory on FTP server to write data to. + */ + @Input(size = 260, validators = {@Validator(NotEmpty.class)}) + public String outputDirectory; +} diff --git a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/configuration/ToJobConfiguration.java b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/configuration/ToJobConfiguration.java new file mode 100644 index 00000000..00d4eb5b --- /dev/null +++ b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/configuration/ToJobConfiguration.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.sftp.configuration; + +import org.apache.sqoop.model.ConfigurationClass; +import org.apache.sqoop.model.Config; + +/** + * Class to encapsulate TO configuration. + */ +@ConfigurationClass +public class ToJobConfiguration { + @Config + public ToJobConfig toJobConfig; + + public ToJobConfiguration() { + toJobConfig = new ToJobConfig(); + } +} diff --git a/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/sftpclient/SftpConnectorClient.java b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/sftpclient/SftpConnectorClient.java new file mode 100644 index 00000000..a8418abe --- /dev/null +++ b/connector/connector-sftp/src/main/java/org/apache/sqoop/connector/sftp/sftpclient/SftpConnectorClient.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.connector.sftp.sftpclient; + +import com.jcraft.jsch.Channel; +import com.jcraft.jsch.ChannelSftp; +import com.jcraft.jsch.JSch; +import com.jcraft.jsch.Session; +import com.jcraft.jsch.JSchException; + +import org.apache.log4j.Logger; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.sftp.SftpConstants; +import org.apache.sqoop.connector.sftp.SftpConnectorError; +import org.apache.sqoop.etl.io.DataReader; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Properties; + +/** + * Class encapsulating functionality to interact with an SFTP server. This class + * uses the JSch library to provide the SFTP functionality. See + * http://www.jcraft.com/jsch/. + */ +public class SftpConnectorClient { + + /** + * Java secure channel implementation supporting sftp functionality. + */ + private JSch jsch = null; + + /** + * jsch session object. + */ + private Session session = null; + + /** + * sftp channel object. + */ + private ChannelSftp channelSftp = null; + + /** + * Hostname for sftp server. + */ + private String sftpServer = null; + + /** + * Port for sftp server. + */ + private int sftpPort = SftpConstants.DEFAULT_PORT; + + /** + * Log4j logger. + */ + private static final Logger LOG = Logger.getLogger(SftpConnectorClient.class); + + /** + * Default constructor. We'll just initialize the secure channel object here. + */ + public SftpConnectorClient() { + jsch = new JSch(); + } + + /** + * Create a jsch session object, and use it to open a channel to the sftp + * server. + * + * @param host Hostname for SFTP server. + * @param port Port that SFTP server is running on. 22 by default. + * @param username Username for logging into server. + * @param password Password for user login. + * + * @exception SqoopException thrown if a JSchException is caught indicating + * an error during connection. + */ + public void connect(String host, Integer port, String username, String password) + throws SqoopException { + + if (port != null) { + sftpPort = port.intValue(); + } + + try { + session = jsch.getSession(username, host, sftpPort); + session.setPassword(password); + // Disable StrictHostKeyChecking so we don't get an "UnkownHostKey" error: + Properties fstpProps = new Properties(); + fstpProps.put("StrictHostKeyChecking", "no"); + session.setConfig(fstpProps); + LOG.info("connecting to " + host + " port=" + sftpPort); + session.connect(); + LOG.info("successfully connected to " + host); + Channel channel = session.openChannel("sftp"); + channel.connect(); + channelSftp = (ChannelSftp)channel; + LOG.info("successfully created sftp channel for " + host); + } catch (JSchException e) { + LOG.error("Caught JSchException: " + e.getMessage()); + throw new SqoopException(SftpConnectorError.SFTP_CONNECTOR_0001, + e.getMessage(), e); + } + } + + /** + * Upload records to the SFTP server. + * + * @param reader a DataReader object containing data passed from source + * connector. + * @param path Full path on SFTP server to write files to. + * + * @return Number of records written in call to this method. + * + * @throws SqoopException thrown if error occurs during interaction with + * SFTP server. + */ + public long upload(DataReader reader, String path) + throws SqoopException { + + long recordsWritten = 0; + // OutputStream for writing records to SFTP server. + OutputStream out = null; + + try { + out = channelSftp.put(path, null, ChannelSftp.OVERWRITE, 0); + LOG.info("Opened OutputStream to path: " + path); + String record; + while ((record = reader.readTextRecord()) != null) { + out.write(record.getBytes()); + out.write(("\n").getBytes()); + recordsWritten++; + } + } catch (Exception e) { + LOG.error("Caught Exception writing records to SFTP server: " + + e.getMessage()); + throw new SqoopException(SftpConnectorError.SFTP_CONNECTOR_0003, + e.getMessage(), e); + } finally { + try { + if (out != null) { + out.close(); + } + } catch (IOException e) { + LOG.error("Caught IOException closing SFTP output stream: " + + e.getMessage()); + throw new SqoopException(SftpConnectorError.SFTP_CONNECTOR_0002, + "Caught IOException: " + e.getMessage(), e); + } + } + + return recordsWritten; + } + + /** + * Disconnect from SFTP server. + */ + public void disconnect() { + session.disconnect(); + } +} diff --git a/connector/connector-sftp/src/main/resources/log4j.properties b/connector/connector-sftp/src/main/resources/log4j.properties new file mode 100644 index 00000000..44ffced2 --- /dev/null +++ b/connector/connector-sftp/src/main/resources/log4j.properties @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=DEBUG, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/connector/connector-sftp/src/main/resources/sftp-connector-config.properties b/connector/connector-sftp/src/main/resources/sftp-connector-config.properties new file mode 100644 index 00000000..c56c8e02 --- /dev/null +++ b/connector/connector-sftp/src/main/resources/sftp-connector-config.properties @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# FTP Connector Resources + +############################ + +# Link Config +linkConfig.label = Link configuration +linkConfig.help = Parameters required to connect to an FTP server. + +# FTP server hostname +linkConfig.server.label = SFTP server hostname +linkConfig.server.help = Hostname for the SFTP server. + +# FTP server port +linkConfig.port.label = SFTP server port (22) +linkConfig.port.help = Port for the SFTP server. 22 by default. + +# username string +linkConfig.username.label = Username +linkConfig.username.help = Enter the username to be used for connecting to the \ + SFTP server. + +# password string +linkConfig.password.label = Password +linkConfig.password.help = Enter the password to be used for connecting to the \ + SFTP server. + +# To Job Config +# +toJobConfig.label = ToJob configuration +toJobConfig.help = Parameters required to store data on the SFTP server. + +toJobConfig.outputDirectory.label = Output directory +toJobConfig.outputDirectory.help = Directory on the SFTP server to write data to. + +toJobConfig.ignored.label = Ignored +toJobConfig.ignored.help = This value is ignored. diff --git a/connector/connector-sftp/src/main/resources/sqoopconnector.properties b/connector/connector-sftp/src/main/resources/sqoopconnector.properties new file mode 100644 index 00000000..7e1e21b6 --- /dev/null +++ b/connector/connector-sftp/src/main/resources/sqoopconnector.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# SFTP Connector Properties +org.apache.sqoop.connector.class = org.apache.sqoop.connector.sftp.SftpConnector +org.apache.sqoop.connector.name = sftp-connector diff --git a/connector/pom.xml b/connector/pom.xml index dfa7e884..c9990612 100644 --- a/connector/pom.xml +++ b/connector/pom.xml @@ -38,6 +38,7 @@ limitations under the License. connector-hdfs connector-kite connector-kafka + connector-sftp