diff --git a/src/java/org/apache/hadoop/sqoop/lib/LargeObjectLoader.java b/src/java/org/apache/hadoop/sqoop/lib/LargeObjectLoader.java
index dc7fca2a..7ec1c9cb 100644
--- a/src/java/org/apache/hadoop/sqoop/lib/LargeObjectLoader.java
+++ b/src/java/org/apache/hadoop/sqoop/lib/LargeObjectLoader.java
@@ -83,8 +83,7 @@ public LargeObjectLoader(Configuration conf, Path workPath)
    * @return a filename to use to put an external LOB in.
    */
   private String getNextLobFileName() {
-    String file = "_lob/obj_" + conf.get("mapreduce.task.id",
-        conf.get("mapred.task.id", "unknown_task_id"))
+    String file = "_lob/obj_" + TaskId.get(conf, "unknown_task_id")
         + nextLobFileId;
     nextLobFileId++;
 
diff --git a/src/java/org/apache/hadoop/sqoop/lib/TaskId.java b/src/java/org/apache/hadoop/sqoop/lib/TaskId.java
new file mode 100644
index 00000000..b3054f9f
--- /dev/null
+++ b/src/java/org/apache/hadoop/sqoop/lib/TaskId.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.lib;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Utility class; returns the task attempt id of the current task
+ * regardless of the Hadoop version being used.
+ */
+public final class TaskId {
+
+  private TaskId() {
+  }
+
+  /**
+   * Looks up "mapreduce.task.id" first, then falls back to "mapred.task.id".
+   *
+   * @param conf the Configuration to check for the current task attempt id.
+   * @param defaultVal the value to return if a task attempt id is not set.
+   * @return the current task attempt id, or the default value if one isn't set.
+   */
+  public static String get(Configuration conf, String defaultVal) {
+    return conf.get("mapreduce.task.id",
+        conf.get("mapred.task.id", defaultVal));
+  }
+}
diff --git a/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLExportMapper.java b/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLExportMapper.java
index 3606ac33..64e317b6 100644
--- a/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLExportMapper.java
+++ b/src/java/org/apache/hadoop/sqoop/mapreduce/MySQLExportMapper.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.sqoop.lib.TaskId;
 import org.apache.hadoop.sqoop.manager.MySQLUtils;
 import org.apache.hadoop.sqoop.shims.HadoopShim;
 import org.apache.hadoop.sqoop.util.AsyncSink;
@@ -100,7 +101,18 @@ public class MySQLExportMapper
   private void initMySQLImportProcess() throws IOException {
     String tmpDir = conf.get(HadoopShim.get().getJobLocalDirProperty(),
         "/tmp/");
-    this.fifoFile = new File(tmpDir,
+
+    // Create a local subdir specific to this task attempt. mkdirs() also
+    // creates any missing parents, and the isDirectory() re-check avoids a
+    // spurious warning when the directory already exists.
+    String taskAttemptStr = TaskId.get(conf, "mysql_export");
+    File taskAttemptDir = new File(tmpDir, taskAttemptStr);
+    if (!taskAttemptDir.mkdirs() && !taskAttemptDir.isDirectory()) {
+      LOG.warn("Could not create task attempt dir: "
+          + taskAttemptDir.toString());
+    }
+
+    this.fifoFile = new File(taskAttemptDir,
         conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
     String filename = fifoFile.toString();
 
@@ -282,6 +294,13 @@ private void closeExportHandles() throws IOException, InterruptedException {
         LOG.error("Could not clean up named FIFO after completing mapper");
       }
 
+      // We put the FIFO file in a one-off subdir. Remove that.
+      File fifoParentDir = this.fifoFile.getParentFile();
+      LOG.debug("Removing task attempt tmpdir");
+      if (!fifoParentDir.delete()) {
+        LOG.error("Could not clean up task dir after completing mapper");
+      }
+
       this.fifoFile = null;
     }
 