mirror of
https://github.com/apache/sqoop.git
synced 2025-05-04 01:19:38 +08:00
SQOOP-65. Refactor some utilities out of MySQLExportMapper.
getLocalWorkPath() moved into c.c.s.util.TaskId. From: Aaron Kimball <aaron@cloudera.com> git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1149955 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1749a84f68
commit
ac43ae8d76
@ -34,7 +34,6 @@
|
|||||||
import com.cloudera.sqoop.io.NamedFifo;
|
import com.cloudera.sqoop.io.NamedFifo;
|
||||||
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
|
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
|
||||||
import com.cloudera.sqoop.manager.MySQLUtils;
|
import com.cloudera.sqoop.manager.MySQLUtils;
|
||||||
import com.cloudera.sqoop.shims.HadoopShim;
|
|
||||||
import com.cloudera.sqoop.util.AsyncSink;
|
import com.cloudera.sqoop.util.AsyncSink;
|
||||||
import com.cloudera.sqoop.util.JdbcUrl;
|
import com.cloudera.sqoop.util.JdbcUrl;
|
||||||
import com.cloudera.sqoop.util.LoggingAsyncSink;
|
import com.cloudera.sqoop.util.LoggingAsyncSink;
|
||||||
@ -100,19 +99,7 @@ public class MySQLExportMapper<KEYIN, VALIN>
|
|||||||
* A File object representing the FIFO is in 'fifoFile'.
|
* A File object representing the FIFO is in 'fifoFile'.
|
||||||
*/
|
*/
|
||||||
private void initMySQLImportProcess() throws IOException {
|
private void initMySQLImportProcess() throws IOException {
|
||||||
String tmpDir = conf.get(HadoopShim.get().getJobLocalDirProperty(),
|
File taskAttemptDir = TaskId.getLocalWorkPath(conf);
|
||||||
"/tmp/");
|
|
||||||
|
|
||||||
// Create a local subdir specific to this task attempt.
|
|
||||||
String taskAttemptStr = TaskId.get(conf, "mysql_export");
|
|
||||||
File taskAttemptDir = new File(tmpDir, taskAttemptStr);
|
|
||||||
if (!taskAttemptDir.exists()) {
|
|
||||||
boolean createdDir = taskAttemptDir.mkdir();
|
|
||||||
if (!createdDir) {
|
|
||||||
LOG.warn("Could not create non-existent task attempt dir: "
|
|
||||||
+ taskAttemptDir.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
this.fifoFile = new File(taskAttemptDir,
|
this.fifoFile = new File(taskAttemptDir,
|
||||||
conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
|
conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
|
||||||
|
@ -18,8 +18,13 @@
|
|||||||
|
|
||||||
package com.cloudera.sqoop.util;
|
package com.cloudera.sqoop.util;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
import com.cloudera.sqoop.shims.HadoopShim;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility class; returns task attempt Id of the current job
|
* Utility class; returns task attempt Id of the current job
|
||||||
* regardless of Hadoop version being used.
|
* regardless of Hadoop version being used.
|
||||||
@ -30,6 +35,7 @@ private TaskId() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Return the task attempt id as a string.
|
||||||
* @param conf the Configuration to check for the current task attempt id.
|
* @param conf the Configuration to check for the current task attempt id.
|
||||||
* @param defaultVal the value to return if a task attempt id is not set.
|
* @param defaultVal the value to return if a task attempt id is not set.
|
||||||
* @return the current task attempt id, or the default value if one isn't set.
|
* @return the current task attempt id, or the default value if one isn't set.
|
||||||
@ -38,4 +44,29 @@ public static String get(Configuration conf, String defaultVal) {
|
|||||||
return conf.get("mapreduce.task.id",
|
return conf.get("mapreduce.task.id",
|
||||||
conf.get("mapred.task.id", defaultVal));
|
conf.get("mapred.task.id", defaultVal));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the local filesystem dir where the current task attempt can
|
||||||
|
* perform work.
|
||||||
|
* @return a File describing a directory where local temp data for the
|
||||||
|
* task attempt can be stored.
|
||||||
|
*/
|
||||||
|
public static File getLocalWorkPath(Configuration conf) throws IOException {
|
||||||
|
String tmpDir = conf.get(HadoopShim.get().getJobLocalDirProperty(),
|
||||||
|
"/tmp/");
|
||||||
|
|
||||||
|
// Create a local subdir specific to this task attempt.
|
||||||
|
String taskAttemptStr = TaskId.get(conf, "task_attempt");
|
||||||
|
File taskAttemptDir = new File(tmpDir, taskAttemptStr);
|
||||||
|
if (!taskAttemptDir.exists()) {
|
||||||
|
boolean createdDir = taskAttemptDir.mkdirs();
|
||||||
|
if (!createdDir) {
|
||||||
|
throw new IOException("Could not create missing task attempt dir: "
|
||||||
|
+ taskAttemptDir.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return taskAttemptDir;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user