From fca329b326df75121f6be6411dcd31b3886249cc Mon Sep 17 00:00:00 2001 From: Abraham Elmahrek Date: Tue, 10 Mar 2015 23:14:52 -0700 Subject: [PATCH] SQOOP-2164: Enhance the Netezza Connector for Sqoop (Venkat Ranganathan via Abraham Elmahrek) --- src/docs/user/connectors.txt | 19 ++++- .../sqoop/manager/DirectNetezzaManager.java | 39 +++++++++- .../NetezzaExternalTableExportMapper.java | 56 +++++++++++---- .../NetezzaExternalTableImportMapper.java | 48 +++++++++---- .../org/apache/sqoop/util/FileUploader.java | 71 +++++++++++++++++++ .../DirectNetezzaExportManualTest.java | 2 + 6 files changed, 205 insertions(+), 30 deletions(-) create mode 100644 src/java/org/apache/sqoop/util/FileUploader.java diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt index fee40d9d..496d3cf8 100644 --- a/src/docs/user/connectors.txt +++ b/src/docs/user/connectors.txt @@ -389,8 +389,23 @@ Argument Description Default value is 1. +--log-dir+ Applicable only in direct mode.\ Specifies the directory where Netezza\ - external table operation logs are stored.\ - Default value is /tmp. + external table operation logs are stored\ + on the hadoop filesystem. Logs are\ + stored under this directory with one\ + directory for the job and sub-directories\ + for each task number and attempt.\ + Default value is the user home directory. ++--trunc-string+ Applicable only in direct mode.\ + Specifies whether the system \ + truncates strings to the declared\ + storage and loads the data. By default\ + truncation of strings is reported as an\ + error. ++--ctrl-chars+ Applicable only in direct mode.\ + Specifies whether control characters \ + (ASCII chars 1 - 31) can be allowed \ + to be part of char/nchar/varchar/nvarchar\ + columns. Default is false. 
-------------------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java b/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java index 9ca8f63d..daddb8ce 100644 --- a/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java +++ b/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java @@ -33,6 +33,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.sqoop.mapreduce.netezza.NetezzaExternalTableExportJob; import org.apache.sqoop.mapreduce.netezza.NetezzaExternalTableImportJob; @@ -58,11 +59,24 @@ public class DirectNetezzaManager extends NetezzaManager { public static final String NETEZZA_ERROR_THRESHOLD_LONG_ARG = "max-errors"; + public static final String NETEZZA_CTRL_CHARS_OPT = + "netezza.ctrl.chars"; + public static final String NETEZZA_CTRL_CHARS_LONG_ARG = + "ctrl-chars"; + + public static final String NETEZZA_TRUNC_STRING_OPT = + "netezza.trunc.string"; + public static final String NETEZZA_TRUNC_STRING_LONG_ARG = + "trunc-string"; + + + private static final String QUERY_CHECK_DICTIONARY_FOR_TABLE = "SELECT 1 FROM _V_TABLE WHERE OWNER= ? 
" + " AND TABLENAME = ?"; public static final String NETEZZA_NULL_VALUE = "netezza.exttable.null.value"; + public DirectNetezzaManager(SqoopOptions opts) { super(opts); try { @@ -159,7 +173,7 @@ public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context) options = context.getOptions(); context.setConnManager(this); - checkTable(); // Throws excpetion as necessary + checkTable(); // Throws exception as necessary NetezzaExternalTableExportJob exporter = null; char qc = (char) options.getInputEnclosedBy(); @@ -248,6 +262,12 @@ protected RelatedOptions getNetezzaExtraOpts() { netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_LOG_DIR_OPT) .hasArg().withDescription("Netezza log directory") .withLongOpt(NETEZZA_LOG_DIR_LONG_ARG).create()); + netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_CTRL_CHARS_OPT) + .withDescription("Allow control chars in data") + .withLongOpt(NETEZZA_CTRL_CHARS_LONG_ARG).create()); + netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_TRUNC_STRING_OPT) + .withDescription("Truncate string to declared storage size") + .withLongOpt(NETEZZA_TRUNC_STRING_LONG_ARG).create()); return netezzaOpts; } @@ -270,6 +290,12 @@ private void handleNetezzaExtraArgs(SqoopOptions opts) conf.set(NETEZZA_LOG_DIR_OPT, dir); } + conf.setBoolean(NETEZZA_CTRL_CHARS_OPT, + cmdLine.hasOption(NETEZZA_CTRL_CHARS_LONG_ARG)); + + conf.setBoolean(NETEZZA_TRUNC_STRING_OPT, + cmdLine.hasOption(NETEZZA_TRUNC_STRING_LONG_ARG)); + // Always true for Netezza direct mode access conf.setBoolean(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, true); } @@ -291,4 +317,15 @@ public boolean isORMFacilitySelfManaged() { public boolean isDirectModeHCatSupported() { return true; } + + + public static String getLocalLogDir(TaskAttemptID attemptId) { int tid = attemptId.getTaskID().getId(); int aid = attemptId.getId(); String jid = attemptId.getJobID().toString(); StringBuilder sb = new StringBuilder(jid).append('-'); sb.append(tid).append('-').append(aid); String 
localLogDir = sb.toString(); + return localLogDir; + } } diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java index 3613ff2b..f377fb90 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java @@ -20,15 +20,21 @@ import java.io.BufferedOutputStream; import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.sql.Connection; import java.sql.SQLException; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.sqoop.io.NamedFifo; @@ -36,6 +42,7 @@ import org.apache.sqoop.manager.DirectNetezzaManager; import org.apache.sqoop.mapreduce.SqoopMapper; import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.util.FileUploader; import org.apache.sqoop.util.PerfCounters; import org.apache.sqoop.util.TaskId; @@ -61,6 +68,9 @@ public abstract class NetezzaExternalTableExportMapper extends private NetezzaJDBCStatementRunner extTableThread; private PerfCounters counter; private DelimiterSet outputDelimiters; + private String localLogDir = null; + private String logDir = null; + private File taskAttemptDir = null; private String getSqlStatement(DelimiterSet delimiters) throws IOException { @@ -72,7 +82,11 @@ private String getSqlStatement(DelimiterSet delimiters) throws IOException { int errorThreshold = conf.getInt( 
DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1); - String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT); + + boolean ctrlChars = + conf.getBoolean(DirectNetezzaManager.NETEZZA_CTRL_CHARS_OPT, false); + boolean truncString = + conf.getBoolean(DirectNetezzaManager.NETEZZA_TRUNC_STRING_OPT, false); StringBuilder sqlStmt = new StringBuilder(2048); @@ -83,6 +97,12 @@ private String getSqlStatement(DelimiterSet delimiters) throws IOException { sqlStmt.append("' USING (REMOTESOURCE 'JDBC' "); sqlStmt.append(" BOOLSTYLE 'TRUE_FALSE' "); sqlStmt.append(" CRINSTRING FALSE "); + if (ctrlChars) { + sqlStmt.append(" CTRLCHARS TRUE "); + } + if (truncString) { + sqlStmt.append(" TRUNCSTRING TRUE "); + } sqlStmt.append(" DELIMITER "); sqlStmt.append(Integer.toString(fd)); sqlStmt.append(" ENCODING 'internal' "); @@ -112,19 +132,18 @@ private String getSqlStatement(DelimiterSet delimiters) throws IOException { } sqlStmt.append(" MAXERRORS ").append(errorThreshold); - if (logDir != null) { - logDir = logDir.trim(); - if (logDir.length() > 0) { - File logDirPath = new File(logDir); - logDirPath.mkdirs(); - if (logDirPath.canWrite() && logDirPath.isDirectory()) { - sqlStmt.append(" LOGDIR ").append(logDir).append(' '); - } else { - throw new IOException("Unable to create log directory specified"); - } - } - } - sqlStmt.append(")"); + + + File logDirPath = new File(taskAttemptDir, localLogDir); + logDirPath.mkdirs(); + if (logDirPath.canWrite() && logDirPath.isDirectory()) { + sqlStmt.append(" LOGDIR ") + .append(logDirPath.getAbsolutePath()).append(' '); + } else { + throw new IOException("Unable to create log directory specified"); + } + + sqlStmt.append(")"); String stmt = sqlStmt.toString(); LOG.debug("SQL generated for external table export" + stmt); @@ -135,6 +154,12 @@ private String getSqlStatement(DelimiterSet delimiters) throws IOException { private void initNetezzaExternalTableExport(Context context) throws IOException { this.conf = 
context.getConfiguration(); + + taskAttemptDir = TaskId.getLocalWorkPath(conf); + localLogDir = + DirectNetezzaManager.getLocalLogDir(context.getTaskAttemptID()); + logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT); + dbc = new DBConfiguration(conf); File taskAttemptDir = TaskId.getLocalWorkPath(conf); @@ -212,6 +237,9 @@ public void run(Context context) throws IOException, InterruptedException { throw new IOException(extTableThread.getException()); } } + FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(), + localLogDir, logDir, context.getJobID().toString(), + conf); } } diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java index 2f4c1529..486ba7c5 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java @@ -21,14 +21,19 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.sql.Connection; import java.sql.SQLException; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.sqoop.config.ConfigurationHelper; @@ -36,6 +41,7 @@ import org.apache.sqoop.lib.DelimiterSet; import org.apache.sqoop.manager.DirectNetezzaManager; import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.util.FileUploader; import org.apache.sqoop.util.PerfCounters; import org.apache.sqoop.util.TaskId; @@ -61,7 +67,9 @@ public 
abstract class NetezzaExternalTableImportMapper extends .getLog(NetezzaExternalTableImportMapper.class.getName()); private NetezzaJDBCStatementRunner extTableThread; private PerfCounters counter; - + private String localLogDir = null; + private String logDir = null; + private File taskAttemptDir = null; private String getSqlStatement(int myId) throws IOException { char fd = (char) conf.getInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY, ','); @@ -70,6 +78,11 @@ private String getSqlStatement(int myId) throws IOException { String nullValue = conf.get(DirectNetezzaManager.NETEZZA_NULL_VALUE); + boolean ctrlChars = + conf.getBoolean(DirectNetezzaManager.NETEZZA_CTRL_CHARS_OPT, false); + boolean truncString = + conf.getBoolean(DirectNetezzaManager.NETEZZA_TRUNC_STRING_OPT, false); + int errorThreshold = conf.getInt( DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1); String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT); @@ -82,6 +95,12 @@ private String getSqlStatement(int myId) throws IOException { sqlStmt.append("' USING (REMOTESOURCE 'JDBC' "); sqlStmt.append(" BOOLSTYLE 'T_F' "); sqlStmt.append(" CRINSTRING FALSE "); + if (ctrlChars) { + sqlStmt.append(" CTRLCHARS TRUE "); + } + if (truncString) { + sqlStmt.append(" TRUNCSTRING TRUE "); + } sqlStmt.append(" DELIMITER "); sqlStmt.append(Integer.toString(fd)); sqlStmt.append(" ENCODING 'internal' "); @@ -112,17 +131,12 @@ private String getSqlStatement(int myId) throws IOException { sqlStmt.append(" MAXERRORS ").append(errorThreshold); - if (logDir != null) { - logDir = logDir.trim(); - if (logDir.length() > 0) { - File logDirPath = new File(logDir); - logDirPath.mkdirs(); - if (logDirPath.canWrite() && logDirPath.isDirectory()) { - sqlStmt.append(" LOGDIR ").append(logDir).append(' '); - } else { - throw new IOException("Unable to create log directory specified"); - } - } + File logDirPath = new File(taskAttemptDir, localLogDir); + logDirPath.mkdirs(); + if (logDirPath.canWrite() && logDirPath.isDirectory()) 
{ + sqlStmt.append(" LOGDIR ").append(logDirPath.getAbsolutePath()).append(' '); + } else { + throw new IOException("Unable to create log directory specified"); } sqlStmt.append(") AS SELECT "); @@ -149,7 +163,7 @@ private String getSqlStatement(int myId) throws IOException { private void initNetezzaExternalTableImport(int myId) throws IOException { - File taskAttemptDir = TaskId.getLocalWorkPath(conf); + taskAttemptDir = TaskId.getLocalWorkPath(conf); this.fifoFile = new File(taskAttemptDir, ("nzexttable-" + myId + ".txt")); String filename = fifoFile.toString(); @@ -199,6 +213,10 @@ abstract protected void writeRecord(Text text, Context context) public void map(Integer dataSliceId, NullWritable val, Context context) throws IOException, InterruptedException { conf = context.getConfiguration(); + localLogDir = + DirectNetezzaManager.getLocalLogDir(context.getTaskAttemptID()); + logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT); + dbc = new DBConfiguration(conf); numMappers = ConfigurationHelper.getConfNumMaps(conf); char rd = (char) conf.getInt(DelimiterSet.OUTPUT_RECORD_DELIM_KEY, '\n'); @@ -232,7 +250,11 @@ public void map(Integer dataSliceId, NullWritable val, Context context) extTableThread.printException(); throw new IOException(extTableThread.getException()); } + FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(), + localLogDir, logDir, context.getJobID().toString(), + conf); } } } + } diff --git a/src/java/org/apache/sqoop/util/FileUploader.java b/src/java/org/apache/sqoop/util/FileUploader.java new file mode 100644 index 00000000..155cffce --- /dev/null +++ b/src/java/org/apache/sqoop/util/FileUploader.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.util; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class FileUploader { + public static final Log LOG = + LogFactory.getLog(FileUploader.class.getName()); + + private FileUploader() { } + + public static void uploadFilesToDFS(String srcBasePath, String src, + String destBasePath, String dest, Configuration conf) throws IOException { + + FileSystem fs = FileSystem.get(conf); + Path targetPath = null; + Path srcPath = new Path(srcBasePath, src); + + if (destBasePath == null || destBasePath.length() == 0) { + destBasePath = "."; + } + + targetPath = new Path(destBasePath, dest); + + if (!fs.exists(targetPath)) { + fs.mkdirs(targetPath); + } + + Path targetPath2 = new Path(targetPath, src); + fs.delete(targetPath2, true); + + try { + LOG.info("Copying " + srcPath + " to " + targetPath); + // Copy srcPath (on local FS) to targetPath on DFS. + // The first boolean arg instructs not to delete source and the second + // boolean arg instructs to overwrite dest if exists. 
+ fs.copyFromLocalFile(false, true, srcPath, targetPath); + } catch (IOException ioe) { + LOG.warn("Unable to copy " + srcPath + " to " + targetPath); + } + } + +} diff --git a/src/test/org/apache/sqoop/manager/netezza/DirectNetezzaExportManualTest.java b/src/test/org/apache/sqoop/manager/netezza/DirectNetezzaExportManualTest.java index f7b68ed2..92012c47 100644 --- a/src/test/org/apache/sqoop/manager/netezza/DirectNetezzaExportManualTest.java +++ b/src/test/org/apache/sqoop/manager/netezza/DirectNetezzaExportManualTest.java @@ -155,6 +155,8 @@ public void testValidExtraArgs() throws Exception { "--", "--log-dir", "/tmp", "--max-errors", "2", + "--trunc-string", + "--ctrl-chars" }; String[] argv = getArgv(true, 10, 10, extraArgs); runNetezzaTest(getTableName(), argv);