optimize heap memory usage

This commit is contained in:
fariel 2021-07-22 18:52:47 +08:00
parent f4d25be8fe
commit fd42a23b68

View File

@ -3,7 +3,7 @@ package com.dorisdb.connector.datax.plugin.writer.doriswriter.manager;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.net.URLEncoder; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
@ -52,7 +52,7 @@ public class DorisStreamLoadVisitor {
.append("/_stream_load") .append("/_stream_load")
.toString(); .toString();
LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel())); LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
Map<String, Object> loadResult = doHttpPut(loadUrl, flushData.getLabel(), joinRows(flushData.getRows())); Map<String, Object> loadResult = doHttpPut(loadUrl, flushData.getLabel(), joinRows(flushData.getRows(), flushData.getBytes().intValue()));
final String keyStatus = "Status"; final String keyStatus = "Status";
if (null == loadResult || !loadResult.containsKey(keyStatus)) { if (null == loadResult || !loadResult.containsKey(keyStatus)) {
throw new IOException("Unable to flush data to doris: unknown result status."); throw new IOException("Unable to flush data to doris: unknown result status.");
@ -93,32 +93,32 @@ public class DorisStreamLoadVisitor {
} }
} }
private byte[] joinRows(List<String> rows) { private byte[] joinRows(List<String> rows, int totalBytes) {
if (DorisWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) { if (DorisWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
Map<String, Object> props = writerOptions.getLoadProps(); Map<String, Object> props = writerOptions.getLoadProps();
String lineDelimiter = "\n"; ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size());
if (null != props && props.containsKey("row_delimiter")) { byte[] lineDelimiter = DorisDelimiterParser.parse(String.valueOf(props.get("row_delimiter")), "\n").getBytes(StandardCharsets.UTF_8);
lineDelimiter = DorisDelimiterParser.parse(String.valueOf(props.get("row_delimiter")), "\n");
}
StringBuilder sb = new StringBuilder();
for (String row : rows) { for (String row : rows) {
sb.append(row).append(lineDelimiter); bos.put(row.getBytes(StandardCharsets.UTF_8));
bos.put(lineDelimiter);
} }
return sb.toString().getBytes(StandardCharsets.UTF_8); return bos.array();
} }
if (DorisWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) { if (DorisWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
StringBuilder sb = new StringBuilder(); ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() + 1);
sb.append("["); bos.put("[".getBytes(StandardCharsets.UTF_8));
byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
boolean isFirstElement = true; boolean isFirstElement = true;
for (String row : rows) { for (String row : rows) {
if (!isFirstElement) { if (!isFirstElement) {
sb.append(","); bos.put(jsonDelimiter);
} }
sb.append(row); bos.put(row.getBytes(StandardCharsets.UTF_8));
isFirstElement = false; isFirstElement = false;
} }
sb.append("]"); bos.put("]".getBytes(StandardCharsets.UTF_8));
return sb.toString().getBytes(StandardCharsets.UTF_8); return bos.array();
} }
throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:"); throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
} }