mirror of
https://github.com/apache/sqoop.git
synced 2025-05-04 08:19:57 +08:00
SQOOP-2437: Use hive configuration to connect to secure metastore
(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
parent
b8df3a8033
commit
c081094e5a
66
src/java/org/apache/sqoop/hive/HiveConfig.java
Normal file
66
src/java/org/apache/sqoop/hive/HiveConfig.java
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.sqoop.hive;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class HiveConfig {
|
||||||
|
|
||||||
|
public static final Log LOG = LogFactory.getLog(HiveConfig.class.getName());
|
||||||
|
|
||||||
|
public static final String HIVE_CONF_CLASS = "org.apache.hadoop.hive.conf.HiveConf";
|
||||||
|
|
||||||
|
public static final String HIVE_SASL_ENABLED = "hive.metastore.sasl.enabled";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dynamically create hive configuration object.
|
||||||
|
* @param conf
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public static Configuration getHiveConf(Configuration conf) {
|
||||||
|
try {
|
||||||
|
Class HiveConfClass = Class.forName(HIVE_CONF_CLASS);
|
||||||
|
return ((Configuration)(HiveConfClass.getConstructor(Configuration.class, Class.class)
|
||||||
|
.newInstance(conf, Configuration.class)));
|
||||||
|
} catch (ClassNotFoundException ex) {
|
||||||
|
LOG.error("Could not load " + HIVE_CONF_CLASS
|
||||||
|
+ ". Make sure HIVE_CONF_DIR is set correctly.");
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.error("Could not instantiate HiveConf instance.", ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add hive conf to configuration object without overriding already set properties.
|
||||||
|
* @param hiveConf
|
||||||
|
* @param conf
|
||||||
|
*/
|
||||||
|
public static void addHiveConfigs(Configuration hiveConf, Configuration conf) {
|
||||||
|
for (Map.Entry<String, String> item : hiveConf) {
|
||||||
|
conf.setIfUnset(item.getKey(), item.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -389,7 +389,9 @@ private String[] getHiveArgs(String... args) {
|
|||||||
List<String> newArgs = new LinkedList<String>();
|
List<String> newArgs = new LinkedList<String>();
|
||||||
newArgs.addAll(Arrays.asList(args));
|
newArgs.addAll(Arrays.asList(args));
|
||||||
|
|
||||||
if (System.getProperty("mapreduce.job.credentials.binary") != null) {
|
HiveConfig.addHiveConfigs(HiveConfig.getHiveConf(configuration), configuration);
|
||||||
|
|
||||||
|
if (configuration.getBoolean(HiveConfig.HIVE_SASL_ENABLED, false)) {
|
||||||
newArgs.add("--hiveconf");
|
newArgs.add("--hiveconf");
|
||||||
newArgs.add("hive.metastore.sasl.enabled=true");
|
newArgs.add("hive.metastore.sasl.enabled=true");
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.sqoop.hive.HiveConfig;
|
||||||
import org.kitesdk.data.CompressionType;
|
import org.kitesdk.data.CompressionType;
|
||||||
import org.kitesdk.data.Dataset;
|
import org.kitesdk.data.Dataset;
|
||||||
import org.kitesdk.data.DatasetDescriptor;
|
import org.kitesdk.data.DatasetDescriptor;
|
||||||
@ -38,7 +39,6 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class for setting up a Parquet MapReduce job.
|
* Helper class for setting up a Parquet MapReduce job.
|
||||||
@ -47,7 +47,6 @@ public final class ParquetJob {
|
|||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(ParquetJob.class.getName());
|
public static final Log LOG = LogFactory.getLog(ParquetJob.class.getName());
|
||||||
|
|
||||||
public static final String HIVE_CONF_CLASS = "org.apache.hadoop.hive.conf.HiveConf";
|
|
||||||
public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
|
public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
|
||||||
public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled";
|
public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled";
|
||||||
// Purposefully choosing the same token alias as the one Oozie chooses.
|
// Purposefully choosing the same token alias as the one Oozie chooses.
|
||||||
@ -90,12 +89,12 @@ public static CompressionType getCompressionType(Configuration conf) {
|
|||||||
public static void configureImportJob(JobConf conf, Schema schema,
|
public static void configureImportJob(JobConf conf, Schema schema,
|
||||||
String uri, WriteMode writeMode) throws IOException {
|
String uri, WriteMode writeMode) throws IOException {
|
||||||
Dataset dataset;
|
Dataset dataset;
|
||||||
Configuration hiveConf = getHiveConf(conf);
|
Configuration hiveConf = HiveConfig.getHiveConf(conf);
|
||||||
|
|
||||||
// Add hive delegation token only if we don't already have one.
|
// Add hive delegation token only if we don't already have one.
|
||||||
if (uri.startsWith("dataset:hive") && isSecureMetastore(hiveConf)) {
|
if (uri.startsWith("dataset:hive") && isSecureMetastore(hiveConf)) {
|
||||||
// Copy hive configs to job config
|
// Copy hive configs to job config
|
||||||
addHiveConfigs(hiveConf, conf);
|
HiveConfig.addHiveConfigs(hiveConf, conf);
|
||||||
|
|
||||||
if (conf.getCredentials().getToken(new Text(HIVE_METASTORE_TOKEN_ALIAS)) == null) {
|
if (conf.getCredentials().getToken(new Text(HIVE_METASTORE_TOKEN_ALIAS)) == null) {
|
||||||
addHiveDelegationToken(conf);
|
addHiveDelegationToken(conf);
|
||||||
@ -144,25 +143,6 @@ private static boolean isSecureMetastore(Configuration conf) {
|
|||||||
return conf != null && conf.getBoolean(HIVE_METASTORE_SASL_ENABLED, false);
|
return conf != null && conf.getBoolean(HIVE_METASTORE_SASL_ENABLED, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Dynamically create hive configuration object.
|
|
||||||
* @param conf
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
private static Configuration getHiveConf(Configuration conf) {
|
|
||||||
try {
|
|
||||||
Class HiveConfClass = Class.forName(HIVE_CONF_CLASS);
|
|
||||||
return ((Configuration)(HiveConfClass.getConstructor(Configuration.class, Class.class)
|
|
||||||
.newInstance(conf, Configuration.class)));
|
|
||||||
} catch (ClassNotFoundException ex) {
|
|
||||||
LOG.error("Could not load " + HIVE_CONF_CLASS
|
|
||||||
+ ". Make sure HIVE_CONF_DIR is set correctly.");
|
|
||||||
} catch (Exception ex) {
|
|
||||||
LOG.error("Could not instantiate HiveConf instance.", ex);
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add hive delegation token to credentials store.
|
* Add hive delegation token to credentials store.
|
||||||
* @param conf
|
* @param conf
|
||||||
@ -182,9 +162,9 @@ private static void addHiveDelegationToken(JobConf conf) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
HiveConfClass = Class.forName(HIVE_CONF_CLASS);
|
HiveConfClass = Class.forName(HiveConfig.HIVE_CONF_CLASS);
|
||||||
} catch (ClassNotFoundException ex) {
|
} catch (ClassNotFoundException ex) {
|
||||||
LOG.error("Could not load " + HIVE_CONF_CLASS
|
LOG.error("Could not load " + HiveConfig.HIVE_CONF_CLASS
|
||||||
+ " when adding hive delegation token."
|
+ " when adding hive delegation token."
|
||||||
+ " Make sure HIVE_CONF_DIR is set correctly.", ex);
|
+ " Make sure HIVE_CONF_DIR is set correctly.", ex);
|
||||||
throw new RuntimeException("Couldn't fetch delegation token.", ex);
|
throw new RuntimeException("Couldn't fetch delegation token.", ex);
|
||||||
@ -209,15 +189,4 @@ private static void addHiveDelegationToken(JobConf conf) {
|
|||||||
throw new RuntimeException("Couldn't fetch delegation token.", ex);
|
throw new RuntimeException("Couldn't fetch delegation token.", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Add hive conf to configuration object without overriding already set properties.
|
|
||||||
* @param hiveConf
|
|
||||||
* @param conf
|
|
||||||
*/
|
|
||||||
private static void addHiveConfigs(Configuration hiveConf, Configuration conf) {
|
|
||||||
for (Map.Entry<String, String> item : hiveConf) {
|
|
||||||
conf.setIfUnset(item.getKey(), item.getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user