reduce the getBytes operations

This commit is contained in:
hffariel 2021-09-23 17:15:14 +08:00
parent e7d879185b
commit fe43e20ea9
3 changed files with 12 additions and 11 deletions

View File

@ -6,9 +6,9 @@ public class StarRocksFlushTuple {
private String label;
private Long bytes;
private List<String> rows;
private List<byte[]> rows;
public StarRocksFlushTuple(String label, Long bytes, List<String> rows) {
public StarRocksFlushTuple(String label, Long bytes, List<byte[]> rows) {
this.label = label;
this.bytes = bytes;
this.rows = rows;
@ -16,5 +16,5 @@ public class StarRocksFlushTuple {
public String getLabel() { return label; }
public Long getBytes() { return bytes; }
public List<String> getRows() { return rows; }
public List<byte[]> getRows() { return rows; }
}

View File

@ -93,13 +93,13 @@ public class StarRocksStreamLoadVisitor {
}
}
private byte[] joinRows(List<String> rows, int totalBytes) {
private byte[] joinRows(List<byte[]> rows, int totalBytes) {
if (StarRocksWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
Map<String, Object> props = writerOptions.getLoadProps();
byte[] lineDelimiter = StarRocksDelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
for (String row : rows) {
bos.put(row.getBytes(StandardCharsets.UTF_8));
for (byte[] row : rows) {
bos.put(row);
bos.put(lineDelimiter);
}
return bos.array();
@ -110,11 +110,11 @@ public class StarRocksStreamLoadVisitor {
bos.put("[".getBytes(StandardCharsets.UTF_8));
byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
boolean isFirstElement = true;
for (String row : rows) {
for (byte[] row : rows) {
if (!isFirstElement) {
bos.put(jsonDelimiter);
}
bos.put(row.getBytes(StandardCharsets.UTF_8));
bos.put(row);
isFirstElement = false;
}
bos.put("]".getBytes(StandardCharsets.UTF_8));

View File

@ -20,7 +20,7 @@ public class StarRocksWriterManager {
private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
private final StarRocksWriterOptions writerOptions;
private final List<String> buffer = new ArrayList<>();
private final List<byte[]> buffer = new ArrayList<>();
private int batchCount = 0;
private long batchSize = 0;
private volatile boolean closed = false;
@ -37,9 +37,10 @@ public class StarRocksWriterManager {
public final synchronized void writeRecord(String record) throws IOException {
checkFlushException();
try {
buffer.add(record);
byte[] bts = record.getBytes(StandardCharsets.UTF_8);
buffer.add(bts);
batchCount++;
batchSize += record.getBytes(StandardCharsets.UTF_8).length;
batchSize += bts.length;
if (batchCount >= writerOptions.getBatchRows() || batchSize >= writerOptions.getBatchSize()) {
String label = createBatchLabel();
LOG.debug(String.format("StarRocks buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));