diff --git a/doriswriter/doc/mysql2doris.json b/doriswriter/doc/mysql2doris.json
index b8be23c9..6992a2be 100644
--- a/doriswriter/doc/mysql2doris.json
+++ b/doriswriter/doc/mysql2doris.json
@@ -20,17 +20,19 @@
                 "writer": {
                     "name": "doriswriter",
                     "parameter": {
-                        "feLoadUrl": ["192.168.1.1:8030"],
-                        "jdbcUrl": "jdbc:mysql://192.168.1.1:9030/",
-                        "loadProps": {
-                        },
+                        "loadUrl": ["192.168.1.1:8030"],
+                        "loadProps": {},
                         "database": "db1",
-                        "table": "t3",
                         "column": ["k1", "k2", "k3"],
                         "username": "root",
                         "password": "",
                         "postSql": [],
-                        "preSql": []
+                        "preSql": [],
+                        "connection": [{
+                            "jdbcUrl": "jdbc:mysql://192.168.1.1:9030/",
+                            "table": ["xxx"],
+                            "selectedDatabase": "xxxx"
+                        }]
                     }
                 }
             }
diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
index 5f60f9ab..5e0bd205 100644
--- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
+++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
@@ -95,11 +95,11 @@ public class DorisWriter extends Writer {
                 // trigger buffer
                 if (batchCount >= this.keys.getBatchRows() || batchByteSize >= this.keys.getBatchByteSize()) {
                     // generate doris stream load label
-                    flush(flushBatch);
+                    flush (flushBatch);
                     // clear buffer
                     batchCount = 0;
                     batchByteSize = 0L;
-                    flushBatch = new DorisFlushBatch(lineDelimiter, this.keys.getFormat());
+                    flushBatch = new DorisFlushBatch (lineDelimiter, this.keys.getFormat());
                 }
             } // end of while
diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
index 4335936f..a6ad82c4 100644
--- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
+++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
@@ -31,12 +31,11 @@ import java.util.Map;
 public class Key implements Serializable {
     public static final String FE_LOAD_URL = "feLoadUrl";
     public static final String BE_LOAD_URL = "beLoadUrl";
-    public static final String JDBC_URL = "jdbcUrl";
+    public static final String JDBC_URL = "connection[0].jdbcUrl";
 
-    public static final String DATABASE = "database";
-    public static final String TABLE = "table";
+    public static final String DATABASE = "connection[0].selectedDatabase";
+    public static final String TABLE = "connection[0].table[0]";
     public static final String COLUMN = "column";
-    public static final String TIME_ZONE = "timeZone";
 
     public static final String USERNAME = "username";
     public static final String PASSWORD = "password";
@@ -48,8 +47,8 @@ public class Key implements Serializable {
     public static final String LOAD_PROPS_LINE_DELIMITER = "line_delimiter";
     public static final String LOAD_PROPS_COLUMN_SEPARATOR = "column_separator";
 
-    public static final String MAX_BATCH_ROWS = "batchSizeRows";
-    public static final String BATCH_BYTE_SIZE = "batchByteSize";
+    public static final String MAX_BATCH_ROWS = "maxBatchRows";
+    public static final String BATCH_BYTE_SIZE = "maxBatchSize";
     public static final String MAX_RETRIES = "maxRetries";
     public static final String LABEL_PREFIX = "labelPrefix";
     public static final String FORMAT = "format";
@@ -57,6 +56,7 @@
     private final Configuration options;
 
     private static final long DEFAULT_MAX_BATCH_ROWS = 500000;
+    private static final long DEFAULT_BATCH_BYTE_SIZE = 90 * 1024 * 1024;
 
     private static final int DEFAULT_MAX_RETRIES = 0;
 
@@ -109,10 +109,6 @@ public class Key implements Serializable {
         return this.options.getList(COLUMN, String.class);
     }
 
-    public String getTimeZone() {
-        return this.options.getString(TIME_ZONE, DEFAULT_TIME_ZONE);
-    }
-
     public List<String> getPreSqlList() {
         return this.options.getList(PRE_SQL, String.class);
     }
@@ -157,6 +153,7 @@ public class Key implements Serializable {
         return this.options.getInt(CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT);
     }
 
+
     private void validateStreamLoadUrl() {
         List<String> urlList = this.getBeLoadUrlList();
         if (urlList == null) {
diff --git a/doriswriter/src/main/resources/plugin.json b/doriswriter/src/main/resources/plugin.json
index 9d2ad497..69dc31a2 100644
--- a/doriswriter/src/main/resources/plugin.json
+++ b/doriswriter/src/main/resources/plugin.json
@@ -1,6 +1,6 @@
 {
     "name": "doriswriter",
     "class": "com.alibaba.datax.plugin.writer.doriswriter.DorisWriter",
-    "description": "",
-    "developer": ""
+    "description": "apache doris writer plugin",
+    "developer": "apache doris"
 }
diff --git a/doriswriter/src/main/resources/plugin_job_template.json b/doriswriter/src/main/resources/plugin_job_template.json
index 9cd9bb18..897fe49b 100644
--- a/doriswriter/src/main/resources/plugin_job_template.json
+++ b/doriswriter/src/main/resources/plugin_job_template.json
@@ -3,14 +3,17 @@
     "parameter": {
         "username": "",
         "password": "",
-        "database": "",
-        "table": "",
         "column": [],
-        "timeZone": "",
         "preSql": [],
         "postSql": [],
-        "jdbcUrl": "",
         "beLoadUrl": [],
-        "loadProps": {}
+        "loadProps": {},
+        "connection": [
+            {
+                "jdbcUrl": "",
+                "selectedDatabase": "",
+                "table": []
+            }
+        ]
     }
 }
\ No newline at end of file
diff --git a/doriswriter/src/test/java/com/alibaba/datax/plugin/writer/doriswriter/TestDorisWriterLoad.java b/doriswriter/src/test/java/com/alibaba/datax/plugin/writer/doriswriter/TestDorisWriterLoad.java
index 35b6e3a4..bbb60a0e 100644
--- a/doriswriter/src/test/java/com/alibaba/datax/plugin/writer/doriswriter/TestDorisWriterLoad.java
+++ b/doriswriter/src/test/java/com/alibaba/datax/plugin/writer/doriswriter/TestDorisWriterLoad.java
@@ -62,7 +62,7 @@ public class TestDorisWriterLoad {
         Key key = new Key(configuration);
         DorisWriterEmitter emitter = new DorisWriterEmitter(key);
 
-        DorisFlushBatch flushBatch = new DorisFlushBatch("\n");
+        DorisFlushBatch flushBatch = new DorisFlushBatch("\n", "csv");
         flushBatch.setLabel("test4");
         Map row1 = Maps.newHashMap();
         row1.put("k1", "2021-02-02");
@@ -83,6 +83,6 @@ public class TestDorisWriterLoad {
         for (int i = 0; i < 50000; ++i) {
             flushBatch.putData(rowStr2);
         }
-        emitter.doStreamLoad(flushBatch);
+        emitter.emit(flushBatch);
     }
 }
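Note on the renamed constants in Key.java: DataX's com.alibaba.datax.common.util.Configuration resolves indexed paths, so values such as "connection[0].jdbcUrl" read straight out of the first element of the new "connection" array without any extra parsing code in the plugin. The sketch below illustrates that resolution; it is not part of this change, the class name and the sample parameter JSON are made up for illustration, and it assumes the standard Configuration API (Configuration.from, getString, getList) available from datax-common.

import com.alibaba.datax.common.util.Configuration;
import java.util.List;

// Hypothetical standalone sketch; values reuse the ones from doc/mysql2doris.json.
public class ConnectionKeySketch {
    public static void main(String[] args) {
        // Illustrative writer "parameter" block using the new connection layout.
        String parameterJson = "{"
                + "\"loadUrl\": [\"192.168.1.1:8030\"],"
                + "\"username\": \"root\","
                + "\"connection\": [{"
                + "    \"jdbcUrl\": \"jdbc:mysql://192.168.1.1:9030/\","
                + "    \"selectedDatabase\": \"db1\","
                + "    \"table\": [\"t3\"]"
                + "}]"
                + "}";

        Configuration conf = Configuration.from(parameterJson);

        // The same indexed paths now held by Key.JDBC_URL, Key.DATABASE and Key.TABLE
        // resolve into the first element of the connection array.
        String jdbcUrl = conf.getString("connection[0].jdbcUrl");
        String database = conf.getString("connection[0].selectedDatabase");
        String table = conf.getString("connection[0].table[0]");
        List<String> loadUrls = conf.getList("loadUrl", String.class);

        System.out.println(jdbcUrl);   // jdbc:mysql://192.168.1.1:9030/
        System.out.println(database);  // db1
        System.out.println(table);     // t3
        System.out.println(loadUrls);  // [192.168.1.1:8030]
    }
}

This is also why mysql2doris.json and plugin_job_template.json wrap jdbcUrl, selectedDatabase and table in a "connection" list: it mirrors the connection block layout used by other DataX rdbms reader/writer plugins, so existing job templates translate with minimal changes.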