diff --git a/kuduwriter/doc/image-20200901193148188.png b/kuduwriter/doc/image-20200901193148188.png new file mode 100644 index 00000000..7e7b8a1f Binary files /dev/null and b/kuduwriter/doc/image-20200901193148188.png differ diff --git a/kuduwriter/doc/kuduwirter.md b/kuduwriter/doc/kuduwirter.md new file mode 100644 index 00000000..1a952449 --- /dev/null +++ b/kuduwriter/doc/kuduwirter.md @@ -0,0 +1,143 @@ +# datax-kudu-plugins +datax kudu的writer插件 + + + +eg: + +```json +{ + "name": "kuduwriter", + "parameter": { + "kuduConfig": { + "kudu.master_addresses": "***", + "timeout": 60000, + "sessionTimeout": 60000 + + }, + "table": "", + "replicaCount": 3, + "truncate": false, + "writeMode": "upsert", + "partition": { + "range": { + "column1": [ + { + "lower": "2020-08-25", + "upper": "2020-08-26" + }, + { + "lower": "2020-08-26", + "upper": "2020-08-27" + }, + { + "lower": "2020-08-27", + "upper": "2020-08-28" + } + ] + }, + "hash": { + "column": [ + "column1" + ], + "number": 3 + } + }, + "column": [ + { + "index": 0, + "name": "c1", + "type": "string", + "primaryKey": true + }, + { + "index": 1, + "name": "c2", + "type": "string", + "compress": "DEFAULT_COMPRESSION", + "encoding": "AUTO_ENCODING", + "comment": "注解xxxx" + } + ], + "batchSize": 1024, + "bufferSize": 2048, + "skipFail": false, + "encoding": "UTF-8" + } +} +``` + +必须参数: + +```json + "writer": { + "name": "kuduwriter", + "parameter": { + "kuduConfig": { + "kudu.master_addresses": "***" + }, + "table": "***", + "column": [ + { + "name": "c1", + "type": "string", + "primaryKey": true + }, + { + "name": "c2", + "type": "string", + }, + { + "name": "c3", + "type": "string" + }, + { + "name": "c4", + "type": "string" + } + ] + } + } +``` + +主键列请写到最前面 + + + +![image-20200901193148188](./image-20200901193148188.png) + +##### 配置列表 + +| name | default | description | 是否必须 | +| -------------- | ------------------- | ------------------------------------------------------------ | -------- | +| kuduConfig | | kudu配置 (kudu.master_addresses等) | 是 | +| table | | 导入目标表名 | 是 | +| partition | | 分区 | 否 | +| column | | 列 | 是 | +| name | | 列名 | 是 | +| type | string | 列的类型,现支持INT, FLOAT, STRING, BIGINT, DOUBLE, BOOLEAN, LONG。 | 否 | +| index | 升序排列 | 列索引位置(要么全部列都写,要么都不写),如reader中取到的某一字段在第二位置(eg: name, id, age)但kudu目标表结构不同(eg:id,name, age),此时就需要将index赋值为(1,0,2),默认顺序(0,1,2) | 否 | +| primaryKey | false | 是否为主键(请将所有的主键列写在前面),不表明主键将不会检查过滤脏数据 | 否 | +| compress | DEFAULT_COMPRESSION | 压缩格式 | 否 | +| encoding | AUTO_ENCODING | 编码 | 否 | +| replicaCount | 3 | 保留副本个数 | 否 | +| hash | | hash分区 | 否 | +| number | 3 | hash分区个数 | 否 | +| range | | range分区 | 否 | +| lower | | range分区下限 (eg: sql建表:partition value='haha' 对应:“lower”:“haha”,“upper”:“haha\000”) | 否 | +| upper | | range分区上限(eg: sql建表:partition "10" <= VALUES < "20" 对应:“lower”:“10”,“upper”:“20”) | 否 | +| truncate | false | 是否清空表,本质上是删表重建 | 否 | +| writeMode | upsert | upsert,insert,update | 否 | +| batchSize | 512 | 每xx行数据flush一次结果(最好不要超过1024) | 否 | +| bufferSize | 3072 | 缓冲区大小 | 否 | +| skipFail | false | 是否跳过插入不成功的数据 | 否 | +| timeout | 60000 | client超时时间,如创建表,删除表操作的超时时间。单位:ms | 否 | +| sessionTimeout | 60000 | session超时时间 单位:ms | 否 | + + + + + + + + diff --git a/kuduwriter/src/main/assembly/package.xml b/kuduwriter/src/main/assembly/package.xml index 5b1a10a7..c9497b92 100644 --- a/kuduwriter/src/main/assembly/package.xml +++ b/kuduwriter/src/main/assembly/package.xml @@ -14,21 +14,21 @@ plugin.json plugin_job_template.json - plugin/writer/kudu11xwriter + plugin/writer/kuduwriter target/ - kudu11xwriter-0.0.1-SNAPSHOT.jar + kuduwriter-0.0.1-SNAPSHOT.jar - plugin/writer/kudu11xwriter + plugin/writer/kuduwriter false - plugin/writer/kudu11xwriter/libs + plugin/writer/kuduwriter/libs runtime diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java index 10568820..71686c22 100644 --- a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java @@ -10,11 +10,17 @@ import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.*; +import org.apache.kudu.shaded.org.checkerframework.checker.units.qual.K; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sun.rmi.runtime.Log; import java.nio.charset.Charset; import java.util.*; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -47,10 +53,10 @@ public class Kudu11xHelper { Map conf = Kudu11xHelper.getKuduConfiguration(kuduConfig); KuduClient kuduClient = null; try { - String masterAddress = (String)conf.get(Key.KUDU_MASTER); + String masterAddress = (String) conf.get(Key.KUDU_MASTER); kuduClient = new KuduClient.KuduClientBuilder(masterAddress) .defaultAdminOperationTimeoutMs((Long) conf.get(Key.KUDU_ADMIN_TIMEOUT)) - .defaultOperationTimeoutMs((Long)conf.get(Key.KUDU_SESSION_TIMEOUT)) + .defaultOperationTimeoutMs((Long) conf.get(Key.KUDU_SESSION_TIMEOUT)) .build(); } catch (Exception e) { throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e); @@ -106,17 +112,17 @@ public class Kudu11xHelper { throw DataXException.asDataXException(Kudu11xWriterErrorcode.GREATE_KUDU_TABLE_ERROR, e); } finally { AtomicInteger i = new AtomicInteger(5); - while (i.get()>0) { + while (i.get() > 0) { try { - if (kuduClient.isCreateTableDone(tableName)){ + if (kuduClient.isCreateTableDone(tableName)) { Kudu11xHelper.closeClient(kuduClient); - LOG.info("Table "+ tableName +" is created!"); + LOG.info("Table " + tableName + " is created!"); break; } i.decrementAndGet(); LOG.error("timeout!"); - } catch (KuduException e) { - LOG.info("Wait for the table to be created..... "+i); + } catch (KuduException e) { + LOG.info("Wait for the table to be created..... " + i); try { Thread.sleep(1000L); } catch (InterruptedException ex) { @@ -135,6 +141,44 @@ public class Kudu11xHelper { } } + public static ThreadPoolExecutor createRowAddThreadPool(int coreSize) { + return new ThreadPoolExecutor(coreSize, + coreSize, + 60L, + TimeUnit.SECONDS, + new SynchronousQueue(), + new ThreadFactory() { + private final ThreadGroup group = System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup(); + private final AtomicInteger threadNumber = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + "pool-kudu_rows_add-thread-" + threadNumber.getAndIncrement(), + 0); + if (t.isDaemon()) + t.setDaemon(false); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + } + }, new ThreadPoolExecutor.CallerRunsPolicy()); + } + + public static List> getColumnLists(List columns) { + int quota = 8; + int num = (columns.size() - 1) / quota + 1; + int gap = columns.size() / num; + List> columnLists = new ArrayList<>(num); + for (int j = 0; j < num - 1; j++) { + List destList = new ArrayList<>(columns.subList(j * gap, (j + 1) * gap)); + columnLists.add(destList); + } + List destList = new ArrayList<>(columns.subList(gap * (num - 1), columns.size())); + columnLists.add(destList); + return columnLists; + } + public static boolean isTableExists(Configuration configuration) { String tableName = configuration.getString(Key.TABLE); String kuduConfig = configuration.getString(Key.KUDU_CONFIG); @@ -154,7 +198,7 @@ public class Kudu11xHelper { kuduClient.close(); } } catch (KuduException e) { - LOG.warn("kudu client is not gracefully closed !"); + LOG.warn("The \"kudu client\" was not stopped gracefully. !"); } @@ -172,8 +216,8 @@ public class Kudu11xHelper { String type = "BIGINT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) || "LONG".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) ? - "INT64" : "INT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase())? - "INT32":column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase(); + "INT64" : "INT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) ? + "INT32" : column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase(); String name = column.getNecessaryValue(Key.NAME, Kudu11xWriterErrorcode.REQUIRED_VALUE); Boolean key = column.getBool(Key.PRIMARYKEY, false); String encoding = column.getString(Key.ENCODING, Constant.ENCODING).toUpperCase(); @@ -194,9 +238,9 @@ public class Kudu11xHelper { return schema; } - public static Integer getPrimaryKeyIndexUntil(List columns){ + public static Integer getPrimaryKeyIndexUntil(List columns) { int i = 0; - while ( i < columns.size() ) { + while (i < columns.size()) { Configuration col = columns.get(i); if (!col.getBool(Key.PRIMARYKEY, false)) { break; @@ -244,6 +288,7 @@ public class Kudu11xHelper { } public static void validateParameter(Configuration configuration) { + LOG.info("Start validating parameters!"); configuration.getNecessaryValue(Key.KUDU_CONFIG, Kudu11xWriterErrorcode.REQUIRED_VALUE); configuration.getNecessaryValue(Key.TABLE, Kudu11xWriterErrorcode.REQUIRED_VALUE); String encoding = configuration.getString(Key.ENCODING, Constant.DEFAULT_ENCODING); @@ -268,7 +313,39 @@ public class Kudu11xHelper { Boolean isSkipFail = configuration.getBool(Key.SKIP_FAIL, false); configuration.set(Key.SKIP_FAIL, isSkipFail); - LOG.info("==validate parameter complete!"); + List columns = configuration.getListConfiguration(Key.COLUMN); + List goalColumns = new ArrayList<>(); + //column参数验证 + int indexFlag = 0; + boolean primaryKey = true; + int primaryKeyFlag = 0; + for (int i = 0; i < columns.size(); i++) { + Configuration col = columns.get(i); + String index = col.getString(Key.INDEX); + if (index == null) { + index = String.valueOf(i); + col.set(Key.INDEX, index); + indexFlag++; + } + if(primaryKey != col.getBool(Key.PRIMARYKEY, false)){ + primaryKey = col.getBool(Key.PRIMARYKEY, false); + primaryKeyFlag++; + } + goalColumns.add(col); + } + if (indexFlag != 0 && indexFlag != columns.size()) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE, + "\"index\" either has values for all of them, or all of them are null!"); + } + if (primaryKeyFlag > 1){ + throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE, + "\"primaryKey\" must be written in the front!"); + } + configuration.set(Key.COLUMN, goalColumns); +// LOG.info("------------------------------------"); +// LOG.info(configuration.toString()); +// LOG.info("------------------------------------"); + LOG.info("validate parameter complete!"); } public static void truncateTable(Configuration configuration) { diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriter.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriter.java index 9447a6c2..83620f43 100644 --- a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriter.java +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriter.java @@ -38,7 +38,7 @@ public class Kudu11xWriter extends Writer { @Override public List split(int i) { - List splitResultConfigs = new ArrayList(); + List splitResultConfigs = new ArrayList<>(); for (int j = 0; j < i; j++) { splitResultConfigs.add(config.clone()); } @@ -76,7 +76,7 @@ public class Kudu11xWriter extends Writer { kuduTaskProxy.session.close(); } }catch (Exception e){ - LOG.warn("kudu session is not gracefully closed !"); + LOG.warn("The \"kudu session\" was not stopped gracefully !"); } Kudu11xHelper.closeClient(kuduTaskProxy.kuduClient); diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java index 127ee0c1..c4b90b8c 100644 --- a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java @@ -12,12 +12,13 @@ import org.apache.kudu.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; /** * @author daizihao @@ -26,28 +27,30 @@ import java.util.concurrent.atomic.AtomicLong; public class KuduWriterTask { private final static Logger LOG = LoggerFactory.getLogger(KuduWriterTask.class); - public List columns; - public String encoding; - public String insertMode; - public Double batchSize; - public long mutationBufferSpace; - public Boolean isUpsert; - public Boolean isSkipFail; - + private List columns; + private List> columnLists; + private ThreadPoolExecutor pool; + private String encoding; + private Double batchSize; + private Boolean isUpsert; + private Boolean isSkipFail; public KuduClient kuduClient; - public KuduTable table; public KuduSession session; + private KuduTable table; private Integer primaryKeyIndexUntil; + private final Object lock = new Object(); public KuduWriterTask(Configuration configuration) { - this.columns = configuration.getListConfiguration(Key.COLUMN); + columns = configuration.getListConfiguration(Key.COLUMN); + columnLists = Kudu11xHelper.getColumnLists(columns); + pool = Kudu11xHelper.createRowAddThreadPool(columnLists.size()); + this.encoding = configuration.getString(Key.ENCODING); - this.insertMode = configuration.getString(Key.INSERT_MODE); this.batchSize = configuration.getDouble(Key.WRITE_BATCH_SIZE); - this.mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE); - this.isUpsert = !configuration.getString(Key.INSERT_MODE).equals("insert"); + this.isUpsert = !configuration.getString(Key.INSERT_MODE).equalsIgnoreCase("insert"); this.isSkipFail = configuration.getBool(Key.SKIP_FAIL); + long mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE); this.kuduClient = Kudu11xHelper.getKuduClient(configuration.getString(Key.KUDU_CONFIG)); this.table = Kudu11xHelper.getKuduTable(configuration, kuduClient); @@ -59,9 +62,9 @@ public class KuduWriterTask { } public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) { - LOG.info("==kuduwriter began to write!"); + LOG.info("kuduwriter began to write!"); Record record; - AtomicLong counter = new AtomicLong(0L); + LongAdder counter = new LongAdder(); try { while ((record = lineReceiver.getFromReader()) != null) { if (record.getColumnNumber() != columns.size()) { @@ -70,7 +73,7 @@ public class KuduWriterTask { boolean isDirtyRecord = false; - for (int i = 0; i <= primaryKeyIndexUntil && !isDirtyRecord; i++) { + for (int i = 0; i < primaryKeyIndexUntil && !isDirtyRecord; i++) { Column column = record.getColumn(i); isDirtyRecord = StringUtils.isBlank(column.asString()); } @@ -80,51 +83,74 @@ public class KuduWriterTask { continue; } + CountDownLatch countDownLatch = new CountDownLatch(columnLists.size()); Upsert upsert = table.newUpsert(); Insert insert = table.newInsert(); - - for (int i = 0; i < columns.size(); i++) { - PartialRow row; - if (isUpsert) { - //覆盖更新 - row = upsert.getRow(); - } else { - //增量更新 - row = insert.getRow(); - } - Configuration col = columns.get(i); - String name = col.getString(Key.NAME); - ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE)); - Column column = record.getColumn(col.getInt(Key.INDEX, i)); - Object rawData = column.getRawData(); - if (rawData == null) { - row.setNull(name); - continue; - } - switch (type) { - case INT: - row.addInt(name, Integer.parseInt(rawData.toString())); - break; - case LONG: - case BIGINT: - row.addLong(name, Long.parseLong(rawData.toString())); - break; - case FLOAT: - row.addFloat(name, Float.parseFloat(rawData.toString())); - break; - case DOUBLE: - row.addDouble(name, Double.parseDouble(rawData.toString())); - break; - case BOOLEAN: - row.addBoolean(name, Boolean.getBoolean(rawData.toString())); - break; - case STRING: - default: - row.addString(name, rawData.toString()); - } + PartialRow row; + if (isUpsert) { + //覆盖更新 + row = upsert.getRow(); + } else { + //增量更新 + row = insert.getRow(); } + + for (List columnList : columnLists) { + Record finalRecord = record; + pool.submit(()->{ + + for (Configuration col : columnList) { + + String name = col.getString(Key.NAME); + ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE, "string")); + Column column = finalRecord.getColumn(col.getInt(Key.INDEX)); + String rawData = column.asString(); + if (rawData == null) { + synchronized (lock) { + row.setNull(name); + } + continue; + } + switch (type) { + case INT: + synchronized (lock) { + row.addInt(name, Integer.parseInt(rawData)); + } + break; + case LONG: + case BIGINT: + synchronized (lock) { + row.addLong(name, Long.parseLong(rawData)); + } + break; + case FLOAT: + synchronized (lock) { + row.addFloat(name, Float.parseFloat(rawData)); + } + break; + case DOUBLE: + synchronized (lock) { + row.addDouble(name, Double.parseDouble(rawData)); + } + break; + case BOOLEAN: + synchronized (lock) { + row.addBoolean(name, Boolean.getBoolean(rawData)); + } + break; + case STRING: + default: + synchronized (lock) { + row.addString(name, rawData); + } + } + } + countDownLatch.countDown(); + }); + } + countDownLatch.await(); try { - RetryUtil.executeWithRetry(()->{ + RetryUtil.executeWithRetry(() -> { if (isUpsert) { //覆盖更新 session.apply(upsert); @@ -132,26 +158,27 @@ public class KuduWriterTask { //增量更新 session.apply(insert); } - //提前写数据,阈值可自定义 - if (counter.incrementAndGet() > batchSize * 0.75) { + //flush + if (counter.longValue() > (batchSize * 0.8)) { session.flush(); - counter.set(0L); + counter.reset(); } + counter.increment(); return true; - },5,1000L,true); + }, 5, 500L, true); } catch (Exception e) { - LOG.error("Data write failed!", e); + LOG.error("Record Write Failure!", e); if (isSkipFail) { - LOG.warn("Because you have configured skipFail is true,this data will be skipped!"); + LOG.warn("Since you have configured \"skipFail\" to be true, this record will be skipped !"); taskPluginCollector.collectDirtyRecord(record, e.getMessage()); - }else { + } else { throw e; } } } } catch (Exception e) { - LOG.error("write failed! the task will exit!"); + LOG.error("write failure! the task will exit!"); throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage()); } AtomicInteger i = new AtomicInteger(10); @@ -161,23 +188,20 @@ public class KuduWriterTask { session.flush(); break; } - Thread.sleep(1000L); + Thread.sleep(20L); i.decrementAndGet(); } } catch (Exception e) { - LOG.info("Waiting for data to be inserted...... " + i + "s"); - try { - Thread.sleep(1000L); - } catch (InterruptedException ex) { - ex.printStackTrace(); - } - i.decrementAndGet(); + LOG.info("Waiting for data to be written to kudu...... " + i + "s"); + } finally { try { + pool.shutdown(); + //强制刷写 session.flush(); } catch (KuduException e) { - LOG.error("==kuduwriter flush error! the results may not be complete!"); - e.printStackTrace(); + LOG.error("kuduwriter flush error! The results may be incomplete!"); + throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage()); } } diff --git a/kuduwriter/src/main/resources/plugin.json b/kuduwriter/src/main/resources/plugin.json index 948c7e22..f60dc825 100644 --- a/kuduwriter/src/main/resources/plugin.json +++ b/kuduwriter/src/main/resources/plugin.json @@ -1,5 +1,5 @@ { - "name": "kudu11xwriter", + "name": "kuduwriter", "class": "com.q1.datax.plugin.writer.kudu11xwriter.Kudu11xWriter", "description": "use put: prod. mechanism: use kudu java api put data.", "developer": "com.q1.daizihao" diff --git a/kuduwriter/src/main/resources/plugin_job_template.json b/kuduwriter/src/main/resources/plugin_job_template.json index d2723098..3edc6c39 100644 --- a/kuduwriter/src/main/resources/plugin_job_template.json +++ b/kuduwriter/src/main/resources/plugin_job_template.json @@ -1,5 +1,5 @@ { - "name": "kudu11xwriter", + "name": "kuduwriter", "parameter": { "kuduConfig": { "kudu.master_addresses": "***",