diff --git a/paimonwrite/pom.xml b/paimonwrite/pom.xml new file mode 100644 index 00000000..75e7bf2d --- /dev/null +++ b/paimonwrite/pom.xml @@ -0,0 +1,176 @@ + + + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + com.alibaba.datax + paimonwriter + 0.0.1-SNAPSHOT + + paimonwriter + + + UTF-8 + 1.7 + 1.7 + 2.3.1 + 3.3.4 + 0.8-SNAPSHOT + 0.9.3 + 0.12.0 + + + + + com.alibaba.datax + datax-core + 0.0.1-SNAPSHOT + test + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.paimon + paimon-bundle + 0.7.0-incubating + + + + org.apache.hive + hive-jdbc + 3.1.3 + + + hadoop-common + org.apache.hadoop + + + hadoop-hdfs + org.apache.hadoop + + + + + + org.apache.thrift + libfb303 + ${fb303.version} + + + + org.apache.thrift + libthrift + ${thrift.version} + + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + com.google.guava + guava + 16.0.1 + + + org.apache.hadoop + hadoop-client + 3.3.4 + + + org.glassfish.jersey + jersey-client + + + + + + org.apache.hadoop + hadoop-aws + 3.3.4 + + + + com.amazonaws + aws-java-sdk + 1.10.34 + + + + com.google.guava + guava + 30.1.1-jre + + + + org.eclipse.jetty + jetty-servlet + 9.4.15.v20190215 + + + + com.alibaba.datax + plugin-unstructured-storage-util + ${datax-project-version} + + + + junit + junit + test + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + diff --git a/paimonwrite/src/doc/paimonwriter.md b/paimonwrite/src/doc/paimonwriter.md new file mode 100644 index 00000000..4d24d509 --- /dev/null +++ b/paimonwrite/src/doc/paimonwriter.md @@ -0,0 +1,266 @@ + +# PaimonWriter 插件文档 + + +___ + + + +## 1 快速介绍 + +PaimonWriter插件实现了向数据湖Paimon中写入数据,在底层实现上,通过调用paimon的batch write和stream write的相关方法来讲数据写入到paimon中 + +## 2 实现原理 + +通过读取paimon的文件catalog或者hive catalog的路径,以及相关hadoop配置,hive配置等信息来写入数据 元数据文件等信息到文件系统中 + +## 3 功能说明 + +### 3.1 配置样例 + +* 配置一个从mysql到paimon导入的作业: + +``` +{ + "job": { + "setting": { + "speed": { + "channel": 2 + } + }, + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "column": [ + "id", + "name", + "age", + "score", + "create_at", + "update_at", + "dt" + ], + "connection": [ + { + "jdbcUrl": [ + "jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai" + ], + "table": [ + "user" + ] + } + ], + "password": "root1234", + "username": "root", + "where": "" + } + }, + "writer": { + "name": "paimonwriter", + "parameter": { + "tableName": "test", + "databaseName": "paimon", + "catalogPath": "/app/hive/warehouse", + "metastoreUri": "thrift://127.0.0.1:9083", + "hiveConfDir": "/your/path", + "catalogType": "hive", + "hiveConfDir": "/your/path", + "hadoopConfDir": "/your/path", + "tableBucket": 2, + "primaryKey": "dt,id", + "partitionFields": "dt", + "writeOption": "stream_insert", + "batchSize": 100, + "hadoopConfig": { + "hdfsUser": "hdfs", + "coreSitePath": "/your/path/core-site.xml", + "hdfsSitePath": "/your/path/hdfs-site.xml" + }, + "paimonConfig": { + "compaction.min.file-num": "3", + "compaction.max.file-num": "6", + "snapshot.time-retained": "2h", + "snapshot.num-retained.min": "5", + "hive.table.owner": "zhangsan", + "hive.storage.format": "ORC" + }, + "column": [ + { + "name": "id", + "type": "int" + }, + { + "name": "name", + "type": "string" + }, + { + "name": "age", + "type": "int" + }, 
+ { + "name": "score", + "type": "double" + }, + { + "name": "create_at", + "type": "string" + }, + { + "name": "update_at", + "type": "string" + },{ + "name": "dt", + "type": "string" + } + ] + } + } + } + ] + } +} + +``` + + +### 3.2 参数说明 + +* **metastoreUri** + + * 描述:需要配置hive的metastore地址:thrift://127.0.0.1:9083,注意:当设置了metastoreUri,则不需要设置hiveConfDir。
+ + * 必选:metastoreUri和hiveConfDir配置二选一
+ + * 默认值:无
+ +* **hiveConfDir** + + * 描述:如果没有设置hive的metastoreUri,则需要设置hiveConfDir路径,注意:路径中必须要包含hive-site.xml文件。
+ + * 必选:metastoreUri和hiveConfDir配置二选一
+ + * 默认值:无
+ +* **catalogPath** + + * 描述:paimon catalog的warehouse路径,可以是本地文件系统路径,也可以是hdfs等分布式文件系统路径。<br />
+ + * 必选:是
+ + * 默认值:无
+ +* **catalogType** + + * 描述:paimon的catalog类型,支持两种取值:1.file(文件系统catalog);2.hive(hive catalog)。两种catalog的创建方式示例见本节末尾。<br />
+ + * 必选:是
+ + * 默认值:无
+ +* **hadoopConfDir** + + * 描述:paimon依赖的hadoop文件配置路径,注意:路径下面要包含两个文件:hdfs-site.xml,core-site.xml
+ + * 必选:hadoopConfDir 与 hadoopConfig 中的 coreSitePath、hdfsSitePath 两种配置方式二选一<br />
+ + * 默认值:无
+ +* **writeOption** + + * 描述:paimon写入数据的方式,目前支持2种:1.batch_insert(批模式,一次任务只进行一次提交);2.stream_insert(流模式,按batchSize分批多次提交)。两种方式的提交流程示例见本节末尾。<br />
+ + * 必选:是
+ + * 默认值:无<br />
+ +* **hadoopConfig** + + * 描述:hadoop相关配置。可通过hdfsUser指定访问用户,通过coreSitePath、hdfsSitePath指定core-site.xml和hdfs-site.xml的文件路径,其余键值会直接写入hadoop Configuration,可用于kerberos、s3等相关参数。<br />
+ + * 必选:否
+ + * 默认值:无
+ +* **paimonConfig** + + * 描述:paimon表的相关配置项(如compaction、snapshot、hive等参数),建表时会作为表属性写入。<br />
+ + * 必选:否
+ + * 默认值:无
+ +* **databaseName** + + * 描述:Paimon中目标表所在的数据库名称。<br />
+ + * 必选:是<br />
+ + * 默认值:无<br />
+ +* **tableName** + + * 描述:Paimon中需要写入的表名。若表不存在,插件会按column、primaryKey、partitionFields等配置自动创建。<br />
+ + * 必选:是<br />
+ + * 默认值:无<br />
+ +* **column** + + * 描述:所配置的表中需要同步的列集合,每个列需以name(列名)和type(类型)两个属性描述,类型映射见下文3.3节。<br />
+ + * 必选:是<br />
+ + * 默认值:无<br />
+ +* **tableBucket** + + * 描述:paimon表的bucket数量。注意:如果设置为-1,会出现无法动态写入分区的错误。<br />
+ + * 必选:否
+ + * 默认值:2
+ +* **batchSize** + + * 描述:一次批量提交的记录条数。注意:此配置仅在stream_insert模式下生效,其他模式无效。<br />
+ + * 必选:否
+ + * 默认值:10
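+
+下面给出 catalogType 两种取值对应的 catalog 创建方式的简化示意,按照本插件 createFilesystemCatalog / createHiveCatalog 的实现思路整理;其中 catalogPath、metastoreUri 为假设的示例参数,实际以作业配置为准:
+
+```java
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.Options;
+
+public class CatalogSketch {
+
+    // catalogType = file:直接以 warehouse 路径创建文件系统 catalog
+    static Catalog fileCatalog(String catalogPath) {
+        CatalogContext context = CatalogContext.create(new org.apache.paimon.fs.Path(catalogPath));
+        return CatalogFactory.createCatalog(context);
+    }
+
+    // catalogType = hive:通过 metastore uri(或 hive-conf-dir)创建 hive catalog
+    static Catalog hiveCatalog(String catalogPath, String metastoreUri) {
+        Options options = new Options();
+        options.set("warehouse", catalogPath);
+        options.set("metastore", "hive");
+        options.set("uri", metastoreUri);      // 没有 uri 时可改用 options.set("hive-conf-dir", ...)
+        options.set("table.type", "external"); // 本插件默认创建外部表
+        return CatalogFactory.createCatalog(CatalogContext.create(options));
+    }
+}
+```
+
+当配置了 hadoopConfDir 或 hadoopConfig(coreSitePath、hdfsSitePath)时,插件会基于对应的 hadoop 配置创建 CatalogContext。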
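+
+writeOption 两种取值对应的写入与提交流程简化示意,按照本插件 startWrite 中对 Paimon 写入 API 的调用方式整理;其中 table、rows、batchSize 均为假设的示例变量,并非插件的完整实现:
+
+```java
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.*;
+
+public class WriteOptionSketch {
+
+    // batch_insert:一次任务只进行一次提交
+    static void batchInsert(Table table, Iterable<GenericRow> rows) throws Exception {
+        BatchWriteBuilder builder = table.newBatchWriteBuilder().withOverwrite();
+        BatchTableWrite write = builder.newWrite();
+        for (GenericRow row : rows) {
+            write.write(row);
+        }
+        BatchTableCommit commit = builder.newCommit();
+        commit.commit(write.prepareCommit());
+        write.close();
+        commit.close();
+    }
+
+    // stream_insert:每累计 batchSize 条记录提交一次,commitIdentifier 需单调递增
+    static void streamInsert(Table table, Iterable<GenericRow> rows, int batchSize) throws Exception {
+        StreamWriteBuilder builder = table.newStreamWriteBuilder();
+        StreamTableWrite write = builder.newWrite();
+        StreamTableCommit commit = builder.newCommit();
+        long identifier = 0;
+        int buffered = 0;
+        for (GenericRow row : rows) {
+            write.write(row);
+            if (++buffered >= batchSize) {
+                identifier++;
+                commit.commit(identifier, write.prepareCommit(false, identifier));
+                buffered = 0;
+            }
+        }
+        // 收尾提交剩余数据
+        identifier++;
+        commit.commit(identifier, write.prepareCommit(false, identifier));
+        write.close();
+        commit.close();
+    }
+}
+```
+
+本插件在 stream_insert 模式下即按上述方式,每累计 batchSize 条记录触发一次提交。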
+ + +### 3.3 类型转换 + +| DataX 内部类型| paimon 数据类型 | +| -------- | ----- | +| Long |long| +| float |float| +| float |float| +| decimal |decimal| +| String |string | +| Date |date, timestamp,datatime, string | +| Boolean |boolean | + + +请注意: + +* 目前不支持union,row,struct类型和custom类型。 + +## 4 性能报告 + +略 + +## 5 约束限制 + +### 5.1 主备同步数据恢复问题 + +略 + +## 6 FAQ + + + diff --git a/paimonwrite/src/main/assembly/package.xml b/paimonwrite/src/main/assembly/package.xml new file mode 100755 index 00000000..773e8bb0 --- /dev/null +++ b/paimonwrite/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/paimonwriter + + + target/ + + paimonwriter-0.0.1-SNAPSHOT.jar + + plugin/writer/paimonwriter + + + + + + false + plugin/writer/paimonwriter/libs + runtime + + + diff --git a/paimonwrite/src/main/java/com/alibaba/datax/plugin/writer/paimonwriter/Key.java b/paimonwrite/src/main/java/com/alibaba/datax/plugin/writer/paimonwriter/Key.java new file mode 100644 index 00000000..5dfd38a4 --- /dev/null +++ b/paimonwrite/src/main/java/com/alibaba/datax/plugin/writer/paimonwriter/Key.java @@ -0,0 +1,59 @@ +package com.alibaba.datax.plugin.writer.paimonwriter; + +public class Key { + + public static final String PAIMON_DB_NAME = "databaseName"; + public static final String PAIMON_TABLE_NAME = "tableName"; + public static final String PAIMON_PRIMARY_KEY = "primaryKey"; + public static final String PAIMON_PARTITION_FIELDS = "partitionFields"; + public static final String PAIMON_BATCH_SIZE = "batchSize"; + + public static final String PAIMON_COLUMN = "column"; + + /** + * writerOption + */ + public static final String PAIMON_WRITE_OPTION = "writeOption"; + public static final String PAIMON_WRITE_OPTION_BATCH_INSERT = "batch_insert"; + public static final String PAIMON_WRITE_OPTION_STREAM_INSERT = "stream_insert"; + + public static final String PAIMON_CATALOG_TYPE = "catalogType"; + /** + * warehouse path + */ + public static final String PAIMON_CATALOG_PATH = "catalogPath"; + public static final String PAIMON_TABLE_BUCKET = "tableBucket"; + public static final String PAIMON_METASTORE_TYPE = "metastoreType"; + /** + * thrift://: + */ + public static final String PAIMON_METASTORE_URI = "metastoreUri"; + + public static final String PAIMON_CATALOG_FILE = "file"; + public static final String PAIMON_CATALOG_HIVE = "hive"; + + public static final String PAIMON_HIVE_CONF_DIR = "hiveConfDir"; + public static final String PAIMON_HADOOP_CONF_DIR = "hadoopConfDir"; + + // Kerberos + public static final String HAVE_KERBEROS = "haveKerberos"; + public static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath"; + public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal"; + + public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication"; + + // hadoop config + public static final String HADOOP_CONFIG = "hadoopConfig"; + + //paimon config + public static final String PAIMON_CONFIG = "paimonConfig"; + + //S3 + public static final String S3A_SSL = "fs.s3a.connection.ssl.enabled"; + public static final String S3A_PATH_STYLE_ACCESS = "fs.s3a.path.style.access"; + public static final String S3A_USER_NAME = "fs.s3a.access.key"; + public static final String S3A_USER_PWD = "fs.s3a.secret.key"; + public static final String S3A_ENDPOINT = "fs.s3a.endpoint"; + public static final String S3A_IMPL = "fs.s3a.impl"; + +} diff --git 
a/paimonwrite/src/main/java/com/alibaba/datax/plugin/writer/paimonwriter/PaimonWriter.java b/paimonwrite/src/main/java/com/alibaba/datax/plugin/writer/paimonwriter/PaimonWriter.java new file mode 100644 index 00000000..aebfadd2 --- /dev/null +++ b/paimonwrite/src/main/java/com/alibaba/datax/plugin/writer/paimonwriter/PaimonWriter.java @@ -0,0 +1,563 @@ +package com.alibaba.datax.plugin.writer.paimonwriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.DateColumn; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.*; +import org.apache.paimon.types.*; +import org.apache.paimon.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static com.alibaba.datax.plugin.writer.paimonwriter.Key.*; +import static com.alibaba.datax.plugin.writer.paimonwriter.Key.HADOOP_SECURITY_AUTHENTICATION_KEY; +import static com.alibaba.datax.plugin.writer.paimonwriter.Key.HAVE_KERBEROS; +import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_BATCH_SIZE; +import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_CATALOG_FILE; +import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_CATALOG_HIVE; +import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_CATALOG_PATH; +import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_CONFIG; +import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_HADOOP_CONF_DIR; +import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_HIVE_CONF_DIR; +import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_METASTORE_URI; +import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_PRIMARY_KEY; +import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_TABLE_BUCKET; +import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_WRITE_OPTION_BATCH_INSERT; +import static com.alibaba.datax.plugin.writer.paimonwriter.Key.PAIMON_WRITE_OPTION_STREAM_INSERT; +import static com.alibaba.datax.plugin.writer.paimonwriter.PaimonWriterErrorCode.*; +import static com.alibaba.datax.plugin.writer.paimonwriter.PaimonWriterErrorCode.PAIMON_PARAM_LOST; + +public class PaimonWriter extends Writer { + + private static final Logger LOG = LoggerFactory.getLogger(PaimonWriter.class); + + public static class Job extends Writer.Job { + private Configuration originalConfig; + + @Override 
+ public List split(int mandatoryNumber) { + List list = new ArrayList<>(); + for (int i = 0; i < mandatoryNumber; i++) { + list.add(originalConfig.clone()); + } + return list; + } + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + } + + @Override + public void destroy() { + + } + } + + + public static class Task extends Writer.Task { + private String primaryKey; + private String partitionFields; + private String writeOption; + private int batchSize; + private Configuration sliceConfig; + private List columnsList; + + private String catalogPath; + private String catalogType; + private Catalog catalog; + private Table table; + private int bucket; + private String hiveConfDir; + private String hadoopConfDir; + private String metastoreUri; + private String coreSitePath; + private String hdfsSitePath; + private org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); + + @Override + public void init() { + //获取与本task相关的配置 + this.sliceConfig = super.getPluginJobConf(); + String tableName = sliceConfig.getNecessaryValue(PAIMON_TABLE_NAME, PAIMON_ERROR_TABLE); + String dbName = sliceConfig.getNecessaryValue(PAIMON_DB_NAME, PAIMON_ERROR_DB); + catalogPath = sliceConfig.getNecessaryValue(PAIMON_CATALOG_PATH, PAIMON_PARAM_LOST); + catalogType = sliceConfig.getNecessaryValue(PAIMON_CATALOG_TYPE, PAIMON_PARAM_LOST); + bucket = sliceConfig.getInt(PAIMON_TABLE_BUCKET, 2); + batchSize = sliceConfig.getInt(PAIMON_BATCH_SIZE, 10); + writeOption = sliceConfig.getNecessaryValue(PAIMON_WRITE_OPTION, PAIMON_PARAM_LOST); + + partitionFields = sliceConfig.getString(PAIMON_PARTITION_FIELDS); + primaryKey = sliceConfig.getString(PAIMON_PRIMARY_KEY); + columnsList = sliceConfig.getListConfiguration(PAIMON_COLUMN); + + Configuration hadoopSiteParams = sliceConfig.getConfiguration(HADOOP_CONFIG); + JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(sliceConfig.getString(HADOOP_CONFIG)); + if (null != hadoopSiteParams) { + Set paramKeys = hadoopSiteParams.getKeys(); + for (String each : paramKeys) { + if(each.equals("hdfsUser")) { + System.setProperty("HADOOP_USER_NAME", hadoopSiteParamsAsJsonObject.getString(each)); + } else if(each.equals("coreSitePath")) { + coreSitePath = hadoopSiteParamsAsJsonObject.getString(each); + } else if(each.equals("hdfsSitePath")) { + hdfsSitePath = hadoopSiteParamsAsJsonObject.getString(each); + } else { + hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each)); + } + } + } + + try { + //是否有Kerberos认证 + Boolean haveKerberos = sliceConfig.getBool(HAVE_KERBEROS, false); + if(haveKerberos){ + String kerberosKeytabFilePath = sliceConfig.getString(KERBEROS_KEYTAB_FILE_PATH); + String kerberosPrincipal = sliceConfig.getString(KERBEROS_PRINCIPAL); + hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos"); + this.kerberosAuthentication(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf); + } + + switch (catalogType) { + case PAIMON_CATALOG_FILE : + catalog = createFilesystemCatalog(); + break; + case PAIMON_CATALOG_HIVE : + metastoreUri = sliceConfig.getString(PAIMON_METASTORE_URI); + hiveConfDir = sliceConfig.getString(PAIMON_HIVE_CONF_DIR); + hadoopConfDir = sliceConfig.getString(PAIMON_HADOOP_CONF_DIR); + catalog = createHiveCatalog(); + break; + default : + LOG.error("unsupported catalog type :{}", catalogType); + break; + } + + if(!tableExists(catalog, dbName, tableName)) { + LOG.info("{} 表不存在,开始创建...", dbName.concat("." 
+ tableName)); + CreateTable(catalog, dbName, tableName, columnsList, primaryKey.split(","), partitionFields.split(",")); + } + + table = getTable(catalog, dbName, tableName); + + } catch (Exception e) { + LOG.error(ExceptionUtils.getStackTrace(e)); + } + } + + @Override + public void prepare() { + + } + + @Override + public void startWrite(RecordReceiver recordReceiver) { + Record record; + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + DateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + WriteBuilder writeBuilder = null; + //Write records in distributed tasks + TableWrite write = null; + Boolean isStream = false; + switch (writeOption) { + case PAIMON_WRITE_OPTION_BATCH_INSERT: + writeBuilder = table.newBatchWriteBuilder().withOverwrite(); + write = writeBuilder.newWrite(); + break; + case PAIMON_WRITE_OPTION_STREAM_INSERT: + writeBuilder = table.newStreamWriteBuilder(); + write = writeBuilder.newWrite(); + isStream = true; + break; + default: + LOG.error("unsupported write option type :{}", writeOption); + } + + TableCommit commit = null; + List messages = null; + AtomicLong counter = new AtomicLong(0); + long num = 0; + long commitIdentifier = 0; + + while ((record = recordReceiver.getFromReader()) != null) { + + GenericRow row = new GenericRow(columnsList.size()); + for (int i = 0; i < columnsList.size(); i++) { + Configuration configuration = columnsList.get(i); + String columnType = configuration.getString("type"); + Column column = record.getColumn(i); + Object rawData = column.getRawData(); + + if (rawData == null) { + row.setField(i, null); + continue; + } + + switch (columnType) { + case "int": + row.setField(i, Integer.parseInt(rawData.toString())); + break; + case "long": + row.setField(i, Long.parseLong(rawData.toString())); + break; + case "float": + row.setField(i, Float.parseFloat(rawData.toString())); + break; + case "double": + row.setField(i, Double.parseDouble(rawData.toString())); + break; + case "date": + row.setField(i, dateFormat.format(rawData)); + break; + case "datetime": + row.setField(i, dateTimeFormat.format(rawData)); + break; + case "boolean": + row.setField(i, Boolean.parseBoolean(rawData.toString())); + break; + case "string": + if(column instanceof DateColumn) { + row.setField(i, BinaryString.fromString(column.asString())); + break; + } + default: + row.setField(i, BinaryString.fromString(rawData.toString())); + } + + } + try { + write.write(row, bucket); + if(isStream) { + num = counter.incrementAndGet(); + commitIdentifier++; + if(num >= batchSize) { + List streamMsgs = ((StreamTableWrite) write).prepareCommit(false, commitIdentifier); + // Collect all CommitMessages to a global node and commit + StreamTableCommit stc = (StreamTableCommit)writeBuilder.newCommit(); + stc.commit(commitIdentifier, streamMsgs); + counter.set(0L); + } + } + + } catch (Exception e) { + LOG.error("write is failed!", e); + } + + } + + try { + flushCache(isStream, commitIdentifier, num, writeBuilder, write, messages, commit); + } catch (Exception e) { + //Abort unsuccessful commit to delete data files + if(null != commit) { + commit.abort(messages); + } + LOG.error("data commit is failed!", e); + } + + } + + public void flushCache(boolean isStream, long commitIdentifier, long num, WriteBuilder writeBuilder, TableWrite write, List messages, TableCommit commit) throws Exception { + + if (isStream && num > 0) { + messages = ((StreamTableWrite) write).prepareCommit(false, commitIdentifier); + // Collect all CommitMessages to a global node and 
commit + StreamTableCommit stc = (StreamTableCommit)writeBuilder.newCommit(); + stc.commit(commitIdentifier, messages); + } else { + messages = ((BatchTableWrite)write).prepareCommit(); + //Collect all CommitMessages to a global node and commit + commit = writeBuilder.newCommit(); + + if(commit == null || messages == null) { + throw new RuntimeException("commit or messages info not exist"); + } + ((BatchTableCommit) commit).commit(messages); + } + + } + + //file system catalog + public Catalog createFilesystemCatalog() { + CatalogContext context = CatalogContext.create(new org.apache.paimon.fs.Path(catalogPath)); + return CatalogFactory.createCatalog(context); + } + + //hive catalog + public Catalog createHiveCatalog() { + // Paimon Hive catalog relies on Hive jars + // You should add hive classpath or hive bundled jar. + Options options = new Options(); + CatalogContext context; + options.set("warehouse", catalogPath); + options.set("metastore", catalogType); + //默认设置为外部表 + options.set("table.type", "external"); + + /** + * 1.如果metastore uri 存在,则不需要设置 hiveConfDir + * 2.如果metastore uri 不存在,读取 hiveConfDir下的hive-site.xml也可以 + */ + if(StringUtils.isNotBlank(metastoreUri)) { + options.set("uri", metastoreUri); + } else if(StringUtils.isNotBlank(hiveConfDir)) { + options.set("hive-conf-dir", hiveConfDir); + } else { + throw DataXException.asDataXException(PAIMON_PARAM_LOST, + String.format("您提供配置文件有误,[%s]和[%s]参数,至少需要配置一个,不允许为空或者留白 .", PAIMON_METASTORE_URI, PAIMON_HIVE_CONF_DIR)); + } + + /** + * 1:通过配置hadoop-conf-dir(目录中必须包含hive-site.xml,core-site.xml文件)来创建catalog + * 2:通过配置hadoopConf(指定:coreSitePath:/path/core-site.xml,hdfsSitePath: /path/hdfs-site.xml)的方式来创建catalog + */ + if(StringUtils.isNotBlank(hadoopConfDir)) { + options.set("hadoop-conf-dir", hadoopConfDir); + context = CatalogContext.create(options); + }else if(StringUtils.isNotBlank(coreSitePath) && StringUtils.isNotBlank(hdfsSitePath)) { + context = CatalogContext.create(options, hadoopConf); + } else { + throw DataXException.asDataXException(PAIMON_PARAM_LOST, + String.format("您提供配置文件有误,[%s]和[%s]参数,至少需要配置一个,不允许为空或者留白 .", PAIMON_HADOOP_CONF_DIR, "hadoopConfig:coreSiteFile&&hdfsSiteFile")); + } + + return CatalogFactory.createCatalog(context); + + } + + public void CreateTable(Catalog catalog, String dbName, String tableName, List cols, String[] pks, String[] partKeys) { + + Configuration paimonTableParams = sliceConfig.getConfiguration(PAIMON_CONFIG); + JSONObject paimonParamsAsJsonObject = JSON.parseObject(sliceConfig.getString(PAIMON_CONFIG)); + + Schema.Builder schemaBuilder = Schema.newBuilder(); + + if (null != paimonTableParams) { + Set paramKeys = paimonTableParams.getKeys(); + for (String each : paramKeys) { + schemaBuilder.option(each, paimonParamsAsJsonObject.getString(each)); + } + } + + for (Configuration columnConfig : cols) { + String columnName = columnConfig.getString("name"); + DataType columnType = getPaimonDataType(columnConfig.getString("type")); + schemaBuilder.column(columnName, columnType, columnName); + } + + if(pks != null && partKeys.length > 0) { + schemaBuilder.primaryKey(pks); + } + + Schema schema = schemaBuilder.build(); + + if(partKeys != null && partKeys.length > 0) { + schemaBuilder.partitionKeys(partKeys); + schema = schemaBuilder.option("metastore.partitioned-table", "true").build(); + } + + Identifier identifier = Identifier.create(dbName, tableName); + try { + catalog.createTable(identifier, schema, false); + } catch (Catalog.TableAlreadyExistException e) { + throw new RuntimeException("table not 
exist"); + } catch (Catalog.DatabaseNotExistException e) { + throw new RuntimeException("database not exist"); + } + + } + + public int getMatchValue(String typeName) { + + //获取长度 + String regex = "\\((\\d+)\\)"; + Pattern pattern = Pattern.compile(regex); + Matcher matcher = pattern.matcher(typeName); + int res = 0; + + if (matcher.find()) { + res = Integer.parseInt(matcher.group(1)); + } else { + LOG.error("{}:类型错误,请检查!", typeName); + } + return res; + } + + public Pair getDecValue (String typeName) { + + String regex = "dd\\((\\d+), (\\d+)\\)"; + + Pattern pattern = Pattern.compile(regex); + Matcher matcher = pattern.matcher(typeName.trim()); + int left = 0; + int right = 0; + + if (matcher.find()) { + left = Integer.parseInt(matcher.group(1)); + right = Integer.parseInt(matcher.group(2)); + } else { + LOG.error("{}:类型错误,请检查!", typeName); + } + + return Pair.of(left, right); + + } + + public DataType getPaimonDataType(String typeName) { + + String type = typeName.toUpperCase(); + DataType dt = DataTypes.STRING(); + + if(type.equals("BINARY") && !type.contains("VARBINARY")) { + dt = type.contains("(") ? new BinaryType(getMatchValue(type.trim())) : new BinaryType(); + } else if(type.contains("VARBINARY")) { + dt = type.contains("(") ? new VarBinaryType(getMatchValue(type.trim())): new VarBinaryType(); + } else if(type.contains("STRING")) { + dt = VarCharType.STRING_TYPE; + } else if(type.contains("VARCHAR")) { + dt = type.contains("(") ? new VarCharType(getMatchValue(type.trim())): new VarCharType(); + } else if(type.contains("CHAR")) { + if(type.contains("NOT NULL")) { + dt = new CharType().copy(false); + }else if (type.contains("(")) { + dt = new CharType(getMatchValue(type.trim())); + }else { + dt = new CharType(); + } + } else if(type.contains("BOOLEAN")) { + dt = new BooleanType(); + } else if(type.contains("BYTES")) { + dt = new VarBinaryType(VarBinaryType.MAX_LENGTH); + } else if(type.contains("DEC")) { // 包含 DEC 和 DECIMAL + if(type.contains(",")) { + dt = new DecimalType(getDecValue(type).getLeft(), getDecValue(type).getRight()); + }else if(type.contains("(")) { + dt = new DecimalType(getMatchValue(type.trim())); + }else { + dt = new DecimalType(); + } + } else if(type.contains("NUMERIC") || type.contains("DECIMAL")) { + if(type.contains(",")) { + dt = new DecimalType(getDecValue(type).getLeft(), getDecValue(type).getRight()); + }else if(type.contains("(")) { + dt = new DecimalType(getMatchValue(type.trim())); + }else { + dt = new DecimalType(); + } + } else if(type.equals("INT")) { + dt = new IntType(); + } else if(type.equals("BIGINT") || type.equals("LONG")) { + dt = new BigIntType(); + } else if(type.equals("TINYINT")) { + dt = new TinyIntType(); + } else if(type.equals("SMALLINT")) { + dt = new SmallIntType(); + } else if(type.equals("INTEGER")) { + dt = new IntType(); + } else if(type.contains("FLOAT")) { + dt = new FloatType(); + } else if(type.contains("DOUBLE")) { + dt = new DoubleType(); + } else if(type.contains("DATE")) { + dt = new DateType(); + } else if(type.contains("TIME")) { + dt = type.contains("(") ? 
new TimeType(getMatchValue(type.trim())): new TimeType(); + } else if(type.contains("TIMESTAMP")) { + switch (type) { + case "TIMESTAMP": + case "TIMESTAMP WITHOUT TIME ZONE": + dt = new TimestampType(); + break; + case "TIMESTAMP(3)": + case "TIMESTAMP(3) WITHOUT TIME ZONE": + dt = new TimestampType(3); + break; + case "TIMESTAMP WITH LOCAL TIME ZONE": + case "TIMESTAMP_LTZ": + dt = new LocalZonedTimestampType(); + break; + case "TIMESTAMP(3) WITH LOCAL TIME ZONE": + case "TIMESTAMP_LTZ(3)": + dt = new LocalZonedTimestampType(3); + break; + default: + LOG.error("{}:类型错误,请检查!", type); + } + } else { + throw new UnsupportedOperationException( + "Not a supported type: " + typeName); + } + + return dt; + + } + + public Table getTable(Catalog catalog, String dbName, String tableName) { + try { + Identifier identifier = Identifier.create(dbName, tableName); + return catalog.getTable(identifier); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException("table not exist", e); + } + } + + public boolean tableExists(Catalog catalog, String dbName, String tableName) { + Identifier identifier = Identifier.create(dbName, tableName); + boolean exists = catalog.tableExists(identifier); + return exists; + } + + private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf){ + if(StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)){ + UserGroupInformation.setConfiguration(hadoopConf); + try { + UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath); + } catch (Exception e) { + String message = String.format("kerberos认证失败,请确定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填写正确", + kerberosKeytabFilePath, kerberosPrincipal); + LOG.error(message); + throw DataXException.asDataXException(KERBEROS_LOGIN_ERROR, e); + } + } + } + + @Override + public void post() { + + } + + @Override + public void destroy() { + + } + } + +} diff --git a/paimonwrite/src/main/java/com/alibaba/datax/plugin/writer/paimonwriter/PaimonWriterErrorCode.java b/paimonwrite/src/main/java/com/alibaba/datax/plugin/writer/paimonwriter/PaimonWriterErrorCode.java new file mode 100644 index 00000000..1664b312 --- /dev/null +++ b/paimonwrite/src/main/java/com/alibaba/datax/plugin/writer/paimonwriter/PaimonWriterErrorCode.java @@ -0,0 +1,35 @@ +package com.alibaba.datax.plugin.writer.paimonwriter; + +import com.alibaba.datax.common.spi.ErrorCode; + +public enum PaimonWriterErrorCode implements ErrorCode { + PAIMON_ERROR_DB("Paimon Error DB", "您的参数配置错误."), + PAIMON_ERROR_TABLE("Paimon Error Table", "您的参数配置错误."), + PAIMON_PARAM_LOST("Paimon Param Lost", "您缺失了必须填写的参数值."), + HDFS_CONNECT_ERROR("Hdfs Connect Error", "与HDFS建立连接时出现IO异常."), + KERBEROS_LOGIN_ERROR("Hdfs Login Error", "KERBEROS认证失败"); + + private final String code; + private final String description; + + PaimonWriterErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, + this.description); + } +} diff --git a/paimonwrite/src/main/resources/plugin.json b/paimonwrite/src/main/resources/plugin.json new file mode 100755 index 00000000..16374839 --- /dev/null +++ b/paimonwrite/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + 
"name": "paimonwriter", + "class": "com.alibaba.datax.plugin.writer.paimonwriter.PaimonWriter", + "description": "useScene: prod. mechanism: via FileSystem connect Paimon write data concurrent.", + "developer": "alibaba" +} diff --git a/paimonwrite/src/main/resources/plugin_job_template.json b/paimonwrite/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..983d0aa6 --- /dev/null +++ b/paimonwrite/src/main/resources/plugin_job_template.json @@ -0,0 +1,13 @@ +{ + "name": "paimonwriter", + "parameter": { + "defaultFS": "", + "fileType": "", + "path": "", + "fileName": "", + "column": [], + "writeMode": "", + "fieldDelimiter": "", + "compress":"" + } +} diff --git a/paimonwrite/src/test/java/com/alibaba/datax/plugin/writer/paimonwriter/mysql2PaimonTest.java b/paimonwrite/src/test/java/com/alibaba/datax/plugin/writer/paimonwriter/mysql2PaimonTest.java new file mode 100644 index 00000000..84141681 --- /dev/null +++ b/paimonwrite/src/test/java/com/alibaba/datax/plugin/writer/paimonwriter/mysql2PaimonTest.java @@ -0,0 +1,20 @@ +package com.alibaba.datax.plugin.writer.paimonwriter; + +import com.alibaba.datax.core.Engine; +import org.junit.Test; + +public class mysql2PaimonTest { + + private static final String host = "localhost"; + + @Test + public void case01() throws Throwable { + + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/mysql_to_paimon.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + + } + + +} diff --git a/paimonwrite/src/test/resources/mysql_to_paimon.json b/paimonwrite/src/test/resources/mysql_to_paimon.json new file mode 100644 index 00000000..7e173bf4 --- /dev/null +++ b/paimonwrite/src/test/resources/mysql_to_paimon.json @@ -0,0 +1,99 @@ +{ + "job": { + "setting": { + "speed": { + "channel": 2 + } + }, + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "column": [ + "id", + "name", + "age", + "score", + "create_at", + "update_at", + "dt" + ], + "connection": [ + { + "jdbcUrl": [ + "jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai" + ], + "table": [ + "user" + ] + } + ], + "password": "root1234", + "username": "root", + "where": "" + } + }, + "writer": { + "name": "paimonwriter", + "parameter": { + "tableName": "test", + "databaseName": "paimon", + "catalogPath": "/app/hive/warehouse", + "metastoreUri": "thrift://127.0.0.1:9083", + "catalogType": "hive", + "hiveConfDir": "/your/path", + "hadoopConfDir": "/your/path", + "tableBucket": 2, + "primaryKey": "dt,id", + "partitionFields": "dt", + "writeOption": "stream_insert", + "batchSize": 100, + "hadoopConfig": { + "hdfsUser": "hdfs", + "coreSitePath": "/your/path/core-site.xml", + "hdfsSitePath": "/your/path/hdfs-site.xml" + }, + "paimonConfig": { + "compaction.min.file-num": "3", + "compaction.max.file-num": "6", + "snapshot.time-retained": "2h", + "snapshot.num-retained.min": "5", + "hive.table.owner": "zhangsan", + "hive.storage.format": "ORC" + }, + "column": [ + { + "name": "id", + "type": "int" + }, + { + "name": "name", + "type": "string" + }, + { + "name": "age", + "type": "int" + }, + { + "name": "score", + "type": "double" + }, + { + "name": "create_at", + "type": "string" + }, + { + "name": "update_at", + "type": "string" + },{ + "name": "dt", + "type": "string" + } + ] + } + } + } + ] + } +} diff --git a/pom.xml b/pom.xml index 1b364a75..c4e06842 100644 --- a/pom.xml +++ b/pom.xml @@ -136,6 +136,7 @@ gaussdbreader 
gaussdbwriter datax-example + paimonwrite