Merge pull request #2269 from ziming-ai/add-milvus-writer

Add milvus writer, based on nianliuu:master
This commit is contained in:
dingxiaobo 2025-02-08 14:59:30 +08:00 committed by GitHub
commit 7940b4fb74
16 changed files with 1159 additions and 0 deletions

@ -0,0 +1,273 @@
# DataX milvuswriter
---
## 1 Quick Introduction
The milvuswriter plugin writes data into a Milvus collection. It targets ETL engineers who use milvuswriter to load data from a data warehouse into Milvus, and it can also serve DBAs and other users as a data-migration tool.
## 2 How It Works
milvuswriter receives the records produced by a Reader through the DataX framework, writes them to Milvus via `upsert`/`insert`, and commits the data in batches accumulated up to batchSize.
<br />
Note: upsert (recommended): on a non-autoId collection, updates the entity in the collection that matches the primary key; on an autoId collection, the primary key in the entity is replaced with an auto-generated one and the data is inserted.
insert: mostly used with autoId collections, where Milvus generates the primary key; using insert on a non-autoId collection can lead to duplicate data.
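The batch-accumulation behavior described above can be sketched as follows. This is a minimal, JDK-only illustration; the class and method names are hypothetical, not the plugin's actual API (the real writer buffers `JsonObject` rows and calls the Milvus client's upsert/insert):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of batched writing with a writeMode switch.
// Names are illustrative, not the plugin's real API.
class BufferedWriterSketch {
    private final int batchSize;
    private final String writeMode;            // "upsert" (default) or "insert"
    private final List<String> cache = new ArrayList<>();
    final List<String> committedBatches = new ArrayList<>();

    BufferedWriterSketch(int batchSize, String writeMode) {
        this.batchSize = batchSize;
        this.writeMode = writeMode;
    }

    void add(String row) {
        cache.add(row);
        if (cache.size() >= batchSize) {       // commit once a full batch accumulates
            commit();
        }
    }

    void commit() {
        if (cache.isEmpty()) return;
        // The real plugin calls Milvus upsert() or insert() here.
        committedBatches.add(writeMode + ":" + cache.size());
        cache.clear();
    }

    public static void main(String[] args) {
        BufferedWriterSketch w = new BufferedWriterSketch(2, "upsert");
        w.add("r1"); w.add("r2"); w.add("r3");
        w.commit();                            // flush the remaining record
        System.out.println(w.committedBatches); // [upsert:2, upsert:1]
    }
}
```

Three records with batchSize 2 thus produce one full batch followed by a final partial flush, matching the commit path in `MilvusBufferWriter` later in this change.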
## 3 Features
### 3.1 Sample Configuration
* A sample job that generates data in memory and imports it into Milvus:
```json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
{
"value": 1,
"type": "long"
},
{
"value": "[1.1,1.2,1.3]",
"type": "string"
},
{
"value": 100,
"type": "long"
},
{
"value": 200,
"type": "long"
},
{
"value": 300,
"type": "long"
},
{
"value": 3.14159,
"type": "double"
},
{
"value": 3.1415926,
"type": "double"
},
{
"value": "testvarcharvalue",
"type": "string"
},
{
"value": true,
"type": "bool"
},
{
"value": "[1.123,1.2456,1.3789]",
"type": "string"
},
{
"value": "[2.123,2.2456,2.3789]",
"type": "string"
},
{
"value": "12345678",
"type": "string"
},
{
"value": "{\"a\":1,\"b\":2,\"c\":3}",
"type": "string"
},
{
"value": "[1,2,3,4]",
"type": "string"
}
],
"sliceRecordCount": 1
}
},
"writer": {
"parameter": {
"schemaCreateMode": "createIfNotExist",
"connectTimeoutMs": 60000,
"writeMode": "upsert",
"collection": "demo01",
"type": "milvus",
"token": "xxxxxxx",
"endpoint": "https://xxxxxxxx.com:443",
"batchSize": 1024,
"column": [
{
"name": "id",
"type": "Int64",
"primaryKey": "true"
},
{
"name": "floatvector",
"type": "FloatVector",
"dimension": "3"
},
{
"name": "int8col",
"type": "Int8"
},
{
"name": "int16col",
"type": "Int16"
},
{
"name": "int32col",
"type": "Int32"
},
{
"name": "floatcol",
"type": "Float"
},
{
"name": "doublecol",
"type": "Double"
},
{
"name": "varcharcol",
"type": "VarChar"
},
{
"name": "boolcol",
"type": "Bool"
},
{
"name": "bfloat16vectorcol",
"type": "BFloat16Vector",
"dimension": "3"
},
{
"name": "float16vectorcol",
"type": "Float16Vector",
"dimension": "3"
},
{
"name": "binaryvectorcol",
"type": "BinaryVector",
"dimension": "64"
},
{
"name": "jsoncol",
"type": "JSON"
},
{
"name": "intarraycol",
"maxCapacity": "8",
"type": "Array",
"elementType": "Int32"
}
]
},
"name": "milvuswriter"
}
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 2,
"channel": 2
}
}
}
}
```
### 3.2 Parameter Description
* **endpoint**
  * Description: connection info of the Milvus instance, including address and port, e.g. https://xxxxxxxx.com:443
    Note: 1. only one endpoint value can be configured per database; 2. a Milvus write job can configure only one endpoint
  * Required: yes <br />
  * Default: none <br />
* **schemaCreateMode**
  * Description: how to handle a Milvus collection that does not exist at sync time; the collection is created from the configured column attributes
  * Values:
    * createIfNotExist: create the collection if it does not exist; do nothing if it exists
    * ignore: fail the job with an error if the collection does not exist; do nothing if it exists
    * recreate: create the collection if it does not exist; drop and recreate it if it exists
  * Required: no <br />
  * Default: createIfNotExist <br />
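The three modes reduce to a decision on (mode, does the collection exist). A sketch of that decision table (the method name and returned action strings are illustrative, not the plugin's API):

```java
// schemaCreateMode decision logic, as described above.
// Returns the action the writer takes; names are illustrative.
class SchemaModeSketch {
    static String decide(String mode, boolean collectionExists) {
        switch (mode) {
            case "createIfNotExist":
                return collectionExists ? "noop" : "create";
            case "ignore":
                return collectionExists ? "noop" : "fail";
            case "recreate":
                return collectionExists ? "drop-then-create" : "create";
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(decide("recreate", true)); // drop-then-create
    }
}
```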
* **connectTimeoutMs**
  * Description: client connection timeout, in milliseconds, for interacting with Milvus <br />
  * Required: no <br />
  * Default: 10000 <br />
* **token**
  * Description: token for authenticating against the Milvus instance; configure either this or username/password <br />
  * Required: no <br />
  * Default: none <br />
* **username**
  * Description: username of the target Milvus database; configure either this or token <br />
  * Required: no <br />
  * Default: none <br />
* **password**
  * Description: password of the target Milvus database <br />
  * Required: no <br />
  * Default: none <br />
* **writeMode**
  * Description: how records are written to the Milvus collection
  * Values:
    * upsert (recommended): on a non-autoId collection, updates the entity matching the primary key; on an autoId collection, the primary key in the entity is replaced with an auto-generated one and the data is inserted
    * insert: mostly used with autoId collections, where Milvus generates the primary key; using insert on a non-autoId collection can lead to duplicate data
  * Required: no <br />
  * Default: upsert <br />
* **collection**
  * Description: name of the target collection; only one Milvus collection may be configured
  * Required: yes <br />
  * Default: none <br />
* **batchSize**
  * Description: number of records submitted per batch; a larger value greatly reduces the number of network round trips between DataX and Milvus and improves overall throughput, but setting it too high may cause the DataX process to run out of memory (OOM) <br />
  * Required: no <br />
  * Default: 1024 <br />
* **column**
  * Description: the fields of the target collection to write, described in JSON and separated by commas. name and type are required for every field; other attributes are needed only when schemaCreateMode creates the collection. For example:
  ```json
  "column": [
      {
          "name": "id",
          "type": "Int64",
          "primaryKey": "true"
      },
      {
          "name": "floatvector",
          "type": "FloatVector",
          "dimension": "3"
      }
  ]
  ```
  * Required: yes <br />
  * Default: none <br />
### 3.3 Supported Milvus Field Types
Bool,
Int8,
Int16,
Int32,
Int64,
Float,
Double,
String,
VarChar,
Array,
JSON,
BinaryVector,
FloatVector,
Float16Vector,
BFloat16Vector,
SparseFloatVector
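As the streamreader sample above shows, vector values arrive from the reader as strings, and the writer decodes them per field type. A minimal JDK-only sketch of the FloatVector and SparseFloatVector decoding (the plugin itself parses with fastjson2; these helper names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketches of how string-encoded vector values are decoded per field type.
class VectorParseSketch {
    // "[1.1,1.2,1.3]" -> list of floats (FloatVector handling)
    static List<Float> parseFloatVector(String value) {
        String body = value.trim();
        body = body.substring(1, body.length() - 1);  // strip '[' and ']'
        List<Float> vector = new ArrayList<>();
        for (String item : body.split(",")) {
            if (!item.trim().isEmpty()) {
                vector.add(Float.parseFloat(item.trim()));
            }
        }
        return vector;
    }

    // "[3:0.5, 24:0.8, 76:0.2]" -> sorted map of dimension index to value
    // (SparseFloatVector handling: index:value pairs)
    static TreeMap<Long, Float> parseSparseFloatVector(String value) {
        String body = value.trim();
        body = body.substring(1, body.length() - 1);  // strip '[' and ']'
        TreeMap<Long, Float> map = new TreeMap<>();
        for (String entry : body.split(",")) {
            String[] kv = entry.trim().split(":");
            map.put(Long.parseLong(kv[0]), Float.parseFloat(kv[1]));
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(parseFloatVector("[1.1,1.2,1.3]"));
        System.out.println(parseSparseFloatVector("[3:0.5, 24:0.8, 76:0.2]"));
    }
}
```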

milvuswriter/pom.xml Normal file
@ -0,0 +1,125 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>milvuswriter</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kotlin.code.style>official</kotlin.code.style>
<kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.49</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
</dependency>
<dependency>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
<version>32.0.1-jre</version>
</dependency>
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.3.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>2.0.9</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<version>2.0.9</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<!-- copy the resources directory into target as well -->
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.*</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,36 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/milvuswriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>milvuswriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/milvuswriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/milvuswriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

@ -0,0 +1,17 @@
package com.alibaba.datax.plugin.writer.milvuswriter;
public class KeyConstant {
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String ENDPOINT = "endpoint";
public static final String TOKEN = "token";
public static final String DATABASE = "database";
public static final String COLLECTION = "collection";
public static final String BATCH_SIZE = "batchSize";
public static final String COLUMN = "column";
public static final String SCHAME_CREATE_MODE = "schemaCreateMode";
public static final String WRITE_MODE = "writeMode";
public static final String PARTITION = "partition";
public static final String CONNECT_TIMEOUT_MS = "connectTimeoutMs";
public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema";
}

@ -0,0 +1,166 @@
package com.alibaba.datax.plugin.writer.milvuswriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.plugin.writer.milvuswriter.enums.WriteModeEnum;
import com.alibaba.fastjson2.JSONArray;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.vector.request.data.BFloat16Vec;
import io.milvus.v2.service.vector.request.data.Float16Vec;
import lombok.extern.slf4j.Slf4j;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
@Slf4j
public class MilvusBufferWriter {
private final MilvusClient milvusClient;
private final String collection;
private final Integer batchSize;
private List<JsonObject> dataCache;
private List<MilvusColumn> milvusColumnMeta;
private WriteModeEnum writeMode;
private String partition;
public MilvusBufferWriter(MilvusClient milvusClient, Configuration writerSliceConfig) {
this.milvusClient = milvusClient;
this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION);
this.batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100);
this.dataCache = new ArrayList<>(batchSize);
this.milvusColumnMeta = JSON.parseObject(writerSliceConfig.getString(KeyConstant.COLUMN), new TypeReference<List<MilvusColumn>>() {
});
this.writeMode = WriteModeEnum.getEnum(writerSliceConfig.getString(KeyConstant.WRITE_MODE));
this.partition = writerSliceConfig.getString(KeyConstant.PARTITION);
}
public void add(Record record, TaskPluginCollector taskPluginCollector) {
try {
JsonObject data = this.convertByType(milvusColumnMeta, record);
dataCache.add(data);
} catch (Exception e) {
taskPluginCollector.collectDirtyRecord(record, String.format("parse record error errorMessage: %s", e.getMessage()));
}
}
public Boolean needCommit() {
return dataCache.size() >= batchSize;
}
public void commit() {
if (dataCache.isEmpty()) {
log.info("dataCache is empty, skip commit");
return;
}
if (writeMode == WriteModeEnum.INSERT) {
milvusClient.insert(collection, partition, dataCache);
} else {
milvusClient.upsert(collection, partition, dataCache);
}
dataCache = new ArrayList<>(batchSize);
}
public int getDataCacheSize() {
return dataCache.size();
}
private JsonObject convertByType(List<MilvusColumn> milvusColumnMeta, Record record) {
JsonObject data = new JsonObject();
Gson gson = new Gson();
for (int i = 0; i < record.getColumnNumber(); i++) {
MilvusColumn milvusColumn = milvusColumnMeta.get(i);
DataType fieldType = milvusColumn.getMilvusTypeEnum();
String fieldName = milvusColumn.getName();
Column column = record.getColumn(i);
try {
Object field = convertToMilvusField(fieldType, column, milvusColumn);
data.add(fieldName, gson.toJsonTree(field));
} catch (Exception e) {
log.error("parse error for column: {} errorMessage: {}", fieldName, e.getMessage(), e);
throw e;
}
}
return data;
}
// Values must match the checks in io.milvus.param.ParamUtils#checkFieldData(io.milvus.param.collection.FieldType, java.util.List<?>, boolean)
private Object convertToMilvusField(DataType type, Column column, MilvusColumn milvusColumn) {
if (column.getRawData() == null) {
return null;
}
switch (type) {
case Int8:
case Int16:
case Int32:
case Int64:
return column.asLong();
case Float:
case Double:
return column.asDouble();
case String:
case VarChar:
return column.asString();
case Bool:
return column.asBoolean();
case BFloat16Vector:
JSONArray bFloat16ArrayJson = JSON.parseArray(column.asString());
List<Float> bfloat16Vector = new ArrayList<>();
for (int i = 0; i < bFloat16ArrayJson.size(); i++) {
Float value = Float.parseFloat(bFloat16ArrayJson.getString(i));
bfloat16Vector.add(value);
}
BFloat16Vec bFloat16Vec = new BFloat16Vec(bfloat16Vector);
ByteBuffer byteBuffer = (ByteBuffer) bFloat16Vec.getData();
return byteBuffer.array();
case Float16Vector:
JSONArray float16ArrayJson = JSON.parseArray(column.asString());
List<Float> float16Vector = new ArrayList<>();
for (int i = 0; i < float16ArrayJson.size(); i++) {
Float floatValue = Float.parseFloat(float16ArrayJson.getString(i));
float16Vector.add(floatValue);
}
Float16Vec float16Vec = new Float16Vec(float16Vector);
ByteBuffer data = (ByteBuffer) float16Vec.getData();
return data.array();
case BinaryVector:
return column.asBytes();
case FloatVector:
JSONArray arrayJson = JSON.parseArray(column.asString());
return arrayJson.stream().map(item -> Float.parseFloat(String.valueOf(item))).collect(Collectors.toList());
case SparseFloatVector:
// expected format: [3:0.5, 24:0.8, 76:0.2]
try {
JSONArray sparseFloatArray = JSON.parseArray(column.asString());
TreeMap<Long, Float> mapValue = new TreeMap<>();
for (int i = 0; i < sparseFloatArray.size(); i++) {
String value = sparseFloatArray.getString(i);
String[] split = value.split(":");
Long key = Long.parseLong(split[0]);
Float val = Float.parseFloat(split[1]);
mapValue.put(key, val);
}
return mapValue;
} catch (Exception e) {
log.error("parse column[{}] SparseFloatVector value error, value should like [3:0.5, 24:0.8, 76:0.2], but get:{}", milvusColumn.getName(), column.asString());
throw e;
}
case JSON:
return column.asString();
case Array:
JSONArray parseArray = JSON.parseArray(column.asString());
return parseArray.stream().map(item -> String.valueOf(item)).collect(Collectors.toList());
default:
throw new RuntimeException(String.format("Unsupported data type[%s]", type));
}
}
}

@ -0,0 +1,95 @@
package com.alibaba.datax.plugin.writer.milvuswriter;
import java.util.List;
import com.alibaba.datax.common.util.Configuration;
import com.google.gson.JsonObject;
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import io.milvus.v2.service.collection.request.DropCollectionReq;
import io.milvus.v2.service.collection.request.HasCollectionReq;
import io.milvus.v2.service.partition.request.CreatePartitionReq;
import io.milvus.v2.service.partition.request.HasPartitionReq;
import io.milvus.v2.service.vector.request.InsertReq;
import io.milvus.v2.service.vector.request.UpsertReq;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
/**
* @author ziming(子茗)
* @date 12/27/24
* @description
*/
@Slf4j
public class MilvusClient {
private MilvusClientV2 milvusClientV2;
public MilvusClient(Configuration conf) {
// connect to milvus
ConnectConfig connectConfig = ConnectConfig.builder().uri(conf.getString(KeyConstant.ENDPOINT)).build();
String token = null;
if (conf.getString(KeyConstant.TOKEN) != null) {
token = conf.getString(KeyConstant.TOKEN);
} else {
token = conf.getString(KeyConstant.USERNAME) + ":" + conf.getString(KeyConstant.PASSWORD);
}
connectConfig.setToken(token);
String database = conf.getString(KeyConstant.DATABASE);
if (StringUtils.isNotBlank(database)) {
log.info("use database {}", database);
connectConfig.setDbName(conf.getString(KeyConstant.DATABASE));
}
Integer connectTimeOut = conf.getInt(KeyConstant.CONNECT_TIMEOUT_MS);
if (connectTimeOut != null) {
connectConfig.setConnectTimeoutMs(connectTimeOut);
}
this.milvusClientV2 = new MilvusClientV2(connectConfig);
}
public void upsert(String collection, String partition, List<JsonObject> data) {
UpsertReq upsertReq = UpsertReq.builder().collectionName(collection).data(data).build();
if (StringUtils.isNotEmpty(partition)) {
upsertReq.setPartitionName(partition);
}
milvusClientV2.upsert(upsertReq);
}
public void insert(String collection, String partition, List<JsonObject> data) {
InsertReq insertReq = InsertReq.builder().collectionName(collection).data(data).build();
if (StringUtils.isNotEmpty(partition)) {
insertReq.setPartitionName(partition);
}
milvusClientV2.insert(insertReq);
}
public Boolean hasCollection(String collection) {
HasCollectionReq build = HasCollectionReq.builder().collectionName(collection).build();
return milvusClientV2.hasCollection(build);
}
public void createCollection(String collection, CreateCollectionReq.CollectionSchema schema) {
CreateCollectionReq createCollectionReq = CreateCollectionReq.builder().collectionName(collection).collectionSchema(schema).build();
milvusClientV2.createCollection(createCollectionReq);
}
public void dropCollection(String collection) {
DropCollectionReq request = DropCollectionReq.builder().collectionName(collection).build();
milvusClientV2.dropCollection(request);
}
public Boolean hasPartition(String collection, String partition) {
HasPartitionReq hasPartitionReq = HasPartitionReq.builder().collectionName(collection).partitionName(partition).build();
return milvusClientV2.hasPartition(hasPartitionReq);
}
public void createPartition(String collectionName, String partitionName) {
CreatePartitionReq createPartitionReq = CreatePartitionReq.builder().collectionName(collectionName).partitionName(partitionName).build();
milvusClientV2.createPartition(createPartitionReq);
}
public void close() {
log.info("Closing Milvus client");
milvusClientV2.close();
}
}

@ -0,0 +1,112 @@
package com.alibaba.datax.plugin.writer.milvuswriter;
import io.milvus.v2.common.DataType;
import java.util.Arrays;
/**
* @author ziming(子茗)
* @date 12/27/24
* @description
*/
public class MilvusColumn {
private String name;
private String type;
private DataType milvusTypeEnum;
private Boolean isPrimaryKey;
private Integer dimension;
private Boolean isPartitionKey;
private Integer maxLength;
private Boolean isAutoId;
private Integer maxCapacity;
private String elementType;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
for (DataType item : DataType.values()) {
if (item.name().equalsIgnoreCase(type)) {
this.milvusTypeEnum = item;
break;
}
}
if (this.milvusTypeEnum == null) {
throw new RuntimeException("Unsupported type: " + type + " supported types: " + Arrays.toString(DataType.values()));
}
}
public Integer getDimension() {
return dimension;
}
public void setDimension(Integer dimension) {
this.dimension = dimension;
}
public Integer getMaxLength() {
return maxLength;
}
public void setMaxLength(Integer maxLength) {
this.maxLength = maxLength;
}
public Boolean getPrimaryKey() {
return isPrimaryKey;
}
public Boolean getPartitionKey() {
return isPartitionKey;
}
public void setPartitionKey(Boolean partitionKey) {
isPartitionKey = partitionKey;
}
public void setPrimaryKey(Boolean primaryKey) {
isPrimaryKey = primaryKey;
}
public Boolean getAutoId() {
return isAutoId;
}
public void setAutoId(Boolean autoId) {
isAutoId = autoId;
}
public Integer getMaxCapacity() {
return maxCapacity;
}
public void setMaxCapacity(Integer maxCapacity) {
this.maxCapacity = maxCapacity;
}
public String getElementType() {
return elementType;
}
public void setElementType(String elementType) {
this.elementType = elementType;
}
public DataType getMilvusTypeEnum() {
return milvusTypeEnum;
}
public void setMilvusTypeEnum(DataType milvusTypeEnum) {
this.milvusTypeEnum = milvusTypeEnum;
}
}

@ -0,0 +1,102 @@
package com.alibaba.datax.plugin.writer.milvuswriter;
import java.util.List;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.milvuswriter.enums.SchemaCreateModeEnum;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.AddFieldReq;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import lombok.extern.slf4j.Slf4j;
import static io.milvus.v2.common.DataType.valueOf;
/**
* @author ziming(子茗)
* @date 12/27/24
* @description
*/
@Slf4j
public class MilvusCreateCollection {
private Configuration config;
MilvusCreateCollection(Configuration originalConfig) {
this.config = originalConfig;
}
public void createCollectionByMode(MilvusClient milvusClient) {
String collection = this.config.getString(KeyConstant.COLLECTION);
SchemaCreateModeEnum schemaCreateMode = SchemaCreateModeEnum.getEnum(this.config.getString(KeyConstant.SCHAME_CREATE_MODE));
List<MilvusColumn> milvusColumnMeta = JSON.parseObject(config.getString(KeyConstant.COLUMN), new TypeReference<List<MilvusColumn>>() {
});
Boolean hasCollection = milvusClient.hasCollection(collection);
if (schemaCreateMode == SchemaCreateModeEnum.CREATEIFNOTEXIT) {
// create collection
if (hasCollection) {
log.info("collection[{}] already exists, skip creation", collection);
} else {
log.info("creating collection[{}]", collection);
CreateCollectionReq.CollectionSchema collectionSchema = prepareCollectionSchema(milvusColumnMeta);
milvusClient.createCollection(collection, collectionSchema);
}
} else if (schemaCreateMode == SchemaCreateModeEnum.RECREATE) {
if (hasCollection) {
log.info("collection[{}] already exists, dropping it", collection);
milvusClient.dropCollection(collection);
}
log.info("creating collection[{}]", collection);
CreateCollectionReq.CollectionSchema collectionSchema = prepareCollectionSchema(milvusColumnMeta);
milvusClient.createCollection(collection, collectionSchema);
} else if (schemaCreateMode == SchemaCreateModeEnum.IGNORE && !hasCollection) {
log.error("collection does not exist, throwing exception");
throw new RuntimeException("Collection does not exist");
}
}
private CreateCollectionReq.CollectionSchema prepareCollectionSchema(List<MilvusColumn> milvusColumnMeta) {
CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build();
for (int i = 0; i < milvusColumnMeta.size(); i++) {
MilvusColumn milvusColumn = milvusColumnMeta.get(i);
AddFieldReq addFieldReq = AddFieldReq.builder()
.fieldName(milvusColumn.getName())
.dataType(valueOf(milvusColumn.getType()))
.build();
if (milvusColumn.getPrimaryKey() != null) {
addFieldReq.setIsPrimaryKey(milvusColumn.getPrimaryKey());
}
if (milvusColumn.getDimension() != null) {
addFieldReq.setDimension(milvusColumn.getDimension());
}
if (milvusColumn.getPartitionKey() != null) {
addFieldReq.setIsPartitionKey(milvusColumn.getPartitionKey());
}
if (milvusColumn.getMaxLength() != null) {
addFieldReq.setMaxLength(milvusColumn.getMaxLength());
}
if (milvusColumn.getAutoId() != null) {
addFieldReq.setAutoID(milvusColumn.getAutoId());
}
if (milvusColumn.getMaxCapacity() != null) {
addFieldReq.setMaxCapacity(milvusColumn.getMaxCapacity());
}
if (milvusColumn.getElementType() != null) {
addFieldReq.setElementType(DataType.valueOf(milvusColumn.getElementType()));
}
try {
collectionSchema.addField(addFieldReq);
} catch (Exception e) {
log.error("add field[{}] error", milvusColumn.getName());
throw e;
}
}
Boolean enableDynamic = config.getBool(KeyConstant.ENABLE_DYNAMIC_SCHEMA);
if (enableDynamic != null) {
collectionSchema.setEnableDynamicField(enableDynamic);
}
return collectionSchema;
}
}

@ -0,0 +1,110 @@
package com.alibaba.datax.plugin.writer.milvuswriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class MilvusWriter extends Writer {
public static class Job extends Writer.Job {
private Configuration originalConfig = null;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
originalConfig.getNecessaryValue(KeyConstant.ENDPOINT, MilvusWriterErrorCode.REQUIRED_VALUE);
originalConfig.getNecessaryValue(KeyConstant.COLUMN, MilvusWriterErrorCode.REQUIRED_VALUE);
originalConfig.getNecessaryValue(KeyConstant.COLLECTION, MilvusWriterErrorCode.REQUIRED_VALUE);
}
@Override
public void prepare() {
//collection create process
MilvusClient milvusClient = new MilvusClient(originalConfig);
try {
MilvusCreateCollection milvusCreateCollection = new MilvusCreateCollection(originalConfig);
milvusCreateCollection.createCollectionByMode(milvusClient);
String collection = originalConfig.getString(KeyConstant.COLLECTION);
String partition = originalConfig.getString(KeyConstant.PARTITION);
if (partition != null && !milvusClient.hasPartition(collection, partition)) {
log.info("collection[{}] does not contain partition[{}], try to create partition", collection, partition);
milvusClient.createPartition(collection, partition);
}
} catch (Exception e) {
throw DataXException.asDataXException(MilvusWriterErrorCode.MILVUS_COLLECTION, e.getMessage(), e);
} finally {
milvusClient.close();
}
}
/**
 * Split the job.<br>
 *
 * @param mandatoryNumber to keep Reader and Writer task counts equal, the Writer plugin must split into exactly the number of slices produced by the source; otherwise the framework reports an error
 */
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> configList = new ArrayList<>();
for (int i = 0; i < mandatoryNumber; i++) {
configList.add(this.originalConfig.clone());
}
return configList;
}
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private MilvusBufferWriter milvusBufferWriter;
MilvusClient milvusClient;
@Override
public void init() {
log.info("Initializing Milvus writer");
// get configuration
Configuration writerSliceConfig = this.getPluginJobConf();
this.milvusClient = new MilvusClient(writerSliceConfig);
this.milvusBufferWriter = new MilvusBufferWriter(this.milvusClient, writerSliceConfig);
log.info("Milvus writer initialized");
}
@Override
public void startWrite(RecordReceiver lineReceiver) {
Record record = null;
while ((record = lineReceiver.getFromReader()) != null) {
milvusBufferWriter.add(record, this.getTaskPluginCollector());
if (milvusBufferWriter.needCommit()) {
log.info("begin committing data size[{}]", milvusBufferWriter.getDataCacheSize());
milvusBufferWriter.commit();
}
}
if (milvusBufferWriter.getDataCacheSize() > 0) {
log.info("begin committing data size[{}]", milvusBufferWriter.getDataCacheSize());
milvusBufferWriter.commit();
}
}
@Override
public void prepare() {
super.prepare();
}
@Override
public void destroy() {
if (this.milvusClient != null) {
this.milvusClient.close();
}
}
}
}

@ -0,0 +1,35 @@
package com.alibaba.datax.plugin.writer.milvuswriter;
import com.alibaba.datax.common.spi.ErrorCode;
/**
* @author ziming(子茗)
* @date 12/27/24
* @description
*/
public enum MilvusWriterErrorCode implements ErrorCode {
MILVUS_COLLECTION("MilvusWriter-01", "collection process error"),
REQUIRED_VALUE("MilvusWriter-02", "miss required parameter");
private final String code;
private final String description;
MilvusWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
}
}

@ -0,0 +1,34 @@
package com.alibaba.datax.plugin.writer.milvuswriter.enums;
import lombok.extern.slf4j.Slf4j;
/**
* @author ziming(子茗)
* @date 12/27/24
* @description
*/
@Slf4j
public enum SchemaCreateModeEnum {
CREATEIFNOTEXIT("createIfNotExist"),
IGNORE("ignore"),
RECREATE("recreate");
String type;
SchemaCreateModeEnum(String type) {
this.type = type;
}
public String getType() {
return type;
}
public static SchemaCreateModeEnum getEnum(String name) {
for (SchemaCreateModeEnum value : SchemaCreateModeEnum.values()) {
if (value.getType().equalsIgnoreCase(name)) {
return value;
}
}
log.info("use default schema create mode CREATEIFNOTEXIT");
return CREATEIFNOTEXIT;
}
}

@ -0,0 +1,28 @@
package com.alibaba.datax.plugin.writer.milvuswriter.enums;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public enum WriteModeEnum {
INSERT("insert"),
UPSERT("upsert");
String mode;
public String getMode() {
return mode;
}
WriteModeEnum(String mode) {
this.mode = mode;
}
public static WriteModeEnum getEnum(String mode) {
for (WriteModeEnum writeModeEnum : WriteModeEnum.values()) {
if (writeModeEnum.getMode().equalsIgnoreCase(mode)) {
return writeModeEnum;
}
}
log.info("use default write mode upsert");
return UPSERT;
}
}

@ -0,0 +1,6 @@
{
"name": "milvuswriter",
"class": "com.alibaba.datax.plugin.writer.milvuswriter.MilvusWriter",
"description": "useScene: prod. mechanism: connect to milvus via milvus client and write data concurrently.",
"developer": "nianliuu"
}

@ -0,0 +1,12 @@
{
"name": "milvuswriter",
"parameter": {
"endpoint": "",
"username": "",
"password": "",
"database": "",
"collection": "",
"column": [],
"enableDynamicSchema": ""
}
}

@ -546,5 +546,12 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>milvuswriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
</fileSets>
</assembly>

@ -129,6 +129,7 @@
<module>adbmysqlwriter</module>
<module>sybasewriter</module>
<module>neo4jwriter</module>
<module>milvuswriter</module>
<!-- common support module -->
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>