From ac43ae8d76331f6a1df5c7a366143637d6ff879a Mon Sep 17 00:00:00 2001 From: Andrew Bayer Date: Fri, 22 Jul 2011 20:04:10 +0000 Subject: [PATCH] SQOOP-65. Refactor some utilities out of MySQLExportMapper. getLocalWorkPath() moved into c.c.s.util.TaskId. From: Aaron Kimball git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149955 13f79535-47bb-0310-9956-ffa450edef68 --- .../sqoop/mapreduce/MySQLExportMapper.java | 15 +-------- src/java/com/cloudera/sqoop/util/TaskId.java | 31 +++++++++++++++++++ 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java b/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java index 639a0a80..0802f319 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java +++ b/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java @@ -34,7 +34,6 @@ import com.cloudera.sqoop.io.NamedFifo; import com.cloudera.sqoop.mapreduce.db.DBConfiguration; import com.cloudera.sqoop.manager.MySQLUtils; -import com.cloudera.sqoop.shims.HadoopShim; import com.cloudera.sqoop.util.AsyncSink; import com.cloudera.sqoop.util.JdbcUrl; import com.cloudera.sqoop.util.LoggingAsyncSink; @@ -100,19 +99,7 @@ public class MySQLExportMapper * A File object representing the FIFO is in 'fifoFile'. */ private void initMySQLImportProcess() throws IOException { - String tmpDir = conf.get(HadoopShim.get().getJobLocalDirProperty(), - "/tmp/"); - - // Create a local subdir specific to this task attempt. - String taskAttemptStr = TaskId.get(conf, "mysql_export"); - File taskAttemptDir = new File(tmpDir, taskAttemptStr); - if (!taskAttemptDir.exists()) { - boolean createdDir = taskAttemptDir.mkdir(); - if (!createdDir) { - LOG.warn("Could not create non-existent task attempt dir: " - + taskAttemptDir.toString()); - } - } + File taskAttemptDir = TaskId.getLocalWorkPath(conf); this.fifoFile = new File(taskAttemptDir, conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt"); diff --git a/src/java/com/cloudera/sqoop/util/TaskId.java b/src/java/com/cloudera/sqoop/util/TaskId.java index 5f57a85d..e9eb4dfc 100644 --- a/src/java/com/cloudera/sqoop/util/TaskId.java +++ b/src/java/com/cloudera/sqoop/util/TaskId.java @@ -18,8 +18,13 @@ package com.cloudera.sqoop.util; +import java.io.File; +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; +import com.cloudera.sqoop.shims.HadoopShim; + /** * Utility class; returns task attempt Id of the current job * regardless of Hadoop version being used. @@ -30,6 +35,7 @@ private TaskId() { } /** + * Return the task attempt id as a string. * @param conf the Configuration to check for the current task attempt id. * @param defaultVal the value to return if a task attempt id is not set. * @return the current task attempt id, or the default value if one isn't set. @@ -38,4 +44,29 @@ public static String get(Configuration conf, String defaultVal) { return conf.get("mapreduce.task.id", conf.get("mapred.task.id", defaultVal)); } + + /** + * Return the local filesystem dir where the current task attempt can + * perform work. + * @return a File describing a directory where local temp data for the + * task attempt can be stored. + */ + public static File getLocalWorkPath(Configuration conf) throws IOException { + String tmpDir = conf.get(HadoopShim.get().getJobLocalDirProperty(), + "/tmp/"); + + // Create a local subdir specific to this task attempt. + String taskAttemptStr = TaskId.get(conf, "task_attempt"); + File taskAttemptDir = new File(tmpDir, taskAttemptStr); + if (!taskAttemptDir.exists()) { + boolean createdDir = taskAttemptDir.mkdirs(); + if (!createdDir) { + throw new IOException("Could not create missing task attempt dir: " + + taskAttemptDir.toString()); + } + } + + return taskAttemptDir; + } + }