This commit is contained in:
jiafeng.zhang 2022-10-09 18:20:23 +08:00
parent 91306ad586
commit 2ba4727e14
6 changed files with 29 additions and 27 deletions

View File

@ -20,17 +20,19 @@
"writer": {
"name": "doriswriter",
"parameter": {
"feLoadUrl": ["192.168.1.1:8030"],
"jdbcUrl": "jdbc:mysql://192.168.1.1:9030/",
"loadProps": {
},
"loadUrl": ["192.168.1.1:8030"],
"loadProps": {},
"database": "db1",
"table": "t3",
"column": ["k1", "k2", "k3"],
"username": "root",
"password": "",
"postSql": [],
"preSql": []
"preSql": [],
"connection": [
"jdbcUrl":"jdbc:mysql://192.168.1.1:9030/",
"table":["xxx"],
"selectedDatabase":"xxxx"
]
}
}
}

View File

@ -95,11 +95,11 @@ public class DorisWriter extends Writer {
// trigger buffer
if (batchCount >= this.keys.getBatchRows() || batchByteSize >= this.keys.getBatchByteSize()) {
// generate doris stream load label
flush(flushBatch);
flush (flushBatch);
// clear buffer
batchCount = 0;
batchByteSize = 0L;
flushBatch = new DorisFlushBatch(lineDelimiter, this.keys.getFormat());
flushBatch = new DorisFlushBatch (lineDelimiter, this.keys.getFormat());
}
} // end of while

View File

@ -31,12 +31,11 @@ import java.util.Map;
public class Key implements Serializable {
public static final String FE_LOAD_URL = "feLoadUrl";
public static final String BE_LOAD_URL = "beLoadUrl";
public static final String JDBC_URL = "jdbcUrl";
public static final String JDBC_URL = "connection[0].jdbcUrl";
public static final String DATABASE = "database";
public static final String TABLE = "table";
public static final String DATABASE = "connection[0].selectedDatabase";
public static final String TABLE = "connection[0].table[0]";
public static final String COLUMN = "column";
public static final String TIME_ZONE = "timeZone";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
@ -48,8 +47,8 @@ public class Key implements Serializable {
public static final String LOAD_PROPS_LINE_DELIMITER = "line_delimiter";
public static final String LOAD_PROPS_COLUMN_SEPARATOR = "column_separator";
public static final String MAX_BATCH_ROWS = "batchSizeRows";
public static final String BATCH_BYTE_SIZE = "batchByteSize";
public static final String MAX_BATCH_ROWS = "maxBatchRows";
public static final String BATCH_BYTE_SIZE = "maxBatchSize";
public static final String MAX_RETRIES = "maxRetries";
public static final String LABEL_PREFIX = "labelPrefix";
public static final String FORMAT = "format";
@ -57,6 +56,7 @@ public class Key implements Serializable {
private final Configuration options;
private static final long DEFAULT_MAX_BATCH_ROWS = 500000;
private static final long DEFAULT_BATCH_BYTE_SIZE = 90 * 1024 * 1024;
private static final int DEFAULT_MAX_RETRIES = 0;
@ -109,10 +109,6 @@ public class Key implements Serializable {
return this.options.getList(COLUMN, String.class);
}
public String getTimeZone() {
return this.options.getString(TIME_ZONE, DEFAULT_TIME_ZONE);
}
public List<String> getPreSqlList() {
return this.options.getList(PRE_SQL, String.class);
}
@ -157,6 +153,7 @@ public class Key implements Serializable {
return this.options.getInt(CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT);
}
private void validateStreamLoadUrl() {
List<String> urlList = this.getBeLoadUrlList();
if (urlList == null) {

View File

@ -1,6 +1,6 @@
{
"name": "doriswriter",
"class": "com.alibaba.datax.plugin.writer.doriswriter.DorisWriter",
"description": "",
"developer": ""
"description": "apache doris writer plugin",
"developer": "apche doris"
}

View File

@ -3,14 +3,17 @@
"parameter": {
"username": "",
"password": "",
"database": "",
"table": "",
"column": [],
"timeZone": "",
"preSql": [],
"postSql": [],
"jdbcUrl": "",
"beLoadUrl": [],
"loadProps": {}
"loadProps": {},
"connection": [
{
"jdbcUrl": "",
"selectedDatabase": "",
"table": []
}
]
}
}

View File

@ -62,7 +62,7 @@ public class TestDorisWriterLoad {
Key key = new Key(configuration);
DorisWriterEmitter emitter = new DorisWriterEmitter(key);
DorisFlushBatch flushBatch = new DorisFlushBatch("\n");
DorisFlushBatch flushBatch = new DorisFlushBatch("\n","csv");
flushBatch.setLabel("test4");
Map<String, String> row1 = Maps.newHashMap();
row1.put("k1", "2021-02-02");
@ -83,6 +83,6 @@ public class TestDorisWriterLoad {
for (int i = 0; i < 50000; ++i) {
flushBatch.putData(rowStr2);
}
emitter.doStreamLoad(flushBatch);
emitter.emit (flushBatch);
}
}