diff --git a/build.xml b/build.xml index 99f4f575..49eed772 100644 --- a/build.xml +++ b/build.xml @@ -70,6 +70,12 @@ to call at top-level: ant deploy-contrib compile-core-test + + + diff --git a/src/java/org/apache/hadoop/sqoop/ImportOptions.java b/src/java/org/apache/hadoop/sqoop/ImportOptions.java index 23017d25..b88b3e81 100644 --- a/src/java/org/apache/hadoop/sqoop/ImportOptions.java +++ b/src/java/org/apache/hadoop/sqoop/ImportOptions.java @@ -93,8 +93,9 @@ public enum FileLayout { private String warehouseDir; private FileLayout layout; private boolean local; // if true and conn is mysql, use mysqldump. - private String tmpDir; // where temp data goes; usually /tmp + private String hiveHome; + private boolean hiveImport; private static final String DEFAULT_CONFIG_FILE = "sqoop.properties"; @@ -138,11 +139,17 @@ private void loadFromProperties() { this.orderByCol = props.getProperty("db.sort.column", this.orderByCol); this.driverClassName = props.getProperty("jdbc.driver", this.driverClassName); this.warehouseDir = props.getProperty("hdfs.warehouse.dir", this.warehouseDir); + this.hiveHome = props.getProperty("hive.home", this.hiveHome); String localImport = props.getProperty("local.import", Boolean.toString(this.local)).toLowerCase(); this.local = "true".equals(localImport) || "yes".equals(localImport) || "1".equals(localImport); + + String hiveImportStr = props.getProperty("hive.import", + Boolean.toString(this.hiveImport)).toLowerCase(); + this.hiveImport = "true".equals(hiveImportStr) || "yes".equals(hiveImportStr) + || "1".equals(hiveImportStr); } catch (IOException ioe) { LOG.error("Could not read properties file " + DEFAULT_CONFIG_FILE + ": " + ioe.toString()); } finally { @@ -156,11 +163,25 @@ private void loadFromProperties() { } } + /** + * @return the temp directory to use; this is guaranteed to end with + * the file separator character (e.g., '/') + */ + public String getTempDir() { + return this.tmpDir; + } + private void initDefaults() { // first, set the true defaults if nothing else happens. // default action is to run the full pipeline. this.action = ControlAction.FullImport; this.hadoopHome = System.getenv("HADOOP_HOME"); + + // Set this with $HIVE_HOME, but -Dhive.home can override. + this.hiveHome = System.getenv("HIVE_HOME"); + this.hiveHome = System.getProperty("hive.home", this.hiveHome); + + // Set this to cwd, but -Dsqoop.src.dir can override. this.codeOutputDir = System.getProperty("sqoop.src.dir", "."); String myTmpDir = System.getProperty("test.build.data", "/tmp/"); @@ -193,11 +214,13 @@ public static void printUsage() { System.out.println("--columns (col,col,col...) 
Columns to export from table"); System.out.println("--order-by (column-name) Column of the table used to order results"); System.out.println("--hadoop-home (dir) Override $HADOOP_HOME"); + System.out.println("--hive-home (dir) Override $HIVE_HOME"); System.out.println("--warehouse-dir (dir) HDFS path for table destination"); System.out.println("--as-sequencefile Imports data to SequenceFiles"); System.out.println("--as-textfile Imports data as plain text (default)"); System.out.println("--all-tables Import all tables in database"); System.out.println(" (Ignores --table, --columns and --order-by)"); + System.out.println("--hive-import If set, then import the table into Hive"); System.out.println(""); System.out.println("Code generation options:"); System.out.println("--outdir (dir) Output directory for generated code"); @@ -254,6 +277,10 @@ public void parse(String [] args) throws InvalidOptionsException { this.password = args[++i]; } else if (args[i].equals("--hadoop-home")) { this.hadoopHome = args[++i]; + } else if (args[i].equals("--hive-home")) { + this.hiveHome = args[++i]; + } else if (args[i].equals("--hive-import")) { + this.hiveImport = true; } else if (args[i].equals("--outdir")) { this.codeOutputDir = args[++i]; } else if (args[i].equals("--as-sequencefile")) { @@ -358,6 +385,15 @@ public boolean isLocal() { return local; } + public String getHiveHome() { + return hiveHome; + } + + /** @return true if we should import the table into Hive */ + public boolean doHiveImport() { + return hiveImport; + } + /** * @return location where .java files go; guaranteed to end with '/' */ diff --git a/src/java/org/apache/hadoop/sqoop/Sqoop.java b/src/java/org/apache/hadoop/sqoop/Sqoop.java index bf548ba2..a4a49639 100644 --- a/src/java/org/apache/hadoop/sqoop/Sqoop.java +++ b/src/java/org/apache/hadoop/sqoop/Sqoop.java @@ -26,6 +26,7 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.sqoop.hive.HiveImport; import org.apache.hadoop.sqoop.manager.ConnManager; import org.apache.hadoop.sqoop.orm.ClassWriter; import org.apache.hadoop.sqoop.orm.CompilationManager; @@ -42,6 +43,7 @@ public class Sqoop extends Configured implements Tool { private ImportOptions options; private ConnManager manager; + private HiveImport hiveImport; public Sqoop() { } @@ -69,12 +71,18 @@ private void importTable(String tableName) throws IOException, ImportError { String jarFile = null; // Generate the ORM code for the tables. - // TODO(aaron): Allow this to be bypassed if the user has already generated code + // TODO(aaron): Allow this to be bypassed if the user has already generated code, + // or if they're using a non-MapReduce import method (e.g., mysqldump). jarFile = generateORM(tableName); if (options.getAction() == ImportOptions.ControlAction.FullImport) { // Proceed onward to do the import. manager.importTable(tableName, jarFile, getConf()); + + // If the user wants this table to be in Hive, perform that post-load. 
+ if (options.doHiveImport()) { + hiveImport.importTable(tableName); + } } } @@ -101,6 +109,10 @@ public int run(String [] args) { return 1; } + if (options.doHiveImport()) { + hiveImport = new HiveImport(options, manager, getConf()); + } + ImportOptions.ControlAction action = options.getAction(); if (action == ImportOptions.ControlAction.ListTables) { String [] tables = manager.listTables(); diff --git a/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java b/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java new file mode 100644 index 00000000..298bc19d --- /dev/null +++ b/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.hive; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.sqoop.ImportOptions; +import org.apache.hadoop.sqoop.manager.ConnManager; +import org.apache.hadoop.sqoop.util.Executor; +import org.apache.hadoop.sqoop.util.LoggingStreamHandlerFactory; + +/** + * Utility to import a table into the Hive metastore. Manages the connection + * to Hive itself as well as orchestrating the use of the other classes in this + * package. + */ +public class HiveImport { + + public static final Log LOG = LogFactory.getLog(HiveImport.class.getName()); + + private ImportOptions options; + private ConnManager connManager; + private Configuration configuration; + + public HiveImport(final ImportOptions opts, final ConnManager connMgr, final Configuration conf) { + this.options = opts; + this.connManager = connMgr; + this.configuration = conf; + } + + + /** + * @return the filename of the hive executable to run to do the import + */ + private String getHiveBinPath() { + // If the user has $HIVE_HOME set, then use $HIVE_HOME/bin/hive if it + // exists. + // Fall back to just plain 'hive' and hope it's in the path. 
+ + String hiveHome = options.getHiveHome(); + if (null == hiveHome) { + return "hive"; + } + + Path p = new Path(hiveHome); + p = new Path(p, "bin"); + p = new Path(p, "hive"); + String hiveBinStr = p.toString(); + if (new File(hiveBinStr).exists()) { + return hiveBinStr; + } else { + return "hive"; + } + } + + /** + * If we used a MapReduce-based upload of the data, remove the _logs dir + * from where we put it, before running Hive LOAD DATA INPATH + */ + private void removeTempLogs(String tableName) throws IOException { + FileSystem fs = FileSystem.get(configuration); + String warehouseDir = options.getWarehouseDir(); + Path tablePath; + if (warehouseDir != null) { + tablePath = new Path(new Path(warehouseDir), tableName); + } else { + tablePath = new Path(tableName); + } + + Path logsPath = new Path(tablePath, "_logs"); + if (fs.exists(logsPath)) { + LOG.info("Removing temporary files from import process: " + logsPath); + if (!fs.delete(logsPath, true)) { + LOG.warn("Could not delete temporary files; continuing with import, but it may fail."); + } + } + } + + public void importTable(String tableName) throws IOException { + removeTempLogs(tableName); + + LOG.info("Loading uploaded data into Hive"); + + // For testing purposes against our mock hive implementation, + // if the sysproperty "expected.script" is set, we set the EXPECTED_SCRIPT + // environment variable for the child hive process. We also disable + // timestamp comments so that we have deterministic table creation scripts. + String expectedScript = System.getProperty("expected.script"); + List env = Executor.getCurEnvpStrings(); + boolean debugMode = expectedScript != null; + if (debugMode) { + env.add("EXPECTED_SCRIPT=" + expectedScript); + env.add("TMPDIR=" + options.getTempDir()); + } + + // generate the HQL statements to run. + TableDefWriter tableWriter = new TableDefWriter(options, connManager, tableName, + configuration, !debugMode); + String createTableStr = tableWriter.getCreateTableStmt() + ";\n"; + String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n"; + + // write them to a script file. + File tempFile = File.createTempFile("hive-script-",".txt", new File(options.getTempDir())); + try { + String tmpFilename = tempFile.toString(); + BufferedWriter w = null; + try { + FileOutputStream fos = new FileOutputStream(tempFile); + w = new BufferedWriter(new OutputStreamWriter(fos)); + w.write(createTableStr, 0, createTableStr.length()); + w.write(loadDataStmtStr, 0, loadDataStmtStr.length()); + } catch (IOException ioe) { + LOG.error("Error writing Hive load-in script: " + ioe.toString()); + ioe.printStackTrace(); + throw ioe; + } finally { + if (null != w) { + try { + w.close(); + } catch (IOException ioe) { + LOG.warn("IOException closing stream to Hive script: " + ioe.toString()); + } + } + } + + // run Hive on the script and note the return code. + String hiveExec = getHiveBinPath(); + ArrayList args = new ArrayList(); + args.add(hiveExec); + args.add("-f"); + args.add(tmpFilename); + + LoggingStreamHandlerFactory lshf = new LoggingStreamHandlerFactory(LOG); + int ret = Executor.exec(args.toArray(new String[0]), env.toArray(new String[0]), lshf, lshf); + if (0 != ret) { + throw new IOException("Hive exited with status " + ret); + } + + LOG.info("Hive import complete."); + } finally { + if (!tempFile.delete()) { + LOG.warn("Could not remove temporary file: " + tempFile.toString()); + // try to delete the file later. 
+ tempFile.deleteOnExit(); + } + } + } +} + diff --git a/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java b/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java new file mode 100644 index 00000000..59c6fda6 --- /dev/null +++ b/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.hive; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.sql.Types; + +/** + * Defines conversion between SQL types and Hive types. + */ +public class HiveTypes { + + public static final Log LOG = LogFactory.getLog(HiveTypes.class.getName()); + + /** + * Given JDBC SQL types coming from another database, what is the best + * mapping to a Hive-specific type? + */ + public static String toHiveType(int sqlType) { + if (sqlType == Types.INTEGER) { + return "INT"; + } else if (sqlType == Types.VARCHAR) { + return "STRING"; + } else if (sqlType == Types.CHAR) { + return "STRING"; + } else if (sqlType == Types.LONGVARCHAR) { + return "STRING"; + } else if (sqlType == Types.NUMERIC) { + // Per suggestion on hive-user, this is converted to DOUBLE for now. + return "DOUBLE"; + } else if (sqlType == Types.DECIMAL) { + // Per suggestion on hive-user, this is converted to DOUBLE for now. + return "DOUBLE"; + } else if (sqlType == Types.BIT) { + return "BOOLEAN"; + } else if (sqlType == Types.BOOLEAN) { + return "BOOLEAN"; + } else if (sqlType == Types.TINYINT) { + return "TINYINT"; + } else if (sqlType == Types.SMALLINT) { + return "INTEGER"; + } else if (sqlType == Types.BIGINT) { + return "BIGINT"; + } else if (sqlType == Types.REAL) { + return "DOUBLE"; + } else if (sqlType == Types.FLOAT) { + return "DOUBLE"; + } else if (sqlType == Types.DOUBLE) { + return "DOUBLE"; + } else if (sqlType == Types.DATE) { + // unfortunate type coercion + return "STRING"; + } else if (sqlType == Types.TIME) { + // unfortunate type coercion + return "STRING"; + } else if (sqlType == Types.TIMESTAMP) { + // unfortunate type coercion + return "STRING"; + } else { + // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB, + // BLOB, ARRAY, STRUCT, REF, JAVA_OBJECT. + return null; + } + } + + /** + * @return true if a sql type can't be translated to a precise match + * in Hive, and we have to cast it to something more generic. 
+ */ + public static boolean isHiveTypeImprovised(int sqlType) { + return sqlType == Types.DATE || sqlType == Types.TIME + || sqlType == Types.TIMESTAMP + || sqlType == Types.DECIMAL + || sqlType == Types.NUMERIC; + } +} + diff --git a/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java b/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java new file mode 100644 index 00000000..9798e171 --- /dev/null +++ b/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.hive; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.hadoop.sqoop.ImportOptions; +import org.apache.hadoop.sqoop.manager.ConnManager; +import org.apache.hadoop.sqoop.hive.HiveTypes; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.Date; +import java.text.DateFormat; +import java.text.SimpleDateFormat; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Creates (Hive-specific) SQL DDL statements to create tables to hold data + * we're importing from another source. + * + * After we import the database into HDFS, we can inject it into Hive using + * the CREATE TABLE and LOAD DATA INPATH statements generated by this object. + */ +public class TableDefWriter { + + public static final Log LOG = LogFactory.getLog(TableDefWriter.class.getName()); + + private ImportOptions options; + private ConnManager connManager; + private Configuration configuration; + private String tableName; + private boolean commentsEnabled; + + /** + * Creates a new TableDefWriter to generate a Hive CREATE TABLE statement. + * @param opts program-wide options + * @param connMgr the connection manager used to describe the table. + * @param table the name of the table to read. + * @param config the Hadoop configuration to use to connect to the dfs + * @param withComments if true, then tables will be created with a + * timestamp comment. + */ + public TableDefWriter(final ImportOptions opts, final ConnManager connMgr, + final String table, final Configuration config, final boolean withComments) { + this.options = opts; + this.connManager = connMgr; + this.tableName = table; + this.configuration = config; + this.commentsEnabled = withComments; + } + + /** + * @return the CREATE TABLE statement for the table to load into hive. 
+ */ + public String getCreateTableStmt() throws IOException { + Map columnTypes = connManager.getColumnTypes(tableName); + + String [] colNames = options.getColumns(); + if (null == colNames) { + colNames = connManager.getColumnNames(tableName); + } + + StringBuilder sb = new StringBuilder(); + + sb.append("CREATE TABLE " + tableName + " ( "); + + boolean first = true; + for (String col : colNames) { + if (!first) { + sb.append(", "); + } + + first = false; + + Integer colType = columnTypes.get(col); + String hiveColType = HiveTypes.toHiveType(colType); + if (null == hiveColType) { + throw new IOException("Hive does not support the SQL type for column " + col); + } + + sb.append(col + " " + hiveColType); + + if (HiveTypes.isHiveTypeImprovised(colType)) { + LOG.warn("Column " + col + " had to be cast to a less precise type in Hive"); + } + } + + sb.append(") "); + + if (commentsEnabled) { + DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); + String curDateStr = dateFormat.format(new Date()); + sb.append("COMMENT 'Imported by sqoop on " + curDateStr + "' "); + } + + sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' "); + sb.append("LINES TERMINATED BY '\\n' STORED AS TEXTFILE"); + + LOG.debug("Create statement: " + sb.toString()); + return sb.toString(); + } + + private static final int DEFAULT_HDFS_PORT = + org.apache.hadoop.hdfs.server.namenode.NameNode.DEFAULT_PORT; + + /** + * @return the LOAD DATA statement to import the data in HDFS into hive + */ + public String getLoadDataStmt() throws IOException { + String warehouseDir = options.getWarehouseDir(); + if (null == warehouseDir) { + warehouseDir = ""; + } else if (!warehouseDir.endsWith(File.separator)) { + warehouseDir = warehouseDir + File.separator; + } + + String tablePath = warehouseDir + tableName; + FileSystem fs = FileSystem.get(configuration); + Path finalPath = new Path(tablePath).makeQualified(fs); + String finalPathStr = finalPath.toString(); + if (finalPathStr.startsWith("hdfs://") && finalPathStr.indexOf(":", 7) == -1) { + // Hadoop removed the port number from the fully-qualified URL. + // We need to reinsert this or else Hive will complain. + // Do this right before the third instance of the '/' character. + int insertPoint = 0; + for (int i = 0; i < 3; i++) { + insertPoint = finalPathStr.indexOf("/", insertPoint + 1); + } + + if (insertPoint == -1) { + LOG.warn("Fully-qualified HDFS path does not contain a port."); + LOG.warn("this may cause a Hive error."); + } else { + finalPathStr = finalPathStr.substring(0, insertPoint) + ":" + DEFAULT_HDFS_PORT + + finalPathStr.substring(insertPoint, finalPathStr.length()); + } + } + + StringBuilder sb = new StringBuilder(); + sb.append("LOAD DATA INPATH '"); + sb.append(finalPathStr); + sb.append("' INTO TABLE "); + sb.append(tableName); + + LOG.debug("Load statement: " + sb.toString()); + return sb.toString(); + } +} + diff --git a/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java b/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java index ad82c1ed..1cca2057 100644 --- a/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java +++ b/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java @@ -75,13 +75,6 @@ public interface ConnManager { */ Connection getConnection() throws SQLException; - /** - * Resolve a database-specific type to the Java type that should contain it. - * @param sqlType - * @return the name of a Java type to hold the sql datatype, or null if none. 
- */ - String toJavaType(int sqlType); - /** * @return a string identifying the driver class to load for this JDBC connection type. */ diff --git a/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java b/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java index eb4cbee1..6d1d0fdf 100644 --- a/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java +++ b/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java @@ -261,7 +261,12 @@ protected ResultSet execute(String stmt, Object... args) { // Or must statement.close() be called too? } - public String toJavaType(int sqlType) { + /** + * Resolve a database-specific type to the Java type that should contain it. + * @param sqlType + * @return the name of a Java type to hold the sql datatype, or null if none. + */ + public static String toJavaType(int sqlType) { // mappings from http://java.sun.com/j2se/1.3/docs/guide/jdbc/getstart/mapping.html if (sqlType == Types.INTEGER) { return "Integer"; diff --git a/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java b/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java index 83ea9f01..3cc45a6a 100644 --- a/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java +++ b/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java @@ -20,6 +20,7 @@ import org.apache.hadoop.sqoop.ImportOptions; import org.apache.hadoop.sqoop.manager.ConnManager; +import org.apache.hadoop.sqoop.manager.SqlManager; import org.apache.hadoop.sqoop.lib.BigDecimalSerializer; import org.apache.hadoop.sqoop.lib.JdbcWritableBridge; @@ -247,7 +248,7 @@ private void generateFields(Map columnTypes, String [] colNames for (String col : colNames) { int sqlType = columnTypes.get(col); - String javaType = connManager.toJavaType(sqlType); + String javaType = SqlManager.toJavaType(sqlType); if (null == javaType) { LOG.error("Cannot resolve SQL type " + sqlType); continue; @@ -277,7 +278,7 @@ private void generateDbRead(Map columnTypes, String [] colNames fieldNum++; int sqlType = columnTypes.get(col); - String javaType = connManager.toJavaType(sqlType); + String javaType = SqlManager.toJavaType(sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType); continue; @@ -314,7 +315,7 @@ private void generateDbWrite(Map columnTypes, String [] colName fieldNum++; int sqlType = columnTypes.get(col); - String javaType = connManager.toJavaType(sqlType); + String javaType = SqlManager.toJavaType(sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType); continue; @@ -347,7 +348,7 @@ private void generateHadoopRead(Map columnTypes, String [] colN for (String col : colNames) { int sqlType = columnTypes.get(col); - String javaType = connManager.toJavaType(sqlType); + String javaType = SqlManager.toJavaType(sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType); continue; @@ -380,7 +381,7 @@ private void generateToString(Map columnTypes, String [] colNam boolean first = true; for (String col : colNames) { int sqlType = columnTypes.get(col); - String javaType = connManager.toJavaType(sqlType); + String javaType = SqlManager.toJavaType(sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType); continue; @@ -420,7 +421,7 @@ private void generateHadoopWrite(Map columnTypes, String [] col for (String col : colNames) { int sqlType = columnTypes.get(col); - String javaType = connManager.toJavaType(sqlType); + String javaType = SqlManager.toJavaType(sqlType); if (null == javaType) { LOG.error("No Java type for SQL type " + sqlType); continue; diff --git 
a/src/java/org/apache/hadoop/sqoop/util/Executor.java b/src/java/org/apache/hadoop/sqoop/util/Executor.java new file mode 100644 index 00000000..911366e4 --- /dev/null +++ b/src/java/org/apache/hadoop/sqoop/util/Executor.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Runs a process via Runtime.exec() and allows handling of stdout/stderr to be + * deferred to other threads. + * + */ +public final class Executor { + + public static final Log LOG = LogFactory.getLog(Executor.class.getName()); + + private Executor() { + } + + /** + * Execute a program defined by the args array with default stream handlers + * that consume the program's output (to prevent it from blocking on buffers) + * and then ignore said output. + */ + public static int exec(String [] args) throws IOException { + NullStreamHandlerFactory f = new NullStreamHandlerFactory(); + return exec(args, f, f); + } + + /** + * Run a command via Runtime.exec(), with its stdout and stderr streams + * directed to be handled by threads generated by StreamHandlerFactories. + * Block until the child process terminates. + * + * @return the exit status of the ran program + */ + public static int exec(String [] args, StreamHandlerFactory outHandler, + StreamHandlerFactory errHandler) throws IOException { + return exec(args, null, outHandler, errHandler); + } + + + /** + * Run a command via Runtime.exec(), with its stdout and stderr streams + * directed to be handled by threads generated by StreamHandlerFactories. + * Block until the child process terminates. Allows the programmer to + * specify an environment for the child program. + * + * @return the exit status of the ran program + */ + public static int exec(String [] args, String [] envp, StreamHandlerFactory outHandler, + StreamHandlerFactory errHandler) throws IOException { + + // launch the process. + Process p = Runtime.getRuntime().exec(args, envp); + + // dispatch its stdout and stderr to stream handlers if available. + if (null != outHandler) { + outHandler.processStream(p.getInputStream()); + } + + if (null != errHandler) { + errHandler.processStream(p.getErrorStream()); + } + + // wait for the return value. + while (true) { + try { + int ret = p.waitFor(); + return ret; + } catch (InterruptedException ie) { + continue; + } + } + } + + + /** + * @return An array formatted correctly for use as an envp based on the + * current environment for this program. 
+ */ + public static List getCurEnvpStrings() { + Map curEnv = System.getenv(); + ArrayList array = new ArrayList(); + + if (null == curEnv) { + return null; + } + + for (Map.Entry entry : curEnv.entrySet()) { + array.add(entry.getKey() + "=" + entry.getValue()); + } + + return array; + } +} diff --git a/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java b/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java new file mode 100644 index 00000000..5836aa37 --- /dev/null +++ b/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.util; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A StreamHandlerFactory that takes the contents of a stream and writes + * it to log4j. + * + */ +public class LoggingStreamHandlerFactory implements StreamHandlerFactory { + + public static final Log LOG = LogFactory.getLog(LoggingStreamHandlerFactory.class.getName()); + + private Log contextLog; + + public LoggingStreamHandlerFactory(final Log context) { + if (null == context) { + this.contextLog = LOG; + } else { + this.contextLog = context; + } + } + + public void processStream(InputStream is) { + new LoggingThread(is).start(); + } + + /** + * Run a background thread that copies the contents of the stream + * to the output context log. + */ + private class LoggingThread extends Thread { + + private InputStream stream; + + LoggingThread(final InputStream is) { + this.stream = is; + } + + public void run() { + InputStreamReader isr = new InputStreamReader(this.stream); + BufferedReader r = new BufferedReader(isr); + + try { + while (true) { + String line = r.readLine(); + if (null == line) { + break; // stream was closed by remote end. + } + + LoggingStreamHandlerFactory.this.contextLog.info(line); + } + } catch (IOException ioe) { + LOG.error("IOException reading from stream: " + ioe.toString()); + } + + try { + r.close(); + } catch (IOException ioe) { + LOG.warn("Error closing stream in LoggingStreamHandler: " + ioe.toString()); + } + } + } +} + diff --git a/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java b/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java new file mode 100644 index 00000000..96e984d4 --- /dev/null +++ b/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.util; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A StreamHandlerFactory that takes the contents of a stream and ignores it. + * + */ +public class NullStreamHandlerFactory implements StreamHandlerFactory { + + public static final Log LOG = LogFactory.getLog(NullStreamHandlerFactory.class.getName()); + + public void processStream(InputStream is) { + new IgnoringThread(is).start(); + } + + /** + * Run a background thread that reads and ignores the + * contents of the stream. + */ + private class IgnoringThread extends Thread { + + private InputStream stream; + + IgnoringThread(final InputStream is) { + this.stream = is; + } + + public void run() { + InputStreamReader isr = new InputStreamReader(this.stream); + BufferedReader r = new BufferedReader(isr); + + try { + while (true) { + String line = r.readLine(); + if (null == line) { + break; // stream was closed by remote end. + } + } + } catch (IOException ioe) { + LOG.warn("IOException reading from (ignored) stream: " + ioe.toString()); + } + + try { + r.close(); + } catch (IOException ioe) { + LOG.warn("Error closing stream in NullStreamHandler: " + ioe.toString()); + } + } + } +} + diff --git a/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java b/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java new file mode 100644 index 00000000..ba87f1b5 --- /dev/null +++ b/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.util; + +import java.io.InputStream; + +/** + * An interface describing a factory class for a Thread class that handles + * input from some sort of stream. + * + * When the stream is closed, the thread should terminate. 
+ * + */ +public interface StreamHandlerFactory { + + /** + * Create and run a thread to handle input from the provided InputStream. + * When processStream returns, the thread should be running; it should + * continue to run until the InputStream is exhausted. + */ + void processStream(InputStream is); +} + diff --git a/src/test/org/apache/hadoop/sqoop/AllTests.java b/src/test/org/apache/hadoop/sqoop/AllTests.java index 35135f87..6a410a27 100644 --- a/src/test/org/apache/hadoop/sqoop/AllTests.java +++ b/src/test/org/apache/hadoop/sqoop/AllTests.java @@ -18,6 +18,7 @@ package org.apache.hadoop.sqoop; +import org.apache.hadoop.sqoop.hive.TestHiveImport; import org.apache.hadoop.sqoop.manager.LocalMySQLTest; import org.apache.hadoop.sqoop.manager.TestHsqldbManager; import org.apache.hadoop.sqoop.manager.TestSqlManager; @@ -46,6 +47,7 @@ public static Test suite() { suite.addTestSuite(TestMultiCols.class); suite.addTestSuite(TestOrderBy.class); suite.addTestSuite(LocalMySQLTest.class); + suite.addTestSuite(TestHiveImport.class); return suite; } diff --git a/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java b/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java new file mode 100644 index 00000000..5ea5849b --- /dev/null +++ b/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.hive; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.fs.Path; + +import org.apache.hadoop.sqoop.ImportOptions; +import org.apache.hadoop.sqoop.testutil.HsqldbTestServer; +import org.apache.hadoop.sqoop.testutil.ImportJobTestCase; + +/** + * Test HiveImport capability after an import to HDFS. + */ +public class TestHiveImport extends ImportJobTestCase { + + public static final Log LOG = LogFactory.getLog(TestHiveImport.class.getName()); + + /** + * Create the argv to pass to Sqoop + * @return the argv as an array of strings. 
+ */ + private String [] getArgv(boolean includeHadoopFlags) { + ArrayList args = new ArrayList(); + + if (includeHadoopFlags) { + args.add("-D"); + args.add("mapred.job.tracker=local"); + args.add("-D"); + args.add("mapred.map.tasks=1"); + args.add("-D"); + args.add("fs.default.name=file:///"); + } + + args.add("--table"); + args.add(getTableName()); + args.add("--warehouse-dir"); + args.add(getWarehouseDir()); + args.add("--connect"); + args.add(HsqldbTestServer.getUrl()); + args.add("--hive-import"); + args.add("--order-by"); + args.add(getColNames()[0]); + + return args.toArray(new String[0]); + } + + private ImportOptions getImportOptions() { + ImportOptions opts = new ImportOptions(); + try { + opts.parse(getArgv(false)); + } catch (ImportOptions.InvalidOptionsException ioe) { + fail("Invalid options: " + ioe.toString()); + } + + return opts; + } + + private void runImportTest(String tableName, String [] types, String [] values, + String verificationScript) throws IOException { + + // create a table and populate it with a row... + setCurTableName(tableName); + createTableWithColTypes(types, values); + + // set up our mock hive shell to compare our generated script + // against the correct expected one. + ImportOptions options = getImportOptions(); + String hiveHome = options.getHiveHome(); + assertNotNull("hive.home was not set", hiveHome); + Path testDataPath = new Path(new Path(hiveHome), "scripts/" + verificationScript); + System.setProperty("expected.script", testDataPath.toString()); + + // verify that we can import it correctly into hive. + runImport(getArgv(true)); + } + + /** Test that strings and ints are handled in the normal fashion */ + @Test + public void testNormalHiveImport() throws IOException { + String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" }; + String [] vals = { "'test'", "42", "'somestring'" }; + runImportTest("NORMAL_HIVE_IMPORT", types, vals, "normalImport.q"); + } + + /** Test that dates are coerced properly to strings */ + @Test + public void testDate() throws IOException { + String [] types = { "VARCHAR(32)", "DATE" }; + String [] vals = { "'test'", "'2009-05-12'" }; + runImportTest("DATE_HIVE_IMPORT", types, vals, "dateImport.q"); + } + + /** Test that NUMERICs are coerced to doubles */ + @Test + public void testNumeric() throws IOException { + String [] types = { "NUMERIC", "CHAR(64)" }; + String [] vals = { "3.14159", "'foo'" }; + runImportTest("NUMERIC_HIVE_IMPORT", types, vals, "numericImport.q"); + } + + /** If bin/hive returns an error exit status, we should get an IOException */ + @Test + public void testHiveExitFails() { + // The expected script is different than the one which would be generated + // by this, so we expect an IOException out. + String [] types = { "NUMERIC", "CHAR(64)" }; + String [] vals = { "3.14159", "'foo'" }; + try { + runImportTest("FAILING_HIVE_IMPORT", types, vals, "failingImport.q"); + // If we get here, then the run succeeded -- which is incorrect. + fail("FAILING_HIVE_IMPORT test should have thrown IOException"); + } catch (IOException ioe) { + // expected; ok. 
+ } + } + +} + diff --git a/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java b/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java index 2203b136..4c9b80f5 100644 --- a/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java +++ b/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java @@ -46,9 +46,6 @@ /** * Class that implements common methods required for tests which import data * from SQL into HDFS and verify correct import. - * - * - * */ public class ImportJobTestCase extends TestCase { @@ -71,6 +68,13 @@ public class ImportJobTestCase extends TestCase { LOCAL_WAREHOUSE_DIR = TEMP_BASE_DIR + "sqoop/warehouse"; } + // Used if a test manually sets the table name to be used. + private String curTableName; + + protected void setCurTableName(String curName) { + this.curTableName = curName; + } + /** * Because of how classloading works, we don't actually want to name * all the tables the same thing -- they'll actually just use the same @@ -83,7 +87,11 @@ public class ImportJobTestCase extends TestCase { static final String TABLE_NAME = "IMPORT_TABLE_"; protected String getTableName() { - return TABLE_NAME + Integer.toString(tableNum); + if (null != curTableName) { + return curTableName; + } else { + return TABLE_NAME + Integer.toString(tableNum); + } } protected String getWarehouseDir() { @@ -140,12 +148,15 @@ public void setUp() { @After public void tearDown() { + setCurTableName(null); // clear user-override table name. + try { manager.close(); } catch (SQLException sqlE) { LOG.error("Got SQLException: " + sqlE.toString()); fail("Got SQLException: " + sqlE.toString()); } + } static final String BASE_COL_NAME = "DATA_COL"; @@ -385,7 +396,9 @@ protected void runImport(String [] argv) throws IOException { } // expect a successful return. - assertEquals("Failure during job", 0, ret); + if (0 != ret) { + throw new IOException("Failure during job; return status " + ret); + } } } diff --git a/testdata/hive/bin/hive b/testdata/hive/bin/hive new file mode 100644 index 00000000..a4cb8520 --- /dev/null +++ b/testdata/hive/bin/hive @@ -0,0 +1,59 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This is a mock "Hive" shell that validates whether various test imports +# succeeded. It accepts commands of the form 'hive -f scriptname' +# and validates that the script contents match those of an expected script. +# The filename to that expected script is set via the environment variable +# EXPECTED_SCRIPT. + +# The script will contain a pathname as part of the LOAD DATA INPATH statement; +# depending on where you run the tests from, this can change. 
So the expected +# script file actually contains the marker string "BASEPATH" which is replaced +# by this script with the contents of $TMPDIR, which is set to 'test.build.data'. + +if [ -z "$EXPECTED_SCRIPT" ]; then + echo "No expected script set" + exit 1 +elif [ -z "$TMPDIR" ]; then + TMPDIR=/tmp +elif [ "$1" != "-f" ]; then + echo "Misunderstood argument: $1" + echo "Expected '-f'." + exit 1 +elif [ -z "$2" ]; then + echo "Expected: hive -f filename" + exit 1 +else + GENERATED_SCRIPT=$2 +fi + +# Normalize this to an absolute path +TMPDIR=`cd $TMPDIR && pwd` + +# Copy the expected script into the tmpdir and replace the marker. +cp "$EXPECTED_SCRIPT" "$TMPDIR" +SCRIPT_BASE=`basename $EXPECTED_SCRIPT` +COPIED_SCRIPT="$TMPDIR/$SCRIPT_BASE" +sed -i -e "s|BASEPATH|$TMPDIR|" $COPIED_SCRIPT + +# Actually check to see that the input we got matches up. +diff --ignore-all-space --ignore-blank-lines "$COPIED_SCRIPT" "$GENERATED_SCRIPT" +ret=$? + +exit $ret + diff --git a/testdata/hive/scripts/dateImport.q b/testdata/hive/scripts/dateImport.q new file mode 100644 index 00000000..50f94f98 --- /dev/null +++ b/testdata/hive/scripts/dateImport.q @@ -0,0 +1,2 @@ +CREATE TABLE DATE_HIVE_IMPORT ( DATA_COL0 STRING, DATA_COL1 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE; +LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/DATE_HIVE_IMPORT' INTO TABLE DATE_HIVE_IMPORT; diff --git a/testdata/hive/scripts/failingImport.q b/testdata/hive/scripts/failingImport.q new file mode 100644 index 00000000..50f94f98 --- /dev/null +++ b/testdata/hive/scripts/failingImport.q @@ -0,0 +1,2 @@ +CREATE TABLE DATE_HIVE_IMPORT ( DATA_COL0 STRING, DATA_COL1 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE; +LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/DATE_HIVE_IMPORT' INTO TABLE DATE_HIVE_IMPORT; diff --git a/testdata/hive/scripts/normalImport.q b/testdata/hive/scripts/normalImport.q new file mode 100644 index 00000000..a79e08f0 --- /dev/null +++ b/testdata/hive/scripts/normalImport.q @@ -0,0 +1,2 @@ +CREATE TABLE NORMAL_HIVE_IMPORT ( DATA_COL0 STRING, DATA_COL1 INT, DATA_COL2 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE; +LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/NORMAL_HIVE_IMPORT' INTO TABLE NORMAL_HIVE_IMPORT; diff --git a/testdata/hive/scripts/numericImport.q b/testdata/hive/scripts/numericImport.q new file mode 100644 index 00000000..b552209d --- /dev/null +++ b/testdata/hive/scripts/numericImport.q @@ -0,0 +1,2 @@ +CREATE TABLE NUMERIC_HIVE_IMPORT ( DATA_COL0 DOUBLE, DATA_COL1 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE; +LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/NUMERIC_HIVE_IMPORT' INTO TABLE NUMERIC_HIVE_IMPORT;
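Note (illustrative, not part of the patch): once this change is applied, a single Sqoop run can both import a table into HDFS and register and load it in Hive. The new --hive-import flag enables the post-load step, and --hive-home overrides $HIVE_HOME when the hive binary is not on the PATH. The launcher jar name, connect string, table name, and paths below are placeholders:

    # hypothetical invocation; jar name and connection details are examples only
    hadoop jar sqoop.jar org.apache.hadoop.sqoop.Sqoop \
        --connect jdbc:mysql://db.example.com/sales \
        --table EMPLOYEES \
        --warehouse-dir /user/demo/warehouse \
        --hive-home /opt/hive \
        --hive-import

With --hive-import set, Sqoop removes the _logs directory left under the table's HDFS directory by the MapReduce import, uses TableDefWriter and the HiveTypes mapping to build the CREATE TABLE and LOAD DATA INPATH statements, writes them to a temporary script, and runs that script via $HIVE_HOME/bin/hive -f, falling back to a plain 'hive' on the PATH if that binary does not exist.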