From be539f183b6ac24c5314c9bee95ce57c2c96803c Mon Sep 17 00:00:00 2001 From: Robert B Hamilton Date: Tue, 22 Jan 2019 12:06:01 -0500 Subject: [PATCH] cleanups: 1. moved LineBuffer to inner class of the Export Mapper 2. extended support to 9.x direct copy 3. Added test case --- .../PostgreSQLCopyExportMapper.java | 49 ++++++++++++++++--- .../org/apache/sqoop/util/LineBuffer.java | 33 ------------- .../postgresql/PostgresqlExportTest.java | 14 ++++++ 3 files changed, 56 insertions(+), 40 deletions(-) delete mode 100644 src/java/org/apache/sqoop/util/LineBuffer.java diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java index 82e26b53..100d7875 100644 --- a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java @@ -36,7 +36,7 @@ import org.postgresql.PGConnection; import org.postgresql.copy.CopyManager; import org.postgresql.copy.CopyIn; -import org.apache.sqoop.util.LineBuffer; +//import org.apache.sqoop.util.LineBuffer; /** @@ -66,6 +66,36 @@ public class PostgreSQLCopyExportMapper new DelimiterSet(',', '\n', DelimiterSet.NULL_CHAR, DelimiterSet.NULL_CHAR, false); +/* Buffer up lines of text until buffer is full. + * This helper class is just to support a test to + * see if the copyIn mapper for postgres is unbuffered + * + * */ +static class LineBuffer{ + public static final Log LOG = LogFactory.getLog(LineBuffer.class.getName()); + + private StringBuilder sb=new StringBuilder(); + private static int MAXLEN=100000000; + //private static int MAXLEN=50000000; + + public void clear(){ sb.setLength(0); } + public int length(){return sb.length();} + public boolean append(String s){ +// LOG.debug(s); + if (sb.length()+s.length()+1>MAXLEN){return false;} + sb.append(s); + sb.append("\n"); + return true; + } + public String toString(){return sb.toString();} + public byte[] getBytes() { + try { + //LOG.debug("returning "+new String(sb.toString().getBytes("UTF-8"))); + return sb.toString().getBytes("UTF-8"); + }catch(Exception e){e.printStackTrace();return null;} + } +} + public PostgreSQLCopyExportMapper() { } /* Text mode normally interprets escape sequences. Optionally @@ -155,7 +185,7 @@ protected void setup(Context context) sql.append(", FORMAT csv "); sql.append(", DELIMITER "); sql.append("'"); - sql.append(conf.get("postgresql.input.field.delim", ",")); + sql.append(delim); sql.append("'"); sql.append(", QUOTE "); sql.append("'"); @@ -165,11 +195,16 @@ protected void setup(Context context) sql.append("'"); sql.append(conf.get("postgresql.input.escapedby", "\"")); sql.append("'"); - if (conf.get("postgresql.null.string") != null) { - sql.append(", NULL "); - sql.append("'"); - sql.append(conf.get("postgresql.null.string")); - sql.append("'"); + /* Hadoop config does not permit empty string so we use special switch to designate that */ + if (conf.get("postgresql.null.emptystring")!=null){ + sql.append(", NULL ''"); + }else { + if (conf.get("postgresql.null.string") != null) { + sql.append(", NULL "); + sql.append("'"); + sql.append(conf.get("postgresql.null.string")); + sql.append("'"); + } } sql.append(")"); } diff --git a/src/java/org/apache/sqoop/util/LineBuffer.java b/src/java/org/apache/sqoop/util/LineBuffer.java deleted file mode 100644 index 474ac552..00000000 --- a/src/java/org/apache/sqoop/util/LineBuffer.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.apache.sqoop.util; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/* Buffer up lines of text until buffer is full. - * This helper class is just to support a test to - * see if the copyIn mapper for postgres is unbuffered - * - * */ -public class LineBuffer{ - public static final Log LOG = LogFactory.getLog(LineBuffer.class.getName()); - - private StringBuilder sb=new StringBuilder(); - private static int MAXLEN=100000000; - //private static int MAXLEN=50000000; - - public void clear(){ sb.setLength(0); } - public int length(){return sb.length();} - public boolean append(String s){ -// LOG.debug(s); - if (sb.length()+s.length()+1>MAXLEN){return false;} - sb.append(s); - sb.append("\n"); - return true; - } - public String toString(){return sb.toString();} - public byte[] getBytes() { - try { - //LOG.debug("returning "+new String(sb.toString().getBytes("UTF-8"))); - return sb.toString().getBytes("UTF-8"); - }catch(Exception e){e.printStackTrace();return null;} - } -} diff --git a/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java index 457b398a..5361a427 100644 --- a/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java +++ b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java @@ -352,6 +352,19 @@ public void testExportDirect() throws IOException, SQLException { "3,Fred,2009-01-23,15,marketing", }); + String[] extra = new String[] {"--direct","--batch"}; + + runExport(getArgv(true, extra)); + + assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection); + } + @Test + public void testExportDirectBatch() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + String[] extra = new String[] {"--direct"}; runExport(getArgv(true, extra)); @@ -359,6 +372,7 @@ public void testExportDirect() throws IOException, SQLException { assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection); } + @Test public void testExportCustomSchema() throws IOException, SQLException { createTestFile("inputFile", new String[] {