diff --git a/doriswriter/doc/doriswriter.md b/doriswriter/doc/doriswriter.md index 9f8f510c..1f0dff62 100644 --- a/doriswriter/doc/doriswriter.md +++ b/doriswriter/doc/doriswriter.md @@ -43,6 +43,7 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter "password": "xxxxxx", "postSql": ["select count(1) from all_employees_info"], "preSql": [], + "flushInterval":30000, "connection": [ { "jdbcUrl": "jdbc:mysql://172.16.0.13:9030/demo", @@ -77,7 +78,7 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter * **loadUrl** - - 描述:和 **beLoadUrl** 二选一。作为 Stream Load 的连接目标。格式为 "ip:port"。其中 IP 是 FE 节点 IP,port 是 FE 节点的 http_port。可以填写多个,doriswriter 将以轮询的方式访问。 + - 描述:作为 Stream Load 的连接目标。格式为 "ip:port"。其中 IP 是 FE 节点 IP,port 是 FE 节点的 http_port。可以填写多个,多个之间使用英文状态的分号隔开:`;`,doriswriter 将以轮询的方式访问。 - 必选:是 - 默认值:无 @@ -88,23 +89,21 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter - 默认值:无 * **password** - + - 描述:访问Doris数据库的密码 - 必选:否 - 默认值:空 -* **database** - +* **connection.selectedDatabase** - 描述:需要写入的Doris数据库名称。 - 必选:是 - 默认值:无 -* **table** - - - 描述:需要写入的Doris表名称。 +* **connection.table** + - 描述:需要写入的Doris表名称。 - 必选:是 - 默认值:无 - + * **column** - 描述:目的表**需要写入数据**的字段,这些字段将作为生成的 Json 数据的字段名。字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。 @@ -144,32 +143,26 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter * **labelPrefix** - - 描述:每批次导入任务的 label 前缀。最终的 label 将有 `labelPrefix + UUID + 序号` 组成 + - 描述:每批次导入任务的 label 前缀。最终的 label 将有 `labelPrefix + UUID` 组成全局唯一的 label,确保数据不会重复导入 - 必选:否 - 默认值:`datax_doris_writer_` -* **format** - - - 描述:StreamLoad数据的组装格式,支持csv和json格式。csv默认的行分隔符是\x01,列分隔符是\x02。 - - 必选:否 - - 默认值:csv - * **loadProps** - - 描述:StreamLoad 的请求参数,详情参照StreamLoad介绍页面。 + - 描述:StreamLoad 的请求参数,详情参照StreamLoad介绍页面。[Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual) + + 这里包括导入的数据格式:format等,导入数据格式默认我们使用csv,支持JSON,具体可以参照下面类型转换部分,也可以参照上面Stream load 官方信息 + - 必选:否 + - 默认值:无 -* **connectTimeout** - - - 描述:StreamLoad单次请求的超时时间, 
单位毫秒(ms)。 - - 必选:否 - - 默认值:-1 - ### 类型转换 默认传入的数据均会被转为字符串,并以`\t`作为列分隔符,`\n`作为行分隔符,组成`csv`文件进行StreamLoad导入操作。 -如需更改列分隔符, 则正确配置 `loadProps` 即可: + +默认是csv格式导入,如需更改列分隔符, 则正确配置 `loadProps` 即可: + ```json "loadProps": { "column_separator": "\\x01", @@ -183,4 +176,6 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter "format": "json", "strip_outer_array": true } -``` \ No newline at end of file +``` + +更多信息请参照 Doris 官网:[Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual) \ No newline at end of file diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisDelimiterParser.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DelimiterParser.java similarity index 98% rename from doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisDelimiterParser.java rename to doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DelimiterParser.java index 5c92af9f..e84bd7dd 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisDelimiterParser.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DelimiterParser.java @@ -4,7 +4,7 @@ import com.google.common.base.Strings; import java.io.StringWriter; -public class DorisDelimiterParser { +public class DelimiterParser { private static final String HEX_STRING = "0123456789ABCDEF"; diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisBaseSerializer.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisBaseCodec.java similarity index 88% rename from doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisBaseSerializer.java rename to doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisBaseCodec.java index c9aacc6e..ee7ded56 100644 --- 
a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisBaseSerializer.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisBaseCodec.java @@ -2,8 +2,8 @@ package com.alibaba.datax.plugin.writer.doriswriter; import com.alibaba.datax.common.element.Column; -public class DorisBaseSerializer { - protected String fieldConvertion( Column col) { +public class DorisBaseCodec { + protected String convertionField( Column col) { if (null == col.getRawData() || Column.Type.NULL == col.getType()) { return null; } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisSerializer.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java similarity index 59% rename from doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisSerializer.java rename to doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java index 58759ecf..a2437a1c 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisSerializer.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java @@ -4,7 +4,7 @@ import com.alibaba.datax.common.element.Record; import java.io.Serializable; -public interface DorisSerializer extends Serializable { +public interface DorisCodec extends Serializable { - String serialize( Record row); + String codec( Record row); } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodecFactory.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodecFactory.java new file mode 100644 index 00000000..22c4b409 --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodecFactory.java @@ -0,0 +1,19 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import java.util.Map; + +public class DorisCodecFactory { + public DorisCodecFactory (){ + + } + public 
static DorisCodec createCodec( Keys writerOptions) { + if ( Keys.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) { + Map props = writerOptions.getLoadProps(); + return new DorisCsvCodec (null == props || !props.containsKey("column_separator") ? null : String.valueOf(props.get("column_separator"))); + } + if ( Keys.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) { + return new DorisJsonCodec (writerOptions.getColumns()); + } + throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties."); + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvSerializer.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java similarity index 63% rename from doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvSerializer.java rename to doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java index 93c477d8..518aa304 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvSerializer.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java @@ -2,20 +2,21 @@ package com.alibaba.datax.plugin.writer.doriswriter; import com.alibaba.datax.common.element.Record; -public class DorisCsvSerializer extends DorisBaseSerializer implements DorisSerializer{ +public class DorisCsvCodec extends DorisBaseCodec implements DorisCodec { + private static final long serialVersionUID = 1L; private final String columnSeparator; - public DorisCsvSerializer(String sp) { - this.columnSeparator = DorisDelimiterParser.parse(sp, "\t"); + public DorisCsvCodec ( String sp) { + this.columnSeparator = DelimiterParser.parse(sp, "\t"); } @Override - public String serialize( Record row) { + public String codec( Record row) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < row.getColumnNumber(); i++) { - String 
value = fieldConvertion(row.getColumn(i)); + String value = convertionField(row.getColumn(i)); sb.append(null == value ? "\\N" : value); if (i < row.getColumnNumber() - 1) { sb.append(columnSeparator); diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonSerializer.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java similarity index 72% rename from doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonSerializer.java rename to doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java index 315e472c..e6c05733 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonSerializer.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java @@ -7,25 +7,25 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class DorisJsonSerializer extends DorisBaseSerializer implements DorisSerializer{ +public class DorisJsonCodec extends DorisBaseCodec implements DorisCodec { private static final long serialVersionUID = 1L; private final List fieldNames; - public DorisJsonSerializer( List fieldNames) { + public DorisJsonCodec ( List fieldNames) { this.fieldNames = fieldNames; } @Override - public String serialize( Record row) { + public String codec( Record row) { if (null == fieldNames) { return ""; } Map rowMap = new HashMap<> (fieldNames.size()); int idx = 0; for (String fieldName : fieldNames) { - rowMap.put(fieldName, fieldConvertion(row.getColumn(idx))); + rowMap.put(fieldName, convertionField(row.getColumn(idx))); idx++; } return JSON.toJSONString(rowMap); diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisSerializerFactory.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisSerializerFactory.java deleted file mode 100644 index 8484e5b7..00000000 --- 
a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisSerializerFactory.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.alibaba.datax.plugin.writer.doriswriter; - -import java.util.Map; - -public class DorisSerializerFactory { - public DorisSerializerFactory(){ - - } - public static DorisSerializer createSerializer(DorisWriterOptions writerOptions) { - if (DorisWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) { - Map props = writerOptions.getLoadProps(); - return new DorisCsvSerializer(null == props || !props.containsKey("column_separator") ? null : String.valueOf(props.get("column_separator"))); - } - if (DorisWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) { - return new DorisJsonSerializer(writerOptions.getColumns()); - } - throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties."); - } -} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadVisitor.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java similarity index 81% rename from doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadVisitor.java rename to doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java index a18940cb..3e4db6cf 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadVisitor.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java @@ -27,10 +27,10 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -public class DorisStreamLoadVisitor { - private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadVisitor.class); +public class DorisStreamLoadObserver { + private static final Logger LOG = 
LoggerFactory.getLogger(DorisStreamLoadObserver.class); - private DorisWriterOptions options; + private Keys options; private long pos; private static final String RESULT_FAILED = "Fail"; @@ -42,14 +42,14 @@ public class DorisStreamLoadVisitor { private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN"; - public DorisStreamLoadVisitor(DorisWriterOptions options){ + public DorisStreamLoadObserver ( Keys options){ this.options = options; } - public void streamLoad(DorisWriterTuple data) throws Exception { - String host = getAvailableHost(); + public void streamLoad(WriterTuple data) throws Exception { + String host = getLoadHost(); if(host == null){ - throw new IOException ("None of the host in `load_url` could be connected."); + throw new IOException ("load_url cannot be empty, or the host cannot connect.Please check your configuration."); } String loadUrl = new StringBuilder(host) .append("/api/") @@ -58,25 +58,25 @@ public class DorisStreamLoadVisitor { .append(options.getTable()) .append("/_stream_load") .toString(); - LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", data.getRows().size(), data.getBytes(), data.getLabel())); - Map loadResult = doHttpPut(loadUrl, data.getLabel(), joinRows(data.getRows(), data.getBytes().intValue())); + LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel()); + Map loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue())); + LOG.info("StreamLoad response :{}",JSON.toJSONString(loadResult)); final String keyStatus = "Status"; if (null == loadResult || !loadResult.containsKey(keyStatus)) { throw new IOException("Unable to flush data to Doris: unknown result status."); } - LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString()); + LOG.debug("StreamLoad response:{}",JSON.toJSONString(loadResult)); if 
(RESULT_FAILED.equals(loadResult.get(keyStatus))) { throw new IOException( new StringBuilder("Failed to flush data to Doris.\n").append(JSON.toJSONString(loadResult)).toString() ); } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) { - LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString()); - // has to block-checking the state to get the final result - checkLabelState(host, data.getLabel()); + LOG.debug("StreamLoad response:{}",JSON.toJSONString(loadResult)); + checkStreamLoadState(host, data.getLabel()); } } - private void checkLabelState(String host, String label) throws IOException { + private void checkStreamLoadState(String host, String label) throws IOException { int idx = 0; while(true) { try { @@ -109,7 +109,7 @@ public class DorisStreamLoadVisitor { case RESULT_LABEL_PREPARE: continue; case RESULT_LABEL_ABORTED: - throw new DorisStreamLoadExcetion(String.format("Failed to flush data to Doris, Error " + + throw new DorisWriterExcetion (String.format("Failed to flush data to Doris, Error " + "label[%s] state[%s]\n", label, labelState), null, true); case RESULT_LABEL_UNKNOWN: default: @@ -121,10 +121,10 @@ public class DorisStreamLoadVisitor { } } - private byte[] joinRows(List rows, int totalBytes) { - if (DorisWriterOptions.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) { + private byte[] addRows(List rows, int totalBytes) { + if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) { Map props = (options.getLoadProps() == null ? 
new HashMap<> () : options.getLoadProps()); - byte[] lineDelimiter = DorisDelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8); + byte[] lineDelimiter = DelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8); ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length); for (byte[] row : rows) { bos.put(row); @@ -133,7 +133,7 @@ public class DorisStreamLoadVisitor { return bos.array(); } - if (DorisWriterOptions.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) { + if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) { ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1)); bos.put("[".getBytes(StandardCharsets.UTF_8)); byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8); @@ -150,7 +150,7 @@ public class DorisStreamLoadVisitor { } throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:"); } - private Map doHttpPut(String loadUrl, String label, byte[] data) throws IOException { + private Map put(String loadUrl, String label, byte[] data) throws IOException { LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length)); final HttpClientBuilder httpClientBuilder = HttpClients.custom() .setRedirectStrategy(new DefaultRedirectStrategy () { @@ -162,7 +162,7 @@ public class DorisStreamLoadVisitor { try ( CloseableHttpClient httpclient = httpClientBuilder.build()) { HttpPut httpPut = new HttpPut(loadUrl); List cols = options.getColumns(); - if (null != cols && !cols.isEmpty() && DorisWriterOptions.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) { + if (null != cols && !cols.isEmpty() && Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) { httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList()))); } if (null != 
options.getLoadProps()) { @@ -205,19 +205,19 @@ public class DorisStreamLoadVisitor { return respEntity; } - private String getAvailableHost() { + private String getLoadHost() { List hostList = options.getLoadUrlList(); long tmp = pos + hostList.size(); for (; pos < tmp; pos++) { String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString(); - if (tryHttpConnection(host)) { + if (checkConnection(host)) { return host; } } return null; } - private boolean tryHttpConnection(String host) { + private boolean checkConnection(String host) { try { URL url = new URL(host); HttpURLConnection co = (HttpURLConnection) url.openConnection(); @@ -226,7 +226,7 @@ public class DorisStreamLoadVisitor { co.disconnect(); return true; } catch (Exception e1) { - LOG.warn("Failed to connect to address:{}", host, e1); + e1.printStackTrace(); return false; } } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java index 7c7d9a92..5f5a6f34 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java @@ -71,7 +71,7 @@ public class DorisUtil { } } - public static void preCheckPrePareSQL(DorisWriterOptions options) { + public static void preCheckPrePareSQL( Keys options) { String table = options.getTable(); List preSqls = options.getPreSqlList(); List renderedPreSqls = DorisUtil.renderPreOrPostSqls(preSqls, table); @@ -87,7 +87,7 @@ public class DorisUtil { } } - public static void preCheckPostSQL(DorisWriterOptions options) { + public static void preCheckPostSQL( Keys options) { String table = options.getTable(); List postSqls = options.getPostSqlList(); List renderedPostSqls = DorisUtil.renderPreOrPostSqls(postSqls, table); diff --git 
a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java index f16cd354..b44d5440 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java @@ -25,19 +25,12 @@ import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; -import com.alibaba.datax.plugin.rdbms.util.RdbmsException; -import com.alibaba.datax.plugin.rdbms.writer.Constant; -import com.alibaba.druid.sql.parser.ParserException; -import com.google.common.base.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; -import java.sql.Statement; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.UUID; /** * doris data writer @@ -48,12 +41,12 @@ public class DorisWriter extends Writer { private static final Logger LOG = LoggerFactory.getLogger(Job.class); private Configuration originalConfig = null; - private DorisWriterOptions options; + private Keys options; @Override public void init() { this.originalConfig = super.getPluginJobConf(); - options = new DorisWriterOptions(super.getPluginJobConf()); + options = new Keys (super.getPluginJobConf()); options.doPretreatment(); } @@ -92,11 +85,10 @@ public class DorisWriter extends Writer { String username = options.getUsername(); String password = options.getPassword(); String jdbcUrl = options.getJdbcUrl(); - LOG.info("userName :{},password:{},jdbcUrl:{}.", username,password,jdbcUrl); List renderedPostSqls = DorisUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable()); if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) { Connection conn = 
DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password); - LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl); + LOG.info("Start to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl); DorisUtil.executeSqls(conn, renderedPostSqls); DBUtil.closeDBResources(null, null, conn); } @@ -110,19 +102,19 @@ public class DorisWriter extends Writer { public static class Task extends Writer.Task { private DorisWriterManager writerManager; - private DorisWriterOptions options; - private DorisSerializer rowSerializer; + private Keys options; + private DorisCodec rowCodec; @Override public void init() { - options = new DorisWriterOptions(super.getPluginJobConf()); + options = new Keys (super.getPluginJobConf()); if (options.isWildcardColumn()) { Connection conn = DBUtil.getConnection(DataBaseType.MySql, options.getJdbcUrl(), options.getUsername(), options.getPassword()); List columns = DorisUtil.getDorisTableColumns(conn, options.getDatabase(), options.getTable()); options.setInfoCchemaColumns(columns); } writerManager = new DorisWriterManager(options); - rowSerializer = DorisSerializerFactory.createSerializer(options); + rowCodec = DorisCodecFactory.createCodec(options); } @Override @@ -138,11 +130,14 @@ public class DorisWriter extends Writer { .asDataXException( DBUtilErrorCode.CONF_ERROR, String.format( - "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.", + "There is an error in the column configuration information. " + + "This is because you have configured a task where the number of fields to be read from the source:%s " + + "is not equal to the number of fields to be written to the destination table:%s. 
" + + "Please check your configuration and make changes.", record.getColumnNumber(), options.getColumns().size())); } - writerManager.writeRecord(rowSerializer.serialize(record)); + writerManager.writeRecord(rowCodec.codec(record)); } } catch (Exception e) { throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e); diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadExcetion.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java similarity index 68% rename from doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadExcetion.java rename to doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java index dc19b209..7797d79f 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadExcetion.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java @@ -3,17 +3,17 @@ package com.alibaba.datax.plugin.writer.doriswriter; import java.io.IOException; import java.util.Map; -public class DorisStreamLoadExcetion extends IOException { +public class DorisWriterExcetion extends IOException { private final Map response; private boolean reCreateLabel; - public DorisStreamLoadExcetion(String message, Map response) { + public DorisWriterExcetion ( String message, Map response) { super(message); this.response = response; } - public DorisStreamLoadExcetion(String message, Map response, boolean reCreateLabel) { + public DorisWriterExcetion ( String message, Map response, boolean reCreateLabel) { super(message); this.response = response; this.reCreateLabel = reCreateLabel; diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java index df034076..f0ba6b52 100644 --- 
a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java @@ -20,20 +20,20 @@ public class DorisWriterManager { private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class); - private final DorisStreamLoadVisitor visitor; - private final DorisWriterOptions options; + private final DorisStreamLoadObserver visitor; + private final Keys options; private final List buffer = new ArrayList<> (); private int batchCount = 0; private long batchSize = 0; private volatile boolean closed = false; private volatile Exception flushException; - private final LinkedBlockingDeque flushQueue; + private final LinkedBlockingDeque< WriterTuple > flushQueue; private ScheduledExecutorService scheduler; private ScheduledFuture scheduledFuture; - public DorisWriterManager(DorisWriterOptions options) { + public DorisWriterManager( Keys options) { this.options = options; - this.visitor = new DorisStreamLoadVisitor(options); + this.visitor = new DorisStreamLoadObserver (options); flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength()); this.startScheduler(); this.startAsyncFlushing(); @@ -92,7 +92,7 @@ public class DorisWriterManager { } return; } - flushQueue.put(new DorisWriterTuple(label, batchSize, new ArrayList<>(buffer))); + flushQueue.put(new WriterTuple (label, batchSize, new ArrayList<>(buffer))); if (waitUtilDone) { // wait the last flush waitAsyncFlushingDone(); @@ -145,13 +145,13 @@ public class DorisWriterManager { private void waitAsyncFlushingDone() throws InterruptedException { // wait previous flushings for (int i = 0; i <= options.getFlushQueueLength(); i++) { - flushQueue.put(new DorisWriterTuple("", 0l, null)); + flushQueue.put(new WriterTuple ("", 0l, null)); } checkFlushException(); } private void asyncFlush() throws Exception { - DorisWriterTuple flushData = flushQueue.take(); + WriterTuple 
flushData = flushQueue.take(); if (Strings.isNullOrEmpty(flushData.getLabel())) { return; } @@ -169,7 +169,7 @@ public class DorisWriterManager { if (i >= options.getMaxRetries()) { throw new IOException(e); } - if (e instanceof DorisStreamLoadExcetion && ((DorisStreamLoadExcetion)e).needReCreateLabel()) { + if (e instanceof DorisWriterExcetion && (( DorisWriterExcetion )e).needReCreateLabel()) { String newLabel = createBatchLabel(); LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel)); flushData.setLabel(newLabel); diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterOptions.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterOptions.java deleted file mode 100644 index b4395d2e..00000000 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterOptions.java +++ /dev/null @@ -1,174 +0,0 @@ -package com.alibaba.datax.plugin.writer.doriswriter; - -import com.alibaba.datax.common.exception.DataXException; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; - -import java.io.Serializable; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -public class DorisWriterOptions implements Serializable { - - private static final long serialVersionUID = 1l; - private static final long KILO_BYTES_SCALE = 1024l; - private static final long MEGA_BYTES_SCALE = KILO_BYTES_SCALE * KILO_BYTES_SCALE; - private static final int MAX_RETRIES = 3; - private static final int BATCH_ROWS = 500000; - private static final long BATCH_BYTES = 90 * MEGA_BYTES_SCALE; - private static final long FLUSH_INTERVAL = 300000; - - private static final String KEY_LOAD_PROPS_FORMAT = "format"; - public enum StreamLoadFormat { - CSV, JSON; - } - - private static final String KEY_USERNAME = "username"; - private static final String KEY_PASSWORD = "password"; - private 
static final String KEY_DATABASE = "connection[0].database"; - private static final String KEY_TABLE = "connection[0].table[0]"; - private static final String KEY_COLUMN = "column"; - private static final String KEY_PRE_SQL = "preSql"; - private static final String KEY_POST_SQL = "postSql"; - private static final String KEY_JDBC_URL = "connection[0].jdbcUrl"; - private static final String KEY_LABEL_PREFIX = "labelPrefix"; - private static final String KEY_MAX_BATCH_ROWS = "maxBatchRows"; - private static final String KEY_MAX_BATCH_SIZE = "maxBatchSize"; - private static final String KEY_FLUSH_INTERVAL = "flushInterval"; - private static final String KEY_LOAD_URL = "loadUrl"; - private static final String KEY_FLUSH_QUEUE_LENGTH = "flushQueueLength"; - private static final String KEY_LOAD_PROPS = "loadProps"; - - private final Configuration options; - private List infoCchemaColumns; - private List userSetColumns; - private boolean isWildcardColumn; - - public DorisWriterOptions(Configuration options) { - this.options = options; - this.userSetColumns = options.getList(KEY_COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList()); - if (1 == options.getList(KEY_COLUMN, String.class).size() && "*".trim().equals(options.getList(KEY_COLUMN, String.class).get(0))) { - this.isWildcardColumn = true; - } - } - - public void doPretreatment() { - validateRequired(); - validateStreamLoadUrl(); - } - - public String getJdbcUrl() { - return options.getString(KEY_JDBC_URL); - } - - public String getDatabase() { - return options.getString(KEY_DATABASE); - } - - public String getTable() { - return options.getString(KEY_TABLE); - } - - public String getUsername() { - return options.getString(KEY_USERNAME); - } - - public String getPassword() { - return options.getString(KEY_PASSWORD); - } - - public String getLabelPrefix() { - return options.getString(KEY_LABEL_PREFIX); - } - - public List getLoadUrlList() { - return options.getList(KEY_LOAD_URL, 
String.class); - } - - public List getColumns() { - if (isWildcardColumn) { - return this.infoCchemaColumns; - } - return this.userSetColumns; - } - - public boolean isWildcardColumn() { - return this.isWildcardColumn; - } - - public void setInfoCchemaColumns(List cols) { - this.infoCchemaColumns = cols; - } - - public List getPreSqlList() { - return options.getList(KEY_PRE_SQL, String.class); - } - - public List getPostSqlList() { - return options.getList(KEY_POST_SQL, String.class); - } - - public Map getLoadProps() { - return options.getMap(KEY_LOAD_PROPS); - } - - public int getMaxRetries() { - return MAX_RETRIES; - } - - public int getBatchRows() { - Integer rows = options.getInt(KEY_MAX_BATCH_ROWS); - return null == rows ? BATCH_ROWS : rows; - } - - public long getBatchSize() { - Long size = options.getLong(KEY_MAX_BATCH_SIZE); - return null == size ? BATCH_BYTES : size; - } - - public long getFlushInterval() { - Long interval = options.getLong(KEY_FLUSH_INTERVAL); - return null == interval ? FLUSH_INTERVAL : interval; - } - - public int getFlushQueueLength() { - Integer len = options.getInt(KEY_FLUSH_QUEUE_LENGTH); - return null == len ? 
1 : len; - } - - public StreamLoadFormat getStreamLoadFormat() { - Map loadProps = getLoadProps(); - if (null == loadProps) { - return StreamLoadFormat.CSV; - } - if (loadProps.containsKey(KEY_LOAD_PROPS_FORMAT) - && StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(KEY_LOAD_PROPS_FORMAT)))) { - return StreamLoadFormat.JSON; - } - return StreamLoadFormat.CSV; - } - - private void validateStreamLoadUrl() { - List urlList = getLoadUrlList(); - for (String host : urlList) { - if (host.split(":").length < 2) { - throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, - "loadUrl的格式不正确,请输入 `fe_ip:fe_http_ip;fe_ip:fe_http_ip`。"); - } - } - } - - private void validateRequired() { - final String[] requiredOptionKeys = new String[]{ - KEY_USERNAME, - KEY_DATABASE, - KEY_TABLE, - KEY_COLUMN, - KEY_LOAD_URL - }; - for (String optionKey : requiredOptionKeys) { - options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE); - } - } -} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java new file mode 100644 index 00000000..01c0e3c6 --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java @@ -0,0 +1,177 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class Keys implements Serializable { + + private static final long serialVersionUID = 1l; + private static final int MAX_RETRIES = 3; + private static final int BATCH_ROWS = 500000; + private static final long DEFAULT_FLUSH_INTERVAL = 30000; + + private static final String LOAD_PROPS_FORMAT = "format"; + public enum 
StreamLoadFormat { + CSV, JSON; + } + + private static final String USERNAME = "username"; + private static final String PASSWORD = "password"; + private static final String DATABASE = "connection[0].selectedDatabase"; + private static final String TABLE = "connection[0].table[0]"; + private static final String COLUMN = "column"; + private static final String PRE_SQL = "preSql"; + private static final String POST_SQL = "postSql"; + private static final String JDBC_URL = "connection[0].jdbcUrl"; + private static final String LABEL_PREFIX = "labelPrefix"; + private static final String MAX_BATCH_ROWS = "maxBatchRows"; + private static final String MAX_BATCH_SIZE = "maxBatchSize"; + private static final String FLUSH_INTERVAL = "flushInterval"; + private static final String LOAD_URL = "loadUrl"; + private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength"; + private static final String LOAD_PROPS = "loadProps"; + + private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_"; + + private static final long DEFAULT_MAX_BATCH_SIZE = 90 * 1024 * 1024; //default 90M + + private final Configuration options; + + private List infoSchemaColumns; + private List userSetColumns; + private boolean isWildcardColumn; + + public Keys ( Configuration options) { + this.options = options; + this.userSetColumns = options.getList(COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList()); + if (1 == options.getList(COLUMN, String.class).size() && "*".trim().equals(options.getList(COLUMN, String.class).get(0))) { + this.isWildcardColumn = true; + } + } + + public void doPretreatment() { + validateRequired(); + validateStreamLoadUrl(); + } + + public String getJdbcUrl() { + return options.getString(JDBC_URL); + } + + public String getDatabase() { + return options.getString(DATABASE); + } + + public String getTable() { + return options.getString(TABLE); + } + + public String getUsername() { + return options.getString(USERNAME); + } + + 
public String getPassword() { + return options.getString(PASSWORD); + } + + public String getLabelPrefix() { + String label = options.getString(LABEL_PREFIX); + return null == label ? DEFAULT_LABEL_PREFIX : label; + } + + public List getLoadUrlList() { + return options.getList(LOAD_URL, String.class); + } + + public List getColumns() { + if (isWildcardColumn) { + return this.infoSchemaColumns; + } + return this.userSetColumns; + } + + public boolean isWildcardColumn() { + return this.isWildcardColumn; + } + + public void setInfoCchemaColumns(List cols) { + this.infoSchemaColumns = cols; + } + + public List getPreSqlList() { + return options.getList(PRE_SQL, String.class); + } + + public List getPostSqlList() { + return options.getList(POST_SQL, String.class); + } + + public Map getLoadProps() { + return options.getMap(LOAD_PROPS); + } + + public int getMaxRetries() { + return MAX_RETRIES; + } + + public int getBatchRows() { + Integer rows = options.getInt(MAX_BATCH_ROWS); + return null == rows ? BATCH_ROWS : rows; + } + + public long getBatchSize() { + Long size = options.getLong(MAX_BATCH_SIZE); + return null == size ? DEFAULT_MAX_BATCH_SIZE : size; + } + + public long getFlushInterval() { + Long interval = options.getLong(FLUSH_INTERVAL); + return null == interval ? DEFAULT_FLUSH_INTERVAL : interval; + } + + public int getFlushQueueLength() { + Integer len = options.getInt(FLUSH_QUEUE_LENGTH); + return null == len ? 
1 : len; + } + + public StreamLoadFormat getStreamLoadFormat() { + Map loadProps = getLoadProps(); + if (null == loadProps) { + return StreamLoadFormat.CSV; + } + if (loadProps.containsKey(LOAD_PROPS_FORMAT) + && StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(LOAD_PROPS_FORMAT)))) { + return StreamLoadFormat.JSON; + } + return StreamLoadFormat.CSV; + } + + private void validateStreamLoadUrl() { + List urlList = getLoadUrlList(); + for (String host : urlList) { + if (host.split(":").length < 2) { + throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, + "The format of loadUrl is not correct, please enter:[`fe_ip:fe_http_port;fe_ip:fe_http_port`]."); + } + } + } + + private void validateRequired() { + final String[] requiredOptionKeys = new String[]{ + USERNAME, + DATABASE, + TABLE, + COLUMN, + LOAD_URL + }; + for (String optionKey : requiredOptionKeys) { + options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE); + } + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterTuple.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/WriterTuple.java similarity index 81% rename from doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterTuple.java rename to doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/WriterTuple.java index 0bc52daf..32e0b341 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterTuple.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/WriterTuple.java @@ -2,12 +2,12 @@ package com.alibaba.datax.plugin.writer.doriswriter; import java.util.List; -public class DorisWriterTuple { +public class WriterTuple { private String label; private Long bytes; private List rows; - public DorisWriterTuple(String label,Long bytes,List rows){ + public WriterTuple ( String label, Long bytes, List rows){ this.label = label; this.rows = 
rows; this.bytes = bytes;