bug fix and write speed optimization

This commit is contained in:
daizihao 2020-10-13 20:23:27 +08:00
parent 80860c224d
commit 00d8e9783d
2 changed files with 55 additions and 50 deletions

View File

@ -111,7 +111,7 @@ public class Kudu11xHelper {
} catch (Exception e) {
throw DataXException.asDataXException(Kudu11xWriterErrorcode.GREATE_KUDU_TABLE_ERROR, e);
} finally {
AtomicInteger i = new AtomicInteger(5);
AtomicInteger i = new AtomicInteger(10);
while (i.get() > 0) {
try {
if (kuduClient.isCreateTableDone(tableName)) {
@ -124,7 +124,7 @@ public class Kudu11xHelper {
} catch (KuduException e) {
LOG.info("Wait for the table to be created..... " + i);
try {
Thread.sleep(1000L);
Thread.sleep(100L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}

View File

@ -94,13 +94,12 @@ public class KuduWriterTask {
//增量更新
row = insert.getRow();
}
List<Future<?>> futures = new ArrayList<>();
for (List<Configuration> columnList : columnLists) {
Record finalRecord = record;
pool.submit(()->{
Future<?> future = pool.submit(() -> {
try {
for (Configuration col : columnList) {
String name = col.getString(Key.NAME);
ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE, "string"));
Column column = finalRecord.getColumn(col.getInt(Key.INDEX));
@ -145,10 +144,16 @@ public class KuduWriterTask {
}
}
}
} finally {
countDownLatch.countDown();
}
});
futures.add(future);
}
countDownLatch.await();
for (Future<?> future : futures) {
future.get();
}
try {
RetryUtil.executeWithRetry(() -> {
if (isUpsert) {
@ -173,7 +178,7 @@ public class KuduWriterTask {
LOG.warn("Since you have configured \"skipFail\" to be true, this record will be skipped !");
taskPluginCollector.collectDirtyRecord(record, e.getMessage());
} else {
throw e;
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage());
}
}
}