mirror of
https://github.com/apache/sqoop.git
synced 2025-05-03 01:31:04 +08:00
cleanups:
1. moved LineBuffer to inner class of the Export Mapper 2. extended support to 9.x direct copy 3. Added test case
This commit is contained in:
parent
4916a0ed26
commit
be539f183b
@ -36,7 +36,7 @@
|
|||||||
import org.postgresql.PGConnection;
|
import org.postgresql.PGConnection;
|
||||||
import org.postgresql.copy.CopyManager;
|
import org.postgresql.copy.CopyManager;
|
||||||
import org.postgresql.copy.CopyIn;
|
import org.postgresql.copy.CopyIn;
|
||||||
import org.apache.sqoop.util.LineBuffer;
|
//import org.apache.sqoop.util.LineBuffer;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -66,6 +66,36 @@ public class PostgreSQLCopyExportMapper
|
|||||||
new DelimiterSet(',', '\n',
|
new DelimiterSet(',', '\n',
|
||||||
DelimiterSet.NULL_CHAR, DelimiterSet.NULL_CHAR, false);
|
DelimiterSet.NULL_CHAR, DelimiterSet.NULL_CHAR, false);
|
||||||
|
|
||||||
|
/* Buffer up lines of text until buffer is full.
|
||||||
|
* This helper class is just to support a test to
|
||||||
|
* see if the copyIn mapper for postgres is unbuffered
|
||||||
|
*
|
||||||
|
* */
|
||||||
|
static class LineBuffer{
|
||||||
|
public static final Log LOG = LogFactory.getLog(LineBuffer.class.getName());
|
||||||
|
|
||||||
|
private StringBuilder sb=new StringBuilder();
|
||||||
|
private static int MAXLEN=100000000;
|
||||||
|
//private static int MAXLEN=50000000;
|
||||||
|
|
||||||
|
public void clear(){ sb.setLength(0); }
|
||||||
|
public int length(){return sb.length();}
|
||||||
|
public boolean append(String s){
|
||||||
|
// LOG.debug(s);
|
||||||
|
if (sb.length()+s.length()+1>MAXLEN){return false;}
|
||||||
|
sb.append(s);
|
||||||
|
sb.append("\n");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
public String toString(){return sb.toString();}
|
||||||
|
public byte[] getBytes() {
|
||||||
|
try {
|
||||||
|
//LOG.debug("returning "+new String(sb.toString().getBytes("UTF-8")));
|
||||||
|
return sb.toString().getBytes("UTF-8");
|
||||||
|
}catch(Exception e){e.printStackTrace();return null;}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public PostgreSQLCopyExportMapper() {
|
public PostgreSQLCopyExportMapper() {
|
||||||
}
|
}
|
||||||
/* Text mode normally interprets escape sequences. Optionally
|
/* Text mode normally interprets escape sequences. Optionally
|
||||||
@ -155,7 +185,7 @@ protected void setup(Context context)
|
|||||||
sql.append(", FORMAT csv ");
|
sql.append(", FORMAT csv ");
|
||||||
sql.append(", DELIMITER ");
|
sql.append(", DELIMITER ");
|
||||||
sql.append("'");
|
sql.append("'");
|
||||||
sql.append(conf.get("postgresql.input.field.delim", ","));
|
sql.append(delim);
|
||||||
sql.append("'");
|
sql.append("'");
|
||||||
sql.append(", QUOTE ");
|
sql.append(", QUOTE ");
|
||||||
sql.append("'");
|
sql.append("'");
|
||||||
@ -165,11 +195,16 @@ protected void setup(Context context)
|
|||||||
sql.append("'");
|
sql.append("'");
|
||||||
sql.append(conf.get("postgresql.input.escapedby", "\""));
|
sql.append(conf.get("postgresql.input.escapedby", "\""));
|
||||||
sql.append("'");
|
sql.append("'");
|
||||||
if (conf.get("postgresql.null.string") != null) {
|
/* Hadoop config does not permit empty string so we use special switch to designate that */
|
||||||
sql.append(", NULL ");
|
if (conf.get("postgresql.null.emptystring")!=null){
|
||||||
sql.append("'");
|
sql.append(", NULL ''");
|
||||||
sql.append(conf.get("postgresql.null.string"));
|
}else {
|
||||||
sql.append("'");
|
if (conf.get("postgresql.null.string") != null) {
|
||||||
|
sql.append(", NULL ");
|
||||||
|
sql.append("'");
|
||||||
|
sql.append(conf.get("postgresql.null.string"));
|
||||||
|
sql.append("'");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
sql.append(")");
|
sql.append(")");
|
||||||
}
|
}
|
||||||
|
@ -1,33 +0,0 @@
|
|||||||
package org.apache.sqoop.util;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
|
|
||||||
/* Buffer up lines of text until buffer is full.
|
|
||||||
* This helper class is just to support a test to
|
|
||||||
* see if the copyIn mapper for postgres is unbuffered
|
|
||||||
*
|
|
||||||
* */
|
|
||||||
public class LineBuffer{
|
|
||||||
public static final Log LOG = LogFactory.getLog(LineBuffer.class.getName());
|
|
||||||
|
|
||||||
private StringBuilder sb=new StringBuilder();
|
|
||||||
private static int MAXLEN=100000000;
|
|
||||||
//private static int MAXLEN=50000000;
|
|
||||||
|
|
||||||
public void clear(){ sb.setLength(0); }
|
|
||||||
public int length(){return sb.length();}
|
|
||||||
public boolean append(String s){
|
|
||||||
// LOG.debug(s);
|
|
||||||
if (sb.length()+s.length()+1>MAXLEN){return false;}
|
|
||||||
sb.append(s);
|
|
||||||
sb.append("\n");
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
public String toString(){return sb.toString();}
|
|
||||||
public byte[] getBytes() {
|
|
||||||
try {
|
|
||||||
//LOG.debug("returning "+new String(sb.toString().getBytes("UTF-8")));
|
|
||||||
return sb.toString().getBytes("UTF-8");
|
|
||||||
}catch(Exception e){e.printStackTrace();return null;}
|
|
||||||
}
|
|
||||||
}
|
|
@ -352,6 +352,19 @@ public void testExportDirect() throws IOException, SQLException {
|
|||||||
"3,Fred,2009-01-23,15,marketing",
|
"3,Fred,2009-01-23,15,marketing",
|
||||||
});
|
});
|
||||||
|
|
||||||
|
String[] extra = new String[] {"--direct","--batch"};
|
||||||
|
|
||||||
|
runExport(getArgv(true, extra));
|
||||||
|
|
||||||
|
assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void testExportDirectBatch() throws IOException, SQLException {
|
||||||
|
createTestFile("inputFile", new String[] {
|
||||||
|
"2,Bob,2009-04-20,400,sales",
|
||||||
|
"3,Fred,2009-01-23,15,marketing",
|
||||||
|
});
|
||||||
|
|
||||||
String[] extra = new String[] {"--direct"};
|
String[] extra = new String[] {"--direct"};
|
||||||
|
|
||||||
runExport(getArgv(true, extra));
|
runExport(getArgv(true, extra));
|
||||||
@ -359,6 +372,7 @@ public void testExportDirect() throws IOException, SQLException {
|
|||||||
assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
|
assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testExportCustomSchema() throws IOException, SQLException {
|
public void testExportCustomSchema() throws IOException, SQLException {
|
||||||
createTestFile("inputFile", new String[] {
|
createTestFile("inputFile", new String[] {
|
||||||
|
Loading…
Reference in New Issue
Block a user