async flush version

This commit is contained in:
fariel 2021-03-25 15:49:41 +08:00
parent dee644c960
commit ee9243cfe2
5 changed files with 104 additions and 43 deletions

View File

@ -124,7 +124,7 @@ public class DorisWriter extends Writer {
@Override @Override
public void post() { public void post() {
try { try {
writerManager.flush(writerManager.createBatchLabel()); writerManager.close();
} catch (Exception e) { } catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e); throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
} }

View File

@ -32,6 +32,7 @@ public class DorisWriterOptions implements Serializable {
private static final String KEY_POST_SQL = "postSql"; private static final String KEY_POST_SQL = "postSql";
private static final String KEY_JDBC_URL = "jdbcUrl"; private static final String KEY_JDBC_URL = "jdbcUrl";
private static final String KEY_LOAD_URL = "loadUrl"; private static final String KEY_LOAD_URL = "loadUrl";
private static final String KEY_FLUSH_QUEUE_LENGTH = "flushQueueLength";
private static final String KEY_LOAD_PROPS = "loadProps"; private static final String KEY_LOAD_PROPS = "loadProps";
private final Configuration options; private final Configuration options;
@ -96,6 +97,11 @@ public class DorisWriterOptions implements Serializable {
public long getBatchSize() { public long getBatchSize() {
return BATCH_BYTES; return BATCH_BYTES;
} }
/**
 * Returns the value configured under the {@code flushQueueLength} key.
 *
 * @return the configured length, or {@code 1} when the key yields {@code null}
 */
public int getFlushQueueLength() {
    final Integer configured = options.getInt(KEY_FLUSH_QUEUE_LENGTH);
    if (configured == null) {
        return 1;
    }
    return configured;
}
public StreamLoadFormat getStreamLoadFormat() { public StreamLoadFormat getStreamLoadFormat() {
Map<String, Object> loadProps = getLoadProps(); Map<String, Object> loadProps = getLoadProps();

View File

@ -0,0 +1,20 @@
package com.dorisdb.connector.datax.plugin.writer.doriswriter.manager;
import java.util.List;
/**
 * Immutable holder for one batch of rows together with its label and byte size.
 *
 * <p>All fields are {@code final}: an instance is never modified after construction.
 * The row list is stored by reference (it is not copied), and both {@code bytes} and
 * {@code rows} may be {@code null}.
 */
public class DorisFlushTuple {

    private final String label;
    private final Long bytes;
    private final List<String> rows;

    /**
     * @param label label identifying the batch
     * @param bytes total size of the batch in bytes
     * @param rows  rows of the batch; kept by reference, may be {@code null}
     */
    public DorisFlushTuple(String label, Long bytes, List<String> rows) {
        this.label = label;
        this.bytes = bytes;
        this.rows = rows;
    }

    /** @return the label given at construction */
    public String getLabel() {
        return label;
    }

    /** @return the byte size given at construction */
    public Long getBytes() {
        return bytes;
    }

    /** @return the same row list instance given at construction (possibly {@code null}) */
    public List<String> getRows() {
        return rows;
    }
}

View File

@ -37,7 +37,7 @@ public class DorisStreamLoadVisitor {
this.writerOptions = writerOptions; this.writerOptions = writerOptions;
} }
public void doStreamLoad(String label, List<String> labeledRows) throws IOException { public void doStreamLoad(DorisFlushTuple flushData) throws IOException {
String host = getAvailableHost(); String host = getAvailableHost();
if (null == host) { if (null == host) {
throw new IOException("None of the host in `load_url` could be connected."); throw new IOException("None of the host in `load_url` could be connected.");
@ -49,7 +49,8 @@ public class DorisStreamLoadVisitor {
.append(writerOptions.getTable()) .append(writerOptions.getTable())
.append("/_stream_load") .append("/_stream_load")
.toString(); .toString();
Map<String, Object> loadResult = doHttpPut(loadUrl, label, joinRows(labeledRows)); LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
Map<String, Object> loadResult = doHttpPut(loadUrl, flushData.getLabel(), joinRows(flushData.getRows()));
final String keyStatus = "Status"; final String keyStatus = "Status";
if (null == loadResult || !loadResult.containsKey(keyStatus)) { if (null == loadResult || !loadResult.containsKey(keyStatus)) {
throw new IOException("Unable to flush data to doris: unknown result status."); throw new IOException("Unable to flush data to doris: unknown result status.");

View File

@ -7,8 +7,10 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.LinkedBlockingDeque;
import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions; import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions;
import com.google.common.base.Strings;
public class DorisWriterManager { public class DorisWriterManager {
@ -22,10 +24,13 @@ public class DorisWriterManager {
private long batchSize = 0; private long batchSize = 0;
private volatile boolean closed = false; private volatile boolean closed = false;
private volatile Exception flushException; private volatile Exception flushException;
private final LinkedBlockingDeque<DorisFlushTuple> flushQueue;
public DorisWriterManager(DorisWriterOptions writerOptions) { public DorisWriterManager(DorisWriterOptions writerOptions) {
this.writerOptions = writerOptions; this.writerOptions = writerOptions;
this.dorisStreamLoadVisitor = new DorisStreamLoadVisitor(writerOptions); this.dorisStreamLoadVisitor = new DorisStreamLoadVisitor(writerOptions);
flushQueue = new LinkedBlockingDeque<>(writerOptions.getFlushQueueLength());
this.startAsyncFlushing();
} }
public final synchronized void writeRecord(String record) throws IOException { public final synchronized void writeRecord(String record) throws IOException {
@ -35,26 +40,88 @@ public class DorisWriterManager {
batchCount++; batchCount++;
batchSize += record.getBytes().length; batchSize += record.getBytes().length;
if (batchCount >= writerOptions.getBatchRows() || batchSize >= writerOptions.getBatchSize()) { if (batchCount >= writerOptions.getBatchRows() || batchSize >= writerOptions.getBatchSize()) {
flush(createBatchLabel()); String label = createBatchLabel();
LOG.debug(String.format("Doris buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));
flush(label, false);
} }
} catch (Exception e) { } catch (Exception e) {
throw new IOException("Writing records to Doris failed.", e); throw new IOException("Writing records to Doris failed.", e);
} }
} }
public synchronized void flush(String label) throws IOException { public synchronized void flush(String label, boolean waitUtilDone) throws Exception {
checkFlushException(); checkFlushException();
if (batchCount == 0) { if (batchCount == 0) {
if (waitUtilDone) {
waitAsyncFlushingDone();
}
return; return;
} }
flushQueue.put(new DorisFlushTuple(label, batchSize, new ArrayList<>(buffer)));
if (waitUtilDone) {
// wait the last flush
waitAsyncFlushingDone();
}
buffer.clear();
batchCount = 0;
batchSize = 0;
}
/**
 * Closes this manager. On the first call the manager is marked closed and
 * {@code flush(label, true)} is invoked with a freshly created label; any
 * exception from that step is rethrown wrapped in a {@link RuntimeException}.
 * Every call that does not throw ends with {@code checkFlushException()}.
 */
public synchronized void close() {
    if (closed) {
        // Already closed: only re-check for a recorded flush failure.
        checkFlushException();
        return;
    }
    closed = true;
    try {
        final String finalLabel = createBatchLabel();
        final boolean hasBufferedRows = batchCount > 0;
        if (hasBufferedRows) {
            LOG.debug(String.format("Doris Sink is about to close: label[%s].", finalLabel));
        }
        flush(finalLabel, true);
    } catch (Exception e) {
        throw new RuntimeException("Writing records to Doris failed.", e);
    }
    checkFlushException();
}
/**
 * Creates a new batch label.
 *
 * @return the string form of a randomly generated {@link UUID}
 */
public String createBatchLabel() {
    final UUID batchId = UUID.randomUUID();
    return batchId.toString();
}
/**
 * Starts the background flush thread.
 *
 * <p>The thread is a daemon and calls {@code asyncFlush()} in an endless loop.
 * Any exception thrown by a call is stored in {@code flushException} and the
 * loop continues; the loop has no exit condition.
 */
private void startAsyncFlushing() {
    final Runnable flushLoop = new Runnable() {
        @Override
        public void run() {
            for (;;) {
                try {
                    asyncFlush();
                } catch (Exception e) {
                    // Record the failure and keep going.
                    flushException = e;
                }
            }
        }
    };
    final Thread worker = new Thread(flushLoop);
    worker.setDaemon(true);
    worker.start();
}
/**
 * Puts {@code getFlushQueueLength() + 1} marker tuples (empty label, zero bytes,
 * {@code null} rows) onto {@code flushQueue}. {@code asyncFlush()} returns
 * immediately for tuples with an empty label, and the queue is created with a
 * capacity of {@code getFlushQueueLength()}, so each {@code put} blocks while
 * the queue is full.
 *
 * NOTE(review): the intent appears to be that once every marker has been
 * accepted, all batches queued earlier have been processed — confirm.
 *
 * @throws InterruptedException if interrupted while blocked on {@code put}
 */
private void waitAsyncFlushingDone() throws InterruptedException {
    int markersQueued = 0;
    while (markersQueued <= writerOptions.getFlushQueueLength()) {
        flushQueue.put(new DorisFlushTuple("", 0L, null));
        markersQueued++;
    }
}
private void asyncFlush() throws Exception {
DorisFlushTuple flushData = flushQueue.take();
if (Strings.isNullOrEmpty(flushData.getLabel())) {
return;
}
LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
for (int i = 0; i <= writerOptions.getMaxRetries(); i++) { for (int i = 0; i <= writerOptions.getMaxRetries(); i++) {
try { try {
tryToFlush(label); // flush to Doris with stream load
buffer.clear(); dorisStreamLoadVisitor.doStreamLoad(flushData);
batchCount = 0; LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
batchSize = 0;
break; break;
} catch (IOException e) { } catch (Exception e) {
LOG.warn("Failed to flush batch data to doris, retry times = {}", i, e); LOG.warn("Failed to flush batch data to doris, retry times = {}", i, e);
if (i >= writerOptions.getMaxRetries()) { if (i >= writerOptions.getMaxRetries()) {
throw new IOException(e); throw new IOException(e);
@ -68,39 +135,6 @@ public class DorisWriterManager {
} }
} }
} }
/**
 * Closes this manager. On the first call, if rows are still buffered
 * ({@code batchCount > 0}), they are flushed under a freshly created label;
 * any exception from that flush is rethrown wrapped in a
 * {@link RuntimeException}. Every call that does not throw ends with
 * {@code checkFlushException()}.
 */
public synchronized void close() {
    final boolean flushOnThisCall = !closed && batchCount > 0;
    closed = true;
    if (flushOnThisCall) {
        try {
            flush(createBatchLabel());
        } catch (Exception e) {
            throw new RuntimeException("Writing records to Doris failed.", e);
        }
    }
    checkFlushException();
}
/**
 * Generates a label for a batch.
 *
 * @return a random {@link UUID} rendered as a string
 */
public String createBatchLabel() {
    return String.valueOf(UUID.randomUUID());
}
/**
 * Exposes the internal row buffer.
 *
 * @return the {@code buffer} field itself (not a copy); changes made through
 *         the returned list are changes to this manager's buffer
 */
public List<String> getBufferedBatchList() {
    return this.buffer;
}
/**
 * Replaces the contents of the internal row buffer with the given rows.
 *
 * <p>The argument is copied before the internal buffer is cleared. Without the
 * copy, passing the internal list itself (for example the value returned by
 * {@code getBufferedBatchList()}) would empty the buffer and lose every row,
 * because the source would be cleared before being read.
 *
 * @param buffer rows that become the new buffer contents; must not be {@code null}
 * @throws NullPointerException if {@code buffer} is {@code null}; in that case
 *         the internal buffer is left untouched
 */
public void setBufferedBatchList(List<String> buffer) {
    // Snapshot first so the source stays readable even if it aliases this.buffer.
    final List<String> snapshot = new ArrayList<>(buffer);
    this.buffer.clear();
    this.buffer.addAll(snapshot);
}
/**
 * Sends the currently buffered rows to Doris with a single stream load.
 *
 * @param label label passed to the stream load
 * @throws IOException if {@code doStreamLoad} fails
 */
private void tryToFlush(String label) throws IOException {
    final List<String> pendingRows = buffer;
    dorisStreamLoadVisitor.doStreamLoad(label, pendingRows);
}
private void checkFlushException() { private void checkFlushException() {
if (flushException != null) { if (flushException != null) {