mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-03 01:30:50 +08:00
to #62452156 feat:add milvus writer
This commit is contained in:
parent
906bf3ded9
commit
b3c1c07b50
0
milvusjob.json
Normal file
0
milvusjob.json
Normal file
273
milvuswriter/doc/milvuswriter.md
Normal file
273
milvuswriter/doc/milvuswriter.md
Normal file
@ -0,0 +1,273 @@
|
|||||||
|
# DataX milvuswriter
|
||||||
|
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
|
||||||
|
## 1 快速介绍
|
||||||
|
|
||||||
|
milvuswriter 插件实现了写入数据到 milvus集合的功能; 面向ETL开发工程师,使用 milvuswriter 从数仓导入数据到 milvus, 同时 milvuswriter 亦可以作为数据迁移工具为DBA等用户提供服务。
|
||||||
|
|
||||||
|
|
||||||
|
## 2 实现原理
|
||||||
|
|
||||||
|
milvuswriter 通过 DataX 框架获取 Reader 生成的协议数据,通过 `upsert/insert` 方式写入数据到milvus, 并通过batchSize累积的方式进行数据提交。
|
||||||
|
<br />
|
||||||
|
|
||||||
|
注意:upsert写入方式(推荐): 在非autoid表场景下根据主键更新 Collection 中的某个 Entity;autoid表场景下会将 Entity 中的主键替换为自动生成的主键并插入数据。
|
||||||
|
insert写入方式: 多用于autoid表插入数据,由milvus自动生成主键, 非autoid表下使用insert会导致数据重复。
|
||||||
|
|
||||||
|
|
||||||
|
## 3 功能说明
|
||||||
|
|
||||||
|
### 3.1 配置样例
|
||||||
|
|
||||||
|
* 这里提供一份从内存产生数据导入到 milvus的配置样例。
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "streamreader",
|
||||||
|
"parameter": {
|
||||||
|
"column" : [
|
||||||
|
{
|
||||||
|
"value": 1,
|
||||||
|
"type": "long"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": "[1.1,1.2,1.3]",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": 100,
|
||||||
|
"type": "long"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": 200,
|
||||||
|
"type": "long"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": 300,
|
||||||
|
"type": "long"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": 3.14159,
|
||||||
|
"type": "double"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": 3.1415926,
|
||||||
|
"type": "double"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": "testvarcharvalue",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": true,
|
||||||
|
"type": "bool"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": "[1.123,1.2456,1.3789]",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": "[2.123,2.2456,2.3789]",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": "12345678",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": "{\"a\":1,\"b\":2,\"c\":3}",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": "[1,2,3,4]",
|
||||||
|
"type": "string"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"sliceRecordCount": 1
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"parameter": {
|
||||||
|
"schemaCreateMode": "createIfNotExist",
|
||||||
|
"connectTimeoutMs": 60000,
|
||||||
|
"writeMode": "upsert",
|
||||||
|
"collection": "demo01",
|
||||||
|
"type": "milvus",
|
||||||
|
"token": "xxxxxxx",
|
||||||
|
"endpoint": "https://xxxxxxxx.com:443",
|
||||||
|
"batchSize": 1024,
|
||||||
|
"column": [
|
||||||
|
{
|
||||||
|
"name": "id",
|
||||||
|
"type": "Int64",
|
||||||
|
"primaryKey": "true"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "floatvector",
|
||||||
|
"type": "FloatVector",
|
||||||
|
"dimension": "3"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "int8col",
|
||||||
|
"type": "Int8"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "int16col",
|
||||||
|
"type": "Int16"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "int32col",
|
||||||
|
"type": "Int32"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "floatcol",
|
||||||
|
"type": "Float"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "doublecol",
|
||||||
|
"type": "Double"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "varcharcol",
|
||||||
|
"type": "VarChar"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "boolcol",
|
||||||
|
"type": "Bool"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "bfloat16vectorcol",
|
||||||
|
"type": "BFloat16Vector",
|
||||||
|
"dimension": "3"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "float16vectorcol",
|
||||||
|
"type": "Float16Vector",
|
||||||
|
"dimension": "3"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "binaryvectorcol",
|
||||||
|
"type": "BinaryVector",
|
||||||
|
"dimension": "64"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "jsoncol",
|
||||||
|
"type": "JSON"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "intarraycol",
|
||||||
|
"maxCapacity": "8",
|
||||||
|
"type": "Array",
|
||||||
|
"elementType": "Int32"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"name": "milvuswriter"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"setting": {
|
||||||
|
"errorLimit": {
|
||||||
|
"record": "0"
|
||||||
|
},
|
||||||
|
"speed": {
|
||||||
|
"concurrent": 2,
|
||||||
|
"channel": 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
### 3.2 参数说明
|
||||||
|
|
||||||
|
* **endpoint**
|
||||||
|
* 描述:milvus数据库的连接信息,包含地址和端口,例如https://xxxxxxxx.com:443
|
||||||
|
|
||||||
|
注意:1、在一个数据库上只能配置一个 endpoint 值
|
||||||
|
2、一个milvus 写入任务仅能配置一个 endpoint
|
||||||
|
* 必选:是 <br />
|
||||||
|
* 默认值:无 <br />
|
||||||
|
* **schemaCreateMode**
|
||||||
|
* 描述: 集合创建的模式,同步时milvus集合不存在的处理方式, 根据配置的column属性进行创建
|
||||||
|
* 取值
|
||||||
|
* createIfNotExist: 如果集合不存在,则创建集合,如果集合存在,则不执行任何操作
|
||||||
|
* ignore: 如果集合不存在,任务异常报错,如果集合存在,则不执行任何操作
|
||||||
|
* recreate: 如果集合不存在,则创建集合,如果集合存在,则删除集合重建集合
|
||||||
|
* 必选:否 <br />
|
||||||
|
* 默认值:createIfNotExist <br />
|
||||||
|
* **connectTimeoutMs**
|
||||||
|
* 描述:与milvus交互时客户端的连接超时时间,单位毫秒 <br />
|
||||||
|
* 必选:否 <br />
|
||||||
|
* 默认值:10000 <br />
|
||||||
|
* **token**
|
||||||
|
* 描述:milvus实例认证的token秘钥,与username认证方式二选一配置 <br />
|
||||||
|
* 必选:否 <br />
|
||||||
|
* 默认值:无 <br />
|
||||||
|
* **username**
|
||||||
|
* 描述:目的milvus数据库的用户名, 与token二选一配置 <br />
|
||||||
|
* 必选:否 <br />
|
||||||
|
* 默认值:无 <br />
|
||||||
|
* **password**
|
||||||
|
* 描述:目的milvus数据库的密码 <br />
|
||||||
|
* 必选:否 <br />
|
||||||
|
* 默认值:无 <br />
|
||||||
|
* **writeMode**
|
||||||
|
* 描述: 写入milvus集合的写入方式
|
||||||
|
* 取值
|
||||||
|
* upsert(推荐): 在非autoid表场景下根据主键更新 Collection 中的某个 Entity;autoid表场景下会将 Entity 中的主键替换为自动生成的主键并插入数据。
|
||||||
|
* insert: 多用于autoid表插入数据,由milvus自动生成主键, 非autoid表下使用insert会导致数据重复。
|
||||||
|
* 必选:是 <br />
|
||||||
|
* 默认值:upsert <br />
|
||||||
|
* **collection**
|
||||||
|
* 描述:目的集合名称。 只能配置一个milvus的集合名称。
|
||||||
|
* 必选:是 <br />
|
||||||
|
* 默认值:无 <br />
|
||||||
|
* **batchSize**
|
||||||
|
* 描述:一次性批量提交的记录数大小,该值可以极大减少DataX与milvus的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。<br />
|
||||||
|
* 必选:否 <br />
|
||||||
|
* 默认值:1024<br />
|
||||||
|
|
||||||
|
* **column**
|
||||||
|
* 描述:目的集合需要写入数据的字段,字段内容用json格式描述,字段之间用英文逗号分隔。字段属性必填name、type, 其他属性在需要通过schemaCreateMode创建集合时按需填入,例如:
|
||||||
|
|
||||||
|
"column": [
|
||||||
|
{
|
||||||
|
"name": "id",
|
||||||
|
"type": "Int64",
|
||||||
|
"primaryKey": "true"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "floatvector",
|
||||||
|
"type": "FloatVector",
|
||||||
|
"dimension": "3"
|
||||||
|
}]
|
||||||
|
* 必选:是 <br />
|
||||||
|
* 默认值:无 <br />
|
||||||
|
### 3.3 支持同步milvus字段类型
|
||||||
|
Bool,
|
||||||
|
Int8,
|
||||||
|
Int16,
|
||||||
|
Int32,
|
||||||
|
Int64,
|
||||||
|
Float,
|
||||||
|
Double,
|
||||||
|
String,
|
||||||
|
VarChar,
|
||||||
|
Array,
|
||||||
|
JSON,
|
||||||
|
BinaryVector,
|
||||||
|
FloatVector,
|
||||||
|
Float16Vector,
|
||||||
|
BFloat16Vector,
|
||||||
|
SparseFloatVector
|
||||||
|
|
@ -16,32 +16,48 @@
|
|||||||
<kotlin.code.style>official</kotlin.code.style>
|
<kotlin.code.style>official</kotlin.code.style>
|
||||||
<kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
|
<kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
|
||||||
</properties>
|
</properties>
|
||||||
<dependencyManagement>
|
|
||||||
<dependencies>
|
|
||||||
<dependency>
|
|
||||||
<artifactId>guava</artifactId>
|
|
||||||
<groupId>com.google.guava</groupId>
|
|
||||||
<version>32.0.1-jre</version>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
|
||||||
</dependencyManagement>
|
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.fastjson2</groupId>
|
||||||
|
<artifactId>fastjson2</artifactId>
|
||||||
|
<version>2.0.49</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>datax-common</artifactId>
|
||||||
|
<version>${datax-project-version}</version>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<artifactId>slf4j-log4j12</artifactId>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.projectlombok</groupId>
|
||||||
|
<artifactId>lombok</artifactId>
|
||||||
|
<version>1.18.30</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<artifactId>guava</artifactId>
|
||||||
|
<groupId>com.google.guava</groupId>
|
||||||
|
<version>32.0.1-jre</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.milvus</groupId>
|
<groupId>io.milvus</groupId>
|
||||||
<artifactId>milvus-sdk-java</artifactId>
|
<artifactId>milvus-sdk-java</artifactId>
|
||||||
<version>2.4.8</version>
|
<version>2.5.2</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.jetbrains.kotlin</groupId>
|
<groupId>org.mockito</groupId>
|
||||||
<artifactId>kotlin-test-junit5</artifactId>
|
<artifactId>mockito-core</artifactId>
|
||||||
<version>2.0.0</version>
|
<version>3.3.3</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.junit.jupiter</groupId>
|
<groupId>junit</groupId>
|
||||||
<artifactId>junit-jupiter</artifactId>
|
<artifactId>junit</artifactId>
|
||||||
<version>5.10.0</version>
|
<version>4.11</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
@ -50,16 +66,16 @@
|
|||||||
<version>2.0.0</version>
|
<version>2.0.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.alibaba.datax</groupId>
|
<groupId>org.powermock</groupId>
|
||||||
<artifactId>datax-common</artifactId>
|
<artifactId>powermock-module-junit4</artifactId>
|
||||||
<version>0.0.1-SNAPSHOT</version>
|
<version>2.0.9</version>
|
||||||
<scope>compile</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.projectlombok</groupId>
|
<groupId>org.powermock</groupId>
|
||||||
<artifactId>lombok</artifactId>
|
<artifactId>powermock-api-mockito2</artifactId>
|
||||||
<version>1.18.30</version>
|
<version>2.0.9</version>
|
||||||
<scope>provided</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
@ -1,130 +0,0 @@
|
|||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package com.alibaba.datax.plugin.writer.milvuswriter;
|
|
||||||
|
|
||||||
import java.nio.Buffer;
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
|
|
||||||
/**
 * Conversions between boxed numeric arrays and {@link ByteBuffer}s.
 *
 * <p>The {@code toByteBuffer} overloads allocate a new heap buffer, write every
 * element with the buffer's default byte order and return it flipped, i.e. ready
 * to be read from position 0.
 *
 * <p>The {@code toXxxArray} methods decode the bytes between the buffer's current
 * position and its limit, using the buffer's own byte order. They read through an
 * independent view, so the position and limit of the buffer passed in are left
 * untouched and the same buffer can be decoded more than once.
 */
public class BufferUtils {

    /**
     * Encodes the given shorts into a new buffer, two bytes per element.
     *
     * @param shortArray values to encode; must not be null or contain null elements
     * @return a flipped buffer holding {@code shortArray.length * 2} bytes
     */
    public static ByteBuffer toByteBuffer(Short[] shortArray) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * Short.BYTES);

        for (Short value : shortArray) {
            byteBuffer.putShort(value);
        }

        // Calling flip() through the Buffer type keeps the call site bound to
        // Buffer.flip(), so the class behaves the same when the compiling and
        // running JDK versions differ.
        ((Buffer) byteBuffer).flip();

        return byteBuffer;
    }

    /**
     * Decodes the remaining bytes of the buffer as shorts.
     *
     * @param byteBuffer buffer to decode; its position and limit are not modified
     * @return one element per complete two-byte group between position and limit
     */
    public static Short[] toShortArray(ByteBuffer byteBuffer) {
        ByteBuffer view = viewOf(byteBuffer);
        Short[] shortArray = new Short[view.remaining() / Short.BYTES];

        for (int i = 0; i < shortArray.length; i++) {
            shortArray[i] = view.getShort();
        }

        return shortArray;
    }

    /**
     * Encodes the given floats into a new buffer, four bytes per element.
     *
     * @param floatArray values to encode; must not be null or contain null elements
     * @return a flipped buffer holding {@code floatArray.length * 4} bytes
     */
    public static ByteBuffer toByteBuffer(Float[] floatArray) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * Float.BYTES);

        for (float value : floatArray) {
            byteBuffer.putFloat(value);
        }

        ((Buffer) byteBuffer).flip();

        return byteBuffer;
    }

    /**
     * Decodes the remaining bytes of the buffer as floats.
     *
     * @param byteBuffer buffer to decode; its position and limit are not modified
     * @return one element per complete four-byte group between position and limit
     */
    public static Float[] toFloatArray(ByteBuffer byteBuffer) {
        ByteBuffer view = viewOf(byteBuffer);
        Float[] floatArray = new Float[view.remaining() / Float.BYTES];

        for (int i = 0; i < floatArray.length; i++) {
            floatArray[i] = view.getFloat();
        }

        return floatArray;
    }

    /**
     * Encodes the given doubles into a new buffer, eight bytes per element.
     *
     * @param doubleArray values to encode; must not be null or contain null elements
     * @return a flipped buffer holding {@code doubleArray.length * 8} bytes
     */
    public static ByteBuffer toByteBuffer(Double[] doubleArray) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * Double.BYTES);

        for (double value : doubleArray) {
            byteBuffer.putDouble(value);
        }

        ((Buffer) byteBuffer).flip();

        return byteBuffer;
    }

    /**
     * Decodes the remaining bytes of the buffer as doubles.
     *
     * @param byteBuffer buffer to decode; its position and limit are not modified
     * @return one element per complete eight-byte group between position and limit
     */
    public static Double[] toDoubleArray(ByteBuffer byteBuffer) {
        ByteBuffer view = viewOf(byteBuffer);
        Double[] doubleArray = new Double[view.remaining() / Double.BYTES];

        for (int i = 0; i < doubleArray.length; i++) {
            doubleArray[i] = view.getDouble();
        }

        return doubleArray;
    }

    /**
     * Encodes the given ints into a new buffer, four bytes per element.
     *
     * @param intArray values to encode; must not be null or contain null elements
     * @return a flipped buffer holding {@code intArray.length * 4} bytes
     */
    public static ByteBuffer toByteBuffer(Integer[] intArray) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * Integer.BYTES);

        for (int value : intArray) {
            byteBuffer.putInt(value);
        }

        ((Buffer) byteBuffer).flip();

        return byteBuffer;
    }

    /**
     * Decodes the remaining bytes of the buffer as ints.
     *
     * @param byteBuffer buffer to decode; its position and limit are not modified
     * @return one element per complete four-byte group between position and limit
     */
    public static Integer[] toIntArray(ByteBuffer byteBuffer) {
        ByteBuffer view = viewOf(byteBuffer);
        Integer[] intArray = new Integer[view.remaining() / Integer.BYTES];

        for (int i = 0; i < intArray.length; i++) {
            intArray[i] = view.getInt();
        }

        return intArray;
    }

    /**
     * Returns a view sharing the source's content, position and limit.
     * duplicate() always yields a BIG_ENDIAN buffer, so the source's byte order
     * is re-applied to decode exactly as a read on the source itself would.
     */
    private static ByteBuffer viewOf(ByteBuffer source) {
        return source.duplicate().order(source.order());
    }
}
|
|
@ -1,20 +1,17 @@
|
|||||||
package com.alibaba.datax.plugin.writer.milvuswriter;
|
package com.alibaba.datax.plugin.writer.milvuswriter;
|
||||||
|
|
||||||
public class KeyConstant {
|
public class KeyConstant {
|
||||||
public static final String URI = "uri";
|
public static final String USERNAME = "username";
|
||||||
|
public static final String PASSWORD = "password";
|
||||||
|
public static final String ENDPOINT = "endpoint";
|
||||||
public static final String TOKEN = "token";
|
public static final String TOKEN = "token";
|
||||||
public static final String DATABASE = "database";
|
public static final String DATABASE = "database";
|
||||||
public static final String COLLECTION = "collection";
|
public static final String COLLECTION = "collection";
|
||||||
public static final String AUTO_ID = "autoId";
|
|
||||||
public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema";
|
|
||||||
public static final String BATCH_SIZE = "batchSize";
|
public static final String BATCH_SIZE = "batchSize";
|
||||||
public static final String COLUMN = "column";
|
public static final String COLUMN = "column";
|
||||||
public static final String COLUMN_TYPE = "type";
|
public static final String SCHAME_CREATE_MODE = "schemaCreateMode";
|
||||||
public static final String COLUMN_NAME = "name";
|
public static final String WRITE_MODE = "writeMode";
|
||||||
public static final String VECTOR_DIMENSION = "dimension";
|
public static final String PARTITION = "partition";
|
||||||
public static final String IS_PRIMARY_KEY = "isPrimaryKey";
|
public static final String CONNECT_TIMEOUT_MS = "connectTimeoutMs";
|
||||||
// "schemaCreateMode":"createWhenTableNotExit"/"Ignore"/"exception"
|
public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema";
|
||||||
public static final String schemaCreateMode = "schemaCreateMode";
|
|
||||||
public static final String IS_PARTITION_KEY = "isPartitionKey";
|
|
||||||
public static final String MAX_LENGTH = "maxLength";
|
|
||||||
}
|
}
|
@ -1,43 +1,166 @@
|
|||||||
package com.alibaba.datax.plugin.writer.milvuswriter;
|
package com.alibaba.datax.plugin.writer.milvuswriter;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Column;
|
||||||
|
import com.alibaba.datax.common.plugin.TaskPluginCollector;
|
||||||
|
import com.alibaba.datax.plugin.writer.milvuswriter.enums.WriteModeEnum;
|
||||||
|
import com.alibaba.fastjson2.JSONArray;
|
||||||
|
import com.google.gson.Gson;
|
||||||
import com.google.gson.JsonObject;
|
import com.google.gson.JsonObject;
|
||||||
import io.milvus.v2.client.MilvusClientV2;
|
import io.milvus.v2.common.DataType;
|
||||||
import io.milvus.v2.service.vector.request.UpsertReq;
|
import io.milvus.v2.service.vector.request.data.BFloat16Vec;
|
||||||
|
import io.milvus.v2.service.vector.request.data.Float16Vec;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
import com.alibaba.fastjson2.JSON;
|
||||||
|
import com.alibaba.fastjson2.TypeReference;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class MilvusBufferWriter {
|
public class MilvusBufferWriter {
|
||||||
|
|
||||||
private final MilvusClientV2 milvusClientV2;
|
private final MilvusClient milvusClient;
|
||||||
private final String collection;
|
private final String collection;
|
||||||
private final Integer batchSize;
|
private final Integer batchSize;
|
||||||
private List<JsonObject> dataCache;
|
private List<JsonObject> dataCache;
|
||||||
|
private List<MilvusColumn> milvusColumnMeta;
|
||||||
|
private WriteModeEnum writeMode;
|
||||||
|
private String partition;
|
||||||
|
|
||||||
public MilvusBufferWriter(MilvusClientV2 milvusClientV2, String collection, Integer batchSize){
|
public MilvusBufferWriter(MilvusClient milvusClient, Configuration writerSliceConfig) {
|
||||||
this.milvusClientV2 = milvusClientV2;
|
this.milvusClient = milvusClient;
|
||||||
this.collection = collection;
|
this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION);
|
||||||
this.batchSize = batchSize;
|
this.batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100);
|
||||||
this.dataCache = new ArrayList<>();
|
this.dataCache = new ArrayList<>(batchSize);
|
||||||
|
this.milvusColumnMeta = JSON.parseObject(writerSliceConfig.getString(KeyConstant.COLUMN), new TypeReference<List<MilvusColumn>>() {
|
||||||
|
});
|
||||||
|
this.writeMode = WriteModeEnum.getEnum(writerSliceConfig.getString(KeyConstant.WRITE_MODE));
|
||||||
|
this.partition = writerSliceConfig.getString(KeyConstant.PARTITION);
|
||||||
}
|
}
|
||||||
public void write(JsonObject data){
|
|
||||||
dataCache.add(data);
|
public void add(Record record, TaskPluginCollector taskPluginCollector) {
|
||||||
|
try {
|
||||||
|
JsonObject data = this.convertByType(milvusColumnMeta, record);
|
||||||
|
dataCache.add(data);
|
||||||
|
} catch (Exception e) {
|
||||||
|
taskPluginCollector.collectDirtyRecord(record, String.format("parse record error errorMessage: %s", e.getMessage()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
public Boolean needCommit(){
|
|
||||||
|
public Boolean needCommit() {
|
||||||
return dataCache.size() >= batchSize;
|
return dataCache.size() >= batchSize;
|
||||||
}
|
}
|
||||||
public void commit(){
|
|
||||||
if(dataCache.isEmpty()){
|
public void commit() {
|
||||||
|
if (dataCache.isEmpty()) {
|
||||||
log.info("dataCache is empty, skip commit");
|
log.info("dataCache is empty, skip commit");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
UpsertReq upsertReq = UpsertReq.builder()
|
if (writeMode == WriteModeEnum.INSERT) {
|
||||||
.collectionName(collection)
|
milvusClient.insert(collection, partition, dataCache);
|
||||||
.data(dataCache)
|
} else {
|
||||||
.build();
|
milvusClient.upsert(collection, partition, dataCache);
|
||||||
milvusClientV2.upsert(upsertReq);
|
}
|
||||||
dataCache = new ArrayList<>();
|
dataCache = new ArrayList<>(batchSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getDataCacheSize() {
|
||||||
|
return dataCache.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
private JsonObject convertByType(List<MilvusColumn> milvusColumnMeta, Record record) {
|
||||||
|
JsonObject data = new JsonObject();
|
||||||
|
Gson gson = new Gson();
|
||||||
|
for (int i = 0; i < record.getColumnNumber(); i++) {
|
||||||
|
MilvusColumn milvusColumn = milvusColumnMeta.get(i);
|
||||||
|
DataType fieldType = milvusColumn.getMilvusTypeEnum();
|
||||||
|
String fieldName = milvusColumn.getName();
|
||||||
|
Column column = record.getColumn(i);
|
||||||
|
try {
|
||||||
|
Object field = convertToMilvusField(fieldType, column, milvusColumn);
|
||||||
|
data.add(fieldName, gson.toJsonTree(field));
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("parse error for column: {} errorMessage: {}", fieldName, e.getMessage(), e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
//值需要跟这里匹配:io.milvus.param.ParamUtils#checkFieldData(io.milvus.param.collection.FieldType, java.util.List<?>, boolean)
|
||||||
|
private Object convertToMilvusField(DataType type, Column column, MilvusColumn milvusColumn) {
|
||||||
|
if (column.getRawData() == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
switch (type) {
|
||||||
|
case Int8:
|
||||||
|
case Int16:
|
||||||
|
case Int32:
|
||||||
|
case Int64:
|
||||||
|
return column.asLong();
|
||||||
|
case Float:
|
||||||
|
case Double:
|
||||||
|
return column.asDouble();
|
||||||
|
case String:
|
||||||
|
case VarChar:
|
||||||
|
return column.asString();
|
||||||
|
case Bool:
|
||||||
|
return column.asBoolean();
|
||||||
|
case BFloat16Vector:
|
||||||
|
JSONArray bFloat16ArrayJson = JSON.parseArray(column.asString());
|
||||||
|
List<Float> bfloat16Vector = new ArrayList<>();
|
||||||
|
for (int i = 0; i < bFloat16ArrayJson.size(); i++) {
|
||||||
|
Float value = Float.parseFloat(bFloat16ArrayJson.getString(i));
|
||||||
|
bfloat16Vector.add(value);
|
||||||
|
}
|
||||||
|
BFloat16Vec bFloat16Vec = new BFloat16Vec(bfloat16Vector);
|
||||||
|
ByteBuffer byteBuffer = (ByteBuffer) bFloat16Vec.getData();
|
||||||
|
return byteBuffer.array();
|
||||||
|
case Float16Vector:
|
||||||
|
JSONArray float16ArrayJson = JSON.parseArray(column.asString());
|
||||||
|
List<Float> float16Vector = new ArrayList<>();
|
||||||
|
for (int i = 0; i < float16ArrayJson.size(); i++) {
|
||||||
|
Float floatValue = Float.parseFloat(float16ArrayJson.getString(i));
|
||||||
|
float16Vector.add(floatValue);
|
||||||
|
}
|
||||||
|
Float16Vec float16Vec = new Float16Vec(float16Vector);
|
||||||
|
ByteBuffer data = (ByteBuffer) float16Vec.getData();
|
||||||
|
return data.array();
|
||||||
|
case BinaryVector:
|
||||||
|
return column.asBytes();
|
||||||
|
case FloatVector:
|
||||||
|
JSONArray arrayJson = JSON.parseArray(column.asString());
|
||||||
|
return arrayJson.stream().map(item -> Float.parseFloat(String.valueOf(item))).collect(Collectors.toList());
|
||||||
|
case SparseFloatVector:
|
||||||
|
//[3:0.5, 24:0.8, 76:0.2]
|
||||||
|
try {
|
||||||
|
JSONArray sparseFloatArray = JSON.parseArray(column.asString());
|
||||||
|
TreeMap<Long, Float> mapValue = new TreeMap<>();
|
||||||
|
for (int i = 0; i < sparseFloatArray.size(); i++) {
|
||||||
|
String value = sparseFloatArray.getString(i);
|
||||||
|
String[] split = value.split(":");
|
||||||
|
Long key = Long.parseLong(split[0]);
|
||||||
|
Float val = Float.parseFloat(split[1]);
|
||||||
|
mapValue.put(key, val);
|
||||||
|
}
|
||||||
|
return mapValue;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("parse column[{}] SparseFloatVector value error, value should like [3:0.5, 24:0.8, 76:0.2], but get:{}", milvusColumn.getName(), column.asString());
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
case JSON:
|
||||||
|
return column.asString();
|
||||||
|
case Array:
|
||||||
|
JSONArray parseArray = JSON.parseArray(column.asString());
|
||||||
|
return parseArray.stream().map(item -> String.valueOf(item)).collect(Collectors.toList());
|
||||||
|
default:
|
||||||
|
throw new RuntimeException(String.format("Unsupported data type[%s]", type));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -0,0 +1,95 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.milvuswriter;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
|
||||||
|
import com.google.gson.JsonObject;
|
||||||
|
import io.milvus.v2.client.ConnectConfig;
|
||||||
|
import io.milvus.v2.client.MilvusClientV2;
|
||||||
|
import io.milvus.v2.service.collection.request.CreateCollectionReq;
|
||||||
|
import io.milvus.v2.service.collection.request.DropCollectionReq;
|
||||||
|
import io.milvus.v2.service.collection.request.HasCollectionReq;
|
||||||
|
import io.milvus.v2.service.partition.request.CreatePartitionReq;
|
||||||
|
import io.milvus.v2.service.partition.request.HasPartitionReq;
|
||||||
|
import io.milvus.v2.service.vector.request.InsertReq;
|
||||||
|
import io.milvus.v2.service.vector.request.UpsertReq;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author ziming(子茗)
|
||||||
|
* @date 12/27/24
|
||||||
|
* @description
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class MilvusClient {
|
||||||
|
private MilvusClientV2 milvusClientV2;
|
||||||
|
|
||||||
|
public MilvusClient(Configuration conf) {
|
||||||
|
// connect to milvus
|
||||||
|
ConnectConfig connectConfig = ConnectConfig.builder().uri(conf.getString(KeyConstant.ENDPOINT)).build();
|
||||||
|
String token = null;
|
||||||
|
if (conf.getString(KeyConstant.TOKEN) != null) {
|
||||||
|
token = conf.getString(KeyConstant.TOKEN);
|
||||||
|
} else {
|
||||||
|
token = conf.getString(KeyConstant.USERNAME) + ":" + conf.getString(KeyConstant.PASSWORD);
|
||||||
|
}
|
||||||
|
connectConfig.setToken(token);
|
||||||
|
String database = conf.getString(KeyConstant.DATABASE);
|
||||||
|
if (StringUtils.isNotBlank(database)) {
|
||||||
|
log.info("use database {}", database);
|
||||||
|
connectConfig.setDbName(conf.getString(KeyConstant.DATABASE));
|
||||||
|
}
|
||||||
|
Integer connectTimeOut = conf.getInt(KeyConstant.CONNECT_TIMEOUT_MS);
|
||||||
|
if (connectTimeOut != null) {
|
||||||
|
connectConfig.setConnectTimeoutMs(connectTimeOut);
|
||||||
|
}
|
||||||
|
this.milvusClientV2 = new MilvusClientV2(connectConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void upsert(String collection, String partition, List<JsonObject> data) {
|
||||||
|
UpsertReq upsertReq = UpsertReq.builder().collectionName(collection).data(data).build();
|
||||||
|
if (StringUtils.isNotEmpty(partition)) {
|
||||||
|
upsertReq.setPartitionName(partition);
|
||||||
|
}
|
||||||
|
milvusClientV2.upsert(upsertReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void insert(String collection, String partition, List<JsonObject> data) {
|
||||||
|
InsertReq insertReq = InsertReq.builder().collectionName(collection).data(data).build();
|
||||||
|
if (StringUtils.isNotEmpty(partition)) {
|
||||||
|
insertReq.setPartitionName(partition);
|
||||||
|
}
|
||||||
|
milvusClientV2.insert(insertReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean hasCollection(String collection) {
|
||||||
|
HasCollectionReq build = HasCollectionReq.builder().collectionName(collection).build();
|
||||||
|
return milvusClientV2.hasCollection(build);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void createCollection(String collection, CreateCollectionReq.CollectionSchema schema) {
|
||||||
|
CreateCollectionReq createCollectionReq = CreateCollectionReq.builder().collectionName(collection).collectionSchema(schema).build();
|
||||||
|
milvusClientV2.createCollection(createCollectionReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void dropCollection(String collection) {
|
||||||
|
DropCollectionReq request = DropCollectionReq.builder().collectionName(collection).build();
|
||||||
|
milvusClientV2.dropCollection(request);
|
||||||
|
}
|
||||||
|
public Boolean hasPartition(String collection, String partition) {
|
||||||
|
HasPartitionReq hasPartitionReq = HasPartitionReq.builder().collectionName(collection).partitionName(partition).build();
|
||||||
|
return milvusClientV2.hasPartition(hasPartitionReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void createPartition(String collectionName, String partitionName) {
|
||||||
|
CreatePartitionReq createPartitionReq = CreatePartitionReq.builder().collectionName(collectionName).partitionName(partitionName).build();
|
||||||
|
milvusClientV2.createPartition(createPartitionReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() {
|
||||||
|
log.info("Closing Milvus client");
|
||||||
|
milvusClientV2.close();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,112 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.milvuswriter;
|
||||||
|
|
||||||
|
import io.milvus.v2.common.DataType;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author ziming(子茗)
|
||||||
|
* @date 12/27/24
|
||||||
|
* @description
|
||||||
|
*/
|
||||||
|
public class MilvusColumn {
|
||||||
|
private String name;
|
||||||
|
private String type;
|
||||||
|
private DataType milvusTypeEnum;
|
||||||
|
private Boolean isPrimaryKey;
|
||||||
|
private Integer dimension;
|
||||||
|
private Boolean isPartitionKey;
|
||||||
|
private Integer maxLength;
|
||||||
|
private Boolean isAutoId;
|
||||||
|
private Integer maxCapacity;
|
||||||
|
private String elementType;
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setName(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getType() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setType(String type) {
|
||||||
|
this.type = type;
|
||||||
|
for (DataType item : DataType.values()) {
|
||||||
|
if (item.name().equalsIgnoreCase(type)) {
|
||||||
|
this.milvusTypeEnum = item;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (this.milvusTypeEnum == null) {
|
||||||
|
throw new RuntimeException("Unsupported type: " + type + " supported types: " + Arrays.toString(DataType.values()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getDimension() {
|
||||||
|
return dimension;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDimension(Integer dimension) {
|
||||||
|
this.dimension = dimension;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getMaxLength() {
|
||||||
|
return maxLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxLength(Integer maxLength) {
|
||||||
|
this.maxLength = maxLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getPrimaryKey() {
|
||||||
|
return isPrimaryKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getPartitionKey() {
|
||||||
|
return isPartitionKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPartitionKey(Boolean partitionKey) {
|
||||||
|
isPartitionKey = partitionKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPrimaryKey(Boolean primaryKey) {
|
||||||
|
isPrimaryKey = primaryKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getAutoId() {
|
||||||
|
return isAutoId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAutoId(Boolean autoId) {
|
||||||
|
isAutoId = autoId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getMaxCapacity() {
|
||||||
|
return maxCapacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxCapacity(Integer maxCapacity) {
|
||||||
|
this.maxCapacity = maxCapacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getElementType() {
|
||||||
|
return elementType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setElementType(String elementType) {
|
||||||
|
this.elementType = elementType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DataType getMilvusTypeEnum() {
|
||||||
|
return milvusTypeEnum;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMilvusTypeEnum(DataType milvusTypeEnum) {
|
||||||
|
this.milvusTypeEnum = milvusTypeEnum;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,102 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.milvuswriter;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
import com.alibaba.datax.plugin.writer.milvuswriter.enums.SchemaCreateModeEnum;
|
||||||
|
import com.alibaba.fastjson2.JSON;
|
||||||
|
import com.alibaba.fastjson2.TypeReference;
|
||||||
|
|
||||||
|
import io.milvus.v2.common.DataType;
|
||||||
|
import io.milvus.v2.service.collection.request.AddFieldReq;
|
||||||
|
import io.milvus.v2.service.collection.request.CreateCollectionReq;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import static io.milvus.v2.common.DataType.valueOf;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author ziming(子茗)
|
||||||
|
* @date 12/27/24
|
||||||
|
* @description
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class MilvusCreateCollection {
|
||||||
|
|
||||||
|
private Configuration config;
|
||||||
|
|
||||||
|
MilvusCreateCollection(Configuration originalConfig) {
|
||||||
|
this.config = originalConfig;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void createCollectionByMode(MilvusClient milvusClient) {
|
||||||
|
String collection = this.config.getString(KeyConstant.COLLECTION);
|
||||||
|
SchemaCreateModeEnum schemaCreateMode = SchemaCreateModeEnum.getEnum(this.config.getString(KeyConstant.SCHAME_CREATE_MODE));
|
||||||
|
List<MilvusColumn> milvusColumnMeta = JSON.parseObject(config.getString(KeyConstant.COLUMN), new TypeReference<List<MilvusColumn>>() {
|
||||||
|
});
|
||||||
|
Boolean hasCollection = milvusClient.hasCollection(collection);
|
||||||
|
if (schemaCreateMode == SchemaCreateModeEnum.CREATEIFNOTEXIT) {
|
||||||
|
// create collection
|
||||||
|
if (hasCollection) {
|
||||||
|
log.info("collection[{}] already exists, continue create", collection);
|
||||||
|
} else {
|
||||||
|
log.info("creating collection[{}]", collection);
|
||||||
|
CreateCollectionReq.CollectionSchema collectionSchema = prepareCollectionSchema(milvusColumnMeta);
|
||||||
|
milvusClient.createCollection(collection, collectionSchema);
|
||||||
|
}
|
||||||
|
} else if (schemaCreateMode == SchemaCreateModeEnum.RECREATE) {
|
||||||
|
if (hasCollection) {
|
||||||
|
log.info("collection already exist, try to drop");
|
||||||
|
milvusClient.dropCollection(collection);
|
||||||
|
}
|
||||||
|
log.info("creating collection[{}]", collection);
|
||||||
|
CreateCollectionReq.CollectionSchema collectionSchema = prepareCollectionSchema(milvusColumnMeta);
|
||||||
|
milvusClient.createCollection(collection, collectionSchema);
|
||||||
|
} else if (schemaCreateMode == SchemaCreateModeEnum.IGNORE && !hasCollection) {
|
||||||
|
log.error("Collection not exist, throw exception");
|
||||||
|
throw new RuntimeException("Collection not exist");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private CreateCollectionReq.CollectionSchema prepareCollectionSchema(List<MilvusColumn> milvusColumnMeta) {
|
||||||
|
CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build();
|
||||||
|
for (int i = 0; i < milvusColumnMeta.size(); i++) {
|
||||||
|
MilvusColumn milvusColumn = milvusColumnMeta.get(i);
|
||||||
|
AddFieldReq addFieldReq = AddFieldReq.builder()
|
||||||
|
.fieldName(milvusColumn.getName())
|
||||||
|
.dataType(valueOf(milvusColumn.getType()))
|
||||||
|
.build();
|
||||||
|
if (milvusColumn.getPrimaryKey() != null) {
|
||||||
|
addFieldReq.setIsPrimaryKey(milvusColumn.getPrimaryKey());
|
||||||
|
}
|
||||||
|
if (milvusColumn.getDimension() != null) {
|
||||||
|
addFieldReq.setDimension(milvusColumn.getDimension());
|
||||||
|
}
|
||||||
|
if (milvusColumn.getPartitionKey() != null) {
|
||||||
|
addFieldReq.setIsPartitionKey(milvusColumn.getPartitionKey());
|
||||||
|
}
|
||||||
|
if (milvusColumn.getMaxLength() != null) {
|
||||||
|
addFieldReq.setMaxLength(milvusColumn.getMaxLength());
|
||||||
|
}
|
||||||
|
if (milvusColumn.getAutoId() != null) {
|
||||||
|
addFieldReq.setAutoID(milvusColumn.getAutoId());
|
||||||
|
}
|
||||||
|
if (milvusColumn.getMaxCapacity() != null) {
|
||||||
|
addFieldReq.setMaxCapacity(milvusColumn.getMaxCapacity());
|
||||||
|
}
|
||||||
|
if (milvusColumn.getElementType() != null) {
|
||||||
|
addFieldReq.setElementType(DataType.valueOf(milvusColumn.getElementType()));
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
collectionSchema.addField(addFieldReq);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("add filed[{}] error", milvusColumn.getName());
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Boolean enableDynamic = config.getBool(KeyConstant.ENABLE_DYNAMIC_SCHEMA);
|
||||||
|
if (enableDynamic != null) {
|
||||||
|
collectionSchema.setEnableDynamicField(enableDynamic);
|
||||||
|
}
|
||||||
|
return collectionSchema;
|
||||||
|
}
|
||||||
|
}
|
@ -1,86 +0,0 @@
|
|||||||
package com.alibaba.datax.plugin.writer.milvuswriter;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.Record;
|
|
||||||
import com.alibaba.fastjson2.JSONArray;
|
|
||||||
import com.google.gson.Gson;
|
|
||||||
import com.google.gson.JsonObject;
|
|
||||||
import com.google.gson.JsonParser;
|
|
||||||
import io.milvus.v2.common.DataType;
|
|
||||||
import static io.milvus.v2.common.DataType.*;
|
|
||||||
import io.milvus.v2.service.collection.request.AddFieldReq;
|
|
||||||
import io.milvus.v2.service.collection.request.CreateCollectionReq;
|
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
public class MilvusSinkConverter {
|
|
||||||
public JsonObject convertByType(JSONArray milvusColumnMeta, Record record) {
|
|
||||||
JsonObject data = new JsonObject();
|
|
||||||
Gson gson = new Gson();
|
|
||||||
for(int i = 0; i < record.getColumnNumber(); i++) {
|
|
||||||
String fieldType = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE);
|
|
||||||
String fieldName = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME);
|
|
||||||
Object rawData = record.getColumn(i).getRawData();
|
|
||||||
Object field = convertToMilvusField(fieldType, rawData);
|
|
||||||
data.add(fieldName, gson.toJsonTree(field));
|
|
||||||
}
|
|
||||||
return data;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Object convertToMilvusField(String type, Object rawData) {
|
|
||||||
Gson gson = new Gson();
|
|
||||||
switch (valueOf(type)) {
|
|
||||||
case Int32:
|
|
||||||
return Integer.parseInt(rawData.toString());
|
|
||||||
case Int64:
|
|
||||||
return Long.parseLong(rawData.toString());
|
|
||||||
case Float:
|
|
||||||
return java.lang.Float.parseFloat(rawData.toString());
|
|
||||||
case String:
|
|
||||||
case VarChar:
|
|
||||||
return rawData.toString();
|
|
||||||
case Bool:
|
|
||||||
return Boolean.parseBoolean(rawData.toString());
|
|
||||||
case FloatVector:
|
|
||||||
java.lang.Float[] floats = Arrays.stream(rawData.toString().split(",")).map(java.lang.Float::parseFloat).toArray(java.lang.Float[]::new);
|
|
||||||
return Arrays.stream(floats).collect(Collectors.toList());
|
|
||||||
case BinaryVector:
|
|
||||||
java.lang.Integer[] binarys = Arrays.stream(rawData.toString().split(",")).map(java.lang.Integer::parseInt).toArray(java.lang.Integer[]::new);
|
|
||||||
return BufferUtils.toByteBuffer(binarys);
|
|
||||||
case Float16Vector:
|
|
||||||
case BFloat16Vector:
|
|
||||||
// all these data is byte format in milvus
|
|
||||||
ByteBuffer binaryVector = (ByteBuffer) rawData;
|
|
||||||
return gson.toJsonTree(binaryVector.array());
|
|
||||||
case SparseFloatVector:
|
|
||||||
return JsonParser.parseString(gson.toJson(rawData)).getAsJsonObject();
|
|
||||||
default:
|
|
||||||
throw new RuntimeException("Unsupported data type");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public CreateCollectionReq.CollectionSchema prepareCollectionSchema(JSONArray milvusColumnMeta) {
|
|
||||||
CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build();
|
|
||||||
for (int i = 0; i < milvusColumnMeta.size(); i++) {
|
|
||||||
AddFieldReq addFieldReq = AddFieldReq.builder()
|
|
||||||
.fieldName(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME))
|
|
||||||
.dataType(valueOf(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE)))
|
|
||||||
.build();
|
|
||||||
if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PRIMARY_KEY)) {
|
|
||||||
addFieldReq.setIsPrimaryKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PRIMARY_KEY));
|
|
||||||
}
|
|
||||||
if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.VECTOR_DIMENSION)) {
|
|
||||||
addFieldReq.setDimension(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.VECTOR_DIMENSION));
|
|
||||||
}
|
|
||||||
if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PARTITION_KEY)) {
|
|
||||||
addFieldReq.setIsPartitionKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PARTITION_KEY));
|
|
||||||
}
|
|
||||||
if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.MAX_LENGTH)) {
|
|
||||||
addFieldReq.setMaxLength(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.MAX_LENGTH));
|
|
||||||
}
|
|
||||||
collectionSchema.addField(addFieldReq);
|
|
||||||
}
|
|
||||||
return collectionSchema;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,29 +1,49 @@
|
|||||||
package com.alibaba.datax.plugin.writer.milvuswriter;
|
package com.alibaba.datax.plugin.writer.milvuswriter;
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.Record;
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
import com.alibaba.datax.common.plugin.RecordReceiver;
|
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||||
import com.alibaba.datax.common.spi.Writer;
|
import com.alibaba.datax.common.spi.Writer;
|
||||||
import com.alibaba.datax.common.util.Configuration;
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
import com.alibaba.fastjson2.JSON;
|
|
||||||
import com.alibaba.fastjson2.JSONArray;
|
|
||||||
import com.google.gson.JsonObject;
|
|
||||||
import io.milvus.v2.client.ConnectConfig;
|
|
||||||
import io.milvus.v2.client.MilvusClientV2;
|
|
||||||
import io.milvus.v2.common.DataType;
|
|
||||||
import io.milvus.v2.service.collection.request.AddFieldReq;
|
|
||||||
import io.milvus.v2.service.collection.request.CreateCollectionReq;
|
|
||||||
import io.milvus.v2.service.collection.request.HasCollectionReq;
|
|
||||||
import io.milvus.v2.service.vector.request.UpsertReq;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class MilvusWriter extends Writer {
|
public class MilvusWriter extends Writer {
|
||||||
public static class Job extends Writer.Job {
|
public static class Job extends Writer.Job {
|
||||||
private Configuration originalConfig = null;
|
private Configuration originalConfig = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
this.originalConfig = super.getPluginJobConf();
|
||||||
|
originalConfig.getNecessaryValue(KeyConstant.ENDPOINT, MilvusWriterErrorCode.REQUIRED_VALUE);
|
||||||
|
originalConfig.getNecessaryValue(KeyConstant.COLUMN, MilvusWriterErrorCode.REQUIRED_VALUE);
|
||||||
|
originalConfig.getNecessaryValue(KeyConstant.COLLECTION, MilvusWriterErrorCode.REQUIRED_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void prepare() {
|
||||||
|
//collection create process
|
||||||
|
MilvusClient milvusClient = new MilvusClient(originalConfig);
|
||||||
|
try {
|
||||||
|
MilvusCreateCollection milvusCreateCollection = new MilvusCreateCollection(originalConfig);
|
||||||
|
milvusCreateCollection.createCollectionByMode(milvusClient);
|
||||||
|
String collection = originalConfig.getString(KeyConstant.COLLECTION);
|
||||||
|
String partition = originalConfig.getString(KeyConstant.PARTITION);
|
||||||
|
if (partition != null && !milvusClient.hasPartition(collection, partition)) {
|
||||||
|
log.info("collection[{}] not contain partition[{}],try to create partition", collection, partition);
|
||||||
|
milvusClient.createPartition(collection, partition);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw DataXException.asDataXException(MilvusWriterErrorCode.MILVUS_COLLECTION, e.getMessage(), e);
|
||||||
|
} finally {
|
||||||
|
milvusClient.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 切分任务。<br>
|
* 切分任务。<br>
|
||||||
*
|
*
|
||||||
@ -31,100 +51,60 @@ public class MilvusWriter extends Writer {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public List<Configuration> split(int mandatoryNumber) {
|
public List<Configuration> split(int mandatoryNumber) {
|
||||||
List<Configuration> configList = new ArrayList<Configuration>();
|
List<Configuration> configList = new ArrayList<>();
|
||||||
for(int i = 0; i < mandatoryNumber; i++) {
|
for (int i = 0; i < mandatoryNumber; i++) {
|
||||||
configList.add(this.originalConfig.clone());
|
configList.add(this.originalConfig.clone());
|
||||||
}
|
}
|
||||||
return configList;
|
return configList;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void init() {
|
|
||||||
this.originalConfig = super.getPluginJobConf();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void destroy() {
|
public void destroy() {
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Task extends Writer.Task {
|
public static class Task extends Writer.Task {
|
||||||
|
|
||||||
private MilvusClientV2 milvusClientV2;
|
|
||||||
|
|
||||||
private MilvusSinkConverter milvusSinkConverter;
|
|
||||||
private MilvusBufferWriter milvusBufferWriter;
|
private MilvusBufferWriter milvusBufferWriter;
|
||||||
|
MilvusClient milvusClient;
|
||||||
private String collection = null;
|
|
||||||
private JSONArray milvusColumnMeta;
|
|
||||||
|
|
||||||
private String schemaCreateMode = "createWhenTableNotExit";
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void startWrite(RecordReceiver lineReceiver) {
|
|
||||||
Record record = lineReceiver.getFromReader();
|
|
||||||
JsonObject data = milvusSinkConverter.convertByType(milvusColumnMeta, record);
|
|
||||||
milvusBufferWriter.write(data);
|
|
||||||
if(milvusBufferWriter.needCommit()){
|
|
||||||
log.info("Reached buffer limit, Committing data");
|
|
||||||
milvusBufferWriter.commit();
|
|
||||||
log.info("Data committed");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init() {
|
public void init() {
|
||||||
log.info("Initializing Milvus writer");
|
log.info("Initializing Milvus writer");
|
||||||
// get configuration
|
// get configuration
|
||||||
Configuration writerSliceConfig = this.getPluginJobConf();
|
Configuration writerSliceConfig = this.getPluginJobConf();
|
||||||
this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION);
|
this.milvusClient = new MilvusClient(writerSliceConfig);
|
||||||
this.milvusColumnMeta = JSON.parseArray(writerSliceConfig.getString(KeyConstant.COLUMN));
|
this.milvusBufferWriter = new MilvusBufferWriter(this.milvusClient, writerSliceConfig);
|
||||||
this.schemaCreateMode = writerSliceConfig.getString(KeyConstant.schemaCreateMode) == null ?
|
|
||||||
"createWhenTableNotExit" : writerSliceConfig.getString(KeyConstant.schemaCreateMode);
|
|
||||||
int batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100);
|
|
||||||
log.info("Collection:{}", this.collection);
|
|
||||||
// connect to milvus
|
|
||||||
ConnectConfig connectConfig = ConnectConfig.builder()
|
|
||||||
.uri(writerSliceConfig.getString(KeyConstant.URI))
|
|
||||||
.token(writerSliceConfig.getString(KeyConstant.TOKEN))
|
|
||||||
.build();
|
|
||||||
if(writerSliceConfig.getString(KeyConstant.DATABASE) == null) {
|
|
||||||
log.warn("Database is set, using database{}", writerSliceConfig.getString(KeyConstant.DATABASE));
|
|
||||||
connectConfig.setDbName(writerSliceConfig.getString(KeyConstant.DATABASE));
|
|
||||||
}
|
|
||||||
this.milvusClientV2 = new MilvusClientV2(connectConfig);
|
|
||||||
this.milvusSinkConverter = new MilvusSinkConverter();
|
|
||||||
this.milvusBufferWriter = new MilvusBufferWriter(milvusClientV2, collection, batchSize);
|
|
||||||
log.info("Milvus writer initialized");
|
log.info("Milvus writer initialized");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void startWrite(RecordReceiver lineReceiver) {
|
||||||
|
Record record = null;
|
||||||
|
while ((record = lineReceiver.getFromReader()) != null) {
|
||||||
|
milvusBufferWriter.add(record, this.getTaskPluginCollector());
|
||||||
|
if (milvusBufferWriter.needCommit()) {
|
||||||
|
log.info("begin committing data size[{}]", milvusBufferWriter.getDataCacheSize());
|
||||||
|
milvusBufferWriter.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (milvusBufferWriter.getDataCacheSize() > 0) {
|
||||||
|
log.info("begin committing data size[{}]", milvusBufferWriter.getDataCacheSize());
|
||||||
|
milvusBufferWriter.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void prepare() {
|
public void prepare() {
|
||||||
super.prepare();
|
super.prepare();
|
||||||
Boolean hasCollection = milvusClientV2.hasCollection(HasCollectionReq.builder().collectionName(collection).build());
|
|
||||||
if (!hasCollection) {
|
|
||||||
log.info("Collection not exist");
|
|
||||||
if (schemaCreateMode.equals("createWhenTableNotExit")) {
|
|
||||||
// create collection
|
|
||||||
log.info("Creating collection:{}", this.collection);
|
|
||||||
CreateCollectionReq.CollectionSchema collectionSchema = milvusSinkConverter.prepareCollectionSchema(milvusColumnMeta);
|
|
||||||
|
|
||||||
CreateCollectionReq createCollectionReq = CreateCollectionReq.builder()
|
|
||||||
.collectionName(collection)
|
|
||||||
.collectionSchema(collectionSchema)
|
|
||||||
.build();
|
|
||||||
milvusClientV2.createCollection(createCollectionReq);
|
|
||||||
} else if (schemaCreateMode.equals("exception")) {
|
|
||||||
log.error("Collection not exist, throw exception");
|
|
||||||
throw new RuntimeException("Collection not exist");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void destroy() {
|
public void destroy() {
|
||||||
log.info("Closing Milvus writer, committing data and closing connection");
|
if (this.milvusClient != null) {
|
||||||
this.milvusBufferWriter.commit();
|
this.milvusClient.close();
|
||||||
this.milvusClientV2.close();
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -0,0 +1,35 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.milvuswriter;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.spi.ErrorCode;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author ziming(子茗)
|
||||||
|
* @date 12/27/24
|
||||||
|
* @description
|
||||||
|
*/
|
||||||
|
public enum MilvusWriterErrorCode implements ErrorCode {
|
||||||
|
MILVUS_COLLECTION("MilvusWriter-01", "collection process error"),
|
||||||
|
REQUIRED_VALUE("MilvusWriter-02", "miss required parameter");
|
||||||
|
private final String code;
|
||||||
|
private final String description;
|
||||||
|
|
||||||
|
MilvusWriterErrorCode(String code, String description) {
|
||||||
|
this.code = code;
|
||||||
|
this.description = description;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getCode() {
|
||||||
|
return this.code;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDescription() {
|
||||||
|
return this.description;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,34 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.milvuswriter.enums;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author ziming(子茗)
|
||||||
|
* @date 12/27/24
|
||||||
|
* @description
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public enum SchemaCreateModeEnum {
|
||||||
|
CREATEIFNOTEXIT("createIfNotExist"),
|
||||||
|
IGNORE("ignore"),
|
||||||
|
RECREATE("recreate");
|
||||||
|
String type;
|
||||||
|
|
||||||
|
SchemaCreateModeEnum(String type) {
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getType() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static SchemaCreateModeEnum getEnum(String name) {
|
||||||
|
for (SchemaCreateModeEnum value : SchemaCreateModeEnum.values()) {
|
||||||
|
if (value.getType().equalsIgnoreCase(name)) {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.info("use default CREATEIFNOTEXIT schame create mode");
|
||||||
|
return CREATEIFNOTEXIT;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,28 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.milvuswriter.enums;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public enum WriteModeEnum {
|
||||||
|
INSERT("insert"),
|
||||||
|
UPSERT("upsert");
|
||||||
|
String mode;
|
||||||
|
|
||||||
|
public String getMode() {
|
||||||
|
return mode;
|
||||||
|
}
|
||||||
|
|
||||||
|
WriteModeEnum(String mode) {
|
||||||
|
this.mode = mode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static WriteModeEnum getEnum(String mode) {
|
||||||
|
for (WriteModeEnum writeModeEnum : WriteModeEnum.values()) {
|
||||||
|
if (writeModeEnum.getMode().equalsIgnoreCase(mode)) {
|
||||||
|
return writeModeEnum;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.info("use default write mode upsert");
|
||||||
|
return UPSERT;
|
||||||
|
}
|
||||||
|
}
|
@ -1,15 +1,12 @@
|
|||||||
{
|
{
|
||||||
"name": "mongodbwriter",
|
"name": "milvuswriter",
|
||||||
"parameter": {
|
"parameter": {
|
||||||
"address": [],
|
"endpoint": "",
|
||||||
"userName": "",
|
"username": "",
|
||||||
"userPassword": "",
|
"password": "",
|
||||||
"dbName": "",
|
"database": "",
|
||||||
"collectionName": "",
|
"collection": "",
|
||||||
"column": [],
|
"column": [],
|
||||||
"upsertInfo": {
|
"enableDynamicSchema": ""
|
||||||
"isUpsert": "",
|
|
||||||
"upsertKey": ""
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -546,5 +546,12 @@
|
|||||||
</includes>
|
</includes>
|
||||||
<outputDirectory>datax</outputDirectory>
|
<outputDirectory>datax</outputDirectory>
|
||||||
</fileSet>
|
</fileSet>
|
||||||
|
<fileSet>
|
||||||
|
<directory>milvuswriter/target/datax/</directory>
|
||||||
|
<includes>
|
||||||
|
<include>**/*.*</include>
|
||||||
|
</includes>
|
||||||
|
<outputDirectory>datax</outputDirectory>
|
||||||
|
</fileSet>
|
||||||
</fileSets>
|
</fileSets>
|
||||||
</assembly>
|
</assembly>
|
||||||
|
0
stream2milvus.json
Normal file
0
stream2milvus.json
Normal file
Loading…
Reference in New Issue
Block a user