From fe43e20ea92a03b3b50f98068407446b81d1cdc0 Mon Sep 17 00:00:00 2001 From: hffariel Date: Thu, 23 Sep 2021 17:15:14 +0800 Subject: [PATCH] reduce the getBytes operations --- .../starrockswriter/manager/StarRocksFlushTuple.java | 6 +++--- .../manager/StarRocksStreamLoadVisitor.java | 10 +++++----- .../manager/StarRocksWriterManager.java | 7 ++++--- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksFlushTuple.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksFlushTuple.java index cd8c663b..47cebb91 100644 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksFlushTuple.java +++ b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksFlushTuple.java @@ -6,9 +6,9 @@ public class StarRocksFlushTuple { private String label; private Long bytes; - private List rows; + private List rows; - public StarRocksFlushTuple(String label, Long bytes, List rows) { + public StarRocksFlushTuple(String label, Long bytes, List rows) { this.label = label; this.bytes = bytes; this.rows = rows; @@ -16,5 +16,5 @@ public class StarRocksFlushTuple { public String getLabel() { return label; } public Long getBytes() { return bytes; } - public List getRows() { return rows; } + public List getRows() { return rows; } } \ No newline at end of file diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadVisitor.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadVisitor.java index aecac149..656f4020 100644 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadVisitor.java +++ b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadVisitor.java @@ -93,13 +93,13 @@ public class StarRocksStreamLoadVisitor { } } - private byte[] joinRows(List rows, int totalBytes) { + private byte[] joinRows(List rows, int totalBytes) { if (StarRocksWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) { Map props = writerOptions.getLoadProps(); byte[] lineDelimiter = StarRocksDelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8); ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length); - for (String row : rows) { - bos.put(row.getBytes(StandardCharsets.UTF_8)); + for (byte[] row : rows) { + bos.put(row); bos.put(lineDelimiter); } return bos.array(); @@ -110,11 +110,11 @@ public class StarRocksStreamLoadVisitor { bos.put("[".getBytes(StandardCharsets.UTF_8)); byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8); boolean isFirstElement = true; - for (String row : rows) { + for (byte[] row : rows) { if (!isFirstElement) { bos.put(jsonDelimiter); } - bos.put(row.getBytes(StandardCharsets.UTF_8)); + bos.put(row); isFirstElement = false; } bos.put("]".getBytes(StandardCharsets.UTF_8)); diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksWriterManager.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksWriterManager.java index 0f68afe9..0e89005f 100644 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksWriterManager.java +++ b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksWriterManager.java @@ -20,7 +20,7 @@ public class StarRocksWriterManager { private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor; private final StarRocksWriterOptions writerOptions; - private final List buffer = new ArrayList<>(); + private final List buffer = new ArrayList<>(); private int batchCount = 0; private long batchSize = 0; private volatile boolean closed = false; @@ -37,9 +37,10 @@ public class StarRocksWriterManager { public final synchronized void writeRecord(String record) throws IOException { checkFlushException(); try { - buffer.add(record); + byte[] bts = record.getBytes(StandardCharsets.UTF_8); + buffer.add(bts); batchCount++; - batchSize += record.getBytes(StandardCharsets.UTF_8).length; + batchSize += bts.length; if (batchCount >= writerOptions.getBatchRows() || batchSize >= writerOptions.getBatchSize()) { String label = createBatchLabel(); LOG.debug(String.format("StarRocks buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));