From 155279cd7b86f999447f331cada574df61d2f9f6 Mon Sep 17 00:00:00 2001 From: daizihao Date: Mon, 21 Sep 2020 15:02:52 +0800 Subject: [PATCH 1/2] kuduwriter --- kuduwriter/README.md | 143 +++++++++ kuduwriter/pom.xml | 82 +++++ kuduwriter/src/main/assembly/package.xml | 35 +++ .../writer/kudu11xwriter/ColumnType.java | 37 +++ .../plugin/writer/kudu11xwriter/Constant.java | 21 ++ .../writer/kudu11xwriter/InsertModeType.java | 34 ++ .../plugin/writer/kudu11xwriter/Key.java | 45 +++ .../writer/kudu11xwriter/Kudu11xHelper.java | 291 ++++++++++++++++++ .../writer/kudu11xwriter/Kudu11xWriter.java | 85 +++++ .../kudu11xwriter/Kudu11xWriterErrorcode.java | 39 +++ .../writer/kudu11xwriter/KuduWriterTask.java | 180 +++++++++++ .../java/com/q1/kudu/conf/KuduConfig.java | 9 + kuduwriter/src/main/resources/plugin.json | 7 + .../main/resources/plugin_job_template.json | 59 ++++ kuduwriter/src/test/java/com/dai/test.java | 23 ++ pom.xml | 1 + 16 files changed, 1091 insertions(+) create mode 100644 kuduwriter/README.md create mode 100644 kuduwriter/pom.xml create mode 100644 kuduwriter/src/main/assembly/package.xml create mode 100644 kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/ColumnType.java create mode 100644 kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Constant.java create mode 100644 kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/InsertModeType.java create mode 100644 kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Key.java create mode 100644 kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java create mode 100644 kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriter.java create mode 100644 kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriterErrorcode.java create mode 100644 kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java create mode 100644 
kuduwriter/src/main/java/com/q1/kudu/conf/KuduConfig.java create mode 100644 kuduwriter/src/main/resources/plugin.json create mode 100644 kuduwriter/src/main/resources/plugin_job_template.json create mode 100644 kuduwriter/src/test/java/com/dai/test.java diff --git a/kuduwriter/README.md b/kuduwriter/README.md new file mode 100644 index 00000000..d938825b --- /dev/null +++ b/kuduwriter/README.md @@ -0,0 +1,143 @@ +# datax-kudu-plugins +datax kudu的writer插件 + + + +eg: + +```json +{ + "name": "kudu11xwriter", + "parameter": { + "kuduConfig": { + "kudu.master_addresses": "***", + "timeout": 60000, + "sessionTimeout": 60000 + + }, + "table": "", + "replicaCount": 3, + "truncate": false, + "writeMode": "upsert", + "partition": { + "range": { + "column1": [ + { + "lower": "2020-08-25", + "upper": "2020-08-26" + }, + { + "lower": "2020-08-26", + "upper": "2020-08-27" + }, + { + "lower": "2020-08-27", + "upper": "2020-08-28" + } + ] + }, + "hash": { + "column": [ + "column1" + ], + "number": 3 + } + }, + "column": [ + { + "index": 0, + "name": "c1", + "type": "string", + "primaryKey": true + }, + { + "index": 1, + "name": "c2", + "type": "string", + "compress": "DEFAULT_COMPRESSION", + "encoding": "AUTO_ENCODING", + "comment": "注解xxxx" + } + ], + "batchSize": 1024, + "bufferSize": 2048, + "skipFail": false, + "encoding": "UTF-8" + } +} +``` + +必须参数: + +```json + "writer": { + "name": "kudu11xwriter", + "parameter": { + "kuduConfig": { + "kudu.master_addresses": "***" + }, + "table": "***", + "column": [ + { + "name": "c1", + "type": "string", + "primaryKey": true + }, + { + "name": "c2", + "type": "string", + }, + { + "name": "c3", + "type": "string" + }, + { + "name": "c4", + "type": "string" + } + ] + } + } +``` + +主键列请写到最前面 + + + +![image-20200901193148188](C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20200901193148188.png) + +##### 配置列表 + +| name | default | description | 是否必须 | +| ------------------- | ------------------- | 
------------------------------------------------------------ | -------- | +| kuduConfig | | kudu配置 (kudu.master_addresses等) | 是 | +| table | | 导入目标表名 | 是 | +| partition | | 分区 | 否 | +| column | | 列 | 是 | +| name | | 列名 | 是 | +| type | | 列的类型,现支持INT, FLOAT, STRING, BIGINT, DOUBLE, BOOLEAN, LONG。 | 是 | +| index | 升序排列 | 列索引位置,如reader中取到的某一字段在第二位置(eg: name, id, age)但kudu目标表结构不同(eg:id,name, age),此时就需要将index赋值为(1,0,2),默认顺序(0,1,2) | 否 | +| primaryKey | false | 是否为主键(请将所有的主键列写在前面),不表明主键将不会检查过滤脏数据 | 否 | +| compress | DEFAULT_COMPRESSION | 压缩格式 | 否 | +| encoding | AUTO_ENCODING | 编码 | 否 | +| replicaCount | 3 | 保留副本个数 | 否 | +| hash | | hash分区 | 否 | +| number | 3 | hash分区个数 | 否 | +| range | | range分区 | 否 | +| lower | | range分区下限 (eg: sql建表:partition value='haha' 对应:“lower”:“haha”,“upper”:“haha\000”) | 否 | +| upper | | range分区上限(eg: sql建表:partition "10" <= VALUES < "20" 对应:“lower”:“10”,“upper”:“20”) | 否 | +| truncate | false | 是否清空表,本质上是删表重建 | 否 | +| writeMode | upsert | upsert,insert,update | 否 | +| batchSize | 512 | 每xx行数据flush一次结果(最好不要超过1024) | 否 | +| bufferSize | 3072 | 缓冲区大小 | 否 | +| skipFail | false | 是否跳过插入不成功的数据 | 否 | +| timeout | 60000 | client超时时间,如创建表,删除表操作的超时时间。单位:ms | 否 | +| sessionTimeout | 60000 | session超时时间 单位:ms | 否 | + + + + + + + + diff --git a/kuduwriter/pom.xml b/kuduwriter/pom.xml new file mode 100644 index 00000000..04b5ef53 --- /dev/null +++ b/kuduwriter/pom.xml @@ -0,0 +1,82 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + kuduwriter + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + org.apache.kudu + kudu-client + 1.11.1 + + + junit + junit + 4.13 + test + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-service-face + + + test + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + 
dwzip + package + + single + + + + + + + \ No newline at end of file diff --git a/kuduwriter/src/main/assembly/package.xml b/kuduwriter/src/main/assembly/package.xml new file mode 100644 index 00000000..5b1a10a7 --- /dev/null +++ b/kuduwriter/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/kudu11xwriter + + + target/ + + kudu11xwriter-0.0.1-SNAPSHOT.jar + + plugin/writer/kudu11xwriter + + + + + + false + plugin/writer/kudu11xwriter/libs + runtime + + + \ No newline at end of file diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/ColumnType.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/ColumnType.java new file mode 100644 index 00000000..ebd6ea79 --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/ColumnType.java @@ -0,0 +1,37 @@
+package com.q1.datax.plugin.writer.kudu11xwriter;
+
+import com.alibaba.datax.common.exception.DataXException;
+
+import java.util.Arrays;
+
+/**
+ * Column types that may be named in a column's {@code type} setting.
+ * Each constant carries the lower-case spelling it is matched against.
+ *
+ * @author daizihao
+ * @create 2020-08-31 19:12
+ **/
+public enum ColumnType {
+    INT("int"),
+    FLOAT("float"),
+    STRING("string"),
+    BIGINT("bigint"),
+    DOUBLE("double"),
+    BOOLEAN("boolean"),
+    LONG("long");
+
+    /** Lower-case spelling of the type as written in the job configuration. */
+    private final String mode;
+
+    ColumnType(String mode) {
+        this.mode = mode.toLowerCase();
+    }
+
+    /**
+     * @return the lower-case configuration spelling of this type
+     */
+    public String getMode() {
+        return mode;
+    }
+
+    /**
+     * Resolves a configured type name, ignoring case.
+     *
+     * @param modeName type name taken from the job configuration
+     * @return the first constant whose spelling matches {@code modeName}
+     * @throws DataXException with {@code ILLEGAL_VALUE} when no constant matches
+     */
+    public static ColumnType getByTypeName(String modeName) {
+        return Arrays.stream(values())
+                .filter(candidate -> candidate.mode.equalsIgnoreCase(modeName))
+                .findFirst()
+                .orElseThrow(() -> DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE,
+                        String.format("Kuduwriter does not support the type:%s, currently supported types are:%s", modeName, Arrays.asList(values()))));
+    }
+}
diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Constant.java
b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Constant.java new file mode 100644 index 00000000..2710e350 --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Constant.java @@ -0,0 +1,21 @@ +package com.q1.datax.plugin.writer.kudu11xwriter; + +/** + * @author daizihao + * @create 2020-08-31 14:42 + **/ +public class Constant { + public static final String DEFAULT_ENCODING = "UTF-8"; +// public static final String DEFAULT_DATA_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + public static final String COMPRESSION = "DEFAULT_COMPRESSION"; + public static final String ENCODING = "AUTO_ENCODING"; + public static final Long ADMIN_TIMEOUTMS = 60000L; + public static final Long SESSION_TIMEOUTMS = 60000L; + + + public static final String INSERT_MODE = "upsert"; + public static final long DEFAULT_WRITE_BATCH_SIZE = 512L; + public static final long DEFAULT_MUTATION_BUFFER_SPACE = 3072L; + +} diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/InsertModeType.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/InsertModeType.java new file mode 100644 index 00000000..754ca4fc --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/InsertModeType.java @@ -0,0 +1,34 @@ +package com.q1.datax.plugin.writer.kudu11xwriter; + +import com.alibaba.datax.common.exception.DataXException; + +import java.util.Arrays; + +/** + * @author daizihao + * @create 2020-08-31 14:47 + **/ +public enum InsertModeType { + Insert("insert"), + Upsert("upsert"), + Update("update"); + private String mode; + + InsertModeType(String mode) { + this.mode = mode.toLowerCase(); + } + + public String getMode() { + return mode; + } + + public static InsertModeType getByTypeName(String modeName) { + for (InsertModeType modeType : values()) { + if (modeType.mode.equalsIgnoreCase(modeName)) { + return modeType; + } + } + throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE, + 
String.format("Kuduwriter does not support the mode :[%s], currently supported mode types are :%s", modeName, Arrays.asList(values()))); + } +} diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Key.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Key.java new file mode 100644 index 00000000..7e5755aa --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Key.java @@ -0,0 +1,45 @@ +package com.q1.datax.plugin.writer.kudu11xwriter; + +/** + * @author daizihao + * @create 2020-08-31 14:17 + **/ +public class Key { + public final static String KUDU_CONFIG = "kuduConfig"; + public final static String KUDU_MASTER = "kudu.master_addresses"; + public final static String KUDU_ADMIN_TIMEOUT = "timeout"; + public final static String KUDU_SESSION_TIMEOUT = "sessionTimeout"; + + public final static String TABLE = "table"; + public final static String PARTITION = "partition"; + public final static String COLUMN = "column"; + + public static final String NAME = "name"; + public static final String TYPE = "type"; + public static final String INDEX = "index"; + public static final String PRIMARYKEY = "primaryKey"; + public static final String COMPRESSION = "compress"; + public static final String COMMENT = "comment"; + public final static String ENCODING = "encoding"; + + + + public static final String NUM_REPLICAS = "replicaCount"; + public static final String HASH = "hash"; + public static final String HASH_NUM = "number"; + + public static final String RANGE = "range"; + public static final String LOWER = "lower"; + public static final String UPPER = "upper"; + + + + public static final String TRUNCATE = "truncate"; + + public static final String INSERT_MODE = "writeMode"; + + public static final String WRITE_BATCH_SIZE = "batchSize"; + + public static final String MUTATION_BUFFER_SPACE = "bufferSize"; + public static final String SKIP_FAIL = "skipFail"; +} diff --git 
a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java new file mode 100644 index 00000000..5dc9b8ca --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xHelper.java @@ -0,0 +1,291 @@ +package com.q1.datax.plugin.writer.kudu11xwriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.fastjson.JSON; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author daizihao + * @create 2020-08-27 18:30 + **/ +public class Kudu11xHelper { + + private static final Logger LOG = LoggerFactory.getLogger(Kudu11xHelper.class); + + public static Map getKuduConfiguration(String kuduConfig) { + if (StringUtils.isBlank(kuduConfig)) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.REQUIRED_VALUE, + "Connection configuration information required."); + } + Map kConfiguration; + try { + kConfiguration = JSON.parseObject(kuduConfig, HashMap.class); + Validate.isTrue(kConfiguration != null, "kuduConfig is null!"); + kConfiguration.put(Key.KUDU_ADMIN_TIMEOUT, kConfiguration.getOrDefault(Key.KUDU_ADMIN_TIMEOUT, Constant.ADMIN_TIMEOUTMS)); + kConfiguration.put(Key.KUDU_SESSION_TIMEOUT, kConfiguration.getOrDefault(Key.KUDU_SESSION_TIMEOUT, Constant.SESSION_TIMEOUTMS)); + } catch (Exception e) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e); + } + + return kConfiguration; + } + + public static KuduClient 
getKuduClient(String kuduConfig) { + Map conf = Kudu11xHelper.getKuduConfiguration(kuduConfig); + KuduClient kuduClient = null; + try { + String masterAddress = (String)conf.get(Key.KUDU_MASTER); + kuduClient = new KuduClient.KuduClientBuilder(masterAddress) + .defaultAdminOperationTimeoutMs((Long) conf.get(Key.KUDU_ADMIN_TIMEOUT)) + .defaultOperationTimeoutMs((Long)conf.get(Key.KUDU_SESSION_TIMEOUT)) + .build(); + } catch (Exception e) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e); + } + return kuduClient; + } + + public static KuduTable getKuduTable(com.alibaba.datax.common.util.Configuration configuration, KuduClient kuduClient) { + String tableName = configuration.getString(Key.TABLE); + + KuduTable table = null; + try { + if (kuduClient.tableExists(tableName)) { + table = kuduClient.openTable(tableName); + } else { + synchronized (Kudu11xHelper.class) { + if (!kuduClient.tableExists(tableName)) { + Schema schema = Kudu11xHelper.getSchema(configuration); + CreateTableOptions tableOptions = new CreateTableOptions(); + + Kudu11xHelper.setTablePartition(configuration, tableOptions, schema); + //副本数 + Integer numReplicas = configuration.getInt(Key.NUM_REPLICAS, 3); + tableOptions.setNumReplicas(numReplicas); + table = kuduClient.createTable(tableName, schema, tableOptions); + } else { + table = kuduClient.openTable(tableName); + } + } + } + + + } catch (Exception e) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_TABLE_ERROR, e); + } + return table; + } + + public static void createTable(com.alibaba.datax.common.util.Configuration configuration) { + String tableName = configuration.getString(Key.TABLE); + String kuduConfig = configuration.getString(Key.KUDU_CONFIG); + KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig); + try { + Schema schema = Kudu11xHelper.getSchema(configuration); + CreateTableOptions tableOptions = new CreateTableOptions(); + + 
Kudu11xHelper.setTablePartition(configuration, tableOptions, schema); + //副本数 + Integer numReplicas = configuration.getInt(Key.NUM_REPLICAS, 3); + tableOptions.setNumReplicas(numReplicas); + kuduClient.createTable(tableName, schema, tableOptions); + } catch (Exception e) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.GREATE_KUDU_TABLE_ERROR, e); + } finally { + AtomicInteger i = new AtomicInteger(5); + while (i.get()>0) { + try { + if (kuduClient.isCreateTableDone(tableName)){ + Kudu11xHelper.closeClient(kuduClient); + LOG.info("Table "+ tableName +" is created!"); + break; + } + i.decrementAndGet(); + LOG.error("timeout!"); + } catch (KuduException e) { + LOG.info("Wait for the table to be created..... "+i); + try { + Thread.sleep(1000L); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + i.decrementAndGet(); + } + } + try { + if (kuduClient != null) { + kuduClient.close(); + } + } catch (KuduException e) { + LOG.info("Kudu client has been shut down!"); + } + } + } + + public static boolean isTableExists(com.alibaba.datax.common.util.Configuration configuration) { + String tableName = configuration.getString(Key.TABLE); + String kuduConfig = configuration.getString(Key.KUDU_CONFIG); + KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig); + try { + return kuduClient.tableExists(tableName); + } catch (Exception e) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e); + } finally { + Kudu11xHelper.closeClient(kuduClient); + } + } + + public static void closeClient(KuduClient kuduClient) { + try { + if (kuduClient != null) { + kuduClient.close(); + } + } catch (KuduException e) { + LOG.warn("kudu client is not gracefully closed !"); + + } + + } + + public static Schema getSchema(com.alibaba.datax.common.util.Configuration configuration) { + List columns = configuration.getListConfiguration(Key.COLUMN); + List columnSchemas = new ArrayList<>(); + Schema schema = null; + if (columns 
== null || columns.isEmpty()) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.REQUIRED_VALUE, "column is not defined,eg:column:[{\"name\": \"cf0:column0\",\"type\": \"string\"},{\"name\": \"cf1:column1\",\"type\": \"long\"}]"); + } + try { + for (Configuration column : columns) { + + String type = "BIGINT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) || + "LONG".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) ? + "INT64" : "INT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase())? + "INT32":column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase(); + String name = column.getNecessaryValue(Key.NAME, Kudu11xWriterErrorcode.REQUIRED_VALUE); + Boolean key = column.getBool(Key.PRIMARYKEY, false); + String encoding = column.getString(Key.ENCODING, Constant.ENCODING).toUpperCase(); + String compression = column.getString(Key.COMPRESSION, Constant.COMPRESSION).toUpperCase(); + String comment = column.getString(Key.COMMENT, ""); + + columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder(name, Type.getTypeForName(type)) + .key(key) + .encoding(ColumnSchema.Encoding.valueOf(encoding)) + .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.valueOf(compression)) + .comment(comment) + .build()); + } + schema = new Schema(columnSchemas); + } catch (Exception e) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.REQUIRED_VALUE, e); + } + return schema; + } + + public static Integer getPrimaryKeyIndexUntil(List columns){ + int i = 0; + while ( i < columns.size() ) { + Configuration col = columns.get(i); + if (!col.getBool(Key.PRIMARYKEY, false)) { + break; + } + i++; + } + return i; + } + + public static void setTablePartition(com.alibaba.datax.common.util.Configuration configuration, + CreateTableOptions tableOptions, + Schema schema) { + Configuration partition = 
configuration.getConfiguration(Key.PARTITION); + if (partition == null) { + ColumnSchema columnSchema = schema.getColumns().get(0); + tableOptions.addHashPartitions(Collections.singletonList(columnSchema.getName()), 3); + return; + } + //range分区 + Configuration range = partition.getConfiguration(Key.RANGE); + if (range != null) { + List rangeColums = new ArrayList<>(range.getKeys()); + tableOptions.setRangePartitionColumns(rangeColums); + for (String rangeColum : rangeColums) { + List lowerAndUppers = range.getListConfiguration(rangeColum); + for (Configuration lowerAndUpper : lowerAndUppers) { + PartialRow lower = schema.newPartialRow(); + lower.addString(rangeColum, lowerAndUpper.getNecessaryValue(Key.LOWER, Kudu11xWriterErrorcode.REQUIRED_VALUE)); + PartialRow upper = schema.newPartialRow(); + upper.addString(rangeColum, lowerAndUpper.getNecessaryValue(Key.UPPER, Kudu11xWriterErrorcode.REQUIRED_VALUE)); + tableOptions.addRangePartition(lower, upper); + } + } + LOG.info("Set range partition complete!"); + } + + // 设置Hash分区 + Configuration hash = partition.getConfiguration(Key.HASH); + if (hash != null) { + List hashColums = hash.getList(Key.COLUMN, String.class); + Integer hashPartitionNum = configuration.getInt(Key.HASH_NUM, 3); + tableOptions.addHashPartitions(hashColums, hashPartitionNum); + LOG.info("Set hash partition complete!"); + } + } + + public static void validateParameter(com.alibaba.datax.common.util.Configuration configuration) { + configuration.getNecessaryValue(Key.KUDU_CONFIG, Kudu11xWriterErrorcode.REQUIRED_VALUE); + configuration.getNecessaryValue(Key.TABLE, Kudu11xWriterErrorcode.REQUIRED_VALUE); + String encoding = configuration.getString(Key.ENCODING, Constant.DEFAULT_ENCODING); + if (!Charset.isSupported(encoding)) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE, + String.format("Encoding is not supported:[%s] .", encoding)); + } + configuration.set(Key.ENCODING, encoding); + String insertMode = 
configuration.getString(Key.INSERT_MODE, Constant.INSERT_MODE); + try { + InsertModeType.getByTypeName(insertMode); + } catch (Exception e) { + insertMode = Constant.INSERT_MODE; + } + configuration.set(Key.INSERT_MODE, insertMode); + + Long writeBufferSize = configuration.getLong(Key.WRITE_BATCH_SIZE, Constant.DEFAULT_WRITE_BATCH_SIZE); + configuration.set(Key.WRITE_BATCH_SIZE, writeBufferSize); + + Long mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE, Constant.DEFAULT_MUTATION_BUFFER_SPACE); + configuration.set(Key.MUTATION_BUFFER_SPACE, mutationBufferSpace); + + Boolean isSkipFail = configuration.getBool(Key.SKIP_FAIL, false); + configuration.set(Key.SKIP_FAIL, isSkipFail); + } + + public static void truncateTable(Configuration configuration) { + String kuduConfig = configuration.getString(Key.KUDU_CONFIG); + String userTable = configuration.getString(Key.TABLE); + LOG.info(String.format("Because you have configured truncate is true,KuduWriter begins to truncate table %s .", userTable)); + KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig); + + try { + if (kuduClient.tableExists(userTable)) { + kuduClient.deleteTable(userTable); + LOG.info(String.format("table %s has been deleted.", userTable)); + } + } catch (KuduException e) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.DELETE_KUDU_ERROR, e); + } finally { + Kudu11xHelper.closeClient(kuduClient); + } + + } +} diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriter.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriter.java new file mode 100644 index 00000000..9447a6c2 --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriter.java @@ -0,0 +1,85 @@ +package com.q1.datax.plugin.writer.kudu11xwriter; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import 
com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author daizihao + * @create 2020-08-27 16:58 + **/ +public class Kudu11xWriter extends Writer { + public static class Job extends Writer.Job{ + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + private Configuration config = null; + @Override + public void init() { + this.config = this.getPluginJobConf(); + Kudu11xHelper.validateParameter(this.config); + } + + @Override + public void prepare() { + Boolean truncate = config.getBool(Key.TRUNCATE,false); + if(truncate){ + Kudu11xHelper.truncateTable(this.config); + } + + if (!Kudu11xHelper.isTableExists(config)){ + Kudu11xHelper.createTable(config); + } + } + + @Override + public List split(int i) { + List splitResultConfigs = new ArrayList(); + for (int j = 0; j < i; j++) { + splitResultConfigs.add(config.clone()); + } + + return splitResultConfigs; + } + + + + @Override + public void destroy() { + + } + } + + public static class Task extends Writer.Task{ + private Configuration taskConfig; + private KuduWriterTask kuduTaskProxy; + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + @Override + public void init() { + this.taskConfig = super.getPluginJobConf(); + this.kuduTaskProxy = new KuduWriterTask(this.taskConfig); + } + @Override + public void startWrite(RecordReceiver lineReceiver) { + this.kuduTaskProxy.startWriter(lineReceiver,super.getTaskPluginCollector()); + } + + + @Override + public void destroy() { + try { + if (kuduTaskProxy.session != null) { + kuduTaskProxy.session.close(); + } + }catch (Exception e){ + LOG.warn("kudu session is not gracefully closed !"); + } + Kudu11xHelper.closeClient(kuduTaskProxy.kuduClient); + + } + } +} diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriterErrorcode.java 
b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriterErrorcode.java new file mode 100644 index 00000000..694f97a2 --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/Kudu11xWriterErrorcode.java @@ -0,0 +1,39 @@ +package com.q1.datax.plugin.writer.kudu11xwriter; + +import com.alibaba.datax.common.spi.ErrorCode; + +/** + * @author daizihao + * @create 2020-08-27 19:25 + **/ +public enum Kudu11xWriterErrorcode implements ErrorCode { + REQUIRED_VALUE("Kuduwriter-00", "You are missing a required parameter value."), + ILLEGAL_VALUE("Kuduwriter-01", "You fill in the parameter values are not legitimate."), + GET_KUDU_CONNECTION_ERROR("Kuduwriter-02", "Error getting Kudu connection."), + GET_KUDU_TABLE_ERROR("Kuduwriter-03", "Error getting Kudu table."), + CLOSE_KUDU_CONNECTION_ERROR("Kuduwriter-04", "Error closing Kudu connection."), + CLOSE_KUDU_SESSION_ERROR("Kuduwriter-06", "Error closing Kudu table connection."), + PUT_KUDU_ERROR("Kuduwriter-07", "IO exception occurred when writing to Kudu."), + DELETE_KUDU_ERROR("Kuduwriter-08", "An exception occurred while delete Kudu table."), + GREATE_KUDU_TABLE_ERROR("Kuduwriter-09", "Error creating Kudu table."), + PARAMETER_NUM_ERROR("Kuduwriter-10","The number of parameters does not match.") + ; + + private final String code; + private final String description; + + + Kudu11xWriterErrorcode(String code, String description) { + this.code = code; + this.description = description; + } + @Override + public String getCode() { + return null; + } + + @Override + public String getDescription() { + return null; + } +} diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java new file mode 100644 index 00000000..e8e83896 --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java @@ -0,0 +1,180 @@ +package 
com.q1.datax.plugin.writer.kudu11xwriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.common.util.RetryUtil; +import org.apache.commons.lang3.StringUtils; +import org.apache.kudu.client.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * @author daizihao + * @create 2020-08-31 16:55 + **/ +public class KuduWriterTask { + private final static Logger LOG = LoggerFactory.getLogger(KuduWriterTask.class); + + public List columns; + public String encoding; + public String insertMode; + public Double batchSize; + public long mutationBufferSpace; + public Boolean isUpsert; + public Boolean isSkipFail; + + public KuduClient kuduClient; + public KuduTable table; + public KuduSession session; + private Integer primaryKeyIndexUntil; + + + public KuduWriterTask(com.alibaba.datax.common.util.Configuration configuration) { + this.columns = configuration.getListConfiguration(Key.COLUMN); + this.encoding = configuration.getString(Key.ENCODING); + this.insertMode = configuration.getString(Key.INSERT_MODE); + this.batchSize = configuration.getDouble(Key.WRITE_BATCH_SIZE); + this.mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE); + this.isUpsert = !configuration.getString(Key.INSERT_MODE).equals("insert"); + + this.kuduClient = Kudu11xHelper.getKuduClient(configuration.getString(Key.KUDU_CONFIG)); + this.table = Kudu11xHelper.getKuduTable(configuration, kuduClient); + this.session = kuduClient.newSession(); + session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); + session.setMutationBufferSpace((int) 
mutationBufferSpace); + this.primaryKeyIndexUntil = Kudu11xHelper.getPrimaryKeyIndexUntil(columns); +// tableName = configuration.getString(Key.TABLE); + } + + public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) { + Record record; + AtomicLong counter = new AtomicLong(0L); + try { + while ((record = lineReceiver.getFromReader()) != null) { + if (record.getColumnNumber() != columns.size()) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.PARAMETER_NUM_ERROR, " number of record fields:" + record.getColumnNumber() + " number of configuration fields:" + columns.size()); + } + boolean isDirtyRecord = false; + + + for (int i = 0; i <= primaryKeyIndexUntil && !isDirtyRecord; i++) { + Column column = record.getColumn(i); + isDirtyRecord = StringUtils.isBlank(column.asString()); + } + + if (isDirtyRecord) { + taskPluginCollector.collectDirtyRecord(record, "primarykey field is null"); + continue; + } + + Upsert upsert = table.newUpsert(); + Insert insert = table.newInsert(); + + for (int i = 0; i < columns.size(); i++) { + PartialRow row; + if (isUpsert) { + //覆盖更新 + row = upsert.getRow(); + } else { + //增量更新 + row = insert.getRow(); + } + Configuration col = columns.get(i); + String name = col.getString(Key.NAME); + ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE)); + Column column = record.getColumn(col.getInt(Key.INDEX, i)); + Object rawData = column.getRawData(); + if (rawData == null) { + row.setNull(name); + continue; + } + switch (type) { + case INT: + row.addInt(name, Integer.parseInt(rawData.toString())); + break; + case LONG: + case BIGINT: + row.addLong(name, Long.parseLong(rawData.toString())); + break; + case FLOAT: + row.addFloat(name, Float.parseFloat(rawData.toString())); + break; + case DOUBLE: + row.addDouble(name, Double.parseDouble(rawData.toString())); + break; + case BOOLEAN: + row.addBoolean(name, Boolean.getBoolean(rawData.toString())); + break; + case STRING: + default: + 
row.addString(name, rawData.toString()); + } + } + try { + RetryUtil.executeWithRetry(()->{ + if (isUpsert) { + //覆盖更新 + session.apply(upsert); + } else { + //增量更新 + session.apply(insert); + } + //提前写数据,阈值可自定义 + if (counter.incrementAndGet() > batchSize * 0.75) { + session.flush(); + counter.set(0L); + } + return true; + },5,1000L,true); + + } catch (Exception e) { + LOG.error("Data write failed!", e); + if (isSkipFail) { + LOG.warn("Because you have configured skipFail is true,this data will be skipped!"); + taskPluginCollector.collectDirtyRecord(record, e.getMessage()); + }else { + throw e; + } + } + } + } catch (Exception e) { + throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e); + } + AtomicInteger i = new AtomicInteger(10); + try { + while (i.get() > 0) { + if (session.hasPendingOperations()) { + session.flush(); + break; + } + Thread.sleep(1000L); + i.decrementAndGet(); + } + } catch (Exception e) { + LOG.info("Waiting for data to be inserted...... " + i + "s"); + try { + Thread.sleep(1000L); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + i.decrementAndGet(); + } finally { + try { + session.flush(); + } catch (KuduException e) { + e.printStackTrace(); + } + } + + } + + +} diff --git a/kuduwriter/src/main/java/com/q1/kudu/conf/KuduConfig.java b/kuduwriter/src/main/java/com/q1/kudu/conf/KuduConfig.java new file mode 100644 index 00000000..f1499a0f --- /dev/null +++ b/kuduwriter/src/main/java/com/q1/kudu/conf/KuduConfig.java @@ -0,0 +1,9 @@ +package com.q1.kudu.conf; + +/** + * @author daizihao + * @create 2020-09-16 11:39 + **/ +public class KuduConfig { + +} diff --git a/kuduwriter/src/main/resources/plugin.json b/kuduwriter/src/main/resources/plugin.json new file mode 100644 index 00000000..948c7e22 --- /dev/null +++ b/kuduwriter/src/main/resources/plugin.json @@ -0,0 +1,7 @@ +{ + "name": "kudu11xwriter", + "class": "com.q1.datax.plugin.writer.kudu11xwriter.Kudu11xWriter", + "description": "use put: prod. 
mechanism: use kudu java api put data.", + "developer": "com.q1.daizihao" +} + diff --git a/kuduwriter/src/main/resources/plugin_job_template.json b/kuduwriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..d2723098 --- /dev/null +++ b/kuduwriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,59 @@ +{ + "name": "kudu11xwriter", + "parameter": { + "kuduConfig": { + "kudu.master_addresses": "***", + "timeout": 60000, + "sessionTimeout": 60000 + + }, + "table": "", + "replicaCount": 3, + "truncate": false, + "writeMode": "upsert", + "partition": { + "range": { + "column1": [ + { + "lower": "2020-08-25", + "upper": "2020-08-26" + }, + { + "lower": "2020-08-26", + "upper": "2020-08-27" + }, + { + "lower": "2020-08-27", + "upper": "2020-08-28" + } + ] + }, + "hash": { + "column": [ + "column1" + ], + "number": 3 + } + }, + "column": [ + { + "index": 0, + "name": "c1", + "type": "string", + "primaryKey": true + }, + { + "index": 1, + "name": "c2", + "type": "string", + "compress": "DEFAULT_COMPRESSION", + "encoding": "AUTO_ENCODING", + "comment": "注解xxxx" + } + ], + "batchSize": 1024, + "bufferSize": 2048, + "skipFail": false, + "encoding": "UTF-8" + } +} \ No newline at end of file diff --git a/kuduwriter/src/test/java/com/dai/test.java b/kuduwriter/src/test/java/com/dai/test.java new file mode 100644 index 00000000..ba4ceecd --- /dev/null +++ b/kuduwriter/src/test/java/com/dai/test.java @@ -0,0 +1,23 @@ +package com.dai; + +import com.q1.datax.plugin.writer.kudu11xwriter.Kudu11xHelper; +import org.junit.Test; +import com.q1.datax.plugin.writer.kudu11xwriter.ColumnType; +import com.q1.datax.plugin.writer.kudu11xwriter.InsertModeType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kudu.client.AsyncKuduClient.LOG; + +/** + * @author daizihao + * @create 2020-08-28 11:03 + **/ +public class test { + @Test + public void kuduTypeTest() { + + } 
+} diff --git a/pom.xml b/pom.xml index bd270cc5..57e44057 100755 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,7 @@ plugin-unstructured-storage-util hbase20xsqlreader hbase20xsqlwriter + kuduwriter From d6daf9cfb059ca300fe1ad3fb8695e303b08991d Mon Sep 17 00:00:00 2001 From: daizihao Date: Mon, 21 Sep 2020 15:21:09 +0800 Subject: [PATCH 2/2] readme and doc update --- kuduwriter/README.md | 141 +------------------------ kuduwriter/src/main/doc/kuduwriter.md | 143 ++++++++++++++++++++++++++ 2 files changed, 145 insertions(+), 139 deletions(-) create mode 100644 kuduwriter/src/main/doc/kuduwriter.md diff --git a/kuduwriter/README.md b/kuduwriter/README.md index d938825b..f53de1b5 100644 --- a/kuduwriter/README.md +++ b/kuduwriter/README.md @@ -1,143 +1,6 @@ -# datax-kudu-plugins +# datax-kudu-plugin datax kudu的writer插件 -eg: - -```json -{ - "name": "kudu11xwriter", - "parameter": { - "kuduConfig": { - "kudu.master_addresses": "***", - "timeout": 60000, - "sessionTimeout": 60000 - - }, - "table": "", - "replicaCount": 3, - "truncate": false, - "writeMode": "upsert", - "partition": { - "range": { - "column1": [ - { - "lower": "2020-08-25", - "upper": "2020-08-26" - }, - { - "lower": "2020-08-26", - "upper": "2020-08-27" - }, - { - "lower": "2020-08-27", - "upper": "2020-08-28" - } - ] - }, - "hash": { - "column": [ - "column1" - ], - "number": 3 - } - }, - "column": [ - { - "index": 0, - "name": "c1", - "type": "string", - "primaryKey": true - }, - { - "index": 1, - "name": "c2", - "type": "string", - "compress": "DEFAULT_COMPRESSION", - "encoding": "AUTO_ENCODING", - "comment": "注解xxxx" - } - ], - "batchSize": 1024, - "bufferSize": 2048, - "skipFail": false, - "encoding": "UTF-8" - } -} -``` - -必须参数: - -```json - "writer": { - "name": "kudu11xwriter", - "parameter": { - "kuduConfig": { - "kudu.master_addresses": "***" - }, - "table": "***", - "column": [ - { - "name": "c1", - "type": "string", - "primaryKey": true - }, - { - "name": "c2", - "type": "string", - }, - { - 
"name": "c3", - "type": "string" - }, - { - "name": "c4", - "type": "string" - } - ] - } - } -``` - -主键列请写到最前面 - - - -![image-20200901193148188](C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20200901193148188.png) - -##### 配置列表 - -| name | default | description | 是否必须 | -| ------------------- | ------------------- | ------------------------------------------------------------ | -------- | -| kuduConfig | | kudu配置 (kudu.master_addresses等) | 是 | -| table | | 导入目标表名 | 是 | -| partition | | 分区 | 否 | -| column | | 列 | 是 | -| name | | 列名 | 是 | -| type | | 列的类型,现支持INT, FLOAT, STRING, BIGINT, DOUBLE, BOOLEAN, LONG。 | 是 | -| index | 升序排列 | 列索引位置,如reader中取到的某一字段在第二位置(eg: name, id, age)但kudu目标表结构不同(eg:id,name, age),此时就需要将index赋值为(1,0,2),默认顺序(0,1,2) | 否 | -| primaryKey | false | 是否为主键(请将所有的主键列写在前面),不表明主键将不会检查过滤脏数据 | 否 | -| compression | DEFAULT_COMPRESSION | 压缩格式 | 否 | -| encoding | AUTO_ENCODING | 编码 | 否 | -| numReplicas | 3 | 保留副本个数 | 否 | -| hash | | hash分区 | 否 | -| num | 3 | hash分区个数 | 否 | -| range | | range分区 | 否 | -| lower | | range分区下限 (eg: sql建表:partition value='haha' 对应:“lower”:“haha”,“upper”:“haha\000”) | 否 | -| upper | | range分区上限(eg: sql建表:partition "10" <= VALUES < "20" 对应:“lower”:“10”,“upper”:“20”) | 否 | -| truncate | false | 是否清空表,本质上是删表重建 | 否 | -| insertMode | upsert | upsert,insert,update | 否 | -| writeBatchSize | 512 | 每xx行数据flush一次结果(最好不要超过1024) | 否 | -| mutationBufferSpace | 3072 | 缓冲区大小 | 否 | -| skipFail | false | 是否跳过插入不成功的数据 | 否 | -| timeout | 60000 | client超时时间,如创建表,删除表操作的超时时间。单位:ms | 否 | -| sessionTimeout | 60000 | session超时时间 单位:ms | 否 | - - - - - - - - +仅在kudu11进行过测试 diff --git a/kuduwriter/src/main/doc/kuduwriter.md b/kuduwriter/src/main/doc/kuduwriter.md new file mode 100644 index 00000000..44bef235 --- /dev/null +++ b/kuduwriter/src/main/doc/kuduwriter.md @@ -0,0 +1,143 @@ +# datax-kudu-plugin +datax kudu的writer插件 + + + +eg: + +```json +{ + "name": "kudu11xwriter", + "parameter": { + "kuduConfig": { + 
"kudu.master_addresses": "***", + "timeout": 60000, + "sessionTimeout": 60000 + + }, + "table": "", + "replicaCount": 3, + "truncate": false, + "writeMode": "upsert", + "partition": { + "range": { + "column1": [ + { + "lower": "2020-08-25", + "upper": "2020-08-26" + }, + { + "lower": "2020-08-26", + "upper": "2020-08-27" + }, + { + "lower": "2020-08-27", + "upper": "2020-08-28" + } + ] + }, + "hash": { + "column": [ + "column1" + ], + "number": 3 + } + }, + "column": [ + { + "index": 0, + "name": "c1", + "type": "string", + "primaryKey": true + }, + { + "index": 1, + "name": "c2", + "type": "string", + "compress": "DEFAULT_COMPRESSION", + "encoding": "AUTO_ENCODING", + "comment": "注解xxxx" + } + ], + "batchSize": 1024, + "bufferSize": 2048, + "skipFail": false, + "encoding": "UTF-8" + } +} +``` + +必须参数: + +```json + "writer": { + "name": "kudu11xwriter", + "parameter": { + "kuduConfig": { + "kudu.master_addresses": "***" + }, + "table": "***", + "column": [ + { + "name": "c1", + "type": "string", + "primaryKey": true + }, + { + "name": "c2", + "type": "string", + }, + { + "name": "c3", + "type": "string" + }, + { + "name": "c4", + "type": "string" + } + ] + } + } +``` + +主键列请写到最前面 + + + +![image-20200901193148188](C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20200901193148188.png) + +##### 配置列表 + +| name | default | description | 是否必须 | +| -------------- | ------------------- | ------------------------------------------------------------ | -------- | +| kuduConfig | | kudu配置 (kudu.master_addresses等) | 是 | +| table | | 导入目标表名 | 是 | +| partition | | 分区 | 否 | +| column | | 列 | 是 | +| name | | 列名 | 是 | +| type | | 列的类型,现支持INT, FLOAT, STRING, BIGINT, DOUBLE, BOOLEAN, LONG。 | 是 | +| index | 升序排列 | 列索引位置,如reader中取到的某一字段在第二位置(eg: name, id, age)但kudu目标表结构不同(eg:id,name, age),此时就需要将index赋值为(1,0,2),默认顺序(0,1,2) | 否 | +| primaryKey | false | 是否为主键(请将所有的主键列写在前面),不表明主键将不会检查过滤脏数据 | 否 | +| compress | DEFAULT_COMPRESSION | 压缩格式 | 否 | +| encoding | 
AUTO_ENCODING | 编码 | 否 | +| replicaCount | 3 | 保留副本个数 | 否 | +| hash | | hash分区 | 否 | +| number | 3 | hash分区个数 | 否 | +| range | | range分区 | 否 | +| lower | | range分区下限 (eg: sql建表:partition value='haha' 对应:“lower”:“haha”,“upper”:“haha\000”) | 否 | +| upper | | range分区上限(eg: sql建表:partition "10" <= VALUES < "20" 对应:“lower”:“10”,“upper”:“20”) | 否 | +| truncate | false | 是否清空表,本质上是删表重建 | 否 | +| writeMode | upsert | upsert,insert,update | 否 | +| batchSize | 512 | 每xx行数据flush一次结果(最好不要超过1024) | 否 | +| bufferSize | 3072 | 缓冲区大小 | 否 | +| skipFail | false | 是否跳过插入不成功的数据 | 否 | +| timeout | 60000 | client超时时间,如创建表,删除表操作的超时时间。单位:ms | 否 | +| sessionTimeout | 60000 | session超时时间 单位:ms | 否 | + + + + + + + +