mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 17:40:28 +08:00
commit
ff1f9618a6
6
kuduwriter/README.md
Normal file
6
kuduwriter/README.md
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
# datax-kudu-plugin
|
||||||
|
datax kudu的writer插件
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
仅在 Kudu 1.11 版本上进行过测试
|
82
kuduwriter/pom.xml
Normal file
82
kuduwriter/pom.xml
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>datax-all</artifactId>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<artifactId>kuduwriter</artifactId>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>datax-common</artifactId>
|
||||||
|
<version>${datax-project-version}</version>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>slf4j-log4j12</artifactId>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.kudu</groupId>
|
||||||
|
<artifactId>kudu-client</artifactId>
|
||||||
|
<version>1.11.1</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>junit</groupId>
|
||||||
|
<artifactId>junit</artifactId>
|
||||||
|
<version>4.13</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>datax-core</artifactId>
|
||||||
|
<version>${datax-project-version}</version>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>datax-service-face</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<!-- compiler plugin -->
|
||||||
|
<plugin>
|
||||||
|
<artifactId>maven-compiler-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<source>${jdk-version}</source>
|
||||||
|
<target>${jdk-version}</target>
|
||||||
|
<encoding>${project-sourceEncoding}</encoding>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
<!-- assembly plugin -->
|
||||||
|
<plugin>
|
||||||
|
<artifactId>maven-assembly-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<descriptors>
|
||||||
|
<descriptor>src/main/assembly/package.xml</descriptor>
|
||||||
|
</descriptors>
|
||||||
|
<finalName>datax</finalName>
|
||||||
|
</configuration>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>dwzip</id>
|
||||||
|
<phase>package</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>single</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
</project>
|
35
kuduwriter/src/main/assembly/package.xml
Normal file
35
kuduwriter/src/main/assembly/package.xml
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
<assembly
|
||||||
|
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
|
||||||
|
<id></id>
|
||||||
|
<formats>
|
||||||
|
<format>dir</format>
|
||||||
|
</formats>
|
||||||
|
<includeBaseDirectory>false</includeBaseDirectory>
|
||||||
|
<fileSets>
|
||||||
|
<fileSet>
|
||||||
|
<directory>src/main/resources</directory>
|
||||||
|
<includes>
|
||||||
|
<include>plugin.json</include>
|
||||||
|
<include>plugin_job_template.json</include>
|
||||||
|
</includes>
|
||||||
|
<outputDirectory>plugin/writer/kudu11xwriter</outputDirectory>
|
||||||
|
</fileSet>
|
||||||
|
<fileSet>
|
||||||
|
<directory>target/</directory>
|
||||||
|
<includes>
|
||||||
|
<include>kuduwriter-0.0.1-SNAPSHOT.jar</include>
|
||||||
|
</includes>
|
||||||
|
<outputDirectory>plugin/writer/kudu11xwriter</outputDirectory>
|
||||||
|
</fileSet>
|
||||||
|
</fileSets>
|
||||||
|
|
||||||
|
<dependencySets>
|
||||||
|
<dependencySet>
|
||||||
|
<useProjectArtifact>false</useProjectArtifact>
|
||||||
|
<outputDirectory>plugin/writer/kudu11xwriter/libs</outputDirectory>
|
||||||
|
<scope>runtime</scope>
|
||||||
|
</dependencySet>
|
||||||
|
</dependencySets>
|
||||||
|
</assembly>
|
143
kuduwriter/src/main/doc/kuduwriter.md
Normal file
143
kuduwriter/src/main/doc/kuduwriter.md
Normal file
@ -0,0 +1,143 @@
|
|||||||
|
# datax-kudu-plugin
|
||||||
|
datax kudu的writer插件
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
eg:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"name": "kudu11xwriter",
|
||||||
|
"parameter": {
|
||||||
|
"kuduConfig": {
|
||||||
|
"kudu.master_addresses": "***",
|
||||||
|
"timeout": 60000,
|
||||||
|
"sessionTimeout": 60000
|
||||||
|
|
||||||
|
},
|
||||||
|
"table": "",
|
||||||
|
"replicaCount": 3,
|
||||||
|
"truncate": false,
|
||||||
|
"writeMode": "upsert",
|
||||||
|
"partition": {
|
||||||
|
"range": {
|
||||||
|
"column1": [
|
||||||
|
{
|
||||||
|
"lower": "2020-08-25",
|
||||||
|
"upper": "2020-08-26"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"lower": "2020-08-26",
|
||||||
|
"upper": "2020-08-27"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"lower": "2020-08-27",
|
||||||
|
"upper": "2020-08-28"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"hash": {
|
||||||
|
"column": [
|
||||||
|
"column1"
|
||||||
|
],
|
||||||
|
"number": 3
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"column": [
|
||||||
|
{
|
||||||
|
"index": 0,
|
||||||
|
"name": "c1",
|
||||||
|
"type": "string",
|
||||||
|
"primaryKey": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"index": 1,
|
||||||
|
"name": "c2",
|
||||||
|
"type": "string",
|
||||||
|
"compress": "DEFAULT_COMPRESSION",
|
||||||
|
"encoding": "AUTO_ENCODING",
|
||||||
|
"comment": "注解xxxx"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"batchSize": 1024,
|
||||||
|
"bufferSize": 2048,
|
||||||
|
"skipFail": false,
|
||||||
|
"encoding": "UTF-8"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
必须参数:
|
||||||
|
|
||||||
|
```json
|
||||||
|
"writer": {
|
||||||
|
"name": "kudu11xwriter",
|
||||||
|
"parameter": {
|
||||||
|
"kuduConfig": {
|
||||||
|
"kudu.master_addresses": "***"
|
||||||
|
},
|
||||||
|
"table": "***",
|
||||||
|
"column": [
|
||||||
|
{
|
||||||
|
"name": "c1",
|
||||||
|
"type": "string",
|
||||||
|
"primaryKey": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "c2",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "c3",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "c4",
|
||||||
|
"type": "string"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
主键列请写到最前面
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
##### 配置列表
|
||||||
|
|
||||||
|
| name | default | description | 是否必须 |
|
||||||
|
| -------------- | ------------------- | ------------------------------------------------------------ | -------- |
|
||||||
|
| kuduConfig | | kudu配置 (kudu.master_addresses等) | 是 |
|
||||||
|
| table | | 导入目标表名 | 是 |
|
||||||
|
| partition | | 分区 | 否 |
|
||||||
|
| column | | 列 | 是 |
|
||||||
|
| name | | 列名 | 是 |
|
||||||
|
| type | | 列的类型,现支持INT, FLOAT, STRING, BIGINT, DOUBLE, BOOLEAN, LONG。 | 是 |
|
||||||
|
| index | 升序排列 | 列索引位置,如reader中取到的某一字段在第二位置(eg: name, id, age)但kudu目标表结构不同(eg:id,name, age),此时就需要将index赋值为(1,0,2),默认顺序(0,1,2) | 否 |
|
||||||
|
| primaryKey     | false               | 是否为主键(请将所有的主键列写在前面);若未标明主键,将不会检查并过滤脏数据 | 否       |
|
||||||
|
| compress | DEFAULT_COMPRESSION | 压缩格式 | 否 |
|
||||||
|
| encoding | AUTO_ENCODING | 编码 | 否 |
|
||||||
|
| replicaCount | 3 | 保留副本个数 | 否 |
|
||||||
|
| hash | | hash分区 | 否 |
|
||||||
|
| number | 3 | hash分区个数 | 否 |
|
||||||
|
| range | | range分区 | 否 |
|
||||||
|
| lower | | range分区下限 (eg: sql建表:partition value='haha' 对应:“lower”:“haha”,“upper”:“haha\000”) | 否 |
|
||||||
|
| upper | | range分区上限(eg: sql建表:partition "10" <= VALUES < "20" 对应:“lower”:“10”,“upper”:“20”) | 否 |
|
||||||
|
| truncate | false | 是否清空表,本质上是删表重建 | 否 |
|
||||||
|
| writeMode | upsert | upsert,insert,update | 否 |
|
||||||
|
| batchSize | 512 | 每xx行数据flush一次结果(最好不要超过1024) | 否 |
|
||||||
|
| bufferSize | 3072 | 缓冲区大小 | 否 |
|
||||||
|
| skipFail | false | 是否跳过插入不成功的数据 | 否 |
|
||||||
|
| timeout | 60000 | client超时时间,如创建表,删除表操作的超时时间。单位:ms | 否 |
|
||||||
|
| sessionTimeout | 60000 | session超时时间 单位:ms | 否 |
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -0,0 +1,37 @@
|
|||||||
|
package com.q1.datax.plugin.writer.kudu11xwriter;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author daizihao
|
||||||
|
* @create 2020-08-31 19:12
|
||||||
|
**/
|
||||||
|
public enum ColumnType {
|
||||||
|
INT("int"),
|
||||||
|
FLOAT("float"),
|
||||||
|
STRING("string"),
|
||||||
|
BIGINT("bigint"),
|
||||||
|
DOUBLE("double"),
|
||||||
|
BOOLEAN("boolean"),
|
||||||
|
LONG("long");
|
||||||
|
private String mode;
|
||||||
|
ColumnType(String mode) {
|
||||||
|
this.mode = mode.toLowerCase();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMode() {
|
||||||
|
return mode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ColumnType getByTypeName(String modeName) {
|
||||||
|
for (ColumnType modeType : values()) {
|
||||||
|
if (modeType.mode.equalsIgnoreCase(modeName)) {
|
||||||
|
return modeType;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE,
|
||||||
|
String.format("Kuduwriter does not support the type:%s, currently supported types are:%s", modeName, Arrays.asList(values())));
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,21 @@
|
|||||||
|
package com.q1.datax.plugin.writer.kudu11xwriter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author daizihao
|
||||||
|
* @create 2020-08-31 14:42
|
||||||
|
**/
|
||||||
|
/**
 * Default values the Kudu writer falls back to when the job configuration
 * leaves a setting out.
 */
public class Constant {

    // ---- job-level defaults ----

    /** Default character set name for the job-level "encoding" setting. */
    public static final String DEFAULT_ENCODING = "UTF-8";

    /** Default write mode. */
    public static final String INSERT_MODE = "upsert";

    /** Default value for the "batchSize" setting. */
    public static final long DEFAULT_WRITE_BATCH_SIZE = 512L;

    /** Default value for the "bufferSize" setting. */
    public static final long DEFAULT_MUTATION_BUFFER_SPACE = 3072L;

    // ---- per-column defaults ----

    /** Default column compression algorithm name. */
    public static final String COMPRESSION = "DEFAULT_COMPRESSION";

    /** Default column encoding name. */
    public static final String ENCODING = "AUTO_ENCODING";

    // ---- client timeouts, in milliseconds ----

    /** Default "timeout" value. */
    public static final Long ADMIN_TIMEOUTMS = 60000L;

    /** Default "sessionTimeout" value. */
    public static final Long SESSION_TIMEOUTMS = 60000L;

}
|
@ -0,0 +1,34 @@
|
|||||||
|
package com.q1.datax.plugin.writer.kudu11xwriter;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author daizihao
|
||||||
|
* @create 2020-08-31 14:47
|
||||||
|
**/
|
||||||
|
public enum InsertModeType {
|
||||||
|
Insert("insert"),
|
||||||
|
Upsert("upsert"),
|
||||||
|
Update("update");
|
||||||
|
private String mode;
|
||||||
|
|
||||||
|
InsertModeType(String mode) {
|
||||||
|
this.mode = mode.toLowerCase();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMode() {
|
||||||
|
return mode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static InsertModeType getByTypeName(String modeName) {
|
||||||
|
for (InsertModeType modeType : values()) {
|
||||||
|
if (modeType.mode.equalsIgnoreCase(modeName)) {
|
||||||
|
return modeType;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE,
|
||||||
|
String.format("Kuduwriter does not support the mode :[%s], currently supported mode types are :%s", modeName, Arrays.asList(values())));
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,45 @@
|
|||||||
|
package com.q1.datax.plugin.writer.kudu11xwriter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author daizihao
|
||||||
|
* @create 2020-08-31 14:17
|
||||||
|
**/
|
||||||
|
/**
 * Names of the configuration keys read by the Kudu writer.
 */
public class Key {

    // ---- connection settings (nested under "kuduConfig") ----
    public static final String KUDU_CONFIG = "kuduConfig";
    public static final String KUDU_MASTER = "kudu.master_addresses";
    public static final String KUDU_ADMIN_TIMEOUT = "timeout";
    public static final String KUDU_SESSION_TIMEOUT = "sessionTimeout";

    // ---- table-level settings ----
    public static final String TABLE = "table";
    public static final String PARTITION = "partition";
    public static final String COLUMN = "column";
    public static final String NUM_REPLICAS = "replicaCount";
    public static final String TRUNCATE = "truncate";

    // ---- per-column settings ----
    public static final String NAME = "name";
    public static final String TYPE = "type";
    public static final String INDEX = "index";
    public static final String PRIMARYKEY = "primaryKey";
    public static final String COMPRESSION = "compress";
    public static final String COMMENT = "comment";
    public static final String ENCODING = "encoding";

    // ---- partition settings ----
    public static final String HASH = "hash";
    public static final String HASH_NUM = "number";
    public static final String RANGE = "range";
    public static final String LOWER = "lower";
    public static final String UPPER = "upper";

    // ---- write behaviour ----
    public static final String INSERT_MODE = "writeMode";
    public static final String WRITE_BATCH_SIZE = "batchSize";
    public static final String MUTATION_BUFFER_SPACE = "bufferSize";
    public static final String SKIP_FAIL = "skipFail";
}
|
@ -0,0 +1,291 @@
|
|||||||
|
package com.q1.datax.plugin.writer.kudu11xwriter;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Column;
|
||||||
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.commons.lang3.Validate;
|
||||||
|
import org.apache.kudu.ColumnSchema;
|
||||||
|
import org.apache.kudu.Schema;
|
||||||
|
import org.apache.kudu.Type;
|
||||||
|
import org.apache.kudu.client.*;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author daizihao
|
||||||
|
* @create 2020-08-27 18:30
|
||||||
|
**/
|
||||||
|
public class Kudu11xHelper {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(Kudu11xHelper.class);
|
||||||
|
|
||||||
|
public static Map<String, Object> getKuduConfiguration(String kuduConfig) {
|
||||||
|
if (StringUtils.isBlank(kuduConfig)) {
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.REQUIRED_VALUE,
|
||||||
|
"Connection configuration information required.");
|
||||||
|
}
|
||||||
|
Map<String, Object> kConfiguration;
|
||||||
|
try {
|
||||||
|
kConfiguration = JSON.parseObject(kuduConfig, HashMap.class);
|
||||||
|
Validate.isTrue(kConfiguration != null, "kuduConfig is null!");
|
||||||
|
kConfiguration.put(Key.KUDU_ADMIN_TIMEOUT, kConfiguration.getOrDefault(Key.KUDU_ADMIN_TIMEOUT, Constant.ADMIN_TIMEOUTMS));
|
||||||
|
kConfiguration.put(Key.KUDU_SESSION_TIMEOUT, kConfiguration.getOrDefault(Key.KUDU_SESSION_TIMEOUT, Constant.SESSION_TIMEOUTMS));
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return kConfiguration;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static KuduClient getKuduClient(String kuduConfig) {
|
||||||
|
Map<String, Object> conf = Kudu11xHelper.getKuduConfiguration(kuduConfig);
|
||||||
|
KuduClient kuduClient = null;
|
||||||
|
try {
|
||||||
|
String masterAddress = (String)conf.get(Key.KUDU_MASTER);
|
||||||
|
kuduClient = new KuduClient.KuduClientBuilder(masterAddress)
|
||||||
|
.defaultAdminOperationTimeoutMs((Long) conf.get(Key.KUDU_ADMIN_TIMEOUT))
|
||||||
|
.defaultOperationTimeoutMs((Long)conf.get(Key.KUDU_SESSION_TIMEOUT))
|
||||||
|
.build();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e);
|
||||||
|
}
|
||||||
|
return kuduClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static KuduTable getKuduTable(com.alibaba.datax.common.util.Configuration configuration, KuduClient kuduClient) {
|
||||||
|
String tableName = configuration.getString(Key.TABLE);
|
||||||
|
|
||||||
|
KuduTable table = null;
|
||||||
|
try {
|
||||||
|
if (kuduClient.tableExists(tableName)) {
|
||||||
|
table = kuduClient.openTable(tableName);
|
||||||
|
} else {
|
||||||
|
synchronized (Kudu11xHelper.class) {
|
||||||
|
if (!kuduClient.tableExists(tableName)) {
|
||||||
|
Schema schema = Kudu11xHelper.getSchema(configuration);
|
||||||
|
CreateTableOptions tableOptions = new CreateTableOptions();
|
||||||
|
|
||||||
|
Kudu11xHelper.setTablePartition(configuration, tableOptions, schema);
|
||||||
|
//副本数
|
||||||
|
Integer numReplicas = configuration.getInt(Key.NUM_REPLICAS, 3);
|
||||||
|
tableOptions.setNumReplicas(numReplicas);
|
||||||
|
table = kuduClient.createTable(tableName, schema, tableOptions);
|
||||||
|
} else {
|
||||||
|
table = kuduClient.openTable(tableName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_TABLE_ERROR, e);
|
||||||
|
}
|
||||||
|
return table;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void createTable(com.alibaba.datax.common.util.Configuration configuration) {
|
||||||
|
String tableName = configuration.getString(Key.TABLE);
|
||||||
|
String kuduConfig = configuration.getString(Key.KUDU_CONFIG);
|
||||||
|
KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig);
|
||||||
|
try {
|
||||||
|
Schema schema = Kudu11xHelper.getSchema(configuration);
|
||||||
|
CreateTableOptions tableOptions = new CreateTableOptions();
|
||||||
|
|
||||||
|
Kudu11xHelper.setTablePartition(configuration, tableOptions, schema);
|
||||||
|
//副本数
|
||||||
|
Integer numReplicas = configuration.getInt(Key.NUM_REPLICAS, 3);
|
||||||
|
tableOptions.setNumReplicas(numReplicas);
|
||||||
|
kuduClient.createTable(tableName, schema, tableOptions);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.GREATE_KUDU_TABLE_ERROR, e);
|
||||||
|
} finally {
|
||||||
|
AtomicInteger i = new AtomicInteger(5);
|
||||||
|
while (i.get()>0) {
|
||||||
|
try {
|
||||||
|
if (kuduClient.isCreateTableDone(tableName)){
|
||||||
|
Kudu11xHelper.closeClient(kuduClient);
|
||||||
|
LOG.info("Table "+ tableName +" is created!");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
i.decrementAndGet();
|
||||||
|
LOG.error("timeout!");
|
||||||
|
} catch (KuduException e) {
|
||||||
|
LOG.info("Wait for the table to be created..... "+i);
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000L);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
ex.printStackTrace();
|
||||||
|
}
|
||||||
|
i.decrementAndGet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
if (kuduClient != null) {
|
||||||
|
kuduClient.close();
|
||||||
|
}
|
||||||
|
} catch (KuduException e) {
|
||||||
|
LOG.info("Kudu client has been shut down!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isTableExists(com.alibaba.datax.common.util.Configuration configuration) {
|
||||||
|
String tableName = configuration.getString(Key.TABLE);
|
||||||
|
String kuduConfig = configuration.getString(Key.KUDU_CONFIG);
|
||||||
|
KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig);
|
||||||
|
try {
|
||||||
|
return kuduClient.tableExists(tableName);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.GET_KUDU_CONNECTION_ERROR, e);
|
||||||
|
} finally {
|
||||||
|
Kudu11xHelper.closeClient(kuduClient);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void closeClient(KuduClient kuduClient) {
|
||||||
|
try {
|
||||||
|
if (kuduClient != null) {
|
||||||
|
kuduClient.close();
|
||||||
|
}
|
||||||
|
} catch (KuduException e) {
|
||||||
|
LOG.warn("kudu client is not gracefully closed !");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Schema getSchema(com.alibaba.datax.common.util.Configuration configuration) {
|
||||||
|
List<Configuration> columns = configuration.getListConfiguration(Key.COLUMN);
|
||||||
|
List<ColumnSchema> columnSchemas = new ArrayList<>();
|
||||||
|
Schema schema = null;
|
||||||
|
if (columns == null || columns.isEmpty()) {
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.REQUIRED_VALUE, "column is not defined,eg:column:[{\"name\": \"cf0:column0\",\"type\": \"string\"},{\"name\": \"cf1:column1\",\"type\": \"long\"}]");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
for (Configuration column : columns) {
|
||||||
|
|
||||||
|
String type = "BIGINT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) ||
|
||||||
|
"LONG".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase()) ?
|
||||||
|
"INT64" : "INT".equals(column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase())?
|
||||||
|
"INT32":column.getNecessaryValue(Key.TYPE, Kudu11xWriterErrorcode.REQUIRED_VALUE).toUpperCase();
|
||||||
|
String name = column.getNecessaryValue(Key.NAME, Kudu11xWriterErrorcode.REQUIRED_VALUE);
|
||||||
|
Boolean key = column.getBool(Key.PRIMARYKEY, false);
|
||||||
|
String encoding = column.getString(Key.ENCODING, Constant.ENCODING).toUpperCase();
|
||||||
|
String compression = column.getString(Key.COMPRESSION, Constant.COMPRESSION).toUpperCase();
|
||||||
|
String comment = column.getString(Key.COMMENT, "");
|
||||||
|
|
||||||
|
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder(name, Type.getTypeForName(type))
|
||||||
|
.key(key)
|
||||||
|
.encoding(ColumnSchema.Encoding.valueOf(encoding))
|
||||||
|
.compressionAlgorithm(ColumnSchema.CompressionAlgorithm.valueOf(compression))
|
||||||
|
.comment(comment)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
schema = new Schema(columnSchemas);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.REQUIRED_VALUE, e);
|
||||||
|
}
|
||||||
|
return schema;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Integer getPrimaryKeyIndexUntil(List<Configuration> columns){
|
||||||
|
int i = 0;
|
||||||
|
while ( i < columns.size() ) {
|
||||||
|
Configuration col = columns.get(i);
|
||||||
|
if (!col.getBool(Key.PRIMARYKEY, false)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setTablePartition(com.alibaba.datax.common.util.Configuration configuration,
|
||||||
|
CreateTableOptions tableOptions,
|
||||||
|
Schema schema) {
|
||||||
|
Configuration partition = configuration.getConfiguration(Key.PARTITION);
|
||||||
|
if (partition == null) {
|
||||||
|
ColumnSchema columnSchema = schema.getColumns().get(0);
|
||||||
|
tableOptions.addHashPartitions(Collections.singletonList(columnSchema.getName()), 3);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
//range分区
|
||||||
|
Configuration range = partition.getConfiguration(Key.RANGE);
|
||||||
|
if (range != null) {
|
||||||
|
List<String> rangeColums = new ArrayList<>(range.getKeys());
|
||||||
|
tableOptions.setRangePartitionColumns(rangeColums);
|
||||||
|
for (String rangeColum : rangeColums) {
|
||||||
|
List<Configuration> lowerAndUppers = range.getListConfiguration(rangeColum);
|
||||||
|
for (Configuration lowerAndUpper : lowerAndUppers) {
|
||||||
|
PartialRow lower = schema.newPartialRow();
|
||||||
|
lower.addString(rangeColum, lowerAndUpper.getNecessaryValue(Key.LOWER, Kudu11xWriterErrorcode.REQUIRED_VALUE));
|
||||||
|
PartialRow upper = schema.newPartialRow();
|
||||||
|
upper.addString(rangeColum, lowerAndUpper.getNecessaryValue(Key.UPPER, Kudu11xWriterErrorcode.REQUIRED_VALUE));
|
||||||
|
tableOptions.addRangePartition(lower, upper);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("Set range partition complete!");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 设置Hash分区
|
||||||
|
Configuration hash = partition.getConfiguration(Key.HASH);
|
||||||
|
if (hash != null) {
|
||||||
|
List<String> hashColums = hash.getList(Key.COLUMN, String.class);
|
||||||
|
Integer hashPartitionNum = configuration.getInt(Key.HASH_NUM, 3);
|
||||||
|
tableOptions.addHashPartitions(hashColums, hashPartitionNum);
|
||||||
|
LOG.info("Set hash partition complete!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void validateParameter(com.alibaba.datax.common.util.Configuration configuration) {
|
||||||
|
configuration.getNecessaryValue(Key.KUDU_CONFIG, Kudu11xWriterErrorcode.REQUIRED_VALUE);
|
||||||
|
configuration.getNecessaryValue(Key.TABLE, Kudu11xWriterErrorcode.REQUIRED_VALUE);
|
||||||
|
String encoding = configuration.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
|
||||||
|
if (!Charset.isSupported(encoding)) {
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.ILLEGAL_VALUE,
|
||||||
|
String.format("Encoding is not supported:[%s] .", encoding));
|
||||||
|
}
|
||||||
|
configuration.set(Key.ENCODING, encoding);
|
||||||
|
String insertMode = configuration.getString(Key.INSERT_MODE, Constant.INSERT_MODE);
|
||||||
|
try {
|
||||||
|
InsertModeType.getByTypeName(insertMode);
|
||||||
|
} catch (Exception e) {
|
||||||
|
insertMode = Constant.INSERT_MODE;
|
||||||
|
}
|
||||||
|
configuration.set(Key.INSERT_MODE, insertMode);
|
||||||
|
|
||||||
|
Long writeBufferSize = configuration.getLong(Key.WRITE_BATCH_SIZE, Constant.DEFAULT_WRITE_BATCH_SIZE);
|
||||||
|
configuration.set(Key.WRITE_BATCH_SIZE, writeBufferSize);
|
||||||
|
|
||||||
|
Long mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE, Constant.DEFAULT_MUTATION_BUFFER_SPACE);
|
||||||
|
configuration.set(Key.MUTATION_BUFFER_SPACE, mutationBufferSpace);
|
||||||
|
|
||||||
|
Boolean isSkipFail = configuration.getBool(Key.SKIP_FAIL, false);
|
||||||
|
configuration.set(Key.SKIP_FAIL, isSkipFail);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void truncateTable(Configuration configuration) {
|
||||||
|
String kuduConfig = configuration.getString(Key.KUDU_CONFIG);
|
||||||
|
String userTable = configuration.getString(Key.TABLE);
|
||||||
|
LOG.info(String.format("Because you have configured truncate is true,KuduWriter begins to truncate table %s .", userTable));
|
||||||
|
KuduClient kuduClient = Kudu11xHelper.getKuduClient(kuduConfig);
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (kuduClient.tableExists(userTable)) {
|
||||||
|
kuduClient.deleteTable(userTable);
|
||||||
|
LOG.info(String.format("table %s has been deleted.", userTable));
|
||||||
|
}
|
||||||
|
} catch (KuduException e) {
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.DELETE_KUDU_ERROR, e);
|
||||||
|
} finally {
|
||||||
|
Kudu11xHelper.closeClient(kuduClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,85 @@
|
|||||||
|
package com.q1.datax.plugin.writer.kudu11xwriter;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
|
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||||
|
import com.alibaba.datax.common.spi.Writer;
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author daizihao
|
||||||
|
* @create 2020-08-27 16:58
|
||||||
|
**/
|
||||||
|
public class Kudu11xWriter extends Writer {
|
||||||
|
public static class Job extends Writer.Job{
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
|
||||||
|
private Configuration config = null;
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
this.config = this.getPluginJobConf();
|
||||||
|
Kudu11xHelper.validateParameter(this.config);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void prepare() {
|
||||||
|
Boolean truncate = config.getBool(Key.TRUNCATE,false);
|
||||||
|
if(truncate){
|
||||||
|
Kudu11xHelper.truncateTable(this.config);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!Kudu11xHelper.isTableExists(config)){
|
||||||
|
Kudu11xHelper.createTable(config);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Configuration> split(int i) {
|
||||||
|
List<Configuration> splitResultConfigs = new ArrayList<Configuration>();
|
||||||
|
for (int j = 0; j < i; j++) {
|
||||||
|
splitResultConfigs.add(config.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
return splitResultConfigs;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Task extends Writer.Task{
|
||||||
|
private Configuration taskConfig;
|
||||||
|
private KuduWriterTask kuduTaskProxy;
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
this.taskConfig = super.getPluginJobConf();
|
||||||
|
this.kuduTaskProxy = new KuduWriterTask(this.taskConfig);
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void startWrite(RecordReceiver lineReceiver) {
|
||||||
|
this.kuduTaskProxy.startWriter(lineReceiver,super.getTaskPluginCollector());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
try {
|
||||||
|
if (kuduTaskProxy.session != null) {
|
||||||
|
kuduTaskProxy.session.close();
|
||||||
|
}
|
||||||
|
}catch (Exception e){
|
||||||
|
LOG.warn("kudu session is not gracefully closed !");
|
||||||
|
}
|
||||||
|
Kudu11xHelper.closeClient(kuduTaskProxy.kuduClient);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,39 @@
|
|||||||
|
package com.q1.datax.plugin.writer.kudu11xwriter;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.spi.ErrorCode;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author daizihao
|
||||||
|
* @create 2020-08-27 19:25
|
||||||
|
**/
|
||||||
|
public enum Kudu11xWriterErrorcode implements ErrorCode {
|
||||||
|
REQUIRED_VALUE("Kuduwriter-00", "You are missing a required parameter value."),
|
||||||
|
ILLEGAL_VALUE("Kuduwriter-01", "You fill in the parameter values are not legitimate."),
|
||||||
|
GET_KUDU_CONNECTION_ERROR("Kuduwriter-02", "Error getting Kudu connection."),
|
||||||
|
GET_KUDU_TABLE_ERROR("Kuduwriter-03", "Error getting Kudu table."),
|
||||||
|
CLOSE_KUDU_CONNECTION_ERROR("Kuduwriter-04", "Error closing Kudu connection."),
|
||||||
|
CLOSE_KUDU_SESSION_ERROR("Kuduwriter-06", "Error closing Kudu table connection."),
|
||||||
|
PUT_KUDU_ERROR("Kuduwriter-07", "IO exception occurred when writing to Kudu."),
|
||||||
|
DELETE_KUDU_ERROR("Kuduwriter-08", "An exception occurred while delete Kudu table."),
|
||||||
|
GREATE_KUDU_TABLE_ERROR("Kuduwriter-09", "Error creating Kudu table."),
|
||||||
|
PARAMETER_NUM_ERROR("Kuduwriter-10","The number of parameters does not match.")
|
||||||
|
;
|
||||||
|
|
||||||
|
private final String code;
|
||||||
|
private final String description;
|
||||||
|
|
||||||
|
|
||||||
|
Kudu11xWriterErrorcode(String code, String description) {
|
||||||
|
this.code = code;
|
||||||
|
this.description = description;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public String getCode() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDescription() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,180 @@
|
|||||||
|
package com.q1.datax.plugin.writer.kudu11xwriter;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Column;
|
||||||
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
|
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||||
|
import com.alibaba.datax.common.plugin.TaskPluginCollector;
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
import com.alibaba.datax.common.util.RetryUtil;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.kudu.client.*;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author daizihao
|
||||||
|
* @create 2020-08-31 16:55
|
||||||
|
**/
|
||||||
|
public class KuduWriterTask {
|
||||||
|
private final static Logger LOG = LoggerFactory.getLogger(KuduWriterTask.class);
|
||||||
|
|
||||||
|
public List<Configuration> columns;
|
||||||
|
public String encoding;
|
||||||
|
public String insertMode;
|
||||||
|
public Double batchSize;
|
||||||
|
public long mutationBufferSpace;
|
||||||
|
public Boolean isUpsert;
|
||||||
|
public Boolean isSkipFail;
|
||||||
|
|
||||||
|
public KuduClient kuduClient;
|
||||||
|
public KuduTable table;
|
||||||
|
public KuduSession session;
|
||||||
|
private Integer primaryKeyIndexUntil;
|
||||||
|
|
||||||
|
|
||||||
|
public KuduWriterTask(com.alibaba.datax.common.util.Configuration configuration) {
|
||||||
|
this.columns = configuration.getListConfiguration(Key.COLUMN);
|
||||||
|
this.encoding = configuration.getString(Key.ENCODING);
|
||||||
|
this.insertMode = configuration.getString(Key.INSERT_MODE);
|
||||||
|
this.batchSize = configuration.getDouble(Key.WRITE_BATCH_SIZE);
|
||||||
|
this.mutationBufferSpace = configuration.getLong(Key.MUTATION_BUFFER_SPACE);
|
||||||
|
this.isUpsert = !configuration.getString(Key.INSERT_MODE).equals("insert");
|
||||||
|
|
||||||
|
this.kuduClient = Kudu11xHelper.getKuduClient(configuration.getString(Key.KUDU_CONFIG));
|
||||||
|
this.table = Kudu11xHelper.getKuduTable(configuration, kuduClient);
|
||||||
|
this.session = kuduClient.newSession();
|
||||||
|
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
|
||||||
|
session.setMutationBufferSpace((int) mutationBufferSpace);
|
||||||
|
this.primaryKeyIndexUntil = Kudu11xHelper.getPrimaryKeyIndexUntil(columns);
|
||||||
|
// tableName = configuration.getString(Key.TABLE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) {
|
||||||
|
Record record;
|
||||||
|
AtomicLong counter = new AtomicLong(0L);
|
||||||
|
try {
|
||||||
|
while ((record = lineReceiver.getFromReader()) != null) {
|
||||||
|
if (record.getColumnNumber() != columns.size()) {
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PARAMETER_NUM_ERROR, " number of record fields:" + record.getColumnNumber() + " number of configuration fields:" + columns.size());
|
||||||
|
}
|
||||||
|
boolean isDirtyRecord = false;
|
||||||
|
|
||||||
|
|
||||||
|
for (int i = 0; i <= primaryKeyIndexUntil && !isDirtyRecord; i++) {
|
||||||
|
Column column = record.getColumn(i);
|
||||||
|
isDirtyRecord = StringUtils.isBlank(column.asString());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isDirtyRecord) {
|
||||||
|
taskPluginCollector.collectDirtyRecord(record, "primarykey field is null");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
Upsert upsert = table.newUpsert();
|
||||||
|
Insert insert = table.newInsert();
|
||||||
|
|
||||||
|
for (int i = 0; i < columns.size(); i++) {
|
||||||
|
PartialRow row;
|
||||||
|
if (isUpsert) {
|
||||||
|
//覆盖更新
|
||||||
|
row = upsert.getRow();
|
||||||
|
} else {
|
||||||
|
//增量更新
|
||||||
|
row = insert.getRow();
|
||||||
|
}
|
||||||
|
Configuration col = columns.get(i);
|
||||||
|
String name = col.getString(Key.NAME);
|
||||||
|
ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE));
|
||||||
|
Column column = record.getColumn(col.getInt(Key.INDEX, i));
|
||||||
|
Object rawData = column.getRawData();
|
||||||
|
if (rawData == null) {
|
||||||
|
row.setNull(name);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
switch (type) {
|
||||||
|
case INT:
|
||||||
|
row.addInt(name, Integer.parseInt(rawData.toString()));
|
||||||
|
break;
|
||||||
|
case LONG:
|
||||||
|
case BIGINT:
|
||||||
|
row.addLong(name, Long.parseLong(rawData.toString()));
|
||||||
|
break;
|
||||||
|
case FLOAT:
|
||||||
|
row.addFloat(name, Float.parseFloat(rawData.toString()));
|
||||||
|
break;
|
||||||
|
case DOUBLE:
|
||||||
|
row.addDouble(name, Double.parseDouble(rawData.toString()));
|
||||||
|
break;
|
||||||
|
case BOOLEAN:
|
||||||
|
row.addBoolean(name, Boolean.getBoolean(rawData.toString()));
|
||||||
|
break;
|
||||||
|
case STRING:
|
||||||
|
default:
|
||||||
|
row.addString(name, rawData.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
RetryUtil.executeWithRetry(()->{
|
||||||
|
if (isUpsert) {
|
||||||
|
//覆盖更新
|
||||||
|
session.apply(upsert);
|
||||||
|
} else {
|
||||||
|
//增量更新
|
||||||
|
session.apply(insert);
|
||||||
|
}
|
||||||
|
//提前写数据,阈值可自定义
|
||||||
|
if (counter.incrementAndGet() > batchSize * 0.75) {
|
||||||
|
session.flush();
|
||||||
|
counter.set(0L);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
},5,1000L,true);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Data write failed!", e);
|
||||||
|
if (isSkipFail) {
|
||||||
|
LOG.warn("Because you have configured skipFail is true,this data will be skipped!");
|
||||||
|
taskPluginCollector.collectDirtyRecord(record, e.getMessage());
|
||||||
|
}else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e);
|
||||||
|
}
|
||||||
|
AtomicInteger i = new AtomicInteger(10);
|
||||||
|
try {
|
||||||
|
while (i.get() > 0) {
|
||||||
|
if (session.hasPendingOperations()) {
|
||||||
|
session.flush();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Thread.sleep(1000L);
|
||||||
|
i.decrementAndGet();
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("Waiting for data to be inserted...... " + i + "s");
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000L);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
ex.printStackTrace();
|
||||||
|
}
|
||||||
|
i.decrementAndGet();
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
session.flush();
|
||||||
|
} catch (KuduException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,9 @@
|
|||||||
|
package com.q1.kudu.conf;
|
||||||
|
|
||||||
|
/**
 * Placeholder type for Kudu configuration; it currently declares no members.
 *
 * @author daizihao
 * @create 2020-09-16 11:39
 **/
public class KuduConfig {
}
|
7
kuduwriter/src/main/resources/plugin.json
Normal file
7
kuduwriter/src/main/resources/plugin.json
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
{
|
||||||
|
"name": "kudu11xwriter",
|
||||||
|
"class": "com.q1.datax.plugin.writer.kudu11xwriter.Kudu11xWriter",
|
||||||
|
"description": "use put: prod. mechanism: use kudu java api put data.",
|
||||||
|
"developer": "com.q1.daizihao"
|
||||||
|
}
|
||||||
|
|
59
kuduwriter/src/main/resources/plugin_job_template.json
Normal file
59
kuduwriter/src/main/resources/plugin_job_template.json
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
{
|
||||||
|
"name": "kudu11xwriter",
|
||||||
|
"parameter": {
|
||||||
|
"kuduConfig": {
|
||||||
|
"kudu.master_addresses": "***",
|
||||||
|
"timeout": 60000,
|
||||||
|
"sessionTimeout": 60000
|
||||||
|
|
||||||
|
},
|
||||||
|
"table": "",
|
||||||
|
"replicaCount": 3,
|
||||||
|
"truncate": false,
|
||||||
|
"writeMode": "upsert",
|
||||||
|
"partition": {
|
||||||
|
"range": {
|
||||||
|
"column1": [
|
||||||
|
{
|
||||||
|
"lower": "2020-08-25",
|
||||||
|
"upper": "2020-08-26"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"lower": "2020-08-26",
|
||||||
|
"upper": "2020-08-27"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"lower": "2020-08-27",
|
||||||
|
"upper": "2020-08-28"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"hash": {
|
||||||
|
"column": [
|
||||||
|
"column1"
|
||||||
|
],
|
||||||
|
"number": 3
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"column": [
|
||||||
|
{
|
||||||
|
"index": 0,
|
||||||
|
"name": "c1",
|
||||||
|
"type": "string",
|
||||||
|
"primaryKey": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"index": 1,
|
||||||
|
"name": "c2",
|
||||||
|
"type": "string",
|
||||||
|
"compress": "DEFAULT_COMPRESSION",
|
||||||
|
"encoding": "AUTO_ENCODING",
|
||||||
|
"comment": "注解xxxx"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"batchSize": 1024,
|
||||||
|
"bufferSize": 2048,
|
||||||
|
"skipFail": false,
|
||||||
|
"encoding": "UTF-8"
|
||||||
|
}
|
||||||
|
}
|
23
kuduwriter/src/test/java/com/dai/test.java
Normal file
23
kuduwriter/src/test/java/com/dai/test.java
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
package com.dai;
|
||||||
|
|
||||||
|
import com.q1.datax.plugin.writer.kudu11xwriter.Kudu11xHelper;
|
||||||
|
import org.junit.Test;
|
||||||
|
import com.q1.datax.plugin.writer.kudu11xwriter.ColumnType;
|
||||||
|
import com.q1.datax.plugin.writer.kudu11xwriter.InsertModeType;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import static org.apache.kudu.client.AsyncKuduClient.LOG;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author daizihao
|
||||||
|
* @create 2020-08-28 11:03
|
||||||
|
**/
|
||||||
|
public class test {
|
||||||
|
@Test
|
||||||
|
public void kuduTypeTest() {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
1
pom.xml
1
pom.xml
@ -102,6 +102,7 @@
|
|||||||
<module>plugin-unstructured-storage-util</module>
|
<module>plugin-unstructured-storage-util</module>
|
||||||
<module>hbase20xsqlreader</module>
|
<module>hbase20xsqlreader</module>
|
||||||
<module>hbase20xsqlwriter</module>
|
<module>hbase20xsqlwriter</module>
|
||||||
|
<module>kuduwriter</module>
|
||||||
</modules>
|
</modules>
|
||||||
|
|
||||||
<dependencyManagement>
|
<dependencyManagement>
|
||||||
|
Loading…
Reference in New Issue
Block a user