diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java index ea2b064e..13f3b6d1 100644 --- a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java @@ -79,16 +79,46 @@ protected void configureMapper(Job job, String tableName, job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(NullWritable.class); } +/* list of chars that cannot be passed via jobconf */ + final static String badXmlString = "\u0000\u0001\u0002\u0003\u0004\u0005" + + "\u0006\u0007\u0008\u000B\u000C\u000E\u000F\u0010\u0011\u0012" + + "\u0013\u0014\u0015\u0016\u0017\u0018\u0019\u001A\u001B\u001C" + + "\u001D\u001E\u001F\uFFFE\uFFFF"; + +/* true if the char is ok to pass via Configuration */ + public static boolean validXml(char s){ + return (badXmlString.indexOf(s)<0); + } + protected void propagateOptionsToJob(Job job) { super.propagateOptionsToJob(job); SqoopOptions opts = context.getOptions(); Configuration conf = job.getConfiguration(); - if (opts.getNullStringValue() != null) { - conf.set("postgresql.null.string", opts.getNullStringValue()); + + /* empty string needs to be passed as a flag */ + if (opts.getNullStringValue().equals("")) { + conf.set("postgresql.null.emptystring","true"); } - setDelimiter("postgresql.input.field.delim", - opts.getInputFieldDelim(), conf); + + /* valid delimiters may not be valid xml chars, so the hadoop conf will fail. + * but we still want to support them so we base64 encode it in that case + * */ + char delim= opts.getInputFieldDelim(); + String delimString=Character.toString(delim); + if(validXml(delim)){ + setDelimiter("postgresql.input.field.delim",delim,conf); + }else{ + conf.set("postgresql.input.field.delim.base64", + java.util.Base64.getEncoder().encodeToString(delimString.getBytes())); + } + + /* use the --batch switch to enable line buffering */ + if (opts.isBatchMode()){ + conf.set("postgresql.export.batchmode","true"); + } + +/* todo: there may still be some case where user wants an invalid xml char for record delim */ setDelimiter("postgresql.input.record.delim", opts.getInputRecordDelim(), conf); setDelimiter("postgresql.input.enclosedby", diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java index cf9a3cd1..82e26b53 100644 --- a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportMapper.java @@ -36,6 +36,7 @@ import org.postgresql.PGConnection; import org.postgresql.copy.CopyManager; import org.postgresql.copy.CopyIn; +import org.apache.sqoop.util.LineBuffer; /** @@ -52,6 +53,10 @@ public class PostgreSQLCopyExportMapper public static final Log LOG = LogFactory.getLog(PostgreSQLCopyExportMapper.class.getName()); + private boolean bufferMode=false; /* whether or not to use the line buffer */ + private LineBuffer lineBuffer; /* batch up the lines before sending to copy */ + private boolean isRaw=false; /* if isRaw then we won't interprete escapes */ + private Configuration conf; private DBConfiguration dbConf; private Connection conn = null; @@ -63,6 +68,17 @@ public class PostgreSQLCopyExportMapper public PostgreSQLCopyExportMapper() { } + /* Text mode normally interprets escape sequences. Optionally + * turn that off by escaping the escapes + * */ + public String fixEscapes(String s){ + if (isRaw){ + return s.replace("\\","\\\\"); + } else { + return s; + } + } + @Override protected void setup(Context context) @@ -71,6 +87,7 @@ protected void setup(Context context) super.setup(context); conf = context.getConfiguration(); dbConf = new DBConfiguration(conf); + lineBuffer=new LineBuffer(); CopyManager cm = null; try { conn = dbConf.getConnection(); @@ -83,69 +100,141 @@ protected void setup(Context context) throw new IOException(ex); } try { - StringBuilder sql = new StringBuilder(); - sql.append("COPY "); - sql.append(dbConf.getOutputTableName()); - sql.append(" FROM STDIN WITH ("); - sql.append(" ENCODING 'UTF-8' "); - sql.append(", FORMAT csv "); - sql.append(", DELIMITER "); - sql.append("'"); - sql.append(conf.get("postgresql.input.field.delim", ",")); - sql.append("'"); - sql.append(", QUOTE "); - sql.append("'"); - sql.append(conf.get("postgresql.input.enclosedby", "\"")); - sql.append("'"); - sql.append(", ESCAPE "); - sql.append("'"); - sql.append(conf.get("postgresql.input.escapedby", "\"")); - sql.append("'"); - if (conf.get("postgresql.null.string") != null) { - sql.append(", NULL "); - sql.append("'"); - sql.append(conf.get("postgresql.null.string")); - sql.append("'"); - } - sql.append(")"); + /* Set if buffering mode is requested */ + this.bufferMode=("true".equals(conf.get("postgresql.export.batchmode"))); + + /* isRaw means escapes are NOT to be interpreted */ + this.isRaw=("true".equals(conf.get("postgresql.input.israw"))); + + /* add support for delims which are not valid xml. We have base64 encoded them */ + String delimBase64=conf.get("postgresql.input.field.delim.base64"); + String delim=null; + if (delimBase64!=null){ + delim=new String(java.util.Base64.getDecoder().decode(delimBase64)); + } else { + delim=conf.get("postgresql.input.field.delim",","); + } + + /* Some postgres instances out there still using version 8.x */ + StringBuilder sql = new StringBuilder(); + String ver=conf.get("postgresql.targetdb.ver", "9"); + if (ver.equals("8")){ + sql.append("COPY "); + sql.append(dbConf.getOutputTableName()); + sql.append(" FROM STDIN WITH "); + sql.append(" DELIMITER "); + sql.append("'"); + sql.append(delim); + sql.append("'"); + if (! "true".equals(conf.get("postgresql.format.text"))){ + sql.append(" CSV "); + sql.append(" QUOTE "); + sql.append("'"); + sql.append(conf.get("postgresql.input.enclosedby", "\"")); + sql.append("'"); + sql.append(" ESCAPE "); + sql.append("'"); + sql.append(conf.get("postgresql.input.escapedby", "\"")); + sql.append("'"); + } + /* Hadoop config does not permit empty string so we use special switch to designate that */ + if (conf.get("postgresql.null.emptystring")!=null){ + sql.append(" NULL ''"); + }else + if (conf.get("postgresql.null.string") != null) { + sql.append(" NULL "); + sql.append("'"); + sql.append(conf.get("postgresql.null.string")); + sql.append("'"); + } + } else { /* intended for version 9.x This has not been fixed for buffering */ + sql.append("COPY "); + sql.append(dbConf.getOutputTableName()); + sql.append(" FROM STDIN WITH ("); + sql.append(" ENCODING 'UTF-8' "); + sql.append(", FORMAT csv "); + sql.append(", DELIMITER "); + sql.append("'"); + sql.append(conf.get("postgresql.input.field.delim", ",")); + sql.append("'"); + sql.append(", QUOTE "); + sql.append("'"); + sql.append(conf.get("postgresql.input.enclosedby", "\"")); + sql.append("'"); + sql.append(", ESCAPE "); + sql.append("'"); + sql.append(conf.get("postgresql.input.escapedby", "\"")); + sql.append("'"); + if (conf.get("postgresql.null.string") != null) { + sql.append(", NULL "); + sql.append("'"); + sql.append(conf.get("postgresql.null.string")); + sql.append("'"); + } + sql.append(")"); + } LOG.debug("Starting export with copy: " + sql); copyin = cm.copyIn(sql.toString()); - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to get CopyIn", ex); - close(); - throw new IOException(ex); - } + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to get CopyIn", ex); + close(); + throw new IOException(ex); + } } @Override public void map(LongWritable key, Writable value, Context context) throws IOException, InterruptedException { - line.setLength(0); - line.append(value.toString()); - if (value instanceof Text) { - line.append(System.getProperty("line.separator")); - } - try { - byte[]data = line.toString().getBytes("UTF-8"); - copyin.writeToCopy(data, 0, data.length); - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to execute copy", ex); - close(); - throw new IOException(ex); - } + if (bufferMode){ + if (lineBuffer.append(fixEscapes(value.toString()))){ + return; + } + /* else buffer is full lets write out */ + try { + byte[]data=lineBuffer.getBytes(); + copyin.writeToCopy(data,0,data.length); + lineBuffer.clear(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to execute copy", ex); + close(); + throw new IOException(ex); + } + + /* now write the new line that could not be appended because the buffer was full */ + lineBuffer.append(fixEscapes(value.toString())); + } else { /* original unbuffered method */ + line.setLength(0); + line.append(value.toString()); + if (value instanceof Text) { + line.append(System.getProperty("line.separator")); + } + try { + byte[]data = line.toString().getBytes("UTF-8"); + copyin.writeToCopy(data, 0, data.length); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to execute copy", ex); + close(); + throw new IOException(ex); + } + } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { - try { - copyin.endCopy(); - } catch (SQLException ex) { - LoggingUtils.logAll(LOG, "Unable to finalize copy", ex); - throw new IOException(ex); + try { /* write out the final fragment in the buffer */ + if (bufferMode){ + byte[]data=lineBuffer.getBytes(); + copyin.writeToCopy(data,0,data.length); + lineBuffer.clear(); + } + copyin.endCopy(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to finalize copy", ex); + throw new IOException(ex); + } + close(); } - close(); - } void close() throws IOException { if (conn != null) { diff --git a/src/java/org/apache/sqoop/util/LineBuffer.java b/src/java/org/apache/sqoop/util/LineBuffer.java new file mode 100644 index 00000000..474ac552 --- /dev/null +++ b/src/java/org/apache/sqoop/util/LineBuffer.java @@ -0,0 +1,33 @@ +package org.apache.sqoop.util; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/* Buffer up lines of text until buffer is full. + * This helper class is just to support a test to + * see if the copyIn mapper for postgres is unbuffered + * + * */ +public class LineBuffer{ + public static final Log LOG = LogFactory.getLog(LineBuffer.class.getName()); + + private StringBuilder sb=new StringBuilder(); + private static int MAXLEN=100000000; + //private static int MAXLEN=50000000; + + public void clear(){ sb.setLength(0); } + public int length(){return sb.length();} + public boolean append(String s){ +// LOG.debug(s); + if (sb.length()+s.length()+1>MAXLEN){return false;} + sb.append(s); + sb.append("\n"); + return true; + } + public String toString(){return sb.toString();} + public byte[] getBytes() { + try { + //LOG.debug("returning "+new String(sb.toString().getBytes("UTF-8"))); + return sb.toString().getBytes("UTF-8"); + }catch(Exception e){e.printStackTrace();return null;} + } +}