5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-03 21:31:27 +08:00

Hive import can use in-process Hive instance.

If the Hive CliDriver is present on the classpath, Sqoop will
call it directly in the same JVM, rather than invoke a separate
JVM instance for Hive.

From: Aaron Kimball <aaron@cloudera.com>

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149916 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Bayer 2011-07-22 20:03:57 +00:00
parent 1d80c4ded7
commit b3023e2bf9
3 changed files with 268 additions and 13 deletions

View File

@ -23,6 +23,8 @@
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
@ -35,7 +37,9 @@
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.util.Executor;
import com.cloudera.sqoop.util.ExitSecurityException;
import com.cloudera.sqoop.util.LoggingAsyncSink;
import com.cloudera.sqoop.util.SubprocessSecurityManager;
/**
* Utility to import a table into the Hive metastore. Manages the connection
@ -51,6 +55,10 @@ public class HiveImport {
private Configuration configuration;
private boolean generateOnly;
/** Entry point through which Hive invocation should be attempted. */
private static final String HIVE_MAIN_CLASS =
"org.apache.hadoop.hive.cli.CliDriver";
public HiveImport(final SqoopOptions opts, final ConnManager connMgr,
final Configuration conf, final boolean generateOnly) {
this.options = opts;
@ -202,19 +210,7 @@ public void importTable(String inputTableName, String outputTableName,
}
if (!isGenerateOnly()) {
// run Hive on the script and note the return code.
String hiveExec = getHiveBinPath();
ArrayList<String> args = new ArrayList<String>();
args.add(hiveExec);
args.add("-f");
args.add(filename);
LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
int ret = Executor.exec(args.toArray(new String[0]),
env.toArray(new String[0]), logSink, logSink);
if (0 != ret) {
throw new IOException("Hive exited with status " + ret);
}
executeScript(filename, env);
LOG.info("Hive import complete.");
}
@ -229,5 +225,101 @@ public void importTable(String inputTableName, String outputTableName,
}
}
}
@SuppressWarnings("unchecked")
/**
* Execute the script file via Hive.
* If Hive's jars are on the classpath, run it in the same process.
* Otherwise, execute the file with 'bin/hive'.
*
* @param filename The script file to run.
* @param env the environment strings to pass to any subprocess.
* @throws IOException if Hive did not exit successfully.
*/
private void executeScript(String filename, List<String> env)
throws IOException {
SubprocessSecurityManager subprocessSM = null;
try {
Class cliDriverClass = Class.forName(HIVE_MAIN_CLASS);
// We loaded the CLI Driver in this JVM, so we will just
// call it in-process. The CliDriver class has a method:
// void main(String [] args) throws Exception.
//
// We'll call that here to invoke 'hive -f scriptfile'.
// Because this method will call System.exit(), we use
// a SecurityManager to prevent this.
LOG.debug("Using in-process Hive instance.");
subprocessSM = new SubprocessSecurityManager();
subprocessSM.install();
// Create the argv for the Hive Cli Driver.
String [] argArray = new String[2];
argArray[0] = "-f";
argArray[1] = filename;
// And invoke the static method on this array.
Method mainMethod = cliDriverClass.getMethod("main", argArray.getClass());
mainMethod.invoke(null, (Object) argArray);
} catch (ClassNotFoundException cnfe) {
// Hive is not on the classpath. Run externally.
// This is not an error path.
LOG.debug("Using external Hive process.");
executeExternalHiveScript(filename, env);
} catch (NoSuchMethodException nsme) {
// Could not find a handle to the main() method.
throw new IOException("Could not access CliDriver.main()", nsme);
} catch (IllegalAccessException iae) {
// Error getting a handle on the main() method.
throw new IOException("Could not access CliDriver.main()", iae);
} catch (InvocationTargetException ite) {
// We ran CliDriver.main() and an exception was thrown from within Hive.
// This may have been the ExitSecurityException triggered by the
// SubprocessSecurityManager. If so, handle it. Otherwise, wrap in
// an IOException and rethrow.
Throwable cause = ite.getCause();
if (cause instanceof ExitSecurityException) {
ExitSecurityException ese = (ExitSecurityException) cause;
int status = ese.getExitStatus();
if (status != 0) {
throw new IOException("Hive CliDriver exited with status=" + status);
}
} else {
throw new IOException("Exception thrown in Hive", ite);
}
} finally {
if (null != subprocessSM) {
// Uninstall the SecurityManager used to trap System.exit().
subprocessSM.uninstall();
}
}
}
/**
* Execute Hive via an external 'bin/hive' process.
* @param filename the Script file to run.
* @param env the environment strings to pass to any subprocess.
* @throws IOException if Hive did not exit successfully.
*/
private void executeExternalHiveScript(String filename, List<String> env)
throws IOException {
// run Hive on the script and note the return code.
String hiveExec = getHiveBinPath();
ArrayList<String> args = new ArrayList<String>();
args.add(hiveExec);
args.add("-f");
args.add(filename);
LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
int ret = Executor.exec(args.toArray(new String[0]),
env.toArray(new String[0]), logSink, logSink);
if (0 != ret) {
throw new IOException("Hive exited with status " + ret);
}
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.util;
/**
* SecurityException suppressing a System.exit() call.
*
* Allows retrieval of the would-be exit status code.
*/
@SuppressWarnings("serial")
public class ExitSecurityException extends SecurityException {
private final int exitStatus;
public ExitSecurityException() {
super("ExitSecurityException");
this.exitStatus = 0;
}
public ExitSecurityException(final String message) {
super(message);
this.exitStatus = 0;
}
/**
* Register a System.exit() event being suppressed with a particular
* exit status code.
*/
public ExitSecurityException(int status) {
super("ExitSecurityException");
this.exitStatus = status;
}
@Override
public String toString() {
String msg = getMessage();
return (null == msg) ? ("exit with status " + exitStatus) : msg;
}
public int getExitStatus() {
return this.exitStatus;
}
}

View File

@ -0,0 +1,104 @@
/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.sqoop.util;
import java.security.Permission;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A SecurityManager used to run subprocesses and disallow certain actions.
*
* This specifically disallows System.exit().
*
* This SecurityManager will also check with any existing SecurityManager as
* to the validity of any permissions. The SubprocessSecurityManager should be
* installed with the install() method, which will retain a handle to any
* previously-installed SecurityManager instance.
*
* When this SecurityManager is no longer necessary, the uninstall() method
* should be used which reinstates the previous SecurityManager as the active
* SecurityManager.
*/
public class SubprocessSecurityManager extends SecurityManager {
public static final Log LOG = LogFactory.getLog(
SubprocessSecurityManager.class.getName());
private SecurityManager parentSecurityManager;
private boolean installed;
private boolean allowReplacement;
public SubprocessSecurityManager() {
this.installed = false;
this.allowReplacement = false;
}
/**
* Install this SecurityManager and retain a reference to any
* previously-installed SecurityManager.
*/
public void install() {
LOG.debug("Installing subprocess security manager");
this.parentSecurityManager = System.getSecurityManager();
System.setSecurityManager(this);
this.installed = true;
}
/**
* Restore an existing SecurityManager, uninstalling this one.
*/
public void uninstall() {
if (this.installed) {
LOG.debug("Uninstalling subprocess security manager");
this.allowReplacement = true;
System.setSecurityManager(this.parentSecurityManager);
}
}
@Override
/**
* Disallow the capability to call System.exit() or otherwise
* terminate the JVM.
*/
public void checkExit(int status) {
LOG.debug("Rejecting System.exit call with status=" + status);
throw new ExitSecurityException(status);
}
@Override
/**
* Check a particular permission. Checks with this SecurityManager
* as well as any previously-installed manager.
*
* @param perm the Permission to check; must not be null.
*/
public void checkPermission(Permission perm) {
if (null != this.parentSecurityManager) {
// Check if the prior SecurityManager would have rejected this.
parentSecurityManager.checkPermission(perm);
}
if (!allowReplacement && perm.getName().equals("setSecurityManager")) {
throw new SecurityException("Cannot replace security manager");
}
}
}