support PaimonWrite plugin to paimon #2082

This commit is contained in:
liuw529 2024-03-25 00:00:04 +08:00
parent 9626738ca8
commit 42b796c19d
11 changed files with 1273 additions and 0 deletions

paimonwrite/pom.xml

@ -0,0 +1,176 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.datax</groupId>
<artifactId>paimonwriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>paimonwriter</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<hive.version>2.3.1</hive.version>
<hadoop.version>3.3.4</hadoop.version>
<paimon.version>0.8-SNAPSHOT</paimon.version>
<fb303.version>0.9.3</fb303.version>
<thrift.version>0.12.0</thrift.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-bundle</artifactId>
<version>0.7.0-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>3.1.3</version>
<exclusions>
<exclusion>
<artifactId>hadoop-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-hdfs</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<version>${fb303.version}</version>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>${thrift.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.glassfish.jersey</groupId>
<artifactId>jersey-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.10.34</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>9.4.15.v20190215</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-unstructured-storage-util</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


@ -0,0 +1,266 @@
# PaimonWriter Plugin Documentation
___
## 1 Quick Introduction
The PaimonWriter plugin writes data into the Paimon data lake. Internally it calls Paimon's batch write and stream write APIs to load records into Paimon.
## 2 Implementation
The writer is given the path of a Paimon filesystem catalog or Hive catalog, together with the related Hadoop and Hive configuration, and uses them to write both the data files and the metadata files into the target file system.
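Below is a minimal sketch of the Paimon calls this boils down to for batch_insert mode against a filesystem catalog, based on the APIs the plugin itself uses. The class name, warehouse path, database/table names and the sample row are placeholders, not values taken from the plugin.

```java
import java.util.List;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;

public class PaimonBatchWriteSketch {
    public static void main(String[] args) throws Exception {
        // Open a filesystem catalog from a warehouse path. A Hive catalog is created the same
        // way, except the CatalogContext is built from Options carrying the metastore settings.
        CatalogContext context = CatalogContext.create(new Path("/tmp/paimon_warehouse"));
        Catalog catalog = CatalogFactory.createCatalog(context);

        // Look up the target table; a table with columns (id INT, name STRING) is assumed here.
        Table table = catalog.getTable(Identifier.create("paimon", "test"));

        // batch_insert mode: write all rows first, then perform a single commit.
        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder().withOverwrite();
        BatchTableWrite write = writeBuilder.newWrite();
        write.write(GenericRow.of(1, BinaryString.fromString("alice")));

        List<CommitMessage> messages = write.prepareCommit();
        BatchTableCommit commit = writeBuilder.newCommit();
        commit.commit(messages);
        write.close();
    }
}
```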
## 3 Features
### 3.1 Sample Configuration
* A job that imports data from MySQL into Paimon:
```
{
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name",
"age",
"score",
"create_at",
"update_at",
"dt"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
],
"table": [
"user"
]
}
],
"password": "root1234",
"username": "root",
"where": ""
}
},
"writer": {
"name": "paimonwriter",
"parameter": {
"tableName": "test",
"databaseName": "paimon",
"catalogPath": "/app/hive/warehouse",
"metastoreUri": "thrift://127.0.0.1:9083",
"hiveConfDir": "/your/path",
"catalogType": "hive",
"hiveConfDir": "/your/path",
"hadoopConfDir": "/your/path",
"tableBucket": 2,
"primaryKey": "dt,id",
"partitionFields": "dt",
"writeOption": "stream_insert",
"batchSize": 100,
"hadoopConfig": {
"hdfsUser": "hdfs",
"coreSitePath": "/your/path/core-site.xml",
"hdfsSitePath": "/your/path/hdfs-site.xml"
},
"paimonConfig": {
"compaction.min.file-num": "3",
"compaction.max.file-num": "6",
"snapshot.time-retained": "2h",
"snapshot.num-retained.min": "5",
"hive.table.owner": "zhangsan",
"hive.storage.format": "ORC"
},
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "score",
"type": "double"
},
{
"name": "create_at",
"type": "string"
},
{
"name": "update_at",
"type": "string"
},{
"name": "dt",
"type": "string"
}
]
}
}
}
]
}
}
```
### 3.2 Parameter Description
* **metastoreUri**
  * Description: the Hive metastore address, e.g. thrift://127.0.0.1:9083. Note: when metastoreUri is set, hiveConfDir does not need to be set. <br />
  * Required: one of metastoreUri and hiveConfDir must be configured <br />
  * Default: none <br />
* **hiveConfDir**
  * Description: if the Hive metastoreUri is not set, set hiveConfDir instead. Note: the directory must contain a hive-site.xml file. <br />
  * Required: one of metastoreUri and hiveConfDir must be configured <br />
  * Default: none <br />
* **catalogPath**
  * Description: the warehouse path of the Paimon catalog; it may be a local file system path or an HDFS path. <br />
  * Required: yes <br />
  * Default: none <br />
* **catalogType**
  * Description: the Paimon catalog type; two options are supported: 1. file, 2. hive <br />
  * Required: yes <br />
  * Default: none <br />
* **hadoopConfDir**
  * Description: the Hadoop configuration directory that Paimon depends on. Note: the directory must contain the files hdfs-site.xml and core-site.xml. <br />
  * Required: either hadoopConfDir, or coreSitePath and hdfsSitePath under hadoopConfig, must be configured <br />
  * Default: none <br />
* **writeOption**
  * Description: how data is written to Paimon. Two modes are currently supported: 1. batch_insert (following the official batch-write semantics, only a single commit is made), 2. stream_insert (supports multiple commits; see the sketch after this parameter list). <br />
  * Required: yes <br />
  * Default: none <br />
* **hadoopConfig**
  * Description: additional Hadoop settings. You can point to the configuration files via coreSitePath and hdfsSitePath, set the HDFS user via hdfsUser, and add Kerberos-related (haveKerberos, kerberosKeytabFilePath, kerberosPrincipal) and S3-related (fs.s3a.*) options. <br />
  * Required: no <br />
  * Default: none <br />
* **paimonConfig**
  * Description: any Paimon table option can be added here; the options are applied when the writer creates the table. <br />
  * Required: no <br />
  * Default: none <br />
* **databaseName**
  * Description: the Paimon database to write to. <br />
  * Required: yes <br />
  * Default: none <br />
* **tableName**
  * Description: the Paimon table to write to. <br />
  * Required: yes <br />
  * Default: none <br />
* **primaryKey**
  * Description: comma-separated list of primary key fields, e.g. "dt,id"; used when the writer creates the table. <br />
  * Required: no <br />
  * Default: none <br />
* **partitionFields**
  * Description: comma-separated list of partition fields, e.g. "dt"; used when the writer creates the table. <br />
  * Required: no <br />
  * Default: none <br />
* **column**
  * Description: the set of columns to be synchronized, each entry given as a name/type pair that matches the table schema (see the sample configuration above). <br />
  * Required: yes <br />
  * Default: none <br />
* **tableBucket**
  * Description: the Paimon bucket number. Note: if it is set to -1, a "cannot dynamically write into partitions" error occurs. <br />
  * Required: no <br />
  * Default: 2 <br />
* **batchSize**
  * Description: the number of records per commit. Note: this setting only takes effect in stream_insert mode and is ignored in other modes. <br />
  * Required: no <br />
  * Default: 10 <br />
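Below is a minimal sketch of how stream_insert groups records into one commit per batchSize rows, mirroring the write loop in PaimonWriter.startWrite. The class name, the pre-converted rows list and the way the Table handle is obtained are illustrative assumptions.

```java
import java.util.List;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;

public class StreamInsertSketch {
    // Rows are buffered and committed every batchSize records; the commit identifier must
    // keep increasing between commits.
    public static void writeInBatches(Table table, List<GenericRow> rows, int batchSize) throws Exception {
        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
        StreamTableWrite write = writeBuilder.newWrite();
        long commitIdentifier = 0;
        int pending = 0;
        for (GenericRow row : rows) {
            write.write(row);
            pending++;
            commitIdentifier++;
            if (pending >= batchSize) {
                List<CommitMessage> messages = write.prepareCommit(false, commitIdentifier);
                StreamTableCommit commit = writeBuilder.newCommit();
                commit.commit(commitIdentifier, messages);
                pending = 0;
            }
        }
        if (pending > 0) {
            // flush the tail of the last partial batch, as PaimonWriter.flushCache does
            List<CommitMessage> messages = write.prepareCommit(false, commitIdentifier);
            writeBuilder.newCommit().commit(commitIdentifier, messages);
        }
        write.close();
    }
}
```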
### 3.3 Type Conversion
| DataX internal type | Paimon data type |
| -------- | ----- |
| Long | int, long |
| Double | float, double |
| Decimal | decimal |
| String | string |
| Date | date, datetime, timestamp, string |
| Boolean | boolean |
Please note:
* union, row, struct and custom types are currently not supported.
* A sketch of how a column list is turned into a Paimon schema follows these notes.
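The following sketch shows how a column list like the one in the sample configuration becomes a Paimon schema when the writer creates a missing table, mirroring PaimonWriter.CreateTable. The class name and the literal column/option values are illustrative assumptions.

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class CreateTableSketch {
    // Each "column" entry from the job config becomes a Paimon column; primaryKey and
    // partitionFields become the primary key and partition keys; paimonConfig entries
    // become table options.
    public static void createSampleTable(Catalog catalog) throws Exception {
        Schema schema = Schema.newBuilder()
                .column("id", DataTypes.INT())
                .column("name", DataTypes.STRING())
                .column("score", DataTypes.DOUBLE())
                .column("dt", DataTypes.STRING())
                .primaryKey("dt", "id")
                .partitionKeys("dt")
                .option("compaction.min.file-num", "3")
                .option("metastore.partitioned-table", "true")
                .build();
        // false: do not ignore an existing table, i.e. fail if it already exists
        catalog.createTable(Identifier.create("paimon", "test"), schema, false);
    }
}
```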
## 4 Performance Report
## 5 Constraints
### 5.1 Data recovery under primary/standby synchronization
## 6 FAQ

paimonwrite/src/main/assembly/package.xml

@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/paimonwriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>paimonwriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/paimonwriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/paimonwriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

paimonwrite/src/main/java/com/alibaba/datax/plugin/writer/paimonwriter/Key.java

@ -0,0 +1,59 @@
package com.alibaba.datax.plugin.writer.paimonwriter;
public class Key {
public static final String PAIMON_DB_NAME = "databaseName";
public static final String PAIMON_TABLE_NAME = "tableName";
public static final String PAIMON_PRIMARY_KEY = "primaryKey";
public static final String PAIMON_PARTITION_FIELDS = "partitionFields";
public static final String PAIMON_BATCH_SIZE = "batchSize";
public static final String PAIMON_COLUMN = "column";
/**
* writerOption
*/
public static final String PAIMON_WRITE_OPTION = "writeOption";
public static final String PAIMON_WRITE_OPTION_BATCH_INSERT = "batch_insert";
public static final String PAIMON_WRITE_OPTION_STREAM_INSERT = "stream_insert";
public static final String PAIMON_CATALOG_TYPE = "catalogType";
/**
* warehouse path
*/
public static final String PAIMON_CATALOG_PATH = "catalogPath";
public static final String PAIMON_TABLE_BUCKET = "tableBucket";
public static final String PAIMON_METASTORE_TYPE = "metastoreType";
/**
* thrift://<hive-metastore-host-name>:<port>
*/
public static final String PAIMON_METASTORE_URI = "metastoreUri";
public static final String PAIMON_CATALOG_FILE = "file";
public static final String PAIMON_CATALOG_HIVE = "hive";
public static final String PAIMON_HIVE_CONF_DIR = "hiveConfDir";
public static final String PAIMON_HADOOP_CONF_DIR = "hadoopConfDir";
// Kerberos
public static final String HAVE_KERBEROS = "haveKerberos";
public static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath";
public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
// hadoop config
public static final String HADOOP_CONFIG = "hadoopConfig";
//paimon config
public static final String PAIMON_CONFIG = "paimonConfig";
//S3
public static final String S3A_SSL = "fs.s3a.connection.ssl.enabled";
public static final String S3A_PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
public static final String S3A_USER_NAME = "fs.s3a.access.key";
public static final String S3A_USER_PWD = "fs.s3a.secret.key";
public static final String S3A_ENDPOINT = "fs.s3a.endpoint";
public static final String S3A_IMPL = "fs.s3a.impl";
}

paimonwrite/src/main/java/com/alibaba/datax/plugin/writer/paimonwriter/PaimonWriter.java

@ -0,0 +1,563 @@
package com.alibaba.datax.plugin.writer.paimonwriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.*;
import org.apache.paimon.types.*;
import org.apache.paimon.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.alibaba.datax.plugin.writer.paimonwriter.Key.*;
import static com.alibaba.datax.plugin.writer.paimonwriter.Key.HADOOP_SECURITY_AUTHENTICATION_KEY;
import static com.alibaba.datax.plugin.writer.paimonwriter.Key.HAVE_KERBEROS;
import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_BATCH_SIZE;
import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_CATALOG_FILE;
import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_CATALOG_HIVE;
import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_CATALOG_PATH;
import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_CONFIG;
import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_HADOOP_CONF_DIR;
import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_HIVE_CONF_DIR;
import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_METASTORE_URI;
import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_PRIMARY_KEY;
import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_TABLE_BUCKET;
import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_WRITE_OPTION_BATCH_INSERT;
import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_WRITE_OPTION_STREAM_INSERT;
import static com.alibaba.datax.plugin.writer.paimonwriter.PaimonWriterErrorCode.*;
import static com.alibaba.datax.plugin.writer.paimonwriter.PaimonWriterErrorCode.PAIMON_PARAM_LOST;
public class PaimonWriter extends Writer {
private static final Logger LOG = LoggerFactory.getLogger(PaimonWriter.class);
public static class Job extends Writer.Job {
private Configuration originalConfig;
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> list = new ArrayList<>();
for (int i = 0; i < mandatoryNumber; i++) {
list.add(originalConfig.clone());
}
return list;
}
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
}
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private String primaryKey;
private String partitionFields;
private String writeOption;
private int batchSize;
private Configuration sliceConfig;
private List<Configuration> columnsList;
private String catalogPath;
private String catalogType;
private Catalog catalog;
private Table table;
private int bucket;
private String hiveConfDir;
private String hadoopConfDir;
private String metastoreUri;
private String coreSitePath;
private String hdfsSitePath;
private org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
@Override
public void init() {
// Get the configuration relevant to this task
this.sliceConfig = super.getPluginJobConf();
String tableName = sliceConfig.getNecessaryValue(PAIMON_TABLE_NAME, PAIMON_ERROR_TABLE);
String dbName = sliceConfig.getNecessaryValue(PAIMON_DB_NAME, PAIMON_ERROR_DB);
catalogPath = sliceConfig.getNecessaryValue(PAIMON_CATALOG_PATH, PAIMON_PARAM_LOST);
catalogType = sliceConfig.getNecessaryValue(PAIMON_CATALOG_TYPE, PAIMON_PARAM_LOST);
bucket = sliceConfig.getInt(PAIMON_TABLE_BUCKET, 2);
batchSize = sliceConfig.getInt(PAIMON_BATCH_SIZE, 10);
writeOption = sliceConfig.getNecessaryValue(PAIMON_WRITE_OPTION, PAIMON_PARAM_LOST);
partitionFields = sliceConfig.getString(PAIMON_PARTITION_FIELDS);
primaryKey = sliceConfig.getString(PAIMON_PRIMARY_KEY);
columnsList = sliceConfig.getListConfiguration(PAIMON_COLUMN);
Configuration hadoopSiteParams = sliceConfig.getConfiguration(HADOOP_CONFIG);
JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(sliceConfig.getString(HADOOP_CONFIG));
if (null != hadoopSiteParams) {
Set<String> paramKeys = hadoopSiteParams.getKeys();
for (String each : paramKeys) {
if(each.equals("hdfsUser")) {
System.setProperty("HADOOP_USER_NAME", hadoopSiteParamsAsJsonObject.getString(each));
} else if(each.equals("coreSitePath")) {
coreSitePath = hadoopSiteParamsAsJsonObject.getString(each);
} else if(each.equals("hdfsSitePath")) {
hdfsSitePath = hadoopSiteParamsAsJsonObject.getString(each);
} else {
hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
}
}
}
try {
// Whether Kerberos authentication is enabled
Boolean haveKerberos = sliceConfig.getBool(HAVE_KERBEROS, false);
if(haveKerberos){
String kerberosKeytabFilePath = sliceConfig.getString(KERBEROS_KEYTAB_FILE_PATH);
String kerberosPrincipal = sliceConfig.getString(KERBEROS_PRINCIPAL);
hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
this.kerberosAuthentication(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
}
switch (catalogType) {
case PAIMON_CATALOG_FILE :
catalog = createFilesystemCatalog();
break;
case PAIMON_CATALOG_HIVE :
metastoreUri = sliceConfig.getString(PAIMON_METASTORE_URI);
hiveConfDir = sliceConfig.getString(PAIMON_HIVE_CONF_DIR);
hadoopConfDir = sliceConfig.getString(PAIMON_HADOOP_CONF_DIR);
catalog = createHiveCatalog();
break;
default :
LOG.error("unsupported catalog type :{}", catalogType);
break;
}
if(!tableExists(catalog, dbName, tableName)) {
LOG.info("{} 表不存在,开始创建...", dbName.concat("." + tableName));
CreateTable(catalog, dbName, tableName, columnsList, primaryKey.split(","), partitionFields.split(","));
}
table = getTable(catalog, dbName, tableName);
} catch (Exception e) {
LOG.error(ExceptionUtils.getStackTrace(e));
}
}
@Override
public void prepare() {
}
@Override
public void startWrite(RecordReceiver recordReceiver) {
Record record;
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
DateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
WriteBuilder writeBuilder = null;
//Write records in distributed tasks
TableWrite write = null;
Boolean isStream = false;
switch (writeOption) {
case PAIMON_WRITE_OPTION_BATCH_INSERT:
writeBuilder = table.newBatchWriteBuilder().withOverwrite();
write = writeBuilder.newWrite();
break;
case PAIMON_WRITE_OPTION_STREAM_INSERT:
writeBuilder = table.newStreamWriteBuilder();
write = writeBuilder.newWrite();
isStream = true;
break;
default:
LOG.error("unsupported write option type :{}", writeOption);
}
TableCommit commit = null;
List<CommitMessage> messages = null;
AtomicLong counter = new AtomicLong(0);
long num = 0;
long commitIdentifier = 0;
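// commitIdentifier must keep increasing across stream commits; counter tracks how many rows
// have accumulated since the last commit so that a commit is issued every batchSize records.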
while ((record = recordReceiver.getFromReader()) != null) {
GenericRow row = new GenericRow(columnsList.size());
for (int i = 0; i < columnsList.size(); i++) {
Configuration configuration = columnsList.get(i);
String columnType = configuration.getString("type");
Column column = record.getColumn(i);
Object rawData = column.getRawData();
if (rawData == null) {
row.setField(i, null);
continue;
}
switch (columnType) {
case "int":
row.setField(i, Integer.parseInt(rawData.toString()));
break;
case "long":
row.setField(i, Long.parseLong(rawData.toString()));
break;
case "float":
row.setField(i, Float.parseFloat(rawData.toString()));
break;
case "double":
row.setField(i, Double.parseDouble(rawData.toString()));
break;
case "date":
row.setField(i, dateFormat.format(rawData));
break;
case "datetime":
row.setField(i, dateTimeFormat.format(rawData));
break;
case "boolean":
row.setField(i, Boolean.parseBoolean(rawData.toString()));
break;
case "string":
if(column instanceof DateColumn) {
row.setField(i, BinaryString.fromString(column.asString()));
break;
}
default:
row.setField(i, BinaryString.fromString(rawData.toString()));
}
}
try {
write.write(row, bucket);
if(isStream) {
num = counter.incrementAndGet();
commitIdentifier++;
if(num >= batchSize) {
List<CommitMessage> streamMsgs = ((StreamTableWrite) write).prepareCommit(false, commitIdentifier);
// Collect all CommitMessages to a global node and commit
StreamTableCommit stc = (StreamTableCommit)writeBuilder.newCommit();
stc.commit(commitIdentifier, streamMsgs);
counter.set(0L);
}
}
} catch (Exception e) {
LOG.error("write is failed!", e);
}
}
try {
flushCache(isStream, commitIdentifier, num, writeBuilder, write, messages, commit);
} catch (Exception e) {
//Abort unsuccessful commit to delete data files
if(null != commit) {
commit.abort(messages);
}
LOG.error("data commit is failed!", e);
}
}
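/**
 * Commit whatever has not been committed yet: in stream mode, flush the remaining rows of the
 * current partial batch under the latest commit identifier; in batch mode, perform the single
 * final commit of all collected commit messages.
 */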
public void flushCache(boolean isStream, long commitIdentifier, long num, WriteBuilder writeBuilder, TableWrite write, List<CommitMessage> messages, TableCommit commit) throws Exception {
if (isStream) {
// commit the remaining rows of the last (possibly partial) batch
if (num > 0) {
messages = ((StreamTableWrite) write).prepareCommit(false, commitIdentifier);
// Collect all CommitMessages to a global node and commit
StreamTableCommit stc = (StreamTableCommit) writeBuilder.newCommit();
stc.commit(commitIdentifier, messages);
}
} else {
messages = ((BatchTableWrite) write).prepareCommit();
//Collect all CommitMessages to a global node and commit
commit = writeBuilder.newCommit();
if (commit == null || messages == null) {
throw new RuntimeException("commit or messages info not exist");
}
((BatchTableCommit) commit).commit(messages);
}
}
//file system catalog
public Catalog createFilesystemCatalog() {
CatalogContext context = CatalogContext.create(new org.apache.paimon.fs.Path(catalogPath));
return CatalogFactory.createCatalog(context);
}
//hive catalog
public Catalog createHiveCatalog() {
// Paimon Hive catalog relies on Hive jars
// You should add hive classpath or hive bundled jar.
Options options = new Options();
CatalogContext context;
options.set("warehouse", catalogPath);
options.set("metastore", catalogType);
// Create the table as an external table by default
options.set("table.type", "external");
/**
 * 1. If the metastore uri is set, hiveConfDir is not needed.
 * 2. If the metastore uri is not set, reading hive-site.xml from hiveConfDir also works.
 */
if(StringUtils.isNotBlank(metastoreUri)) {
options.set("uri", metastoreUri);
} else if(StringUtils.isNotBlank(hiveConfDir)) {
options.set("hive-conf-dir", hiveConfDir);
} else {
throw DataXException.asDataXException(PAIMON_PARAM_LOST,
String.format("您提供配置文件有误,[%s]和[%s]参数,至少需要配置一个,不允许为空或者留白 .", PAIMON_METASTORE_URI, PAIMON_HIVE_CONF_DIR));
}
/**
 * 1. Create the catalog from hadoop-conf-dir (the directory must contain hdfs-site.xml and core-site.xml).
 * 2. Or create the catalog from hadoopConfig (coreSitePath: /path/core-site.xml, hdfsSitePath: /path/hdfs-site.xml).
 */
if(StringUtils.isNotBlank(hadoopConfDir)) {
options.set("hadoop-conf-dir", hadoopConfDir);
context = CatalogContext.create(options);
}else if(StringUtils.isNotBlank(coreSitePath) && StringUtils.isNotBlank(hdfsSitePath)) {
context = CatalogContext.create(options, hadoopConf);
} else {
throw DataXException.asDataXException(PAIMON_PARAM_LOST,
String.format("您提供配置文件有误,[%s]和[%s]参数,至少需要配置一个,不允许为空或者留白 .", PAIMON_HADOOP_CONF_DIR, "hadoopConfig:coreSiteFile&&hdfsSiteFile"));
}
return CatalogFactory.createCatalog(context);
}
public void CreateTable(Catalog catalog, String dbName, String tableName, List<Configuration> cols, String[] pks, String[] partKeys) {
Configuration paimonTableParams = sliceConfig.getConfiguration(PAIMON_CONFIG);
JSONObject paimonParamsAsJsonObject = JSON.parseObject(sliceConfig.getString(PAIMON_CONFIG));
Schema.Builder schemaBuilder = Schema.newBuilder();
if (null != paimonTableParams) {
Set<String> paramKeys = paimonTableParams.getKeys();
for (String each : paramKeys) {
schemaBuilder.option(each, paimonParamsAsJsonObject.getString(each));
}
}
for (Configuration columnConfig : cols) {
String columnName = columnConfig.getString("name");
DataType columnType = getPaimonDataType(columnConfig.getString("type"));
schemaBuilder.column(columnName, columnType, columnName);
}
if(pks != null && pks.length > 0) {
schemaBuilder.primaryKey(pks);
}
Schema schema = schemaBuilder.build();
if(partKeys != null && partKeys.length > 0) {
schemaBuilder.partitionKeys(partKeys);
schema = schemaBuilder.option("metastore.partitioned-table", "true").build();
}
Identifier identifier = Identifier.create(dbName, tableName);
try {
catalog.createTable(identifier, schema, false);
} catch (Catalog.TableAlreadyExistException e) {
throw new RuntimeException("table already exists", e);
} catch (Catalog.DatabaseNotExistException e) {
throw new RuntimeException("database does not exist", e);
}
}
public int getMatchValue(String typeName) {
// Extract the length from a type such as CHAR(n) or VARCHAR(n)
String regex = "\\((\\d+)\\)";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(typeName);
int res = 0;
if (matcher.find()) {
res = Integer.parseInt(matcher.group(1));
} else {
LOG.error("{}:类型错误,请检查!", typeName);
}
return res;
}
public Pair<Integer, Integer> getDecValue (String typeName) {
String regex = "dd\\((\\d+), (\\d+)\\)";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(typeName.trim());
int left = 0;
int right = 0;
if (matcher.find()) {
left = Integer.parseInt(matcher.group(1));
right = Integer.parseInt(matcher.group(2));
} else {
LOG.error("{}:类型错误,请检查!", typeName);
}
return Pair.of(left, right);
}
public DataType getPaimonDataType(String typeName) {
String type = typeName.toUpperCase();
DataType dt = DataTypes.STRING();
if(type.equals("BINARY") && !type.contains("VARBINARY")) {
dt = type.contains("(") ? new BinaryType(getMatchValue(type.trim())) : new BinaryType();
} else if(type.contains("VARBINARY")) {
dt = type.contains("(") ? new VarBinaryType(getMatchValue(type.trim())): new VarBinaryType();
} else if(type.contains("STRING")) {
dt = VarCharType.STRING_TYPE;
} else if(type.contains("VARCHAR")) {
dt = type.contains("(") ? new VarCharType(getMatchValue(type.trim())): new VarCharType();
} else if(type.contains("CHAR")) {
if(type.contains("NOT NULL")) {
dt = new CharType().copy(false);
}else if (type.contains("(")) {
dt = new CharType(getMatchValue(type.trim()));
}else {
dt = new CharType();
}
} else if(type.contains("BOOLEAN")) {
dt = new BooleanType();
} else if(type.contains("BYTES")) {
dt = new VarBinaryType(VarBinaryType.MAX_LENGTH);
} else if(type.contains("DEC")) { // 包含 DEC DECIMAL
if(type.contains(",")) {
dt = new DecimalType(getDecValue(type).getLeft(), getDecValue(type).getRight());
}else if(type.contains("(")) {
dt = new DecimalType(getMatchValue(type.trim()));
}else {
dt = new DecimalType();
}
} else if(type.contains("NUMERIC") || type.contains("DECIMAL")) {
if(type.contains(",")) {
dt = new DecimalType(getDecValue(type).getLeft(), getDecValue(type).getRight());
}else if(type.contains("(")) {
dt = new DecimalType(getMatchValue(type.trim()));
}else {
dt = new DecimalType();
}
} else if(type.equals("INT")) {
dt = new IntType();
} else if(type.equals("BIGINT") || type.equals("LONG")) {
dt = new BigIntType();
} else if(type.equals("TINYINT")) {
dt = new TinyIntType();
} else if(type.equals("SMALLINT")) {
dt = new SmallIntType();
} else if(type.equals("INTEGER")) {
dt = new IntType();
} else if(type.contains("FLOAT")) {
dt = new FloatType();
} else if(type.contains("DOUBLE")) {
dt = new DoubleType();
} else if(type.contains("DATE")) {
dt = new DateType();
} else if(type.contains("TIME")) {
dt = type.contains("(") ? new TimeType(getMatchValue(type.trim())): new TimeType();
} else if(type.contains("TIMESTAMP")) {
switch (type) {
case "TIMESTAMP":
case "TIMESTAMP WITHOUT TIME ZONE":
dt = new TimestampType();
break;
case "TIMESTAMP(3)":
case "TIMESTAMP(3) WITHOUT TIME ZONE":
dt = new TimestampType(3);
break;
case "TIMESTAMP WITH LOCAL TIME ZONE":
case "TIMESTAMP_LTZ":
dt = new LocalZonedTimestampType();
break;
case "TIMESTAMP(3) WITH LOCAL TIME ZONE":
case "TIMESTAMP_LTZ(3)":
dt = new LocalZonedTimestampType(3);
break;
default:
LOG.error("{}:类型错误,请检查!", type);
}
} else {
throw new UnsupportedOperationException(
"Not a supported type: " + typeName);
}
return dt;
}
public Table getTable(Catalog catalog, String dbName, String tableName) {
try {
Identifier identifier = Identifier.create(dbName, tableName);
return catalog.getTable(identifier);
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException("table not exist", e);
}
}
public boolean tableExists(Catalog catalog, String dbName, String tableName) {
Identifier identifier = Identifier.create(dbName, tableName);
boolean exists = catalog.tableExists(identifier);
return exists;
}
private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf){
if(StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)){
UserGroupInformation.setConfiguration(hadoopConf);
try {
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
} catch (Exception e) {
String message = String.format("kerberos认证失败,请确定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填写正确",
kerberosKeytabFilePath, kerberosPrincipal);
LOG.error(message);
throw DataXException.asDataXException(KERBEROS_LOGIN_ERROR, e);
}
}
}
@Override
public void post() {
}
@Override
public void destroy() {
}
}
}

paimonwrite/src/main/java/com/alibaba/datax/plugin/writer/paimonwriter/PaimonWriterErrorCode.java

@ -0,0 +1,35 @@
package com.alibaba.datax.plugin.writer.paimonwriter;
import com.alibaba.datax.common.spi.ErrorCode;
public enum PaimonWriterErrorCode implements ErrorCode {
PAIMON_ERROR_DB("Paimon Error DB", "您的参数配置错误."),
PAIMON_ERROR_TABLE("Paimon Error Table", "您的参数配置错误."),
PAIMON_PARAM_LOST("Paimon Param Lost", "您缺失了必须填写的参数值."),
HDFS_CONNECT_ERROR("Hdfs Connect Error", "与HDFS建立连接时出现IO异常."),
KERBEROS_LOGIN_ERROR("Hdfs Login Error", "KERBEROS认证失败");
private final String code;
private final String description;
PaimonWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code,
this.description);
}
}

paimonwrite/src/main/resources/plugin.json

@ -0,0 +1,6 @@
{
"name": "paimonwriter",
"class": "com.alibaba.datax.plugin.writer.paimonwriter.PaimonWriter",
"description": "useScene: prod. mechanism: via FileSystem connect Paimon write data concurrent.",
"developer": "alibaba"
}

paimonwrite/src/main/resources/plugin_job_template.json

@ -0,0 +1,13 @@
{
"name": "paimonwriter",
"parameter": {
"defaultFS": "",
"fileType": "",
"path": "",
"fileName": "",
"column": [],
"writeMode": "",
"fieldDelimiter": "",
"compress":""
}
}

paimonwrite/src/test/java/com/alibaba/datax/plugin/writer/paimonwriter/mysql2PaimonTest.java

@ -0,0 +1,20 @@
package com.alibaba.datax.plugin.writer.paimonwriter;
import com.alibaba.datax.core.Engine;
import org.junit.Test;
public class mysql2PaimonTest {
private static final String host = "localhost";
@Test
public void case01() throws Throwable {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/mysql_to_paimon.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
}

paimonwrite/src/test/resources/mysql_to_paimon.json

@ -0,0 +1,99 @@
{
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name",
"age",
"score",
"create_at",
"update_at",
"dt"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
],
"table": [
"user"
]
}
],
"password": "root1234",
"username": "root",
"where": ""
}
},
"writer": {
"name": "paimonwriter",
"parameter": {
"tableName": "test",
"databaseName": "paimon",
"catalogPath": "/app/hive/warehouse",
"metastoreUri": "thrift://127.0.0.1:9083",
"catalogType": "hive",
"hiveConfDir": "/your/path",
"hadoopConfDir": "/your/path",
"tableBucket": 2,
"primaryKey": "dt,id",
"partitionFields": "dt",
"writeOption": "stream_insert",
"batchSize": 100,
"hadoopConfig": {
"hdfsUser": "hdfs",
"coreSitePath": "/your/path/core-site.xml",
"hdfsSitePath": "/your/path/hdfs-site.xml"
},
"paimonConfig": {
"compaction.min.file-num": "3",
"compaction.max.file-num": "6",
"snapshot.time-retained": "2h",
"snapshot.num-retained.min": "5",
"hive.table.owner": "zhangsan",
"hive.storage.format": "ORC"
},
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "score",
"type": "double"
},
{
"name": "create_at",
"type": "string"
},
{
"name": "update_at",
"type": "string"
},{
"name": "dt",
"type": "string"
}
]
}
}
}
]
}
}

pom.xml

@ -132,6 +132,7 @@
<module>gaussdbreader</module>
<module>gaussdbwriter</module>
<module>datax-example</module>
<module>paimonwrite</module>
</modules>