From 906bf3ded923e7d87df34c952a87aa86d43321fb Mon Sep 17 00:00:00 2001 From: Nian Liu Date: Wed, 27 Nov 2024 17:52:09 +0800 Subject: [PATCH 1/4] add milvus writer plugin --- milvuswriter/pom.xml | 109 +++++++++++++++ milvuswriter/src/main/assembly/package.xml | 36 +++++ .../writer/milvuswriter/BufferUtils.java | 130 ++++++++++++++++++ .../writer/milvuswriter/KeyConstant.java | 20 +++ .../milvuswriter/MilvusBufferWriter.java | 43 ++++++ .../milvuswriter/MilvusSinkConverter.java | 86 ++++++++++++ .../writer/milvuswriter/MilvusWriter.java | 130 ++++++++++++++++++ milvuswriter/src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 15 ++ pom.xml | 1 + 10 files changed, 576 insertions(+) create mode 100644 milvuswriter/pom.xml create mode 100644 milvuswriter/src/main/assembly/package.xml create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java create mode 100644 milvuswriter/src/main/resources/plugin.json create mode 100644 milvuswriter/src/main/resources/plugin_job_template.json diff --git a/milvuswriter/pom.xml b/milvuswriter/pom.xml new file mode 100644 index 00000000..b889cbe4 --- /dev/null +++ b/milvuswriter/pom.xml @@ -0,0 +1,109 @@ + + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + + milvuswriter + + + UTF-8 + official + 1.8 + + + + + guava + com.google.guava + 32.0.1-jre + + + + + + + io.milvus + milvus-sdk-java + 2.4.8 + + + org.jetbrains.kotlin + kotlin-test-junit5 + 2.0.0 + test + + + org.junit.jupiter + junit-jupiter + 5.10.0 
+ test + + + org.jetbrains.kotlin + kotlin-stdlib + 2.0.0 + + + com.alibaba.datax + datax-common + 0.0.1-SNAPSHOT + compile + + + org.projectlombok + lombok + 1.18.30 + provided + + + + + + + + src/main/resources + + **/*.* + + true + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + + \ No newline at end of file diff --git a/milvuswriter/src/main/assembly/package.xml b/milvuswriter/src/main/assembly/package.xml new file mode 100644 index 00000000..62357b4a --- /dev/null +++ b/milvuswriter/src/main/assembly/package.xml @@ -0,0 +1,36 @@ + + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/milvuswriter + + + target/ + + milvuswriter-0.0.1-SNAPSHOT.jar + + plugin/writer/milvuswriter + + + + + + false + plugin/writer/milvuswriter/libs + runtime + + + diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java new file mode 100644 index 00000000..89153a6a --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.datax.plugin.writer.milvuswriter; + +import java.nio.Buffer; +import java.nio.ByteBuffer; + +public class BufferUtils { + + public static ByteBuffer toByteBuffer(Short[] shortArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2); + + for (Short value : shortArray) { + byteBuffer.putShort(value); + } + + // Compatible compilation and running versions are not consistent + // Flip the buffer to prepare for reading + ((Buffer) byteBuffer).flip(); + + return byteBuffer; + } + + public static Short[] toShortArray(ByteBuffer byteBuffer) { + Short[] shortArray = new Short[byteBuffer.capacity() / 2]; + + for (int i = 0; i < shortArray.length; i++) { + shortArray[i] = byteBuffer.getShort(); + } + + return shortArray; + } + + public static ByteBuffer toByteBuffer(Float[] floatArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4); + + for (float value : floatArray) { + byteBuffer.putFloat(value); + } + + ((Buffer) byteBuffer).flip(); + + return byteBuffer; + } + + public static Float[] toFloatArray(ByteBuffer byteBuffer) { + Float[] floatArray = new Float[byteBuffer.capacity() / 4]; + + for (int i = 0; i < floatArray.length; i++) { + floatArray[i] = byteBuffer.getFloat(); + } + + return floatArray; + } + + public static ByteBuffer toByteBuffer(Double[] doubleArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8); + + for (double value : doubleArray) { + byteBuffer.putDouble(value); + } + + ((Buffer) byteBuffer).flip(); + + return byteBuffer; + } + + public static Double[] toDoubleArray(ByteBuffer byteBuffer) { + Double[] doubleArray = new Double[byteBuffer.capacity() / 8]; + + for (int i = 0; i < doubleArray.length; i++) { + doubleArray[i] = byteBuffer.getDouble(); + } + + return doubleArray; + } + + public static ByteBuffer toByteBuffer(Integer[] intArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4); + + for (int value : intArray) { + 
byteBuffer.putInt(value); + } + + ((Buffer) byteBuffer).flip(); + + return byteBuffer; + } + + public static Integer[] toIntArray(ByteBuffer byteBuffer) { + Integer[] intArray = new Integer[byteBuffer.capacity() / 4]; + + for (int i = 0; i < intArray.length; i++) { + intArray[i] = byteBuffer.getInt(); + } + + return intArray; + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java new file mode 100644 index 00000000..cc636404 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java @@ -0,0 +1,20 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +public class KeyConstant { + public static final String URI = "uri"; + public static final String TOKEN = "token"; + public static final String DATABASE = "database"; + public static final String COLLECTION = "collection"; + public static final String AUTO_ID = "autoId"; + public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema"; + public static final String BATCH_SIZE = "batchSize"; + public static final String COLUMN = "column"; + public static final String COLUMN_TYPE = "type"; + public static final String COLUMN_NAME = "name"; + public static final String VECTOR_DIMENSION = "dimension"; + public static final String IS_PRIMARY_KEY = "isPrimaryKey"; +// "schemaCreateMode":"createWhenTableNotExit"/"Ignore"/"exception" + public static final String schemaCreateMode = "schemaCreateMode"; + public static final String IS_PARTITION_KEY = "isPartitionKey"; + public static final String MAX_LENGTH = "maxLength"; +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java new file mode 100644 index 00000000..da686af2 --- /dev/null +++ 
b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java @@ -0,0 +1,43 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import com.google.gson.JsonObject; +import io.milvus.v2.client.MilvusClientV2; +import io.milvus.v2.service.vector.request.UpsertReq; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +public class MilvusBufferWriter { + + private final MilvusClientV2 milvusClientV2; + private final String collection; + private final Integer batchSize; + private List dataCache; + + public MilvusBufferWriter(MilvusClientV2 milvusClientV2, String collection, Integer batchSize){ + this.milvusClientV2 = milvusClientV2; + this.collection = collection; + this.batchSize = batchSize; + this.dataCache = new ArrayList<>(); + } + public void write(JsonObject data){ + dataCache.add(data); + } + public Boolean needCommit(){ + return dataCache.size() >= batchSize; + } + public void commit(){ + if(dataCache.isEmpty()){ + log.info("dataCache is empty, skip commit"); + return; + } + UpsertReq upsertReq = UpsertReq.builder() + .collectionName(collection) + .data(dataCache) + .build(); + milvusClientV2.upsert(upsertReq); + dataCache = new ArrayList<>(); + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java new file mode 100644 index 00000000..390f95e5 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java @@ -0,0 +1,86 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.fastjson2.JSONArray; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.milvus.v2.common.DataType; +import static io.milvus.v2.common.DataType.*; +import 
io.milvus.v2.service.collection.request.AddFieldReq; +import io.milvus.v2.service.collection.request.CreateCollectionReq; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.stream.Collectors; + +public class MilvusSinkConverter { + public JsonObject convertByType(JSONArray milvusColumnMeta, Record record) { + JsonObject data = new JsonObject(); + Gson gson = new Gson(); + for(int i = 0; i < record.getColumnNumber(); i++) { + String fieldType = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE); + String fieldName = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME); + Object rawData = record.getColumn(i).getRawData(); + Object field = convertToMilvusField(fieldType, rawData); + data.add(fieldName, gson.toJsonTree(field)); + } + return data; + } + + private Object convertToMilvusField(String type, Object rawData) { + Gson gson = new Gson(); + switch (valueOf(type)) { + case Int32: + return Integer.parseInt(rawData.toString()); + case Int64: + return Long.parseLong(rawData.toString()); + case Float: + return java.lang.Float.parseFloat(rawData.toString()); + case String: + case VarChar: + return rawData.toString(); + case Bool: + return Boolean.parseBoolean(rawData.toString()); + case FloatVector: + java.lang.Float[] floats = Arrays.stream(rawData.toString().split(",")).map(java.lang.Float::parseFloat).toArray(java.lang.Float[]::new); + return Arrays.stream(floats).collect(Collectors.toList()); + case BinaryVector: + java.lang.Integer[] binarys = Arrays.stream(rawData.toString().split(",")).map(java.lang.Integer::parseInt).toArray(java.lang.Integer[]::new); + return BufferUtils.toByteBuffer(binarys); + case Float16Vector: + case BFloat16Vector: + // all these data is byte format in milvus + ByteBuffer binaryVector = (ByteBuffer) rawData; + return gson.toJsonTree(binaryVector.array()); + case SparseFloatVector: + return JsonParser.parseString(gson.toJson(rawData)).getAsJsonObject(); + default: + throw new 
RuntimeException("Unsupported data type"); + } + } + + public CreateCollectionReq.CollectionSchema prepareCollectionSchema(JSONArray milvusColumnMeta) { + CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build(); + for (int i = 0; i < milvusColumnMeta.size(); i++) { + AddFieldReq addFieldReq = AddFieldReq.builder() + .fieldName(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME)) + .dataType(valueOf(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE))) + .build(); + if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PRIMARY_KEY)) { + addFieldReq.setIsPrimaryKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PRIMARY_KEY)); + } + if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.VECTOR_DIMENSION)) { + addFieldReq.setDimension(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.VECTOR_DIMENSION)); + } + if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PARTITION_KEY)) { + addFieldReq.setIsPartitionKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PARTITION_KEY)); + } + if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.MAX_LENGTH)) { + addFieldReq.setMaxLength(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.MAX_LENGTH)); + } + collectionSchema.addField(addFieldReq); + } + return collectionSchema; + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java new file mode 100644 index 00000000..c9b5a1bc --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java @@ -0,0 +1,130 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import 
com.alibaba.datax.common.util.Configuration; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.google.gson.JsonObject; +import io.milvus.v2.client.ConnectConfig; +import io.milvus.v2.client.MilvusClientV2; +import io.milvus.v2.common.DataType; +import io.milvus.v2.service.collection.request.AddFieldReq; +import io.milvus.v2.service.collection.request.CreateCollectionReq; +import io.milvus.v2.service.collection.request.HasCollectionReq; +import io.milvus.v2.service.vector.request.UpsertReq; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +@Slf4j +public class MilvusWriter extends Writer { + public static class Job extends Writer.Job { + private Configuration originalConfig = null; + /** + * 切分任务。
+ * + * @param mandatoryNumber 为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错! + */ + @Override + public List split(int mandatoryNumber) { + List configList = new ArrayList(); + for(int i = 0; i < mandatoryNumber; i++) { + configList.add(this.originalConfig.clone()); + } + return configList; + } + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + } + + @Override + public void destroy() { + + } + } + public static class Task extends Writer.Task { + + private MilvusClientV2 milvusClientV2; + + private MilvusSinkConverter milvusSinkConverter; + private MilvusBufferWriter milvusBufferWriter; + + private String collection = null; + private JSONArray milvusColumnMeta; + + private String schemaCreateMode = "createWhenTableNotExit"; + + @Override + public void startWrite(RecordReceiver lineReceiver) { + Record record = lineReceiver.getFromReader(); + JsonObject data = milvusSinkConverter.convertByType(milvusColumnMeta, record); + milvusBufferWriter.write(data); + if(milvusBufferWriter.needCommit()){ + log.info("Reached buffer limit, Committing data"); + milvusBufferWriter.commit(); + log.info("Data committed"); + } + } + + @Override + public void init() { + log.info("Initializing Milvus writer"); + // get configuration + Configuration writerSliceConfig = this.getPluginJobConf(); + this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION); + this.milvusColumnMeta = JSON.parseArray(writerSliceConfig.getString(KeyConstant.COLUMN)); + this.schemaCreateMode = writerSliceConfig.getString(KeyConstant.schemaCreateMode) == null ? 
+ "createWhenTableNotExit" : writerSliceConfig.getString(KeyConstant.schemaCreateMode); + int batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100); + log.info("Collection:{}", this.collection); + // connect to milvus + ConnectConfig connectConfig = ConnectConfig.builder() + .uri(writerSliceConfig.getString(KeyConstant.URI)) + .token(writerSliceConfig.getString(KeyConstant.TOKEN)) + .build(); + if(writerSliceConfig.getString(KeyConstant.DATABASE) == null) { + log.warn("Database is set, using database{}", writerSliceConfig.getString(KeyConstant.DATABASE)); + connectConfig.setDbName(writerSliceConfig.getString(KeyConstant.DATABASE)); + } + this.milvusClientV2 = new MilvusClientV2(connectConfig); + this.milvusSinkConverter = new MilvusSinkConverter(); + this.milvusBufferWriter = new MilvusBufferWriter(milvusClientV2, collection, batchSize); + log.info("Milvus writer initialized"); + } + @Override + public void prepare() { + super.prepare(); + Boolean hasCollection = milvusClientV2.hasCollection(HasCollectionReq.builder().collectionName(collection).build()); + if (!hasCollection) { + log.info("Collection not exist"); + if (schemaCreateMode.equals("createWhenTableNotExit")) { + // create collection + log.info("Creating collection:{}", this.collection); + CreateCollectionReq.CollectionSchema collectionSchema = milvusSinkConverter.prepareCollectionSchema(milvusColumnMeta); + + CreateCollectionReq createCollectionReq = CreateCollectionReq.builder() + .collectionName(collection) + .collectionSchema(collectionSchema) + .build(); + milvusClientV2.createCollection(createCollectionReq); + } else if (schemaCreateMode.equals("exception")) { + log.error("Collection not exist, throw exception"); + throw new RuntimeException("Collection not exist"); + } + } + } + + @Override + public void destroy() { + log.info("Closing Milvus writer, committing data and closing connection"); + this.milvusBufferWriter.commit(); + this.milvusClientV2.close(); + } + } +} diff --git 
a/milvuswriter/src/main/resources/plugin.json b/milvuswriter/src/main/resources/plugin.json new file mode 100644 index 00000000..8b912309 --- /dev/null +++ b/milvuswriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "milvuswriter", + "class": "com.alibaba.datax.plugin.writer.milvuswriter.MilvusWriter", + "description": "useScene: prod. mechanism: via milvusclient connect milvus write data concurrent.", + "developer": "nianliuu" +} diff --git a/milvuswriter/src/main/resources/plugin_job_template.json b/milvuswriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..d4ba4bf1 --- /dev/null +++ b/milvuswriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,15 @@ +{ + "name": "mongodbwriter", + "parameter": { + "address": [], + "userName": "", + "userPassword": "", + "dbName": "", + "collectionName": "", + "column": [], + "upsertInfo": { + "isUpsert": "", + "upsertKey": "" + } + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index c7f43f17..1b364a75 100644 --- a/pom.xml +++ b/pom.xml @@ -129,6 +129,7 @@ adbmysqlwriter sybasewriter neo4jwriter + milvuswriter plugin-rdbms-util plugin-unstructured-storage-util From b3c1c07b50f4564eceabe6ac5da66ea028097b86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=90=E8=8C=97?= Date: Sat, 8 Feb 2025 14:25:26 +0800 Subject: [PATCH 2/4] to #62452156 feat:add milvus writer --- milvusjob.json | 0 milvuswriter/doc/milvuswriter.md | 273 ++++++++++++++++++ milvuswriter/pom.xml | 66 +++-- .../writer/milvuswriter/BufferUtils.java | 130 --------- .../writer/milvuswriter/KeyConstant.java | 21 +- .../milvuswriter/MilvusBufferWriter.java | 163 +++++++++-- .../writer/milvuswriter/MilvusClient.java | 95 ++++++ .../writer/milvuswriter/MilvusColumn.java | 112 +++++++ .../milvuswriter/MilvusCreateCollection.java | 102 +++++++ .../milvuswriter/MilvusSinkConverter.java | 86 ------ .../writer/milvuswriter/MilvusWriter.java | 136 ++++----- .../milvuswriter/MilvusWriterErrorCode.java | 35 
+++ .../enums/SchemaCreateModeEnum.java | 34 +++ .../milvuswriter/enums/WriteModeEnum.java | 28 ++ .../main/resources/plugin_job_template.json | 17 +- package.xml | 7 + stream2milvus.json | 0 17 files changed, 944 insertions(+), 361 deletions(-) create mode 100644 milvusjob.json create mode 100644 milvuswriter/doc/milvuswriter.md delete mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusClient.java create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusColumn.java create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusCreateCollection.java delete mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriterErrorCode.java create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/SchemaCreateModeEnum.java create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/WriteModeEnum.java create mode 100644 stream2milvus.json diff --git a/milvusjob.json b/milvusjob.json new file mode 100644 index 00000000..e69de29b diff --git a/milvuswriter/doc/milvuswriter.md b/milvuswriter/doc/milvuswriter.md new file mode 100644 index 00000000..3788c62a --- /dev/null +++ b/milvuswriter/doc/milvuswriter.md @@ -0,0 +1,273 @@ +# DataX milvuswriter + + +--- + + +## 1 快速介绍 + +milvuswriter 插件实现了写入数据到 milvus集合的功能; 面向ETL开发工程师,使用 milvuswriter 从数仓导入数据到 milvus, 同时 milvuswriter 亦可以作为数据迁移工具为DBA等用户提供服务。 + + +## 2 实现原理 + +milvuswriter 通过 DataX 框架获取 Reader 生成的协议数据,通过 `upsert/insert `方式写入数据到milvus, 并通过batchSize累积的方式进行数据提交。 +
+ + 注意:upsert写入方式(推荐): 在非autid表场景下根据主键更新 Collection 中的某个 Entity;autid表场景下会将 Entity 中的主键替换为自动生成的主键并插入数据。 + insert写入方式: 多用于autid表插入数据milvus自动生成主键, 非autoid表下使用insert会导致数据重复。 + + +## 3 功能说明 + +### 3.1 配置样例 + +* 这里提供一份从内存产生数据导入到 milvus的配置样例。 + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "column" : [ + { + "value": 1, + "type": "long" + }, + { + "value": "[1.1,1.2,1.3]", + "type": "string" + }, + { + "value": 100, + "type": "long" + }, + { + "value": 200, + "type": "long" + }, + { + "value": 300, + "type": "long" + }, + { + "value": 3.14159, + "type": "double" + }, + { + "value": 3.1415926, + "type": "double" + }, + { + "value": "testvarcharvalue", + "type": "string" + }, + { + "value": true, + "type": "bool" + }, + { + "value": "[1.123,1.2456,1.3789]", + "type": "string" + }, + { + "value": "[2.123,2.2456,2.3789]", + "type": "string" + }, + { + "value": "12345678", + "type": "string" + }, + { + "value": "{\"a\":1,\"b\":2,\"c\":3}", + "type": "string" + }, + { + "value": "[1,2,3,4]", + "type": "string" + } + ], + "sliceRecordCount": 1 + } + }, + "writer": { + "parameter": { + "schemaCreateMode": "createIfNotExist", + "connectTimeoutMs": 60000, + "writeMode": "upsert", + "collection": "demo01", + "type": "milvus", + "token": "xxxxxxx", + "endpoint": "https://xxxxxxxx.com:443", + "batchSize": 1024, + "column": [ + { + "name": "id", + "type": "Int64", + "primaryKey": "true" + }, + { + "name": "floatvector", + "type": "FloatVector", + "dimension": "3" + }, + { + "name": "int8col", + "type": "Int8" + }, + { + "name": "int16col", + "type": "Int16" + }, + { + "name": "int32col", + "type": "Int32" + }, + { + "name": "floatcol", + "type": "Float" + }, + { + "name": "doublecol", + "type": "Double" + }, + { + "name": "varcharcol", + "type": "VarChar" + }, + { + "name": "boolcol", + "type": "Bool" + }, + { + "name": "bfloat16vectorcol", + "type": "BFloat16Vector", + "dimension": "3" + }, + { + "name": "float16vectorcol", + 
"type": "Float16Vector", + "dimension": "3" + }, + { + "name": "binaryvectorcol", + "type": "BinaryVector", + "dimension": "64" + }, + { + "name": "jsoncol", + "type": "JSON" + }, + { + "name": "intarraycol", + "maxCapacity": "8", + "type": "Array", + "elementType": "Int32" + } + ] + }, + "name": "milvuswriter" + } + } + ], + "setting": { + "errorLimit": { + "record": "0" + }, + "speed": { + "concurrent": 2, + "channel": 2 + } + } + } +} + +``` + + +### 3.2 参数说明 + +* **endpoint** + * 描述:milvus数据库的连接信息,包含地址和端口,例如https://xxxxxxxx.com:443 + + 注意:1、在一个数据库上只能配置一个 endpoint 值 + 2、一个milvus 写入任务仅能配置一个 endpoint + * 必选:是
+ * 默认值:无
+* **schemaCreateMode** + * 描述: 集合创建的模式,同步时milvus集合不存在的处理方式, 根据配置的column属性进行创建 + * 取值 + * createIfNotExist: 如果集合不存在,则创建集合,如果集合存在,则不执行任何操作 + * ignore: 如果集合不存在,任务异常报错,如果集合存在,则不执行任何操作 + * recreate: 如果集合不存在,则创建集合,如果集合存在,则删除集合重建集合 + * 必选:否
+ * 默认值:createIfNotExist
+* **connectTimeoutMs** + * 描述:与milvus交互时客户端的连接超时时间,单位毫秒
+ * 必选:否
+ * 默认值:10000
+* **token** + * 描述:milvus实例认证的token秘钥,与username认证方式二选一配置
+ * 必选:否
+ * 默认值:无
+* **username** + * 描述:目的milvus数据库的用户名, 与token二选一配置
+ * 必选:否
+ * 默认值:无
+* **password** + * 描述:目的milvus数据库的密码
+ * 必选:否
+ * 默认值:无
+* **writeMode** + * 描述: 写入milvus集合的写入方式 + * 取值 + * upsert(推荐): 在非autoid表场景下根据主键更新 Collection 中的某个 Entity;autoid表场景下会将 Entity 中的主键替换为自动生成的主键并插入数据。 + * insert: 多用于autoid表插入数据milvus自动生成主键, 非autoid表下使用insert会导致数据重复。 + * 必选:是
+ * 默认值:upsert
+* **collection** + * 描述:目的集合名称。 只能配置一个milvus的集合名称。 + * 必选:是
+ * 默认值:无
+* **batchSize** + * 描述:一次性批量提交的记录数大小,该值可以极大减少DataX与milvus的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。
+ * 必选:否
+ * 默认值:1024
+ +* **column** + * 描述:目的集合需要写入数据的字段,字段内容用json格式描述,字段之间用英文逗号分隔。字段属性必填name、type, 其他属性在需要schemaCreateMode创建集合按需填入,例如: + + "column": [ + { + "name": "id", + "type": "Int64", + "primaryKey": "true" + }, + { + "name": "floatvector", + "type": "FloatVector", + "dimension": "3" + }] + * 必选:是
+ * 默认值:无
+### 3.3 支持同步milvus字段类型 + Bool, + Int8, + Int16, + Int32, + Int64, + Float, + Double, + String, + VarChar, + Array, + JSON, + BinaryVector, + FloatVector, + Float16Vector, + BFloat16Vector, + SparseFloatVector + diff --git a/milvuswriter/pom.xml b/milvuswriter/pom.xml index b889cbe4..16c00560 100644 --- a/milvuswriter/pom.xml +++ b/milvuswriter/pom.xml @@ -16,32 +16,48 @@ official 1.8 - - - - guava - com.google.guava - 32.0.1-jre - - - - + + com.alibaba.fastjson2 + fastjson2 + 2.0.49 + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + org.projectlombok + lombok + 1.18.30 + + + guava + com.google.guava + 32.0.1-jre + io.milvus milvus-sdk-java - 2.4.8 + 2.5.2 - org.jetbrains.kotlin - kotlin-test-junit5 - 2.0.0 + org.mockito + mockito-core + 3.3.3 test - org.junit.jupiter - junit-jupiter - 5.10.0 + junit + junit + 4.11 test @@ -50,16 +66,16 @@ 2.0.0 - com.alibaba.datax - datax-common - 0.0.1-SNAPSHOT - compile + org.powermock + powermock-module-junit4 + 2.0.9 + test - org.projectlombok - lombok - 1.18.30 - provided + org.powermock + powermock-api-mockito2 + 2.0.9 + test diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java deleted file mode 100644 index 89153a6a..00000000 --- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.alibaba.datax.plugin.writer.milvuswriter; - -import java.nio.Buffer; -import java.nio.ByteBuffer; - -public class BufferUtils { - - public static ByteBuffer toByteBuffer(Short[] shortArray) { - ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2); - - for (Short value : shortArray) { - byteBuffer.putShort(value); - } - - // Compatible compilation and running versions are not consistent - // Flip the buffer to prepare for reading - ((Buffer) byteBuffer).flip(); - - return byteBuffer; - } - - public static Short[] toShortArray(ByteBuffer byteBuffer) { - Short[] shortArray = new Short[byteBuffer.capacity() / 2]; - - for (int i = 0; i < shortArray.length; i++) { - shortArray[i] = byteBuffer.getShort(); - } - - return shortArray; - } - - public static ByteBuffer toByteBuffer(Float[] floatArray) { - ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4); - - for (float value : floatArray) { - byteBuffer.putFloat(value); - } - - ((Buffer) byteBuffer).flip(); - - return byteBuffer; - } - - public static Float[] toFloatArray(ByteBuffer byteBuffer) { - Float[] floatArray = new Float[byteBuffer.capacity() / 4]; - - for (int i = 0; i < floatArray.length; i++) { - floatArray[i] = byteBuffer.getFloat(); - } - - return floatArray; - } - - public static ByteBuffer toByteBuffer(Double[] doubleArray) { - ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8); - - for (double value : doubleArray) { - byteBuffer.putDouble(value); - } - - ((Buffer) byteBuffer).flip(); - - return byteBuffer; - } - - public static Double[] toDoubleArray(ByteBuffer byteBuffer) { - Double[] doubleArray = new Double[byteBuffer.capacity() / 8]; - - for (int i = 0; i < doubleArray.length; i++) { - doubleArray[i] = byteBuffer.getDouble(); - } - - return doubleArray; - } - - public static ByteBuffer toByteBuffer(Integer[] intArray) { - ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4); - - for (int value : intArray) { - 
byteBuffer.putInt(value); - } - - ((Buffer) byteBuffer).flip(); - - return byteBuffer; - } - - public static Integer[] toIntArray(ByteBuffer byteBuffer) { - Integer[] intArray = new Integer[byteBuffer.capacity() / 4]; - - for (int i = 0; i < intArray.length; i++) { - intArray[i] = byteBuffer.getInt(); - } - - return intArray; - } -} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java index cc636404..28f1ff13 100644 --- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java @@ -1,20 +1,17 @@ package com.alibaba.datax.plugin.writer.milvuswriter; public class KeyConstant { - public static final String URI = "uri"; + public static final String USERNAME = "username"; + public static final String PASSWORD = "password"; + public static final String ENDPOINT = "endpoint"; public static final String TOKEN = "token"; public static final String DATABASE = "database"; public static final String COLLECTION = "collection"; - public static final String AUTO_ID = "autoId"; - public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema"; public static final String BATCH_SIZE = "batchSize"; public static final String COLUMN = "column"; - public static final String COLUMN_TYPE = "type"; - public static final String COLUMN_NAME = "name"; - public static final String VECTOR_DIMENSION = "dimension"; - public static final String IS_PRIMARY_KEY = "isPrimaryKey"; -// "schemaCreateMode":"createWhenTableNotExit"/"Ignore"/"exception" - public static final String schemaCreateMode = "schemaCreateMode"; - public static final String IS_PARTITION_KEY = "isPartitionKey"; - public static final String MAX_LENGTH = "maxLength"; -} + public static final String SCHAME_CREATE_MODE = "schemaCreateMode"; + public static final 
String WRITE_MODE = "writeMode"; + public static final String PARTITION = "partition"; + public static final String CONNECT_TIMEOUT_MS = "connectTimeoutMs"; + public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema"; +} \ No newline at end of file diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java index da686af2..b78728e4 100644 --- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java @@ -1,43 +1,166 @@ package com.alibaba.datax.plugin.writer.milvuswriter; +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.plugin.writer.milvuswriter.enums.WriteModeEnum; +import com.alibaba.fastjson2.JSONArray; +import com.google.gson.Gson; import com.google.gson.JsonObject; -import io.milvus.v2.client.MilvusClientV2; -import io.milvus.v2.service.vector.request.UpsertReq; +import io.milvus.v2.common.DataType; +import io.milvus.v2.service.vector.request.data.BFloat16Vec; +import io.milvus.v2.service.vector.request.data.Float16Vec; import lombok.extern.slf4j.Slf4j; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.TypeReference; @Slf4j public class MilvusBufferWriter { - private final MilvusClientV2 milvusClientV2; + private final MilvusClient milvusClient; private final String collection; private final Integer batchSize; private List dataCache; + private List milvusColumnMeta; + private WriteModeEnum writeMode; + private String 
partition; - public MilvusBufferWriter(MilvusClientV2 milvusClientV2, String collection, Integer batchSize){ - this.milvusClientV2 = milvusClientV2; - this.collection = collection; - this.batchSize = batchSize; - this.dataCache = new ArrayList<>(); + public MilvusBufferWriter(MilvusClient milvusClient, Configuration writerSliceConfig) { + this.milvusClient = milvusClient; + this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION); + this.batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100); + this.dataCache = new ArrayList<>(batchSize); + this.milvusColumnMeta = JSON.parseObject(writerSliceConfig.getString(KeyConstant.COLUMN), new TypeReference>() { + }); + this.writeMode = WriteModeEnum.getEnum(writerSliceConfig.getString(KeyConstant.WRITE_MODE)); + this.partition = writerSliceConfig.getString(KeyConstant.PARTITION); } - public void write(JsonObject data){ - dataCache.add(data); + + public void add(Record record, TaskPluginCollector taskPluginCollector) { + try { + JsonObject data = this.convertByType(milvusColumnMeta, record); + dataCache.add(data); + } catch (Exception e) { + taskPluginCollector.collectDirtyRecord(record, String.format("parse record error errorMessage: %s", e.getMessage())); + } } - public Boolean needCommit(){ + + public Boolean needCommit() { return dataCache.size() >= batchSize; } - public void commit(){ - if(dataCache.isEmpty()){ + + public void commit() { + if (dataCache.isEmpty()) { log.info("dataCache is empty, skip commit"); return; } - UpsertReq upsertReq = UpsertReq.builder() - .collectionName(collection) - .data(dataCache) - .build(); - milvusClientV2.upsert(upsertReq); - dataCache = new ArrayList<>(); + if (writeMode == WriteModeEnum.INSERT) { + milvusClient.insert(collection, partition, dataCache); + } else { + milvusClient.upsert(collection, partition, dataCache); + } + dataCache = new ArrayList<>(batchSize); } -} + + public int getDataCacheSize() { + return dataCache.size(); + } + + private JsonObject 
convertByType(List milvusColumnMeta, Record record) { + JsonObject data = new JsonObject(); + Gson gson = new Gson(); + for (int i = 0; i < record.getColumnNumber(); i++) { + MilvusColumn milvusColumn = milvusColumnMeta.get(i); + DataType fieldType = milvusColumn.getMilvusTypeEnum(); + String fieldName = milvusColumn.getName(); + Column column = record.getColumn(i); + try { + Object field = convertToMilvusField(fieldType, column, milvusColumn); + data.add(fieldName, gson.toJsonTree(field)); + } catch (Exception e) { + log.error("parse error for column: {} errorMessage: {}", fieldName, e.getMessage(), e); + throw e; + } + } + return data; + } + + //值需要跟这里匹配:io.milvus.param.ParamUtils#checkFieldData(io.milvus.param.collection.FieldType, java.util.List, boolean) + private Object convertToMilvusField(DataType type, Column column, MilvusColumn milvusColumn) { + if (column.getRawData() == null) { + return null; + } + switch (type) { + case Int8: + case Int16: + case Int32: + case Int64: + return column.asLong(); + case Float: + case Double: + return column.asDouble(); + case String: + case VarChar: + return column.asString(); + case Bool: + return column.asBoolean(); + case BFloat16Vector: + JSONArray bFloat16ArrayJson = JSON.parseArray(column.asString()); + List bfloat16Vector = new ArrayList<>(); + for (int i = 0; i < bFloat16ArrayJson.size(); i++) { + Float value = Float.parseFloat(bFloat16ArrayJson.getString(i)); + bfloat16Vector.add(value); + } + BFloat16Vec bFloat16Vec = new BFloat16Vec(bfloat16Vector); + ByteBuffer byteBuffer = (ByteBuffer) bFloat16Vec.getData(); + return byteBuffer.array(); + case Float16Vector: + JSONArray float16ArrayJson = JSON.parseArray(column.asString()); + List float16Vector = new ArrayList<>(); + for (int i = 0; i < float16ArrayJson.size(); i++) { + Float floatValue = Float.parseFloat(float16ArrayJson.getString(i)); + float16Vector.add(floatValue); + } + Float16Vec float16Vec = new Float16Vec(float16Vector); + ByteBuffer data = 
(ByteBuffer) float16Vec.getData(); + return data.array(); + case BinaryVector: + return column.asBytes(); + case FloatVector: + JSONArray arrayJson = JSON.parseArray(column.asString()); + return arrayJson.stream().map(item -> Float.parseFloat(String.valueOf(item))).collect(Collectors.toList()); + case SparseFloatVector: + //[3:0.5, 24:0.8, 76:0.2] + try { + JSONArray sparseFloatArray = JSON.parseArray(column.asString()); + TreeMap mapValue = new TreeMap<>(); + for (int i = 0; i < sparseFloatArray.size(); i++) { + String value = sparseFloatArray.getString(i); + String[] split = value.split(":"); + Long key = Long.parseLong(split[0]); + Float val = Float.parseFloat(split[1]); + mapValue.put(key, val); + } + return mapValue; + } catch (Exception e) { + log.error("parse column[{}] SparseFloatVector value error, value should like [3:0.5, 24:0.8, 76:0.2], but get:{}", milvusColumn.getName(), column.asString()); + throw e; + } + case JSON: + return column.asString(); + case Array: + JSONArray parseArray = JSON.parseArray(column.asString()); + return parseArray.stream().map(item -> String.valueOf(item)).collect(Collectors.toList()); + default: + throw new RuntimeException(String.format("Unsupported data type[%s]", type)); + } + } +} \ No newline at end of file diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusClient.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusClient.java new file mode 100644 index 00000000..1bf4743b --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusClient.java @@ -0,0 +1,95 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import java.util.List; + +import com.alibaba.datax.common.util.Configuration; + +import com.google.gson.JsonObject; +import io.milvus.v2.client.ConnectConfig; +import io.milvus.v2.client.MilvusClientV2; +import io.milvus.v2.service.collection.request.CreateCollectionReq; +import 
io.milvus.v2.service.collection.request.DropCollectionReq; +import io.milvus.v2.service.collection.request.HasCollectionReq; +import io.milvus.v2.service.partition.request.CreatePartitionReq; +import io.milvus.v2.service.partition.request.HasPartitionReq; +import io.milvus.v2.service.vector.request.InsertReq; +import io.milvus.v2.service.vector.request.UpsertReq; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +/** + * @author ziming(子茗) + * @date 12/27/24 + * @description + */ +@Slf4j +public class MilvusClient { + private MilvusClientV2 milvusClientV2; + + public MilvusClient(Configuration conf) { + // connect to milvus + ConnectConfig connectConfig = ConnectConfig.builder().uri(conf.getString(KeyConstant.ENDPOINT)).build(); + String token = null; + if (conf.getString(KeyConstant.TOKEN) != null) { + token = conf.getString(KeyConstant.TOKEN); + } else { + token = conf.getString(KeyConstant.USERNAME) + ":" + conf.getString(KeyConstant.PASSWORD); + } + connectConfig.setToken(token); + String database = conf.getString(KeyConstant.DATABASE); + if (StringUtils.isNotBlank(database)) { + log.info("use database {}", database); + connectConfig.setDbName(conf.getString(KeyConstant.DATABASE)); + } + Integer connectTimeOut = conf.getInt(KeyConstant.CONNECT_TIMEOUT_MS); + if (connectTimeOut != null) { + connectConfig.setConnectTimeoutMs(connectTimeOut); + } + this.milvusClientV2 = new MilvusClientV2(connectConfig); + } + + public void upsert(String collection, String partition, List data) { + UpsertReq upsertReq = UpsertReq.builder().collectionName(collection).data(data).build(); + if (StringUtils.isNotEmpty(partition)) { + upsertReq.setPartitionName(partition); + } + milvusClientV2.upsert(upsertReq); + } + + public void insert(String collection, String partition, List data) { + InsertReq insertReq = InsertReq.builder().collectionName(collection).data(data).build(); + if (StringUtils.isNotEmpty(partition)) { + 
insertReq.setPartitionName(partition); + } + milvusClientV2.insert(insertReq); + } + + public Boolean hasCollection(String collection) { + HasCollectionReq build = HasCollectionReq.builder().collectionName(collection).build(); + return milvusClientV2.hasCollection(build); + } + + public void createCollection(String collection, CreateCollectionReq.CollectionSchema schema) { + CreateCollectionReq createCollectionReq = CreateCollectionReq.builder().collectionName(collection).collectionSchema(schema).build(); + milvusClientV2.createCollection(createCollectionReq); + } + + public void dropCollection(String collection) { + DropCollectionReq request = DropCollectionReq.builder().collectionName(collection).build(); + milvusClientV2.dropCollection(request); + } + public Boolean hasPartition(String collection, String partition) { + HasPartitionReq hasPartitionReq = HasPartitionReq.builder().collectionName(collection).partitionName(partition).build(); + return milvusClientV2.hasPartition(hasPartitionReq); + } + + public void createPartition(String collectionName, String partitionName) { + CreatePartitionReq createPartitionReq = CreatePartitionReq.builder().collectionName(collectionName).partitionName(partitionName).build(); + milvusClientV2.createPartition(createPartitionReq); + } + + public void close() { + log.info("Closing Milvus client"); + milvusClientV2.close(); + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusColumn.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusColumn.java new file mode 100644 index 00000000..06070248 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusColumn.java @@ -0,0 +1,112 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import io.milvus.v2.common.DataType; + +import java.util.Arrays; + +/** + * @author ziming(子茗) + * @date 12/27/24 + * @description + */ +public class MilvusColumn { + private String 
name; + private String type; + private DataType milvusTypeEnum; + private Boolean isPrimaryKey; + private Integer dimension; + private Boolean isPartitionKey; + private Integer maxLength; + private Boolean isAutoId; + private Integer maxCapacity; + private String elementType; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + for (DataType item : DataType.values()) { + if (item.name().equalsIgnoreCase(type)) { + this.milvusTypeEnum = item; + break; + } + } + if (this.milvusTypeEnum == null) { + throw new RuntimeException("Unsupported type: " + type + " supported types: " + Arrays.toString(DataType.values())); + } + } + + public Integer getDimension() { + return dimension; + } + + public void setDimension(Integer dimension) { + this.dimension = dimension; + } + + public Integer getMaxLength() { + return maxLength; + } + + public void setMaxLength(Integer maxLength) { + this.maxLength = maxLength; + } + + public Boolean getPrimaryKey() { + return isPrimaryKey; + } + + public Boolean getPartitionKey() { + return isPartitionKey; + } + + public void setPartitionKey(Boolean partitionKey) { + isPartitionKey = partitionKey; + } + + public void setPrimaryKey(Boolean primaryKey) { + isPrimaryKey = primaryKey; + } + + public Boolean getAutoId() { + return isAutoId; + } + + public void setAutoId(Boolean autoId) { + isAutoId = autoId; + } + + public Integer getMaxCapacity() { + return maxCapacity; + } + + public void setMaxCapacity(Integer maxCapacity) { + this.maxCapacity = maxCapacity; + } + + public String getElementType() { + return elementType; + } + + public void setElementType(String elementType) { + this.elementType = elementType; + } + + public DataType getMilvusTypeEnum() { + return milvusTypeEnum; + } + + public void setMilvusTypeEnum(DataType milvusTypeEnum) { + this.milvusTypeEnum = 
milvusTypeEnum; + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusCreateCollection.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusCreateCollection.java new file mode 100644 index 00000000..84e296f4 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusCreateCollection.java @@ -0,0 +1,102 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import java.util.List; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.writer.milvuswriter.enums.SchemaCreateModeEnum; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.TypeReference; + +import io.milvus.v2.common.DataType; +import io.milvus.v2.service.collection.request.AddFieldReq; +import io.milvus.v2.service.collection.request.CreateCollectionReq; +import lombok.extern.slf4j.Slf4j; + +import static io.milvus.v2.common.DataType.valueOf; + +/** + * @author ziming(子茗) + * @date 12/27/24 + * @description + */ +@Slf4j +public class MilvusCreateCollection { + + private Configuration config; + + MilvusCreateCollection(Configuration originalConfig) { + this.config = originalConfig; + } + + public void createCollectionByMode(MilvusClient milvusClient) { + String collection = this.config.getString(KeyConstant.COLLECTION); + SchemaCreateModeEnum schemaCreateMode = SchemaCreateModeEnum.getEnum(this.config.getString(KeyConstant.SCHAME_CREATE_MODE)); + List milvusColumnMeta = JSON.parseObject(config.getString(KeyConstant.COLUMN), new TypeReference>() { + }); + Boolean hasCollection = milvusClient.hasCollection(collection); + if (schemaCreateMode == SchemaCreateModeEnum.CREATEIFNOTEXIT) { + // create collection + if (hasCollection) { + log.info("collection[{}] already exists, continue create", collection); + } else { + log.info("creating collection[{}]", collection); + CreateCollectionReq.CollectionSchema collectionSchema = 
prepareCollectionSchema(milvusColumnMeta); + milvusClient.createCollection(collection, collectionSchema); + } + } else if (schemaCreateMode == SchemaCreateModeEnum.RECREATE) { + if (hasCollection) { + log.info("collection already exist, try to drop"); + milvusClient.dropCollection(collection); + } + log.info("creating collection[{}]", collection); + CreateCollectionReq.CollectionSchema collectionSchema = prepareCollectionSchema(milvusColumnMeta); + milvusClient.createCollection(collection, collectionSchema); + } else if (schemaCreateMode == SchemaCreateModeEnum.IGNORE && !hasCollection) { + log.error("Collection not exist, throw exception"); + throw new RuntimeException("Collection not exist"); + } + } + + private CreateCollectionReq.CollectionSchema prepareCollectionSchema(List milvusColumnMeta) { + CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build(); + for (int i = 0; i < milvusColumnMeta.size(); i++) { + MilvusColumn milvusColumn = milvusColumnMeta.get(i); + AddFieldReq addFieldReq = AddFieldReq.builder() + .fieldName(milvusColumn.getName()) + .dataType(valueOf(milvusColumn.getType())) + .build(); + if (milvusColumn.getPrimaryKey() != null) { + addFieldReq.setIsPrimaryKey(milvusColumn.getPrimaryKey()); + } + if (milvusColumn.getDimension() != null) { + addFieldReq.setDimension(milvusColumn.getDimension()); + } + if (milvusColumn.getPartitionKey() != null) { + addFieldReq.setIsPartitionKey(milvusColumn.getPartitionKey()); + } + if (milvusColumn.getMaxLength() != null) { + addFieldReq.setMaxLength(milvusColumn.getMaxLength()); + } + if (milvusColumn.getAutoId() != null) { + addFieldReq.setAutoID(milvusColumn.getAutoId()); + } + if (milvusColumn.getMaxCapacity() != null) { + addFieldReq.setMaxCapacity(milvusColumn.getMaxCapacity()); + } + if (milvusColumn.getElementType() != null) { + addFieldReq.setElementType(DataType.valueOf(milvusColumn.getElementType())); + } + try { + 
collectionSchema.addField(addFieldReq); + } catch (Exception e) { + log.error("add filed[{}] error", milvusColumn.getName()); + throw e; + } + } + Boolean enableDynamic = config.getBool(KeyConstant.ENABLE_DYNAMIC_SCHEMA); + if (enableDynamic != null) { + collectionSchema.setEnableDynamicField(enableDynamic); + } + return collectionSchema; + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java deleted file mode 100644 index 390f95e5..00000000 --- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java +++ /dev/null @@ -1,86 +0,0 @@ -package com.alibaba.datax.plugin.writer.milvuswriter; - -import com.alibaba.datax.common.element.Record; -import com.alibaba.fastjson2.JSONArray; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import io.milvus.v2.common.DataType; -import static io.milvus.v2.common.DataType.*; -import io.milvus.v2.service.collection.request.AddFieldReq; -import io.milvus.v2.service.collection.request.CreateCollectionReq; - -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.stream.Collectors; - -public class MilvusSinkConverter { - public JsonObject convertByType(JSONArray milvusColumnMeta, Record record) { - JsonObject data = new JsonObject(); - Gson gson = new Gson(); - for(int i = 0; i < record.getColumnNumber(); i++) { - String fieldType = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE); - String fieldName = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME); - Object rawData = record.getColumn(i).getRawData(); - Object field = convertToMilvusField(fieldType, rawData); - data.add(fieldName, gson.toJsonTree(field)); - } - return data; - } - - private Object convertToMilvusField(String type, Object rawData) { - Gson gson = new Gson(); - 
switch (valueOf(type)) { - case Int32: - return Integer.parseInt(rawData.toString()); - case Int64: - return Long.parseLong(rawData.toString()); - case Float: - return java.lang.Float.parseFloat(rawData.toString()); - case String: - case VarChar: - return rawData.toString(); - case Bool: - return Boolean.parseBoolean(rawData.toString()); - case FloatVector: - java.lang.Float[] floats = Arrays.stream(rawData.toString().split(",")).map(java.lang.Float::parseFloat).toArray(java.lang.Float[]::new); - return Arrays.stream(floats).collect(Collectors.toList()); - case BinaryVector: - java.lang.Integer[] binarys = Arrays.stream(rawData.toString().split(",")).map(java.lang.Integer::parseInt).toArray(java.lang.Integer[]::new); - return BufferUtils.toByteBuffer(binarys); - case Float16Vector: - case BFloat16Vector: - // all these data is byte format in milvus - ByteBuffer binaryVector = (ByteBuffer) rawData; - return gson.toJsonTree(binaryVector.array()); - case SparseFloatVector: - return JsonParser.parseString(gson.toJson(rawData)).getAsJsonObject(); - default: - throw new RuntimeException("Unsupported data type"); - } - } - - public CreateCollectionReq.CollectionSchema prepareCollectionSchema(JSONArray milvusColumnMeta) { - CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build(); - for (int i = 0; i < milvusColumnMeta.size(); i++) { - AddFieldReq addFieldReq = AddFieldReq.builder() - .fieldName(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME)) - .dataType(valueOf(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE))) - .build(); - if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PRIMARY_KEY)) { - addFieldReq.setIsPrimaryKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PRIMARY_KEY)); - } - if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.VECTOR_DIMENSION)) { - 
addFieldReq.setDimension(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.VECTOR_DIMENSION)); - } - if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PARTITION_KEY)) { - addFieldReq.setIsPartitionKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PARTITION_KEY)); - } - if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.MAX_LENGTH)) { - addFieldReq.setMaxLength(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.MAX_LENGTH)); - } - collectionSchema.addField(addFieldReq); - } - return collectionSchema; - } -} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java index c9b5a1bc..764a9d94 100644 --- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java @@ -1,29 +1,49 @@ package com.alibaba.datax.plugin.writer.milvuswriter; import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.util.Configuration; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONArray; -import com.google.gson.JsonObject; -import io.milvus.v2.client.ConnectConfig; -import io.milvus.v2.client.MilvusClientV2; -import io.milvus.v2.common.DataType; -import io.milvus.v2.service.collection.request.AddFieldReq; -import io.milvus.v2.service.collection.request.CreateCollectionReq; -import io.milvus.v2.service.collection.request.HasCollectionReq; -import io.milvus.v2.service.vector.request.UpsertReq; + import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; -import java.util.Collections; import java.util.List; @Slf4j public class MilvusWriter extends Writer { public static class Job 
extends Writer.Job { private Configuration originalConfig = null; + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + originalConfig.getNecessaryValue(KeyConstant.ENDPOINT, MilvusWriterErrorCode.REQUIRED_VALUE); + originalConfig.getNecessaryValue(KeyConstant.COLUMN, MilvusWriterErrorCode.REQUIRED_VALUE); + originalConfig.getNecessaryValue(KeyConstant.COLLECTION, MilvusWriterErrorCode.REQUIRED_VALUE); + } + + @Override + public void prepare() { + //collection create process + MilvusClient milvusClient = new MilvusClient(originalConfig); + try { + MilvusCreateCollection milvusCreateCollection = new MilvusCreateCollection(originalConfig); + milvusCreateCollection.createCollectionByMode(milvusClient); + String collection = originalConfig.getString(KeyConstant.COLLECTION); + String partition = originalConfig.getString(KeyConstant.PARTITION); + if (partition != null && !milvusClient.hasPartition(collection, partition)) { + log.info("collection[{}] not contain partition[{}],try to create partition", collection, partition); + milvusClient.createPartition(collection, partition); + } + } catch (Exception e) { + throw DataXException.asDataXException(MilvusWriterErrorCode.MILVUS_COLLECTION, e.getMessage(), e); + } finally { + milvusClient.close(); + } + } + /** * 切分任务。
* @@ -31,100 +51,60 @@ public class MilvusWriter extends Writer { */ @Override public List split(int mandatoryNumber) { - List configList = new ArrayList(); - for(int i = 0; i < mandatoryNumber; i++) { + List configList = new ArrayList<>(); + for (int i = 0; i < mandatoryNumber; i++) { configList.add(this.originalConfig.clone()); } return configList; } - @Override - public void init() { - this.originalConfig = super.getPluginJobConf(); - } - @Override public void destroy() { } } + public static class Task extends Writer.Task { - private MilvusClientV2 milvusClientV2; - - private MilvusSinkConverter milvusSinkConverter; private MilvusBufferWriter milvusBufferWriter; - - private String collection = null; - private JSONArray milvusColumnMeta; - - private String schemaCreateMode = "createWhenTableNotExit"; - - @Override - public void startWrite(RecordReceiver lineReceiver) { - Record record = lineReceiver.getFromReader(); - JsonObject data = milvusSinkConverter.convertByType(milvusColumnMeta, record); - milvusBufferWriter.write(data); - if(milvusBufferWriter.needCommit()){ - log.info("Reached buffer limit, Committing data"); - milvusBufferWriter.commit(); - log.info("Data committed"); - } - } + MilvusClient milvusClient; @Override public void init() { log.info("Initializing Milvus writer"); // get configuration Configuration writerSliceConfig = this.getPluginJobConf(); - this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION); - this.milvusColumnMeta = JSON.parseArray(writerSliceConfig.getString(KeyConstant.COLUMN)); - this.schemaCreateMode = writerSliceConfig.getString(KeyConstant.schemaCreateMode) == null ? 
- "createWhenTableNotExit" : writerSliceConfig.getString(KeyConstant.schemaCreateMode); - int batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100); - log.info("Collection:{}", this.collection); - // connect to milvus - ConnectConfig connectConfig = ConnectConfig.builder() - .uri(writerSliceConfig.getString(KeyConstant.URI)) - .token(writerSliceConfig.getString(KeyConstant.TOKEN)) - .build(); - if(writerSliceConfig.getString(KeyConstant.DATABASE) == null) { - log.warn("Database is set, using database{}", writerSliceConfig.getString(KeyConstant.DATABASE)); - connectConfig.setDbName(writerSliceConfig.getString(KeyConstant.DATABASE)); - } - this.milvusClientV2 = new MilvusClientV2(connectConfig); - this.milvusSinkConverter = new MilvusSinkConverter(); - this.milvusBufferWriter = new MilvusBufferWriter(milvusClientV2, collection, batchSize); + this.milvusClient = new MilvusClient(writerSliceConfig); + this.milvusBufferWriter = new MilvusBufferWriter(this.milvusClient, writerSliceConfig); log.info("Milvus writer initialized"); } + + @Override + public void startWrite(RecordReceiver lineReceiver) { + Record record = null; + while ((record = lineReceiver.getFromReader()) != null) { + milvusBufferWriter.add(record, this.getTaskPluginCollector()); + if (milvusBufferWriter.needCommit()) { + log.info("begin committing data size[{}]", milvusBufferWriter.getDataCacheSize()); + milvusBufferWriter.commit(); + } + } + if (milvusBufferWriter.getDataCacheSize() > 0) { + log.info("begin committing data size[{}]", milvusBufferWriter.getDataCacheSize()); + milvusBufferWriter.commit(); + } + } + @Override public void prepare() { super.prepare(); - Boolean hasCollection = milvusClientV2.hasCollection(HasCollectionReq.builder().collectionName(collection).build()); - if (!hasCollection) { - log.info("Collection not exist"); - if (schemaCreateMode.equals("createWhenTableNotExit")) { - // create collection - log.info("Creating collection:{}", this.collection); - 
CreateCollectionReq.CollectionSchema collectionSchema = milvusSinkConverter.prepareCollectionSchema(milvusColumnMeta); - - CreateCollectionReq createCollectionReq = CreateCollectionReq.builder() - .collectionName(collection) - .collectionSchema(collectionSchema) - .build(); - milvusClientV2.createCollection(createCollectionReq); - } else if (schemaCreateMode.equals("exception")) { - log.error("Collection not exist, throw exception"); - throw new RuntimeException("Collection not exist"); - } - } } @Override public void destroy() { - log.info("Closing Milvus writer, committing data and closing connection"); - this.milvusBufferWriter.commit(); - this.milvusClientV2.close(); + if (this.milvusClient != null) { + this.milvusClient.close(); + } } } -} +} \ No newline at end of file diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriterErrorCode.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriterErrorCode.java new file mode 100644 index 00000000..7264160f --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriterErrorCode.java @@ -0,0 +1,35 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import com.alibaba.datax.common.spi.ErrorCode; + +/** + * @author ziming(子茗) + * @date 12/27/24 + * @description + */ +public enum MilvusWriterErrorCode implements ErrorCode { + MILVUS_COLLECTION("MilvusWriter-01", "collection process error"), + REQUIRED_VALUE("MilvusWriter-02", "miss required parameter"); + private final String code; + private final String description; + + MilvusWriterErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s]. 
", this.code, this.description); + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/SchemaCreateModeEnum.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/SchemaCreateModeEnum.java new file mode 100644 index 00000000..b8c88bf1 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/SchemaCreateModeEnum.java @@ -0,0 +1,34 @@ +package com.alibaba.datax.plugin.writer.milvuswriter.enums; + +import lombok.extern.slf4j.Slf4j; + +/** + * @author ziming(子茗) + * @date 12/27/24 + * @description + */ +@Slf4j +public enum SchemaCreateModeEnum { + CREATEIFNOTEXIT("createIfNotExist"), + IGNORE("ignore"), + RECREATE("recreate"); + String type; + + SchemaCreateModeEnum(String type) { + this.type = type; + } + + public String getType() { + return type; + } + + public static SchemaCreateModeEnum getEnum(String name) { + for (SchemaCreateModeEnum value : SchemaCreateModeEnum.values()) { + if (value.getType().equalsIgnoreCase(name)) { + return value; + } + } + log.info("use default CREATEIFNOTEXIT schame create mode"); + return CREATEIFNOTEXIT; + } +} \ No newline at end of file diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/WriteModeEnum.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/WriteModeEnum.java new file mode 100644 index 00000000..0098dbad --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/WriteModeEnum.java @@ -0,0 +1,28 @@ +package com.alibaba.datax.plugin.writer.milvuswriter.enums; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public enum WriteModeEnum { + INSERT("insert"), + UPSERT("upsert"); + String mode; + + public String getMode() { + return mode; + } + + WriteModeEnum(String mode) { + this.mode = mode; + } + + public static WriteModeEnum getEnum(String mode) { + for (WriteModeEnum writeModeEnum : 
WriteModeEnum.values()) { + if (writeModeEnum.getMode().equalsIgnoreCase(mode)) { + return writeModeEnum; + } + } + log.info("use default write mode upsert"); + return UPSERT; + } +} diff --git a/milvuswriter/src/main/resources/plugin_job_template.json b/milvuswriter/src/main/resources/plugin_job_template.json index d4ba4bf1..33bd941a 100644 --- a/milvuswriter/src/main/resources/plugin_job_template.json +++ b/milvuswriter/src/main/resources/plugin_job_template.json @@ -1,15 +1,12 @@ { - "name": "mongodbwriter", + "name": "milvuswriter", "parameter": { - "address": [], - "userName": "", - "userPassword": "", - "dbName": "", - "collectionName": "", + "endpoint": "", + "username": "", + "password": "", + "database": "", + "collection": "", "column": [], - "upsertInfo": { - "isUpsert": "", - "upsertKey": "" - } + "enableDynamicSchema": "" } } \ No newline at end of file diff --git a/package.xml b/package.xml index 624109f7..c0f9cdf4 100644 --- a/package.xml +++ b/package.xml @@ -546,5 +546,12 @@ datax + + milvuswriter/target/datax/ + + **/*.* + + datax + diff --git a/stream2milvus.json b/stream2milvus.json new file mode 100644 index 00000000..e69de29b From 14068ce1b5afc0b47e69a1084c21a74a07de6352 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=90=E8=8C=97?= Date: Sat, 8 Feb 2025 14:53:33 +0800 Subject: [PATCH 3/4] to #62452156 feat:add milvus writer remove stream2milvus.json --- stream2milvus.json | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 stream2milvus.json diff --git a/stream2milvus.json b/stream2milvus.json deleted file mode 100644 index e69de29b..00000000 From 14c81a9b93680995f12b8caa39ec7cba26209341 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=90=E8=8C=97?= Date: Sat, 8 Feb 2025 14:55:52 +0800 Subject: [PATCH 4/4] to #62452156 feat:add milvus writer remove milvusjob.json --- milvusjob.json | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 milvusjob.json diff --git a/milvusjob.json b/milvusjob.json deleted file 
mode 100644 index e69de29b..00000000