mirror of
https://github.com/apache/sqoop.git
synced 2025-05-03 20:40:58 +08:00
SQOOP-2164: Enhance the Netezza Connector for Sqoop
(Venkat Ranganathan via Abraham Elmahrek)
This commit is contained in:
parent
3a475c9694
commit
fca329b326
@ -389,8 +389,23 @@ Argument Description
|
|||||||
Default value is 1.
|
Default value is 1.
|
||||||
+--log-dir+ Applicable only in direct mode.\
|
+--log-dir+ Applicable only in direct mode.\
|
||||||
Specifies the directory where Netezza\
|
Specifies the directory where Netezza\
|
||||||
external table operation logs are stored.\
|
external table operation logs are stored\
|
||||||
Default value is /tmp.
|
on the hadoop filesystem. Logs are\
|
||||||
|
stored under this directory with one\
|
||||||
|
directory for the job and sub-directories\
|
||||||
|
for each task number and attempt.\
|
||||||
|
Default value is the user home directory.
|
||||||
|
+--trunc-string+ Applicable only in direct mode.\
|
||||||
|
Specifies whether the system \
|
||||||
|
truncates strings to the declared\
|
||||||
|
storage and loads the data. By default\
|
||||||
|
truncation of strings is reported as an\
|
||||||
|
error.
|
||||||
|
+--ctrl-chars+ Applicable only in direct mode.\
|
||||||
|
Specifies whether control characters \
|
||||||
|
(ASCII chars 1 - 31) can be allowed \
|
||||||
|
to be part of char/nchar/varchar/nvarchar\
|
||||||
|
columns. Default is false.
|
||||||
--------------------------------------------------------------------------------
|
--------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
@ -33,6 +33,7 @@
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
import org.apache.sqoop.mapreduce.netezza.NetezzaExternalTableExportJob;
|
import org.apache.sqoop.mapreduce.netezza.NetezzaExternalTableExportJob;
|
||||||
import org.apache.sqoop.mapreduce.netezza.NetezzaExternalTableImportJob;
|
import org.apache.sqoop.mapreduce.netezza.NetezzaExternalTableImportJob;
|
||||||
|
|
||||||
@ -58,11 +59,24 @@ public class DirectNetezzaManager extends NetezzaManager {
|
|||||||
public static final String NETEZZA_ERROR_THRESHOLD_LONG_ARG =
|
public static final String NETEZZA_ERROR_THRESHOLD_LONG_ARG =
|
||||||
"max-errors";
|
"max-errors";
|
||||||
|
|
||||||
|
public static final String NETEZZA_CTRL_CHARS_OPT =
|
||||||
|
"netezza.ctrl.chars";
|
||||||
|
public static final String NETEZZA_CTRL_CHARS_LONG_ARG =
|
||||||
|
"ctrl-chars";
|
||||||
|
|
||||||
|
public static final String NETEZZA_TRUNC_STRING_OPT =
|
||||||
|
"netezza.trunc.string";
|
||||||
|
public static final String NETEZZA_TRUNC_STRING_LONG_ARG =
|
||||||
|
"trunc-string";
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private static final String QUERY_CHECK_DICTIONARY_FOR_TABLE =
|
private static final String QUERY_CHECK_DICTIONARY_FOR_TABLE =
|
||||||
"SELECT 1 FROM _V_TABLE WHERE OWNER= ? "
|
"SELECT 1 FROM _V_TABLE WHERE OWNER= ? "
|
||||||
+ " AND TABLENAME = ?";
|
+ " AND TABLENAME = ?";
|
||||||
public static final String NETEZZA_NULL_VALUE =
|
public static final String NETEZZA_NULL_VALUE =
|
||||||
"netezza.exttable.null.value";
|
"netezza.exttable.null.value";
|
||||||
|
|
||||||
public DirectNetezzaManager(SqoopOptions opts) {
|
public DirectNetezzaManager(SqoopOptions opts) {
|
||||||
super(opts);
|
super(opts);
|
||||||
try {
|
try {
|
||||||
@ -159,7 +173,7 @@ public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
|
|||||||
options = context.getOptions();
|
options = context.getOptions();
|
||||||
context.setConnManager(this);
|
context.setConnManager(this);
|
||||||
|
|
||||||
checkTable(); // Throws excpetion as necessary
|
checkTable(); // Throws exception as necessary
|
||||||
NetezzaExternalTableExportJob exporter = null;
|
NetezzaExternalTableExportJob exporter = null;
|
||||||
|
|
||||||
char qc = (char) options.getInputEnclosedBy();
|
char qc = (char) options.getInputEnclosedBy();
|
||||||
@ -248,6 +262,12 @@ protected RelatedOptions getNetezzaExtraOpts() {
|
|||||||
netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_LOG_DIR_OPT)
|
netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_LOG_DIR_OPT)
|
||||||
.hasArg().withDescription("Netezza log directory")
|
.hasArg().withDescription("Netezza log directory")
|
||||||
.withLongOpt(NETEZZA_LOG_DIR_LONG_ARG).create());
|
.withLongOpt(NETEZZA_LOG_DIR_LONG_ARG).create());
|
||||||
|
netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_CTRL_CHARS_OPT)
|
||||||
|
.withDescription("Allow control chars in data")
|
||||||
|
.withLongOpt(NETEZZA_CTRL_CHARS_LONG_ARG).create());
|
||||||
|
netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_TRUNC_STRING_OPT)
|
||||||
|
.withDescription("Truncate string to declared storage size")
|
||||||
|
.withLongOpt(NETEZZA_TRUNC_STRING_LONG_ARG).create());
|
||||||
return netezzaOpts;
|
return netezzaOpts;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -270,6 +290,12 @@ private void handleNetezzaExtraArgs(SqoopOptions opts)
|
|||||||
conf.set(NETEZZA_LOG_DIR_OPT, dir);
|
conf.set(NETEZZA_LOG_DIR_OPT, dir);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
conf.setBoolean(NETEZZA_CTRL_CHARS_OPT,
|
||||||
|
cmdLine.hasOption(NETEZZA_CTRL_CHARS_LONG_ARG));
|
||||||
|
|
||||||
|
conf.setBoolean(NETEZZA_CTRL_CHARS_OPT,
|
||||||
|
cmdLine.hasOption(NETEZZA_CTRL_CHARS_LONG_ARG));
|
||||||
|
|
||||||
// Always true for Netezza direct mode access
|
// Always true for Netezza direct mode access
|
||||||
conf.setBoolean(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, true);
|
conf.setBoolean(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, true);
|
||||||
}
|
}
|
||||||
@ -291,4 +317,15 @@ public boolean isORMFacilitySelfManaged() {
|
|||||||
public boolean isDirectModeHCatSupported() {
|
public boolean isDirectModeHCatSupported() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static String getLocalLogDir(TaskAttemptID attemptId) {
|
||||||
|
int tid = attemptId.getTaskID().getId();
|
||||||
|
int aid = attemptId.getId();
|
||||||
|
String jid = attemptId.getJobID().toString();
|
||||||
|
StringBuilder sb = new StringBuilder(jid).append('-');
|
||||||
|
sb.append(tid).append('-').append(aid);
|
||||||
|
String localLogDir = sb.toString();
|
||||||
|
return localLogDir;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,15 +20,21 @@
|
|||||||
|
|
||||||
import java.io.BufferedOutputStream;
|
import java.io.BufferedOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.NullWritable;
|
import org.apache.hadoop.io.NullWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.sqoop.io.NamedFifo;
|
import org.apache.sqoop.io.NamedFifo;
|
||||||
@ -36,6 +42,7 @@
|
|||||||
import org.apache.sqoop.manager.DirectNetezzaManager;
|
import org.apache.sqoop.manager.DirectNetezzaManager;
|
||||||
import org.apache.sqoop.mapreduce.SqoopMapper;
|
import org.apache.sqoop.mapreduce.SqoopMapper;
|
||||||
import org.apache.sqoop.mapreduce.db.DBConfiguration;
|
import org.apache.sqoop.mapreduce.db.DBConfiguration;
|
||||||
|
import org.apache.sqoop.util.FileUploader;
|
||||||
import org.apache.sqoop.util.PerfCounters;
|
import org.apache.sqoop.util.PerfCounters;
|
||||||
import org.apache.sqoop.util.TaskId;
|
import org.apache.sqoop.util.TaskId;
|
||||||
|
|
||||||
@ -61,6 +68,9 @@ public abstract class NetezzaExternalTableExportMapper<K, V> extends
|
|||||||
private NetezzaJDBCStatementRunner extTableThread;
|
private NetezzaJDBCStatementRunner extTableThread;
|
||||||
private PerfCounters counter;
|
private PerfCounters counter;
|
||||||
private DelimiterSet outputDelimiters;
|
private DelimiterSet outputDelimiters;
|
||||||
|
private String localLogDir = null;
|
||||||
|
private String logDir = null;
|
||||||
|
private File taskAttemptDir = null;
|
||||||
|
|
||||||
private String getSqlStatement(DelimiterSet delimiters) throws IOException {
|
private String getSqlStatement(DelimiterSet delimiters) throws IOException {
|
||||||
|
|
||||||
@ -72,7 +82,11 @@ private String getSqlStatement(DelimiterSet delimiters) throws IOException {
|
|||||||
|
|
||||||
int errorThreshold = conf.getInt(
|
int errorThreshold = conf.getInt(
|
||||||
DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
|
DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
|
||||||
String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
|
|
||||||
|
boolean ctrlChars =
|
||||||
|
conf.getBoolean(DirectNetezzaManager.NETEZZA_CTRL_CHARS_OPT, false);
|
||||||
|
boolean truncString =
|
||||||
|
conf.getBoolean(DirectNetezzaManager.NETEZZA_TRUNC_STRING_OPT, false);
|
||||||
|
|
||||||
StringBuilder sqlStmt = new StringBuilder(2048);
|
StringBuilder sqlStmt = new StringBuilder(2048);
|
||||||
|
|
||||||
@ -83,6 +97,12 @@ private String getSqlStatement(DelimiterSet delimiters) throws IOException {
|
|||||||
sqlStmt.append("' USING (REMOTESOURCE 'JDBC' ");
|
sqlStmt.append("' USING (REMOTESOURCE 'JDBC' ");
|
||||||
sqlStmt.append(" BOOLSTYLE 'TRUE_FALSE' ");
|
sqlStmt.append(" BOOLSTYLE 'TRUE_FALSE' ");
|
||||||
sqlStmt.append(" CRINSTRING FALSE ");
|
sqlStmt.append(" CRINSTRING FALSE ");
|
||||||
|
if (ctrlChars) {
|
||||||
|
sqlStmt.append(" CTRLCHARS TRUE ");
|
||||||
|
}
|
||||||
|
if (truncString) {
|
||||||
|
sqlStmt.append(" TRUNCSTRING TRUE ");
|
||||||
|
}
|
||||||
sqlStmt.append(" DELIMITER ");
|
sqlStmt.append(" DELIMITER ");
|
||||||
sqlStmt.append(Integer.toString(fd));
|
sqlStmt.append(Integer.toString(fd));
|
||||||
sqlStmt.append(" ENCODING 'internal' ");
|
sqlStmt.append(" ENCODING 'internal' ");
|
||||||
@ -112,19 +132,18 @@ private String getSqlStatement(DelimiterSet delimiters) throws IOException {
|
|||||||
}
|
}
|
||||||
sqlStmt.append(" MAXERRORS ").append(errorThreshold);
|
sqlStmt.append(" MAXERRORS ").append(errorThreshold);
|
||||||
|
|
||||||
if (logDir != null) {
|
|
||||||
logDir = logDir.trim();
|
|
||||||
if (logDir.length() > 0) {
|
File logDirPath = new File(taskAttemptDir, localLogDir);
|
||||||
File logDirPath = new File(logDir);
|
logDirPath.mkdirs();
|
||||||
logDirPath.mkdirs();
|
if (logDirPath.canWrite() && logDirPath.isDirectory()) {
|
||||||
if (logDirPath.canWrite() && logDirPath.isDirectory()) {
|
sqlStmt.append(" LOGDIR ")
|
||||||
sqlStmt.append(" LOGDIR ").append(logDir).append(' ');
|
.append(logDirPath.getAbsolutePath()).append(' ');
|
||||||
} else {
|
} else {
|
||||||
throw new IOException("Unable to create log directory specified");
|
throw new IOException("Unable to create log directory specified");
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
sqlStmt.append(")");
|
||||||
sqlStmt.append(")");
|
|
||||||
|
|
||||||
String stmt = sqlStmt.toString();
|
String stmt = sqlStmt.toString();
|
||||||
LOG.debug("SQL generated for external table export" + stmt);
|
LOG.debug("SQL generated for external table export" + stmt);
|
||||||
@ -135,6 +154,12 @@ private String getSqlStatement(DelimiterSet delimiters) throws IOException {
|
|||||||
private void initNetezzaExternalTableExport(Context context)
|
private void initNetezzaExternalTableExport(Context context)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.conf = context.getConfiguration();
|
this.conf = context.getConfiguration();
|
||||||
|
|
||||||
|
taskAttemptDir = TaskId.getLocalWorkPath(conf);
|
||||||
|
localLogDir =
|
||||||
|
DirectNetezzaManager.getLocalLogDir(context.getTaskAttemptID());
|
||||||
|
logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
|
||||||
|
|
||||||
dbc = new DBConfiguration(conf);
|
dbc = new DBConfiguration(conf);
|
||||||
File taskAttemptDir = TaskId.getLocalWorkPath(conf);
|
File taskAttemptDir = TaskId.getLocalWorkPath(conf);
|
||||||
|
|
||||||
@ -212,6 +237,9 @@ public void run(Context context) throws IOException, InterruptedException {
|
|||||||
throw new IOException(extTableThread.getException());
|
throw new IOException(extTableThread.getException());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(),
|
||||||
|
localLogDir, logDir, context.getJobID().toString(),
|
||||||
|
conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,14 +21,19 @@
|
|||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.NullWritable;
|
import org.apache.hadoop.io.NullWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.sqoop.config.ConfigurationHelper;
|
import org.apache.sqoop.config.ConfigurationHelper;
|
||||||
@ -36,6 +41,7 @@
|
|||||||
import org.apache.sqoop.lib.DelimiterSet;
|
import org.apache.sqoop.lib.DelimiterSet;
|
||||||
import org.apache.sqoop.manager.DirectNetezzaManager;
|
import org.apache.sqoop.manager.DirectNetezzaManager;
|
||||||
import org.apache.sqoop.mapreduce.db.DBConfiguration;
|
import org.apache.sqoop.mapreduce.db.DBConfiguration;
|
||||||
|
import org.apache.sqoop.util.FileUploader;
|
||||||
import org.apache.sqoop.util.PerfCounters;
|
import org.apache.sqoop.util.PerfCounters;
|
||||||
import org.apache.sqoop.util.TaskId;
|
import org.apache.sqoop.util.TaskId;
|
||||||
|
|
||||||
@ -61,7 +67,9 @@ public abstract class NetezzaExternalTableImportMapper<K, V> extends
|
|||||||
.getLog(NetezzaExternalTableImportMapper.class.getName());
|
.getLog(NetezzaExternalTableImportMapper.class.getName());
|
||||||
private NetezzaJDBCStatementRunner extTableThread;
|
private NetezzaJDBCStatementRunner extTableThread;
|
||||||
private PerfCounters counter;
|
private PerfCounters counter;
|
||||||
|
private String localLogDir = null;
|
||||||
|
private String logDir = null;
|
||||||
|
private File taskAttemptDir = null;
|
||||||
private String getSqlStatement(int myId) throws IOException {
|
private String getSqlStatement(int myId) throws IOException {
|
||||||
|
|
||||||
char fd = (char) conf.getInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY, ',');
|
char fd = (char) conf.getInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY, ',');
|
||||||
@ -70,6 +78,11 @@ private String getSqlStatement(int myId) throws IOException {
|
|||||||
|
|
||||||
String nullValue = conf.get(DirectNetezzaManager.NETEZZA_NULL_VALUE);
|
String nullValue = conf.get(DirectNetezzaManager.NETEZZA_NULL_VALUE);
|
||||||
|
|
||||||
|
boolean ctrlChars =
|
||||||
|
conf.getBoolean(DirectNetezzaManager.NETEZZA_CTRL_CHARS_OPT, false);
|
||||||
|
boolean truncString =
|
||||||
|
conf.getBoolean(DirectNetezzaManager.NETEZZA_TRUNC_STRING_OPT, false);
|
||||||
|
|
||||||
int errorThreshold = conf.getInt(
|
int errorThreshold = conf.getInt(
|
||||||
DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
|
DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
|
||||||
String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
|
String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
|
||||||
@ -82,6 +95,12 @@ private String getSqlStatement(int myId) throws IOException {
|
|||||||
sqlStmt.append("' USING (REMOTESOURCE 'JDBC' ");
|
sqlStmt.append("' USING (REMOTESOURCE 'JDBC' ");
|
||||||
sqlStmt.append(" BOOLSTYLE 'T_F' ");
|
sqlStmt.append(" BOOLSTYLE 'T_F' ");
|
||||||
sqlStmt.append(" CRINSTRING FALSE ");
|
sqlStmt.append(" CRINSTRING FALSE ");
|
||||||
|
if (ctrlChars) {
|
||||||
|
sqlStmt.append(" CTRLCHARS TRUE ");
|
||||||
|
}
|
||||||
|
if (truncString) {
|
||||||
|
sqlStmt.append(" TRUNCSTRING TRUE ");
|
||||||
|
}
|
||||||
sqlStmt.append(" DELIMITER ");
|
sqlStmt.append(" DELIMITER ");
|
||||||
sqlStmt.append(Integer.toString(fd));
|
sqlStmt.append(Integer.toString(fd));
|
||||||
sqlStmt.append(" ENCODING 'internal' ");
|
sqlStmt.append(" ENCODING 'internal' ");
|
||||||
@ -112,17 +131,12 @@ private String getSqlStatement(int myId) throws IOException {
|
|||||||
|
|
||||||
sqlStmt.append(" MAXERRORS ").append(errorThreshold);
|
sqlStmt.append(" MAXERRORS ").append(errorThreshold);
|
||||||
|
|
||||||
if (logDir != null) {
|
File logDirPath = new File(taskAttemptDir, localLogDir);
|
||||||
logDir = logDir.trim();
|
logDirPath.mkdirs();
|
||||||
if (logDir.length() > 0) {
|
if (logDirPath.canWrite() && logDirPath.isDirectory()) {
|
||||||
File logDirPath = new File(logDir);
|
sqlStmt.append(" LOGDIR ").append(logDirPath.getAbsolutePath()).append(' ');
|
||||||
logDirPath.mkdirs();
|
} else {
|
||||||
if (logDirPath.canWrite() && logDirPath.isDirectory()) {
|
throw new IOException("Unable to create log directory specified");
|
||||||
sqlStmt.append(" LOGDIR ").append(logDir).append(' ');
|
|
||||||
} else {
|
|
||||||
throw new IOException("Unable to create log directory specified");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sqlStmt.append(") AS SELECT ");
|
sqlStmt.append(") AS SELECT ");
|
||||||
@ -149,7 +163,7 @@ private String getSqlStatement(int myId) throws IOException {
|
|||||||
|
|
||||||
private void initNetezzaExternalTableImport(int myId) throws IOException {
|
private void initNetezzaExternalTableImport(int myId) throws IOException {
|
||||||
|
|
||||||
File taskAttemptDir = TaskId.getLocalWorkPath(conf);
|
taskAttemptDir = TaskId.getLocalWorkPath(conf);
|
||||||
|
|
||||||
this.fifoFile = new File(taskAttemptDir, ("nzexttable-" + myId + ".txt"));
|
this.fifoFile = new File(taskAttemptDir, ("nzexttable-" + myId + ".txt"));
|
||||||
String filename = fifoFile.toString();
|
String filename = fifoFile.toString();
|
||||||
@ -199,6 +213,10 @@ abstract protected void writeRecord(Text text, Context context)
|
|||||||
public void map(Integer dataSliceId, NullWritable val, Context context)
|
public void map(Integer dataSliceId, NullWritable val, Context context)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
conf = context.getConfiguration();
|
conf = context.getConfiguration();
|
||||||
|
localLogDir =
|
||||||
|
DirectNetezzaManager.getLocalLogDir(context.getTaskAttemptID());
|
||||||
|
logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
|
||||||
|
|
||||||
dbc = new DBConfiguration(conf);
|
dbc = new DBConfiguration(conf);
|
||||||
numMappers = ConfigurationHelper.getConfNumMaps(conf);
|
numMappers = ConfigurationHelper.getConfNumMaps(conf);
|
||||||
char rd = (char) conf.getInt(DelimiterSet.OUTPUT_RECORD_DELIM_KEY, '\n');
|
char rd = (char) conf.getInt(DelimiterSet.OUTPUT_RECORD_DELIM_KEY, '\n');
|
||||||
@ -232,7 +250,11 @@ public void map(Integer dataSliceId, NullWritable val, Context context)
|
|||||||
extTableThread.printException();
|
extTableThread.printException();
|
||||||
throw new IOException(extTableThread.getException());
|
throw new IOException(extTableThread.getException());
|
||||||
}
|
}
|
||||||
|
FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(),
|
||||||
|
localLogDir, logDir, context.getJobID().toString(),
|
||||||
|
conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
71
src/java/org/apache/sqoop/util/FileUploader.java
Normal file
71
src/java/org/apache/sqoop/util/FileUploader.java
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.sqoop.util;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
public class FileUploader {
|
||||||
|
public static final Log LOG =
|
||||||
|
LogFactory.getLog(FileUploader.class.getName());
|
||||||
|
|
||||||
|
private FileUploader() { }
|
||||||
|
|
||||||
|
public static void uploadFilesToDFS(String srcBasePath, String src,
|
||||||
|
String destBasePath, String dest, Configuration conf) throws IOException {
|
||||||
|
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
Path targetPath = null;
|
||||||
|
Path srcPath = new Path(srcBasePath, src);
|
||||||
|
|
||||||
|
if (destBasePath == null || destBasePath.length() == 0) {
|
||||||
|
destBasePath = ".";
|
||||||
|
}
|
||||||
|
|
||||||
|
targetPath = new Path(destBasePath, dest);
|
||||||
|
|
||||||
|
if (!fs.exists(targetPath)) {
|
||||||
|
fs.mkdirs(targetPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
Path targetPath2 = new Path(targetPath, src);
|
||||||
|
fs.delete(targetPath2, true);
|
||||||
|
|
||||||
|
try {
|
||||||
|
LOG.info("Copying " + srcPath + " to " + targetPath);
|
||||||
|
// Copy srcPath (on local FS) to targetPath on DFS.
|
||||||
|
// The first boolean arg instructs not to delete source and the second
|
||||||
|
// boolean arg instructs to overwrite dest if exists.
|
||||||
|
fs.copyFromLocalFile(false, true, srcPath, targetPath);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.warn("Unable to copy " + srcPath + " to " + targetPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -155,6 +155,8 @@ public void testValidExtraArgs() throws Exception {
|
|||||||
"--",
|
"--",
|
||||||
"--log-dir", "/tmp",
|
"--log-dir", "/tmp",
|
||||||
"--max-errors", "2",
|
"--max-errors", "2",
|
||||||
|
"--trunc-string",
|
||||||
|
"--ctrl-chars"
|
||||||
};
|
};
|
||||||
String[] argv = getArgv(true, 10, 10, extraArgs);
|
String[] argv = getArgv(true, 10, 10, extraArgs);
|
||||||
runNetezzaTest(getTableName(), argv);
|
runNetezzaTest(getTableName(), argv);
|
||||||
|
Loading…
Reference in New Issue
Block a user