Merge pull request #2041 from caoliang-web/doriswriter

Fix multiple concurrent import label duplication issues
This commit is contained in:
Trafalgar 2024-01-11 14:13:39 +08:00 committed by GitHub
commit 371476f97b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -22,6 +22,7 @@ import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -97,7 +98,7 @@ public class DorisStreamLoadObserver {
"could not get the final state of label[%s].\n", label), null); "could not get the final state of label[%s].\n", label), null);
} }
Map<String, Object> result = (Map<String, Object>)JSON.parse(EntityUtils.toString(respEntity)); Map<String, Object> result = (Map<String, Object>)JSON.parse(EntityUtils.toString(respEntity));
String labelState = (String)result.get("state"); String labelState = (String)result.get("data");
if (null == labelState) { if (null == labelState) {
throw new IOException(String.format("Failed to flush data to Doris, Error " + throw new IOException(String.format("Failed to flush data to Doris, Error " +
"could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null); "could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);
@ -210,12 +211,10 @@ public class DorisStreamLoadObserver {
private String getLoadHost() { private String getLoadHost() {
List<String> hostList = options.getLoadUrlList(); List<String> hostList = options.getLoadUrlList();
long tmp = pos + hostList.size(); Collections.shuffle(hostList);
for (; pos < tmp; pos++) { String host = new StringBuilder("http://").append(hostList.get((0))).toString();
String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString(); if (checkConnection(host)){
if (checkConnection(host)) { return host;
return host;
}
} }
return null; return null;
} }