diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt index 496d3cf8..c5ce4d6a 100644 --- a/src/docs/user/connectors.txt +++ b/src/docs/user/connectors.txt @@ -381,31 +381,47 @@ Argument Description of data slices of a table or all\ Default is "false" for standard mode\ and "true" for direct mode. -+--max-errors+ Applicable only in direct mode.\ ++--max-errors+ Applicable only for direct mode export.\ This option specifies the error threshold\ per mapper while transferring data. If\ the number of errors encountered exceed\ this threshold then the job will fail. Default value is 1. -+--log-dir+ Applicable only in direct mode.\ ++--log-dir+ Applicable only for direct mode export.\ Specifies the directory where Netezza\ external table operation logs are stored\ on the hadoop filesystem. Logs are\ stored under this directory with one\ directory for the job and sub-directories\ for each task number and attempt.\ - Default value is the user home directory. -+--trunc-string+ Applicable only in direct mode.\ + Default value is the user home directory.\ + The nzlog and nzbad files will be under\ + (logdir)/job-id/job-attempt-id. ++--trunc-string+ Applicable only for direct mode export.\ Specifies whether the system \ truncates strings to the declared\ storage and loads the data. By default\ truncation of strings is reported as an\ error. -+--ctrl-chars+ Applicable only in direct mode.\ ++--ctrl-chars+ Applicable only for direct mode export.\ Specifies whether control characters \ (ASCII chars 1 - 31) can be allowed \ to be part of char/nchar/varchar/nvarchar\ columns. Default is false. ++--crin-string+ Applicable only for direct mode export.\ + Specifies whether carriage return \ + (ASCII char 13) can be allowed \ + to be part of char/nchar/varchar/nvarchar\ + columns. Note that CR can no longer \ + be a record delimiter with this option.\ + Default is false. 
++--ignore-zero+ Applicable only for direct mode export.\ + Specifies whether NUL character \ + (ASCII char 0) should be scanned \ + and ignored as part of the data loaded\ + into char/nchar/varchar/nvarchar \ + columns.\ + Default is false. -------------------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java b/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java index 06fa9762..2ec07700 100644 --- a/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java +++ b/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java @@ -64,6 +64,18 @@ public class DirectNetezzaManager extends NetezzaManager { public static final String NETEZZA_CTRL_CHARS_LONG_ARG = "ctrl-chars"; + + public static final String NETEZZA_CRIN_STRING_OPT = + "netezza.crin.string"; + public static final String NETEZZA_CRIN_STRING_LONG_ARG = + "crin-string"; + + + public static final String NETEZZA_IGNORE_ZERO_OPT = + "netezza.ignore.zero"; + public static final String NETEZZA_IGNORE_ZERO_LONG_ARG = + "ignore-zero"; + public static final String NETEZZA_TRUNC_STRING_OPT = "netezza.trunc.string"; public static final String NETEZZA_TRUNC_STRING_LONG_ARG = @@ -268,6 +280,12 @@ protected RelatedOptions getNetezzaExtraOpts() { netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_TRUNC_STRING_OPT) .withDescription("Truncate string to declared storage size") .withLongOpt(NETEZZA_TRUNC_STRING_LONG_ARG).create()); + netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_CRIN_STRING_OPT) + .withDescription("Allow carriage returns in string fields") + .withLongOpt(NETEZZA_CRIN_STRING_LONG_ARG).create()); + netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_IGNORE_ZERO_OPT) + .withDescription("Ignore NUL characters in string fields") + .withLongOpt(NETEZZA_IGNORE_ZERO_LONG_ARG).create()); return netezzaOpts; } @@ -296,6 +314,12 @@ private void handleNetezzaExtraArgs(SqoopOptions opts) 
conf.setBoolean(NETEZZA_TRUNC_STRING_OPT, cmdLine.hasOption(NETEZZA_TRUNC_STRING_LONG_ARG)); + conf.setBoolean(NETEZZA_CRIN_STRING_OPT, + cmdLine.hasOption(NETEZZA_CRIN_STRING_LONG_ARG)); + + conf.setBoolean(NETEZZA_IGNORE_ZERO_OPT, + cmdLine.hasOption(NETEZZA_IGNORE_ZERO_LONG_ARG)); + // Always true for Netezza direct mode access conf.setBoolean(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, true); } diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java index f377fb90..aa058d14 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java @@ -87,7 +87,10 @@ private String getSqlStatement(DelimiterSet delimiters) throws IOException { conf.getBoolean(DirectNetezzaManager.NETEZZA_CTRL_CHARS_OPT, false); boolean truncString = conf.getBoolean(DirectNetezzaManager.NETEZZA_TRUNC_STRING_OPT, false); - + boolean ignoreZero = + conf.getBoolean(DirectNetezzaManager.NETEZZA_IGNORE_ZERO_OPT, false); + boolean crinString = + conf.getBoolean(DirectNetezzaManager.NETEZZA_CRIN_STRING_OPT, false); StringBuilder sqlStmt = new StringBuilder(2048); sqlStmt.append("INSERT INTO "); @@ -96,13 +99,20 @@ private String getSqlStatement(DelimiterSet delimiters) throws IOException { sqlStmt.append(fifoFile.getAbsolutePath()); sqlStmt.append("' USING (REMOTESOURCE 'JDBC' "); sqlStmt.append(" BOOLSTYLE 'TRUE_FALSE' "); - sqlStmt.append(" CRINSTRING FALSE "); + if (crinString) { + sqlStmt.append(" CRINSTRING TRUE "); + } else { + sqlStmt.append(" CRINSTRING FALSE "); + } if (ctrlChars) { sqlStmt.append(" CTRLCHARS TRUE "); } if (truncString) { sqlStmt.append(" TRUNCSTRING TRUE "); } + if (ignoreZero) { + sqlStmt.append(" IGNOREZERO TRUE "); + } sqlStmt.append(" DELIMITER "); sqlStmt.append(Integer.toString(fd)); sqlStmt.append(" ENCODING 'internal' 
"); @@ -228,18 +238,24 @@ public void run(Context context) throws IOException, InterruptedException { } cleanup(context); } finally { - recordWriter.close(); - extTableThread.join(); + try { + recordWriter.close(); + extTableThread.join(); + } catch (Exception e) { + LOG.debug("Exception cleaning up mapper operation : " + e.getMessage()); + } counter.stopClock(); LOG.info("Transferred " + counter.toString()); + FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(), + localLogDir, logDir, context.getJobID().toString(), + conf); + if (extTableThread.hasExceptions()) { extTableThread.printException(); throw new IOException(extTableThread.getException()); } } - FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(), - localLogDir, logDir, context.getJobID().toString(), - conf); + } }