diff --git a/src/java/com/cloudera/sqoop/util/AppendUtils.java b/src/java/com/cloudera/sqoop/util/AppendUtils.java index 3712468a..0b9f8877 100644 --- a/src/java/com/cloudera/sqoop/util/AppendUtils.java +++ b/src/java/com/cloudera/sqoop/util/AppendUtils.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,215 +18,16 @@ package com.cloudera.sqoop.util; -import java.io.IOException; -import java.text.NumberFormat; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import com.cloudera.sqoop.manager.ImportJobContext; -import com.cloudera.sqoop.SqoopOptions; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; /** - * Utilities used when appending imported files to an existing dir. + * @deprecated Moving to use org.apache.sqoop namespace. */ -public class AppendUtils { - - public static final Log LOG = LogFactory.getLog(AppendUtils.class.getName()); - - private static final SimpleDateFormat DATE_FORM = new SimpleDateFormat( - "ddHHmmssSSS"); - private static final String TEMP_IMPORT_ROOT = - System.getProperty("sqoop.test.import.rootDir", "_sqoop"); - - private static final int PARTITION_DIGITS = 5; - private static final String FILEPART_SEPARATOR = "-"; - private static final String FILEEXT_SEPARATOR = "."; - - private ImportJobContext context = null; +public class AppendUtils + extends org.apache.sqoop.util.AppendUtils { public AppendUtils(ImportJobContext context) { - this.context = context; - } - - /** - * Moves the imported files from temporary directory to specified target-dir, - * renaming partition number if appending file exists. 
- */ - public void append() throws IOException { - - SqoopOptions options = context.getOptions(); - FileSystem fs = FileSystem.get(options.getConf()); - Path tempDir = context.getDestination(); - - // Try in this order: target-dir or warehouse-dir - Path userDestDir = null; - if (options.getTargetDir() != null) { - userDestDir = new Path(options.getTargetDir()); - } else if (options.getWarehouseDir() != null) { - userDestDir = new Path(options.getWarehouseDir(), - context.getTableName()); - } else { - userDestDir = new Path(context.getTableName()); - } - - int nextPartition = 0; - - if (!fs.exists(tempDir)) { - // This occurs if there was no source (tmp) dir. This might happen - // if the import was an HBase-target import, but the user specified - // --append anyway. This is a warning, not an error. - LOG.warn("Cannot append files to target dir; no such directory: " - + tempDir); - return; - } - - // Create target directory. - if (!fs.exists(userDestDir)) { - LOG.info("Creating missing output directory - " + userDestDir.getName()); - fs.mkdirs(userDestDir); - nextPartition = 0; - } else { - LOG.info("Appending to directory " + userDestDir.getName()); - // Get the right next partition for the imported files - nextPartition = getNextPartition(fs, userDestDir); - } - - // move files - moveFiles(fs, tempDir, userDestDir, nextPartition); - - // delete temporary path - LOG.debug("Deleting temporary folder " + tempDir.getName()); - fs.delete(tempDir, true); - } - - /** - * Returns the greatest partition number available for appending, for data - * files in targetDir. 
- */ - private int getNextPartition(FileSystem fs, Path targetDir) - throws IOException { - - int nextPartition = 0; - FileStatus[] existingFiles = fs.listStatus(targetDir); - if (existingFiles != null && existingFiles.length > 0) { - Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*"); - for (FileStatus fileStat : existingFiles) { - if (!fileStat.isDir()) { - String filename = fileStat.getPath().getName(); - Matcher mat = patt.matcher(filename); - if (mat.matches()) { - int thisPart = Integer.parseInt(mat.group(1)); - if (thisPart >= nextPartition) { - nextPartition = thisPart; - nextPartition++; - } - } - } - } - } - - if (nextPartition > 0) { - LOG.info("Using found partition " + nextPartition); - } - - return nextPartition; - } - - /** - * Move files from source to target using a specified starting partition. - */ - private void moveFiles(FileSystem fs, Path sourceDir, Path targetDir, - int partitionStart) throws IOException { - - NumberFormat numpart = NumberFormat.getInstance(); - numpart.setMinimumIntegerDigits(PARTITION_DIGITS); - numpart.setGroupingUsed(false); - Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*"); - FileStatus[] tempFiles = fs.listStatus(sourceDir); - - if (null == tempFiles) { - // If we've already checked that the dir exists, and now it can't be - // listed, this is a genuine error (permissions, fs integrity, or other). 
- throw new IOException("Could not list files from " + sourceDir); - } - - // Move and rename files & directories from temporary to target-dir thus - // appending file's next partition - for (FileStatus fileStat : tempFiles) { - if (!fileStat.isDir()) { - // Move imported data files - String filename = fileStat.getPath().getName(); - Matcher mat = patt.matcher(filename); - if (mat.matches()) { - String name = getFilename(filename); - String fileToMove = name.concat(numpart.format(partitionStart++)); - String extension = getFileExtension(filename); - if (extension != null) { - fileToMove = fileToMove.concat(extension); - } - LOG.debug("Filename: " + filename + " repartitioned to: " - + fileToMove); - fs.rename(fileStat.getPath(), new Path(targetDir, fileToMove)); - } - } else { - // Move directories (_logs & any other) - String dirName = fileStat.getPath().getName(); - Path path = new Path(targetDir, dirName); - int dirNumber = 0; - while (fs.exists(path)) { - path = new Path(targetDir, dirName.concat("-").concat( - numpart.format(dirNumber++))); - } - LOG.debug("Directory: " + dirName + " renamed to: " + path.getName()); - fs.rename(fileStat.getPath(), path); - } - } - } - - /** returns the name component of a file. */ - private String getFilename(String filename) { - String result = null; - int pos = filename.lastIndexOf(FILEPART_SEPARATOR); - if (pos != -1) { - result = filename.substring(0, pos + 1); - } else { - pos = filename.lastIndexOf(FILEEXT_SEPARATOR); - if (pos != -1) { - result = filename.substring(0, pos); - } else { - result = filename; - } - } - return result; - } - - /** returns the extension component of a filename. */ - private String getFileExtension(String filename) { - int pos = filename.lastIndexOf(FILEEXT_SEPARATOR); - if (pos != -1) { - return filename.substring(pos, filename.length()); - } else { - return null; - } - } - - /** - * Creates a unique path object inside the sqoop temporary directory. 
- * - * @param tableName - * @return a path pointing to the temporary directory - */ - public static Path getTempAppendDir(String tableName) { - String timeId = DATE_FORM.format(new Date(System.currentTimeMillis())); - String tempDir = TEMP_IMPORT_ROOT + Path.SEPARATOR + timeId + tableName; - return new Path(tempDir); + super(context); } } diff --git a/src/java/com/cloudera/sqoop/util/AsyncSink.java b/src/java/com/cloudera/sqoop/util/AsyncSink.java index 414daefb..1a869bab 100644 --- a/src/java/com/cloudera/sqoop/util/AsyncSink.java +++ b/src/java/com/cloudera/sqoop/util/AsyncSink.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,28 +18,9 @@ package com.cloudera.sqoop.util; -import java.io.InputStream; - /** - * An interface describing a factory class for a Thread class that handles - * input from some sort of stream. - * - * When the stream is closed, the thread should terminate. + * @deprecated Moving to use org.apache.sqoop namespace. */ -public abstract class AsyncSink { - - /** - * Create and run a thread to handle input from the provided InputStream. - * When processStream returns, the thread should be running; it should - * continue to run until the InputStream is exhausted. - */ - public abstract void processStream(InputStream is); - - /** - * Wait until the stream has been processed. - * @return a status code indicating success or failure. 0 is typical for - * success. 
- */ - public abstract int join() throws InterruptedException; +public abstract class AsyncSink + extends org.apache.sqoop.util.AsyncSink { } - diff --git a/src/java/com/cloudera/sqoop/util/ClassLoaderStack.java b/src/java/com/cloudera/sqoop/util/ClassLoaderStack.java index e42e7d6c..fcec0ce5 100644 --- a/src/java/com/cloudera/sqoop/util/ClassLoaderStack.java +++ b/src/java/com/cloudera/sqoop/util/ClassLoaderStack.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,84 +18,23 @@ package com.cloudera.sqoop.util; -import java.io.File; import java.io.IOException; -import java.net.URL; -import java.net.URLClassLoader; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; /** - * Allows you to add and remove jar-files from the running JVM by - * instantiating classloaders for them. + * @deprecated Moving to use org.apache.sqoop namespace. */ public final class ClassLoaderStack { - public static final Log LOG = LogFactory.getLog( - ClassLoaderStack.class.getName()); + private ClassLoaderStack() { } - private ClassLoaderStack() { - } - - /** - * Sets the classloader for the current thread. - */ public static void setCurrentClassLoader(ClassLoader cl) { - LOG.debug("Restoring classloader: " + cl.toString()); - Thread.currentThread().setContextClassLoader(cl); + org.apache.sqoop.util.ClassLoaderStack.setCurrentClassLoader(cl); } - /** - * Adds a ClassLoader to the top of the stack that will load from the Jar - * file of your choice. Returns the previous classloader so you can restore - * it if need be, later. - * - * @param jarFile The filename of a jar file that you want loaded into this - * JVM. - * @param testClassName The name of the class to load immediately - * (optional). 
- */ public static ClassLoader addJarFile(String jarFile, String testClassName) throws IOException { - - ClassLoader prevClassLoader = - Thread.currentThread().getContextClassLoader(); - - if (null != testClassName) { - try { - // Test to see if testClassName is already available. If so, do not - // load this jar. - LOG.debug("Checking for existing class: " + testClassName); - Class.forName(testClassName, true, prevClassLoader); - LOG.debug("Class is already available. Skipping jar " + jarFile); - return prevClassLoader; - } catch (ClassNotFoundException cnfe) { - // Expected this; we need to load the jar. continue. - } - } - - String urlPath = "jar:file://" + new File(jarFile).getAbsolutePath() + "!/"; - LOG.debug("Attempting to load jar through URL: " + urlPath); - LOG.debug("Previous classloader is " + prevClassLoader); - URL [] jarUrlArray = {new URL(urlPath)}; - URLClassLoader cl = URLClassLoader.newInstance(jarUrlArray, - prevClassLoader); - try { - if (null != testClassName) { - // try to load a class from the jar to force loading now. - LOG.debug("Testing class in jar: " + testClassName); - Class.forName(testClassName, true, cl); - } - LOG.debug("Loaded jar into current JVM: " + urlPath); - } catch (ClassNotFoundException cnfe) { - throw new IOException("Could not load jar " + jarFile - + " into JVM. 
(Could not find class " - + testClassName + ".)", cnfe); - } - - LOG.debug("Added classloader for jar " + jarFile + ": " + cl); - Thread.currentThread().setContextClassLoader(cl); - return prevClassLoader; + return org.apache.sqoop.util.ClassLoaderStack.addJarFile(jarFile, + testClassName); } + } diff --git a/src/java/com/cloudera/sqoop/util/DirectImportUtils.java b/src/java/com/cloudera/sqoop/util/DirectImportUtils.java index 5a65a4de..9b847480 100644 --- a/src/java/com/cloudera/sqoop/util/DirectImportUtils.java +++ b/src/java/com/cloudera/sqoop/util/DirectImportUtils.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,105 +18,35 @@ package com.cloudera.sqoop.util; -import java.io.IOException; import java.io.File; -import java.net.InetAddress; -import java.net.UnknownHostException; +import java.io.IOException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.conf.Configuration; -import com.cloudera.sqoop.SqoopOptions; -import com.cloudera.sqoop.io.CodecMap; -import com.cloudera.sqoop.io.SplittingOutputStream; -import com.cloudera.sqoop.io.SplittableBufferedWriter; -import org.apache.hadoop.util.Shell; +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.io.SplittableBufferedWriter; import com.cloudera.sqoop.manager.ImportJobContext; /** - * Utility methods that are common to various the direct import managers. + * @deprecated Moving to use org.apache.sqoop namespace. 
*/ public final class DirectImportUtils { - public static final Log LOG = LogFactory.getLog( - DirectImportUtils.class.getName()); + private DirectImportUtils() { } - private DirectImportUtils() { - } - - /** - * Executes chmod on the specified file, passing in the mode string 'modstr' - * which may be e.g. "a+x" or "0600", etc. - * @throws IOException if chmod failed. - */ public static void setFilePermissions(File file, String modstr) throws IOException { - // Set this file to be 0600. Java doesn't have a built-in mechanism for this - // so we need to go out to the shell to execute chmod. - try { - Shell.execCommand("chmod", modstr, file.toString()); - } catch (IOException ioe) { - // Shell.execCommand will throw IOException on exit code != 0. - LOG.error("Could not chmod " + modstr + " " + file.toString()); - throw new IOException("Could not ensure password file security.", ioe); - } + org.apache.sqoop.util.DirectImportUtils.setFilePermissions(file, modstr); } - /** - * Open a file in HDFS for write to hold the data associated with a table. - * Creates any necessary directories, and returns the OutputStream to write - * to. The caller is responsible for calling the close() method on the - * returned stream. - */ public static SplittableBufferedWriter createHdfsSink(Configuration conf, SqoopOptions options, ImportJobContext context) throws IOException { - - FileSystem fs = FileSystem.get(conf); - Path destDir = context.getDestination(); - - LOG.debug("Writing to filesystem: " + conf.get("fs.default.name")); - LOG.debug("Creating destination directory " + destDir); - fs.mkdirs(destDir); - - // This Writer will be closed by the caller. 
- return new SplittableBufferedWriter( - new SplittingOutputStream(conf, destDir, "data-", - options.getDirectSplitSize(), getCodec(conf, options))); + return org.apache.sqoop.util.DirectImportUtils.createHdfsSink(conf, + options, context); } - private static CompressionCodec getCodec(Configuration conf, - SqoopOptions options) throws IOException { - if (options.shouldUseCompression()) { - if (options.getCompressionCodec() == null) { - return new GzipCodec(); - } else { - return CodecMap.getCodec(options.getCompressionCodec(), conf); - } - } - return null; - } - - /** @return true if someHost refers to localhost. - */ public static boolean isLocalhost(String someHost) { - if (null == someHost) { - return false; - } - - try { - InetAddress localHostAddr = InetAddress.getLocalHost(); - InetAddress someAddr = InetAddress.getByName(someHost); - - return localHostAddr.equals(someAddr); - } catch (UnknownHostException uhe) { - return false; - } + return org.apache.sqoop.util.DirectImportUtils.isLocalhost(someHost); } -} +} diff --git a/src/java/com/cloudera/sqoop/util/ErrorableAsyncSink.java b/src/java/com/cloudera/sqoop/util/ErrorableAsyncSink.java index d05b6536..4af8d72f 100644 --- a/src/java/com/cloudera/sqoop/util/ErrorableAsyncSink.java +++ b/src/java/com/cloudera/sqoop/util/ErrorableAsyncSink.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,20 +19,8 @@ package com.cloudera.sqoop.util; /** - * Partial implementation of AsyncSink that relies on ErrorableThread to - * provide a status bit for the join() method. + * @deprecated Moving to use org.apache.sqoop namespace. 
*/ -public abstract class ErrorableAsyncSink extends AsyncSink { - - protected ErrorableThread child; - - public int join() throws InterruptedException { - child.join(); - if (child.isErrored()) { - return 1; - } else { - return 0; - } - } +public abstract class ErrorableAsyncSink + extends org.apache.sqoop.util.ErrorableAsyncSink { } - diff --git a/src/java/com/cloudera/sqoop/util/ErrorableThread.java b/src/java/com/cloudera/sqoop/util/ErrorableThread.java index 70b265ea..6cde23af 100644 --- a/src/java/com/cloudera/sqoop/util/ErrorableThread.java +++ b/src/java/com/cloudera/sqoop/util/ErrorableThread.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,22 +19,9 @@ package com.cloudera.sqoop.util; /** - * A thread which has an error bit which can be set from within the thread. + * @deprecated Moving to use org.apache.sqoop namespace. */ -public abstract class ErrorableThread extends Thread { - - private volatile boolean error; - - public ErrorableThread() { - this.error = false; - } - - protected void setError() { - this.error = true; - } - - public boolean isErrored() { - return this.error; - } +public abstract class ErrorableThread + extends org.apache.sqoop.util.ErrorableThread { } diff --git a/src/java/com/cloudera/sqoop/util/Executor.java b/src/java/com/cloudera/sqoop/util/Executor.java index 08d6f0e1..366a2fa8 100644 --- a/src/java/com/cloudera/sqoop/util/Executor.java +++ b/src/java/com/cloudera/sqoop/util/Executor.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -21,99 +19,33 @@ package com.cloudera.sqoop.util; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.sqoop.util.AsyncSink; /** - * Runs a process via Runtime.exec() and allows handling of stdout/stderr to be - * deferred to other threads. - * + * @deprecated Moving to use org.apache.sqoop namespace. */ public final class Executor { - public static final Log LOG = LogFactory.getLog(Executor.class.getName()); + private Executor() { } - private Executor() { - } - - /** - * Execute a program defined by the args array with default stream sinks - * that consume the program's output (to prevent it from blocking on buffers) - * and then ignore said output. - */ public static int exec(String [] args) throws IOException { - NullAsyncSink s = new NullAsyncSink(); - return exec(args, s, s); + return org.apache.sqoop.util.Executor.exec(args); } - /** - * Run a command via Runtime.exec(), with its stdout and stderr streams - * directed to be handled by threads generated by AsyncSinks. - * Block until the child process terminates. - * - * @return the exit status of the ran program - */ public static int exec(String [] args, AsyncSink outSink, AsyncSink errSink) throws IOException { - return exec(args, null, outSink, errSink); + return org.apache.sqoop.util.Executor.exec(args, outSink, errSink); } - - /** - * Run a command via Runtime.exec(), with its stdout and stderr streams - * directed to be handled by threads generated by AsyncSinks. - * Block until the child process terminates. Allows the programmer to - * specify an environment for the child program. 
- * - * @return the exit status of the ran program - */ public static int exec(String [] args, String [] envp, AsyncSink outSink, AsyncSink errSink) throws IOException { - - // launch the process. - Process p = Runtime.getRuntime().exec(args, envp); - - // dispatch its stdout and stderr to stream sinks if available. - if (null != outSink) { - outSink.processStream(p.getInputStream()); - } - - if (null != errSink) { - errSink.processStream(p.getErrorStream()); - } - - // wait for the return value. - while (true) { - try { - int ret = p.waitFor(); - return ret; - } catch (InterruptedException ie) { - continue; - } - } + return org.apache.sqoop.util.Executor.exec(args, envp, outSink, errSink); } - - /** - * @return An array formatted correctly for use as an envp based on the - * current environment for this program. - */ public static List getCurEnvpStrings() { - Map curEnv = System.getenv(); - ArrayList array = new ArrayList(); - - if (null == curEnv) { - return null; - } - - for (Map.Entry entry : curEnv.entrySet()) { - array.add(entry.getKey() + "=" + entry.getValue()); - } - - return array; + return org.apache.sqoop.util.Executor.getCurEnvpStrings(); } + } diff --git a/src/java/com/cloudera/sqoop/util/ExitSecurityException.java b/src/java/com/cloudera/sqoop/util/ExitSecurityException.java index ec3666c1..e9ec4641 100644 --- a/src/java/com/cloudera/sqoop/util/ExitSecurityException.java +++ b/src/java/com/cloudera/sqoop/util/ExitSecurityException.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,41 +19,22 @@ package com.cloudera.sqoop.util; /** - * SecurityException suppressing a System.exit() call. - * - * Allows retrieval of the would-be exit status code. + * @deprecated Moving to use org.apache.sqoop namespace. 
*/ @SuppressWarnings("serial") -public class ExitSecurityException extends SecurityException { - - private final int exitStatus; +public class ExitSecurityException + extends org.apache.sqoop.util.ExitSecurityException { public ExitSecurityException() { - super("ExitSecurityException"); - this.exitStatus = 0; + super(); } public ExitSecurityException(final String message) { super(message); - this.exitStatus = 0; } - /** - * Register a System.exit() event being suppressed with a particular - * exit status code. - */ public ExitSecurityException(int status) { - super("ExitSecurityException"); - this.exitStatus = status; + super(status); } - @Override - public String toString() { - String msg = getMessage(); - return (null == msg) ? ("exit with status " + exitStatus) : msg; - } - - public int getExitStatus() { - return this.exitStatus; - } } diff --git a/src/java/com/cloudera/sqoop/util/ExportException.java b/src/java/com/cloudera/sqoop/util/ExportException.java index 105152b2..6c86795e 100644 --- a/src/java/com/cloudera/sqoop/util/ExportException.java +++ b/src/java/com/cloudera/sqoop/util/ExportException.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,13 +19,14 @@ package com.cloudera.sqoop.util; /** - * General error during export process. + * @deprecated Moving to use org.apache.sqoop namespace. 
*/ @SuppressWarnings("serial") -public class ExportException extends Exception { +public class ExportException + extends org.apache.sqoop.util.ExportException { public ExportException() { - super("ExportException"); + super(); } public ExportException(final String message) { @@ -42,9 +41,4 @@ public ExportException(final String message, final Throwable cause) { super(message, cause); } - @Override - public String toString() { - String msg = getMessage(); - return (null == msg) ? "ExportException" : msg; - } } diff --git a/src/java/com/cloudera/sqoop/util/FileListing.java b/src/java/com/cloudera/sqoop/util/FileListing.java index 9859cb08..12494d85 100644 --- a/src/java/com/cloudera/sqoop/util/FileListing.java +++ b/src/java/com/cloudera/sqoop/util/FileListing.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,115 +18,30 @@ package com.cloudera.sqoop.util; -import java.util.Arrays; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.List; /** - * Recursive file listing under a specified directory. - * - * Taken from http://www.javapractices.com/topic/TopicAction.do?Id=68 - * Used under the terms of the CC Attribution license: - * http://creativecommons.org/licenses/by/3.0/ - * - * Method by Alex Wong (javapractices.com) + * @deprecated Moving to use org.apache.sqoop namespace. */ public final class FileListing { private FileListing() { } - /** - * Demonstrate use. - * - * @param aArgs - aArgs[0] is the full name of an existing - * directory that can be read. - */ public static void main(String... 
aArgs) throws FileNotFoundException { - File startingDirectory = new File(aArgs[0]); - List files = FileListing.getFileListing(startingDirectory); - - //print out all file names, in the the order of File.compareTo() - for (File file : files) { - System.out.println(file); - } + org.apache.sqoop.util.FileListing.main(aArgs); } - /** - * Recursively walk a directory tree and return a List of all - * Files found; the List is sorted using File.compareTo(). - * - * @param aStartingDir is a valid directory, which can be read. - */ public static List getFileListing(File aStartingDir) throws FileNotFoundException { - validateDirectory(aStartingDir); - List result = getFileListingNoSort(aStartingDir); - Collections.sort(result); - return result; + return org.apache.sqoop.util.FileListing.getFileListing(aStartingDir); } - private static List getFileListingNoSort(File aStartingDir) - throws FileNotFoundException { - List result = new ArrayList(); - File[] filesAndDirs = aStartingDir.listFiles(); - List filesDirs = Arrays.asList(filesAndDirs); - for (File file : filesDirs) { - result.add(file); //always add, even if directory - if (!file.isFile()) { - //must be a directory - //recursive call! - List deeperList = getFileListingNoSort(file); - result.addAll(deeperList); - } - } - return result; - } - - /** - * Directory is valid if it exists, does not represent a file, and can be read. 
- */ - private static void validateDirectory(File aDirectory) - throws FileNotFoundException { - if (aDirectory == null) { - throw new IllegalArgumentException("Directory should not be null."); - } - if (!aDirectory.exists()) { - throw new FileNotFoundException("Directory does not exist: " - + aDirectory); - } - if (!aDirectory.isDirectory()) { - throw new IllegalArgumentException("Is not a directory: " + aDirectory); - } - if (!aDirectory.canRead()) { - throw new IllegalArgumentException("Directory cannot be read: " - + aDirectory); - } - } - - /** - * Recursively delete a directory and all its children. - * @param dir is a valid directory. - */ public static void recursiveDeleteDir(File dir) throws IOException { - if (!dir.exists()) { - throw new FileNotFoundException(dir.toString() + " does not exist"); - } - - if (dir.isDirectory()) { - // recursively descend into all children and delete them. - File [] children = dir.listFiles(); - for (File child : children) { - recursiveDeleteDir(child); - } - } - - if (!dir.delete()) { - throw new IOException("Could not remove: " + dir); - } + org.apache.sqoop.util.FileListing.recursiveDeleteDir(dir); } + } diff --git a/src/java/com/cloudera/sqoop/util/ImportException.java b/src/java/com/cloudera/sqoop/util/ImportException.java index 5307c5b3..34e67679 100644 --- a/src/java/com/cloudera/sqoop/util/ImportException.java +++ b/src/java/com/cloudera/sqoop/util/ImportException.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,13 +19,14 @@ package com.cloudera.sqoop.util; /** - * General error during the import process. + * @deprecated Moving to use org.apache.sqoop namespace. 
*/ @SuppressWarnings("serial") -public class ImportException extends Exception { +public class ImportException + extends org.apache.sqoop.util.ImportException { public ImportException() { - super("ImportException"); + super(); } public ImportException(final String message) { @@ -42,9 +41,4 @@ public ImportException(final String message, final Throwable cause) { super(message, cause); } - @Override - public String toString() { - String msg = getMessage(); - return (null == msg) ? "ImportException" : msg; - } } diff --git a/src/java/com/cloudera/sqoop/util/Jars.java b/src/java/com/cloudera/sqoop/util/Jars.java index 8a8b8391..09a4f165 100644 --- a/src/java/com/cloudera/sqoop/util/Jars.java +++ b/src/java/com/cloudera/sqoop/util/Jars.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,88 +18,26 @@ package com.cloudera.sqoop.util; -import java.io.IOException; -import java.net.URL; -import java.net.URLDecoder; -import java.util.Enumeration; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import com.cloudera.sqoop.manager.ConnManager; /** - * Utility class; returns the locations of various jars. + * @deprecated Moving to use org.apache.sqoop namespace. */ public final class Jars { - public static final Log LOG = LogFactory.getLog( - Jars.class.getName()); + private Jars() { } - private Jars() { - } - - /** - * @return the path to the main Sqoop jar. - */ public static String getSqoopJarPath() { - return getJarPathForClass(Jars.class); + return org.apache.sqoop.util.Jars.getSqoopJarPath(); } - /** - * Return the jar file path that contains a particular class. - * Method mostly cloned from o.a.h.mapred.JobConf.findContainingJar(). 
- */ public static String getJarPathForClass(Class classObj) { - ClassLoader loader = classObj.getClassLoader(); - String classFile = classObj.getName().replaceAll("\\.", "/") + ".class"; - try { - for (Enumeration itr = loader.getResources(classFile); - itr.hasMoreElements();) { - URL url = (URL) itr.nextElement(); - if ("jar".equals(url.getProtocol())) { - String toReturn = url.getPath(); - if (toReturn.startsWith("file:")) { - toReturn = toReturn.substring("file:".length()); - } - // URLDecoder is a misnamed class, since it actually decodes - // x-www-form-urlencoded MIME type rather than actual - // URL encoding (which the file path has). Therefore it would - // decode +s to ' 's which is incorrect (spaces are actually - // either unencoded or encoded as "%20"). Replace +s first, so - // that they are kept sacred during the decoding process. - toReturn = toReturn.replaceAll("\\+", "%2B"); - toReturn = URLDecoder.decode(toReturn, "UTF-8"); - return toReturn.replaceAll("!.*$", ""); - } - } - } catch (IOException e) { - throw new RuntimeException(e); - } - return null; + return org.apache.sqoop.util.Jars.getJarPathForClass(classObj); } - /** - * Return the path to the jar containing the JDBC driver - * for a ConnManager. 
- */ public static String getDriverClassJar(ConnManager mgr) { - if (null == mgr) { - return null; - } - - String driverClassName = mgr.getDriverClass(); - if (null == driverClassName) { - return null; - } - - try { - Class driverClass = Class.forName(driverClassName); - return getJarPathForClass(driverClass); - } catch (ClassNotFoundException cnfe) { - LOG.warn("No such class " + driverClassName + " available."); - return null; - } + return org.apache.sqoop.util.Jars.getDriverClassJar(mgr); } + } diff --git a/src/java/com/cloudera/sqoop/util/JdbcUrl.java b/src/java/com/cloudera/sqoop/util/JdbcUrl.java index b11fedb5..f7ef10b9 100644 --- a/src/java/com/cloudera/sqoop/util/JdbcUrl.java +++ b/src/java/com/cloudera/sqoop/util/JdbcUrl.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,107 +18,23 @@ package com.cloudera.sqoop.util; -import java.net.MalformedURLException; -import java.net.URL; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - /** - * Some utilities for parsing JDBC URLs which may not be tolerated - * by Java's java.net.URL class. - * java.net.URL does not support multi:part:scheme:// components, which - * virtually all JDBC connect string URLs have. + * @deprecated Moving to use org.apache.sqoop namespace. */ public final class JdbcUrl { - public static final Log LOG = LogFactory.getLog(JdbcUrl.class.getName()); + private JdbcUrl() { } - private JdbcUrl() { - } - - /** - * @return the database name from the connect string, which is typically the - * 'path' component, or null if we can't. - */ public static String getDatabaseName(String connectString) { - try { - String sanitizedString = null; - int schemeEndOffset = connectString.indexOf("://"); - if (-1 == schemeEndOffset) { - // couldn't find one? 
try our best here. - sanitizedString = "http://" + connectString; - LOG.warn("Could not find database access scheme in connect string " - + connectString); - } else { - sanitizedString = "http" + connectString.substring(schemeEndOffset); - } - - URL connectUrl = new URL(sanitizedString); - String databaseName = connectUrl.getPath(); - if (null == databaseName) { - return null; - } - - // This is taken from a 'path' part of a URL, which may have leading '/' - // characters; trim them off. - while (databaseName.startsWith("/")) { - databaseName = databaseName.substring(1); - } - - return databaseName; - } catch (MalformedURLException mue) { - LOG.error("Malformed connect string URL: " + connectString - + "; reason is " + mue.toString()); - return null; - } + return org.apache.sqoop.util.JdbcUrl.getDatabaseName(connectString); } - /** - * @return the hostname from the connect string, or null if we can't. - */ public static String getHostName(String connectString) { - try { - String sanitizedString = null; - int schemeEndOffset = connectString.indexOf("://"); - if (-1 == schemeEndOffset) { - // Couldn't find one? ok, then there's no problem, it should work as a - // URL. - sanitizedString = connectString; - } else { - sanitizedString = "http" + connectString.substring(schemeEndOffset); - } - - URL connectUrl = new URL(sanitizedString); - return connectUrl.getHost(); - } catch (MalformedURLException mue) { - LOG.error("Malformed connect string URL: " + connectString - + "; reason is " + mue.toString()); - return null; - } + return org.apache.sqoop.util.JdbcUrl.getHostName(connectString); } - /** - * @return the port from the connect string, or -1 if we can't. - */ public static int getPort(String connectString) { - try { - String sanitizedString = null; - int schemeEndOffset = connectString.indexOf("://"); - if (-1 == schemeEndOffset) { - // Couldn't find one? ok, then there's no problem, it should work as a - // URL. 
- sanitizedString = connectString; - } else { - sanitizedString = "http" + connectString.substring(schemeEndOffset); - } - - URL connectUrl = new URL(sanitizedString); - return connectUrl.getPort(); - } catch (MalformedURLException mue) { - LOG.error("Malformed connect string URL: " + connectString - + "; reason is " + mue.toString()); - return -1; - } + return org.apache.sqoop.util.JdbcUrl.getPort(connectString); } + } diff --git a/src/java/com/cloudera/sqoop/util/LoggingAsyncSink.java b/src/java/com/cloudera/sqoop/util/LoggingAsyncSink.java index 7caea893..9699252a 100644 --- a/src/java/com/cloudera/sqoop/util/LoggingAsyncSink.java +++ b/src/java/com/cloudera/sqoop/util/LoggingAsyncSink.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,80 +18,17 @@ package com.cloudera.sqoop.util; -import java.io.BufferedReader; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.IOException; - import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; /** - * An AsyncSink that takes the contents of a stream and writes - * it to log4j. + * @deprecated Moving to use org.apache.sqoop namespace. 
*/ -public class LoggingAsyncSink extends AsyncSink { - - public static final Log LOG = LogFactory.getLog( - LoggingAsyncSink.class.getName()); - - private Log contextLog; +public class LoggingAsyncSink + extends org.apache.sqoop.util.LoggingAsyncSink { public LoggingAsyncSink(final Log context) { - if (null == context) { - this.contextLog = LOG; - } else { - this.contextLog = context; - } + super(context); } - private Thread child; - - public void processStream(InputStream is) { - child = new LoggingThread(is); - child.start(); - } - - public int join() throws InterruptedException { - child.join(); - return 0; // always successful. - } - - /** - * Run a background thread that copies the contents of the stream - * to the output context log. - */ - private class LoggingThread extends Thread { - - private InputStream stream; - - LoggingThread(final InputStream is) { - this.stream = is; - } - - public void run() { - InputStreamReader isr = new InputStreamReader(this.stream); - BufferedReader r = new BufferedReader(isr); - - try { - while (true) { - String line = r.readLine(); - if (null == line) { - break; // stream was closed by remote end. - } - - LoggingAsyncSink.this.contextLog.info(line); - } - } catch (IOException ioe) { - LOG.error("IOException reading from stream: " + ioe.toString()); - } - - try { - r.close(); - } catch (IOException ioe) { - LOG.warn("Error closing stream in LoggingAsyncSink: " + ioe.toString()); - } - } - } } diff --git a/src/java/com/cloudera/sqoop/util/LoggingUtils.java b/src/java/com/cloudera/sqoop/util/LoggingUtils.java index 1ea17f52..9cd50fff 100644 --- a/src/java/com/cloudera/sqoop/util/LoggingUtils.java +++ b/src/java/com/cloudera/sqoop/util/LoggingUtils.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -25,29 +23,14 @@ import org.apache.commons.logging.Log; /** - * A helper class for logging. + * @deprecated Moving to use org.apache.sqoop namespace. */ public final class LoggingUtils { - /** - * Private constructor to prevent instantiation. - */ - private LoggingUtils() { - } + private LoggingUtils() { } - /** - * Log every exception in the chain if - * the exception is a chain of exceptions. - */ public static void logAll(Log log, SQLException e) { - log.error("Top level exception: ", e); - e = e.getNextException(); - int indx = 1; - while (e != null) { - log.error("Chained exception " + indx + ": ", e); - e = e.getNextException(); - indx++; - } + org.apache.sqoop.util.LoggingUtils.logAll(log, e); } } diff --git a/src/java/com/cloudera/sqoop/util/NullAsyncSink.java b/src/java/com/cloudera/sqoop/util/NullAsyncSink.java index 3a5803b9..7d607d5d 100644 --- a/src/java/com/cloudera/sqoop/util/NullAsyncSink.java +++ b/src/java/com/cloudera/sqoop/util/NullAsyncSink.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,68 +18,9 @@ package com.cloudera.sqoop.util; -import java.io.BufferedReader; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - /** - * An AsyncSink that takes the contents of a stream and ignores it. + * @deprecated Moving to use org.apache.sqoop namespace. 
*/ -public class NullAsyncSink extends AsyncSink { - - public static final Log LOG = LogFactory.getLog( - NullAsyncSink.class.getName()); - - private Thread child; - - public void processStream(InputStream is) { - child = new IgnoringThread(is); - child.start(); - } - - public int join() throws InterruptedException { - child.join(); - return 0; // always successful. - } - - /** - * Run a background thread that reads and ignores the - * contents of the stream. - */ - private static class IgnoringThread extends Thread { - - private InputStream stream; - - IgnoringThread(final InputStream is) { - this.stream = is; - } - - public void run() { - InputStreamReader isr = new InputStreamReader(this.stream); - BufferedReader r = new BufferedReader(isr); - - try { - while (true) { - String line = r.readLine(); - if (null == line) { - break; // stream was closed by remote end. - } - } - } catch (IOException ioe) { - LOG.warn("IOException reading from (ignored) stream: " - + ioe.toString()); - } - - try { - r.close(); - } catch (IOException ioe) { - LOG.warn("Error closing stream in NullAsyncSink: " + ioe.toString()); - } - } - } +public class NullAsyncSink + extends org.apache.sqoop.util.NullAsyncSink { } - diff --git a/src/java/com/cloudera/sqoop/util/OptionsFileUtil.java b/src/java/com/cloudera/sqoop/util/OptionsFileUtil.java index 0286114e..6c6cacd2 100644 --- a/src/java/com/cloudera/sqoop/util/OptionsFileUtil.java +++ b/src/java/com/cloudera/sqoop/util/OptionsFileUtil.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -20,162 +18,19 @@ package com.cloudera.sqoop.util; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import com.cloudera.sqoop.Sqoop; - /** - * Provides utility functions to read in options file. An options file is a - * regular text file with each line specifying a separate option. An option - * may continue into a following line by using a back-slash separator character - * at the end of the non-terminating line. Options file also allow empty lines - * and comment lines which are disregarded. Comment lines must begin with the - * hash character as the first character. Leading and trailing white-spaces are - * ignored for any options read from the Options file. + * @deprecated Moving to use org.apache.sqoop namespace. */ public final class OptionsFileUtil { - public static final Log LOG = LogFactory.getLog( - OptionsFileUtil.class.getName()); + private OptionsFileUtil() { } - /** - * Expands any options file that may be present in the given set of arguments. - * - * @param args the given arguments - * @return a new string array that contains the expanded arguments. 
- * @throws Exception - */ public static String[] expandArguments(String[] args) throws Exception { List options = new ArrayList(); - - for (int i = 0; i < args.length; i++) { - if (args[i].equals(Sqoop.SQOOP_OPTIONS_FILE_SPECIFIER)) { - if (i == args.length - 1) { - throw new Exception("Missing options file"); - } - - String fileName = args[++i]; - File optionsFile = new File(fileName); - BufferedReader reader = null; - StringBuilder buffer = new StringBuilder(); - try { - reader = new BufferedReader(new FileReader(optionsFile)); - String nextLine = null; - while ((nextLine = reader.readLine()) != null) { - nextLine = nextLine.trim(); - if (nextLine.length() == 0 || nextLine.startsWith("#")) { - // empty line or comment - continue; - } - - buffer.append(nextLine); - if (nextLine.endsWith("\\")) { - if (buffer.charAt(0) == '\'' || buffer.charAt(0) == '"') { - throw new Exception( - "Multiline quoted strings not supported in file(" - + fileName + "): " + buffer.toString()); - } - // Remove the trailing back-slash and continue - buffer.deleteCharAt(buffer.length() - 1); - } else { - // The buffer contains a full option - options.add( - removeQuotesEncolosingOption(fileName, buffer.toString())); - buffer.delete(0, buffer.length()); - } - } - - // Assert that the buffer is empty - if (buffer.length() != 0) { - throw new Exception("Malformed option in options file(" - + fileName + "): " + buffer.toString()); - } - } catch (IOException ex) { - throw new Exception("Unable to read options file: " + fileName, ex); - } finally { - if (reader != null) { - try { - reader.close(); - } catch (IOException ex) { - LOG.info("Exception while closing reader", ex); - } - } - } - } else { - // Regular option. Parse it and put it on the appropriate list - options.add(args[i]); - } - } - - return options.toArray(new String[options.size()]); - } - - /** - * Removes the surrounding quote characters as needed. It first attempts to - * remove surrounding double quotes. 
If successful, the resultant string is - * returned. If no surrounding double quotes are found, it attempts to remove - * surrounding single quote characters. If successful, the resultant string - * is returned. If not the original string is returnred. - * @param fileName - * @param option - * @return - * @throws Exception - */ - private static String removeQuotesEncolosingOption( - String fileName, String option) throws Exception { - - // Attempt to remove double quotes. If successful, return. - String option1 = removeQuoteCharactersIfNecessary(fileName, option, '"'); - if (!option1.equals(option)) { - // Quotes were successfully removed - return option1; - } - - // Attempt to remove single quotes. - return removeQuoteCharactersIfNecessary(fileName, option, '\''); - } - - /** - * Removes the surrounding quote characters from the given string. The quotes - * are identified by the quote parameter, the given string by option. The - * fileName parameter is used for raising exceptions with relevant message. 
- * @param fileName - * @param option - * @param quote - * @return - * @throws Exception - */ - private static String removeQuoteCharactersIfNecessary(String fileName, - String option, char quote) throws Exception { - boolean startingQuote = (option.charAt(0) == quote); - boolean endingQuote = (option.charAt(option.length() - 1) == quote); - - if (startingQuote && endingQuote) { - if (option.length() == 1) { - throw new Exception("Malformed option in options file(" - + fileName + "): " + option); - } - return option.substring(1, option.length() - 1); - } - - if (startingQuote || endingQuote) { - throw new Exception("Malformed option in options file(" - + fileName + "): " + option); - } - - return option; - } - - private OptionsFileUtil() { - // Disable object creation + return org.apache.sqoop.util.OptionsFileUtil.expandArguments(args); } } diff --git a/src/java/com/cloudera/sqoop/util/PerfCounters.java b/src/java/com/cloudera/sqoop/util/PerfCounters.java index 9a69f7b2..a8460dd5 100644 --- a/src/java/com/cloudera/sqoop/util/PerfCounters.java +++ b/src/java/com/cloudera/sqoop/util/PerfCounters.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,115 +18,14 @@ package com.cloudera.sqoop.util; -import java.text.NumberFormat; - /** - * A quick set of performance counters for reporting import speed. + * @deprecated Moving to use org.apache.sqoop namespace. 
*/ -public class PerfCounters { - - private long bytes; - private long nanoseconds; - - private long startTime; +public class PerfCounters + extends org.apache.sqoop.util.PerfCounters { public PerfCounters() { + super(); } - public void addBytes(long more) { - bytes += more; - } - - public void startClock() { - startTime = System.nanoTime(); - } - - public void stopClock() { - nanoseconds = System.nanoTime() - startTime; - } - - private static final double ONE_BILLION = 1000.0 * 1000.0 * 1000.0; - - /** Maximum number of digits after the decimal place. */ - private static final int MAX_PLACES = 4; - - /** - * @return A value in nanoseconds scaled to report in seconds - */ - private Double inSeconds(long nanos) { - return (double) nanos / ONE_BILLION; - } - - private static final long ONE_GB = 1024 * 1024 * 1024; - private static final long ONE_MB = 1024 * 1024; - private static final long ONE_KB = 1024; - - - /** - * @return a string of the form "xxxx bytes" or "xxxxx KB" or "xxxx GB", - * scaled as is appropriate for the current value. - */ - private String formatBytes() { - double val; - String scale; - if (bytes > ONE_GB) { - val = (double) bytes / (double) ONE_GB; - scale = "GB"; - } else if (bytes > ONE_MB) { - val = (double) bytes / (double) ONE_MB; - scale = "MB"; - } else if (bytes > ONE_KB) { - val = (double) bytes / (double) ONE_KB; - scale = "KB"; - } else { - val = (double) bytes; - scale = "bytes"; - } - - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setMaximumFractionDigits(MAX_PLACES); - return fmt.format(val) + " " + scale; - } - - private String formatTimeInSeconds() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setMaximumFractionDigits(MAX_PLACES); - return fmt.format(inSeconds(this.nanoseconds)) + " seconds"; - } - - /** - * @return a string of the form "xxx bytes/sec" or "xxx KB/sec" scaled as is - * appropriate for the current value. 
- */ - private String formatSpeed() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setMaximumFractionDigits(MAX_PLACES); - - Double seconds = inSeconds(this.nanoseconds); - - double speed = (double) bytes / seconds; - double val; - String scale; - if (speed > ONE_GB) { - val = speed / (double) ONE_GB; - scale = "GB"; - } else if (speed > ONE_MB) { - val = speed / (double) ONE_MB; - scale = "MB"; - } else if (speed > ONE_KB) { - val = speed / (double) ONE_KB; - scale = "KB"; - } else { - val = speed; - scale = "bytes"; - } - - return fmt.format(val) + " " + scale + "/sec"; - } - - public String toString() { - return formatBytes() + " in " + formatTimeInSeconds() + " (" - + formatSpeed() + ")"; - } } - diff --git a/src/java/com/cloudera/sqoop/util/RandomHash.java b/src/java/com/cloudera/sqoop/util/RandomHash.java index e3d0d47f..d3ba497e 100644 --- a/src/java/com/cloudera/sqoop/util/RandomHash.java +++ b/src/java/com/cloudera/sqoop/util/RandomHash.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,45 +18,20 @@ package com.cloudera.sqoop.util; -import java.rmi.server.UID; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; - /** - * Securely generate random MD5 signatures for use as nonce values. + * @deprecated Moving to use org.apache.sqoop namespace. */ public final class RandomHash { - private RandomHash() { - } + private RandomHash() { } - /** - * Generate a new random md5 hash. - * @return a securely-generated random 16 byte sequence. 
- */ public static byte [] generateMD5Bytes() { - try { - MessageDigest digester = MessageDigest.getInstance("MD5"); - long time = System.currentTimeMillis(); - digester.update((new UID() + "@" + time).getBytes()); - return digester.digest(); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); - } + return org.apache.sqoop.util.RandomHash.generateMD5Bytes(); } - /** - * Generate a new random md5 hash and convert it to a string. - * @return a securely-generated random string. - */ public static String generateMD5String() { - byte [] bytes = generateMD5Bytes(); - StringBuilder sb = new StringBuilder(); - for (byte b : bytes) { - int x = ((int) b) & 0xFF; - sb.append(String.format("%02x", x)); - } - return sb.toString(); + return org.apache.sqoop.util.RandomHash.generateMD5String(); } + } diff --git a/src/java/com/cloudera/sqoop/util/ResultSetPrinter.java b/src/java/com/cloudera/sqoop/util/ResultSetPrinter.java index fa9598c5..4dc9c724 100644 --- a/src/java/com/cloudera/sqoop/util/ResultSetPrinter.java +++ b/src/java/com/cloudera/sqoop/util/ResultSetPrinter.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,103 +18,10 @@ package com.cloudera.sqoop.util; -import java.io.IOException; -import java.io.PrintWriter; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.util.StringUtils; - /** - * Utility methods to format and print ResultSet objects. + * @deprecated Moving to use org.apache.sqoop namespace. 
*/ -public class ResultSetPrinter { - - public static final Log LOG = LogFactory.getLog( - ResultSetPrinter.class.getName()); - - // max output width to allocate to any column of the printed results. - private static final int MAX_COL_WIDTH = 20; - - /** - * Print 'str' to the string builder, padded to 'width' chars. - */ - private static void printPadded(StringBuilder sb, String str, int width) { - int numPad; - if (null == str) { - sb.append("(null)"); - numPad = width - "(null)".length(); - } else { - sb.append(str); - numPad = width - str.length(); - } - - for (int i = 0; i < numPad; i++) { - sb.append(' '); - } - } - - private static final String COL_SEPARATOR = " | "; - private static final String LEFT_BORDER = "| "; - - /** - * Format the contents of the ResultSet into something that could be printed - * neatly; the results are appended to the supplied StringBuilder. - */ - public final void printResultSet(PrintWriter pw, ResultSet results) - throws IOException { - try { - StringBuilder sbNames = new StringBuilder(); - int cols = results.getMetaData().getColumnCount(); - - int [] colWidths = new int[cols]; - ResultSetMetaData metadata = results.getMetaData(); - sbNames.append(LEFT_BORDER); - for (int i = 1; i < cols + 1; i++) { - String colName = metadata.getColumnName(i); - colWidths[i - 1] = Math.min(metadata.getColumnDisplaySize(i), - MAX_COL_WIDTH); - if (colName == null || colName.equals("")) { - colName = metadata.getColumnLabel(i) + "*"; - } - printPadded(sbNames, colName, colWidths[i - 1]); - sbNames.append(COL_SEPARATOR); - } - sbNames.append('\n'); - - StringBuilder sbPad = new StringBuilder(); - for (int i = 0; i < cols; i++) { - for (int j = 0; j < COL_SEPARATOR.length() + colWidths[i]; j++) { - sbPad.append('-'); - } - } - sbPad.append('-'); - sbPad.append('\n'); - - pw.print(sbPad.toString()); - pw.print(sbNames.toString()); - pw.print(sbPad.toString()); - - while (results.next()) { - StringBuilder sb = new StringBuilder(); - 
sb.append(LEFT_BORDER); - for (int i = 1; i < cols + 1; i++) { - printPadded(sb, results.getString(i), colWidths[i - 1]); - sb.append(COL_SEPARATOR); - } - sb.append('\n'); - pw.print(sb.toString()); - } - - pw.print(sbPad.toString()); - } catch (SQLException sqlException) { - LOG.error("Error reading from database: " - + StringUtils.stringifyException(sqlException)); - } - } - +public class ResultSetPrinter + extends org.apache.sqoop.util.ResultSetPrinter { } diff --git a/src/java/com/cloudera/sqoop/util/StoredAsProperty.java b/src/java/com/cloudera/sqoop/util/StoredAsProperty.java index 2e23d416..eebfcda8 100644 --- a/src/java/com/cloudera/sqoop/util/StoredAsProperty.java +++ b/src/java/com/cloudera/sqoop/util/StoredAsProperty.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,8 +25,7 @@ import java.lang.annotation.Target; /** - * Used by SqoopOptions to denote that a field is stored in a particular - * named property when reifying the object's state to permanent storage. + * @deprecated Moving to use org.apache.sqoop namespace. */ @Documented @Retention(RetentionPolicy.RUNTIME) diff --git a/src/java/com/cloudera/sqoop/util/SubprocessSecurityManager.java b/src/java/com/cloudera/sqoop/util/SubprocessSecurityManager.java index bed050e6..787564aa 100644 --- a/src/java/com/cloudera/sqoop/util/SubprocessSecurityManager.java +++ b/src/java/com/cloudera/sqoop/util/SubprocessSecurityManager.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -20,87 +18,15 @@ package com.cloudera.sqoop.util; -import java.security.Permission; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - /** - * A SecurityManager used to run subprocesses and disallow certain actions. - * - * This specifically disallows System.exit(). - * - * This SecurityManager will also check with any existing SecurityManager as - * to the validity of any permissions. The SubprocessSecurityManager should be - * installed with the install() method, which will retain a handle to any - * previously-installed SecurityManager instance. - * - * When this SecurityManager is no longer necessary, the uninstall() method - * should be used which reinstates the previous SecurityManager as the active - * SecurityManager. + * @deprecated Moving to use org.apache.sqoop namespace. */ -public class SubprocessSecurityManager extends SecurityManager { - - public static final Log LOG = LogFactory.getLog( - SubprocessSecurityManager.class.getName()); - - private SecurityManager parentSecurityManager; - private boolean installed; - private boolean allowReplacement; +public class SubprocessSecurityManager + extends org.apache.sqoop.util.SubprocessSecurityManager { public SubprocessSecurityManager() { - this.installed = false; - this.allowReplacement = false; + super(); } - /** - * Install this SecurityManager and retain a reference to any - * previously-installed SecurityManager. - */ - public void install() { - LOG.debug("Installing subprocess security manager"); - this.parentSecurityManager = System.getSecurityManager(); - System.setSecurityManager(this); - this.installed = true; - } - - /** - * Restore an existing SecurityManager, uninstalling this one. 
- */ - public void uninstall() { - if (this.installed) { - LOG.debug("Uninstalling subprocess security manager"); - this.allowReplacement = true; - System.setSecurityManager(this.parentSecurityManager); - } - } - - @Override - /** - * Disallow the capability to call System.exit() or otherwise - * terminate the JVM. - */ - public void checkExit(int status) { - LOG.debug("Rejecting System.exit call with status=" + status); - throw new ExitSecurityException(status); - } - - @Override - /** - * Check a particular permission. Checks with this SecurityManager - * as well as any previously-installed manager. - * - * @param perm the Permission to check; must not be null. - */ - public void checkPermission(Permission perm) { - if (null != this.parentSecurityManager) { - // Check if the prior SecurityManager would have rejected this. - parentSecurityManager.checkPermission(perm); - } - - if (!allowReplacement && perm.getName().equals("setSecurityManager")) { - throw new SecurityException("Cannot replace security manager"); - } - } } diff --git a/src/java/com/cloudera/sqoop/util/TaskId.java b/src/java/com/cloudera/sqoop/util/TaskId.java index 744b8bcc..e5b6aecf 100644 --- a/src/java/com/cloudera/sqoop/util/TaskId.java +++ b/src/java/com/cloudera/sqoop/util/TaskId.java @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -28,48 +26,18 @@ import com.cloudera.sqoop.config.ConfigurationConstants; /** - * Utility class; returns task attempt Id of the current job - * regardless of Hadoop version being used. + * @deprecated Moving to use org.apache.sqoop namespace. */ public final class TaskId { - private TaskId() { - } + private TaskId() { } - /** - * Return the task attempt id as a string. - * @param conf the Configuration to check for the current task attempt id. 
- * @param defaultVal the value to return if a task attempt id is not set. - * @return the current task attempt id, or the default value if one isn't set. - */ public static String get(Configuration conf, String defaultVal) { - return conf.get("mapreduce.task.id", - conf.get("mapred.task.id", defaultVal)); + return org.apache.sqoop.util.TaskId.get(conf, defaultVal); } - /** - * Return the local filesystem dir where the current task attempt can - * perform work. - * @return a File describing a directory where local temp data for the - * task attempt can be stored. - */ public static File getLocalWorkPath(Configuration conf) throws IOException { - String tmpDir = conf.get( - ConfigurationConstants.PROP_JOB_LOCAL_DIRECTORY, - "/tmp/"); - - // Create a local subdir specific to this task attempt. - String taskAttemptStr = TaskId.get(conf, "task_attempt"); - File taskAttemptDir = new File(tmpDir, taskAttemptStr); - if (!taskAttemptDir.exists()) { - boolean createdDir = taskAttemptDir.mkdirs(); - if (!createdDir) { - throw new IOException("Could not create missing task attempt dir: " - + taskAttemptDir.toString()); - } - } - - return taskAttemptDir; + return org.apache.sqoop.util.TaskId.getLocalWorkPath(conf); } } diff --git a/src/java/org/apache/sqoop/util/AppendUtils.java b/src/java/org/apache/sqoop/util/AppendUtils.java new file mode 100644 index 00000000..873c7185 --- /dev/null +++ b/src/java/org/apache/sqoop/util/AppendUtils.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import java.io.IOException; +import java.text.NumberFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import com.cloudera.sqoop.manager.ImportJobContext; +import com.cloudera.sqoop.SqoopOptions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Utilities used when appending imported files to an existing dir. + */ +public class AppendUtils { + + public static final Log LOG = LogFactory.getLog(AppendUtils.class.getName()); + + private static final SimpleDateFormat DATE_FORM = new SimpleDateFormat( + "ddHHmmssSSS"); + private static final String TEMP_IMPORT_ROOT = + System.getProperty("sqoop.test.import.rootDir", "_sqoop"); + + private static final int PARTITION_DIGITS = 5; + private static final String FILEPART_SEPARATOR = "-"; + private static final String FILEEXT_SEPARATOR = "."; + + private ImportJobContext context = null; + + public AppendUtils(ImportJobContext context) { + this.context = context; + } + + /** + * Moves the imported files from temporary directory to specified target-dir, + * renaming partition number if appending file exists. 
+ */ + public void append() throws IOException { + + SqoopOptions options = context.getOptions(); + FileSystem fs = FileSystem.get(options.getConf()); + Path tempDir = context.getDestination(); + + // Try in this order: target-dir or warehouse-dir + Path userDestDir = null; + if (options.getTargetDir() != null) { + userDestDir = new Path(options.getTargetDir()); + } else if (options.getWarehouseDir() != null) { + userDestDir = new Path(options.getWarehouseDir(), + context.getTableName()); + } else { + userDestDir = new Path(context.getTableName()); + } + + int nextPartition = 0; + + if (!fs.exists(tempDir)) { + // This occurs if there was no source (tmp) dir. This might happen + // if the import was an HBase-target import, but the user specified + // --append anyway. This is a warning, not an error. + LOG.warn("Cannot append files to target dir; no such directory: " + + tempDir); + return; + } + + // Create target directory. + if (!fs.exists(userDestDir)) { + LOG.info("Creating missing output directory - " + userDestDir.getName()); + fs.mkdirs(userDestDir); + nextPartition = 0; + } else { + LOG.info("Appending to directory " + userDestDir.getName()); + // Get the right next partition for the imported files + nextPartition = getNextPartition(fs, userDestDir); + } + + // move files + moveFiles(fs, tempDir, userDestDir, nextPartition); + + // delete temporary path + LOG.debug("Deleting temporary folder " + tempDir.getName()); + fs.delete(tempDir, true); + } + + /** + * Returns the greatest partition number available for appending, for data + * files in targetDir. 
+ */ + private int getNextPartition(FileSystem fs, Path targetDir) + throws IOException { + + int nextPartition = 0; + FileStatus[] existingFiles = fs.listStatus(targetDir); + if (existingFiles != null && existingFiles.length > 0) { + Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*"); + for (FileStatus fileStat : existingFiles) { + if (!fileStat.isDir()) { + String filename = fileStat.getPath().getName(); + Matcher mat = patt.matcher(filename); + if (mat.matches()) { + int thisPart = Integer.parseInt(mat.group(1)); + if (thisPart >= nextPartition) { + nextPartition = thisPart; + nextPartition++; + } + } + } + } + } + + if (nextPartition > 0) { + LOG.info("Using found partition " + nextPartition); + } + + return nextPartition; + } + + /** + * Move files from source to target using a specified starting partition. + */ + private void moveFiles(FileSystem fs, Path sourceDir, Path targetDir, + int partitionStart) throws IOException { + + NumberFormat numpart = NumberFormat.getInstance(); + numpart.setMinimumIntegerDigits(PARTITION_DIGITS); + numpart.setGroupingUsed(false); + Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*"); + FileStatus[] tempFiles = fs.listStatus(sourceDir); + + if (null == tempFiles) { + // If we've already checked that the dir exists, and now it can't be + // listed, this is a genuine error (permissions, fs integrity, or other). 
+ throw new IOException("Could not list files from " + sourceDir); + } + + // Move and rename files & directories from temporary to target-dir thus + // appending file's next partition + for (FileStatus fileStat : tempFiles) { + if (!fileStat.isDir()) { + // Move imported data files + String filename = fileStat.getPath().getName(); + Matcher mat = patt.matcher(filename); + if (mat.matches()) { + String name = getFilename(filename); + String fileToMove = name.concat(numpart.format(partitionStart++)); + String extension = getFileExtension(filename); + if (extension != null) { + fileToMove = fileToMove.concat(extension); + } + LOG.debug("Filename: " + filename + " repartitioned to: " + + fileToMove); + fs.rename(fileStat.getPath(), new Path(targetDir, fileToMove)); + } + } else { + // Move directories (_logs & any other) + String dirName = fileStat.getPath().getName(); + Path path = new Path(targetDir, dirName); + int dirNumber = 0; + while (fs.exists(path)) { + path = new Path(targetDir, dirName.concat("-").concat( + numpart.format(dirNumber++))); + } + LOG.debug("Directory: " + dirName + " renamed to: " + path.getName()); + fs.rename(fileStat.getPath(), path); + } + } + } + + /** returns the name component of a file. */ + private String getFilename(String filename) { + String result = null; + int pos = filename.lastIndexOf(FILEPART_SEPARATOR); + if (pos != -1) { + result = filename.substring(0, pos + 1); + } else { + pos = filename.lastIndexOf(FILEEXT_SEPARATOR); + if (pos != -1) { + result = filename.substring(0, pos); + } else { + result = filename; + } + } + return result; + } + + /** returns the extension component of a filename. */ + private String getFileExtension(String filename) { + int pos = filename.lastIndexOf(FILEEXT_SEPARATOR); + if (pos != -1) { + return filename.substring(pos, filename.length()); + } else { + return null; + } + } + + /** + * Creates a unique path object inside the sqoop temporary directory. 
+ * + * @param tableName + * @return a path pointing to the temporary directory + */ + public static Path getTempAppendDir(String tableName) { + String timeId = DATE_FORM.format(new Date(System.currentTimeMillis())); + String tempDir = TEMP_IMPORT_ROOT + Path.SEPARATOR + timeId + tableName; + return new Path(tempDir); + } + +} diff --git a/src/java/org/apache/sqoop/util/AsyncSink.java b/src/java/org/apache/sqoop/util/AsyncSink.java new file mode 100644 index 00000000..794592f8 --- /dev/null +++ b/src/java/org/apache/sqoop/util/AsyncSink.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import java.io.InputStream; + +/** + * An interface describing a factory class for a Thread class that handles + * input from some sort of stream. + * + * When the stream is closed, the thread should terminate. + */ +public abstract class AsyncSink { + + /** + * Create and run a thread to handle input from the provided InputStream. + * When processStream returns, the thread should be running; it should + * continue to run until the InputStream is exhausted. + */ + public abstract void processStream(InputStream is); + + /** + * Wait until the stream has been processed. 
+ * @return a status code indicating success or failure. 0 is typical for + * success. + */ + public abstract int join() throws InterruptedException; + +} + diff --git a/src/java/org/apache/sqoop/util/ClassLoaderStack.java b/src/java/org/apache/sqoop/util/ClassLoaderStack.java new file mode 100644 index 00000000..8e41cb30 --- /dev/null +++ b/src/java/org/apache/sqoop/util/ClassLoaderStack.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Allows you to add and remove jar-files from the running JVM by + * instantiating classloaders for them. + */ +public final class ClassLoaderStack { + + public static final Log LOG = LogFactory.getLog( + ClassLoaderStack.class.getName()); + + private ClassLoaderStack() { + } + + /** + * Sets the classloader for the current thread. 
+ */ + public static void setCurrentClassLoader(ClassLoader cl) { + LOG.debug("Restoring classloader: " + cl.toString()); + Thread.currentThread().setContextClassLoader(cl); + } + + /** + * Adds a ClassLoader to the top of the stack that will load from the Jar + * file of your choice. Returns the previous classloader so you can restore + * it if need be, later. + * + * @param jarFile The filename of a jar file that you want loaded into this + * JVM. + * @param testClassName The name of the class to load immediately + * (optional). + */ + public static ClassLoader addJarFile(String jarFile, String testClassName) + throws IOException { + + ClassLoader prevClassLoader = + Thread.currentThread().getContextClassLoader(); + + if (null != testClassName) { + try { + // Test to see if testClassName is already available. If so, do not + // load this jar. + LOG.debug("Checking for existing class: " + testClassName); + Class.forName(testClassName, true, prevClassLoader); + LOG.debug("Class is already available. Skipping jar " + jarFile); + return prevClassLoader; + } catch (ClassNotFoundException cnfe) { + // Expected this; we need to load the jar. continue. + } + } + + String urlPath = "jar:file://" + new File(jarFile).getAbsolutePath() + "!/"; + LOG.debug("Attempting to load jar through URL: " + urlPath); + LOG.debug("Previous classloader is " + prevClassLoader); + URL [] jarUrlArray = {new URL(urlPath)}; + URLClassLoader cl = URLClassLoader.newInstance(jarUrlArray, + prevClassLoader); + try { + if (null != testClassName) { + // try to load a class from the jar to force loading now. + LOG.debug("Testing class in jar: " + testClassName); + Class.forName(testClassName, true, cl); + } + LOG.debug("Loaded jar into current JVM: " + urlPath); + } catch (ClassNotFoundException cnfe) { + throw new IOException("Could not load jar " + jarFile + + " into JVM. 
(Could not find class " + + testClassName + ".)", cnfe); + } + + LOG.debug("Added classloader for jar " + jarFile + ": " + cl); + Thread.currentThread().setContextClassLoader(cl); + return prevClassLoader; + } + +} diff --git a/src/java/org/apache/sqoop/util/DirectImportUtils.java b/src/java/org/apache/sqoop/util/DirectImportUtils.java new file mode 100644 index 00000000..8ea9662c --- /dev/null +++ b/src/java/org/apache/sqoop/util/DirectImportUtils.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.util; + +import java.io.IOException; +import java.io.File; +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.conf.Configuration; +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.io.CodecMap; +import com.cloudera.sqoop.io.SplittingOutputStream; +import com.cloudera.sqoop.io.SplittableBufferedWriter; + +import org.apache.hadoop.util.Shell; +import com.cloudera.sqoop.manager.ImportJobContext; + +/** + * Utility methods that are common to various the direct import managers. + */ +public final class DirectImportUtils { + + public static final Log LOG = LogFactory.getLog( + DirectImportUtils.class.getName()); + + private DirectImportUtils() { + } + + /** + * Executes chmod on the specified file, passing in the mode string 'modstr' + * which may be e.g. "a+x" or "0600", etc. + * @throws IOException if chmod failed. + */ + public static void setFilePermissions(File file, String modstr) + throws IOException { + // Set this file to be 0600. Java doesn't have a built-in mechanism for this + // so we need to go out to the shell to execute chmod. + try { + Shell.execCommand("chmod", modstr, file.toString()); + } catch (IOException ioe) { + // Shell.execCommand will throw IOException on exit code != 0. + LOG.error("Could not chmod " + modstr + " " + file.toString()); + throw new IOException("Could not ensure password file security.", ioe); + } + } + + /** + * Open a file in HDFS for write to hold the data associated with a table. + * Creates any necessary directories, and returns the OutputStream to write + * to. The caller is responsible for calling the close() method on the + * returned stream. 
+ */ + public static SplittableBufferedWriter createHdfsSink(Configuration conf, + SqoopOptions options, ImportJobContext context) throws IOException { + + FileSystem fs = FileSystem.get(conf); + Path destDir = context.getDestination(); + + LOG.debug("Writing to filesystem: " + conf.get("fs.default.name")); + LOG.debug("Creating destination directory " + destDir); + fs.mkdirs(destDir); + + // This Writer will be closed by the caller. + return new SplittableBufferedWriter( + new SplittingOutputStream(conf, destDir, "data-", + options.getDirectSplitSize(), getCodec(conf, options))); + } + + private static CompressionCodec getCodec(Configuration conf, + SqoopOptions options) throws IOException { + if (options.shouldUseCompression()) { + if (options.getCompressionCodec() == null) { + return new GzipCodec(); + } else { + return CodecMap.getCodec(options.getCompressionCodec(), conf); + } + } + return null; + } + + /** @return true if someHost refers to localhost. + */ + public static boolean isLocalhost(String someHost) { + if (null == someHost) { + return false; + } + + try { + InetAddress localHostAddr = InetAddress.getLocalHost(); + InetAddress someAddr = InetAddress.getByName(someHost); + + return localHostAddr.equals(someAddr); + } catch (UnknownHostException uhe) { + return false; + } + } + +} + diff --git a/src/java/org/apache/sqoop/util/ErrorableAsyncSink.java b/src/java/org/apache/sqoop/util/ErrorableAsyncSink.java new file mode 100644 index 00000000..aaea3223 --- /dev/null +++ b/src/java/org/apache/sqoop/util/ErrorableAsyncSink.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import com.cloudera.sqoop.util.AsyncSink; + +/** + * Partial implementation of AsyncSink that relies on ErrorableThread to + * provide a status bit for the join() method. + */ +public abstract class ErrorableAsyncSink extends AsyncSink { + + protected ErrorableThread child; + + public int join() throws InterruptedException { + child.join(); + if (child.isErrored()) { + return 1; + } else { + return 0; + } + } + +} + diff --git a/src/java/org/apache/sqoop/util/ErrorableThread.java b/src/java/org/apache/sqoop/util/ErrorableThread.java new file mode 100644 index 00000000..090043a5 --- /dev/null +++ b/src/java/org/apache/sqoop/util/ErrorableThread.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Thread carrying a one-way error flag that code running in the thread can
 * raise and other threads can read.
 */
public abstract class ErrorableThread extends Thread {

  // volatile so that a reader on another thread sees the flag once raised.
  private volatile boolean error = false;

  public ErrorableThread() {
  }

  /** Raises the error flag; there is no way to clear it again. */
  protected void setError() {
    error = true;
  }

  /** @return true once setError() has been called on this thread object. */
  public boolean isErrored() {
    return error;
  }

}
+ * + */ +public final class Executor { + + public static final Log LOG = LogFactory.getLog(Executor.class.getName()); + + private Executor() { + } + + /** + * Execute a program defined by the args array with default stream sinks + * that consume the program's output (to prevent it from blocking on buffers) + * and then ignore said output. + */ + public static int exec(String [] args) throws IOException { + NullAsyncSink s = new NullAsyncSink(); + return exec(args, s, s); + } + + /** + * Run a command via Runtime.exec(), with its stdout and stderr streams + * directed to be handled by threads generated by AsyncSinks. + * Block until the child process terminates. + * + * @return the exit status of the ran program + */ + public static int exec(String [] args, AsyncSink outSink, + AsyncSink errSink) throws IOException { + return exec(args, null, outSink, errSink); + } + + + /** + * Run a command via Runtime.exec(), with its stdout and stderr streams + * directed to be handled by threads generated by AsyncSinks. + * Block until the child process terminates. Allows the programmer to + * specify an environment for the child program. + * + * @return the exit status of the ran program + */ + public static int exec(String [] args, String [] envp, AsyncSink outSink, + AsyncSink errSink) throws IOException { + + // launch the process. + Process p = Runtime.getRuntime().exec(args, envp); + + // dispatch its stdout and stderr to stream sinks if available. + if (null != outSink) { + outSink.processStream(p.getInputStream()); + } + + if (null != errSink) { + errSink.processStream(p.getErrorStream()); + } + + // wait for the return value. + while (true) { + try { + int ret = p.waitFor(); + return ret; + } catch (InterruptedException ie) { + continue; + } + } + } + + + /** + * @return An array formatted correctly for use as an envp based on the + * current environment for this program. 
+ */ + public static List getCurEnvpStrings() { + Map curEnv = System.getenv(); + ArrayList array = new ArrayList(); + + if (null == curEnv) { + return null; + } + + for (Map.Entry entry : curEnv.entrySet()) { + array.add(entry.getKey() + "=" + entry.getValue()); + } + + return array; + } + +} diff --git a/src/java/org/apache/sqoop/util/ExitSecurityException.java b/src/java/org/apache/sqoop/util/ExitSecurityException.java new file mode 100644 index 00000000..b4e57cba --- /dev/null +++ b/src/java/org/apache/sqoop/util/ExitSecurityException.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +/** + * SecurityException suppressing a System.exit() call. + * + * Allows retrieval of the would-be exit status code. + */ +@SuppressWarnings("serial") +public class ExitSecurityException extends SecurityException { + + private final int exitStatus; + + public ExitSecurityException() { + super("ExitSecurityException"); + this.exitStatus = 0; + } + + public ExitSecurityException(final String message) { + super(message); + this.exitStatus = 0; + } + + /** + * Register a System.exit() event being suppressed with a particular + * exit status code. 
+ */ + public ExitSecurityException(int status) { + super("ExitSecurityException"); + this.exitStatus = status; + } + + @Override + public String toString() { + String msg = getMessage(); + return (null == msg) ? ("exit with status " + exitStatus) : msg; + } + + public int getExitStatus() { + return this.exitStatus; + } + +} diff --git a/src/java/org/apache/sqoop/util/ExportException.java b/src/java/org/apache/sqoop/util/ExportException.java new file mode 100644 index 00000000..675840a1 --- /dev/null +++ b/src/java/org/apache/sqoop/util/ExportException.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +/** + * General error during export process. + */ +@SuppressWarnings("serial") +public class ExportException extends Exception { + + public ExportException() { + super("ExportException"); + } + + public ExportException(final String message) { + super(message); + } + + public ExportException(final Throwable cause) { + super(cause); + } + + public ExportException(final String message, final Throwable cause) { + super(message, cause); + } + + @Override + public String toString() { + String msg = getMessage(); + return (null == msg) ? 
/**
 * Recursive file listing under a specified directory.
 *
 * Taken from http://www.javapractices.com/topic/TopicAction.do?Id=68
 * Used under the terms of the CC Attribution license:
 * http://creativecommons.org/licenses/by/3.0/
 *
 * Method by Alex Wong (javapractices.com)
 */
public final class FileListing {

  private FileListing() { }

  /**
   * Demonstrate use.
   *
   * @param aArgs - <tt>aArgs[0]</tt> is the full name of an existing
   * directory that can be read.
   */
  public static void main(String... aArgs) throws FileNotFoundException {
    File startingDirectory = new File(aArgs[0]);
    List<File> files = FileListing.getFileListing(startingDirectory);

    //print out all file names, in the order of File.compareTo()
    for (File file : files) {
      System.out.println(file);
    }
  }

  /**
   * Recursively walk a directory tree and return a List of all
   * Files found; the List is sorted using File.compareTo().
   *
   * @param aStartingDir is a valid directory, which can be read.
   * @return every file and directory below aStartingDir (aStartingDir itself
   *     is not included).
   * @throws FileNotFoundException if aStartingDir does not exist.
   * @throws IllegalArgumentException if aStartingDir is null, is not a
   *     directory, or cannot be read.
   */
  public static List<File> getFileListing(File aStartingDir)
      throws FileNotFoundException {
    validateDirectory(aStartingDir);
    List<File> result = getFileListingNoSort(aStartingDir);
    Collections.sort(result);
    return result;
  }

  /**
   * Collects every entry below aStartingDir without sorting. Entries whose
   * contents cannot be listed contribute only themselves.
   */
  private static List<File> getFileListingNoSort(File aStartingDir)
      throws FileNotFoundException {
    List<File> result = new ArrayList<File>();
    File[] filesAndDirs = aStartingDir.listFiles();
    if (filesAndDirs == null) {
      // listFiles() returns null when the path is not a directory or an I/O
      // error occurs (unreadable directory, broken symlink, ...).
      return result;
    }
    for (File file : filesAndDirs) {
      result.add(file); //always add, even if directory
      if (file.isDirectory()) {
        //recursive call!
        result.addAll(getFileListingNoSort(file));
      }
    }
    return result;
  }

  /**
   * Directory is valid if it exists, does not represent a file, and can be read.
   */
  private static void validateDirectory(File aDirectory)
      throws FileNotFoundException {
    if (aDirectory == null) {
      throw new IllegalArgumentException("Directory should not be null.");
    }
    if (!aDirectory.exists()) {
      throw new FileNotFoundException("Directory does not exist: "
          + aDirectory);
    }
    if (!aDirectory.isDirectory()) {
      throw new IllegalArgumentException("Is not a directory: " + aDirectory);
    }
    if (!aDirectory.canRead()) {
      throw new IllegalArgumentException("Directory cannot be read: "
          + aDirectory);
    }
  }

  /**
   * Recursively delete a directory and all its children.
   * @param dir is a valid directory.
   * @throws FileNotFoundException if dir does not exist.
   * @throws IOException if dir or one of its children cannot be removed.
   */
  public static void recursiveDeleteDir(File dir) throws IOException {
    if (!dir.exists()) {
      throw new FileNotFoundException(dir.toString() + " does not exist");
    }

    if (dir.isDirectory()) {
      // recursively descend into all children and delete them.
      File [] children = dir.listFiles();
      // A null listing means the directory could not be read; fall through
      // so that the delete below reports the failure as an IOException.
      if (children != null) {
        for (File child : children) {
          recursiveDeleteDir(child);
        }
      }
    }

    if (!dir.delete()) {
      throw new IOException("Could not remove: " + dir);
    }
  }

}
/**
 * Checked exception signalling a general failure of the import process.
 */
@SuppressWarnings("serial")
public class ImportException extends Exception {

  /** Creates an exception whose message is the fixed text "ImportException". */
  public ImportException() {
    super("ImportException");
  }

  /** Creates an exception with the given message. */
  public ImportException(final String message) {
    super(message);
  }

  /** Creates an exception wrapping the given cause. */
  public ImportException(final Throwable cause) {
    super(cause);
  }

  /** Creates an exception with the given message and cause. */
  public ImportException(final String message, final Throwable cause) {
    super(message, cause);
  }

  /**
   * @return the message alone (without the class-name prefix), or
   * "ImportException" when there is no message.
   */
  @Override
  public String toString() {
    final String text = getMessage();
    if (text == null) {
      return "ImportException";
    }
    return text;
  }

}
+ */ +public final class Jars { + + public static final Log LOG = LogFactory.getLog( + Jars.class.getName()); + + private Jars() { + } + + /** + * @return the path to the main Sqoop jar. + */ + public static String getSqoopJarPath() { + return getJarPathForClass(Jars.class); + } + + /** + * Return the jar file path that contains a particular class. + * Method mostly cloned from o.a.h.mapred.JobConf.findContainingJar(). + */ + public static String getJarPathForClass(Class classObj) { + ClassLoader loader = classObj.getClassLoader(); + String classFile = classObj.getName().replaceAll("\\.", "/") + ".class"; + try { + for (Enumeration itr = loader.getResources(classFile); + itr.hasMoreElements();) { + URL url = (URL) itr.nextElement(); + if ("jar".equals(url.getProtocol())) { + String toReturn = url.getPath(); + if (toReturn.startsWith("file:")) { + toReturn = toReturn.substring("file:".length()); + } + // URLDecoder is a misnamed class, since it actually decodes + // x-www-form-urlencoded MIME type rather than actual + // URL encoding (which the file path has). Therefore it would + // decode +s to ' 's which is incorrect (spaces are actually + // either unencoded or encoded as "%20"). Replace +s first, so + // that they are kept sacred during the decoding process. + toReturn = toReturn.replaceAll("\\+", "%2B"); + toReturn = URLDecoder.decode(toReturn, "UTF-8"); + return toReturn.replaceAll("!.*$", ""); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + } + + /** + * Return the path to the jar containing the JDBC driver + * for a ConnManager. 
+ */ + public static String getDriverClassJar(ConnManager mgr) { + if (null == mgr) { + return null; + } + + String driverClassName = mgr.getDriverClass(); + if (null == driverClassName) { + return null; + } + + try { + Class driverClass = Class.forName(driverClassName); + return getJarPathForClass(driverClass); + } catch (ClassNotFoundException cnfe) { + LOG.warn("No such class " + driverClassName + " available."); + return null; + } + } + +} + diff --git a/src/java/org/apache/sqoop/util/JdbcUrl.java b/src/java/org/apache/sqoop/util/JdbcUrl.java new file mode 100644 index 00000000..dd17ad5d --- /dev/null +++ b/src/java/org/apache/sqoop/util/JdbcUrl.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import java.net.MalformedURLException; +import java.net.URL; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Some utilities for parsing JDBC URLs which may not be tolerated + * by Java's java.net.URL class. + * java.net.URL does not support multi:part:scheme:// components, which + * virtually all JDBC connect string URLs have. 
+ */ +public final class JdbcUrl { + + public static final Log LOG = LogFactory.getLog(JdbcUrl.class.getName()); + + private JdbcUrl() { + } + + /** + * @return the database name from the connect string, which is typically the + * 'path' component, or null if we can't. + */ + public static String getDatabaseName(String connectString) { + try { + String sanitizedString = null; + int schemeEndOffset = connectString.indexOf("://"); + if (-1 == schemeEndOffset) { + // couldn't find one? try our best here. + sanitizedString = "http://" + connectString; + LOG.warn("Could not find database access scheme in connect string " + + connectString); + } else { + sanitizedString = "http" + connectString.substring(schemeEndOffset); + } + + URL connectUrl = new URL(sanitizedString); + String databaseName = connectUrl.getPath(); + if (null == databaseName) { + return null; + } + + // This is taken from a 'path' part of a URL, which may have leading '/' + // characters; trim them off. + while (databaseName.startsWith("/")) { + databaseName = databaseName.substring(1); + } + + return databaseName; + } catch (MalformedURLException mue) { + LOG.error("Malformed connect string URL: " + connectString + + "; reason is " + mue.toString()); + return null; + } + } + + /** + * @return the hostname from the connect string, or null if we can't. + */ + public static String getHostName(String connectString) { + try { + String sanitizedString = null; + int schemeEndOffset = connectString.indexOf("://"); + if (-1 == schemeEndOffset) { + // Couldn't find one? ok, then there's no problem, it should work as a + // URL. 
+ sanitizedString = connectString; + } else { + sanitizedString = "http" + connectString.substring(schemeEndOffset); + } + + URL connectUrl = new URL(sanitizedString); + return connectUrl.getHost(); + } catch (MalformedURLException mue) { + LOG.error("Malformed connect string URL: " + connectString + + "; reason is " + mue.toString()); + return null; + } + } + + /** + * @return the port from the connect string, or -1 if we can't. + */ + public static int getPort(String connectString) { + try { + String sanitizedString = null; + int schemeEndOffset = connectString.indexOf("://"); + if (-1 == schemeEndOffset) { + // Couldn't find one? ok, then there's no problem, it should work as a + // URL. + sanitizedString = connectString; + } else { + sanitizedString = "http" + connectString.substring(schemeEndOffset); + } + + URL connectUrl = new URL(sanitizedString); + return connectUrl.getPort(); + } catch (MalformedURLException mue) { + LOG.error("Malformed connect string URL: " + connectString + + "; reason is " + mue.toString()); + return -1; + } + } + +} diff --git a/src/java/org/apache/sqoop/util/LoggingAsyncSink.java b/src/java/org/apache/sqoop/util/LoggingAsyncSink.java new file mode 100644 index 00000000..5f20539a --- /dev/null +++ b/src/java/org/apache/sqoop/util/LoggingAsyncSink.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.cloudera.sqoop.util.AsyncSink; + +/** + * An AsyncSink that takes the contents of a stream and writes + * it to log4j. + */ +public class LoggingAsyncSink extends AsyncSink { + + public static final Log LOG = LogFactory.getLog( + LoggingAsyncSink.class.getName()); + + private Log contextLog; + + public LoggingAsyncSink(final Log context) { + if (null == context) { + this.contextLog = LOG; + } else { + this.contextLog = context; + } + } + + private Thread child; + + public void processStream(InputStream is) { + child = new LoggingThread(is); + child.start(); + } + + public int join() throws InterruptedException { + child.join(); + return 0; // always successful. + } + + /** + * Run a background thread that copies the contents of the stream + * to the output context log. + */ + private class LoggingThread extends Thread { + + private InputStream stream; + + LoggingThread(final InputStream is) { + this.stream = is; + } + + public void run() { + InputStreamReader isr = new InputStreamReader(this.stream); + BufferedReader r = new BufferedReader(isr); + + try { + while (true) { + String line = r.readLine(); + if (null == line) { + break; // stream was closed by remote end. 
+ } + + LoggingAsyncSink.this.contextLog.info(line); + } + } catch (IOException ioe) { + LOG.error("IOException reading from stream: " + ioe.toString()); + } + + try { + r.close(); + } catch (IOException ioe) { + LOG.warn("Error closing stream in LoggingAsyncSink: " + ioe.toString()); + } + } + } + +} diff --git a/src/java/org/apache/sqoop/util/LoggingUtils.java b/src/java/org/apache/sqoop/util/LoggingUtils.java new file mode 100644 index 00000000..cff66e51 --- /dev/null +++ b/src/java/org/apache/sqoop/util/LoggingUtils.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import java.sql.SQLException; + +import org.apache.commons.logging.Log; + +/** + * A helper class for logging. + */ +public final class LoggingUtils { + + private LoggingUtils() { } + + /** + * Log every exception in the chain if + * the exception is a chain of exceptions. 
+ */ + public static void logAll(Log log, SQLException e) { + log.error("Top level exception: ", e); + e = e.getNextException(); + int indx = 1; + while (e != null) { + log.error("Chained exception " + indx + ": ", e); + e = e.getNextException(); + indx++; + } + } + +} + diff --git a/src/java/org/apache/sqoop/util/NullAsyncSink.java b/src/java/org/apache/sqoop/util/NullAsyncSink.java new file mode 100644 index 00000000..a42e4e95 --- /dev/null +++ b/src/java/org/apache/sqoop/util/NullAsyncSink.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.cloudera.sqoop.util.AsyncSink; + +/** + * An AsyncSink that takes the contents of a stream and ignores it. 
+ */ +public class NullAsyncSink extends AsyncSink { + + public static final Log LOG = LogFactory.getLog( + NullAsyncSink.class.getName()); + + private Thread child; + + public void processStream(InputStream is) { + child = new IgnoringThread(is); + child.start(); + } + + public int join() throws InterruptedException { + child.join(); + return 0; // always successful. + } + + /** + * Run a background thread that reads and ignores the + * contents of the stream. + */ + private static class IgnoringThread extends Thread { + + private InputStream stream; + + IgnoringThread(final InputStream is) { + this.stream = is; + } + + public void run() { + InputStreamReader isr = new InputStreamReader(this.stream); + BufferedReader r = new BufferedReader(isr); + + try { + while (true) { + String line = r.readLine(); + if (null == line) { + break; // stream was closed by remote end. + } + } + } catch (IOException ioe) { + LOG.warn("IOException reading from (ignored) stream: " + + ioe.toString()); + } + + try { + r.close(); + } catch (IOException ioe) { + LOG.warn("Error closing stream in NullAsyncSink: " + ioe.toString()); + } + } + } + +} + diff --git a/src/java/org/apache/sqoop/util/OptionsFileUtil.java b/src/java/org/apache/sqoop/util/OptionsFileUtil.java new file mode 100644 index 00000000..c476e005 --- /dev/null +++ b/src/java/org/apache/sqoop/util/OptionsFileUtil.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.cloudera.sqoop.Sqoop; + +/** + * Provides utility functions to read in options file. An options file is a + * regular text file with each line specifying a separate option. An option + * may continue into a following line by using a back-slash separator character + * at the end of the non-terminating line. Options file also allow empty lines + * and comment lines which are disregarded. Comment lines must begin with the + * hash character as the first character. Leading and trailing white-spaces are + * ignored for any options read from the Options file. + */ +public final class OptionsFileUtil { + + public static final Log LOG = LogFactory.getLog( + OptionsFileUtil.class.getName()); + + private OptionsFileUtil() { } + + /** + * Expands any options file that may be present in the given set of arguments. + * + * @param args the given arguments + * @return a new string array that contains the expanded arguments. 
+ * @throws Exception + */ + public static String[] expandArguments(String[] args) throws Exception { + List options = new ArrayList(); + + for (int i = 0; i < args.length; i++) { + if (args[i].equals(Sqoop.SQOOP_OPTIONS_FILE_SPECIFIER)) { + if (i == args.length - 1) { + throw new Exception("Missing options file"); + } + + String fileName = args[++i]; + File optionsFile = new File(fileName); + BufferedReader reader = null; + StringBuilder buffer = new StringBuilder(); + try { + reader = new BufferedReader(new FileReader(optionsFile)); + String nextLine = null; + while ((nextLine = reader.readLine()) != null) { + nextLine = nextLine.trim(); + if (nextLine.length() == 0 || nextLine.startsWith("#")) { + // empty line or comment + continue; + } + + buffer.append(nextLine); + if (nextLine.endsWith("\\")) { + if (buffer.charAt(0) == '\'' || buffer.charAt(0) == '"') { + throw new Exception( + "Multiline quoted strings not supported in file(" + + fileName + "): " + buffer.toString()); + } + // Remove the trailing back-slash and continue + buffer.deleteCharAt(buffer.length() - 1); + } else { + // The buffer contains a full option + options.add( + removeQuotesEncolosingOption(fileName, buffer.toString())); + buffer.delete(0, buffer.length()); + } + } + + // Assert that the buffer is empty + if (buffer.length() != 0) { + throw new Exception("Malformed option in options file(" + + fileName + "): " + buffer.toString()); + } + } catch (IOException ex) { + throw new Exception("Unable to read options file: " + fileName, ex); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException ex) { + LOG.info("Exception while closing reader", ex); + } + } + } + } else { + // Regular option. Parse it and put it on the appropriate list + options.add(args[i]); + } + } + + return options.toArray(new String[options.size()]); + } + + /** + * Removes the surrounding quote characters as needed. It first attempts to + * remove surrounding double quotes. 
If successful, the resultant string is + * returned. If no surrounding double quotes are found, it attempts to remove + * surrounding single quote characters. If successful, the resultant string + * is returned. If not the original string is returnred. + * @param fileName + * @param option + * @return + * @throws Exception + */ + private static String removeQuotesEncolosingOption( + String fileName, String option) throws Exception { + + // Attempt to remove double quotes. If successful, return. + String option1 = removeQuoteCharactersIfNecessary(fileName, option, '"'); + if (!option1.equals(option)) { + // Quotes were successfully removed + return option1; + } + + // Attempt to remove single quotes. + return removeQuoteCharactersIfNecessary(fileName, option, '\''); + } + + /** + * Removes the surrounding quote characters from the given string. The quotes + * are identified by the quote parameter, the given string by option. The + * fileName parameter is used for raising exceptions with relevant message. 
+ * @param fileName + * @param option + * @param quote + * @return + * @throws Exception + */ + private static String removeQuoteCharactersIfNecessary(String fileName, + String option, char quote) throws Exception { + boolean startingQuote = (option.charAt(0) == quote); + boolean endingQuote = (option.charAt(option.length() - 1) == quote); + + if (startingQuote && endingQuote) { + if (option.length() == 1) { + throw new Exception("Malformed option in options file(" + + fileName + "): " + option); + } + return option.substring(1, option.length() - 1); + } + + if (startingQuote || endingQuote) { + throw new Exception("Malformed option in options file(" + + fileName + "): " + option); + } + + return option; + } + +} diff --git a/src/java/org/apache/sqoop/util/PerfCounters.java b/src/java/org/apache/sqoop/util/PerfCounters.java new file mode 100644 index 00000000..648417e8 --- /dev/null +++ b/src/java/org/apache/sqoop/util/PerfCounters.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import java.text.NumberFormat; + +/** + * A quick set of performance counters for reporting import speed. 
/**
 * A quick set of performance counters for reporting import speed.
 * Tracks a byte count and the elapsed time between startClock() and
 * stopClock(), and renders both (plus the resulting throughput) as text.
 */
public class PerfCounters {

  private static final double ONE_BILLION = 1000.0 * 1000.0 * 1000.0;

  /** Maximum number of digits after the decimal place. */
  private static final int MAX_PLACES = 4;

  private static final long ONE_GB = 1024 * 1024 * 1024;
  private static final long ONE_MB = 1024 * 1024;
  private static final long ONE_KB = 1024;

  private long bytes;
  private long nanoseconds;

  private long startTime;

  public PerfCounters() {
  }

  /**
   * @param more the number of bytes to add to the running total.
   */
  public void addBytes(long more) {
    bytes += more;
  }

  /** Record the current System.nanoTime() as the start of the interval. */
  public void startClock() {
    startTime = System.nanoTime();
  }

  /** Record the time elapsed since startClock() as the measured interval. */
  public void stopClock() {
    nanoseconds = System.nanoTime() - startTime;
  }

  /**
   * @return A value in nanoseconds scaled to report in seconds
   */
  private static double inSeconds(long nanos) {
    return (double) nanos / ONE_BILLION;
  }

  /**
   * @return a number formatter for the default locale that prints at most
   * MAX_PLACES fraction digits.
   */
  private static NumberFormat newFormat() {
    NumberFormat fmt = NumberFormat.getInstance();
    fmt.setMaximumFractionDigits(MAX_PLACES);
    return fmt;
  }

  /**
   * Scale a byte quantity to GB, MB, KB or bytes, whichever is the largest
   * unit the amount strictly exceeds, and format it with that unit name.
   *
   * @param amount the quantity, in bytes.
   * @param suffix text appended directly after the unit name.
   * @return a string such as "1.5 MB" followed by the suffix.
   */
  private static String formatScaled(double amount, String suffix) {
    double val;
    String scale;
    if (amount > ONE_GB) {
      val = amount / (double) ONE_GB;
      scale = "GB";
    } else if (amount > ONE_MB) {
      val = amount / (double) ONE_MB;
      scale = "MB";
    } else if (amount > ONE_KB) {
      val = amount / (double) ONE_KB;
      scale = "KB";
    } else {
      val = amount;
      scale = "bytes";
    }

    return newFormat().format(val) + " " + scale + suffix;
  }

  /**
   * @return a string of the form "xxxx bytes" or "xxxxx KB" or "xxxx GB",
   * scaled as is appropriate for the current value.
   */
  private String formatBytes() {
    return formatScaled((double) bytes, "");
  }

  /**
   * @return the measured interval, formatted as "xxx seconds".
   */
  private String formatTimeInSeconds() {
    return newFormat().format(inSeconds(this.nanoseconds)) + " seconds";
  }

  /**
   * @return a string of the form "xxx bytes/sec" or "xxx KB/sec" scaled as is
   * appropriate for the current value. If no positive interval has been
   * measured, the speed is reported as zero.
   */
  private String formatSpeed() {
    double seconds = inSeconds(this.nanoseconds);

    // Dividing by a zero interval yields NaN or Infinity, which would be
    // printed verbatim; report zero throughput until time has been measured.
    double speed = (seconds > 0.0) ? ((double) bytes / seconds) : 0.0;

    return formatScaled(speed, "/sec");
  }

  /**
   * @return a summary of the form "[bytes] in [time] seconds ([speed]/sec)".
   */
  @Override
  public String toString() {
    return formatBytes() + " in " + formatTimeInSeconds() + " ("
        + formatSpeed() + ")";
  }

}
/**
 * Generate random MD5 signatures for use as nonce values.
 * Each hash digests a java.rmi.server.UID, the current wall-clock time and
 * a block of bytes drawn from a SecureRandom.
 */
public final class RandomHash {

  /** Number of SecureRandom bytes mixed into every hash. */
  private static final int ENTROPY_BYTES = 16;

  /** Lower-case hexadecimal digits, indexed by nibble value. */
  private static final char [] HEX_DIGITS = "0123456789abcdef".toCharArray();

  /**
   * Source of unpredictable bytes. A UID plus a timestamp alone is unique
   * but guessable, so it cannot back the "securely-generated" contract.
   */
  private static final java.security.SecureRandom SECURE_RANDOM =
      new java.security.SecureRandom();

  private RandomHash() { }

  /**
   * Generate a new random md5 hash.
   * @return a securely-generated random 16 byte sequence.
   * @throws RuntimeException if the MD5 algorithm is not available.
   */
  public static byte [] generateMD5Bytes() {
    try {
      MessageDigest digester = MessageDigest.getInstance("MD5");
      long time = System.currentTimeMillis();
      // Encode with a fixed charset rather than the platform default.
      digester.update((new UID() + "@" + time).getBytes(
          java.nio.charset.Charset.forName("UTF-8")));

      byte [] entropy = new byte[ENTROPY_BYTES];
      SECURE_RANDOM.nextBytes(entropy);
      digester.update(entropy);

      return digester.digest();
    } catch (NoSuchAlgorithmException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Generate a new random md5 hash and convert it to a string.
   * @return a securely-generated random string of 32 lower-case
   * hexadecimal digits.
   */
  public static String generateMD5String() {
    byte [] bytes = generateMD5Bytes();
    StringBuilder sb = new StringBuilder(bytes.length * 2);
    for (byte b : bytes) {
      int x = ((int) b) & 0xFF;
      sb.append(HEX_DIGITS[x >>> 4]);
      sb.append(HEX_DIGITS[x & 0x0F]);
    }
    return sb.toString();
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import java.io.IOException; +import java.io.PrintWriter; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; + +/** + * Utility methods to format and print ResultSet objects. + */ +public class ResultSetPrinter { + + public static final Log LOG = LogFactory.getLog( + ResultSetPrinter.class.getName()); + + // max output width to allocate to any column of the printed results. + private static final int MAX_COL_WIDTH = 20; + + /** + * Print 'str' to the string builder, padded to 'width' chars. + */ + private static void printPadded(StringBuilder sb, String str, int width) { + int numPad; + if (null == str) { + sb.append("(null)"); + numPad = width - "(null)".length(); + } else { + sb.append(str); + numPad = width - str.length(); + } + + for (int i = 0; i < numPad; i++) { + sb.append(' '); + } + } + + private static final String COL_SEPARATOR = " | "; + private static final String LEFT_BORDER = "| "; + + /** + * Format the contents of the ResultSet into something that could be printed + * neatly; the results are appended to the supplied StringBuilder. 
+ */ + public final void printResultSet(PrintWriter pw, ResultSet results) + throws IOException { + try { + StringBuilder sbNames = new StringBuilder(); + int cols = results.getMetaData().getColumnCount(); + + int [] colWidths = new int[cols]; + ResultSetMetaData metadata = results.getMetaData(); + sbNames.append(LEFT_BORDER); + for (int i = 1; i < cols + 1; i++) { + String colName = metadata.getColumnName(i); + colWidths[i - 1] = Math.min(metadata.getColumnDisplaySize(i), + MAX_COL_WIDTH); + if (colName == null || colName.equals("")) { + colName = metadata.getColumnLabel(i) + "*"; + } + printPadded(sbNames, colName, colWidths[i - 1]); + sbNames.append(COL_SEPARATOR); + } + sbNames.append('\n'); + + StringBuilder sbPad = new StringBuilder(); + for (int i = 0; i < cols; i++) { + for (int j = 0; j < COL_SEPARATOR.length() + colWidths[i]; j++) { + sbPad.append('-'); + } + } + sbPad.append('-'); + sbPad.append('\n'); + + pw.print(sbPad.toString()); + pw.print(sbNames.toString()); + pw.print(sbPad.toString()); + + while (results.next()) { + StringBuilder sb = new StringBuilder(); + sb.append(LEFT_BORDER); + for (int i = 1; i < cols + 1; i++) { + printPadded(sb, results.getString(i), colWidths[i - 1]); + sb.append(COL_SEPARATOR); + } + sb.append('\n'); + pw.print(sb.toString()); + } + + pw.print(sbPad.toString()); + } catch (SQLException sqlException) { + LOG.error("Error reading from database: " + + StringUtils.stringifyException(sqlException)); + } + } + +} + diff --git a/src/java/org/apache/sqoop/util/StoredAsProperty.java b/src/java/org/apache/sqoop/util/StoredAsProperty.java new file mode 100644 index 00000000..07901ac3 --- /dev/null +++ b/src/java/org/apache/sqoop/util/StoredAsProperty.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Field-level marker naming the property under which the annotated field
 * is stored. Used by SqoopOptions when reifying an object's state to
 * permanent storage.
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface StoredAsProperty {

  /** @return the name of the property that holds the field's value. */
  String value();

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import java.security.Permission; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A SecurityManager used to run subprocesses and disallow certain actions. + * + * This specifically disallows System.exit(). + * + * This SecurityManager will also check with any existing SecurityManager as + * to the validity of any permissions. The SubprocessSecurityManager should be + * installed with the install() method, which will retain a handle to any + * previously-installed SecurityManager instance. + * + * When this SecurityManager is no longer necessary, the uninstall() method + * should be used which reinstates the previous SecurityManager as the active + * SecurityManager. + */ +public class SubprocessSecurityManager extends SecurityManager { + + public static final Log LOG = LogFactory.getLog( + SubprocessSecurityManager.class.getName()); + + private SecurityManager parentSecurityManager; + private boolean installed; + private boolean allowReplacement; + + public SubprocessSecurityManager() { + this.installed = false; + this.allowReplacement = false; + } + + /** + * Install this SecurityManager and retain a reference to any + * previously-installed SecurityManager. + */ + public void install() { + LOG.debug("Installing subprocess security manager"); + this.parentSecurityManager = System.getSecurityManager(); + System.setSecurityManager(this); + this.installed = true; + } + + /** + * Restore an existing SecurityManager, uninstalling this one. 
+ */ + public void uninstall() { + if (this.installed) { + LOG.debug("Uninstalling subprocess security manager"); + this.allowReplacement = true; + System.setSecurityManager(this.parentSecurityManager); + } + } + + @Override + /** + * Disallow the capability to call System.exit() or otherwise + * terminate the JVM. + */ + public void checkExit(int status) { + LOG.debug("Rejecting System.exit call with status=" + status); + throw new ExitSecurityException(status); + } + + @Override + /** + * Check a particular permission. Checks with this SecurityManager + * as well as any previously-installed manager. + * + * @param perm the Permission to check; must not be null. + */ + public void checkPermission(Permission perm) { + if (null != this.parentSecurityManager) { + // Check if the prior SecurityManager would have rejected this. + parentSecurityManager.checkPermission(perm); + } + + if (!allowReplacement && perm.getName().equals("setSecurityManager")) { + throw new SecurityException("Cannot replace security manager"); + } + } + +} + diff --git a/src/java/org/apache/sqoop/util/TaskId.java b/src/java/org/apache/sqoop/util/TaskId.java new file mode 100644 index 00000000..c543754a --- /dev/null +++ b/src/java/org/apache/sqoop/util/TaskId.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; + +import com.cloudera.sqoop.config.ConfigurationConstants; + +/** + * Utility class; returns task attempt Id of the current job + * regardless of Hadoop version being used. + */ +public final class TaskId { + + private TaskId() { } + + /** + * Return the task attempt id as a string. + * @param conf the Configuration to check for the current task attempt id. + * @param defaultVal the value to return if a task attempt id is not set. + * @return the current task attempt id, or the default value if one isn't set. + */ + public static String get(Configuration conf, String defaultVal) { + return conf.get("mapreduce.task.id", + conf.get("mapred.task.id", defaultVal)); + } + + /** + * Return the local filesystem dir where the current task attempt can + * perform work. + * @return a File describing a directory where local temp data for the + * task attempt can be stored. + */ + public static File getLocalWorkPath(Configuration conf) throws IOException { + String tmpDir = conf.get( + ConfigurationConstants.PROP_JOB_LOCAL_DIRECTORY, + "/tmp/"); + + // Create a local subdir specific to this task attempt. 
+ String taskAttemptStr = TaskId.get(conf, "task_attempt"); + File taskAttemptDir = new File(tmpDir, taskAttemptStr); + if (!taskAttemptDir.exists()) { + boolean createdDir = taskAttemptDir.mkdirs(); + if (!createdDir) { + throw new IOException("Could not create missing task attempt dir: " + + taskAttemptDir.toString()); + } + } + + return taskAttemptDir; + } + +} diff --git a/src/test/checkstyle-java-header.txt b/src/test/checkstyle-java-header.txt index 23dfc065..5d5f1e35 100644 --- a/src/test/checkstyle-java-header.txt +++ b/src/test/checkstyle-java-header.txt @@ -1,6 +1,4 @@ /** - * Copyright 2011 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information