
SQOOP-483. Allow target dir to be set to a different name than table name for hive import.

(Cheolsoo Park via Jarek Jarcec Cecho)


git-svn-id: https://svn.apache.org/repos/asf/sqoop/trunk@1342998 13f79535-47bb-0310-9956-ffa450edef68
Jarek Jarcec Cecho 2012-05-27 06:18:02 +00:00
parent 35365d5e21
commit 06b636aac6
3 changed files with 51 additions and 44 deletions
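
In user-facing terms, a Hive import can now land in (and be loaded from) a directory chosen with --target-dir, independent of the table name. A minimal sketch of exercising the new behavior through the classes touched below; it mirrors the unit test added in this commit, and every concrete value in it is illustrative:

    // Sketch only: assumes SqoopOptions and TableDefWriter (see the diffs
    // below) are on the classpath; names and values are made up.
    SqoopOptions options = new SqoopOptions();
    options.setTargetDir("archiveDir");          // differs from the table name

    TableDefWriter writer = new TableDefWriter(options, null,
        "inputTable", "outputTable", new Configuration(), false);
    writer.setColumnTypes(new HashMap<String, Integer>());

    // getLoadDataStmt() now resolves against the target dir, so the statement
    // ends with ".../archiveDir" rather than ".../inputTable":
    String loadData = writer.getLoadDataStmt();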

View File

@@ -101,10 +101,8 @@ private String getHiveBinPath() {
    * If we used a MapReduce-based upload of the data, remove the _logs dir
    * from where we put it, before running Hive LOAD DATA INPATH.
    */
-  private void removeTempLogs(String tableName) throws IOException {
+  private void removeTempLogs(Path tablePath) throws IOException {
     FileSystem fs = FileSystem.get(configuration);
-    Path tablePath = getOutputPath(tableName);
     Path logsPath = new Path(tablePath, "_logs");
     if (fs.exists(logsPath)) {
       LOG.info("Removing temporary files from import process: " + logsPath);
@@ -115,26 +113,6 @@ private void removeTempLogs(String tableName) throws IOException {
     }
   }
 
-  /**
-   * Get directory where we stored job output files.
-   *
-   * @param tableName imported table name
-   * @return Path with directory where output files can be found
-   */
-  private Path getOutputPath(String tableName) {
-    if (null != tableName) {
-      String warehouseDir = options.getWarehouseDir();
-      if (warehouseDir != null) {
-        return new Path(new Path(warehouseDir), tableName);
-      } else {
-        return new Path(tableName);
-      }
-    } else {
-      // --table option is not used, so use the target dir instead
-      return new Path(options.getTargetDir());
-    }
-  }
-
   /**
    * @return true if we're just generating the DDL for the import, but
    * not actually running it (i.e., --generate-only mode). If so, don't
@@ -171,11 +149,6 @@ private File getScriptFile(String outputTableName) throws IOException {
   public void importTable(String inputTableName, String outputTableName,
       boolean createOnly) throws IOException {
 
-    if (!isGenerateOnly()) {
-      removeTempLogs(inputTableName);
-      LOG.info("Loading uploaded data into Hive");
-    }
-
     if (null == outputTableName) {
       outputTableName = inputTableName;
     }
@@ -200,17 +173,21 @@ public void importTable(String inputTableName, String outputTableName,
         configuration, !debugMode);
     String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
     String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";
+    Path finalPath = tableWriter.getFinalPath();
+
+    if (!isGenerateOnly()) {
+      removeTempLogs(finalPath);
+      LOG.info("Loading uploaded data into Hive");
+    }
 
     String codec = options.getCompressionCodec();
     if (codec != null && (codec.equals(CodecMap.LZOP)
         || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
       try {
-        String finalPathStr = tableWriter.getFinalPathStr();
         Tool tool = ReflectionUtils.newInstance(Class.
             forName("com.hadoop.compression.lzo.DistributedLzoIndexer").
             asSubclass(Tool.class), configuration);
-        ToolRunner.run(configuration, tool, new String[] { finalPathStr });
+        ToolRunner.run(configuration, tool,
+            new String[] { finalPath.toString() });
       } catch (Exception ex) {
         LOG.error("Error indexing lzo files", ex);
         throw new IOException("Error indexing lzo files", ex);
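
The rewrite of the ToolRunner call follows from its signature: ToolRunner.run(Configuration, Tool, String[]) takes plain strings, so the Path has to be rendered with toString(). The call shape in isolation (a sketch; 'tool' and 'configuration' as in the hunk above, the argument value is illustrative; the indexer class is loaded reflectively because hadoop-lzo is an optional dependency):

    // Same call shape as above, outside the surrounding try/catch.
    int exitCode = ToolRunner.run(configuration, tool,
        new String[] { "/user/hive/warehouse/archiveDir" });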
@@ -250,7 +227,7 @@ public void importTable(String inputTableName, String outputTableName,
       LOG.info("Hive import complete.");
-      cleanUp(inputTableName);
+      cleanUp(finalPath);
     }
   } finally {
     if (!isGenerateOnly()) {
@@ -267,23 +244,22 @@ public void importTable(String inputTableName, String outputTableName,
   /**
    * Clean up after successful HIVE import.
    *
-   * @param table Imported table name
+   * @param outputPath path to the output directory
    * @throws IOException
    */
-  private void cleanUp(String table) throws IOException {
+  private void cleanUp(Path outputPath) throws IOException {
     FileSystem fs = FileSystem.get(configuration);
 
     // HIVE is not always removing input directory after LOAD DATA statement
     // (which is our export directory). We're removing export directory in case
     // that is blank for case that user wants to periodically populate HIVE
     // table (for example with --hive-overwrite).
-    Path outputPath = getOutputPath(table);
     try {
       if (outputPath != null && fs.exists(outputPath)) {
         FileStatus[] statuses = fs.listStatus(outputPath);
         if (statuses.length == 0) {
           LOG.info("Export directory is empty, removing it.");
-          fs.delete(getOutputPath(table));
+          fs.delete(outputPath, true);
         } else {
           LOG.info("Export directory is not empty, keeping it.");
         }
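
Taken together, the HiveImport hunks move path resolution to a single point and thread the resulting Path through the whole run; a condensed sketch of the importTable() flow after this commit (method names from the hunks above, everything else elided):

    // Condensed post-commit flow (sketch; error handling and the actual
    // Hive script execution elided):
    Path finalPath = tableWriter.getFinalPath();  // resolved once, up front
    if (!isGenerateOnly()) {
      removeTempLogs(finalPath);                  // was removeTempLogs(inputTableName)
      LOG.info("Loading uploaded data into Hive");
    }
    // ... emit CREATE TABLE / LOAD DATA INPATH and run the Hive script ...
    cleanUp(finalPath);                           // was cleanUp(inputTableName)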

View File

@@ -217,11 +217,11 @@ public String getCreateTableStmt() throws IOException {
    * @return the LOAD DATA statement to import the data in HDFS into hive.
    */
   public String getLoadDataStmt() throws IOException {
-    String finalPathStr = getFinalPathStr();
+    Path finalPath = getFinalPath();
 
     StringBuilder sb = new StringBuilder();
     sb.append("LOAD DATA INPATH '");
-    sb.append(finalPathStr + "'");
+    sb.append(finalPath.toString() + "'");
     if (options.doOverwriteHiveTable()) {
       sb.append(" OVERWRITE");
     }
@@ -240,7 +240,7 @@ public String getLoadDataStmt() throws IOException {
     return sb.toString();
   }
 
-  public String getFinalPathStr() throws IOException {
+  public Path getFinalPath() throws IOException {
     String warehouseDir = options.getWarehouseDir();
     if (null == warehouseDir) {
       warehouseDir = "";
@@ -248,15 +248,18 @@ public String getFinalPathStr() throws IOException {
       warehouseDir = warehouseDir + File.separator;
     }
 
-    String tablePath;
-    if (null != inputTableName) {
-      tablePath = warehouseDir + inputTableName;
+    // Final path is determined in the following order:
+    // 1. Use target dir if the user specified.
+    // 2. Use input table name.
+    String tablePath = null;
+    String targetDir = options.getTargetDir();
+    if (null != targetDir) {
+      tablePath = warehouseDir + targetDir;
     } else {
-      tablePath = options.getTargetDir();
+      tablePath = warehouseDir + inputTableName;
     }
 
     FileSystem fs = FileSystem.get(configuration);
-    Path finalPath = new Path(tablePath).makeQualified(fs);
-    return finalPath.toString();
+    return new Path(tablePath).makeQualified(fs);
   }
 
   /**
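
The new resolution order is easiest to see with concrete values. An equivalent standalone helper (hypothetical name, illustrative values), ignoring the trailing-separator handling that getFinalPath() also performs:

    // Hypothetical helper equivalent to the core of getFinalPath():
    static String resolveTablePath(String warehouseDir, String targetDir,
        String inputTableName) {
      String prefix = (warehouseDir == null) ? "" : warehouseDir + File.separator;
      return (targetDir != null)
          ? prefix + targetDir        // 1. target dir, if the user specified one
          : prefix + inputTableName;  // 2. otherwise the input table name
    }

    // resolveTablePath("/user/hive/warehouse", "archiveDir", "inputTable")
    //   -> "/user/hive/warehouse/archiveDir"
    // resolveTablePath("/user/hive/warehouse", null, "inputTable")
    //   -> "/user/hive/warehouse/inputTable"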

View File

@@ -83,6 +83,34 @@ public void testDifferentTableNames() throws Exception {
     assertTrue(loadData.indexOf("/inputTable'") != -1);
   }
 
+  public void testDifferentTargetDirs() throws Exception {
+    String targetDir = "targetDir";
+    String inputTable = "inputTable";
+    String outputTable = "outputTable";
+
+    Configuration conf = new Configuration();
+    SqoopOptions options = new SqoopOptions();
+    // Specify a different target dir from input table name
+    options.setTargetDir(targetDir);
+
+    TableDefWriter writer = new TableDefWriter(options, null,
+        inputTable, outputTable, conf, false);
+
+    Map<String, Integer> colTypes = new HashMap<String, Integer>();
+    writer.setColumnTypes(colTypes);
+
+    String createTable = writer.getCreateTableStmt();
+    String loadData = writer.getLoadDataStmt();
+
+    LOG.debug("Create table stmt: " + createTable);
+    LOG.debug("Load data stmt: " + loadData);
+
+    // Assert that the statements generated have the form we expect.
+    assertTrue(createTable.indexOf(
+        "CREATE TABLE IF NOT EXISTS `" + outputTable + "`") != -1);
+    assertTrue(loadData.indexOf("INTO TABLE `" + outputTable + "`") != -1);
+    assertTrue(loadData.indexOf("/" + targetDir + "'") != -1);
+  }
+
   public void testPartitions() throws Exception {
     String[] args = {
       "--hive-partition-key", "ds",