update selectdbwriter plugin

This commit is contained in:
caoliang 2022-12-06 18:17:52 +08:00
parent 8f70622ef1
commit 921d96b621
4 changed files with 14 additions and 11 deletions

View File

@ -349,7 +349,7 @@ c.再次尝试编译。
* **maxBatchRows** * **maxBatchRows**
- 描述:每批次导入数据的最大行数。和 **maxBatchSize** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。 - 描述:每批次导入数据的最大行数。和 **batchSize** 共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。
- 必选:否 - 必选:否
- 默认值500000 - 默认值500000

View File

@ -75,7 +75,7 @@
} }
], ],
"maxBatchRows":200000, "maxBatchRows":200000,
"maxBatchByteSize":53687091200 "batchSize":53687091200
} }
} }
} }

View File

@ -18,14 +18,14 @@ public class DelimiterParser {
String hexStr = sp.substring(2); String hexStr = sp.substring(2);
// check hex str // check hex str
if (hexStr.isEmpty()) { if (hexStr.isEmpty()) {
throw new RuntimeException("Failed to parse delimiter: `Hex str is empty`"); throw new RuntimeException("Failed to parse delimiter: Hex str is empty");
} }
if (hexStr.length() % 2 != 0) { if (hexStr.length() % 2 != 0) {
throw new RuntimeException("Failed to parse delimiter: `Hex str length error`"); throw new RuntimeException("Failed to parse delimiter: Hex str length error");
} }
for (char hexChar : hexStr.toUpperCase().toCharArray()) { for (char hexChar : hexStr.toUpperCase().toCharArray()) {
if (HEX_STRING.indexOf(hexChar) == -1) { if (HEX_STRING.indexOf(hexChar) == -1) {
throw new RuntimeException("Failed to parse delimiter: `Hex str format error`"); throw new RuntimeException("Failed to parse delimiter: Hex str format error");
} }
} }
// transform to separator // transform to separator

View File

@ -76,11 +76,13 @@ public class SelectdbWriterManager {
batchSize += bts.length; batchSize += bts.length;
if (batchCount >= options.getBatchRows() || batchSize >= options.getBatchSize()) { if (batchCount >= options.getBatchRows() || batchSize >= options.getBatchSize()) {
String label = createBatchLabel(); String label = createBatchLabel();
LOG.debug(String.format("buffer Sinking triggered: rows[%d] label [%s].", batchCount, label)); if(LOG.isDebugEnabled()){
LOG.debug(String.format("buffer Sinking triggered: rows[%d] label [%s].", batchCount, label));
}
flush(label, false); flush(label, false);
} }
} catch (Exception e) { } catch (Exception e) {
throw new SelectdbWriterException("Writing records to Doris failed.", e); throw new SelectdbWriterException("Writing records to selectdb failed.", e);
} }
} }
@ -107,13 +109,14 @@ public class SelectdbWriterManager {
closed = true; closed = true;
try { try {
String label = createBatchLabel(); String label = createBatchLabel();
if (batchCount > 0) if (batchCount > 0) {
LOG.debug(String.format("Selectdb Sink is about to close: label[%s].", label)); if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Selectdb Sink is about to close: label[%s].", label));
}
}
flush(label, true); flush(label, true);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Writing records to Selectdb failed.", e); throw new RuntimeException("Writing records to Selectdb failed.", e);
} finally {
this.visitor.close();
} }
} }
checkFlushException(); checkFlushException();