From f5cedbeb3269fb7560cceda16e049863a50bf6af Mon Sep 17 00:00:00 2001 From: hffariel Date: Mon, 24 Jan 2022 12:18:14 +0800 Subject: [PATCH] check the label state when `label already exists` --- .../StarRocksWriterOptions.java | 2 +- .../manager/StarRocksFlushTuple.java | 1 + .../StarRocksStreamLoadFailedException.java | 33 +++++++ .../manager/StarRocksStreamLoadVisitor.java | 86 ++++++++++++++++--- .../manager/StarRocksWriterManager.java | 9 +- 5 files changed, 118 insertions(+), 13 deletions(-) create mode 100644 starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadFailedException.java diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriterOptions.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriterOptions.java index 8d5ede98..689c09e1 100644 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriterOptions.java +++ b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriterOptions.java @@ -15,7 +15,7 @@ public class StarRocksWriterOptions implements Serializable { private static final long serialVersionUID = 1l; private static final long KILO_BYTES_SCALE = 1024l; private static final long MEGA_BYTES_SCALE = KILO_BYTES_SCALE * KILO_BYTES_SCALE; - private static final int MAX_RETRIES = 1; + private static final int MAX_RETRIES = 3; private static final int BATCH_ROWS = 500000; private static final long BATCH_BYTES = 90 * MEGA_BYTES_SCALE; private static final long FLUSH_INTERVAL = 300000; diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksFlushTuple.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksFlushTuple.java index 47cebb91..5c939f9b 100644 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksFlushTuple.java +++ b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksFlushTuple.java @@ -15,6 +15,7 @@ public class StarRocksFlushTuple { } public String getLabel() { return label; } + public void setLabel(String label) { this.label = label; } public Long getBytes() { return bytes; } public List getRows() { return rows; } } \ No newline at end of file diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadFailedException.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadFailedException.java new file mode 100644 index 00000000..4eb47048 --- /dev/null +++ b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadFailedException.java @@ -0,0 +1,33 @@ +package com.starrocks.connector.datax.plugin.writer.starrockswriter.manager; + +import java.io.IOException; +import java.util.Map; + + +public class StarRocksStreamLoadFailedException extends IOException { + + static final long serialVersionUID = 1L; + + private final Map response; + private boolean reCreateLabel; + + public StarRocksStreamLoadFailedException(String message, Map response) { + super(message); + this.response = response; + } + + public StarRocksStreamLoadFailedException(String message, Map response, boolean reCreateLabel) { + super(message); + this.response = response; + this.reCreateLabel = reCreateLabel; + } + + public Map getFailedResponse() { + return response; + } + + public boolean needReCreateLabel() { + return reCreateLabel; + } + +} diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadVisitor.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadVisitor.java index 2a372263..b8245d71 100644 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadVisitor.java +++ b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksStreamLoadVisitor.java @@ -14,6 +14,7 @@ import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpEntity; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; @@ -26,6 +27,7 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -35,6 +37,13 @@ public class StarRocksStreamLoadVisitor { private final StarRocksWriterOptions writerOptions; private long pos; + private static final String RESULT_FAILED = "Fail"; + private static final String RESULT_LABEL_EXISTED = "Label Already Exists"; + private static final String LAEBL_STATE_VISIBLE = "VISIBLE"; + private static final String LAEBL_STATE_COMMITTED = "COMMITTED"; + private static final String RESULT_LABEL_PREPARE = "PREPARE"; + private static final String RESULT_LABEL_ABORTED = "ABORTED"; + private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN"; public StarRocksStreamLoadVisitor(StarRocksWriterOptions writerOptions) { this.writerOptions = writerOptions; @@ -59,10 +68,14 @@ public class StarRocksStreamLoadVisitor { throw new IOException("Unable to flush data to StarRocks: unknown result status."); } LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString()); - if (loadResult.get(keyStatus).equals("Fail")) { + if (RESULT_FAILED.equals(loadResult.get(keyStatus))) { throw new IOException( new StringBuilder("Failed to flush data to StarRocks.\n").append(JSON.toJSONString(loadResult)).toString() ); + } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) { + LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString()); + // has to block-checking the state to get the final result + checkLabelState(host, flushData.getLabel()); } } @@ -122,6 +135,52 @@ public class StarRocksStreamLoadVisitor { throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:"); } + @SuppressWarnings("unchecked") + private void checkLabelState(String host, String label) throws IOException { + int idx = 0; + while(true) { + try { + TimeUnit.SECONDS.sleep(Math.min(++idx, 5)); + } catch (InterruptedException ex) { + break; + } + try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(writerOptions.getDatabase()).append("/get_load_state?label=").append(label).toString()); + httpGet.setHeader("Authorization", getBasicAuthHeader(writerOptions.getUsername(), writerOptions.getPassword())); + httpGet.setHeader("Connection", "close"); + + try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { + HttpEntity respEntity = getHttpEntity(resp); + if (respEntity == null) { + throw new IOException(String.format("Failed to flush data to StarRocks, Error " + + "could not get the final state of label[%s].\n", label), null); + } + Map result = (Map)JSON.parse(EntityUtils.toString(respEntity)); + String labelState = (String)result.get("state"); + if (null == labelState) { + throw new IOException(String.format("Failed to flush data to StarRocks, Error " + + "could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null); + } + LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState)); + switch(labelState) { + case LAEBL_STATE_VISIBLE: + case LAEBL_STATE_COMMITTED: + return; + case RESULT_LABEL_PREPARE: + continue; + case RESULT_LABEL_ABORTED: + throw new StarRocksStreamLoadFailedException(String.format("Failed to flush data to StarRocks, Error " + + "label[%s] state[%s]\n", label, labelState), null, true); + case RESULT_LABEL_UNKNOWN: + default: + throw new IOException(String.format("Failed to flush data to StarRocks, Error " + + "label[%s] state[%s]\n", label, labelState), null); + } + } + } + } + } + @SuppressWarnings("unchecked") private Map doHttpPut(String loadUrl, String label, byte[] data) throws IOException { LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length)); @@ -150,16 +209,9 @@ public class StarRocksStreamLoadVisitor { httpPut.setEntity(new ByteArrayEntity(data)); httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); try (CloseableHttpResponse resp = httpclient.execute(httpPut)) { - int code = resp.getStatusLine().getStatusCode(); - if (200 != code) { - LOG.warn("Request failed with code:{}", code); + HttpEntity respEntity = getHttpEntity(resp); + if (respEntity == null) return null; - } - HttpEntity respEntity = resp.getEntity(); - if (null == respEntity) { - LOG.warn("Request failed with empty response."); - return null; - } return (Map)JSON.parse(EntityUtils.toString(respEntity)); } } @@ -171,4 +223,18 @@ public class StarRocksStreamLoadVisitor { return new StringBuilder("Basic ").append(new String(encodedAuth)).toString(); } + private HttpEntity getHttpEntity(CloseableHttpResponse resp) { + int code = resp.getStatusLine().getStatusCode(); + if (200 != code) { + LOG.warn("Request failed with code:{}", code); + return null; + } + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + LOG.warn("Request failed with empty response."); + return null; + } + return respEntity; + } + } diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksWriterManager.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksWriterManager.java index f32eccde..d7c290df 100644 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksWriterManager.java +++ b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksWriterManager.java @@ -134,7 +134,7 @@ public class StarRocksWriterManager { flushException = e; } } - } + } }); flushThread.setDaemon(true); flushThread.start(); @@ -167,8 +167,13 @@ public class StarRocksWriterManager { if (i >= writerOptions.getMaxRetries()) { throw new IOException(e); } + if (e instanceof StarRocksStreamLoadFailedException && ((StarRocksStreamLoadFailedException)e).needReCreateLabel()) { + String newLabel = createBatchLabel(); + LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel)); + flushData.setLabel(newLabel); + } try { - Thread.sleep(1000l * (i + 1)); + Thread.sleep(1000l * Math.min(i + 1, 10)); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new IOException("Unable to flush, interrupted while doing another attempt", e);