diff --git a/milvusjob.json b/milvusjob.json
new file mode 100644
index 00000000..e69de29b
diff --git a/milvuswriter/doc/milvuswriter.md b/milvuswriter/doc/milvuswriter.md
new file mode 100644
index 00000000..3788c62a
--- /dev/null
+++ b/milvuswriter/doc/milvuswriter.md
@@ -0,0 +1,273 @@
+# DataX milvuswriter
+
+
+---
+
+
+## 1 快速介绍
+
+milvuswriter 插件实现了写入数据到 milvus集合的功能; 面向ETL开发工程师,使用 milvuswriter 从数仓导入数据到 milvus, 同时 milvuswriter 亦可以作为数据迁移工具为DBA等用户提供服务。
+
+
+## 2 实现原理
+
+milvuswriter 通过 DataX 框架获取 Reader 生成的协议数据,通过 `upsert/insert` 方式写入数据到milvus, 并通过batchSize累积的方式进行数据提交。
+
+
+ 注意:upsert写入方式(推荐): 在非autoid表场景下根据主键更新 Collection 中的某个 Entity;autoid表场景下会将 Entity 中的主键替换为自动生成的主键并插入数据。
+ insert写入方式: 多用于autoid表插入数据,由milvus自动生成主键; 非autoid表下使用insert会导致数据重复。
+
+
+## 3 功能说明
+
+### 3.1 配置样例
+
+* 这里提供一份从内存产生数据导入到 milvus的配置样例。
+
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "streamreader",
+ "parameter": {
+ "column" : [
+ {
+ "value": 1,
+ "type": "long"
+ },
+ {
+ "value": "[1.1,1.2,1.3]",
+ "type": "string"
+ },
+ {
+ "value": 100,
+ "type": "long"
+ },
+ {
+ "value": 200,
+ "type": "long"
+ },
+ {
+ "value": 300,
+ "type": "long"
+ },
+ {
+ "value": 3.14159,
+ "type": "double"
+ },
+ {
+ "value": 3.1415926,
+ "type": "double"
+ },
+ {
+ "value": "testvarcharvalue",
+ "type": "string"
+ },
+ {
+ "value": true,
+ "type": "bool"
+ },
+ {
+ "value": "[1.123,1.2456,1.3789]",
+ "type": "string"
+ },
+ {
+ "value": "[2.123,2.2456,2.3789]",
+ "type": "string"
+ },
+ {
+ "value": "12345678",
+ "type": "string"
+ },
+ {
+ "value": "{\"a\":1,\"b\":2,\"c\":3}",
+ "type": "string"
+ },
+ {
+ "value": "[1,2,3,4]",
+ "type": "string"
+ }
+ ],
+ "sliceRecordCount": 1
+ }
+ },
+ "writer": {
+ "parameter": {
+ "schemaCreateMode": "createIfNotExist",
+ "connectTimeoutMs": 60000,
+ "writeMode": "upsert",
+ "collection": "demo01",
+ "type": "milvus",
+ "token": "xxxxxxx",
+ "endpoint": "https://xxxxxxxx.com:443",
+ "batchSize": 1024,
+ "column": [
+ {
+ "name": "id",
+ "type": "Int64",
+ "primaryKey": "true"
+ },
+ {
+ "name": "floatvector",
+ "type": "FloatVector",
+ "dimension": "3"
+ },
+ {
+ "name": "int8col",
+ "type": "Int8"
+ },
+ {
+ "name": "int16col",
+ "type": "Int16"
+ },
+ {
+ "name": "int32col",
+ "type": "Int32"
+ },
+ {
+ "name": "floatcol",
+ "type": "Float"
+ },
+ {
+ "name": "doublecol",
+ "type": "Double"
+ },
+ {
+ "name": "varcharcol",
+ "type": "VarChar"
+ },
+ {
+ "name": "boolcol",
+ "type": "Bool"
+ },
+ {
+ "name": "bfloat16vectorcol",
+ "type": "BFloat16Vector",
+ "dimension": "3"
+ },
+ {
+ "name": "float16vectorcol",
+ "type": "Float16Vector",
+ "dimension": "3"
+ },
+ {
+ "name": "binaryvectorcol",
+ "type": "BinaryVector",
+ "dimension": "64"
+ },
+ {
+ "name": "jsoncol",
+ "type": "JSON"
+ },
+ {
+ "name": "intarraycol",
+ "maxCapacity": "8",
+ "type": "Array",
+ "elementType": "Int32"
+ }
+ ]
+ },
+ "name": "milvuswriter"
+ }
+ }
+ ],
+ "setting": {
+ "errorLimit": {
+ "record": "0"
+ },
+ "speed": {
+ "concurrent": 2,
+ "channel": 2
+ }
+ }
+ }
+}
+
+```
+
+
+### 3.2 参数说明
+
+* **endpoint**
+ * 描述:milvus数据库的连接信息,包含地址和端口,例如https://xxxxxxxx.com:443
+
+ 注意:1、在一个数据库上只能配置一个 endpoint 值
+ 2、一个milvus 写入任务仅能配置一个 endpoint
+ * 必选:是
+ * 默认值:无
+* *schemaCreateMode*
+ * 描述: 集合创建的模式,同步时milvus集合不存在的处理方式, 根据配置的column属性进行创建
+ * 取值
+ * createIfNotExist: 如果集合不存在,则创建集合,如果集合存在,则不执行任何操作
+ * ignore: 如果集合不存在,任务异常报错,如果集合存在,则不执行任何操作
+ * recreate: 如果集合不存在,则创建集合,如果集合存在,则删除集合重建集合
+ * 必选:否
+ * 默认值:createIfNotExist
+* **connectTimeoutMs**
+ * 描述:与milvus交互时客户端的连接超时时间,单位毫秒
+ * 必选:否
+ * 默认值:10000
+* **token**
+ * 描述:milvus实例认证的token秘钥,与username认证方式二选一配置
+ * 必选:否
+ * 默认值:无
+* **username**
+ * 描述:目的milvus数据库的用户名, 与token二选一配置
+ * 必选:否
+ * 默认值:无
+* **password**
+ * 描述:目的milvus数据库的密码
+ * 必选:否
+ * 默认值:无
+* *writeMode*
+ * 描述: 写入milvus集合的写入方式
+ * 取值
+ * upsert(推荐): 在非autoid表场景下根据主键更新 Collection 中的某个 Entity;autoid表场景下会将 Entity 中的主键替换为自动生成的主键并插入数据。
+ * insert: 多用于autoid表插入数据,由milvus自动生成主键; 非autoid表下使用insert会导致数据重复。
+ * 必选:是
+ * 默认值:upsert
+* **collection**
+ * 描述:目的集合名称。 只能配置一个milvus的集合名称。
+ * 必选:是
+ * 默认值:无
+* **batchSize**
+ * 描述:一次性批量提交的记录数大小,该值可以极大减少DataX与milvus的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。
+ * 必选:否
+ * 默认值:1024
+
+* **column**
+ * 描述:目的集合需要写入数据的字段,字段内容用json格式描述,字段之间用英文逗号分隔。字段属性必填name、type, 其他属性在需要通过schemaCreateMode创建集合时按需填入,例如:
+
+ "column": [
+ {
+ "name": "id",
+ "type": "Int64",
+ "primaryKey": "true"
+ },
+ {
+ "name": "floatvector",
+ "type": "FloatVector",
+ "dimension": "3"
+ }]
+ * 必选:是
+ * 默认值:无
+### 3.3 支持同步milvus字段类型
+ Bool,
+ Int8,
+ Int16,
+ Int32,
+ Int64,
+ Float,
+ Double,
+ String,
+ VarChar,
+ Array,
+ JSON,
+ BinaryVector,
+ FloatVector,
+ Float16Vector,
+ BFloat16Vector,
+ SparseFloatVector
+
diff --git a/milvuswriter/pom.xml b/milvuswriter/pom.xml
index b889cbe4..16c00560 100644
--- a/milvuswriter/pom.xml
+++ b/milvuswriter/pom.xml
@@ -16,32 +16,48 @@
official
1.8
-
-
-
- guava
- com.google.guava
- 32.0.1-jre
-
-
-
-
+
+ com.alibaba.fastjson2
+ fastjson2
+ 2.0.49
+
+
+ com.alibaba.datax
+ datax-common
+ ${datax-project-version}
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.30
+
+
+ guava
+ com.google.guava
+ 32.0.1-jre
+
io.milvus
milvus-sdk-java
- 2.4.8
+ 2.5.2
- org.jetbrains.kotlin
- kotlin-test-junit5
- 2.0.0
+ org.mockito
+ mockito-core
+ 3.3.3
test
- org.junit.jupiter
- junit-jupiter
- 5.10.0
+ junit
+ junit
+ 4.11
test
@@ -50,16 +66,16 @@
2.0.0
- com.alibaba.datax
- datax-common
- 0.0.1-SNAPSHOT
- compile
+ org.powermock
+ powermock-module-junit4
+ 2.0.9
+ test
- org.projectlombok
- lombok
- 1.18.30
- provided
+ org.powermock
+ powermock-api-mockito2
+ 2.0.9
+ test
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java
deleted file mode 100644
index 89153a6a..00000000
--- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.datax.plugin.writer.milvuswriter;
-
-import java.nio.Buffer;
-import java.nio.ByteBuffer;
-
-public class BufferUtils {
-
- public static ByteBuffer toByteBuffer(Short[] shortArray) {
- ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2);
-
- for (Short value : shortArray) {
- byteBuffer.putShort(value);
- }
-
- // Compatible compilation and running versions are not consistent
- // Flip the buffer to prepare for reading
- ((Buffer) byteBuffer).flip();
-
- return byteBuffer;
- }
-
- public static Short[] toShortArray(ByteBuffer byteBuffer) {
- Short[] shortArray = new Short[byteBuffer.capacity() / 2];
-
- for (int i = 0; i < shortArray.length; i++) {
- shortArray[i] = byteBuffer.getShort();
- }
-
- return shortArray;
- }
-
- public static ByteBuffer toByteBuffer(Float[] floatArray) {
- ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4);
-
- for (float value : floatArray) {
- byteBuffer.putFloat(value);
- }
-
- ((Buffer) byteBuffer).flip();
-
- return byteBuffer;
- }
-
- public static Float[] toFloatArray(ByteBuffer byteBuffer) {
- Float[] floatArray = new Float[byteBuffer.capacity() / 4];
-
- for (int i = 0; i < floatArray.length; i++) {
- floatArray[i] = byteBuffer.getFloat();
- }
-
- return floatArray;
- }
-
- public static ByteBuffer toByteBuffer(Double[] doubleArray) {
- ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8);
-
- for (double value : doubleArray) {
- byteBuffer.putDouble(value);
- }
-
- ((Buffer) byteBuffer).flip();
-
- return byteBuffer;
- }
-
- public static Double[] toDoubleArray(ByteBuffer byteBuffer) {
- Double[] doubleArray = new Double[byteBuffer.capacity() / 8];
-
- for (int i = 0; i < doubleArray.length; i++) {
- doubleArray[i] = byteBuffer.getDouble();
- }
-
- return doubleArray;
- }
-
- public static ByteBuffer toByteBuffer(Integer[] intArray) {
- ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4);
-
- for (int value : intArray) {
- byteBuffer.putInt(value);
- }
-
- ((Buffer) byteBuffer).flip();
-
- return byteBuffer;
- }
-
- public static Integer[] toIntArray(ByteBuffer byteBuffer) {
- Integer[] intArray = new Integer[byteBuffer.capacity() / 4];
-
- for (int i = 0; i < intArray.length; i++) {
- intArray[i] = byteBuffer.getInt();
- }
-
- return intArray;
- }
-}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java
index cc636404..28f1ff13 100644
--- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java
@@ -1,20 +1,17 @@
package com.alibaba.datax.plugin.writer.milvuswriter;
public class KeyConstant {
- public static final String URI = "uri";
+ public static final String USERNAME = "username";
+ public static final String PASSWORD = "password";
+ public static final String ENDPOINT = "endpoint";
public static final String TOKEN = "token";
public static final String DATABASE = "database";
public static final String COLLECTION = "collection";
- public static final String AUTO_ID = "autoId";
- public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema";
public static final String BATCH_SIZE = "batchSize";
public static final String COLUMN = "column";
- public static final String COLUMN_TYPE = "type";
- public static final String COLUMN_NAME = "name";
- public static final String VECTOR_DIMENSION = "dimension";
- public static final String IS_PRIMARY_KEY = "isPrimaryKey";
-// "schemaCreateMode":"createWhenTableNotExit"/"Ignore"/"exception"
- public static final String schemaCreateMode = "schemaCreateMode";
- public static final String IS_PARTITION_KEY = "isPartitionKey";
- public static final String MAX_LENGTH = "maxLength";
-}
+ public static final String SCHAME_CREATE_MODE = "schemaCreateMode";
+ public static final String WRITE_MODE = "writeMode";
+ public static final String PARTITION = "partition";
+ public static final String CONNECT_TIMEOUT_MS = "connectTimeoutMs";
+ public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema";
+}
\ No newline at end of file
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java
index da686af2..b78728e4 100644
--- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java
@@ -1,43 +1,166 @@
package com.alibaba.datax.plugin.writer.milvuswriter;
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.alibaba.datax.plugin.writer.milvuswriter.enums.WriteModeEnum;
+import com.alibaba.fastjson2.JSONArray;
+import com.google.gson.Gson;
import com.google.gson.JsonObject;
-import io.milvus.v2.client.MilvusClientV2;
-import io.milvus.v2.service.vector.request.UpsertReq;
+import io.milvus.v2.common.DataType;
+import io.milvus.v2.service.vector.request.data.BFloat16Vec;
+import io.milvus.v2.service.vector.request.data.Float16Vec;
import lombok.extern.slf4j.Slf4j;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
@Slf4j
public class MilvusBufferWriter {
- private final MilvusClientV2 milvusClientV2;
+ private final MilvusClient milvusClient;
private final String collection;
private final Integer batchSize;
private List dataCache;
+ private List milvusColumnMeta;
+ private WriteModeEnum writeMode;
+ private String partition;
- public MilvusBufferWriter(MilvusClientV2 milvusClientV2, String collection, Integer batchSize){
- this.milvusClientV2 = milvusClientV2;
- this.collection = collection;
- this.batchSize = batchSize;
- this.dataCache = new ArrayList<>();
+ public MilvusBufferWriter(MilvusClient milvusClient, Configuration writerSliceConfig) {
+ this.milvusClient = milvusClient;
+ this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION);
+ this.batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100);
+ this.dataCache = new ArrayList<>(batchSize);
+ this.milvusColumnMeta = JSON.parseObject(writerSliceConfig.getString(KeyConstant.COLUMN), new TypeReference>() {
+ });
+ this.writeMode = WriteModeEnum.getEnum(writerSliceConfig.getString(KeyConstant.WRITE_MODE));
+ this.partition = writerSliceConfig.getString(KeyConstant.PARTITION);
}
- public void write(JsonObject data){
- dataCache.add(data);
+
+ public void add(Record record, TaskPluginCollector taskPluginCollector) {
+ try {
+ JsonObject data = this.convertByType(milvusColumnMeta, record);
+ dataCache.add(data);
+ } catch (Exception e) {
+ taskPluginCollector.collectDirtyRecord(record, String.format("parse record error errorMessage: %s", e.getMessage()));
+ }
}
- public Boolean needCommit(){
+
+ public Boolean needCommit() {
return dataCache.size() >= batchSize;
}
- public void commit(){
- if(dataCache.isEmpty()){
+
+ public void commit() {
+ if (dataCache.isEmpty()) {
log.info("dataCache is empty, skip commit");
return;
}
- UpsertReq upsertReq = UpsertReq.builder()
- .collectionName(collection)
- .data(dataCache)
- .build();
- milvusClientV2.upsert(upsertReq);
- dataCache = new ArrayList<>();
+ if (writeMode == WriteModeEnum.INSERT) {
+ milvusClient.insert(collection, partition, dataCache);
+ } else {
+ milvusClient.upsert(collection, partition, dataCache);
+ }
+ dataCache = new ArrayList<>(batchSize);
}
-}
+
+ public int getDataCacheSize() {
+ return dataCache.size();
+ }
+
+ private JsonObject convertByType(List milvusColumnMeta, Record record) {
+ JsonObject data = new JsonObject();
+ Gson gson = new Gson();
+ for (int i = 0; i < record.getColumnNumber(); i++) {
+ MilvusColumn milvusColumn = milvusColumnMeta.get(i);
+ DataType fieldType = milvusColumn.getMilvusTypeEnum();
+ String fieldName = milvusColumn.getName();
+ Column column = record.getColumn(i);
+ try {
+ Object field = convertToMilvusField(fieldType, column, milvusColumn);
+ data.add(fieldName, gson.toJsonTree(field));
+ } catch (Exception e) {
+ log.error("parse error for column: {} errorMessage: {}", fieldName, e.getMessage(), e);
+ throw e;
+ }
+ }
+ return data;
+ }
+
+ //值需要跟这里匹配:io.milvus.param.ParamUtils#checkFieldData(io.milvus.param.collection.FieldType, java.util.List>, boolean)
+ private Object convertToMilvusField(DataType type, Column column, MilvusColumn milvusColumn) {
+ if (column.getRawData() == null) {
+ return null;
+ }
+ switch (type) {
+ case Int8:
+ case Int16:
+ case Int32:
+ case Int64:
+ return column.asLong();
+ case Float:
+ case Double:
+ return column.asDouble();
+ case String:
+ case VarChar:
+ return column.asString();
+ case Bool:
+ return column.asBoolean();
+ case BFloat16Vector:
+ JSONArray bFloat16ArrayJson = JSON.parseArray(column.asString());
+ List bfloat16Vector = new ArrayList<>();
+ for (int i = 0; i < bFloat16ArrayJson.size(); i++) {
+ Float value = Float.parseFloat(bFloat16ArrayJson.getString(i));
+ bfloat16Vector.add(value);
+ }
+ BFloat16Vec bFloat16Vec = new BFloat16Vec(bfloat16Vector);
+ ByteBuffer byteBuffer = (ByteBuffer) bFloat16Vec.getData();
+ return byteBuffer.array();
+ case Float16Vector:
+ JSONArray float16ArrayJson = JSON.parseArray(column.asString());
+ List float16Vector = new ArrayList<>();
+ for (int i = 0; i < float16ArrayJson.size(); i++) {
+ Float floatValue = Float.parseFloat(float16ArrayJson.getString(i));
+ float16Vector.add(floatValue);
+ }
+ Float16Vec float16Vec = new Float16Vec(float16Vector);
+ ByteBuffer data = (ByteBuffer) float16Vec.getData();
+ return data.array();
+ case BinaryVector:
+ return column.asBytes();
+ case FloatVector:
+ JSONArray arrayJson = JSON.parseArray(column.asString());
+ return arrayJson.stream().map(item -> Float.parseFloat(String.valueOf(item))).collect(Collectors.toList());
+ case SparseFloatVector:
+ //[3:0.5, 24:0.8, 76:0.2]
+ try {
+ JSONArray sparseFloatArray = JSON.parseArray(column.asString());
+ TreeMap mapValue = new TreeMap<>();
+ for (int i = 0; i < sparseFloatArray.size(); i++) {
+ String value = sparseFloatArray.getString(i);
+ String[] split = value.split(":");
+ Long key = Long.parseLong(split[0]);
+ Float val = Float.parseFloat(split[1]);
+ mapValue.put(key, val);
+ }
+ return mapValue;
+ } catch (Exception e) {
+ log.error("parse column[{}] SparseFloatVector value error, value should like [3:0.5, 24:0.8, 76:0.2], but get:{}", milvusColumn.getName(), column.asString());
+ throw e;
+ }
+ case JSON:
+ return column.asString();
+ case Array:
+ JSONArray parseArray = JSON.parseArray(column.asString());
+ return parseArray.stream().map(item -> String.valueOf(item)).collect(Collectors.toList());
+ default:
+ throw new RuntimeException(String.format("Unsupported data type[%s]", type));
+ }
+ }
+}
\ No newline at end of file
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusClient.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusClient.java
new file mode 100644
index 00000000..1bf4743b
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusClient.java
@@ -0,0 +1,95 @@
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+import java.util.List;
+
+import com.alibaba.datax.common.util.Configuration;
+
+import com.google.gson.JsonObject;
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+import io.milvus.v2.service.collection.request.DropCollectionReq;
+import io.milvus.v2.service.collection.request.HasCollectionReq;
+import io.milvus.v2.service.partition.request.CreatePartitionReq;
+import io.milvus.v2.service.partition.request.HasPartitionReq;
+import io.milvus.v2.service.vector.request.InsertReq;
+import io.milvus.v2.service.vector.request.UpsertReq;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * @author ziming(子茗)
+ * @date 12/27/24
+ * @description
+ */
+@Slf4j
+public class MilvusClient {
+ private MilvusClientV2 milvusClientV2;
+
+ public MilvusClient(Configuration conf) {
+ // connect to milvus
+ ConnectConfig connectConfig = ConnectConfig.builder().uri(conf.getString(KeyConstant.ENDPOINT)).build();
+ String token = null;
+ if (conf.getString(KeyConstant.TOKEN) != null) {
+ token = conf.getString(KeyConstant.TOKEN);
+ } else {
+ token = conf.getString(KeyConstant.USERNAME) + ":" + conf.getString(KeyConstant.PASSWORD);
+ }
+ connectConfig.setToken(token);
+ String database = conf.getString(KeyConstant.DATABASE);
+ if (StringUtils.isNotBlank(database)) {
+ log.info("use database {}", database);
+ connectConfig.setDbName(conf.getString(KeyConstant.DATABASE));
+ }
+ Integer connectTimeOut = conf.getInt(KeyConstant.CONNECT_TIMEOUT_MS);
+ if (connectTimeOut != null) {
+ connectConfig.setConnectTimeoutMs(connectTimeOut);
+ }
+ this.milvusClientV2 = new MilvusClientV2(connectConfig);
+ }
+
+ public void upsert(String collection, String partition, List data) {
+ UpsertReq upsertReq = UpsertReq.builder().collectionName(collection).data(data).build();
+ if (StringUtils.isNotEmpty(partition)) {
+ upsertReq.setPartitionName(partition);
+ }
+ milvusClientV2.upsert(upsertReq);
+ }
+
+ public void insert(String collection, String partition, List data) {
+ InsertReq insertReq = InsertReq.builder().collectionName(collection).data(data).build();
+ if (StringUtils.isNotEmpty(partition)) {
+ insertReq.setPartitionName(partition);
+ }
+ milvusClientV2.insert(insertReq);
+ }
+
+ public Boolean hasCollection(String collection) {
+ HasCollectionReq build = HasCollectionReq.builder().collectionName(collection).build();
+ return milvusClientV2.hasCollection(build);
+ }
+
+ public void createCollection(String collection, CreateCollectionReq.CollectionSchema schema) {
+ CreateCollectionReq createCollectionReq = CreateCollectionReq.builder().collectionName(collection).collectionSchema(schema).build();
+ milvusClientV2.createCollection(createCollectionReq);
+ }
+
+ public void dropCollection(String collection) {
+ DropCollectionReq request = DropCollectionReq.builder().collectionName(collection).build();
+ milvusClientV2.dropCollection(request);
+ }
+ public Boolean hasPartition(String collection, String partition) {
+ HasPartitionReq hasPartitionReq = HasPartitionReq.builder().collectionName(collection).partitionName(partition).build();
+ return milvusClientV2.hasPartition(hasPartitionReq);
+ }
+
+ public void createPartition(String collectionName, String partitionName) {
+ CreatePartitionReq createPartitionReq = CreatePartitionReq.builder().collectionName(collectionName).partitionName(partitionName).build();
+ milvusClientV2.createPartition(createPartitionReq);
+ }
+
+ public void close() {
+ log.info("Closing Milvus client");
+ milvusClientV2.close();
+ }
+}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusColumn.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusColumn.java
new file mode 100644
index 00000000..06070248
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusColumn.java
@@ -0,0 +1,112 @@
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+import io.milvus.v2.common.DataType;
+
+import java.util.Arrays;
+
+/**
+ * @author ziming(子茗)
+ * @date 12/27/24
+ * @description
+ */
+public class MilvusColumn {
+ private String name;
+ private String type;
+ private DataType milvusTypeEnum;
+ private Boolean isPrimaryKey;
+ private Integer dimension;
+ private Boolean isPartitionKey;
+ private Integer maxLength;
+ private Boolean isAutoId;
+ private Integer maxCapacity;
+ private String elementType;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ for (DataType item : DataType.values()) {
+ if (item.name().equalsIgnoreCase(type)) {
+ this.milvusTypeEnum = item;
+ break;
+ }
+ }
+ if (this.milvusTypeEnum == null) {
+ throw new RuntimeException("Unsupported type: " + type + " supported types: " + Arrays.toString(DataType.values()));
+ }
+ }
+
+ public Integer getDimension() {
+ return dimension;
+ }
+
+ public void setDimension(Integer dimension) {
+ this.dimension = dimension;
+ }
+
+ public Integer getMaxLength() {
+ return maxLength;
+ }
+
+ public void setMaxLength(Integer maxLength) {
+ this.maxLength = maxLength;
+ }
+
+ public Boolean getPrimaryKey() {
+ return isPrimaryKey;
+ }
+
+ public Boolean getPartitionKey() {
+ return isPartitionKey;
+ }
+
+ public void setPartitionKey(Boolean partitionKey) {
+ isPartitionKey = partitionKey;
+ }
+
+ public void setPrimaryKey(Boolean primaryKey) {
+ isPrimaryKey = primaryKey;
+ }
+
+ public Boolean getAutoId() {
+ return isAutoId;
+ }
+
+ public void setAutoId(Boolean autoId) {
+ isAutoId = autoId;
+ }
+
+ public Integer getMaxCapacity() {
+ return maxCapacity;
+ }
+
+ public void setMaxCapacity(Integer maxCapacity) {
+ this.maxCapacity = maxCapacity;
+ }
+
+ public String getElementType() {
+ return elementType;
+ }
+
+ public void setElementType(String elementType) {
+ this.elementType = elementType;
+ }
+
+ public DataType getMilvusTypeEnum() {
+ return milvusTypeEnum;
+ }
+
+ public void setMilvusTypeEnum(DataType milvusTypeEnum) {
+ this.milvusTypeEnum = milvusTypeEnum;
+ }
+}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusCreateCollection.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusCreateCollection.java
new file mode 100644
index 00000000..84e296f4
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusCreateCollection.java
@@ -0,0 +1,102 @@
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+import java.util.List;
+
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.writer.milvuswriter.enums.SchemaCreateModeEnum;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
+
+import io.milvus.v2.common.DataType;
+import io.milvus.v2.service.collection.request.AddFieldReq;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+import lombok.extern.slf4j.Slf4j;
+
+import static io.milvus.v2.common.DataType.valueOf;
+
+/**
+ * @author ziming(子茗)
+ * @date 12/27/24
+ * @description
+ */
+@Slf4j
+public class MilvusCreateCollection {
+
+ private Configuration config;
+
+ MilvusCreateCollection(Configuration originalConfig) {
+ this.config = originalConfig;
+ }
+
+ public void createCollectionByMode(MilvusClient milvusClient) {
+ String collection = this.config.getString(KeyConstant.COLLECTION);
+ SchemaCreateModeEnum schemaCreateMode = SchemaCreateModeEnum.getEnum(this.config.getString(KeyConstant.SCHAME_CREATE_MODE));
+ List milvusColumnMeta = JSON.parseObject(config.getString(KeyConstant.COLUMN), new TypeReference>() {
+ });
+ Boolean hasCollection = milvusClient.hasCollection(collection);
+ if (schemaCreateMode == SchemaCreateModeEnum.CREATEIFNOTEXIT) {
+ // create collection
+ if (hasCollection) {
+ log.info("collection[{}] already exists, continue create", collection);
+ } else {
+ log.info("creating collection[{}]", collection);
+ CreateCollectionReq.CollectionSchema collectionSchema = prepareCollectionSchema(milvusColumnMeta);
+ milvusClient.createCollection(collection, collectionSchema);
+ }
+ } else if (schemaCreateMode == SchemaCreateModeEnum.RECREATE) {
+ if (hasCollection) {
+ log.info("collection already exist, try to drop");
+ milvusClient.dropCollection(collection);
+ }
+ log.info("creating collection[{}]", collection);
+ CreateCollectionReq.CollectionSchema collectionSchema = prepareCollectionSchema(milvusColumnMeta);
+ milvusClient.createCollection(collection, collectionSchema);
+ } else if (schemaCreateMode == SchemaCreateModeEnum.IGNORE && !hasCollection) {
+ log.error("Collection not exist, throw exception");
+ throw new RuntimeException("Collection not exist");
+ }
+ }
+
+ private CreateCollectionReq.CollectionSchema prepareCollectionSchema(List milvusColumnMeta) {
+ CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build();
+ for (int i = 0; i < milvusColumnMeta.size(); i++) {
+ MilvusColumn milvusColumn = milvusColumnMeta.get(i);
+ AddFieldReq addFieldReq = AddFieldReq.builder()
+ .fieldName(milvusColumn.getName())
+ .dataType(valueOf(milvusColumn.getType()))
+ .build();
+ if (milvusColumn.getPrimaryKey() != null) {
+ addFieldReq.setIsPrimaryKey(milvusColumn.getPrimaryKey());
+ }
+ if (milvusColumn.getDimension() != null) {
+ addFieldReq.setDimension(milvusColumn.getDimension());
+ }
+ if (milvusColumn.getPartitionKey() != null) {
+ addFieldReq.setIsPartitionKey(milvusColumn.getPartitionKey());
+ }
+ if (milvusColumn.getMaxLength() != null) {
+ addFieldReq.setMaxLength(milvusColumn.getMaxLength());
+ }
+ if (milvusColumn.getAutoId() != null) {
+ addFieldReq.setAutoID(milvusColumn.getAutoId());
+ }
+ if (milvusColumn.getMaxCapacity() != null) {
+ addFieldReq.setMaxCapacity(milvusColumn.getMaxCapacity());
+ }
+ if (milvusColumn.getElementType() != null) {
+ addFieldReq.setElementType(DataType.valueOf(milvusColumn.getElementType()));
+ }
+ try {
+ collectionSchema.addField(addFieldReq);
+ } catch (Exception e) {
+ log.error("add filed[{}] error", milvusColumn.getName());
+ throw e;
+ }
+ }
+ Boolean enableDynamic = config.getBool(KeyConstant.ENABLE_DYNAMIC_SCHEMA);
+ if (enableDynamic != null) {
+ collectionSchema.setEnableDynamicField(enableDynamic);
+ }
+ return collectionSchema;
+ }
+}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java
deleted file mode 100644
index 390f95e5..00000000
--- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package com.alibaba.datax.plugin.writer.milvuswriter;
-
-import com.alibaba.datax.common.element.Record;
-import com.alibaba.fastjson2.JSONArray;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import io.milvus.v2.common.DataType;
-import static io.milvus.v2.common.DataType.*;
-import io.milvus.v2.service.collection.request.AddFieldReq;
-import io.milvus.v2.service.collection.request.CreateCollectionReq;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.stream.Collectors;
-
-public class MilvusSinkConverter {
- public JsonObject convertByType(JSONArray milvusColumnMeta, Record record) {
- JsonObject data = new JsonObject();
- Gson gson = new Gson();
- for(int i = 0; i < record.getColumnNumber(); i++) {
- String fieldType = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE);
- String fieldName = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME);
- Object rawData = record.getColumn(i).getRawData();
- Object field = convertToMilvusField(fieldType, rawData);
- data.add(fieldName, gson.toJsonTree(field));
- }
- return data;
- }
-
- private Object convertToMilvusField(String type, Object rawData) {
- Gson gson = new Gson();
- switch (valueOf(type)) {
- case Int32:
- return Integer.parseInt(rawData.toString());
- case Int64:
- return Long.parseLong(rawData.toString());
- case Float:
- return java.lang.Float.parseFloat(rawData.toString());
- case String:
- case VarChar:
- return rawData.toString();
- case Bool:
- return Boolean.parseBoolean(rawData.toString());
- case FloatVector:
- java.lang.Float[] floats = Arrays.stream(rawData.toString().split(",")).map(java.lang.Float::parseFloat).toArray(java.lang.Float[]::new);
- return Arrays.stream(floats).collect(Collectors.toList());
- case BinaryVector:
- java.lang.Integer[] binarys = Arrays.stream(rawData.toString().split(",")).map(java.lang.Integer::parseInt).toArray(java.lang.Integer[]::new);
- return BufferUtils.toByteBuffer(binarys);
- case Float16Vector:
- case BFloat16Vector:
- // all these data is byte format in milvus
- ByteBuffer binaryVector = (ByteBuffer) rawData;
- return gson.toJsonTree(binaryVector.array());
- case SparseFloatVector:
- return JsonParser.parseString(gson.toJson(rawData)).getAsJsonObject();
- default:
- throw new RuntimeException("Unsupported data type");
- }
- }
-
- public CreateCollectionReq.CollectionSchema prepareCollectionSchema(JSONArray milvusColumnMeta) {
- CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build();
- for (int i = 0; i < milvusColumnMeta.size(); i++) {
- AddFieldReq addFieldReq = AddFieldReq.builder()
- .fieldName(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME))
- .dataType(valueOf(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE)))
- .build();
- if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PRIMARY_KEY)) {
- addFieldReq.setIsPrimaryKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PRIMARY_KEY));
- }
- if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.VECTOR_DIMENSION)) {
- addFieldReq.setDimension(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.VECTOR_DIMENSION));
- }
- if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PARTITION_KEY)) {
- addFieldReq.setIsPartitionKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PARTITION_KEY));
- }
- if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.MAX_LENGTH)) {
- addFieldReq.setMaxLength(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.MAX_LENGTH));
- }
- collectionSchema.addField(addFieldReq);
- }
- return collectionSchema;
- }
-}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java
index c9b5a1bc..764a9d94 100644
--- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java
@@ -1,29 +1,49 @@
package com.alibaba.datax.plugin.writer.milvuswriter;
import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONArray;
-import com.google.gson.JsonObject;
-import io.milvus.v2.client.ConnectConfig;
-import io.milvus.v2.client.MilvusClientV2;
-import io.milvus.v2.common.DataType;
-import io.milvus.v2.service.collection.request.AddFieldReq;
-import io.milvus.v2.service.collection.request.CreateCollectionReq;
-import io.milvus.v2.service.collection.request.HasCollectionReq;
-import io.milvus.v2.service.vector.request.UpsertReq;
+
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
@Slf4j
public class MilvusWriter extends Writer {
public static class Job extends Writer.Job {
private Configuration originalConfig = null;
+
+ @Override
+ public void init() {
+ this.originalConfig = super.getPluginJobConf();
+ originalConfig.getNecessaryValue(KeyConstant.ENDPOINT, MilvusWriterErrorCode.REQUIRED_VALUE);
+ originalConfig.getNecessaryValue(KeyConstant.COLUMN, MilvusWriterErrorCode.REQUIRED_VALUE);
+ originalConfig.getNecessaryValue(KeyConstant.COLLECTION, MilvusWriterErrorCode.REQUIRED_VALUE);
+ }
+
+ @Override
+ public void prepare() {
+ //collection create process
+ MilvusClient milvusClient = new MilvusClient(originalConfig);
+ try {
+ MilvusCreateCollection milvusCreateCollection = new MilvusCreateCollection(originalConfig);
+ milvusCreateCollection.createCollectionByMode(milvusClient);
+ String collection = originalConfig.getString(KeyConstant.COLLECTION);
+ String partition = originalConfig.getString(KeyConstant.PARTITION);
+ if (partition != null && !milvusClient.hasPartition(collection, partition)) {
+                    log.info("collection[{}] does not contain partition[{}], try to create partition", collection, partition);
+ milvusClient.createPartition(collection, partition);
+ }
+ } catch (Exception e) {
+ throw DataXException.asDataXException(MilvusWriterErrorCode.MILVUS_COLLECTION, e.getMessage(), e);
+ } finally {
+ milvusClient.close();
+ }
+ }
+
/**
* 切分任务。
*
@@ -31,100 +51,60 @@ public class MilvusWriter extends Writer {
*/
@Override
public List split(int mandatoryNumber) {
- List configList = new ArrayList();
- for(int i = 0; i < mandatoryNumber; i++) {
+ List configList = new ArrayList<>();
+ for (int i = 0; i < mandatoryNumber; i++) {
configList.add(this.originalConfig.clone());
}
return configList;
}
- @Override
- public void init() {
- this.originalConfig = super.getPluginJobConf();
- }
-
@Override
public void destroy() {
}
}
+
public static class Task extends Writer.Task {
- private MilvusClientV2 milvusClientV2;
-
- private MilvusSinkConverter milvusSinkConverter;
private MilvusBufferWriter milvusBufferWriter;
-
- private String collection = null;
- private JSONArray milvusColumnMeta;
-
- private String schemaCreateMode = "createWhenTableNotExit";
-
- @Override
- public void startWrite(RecordReceiver lineReceiver) {
- Record record = lineReceiver.getFromReader();
- JsonObject data = milvusSinkConverter.convertByType(milvusColumnMeta, record);
- milvusBufferWriter.write(data);
- if(milvusBufferWriter.needCommit()){
- log.info("Reached buffer limit, Committing data");
- milvusBufferWriter.commit();
- log.info("Data committed");
- }
- }
+ MilvusClient milvusClient;
@Override
public void init() {
log.info("Initializing Milvus writer");
// get configuration
Configuration writerSliceConfig = this.getPluginJobConf();
- this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION);
- this.milvusColumnMeta = JSON.parseArray(writerSliceConfig.getString(KeyConstant.COLUMN));
- this.schemaCreateMode = writerSliceConfig.getString(KeyConstant.schemaCreateMode) == null ?
- "createWhenTableNotExit" : writerSliceConfig.getString(KeyConstant.schemaCreateMode);
- int batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100);
- log.info("Collection:{}", this.collection);
- // connect to milvus
- ConnectConfig connectConfig = ConnectConfig.builder()
- .uri(writerSliceConfig.getString(KeyConstant.URI))
- .token(writerSliceConfig.getString(KeyConstant.TOKEN))
- .build();
- if(writerSliceConfig.getString(KeyConstant.DATABASE) == null) {
- log.warn("Database is set, using database{}", writerSliceConfig.getString(KeyConstant.DATABASE));
- connectConfig.setDbName(writerSliceConfig.getString(KeyConstant.DATABASE));
- }
- this.milvusClientV2 = new MilvusClientV2(connectConfig);
- this.milvusSinkConverter = new MilvusSinkConverter();
- this.milvusBufferWriter = new MilvusBufferWriter(milvusClientV2, collection, batchSize);
+ this.milvusClient = new MilvusClient(writerSliceConfig);
+ this.milvusBufferWriter = new MilvusBufferWriter(this.milvusClient, writerSliceConfig);
log.info("Milvus writer initialized");
}
+
+ @Override
+ public void startWrite(RecordReceiver lineReceiver) {
+ Record record = null;
+ while ((record = lineReceiver.getFromReader()) != null) {
+ milvusBufferWriter.add(record, this.getTaskPluginCollector());
+ if (milvusBufferWriter.needCommit()) {
+ log.info("begin committing data size[{}]", milvusBufferWriter.getDataCacheSize());
+ milvusBufferWriter.commit();
+ }
+ }
+ if (milvusBufferWriter.getDataCacheSize() > 0) {
+ log.info("begin committing data size[{}]", milvusBufferWriter.getDataCacheSize());
+ milvusBufferWriter.commit();
+ }
+ }
+
@Override
public void prepare() {
super.prepare();
- Boolean hasCollection = milvusClientV2.hasCollection(HasCollectionReq.builder().collectionName(collection).build());
- if (!hasCollection) {
- log.info("Collection not exist");
- if (schemaCreateMode.equals("createWhenTableNotExit")) {
- // create collection
- log.info("Creating collection:{}", this.collection);
- CreateCollectionReq.CollectionSchema collectionSchema = milvusSinkConverter.prepareCollectionSchema(milvusColumnMeta);
-
- CreateCollectionReq createCollectionReq = CreateCollectionReq.builder()
- .collectionName(collection)
- .collectionSchema(collectionSchema)
- .build();
- milvusClientV2.createCollection(createCollectionReq);
- } else if (schemaCreateMode.equals("exception")) {
- log.error("Collection not exist, throw exception");
- throw new RuntimeException("Collection not exist");
- }
- }
}
@Override
public void destroy() {
- log.info("Closing Milvus writer, committing data and closing connection");
- this.milvusBufferWriter.commit();
- this.milvusClientV2.close();
+ if (this.milvusClient != null) {
+ this.milvusClient.close();
+ }
}
}
-}
+}
\ No newline at end of file
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriterErrorCode.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriterErrorCode.java
new file mode 100644
index 00000000..7264160f
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriterErrorCode.java
@@ -0,0 +1,35 @@
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+import com.alibaba.datax.common.spi.ErrorCode;
+
+/**
+ * @author ziming(子茗)
+ * @date 12/27/24
+ * @description
+ */
+public enum MilvusWriterErrorCode implements ErrorCode {
+ MILVUS_COLLECTION("MilvusWriter-01", "collection process error"),
+    REQUIRED_VALUE("MilvusWriter-02", "missing required parameter");
+ private final String code;
+ private final String description;
+
+ MilvusWriterErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
+ }
+}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/SchemaCreateModeEnum.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/SchemaCreateModeEnum.java
new file mode 100644
index 00000000..b8c88bf1
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/SchemaCreateModeEnum.java
@@ -0,0 +1,34 @@
+package com.alibaba.datax.plugin.writer.milvuswriter.enums;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @author ziming(子茗)
+ * @date 12/27/24
+ * @description
+ */
+@Slf4j
+public enum SchemaCreateModeEnum {
+ CREATEIFNOTEXIT("createIfNotExist"),
+ IGNORE("ignore"),
+ RECREATE("recreate");
+ String type;
+
+ SchemaCreateModeEnum(String type) {
+ this.type = type;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public static SchemaCreateModeEnum getEnum(String name) {
+ for (SchemaCreateModeEnum value : SchemaCreateModeEnum.values()) {
+ if (value.getType().equalsIgnoreCase(name)) {
+ return value;
+ }
+ }
+        log.info("use default CREATEIFNOTEXIT schema create mode");
+ return CREATEIFNOTEXIT;
+ }
+}
\ No newline at end of file
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/WriteModeEnum.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/WriteModeEnum.java
new file mode 100644
index 00000000..0098dbad
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/WriteModeEnum.java
@@ -0,0 +1,28 @@
+package com.alibaba.datax.plugin.writer.milvuswriter.enums;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public enum WriteModeEnum {
+ INSERT("insert"),
+ UPSERT("upsert");
+ String mode;
+
+ public String getMode() {
+ return mode;
+ }
+
+ WriteModeEnum(String mode) {
+ this.mode = mode;
+ }
+
+ public static WriteModeEnum getEnum(String mode) {
+ for (WriteModeEnum writeModeEnum : WriteModeEnum.values()) {
+ if (writeModeEnum.getMode().equalsIgnoreCase(mode)) {
+ return writeModeEnum;
+ }
+ }
+ log.info("use default write mode upsert");
+ return UPSERT;
+ }
+}
diff --git a/milvuswriter/src/main/resources/plugin_job_template.json b/milvuswriter/src/main/resources/plugin_job_template.json
index d4ba4bf1..33bd941a 100644
--- a/milvuswriter/src/main/resources/plugin_job_template.json
+++ b/milvuswriter/src/main/resources/plugin_job_template.json
@@ -1,15 +1,12 @@
{
- "name": "mongodbwriter",
+ "name": "milvuswriter",
"parameter": {
- "address": [],
- "userName": "",
- "userPassword": "",
- "dbName": "",
- "collectionName": "",
+ "endpoint": "",
+ "username": "",
+ "password": "",
+ "database": "",
+ "collection": "",
"column": [],
- "upsertInfo": {
- "isUpsert": "",
- "upsertKey": ""
- }
+ "enableDynamicSchema": ""
}
}
\ No newline at end of file
diff --git a/package.xml b/package.xml
index 624109f7..c0f9cdf4 100644
--- a/package.xml
+++ b/package.xml
@@ -546,5 +546,12 @@
datax
+
+ milvuswriter/target/datax/
+
+ **/*.*
+
+ datax
+
diff --git a/stream2milvus.json b/stream2milvus.json
new file mode 100644
index 00000000..e69de29b