From fd42a23b686425e2d1c456aeafec0798965974da Mon Sep 17 00:00:00 2001
From: fariel
Date: Thu, 22 Jul 2021 18:52:47 +0800
Subject: [PATCH] optimize heap memory usage

---
Review notes (text in this section is ignored when the patch is applied):

* joinRows keeps the null-safe "row_delimiter" lookup of the pre-image. Dropping it
  caused a NullPointerException when no load properties are configured, and turned a
  missing key into the literal delimiter text "null" via String.valueOf(null).
* The CSV buffer reserves the real byte length of the delimiter for every row; a
  multi-byte row delimiter previously overflowed the buffer.
* The JSON buffer reserves 2 bytes for an empty batch ("[" and "]"); it previously
  reserved 1 byte and overflowed.
* The result is cut to the bytes actually written, so an over-estimated totalBytes no
  longer appends 0x00 padding to the request body.
* Line breaks, indentation and generic type arguments (List<String>,
  Map<String, Object>) were reconstructed from a whitespace-collapsed copy of this
  patch; confirm them against the target file before applying. The blob ids on the
  "index" line and the commit id on the "From" line belong to the original patch and
  do not describe this revised post-image.

 .../manager/DorisStreamLoadVisitor.java       | 57 ++++++++++++++----
 1 file changed, 45 insertions(+), 12 deletions(-)

diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java
index 8568c7b6..f792bfaf 100644
--- a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java
@@ -3,7 +3,7 @@ package com.dorisdb.connector.datax.plugin.writer.doriswriter.manager;
 import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.URL;
-import java.net.URLEncoder;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 
 import com.alibaba.fastjson.JSON;
@@ -52,7 +52,7 @@ public class DorisStreamLoadVisitor {
             .append("/_stream_load")
             .toString();
         LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
-        Map<String, Object> loadResult = doHttpPut(loadUrl, flushData.getLabel(), joinRows(flushData.getRows()));
+        Map<String, Object> loadResult = doHttpPut(loadUrl, flushData.getLabel(), joinRows(flushData.getRows(), flushData.getBytes().intValue()));
         final String keyStatus = "Status";
         if (null == loadResult || !loadResult.containsKey(keyStatus)) {
             throw new IOException("Unable to flush data to doris: unknown result status.");
@@ -93,32 +93,65 @@ public class DorisStreamLoadVisitor {
         }
     }
 
-    private byte[] joinRows(List<String> rows) {
+    /**
+     * Returns the bytes written to {@code buffer} so far. The backing array is handed out directly when
+     * it is completely filled (no extra copy); otherwise the unused tail is cut off so that no padding
+     * bytes end up in the request body.
+     */
+    private static byte[] toExactBytes(ByteBuffer buffer) {
+        if (!buffer.hasRemaining()) {
+            return buffer.array();
+        }
+        byte[] exact = new byte[buffer.position()];
+        System.arraycopy(buffer.array(), 0, exact, 0, exact.length);
+        return exact;
+    }
+
+    /**
+     * Joins the buffered rows into a single UTF-8 encoded stream load request body.
+     *
+     * <p>CSV rows are each followed by the configured {@code row_delimiter} (default {@code "\n"});
+     * JSON rows are wrapped into one array: {@code [row1,row2,...]}.
+     *
+     * @param rows       serialized rows of the batch
+     * @param totalBytes expected to be the summed UTF-8 length of {@code rows} (TODO confirm how the
+     *                   caller accumulates it); used only to size the buffer, so a value that is too
+     *                   small makes the copy fail with {@link java.nio.BufferOverflowException}
+     * @return the request body, exactly as long as the bytes written
+     * @throws RuntimeException if the configured stream load format is neither CSV nor JSON
+     */
+    private byte[] joinRows(List<String> rows, int totalBytes) {
         if (DorisWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
             Map<String, Object> props = writerOptions.getLoadProps();
             String lineDelimiter = "\n";
             if (null != props && props.containsKey("row_delimiter")) {
                 lineDelimiter = DorisDelimiterParser.parse(String.valueOf(props.get("row_delimiter")), "\n");
             }
-            StringBuilder sb = new StringBuilder();
+            byte[] lineDelimiterBytes = lineDelimiter.getBytes(StandardCharsets.UTF_8);
+            // The delimiter may be longer than one byte, so reserve its real length for every row.
+            ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiterBytes.length);
             for (String row : rows) {
-                sb.append(row).append(lineDelimiter);
+                bos.put(row.getBytes(StandardCharsets.UTF_8));
+                bos.put(lineDelimiterBytes);
             }
-            return sb.toString().getBytes(StandardCharsets.UTF_8);
+            return toExactBytes(bos);
         }
+
         if (DorisWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
-            StringBuilder sb = new StringBuilder();
-            sb.append("[");
+            // "[" and "]" plus one "," between neighbouring rows; an empty batch still needs both brackets.
+            ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
+            bos.put("[".getBytes(StandardCharsets.UTF_8));
+            byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
             boolean isFirstElement = true;
             for (String row : rows) {
                 if (!isFirstElement) {
-                    sb.append(",");
+                    bos.put(jsonDelimiter);
                 }
-                sb.append(row);
+                bos.put(row.getBytes(StandardCharsets.UTF_8));
                 isFirstElement = false;
             }
-            sb.append("]");
-            return sb.toString().getBytes(StandardCharsets.UTF_8);
+            bos.put("]".getBytes(StandardCharsets.UTF_8));
+            return toExactBytes(bos);
         }
         throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
     }