5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-03 00:29:08 +08:00

Changes for Postgresql Copy Export:

1. Support for empty null string        --null-string ''
 2. Support for non-xml delim            --fields-terminated-by '\0x1c'
 3. Added line buffering perf            --batch
 4. optional TEXT mode instead of CSV    -Dpostgresql.format.text=true
 5. Support for postgres Version 8       -Dpostgresql.targetdb.ver=8
 6. Optional disable escape sequences    -Dpostgresql.input.israw=true
This commit is contained in:
Robert B Hamilton 2019-01-21 20:17:11 -05:00
parent 032b828370
commit 123641f3a9
3 changed files with 206 additions and 54 deletions

View File

@ -79,16 +79,46 @@ protected void configureMapper(Job job, String tableName,
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(NullWritable.class);
}
/* Characters that cannot be transported through a Hadoop Configuration,
 * because they are not legal in XML 1.0 documents. */
final static String badXmlString = "\u0000\u0001\u0002\u0003\u0004\u0005" +
    "\u0006\u0007\u0008\u000B\u000C\u000E\u000F\u0010\u0011\u0012" +
    "\u0013\u0014\u0015\u0016\u0017\u0018\u0019\u001A\u001B\u001C" +
    "\u001D\u001E\u001F\uFFFE\uFFFF";
/* Returns true when the character can safely be stored in a job conf. */
public static boolean validXml(char c){
  return badXmlString.indexOf(c) == -1;
}
protected void propagateOptionsToJob(Job job) {
super.propagateOptionsToJob(job);
SqoopOptions opts = context.getOptions();
Configuration conf = job.getConfiguration();
if (opts.getNullStringValue() != null) {
conf.set("postgresql.null.string", opts.getNullStringValue());
/* empty string needs to be passed as a flag */
if (opts.getNullStringValue().equals("")) {
conf.set("postgresql.null.emptystring","true");
}
setDelimiter("postgresql.input.field.delim",
opts.getInputFieldDelim(), conf);
/* valid delimiters may not be valid xml chars, so the hadoop conf will fail.
* but we still want to support them so we base64 encode it in that case
* */
char delim= opts.getInputFieldDelim();
String delimString=Character.toString(delim);
if(validXml(delim)){
setDelimiter("postgresql.input.field.delim",delim,conf);
}else{
conf.set("postgresql.input.field.delim.base64",
java.util.Base64.getEncoder().encodeToString(delimString.getBytes()));
}
/* use the --batch switch to enable line buffering */
if (opts.isBatchMode()){
conf.set("postgresql.export.batchmode","true");
}
/* todo: there may still be some case where user wants an invalid xml char for record delim */
setDelimiter("postgresql.input.record.delim",
opts.getInputRecordDelim(), conf);
setDelimiter("postgresql.input.enclosedby",

View File

@ -36,6 +36,7 @@
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
import org.postgresql.copy.CopyIn;
import org.apache.sqoop.util.LineBuffer;
/**
@ -52,6 +53,10 @@ public class PostgreSQLCopyExportMapper
public static final Log LOG =
LogFactory.getLog(PostgreSQLCopyExportMapper.class.getName());
private boolean bufferMode=false; /* whether or not to use the line buffer */
private LineBuffer lineBuffer; /* batch up the lines before sending to copy */
private boolean isRaw=false; /* if isRaw then we won't interprete escapes */
private Configuration conf;
private DBConfiguration dbConf;
private Connection conn = null;
@ -63,6 +68,17 @@ public class PostgreSQLCopyExportMapper
public PostgreSQLCopyExportMapper() {
}
/* COPY's text format normally interprets backslash escape sequences.
 * When raw mode is enabled, neutralize that by doubling every backslash
 * so the data passes through uninterpreted; otherwise the input is
 * returned unchanged. */
public String fixEscapes(String s){
  return isRaw ? s.replace("\\", "\\\\") : s;
}
@Override
protected void setup(Context context)
@ -71,6 +87,7 @@ protected void setup(Context context)
super.setup(context);
conf = context.getConfiguration();
dbConf = new DBConfiguration(conf);
lineBuffer=new LineBuffer();
CopyManager cm = null;
try {
conn = dbConf.getConnection();
@ -83,69 +100,141 @@ protected void setup(Context context)
throw new IOException(ex);
}
try {
StringBuilder sql = new StringBuilder();
sql.append("COPY ");
sql.append(dbConf.getOutputTableName());
sql.append(" FROM STDIN WITH (");
sql.append(" ENCODING 'UTF-8' ");
sql.append(", FORMAT csv ");
sql.append(", DELIMITER ");
sql.append("'");
sql.append(conf.get("postgresql.input.field.delim", ","));
sql.append("'");
sql.append(", QUOTE ");
sql.append("'");
sql.append(conf.get("postgresql.input.enclosedby", "\""));
sql.append("'");
sql.append(", ESCAPE ");
sql.append("'");
sql.append(conf.get("postgresql.input.escapedby", "\""));
sql.append("'");
if (conf.get("postgresql.null.string") != null) {
sql.append(", NULL ");
sql.append("'");
sql.append(conf.get("postgresql.null.string"));
sql.append("'");
}
sql.append(")");
/* Set if buffering mode is requested */
this.bufferMode=("true".equals(conf.get("postgresql.export.batchmode")));
/* isRaw means escapes are NOT to be interpreted */
this.isRaw=("true".equals(conf.get("postgresql.input.israw")));
/* add support for delims which are not valid xml. We have base64 encoded them */
String delimBase64=conf.get("postgresql.input.field.delim.base64");
String delim=null;
if (delimBase64!=null){
delim=new String(java.util.Base64.getDecoder().decode(delimBase64));
} else {
delim=conf.get("postgresql.input.field.delim",",");
}
/* Some postgres instances out there still using version 8.x */
StringBuilder sql = new StringBuilder();
String ver=conf.get("postgresql.targetdb.ver", "9");
if (ver.equals("8")){
sql.append("COPY ");
sql.append(dbConf.getOutputTableName());
sql.append(" FROM STDIN WITH ");
sql.append(" DELIMITER ");
sql.append("'");
sql.append(delim);
sql.append("'");
if (! "true".equals(conf.get("postgresql.format.text"))){
sql.append(" CSV ");
sql.append(" QUOTE ");
sql.append("'");
sql.append(conf.get("postgresql.input.enclosedby", "\""));
sql.append("'");
sql.append(" ESCAPE ");
sql.append("'");
sql.append(conf.get("postgresql.input.escapedby", "\""));
sql.append("'");
}
/* Hadoop config does not permit empty string so we use special switch to designate that */
if (conf.get("postgresql.null.emptystring")!=null){
sql.append(" NULL ''");
}else
if (conf.get("postgresql.null.string") != null) {
sql.append(" NULL ");
sql.append("'");
sql.append(conf.get("postgresql.null.string"));
sql.append("'");
}
} else { /* intended for version 9.x This has not been fixed for buffering */
sql.append("COPY ");
sql.append(dbConf.getOutputTableName());
sql.append(" FROM STDIN WITH (");
sql.append(" ENCODING 'UTF-8' ");
sql.append(", FORMAT csv ");
sql.append(", DELIMITER ");
sql.append("'");
sql.append(conf.get("postgresql.input.field.delim", ","));
sql.append("'");
sql.append(", QUOTE ");
sql.append("'");
sql.append(conf.get("postgresql.input.enclosedby", "\""));
sql.append("'");
sql.append(", ESCAPE ");
sql.append("'");
sql.append(conf.get("postgresql.input.escapedby", "\""));
sql.append("'");
if (conf.get("postgresql.null.string") != null) {
sql.append(", NULL ");
sql.append("'");
sql.append(conf.get("postgresql.null.string"));
sql.append("'");
}
sql.append(")");
}
LOG.debug("Starting export with copy: " + sql);
copyin = cm.copyIn(sql.toString());
} catch (SQLException ex) {
LoggingUtils.logAll(LOG, "Unable to get CopyIn", ex);
close();
throw new IOException(ex);
}
} catch (SQLException ex) {
LoggingUtils.logAll(LOG, "Unable to get CopyIn", ex);
close();
throw new IOException(ex);
}
}
@Override
public void map(LongWritable key, Writable value, Context context)
throws IOException, InterruptedException {
/* NOTE(review): this looks like a diff scrape whose removed/added markers
 * were stripped — the unbuffered write immediately below AND the
 * bufferMode/else code after it both appear, so as written every record
 * would be sent to COPY twice. In the upstream commit the first segment
 * was the pre-change (removed) code; confirm against the repository. */
line.setLength(0);
line.append(value.toString());
if (value instanceof Text) {
line.append(System.getProperty("line.separator"));
}
try {
byte[]data = line.toString().getBytes("UTF-8");
copyin.writeToCopy(data, 0, data.length);
} catch (SQLException ex) {
LoggingUtils.logAll(LOG, "Unable to execute copy", ex);
close();
throw new IOException(ex);
}
/* Buffered path: accumulate lines in lineBuffer; flush to COPY only when
 * the buffer refuses an append (i.e. it is full). */
if (bufferMode){
if (lineBuffer.append(fixEscapes(value.toString()))){
return;
}
/* else buffer is full lets write out */
try {
byte[]data=lineBuffer.getBytes();
copyin.writeToCopy(data,0,data.length);
lineBuffer.clear();
} catch (SQLException ex) {
LoggingUtils.logAll(LOG, "Unable to execute copy", ex);
close();
throw new IOException(ex);
}
/* now write the new line that could not be appended because the buffer was full */
/* NOTE(review): this append's return value is ignored — it can only fail
 * if a single record exceeds LineBuffer's capacity, in which case the
 * record would be silently dropped; verify upstream. */
lineBuffer.append(fixEscapes(value.toString()));
} else { /* original unbuffered method */
line.setLength(0);
line.append(value.toString());
if (value instanceof Text) {
line.append(System.getProperty("line.separator"));
}
try {
byte[]data = line.toString().getBytes("UTF-8");
copyin.writeToCopy(data, 0, data.length);
} catch (SQLException ex) {
LoggingUtils.logAll(LOG, "Unable to execute copy", ex);
close();
throw new IOException(ex);
}
}
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
/* NOTE(review): diff-scrape residue — the first try/catch (old code) and
 * the second try (new code, which flushes any remaining buffered lines
 * before endCopy) are merged here, and the trailing close()/brace pair at
 * the end leaves the braces unbalanced. Reconstruct from the upstream
 * commit: the post-change method should flush lineBuffer, endCopy(),
 * then close(). */
try {
copyin.endCopy();
} catch (SQLException ex) {
LoggingUtils.logAll(LOG, "Unable to finalize copy", ex);
throw new IOException(ex);
try { /* write out the final fragment in the buffer */
if (bufferMode){
byte[]data=lineBuffer.getBytes();
copyin.writeToCopy(data,0,data.length);
lineBuffer.clear();
}
copyin.endCopy();
} catch (SQLException ex) {
LoggingUtils.logAll(LOG, "Unable to finalize copy", ex);
throw new IOException(ex);
}
close();
}
close();
}
void close() throws IOException {
if (conn != null) {

View File

@ -0,0 +1,33 @@
package org.apache.sqoop.util;
import java.nio.charset.StandardCharsets;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/* Buffer up lines of text until the buffer is full.
 * Helper for the postgres copyIn export mapper: rows are batched here so
 * they can be handed to CopyManager in fewer, larger writes.
 */
public class LineBuffer {
  public static final Log LOG = LogFactory.getLog(LineBuffer.class.getName());

  /* Accumulates appended lines, each terminated by '\n'. */
  private StringBuilder sb = new StringBuilder();

  /* Maximum number of chars held before append() reports "full". */
  private static final int MAXLEN = 100000000;

  /* Discard all buffered content; capacity is retained for reuse. */
  public void clear() { sb.setLength(0); }

  /* Number of chars currently buffered. */
  public int length() { return sb.length(); }

  /* Append one line (a trailing '\n' is added).
   * Returns true if the line was accepted; false if adding it would
   * exceed MAXLEN — the caller should flush (getBytes + clear) and
   * re-append the rejected line. The buffer is left unchanged on false.
   */
  public boolean append(String s) {
    if (sb.length() + s.length() + 1 > MAXLEN) {
      return false;
    }
    sb.append(s);
    sb.append("\n");
    return true;
  }

  @Override
  public String toString() { return sb.toString(); }

  /* Buffered content encoded as UTF-8.
   * Uses StandardCharsets.UTF_8, which cannot throw, instead of the
   * String-charset overload: the old version caught the (impossible)
   * UnsupportedEncodingException, printed a stack trace, and returned
   * null — which would have caused an NPE in the caller.
   */
  public byte[] getBytes() {
    return sb.toString().getBytes(StandardCharsets.UTF_8);
  }
}