
SQOOP-3309: Implement HiveServer2 client

(Szabolcs Vasas via Boglarka Egyed)
Boglarka Egyed 2018-04-18 15:59:33 +02:00
parent 44ac3012f1
commit 72c5cd717e
31 changed files with 1994 additions and 110 deletions

View File

@ -734,6 +734,9 @@
<!-- enable asserts in tests -->
<jvmarg value="-ea" />
<!-- We need to disable asserts in HadoopThriftAuthBridge to be able to run HiveMiniCluster tests. -->
<jvmarg value="-da:org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge" />
<jvmarg value="${remoteDebugJvmArgs}"/>
<sysproperty key="test.build.data" value="${build.test}/data"/>

View File

@ -195,6 +195,8 @@ under the License.
<exclude org="org.apache.avro" module="avro" />
</dependency>
<dependency org="org.apache.hive" name="hive-jdbc" rev="${hcatalog.version}" conf="common->default" />
<dependency org="org.apache.hive.hcatalog" name="hive-hcatalog-core"
rev="${hcatalog.version}" conf="common->default">
<artifact name="hive-hcatalog-core" type="jar"/>

View File

@ -43,5 +43,10 @@ Argument Description
Hive type for configured columns. If you specify commas in\
this argument, use URL encoded keys and values, for example,\
use DECIMAL(1%2C%201) instead of DECIMAL(1, 1).
+\--hs2-url+ The JDBC connection string to HiveServer2 as you would specify in Beeline. If you use this option with \
--hive-import then Sqoop will try to connect to HiveServer2 instead of using the Hive CLI.
+\--hs2-user+ The user for creating the JDBC connection to HiveServer2. The default is the current OS user.
+\--hs2-keytab+ The path to the keytab file of the user connecting to HiveServer2. If you choose a different \
HiveServer2 user (with --hs2-user) then --hs2-keytab also has to be specified; otherwise it can be omitted.
--------------------------------------------------------------------------

View File

@ -35,12 +35,22 @@ omitted, Sqoop will generate a Hive script containing a +CREATE TABLE+
operation defining your columns using Hive's types, and a +LOAD DATA INPATH+
statement to move the data files into Hive's warehouse directory.
The script can be executed in two ways:

- By default, the script will be executed by calling
the installed copy of hive on the machine where Sqoop is run. If you have
multiple Hive installations, or +hive+ is not in your +$PATH+, use the
*+\--hive-home+* option to identify the Hive installation directory.
Sqoop will use +$HIVE_HOME/bin/hive+ from here.
- If the user specifies the *+\--hs2-url+* parameter then the script will
be sent to HiveServer2 through a JDBC connection. Note that the data itself
is not transferred via the JDBC connection; it is written directly to HDFS,
just as in the case of the default Hive import. As HiveServer2 provides proper
authorization and auditing features, it is recommended to use it instead of
the default Hive CLI. Currently only Kerberos authentication and the text file
format are supported with this option.
NOTE: This function is incompatible with +\--as-avrodatafile+ and
+\--as-sequencefile+.
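
For illustration only, a HiveServer2-based import might be invoked as follows (the connection strings, table name and keytab path are made-up example values, not part of this change):

----
$ sqoop import --connect jdbc:mysql://db.example.com/corp --table EMPLOYEES \
    --hive-import --hs2-url jdbc:hive2://hs2.example.com:10000/default \
    --hs2-user sqoop --hs2-keytab /etc/security/keytabs/sqoop.keytab
----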

View File

@ -449,6 +449,15 @@ public String toString() {
private String metaUsername;
private String metaPassword;
@StoredAsProperty("hs2.url")
private String hs2Url;
@StoredAsProperty("hs2.user")
private String hs2User;
@StoredAsProperty("hs2.keytab")
private String hs2Keytab;
public SqoopOptions() {
initDefaults(null);
}
@ -2892,5 +2901,29 @@ public void setMetaPassword(String metaPassword) {
this.metaPassword = metaPassword;
}
public String getHs2Url() {
return hs2Url;
}
public void setHs2Url(String hs2Url) {
this.hs2Url = hs2Url;
}
public String getHs2User() {
return hs2User;
}
public void setHs2User(String hs2User) {
this.hs2User = hs2User;
}
public String getHs2Keytab() {
return hs2Keytab;
}
public void setHs2Keytab(String hs2Keytab) {
this.hs2Keytab = hs2Keytab;
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive;
import java.io.IOException;
public interface HiveClient {
void importTable() throws IOException;
void createTable() throws IOException;
}
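
To show where the new interface plugs in, here is a minimal sketch of how a tool is expected to use it, mirroring the ImportTool and CreateHiveTableTool changes further down in this diff (the wrapper class and method names are made up for illustration):

import java.io.IOException;

import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.hive.HiveClient;
import org.apache.sqoop.hive.HiveClientFactory;
import org.apache.sqoop.manager.ConnManager;

class HiveClientUsageSketch {
  // Tools no longer instantiate HiveImport directly; they ask the factory for a
  // HiveClient, which is either the Hive CLI based or the HiveServer2 based implementation.
  void runHiveImport(SqoopOptions options, ConnManager manager) throws IOException {
    HiveClient hiveClient = new HiveClientFactory().createHiveClient(options, manager);
    hiveClient.importTable(); // runs the CREATE TABLE and LOAD DATA INPATH statements in Hive
  }
}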

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.io.CodecMap;
import java.io.IOException;
/**
* Class containing the common logic for different HiveClient implementations.
*/
public class HiveClientCommon {
public static final Log LOG = LogFactory.getLog(HiveClientCommon.class.getName());
/**
* If we used a MapReduce-based upload of the data, remove the _logs dir
* from where we put it, before running Hive LOAD DATA INPATH.
*/
public void removeTempLogs(Configuration configuration, Path tablePath) throws IOException {
FileSystem fs = tablePath.getFileSystem(configuration);
Path logsPath = new Path(tablePath, "_logs");
if (fs.exists(logsPath)) {
LOG.info("Removing temporary files from import process: " + logsPath);
if (!fs.delete(logsPath, true)) {
LOG.warn("Could not delete temporary files; "
+ "continuing with import, but it may fail.");
}
}
}
/**
* Clean up after successful HIVE import.
*
* @param outputPath path to the output directory
* @throws IOException
*/
public void cleanUp(Configuration configuration, Path outputPath) throws IOException {
FileSystem fs = outputPath.getFileSystem(configuration);
// HIVE does not always remove the input directory after the LOAD DATA statement
// (which is our export directory). We remove the export directory if it is
// empty, for the case when the user wants to periodically populate the HIVE
// table (for example with --hive-overwrite).
try {
if (outputPath != null && fs.exists(outputPath)) {
FileStatus[] statuses = fs.listStatus(outputPath);
if (statuses.length == 0) {
LOG.info("Export directory is empty, removing it.");
fs.delete(outputPath, true);
} else if (statuses.length == 1 && statuses[0].getPath().getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
LOG.info("Export directory is contains the _SUCCESS file only, removing the directory.");
fs.delete(outputPath, true);
} else {
LOG.info("Export directory is not empty, keeping it.");
}
}
} catch(IOException e) {
LOG.error("Issue with cleaning (safe to ignore)", e);
}
}
public void indexLzoFiles(SqoopOptions sqoopOptions, Path finalPath) throws IOException {
String codec = sqoopOptions.getCompressionCodec();
if (codec != null && (codec.equals(CodecMap.LZOP)
|| codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
try {
Tool tool = ReflectionUtils.newInstance(Class.
forName("com.hadoop.compression.lzo.DistributedLzoIndexer").
asSubclass(Tool.class), sqoopOptions.getConf());
ToolRunner.run(sqoopOptions.getConf(), tool,
new String[]{finalPath.toString()});
} catch (Exception ex) {
LOG.error("Error indexing lzo files", ex);
throw new IOException("Error indexing lzo files", ex);
}
}
}
}

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.authentication.KerberosAuthenticator;
import org.apache.sqoop.db.JdbcConnectionFactory;
import org.apache.sqoop.db.decorator.KerberizedConnectionFactoryDecorator;
import org.apache.sqoop.manager.ConnManager;
import java.io.IOException;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
public class HiveClientFactory {
private final HiveServer2ConnectionFactoryInitializer connectionFactoryInitializer;
public HiveClientFactory(HiveServer2ConnectionFactoryInitializer connectionFactoryInitializer) {
this.connectionFactoryInitializer = connectionFactoryInitializer;
}
public HiveClientFactory() {
this(new HiveServer2ConnectionFactoryInitializer());
}
public HiveClient createHiveClient(SqoopOptions sqoopOptions, ConnManager connManager) {
if (useHiveCli(sqoopOptions)) {
return createHiveImport(sqoopOptions, connManager);
} else {
return createHiveServer2Client(sqoopOptions, connManager);
}
}
private HiveClient createHiveImport(SqoopOptions sqoopOptions, ConnManager connManager) {
return new HiveImport(sqoopOptions, connManager, sqoopOptions.getConf(), false);
}
private HiveClient createHiveServer2Client(SqoopOptions sqoopOptions, ConnManager connManager) {
TableDefWriter tableDefWriter = createTableDefWriter(sqoopOptions, connManager);
JdbcConnectionFactory hs2JdbcConnectionFactory = connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions);
return new HiveServer2Client(sqoopOptions, tableDefWriter, hs2JdbcConnectionFactory);
}
TableDefWriter createTableDefWriter(SqoopOptions sqoopOptions, ConnManager connManager) {
return new TableDefWriter(sqoopOptions, connManager, sqoopOptions.getTableName(), sqoopOptions.getHiveTableName(), sqoopOptions.getConf(), false);
}
private boolean useHiveCli(SqoopOptions sqoopOptions) {
return StringUtils.isEmpty(sqoopOptions.getHs2Url());
}
}

View File

@ -31,16 +31,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
import org.apache.sqoop.io.CodecMap;
import org.apache.sqoop.util.Executor;
import org.apache.sqoop.util.LoggingAsyncSink;
import org.apache.sqoop.util.SubprocessSecurityManager;
@ -54,7 +47,7 @@
* to Hive itself as well as orchestrating the use of the other classes in this
* package.
*/
public class HiveImport {
public class HiveImport implements HiveClient {
public static final Log LOG = LogFactory.getLog(HiveImport.class.getName());
@ -62,6 +55,7 @@ public class HiveImport {
private ConnManager connManager;
private Configuration configuration;
private boolean generateOnly;
private HiveClientCommon hiveClientCommon;
private static boolean testMode = false;
public static boolean getTestMode() {
@ -77,13 +71,17 @@ public static void setTestMode(boolean mode) {
"org.apache.hadoop.hive.cli.CliDriver";
public HiveImport(final SqoopOptions opts, final ConnManager connMgr,
final Configuration conf, final boolean generateOnly) {
final Configuration conf, final boolean generateOnly, final HiveClientCommon hiveClientCommon) {
this.options = opts;
this.connManager = connMgr;
this.configuration = conf;
this.generateOnly = generateOnly;
this.hiveClientCommon = hiveClientCommon;
}
public HiveImport(SqoopOptions opts, ConnManager connMgr, Configuration conf, boolean generateOnly) {
this(opts, connMgr, conf, generateOnly, new HiveClientCommon());
}
/**
* @return the filename of the hive executable to run to do the import
@ -110,28 +108,12 @@ private String getHiveBinPath() {
}
}
/**
* If we used a MapReduce-based upload of the data, remove the _logs dir
* from where we put it, before running Hive LOAD DATA INPATH.
*/
private void removeTempLogs(Path tablePath) throws IOException {
FileSystem fs = tablePath.getFileSystem(configuration);
Path logsPath = new Path(tablePath, "_logs");
if (fs.exists(logsPath)) {
LOG.info("Removing temporary files from import process: " + logsPath);
if (!fs.delete(logsPath, true)) {
LOG.warn("Could not delete temporary files; "
+ "continuing with import, but it may fail.");
}
}
}
/**
* @return true if we're just generating the DDL for the import, but
* not actually running it (i.e., --generate-only mode). If so, don't
* do any side-effecting actions in Hive.
*/
private boolean isGenerateOnly() {
boolean isGenerateOnly() {
return generateOnly;
}
@ -181,8 +163,6 @@ public void importTable(String inputTableName, String outputTableName,
}
// generate the HQL statements to run.
// reset the connection as it might have timed out
connManager.discardConnection(true);
TableDefWriter tableWriter = new TableDefWriter(options, connManager,
inputTableName, outputTableName,
configuration, !debugMode);
@ -191,23 +171,10 @@ public void importTable(String inputTableName, String outputTableName,
Path finalPath = tableWriter.getFinalPath();
if (!isGenerateOnly()) {
removeTempLogs(finalPath);
hiveClientCommon.removeTempLogs(configuration, finalPath);
LOG.info("Loading uploaded data into Hive");
String codec = options.getCompressionCodec();
if (codec != null && (codec.equals(CodecMap.LZOP)
|| codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
try {
Tool tool = ReflectionUtils.newInstance(Class.
forName("com.hadoop.compression.lzo.DistributedLzoIndexer").
asSubclass(Tool.class), configuration);
ToolRunner.run(configuration, tool,
new String[] { finalPath.toString() });
} catch (Exception ex) {
LOG.error("Error indexing lzo files", ex);
throw new IOException("Error indexing lzo files", ex);
}
}
hiveClientCommon.indexLzoFiles(options, finalPath);
}
// write them to a script file.
@ -242,7 +209,7 @@ public void importTable(String inputTableName, String outputTableName,
LOG.info("Hive import complete.");
cleanUp(finalPath);
hiveClientCommon.cleanUp(configuration, finalPath);
}
} finally {
if (!isGenerateOnly()) {
@ -256,37 +223,6 @@ public void importTable(String inputTableName, String outputTableName,
}
}
/**
* Clean up after successful HIVE import.
*
* @param outputPath path to the output directory
* @throws IOException
*/
private void cleanUp(Path outputPath) throws IOException {
FileSystem fs = outputPath.getFileSystem(configuration);
// HIVE is not always removing input directory after LOAD DATA statement
// (which is our export directory). We're removing export directory in case
// that is blank for case that user wants to periodically populate HIVE
// table (for example with --hive-overwrite).
try {
if (outputPath != null && fs.exists(outputPath)) {
FileStatus[] statuses = fs.listStatus(outputPath);
if (statuses.length == 0) {
LOG.info("Export directory is empty, removing it.");
fs.delete(outputPath, true);
} else if (statuses.length == 1 && statuses[0].getPath().getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
LOG.info("Export directory is contains the _SUCCESS file only, removing the directory.");
fs.delete(outputPath, true);
} else {
LOG.info("Export directory is not empty, keeping it.");
}
}
} catch(IOException e) {
LOG.error("Issue with cleaning (safe to ignore)", e);
}
}
@SuppressWarnings("unchecked")
/**
* Execute the script file via Hive.
@ -398,5 +334,28 @@ private String[] getHiveArgs(String... args) throws IOException {
return newArgs.toArray(new String[newArgs.size()]);
}
@Override
public void importTable() throws IOException {
importTable(options.getTableName(), options.getHiveTableName(), false);
}
@Override
public void createTable() throws IOException {
importTable(options.getTableName(), options.getHiveTableName(), true);
}
SqoopOptions getOptions() {
return options;
}
ConnManager getConnManager() {
return connManager;
}
Configuration getConfiguration() {
return configuration;
}
}

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.db.JdbcConnectionFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import static java.util.Arrays.asList;
public class HiveServer2Client implements HiveClient {
private static final Log LOG = LogFactory.getLog(HiveServer2Client.class.getName());
private final SqoopOptions sqoopOptions;
private final TableDefWriter tableDefWriter;
private final JdbcConnectionFactory hs2ConnectionFactory;
private final HiveClientCommon hiveClientCommon;
public HiveServer2Client(SqoopOptions sqoopOptions, TableDefWriter tableDefWriter, JdbcConnectionFactory hs2ConnectionFactory, HiveClientCommon hiveClientCommon) {
this.sqoopOptions = sqoopOptions;
this.tableDefWriter = tableDefWriter;
this.hs2ConnectionFactory = hs2ConnectionFactory;
this.hiveClientCommon = hiveClientCommon;
}
public HiveServer2Client(SqoopOptions sqoopOptions, TableDefWriter tableDefWriter, JdbcConnectionFactory hs2ConnectionFactory) {
this(sqoopOptions, tableDefWriter, hs2ConnectionFactory, new HiveClientCommon());
}
@Override
public void importTable() throws IOException {
LOG.info("Loading uploaded data into Hive.");
String createTableStmt = tableDefWriter.getCreateTableStmt();
String loadDataStmt = tableDefWriter.getLoadDataStmt();
executeHiveImport(asList(createTableStmt, loadDataStmt));
LOG.info("Hive import complete.");
}
@Override
public void createTable() throws IOException {
LOG.info("Creating Hive table: " + tableDefWriter.getOutputTableName());
String createTableStmt = tableDefWriter.getCreateTableStmt();
executeHiveImport(asList(createTableStmt));
LOG.info("Hive table is successfully created.");
}
void executeHiveImport(List<String> commands) throws IOException {
Path finalPath = tableDefWriter.getFinalPath();
hiveClientCommon.removeTempLogs(sqoopOptions.getConf(), finalPath);
hiveClientCommon.indexLzoFiles(sqoopOptions, finalPath);
try {
executeCommands(commands);
} catch (SQLException e) {
throw new RuntimeException("Error executing Hive import.", e);
}
hiveClientCommon.cleanUp(sqoopOptions.getConf(), finalPath);
}
void executeCommands(List<String> commands) throws SQLException {
try (Connection hs2Connection = hs2ConnectionFactory.createConnection()) {
for (String command : commands) {
LOG.debug("Executing command: " + command);
try (PreparedStatement statement = hs2Connection.prepareStatement(command)) {
statement.execute();
}
}
}
}
SqoopOptions getSqoopOptions() {
return sqoopOptions;
}
TableDefWriter getTableDefWriter() {
return tableDefWriter;
}
JdbcConnectionFactory getHs2ConnectionFactory() {
return hs2ConnectionFactory;
}
}
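
As a minimal sketch, the collaborators above are assembled the same way HiveClientFactory does it; the helper class and method below are illustrative, not part of the commit:

import java.io.IOException;

import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.db.JdbcConnectionFactory;
import org.apache.sqoop.hive.HiveServer2Client;
import org.apache.sqoop.hive.HiveServer2ConnectionFactoryInitializer;
import org.apache.sqoop.hive.TableDefWriter;
import org.apache.sqoop.manager.ConnManager;

class HiveServer2ClientAssemblySketch {
  void createHiveTable(SqoopOptions options, ConnManager connManager) throws IOException {
    // TableDefWriter generates the CREATE TABLE / LOAD DATA statements,
    // the connection factory provides the (optionally kerberized) HS2 connection.
    TableDefWriter tableDefWriter = new TableDefWriter(options, connManager,
        options.getTableName(), options.getHiveTableName(), options.getConf(), false);
    JdbcConnectionFactory connectionFactory =
        new HiveServer2ConnectionFactoryInitializer().createJdbcConnectionFactory(options);
    new HiveServer2Client(options, tableDefWriter, connectionFactory).createTable();
  }
}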

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.db.DriverManagerJdbcConnectionFactory;
import java.io.IOException;
import java.sql.Connection;
import static org.apache.commons.lang3.StringUtils.EMPTY;
public class HiveServer2ConnectionFactory extends DriverManagerJdbcConnectionFactory {
private static final Log LOG = LogFactory.getLog(HiveServer2ConnectionFactory.class.getName());
private static final String HS2_DRIVER_CLASS = "org.apache.hive.jdbc.HiveDriver";
public HiveServer2ConnectionFactory(String connectionString, String username, String password) {
super(HS2_DRIVER_CLASS, connectionString, username, password);
}
public HiveServer2ConnectionFactory(String connectionString, String username) {
this(connectionString, username, null);
}
@Override
public Connection createConnection() {
LOG.info("Creating connection to HiveServer2 as: " + getCurrentUser());
return super.createConnection();
}
private String getCurrentUser() {
try {
return UserGroupInformation.getCurrentUser().toString();
} catch (IOException e) {
LOG.error("Unable to determine current user.", e);
}
return EMPTY;
}
}
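
A small, self-contained sketch of using this factory directly; the JDBC URL, user name and SQL text are illustrative values, and the pattern mirrors HiveServer2Client.executeCommands above and the mini cluster test below:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.sqoop.hive.HiveServer2ConnectionFactory;

class HiveServer2ConnectionSketch {
  void runStatement() throws SQLException {
    HiveServer2ConnectionFactory factory =
        new HiveServer2ConnectionFactory("jdbc:hive2://hs2.example.com:10000/default", "sqoop");
    // createConnection() loads the Hive JDBC driver and logs the effective user.
    try (Connection connection = factory.createConnection();
         PreparedStatement statement = connection.prepareStatement("SHOW TABLES")) {
      statement.execute();
    }
  }
}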

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.authentication.KerberosAuthenticator;
import org.apache.sqoop.db.JdbcConnectionFactory;
import org.apache.sqoop.db.decorator.KerberizedConnectionFactoryDecorator;
import java.io.IOException;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
public class HiveServer2ConnectionFactoryInitializer {
public JdbcConnectionFactory createJdbcConnectionFactory(SqoopOptions sqoopOptions) {
String connectionUsername = determineConnectionUsername(sqoopOptions);
JdbcConnectionFactory connectionFactory = new HiveServer2ConnectionFactory(sqoopOptions.getHs2Url(), connectionUsername);
if (useKerberizedConnection(sqoopOptions)) {
KerberosAuthenticator authenticator = createKerberosAuthenticator(sqoopOptions);
connectionFactory = new KerberizedConnectionFactoryDecorator(connectionFactory, authenticator);
}
return connectionFactory;
}
private String determineConnectionUsername(SqoopOptions sqoopOptions) {
if (!isEmpty(sqoopOptions.getHs2User())) {
return sqoopOptions.getHs2User();
}
try {
return UserGroupInformation.getLoginUser().getUserName();
} catch (IOException e) {
throw new RuntimeException("Unable to determine login user.", e);
}
}
private KerberosAuthenticator createKerberosAuthenticator(SqoopOptions sqoopOptions) {
return new KerberosAuthenticator(sqoopOptions.getConf(), sqoopOptions.getHs2User(), sqoopOptions.getHs2Keytab());
}
private boolean useKerberizedConnection(SqoopOptions sqoopOptions) {
return !isBlank(sqoopOptions.getHs2Keytab());
}
}
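
As the unit test below also demonstrates, the presence of a keytab is what switches the result to the kerberized variant. A hypothetical sketch with made-up values:

import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.db.JdbcConnectionFactory;
import org.apache.sqoop.hive.HiveServer2ConnectionFactoryInitializer;

class ConnectionFactoryInitializerSketch {
  JdbcConnectionFactory buildKerberizedFactory() {
    SqoopOptions options = new SqoopOptions();
    options.setHs2Url("jdbc:hive2://hs2.example.com:10000/default");
    options.setHs2User("sqoop");
    options.setHs2Keytab("/etc/security/keytabs/sqoop.keytab");
    // Because hs2Keytab is set, the returned factory is a KerberizedConnectionFactoryDecorator
    // wrapping a HiveServer2ConnectionFactory; without the keytab it would be the plain factory.
    return new HiveServer2ConnectionFactoryInitializer().createJdbcConnectionFactory(options);
  }
}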

View File

@ -96,6 +96,7 @@ public TableDefWriter(final SqoopOptions opts, final ConnManager connMgr,
* @return the CREATE TABLE statement for the table to load into hive.
*/
public String getCreateTableStmt() throws IOException {
resetConnManager();
Map<String, Integer> columnTypes;
Properties userMapping = options.getMapColumnHive();
Boolean isHiveExternalTableSet = !StringUtils.isBlank(options.getHiveExternalTableDir());
@ -286,5 +287,38 @@ public static String getHiveOctalCharCode(int charNum) {
return String.format("\\%03o", charNum);
}
/**
* The JDBC connection owned by the ConnManager was most probably opened when the import started,
* so it might have timed out by the time the TableDefWriter methods are invoked, which happens at the end of the import.
* The task of this method is to discard the current connection held by the ConnManager to make sure
* that TableDefWriter will have a working one.
*/
private void resetConnManager() {
this.connManager.discardConnection(true);
}
SqoopOptions getOptions() {
return options;
}
ConnManager getConnManager() {
return connManager;
}
Configuration getConfiguration() {
return configuration;
}
String getInputTableName() {
return inputTableName;
}
String getOutputTableName() {
return outputTableName;
}
boolean isCommentsEnabled() {
return commentsEnabled;
}
}

View File

@ -18,6 +18,8 @@
package org.apache.sqoop.tool;
import static java.lang.String.format;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@ -131,6 +133,9 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
public static final String HCATALOG_STORAGE_STANZA_ARG =
"hcatalog-storage-stanza";
public static final String HCATALOG_HOME_ARG = "hcatalog-home";
public static final String HS2_URL_ARG = "hs2-url";
public static final String HS2_USER_ARG = "hs2-user";
public static final String HS2_KEYTAB_ARG = "hs2-keytab";
public static final String MAPREDUCE_JOB_NAME = "mapreduce-job-name";
public static final String NUM_MAPPERS_ARG = "num-mappers";
public static final String NUM_MAPPERS_SHORT_ARG = "m";
@ -609,6 +614,21 @@ protected RelatedOptions getHiveOptions(boolean explicitHiveImport) {
+ " types.")
.withLongOpt(MAP_COLUMN_HIVE)
.create());
hiveOpts.addOption(OptionBuilder
.hasArg()
.withDescription("The URL to the HiveServer2.")
.withLongOpt(HS2_URL_ARG)
.create());
hiveOpts.addOption(OptionBuilder
.hasArg()
.withDescription("The user/principal for HiveServer2.")
.withLongOpt(HS2_USER_ARG)
.create());
hiveOpts.addOption(OptionBuilder
.hasArg()
.withDescription("The location of the keytab of the HiveServer2 user.")
.withLongOpt(HS2_KEYTAB_ARG)
.create());
return hiveOpts;
}
@ -1238,6 +1258,15 @@ protected void applyHiveOptions(CommandLine in, SqoopOptions out)
if (in.hasOption(HIVE_EXTERNAL_TABLE_LOCATION_ARG)) {
out.setHiveExternalTableDir(in.getOptionValue(HIVE_EXTERNAL_TABLE_LOCATION_ARG));
}
if (in.hasOption(HS2_URL_ARG)) {
out.setHs2Url(in.getOptionValue(HS2_URL_ARG));
}
if (in.hasOption(HS2_USER_ARG)) {
out.setHs2User(in.getOptionValue(HS2_USER_ARG));
}
if (in.hasOption(HS2_KEYTAB_ARG)) {
out.setHs2Keytab(in.getOptionValue(HS2_KEYTAB_ARG));
}
}
@ -1618,6 +1647,8 @@ protected void validateHiveOptions(SqoopOptions options)
throw new InvalidOptionsException("Importing to external Hive table requires --hive-import parameter to be set."
+ HELP_STR);
}
validateHS2Options(options);
}
protected void validateAccumuloOptions(SqoopOptions options)
@ -1851,5 +1882,26 @@ protected void validateHasDirectConnectorOption(SqoopOptions options) throws Sqo
"Was called with the --direct option, but no direct connector available.");
}
}
}
protected void validateHS2Options(SqoopOptions options) throws SqoopOptions.InvalidOptionsException {
final String withoutTemplate = "The %s option cannot be used without the %s option.";
final String withTemplate = "The %s option cannot be used with the %s option.";
if (isSet(options.getHs2Url()) && !options.doHiveImport()) {
throw new InvalidOptionsException(format(withoutTemplate, HS2_URL_ARG, HIVE_IMPORT_ARG));
}
if (isSet(options.getHs2User()) && !isSet(options.getHs2Url())) {
throw new InvalidOptionsException(format(withoutTemplate, HS2_USER_ARG, HS2_URL_ARG));
}
if (isSet(options.getHs2Keytab()) && !isSet(options.getHs2User())) {
throw new InvalidOptionsException(format(withoutTemplate, HS2_KEYTAB_ARG, HS2_USER_ARG));
}
if (isSet(options.getHs2Url()) && (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile)) {
throw new InvalidOptionsException(format(withTemplate, HS2_URL_ARG, FMT_PARQUETFILE_ARG));
}
}
}
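
A hypothetical sketch of how these validation rules behave; the subclass lives in the same package so the protected method is reachable, and the option value is made up:

package org.apache.sqoop.tool;

import org.apache.sqoop.SqoopOptions;

// Illustrative helper, not part of this change: exercises only the new HS2 validation.
class Hs2ValidationSketch extends ImportTool {
  void demo() throws SqoopOptions.InvalidOptionsException {
    SqoopOptions options = new SqoopOptions();
    options.setHs2User("sqoop");
    // --hs2-user without --hs2-url: validateHS2Options throws InvalidOptionsException
    // ("The hs2-user option cannot be used without the hs2-url option.").
    validateHS2Options(options);
  }
}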

View File

@ -30,7 +30,8 @@
import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
import org.apache.sqoop.cli.RelatedOptions;
import org.apache.sqoop.cli.ToolOptions;
import org.apache.sqoop.hive.HiveImport;
import org.apache.sqoop.hive.HiveClient;
import org.apache.sqoop.hive.HiveClientFactory;
/**
* Tool that creates a Hive table definition.
@ -40,8 +41,15 @@ public class CreateHiveTableTool extends BaseSqoopTool {
public static final Log LOG = LogFactory.getLog(
CreateHiveTableTool.class.getName());
public CreateHiveTableTool() {
private final HiveClientFactory hiveClientFactory;
public CreateHiveTableTool(HiveClientFactory hiveClientFactory) {
super("create-hive-table");
this.hiveClientFactory = hiveClientFactory;
}
public CreateHiveTableTool() {
this(new HiveClientFactory());
}
@Override
@ -52,10 +60,8 @@ public int run(SqoopOptions options) {
}
try {
HiveImport hiveImport = new HiveImport(options, manager,
options.getConf(), false);
hiveImport.importTable(options.getTableName(),
options.getHiveTableName(), true);
HiveClient hiveClient = hiveClientFactory.createHiveClient(options, manager);
hiveClient.createTable();
} catch (IOException ioe) {
LOG.error("Encountered IOException running create table job: "
+ StringUtils.stringifyException(ioe));

View File

@ -31,7 +31,6 @@
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
import org.apache.sqoop.cli.RelatedOptions;
import org.apache.sqoop.hive.HiveImport;
import org.apache.sqoop.util.ImportException;
/**
@ -75,7 +74,6 @@ public void applyOptions(CommandLine in, SqoopOptions out)
@Override
/** {@inheritDoc} */
public int run(SqoopOptions options) {
HiveImport hiveImport = null;
Set<String> excludes = new HashSet<String>();
if (!init(options)) {
@ -83,9 +81,6 @@ public int run(SqoopOptions options) {
}
try {
if (options.doHiveImport()) {
hiveImport = new HiveImport(options, manager, options.getConf(), false);
}
if (options.getAllTablesExclude() != null) {
excludes.addAll(Arrays.asList(options.getAllTablesExclude().split(",")));
@ -102,7 +97,8 @@ public int run(SqoopOptions options) {
System.out.println("Skipping table: " + tableName);
} else {
SqoopOptions clonedOptions = (SqoopOptions) options.clone();
importTable(clonedOptions, tableName, hiveImport);
clonedOptions.setTableName(tableName);
importTable(clonedOptions);
}
}
}

View File

@ -42,7 +42,8 @@
import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
import org.apache.sqoop.cli.RelatedOptions;
import org.apache.sqoop.cli.ToolOptions;
import org.apache.sqoop.hive.HiveImport;
import org.apache.sqoop.hive.HiveClient;
import org.apache.sqoop.hive.HiveClientFactory;
import org.apache.sqoop.manager.ImportJobContext;
import org.apache.sqoop.mapreduce.MergeJob;
import org.apache.sqoop.metastore.JobData;
@ -78,18 +79,21 @@ public class ImportTool extends BaseSqoopTool {
// Set classloader for local job runner
private ClassLoader prevClassLoader = null;
private final HiveClientFactory hiveClientFactory;
public ImportTool() {
this("import", false);
}
public ImportTool(String toolName, boolean allTables) {
this(toolName, new CodeGenTool(), allTables);
this(toolName, new CodeGenTool(), allTables, new HiveClientFactory());
}
public ImportTool(String toolName, CodeGenTool codeGenerator, boolean allTables) {
public ImportTool(String toolName, CodeGenTool codeGenerator, boolean allTables, HiveClientFactory hiveClientFactory) {
super(toolName);
this.codeGenerator = codeGenerator;
this.allTables = allTables;
this.hiveClientFactory = hiveClientFactory;
}
@Override
@ -499,17 +503,16 @@ protected void lastModifiedMerge(SqoopOptions options, ImportJobContext context)
* Import a table or query.
* @return true if an import was performed, false otherwise.
*/
protected boolean importTable(SqoopOptions options, String tableName,
HiveImport hiveImport) throws IOException, ImportException {
protected boolean importTable(SqoopOptions options) throws IOException, ImportException {
String jarFile = null;
// Generate the ORM code for the tables.
jarFile = codeGenerator.generateORM(options, tableName);
jarFile = codeGenerator.generateORM(options, options.getTableName());
Path outputPath = getOutputPath(options, tableName);
Path outputPath = getOutputPath(options, options.getTableName());
// Do the actual import.
ImportJobContext context = new ImportJobContext(tableName, jarFile,
ImportJobContext context = new ImportJobContext(options.getTableName(), jarFile,
options, outputPath);
// If we're doing an incremental import, set up the
@ -522,7 +525,7 @@ protected boolean importTable(SqoopOptions options, String tableName,
deleteTargetDir(context);
}
if (null != tableName) {
if (null != options.getTableName()) {
manager.importTable(context);
} else {
manager.importQuery(context);
@ -540,7 +543,8 @@ protected boolean importTable(SqoopOptions options, String tableName,
// For Parquet file, the import action will create hive table directly via
// kite. So there is no need to do hive import as a post step again.
if (options.getFileLayout() != SqoopOptions.FileLayout.ParquetFile) {
hiveImport.importTable(tableName, options.getHiveTableName(), false);
HiveClient hiveClient = hiveClientFactory.createHiveClient(options, manager);
hiveClient.importTable();
}
}
@ -609,8 +613,6 @@ private Path getOutputPath(SqoopOptions options, String tableName, boolean temp)
@Override
/** {@inheritDoc} */
public int run(SqoopOptions options) {
HiveImport hiveImport = null;
if (allTables) {
// We got into this method, but we should be in a subclass.
// (This method only handles a single table)
@ -626,12 +628,8 @@ public int run(SqoopOptions options) {
codeGenerator.setManager(manager);
try {
if (options.doHiveImport()) {
hiveImport = new HiveImport(options, manager, options.getConf(), false);
}
// Import a single table (or query) the user specified.
importTable(options, options.getTableName(), hiveImport);
importTable(options);
} catch (IllegalArgumentException iea) {
LOG.error(IMPORT_FAILED_ERROR_MSG + iea.getMessage());
rethrowIfRequired(options, iea);

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.db.JdbcConnectionFactory;
import org.apache.sqoop.db.decorator.KerberizedConnectionFactoryDecorator;
import org.assertj.core.api.SoftAssertions;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class HiveServer2ConnectionFactoryInitializerTest {
private static final String TEST_HS2_URL = "jdbc:hive2://myhost:10000/default";
private static final String TEST_HS2_USER = "testuser";
private static final String TEST_HS2_KEYTAB = "testkeytab";
private HiveServer2ConnectionFactoryInitializer connectionFactoryInitializer;
private SqoopOptions sqoopOptions;
private Configuration configuration;
private SoftAssertions softly;
@Before
public void before() {
connectionFactoryInitializer = new HiveServer2ConnectionFactoryInitializer();
sqoopOptions = mock(SqoopOptions.class);
configuration = mock(Configuration.class);
softly = new SoftAssertions();
when(sqoopOptions.getHs2User()).thenReturn(TEST_HS2_USER);
when(sqoopOptions.getConf()).thenReturn(configuration);
}
@Test
public void testCreateJdbcConnectionFactoryWithoutKerberosConfiguredReturnsHiveServer2ConnectionFactory() throws Exception {
JdbcConnectionFactory connectionFactory = connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions);
assertThat(connectionFactory, instanceOf(HiveServer2ConnectionFactory.class));
}
@Test
public void testCreateJdbcConnectionFactoryInitializesConnectionStringProperly() throws Exception {
when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
HiveServer2ConnectionFactory connectionFactory = (HiveServer2ConnectionFactory) connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions);
assertEquals(TEST_HS2_URL, connectionFactory.getConnectionString());
}
@Test
public void testCreateJdbcConnectionFactoryInitializesConnectionUsernameProperly() throws Exception {
HiveServer2ConnectionFactory connectionFactory = (HiveServer2ConnectionFactory) connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions);
assertEquals(TEST_HS2_USER, connectionFactory.getUsername());
}
@Test
public void testCreateJdbcConnectionFactoryWithoutHs2UserSpecifiedInitializesConnectionUsernameProperly() throws Exception {
when(sqoopOptions.getHs2User()).thenReturn(null);
String expectedUsername = UserGroupInformation.getLoginUser().getUserName();
HiveServer2ConnectionFactory connectionFactory = (HiveServer2ConnectionFactory) connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions);
assertEquals(expectedUsername, connectionFactory.getUsername());
}
@Test
public void testCreateJdbcConnectionFactoryWithKerberosConfiguredReturnsKerberizedConnectionFactoryDecorator() throws Exception {
when(sqoopOptions.getHs2Keytab()).thenReturn(TEST_HS2_KEYTAB);
JdbcConnectionFactory connectionFactory = connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions);
assertThat(connectionFactory, instanceOf(KerberizedConnectionFactoryDecorator.class));
}
@Test
public void testCreateJdbcConnectionFactoryWithKerberosConfiguredInitializesDecoratorProperly() throws Exception {
when(sqoopOptions.getHs2Keytab()).thenReturn(TEST_HS2_KEYTAB);
KerberizedConnectionFactoryDecorator connectionFactory = (KerberizedConnectionFactoryDecorator) connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions);
softly.assertThat(connectionFactory.getDecorated()).isInstanceOf(HiveServer2ConnectionFactory.class);
softly.assertThat(connectionFactory.getAuthenticator().getConfiguration()).isSameAs(configuration);
softly.assertThat(connectionFactory.getAuthenticator().getPrincipal()).isEqualTo(TEST_HS2_USER);
softly.assertThat(connectionFactory.getAuthenticator().getKeytabLocation()).isEqualTo(TEST_HS2_KEYTAB);
softly.assertAll();
}
}

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.db.JdbcConnectionFactory;
import org.apache.sqoop.manager.ConnManager;
import org.assertj.core.api.SoftAssertions;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestHiveClientFactory {
private static final String TEST_HS2_URL = "jdbc:hive2://myhost:10000/default";
private static final String TEST_TABLE_NAME = "testTableName";
private static final String TEST_HIVE_TABLE_NAME = "testHiveTableName";
private HiveClientFactory hiveClientFactory;
private ConnManager connectionManager;
private SqoopOptions sqoopOptions;
private Configuration configuration;
private JdbcConnectionFactory jdbcConnectionFactory;
private HiveServer2ConnectionFactoryInitializer connectionFactoryInitializer;
private SoftAssertions softly;
@Before
public void before() {
connectionFactoryInitializer = mock(HiveServer2ConnectionFactoryInitializer.class);
hiveClientFactory = new HiveClientFactory(connectionFactoryInitializer);
softly = new SoftAssertions();
connectionManager = mock(ConnManager.class);
sqoopOptions = mock(SqoopOptions.class);
configuration = mock(Configuration.class);
jdbcConnectionFactory = mock(JdbcConnectionFactory.class);
when(sqoopOptions.getConf()).thenReturn(configuration);
when(sqoopOptions.getTableName()).thenReturn(TEST_TABLE_NAME);
when(sqoopOptions.getHiveTableName()).thenReturn(TEST_HIVE_TABLE_NAME);
}
@Test
public void testCreateHiveClientCreatesHiveImportWhenHs2UrlIsNotProvided() throws Exception {
HiveClient hiveClient = hiveClientFactory.createHiveClient(sqoopOptions, connectionManager);
assertThat(hiveClient, instanceOf(HiveImport.class));
}
@Test
public void testCreateHiveClientInitializesHiveImportProperly() throws Exception {
HiveImport hiveImport = (HiveImport) hiveClientFactory.createHiveClient(sqoopOptions, connectionManager);
softly.assertThat(hiveImport.getOptions()).isSameAs(sqoopOptions);
softly.assertThat(hiveImport.getConnManager()).isSameAs(connectionManager);
softly.assertThat(hiveImport.getConfiguration()).isSameAs(configuration);
softly.assertThat(hiveImport.isGenerateOnly()).isFalse();
softly.assertAll();
}
@Test
public void testCreateHiveClientCreatesHiveServer2ClientWhenHs2UrlIsProvided() throws Exception {
when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
HiveClient hiveClient = hiveClientFactory.createHiveClient(sqoopOptions, connectionManager);
assertThat(hiveClient, instanceOf(HiveServer2Client.class));
}
@Test
public void testCreateHiveClientInitializesHiveServer2ClientProperly() throws Exception {
when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
when(connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions)).thenReturn(jdbcConnectionFactory);
HiveServer2Client hs2Client = (HiveServer2Client) hiveClientFactory.createHiveClient(sqoopOptions, connectionManager);
softly.assertThat(hs2Client.getSqoopOptions()).isSameAs(sqoopOptions);
softly.assertThat(hs2Client.getHs2ConnectionFactory()).isSameAs(jdbcConnectionFactory);
softly.assertThat(hs2Client.getTableDefWriter().getOptions()).isSameAs(sqoopOptions);
softly.assertThat(hs2Client.getTableDefWriter().getConnManager()).isSameAs(connectionManager);
softly.assertThat(hs2Client.getTableDefWriter().getInputTableName()).isEqualTo(TEST_TABLE_NAME);
softly.assertThat(hs2Client.getTableDefWriter().getOutputTableName()).isEqualTo(TEST_HIVE_TABLE_NAME);
softly.assertThat(hs2Client.getTableDefWriter().getConfiguration()).isSameAs(configuration);
softly.assertThat(hs2Client.getTableDefWriter().isCommentsEnabled()).isFalse();
softly.assertAll();
}
}

View File

@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.db.JdbcConnectionFactory;
import org.apache.sqoop.hive.minicluster.AuthenticationConfiguration;
import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
import org.apache.sqoop.hive.minicluster.KerberosAuthenticationConfiguration;
import org.apache.sqoop.hive.minicluster.NoAuthenticationConfiguration;
import org.apache.sqoop.hive.minicluster.PasswordAuthenticationConfiguration;
import org.apache.sqoop.infrastructure.kerberos.MiniKdcInfrastructureRule;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
@RunWith(Parameterized.class)
public class TestHiveMiniCluster {
@ClassRule
public static MiniKdcInfrastructureRule miniKdcInfrastructure = new MiniKdcInfrastructureRule();
private static final String TEST_USERNAME = "sqoop";
private static final String TEST_PASSWORD = "secret";
@Parameters(name = "config = {0}")
public static Iterable<? extends Object> authenticationParameters() {
return Arrays.asList(new NoAuthenticationConfiguration(),
new PasswordAuthenticationConfiguration(TEST_USERNAME, TEST_PASSWORD),
new KerberosAuthenticationConfiguration(miniKdcInfrastructure));
}
private static final String CREATE_TABLE_SQL = "CREATE TABLE TestTable (id int)";
private static final String INSERT_SQL = "INSERT INTO TestTable VALUES (?)";
private static final String SELECT_SQL = "SELECT * FROM TestTable";
private static final int TEST_VALUE = 10;
private final AuthenticationConfiguration authenticationConfiguration;
private HiveMiniCluster hiveMiniCluster;
private JdbcConnectionFactory connectionFactory;
public TestHiveMiniCluster(AuthenticationConfiguration authenticationConfiguration) {
this.authenticationConfiguration = authenticationConfiguration;
}
@Before
public void before() throws SQLException {
hiveMiniCluster = new HiveMiniCluster(authenticationConfiguration);
hiveMiniCluster.start();
connectionFactory = authenticationConfiguration.decorateConnectionFactory(new HiveServer2ConnectionFactory(hiveMiniCluster.getUrl(), TEST_USERNAME, TEST_PASSWORD));
}
@Test
public void testInsertedRowCanBeReadFromTable() throws Exception {
createTestTable();
insertRowIntoTestTable();
assertEquals(TEST_VALUE, getDataFromTestTable());
}
private void insertRowIntoTestTable() throws SQLException {
try (Connection conn = connectionFactory.createConnection(); PreparedStatement stmnt = conn.prepareStatement(INSERT_SQL)) {
stmnt.setInt(1, TEST_VALUE);
stmnt.executeUpdate();
}
}
private int getDataFromTestTable() throws SQLException {
try (Connection conn = connectionFactory.createConnection(); PreparedStatement stmnt = conn.prepareStatement(SELECT_SQL)) {
ResultSet resultSet = stmnt.executeQuery();
resultSet.next();
return resultSet.getInt(1);
}
}
private void createTestTable() throws SQLException {
try (Connection conn = connectionFactory.createConnection(); PreparedStatement stmnt = conn.prepareStatement(CREATE_TABLE_SQL)) {
stmnt.executeUpdate();
}
}
@After
public void after() {
hiveMiniCluster.stop();
UserGroupInformation.setConfiguration(new Configuration());
}
}

View File

@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.db.JdbcConnectionFactory;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.InOrder;
import org.mockito.Mockito;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestHiveServer2Client {
@Rule
public ExpectedException expectedException = ExpectedException.none();
private static final String CREATE_TABLE_STATEMENT = "createTableStatement";
private static final String LOAD_DATA_STATEMENT = "loadDataStatement";
private static final List<String> TEST_COMMANDS = asList("command1", "command2", "command3");
private HiveServer2Client hs2Client;
private HiveServer2Client hs2ClientSpy;
private SqoopOptions sqoopOptions;
private TableDefWriter tableDefWriter;
private JdbcConnectionFactory hs2ConnectionFactory;
private Connection hs2Connection;
private PreparedStatement preparedStatement;
private HiveClientCommon hiveClientCommon;
private Path finalPath;
private Configuration configuration;
@Before
public void before() throws Exception {
sqoopOptions = mock(SqoopOptions.class);
tableDefWriter = mock(TableDefWriter.class);
hs2ConnectionFactory = mock(JdbcConnectionFactory.class);
hs2Connection = mock(Connection.class);
preparedStatement = mock(PreparedStatement.class);
hiveClientCommon = mock(HiveClientCommon.class);
finalPath = mock(Path.class);
configuration = mock(Configuration.class);
when(sqoopOptions.getConf()).thenReturn(configuration);
when(hs2ConnectionFactory.createConnection()).thenReturn(hs2Connection);
when(hs2Connection.prepareStatement(anyString())).thenReturn(preparedStatement);
when(tableDefWriter.getCreateTableStmt()).thenReturn(CREATE_TABLE_STATEMENT);
when(tableDefWriter.getLoadDataStmt()).thenReturn(LOAD_DATA_STATEMENT);
when(tableDefWriter.getFinalPath()).thenReturn(finalPath);
hs2Client = new HiveServer2Client(sqoopOptions, tableDefWriter, hs2ConnectionFactory, hiveClientCommon);
hs2ClientSpy = spy(hs2Client);
}
@Test
public void testImportTableExecutesHiveImportWithCreateTableAndLoadDataCommands() throws Exception {
doNothing().when(hs2ClientSpy).executeHiveImport(anyList());
hs2ClientSpy.importTable();
verify(hs2ClientSpy, times(1)).executeHiveImport(asList(CREATE_TABLE_STATEMENT, LOAD_DATA_STATEMENT));
}
@Test
public void testCreateTableExecutesHiveImportWithCreateTableCommandOnly() throws Exception {
doNothing().when(hs2ClientSpy).executeHiveImport(anyList());
hs2ClientSpy.createTable();
verify(hs2ClientSpy, times(1)).executeHiveImport(asList(CREATE_TABLE_STATEMENT));
}
@Test
public void testExecuteHiveImportInvokesMethodsInCorrectSequence() throws Exception {
InOrder inOrder = Mockito.inOrder(hiveClientCommon, hs2ClientSpy);
doNothing().when(hs2ClientSpy).executeCommands(TEST_COMMANDS);
hs2ClientSpy.executeHiveImport(TEST_COMMANDS);
inOrder.verify(hiveClientCommon).removeTempLogs(configuration, finalPath);
inOrder.verify(hiveClientCommon).indexLzoFiles(sqoopOptions, finalPath);
inOrder.verify(hs2ClientSpy).executeCommands(TEST_COMMANDS);
inOrder.verify(hiveClientCommon).cleanUp(configuration, finalPath);
}
@Test
public void testExecuteHiveImportThrowsRuntimeExceptionWhenExecuteCommandsThrows() throws Exception {
SQLException sqlException = mock(SQLException.class);
doThrow(sqlException).when(hs2ClientSpy).executeCommands(anyList());
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("Error executing Hive import.");
hs2ClientSpy.executeHiveImport(TEST_COMMANDS);
}
@Test
public void testExecuteCommandsCreatesExactlyOneConnection() throws Exception {
hs2Client.executeCommands(TEST_COMMANDS);
verify(hs2ConnectionFactory, times(1)).createConnection();
}
@Test
public void testExecuteCommandsClosesConnectionWhenStatementExecutionIsSuccessful() throws Exception {
hs2Client.executeCommands(TEST_COMMANDS);
verify(hs2Connection).close();
}
@Test
public void testExecuteCommandsClosesConnectionWhenStatementExecutionThrows() throws Exception {
when(hs2Connection.prepareStatement(anyString())).thenThrow(new SQLException());
expectedException.expect(SQLException.class);
hs2Client.executeCommands(TEST_COMMANDS);
verify(hs2Connection).close();
}
@Test
public void testExecuteCommandsClosesPreparedStatementsWhenStatementExecutionIsSuccessful() throws Exception {
hs2Client.executeCommands(TEST_COMMANDS);
verify(preparedStatement, times(TEST_COMMANDS.size())).close();
}
@Test
public void testExecuteCommandsClosesPreparedStatementWhenStatementExecutionThrows() throws Exception {
when(preparedStatement.execute()).thenThrow(new SQLException());
expectedException.expect(SQLException.class);
hs2Client.executeCommands(TEST_COMMANDS);
verify(preparedStatement).close();
}
@Test
public void testExecuteCommandsThrowsWhenCreateConnectionThrows() throws Exception {
RuntimeException expected = mock(RuntimeException.class);
when(hs2ConnectionFactory.createConnection()).thenThrow(expected);
expectedException.expect(equalTo(expected));
hs2Client.executeCommands(TEST_COMMANDS);
}
@Test
public void testExecuteCommandsThrowsWhenPrepareStatementThrows() throws Exception {
SQLException expected = mock(SQLException.class);
when(hs2Connection.prepareStatement(anyString())).thenThrow(expected);
expectedException.expect(equalTo(expected));
hs2Client.executeCommands(TEST_COMMANDS);
}
@Test
public void testExecuteCommandsThrowsWhenExecuteStatementThrows() throws Exception {
SQLException expected = mock(SQLException.class);
when(preparedStatement.execute()).thenThrow(expected);
expectedException.expect(equalTo(expected));
hs2Client.executeCommands(TEST_COMMANDS);
}
}

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
import org.apache.sqoop.hive.minicluster.KerberosAuthenticationConfiguration;
import org.apache.sqoop.infrastructure.kerberos.MiniKdcInfrastructureRule;
import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.HiveServer2TestUtil;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class TestHiveServer2TextImport extends ImportJobTestCase {
@ClassRule
public static MiniKdcInfrastructureRule miniKdcInfrastructure = new MiniKdcInfrastructureRule();
private HiveMiniCluster hiveMiniCluster;
private HiveServer2TestUtil hiveServer2TestUtil;
@Override
@Before
public void setUp() {
super.setUp();
KerberosAuthenticationConfiguration authenticationConfiguration = new KerberosAuthenticationConfiguration(miniKdcInfrastructure);
hiveMiniCluster = new HiveMiniCluster(authenticationConfiguration);
hiveMiniCluster.start();
hiveServer2TestUtil = new HiveServer2TestUtil(hiveMiniCluster.getUrl());
}
@Override
@After
public void tearDown() {
super.tearDown();
hiveMiniCluster.stop();
}
@Test
public void testImport() throws Exception {
List<Object> columnValues = Arrays.<Object>asList("test", 42, "somestring");
String[] types = {"VARCHAR(32)", "INTEGER", "CHAR(64)"};
createTableWithColTypes(types, toStringArray(columnValues));
String[] args = new ArgumentArrayBuilder()
.withProperty(YarnConfiguration.RM_PRINCIPAL, miniKdcInfrastructure.getTestPrincipal())
.withOption("connect", getConnectString())
.withOption("table", getTableName())
.withOption("hive-import")
.withOption("hs2-url", hiveMiniCluster.getUrl())
.withOption("split-by", getColName(1))
.build();
runImport(args);
List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
assertEquals(columnValues, rows.get(0));
}
private String[] toStringArray(List<Object> columnValues) {
String[] result = new String[columnValues.size()];
for (int i = 0; i < columnValues.size(); i++) {
if (columnValues.get(i) instanceof String) {
result[i] = StringUtils.wrap((String) columnValues.get(i), '\'');
} else {
result[i] = columnValues.get(i).toString();
}
}
return result;
}
}

View File

@ -42,6 +42,7 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -248,6 +249,13 @@ public void testHiveDatabase() throws Exception {
assertTrue(createTable.contains("`db`.`outputTable`"));
}
@Test
public void testGetCreateTableStmtDiscardsConnection() throws Exception {
writer.getCreateTableStmt();
verify(connManager).discardConnection(true);
}
private void setUpMockConnManager(String tableName, Map<String, Integer> typeMap) {
when(connManager.getColumnTypes(tableName)).thenReturn(typeMap);
when(connManager.getColumnNames(tableName)).thenReturn(typeMap.keySet().toArray(new String[]{}));

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive.minicluster;
import org.apache.sqoop.db.JdbcConnectionFactory;
import java.security.PrivilegedAction;
import java.util.Map;
public interface AuthenticationConfiguration {
Map<String, String> getAuthenticationConfig();
String getUrlParams();
<T> T doAsAuthenticated(PrivilegedAction<T> action);
void init();
JdbcConnectionFactory decorateConnectionFactory(JdbcConnectionFactory connectionFactory);
}

View File

@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive.minicluster;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.Service;
import org.apache.hive.service.server.HiveServer2;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.security.PrivilegedAction;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class HiveMiniCluster {
private static final Log LOG = LogFactory.getLog(HiveMiniCluster.class.getName());
private static final String DEFAULT_HOST = "127.0.0.1";
private static final int DEFAULT_PORT = 10000;
private final String hostName;
private final int port;
private final String tempFolderPath;
private final AuthenticationConfiguration authenticationConfiguration;
private final HiveServer2 hiveServer2;
private HiveConf config;
public HiveMiniCluster(AuthenticationConfiguration authenticationConfiguration) {
this(DEFAULT_HOST, DEFAULT_PORT, authenticationConfiguration);
}
public HiveMiniCluster(String hostname, int port, AuthenticationConfiguration authenticationConfiguration) {
this(hostname, port, Files.createTempDir().getAbsolutePath(), authenticationConfiguration);
}
public HiveMiniCluster(String hostname, int port, String tempFolderPath, AuthenticationConfiguration authenticationConfiguration) {
this.hostName = hostname;
this.port = port;
this.tempFolderPath = tempFolderPath;
this.authenticationConfiguration = authenticationConfiguration;
this.hiveServer2 = new HiveServer2();
}
private void createHiveConf() {
config = new HiveConf();
config.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, tempFolderPath);
config.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, getHostName());
config.setInt(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, getPort());
config.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, getMetastoreConnectUrl());
for (Map.Entry<String, String> authConfig : authenticationConfiguration.getAuthenticationConfig().entrySet()) {
config.set(authConfig.getKey(), authConfig.getValue());
}
}
public void start() {
try {
authenticationConfiguration.init();
createHiveConf();
createHiveSiteXml();
startHiveServer();
waitForStartUp();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void createHiveSiteXml() throws IOException {
File hiveSiteXmlFile = new File(tempFolderPath, "hive-site.xml");
try (OutputStream out = new FileOutputStream(hiveSiteXmlFile)) {
config.writeXml(out);
}
HiveConf.setHiveSiteLocation(hiveSiteXmlFile.toURI().toURL());
}
private void startHiveServer() throws Exception {
authenticationConfiguration.doAsAuthenticated(new PrivilegedAction<Void>() {
@Override
public Void run() {
hiveServer2.init(config);
hiveServer2.start();
return null;
}
});
}
public void stop() {
hiveServer2.stop();
HiveConf.setHiveSiteLocation(null);
try {
FileUtils.deleteDirectory(new File(tempFolderPath));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public HiveConf getConfig() {
return config;
}
public int getPort() {
return port;
}
public String getHostName() {
return hostName;
}
public String getUrl() {
return String.format("jdbc:hive2://%s:%d/default%s", hostName, port, authenticationConfiguration.getUrlParams());
}
public String getTempFolderPath() {
return tempFolderPath;
}
public String getMetastoreConnectUrl() {
return String.format("jdbc:derby:;databaseName=%s/minicluster_metastore_db;create=true", tempFolderPath);
}
public boolean isStarted() {
return hiveServer2.getServiceState() == Service.STATE.STARTED;
}
private void waitForStartUp() throws InterruptedException, TimeoutException {
final int numberOfAttempts = 500;
final long sleepTime = 100;
for (int i = 0; i < numberOfAttempts; ++i) {
try {
LOG.debug("Attempt " + (i + 1) + " to access " + hostName + ":" + port);
new Socket(InetAddress.getByName(hostName), port).close();
return;
} catch (RuntimeException | IOException e) {
LOG.debug("Failed to connect to " + hostName + ":" + port, e);
}
Thread.sleep(sleepTime);
}
throw new RuntimeException("Couldn't access new server: " + hostName + ":" + port);
}
}
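A minimal usage sketch of the mini cluster (assuming the NoAuthenticationConfiguration implementation included later in this change; host, port and temp folder fall back to the defaults):

  HiveMiniCluster hiveMiniCluster = new HiveMiniCluster(new NoAuthenticationConfiguration());
  try {
    hiveMiniCluster.start();
    // getUrl() yields something like jdbc:hive2://127.0.0.1:10000/default
    String jdbcUrl = hiveMiniCluster.getUrl();
    // ... run test queries against jdbcUrl ...
  } finally {
    hiveMiniCluster.stop();
  }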

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive.minicluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.sqoop.authentication.KerberosAuthenticator;
import org.apache.sqoop.db.JdbcConnectionFactory;
import org.apache.sqoop.db.decorator.KerberizedConnectionFactoryDecorator;
import org.apache.sqoop.infrastructure.kerberos.KerberosConfigurationProvider;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
public class KerberosAuthenticationConfiguration implements AuthenticationConfiguration {
private final KerberosConfigurationProvider kerberosConfig;
private KerberosAuthenticator authenticator;
public KerberosAuthenticationConfiguration(KerberosConfigurationProvider kerberosConfig) {
this.kerberosConfig = kerberosConfig;
}
@Override
public Map<String, String> getAuthenticationConfig() {
Map<String, String> result = new HashMap<>();
result.put(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, kerberosConfig.getTestPrincipal());
result.put(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB.varname, kerberosConfig.getKeytabFilePath());
result.put(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, HiveAuthFactory.AuthTypes.KERBEROS.toString());
result.put(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, HiveAuthFactory.AuthTypes.KERBEROS.toString());
result.put(YarnConfiguration.RM_PRINCIPAL, kerberosConfig.getTestPrincipal());
return result;
}
@Override
public String getUrlParams() {
return ";principal=" + kerberosConfig.getTestPrincipal();
}
@Override
public <T> T doAsAuthenticated(PrivilegedAction<T> action) {
return authenticator.authenticate().doAs(action);
}
@Override
public void init() {
authenticator = createKerberosAuthenticator();
}
@Override
public JdbcConnectionFactory decorateConnectionFactory(JdbcConnectionFactory connectionFactory) {
return new KerberizedConnectionFactoryDecorator(connectionFactory, authenticator);
}
private KerberosAuthenticator createKerberosAuthenticator() {
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
KerberosAuthenticator result = new KerberosAuthenticator(conf, kerberosConfig.getTestPrincipal(), kerberosConfig.getKeytabFilePath());
return result;
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive.minicluster;
import org.apache.commons.lang3.StringUtils;
import org.apache.sqoop.db.JdbcConnectionFactory;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.Map;
public class NoAuthenticationConfiguration implements AuthenticationConfiguration {
@Override
public Map<String, String> getAuthenticationConfig() {
return Collections.emptyMap();
}
@Override
public String getUrlParams() {
return StringUtils.EMPTY;
}
@Override
public <T> T doAsAuthenticated(PrivilegedAction<T> action) {
return action.run();
}
@Override
public void init() {
// do nothing
}
@Override
public JdbcConnectionFactory decorateConnectionFactory(JdbcConnectionFactory connectionFactory) {
return connectionFactory;
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.hive.minicluster;
import org.apache.commons.lang3.StringUtils;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.auth.PasswdAuthenticationProvider;
import org.apache.sqoop.db.JdbcConnectionFactory;
import javax.security.sasl.AuthenticationException;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_CUSTOM_AUTHENTICATION_CLASS;
public class PasswordAuthenticationConfiguration implements AuthenticationConfiguration {
// HiveServer2 instantiates the custom PasswdAuthenticationProvider reflectively by class name,
// so the test credentials are kept in static fields that the reflectively created instance can read.
private static String TEST_USERNAME;
private static String TEST_PASSWORD;
private static final class TestPasswordAuthenticationProvider implements PasswdAuthenticationProvider {
@Override
public void Authenticate(String user, String password) throws AuthenticationException {
if (!(TEST_USERNAME.equals(user) && TEST_PASSWORD.equals(password))) {
throw new AuthenticationException("Authentication failed!");
}
}
}
public PasswordAuthenticationConfiguration(String testUsername, String testPassword) {
TEST_USERNAME = testUsername;
TEST_PASSWORD = testPassword;
}
@Override
public Map<String, String> getAuthenticationConfig() {
Map<String, String> result = new HashMap<>();
result.put(HIVE_SERVER2_AUTHENTICATION.varname, HiveAuthFactory.AuthTypes.CUSTOM.getAuthName());
result.put(HIVE_SERVER2_CUSTOM_AUTHENTICATION_CLASS.varname, TestPasswordAuthenticationProvider.class.getName());
return result;
}
@Override
public String getUrlParams() {
return StringUtils.EMPTY;
}
@Override
public <T> T doAsAuthenticated(PrivilegedAction<T> action) {
return action.run();
}
@Override
public void init() {
//do nothing
}
@Override
public JdbcConnectionFactory decorateConnectionFactory(JdbcConnectionFactory connectionFactory) {
return connectionFactory;
}
}
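A hedged sketch of wiring this configuration into the mini cluster (the "sqoop"/"secret" credentials are arbitrary test fixtures, not values required by the change):

  AuthenticationConfiguration passwordAuth =
      new PasswordAuthenticationConfiguration("sqoop", "secret");
  HiveMiniCluster hiveMiniCluster = new HiveMiniCluster(passwordAuth);
  hiveMiniCluster.start();
  // Clients then authenticate with the same credentials, e.g. via
  // new HiveServer2ConnectionFactory(hiveMiniCluster.getUrl(), "sqoop", "secret").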

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.testutil;
import org.apache.sqoop.hive.HiveServer2ConnectionFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
public class HiveServer2TestUtil {
private static final String SELECT_TABLE_QUERY = "SELECT * FROM %s";
private HiveServer2ConnectionFactory hs2ConnectionFactory;
public HiveServer2TestUtil(String url) {
this(url, null, null);
}
public HiveServer2TestUtil(String url, String username, String password) {
hs2ConnectionFactory = new HiveServer2ConnectionFactory(url, username, password);
}
public List<LinkedHashMap<String, Object>> loadRowsFromTable(String tableName) {
List<LinkedHashMap<String, Object>> result = new ArrayList<>();
try(Connection connection = hs2ConnectionFactory.createConnection();
PreparedStatement query = connection.prepareStatement(String.format(SELECT_TABLE_QUERY, tableName))) {
ResultSet resultSet = query.executeQuery();
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) {
LinkedHashMap<String, Object> row = new LinkedHashMap<>();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
row.put(metaData.getColumnName(i), resultSet.getObject(i));
}
result.add(row);
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
return result;
}
public List<List<Object>> loadRawRowsFromTable(String tableName) {
List<List<Object>> result = new ArrayList<>();
List<LinkedHashMap<String, Object>> rowsWithColumnNames = loadRowsFromTable(tableName);
for (LinkedHashMap<String, Object> rowWithColumnNames : rowsWithColumnNames) {
result.add(new ArrayList<>(rowWithColumnNames.values()));
}
return result;
}
}
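A short sketch of how this utility might be used to verify an import in a test (assuming a running HiveMiniCluster as set up in TestHiveServer2TextImport above; the table name and expected values are placeholders):

  HiveServer2TestUtil hiveServer2TestUtil = new HiveServer2TestUtil(hiveMiniCluster.getUrl());
  // Each row is keyed by column name, in select order.
  List<LinkedHashMap<String, Object>> rows = hiveServer2TestUtil.loadRowsFromTable("sometable");
  // Or just the values, if the column names are irrelevant for the assertion:
  List<List<Object>> rawRows = hiveServer2TestUtil.loadRawRowsFromTable("sometable");
  assertEquals(expectedValues, rawRows.get(0));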

View File

@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.tool;
import org.apache.sqoop.SqoopOptions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.util.Arrays;
import java.util.Properties;
import static org.apache.sqoop.SqoopOptions.FileLayout.ParquetFile;
import static org.apache.sqoop.SqoopOptions.IncrementalMode.None;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class TestHiveServer2OptionValidations {
@Parameters(name = "sqoopTool = {0}")
public static Iterable<? extends Object> parameters() {
return Arrays.asList(
new ImportTool(),
new ImportAllTablesTool(),
new CreateHiveTableTool());
}
private static final String TEST_HS2_URL = "test-hs2-url";
private static final String TEST_HS2_USER = "test-hs2-user";
private static final String TEST_HS2_KEYTAB = "test-hs2-keytab";
private static final String TEST_TABLE = "testtable";
private static final String TEST_CONNECTION_STRING = "testconnectstring";
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final BaseSqoopTool sqoopTool;
private SqoopOptions sqoopOptions;
public TestHiveServer2OptionValidations(BaseSqoopTool sqoopTool) {
this.sqoopTool = spy(sqoopTool);
}
@Before
public void before() {
sqoopOptions = mock(SqoopOptions.class);
when(sqoopOptions.getTableName()).thenReturn(TEST_TABLE);
when(sqoopOptions.getIncrementalMode()).thenReturn(None);
when(sqoopOptions.getConnectString()).thenReturn(TEST_CONNECTION_STRING);
when(sqoopOptions.getMapColumnHive()).thenReturn(new Properties());
doReturn(0).when(sqoopTool).getDashPosition(any(String[].class));
}
@Test
public void testValidateOptionsThrowsWhenHs2UrlIsUsedWithoutHiveImport() throws Exception {
expectedException.expect(SqoopOptions.InvalidOptionsException.class);
expectedException.expectMessage("The hs2-url option cannot be used without the hive-import option.");
when(sqoopOptions.doHiveImport()).thenReturn(false);
when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
sqoopTool.validateOptions(sqoopOptions);
}
@Test
public void testValidateOptionsThrowsWhenHs2UrlIsUsedWithHCatalogImport() throws Exception {
expectedException.expect(SqoopOptions.InvalidOptionsException.class);
expectedException.expectMessage("The hs2-url option cannot be used without the hive-import option.");
when(sqoopOptions.getHCatTableName()).thenReturn(TEST_TABLE);
when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
sqoopTool.validateOptions(sqoopOptions);
}
@Test
public void testValidateOptionsThrowsWhenHs2UserIsUsedWithoutHs2Url() throws Exception {
expectedException.expect(SqoopOptions.InvalidOptionsException.class);
expectedException.expectMessage("The hs2-user option cannot be used without the hs2-url option.");
when(sqoopOptions.getHs2User()).thenReturn(TEST_HS2_USER);
sqoopTool.validateOptions(sqoopOptions);
}
@Test
public void testValidateOptionsThrowsWhenHs2KeytabIsUsedWithoutHs2User() throws Exception {
expectedException.expect(SqoopOptions.InvalidOptionsException.class);
expectedException.expectMessage("The hs2-keytab option cannot be used without the hs2-user option.");
when(sqoopOptions.getHs2Keytab()).thenReturn(TEST_HS2_KEYTAB);
sqoopTool.validateOptions(sqoopOptions);
}
@Test
public void testValidateOptionsSucceedsWhenHs2UrlIsUsedWithHiveImport() throws Exception {
when(sqoopOptions.doHiveImport()).thenReturn(true);
when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
sqoopTool.validateOptions(sqoopOptions);
}
@Test
public void testValidateOptionsSucceedsWhenHs2UrlIsUsedWithHiveImportAndHs2UserButWithoutHs2Keytab() throws Exception {
when(sqoopOptions.doHiveImport()).thenReturn(true);
when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
when(sqoopOptions.getHs2User()).thenReturn(TEST_HS2_USER);
sqoopTool.validateOptions(sqoopOptions);
}
@Test
public void testValidateOptionsFailsWhenHs2UrlIsUsedWithParquetFormat() throws Exception {
expectedException.expect(SqoopOptions.InvalidOptionsException.class);
expectedException.expectMessage("The hs2-url option cannot be used with the as-parquetfile option.");
when(sqoopOptions.doHiveImport()).thenReturn(true);
when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
when(sqoopOptions.getFileLayout()).thenReturn(ParquetFile);
sqoopTool.validateOptions(sqoopOptions);
}
}

View File

@ -23,7 +23,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@ -33,10 +32,10 @@
import java.sql.Connection;
import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
import org.apache.sqoop.hive.HiveImport;
import org.apache.avro.Schema;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.avro.AvroSchemaMismatchException;
import org.apache.sqoop.hive.HiveClientFactory;
import org.apache.sqoop.util.ExpectedLogMessage;
import org.junit.Assert;
import org.junit.Rule;
@ -75,7 +74,7 @@ public void testImportToolHandlesAvroSchemaMismatchExceptionProperly() throws Ex
final String actualSchemaString = "actualSchema";
final String errorMessage = "Import failed";
ImportTool importTool = spy(new ImportTool("import", mock(CodeGenTool.class), false));
ImportTool importTool = spy(new ImportTool("import", mock(CodeGenTool.class), false, mock(HiveClientFactory.class)));
doReturn(true).when(importTool).init(any(SqoopOptions.class));
@ -85,7 +84,7 @@ public void testImportToolHandlesAvroSchemaMismatchExceptionProperly() throws Ex
when(actualSchema.toString()).thenReturn(actualSchemaString);
AvroSchemaMismatchException expectedException = new AvroSchemaMismatchException(errorMessage, writtenWithSchema, actualSchema);
doThrow(expectedException).when(importTool).importTable(any(SqoopOptions.class), anyString(), any(HiveImport.class));
doThrow(expectedException).when(importTool).importTable(any(SqoopOptions.class));
SqoopOptions sqoopOptions = mock(SqoopOptions.class);
when(sqoopOptions.doHiveImport()).thenReturn(true);