From 00d8e9783df1fd3337cfccd1c23a45cadfbadae2 Mon Sep 17 00:00:00 2001 From: daizihao Date: Tue, 13 Oct 2020 20:23:27 +0800 Subject: [PATCH] bug fix and write speed optimization --- .../writer/kudu11xwriter/Kudu11xHelper.java | 4 +- .../writer/kudu11xwriter/KuduWriterTask.java | 101 +++++++++--------- 2 files changed, 55 insertions(+), 50 deletions(-) diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java index 71686c22..cf1b0f8f 100644 --- a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java @@ -111,7 +111,7 @@ public class Kudu11xHelper { } catch (Exception e) { throw DataXException.asDataXException(Kudu11xWriterErrorcode.GREATE_KUDU_TABLE_ERROR, e); } finally { - AtomicInteger i = new AtomicInteger(5); + AtomicInteger i = new AtomicInteger(10); while (i.get() > 0) { try { if (kuduClient.isCreateTableDone(tableName)) { @@ -124,7 +124,7 @@ public class Kudu11xHelper { } catch (KuduException e) { LOG.info("Wait for the table to be created..... " + i); try { - Thread.sleep(1000L); + Thread.sleep(100L); } catch (InterruptedException ex) { ex.printStackTrace(); } diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java index c4b90b8c..bff3509f 100644 --- a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java @@ -94,61 +94,66 @@ public class KuduWriterTask { //增量更新 row = insert.getRow(); } - + List> futures = new ArrayList<>(); for (List columnList : columnLists) { Record finalRecord = record; - pool.submit(()->{ - - for (Configuration col : columnList) { - - String name = col.getString(Key.NAME); - ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE, "string")); - Column column = finalRecord.getColumn(col.getInt(Key.INDEX)); - String rawData = column.asString(); - if (rawData == null) { - synchronized (lock) { - row.setNull(name); + Future future = pool.submit(() -> { + try { + for (Configuration col : columnList) { + String name = col.getString(Key.NAME); + ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE, "string")); + Column column = finalRecord.getColumn(col.getInt(Key.INDEX)); + String rawData = column.asString(); + if (rawData == null) { + synchronized (lock) { + row.setNull(name); + } + continue; + } + switch (type) { + case INT: + synchronized (lock) { + row.addInt(name, Integer.parseInt(rawData)); + } + break; + case LONG: + case BIGINT: + synchronized (lock) { + row.addLong(name, Long.parseLong(rawData)); + } + break; + case FLOAT: + synchronized (lock) { + row.addFloat(name, Float.parseFloat(rawData)); + } + break; + case DOUBLE: + synchronized (lock) { + row.addDouble(name, Double.parseDouble(rawData)); + } + break; + case BOOLEAN: + synchronized (lock) { + row.addBoolean(name, Boolean.getBoolean(rawData)); + } + break; + case STRING: + default: + synchronized (lock) { + row.addString(name, rawData); + } } - continue; - } - switch (type) { - case INT: - synchronized (lock) { - row.addInt(name, Integer.parseInt(rawData)); - } - break; - case LONG: - case BIGINT: - synchronized (lock) { - row.addLong(name, Long.parseLong(rawData)); - } - break; - case FLOAT: - synchronized (lock) { - row.addFloat(name, Float.parseFloat(rawData)); - } - break; - case DOUBLE: - synchronized (lock) { - row.addDouble(name, Double.parseDouble(rawData)); - } - break; - case BOOLEAN: - synchronized (lock) { - row.addBoolean(name, Boolean.getBoolean(rawData)); - } - break; - case STRING: - default: - synchronized (lock) { - row.addString(name, rawData); - } } + } finally { + countDownLatch.countDown(); } - countDownLatch.countDown(); }); + futures.add(future); } countDownLatch.await(); + for (Future future : futures) { + future.get(); + } try { RetryUtil.executeWithRetry(() -> { if (isUpsert) { @@ -173,7 +178,7 @@ public class KuduWriterTask { LOG.warn("Since you have configured \"skipFail\" to be true, this record will be skipped !"); taskPluginCollector.collectDirtyRecord(record, e.getMessage()); } else { - throw e; + throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage()); } } }