diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriter.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriter.java index d88581ec..8b3c414d 100755 --- a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriter.java +++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriter.java @@ -124,7 +124,7 @@ public class DorisWriter extends Writer { @Override public void post() { try { - writerManager.flush(writerManager.createBatchLabel()); + writerManager.close(); } catch (Exception e) { throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e); } diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriterOptions.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriterOptions.java index b4dbb0a3..4229e256 100644 --- a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriterOptions.java +++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriterOptions.java @@ -32,6 +32,7 @@ public class DorisWriterOptions implements Serializable { private static final String KEY_POST_SQL = "postSql"; private static final String KEY_JDBC_URL = "jdbcUrl"; private static final String KEY_LOAD_URL = "loadUrl"; + private static final String KEY_FLUSH_QUEUE_LENGTH = "flushQueueLength"; private static final String KEY_LOAD_PROPS = "loadProps"; private final Configuration options; @@ -96,6 +97,11 @@ public class DorisWriterOptions implements Serializable { public long getBatchSize() { return BATCH_BYTES; } + + public int getFlushQueueLength() { + Integer len = options.getInt(KEY_FLUSH_QUEUE_LENGTH); + return null == len ? 1 : len; + } public StreamLoadFormat getStreamLoadFormat() { Map loadProps = getLoadProps(); diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisFlushTuple.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisFlushTuple.java new file mode 100644 index 00000000..24bcc9c6 --- /dev/null +++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisFlushTuple.java @@ -0,0 +1,20 @@ +package com.dorisdb.connector.datax.plugin.writer.doriswriter.manager; + +import java.util.List; + +public class DorisFlushTuple { + + private String label; + private Long bytes; + private List rows; + + public DorisFlushTuple(String label, Long bytes, List rows) { + this.label = label; + this.bytes = bytes; + this.rows = rows; + } + + public String getLabel() { return label; } + public Long getBytes() { return bytes; } + public List getRows() { return rows; } +} \ No newline at end of file diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java index 68621d8f..7ebfced6 100644 --- a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java +++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java @@ -37,7 +37,7 @@ public class DorisStreamLoadVisitor { this.writerOptions = writerOptions; } - public void doStreamLoad(String label, List labeledRows) throws IOException { + public void doStreamLoad(DorisFlushTuple flushData) throws IOException { String host = getAvailableHost(); if (null == host) { throw new IOException("None of the host in `load_url` could be connected."); @@ -49,7 +49,8 @@ public class DorisStreamLoadVisitor { .append(writerOptions.getTable()) .append("/_stream_load") .toString(); - Map loadResult = doHttpPut(loadUrl, label, joinRows(labeledRows)); + LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel())); + Map loadResult = doHttpPut(loadUrl, flushData.getLabel(), joinRows(flushData.getRows())); final String keyStatus = "Status"; if (null == loadResult || !loadResult.containsKey(keyStatus)) { throw new IOException("Unable to flush data to doris: unknown result status."); diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisWriterManager.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisWriterManager.java index da24448f..d7e9ad33 100644 --- a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisWriterManager.java +++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisWriterManager.java @@ -7,8 +7,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.LinkedBlockingDeque; import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions; +import com.google.common.base.Strings; public class DorisWriterManager { @@ -22,10 +24,13 @@ public class DorisWriterManager { private long batchSize = 0; private volatile boolean closed = false; private volatile Exception flushException; + private final LinkedBlockingDeque flushQueue; public DorisWriterManager(DorisWriterOptions writerOptions) { this.writerOptions = writerOptions; this.dorisStreamLoadVisitor = new DorisStreamLoadVisitor(writerOptions); + flushQueue = new LinkedBlockingDeque<>(writerOptions.getFlushQueueLength()); + this.startAsyncFlushing(); } public final synchronized void writeRecord(String record) throws IOException { @@ -35,26 +40,88 @@ public class DorisWriterManager { batchCount++; batchSize += record.getBytes().length; if (batchCount >= writerOptions.getBatchRows() || batchSize >= writerOptions.getBatchSize()) { - flush(createBatchLabel()); + String label = createBatchLabel(); + LOG.debug(String.format("Doris buffer Sinking triggered: rows[%d] label[%s].", batchCount, label)); + flush(label, false); } } catch (Exception e) { throw new IOException("Writing records to Doris failed.", e); } } - public synchronized void flush(String label) throws IOException { + public synchronized void flush(String label, boolean waitUtilDone) throws Exception { checkFlushException(); if (batchCount == 0) { + if (waitUtilDone) { + waitAsyncFlushingDone(); + } return; } + flushQueue.put(new DorisFlushTuple(label, batchSize, new ArrayList<>(buffer))); + if (waitUtilDone) { + // wait the last flush + waitAsyncFlushingDone(); + } + buffer.clear(); + batchCount = 0; + batchSize = 0; + } + + public synchronized void close() { + if (!closed) { + closed = true; + try { + String label = createBatchLabel(); + if (batchCount > 0) LOG.debug(String.format("Doris Sink is about to close: label[%s].", label)); + flush(label, true); + } catch (Exception e) { + throw new RuntimeException("Writing records to Doris failed.", e); + } + } + checkFlushException(); + } + + public String createBatchLabel() { + return UUID.randomUUID().toString(); + } + + private void startAsyncFlushing() { + // start flush thread + Thread flushThread = new Thread(new Runnable(){ + public void run() { + while(true) { + try { + asyncFlush(); + } catch (Exception e) { + flushException = e; + } + } + } + }); + flushThread.setDaemon(true); + flushThread.start(); + } + + private void waitAsyncFlushingDone() throws InterruptedException { + // wait previous flushings + for (int i = 0; i <= writerOptions.getFlushQueueLength(); i++) { + flushQueue.put(new DorisFlushTuple("", 0l, null)); + } + } + + private void asyncFlush() throws Exception { + DorisFlushTuple flushData = flushQueue.take(); + if (Strings.isNullOrEmpty(flushData.getLabel())) { + return; + } + LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel())); for (int i = 0; i <= writerOptions.getMaxRetries(); i++) { try { - tryToFlush(label); - buffer.clear(); - batchCount = 0; - batchSize = 0; + // flush to Doris with stream load + dorisStreamLoadVisitor.doStreamLoad(flushData); + LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel())); break; - } catch (IOException e) { + } catch (Exception e) { LOG.warn("Failed to flush batch data to doris, retry times = {}", i, e); if (i >= writerOptions.getMaxRetries()) { throw new IOException(e); @@ -68,39 +135,6 @@ public class DorisWriterManager { } } } - - public synchronized void close() { - if (!closed) { - closed = true; - - if (batchCount > 0) { - try { - flush(createBatchLabel()); - } catch (Exception e) { - throw new RuntimeException("Writing records to Doris failed.", e); - } - } - } - checkFlushException(); - } - - public String createBatchLabel() { - return UUID.randomUUID().toString(); - } - - public List getBufferedBatchList() { - return buffer; - } - - public void setBufferedBatchList(List buffer) { - this.buffer.clear(); - this.buffer.addAll(buffer); - } - - private void tryToFlush(String label) throws IOException { - // flush to Doris with stream load - dorisStreamLoadVisitor.doStreamLoad(label, buffer); - } private void checkFlushException() { if (flushException != null) {