diff --git a/build.xml b/build.xml index 7f68b573..a85705fa 100644 --- a/build.xml +++ b/build.xml @@ -734,6 +734,9 @@ + + + diff --git a/ivy.xml b/ivy.xml index 6be4fa20..6af94d9d 100644 --- a/ivy.xml +++ b/ivy.xml @@ -195,6 +195,8 @@ under the License. + + diff --git a/src/docs/user/hive-args.txt b/src/docs/user/hive-args.txt index 441f54e8..75095641 100644 --- a/src/docs/user/hive-args.txt +++ b/src/docs/user/hive-args.txt @@ -43,5 +43,10 @@ Argument Description Hive type for configured columns. If specify commas in\ this argument, use URL encoded keys and values, for example,\ use DECIMAL(1%2C%201) instead of DECIMAL(1, 1). ++\--hs2-url+ The JDBC connection string to HiveServer2 as you would specify it in Beeline. If you use this option with \ + --hive-import then Sqoop will connect to HiveServer2 instead of using the Hive CLI. ++\--hs2-user+ The user for creating the JDBC connection to HiveServer2. The default is the current OS user. ++\--hs2-keytab+ The path to the keytab file of the user connecting to HiveServer2. If you choose another \ + HiveServer2 user (with --hs2-user) then --hs2-keytab must also be specified; otherwise it can be omitted. -------------------------------------------------------------------------- diff --git a/src/docs/user/hive.txt b/src/docs/user/hive.txt index 3dc8bb46..f8f7c27e 100644 --- a/src/docs/user/hive.txt +++ b/src/docs/user/hive.txt @@ -35,12 +35,22 @@ omitted, Sqoop will generate a Hive script containing a +CREATE TABLE+ operation defining your columns using Hive's types, and a +LOAD DATA INPATH+ statement to move the data files into Hive's warehouse directory. -The script will be executed by calling +The script can be executed in two ways: + +- By default the script will be executed by calling the installed copy of hive on the machine where Sqoop is run. If you have multiple Hive installations, or +hive+ is not in your +$PATH+, use the *+\--hive-home+* option to identify the Hive installation directory. Sqoop will use +$HIVE_HOME/bin/hive+ from here. +- If the user specifies the *+\--hs2-url+* parameter then the script will + be sent to HiveServer2 through a JDBC connection. Note that the data itself + is not transferred via the JDBC connection; it is written directly to HDFS, + just as in the default Hive import. As HiveServer2 provides proper + authorization and auditing features, it is recommended to use it instead of + the default. Currently only Kerberos authentication and the text file format are + supported with this option. + NOTE: This function is incompatible with +\--as-avrodatafile+ and +\--as-sequencefile+.
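As a usage illustration of the +\--hs2-url+, +\--hs2-user+ and +\--hs2-keytab+ options documented above, here is a minimal, hypothetical sketch that drives such an import through Sqoop's programmatic entry point (org.apache.sqoop.Sqoop.runTool). The source connection string, table name, HiveServer2 URL, principal and keytab path are placeholder values, not part of this change.

import org.apache.sqoop.Sqoop;

/**
 * Hypothetical sketch: a Hive import routed through HiveServer2 via the new
 * --hs2-* options. All URLs, table names, principals and paths are placeholders
 * and assume a Kerberized HiveServer2 instance.
 */
public class HiveServer2ImportExample {

  public static void main(String[] args) {
    String[] sqoopArgs = {
        "import",
        "--connect", "jdbc:mysql://db.example.com/corp",           // source RDBMS (placeholder)
        "--table", "EMPLOYEES",
        "--hive-import",                                            // --hs2-url is only valid together with --hive-import
        "--hs2-url", "jdbc:hive2://hs2.example.com:10000/default",  // send the CREATE TABLE / LOAD DATA statements to HiveServer2
        "--hs2-user", "etl@EXAMPLE.COM",                            // optional; defaults to the current login user
        "--hs2-keytab", "/etc/security/keytabs/etl.keytab"          // required once --hs2-user is overridden
    };
    // Equivalent to invoking "sqoop import ..." on the command line.
    int exitCode = Sqoop.runTool(sqoopArgs);
    System.exit(exitCode);
  }
}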
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index 651cebd6..d9984af3 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -449,6 +449,15 @@ public String toString() { private String metaUsername; private String metaPassword; + @StoredAsProperty("hs2.url") + private String hs2Url; + + @StoredAsProperty("hs2.user") + private String hs2User; + + @StoredAsProperty("hs2.keytab") + private String hs2Keytab; + public SqoopOptions() { initDefaults(null); } @@ -2892,5 +2901,29 @@ public void setMetaPassword(String metaPassword) { this.metaPassword = metaPassword; } + public String getHs2Url() { + return hs2Url; + } + + public void setHs2Url(String hs2Url) { + this.hs2Url = hs2Url; + } + + public String getHs2User() { + return hs2User; + } + + public void setHs2User(String hs2User) { + this.hs2User = hs2User; + } + + public String getHs2Keytab() { + return hs2Keytab; + } + + public void setHs2Keytab(String hs2Keytab) { + this.hs2Keytab = hs2Keytab; + } + } diff --git a/src/java/org/apache/sqoop/hive/HiveClient.java b/src/java/org/apache/sqoop/hive/HiveClient.java new file mode 100644 index 00000000..994f858f --- /dev/null +++ b/src/java/org/apache/sqoop/hive/HiveClient.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.hive; + +import java.io.IOException; + +public interface HiveClient { + + void importTable() throws IOException; + + void createTable() throws IOException; +} diff --git a/src/java/org/apache/sqoop/hive/HiveClientCommon.java b/src/java/org/apache/sqoop/hive/HiveClientCommon.java new file mode 100644 index 00000000..952db6dd --- /dev/null +++ b/src/java/org/apache/sqoop/hive/HiveClientCommon.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.hive; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileOutputCommitter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.io.CodecMap; + +import java.io.IOException; + +/** + * Class containing the common logic for different HiveClient implementations. + */ +public class HiveClientCommon { + + public static final Log LOG = LogFactory.getLog(HiveClientCommon.class.getName()); + + /** + * If we used a MapReduce-based upload of the data, remove the _logs dir + * from where we put it, before running Hive LOAD DATA INPATH. + */ + public void removeTempLogs(Configuration configuration, Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(configuration); + Path logsPath = new Path(tablePath, "_logs"); + if (fs.exists(logsPath)) { + LOG.info("Removing temporary files from import process: " + logsPath); + if (!fs.delete(logsPath, true)) { + LOG.warn("Could not delete temporary files; " + + "continuing with import, but it may fail."); + } + } + } + + /** + * Clean up after successful HIVE import. + * + * @param outputPath path to the output directory + * @throws IOException + */ + public void cleanUp(Configuration configuration, Path outputPath) throws IOException { + FileSystem fs = outputPath.getFileSystem(configuration); + + // HIVE is not always removing input directory after LOAD DATA statement + // (which is our export directory). We're removing export directory in case + // that is blank for case that user wants to periodically populate HIVE + // table (for example with --hive-overwrite). + try { + if (outputPath != null && fs.exists(outputPath)) { + FileStatus[] statuses = fs.listStatus(outputPath); + if (statuses.length == 0) { + LOG.info("Export directory is empty, removing it."); + fs.delete(outputPath, true); + } else if (statuses.length == 1 && statuses[0].getPath().getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) { + LOG.info("Export directory is contains the _SUCCESS file only, removing the directory."); + fs.delete(outputPath, true); + } else { + LOG.info("Export directory is not empty, keeping it."); + } + } + } catch(IOException e) { + LOG.error("Issue with cleaning (safe to ignore)", e); + } + } + + public void indexLzoFiles(SqoopOptions sqoopOptions, Path finalPath) throws IOException { + String codec = sqoopOptions.getCompressionCodec(); + if (codec != null && (codec.equals(CodecMap.LZOP) + || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) { + try { + Tool tool = ReflectionUtils.newInstance(Class. + forName("com.hadoop.compression.lzo.DistributedLzoIndexer"). 
+ asSubclass(Tool.class), sqoopOptions.getConf()); + ToolRunner.run(sqoopOptions.getConf(), tool, + new String[]{finalPath.toString()}); + } catch (Exception ex) { + LOG.error("Error indexing lzo files", ex); + throw new IOException("Error indexing lzo files", ex); + } + } + } + +} diff --git a/src/java/org/apache/sqoop/hive/HiveClientFactory.java b/src/java/org/apache/sqoop/hive/HiveClientFactory.java new file mode 100644 index 00000000..67de8f01 --- /dev/null +++ b/src/java/org/apache/sqoop/hive/HiveClientFactory.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.hive; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.authentication.KerberosAuthenticator; +import org.apache.sqoop.db.JdbcConnectionFactory; +import org.apache.sqoop.db.decorator.KerberizedConnectionFactoryDecorator; +import org.apache.sqoop.manager.ConnManager; + +import java.io.IOException; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isEmpty; + +public class HiveClientFactory { + + private final HiveServer2ConnectionFactoryInitializer connectionFactoryInitializer; + + public HiveClientFactory(HiveServer2ConnectionFactoryInitializer connectionFactoryInitializer) { + this.connectionFactoryInitializer = connectionFactoryInitializer; + } + + public HiveClientFactory() { + this(new HiveServer2ConnectionFactoryInitializer()); + } + + public HiveClient createHiveClient(SqoopOptions sqoopOptions, ConnManager connManager) { + if (useHiveCli(sqoopOptions)) { + return createHiveImport(sqoopOptions, connManager); + } else { + return createHiveServer2Client(sqoopOptions, connManager); + } + } + + private HiveClient createHiveImport(SqoopOptions sqoopOptions, ConnManager connManager) { + return new HiveImport(sqoopOptions, connManager, sqoopOptions.getConf(), false); + } + + private HiveClient createHiveServer2Client(SqoopOptions sqoopOptions, ConnManager connManager) { + TableDefWriter tableDefWriter = createTableDefWriter(sqoopOptions, connManager); + JdbcConnectionFactory hs2JdbcConnectionFactory = connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions); + return new HiveServer2Client(sqoopOptions, tableDefWriter, hs2JdbcConnectionFactory); + } + + TableDefWriter createTableDefWriter(SqoopOptions sqoopOptions, ConnManager connManager) { + return new TableDefWriter(sqoopOptions, connManager, sqoopOptions.getTableName(), sqoopOptions.getHiveTableName(), sqoopOptions.getConf(), false); + } + + private boolean useHiveCli(SqoopOptions sqoopOptions) { + return StringUtils.isEmpty(sqoopOptions.getHs2Url()); + } + +} diff --git 
a/src/java/org/apache/sqoop/hive/HiveImport.java b/src/java/org/apache/sqoop/hive/HiveImport.java index c2729119..5da00a74 100644 --- a/src/java/org/apache/sqoop/hive/HiveImport.java +++ b/src/java/org/apache/sqoop/hive/HiveImport.java @@ -31,16 +31,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.FileOutputCommitter; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Shell; -import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.util.Tool; -import org.apache.sqoop.io.CodecMap; import org.apache.sqoop.util.Executor; import org.apache.sqoop.util.LoggingAsyncSink; import org.apache.sqoop.util.SubprocessSecurityManager; @@ -54,7 +47,7 @@ * to Hive itself as well as orchestrating the use of the other classes in this * package. */ -public class HiveImport { +public class HiveImport implements HiveClient { public static final Log LOG = LogFactory.getLog(HiveImport.class.getName()); @@ -62,6 +55,7 @@ public class HiveImport { private ConnManager connManager; private Configuration configuration; private boolean generateOnly; + private HiveClientCommon hiveClientCommon; private static boolean testMode = false; public static boolean getTestMode() { @@ -77,13 +71,17 @@ public static void setTestMode(boolean mode) { "org.apache.hadoop.hive.cli.CliDriver"; public HiveImport(final SqoopOptions opts, final ConnManager connMgr, - final Configuration conf, final boolean generateOnly) { + final Configuration conf, final boolean generateOnly, final HiveClientCommon hiveClientCommon) { this.options = opts; this.connManager = connMgr; this.configuration = conf; this.generateOnly = generateOnly; + this.hiveClientCommon = hiveClientCommon; } + public HiveImport(SqoopOptions opts, ConnManager connMgr, Configuration conf, boolean generateOnly) { + this(opts, connMgr, conf, generateOnly, new HiveClientCommon()); + } /** * @return the filename of the hive executable to run to do the import @@ -110,28 +108,12 @@ private String getHiveBinPath() { } } - /** - * If we used a MapReduce-based upload of the data, remove the _logs dir - * from where we put it, before running Hive LOAD DATA INPATH. - */ - private void removeTempLogs(Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(configuration); - Path logsPath = new Path(tablePath, "_logs"); - if (fs.exists(logsPath)) { - LOG.info("Removing temporary files from import process: " + logsPath); - if (!fs.delete(logsPath, true)) { - LOG.warn("Could not delete temporary files; " - + "continuing with import, but it may fail."); - } - } - } - /** * @return true if we're just generating the DDL for the import, but * not actually running it (i.e., --generate-only mode). If so, don't * do any side-effecting actions in Hive. */ - private boolean isGenerateOnly() { + boolean isGenerateOnly() { return generateOnly; } @@ -181,8 +163,6 @@ public void importTable(String inputTableName, String outputTableName, } // generate the HQL statements to run. 
- // reset the connection as it might have timed out - connManager.discardConnection(true); TableDefWriter tableWriter = new TableDefWriter(options, connManager, inputTableName, outputTableName, configuration, !debugMode); @@ -191,23 +171,10 @@ public void importTable(String inputTableName, String outputTableName, Path finalPath = tableWriter.getFinalPath(); if (!isGenerateOnly()) { - removeTempLogs(finalPath); + hiveClientCommon.removeTempLogs(configuration, finalPath); LOG.info("Loading uploaded data into Hive"); - String codec = options.getCompressionCodec(); - if (codec != null && (codec.equals(CodecMap.LZOP) - || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) { - try { - Tool tool = ReflectionUtils.newInstance(Class. - forName("com.hadoop.compression.lzo.DistributedLzoIndexer"). - asSubclass(Tool.class), configuration); - ToolRunner.run(configuration, tool, - new String[] { finalPath.toString() }); - } catch (Exception ex) { - LOG.error("Error indexing lzo files", ex); - throw new IOException("Error indexing lzo files", ex); - } - } + hiveClientCommon.indexLzoFiles(options, finalPath); } // write them to a script file. @@ -242,7 +209,7 @@ public void importTable(String inputTableName, String outputTableName, LOG.info("Hive import complete."); - cleanUp(finalPath); + hiveClientCommon.cleanUp(configuration, finalPath); } } finally { if (!isGenerateOnly()) { @@ -256,37 +223,6 @@ public void importTable(String inputTableName, String outputTableName, } } - /** - * Clean up after successful HIVE import. - * - * @param outputPath path to the output directory - * @throws IOException - */ - private void cleanUp(Path outputPath) throws IOException { - FileSystem fs = outputPath.getFileSystem(configuration); - - // HIVE is not always removing input directory after LOAD DATA statement - // (which is our export directory). We're removing export directory in case - // that is blank for case that user wants to periodically populate HIVE - // table (for example with --hive-overwrite). - try { - if (outputPath != null && fs.exists(outputPath)) { - FileStatus[] statuses = fs.listStatus(outputPath); - if (statuses.length == 0) { - LOG.info("Export directory is empty, removing it."); - fs.delete(outputPath, true); - } else if (statuses.length == 1 && statuses[0].getPath().getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) { - LOG.info("Export directory is contains the _SUCCESS file only, removing the directory."); - fs.delete(outputPath, true); - } else { - LOG.info("Export directory is not empty, keeping it."); - } - } - } catch(IOException e) { - LOG.error("Issue with cleaning (safe to ignore)", e); - } - } - @SuppressWarnings("unchecked") /** * Execute the script file via Hive. @@ -398,5 +334,28 @@ private String[] getHiveArgs(String... 
args) throws IOException { return newArgs.toArray(new String[newArgs.size()]); } + + @Override + public void importTable() throws IOException { + importTable(options.getTableName(), options.getHiveTableName(), false); + } + + @Override + public void createTable() throws IOException { + importTable(options.getTableName(), options.getHiveTableName(), true); + } + + SqoopOptions getOptions() { + return options; + } + + ConnManager getConnManager() { + return connManager; + } + + Configuration getConfiguration() { + return configuration; + } + } diff --git a/src/java/org/apache/sqoop/hive/HiveServer2Client.java b/src/java/org/apache/sqoop/hive/HiveServer2Client.java new file mode 100644 index 00000000..c4976c13 --- /dev/null +++ b/src/java/org/apache/sqoop/hive/HiveServer2Client.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.hive; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.db.JdbcConnectionFactory; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; + +import static java.util.Arrays.asList; + +public class HiveServer2Client implements HiveClient { + + private static final Log LOG = LogFactory.getLog(HiveServer2Client.class.getName()); + + private final SqoopOptions sqoopOptions; + + private final TableDefWriter tableDefWriter; + + private final JdbcConnectionFactory hs2ConnectionFactory; + + private final HiveClientCommon hiveClientCommon; + + public HiveServer2Client(SqoopOptions sqoopOptions, TableDefWriter tableDefWriter, JdbcConnectionFactory hs2ConnectionFactory, HiveClientCommon hiveClientCommon) { + this.sqoopOptions = sqoopOptions; + this.tableDefWriter = tableDefWriter; + this.hs2ConnectionFactory = hs2ConnectionFactory; + this.hiveClientCommon = hiveClientCommon; + } + + public HiveServer2Client(SqoopOptions sqoopOptions, TableDefWriter tableDefWriter, JdbcConnectionFactory hs2ConnectionFactory) { + this(sqoopOptions, tableDefWriter, hs2ConnectionFactory, new HiveClientCommon()); + } + + @Override + public void importTable() throws IOException { + LOG.info("Loading uploaded data into Hive."); + String createTableStmt = tableDefWriter.getCreateTableStmt(); + String loadDataStmt = tableDefWriter.getLoadDataStmt(); + executeHiveImport(asList(createTableStmt, loadDataStmt)); + LOG.info("Hive import complete."); + } + + @Override + public void createTable() throws IOException { + LOG.info("Creating Hive table: " + tableDefWriter.getOutputTableName()); + String createTableStmt = tableDefWriter.getCreateTableStmt(); + executeHiveImport(asList(createTableStmt)); + LOG.info("Hive table is successfully created."); + } + + void executeHiveImport(List commands) throws IOException { + Path finalPath = tableDefWriter.getFinalPath(); + + hiveClientCommon.removeTempLogs(sqoopOptions.getConf(), finalPath); + + hiveClientCommon.indexLzoFiles(sqoopOptions, finalPath); + + try { + executeCommands(commands); + } catch (SQLException e) { + throw new RuntimeException("Error executing Hive import.", e); + } + + hiveClientCommon.cleanUp(sqoopOptions.getConf(), finalPath); + } + + void executeCommands(List commands) throws SQLException { + try (Connection hs2Connection = hs2ConnectionFactory.createConnection()) { + for (String command : commands) { + LOG.debug("Executing command: " + command); + try (PreparedStatement statement = hs2Connection.prepareStatement(command)) { + statement.execute(); + } + } + } + } + + SqoopOptions getSqoopOptions() { + return sqoopOptions; + } + + TableDefWriter getTableDefWriter() { + return tableDefWriter; + } + + JdbcConnectionFactory getHs2ConnectionFactory() { + return hs2ConnectionFactory; + } +} diff --git a/src/java/org/apache/sqoop/hive/HiveServer2ConnectionFactory.java b/src/java/org/apache/sqoop/hive/HiveServer2ConnectionFactory.java new file mode 100644 index 00000000..10515a0d --- /dev/null 
+++ b/src/java/org/apache/sqoop/hive/HiveServer2ConnectionFactory.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.hive; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sqoop.db.DriverManagerJdbcConnectionFactory; + +import java.io.IOException; +import java.sql.Connection; + +import static org.apache.commons.lang3.StringUtils.EMPTY; + +public class HiveServer2ConnectionFactory extends DriverManagerJdbcConnectionFactory { + + private static final Log LOG = LogFactory.getLog(HiveServer2ConnectionFactory.class.getName()); + + private static final String HS2_DRIVER_CLASS = "org.apache.hive.jdbc.HiveDriver"; + + public HiveServer2ConnectionFactory(String connectionString, String username, String password) { + super(HS2_DRIVER_CLASS, connectionString, username, password); + } + + public HiveServer2ConnectionFactory(String connectionString, String username) { + this(connectionString, username, null); + } + + @Override + public Connection createConnection() { + LOG.info("Creating connection to HiveServer2 as: " + getCurrentUser()); + return super.createConnection(); + } + + private String getCurrentUser() { + try { + return UserGroupInformation.getCurrentUser().toString(); + } catch (IOException e) { + LOG.error("Unable to determine current user.", e); + } + return EMPTY; + } + +} diff --git a/src/java/org/apache/sqoop/hive/HiveServer2ConnectionFactoryInitializer.java b/src/java/org/apache/sqoop/hive/HiveServer2ConnectionFactoryInitializer.java new file mode 100644 index 00000000..1d959f9d --- /dev/null +++ b/src/java/org/apache/sqoop/hive/HiveServer2ConnectionFactoryInitializer.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.hive; + + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.authentication.KerberosAuthenticator; +import org.apache.sqoop.db.JdbcConnectionFactory; +import org.apache.sqoop.db.decorator.KerberizedConnectionFactoryDecorator; + +import java.io.IOException; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isEmpty; + +public class HiveServer2ConnectionFactoryInitializer { + + public JdbcConnectionFactory createJdbcConnectionFactory(SqoopOptions sqoopOptions) { + String connectionUsername = determineConnectionUsername(sqoopOptions); + JdbcConnectionFactory connectionFactory = new HiveServer2ConnectionFactory(sqoopOptions.getHs2Url(), connectionUsername); + if (useKerberizedConnection(sqoopOptions)) { + KerberosAuthenticator authenticator = createKerberosAuthenticator(sqoopOptions); + connectionFactory = new KerberizedConnectionFactoryDecorator(connectionFactory, authenticator); + } + return connectionFactory; + } + + private String determineConnectionUsername(SqoopOptions sqoopOptions) { + if (!isEmpty(sqoopOptions.getHs2User())) { + return sqoopOptions.getHs2User(); + } + try { + return UserGroupInformation.getLoginUser().getUserName(); + } catch (IOException e) { + throw new RuntimeException("Unable to determine login user.", e); + } + } + + private KerberosAuthenticator createKerberosAuthenticator(SqoopOptions sqoopOptions) { + return new KerberosAuthenticator(sqoopOptions.getConf(), sqoopOptions.getHs2User(), sqoopOptions.getHs2Keytab()); + } + + private boolean useKerberizedConnection(SqoopOptions sqoopOptions) { + return !isBlank(sqoopOptions.getHs2Keytab()); + } + +} diff --git a/src/java/org/apache/sqoop/hive/TableDefWriter.java b/src/java/org/apache/sqoop/hive/TableDefWriter.java index b7a25b78..27d988c5 100644 --- a/src/java/org/apache/sqoop/hive/TableDefWriter.java +++ b/src/java/org/apache/sqoop/hive/TableDefWriter.java @@ -96,6 +96,7 @@ public TableDefWriter(final SqoopOptions opts, final ConnManager connMgr, * @return the CREATE TABLE statement for the table to load into hive. */ public String getCreateTableStmt() throws IOException { + resetConnManager(); Map columnTypes; Properties userMapping = options.getMapColumnHive(); Boolean isHiveExternalTableSet = !StringUtils.isBlank(options.getHiveExternalTableDir()); @@ -286,5 +287,38 @@ public static String getHiveOctalCharCode(int charNum) { return String.format("\\%03o", charNum); } + /** + * The JDBC connection owned by the ConnManager has been most probably opened when the import was started + * so it might have timed out by the time TableDefWriter methods are invoked which happens at the end of import. + * The task of this method is to discard the current connection held by ConnManager to make sure + * that TableDefWriter will have a working one. 
+ */ + private void resetConnManager() { + this.connManager.discardConnection(true); + } + + SqoopOptions getOptions() { + return options; + } + + ConnManager getConnManager() { + return connManager; + } + + Configuration getConfiguration() { + return configuration; + } + + String getInputTableName() { + return inputTableName; + } + + String getOutputTableName() { + return outputTableName; + } + + boolean isCommentsEnabled() { + return commentsEnabled; + } } diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java index b02e4fe7..783651a4 100644 --- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java +++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java @@ -18,6 +18,8 @@ package org.apache.sqoop.tool; +import static java.lang.String.format; + import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -131,6 +133,9 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool { public static final String HCATALOG_STORAGE_STANZA_ARG = "hcatalog-storage-stanza"; public static final String HCATALOG_HOME_ARG = "hcatalog-home"; + public static final String HS2_URL_ARG = "hs2-url"; + public static final String HS2_USER_ARG = "hs2-user"; + public static final String HS2_KEYTAB_ARG = "hs2-keytab"; public static final String MAPREDUCE_JOB_NAME = "mapreduce-job-name"; public static final String NUM_MAPPERS_ARG = "num-mappers"; public static final String NUM_MAPPERS_SHORT_ARG = "m"; @@ -609,6 +614,21 @@ protected RelatedOptions getHiveOptions(boolean explicitHiveImport) { + " types.") .withLongOpt(MAP_COLUMN_HIVE) .create()); + hiveOpts.addOption(OptionBuilder + .hasArg() + .withDescription("The URL to the HiveServer2.") + .withLongOpt(HS2_URL_ARG) + .create()); + hiveOpts.addOption(OptionBuilder + .hasArg() + .withDescription("The user/principal for HiveServer2.") + .withLongOpt(HS2_USER_ARG) + .create()); + hiveOpts.addOption(OptionBuilder + .hasArg() + .withDescription("The location of the keytab of the HiveServer2 user.") + .withLongOpt(HS2_KEYTAB_ARG) + .create()); return hiveOpts; } @@ -1238,6 +1258,15 @@ protected void applyHiveOptions(CommandLine in, SqoopOptions out) if (in.hasOption(HIVE_EXTERNAL_TABLE_LOCATION_ARG)) { out.setHiveExternalTableDir(in.getOptionValue(HIVE_EXTERNAL_TABLE_LOCATION_ARG)); } + if (in.hasOption(HS2_URL_ARG)) { + out.setHs2Url(in.getOptionValue(HS2_URL_ARG)); + } + if (in.hasOption(HS2_USER_ARG)) { + out.setHs2User(in.getOptionValue(HS2_USER_ARG)); + } + if (in.hasOption(HS2_KEYTAB_ARG)) { + out.setHs2Keytab(in.getOptionValue(HS2_KEYTAB_ARG)); + } } @@ -1618,6 +1647,8 @@ protected void validateHiveOptions(SqoopOptions options) throw new InvalidOptionsException("Importing to external Hive table requires --hive-import parameter to be set." 
+ HELP_STR); } + + validateHS2Options(options); } protected void validateAccumuloOptions(SqoopOptions options) @@ -1851,5 +1882,26 @@ protected void validateHasDirectConnectorOption(SqoopOptions options) throws Sqo "Was called with the --direct option, but no direct connector available."); } } -} + protected void validateHS2Options(SqoopOptions options) throws SqoopOptions.InvalidOptionsException { + final String withoutTemplate = "The %s option cannot be used without the %s option."; + final String withTemplate = "The %s option cannot be used with the %s option."; + + if (isSet(options.getHs2Url()) && !options.doHiveImport()) { + throw new InvalidOptionsException(format(withoutTemplate, HS2_URL_ARG, HIVE_IMPORT_ARG)); + } + + if (isSet(options.getHs2User()) && !isSet(options.getHs2Url())) { + throw new InvalidOptionsException(format(withoutTemplate, HS2_USER_ARG, HS2_URL_ARG)); + } + + if (isSet(options.getHs2Keytab()) && !isSet(options.getHs2User())) { + throw new InvalidOptionsException(format(withoutTemplate, HS2_KEYTAB_ARG, HS2_USER_ARG)); + } + + if (isSet(options.getHs2Url()) && (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile)) { + throw new InvalidOptionsException(format(withTemplate, HS2_URL_ARG, FMT_PARQUETFILE_ARG)); + } + + } +} diff --git a/src/java/org/apache/sqoop/tool/CreateHiveTableTool.java b/src/java/org/apache/sqoop/tool/CreateHiveTableTool.java index d2595661..02f5d2d0 100644 --- a/src/java/org/apache/sqoop/tool/CreateHiveTableTool.java +++ b/src/java/org/apache/sqoop/tool/CreateHiveTableTool.java @@ -30,7 +30,8 @@ import org.apache.sqoop.SqoopOptions.InvalidOptionsException; import org.apache.sqoop.cli.RelatedOptions; import org.apache.sqoop.cli.ToolOptions; -import org.apache.sqoop.hive.HiveImport; +import org.apache.sqoop.hive.HiveClient; +import org.apache.sqoop.hive.HiveClientFactory; /** * Tool that creates a Hive table definition. 
@@ -40,8 +41,15 @@ public class CreateHiveTableTool extends BaseSqoopTool { public static final Log LOG = LogFactory.getLog( CreateHiveTableTool.class.getName()); - public CreateHiveTableTool() { + private final HiveClientFactory hiveClientFactory; + + public CreateHiveTableTool(HiveClientFactory hiveClientFactory) { super("create-hive-table"); + this.hiveClientFactory = hiveClientFactory; + } + + public CreateHiveTableTool() { + this(new HiveClientFactory()); } @Override @@ -52,10 +60,8 @@ public int run(SqoopOptions options) { } try { - HiveImport hiveImport = new HiveImport(options, manager, - options.getConf(), false); - hiveImport.importTable(options.getTableName(), - options.getHiveTableName(), true); + HiveClient hiveClient = hiveClientFactory.createHiveClient(options, manager); + hiveClient.createTable(); } catch (IOException ioe) { LOG.error("Encountered IOException running create table job: " + StringUtils.stringifyException(ioe)); diff --git a/src/java/org/apache/sqoop/tool/ImportAllTablesTool.java b/src/java/org/apache/sqoop/tool/ImportAllTablesTool.java index 18f7a0af..6fb4a667 100644 --- a/src/java/org/apache/sqoop/tool/ImportAllTablesTool.java +++ b/src/java/org/apache/sqoop/tool/ImportAllTablesTool.java @@ -31,7 +31,6 @@ import org.apache.sqoop.SqoopOptions; import org.apache.sqoop.SqoopOptions.InvalidOptionsException; import org.apache.sqoop.cli.RelatedOptions; -import org.apache.sqoop.hive.HiveImport; import org.apache.sqoop.util.ImportException; /** @@ -75,7 +74,6 @@ public void applyOptions(CommandLine in, SqoopOptions out) @Override /** {@inheritDoc} */ public int run(SqoopOptions options) { - HiveImport hiveImport = null; Set excludes = new HashSet(); if (!init(options)) { @@ -83,9 +81,6 @@ public int run(SqoopOptions options) { } try { - if (options.doHiveImport()) { - hiveImport = new HiveImport(options, manager, options.getConf(), false); - } if (options.getAllTablesExclude() != null) { excludes.addAll(Arrays.asList(options.getAllTablesExclude().split(","))); @@ -102,7 +97,8 @@ public int run(SqoopOptions options) { System.out.println("Skipping table: " + tableName); } else { SqoopOptions clonedOptions = (SqoopOptions) options.clone(); - importTable(clonedOptions, tableName, hiveImport); + clonedOptions.setTableName(tableName); + importTable(clonedOptions); } } } diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java index e9920058..ee79d8b7 100644 --- a/src/java/org/apache/sqoop/tool/ImportTool.java +++ b/src/java/org/apache/sqoop/tool/ImportTool.java @@ -42,7 +42,8 @@ import org.apache.sqoop.SqoopOptions.InvalidOptionsException; import org.apache.sqoop.cli.RelatedOptions; import org.apache.sqoop.cli.ToolOptions; -import org.apache.sqoop.hive.HiveImport; +import org.apache.sqoop.hive.HiveClient; +import org.apache.sqoop.hive.HiveClientFactory; import org.apache.sqoop.manager.ImportJobContext; import org.apache.sqoop.mapreduce.MergeJob; import org.apache.sqoop.metastore.JobData; @@ -78,18 +79,21 @@ public class ImportTool extends BaseSqoopTool { // Set classloader for local job runner private ClassLoader prevClassLoader = null; + private final HiveClientFactory hiveClientFactory; + public ImportTool() { this("import", false); } public ImportTool(String toolName, boolean allTables) { - this(toolName, new CodeGenTool(), allTables); + this(toolName, new CodeGenTool(), allTables, new HiveClientFactory()); } - public ImportTool(String toolName, CodeGenTool codeGenerator, boolean allTables) { + public 
ImportTool(String toolName, CodeGenTool codeGenerator, boolean allTables, HiveClientFactory hiveClientFactory) { super(toolName); this.codeGenerator = codeGenerator; this.allTables = allTables; + this.hiveClientFactory = hiveClientFactory; } @Override @@ -499,17 +503,16 @@ protected void lastModifiedMerge(SqoopOptions options, ImportJobContext context) * Import a table or query. * @return true if an import was performed, false otherwise. */ - protected boolean importTable(SqoopOptions options, String tableName, - HiveImport hiveImport) throws IOException, ImportException { + protected boolean importTable(SqoopOptions options) throws IOException, ImportException { String jarFile = null; // Generate the ORM code for the tables. - jarFile = codeGenerator.generateORM(options, tableName); + jarFile = codeGenerator.generateORM(options, options.getTableName()); - Path outputPath = getOutputPath(options, tableName); + Path outputPath = getOutputPath(options, options.getTableName()); // Do the actual import. - ImportJobContext context = new ImportJobContext(tableName, jarFile, + ImportJobContext context = new ImportJobContext(options.getTableName(), jarFile, options, outputPath); // If we're doing an incremental import, set up the @@ -522,7 +525,7 @@ protected boolean importTable(SqoopOptions options, String tableName, deleteTargetDir(context); } - if (null != tableName) { + if (null != options.getTableName()) { manager.importTable(context); } else { manager.importQuery(context); @@ -540,7 +543,8 @@ protected boolean importTable(SqoopOptions options, String tableName, // For Parquet file, the import action will create hive table directly via // kite. So there is no need to do hive import as a post step again. if (options.getFileLayout() != SqoopOptions.FileLayout.ParquetFile) { - hiveImport.importTable(tableName, options.getHiveTableName(), false); + HiveClient hiveClient = hiveClientFactory.createHiveClient(options, manager); + hiveClient.importTable(); } } @@ -609,8 +613,6 @@ private Path getOutputPath(SqoopOptions options, String tableName, boolean temp) @Override /** {@inheritDoc} */ public int run(SqoopOptions options) { - HiveImport hiveImport = null; - if (allTables) { // We got into this method, but we should be in a subclass. // (This method only handles a single table) @@ -626,12 +628,8 @@ public int run(SqoopOptions options) { codeGenerator.setManager(manager); try { - if (options.doHiveImport()) { - hiveImport = new HiveImport(options, manager, options.getConf(), false); - } - // Import a single table (or query) the user specified. - importTable(options, options.getTableName(), hiveImport); + importTable(options); } catch (IllegalArgumentException iea) { LOG.error(IMPORT_FAILED_ERROR_MSG + iea.getMessage()); rethrowIfRequired(options, iea); diff --git a/src/test/org/apache/sqoop/hive/HiveServer2ConnectionFactoryInitializerTest.java b/src/test/org/apache/sqoop/hive/HiveServer2ConnectionFactoryInitializerTest.java new file mode 100644 index 00000000..4d2cb2f8 --- /dev/null +++ b/src/test/org/apache/sqoop/hive/HiveServer2ConnectionFactoryInitializerTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.hive; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.db.JdbcConnectionFactory; +import org.apache.sqoop.db.decorator.KerberizedConnectionFactoryDecorator; +import org.assertj.core.api.SoftAssertions; +import org.junit.Before; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class HiveServer2ConnectionFactoryInitializerTest { + + private static final String TEST_HS2_URL = "jdbc:hive2://myhost:10000/default"; + + private static final String TEST_HS2_USER = "testuser"; + + private static final String TEST_HS2_KEYTAB = "testkeytab"; + + private HiveServer2ConnectionFactoryInitializer connectionFactoryInitializer; + + private SqoopOptions sqoopOptions; + + private Configuration configuration; + + private SoftAssertions softly; + + @Before + public void before() { + connectionFactoryInitializer = new HiveServer2ConnectionFactoryInitializer(); + sqoopOptions = mock(SqoopOptions.class); + configuration = mock(Configuration.class); + softly = new SoftAssertions(); + + when(sqoopOptions.getHs2User()).thenReturn(TEST_HS2_USER); + when(sqoopOptions.getConf()).thenReturn(configuration); + } + + @Test + public void testCreateJdbcConnectionFactoryWithoutKerberosConfiguredReturnsHiveServer2ConnectionFactory() throws Exception { + JdbcConnectionFactory connectionFactory = connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions); + + assertThat(connectionFactory, instanceOf(HiveServer2ConnectionFactory.class)); + } + + @Test + public void testCreateJdbcConnectionFactoryInitializesConnectionStringProperly() throws Exception { + when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL); + HiveServer2ConnectionFactory connectionFactory = (HiveServer2ConnectionFactory) connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions); + + assertEquals(TEST_HS2_URL, connectionFactory.getConnectionString()); + } + + @Test + public void testCreateJdbcConnectionFactoryInitializesConnectionUsernameProperly() throws Exception { + HiveServer2ConnectionFactory connectionFactory = (HiveServer2ConnectionFactory) connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions); + + assertEquals(TEST_HS2_USER, connectionFactory.getUsername()); + } + + @Test + public void testCreateJdbcConnectionFactoryWithoutHs2UserSpecifiedInitializesConnectionUsernameProperly() throws Exception { + when(sqoopOptions.getHs2User()).thenReturn(null); + String expectedUsername = UserGroupInformation.getLoginUser().getUserName(); + HiveServer2ConnectionFactory connectionFactory = (HiveServer2ConnectionFactory) connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions); + + assertEquals(expectedUsername, connectionFactory.getUsername()); + } + + @Test + public void testCreateJdbcConnectionFactoryWithKerberosConfiguredReturnsKerberizedConnectionFactoryDecorator() throws Exception { + 
when(sqoopOptions.getHs2Keytab()).thenReturn(TEST_HS2_KEYTAB); + + JdbcConnectionFactory connectionFactory = connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions); + + assertThat(connectionFactory, instanceOf(KerberizedConnectionFactoryDecorator.class)); + } + + @Test + public void testCreateJdbcConnectionFactoryWithKerberosConfiguredInitializesDecoratorProperly() throws Exception { + when(sqoopOptions.getHs2Keytab()).thenReturn(TEST_HS2_KEYTAB); + + KerberizedConnectionFactoryDecorator connectionFactory = (KerberizedConnectionFactoryDecorator) connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions); + + softly.assertThat(connectionFactory.getDecorated()).isInstanceOf(HiveServer2ConnectionFactory.class); + softly.assertThat(connectionFactory.getAuthenticator().getConfiguration()).isSameAs(configuration); + softly.assertThat(connectionFactory.getAuthenticator().getPrincipal()).isEqualTo(TEST_HS2_USER); + softly.assertThat(connectionFactory.getAuthenticator().getKeytabLocation()).isEqualTo(TEST_HS2_KEYTAB); + + softly.assertAll(); + } + +} \ No newline at end of file diff --git a/src/test/org/apache/sqoop/hive/TestHiveClientFactory.java b/src/test/org/apache/sqoop/hive/TestHiveClientFactory.java new file mode 100644 index 00000000..a3c2dc93 --- /dev/null +++ b/src/test/org/apache/sqoop/hive/TestHiveClientFactory.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.hive; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.db.JdbcConnectionFactory; +import org.apache.sqoop.manager.ConnManager; +import org.assertj.core.api.SoftAssertions; +import org.junit.Before; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestHiveClientFactory { + + private static final String TEST_HS2_URL = "jdbc:hive2://myhost:10000/default"; + + private static final String TEST_TABLE_NAME = "testTableName"; + + private static final String TEST_HIVE_TABLE_NAME = "testHiveTableName"; + + private HiveClientFactory hiveClientFactory; + + private ConnManager connectionManager; + + private SqoopOptions sqoopOptions; + + private Configuration configuration; + + private JdbcConnectionFactory jdbcConnectionFactory; + + private HiveServer2ConnectionFactoryInitializer connectionFactoryInitializer; + + private SoftAssertions softly; + + @Before + public void before() { + connectionFactoryInitializer = mock(HiveServer2ConnectionFactoryInitializer.class); + hiveClientFactory = new HiveClientFactory(connectionFactoryInitializer); + softly = new SoftAssertions(); + + connectionManager = mock(ConnManager.class); + sqoopOptions = mock(SqoopOptions.class); + configuration = mock(Configuration.class); + jdbcConnectionFactory = mock(JdbcConnectionFactory.class); + + when(sqoopOptions.getConf()).thenReturn(configuration); + when(sqoopOptions.getTableName()).thenReturn(TEST_TABLE_NAME); + when(sqoopOptions.getHiveTableName()).thenReturn(TEST_HIVE_TABLE_NAME); + } + + @Test + public void testCreateHiveClientCreatesHiveImportWhenHs2UrlIsNotProvided() throws Exception { + HiveClient hiveClient = hiveClientFactory.createHiveClient(sqoopOptions, connectionManager); + assertThat(hiveClient, instanceOf(HiveImport.class)); + } + + @Test + public void testCreateHiveClientInitializesHiveImportProperly() throws Exception { + HiveImport hiveImport = (HiveImport) hiveClientFactory.createHiveClient(sqoopOptions, connectionManager); + + softly.assertThat(hiveImport.getOptions()).isSameAs(sqoopOptions); + softly.assertThat(hiveImport.getConnManager()).isSameAs(connectionManager); + softly.assertThat(hiveImport.getConfiguration()).isSameAs(configuration); + softly.assertThat(hiveImport.isGenerateOnly()).isFalse(); + softly.assertAll(); + } + + @Test + public void testCreateHiveClientCreatesHiveServer2ClientWhenHs2UrlIsProvided() throws Exception { + when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL); + HiveClient hiveClient = hiveClientFactory.createHiveClient(sqoopOptions, connectionManager); + assertThat(hiveClient, instanceOf(HiveServer2Client.class)); + } + + @Test + public void testCreateHiveClientInitializesHiveServer2ClientProperly() throws Exception { + when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL); + when(connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions)).thenReturn(jdbcConnectionFactory); + + HiveServer2Client hs2Client = (HiveServer2Client) hiveClientFactory.createHiveClient(sqoopOptions, 
connectionManager); + + softly.assertThat(hs2Client.getSqoopOptions()).isSameAs(sqoopOptions); + softly.assertThat(hs2Client.getHs2ConnectionFactory()).isSameAs(jdbcConnectionFactory); + softly.assertThat(hs2Client.getTableDefWriter().getOptions()).isSameAs(sqoopOptions); + softly.assertThat(hs2Client.getTableDefWriter().getConnManager()).isSameAs(connectionManager); + softly.assertThat(hs2Client.getTableDefWriter().getInputTableName()).isEqualTo(TEST_TABLE_NAME); + softly.assertThat(hs2Client.getTableDefWriter().getOutputTableName()).isEqualTo(TEST_HIVE_TABLE_NAME); + softly.assertThat(hs2Client.getTableDefWriter().getConfiguration()).isSameAs(configuration); + softly.assertThat(hs2Client.getTableDefWriter().isCommentsEnabled()).isFalse(); + + softly.assertAll(); + } + +} \ No newline at end of file diff --git a/src/test/org/apache/sqoop/hive/TestHiveMiniCluster.java b/src/test/org/apache/sqoop/hive/TestHiveMiniCluster.java new file mode 100644 index 00000000..419f888c --- /dev/null +++ b/src/test/org/apache/sqoop/hive/TestHiveMiniCluster.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.hive; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sqoop.db.JdbcConnectionFactory; +import org.apache.sqoop.hive.minicluster.AuthenticationConfiguration; +import org.apache.sqoop.hive.minicluster.HiveMiniCluster; +import org.apache.sqoop.hive.minicluster.KerberosAuthenticationConfiguration; +import org.apache.sqoop.hive.minicluster.NoAuthenticationConfiguration; +import org.apache.sqoop.hive.minicluster.PasswordAuthenticationConfiguration; +import org.apache.sqoop.infrastructure.kerberos.MiniKdcInfrastructureRule; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public class TestHiveMiniCluster { + + @ClassRule + public static MiniKdcInfrastructureRule miniKdcInfrastructure = new MiniKdcInfrastructureRule(); + + private static final String TEST_USERNAME = "sqoop"; + + private static final String TEST_PASSWORD = "secret"; + + @Parameters(name = "config = {0}") + public static Iterable authenticationParameters() { + return Arrays.asList(new NoAuthenticationConfiguration(), + new PasswordAuthenticationConfiguration(TEST_USERNAME, TEST_PASSWORD), + new KerberosAuthenticationConfiguration(miniKdcInfrastructure)); + } + + private static final String CREATE_TABLE_SQL = "CREATE TABLE TestTable (id int)"; + + private static final String INSERT_SQL = "INSERT INTO TestTable VALUES (?)"; + + private static final String SELECT_SQL = "SELECT * FROM TestTable"; + + private static final int TEST_VALUE = 10; + + private final AuthenticationConfiguration authenticationConfiguration; + + private HiveMiniCluster hiveMiniCluster; + + private JdbcConnectionFactory connectionFactory; + + public TestHiveMiniCluster(AuthenticationConfiguration authenticationConfiguration) { + this.authenticationConfiguration = authenticationConfiguration; + } + + @Before + public void before() throws SQLException { + hiveMiniCluster = new HiveMiniCluster(authenticationConfiguration); + hiveMiniCluster.start(); + + connectionFactory = authenticationConfiguration.decorateConnectionFactory(new HiveServer2ConnectionFactory(hiveMiniCluster.getUrl(), TEST_USERNAME, TEST_PASSWORD)); + } + + @Test + public void testInsertedRowCanBeReadFromTable() throws Exception { + createTestTable(); + insertRowIntoTestTable(); + + assertEquals(TEST_VALUE, getDataFromTestTable()); + } + + private void insertRowIntoTestTable() throws SQLException { + try (Connection conn = connectionFactory.createConnection(); PreparedStatement stmnt = conn.prepareStatement(INSERT_SQL)) { + stmnt.setInt(1, TEST_VALUE); + stmnt.executeUpdate(); + } + } + + private int getDataFromTestTable() throws SQLException { + try (Connection conn = connectionFactory.createConnection(); PreparedStatement stmnt = conn.prepareStatement(SELECT_SQL)) { + ResultSet resultSet = stmnt.executeQuery(); + resultSet.next(); + return resultSet.getInt(1); + } + } + + private void createTestTable() throws SQLException { + try (Connection conn = connectionFactory.createConnection(); PreparedStatement stmnt = conn.prepareStatement(CREATE_TABLE_SQL)) { + stmnt.executeUpdate(); + } + 
} + + @After + public void after() { + hiveMiniCluster.stop(); + UserGroupInformation.setConfiguration(new Configuration()); + } + +} diff --git a/src/test/org/apache/sqoop/hive/TestHiveServer2Client.java b/src/test/org/apache/sqoop/hive/TestHiveServer2Client.java new file mode 100644 index 00000000..02617295 --- /dev/null +++ b/src/test/org/apache/sqoop/hive/TestHiveServer2Client.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.hive; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.db.JdbcConnectionFactory; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.InOrder; +import org.mockito.Mockito; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; + +import static java.util.Arrays.asList; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestHiveServer2Client { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private static final String CREATE_TABLE_STATEMENT = "createTableStatement"; + + private static final String LOAD_DATA_STATEMENT = "loadDataStatement"; + + private static final List TEST_COMMANDS = asList("command1", "command2", "command3"); + + private HiveServer2Client hs2Client; + + private HiveServer2Client hs2ClientSpy; + + private SqoopOptions sqoopOptions; + + private TableDefWriter tableDefWriter; + + private JdbcConnectionFactory hs2ConnectionFactory; + + private Connection hs2Connection; + + private PreparedStatement preparedStatement; + + private HiveClientCommon hiveClientCommon; + + private Path finalPath; + + private Configuration configuration; + + @Before + public void before() throws Exception { + sqoopOptions = mock(SqoopOptions.class); + tableDefWriter = mock(TableDefWriter.class); + hs2ConnectionFactory = mock(JdbcConnectionFactory.class); + hs2Connection = mock(Connection.class); + preparedStatement = mock(PreparedStatement.class); + hiveClientCommon = mock(HiveClientCommon.class); + finalPath = mock(Path.class); + configuration = mock(Configuration.class); + + when(sqoopOptions.getConf()).thenReturn(configuration); + when(hs2ConnectionFactory.createConnection()).thenReturn(hs2Connection); + when(hs2Connection.prepareStatement(anyString())).thenReturn(preparedStatement); + + when(tableDefWriter.getCreateTableStmt()).thenReturn(CREATE_TABLE_STATEMENT); + when(tableDefWriter.getLoadDataStmt()).thenReturn(LOAD_DATA_STATEMENT); + when(tableDefWriter.getFinalPath()).thenReturn(finalPath); + + hs2Client = new HiveServer2Client(sqoopOptions, tableDefWriter, hs2ConnectionFactory, hiveClientCommon); + hs2ClientSpy = spy(hs2Client); + } + + @Test + public void testImportTableExecutesHiveImportWithCreateTableAndLoadDataCommands() throws Exception { + doNothing().when(hs2ClientSpy).executeHiveImport(anyList()); + + hs2ClientSpy.importTable(); + + verify(hs2ClientSpy, times(1)).executeHiveImport(asList(CREATE_TABLE_STATEMENT, LOAD_DATA_STATEMENT)); + } + + @Test + public void testCreateTableExecutesHiveImportWithCreateTableCommandOnly() throws Exception { + 
doNothing().when(hs2ClientSpy).executeHiveImport(anyList()); + + hs2ClientSpy.createTable(); + + verify(hs2ClientSpy, times(1)).executeHiveImport(asList(CREATE_TABLE_STATEMENT)); + } + + @Test + public void testExecuteHiveImportInvokesMethodsInCorrectSequence() throws Exception { + InOrder inOrder = Mockito.inOrder(hiveClientCommon, hs2ClientSpy); + doNothing().when(hs2ClientSpy).executeCommands(TEST_COMMANDS); + + hs2ClientSpy.executeHiveImport(TEST_COMMANDS); + + inOrder.verify(hiveClientCommon).removeTempLogs(configuration, finalPath); + inOrder.verify(hiveClientCommon).indexLzoFiles(sqoopOptions, finalPath); + inOrder.verify(hs2ClientSpy).executeCommands(TEST_COMMANDS); + inOrder.verify(hiveClientCommon).cleanUp(configuration, finalPath); + } + + @Test + public void testExecuteHiveImportThrowsRuntimeExceptionWhenExecuteCommandsThrows() throws Exception { + SQLException sqlException = mock(SQLException.class); + doThrow(sqlException).when(hs2ClientSpy).executeCommands(anyList()); + + expectedException.expect(RuntimeException.class); + expectedException.expectMessage("Error executing Hive import."); + + hs2ClientSpy.executeHiveImport(TEST_COMMANDS); + } + + @Test + public void testExecuteCommandsCreatesExactlyOneConnection() throws Exception { + hs2Client.executeCommands(TEST_COMMANDS); + + verify(hs2ConnectionFactory, times(1)).createConnection(); + } + + @Test + public void testExecuteCommandsClosesConnectionWhenStatementExecutionIsSuccessful() throws Exception { + hs2Client.executeCommands(TEST_COMMANDS); + + verify(hs2Connection).close(); + } + + @Test + public void testExecuteCommandsClosesConnectionWhenStatementExecutionThrows() throws Exception { + when(hs2Connection.prepareStatement(anyString())).thenThrow(new SQLException()); + + expectedException.expect(SQLException.class); + hs2Client.executeCommands(TEST_COMMANDS); + + verify(hs2Connection).close(); + } + + @Test + public void testExecuteCommandsClosesPreparedStatementsWhenStatementExecutionIsSuccessful() throws Exception { + hs2Client.executeCommands(TEST_COMMANDS); + + verify(preparedStatement, times(TEST_COMMANDS.size())).close(); + } + + @Test + public void testExecuteCommandsClosesPreparedStatementWhenStatementExecutionThrows() throws Exception { + when(preparedStatement.execute()).thenThrow(new SQLException()); + + expectedException.expect(SQLException.class); + hs2Client.executeCommands(TEST_COMMANDS); + + verify(preparedStatement).close(); + } + + @Test + public void testExecuteCommandsThrowsWhenCreateConnectionThrows() throws Exception { + RuntimeException expected = mock(RuntimeException.class); + when(hs2ConnectionFactory.createConnection()).thenThrow(expected); + + expectedException.expect(equalTo(expected)); + hs2Client.executeCommands(TEST_COMMANDS); + } + + @Test + public void testExecuteCommandsThrowsWhenPrepareStatementThrows() throws Exception { + SQLException expected = mock(SQLException.class); + when(hs2Connection.prepareStatement(anyString())).thenThrow(expected); + + expectedException.expect(equalTo(expected)); + hs2Client.executeCommands(TEST_COMMANDS); + } + + @Test + public void testExecuteCommandsThrowsWhenExecuteStatementThrows() throws Exception { + SQLException expected = mock(SQLException.class); + when(preparedStatement.execute()).thenThrow(expected); + + expectedException.expect(equalTo(expected)); + hs2Client.executeCommands(TEST_COMMANDS); + } + +} diff --git a/src/test/org/apache/sqoop/hive/TestHiveServer2TextImport.java b/src/test/org/apache/sqoop/hive/TestHiveServer2TextImport.java new 
file mode 100644 index 00000000..f6d591b7 --- /dev/null +++ b/src/test/org/apache/sqoop/hive/TestHiveServer2TextImport.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.hive; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.sqoop.hive.minicluster.HiveMiniCluster; +import org.apache.sqoop.hive.minicluster.KerberosAuthenticationConfiguration; +import org.apache.sqoop.infrastructure.kerberos.MiniKdcInfrastructureRule; +import org.apache.sqoop.testutil.ArgumentArrayBuilder; +import org.apache.sqoop.testutil.HiveServer2TestUtil; +import org.apache.sqoop.testutil.ImportJobTestCase; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class TestHiveServer2TextImport extends ImportJobTestCase { + + @ClassRule + public static MiniKdcInfrastructureRule miniKdcInfrastructure = new MiniKdcInfrastructureRule(); + + private HiveMiniCluster hiveMiniCluster; + + private HiveServer2TestUtil hiveServer2TestUtil; + + @Override + @Before + public void setUp() { + super.setUp(); + KerberosAuthenticationConfiguration authenticationConfiguration = new KerberosAuthenticationConfiguration(miniKdcInfrastructure); + hiveMiniCluster = new HiveMiniCluster(authenticationConfiguration); + hiveMiniCluster.start(); + hiveServer2TestUtil = new HiveServer2TestUtil(hiveMiniCluster.getUrl()); + } + + @Override + @After + public void tearDown() { + super.tearDown(); + hiveMiniCluster.stop(); + } + + @Test + public void testImport() throws Exception { + List<Object> columnValues = Arrays.<Object>asList("test", 42, "somestring"); + + String[] types = {"VARCHAR(32)", "INTEGER", "CHAR(64)"}; + createTableWithColTypes(types, toStringArray(columnValues)); + + String[] args = new ArgumentArrayBuilder() + .withProperty(YarnConfiguration.RM_PRINCIPAL, miniKdcInfrastructure.getTestPrincipal()) + .withOption("connect", getConnectString()) + .withOption("table", getTableName()) + .withOption("hive-import") + .withOption("hs2-url", hiveMiniCluster.getUrl()) + .withOption("split-by", getColName(1)) + .build(); + + runImport(args); + + List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName()); + assertEquals(columnValues, rows.get(0)); + } + + private String[] toStringArray(List<Object> columnValues) { + String[] result = new String[columnValues.size()]; + + for (int i = 0; i < columnValues.size(); i++) { + if (columnValues.get(i) instanceof String) { + result[i] = StringUtils.wrap((String) columnValues.get(i), '\''); + } else { + result[i] = columnValues.get(i).toString(); + } + } + + return result; + } + +} diff --git a/src/test/org/apache/sqoop/hive/TestTableDefWriter.java b/src/test/org/apache/sqoop/hive/TestTableDefWriter.java index 8bdc3beb..3ea61f64 100644 --- a/src/test/org/apache/sqoop/hive/TestTableDefWriter.java +++ b/src/test/org/apache/sqoop/hive/TestTableDefWriter.java @@ -42,6 +42,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -248,6 +249,13 @@ public void testHiveDatabase() throws Exception { assertTrue(createTable.contains("`db`.`outputTable`")); } +
@Test + public void testGetCreateTableStmtDiscardsConnection() throws Exception { + writer.getCreateTableStmt(); + + verify(connManager).discardConnection(true); + } + private void setUpMockConnManager(String tableName, Map typeMap) { when(connManager.getColumnTypes(tableName)).thenReturn(typeMap); when(connManager.getColumnNames(tableName)).thenReturn(typeMap.keySet().toArray(new String[]{})); diff --git a/src/test/org/apache/sqoop/hive/minicluster/AuthenticationConfiguration.java b/src/test/org/apache/sqoop/hive/minicluster/AuthenticationConfiguration.java new file mode 100644 index 00000000..41713374 --- /dev/null +++ b/src/test/org/apache/sqoop/hive/minicluster/AuthenticationConfiguration.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.hive.minicluster; + +import org.apache.sqoop.db.JdbcConnectionFactory; + +import java.security.PrivilegedAction; +import java.util.Map; + +public interface AuthenticationConfiguration { + + Map<String, String> getAuthenticationConfig(); + + String getUrlParams(); + + <T> T doAsAuthenticated(PrivilegedAction<T> action); + + void init(); + + JdbcConnectionFactory decorateConnectionFactory(JdbcConnectionFactory connectionFactory); + +} diff --git a/src/test/org/apache/sqoop/hive/minicluster/HiveMiniCluster.java b/src/test/org/apache/sqoop/hive/minicluster/HiveMiniCluster.java new file mode 100644 index 00000000..19bb7605 --- /dev/null +++ b/src/test/org/apache/sqoop/hive/minicluster/HiveMiniCluster.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.sqoop.hive.minicluster; + +import com.google.common.io.Files; +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.service.Service; +import org.apache.hive.service.server.HiveServer2; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +public class HiveMiniCluster { + + private static final Log LOG = LogFactory.getLog(HiveMiniCluster.class.getName()); + + private static final String DEFAULT_HOST = "127.0.0.1"; + + private static final int DEFAULT_PORT = 10000; + + private final String hostName; + + private final int port; + + private final String tempFolderPath; + + private final AuthenticationConfiguration authenticationConfiguration; + + private final HiveServer2 hiveServer2; + + private HiveConf config; + + public HiveMiniCluster(AuthenticationConfiguration authenticationConfiguration) { + this(DEFAULT_HOST, DEFAULT_PORT, authenticationConfiguration); + } + + public HiveMiniCluster(String hostname, int port, AuthenticationConfiguration authenticationConfiguration) { + this(hostname, port, Files.createTempDir().getAbsolutePath(), authenticationConfiguration); + } + + public HiveMiniCluster(String hostname, int port, String tempFolderPath, AuthenticationConfiguration authenticationConfiguration) { + this.hostName = hostname; + this.port = port; + this.tempFolderPath = tempFolderPath; + this.authenticationConfiguration = authenticationConfiguration; + this.hiveServer2 = new HiveServer2(); + } + + private void createHiveConf() { + config = new HiveConf(); + config.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, tempFolderPath); + config.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, getHostName()); + config.setInt(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, getPort()); + config.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, getMetastoreConnectUrl()); + + for (Map.Entry authConfig : authenticationConfiguration.getAuthenticationConfig().entrySet()) { + config.set(authConfig.getKey(), authConfig.getValue()); + } + } + + public void start() { + try { + authenticationConfiguration.init(); + createHiveConf(); + createHiveSiteXml(); + startHiveServer(); + waitForStartUp(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void createHiveSiteXml() throws IOException { + File hiveSiteXmlFile = new File(tempFolderPath,"hive-site.xml"); + try (OutputStream out = new FileOutputStream(hiveSiteXmlFile)) { + config.writeXml(out); + } + + HiveConf.setHiveSiteLocation(hiveSiteXmlFile.toURI().toURL()); + } + + private void startHiveServer() throws Exception { + authenticationConfiguration.doAsAuthenticated(new PrivilegedAction() { + @Override + public Void run() { + hiveServer2.init(config); + hiveServer2.start(); + return null; + } + }); + } + + public void stop() { + hiveServer2.stop(); + HiveConf.setHiveSiteLocation(null); + try { + FileUtils.deleteDirectory(new File(tempFolderPath)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public HiveConf getConfig() { + return config; + } + + public int getPort() { + return port; + } + + public String getHostName() { + return hostName; + } + + public String getUrl() { + return 
String.format("jdbc:hive2://%s:%d/default%s", hostName, port, authenticationConfiguration.getUrlParams()); + } + + public String getTempFolderPath() { + return tempFolderPath; + } + + public String getMetastoreConnectUrl() { + return String.format("jdbc:derby:;databaseName=%s/minicluster_metastore_db;create=true", tempFolderPath); + } + + public boolean isStarted() { + return hiveServer2.getServiceState() == Service.STATE.STARTED; + } + + private void waitForStartUp() throws InterruptedException, TimeoutException { + final int numberOfAttempts = 500; + final long sleepTime = 100; + for (int i = 0; i < numberOfAttempts; ++i) { + try { + LOG.debug("Attempt " + (i + 1) + " to access " + hostName + ":" + port); + new Socket(InetAddress.getByName(hostName), port).close(); + return; + } catch (RuntimeException | IOException e) { + LOG.debug("Failed to connect to " + hostName + ":" + port, e); + } + + Thread.sleep(sleepTime); + } + + throw new RuntimeException("Couldn't access new server: " + hostName + ":" + port); + } + +} diff --git a/src/test/org/apache/sqoop/hive/minicluster/KerberosAuthenticationConfiguration.java b/src/test/org/apache/sqoop/hive/minicluster/KerberosAuthenticationConfiguration.java new file mode 100644 index 00000000..549a8c6c --- /dev/null +++ b/src/test/org/apache/sqoop/hive/minicluster/KerberosAuthenticationConfiguration.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.hive.minicluster; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hive.service.auth.HiveAuthFactory; +import org.apache.sqoop.authentication.KerberosAuthenticator; +import org.apache.sqoop.db.JdbcConnectionFactory; +import org.apache.sqoop.db.decorator.KerberizedConnectionFactoryDecorator; +import org.apache.sqoop.infrastructure.kerberos.KerberosConfigurationProvider; + +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.Map; + +public class KerberosAuthenticationConfiguration implements AuthenticationConfiguration { + + private final KerberosConfigurationProvider kerberosConfig; + + private KerberosAuthenticator authenticator; + + public KerberosAuthenticationConfiguration(KerberosConfigurationProvider kerberosConfig) { + this.kerberosConfig = kerberosConfig; + } + + @Override + public Map getAuthenticationConfig() { + Map result = new HashMap<>(); + + result.put(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, kerberosConfig.getTestPrincipal()); + result.put(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB.varname, kerberosConfig.getKeytabFilePath()); + result.put(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, HiveAuthFactory.AuthTypes.KERBEROS.toString()); + result.put(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, HiveAuthFactory.AuthTypes.KERBEROS.toString()); + result.put(YarnConfiguration.RM_PRINCIPAL, kerberosConfig.getTestPrincipal()); + + return result; + } + + @Override + public String getUrlParams() { + return ";principal=" + kerberosConfig.getTestPrincipal(); + } + + @Override + public T doAsAuthenticated(PrivilegedAction action) { + return authenticator.authenticate().doAs(action); + } + + @Override + public void init() { + authenticator = createKerberosAuthenticator(); + } + + @Override + public JdbcConnectionFactory decorateConnectionFactory(JdbcConnectionFactory connectionFactory) { + return new KerberizedConnectionFactoryDecorator(connectionFactory, authenticator); + } + + private KerberosAuthenticator createKerberosAuthenticator() { + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + KerberosAuthenticator result = new KerberosAuthenticator(conf, kerberosConfig.getTestPrincipal(), kerberosConfig.getKeytabFilePath()); + return result; + } + +} diff --git a/src/test/org/apache/sqoop/hive/minicluster/NoAuthenticationConfiguration.java b/src/test/org/apache/sqoop/hive/minicluster/NoAuthenticationConfiguration.java new file mode 100644 index 00000000..20502c9f --- /dev/null +++ b/src/test/org/apache/sqoop/hive/minicluster/NoAuthenticationConfiguration.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.hive.minicluster; + +import org.apache.commons.lang3.StringUtils; +import org.apache.sqoop.db.JdbcConnectionFactory; + +import java.security.PrivilegedAction; +import java.util.Collections; +import java.util.Map; + +public class NoAuthenticationConfiguration implements AuthenticationConfiguration { + @Override + public Map getAuthenticationConfig() { + return Collections.emptyMap(); + } + + @Override + public String getUrlParams() { + return StringUtils.EMPTY; + } + + @Override + public T doAsAuthenticated(PrivilegedAction action) { + return action.run(); + } + + @Override + public void init() { + // do nothing + } + + @Override + public JdbcConnectionFactory decorateConnectionFactory(JdbcConnectionFactory connectionFactory) { + return connectionFactory; + } +} diff --git a/src/test/org/apache/sqoop/hive/minicluster/PasswordAuthenticationConfiguration.java b/src/test/org/apache/sqoop/hive/minicluster/PasswordAuthenticationConfiguration.java new file mode 100644 index 00000000..79881f7b --- /dev/null +++ b/src/test/org/apache/sqoop/hive/minicluster/PasswordAuthenticationConfiguration.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.hive.minicluster; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hive.service.auth.HiveAuthFactory; +import org.apache.hive.service.auth.PasswdAuthenticationProvider; +import org.apache.sqoop.db.JdbcConnectionFactory; + +import javax.security.sasl.AuthenticationException; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_CUSTOM_AUTHENTICATION_CLASS; + +public class PasswordAuthenticationConfiguration implements AuthenticationConfiguration { + + private static String TEST_USERNAME; + + private static String TEST_PASSWORD; + + private static final class TestPasswordAuthenticationProvider implements PasswdAuthenticationProvider { + + @Override + public void Authenticate(String user, String password) throws AuthenticationException { + if (!(TEST_USERNAME.equals(user) && TEST_PASSWORD.equals(password))) { + throw new AuthenticationException("Authentication failed!"); + } + } + } + + public PasswordAuthenticationConfiguration(String testUsername, String testPassword) { + TEST_USERNAME = testUsername; + TEST_PASSWORD = testPassword; + } + + @Override + public Map getAuthenticationConfig() { + Map result = new HashMap<>(); + result.put(HIVE_SERVER2_AUTHENTICATION.varname, HiveAuthFactory.AuthTypes.CUSTOM.getAuthName()); + result.put(HIVE_SERVER2_CUSTOM_AUTHENTICATION_CLASS.varname, TestPasswordAuthenticationProvider.class.getName()); + + return result; + } + + @Override + public String getUrlParams() { + return StringUtils.EMPTY; + } + + @Override + public T doAsAuthenticated(PrivilegedAction action) { + return action.run(); + } + + @Override + public void init() { + //do nothing + } + + @Override + public JdbcConnectionFactory decorateConnectionFactory(JdbcConnectionFactory connectionFactory) { + return connectionFactory; + } +} diff --git a/src/test/org/apache/sqoop/testutil/HiveServer2TestUtil.java b/src/test/org/apache/sqoop/testutil/HiveServer2TestUtil.java new file mode 100644 index 00000000..79937081 --- /dev/null +++ b/src/test/org/apache/sqoop/testutil/HiveServer2TestUtil.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.testutil; + +import org.apache.sqoop.hive.HiveServer2ConnectionFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; + +public class HiveServer2TestUtil { + + private static final String SELECT_TABLE_QUERY = "SELECT * FROM %s"; + + private HiveServer2ConnectionFactory hs2ConnectionFactory; + + public HiveServer2TestUtil(String url) { + this(url, null, null); + } + + public HiveServer2TestUtil(String url, String username, String password) { + hs2ConnectionFactory = new HiveServer2ConnectionFactory(url, username, password); + } + + public List<LinkedHashMap<String, Object>> loadRowsFromTable(String tableName) { + List<LinkedHashMap<String, Object>> result = new ArrayList<>(); + try(Connection connection = hs2ConnectionFactory.createConnection(); + PreparedStatement query = connection.prepareStatement(String.format(SELECT_TABLE_QUERY, tableName))) { + + ResultSet resultSet = query.executeQuery(); + ResultSetMetaData metaData = resultSet.getMetaData(); + + while (resultSet.next()) { + LinkedHashMap<String, Object> row = new LinkedHashMap<>(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row.put(metaData.getColumnName(i), resultSet.getObject(i)); + } + result.add(row); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + return result; + } + + public List<List<Object>> loadRawRowsFromTable(String tableName) { + List<List<Object>> result = new ArrayList<>(); + List<LinkedHashMap<String, Object>> rowsWithColumnNames = loadRowsFromTable(tableName); + + for (LinkedHashMap<String, Object> rowWithColumnNames : rowsWithColumnNames) { + result.add(new ArrayList<>(rowWithColumnNames.values())); + } + + return result; + } + +} diff --git a/src/test/org/apache/sqoop/tool/TestHiveServer2OptionValidations.java b/src/test/org/apache/sqoop/tool/TestHiveServer2OptionValidations.java new file mode 100644 index 00000000..4d3f9389 --- /dev/null +++ b/src/test/org/apache/sqoop/tool/TestHiveServer2OptionValidations.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.sqoop.tool; + +import org.apache.sqoop.SqoopOptions; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.util.Arrays; +import java.util.Properties; + +import static org.apache.sqoop.SqoopOptions.FileLayout.ParquetFile; +import static org.apache.sqoop.SqoopOptions.IncrementalMode.None; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +@RunWith(Parameterized.class) +public class TestHiveServer2OptionValidations { + + @Parameters(name = "sqoopTool = {0}") + public static Iterable parameters() { + return Arrays.asList( + new ImportTool(), + new ImportAllTablesTool(), + new CreateHiveTableTool()); + } + + private static final String TEST_HS2_URL = "test-hs2-url"; + private static final String TEST_HS2_USER = "test-hs2-user"; + private static final String TEST_HS2_KEYTAB = "test-hs2-keytab"; + private static final String TEST_TABLE = "testtable"; + private static final String TEST_CONNECTION_STRING = "testconnectstring"; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final BaseSqoopTool sqoopTool; + + private SqoopOptions sqoopOptions; + + public TestHiveServer2OptionValidations(BaseSqoopTool sqoopTool) { + this.sqoopTool = spy(sqoopTool); + } + + @Before + public void before() { + sqoopOptions = mock(SqoopOptions.class); + when(sqoopOptions.getTableName()).thenReturn(TEST_TABLE); + when(sqoopOptions.getIncrementalMode()).thenReturn(None); + when(sqoopOptions.getConnectString()).thenReturn(TEST_CONNECTION_STRING); + when(sqoopOptions.getMapColumnHive()).thenReturn(new Properties()); + + + doReturn(0).when(sqoopTool).getDashPosition(any(String[].class)); + } + + @Test + public void testValidateOptionsThrowsWhenHs2UrlIsUsedWithoutHiveImport() throws Exception { + expectedException.expect(SqoopOptions.InvalidOptionsException.class); + expectedException.expectMessage("The hs2-url option cannot be used without the hive-import option."); + + when(sqoopOptions.doHiveImport()).thenReturn(false); + when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL); + + sqoopTool.validateOptions(sqoopOptions); + } + + @Test + public void testValidateOptionsThrowsWhenHs2UrlIsUsedWithHCatalogImport() throws Exception { + expectedException.expect(SqoopOptions.InvalidOptionsException.class); + expectedException.expectMessage("The hs2-url option cannot be used without the hive-import option."); + + when(sqoopOptions.getHCatTableName()).thenReturn(TEST_TABLE); + when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL); + + sqoopTool.validateOptions(sqoopOptions); + } + + @Test + public void testValidateOptionsThrowsWhenHs2UserIsUsedWithoutHs2Url() throws Exception { + expectedException.expect(SqoopOptions.InvalidOptionsException.class); + expectedException.expectMessage("The hs2-user option cannot be used without the hs2-url option."); + + when(sqoopOptions.getHs2User()).thenReturn(TEST_HS2_USER); + + sqoopTool.validateOptions(sqoopOptions); + } + + @Test + public void testValidateOptionsThrowsWhenHs2KeytabIsUsedWithoutHs2User() throws Exception { + expectedException.expect(SqoopOptions.InvalidOptionsException.class); + expectedException.expectMessage("The hs2-keytab option cannot be used 
without the hs2-user option."); + + when(sqoopOptions.getHs2Keytab()).thenReturn(TEST_HS2_KEYTAB); + + sqoopTool.validateOptions(sqoopOptions); + } + + @Test + public void testValidateOptionsSucceedsWhenHs2UrlIsUsedWithHiveImport() throws Exception { + when(sqoopOptions.doHiveImport()).thenReturn(true); + when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL); + + sqoopTool.validateOptions(sqoopOptions); + } + + @Test + public void testValidateOptionsSucceedsWhenHs2UrlIsUsedWithHiveImportAndHs2UserButWithoutHs2Keytab() throws Exception { + when(sqoopOptions.doHiveImport()).thenReturn(true); + when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL); + when(sqoopOptions.getHs2User()).thenReturn(TEST_HS2_URL); + + sqoopTool.validateOptions(sqoopOptions); + } + + @Test + public void testValidateOptionsFailsWhenHs2UrlIsUsedWithParquetFormat() throws Exception { + expectedException.expect(SqoopOptions.InvalidOptionsException.class); + expectedException.expectMessage("The hs2-url option cannot be used with the as-parquetfile option."); + + when(sqoopOptions.doHiveImport()).thenReturn(true); + when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL); + when(sqoopOptions.getFileLayout()).thenReturn(ParquetFile); + + sqoopTool.validateOptions(sqoopOptions); + } + +} diff --git a/src/test/org/apache/sqoop/tool/TestImportTool.java b/src/test/org/apache/sqoop/tool/TestImportTool.java index 1c0cf4d8..3bdc5c65 100644 --- a/src/test/org/apache/sqoop/tool/TestImportTool.java +++ b/src/test/org/apache/sqoop/tool/TestImportTool.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -33,10 +32,10 @@ import java.sql.Connection; import org.apache.sqoop.SqoopOptions.InvalidOptionsException; -import org.apache.sqoop.hive.HiveImport; import org.apache.avro.Schema; import org.apache.sqoop.SqoopOptions; import org.apache.sqoop.avro.AvroSchemaMismatchException; +import org.apache.sqoop.hive.HiveClientFactory; import org.apache.sqoop.util.ExpectedLogMessage; import org.junit.Assert; import org.junit.Rule; @@ -75,7 +74,7 @@ public void testImportToolHandlesAvroSchemaMismatchExceptionProperly() throws Ex final String actualSchemaString = "actualSchema"; final String errorMessage = "Import failed"; - ImportTool importTool = spy(new ImportTool("import", mock(CodeGenTool.class), false)); + ImportTool importTool = spy(new ImportTool("import", mock(CodeGenTool.class), false, mock(HiveClientFactory.class))); doReturn(true).when(importTool).init(any(SqoopOptions.class)); @@ -85,7 +84,7 @@ public void testImportToolHandlesAvroSchemaMismatchExceptionProperly() throws Ex when(actualSchema.toString()).thenReturn(actualSchemaString); AvroSchemaMismatchException expectedException = new AvroSchemaMismatchException(errorMessage, writtenWithSchema, actualSchema); - doThrow(expectedException).when(importTool).importTable(any(SqoopOptions.class), anyString(), any(HiveImport.class)); + doThrow(expectedException).when(importTool).importTable(any(SqoopOptions.class)); SqoopOptions sqoopOptions = mock(SqoopOptions.class); when(sqoopOptions.doHiveImport()).thenReturn(true);
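For reference, the executeCommands() behaviour that TestHiveServer2Client pins down earlier in this patch (exactly one connection per invocation, one PreparedStatement per command, and both the connection and each statement closed even when execution fails) could look roughly like the sketch below. This is an illustration inferred from those tests, not the patch's actual implementation; the class name, the List<String> parameter type, and the assumption that JdbcConnectionFactory.createConnection() returns a java.sql.Connection (as the mocks suggest) are all hypothetical.

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.List;
    import org.apache.sqoop.db.JdbcConnectionFactory;

    // Sketch only: behaviour inferred from TestHiveServer2Client, not taken from the patch.
    class HiveServer2ClientSketch {
      private final JdbcConnectionFactory hs2ConnectionFactory;

      HiveServer2ClientSketch(JdbcConnectionFactory hs2ConnectionFactory) {
        this.hs2ConnectionFactory = hs2ConnectionFactory;
      }

      void executeCommands(List<String> commands) throws SQLException {
        // Exactly one connection per invocation; try-with-resources closes it on success and on failure.
        try (Connection hs2Connection = hs2ConnectionFactory.createConnection()) {
          for (String command : commands) {
            // One PreparedStatement per command, closed before the next command is prepared.
            try (PreparedStatement statement = hs2Connection.prepareStatement(command)) {
              statement.execute();
            }
          }
        }
      }
    }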