Mirror of https://github.com/alibaba/DataX.git (synced 2025-05-02 04:59:51 +08:00)
Merge 42b796c19d into 0824b45c5e
This commit is contained in: commit 3da305bfe8
paimonwrite/pom.xml (Normal file, 176 lines)
@@ -0,0 +1,176 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <artifactId>datax-all</artifactId>
        <groupId>com.alibaba.datax</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.alibaba.datax</groupId>
    <artifactId>paimonwriter</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <name>paimonwriter</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <hive.version>2.3.1</hive.version>
        <hadoop.version>3.3.4</hadoop.version>
        <paimon.version>0.8-SNAPSHOT</paimon.version>
        <fb303.version>0.9.3</fb303.version>
        <thrift.version>0.12.0</thrift.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-bundle</artifactId>
            <version>0.7.0-incubating</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.3</version>
            <exclusions>
                <exclusion>
                    <artifactId>hadoop-common</artifactId>
                    <groupId>org.apache.hadoop</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>hadoop-hdfs</artifactId>
                    <groupId>org.apache.hadoop</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libfb303</artifactId>
            <version>${fb303.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
            <version>${thrift.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>16.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.glassfish.jersey</groupId>
                    <artifactId>jersey-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>3.3.4</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.10.34</version>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1.1-jre</version>
        </dependency>

        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-servlet</artifactId>
            <version>9.4.15.v20190215</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>plugin-unstructured-storage-util</artifactId>
            <version>${datax-project-version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${jdk-version}</source>
                    <target>${jdk-version}</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
paimonwrite/src/doc/paimonwriter.md (Normal file, 266 lines)
@@ -0,0 +1,266 @@

# PaimonWriter Plugin Documentation

___

## 1 Quick Introduction

The PaimonWriter plugin writes data into the Paimon data lake. Under the hood it calls Paimon's batch write and stream write APIs to write records into Paimon tables.

## 2 Implementation

The writer reads the path of a Paimon filesystem catalog or Hive catalog, together with the related Hadoop and Hive configuration, and writes both data files and metadata files to the target file system through that catalog, as sketched below.
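A minimal sketch (not part of the plugin) of how such a catalog is obtained with Paimon's Java API, mirroring the `CatalogContext`/`CatalogFactory` calls the plugin makes internally; the warehouse path and metastore URI below are placeholder values.

```
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;

public class CatalogExamples {

    // Filesystem catalog: only a warehouse path is needed.
    public static Catalog fileCatalog(String warehouse) {
        return CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
    }

    // Hive catalog: warehouse path plus Hive metastore information.
    public static Catalog hiveCatalog(String warehouse, String metastoreUri) {
        Options options = new Options();
        options.set("warehouse", warehouse);   // e.g. "/app/hive/warehouse"
        options.set("metastore", "hive");
        options.set("uri", metastoreUri);      // e.g. "thrift://127.0.0.1:9083"
        return CatalogFactory.createCatalog(CatalogContext.create(options));
    }
}
```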
## 3 Features

### 3.1 Sample Configuration

* A job that imports data from MySQL into Paimon:

```
{
  "job": {
    "setting": {
      "speed": {
        "channel": 2
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              "id",
              "name",
              "age",
              "score",
              "create_at",
              "update_at",
              "dt"
            ],
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
                ],
                "table": [
                  "user"
                ]
              }
            ],
            "password": "root1234",
            "username": "root",
            "where": ""
          }
        },
        "writer": {
          "name": "paimonwriter",
          "parameter": {
            "tableName": "test",
            "databaseName": "paimon",
            "catalogPath": "/app/hive/warehouse",
            "metastoreUri": "thrift://127.0.0.1:9083",
            "catalogType": "hive",
            "hiveConfDir": "/your/path",
            "hadoopConfDir": "/your/path",
            "tableBucket": 2,
            "primaryKey": "dt,id",
            "partitionFields": "dt",
            "writeOption": "stream_insert",
            "batchSize": 100,
            "hadoopConfig": {
              "hdfsUser": "hdfs",
              "coreSitePath": "/your/path/core-site.xml",
              "hdfsSitePath": "/your/path/hdfs-site.xml"
            },
            "paimonConfig": {
              "compaction.min.file-num": "3",
              "compaction.max.file-num": "6",
              "snapshot.time-retained": "2h",
              "snapshot.num-retained.min": "5",
              "hive.table.owner": "zhangsan",
              "hive.storage.format": "ORC"
            },
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              },
              {
                "name": "score",
                "type": "double"
              },
              {
                "name": "create_at",
                "type": "string"
              },
              {
                "name": "update_at",
                "type": "string"
              },
              {
                "name": "dt",
                "type": "string"
              }
            ]
          }
        }
      }
    ]
  }
}
```
### 3.2 Parameters

* **metastoreUri**
    * Description: the Hive metastore address, e.g. thrift://127.0.0.1:9083. Note: when metastoreUri is set, hiveConfDir does not need to be set. <br />
    * Required: exactly one of metastoreUri and hiveConfDir <br />
    * Default: none <br />

* **hiveConfDir**
    * Description: if the Hive metastoreUri is not set, hiveConfDir must be set. Note: the directory must contain the hive-site.xml file. <br />
    * Required: exactly one of metastoreUri and hiveConfDir <br />
    * Default: none <br />

* **catalogPath**
    * Description: the warehouse path of the Paimon catalog; it can be a local filesystem path or an HDFS path. <br />
    * Required: yes <br />
    * Default: none <br />

* **catalogType**
    * Description: the Paimon catalog type. Two options are supported: 1. file, 2. hive <br />
    * Required: yes <br />
    * Default: none <br />

* **hadoopConfDir**
    * Description: the directory with the Hadoop configuration files that Paimon depends on. Note: the directory must contain both hdfs-site.xml and core-site.xml. <br />
    * Required: either hadoopConfDir or the coreSitePath/hdfsSitePath entries under hadoopConfig <br />
    * Default: none <br />

* **writeOption**
    * Description: how data is written into Paimon. Two modes are currently supported: 1. batch_insert (following the official definition, only a single commit per job), 2. stream_insert (multiple commits are supported); see the sketch after this list. <br />
    * Required: yes <br />
    * Default: none <br />

* **hadoopConfig**
    * Description: extra Hadoop settings; you can point to the core-site.xml and hdfs-site.xml files and configure Kerberos and S3 related parameters. <br />
    * Required: no <br />
    * Default: none <br />

* **paimonConfig**
    * Description: any Paimon table/catalog options can be added here. <br />
    * Required: no <br />
    * Default: none <br />

* **keyspace**
    * Description: the keyspace of the table to be synchronized. <br />
    * Required: yes <br />
    * Default: none <br />

* **table**
    * Description: the table to be synchronized. <br />
    * Required: yes <br />
    * Default: none <br />

* **column**
    * Description: the set of columns of the configured table that need to be synchronized. <br />
      The content can be a column name or "writetime()". If a column is configured as writetime(), the content of that column is used as the timestamp. <br />
    * Required: yes <br />
    * Default: none <br />

* **bucket**
    * Description: the Paimon bucket number. Note: if it is set to -1, a "cannot dynamically write into partitions" error occurs. <br />
    * Required: no <br />
    * Default: 2 <br />

* **batchSize**
    * Description: the number of records per batch commit. Note: this setting only takes effect in stream_insert mode and is ignored in the other modes. <br />
    * Required: no <br />
    * Default: 10 <br />
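A minimal sketch (not part of the plugin) of the two writeOption paths, following the same Paimon sink calls the writer uses; `table` is assumed to be an `org.apache.paimon.table.Table` obtained from the catalog, and `rows`/`batchSize` are placeholders.

```
import java.util.List;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.*;

public class WriteOptionExamples {

    // batch_insert: buffer everything and perform one single commit at the end of the job.
    public static void batchInsert(Table table, List<GenericRow> rows) throws Exception {
        BatchWriteBuilder builder = table.newBatchWriteBuilder().withOverwrite();
        BatchTableWrite write = builder.newWrite();
        for (GenericRow row : rows) {
            write.write(row);
        }
        List<CommitMessage> messages = write.prepareCommit();
        builder.newCommit().commit(messages);
    }

    // stream_insert: commit every batchSize records with an increasing commit identifier.
    public static void streamInsert(Table table, List<GenericRow> rows, int batchSize) throws Exception {
        StreamWriteBuilder builder = table.newStreamWriteBuilder();
        StreamTableWrite write = builder.newWrite();
        long commitIdentifier = 0;
        int buffered = 0;
        for (GenericRow row : rows) {
            write.write(row);
            if (++buffered >= batchSize) {
                List<CommitMessage> messages = write.prepareCommit(false, ++commitIdentifier);
                builder.newCommit().commit(commitIdentifier, messages);
                buffered = 0;
            }
        }
        // Flush any leftover records below batchSize.
        if (buffered > 0) {
            List<CommitMessage> messages = write.prepareCommit(false, ++commitIdentifier);
            builder.newCommit().commit(commitIdentifier, messages);
        }
    }
}
```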
### 3.3 Type Conversion

| DataX internal type | Paimon data type |
| -------- | ----- |
| Long | long |
| float | float |
| decimal | decimal |
| String | string |
| Date | date, timestamp, datetime, string |
| Boolean | boolean |

Please note:

* union, row, struct and custom types are currently not supported.

The sketch below shows how these declared types surface when the writer creates a missing table.
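A minimal sketch (not part of the plugin) of the schema that results from the sample job above, using Paimon's `Schema` builder and `DataTypes` factories the same way the writer does when it creates a table; the column subset and options are illustrative only.

```
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class SchemaExample {
    public static Schema userSchema() {
        return Schema.newBuilder()
                .column("id", DataTypes.INT())           // job type "int"    -> Paimon INT
                .column("score", DataTypes.DOUBLE())     // job type "double" -> Paimon DOUBLE
                .column("create_at", DataTypes.STRING()) // job type "string" -> Paimon STRING
                .column("dt", DataTypes.STRING())
                .primaryKey("dt", "id")
                .partitionKeys("dt")
                .option("metastore.partitioned-table", "true")
                .build();
    }
}
```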
## 4 Performance Report

Omitted.

## 5 Constraints

### 5.1 Primary/standby synchronization and data recovery

Omitted.

## 6 FAQ
paimonwrite/src/main/assembly/package.xml (Executable file, 35 lines)
@@ -0,0 +1,35 @@
<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/writer/paimonwriter</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>paimonwriter-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/writer/paimonwriter</outputDirectory>
        </fileSet>
    </fileSets>

    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/writer/paimonwriter/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
@@ -0,0 +1,59 @@
package com.alibaba.datax.plugin.writer.paimonwriter;

public class Key {

    public static final String PAIMON_DB_NAME = "databaseName";
    public static final String PAIMON_TABLE_NAME = "tableName";
    public static final String PAIMON_PRIMARY_KEY = "primaryKey";
    public static final String PAIMON_PARTITION_FIELDS = "partitionFields";
    public static final String PAIMON_BATCH_SIZE = "batchSize";

    public static final String PAIMON_COLUMN = "column";

    /**
     * writerOption
     */
    public static final String PAIMON_WRITE_OPTION = "writeOption";
    public static final String PAIMON_WRITE_OPTION_BATCH_INSERT = "batch_insert";
    public static final String PAIMON_WRITE_OPTION_STREAM_INSERT = "stream_insert";

    public static final String PAIMON_CATALOG_TYPE = "catalogType";
    /**
     * warehouse path
     */
    public static final String PAIMON_CATALOG_PATH = "catalogPath";
    public static final String PAIMON_TABLE_BUCKET = "tableBucket";
    public static final String PAIMON_METASTORE_TYPE = "metastoreType";
    /**
     * thrift://<hive-metastore-host-name>:<port>
     */
    public static final String PAIMON_METASTORE_URI = "metastoreUri";

    public static final String PAIMON_CATALOG_FILE = "file";
    public static final String PAIMON_CATALOG_HIVE = "hive";

    public static final String PAIMON_HIVE_CONF_DIR = "hiveConfDir";
    public static final String PAIMON_HADOOP_CONF_DIR = "hadoopConfDir";

    // Kerberos
    public static final String HAVE_KERBEROS = "haveKerberos";
    public static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath";
    public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";

    public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";

    // hadoop config
    public static final String HADOOP_CONFIG = "hadoopConfig";

    // paimon config
    public static final String PAIMON_CONFIG = "paimonConfig";

    // S3
    public static final String S3A_SSL = "fs.s3a.connection.ssl.enabled";
    public static final String S3A_PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
    public static final String S3A_USER_NAME = "fs.s3a.access.key";
    public static final String S3A_USER_PWD = "fs.s3a.secret.key";
    public static final String S3A_ENDPOINT = "fs.s3a.endpoint";
    public static final String S3A_IMPL = "fs.s3a.impl";

}
@@ -0,0 +1,563 @@
package com.alibaba.datax.plugin.writer.paimonwriter;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.*;
import org.apache.paimon.types.*;
import org.apache.paimon.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.alibaba.datax.plugin.writer.paimonwriter.Key.*;
import static com.alibaba.datax.plugin.writer.paimonwriter.PaimonWriterErrorCode.*;

public class PaimonWriter extends Writer {

    private static final Logger LOG = LoggerFactory.getLogger(PaimonWriter.class);

    public static class Job extends Writer.Job {
        private Configuration originalConfig;

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> list = new ArrayList<>();
            for (int i = 0; i < mandatoryNumber; i++) {
                list.add(originalConfig.clone());
            }
            return list;
        }

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
        }

        @Override
        public void destroy() {

        }
    }

    public static class Task extends Writer.Task {
        private String primaryKey;
        private String partitionFields;
        private String writeOption;
        private int batchSize;
        private Configuration sliceConfig;
        private List<Configuration> columnsList;

        private String catalogPath;
        private String catalogType;
        private Catalog catalog;
        private Table table;
        private int bucket;
        private String hiveConfDir;
        private String hadoopConfDir;
        private String metastoreUri;
        private String coreSitePath;
        private String hdfsSitePath;
        private org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();

        @Override
        public void init() {
            // Get the configuration that belongs to this task.
            this.sliceConfig = super.getPluginJobConf();
            String tableName = sliceConfig.getNecessaryValue(PAIMON_TABLE_NAME, PAIMON_ERROR_TABLE);
            String dbName = sliceConfig.getNecessaryValue(PAIMON_DB_NAME, PAIMON_ERROR_DB);
            catalogPath = sliceConfig.getNecessaryValue(PAIMON_CATALOG_PATH, PAIMON_PARAM_LOST);
            catalogType = sliceConfig.getNecessaryValue(PAIMON_CATALOG_TYPE, PAIMON_PARAM_LOST);
            bucket = sliceConfig.getInt(PAIMON_TABLE_BUCKET, 2);
            batchSize = sliceConfig.getInt(PAIMON_BATCH_SIZE, 10);
            writeOption = sliceConfig.getNecessaryValue(PAIMON_WRITE_OPTION, PAIMON_PARAM_LOST);

            partitionFields = sliceConfig.getString(PAIMON_PARTITION_FIELDS);
            primaryKey = sliceConfig.getString(PAIMON_PRIMARY_KEY);
            columnsList = sliceConfig.getListConfiguration(PAIMON_COLUMN);

            Configuration hadoopSiteParams = sliceConfig.getConfiguration(HADOOP_CONFIG);
            JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(sliceConfig.getString(HADOOP_CONFIG));
            if (null != hadoopSiteParams) {
                Set<String> paramKeys = hadoopSiteParams.getKeys();
                for (String each : paramKeys) {
                    if (each.equals("hdfsUser")) {
                        System.setProperty("HADOOP_USER_NAME", hadoopSiteParamsAsJsonObject.getString(each));
                    } else if (each.equals("coreSitePath")) {
                        coreSitePath = hadoopSiteParamsAsJsonObject.getString(each);
                    } else if (each.equals("hdfsSitePath")) {
                        hdfsSitePath = hadoopSiteParamsAsJsonObject.getString(each);
                    } else {
                        hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
                    }
                }
            }

            try {
                // Kerberos authentication, if enabled.
                Boolean haveKerberos = sliceConfig.getBool(HAVE_KERBEROS, false);
                if (haveKerberos) {
                    String kerberosKeytabFilePath = sliceConfig.getString(KERBEROS_KEYTAB_FILE_PATH);
                    String kerberosPrincipal = sliceConfig.getString(KERBEROS_PRINCIPAL);
                    hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
                    this.kerberosAuthentication(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
                }

                switch (catalogType) {
                    case PAIMON_CATALOG_FILE:
                        catalog = createFilesystemCatalog();
                        break;
                    case PAIMON_CATALOG_HIVE:
                        metastoreUri = sliceConfig.getString(PAIMON_METASTORE_URI);
                        hiveConfDir = sliceConfig.getString(PAIMON_HIVE_CONF_DIR);
                        hadoopConfDir = sliceConfig.getString(PAIMON_HADOOP_CONF_DIR);
                        catalog = createHiveCatalog();
                        break;
                    default:
                        LOG.error("unsupported catalog type :{}", catalogType);
                        break;
                }

                if (!tableExists(catalog, dbName, tableName)) {
                    LOG.info("table {} does not exist, creating it ...", dbName.concat("." + tableName));
                    createTable(catalog, dbName, tableName, columnsList, primaryKey.split(","), partitionFields.split(","));
                }

                table = getTable(catalog, dbName, tableName);

            } catch (Exception e) {
                LOG.error(ExceptionUtils.getStackTrace(e));
            }
        }
        @Override
        public void prepare() {

        }

        @Override
        public void startWrite(RecordReceiver recordReceiver) {
            Record record;
            DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
            DateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            WriteBuilder writeBuilder = null;
            // Write records in distributed tasks.
            TableWrite write = null;
            boolean isStream = false;
            switch (writeOption) {
                case PAIMON_WRITE_OPTION_BATCH_INSERT:
                    writeBuilder = table.newBatchWriteBuilder().withOverwrite();
                    write = writeBuilder.newWrite();
                    break;
                case PAIMON_WRITE_OPTION_STREAM_INSERT:
                    writeBuilder = table.newStreamWriteBuilder();
                    write = writeBuilder.newWrite();
                    isStream = true;
                    break;
                default:
                    LOG.error("unsupported write option type :{}", writeOption);
            }

            TableCommit commit = null;
            List<CommitMessage> messages = null;
            AtomicLong counter = new AtomicLong(0);
            long num = 0;
            long commitIdentifier = 0;

            while ((record = recordReceiver.getFromReader()) != null) {

                GenericRow row = new GenericRow(columnsList.size());
                for (int i = 0; i < columnsList.size(); i++) {
                    Configuration configuration = columnsList.get(i);
                    String columnType = configuration.getString("type");
                    Column column = record.getColumn(i);
                    Object rawData = column.getRawData();

                    if (rawData == null) {
                        row.setField(i, null);
                        continue;
                    }

                    switch (columnType) {
                        case "int":
                            row.setField(i, Integer.parseInt(rawData.toString()));
                            break;
                        case "long":
                            row.setField(i, Long.parseLong(rawData.toString()));
                            break;
                        case "float":
                            row.setField(i, Float.parseFloat(rawData.toString()));
                            break;
                        case "double":
                            row.setField(i, Double.parseDouble(rawData.toString()));
                            break;
                        case "date":
                            row.setField(i, dateFormat.format(rawData));
                            break;
                        case "datetime":
                            row.setField(i, dateTimeFormat.format(rawData));
                            break;
                        case "boolean":
                            row.setField(i, Boolean.parseBoolean(rawData.toString()));
                            break;
                        case "string":
                            if (column instanceof DateColumn) {
                                row.setField(i, BinaryString.fromString(column.asString()));
                                break;
                            }
                        default:
                            row.setField(i, BinaryString.fromString(rawData.toString()));
                    }

                }
                try {
                    write.write(row, bucket);
                    if (isStream) {
                        num = counter.incrementAndGet();
                        commitIdentifier++;
                        if (num >= batchSize) {
                            List<CommitMessage> streamMsgs = ((StreamTableWrite) write).prepareCommit(false, commitIdentifier);
                            // Collect all CommitMessages to a global node and commit.
                            StreamTableCommit stc = (StreamTableCommit) writeBuilder.newCommit();
                            stc.commit(commitIdentifier, streamMsgs);
                            counter.set(0L);
                        }
                    }

                } catch (Exception e) {
                    LOG.error("write is failed!", e);
                }

            }

            try {
                flushCache(isStream, commitIdentifier, num, writeBuilder, write, messages, commit);
            } catch (Exception e) {
                // Abort the unsuccessful commit to delete its data files.
                if (null != commit) {
                    commit.abort(messages);
                }
                LOG.error("data commit is failed!", e);
            }

        }

        public void flushCache(boolean isStream, long commitIdentifier, long num, WriteBuilder writeBuilder, TableWrite write, List<CommitMessage> messages, TableCommit commit) throws Exception {

            if (isStream && num > 0) {
                messages = ((StreamTableWrite) write).prepareCommit(false, commitIdentifier);
                // Collect all CommitMessages to a global node and commit.
                StreamTableCommit stc = (StreamTableCommit) writeBuilder.newCommit();
                stc.commit(commitIdentifier, messages);
            } else if (!isStream) {
                // batch_insert: single global commit (skip when stream mode has nothing left to flush).
                messages = ((BatchTableWrite) write).prepareCommit();
                commit = writeBuilder.newCommit();

                if (commit == null || messages == null) {
                    throw new RuntimeException("commit or messages info not exist");
                }
                ((BatchTableCommit) commit).commit(messages);
            }

        }
        // Filesystem catalog
        public Catalog createFilesystemCatalog() {
            CatalogContext context = CatalogContext.create(new org.apache.paimon.fs.Path(catalogPath));
            return CatalogFactory.createCatalog(context);
        }

        // Hive catalog
        public Catalog createHiveCatalog() {
            // The Paimon Hive catalog relies on Hive jars:
            // add the Hive classpath or the Hive bundled jar.
            Options options = new Options();
            CatalogContext context;
            options.set("warehouse", catalogPath);
            options.set("metastore", catalogType);
            // Create tables as external tables by default.
            options.set("table.type", "external");

            /**
             * 1. If the metastore uri is set, hiveConfDir is not needed.
             * 2. If the metastore uri is not set, reading hive-site.xml from hiveConfDir also works.
             */
            if (StringUtils.isNotBlank(metastoreUri)) {
                options.set("uri", metastoreUri);
            } else if (StringUtils.isNotBlank(hiveConfDir)) {
                options.set("hive-conf-dir", hiveConfDir);
            } else {
                throw DataXException.asDataXException(PAIMON_PARAM_LOST,
                        String.format("Invalid configuration: at least one of [%s] and [%s] must be set and must not be blank.", PAIMON_METASTORE_URI, PAIMON_HIVE_CONF_DIR));
            }

            /**
             * 1. Create the catalog via hadoop-conf-dir (the directory must contain hive-site.xml and core-site.xml), or
             * 2. create the catalog via hadoopConfig (coreSitePath: /path/core-site.xml, hdfsSitePath: /path/hdfs-site.xml).
             */
            if (StringUtils.isNotBlank(hadoopConfDir)) {
                options.set("hadoop-conf-dir", hadoopConfDir);
                context = CatalogContext.create(options);
            } else if (StringUtils.isNotBlank(coreSitePath) && StringUtils.isNotBlank(hdfsSitePath)) {
                context = CatalogContext.create(options, hadoopConf);
            } else {
                throw DataXException.asDataXException(PAIMON_PARAM_LOST,
                        String.format("Invalid configuration: at least one of [%s] and [%s] must be set and must not be blank.", PAIMON_HADOOP_CONF_DIR, "hadoopConfig:coreSiteFile&&hdfsSiteFile"));
            }

            return CatalogFactory.createCatalog(context);

        }

        public void createTable(Catalog catalog, String dbName, String tableName, List<Configuration> cols, String[] pks, String[] partKeys) {

            Configuration paimonTableParams = sliceConfig.getConfiguration(PAIMON_CONFIG);
            JSONObject paimonParamsAsJsonObject = JSON.parseObject(sliceConfig.getString(PAIMON_CONFIG));

            Schema.Builder schemaBuilder = Schema.newBuilder();

            if (null != paimonTableParams) {
                Set<String> paramKeys = paimonTableParams.getKeys();
                for (String each : paramKeys) {
                    schemaBuilder.option(each, paimonParamsAsJsonObject.getString(each));
                }
            }

            for (Configuration columnConfig : cols) {
                String columnName = columnConfig.getString("name");
                DataType columnType = getPaimonDataType(columnConfig.getString("type"));
                schemaBuilder.column(columnName, columnType, columnName);
            }

            if (pks != null && pks.length > 0) {
                schemaBuilder.primaryKey(pks);
            }

            Schema schema = schemaBuilder.build();

            if (partKeys != null && partKeys.length > 0) {
                schemaBuilder.partitionKeys(partKeys);
                schema = schemaBuilder.option("metastore.partitioned-table", "true").build();
            }

            Identifier identifier = Identifier.create(dbName, tableName);
            try {
                catalog.createTable(identifier, schema, false);
            } catch (Catalog.TableAlreadyExistException e) {
                throw new RuntimeException("table already exists", e);
            } catch (Catalog.DatabaseNotExistException e) {
                throw new RuntimeException("database does not exist", e);
            }

        }

        public int getMatchValue(String typeName) {

            // Extract the declared length, e.g. "CHAR(10)" -> 10.
            String regex = "\\((\\d+)\\)";
            Pattern pattern = Pattern.compile(regex);
            Matcher matcher = pattern.matcher(typeName);
            int res = 0;

            if (matcher.find()) {
                res = Integer.parseInt(matcher.group(1));
            } else {
                LOG.error("{}: invalid type, please check!", typeName);
            }
            return res;
        }

        public Pair<Integer, Integer> getDecValue(String typeName) {

            // Extract precision and scale, e.g. "DECIMAL(10, 2)" -> (10, 2).
            String regex = "\\((\\d+),\\s*(\\d+)\\)";

            Pattern pattern = Pattern.compile(regex);
            Matcher matcher = pattern.matcher(typeName.trim());
            int left = 0;
            int right = 0;

            if (matcher.find()) {
                left = Integer.parseInt(matcher.group(1));
                right = Integer.parseInt(matcher.group(2));
            } else {
                LOG.error("{}: invalid type, please check!", typeName);
            }

            return Pair.of(left, right);

        }

        public DataType getPaimonDataType(String typeName) {

            String type = typeName.toUpperCase();
            DataType dt = DataTypes.STRING();

            if (type.equals("BINARY") && !type.contains("VARBINARY")) {
                dt = type.contains("(") ? new BinaryType(getMatchValue(type.trim())) : new BinaryType();
            } else if (type.contains("VARBINARY")) {
                dt = type.contains("(") ? new VarBinaryType(getMatchValue(type.trim())) : new VarBinaryType();
            } else if (type.contains("STRING")) {
                dt = VarCharType.STRING_TYPE;
            } else if (type.contains("VARCHAR")) {
                dt = type.contains("(") ? new VarCharType(getMatchValue(type.trim())) : new VarCharType();
            } else if (type.contains("CHAR")) {
                if (type.contains("NOT NULL")) {
                    dt = new CharType().copy(false);
                } else if (type.contains("(")) {
                    dt = new CharType(getMatchValue(type.trim()));
                } else {
                    dt = new CharType();
                }
            } else if (type.contains("BOOLEAN")) {
                dt = new BooleanType();
            } else if (type.contains("BYTES")) {
                dt = new VarBinaryType(VarBinaryType.MAX_LENGTH);
            } else if (type.contains("DEC")) { // covers both DEC and DECIMAL
                if (type.contains(",")) {
                    dt = new DecimalType(getDecValue(type).getLeft(), getDecValue(type).getRight());
                } else if (type.contains("(")) {
                    dt = new DecimalType(getMatchValue(type.trim()));
                } else {
                    dt = new DecimalType();
                }
            } else if (type.contains("NUMERIC") || type.contains("DECIMAL")) {
                if (type.contains(",")) {
                    dt = new DecimalType(getDecValue(type).getLeft(), getDecValue(type).getRight());
                } else if (type.contains("(")) {
                    dt = new DecimalType(getMatchValue(type.trim()));
                } else {
                    dt = new DecimalType();
                }
            } else if (type.equals("INT")) {
                dt = new IntType();
            } else if (type.equals("BIGINT") || type.equals("LONG")) {
                dt = new BigIntType();
            } else if (type.equals("TINYINT")) {
                dt = new TinyIntType();
            } else if (type.equals("SMALLINT")) {
                dt = new SmallIntType();
            } else if (type.equals("INTEGER")) {
                dt = new IntType();
            } else if (type.contains("FLOAT")) {
                dt = new FloatType();
            } else if (type.contains("DOUBLE")) {
                dt = new DoubleType();
            } else if (type.contains("DATE")) {
                dt = new DateType();
            } else if (type.contains("TIMESTAMP")) {
                // Check TIMESTAMP before TIME, otherwise "TIMESTAMP..." would match the TIME branch.
                switch (type) {
                    case "TIMESTAMP":
                    case "TIMESTAMP WITHOUT TIME ZONE":
                        dt = new TimestampType();
                        break;
                    case "TIMESTAMP(3)":
                    case "TIMESTAMP(3) WITHOUT TIME ZONE":
                        dt = new TimestampType(3);
                        break;
                    case "TIMESTAMP WITH LOCAL TIME ZONE":
                    case "TIMESTAMP_LTZ":
                        dt = new LocalZonedTimestampType();
                        break;
                    case "TIMESTAMP(3) WITH LOCAL TIME ZONE":
                    case "TIMESTAMP_LTZ(3)":
                        dt = new LocalZonedTimestampType(3);
                        break;
                    default:
                        LOG.error("{}: invalid type, please check!", type);
                }
            } else if (type.contains("TIME")) {
                dt = type.contains("(") ? new TimeType(getMatchValue(type.trim())) : new TimeType();
            } else {
                throw new UnsupportedOperationException(
                        "Not a supported type: " + typeName);
            }

            return dt;

        }

        public Table getTable(Catalog catalog, String dbName, String tableName) {
            try {
                Identifier identifier = Identifier.create(dbName, tableName);
                return catalog.getTable(identifier);
            } catch (Catalog.TableNotExistException e) {
                throw new RuntimeException("table not exist", e);
            }
        }

        public boolean tableExists(Catalog catalog, String dbName, String tableName) {
            Identifier identifier = Identifier.create(dbName, tableName);
            return catalog.tableExists(identifier);
        }

        private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf) {
            if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
                UserGroupInformation.setConfiguration(hadoopConf);
                try {
                    UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
                } catch (Exception e) {
                    String message = String.format("Kerberos authentication failed, please make sure kerberosKeytabFilePath[%s] and kerberosPrincipal[%s] are correct",
                            kerberosKeytabFilePath, kerberosPrincipal);
                    LOG.error(message);
                    throw DataXException.asDataXException(KERBEROS_LOGIN_ERROR, e);
                }
            }
        }

        @Override
        public void post() {

        }

        @Override
        public void destroy() {

        }
    }

}
@@ -0,0 +1,35 @@
package com.alibaba.datax.plugin.writer.paimonwriter;

import com.alibaba.datax.common.spi.ErrorCode;

public enum PaimonWriterErrorCode implements ErrorCode {
    PAIMON_ERROR_DB("Paimon Error DB", "Invalid parameter configuration."),
    PAIMON_ERROR_TABLE("Paimon Error Table", "Invalid parameter configuration."),
    PAIMON_PARAM_LOST("Paimon Param Lost", "A required parameter is missing."),
    HDFS_CONNECT_ERROR("Hdfs Connect Error", "IO exception while connecting to HDFS."),
    KERBEROS_LOGIN_ERROR("Hdfs Login Error", "Kerberos authentication failed.");

    private final String code;
    private final String description;

    PaimonWriterErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return this.code;
    }

    @Override
    public String getDescription() {
        return this.description;
    }

    @Override
    public String toString() {
        return String.format("Code:[%s], Description:[%s].", this.code,
                this.description);
    }
}
paimonwrite/src/main/resources/plugin.json (Executable file, 6 lines)
@@ -0,0 +1,6 @@
{
  "name": "paimonwriter",
  "class": "com.alibaba.datax.plugin.writer.paimonwriter.PaimonWriter",
  "description": "useScene: prod. mechanism: via FileSystem connect Paimon write data concurrent.",
  "developer": "alibaba"
}
paimonwrite/src/main/resources/plugin_job_template.json (Normal file, 13 lines)
@@ -0,0 +1,13 @@
{
  "name": "paimonwriter",
  "parameter": {
    "defaultFS": "",
    "fileType": "",
    "path": "",
    "fileName": "",
    "column": [],
    "writeMode": "",
    "fieldDelimiter": "",
    "compress": ""
  }
}
@@ -0,0 +1,20 @@
package com.alibaba.datax.plugin.writer.paimonwriter;

import com.alibaba.datax.core.Engine;
import org.junit.Test;

public class mysql2PaimonTest {

    private static final String host = "localhost";

    @Test
    public void case01() throws Throwable {

        String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/mysql_to_paimon.json"};
        System.setProperty("datax.home", "../target/datax/datax");
        Engine.entry(params);

    }

}
paimonwrite/src/test/resources/mysql_to_paimon.json (Normal file, 99 lines)
@@ -0,0 +1,99 @@
{
  "job": {
    "setting": {
      "speed": {
        "channel": 2
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              "id",
              "name",
              "age",
              "score",
              "create_at",
              "update_at",
              "dt"
            ],
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
                ],
                "table": [
                  "user"
                ]
              }
            ],
            "password": "root1234",
            "username": "root",
            "where": ""
          }
        },
        "writer": {
          "name": "paimonwriter",
          "parameter": {
            "tableName": "test",
            "databaseName": "paimon",
            "catalogPath": "/app/hive/warehouse",
            "metastoreUri": "thrift://127.0.0.1:9083",
            "catalogType": "hive",
            "hiveConfDir": "/your/path",
            "hadoopConfDir": "/your/path",
            "tableBucket": 2,
            "primaryKey": "dt,id",
            "partitionFields": "dt",
            "writeOption": "stream_insert",
            "batchSize": 100,
            "hadoopConfig": {
              "hdfsUser": "hdfs",
              "coreSitePath": "/your/path/core-site.xml",
              "hdfsSitePath": "/your/path/hdfs-site.xml"
            },
            "paimonConfig": {
              "compaction.min.file-num": "3",
              "compaction.max.file-num": "6",
              "snapshot.time-retained": "2h",
              "snapshot.num-retained.min": "5",
              "hive.table.owner": "zhangsan",
              "hive.storage.format": "ORC"
            },
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              },
              {
                "name": "score",
                "type": "double"
              },
              {
                "name": "create_at",
                "type": "string"
              },
              {
                "name": "update_at",
                "type": "string"
              },
              {
                "name": "dt",
                "type": "string"
              }
            ]
          }
        }
      }
    ]
  }
}