add label-prefix option

This commit is contained in:
hffariel 2022-02-14 18:49:25 +08:00
parent f5cedbeb32
commit a1bf9baa3e
2 changed files with 16 additions and 1 deletions

View File

@ -33,6 +33,7 @@ public class StarRocksWriterOptions implements Serializable {
private static final String KEY_PRE_SQL = "preSql";
private static final String KEY_POST_SQL = "postSql";
private static final String KEY_JDBC_URL = "jdbcUrl";
private static final String KEY_LABEL_PREFIX = "labelPrefix";
private static final String KEY_MAX_BATCH_ROWS = "maxBatchRows";
private static final String KEY_MAX_BATCH_SIZE = "maxBatchSize";
private static final String KEY_FLUSH_INTERVAL = "flushInterval";
@ -78,6 +79,10 @@ public class StarRocksWriterOptions implements Serializable {
return options.getString(KEY_PASSWORD);
}
public String getLabelPrefix() {
return options.getString(KEY_LABEL_PREFIX);
}
public List<String> getLoadUrlList() {
return options.getList(KEY_LOAD_URL, String.class);
}

View File

@ -24,6 +24,7 @@ public class StarRocksWriterManager {
private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
private final StarRocksWriterOptions writerOptions;
private static final String UNDERSCORE = "_";
private final List<byte[]> buffer = new ArrayList<>();
private int batchCount = 0;
@ -120,7 +121,16 @@ public class StarRocksWriterManager {
}
public String createBatchLabel() {
return UUID.randomUUID().toString();
StringBuilder sb = new StringBuilder();
if (!Strings.isNullOrEmpty(writerOptions.getLabelPrefix())) {
sb.append(writerOptions.getLabelPrefix()).append(UNDERSCORE);
}
return sb.append(writerOptions.getDatabase())
.append(UNDERSCORE)
.append(writerOptions.getTable())
.append(UNDERSCORE)
.append(UUID.randomUUID().toString())
.toString();
}
private void startAsyncFlushing() {