mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 08:11:10 +08:00
check the label state when label already exists
This commit is contained in:
parent
83242f1705
commit
f5cedbeb32
@ -15,7 +15,7 @@ public class StarRocksWriterOptions implements Serializable {
|
||||
private static final long serialVersionUID = 1l;
|
||||
private static final long KILO_BYTES_SCALE = 1024l;
|
||||
private static final long MEGA_BYTES_SCALE = KILO_BYTES_SCALE * KILO_BYTES_SCALE;
|
||||
private static final int MAX_RETRIES = 1;
|
||||
private static final int MAX_RETRIES = 3;
|
||||
private static final int BATCH_ROWS = 500000;
|
||||
private static final long BATCH_BYTES = 90 * MEGA_BYTES_SCALE;
|
||||
private static final long FLUSH_INTERVAL = 300000;
|
||||
|
@ -15,6 +15,7 @@ public class StarRocksFlushTuple {
|
||||
}
|
||||
|
||||
public String getLabel() { return label; }
|
||||
public void setLabel(String label) { this.label = label; }
|
||||
public Long getBytes() { return bytes; }
|
||||
public List<byte[]> getRows() { return rows; }
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
package com.starrocks.connector.datax.plugin.writer.starrockswriter.manager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
public class StarRocksStreamLoadFailedException extends IOException {
|
||||
|
||||
static final long serialVersionUID = 1L;
|
||||
|
||||
private final Map<String, Object> response;
|
||||
private boolean reCreateLabel;
|
||||
|
||||
public StarRocksStreamLoadFailedException(String message, Map<String, Object> response) {
|
||||
super(message);
|
||||
this.response = response;
|
||||
}
|
||||
|
||||
public StarRocksStreamLoadFailedException(String message, Map<String, Object> response, boolean reCreateLabel) {
|
||||
super(message);
|
||||
this.response = response;
|
||||
this.reCreateLabel = reCreateLabel;
|
||||
}
|
||||
|
||||
public Map<String, Object> getFailedResponse() {
|
||||
return response;
|
||||
}
|
||||
|
||||
public boolean needReCreateLabel() {
|
||||
return reCreateLabel;
|
||||
}
|
||||
|
||||
}
|
@ -14,6 +14,7 @@ import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.client.config.RequestConfig;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpPut;
|
||||
import org.apache.http.entity.ByteArrayEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
@ -26,6 +27,7 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
@ -35,6 +37,13 @@ public class StarRocksStreamLoadVisitor {
|
||||
|
||||
private final StarRocksWriterOptions writerOptions;
|
||||
private long pos;
|
||||
private static final String RESULT_FAILED = "Fail";
|
||||
private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
|
||||
private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
|
||||
private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
|
||||
private static final String RESULT_LABEL_PREPARE = "PREPARE";
|
||||
private static final String RESULT_LABEL_ABORTED = "ABORTED";
|
||||
private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
|
||||
|
||||
public StarRocksStreamLoadVisitor(StarRocksWriterOptions writerOptions) {
|
||||
this.writerOptions = writerOptions;
|
||||
@ -59,10 +68,14 @@ public class StarRocksStreamLoadVisitor {
|
||||
throw new IOException("Unable to flush data to StarRocks: unknown result status.");
|
||||
}
|
||||
LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString());
|
||||
if (loadResult.get(keyStatus).equals("Fail")) {
|
||||
if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
|
||||
throw new IOException(
|
||||
new StringBuilder("Failed to flush data to StarRocks.\n").append(JSON.toJSONString(loadResult)).toString()
|
||||
);
|
||||
} else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
|
||||
LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString());
|
||||
// has to block-checking the state to get the final result
|
||||
checkLabelState(host, flushData.getLabel());
|
||||
}
|
||||
}
|
||||
|
||||
@ -122,6 +135,52 @@ public class StarRocksStreamLoadVisitor {
|
||||
throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void checkLabelState(String host, String label) throws IOException {
|
||||
int idx = 0;
|
||||
while(true) {
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
|
||||
} catch (InterruptedException ex) {
|
||||
break;
|
||||
}
|
||||
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
|
||||
HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(writerOptions.getDatabase()).append("/get_load_state?label=").append(label).toString());
|
||||
httpGet.setHeader("Authorization", getBasicAuthHeader(writerOptions.getUsername(), writerOptions.getPassword()));
|
||||
httpGet.setHeader("Connection", "close");
|
||||
|
||||
try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
|
||||
HttpEntity respEntity = getHttpEntity(resp);
|
||||
if (respEntity == null) {
|
||||
throw new IOException(String.format("Failed to flush data to StarRocks, Error " +
|
||||
"could not get the final state of label[%s].\n", label), null);
|
||||
}
|
||||
Map<String, Object> result = (Map<String, Object>)JSON.parse(EntityUtils.toString(respEntity));
|
||||
String labelState = (String)result.get("state");
|
||||
if (null == labelState) {
|
||||
throw new IOException(String.format("Failed to flush data to StarRocks, Error " +
|
||||
"could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);
|
||||
}
|
||||
LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
|
||||
switch(labelState) {
|
||||
case LAEBL_STATE_VISIBLE:
|
||||
case LAEBL_STATE_COMMITTED:
|
||||
return;
|
||||
case RESULT_LABEL_PREPARE:
|
||||
continue;
|
||||
case RESULT_LABEL_ABORTED:
|
||||
throw new StarRocksStreamLoadFailedException(String.format("Failed to flush data to StarRocks, Error " +
|
||||
"label[%s] state[%s]\n", label, labelState), null, true);
|
||||
case RESULT_LABEL_UNKNOWN:
|
||||
default:
|
||||
throw new IOException(String.format("Failed to flush data to StarRocks, Error " +
|
||||
"label[%s] state[%s]\n", label, labelState), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Map<String, Object> doHttpPut(String loadUrl, String label, byte[] data) throws IOException {
|
||||
LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
|
||||
@ -150,16 +209,9 @@ public class StarRocksStreamLoadVisitor {
|
||||
httpPut.setEntity(new ByteArrayEntity(data));
|
||||
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
|
||||
try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
|
||||
int code = resp.getStatusLine().getStatusCode();
|
||||
if (200 != code) {
|
||||
LOG.warn("Request failed with code:{}", code);
|
||||
HttpEntity respEntity = getHttpEntity(resp);
|
||||
if (respEntity == null)
|
||||
return null;
|
||||
}
|
||||
HttpEntity respEntity = resp.getEntity();
|
||||
if (null == respEntity) {
|
||||
LOG.warn("Request failed with empty response.");
|
||||
return null;
|
||||
}
|
||||
return (Map<String, Object>)JSON.parse(EntityUtils.toString(respEntity));
|
||||
}
|
||||
}
|
||||
@ -171,4 +223,18 @@ public class StarRocksStreamLoadVisitor {
|
||||
return new StringBuilder("Basic ").append(new String(encodedAuth)).toString();
|
||||
}
|
||||
|
||||
private HttpEntity getHttpEntity(CloseableHttpResponse resp) {
|
||||
int code = resp.getStatusLine().getStatusCode();
|
||||
if (200 != code) {
|
||||
LOG.warn("Request failed with code:{}", code);
|
||||
return null;
|
||||
}
|
||||
HttpEntity respEntity = resp.getEntity();
|
||||
if (null == respEntity) {
|
||||
LOG.warn("Request failed with empty response.");
|
||||
return null;
|
||||
}
|
||||
return respEntity;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -134,7 +134,7 @@ public class StarRocksWriterManager {
|
||||
flushException = e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
flushThread.setDaemon(true);
|
||||
flushThread.start();
|
||||
@ -167,8 +167,13 @@ public class StarRocksWriterManager {
|
||||
if (i >= writerOptions.getMaxRetries()) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
if (e instanceof StarRocksStreamLoadFailedException && ((StarRocksStreamLoadFailedException)e).needReCreateLabel()) {
|
||||
String newLabel = createBatchLabel();
|
||||
LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
|
||||
flushData.setLabel(newLabel);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(1000l * (i + 1));
|
||||
Thread.sleep(1000l * Math.min(i + 1, 10));
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IOException("Unable to flush, interrupted while doing another attempt", e);
|
||||
|
Loading…
Reference in New Issue
Block a user