diff --git a/kuduwriter/README.md b/kuduwriter/README.md
new file mode 100644
index 00000000..f53de1b5
--- /dev/null
+++ b/kuduwriter/README.md
@@ -0,0 +1,6 @@
+# datax-kudu-plugin
+datax kudu的writer插件
+
+
+
+仅在kudu11进行过测试
diff --git a/kuduwriter/pom.xml b/kuduwriter/pom.xml
new file mode 100644
index 00000000..04b5ef53
--- /dev/null
+++ b/kuduwriter/pom.xml
@@ -0,0 +1,82 @@
+
+
+
+ datax-all
+ com.alibaba.datax
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+
+ kuduwriter
+
+
+ com.alibaba.datax
+ datax-common
+ ${datax-project-version}
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+ org.apache.kudu
+ kudu-client
+ 1.11.1
+
+
+ junit
+ junit
+ 4.13
+ test
+
+
+ com.alibaba.datax
+ datax-core
+ ${datax-project-version}
+
+
+ com.alibaba.datax
+ datax-service-face
+
+
+ test
+
+
+
+
+
+
+
+ maven-compiler-plugin
+
+ ${jdk-version}
+ ${jdk-version}
+ ${project-sourceEncoding}
+
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/assembly/package.xml
+
+ datax
+
+
+
+ dwzip
+ package
+
+ single
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/kuduwriter/src/main/assembly/package.xml b/kuduwriter/src/main/assembly/package.xml
new file mode 100644
index 00000000..5b1a10a7
--- /dev/null
+++ b/kuduwriter/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+
+
+
+ dir
+
+ false
+
+
+ src/main/resources
+
+ plugin.json
+ plugin_job_template.json
+
+ plugin/writer/kudu11xwriter
+
+
+ target/
+
+ kudu11xwriter-0.0.1-SNAPSHOT.jar
+
+ plugin/writer/kudu11xwriter
+
+
+
+
+
+ false
+ plugin/writer/kudu11xwriter/libs
+ runtime
+
+
+
\ No newline at end of file
diff --git a/kuduwriter/src/main/doc/kuduwriter.md b/kuduwriter/src/main/doc/kuduwriter.md
new file mode 100644
index 00000000..44bef235
--- /dev/null
+++ b/kuduwriter/src/main/doc/kuduwriter.md
@@ -0,0 +1,143 @@
+# datax-kudu-plugin
+datax kudu的writer插件
+
+
+
+eg:
+
+```json
+{
+ "name": "kudu11xwriter",
+ "parameter": {
+ "kuduConfig": {
+ "kudu.master_addresses": "***",
+ "timeout": 60000,
+ "sessionTimeout": 60000
+
+ },
+ "table": "",
+ "replicaCount": 3,
+ "truncate": false,
+ "writeMode": "upsert",
+ "partition": {
+ "range": {
+ "column1": [
+ {
+ "lower": "2020-08-25",
+ "upper": "2020-08-26"
+ },
+ {
+ "lower": "2020-08-26",
+ "upper": "2020-08-27"
+ },
+ {
+ "lower": "2020-08-27",
+ "upper": "2020-08-28"
+ }
+ ]
+ },
+ "hash": {
+ "column": [
+ "column1"
+ ],
+ "number": 3
+ }
+ },
+ "column": [
+ {
+ "index": 0,
+ "name": "c1",
+ "type": "string",
+ "primaryKey": true
+ },
+ {
+ "index": 1,
+ "name": "c2",
+ "type": "string",
+ "compress": "DEFAULT_COMPRESSION",
+ "encoding": "AUTO_ENCODING",
+ "comment": "注解xxxx"
+ }
+ ],
+ "batchSize": 1024,
+ "bufferSize": 2048,
+ "skipFail": false,
+ "encoding": "UTF-8"
+ }
+}
+```
+
+必须参数:
+
+```json
+ "writer": {
+ "name": "kudu11xwriter",
+ "parameter": {
+ "kuduConfig": {
+ "kudu.master_addresses": "***"
+ },
+ "table": "***",
+ "column": [
+ {
+ "name": "c1",
+ "type": "string",
+ "primaryKey": true
+ },
+ {
+ "name": "c2",
+ "type": "string",
+ },
+ {
+ "name": "c3",
+ "type": "string"
+ },
+ {
+ "name": "c4",
+ "type": "string"
+ }
+ ]
+ }
+ }
+```
+
+主键列请写到最前面
+
+
+
+
+
+##### 配置列表
+
+| name | default | description | 是否必须 |
+| -------------- | ------------------- | ------------------------------------------------------------ | -------- |
+| kuduConfig | | kudu配置 (kudu.master_addresses等) | 是 |
+| table | | 导入目标表名 | 是 |
+| partition | | 分区 | 否 |
+| column | | 列 | 是 |
+| name | | 列名 | 是 |
+| type | | 列的类型,现支持INT, FLOAT, STRING, BIGINT, DOUBLE, BOOLEAN, LONG。 | 是 |
+| index | 升序排列 | 列索引位置,如reader中取到的某一字段在第二位置(eg: name, id, age)但kudu目标表结构不同(eg:id,name, age),此时就需要将index赋值为(1,0,2),默认顺序(0,1,2) | 否 |
+| primaryKey | false | 是否为主键(请将所有的主键列写在前面),不表明主键将不会检查过滤脏数据 | 否 |
+| compress | DEFAULT_COMPRESSION | 压缩格式 | 否 |
+| encoding | AUTO_ENCODING | 编码 | 否 |
+| replicaCount | 3 | 保留副本个数 | 否 |
+| hash | | hash分区 | 否 |
+| number | 3 | hash分区个数 | 否 |
+| range | | range分区 | 否 |
+| lower | | range分区下限 (eg: sql建表:partition value='haha' 对应:“lower”:“haha”,“upper”:“haha\000”) | 否 |
+| upper | | range分区上限(eg: sql建表:partition "10" <= VALUES < "20" 对应:“lower”:“10”,“upper”:“20”) | 否 |
+| truncate | false | 是否清空表,本质上是删表重建 | 否 |
+| writeMode | upsert | upsert,insert,update | 否 |
+| batchSize | 512 | 每xx行数据flush一次结果(最好不要超过1024) | 否 |
+| bufferSize | 3072 | 缓冲区大小 | 否 |
+| skipFail | false | 是否跳过插入不成功的数据 | 否 |
+| timeout | 60000 | client超时时间,如创建表,删除表操作的超时时间。单位:ms | 否 |
+| sessionTimeout | 60000 | session超时时间 单位:ms | 否 |
+
+
+
+
+
+
+
+
diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/ColumnType.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/ColumnType.java
new file mode 100644
index 00000000..ebd6ea79
--- /dev/null
+++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/ColumnType.java
@@ -0,0 +1,37 @@
+package com.q1.datax.plugin.writer.kudu11xwriter;
+
+import com.alibaba.datax.common.exception.DataXException;
+
+import java.util.Arrays;
+
+/**
+ * @author daizihao
+ * @create 2020-08-31 19:12
+ **/
+public enum ColumnType {
+ INT("int"),
+ FLOAT("float"),
+ STRING("string"),
+ BIGINT("bigint"),
+ DOUBLE("double"),
+ BOOLEAN("boolean"),
+ LONG("long");
+ private String mode;
+ ColumnType(String mode) {
+ this.mode = mode.toLowerCase();
+ }
+
+ public String getMode() {
+ return mode;
+ }
+
+ public static ColumnType getByTypeName(String modeName) {
+ for (ColumnType modeType : values()) {
+ if (modeType.mode.equalsIgnoreCase(modeName)) {
+ return modeType;
+ }
+ }
+ throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE,
+ String.format("Kuduwriter does not support the type:%s, currently supported types are:%s", modeName, Arrays.asList(values())));
+ }
+}
diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Constant.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Constant.java
new file mode 100644
index 00000000..2710e350
--- /dev/null
+++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Constant.java
@@ -0,0 +1,21 @@
+package com.q1.datax.plugin.writer.kudu11xwriter;
+
+/**
+ * @author daizihao
+ * @create 2020-08-31 14:42
+ **/
+public class Constant {
+ public static final String DEFAULT_ENCODING = "UTF-8";
+// public static final String DEFAULT_DATA_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+ public static final String COMPRESSION = "DEFAULT_COMPRESSION";
+ public static final String ENCODING = "AUTO_ENCODING";
+ public static final Long ADMIN_TIMEOUTMS = 60000L;
+ public static final Long SESSION_TIMEOUTMS = 60000L;
+
+
+ public static final String INSERT_MODE = "upsert";
+ public static final long DEFAULT_WRITE_BATCH_SIZE = 512L;
+ public static final long DEFAULT_MUTATION_BUFFER_SPACE = 3072L;
+
+}
diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/InsertModeType.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/InsertModeType.java
new file mode 100644
index 00000000..754ca4fc
--- /dev/null
+++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/InsertModeType.java
@@ -0,0 +1,34 @@
+package com.q1.datax.plugin.writer.kudu11xwriter;
+
+import com.alibaba.datax.common.exception.DataXException;
+
+import java.util.Arrays;
+
+/**
+ * @author daizihao
+ * @create 2020-08-31 14:47
+ **/
+public enum InsertModeType {
+ Insert("insert"),
+ Upsert("upsert"),
+ Update("update");
+ private String mode;
+
+ InsertModeType(String mode) {
+ this.mode = mode.toLowerCase();
+ }
+
+ public String getMode() {
+ return mode;
+ }
+
+ public static InsertModeType getByTypeName(String modeName) {
+ for (InsertModeType modeType : values()) {
+ if (modeType.mode.equalsIgnoreCase(modeName)) {
+ return modeType;
+ }
+ }
+ throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE,
+ String.format("Kuduwriter does not support the mode :[%s], currently supported mode types are :%s", modeName, Arrays.asList(values())));
+ }
+}
diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Key.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Key.java
new file mode 100644
index 00000000..7e5755aa
--- /dev/null
+++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Key.java
@@ -0,0 +1,45 @@
+package com.q1.datax.plugin.writer.kudu11xwriter;
+
+/**
+ * @author daizihao
+ * @create 2020-08-31 14:17
+ **/
+public class Key {
+ public final static String KUDU_CONFIG = "kuduConfig";
+ public final static String KUDU_MASTER = "kudu.master_addresses";
+ public final static String KUDU_ADMIN_TIMEOUT = "timeout";
+ public final static String KUDU_SESSION_TIMEOUT = "sessionTimeout";
+
+ public final static String TABLE = "table";
+ public final static String PARTITION = "partition";
+ public final static String COLUMN = "column";
+
+ public static final String NAME = "name";
+ public static final String TYPE = "type";
+ public static final String INDEX = "index";
+ public static final String PRIMARYKEY = "primaryKey";
+ public static final String COMPRESSION = "compress";
+ public static final String COMMENT = "comment";
+ public final static String ENCODING = "encoding";
+
+
+
+ public static final String NUM_REPLICAS = "replicaCount";
+ public static final String HASH = "hash";
+ public static final String HASH_NUM = "number";
+
+ public static final String RANGE = "range";
+ public static final String LOWER = "lower";
+ public static final String UPPER = "upper";
+
+
+
+ public static final String TRUNCATE = "truncate";
+
+ public static final String INSERT_MODE = "writeMode";
+
+ public static final String WRITE_BATCH_SIZE = "batchSize";
+
+ public static final String MUTATION_BUFFER_SPACE = "bufferSize";
+ public static final String SKIP_FAIL = "skipFail";
+}
diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java
new file mode 100644
index 00000000..5dc9b8ca
--- /dev/null
+++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java
@@ -0,0 +1,291 @@
+package com.q1.datax.plugin.writer.kudu11xwriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author daizihao
+ * @create 2020-08-27 18:30
+ **/
+public class Kudu11xHelper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Kudu11xHelper.class);
+
+ public static Map getKuduConfiguration(String kuduConfig) {
+ if (StringUtils.isBlank(kuduConfig)) {
+ throw DataXException.asDataXException(Kudu11xWriterErrorcode.REQUIRED_VALUE,
+ "Connection configuration information required.");
+ }
+ Map kConfiguration;
+ try {
+ kConfiguration = JSON.parseObject(kuduConfig, HashMap.class);
+ Validate.isTrue(kConfiguration != null, "kuduConfig is null!");
+ kConfiguration.put(Key.KUDU_ADMIN_TIMEOUT, kConfiguration.getOrDefault(Key.KUDU_ADMIN_TIMEOUT, Constant.ADMIN_TIMEOUTMS));
+ kConfiguration.put(Key.KUDU_SESSION_TIMEOUT, kConfiguration.getOrDefault(Key.KUDU_SESSION_TIMEOUT, Constant.SESSION_TIMEOUTMS));
+ } catch (Exception e) {
+ throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e);
+ }
+
+ return kConfiguration;
+ }
+
+ public static KuduClient getKuduClient(String kuduConfig) {
+ Map conf = Kudu11xHelper.getKuduConfiguration(kuduConfig);
+ KuduClient kuduClient = null;
+ try {
+ String masterAddress = (String)conf.get(Key.KUDU_MASTER);
+ kuduClient = new KuduClient.KuduClientBuilder(masterAddress)
+ .defaultAdminOperationTimeoutMs((Long) conf.get(Key.KUDU_ADMIN_TIMEOUT))
+ .defaultOperationTimeoutMs((Long)conf.get(Key.KUDU_SESSION_TIMEOUT))
+ .build();
+ } catch (Exception e) {
+ throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e);
+ }
+ return kuduClient;
+ }
+
+ public static KuduTable getKuduTable(com.alibaba.datax.common.util.Configuration configuration, KuduClient kuduClient) {
+ String tableName = configuration.getString(Key.TABLE);
+
+ KuduTable table = null;
+ try {
+ if (kuduClient.tableExists(tableName)) {
+ table = kuduClient.openTable(tableName);
+ } else {
+ synchronized (Kudu11xHelper.class) {
+ if (!kuduClient.tableExists(tableName)) {
+ Schema schema = Kudu11xHelper.getSchema(configuration);
+ CreateTableOptions tableOptions = new CreateTableOptions();
+
+ Kudu11xHelper.setTablePartition(configuration, tableOptions, schema);
+ //副本数
+ Integer numReplicas = configuration.getInt(Key.NUM_REPLICAS, 3);
+ tableOptions.setNumReplicas(numReplicas);
+ table = kuduClient.createTable(tableName, schema, tableOptions);
+ } else {
+ table = kuduClient.openTable(tableName);
+ }
+ }
+ }
+
+
+ } catch (Exception e) {
+ throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_TABLE_ERROR, e);
+ }
+ return table;
+ }
+
+ public static void createTable(com.alibaba.datax.common.util.Configuration configuration) {
+ String tableName = configuration.getString(Key.TABLE);
+ String kuduConfig = configuration.getString(Key.KUDU_CONFIG);
+ KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig);
+ try {
+ Schema schema = Kudu11xHelper.getSchema(configuration);
+ CreateTableOptions tableOptions = new CreateTableOptions();
+
+ Kudu11xHelper.setTablePartition(configuration, tableOptions, schema);
+ //副本数
+ Integer numReplicas = configuration.getInt(Key.NUM_REPLICAS, 3);
+ tableOptions.setNumReplicas(numReplicas);
+ kuduClient.createTable(tableName, schema, tableOptions);
+ } catch (Exception e) {
+ throw DataXException.asDataXException(Kudu11xWriterErrorcode.GREATE_KUDU_TABLE_ERROR, e);
+ } finally {
+ AtomicInteger i = new AtomicInteger(5);
+ while (i.get()>0) {
+ try {
+ if (kuduClient.isCreateTableDone(tableName)){
+ Kudu11xHelper.closeClient(kuduClient);
+ LOG.info("Table "+ tableName +" is created!");
+ break;
+ }
+ i.decrementAndGet();
+ LOG.error("timeout!");
+ } catch (KuduException e) {
+ LOG.info("Wait for the table to be created..... "+i);
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
+ i.decrementAndGet();
+ }
+ }
+ try {
+ if (kuduClient != null) {
+ kuduClient.close();
+ }
+ } catch (KuduException e) {
+ LOG.info("Kudu client has been shut down!");
+ }
+ }
+ }
+
+ public static boolean isTableExists(com.alibaba.datax.common.util.Configuration configuration) {
+ String tableName = configuration.getString(Key.TABLE);
+ String kuduConfig = configuration.getString(Key.KUDU_CONFIG);
+ KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig);
+ try {
+ return kuduClient.tableExists(tableName);
+ } catch (Exception e) {
+ throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e);
+ } finally {
+ Kudu11xHelper.closeClient(kuduClient);
+ }
+ }
+
+ public static void closeClient(KuduClient kuduClient) {
+ try {
+ if (kuduClient != null) {
+ kuduClient.close();
+ }
+ } catch (KuduException e) {
+ LOG.warn("kudu client is not gracefully closed !");
+
+ }
+
+ }
+
+ public static Schema getSchema(com.alibaba.datax.common.util.Configuration configuration) {
+ List columns = configuration.getListConfiguration(Key.COLUMN);
+ List columnSchemas = new ArrayList<>();
+ Schema schema = null;
+ if (columns == null || columns.isEmpty()) {
+ throw DataXException.asDataXException(Kudu11xWriterErrorcode.REQUIRED_VALUE, "column is not defined,eg:column:[{\"name\": \"cf0:column0\",\"type\": \"string\"},{\"name\": \"cf1:column1\",\"type\": \"long\"}]");
+ }
+ try {
+ for (Configuration column : columns) {
+
+ String type = "BIGINT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) ||
+ "LONG".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) ?
+ "INT64" : "INT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase())?
+ "INT32":column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase();
+ String name = column.getNecessaryValue(Key.NAME, Kudu11xWriterErrorcode.REQUIRED_VALUE);
+ Boolean key = column.getBool(Key.PRIMARYKEY, false);
+ String encoding = column.getString(Key.ENCODING, Constant.ENCODING).toUpperCase();
+ String compression = column.getString(Key.COMPRESSION, Constant.COMPRESSION).toUpperCase();
+ String comment = column.getString(Key.COMMENT, "");
+
+ columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder(name, Type.getTypeForName(type))
+ .key(key)
+ .encoding(ColumnSchema.Encoding.valueOf(encoding))
+ .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.valueOf(compression))
+ .comment(comment)
+ .build());
+ }
+ schema = new Schema(columnSchemas);
+ } catch (Exception e) {
+ throw DataXException.asDataXException(Kudu11xWriterErrorcode.REQUIRED_VALUE, e);
+ }
+ return schema;
+ }
+
+ public static Integer getPrimaryKeyIndexUntil(List columns){
+ int i = 0;
+ while ( i < columns.size() ) {
+ Configuration col = columns.get(i);
+ if (!col.getBool(Key.PRIMARYKEY, false)) {
+ break;
+ }
+ i++;
+ }
+ return i;
+ }
+
+ public static void setTablePartition(com.alibaba.datax.common.util.Configuration configuration,
+ CreateTableOptions tableOptions,
+ Schema schema) {
+ Configuration partition = configuration.getConfiguration(Key.PARTITION);
+ if (partition == null) {
+ ColumnSchema columnSchema = schema.getColumns().get(0);
+ tableOptions.addHashPartitions(Collections.singletonList(columnSchema.getName()), 3);
+ return;
+ }
+ //range分区
+ Configuration range = partition.getConfiguration(Key.RANGE);
+ if (range != null) {
+ List rangeColums = new ArrayList<>(range.getKeys());
+ tableOptions.setRangePartitionColumns(rangeColums);
+ for (String rangeColum : rangeColums) {
+ List lowerAndUppers = range.getListConfiguration(rangeColum);
+ for (Configuration lowerAndUpper : lowerAndUppers) {
+ PartialRow lower = schema.newPartialRow();
+ lower.addString(rangeColum, lowerAndUpper.getNecessaryValue(Key.LOWER, Kudu11xWriterErrorcode.REQUIRED_VALUE));
+ PartialRow upper = schema.newPartialRow();
+ upper.addString(rangeColum, lowerAndUpper.getNecessaryValue(Key.UPPER, Kudu11xWriterErrorcode.REQUIRED_VALUE));
+ tableOptions.addRangePartition(lower, upper);
+ }
+ }
+ LOG.info("Set range partition complete!");
+ }
+
+ // 设置Hash分区
+ Configuration hash = partition.getConfiguration(Key.HASH);
+ if (hash != null) {
+ List hashColums = hash.getList(Key.COLUMN, String.class);
+ Integer hashPartitionNum = configuration.getInt(Key.HASH_NUM, 3);
+ tableOptions.addHashPartitions(hashColums, hashPartitionNum);
+ LOG.info("Set hash partition complete!");
+ }
+ }
+
+ public static void validateParameter(com.alibaba.datax.common.util.Configuration configuration) {
+ configuration.getNecessaryValue(Key.KUDU_CONFIG, Kudu11xWriterErrorcode.REQUIRED_VALUE);
+ configuration.getNecessaryValue(Key.TABLE, Kudu11xWriterErrorcode.REQUIRED_VALUE);
+ String encoding = configuration.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
+ if (!Charset.isSupported(encoding)) {
+ throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE,
+ String.format("Encoding is not supported:[%s] .", encoding));
+ }
+ configuration.set(Key.ENCODING, encoding);
+ String insertMode = configuration.getString(Key.INSERT_MODE, Constant.INSERT_MODE);
+ try {
+ InsertModeType.getByTypeName(insertMode);
+ } catch (Exception e) {
+ insertMode = Constant.INSERT_MODE;
+ }
+ configuration.set(Key.INSERT_MODE, insertMode);
+
+ Long writeBufferSize = configuration.getLong(Key.WRITE_BATCH_SIZE, Constant.DEFAULT_WRITE_BATCH_SIZE);
+ configuration.set(Key.WRITE_BATCH_SIZE, writeBufferSize);
+
+ Long mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE, Constant.DEFAULT_MUTATION_BUFFER_SPACE);
+ configuration.set(Key.MUTATION_BUFFER_SPACE, mutationBufferSpace);
+
+ Boolean isSkipFail = configuration.getBool(Key.SKIP_FAIL, false);
+ configuration.set(Key.SKIP_FAIL, isSkipFail);
+ }
+
+ public static void truncateTable(Configuration configuration) {
+ String kuduConfig = configuration.getString(Key.KUDU_CONFIG);
+ String userTable = configuration.getString(Key.TABLE);
+ LOG.info(String.format("Because you have configured truncate is true,KuduWriter begins to truncate table %s .", userTable));
+ KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig);
+
+ try {
+ if (kuduClient.tableExists(userTable)) {
+ kuduClient.deleteTable(userTable);
+ LOG.info(String.format("table %s has been deleted.", userTable));
+ }
+ } catch (KuduException e) {
+ throw DataXException.asDataXException(Kudu11xWriterErrorcode.DELETE_KUDU_ERROR, e);
+ } finally {
+ Kudu11xHelper.closeClient(kuduClient);
+ }
+
+ }
+}
diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriter.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriter.java
new file mode 100644
index 00000000..9447a6c2
--- /dev/null
+++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriter.java
@@ -0,0 +1,85 @@
+package com.q1.datax.plugin.writer.kudu11xwriter;
+
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author daizihao
+ * @create 2020-08-27 16:58
+ **/
+public class Kudu11xWriter extends Writer {
+ public static class Job extends Writer.Job{
+ private static final Logger LOG = LoggerFactory.getLogger(Job.class);
+ private Configuration config = null;
+ @Override
+ public void init() {
+ this.config = this.getPluginJobConf();
+ Kudu11xHelper.validateParameter(this.config);
+ }
+
+ @Override
+ public void prepare() {
+ Boolean truncate = config.getBool(Key.TRUNCATE,false);
+ if(truncate){
+ Kudu11xHelper.truncateTable(this.config);
+ }
+
+ if (!Kudu11xHelper.isTableExists(config)){
+ Kudu11xHelper.createTable(config);
+ }
+ }
+
+ @Override
+ public List split(int i) {
+ List splitResultConfigs = new ArrayList();
+ for (int j = 0; j < i; j++) {
+ splitResultConfigs.add(config.clone());
+ }
+
+ return splitResultConfigs;
+ }
+
+
+
+ @Override
+ public void destroy() {
+
+ }
+ }
+
+ public static class Task extends Writer.Task{
+ private Configuration taskConfig;
+ private KuduWriterTask kuduTaskProxy;
+ private static final Logger LOG = LoggerFactory.getLogger(Job.class);
+ @Override
+ public void init() {
+ this.taskConfig = super.getPluginJobConf();
+ this.kuduTaskProxy = new KuduWriterTask(this.taskConfig);
+ }
+ @Override
+ public void startWrite(RecordReceiver lineReceiver) {
+ this.kuduTaskProxy.startWriter(lineReceiver,super.getTaskPluginCollector());
+ }
+
+
+ @Override
+ public void destroy() {
+ try {
+ if (kuduTaskProxy.session != null) {
+ kuduTaskProxy.session.close();
+ }
+ }catch (Exception e){
+ LOG.warn("kudu session is not gracefully closed !");
+ }
+ Kudu11xHelper.closeClient(kuduTaskProxy.kuduClient);
+
+ }
+ }
+}
diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriterErrorcode.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriterErrorcode.java
new file mode 100644
index 00000000..694f97a2
--- /dev/null
+++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriterErrorcode.java
@@ -0,0 +1,39 @@
+package com.q1.datax.plugin.writer.kudu11xwriter;
+
+import com.alibaba.datax.common.spi.ErrorCode;
+
+/**
+ * @author daizihao
+ * @create 2020-08-27 19:25
+ **/
+public enum Kudu11xWriterErrorcode implements ErrorCode {
+ REQUIRED_VALUE("Kuduwriter-00", "You are missing a required parameter value."),
+ ILLEGAL_VALUE("Kuduwriter-01", "You fill in the parameter values are not legitimate."),
+ GET_KUDU_CONNECTION_ERROR("Kuduwriter-02", "Error getting Kudu connection."),
+ GET_KUDU_TABLE_ERROR("Kuduwriter-03", "Error getting Kudu table."),
+ CLOSE_KUDU_CONNECTION_ERROR("Kuduwriter-04", "Error closing Kudu connection."),
+ CLOSE_KUDU_SESSION_ERROR("Kuduwriter-06", "Error closing Kudu table connection."),
+ PUT_KUDU_ERROR("Kuduwriter-07", "IO exception occurred when writing to Kudu."),
+ DELETE_KUDU_ERROR("Kuduwriter-08", "An exception occurred while delete Kudu table."),
+ GREATE_KUDU_TABLE_ERROR("Kuduwriter-09", "Error creating Kudu table."),
+ PARAMETER_NUM_ERROR("Kuduwriter-10","The number of parameters does not match.")
+ ;
+
+ private final String code;
+ private final String description;
+
+
+ Kudu11xWriterErrorcode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+ @Override
+ public String getCode() {
+ return null;
+ }
+
+ @Override
+ public String getDescription() {
+ return null;
+ }
+}
diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java
new file mode 100644
index 00000000..e8e83896
--- /dev/null
+++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java
@@ -0,0 +1,180 @@
+package com.q1.datax.plugin.writer.kudu11xwriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.common.util.RetryUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kudu.client.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * @author daizihao
+ * @create 2020-08-31 16:55
+ **/
+public class KuduWriterTask {
+ private final static Logger LOG = LoggerFactory.getLogger(KuduWriterTask.class);
+
+ public List columns;
+ public String encoding;
+ public String insertMode;
+ public Double batchSize;
+ public long mutationBufferSpace;
+ public Boolean isUpsert;
+ public Boolean isSkipFail;
+
+ public KuduClient kuduClient;
+ public KuduTable table;
+ public KuduSession session;
+ private Integer primaryKeyIndexUntil;
+
+
+ public KuduWriterTask(com.alibaba.datax.common.util.Configuration configuration) {
+ this.columns = configuration.getListConfiguration(Key.COLUMN);
+ this.encoding = configuration.getString(Key.ENCODING);
+ this.insertMode = configuration.getString(Key.INSERT_MODE);
+ this.batchSize = configuration.getDouble(Key.WRITE_BATCH_SIZE);
+ this.mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE);
+ this.isUpsert = !configuration.getString(Key.INSERT_MODE).equals("insert");
+
+ this.kuduClient = Kudu11xHelper.getKuduClient(configuration.getString(Key.KUDU_CONFIG));
+ this.table = Kudu11xHelper.getKuduTable(configuration, kuduClient);
+ this.session = kuduClient.newSession();
+ session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+ session.setMutationBufferSpace((int) mutationBufferSpace);
+ this.primaryKeyIndexUntil = Kudu11xHelper.getPrimaryKeyIndexUntil(columns);
+// tableName = configuration.getString(Key.TABLE);
+ }
+
+ public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) {
+ Record record;
+ AtomicLong counter = new AtomicLong(0L);
+ try {
+ while ((record = lineReceiver.getFromReader()) != null) {
+ if (record.getColumnNumber() != columns.size()) {
+ throw DataXException.asDataXException(Kudu11xWriterErrorcode.PARAMETER_NUM_ERROR, " number of record fields:" + record.getColumnNumber() + " number of configuration fields:" + columns.size());
+ }
+ boolean isDirtyRecord = false;
+
+
+ for (int i = 0; i <= primaryKeyIndexUntil && !isDirtyRecord; i++) {
+ Column column = record.getColumn(i);
+ isDirtyRecord = StringUtils.isBlank(column.asString());
+ }
+
+ if (isDirtyRecord) {
+ taskPluginCollector.collectDirtyRecord(record, "primarykey field is null");
+ continue;
+ }
+
+ Upsert upsert = table.newUpsert();
+ Insert insert = table.newInsert();
+
+ for (int i = 0; i < columns.size(); i++) {
+ PartialRow row;
+ if (isUpsert) {
+ //覆盖更新
+ row = upsert.getRow();
+ } else {
+ //增量更新
+ row = insert.getRow();
+ }
+ Configuration col = columns.get(i);
+ String name = col.getString(Key.NAME);
+ ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE));
+ Column column = record.getColumn(col.getInt(Key.INDEX, i));
+ Object rawData = column.getRawData();
+ if (rawData == null) {
+ row.setNull(name);
+ continue;
+ }
+ switch (type) {
+ case INT:
+ row.addInt(name, Integer.parseInt(rawData.toString()));
+ break;
+ case LONG:
+ case BIGINT:
+ row.addLong(name, Long.parseLong(rawData.toString()));
+ break;
+ case FLOAT:
+ row.addFloat(name, Float.parseFloat(rawData.toString()));
+ break;
+ case DOUBLE:
+ row.addDouble(name, Double.parseDouble(rawData.toString()));
+ break;
+ case BOOLEAN:
+ row.addBoolean(name, Boolean.getBoolean(rawData.toString()));
+ break;
+ case STRING:
+ default:
+ row.addString(name, rawData.toString());
+ }
+ }
+ try {
+ RetryUtil.executeWithRetry(()->{
+ if (isUpsert) {
+ //覆盖更新
+ session.apply(upsert);
+ } else {
+ //增量更新
+ session.apply(insert);
+ }
+ //提前写数据,阈值可自定义
+ if (counter.incrementAndGet() > batchSize * 0.75) {
+ session.flush();
+ counter.set(0L);
+ }
+ return true;
+ },5,1000L,true);
+
+ } catch (Exception e) {
+ LOG.error("Data write failed!", e);
+ if (isSkipFail) {
+ LOG.warn("Because you have configured skipFail is true,this data will be skipped!");
+ taskPluginCollector.collectDirtyRecord(record, e.getMessage());
+ }else {
+ throw e;
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e);
+ }
+ AtomicInteger i = new AtomicInteger(10);
+ try {
+ while (i.get() > 0) {
+ if (session.hasPendingOperations()) {
+ session.flush();
+ break;
+ }
+ Thread.sleep(1000L);
+ i.decrementAndGet();
+ }
+ } catch (Exception e) {
+ LOG.info("Waiting for data to be inserted...... " + i + "s");
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
+ i.decrementAndGet();
+ } finally {
+ try {
+ session.flush();
+ } catch (KuduException e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+
+}
diff --git a/kuduwriter/src/main/java/com/q1/kudu/conf/KuduConfig.java b/kuduwriter/src/main/java/com/q1/kudu/conf/KuduConfig.java
new file mode 100644
index 00000000..f1499a0f
--- /dev/null
+++ b/kuduwriter/src/main/java/com/q1/kudu/conf/KuduConfig.java
@@ -0,0 +1,9 @@
+package com.q1.kudu.conf;
+
+/**
+ * @author daizihao
+ * @create 2020-09-16 11:39
+ **/
+public class KuduConfig {
+
+}
diff --git a/kuduwriter/src/main/resources/plugin.json b/kuduwriter/src/main/resources/plugin.json
new file mode 100644
index 00000000..948c7e22
--- /dev/null
+++ b/kuduwriter/src/main/resources/plugin.json
@@ -0,0 +1,7 @@
+{
+ "name": "kudu11xwriter",
+ "class": "com.q1.datax.plugin.writer.kudu11xwriter.Kudu11xWriter",
+ "description": "use put: prod. mechanism: use kudu java api put data.",
+ "developer": "com.q1.daizihao"
+}
+
diff --git a/kuduwriter/src/main/resources/plugin_job_template.json b/kuduwriter/src/main/resources/plugin_job_template.json
new file mode 100644
index 00000000..d2723098
--- /dev/null
+++ b/kuduwriter/src/main/resources/plugin_job_template.json
@@ -0,0 +1,59 @@
+{
+ "name": "kudu11xwriter",
+ "parameter": {
+ "kuduConfig": {
+ "kudu.master_addresses": "***",
+ "timeout": 60000,
+ "sessionTimeout": 60000
+
+ },
+ "table": "",
+ "replicaCount": 3,
+ "truncate": false,
+ "writeMode": "upsert",
+ "partition": {
+ "range": {
+ "column1": [
+ {
+ "lower": "2020-08-25",
+ "upper": "2020-08-26"
+ },
+ {
+ "lower": "2020-08-26",
+ "upper": "2020-08-27"
+ },
+ {
+ "lower": "2020-08-27",
+ "upper": "2020-08-28"
+ }
+ ]
+ },
+ "hash": {
+ "column": [
+ "column1"
+ ],
+ "number": 3
+ }
+ },
+ "column": [
+ {
+ "index": 0,
+ "name": "c1",
+ "type": "string",
+ "primaryKey": true
+ },
+ {
+ "index": 1,
+ "name": "c2",
+ "type": "string",
+ "compress": "DEFAULT_COMPRESSION",
+ "encoding": "AUTO_ENCODING",
+ "comment": "注解xxxx"
+ }
+ ],
+ "batchSize": 1024,
+ "bufferSize": 2048,
+ "skipFail": false,
+ "encoding": "UTF-8"
+ }
+}
\ No newline at end of file
diff --git a/kuduwriter/src/test/java/com/dai/test.java b/kuduwriter/src/test/java/com/dai/test.java
new file mode 100644
index 00000000..ba4ceecd
--- /dev/null
+++ b/kuduwriter/src/test/java/com/dai/test.java
@@ -0,0 +1,23 @@
+package com.dai;
+
+import com.q1.datax.plugin.writer.kudu11xwriter.Kudu11xHelper;
+import org.junit.Test;
+import com.q1.datax.plugin.writer.kudu11xwriter.ColumnType;
+import com.q1.datax.plugin.writer.kudu11xwriter.InsertModeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kudu.client.AsyncKuduClient.LOG;
+
+/**
+ * @author daizihao
+ * @create 2020-08-28 11:03
+ **/
+public class test {
+ @Test
+ public void kuduTypeTest() {
+
+ }
+}
diff --git a/pom.xml b/pom.xml
index b3bf7b9d..5b7f8e48 100755
--- a/pom.xml
+++ b/pom.xml
@@ -102,6 +102,7 @@
plugin-unstructured-storage-util
hbase20xsqlreader
hbase20xsqlwriter
+ kuduwriter