diff --git a/src/java/org/apache/sqoop/hive/HiveImport.java b/src/java/org/apache/sqoop/hive/HiveImport.java
index 78184a54..ce34286d 100644
--- a/src/java/org/apache/sqoop/hive/HiveImport.java
+++ b/src/java/org/apache/sqoop/hive/HiveImport.java
@@ -101,10 +101,8 @@ private String getHiveBinPath() {
    * If we used a MapReduce-based upload of the data, remove the _logs dir
    * from where we put it, before running Hive LOAD DATA INPATH.
    */
-  private void removeTempLogs(String tableName) throws IOException {
+  private void removeTempLogs(Path tablePath) throws IOException {
     FileSystem fs = FileSystem.get(configuration);
-    Path tablePath = getOutputPath(tableName);
-
     Path logsPath = new Path(tablePath, "_logs");
     if (fs.exists(logsPath)) {
       LOG.info("Removing temporary files from import process: " + logsPath);
@@ -115,26 +113,6 @@ private void removeTempLogs(String tableName) throws IOException {
     }
   }
 
-  /**
-   * Get directory where we stored job output files.
-   *
-   * @param tableName imported table name
-   * @return Path with directory where output files can be found
-   */
-  private Path getOutputPath(String tableName) {
-    if (null != tableName) {
-      String warehouseDir = options.getWarehouseDir();
-      if (warehouseDir != null) {
-        return new Path(new Path(warehouseDir), tableName);
-      } else {
-        return new Path(tableName);
-      }
-    } else {
-      // --table option is not used, so use the target dir instead
-      return new Path(options.getTargetDir());
-    }
-  }
-
   /**
    * @return true if we're just generating the DDL for the import, but
    * not actually running it (i.e., --generate-only mode). If so, don't
@@ -171,11 +149,6 @@ private File getScriptFile(String outputTableName) throws IOException {
   public void importTable(String inputTableName, String outputTableName,
       boolean createOnly) throws IOException {
 
-    if (!isGenerateOnly()) {
-      removeTempLogs(inputTableName);
-      LOG.info("Loading uploaded data into Hive");
-    }
-
     if (null == outputTableName) {
       outputTableName = inputTableName;
     }
@@ -200,17 +173,21 @@ public void importTable(String inputTableName, String outputTableName,
         configuration, !debugMode);
     String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
     String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";
+    Path finalPath = tableWriter.getFinalPath();
 
     if (!isGenerateOnly()) {
+      removeTempLogs(finalPath);
+      LOG.info("Loading uploaded data into Hive");
+
      String codec = options.getCompressionCodec();
      if (codec != null && (codec.equals(CodecMap.LZOP)
          || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
        try {
-          String finalPathStr = tableWriter.getFinalPathStr();
          Tool tool = ReflectionUtils.newInstance(Class.
              forName("com.hadoop.compression.lzo.DistributedLzoIndexer").
              asSubclass(Tool.class), configuration);
-          ToolRunner.run(configuration, tool, new String[] { finalPathStr });
+          ToolRunner.run(configuration, tool,
+              new String[] { finalPath.toString() });
        } catch (Exception ex) {
          LOG.error("Error indexing lzo files", ex);
          throw new IOException("Error indexing lzo files", ex);
@@ -250,7 +227,7 @@ public void importTable(String inputTableName, String outputTableName,
 
         LOG.info("Hive import complete.");
 
-        cleanUp(inputTableName);
+        cleanUp(finalPath);
       }
     } finally {
       if (!isGenerateOnly()) {
@@ -267,23 +244,22 @@ public void importTable(String inputTableName, String outputTableName,
   /**
    * Clean up after successful HIVE import.
    *
-   * @param table Imported table name
+   * @param outputPath path to the output directory
    * @throws IOException
    */
-  private void cleanUp(String table) throws IOException {
+  private void cleanUp(Path outputPath) throws IOException {
     FileSystem fs = FileSystem.get(configuration);
 
     // HIVE is not always removing input directory after LOAD DATA statement
     // (which is our export directory). We're removing export directory in case
     // that is blank for case that user wants to periodically populate HIVE
     // table (for example with --hive-overwrite).
-    Path outputPath = getOutputPath(table);
     try {
       if (outputPath != null && fs.exists(outputPath)) {
         FileStatus[] statuses = fs.listStatus(outputPath);
         if (statuses.length == 0) {
           LOG.info("Export directory is empty, removing it.");
-          fs.delete(getOutputPath(table));
+          fs.delete(outputPath, true);
         } else {
           LOG.info("Export directory is not empty, keeping it.");
         }
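Note on the HiveImport.java hunks above: removeTempLogs() and cleanUp() now receive the already-resolved output Path instead of re-deriving it from the table name, and the cleanup switches from the deprecated single-argument FileSystem#delete(Path) to the two-argument overload with an explicit recursive flag. A minimal standalone sketch of that cleanup contract against the stock Hadoop FileSystem API (the directory literal is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CleanUpSketch {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        // Hypothetical export dir that Hive's LOAD DATA INPATH may leave behind.
        Path exportDir = new Path("/user/example/targetDir");
        if (fs.exists(exportDir) && fs.listStatus(exportDir).length == 0) {
          // Explicit recursive flag, matching the patched
          // fs.delete(outputPath, true) call.
          fs.delete(exportDir, true);
        }
      }
    }

Deleting only when listStatus() returns an empty array preserves the patch's behavior of keeping a non-empty export directory intact.
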
diff --git a/src/java/org/apache/sqoop/hive/TableDefWriter.java b/src/java/org/apache/sqoop/hive/TableDefWriter.java
index 3383103a..8fc2d9d9 100644
--- a/src/java/org/apache/sqoop/hive/TableDefWriter.java
+++ b/src/java/org/apache/sqoop/hive/TableDefWriter.java
@@ -217,11 +217,11 @@ public String getCreateTableStmt() throws IOException {
    * @return the LOAD DATA statement to import the data in HDFS into hive.
    */
   public String getLoadDataStmt() throws IOException {
-    String finalPathStr = getFinalPathStr();
+    Path finalPath = getFinalPath();
 
     StringBuilder sb = new StringBuilder();
     sb.append("LOAD DATA INPATH '");
-    sb.append(finalPathStr + "'");
+    sb.append(finalPath.toString() + "'");
     if (options.doOverwriteHiveTable()) {
       sb.append(" OVERWRITE");
     }
@@ -240,7 +240,7 @@ public String getLoadDataStmt() throws IOException {
     return sb.toString();
   }
 
-  public String getFinalPathStr() throws IOException {
+  public Path getFinalPath() throws IOException {
     String warehouseDir = options.getWarehouseDir();
     if (null == warehouseDir) {
       warehouseDir = "";
@@ -248,15 +248,18 @@ public String getFinalPathStr() throws IOException {
       warehouseDir = warehouseDir + File.separator;
     }
 
-    String tablePath;
-    if (null != inputTableName) {
-      tablePath = warehouseDir + inputTableName;
+    // The final path is determined in the following order:
+    // 1. Use the target dir if the user specified one.
+    // 2. Otherwise, fall back to the input table name.
+    String tablePath = null;
+    String targetDir = options.getTargetDir();
+    if (null != targetDir) {
+      tablePath = warehouseDir + targetDir;
     } else {
-      tablePath = options.getTargetDir();
+      tablePath = warehouseDir + inputTableName;
     }
     FileSystem fs = FileSystem.get(configuration);
-    Path finalPath = new Path(tablePath).makeQualified(fs);
-    return finalPath.toString();
+    return new Path(tablePath).makeQualified(fs);
   }
 
   /**
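Note on getFinalPath(): the new precedence gives --target-dir priority over the input table name, and either value is prefixed with the warehouse dir when one is set. A standalone sketch of just that resolution order (the class and method names here are illustrative, not part of the patch):

    import java.io.File;

    public class FinalPathSketch {
      // Mirrors the precedence in TableDefWriter#getFinalPath(): target dir
      // first, falling back to the input table name; either is prefixed
      // with the warehouse dir when one is configured.
      static String resolveTablePath(String warehouseDir, String targetDir,
          String inputTableName) {
        String prefix =
            (warehouseDir == null) ? "" : warehouseDir + File.separator;
        return (targetDir != null) ? prefix + targetDir
                                   : prefix + inputTableName;
      }

      public static void main(String[] args) {
        System.out.println(resolveTablePath(null, "targetDir", "inputTable"));
        // -> targetDir
        System.out.println(resolveTablePath("/warehouse", null, "inputTable"));
        // -> /warehouse/inputTable
      }
    }

One behavioral consequence worth confirming: before this patch a user-supplied target dir was used verbatim (see the removed getOutputPath() and the old tablePath = options.getTargetDir()), whereas it is now concatenated with the warehouse dir when both options are supplied.
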
diff --git a/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java b/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java
index f2d38498..4954e381 100644
--- a/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java
+++ b/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java
@@ -83,6 +83,34 @@ public void testDifferentTableNames() throws Exception {
     assertTrue(loadData.indexOf("/inputTable'") != -1);
   }
 
+  public void testDifferentTargetDirs() throws Exception {
+    String targetDir = "targetDir";
+    String inputTable = "inputTable";
+    String outputTable = "outputTable";
+
+    Configuration conf = new Configuration();
+    SqoopOptions options = new SqoopOptions();
+    // Specify a target dir different from the input table name.
+    options.setTargetDir(targetDir);
+    TableDefWriter writer = new TableDefWriter(options, null,
+        inputTable, outputTable, conf, false);
+
+    Map<String, Integer> colTypes = new HashMap<String, Integer>();
+    writer.setColumnTypes(colTypes);
+
+    String createTable = writer.getCreateTableStmt();
+    String loadData = writer.getLoadDataStmt();
+
+    LOG.debug("Create table stmt: " + createTable);
+    LOG.debug("Load data stmt: " + loadData);
+
+    // Assert that the statements generated have the form we expect.
+    assertTrue(createTable.indexOf(
+        "CREATE TABLE IF NOT EXISTS `" + outputTable + "`") != -1);
+    assertTrue(loadData.indexOf("INTO TABLE `" + outputTable + "`") != -1);
+    assertTrue(loadData.indexOf("/" + targetDir + "'") != -1);
+  }
+
   public void testPartitions() throws Exception {
     String[] args = {
       "--hive-partition-key", "ds",
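Note on the new test: with only --target-dir set and no warehouse dir, getFinalPath() qualifies the relative path against the test's default FileSystem, so the generated statements should look roughly like the following (the file: scheme and working directory are illustrative, assuming the local filesystem used in unit tests):

    CREATE TABLE IF NOT EXISTS `outputTable` ( ... )
    LOAD DATA INPATH 'file:/path/to/cwd/targetDir' INTO TABLE `outputTable`

The three assertTrue checks probe exactly these fragments via indexOf, including the trailing /targetDir' that pins the path's last component to the target dir rather than the input table name.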