diff --git a/milvuswriter/doc/milvuswriter.md b/milvuswriter/doc/milvuswriter.md new file mode 100644 index 00000000..3788c62a --- /dev/null +++ b/milvuswriter/doc/milvuswriter.md @@ -0,0 +1,273 @@ +# DataX milvuswriter + + +--- + + +## 1 快速介绍 + +milvuswriter 插件实现了写入数据到 milvus集合的功能; 面向ETL开发工程师,使用 milvuswriter 从数仓导入数据到 milvus, 同时 milvuswriter 亦可以作为数据迁移工具为DBA等用户提供服务。 + + +## 2 实现原理 + +milvuswriter 通过 DataX 框架获取 Reader 生成的协议数据,通过 `upsert/insert `方式写入数据到milvus, 并通过batchSize累积的方式进行数据提交。 +
+ + 注意:upsert写入方式(推荐): 在非autid表场景下根据主键更新 Collection 中的某个 Entity;autid表场景下会将 Entity 中的主键替换为自动生成的主键并插入数据。 + insert写入方式: 多用于autid表插入数据milvus自动生成主键, 非autoid表下使用insert会导致数据重复。 + + +## 3 功能说明 + +### 3.1 配置样例 + +* 这里提供一份从内存产生数据导入到 milvus的配置样例。 + +```json +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "column" : [ + { + "value": 1, + "type": "long" + }, + { + "value": "[1.1,1.2,1.3]", + "type": "string" + }, + { + "value": 100, + "type": "long" + }, + { + "value": 200, + "type": "long" + }, + { + "value": 300, + "type": "long" + }, + { + "value": 3.14159, + "type": "double" + }, + { + "value": 3.1415926, + "type": "double" + }, + { + "value": "testvarcharvalue", + "type": "string" + }, + { + "value": true, + "type": "bool" + }, + { + "value": "[1.123,1.2456,1.3789]", + "type": "string" + }, + { + "value": "[2.123,2.2456,2.3789]", + "type": "string" + }, + { + "value": "12345678", + "type": "string" + }, + { + "value": "{\"a\":1,\"b\":2,\"c\":3}", + "type": "string" + }, + { + "value": "[1,2,3,4]", + "type": "string" + } + ], + "sliceRecordCount": 1 + } + }, + "writer": { + "parameter": { + "schemaCreateMode": "createIfNotExist", + "connectTimeoutMs": 60000, + "writeMode": "upsert", + "collection": "demo01", + "type": "milvus", + "token": "xxxxxxx", + "endpoint": "https://xxxxxxxx.com:443", + "batchSize": 1024, + "column": [ + { + "name": "id", + "type": "Int64", + "primaryKey": "true" + }, + { + "name": "floatvector", + "type": "FloatVector", + "dimension": "3" + }, + { + "name": "int8col", + "type": "Int8" + }, + { + "name": "int16col", + "type": "Int16" + }, + { + "name": "int32col", + "type": "Int32" + }, + { + "name": "floatcol", + "type": "Float" + }, + { + "name": "doublecol", + "type": "Double" + }, + { + "name": "varcharcol", + "type": "VarChar" + }, + { + "name": "boolcol", + "type": "Bool" + }, + { + "name": "bfloat16vectorcol", + "type": "BFloat16Vector", + "dimension": "3" + }, + { + "name": "float16vectorcol", + 
"type": "Float16Vector", + "dimension": "3" + }, + { + "name": "binaryvectorcol", + "type": "BinaryVector", + "dimension": "64" + }, + { + "name": "jsoncol", + "type": "JSON" + }, + { + "name": "intarraycol", + "maxCapacity": "8", + "type": "Array", + "elementType": "Int32" + } + ] + }, + "name": "milvuswriter" + } + } + ], + "setting": { + "errorLimit": { + "record": "0" + }, + "speed": { + "concurrent": 2, + "channel": 2 + } + } + } +} + +``` + + +### 3.2 参数说明 + +* **endpoint** + * 描述:milvus数据库的连接信息,包含地址和端口,例如https://xxxxxxxx.com:443 + + 注意:1、在一个数据库上只能配置一个 endpoint 值 + 2、一个milvus 写入任务仅能配置一个 endpoint + * 必选:是
+ * 默认值:无
+* **schemaCreateMode** + * 描述: 集合创建的模式,即同步时milvus集合不存在的处理方式;需要创建集合时根据配置的column属性进行创建 + * 取值 + * createIfNotExist: 如果集合不存在,则创建集合,如果集合存在,则不执行任何操作 + * ignore: 如果集合不存在,任务异常报错,如果集合存在,则不执行任何操作 + * recreate: 如果集合不存在,则创建集合,如果集合存在,则删除集合并重建集合 + * 必选:否
+ * 默认值:createIfNotExist
+* **connectTimeoutMs** + * 描述:与milvus交互时客户端的连接超时时间,单位毫秒
+ * 必选:否
+ * 默认值:10000
+* **token** + * 描述:milvus实例认证的token密钥,与username认证方式二选一配置
+ * 必选:否
+ * 默认值:无
+* **username** + * 描述:目的milvus数据库的用户名, 与token二选一配置
+ * 必选:否
+ * 默认值:无
+* **password** + * 描述:目的milvus数据库的密码
+ * 必选:否
+ * 默认值:无
+* **writeMode** + * 描述: 写入milvus集合的写入方式 + * 取值 + * upsert(推荐): 在非autoid表场景下根据主键更新 Collection 中的某个 Entity;autoid表场景下会将 Entity 中的主键替换为自动生成的主键并插入数据。 + * insert: 多用于autoid表插入数据milvus自动生成主键, 非autoid表下使用insert会导致数据重复。 + * 必选:否
+ * 默认值:upsert
+* **collection** + * 描述:目的集合名称。 只能配置一个milvus的集合名称。 + * 必选:是
+ * 默认值:无
+* **batchSize** + * 描述:一次性批量提交的记录数大小,该值可以极大减少DataX与milvus的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。
+ * 必选:否
+ * 默认值:100
+ +* **column** + * 描述:目的集合需要写入数据的字段,字段内容用json格式描述,字段之间用英文逗号分隔。字段属性必填name、type, 其他属性在需要通过schemaCreateMode创建集合时按需填入,例如: + + "column": [ + { + "name": "id", + "type": "Int64", + "primaryKey": "true" + }, + { + "name": "floatvector", + "type": "FloatVector", + "dimension": "3" + }] + * 必选:是
+ * 默认值:无
+### 3.3 支持同步milvus字段类型 + Bool, + Int8, + Int16, + Int32, + Int64, + Float, + Double, + String, + VarChar, + Array, + JSON, + BinaryVector, + FloatVector, + Float16Vector, + BFloat16Vector, + SparseFloatVector + diff --git a/milvuswriter/pom.xml b/milvuswriter/pom.xml new file mode 100644 index 00000000..16c00560 --- /dev/null +++ b/milvuswriter/pom.xml @@ -0,0 +1,125 @@ + + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + + milvuswriter + + + UTF-8 + official + 1.8 + + + + com.alibaba.fastjson2 + fastjson2 + 2.0.49 + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + org.projectlombok + lombok + 1.18.30 + + + guava + com.google.guava + 32.0.1-jre + + + io.milvus + milvus-sdk-java + 2.5.2 + + + org.mockito + mockito-core + 3.3.3 + test + + + junit + junit + 4.11 + test + + + org.jetbrains.kotlin + kotlin-stdlib + 2.0.0 + + + org.powermock + powermock-module-junit4 + 2.0.9 + test + + + org.powermock + powermock-api-mockito2 + 2.0.9 + test + + + + + + + + src/main/resources + + **/*.* + + true + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + + \ No newline at end of file diff --git a/milvuswriter/src/main/assembly/package.xml b/milvuswriter/src/main/assembly/package.xml new file mode 100644 index 00000000..62357b4a --- /dev/null +++ b/milvuswriter/src/main/assembly/package.xml @@ -0,0 +1,36 @@ + + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/milvuswriter + + + target/ + + milvuswriter-0.0.1-SNAPSHOT.jar + + plugin/writer/milvuswriter + + + + + + false + plugin/writer/milvuswriter/libs + runtime + + + diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java 
b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java new file mode 100644 index 00000000..28f1ff13 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java @@ -0,0 +1,17 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +public class KeyConstant { + public static final String USERNAME = "username"; + public static final String PASSWORD = "password"; + public static final String ENDPOINT = "endpoint"; + public static final String TOKEN = "token"; + public static final String DATABASE = "database"; + public static final String COLLECTION = "collection"; + public static final String BATCH_SIZE = "batchSize"; + public static final String COLUMN = "column"; + public static final String SCHAME_CREATE_MODE = "schemaCreateMode"; + public static final String WRITE_MODE = "writeMode"; + public static final String PARTITION = "partition"; + public static final String CONNECT_TIMEOUT_MS = "connectTimeoutMs"; + public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema"; +} \ No newline at end of file diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java new file mode 100644 index 00000000..b78728e4 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java @@ -0,0 +1,166 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.plugin.writer.milvuswriter.enums.WriteModeEnum; +import com.alibaba.fastjson2.JSONArray; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import io.milvus.v2.common.DataType; +import io.milvus.v2.service.vector.request.data.BFloat16Vec; +import 
io.milvus.v2.service.vector.request.data.Float16Vec; +import lombok.extern.slf4j.Slf4j; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.TypeReference; + +@Slf4j +public class MilvusBufferWriter { + + private final MilvusClient milvusClient; + private final String collection; + private final Integer batchSize; + private List dataCache; + private List milvusColumnMeta; + private WriteModeEnum writeMode; + private String partition; + + public MilvusBufferWriter(MilvusClient milvusClient, Configuration writerSliceConfig) { + this.milvusClient = milvusClient; + this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION); + this.batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100); + this.dataCache = new ArrayList<>(batchSize); + this.milvusColumnMeta = JSON.parseObject(writerSliceConfig.getString(KeyConstant.COLUMN), new TypeReference>() { + }); + this.writeMode = WriteModeEnum.getEnum(writerSliceConfig.getString(KeyConstant.WRITE_MODE)); + this.partition = writerSliceConfig.getString(KeyConstant.PARTITION); + } + + public void add(Record record, TaskPluginCollector taskPluginCollector) { + try { + JsonObject data = this.convertByType(milvusColumnMeta, record); + dataCache.add(data); + } catch (Exception e) { + taskPluginCollector.collectDirtyRecord(record, String.format("parse record error errorMessage: %s", e.getMessage())); + } + } + + public Boolean needCommit() { + return dataCache.size() >= batchSize; + } + + public void commit() { + if (dataCache.isEmpty()) { + log.info("dataCache is empty, skip commit"); + return; + } + if (writeMode == WriteModeEnum.INSERT) { + milvusClient.insert(collection, partition, dataCache); + } else { + milvusClient.upsert(collection, partition, 
dataCache); + } + dataCache = new ArrayList<>(batchSize); + } + + public int getDataCacheSize() { + return dataCache.size(); + } + + private JsonObject convertByType(List milvusColumnMeta, Record record) { + JsonObject data = new JsonObject(); + Gson gson = new Gson(); + for (int i = 0; i < record.getColumnNumber(); i++) { + MilvusColumn milvusColumn = milvusColumnMeta.get(i); + DataType fieldType = milvusColumn.getMilvusTypeEnum(); + String fieldName = milvusColumn.getName(); + Column column = record.getColumn(i); + try { + Object field = convertToMilvusField(fieldType, column, milvusColumn); + data.add(fieldName, gson.toJsonTree(field)); + } catch (Exception e) { + log.error("parse error for column: {} errorMessage: {}", fieldName, e.getMessage(), e); + throw e; + } + } + return data; + } + + //值需要跟这里匹配:io.milvus.param.ParamUtils#checkFieldData(io.milvus.param.collection.FieldType, java.util.List, boolean) + private Object convertToMilvusField(DataType type, Column column, MilvusColumn milvusColumn) { + if (column.getRawData() == null) { + return null; + } + switch (type) { + case Int8: + case Int16: + case Int32: + case Int64: + return column.asLong(); + case Float: + case Double: + return column.asDouble(); + case String: + case VarChar: + return column.asString(); + case Bool: + return column.asBoolean(); + case BFloat16Vector: + JSONArray bFloat16ArrayJson = JSON.parseArray(column.asString()); + List bfloat16Vector = new ArrayList<>(); + for (int i = 0; i < bFloat16ArrayJson.size(); i++) { + Float value = Float.parseFloat(bFloat16ArrayJson.getString(i)); + bfloat16Vector.add(value); + } + BFloat16Vec bFloat16Vec = new BFloat16Vec(bfloat16Vector); + ByteBuffer byteBuffer = (ByteBuffer) bFloat16Vec.getData(); + return byteBuffer.array(); + case Float16Vector: + JSONArray float16ArrayJson = JSON.parseArray(column.asString()); + List float16Vector = new ArrayList<>(); + for (int i = 0; i < float16ArrayJson.size(); i++) { + Float floatValue = 
Float.parseFloat(float16ArrayJson.getString(i)); + float16Vector.add(floatValue); + } + Float16Vec float16Vec = new Float16Vec(float16Vector); + ByteBuffer data = (ByteBuffer) float16Vec.getData(); + return data.array(); + case BinaryVector: + return column.asBytes(); + case FloatVector: + JSONArray arrayJson = JSON.parseArray(column.asString()); + return arrayJson.stream().map(item -> Float.parseFloat(String.valueOf(item))).collect(Collectors.toList()); + case SparseFloatVector: + //[3:0.5, 24:0.8, 76:0.2] + try { + JSONArray sparseFloatArray = JSON.parseArray(column.asString()); + TreeMap mapValue = new TreeMap<>(); + for (int i = 0; i < sparseFloatArray.size(); i++) { + String value = sparseFloatArray.getString(i); + String[] split = value.split(":"); + Long key = Long.parseLong(split[0]); + Float val = Float.parseFloat(split[1]); + mapValue.put(key, val); + } + return mapValue; + } catch (Exception e) { + log.error("parse column[{}] SparseFloatVector value error, value should like [3:0.5, 24:0.8, 76:0.2], but get:{}", milvusColumn.getName(), column.asString()); + throw e; + } + case JSON: + return column.asString(); + case Array: + JSONArray parseArray = JSON.parseArray(column.asString()); + return parseArray.stream().map(item -> String.valueOf(item)).collect(Collectors.toList()); + default: + throw new RuntimeException(String.format("Unsupported data type[%s]", type)); + } + } +} \ No newline at end of file diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusClient.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusClient.java new file mode 100644 index 00000000..1bf4743b --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusClient.java @@ -0,0 +1,95 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import java.util.List; + +import com.alibaba.datax.common.util.Configuration; + +import com.google.gson.JsonObject; +import 
io.milvus.v2.client.ConnectConfig; +import io.milvus.v2.client.MilvusClientV2; +import io.milvus.v2.service.collection.request.CreateCollectionReq; +import io.milvus.v2.service.collection.request.DropCollectionReq; +import io.milvus.v2.service.collection.request.HasCollectionReq; +import io.milvus.v2.service.partition.request.CreatePartitionReq; +import io.milvus.v2.service.partition.request.HasPartitionReq; +import io.milvus.v2.service.vector.request.InsertReq; +import io.milvus.v2.service.vector.request.UpsertReq; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +/** + * @author ziming(子茗) + * @date 12/27/24 + * @description + */ +@Slf4j +public class MilvusClient { + private MilvusClientV2 milvusClientV2; + + public MilvusClient(Configuration conf) { + // connect to milvus + ConnectConfig connectConfig = ConnectConfig.builder().uri(conf.getString(KeyConstant.ENDPOINT)).build(); + String token = null; + if (conf.getString(KeyConstant.TOKEN) != null) { + token = conf.getString(KeyConstant.TOKEN); + } else { + token = conf.getString(KeyConstant.USERNAME) + ":" + conf.getString(KeyConstant.PASSWORD); + } + connectConfig.setToken(token); + String database = conf.getString(KeyConstant.DATABASE); + if (StringUtils.isNotBlank(database)) { + log.info("use database {}", database); + connectConfig.setDbName(conf.getString(KeyConstant.DATABASE)); + } + Integer connectTimeOut = conf.getInt(KeyConstant.CONNECT_TIMEOUT_MS); + if (connectTimeOut != null) { + connectConfig.setConnectTimeoutMs(connectTimeOut); + } + this.milvusClientV2 = new MilvusClientV2(connectConfig); + } + + public void upsert(String collection, String partition, List data) { + UpsertReq upsertReq = UpsertReq.builder().collectionName(collection).data(data).build(); + if (StringUtils.isNotEmpty(partition)) { + upsertReq.setPartitionName(partition); + } + milvusClientV2.upsert(upsertReq); + } + + public void insert(String collection, String partition, List data) { + InsertReq 
insertReq = InsertReq.builder().collectionName(collection).data(data).build(); + if (StringUtils.isNotEmpty(partition)) { + insertReq.setPartitionName(partition); + } + milvusClientV2.insert(insertReq); + } + + public Boolean hasCollection(String collection) { + HasCollectionReq build = HasCollectionReq.builder().collectionName(collection).build(); + return milvusClientV2.hasCollection(build); + } + + public void createCollection(String collection, CreateCollectionReq.CollectionSchema schema) { + CreateCollectionReq createCollectionReq = CreateCollectionReq.builder().collectionName(collection).collectionSchema(schema).build(); + milvusClientV2.createCollection(createCollectionReq); + } + + public void dropCollection(String collection) { + DropCollectionReq request = DropCollectionReq.builder().collectionName(collection).build(); + milvusClientV2.dropCollection(request); + } + public Boolean hasPartition(String collection, String partition) { + HasPartitionReq hasPartitionReq = HasPartitionReq.builder().collectionName(collection).partitionName(partition).build(); + return milvusClientV2.hasPartition(hasPartitionReq); + } + + public void createPartition(String collectionName, String partitionName) { + CreatePartitionReq createPartitionReq = CreatePartitionReq.builder().collectionName(collectionName).partitionName(partitionName).build(); + milvusClientV2.createPartition(createPartitionReq); + } + + public void close() { + log.info("Closing Milvus client"); + milvusClientV2.close(); + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusColumn.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusColumn.java new file mode 100644 index 00000000..06070248 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusColumn.java @@ -0,0 +1,112 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import io.milvus.v2.common.DataType; + +import 
java.util.Arrays; + +/** + * @author ziming(子茗) + * @date 12/27/24 + * @description + */ +public class MilvusColumn { + private String name; + private String type; + private DataType milvusTypeEnum; + private Boolean isPrimaryKey; + private Integer dimension; + private Boolean isPartitionKey; + private Integer maxLength; + private Boolean isAutoId; + private Integer maxCapacity; + private String elementType; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + for (DataType item : DataType.values()) { + if (item.name().equalsIgnoreCase(type)) { + this.milvusTypeEnum = item; + break; + } + } + if (this.milvusTypeEnum == null) { + throw new RuntimeException("Unsupported type: " + type + " supported types: " + Arrays.toString(DataType.values())); + } + } + + public Integer getDimension() { + return dimension; + } + + public void setDimension(Integer dimension) { + this.dimension = dimension; + } + + public Integer getMaxLength() { + return maxLength; + } + + public void setMaxLength(Integer maxLength) { + this.maxLength = maxLength; + } + + public Boolean getPrimaryKey() { + return isPrimaryKey; + } + + public Boolean getPartitionKey() { + return isPartitionKey; + } + + public void setPartitionKey(Boolean partitionKey) { + isPartitionKey = partitionKey; + } + + public void setPrimaryKey(Boolean primaryKey) { + isPrimaryKey = primaryKey; + } + + public Boolean getAutoId() { + return isAutoId; + } + + public void setAutoId(Boolean autoId) { + isAutoId = autoId; + } + + public Integer getMaxCapacity() { + return maxCapacity; + } + + public void setMaxCapacity(Integer maxCapacity) { + this.maxCapacity = maxCapacity; + } + + public String getElementType() { + return elementType; + } + + public void setElementType(String elementType) { + this.elementType = elementType; + } + + public DataType 
getMilvusTypeEnum() { + return milvusTypeEnum; + } + + public void setMilvusTypeEnum(DataType milvusTypeEnum) { + this.milvusTypeEnum = milvusTypeEnum; + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusCreateCollection.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusCreateCollection.java new file mode 100644 index 00000000..84e296f4 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusCreateCollection.java @@ -0,0 +1,102 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import java.util.List; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.writer.milvuswriter.enums.SchemaCreateModeEnum; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.TypeReference; + +import io.milvus.v2.common.DataType; +import io.milvus.v2.service.collection.request.AddFieldReq; +import io.milvus.v2.service.collection.request.CreateCollectionReq; +import lombok.extern.slf4j.Slf4j; + +import static io.milvus.v2.common.DataType.valueOf; + +/** + * @author ziming(子茗) + * @date 12/27/24 + * @description + */ +@Slf4j +public class MilvusCreateCollection { + + private Configuration config; + + MilvusCreateCollection(Configuration originalConfig) { + this.config = originalConfig; + } + + public void createCollectionByMode(MilvusClient milvusClient) { + String collection = this.config.getString(KeyConstant.COLLECTION); + SchemaCreateModeEnum schemaCreateMode = SchemaCreateModeEnum.getEnum(this.config.getString(KeyConstant.SCHAME_CREATE_MODE)); + List milvusColumnMeta = JSON.parseObject(config.getString(KeyConstant.COLUMN), new TypeReference>() { + }); + Boolean hasCollection = milvusClient.hasCollection(collection); + if (schemaCreateMode == SchemaCreateModeEnum.CREATEIFNOTEXIT) { + // create collection + if (hasCollection) { + log.info("collection[{}] already exists, continue create", collection); + } else { 
+ log.info("creating collection[{}]", collection); + CreateCollectionReq.CollectionSchema collectionSchema = prepareCollectionSchema(milvusColumnMeta); + milvusClient.createCollection(collection, collectionSchema); + } + } else if (schemaCreateMode == SchemaCreateModeEnum.RECREATE) { + if (hasCollection) { + log.info("collection already exist, try to drop"); + milvusClient.dropCollection(collection); + } + log.info("creating collection[{}]", collection); + CreateCollectionReq.CollectionSchema collectionSchema = prepareCollectionSchema(milvusColumnMeta); + milvusClient.createCollection(collection, collectionSchema); + } else if (schemaCreateMode == SchemaCreateModeEnum.IGNORE && !hasCollection) { + log.error("Collection not exist, throw exception"); + throw new RuntimeException("Collection not exist"); + } + } + + private CreateCollectionReq.CollectionSchema prepareCollectionSchema(List milvusColumnMeta) { + CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build(); + for (int i = 0; i < milvusColumnMeta.size(); i++) { + MilvusColumn milvusColumn = milvusColumnMeta.get(i); + AddFieldReq addFieldReq = AddFieldReq.builder() + .fieldName(milvusColumn.getName()) + .dataType(valueOf(milvusColumn.getType())) + .build(); + if (milvusColumn.getPrimaryKey() != null) { + addFieldReq.setIsPrimaryKey(milvusColumn.getPrimaryKey()); + } + if (milvusColumn.getDimension() != null) { + addFieldReq.setDimension(milvusColumn.getDimension()); + } + if (milvusColumn.getPartitionKey() != null) { + addFieldReq.setIsPartitionKey(milvusColumn.getPartitionKey()); + } + if (milvusColumn.getMaxLength() != null) { + addFieldReq.setMaxLength(milvusColumn.getMaxLength()); + } + if (milvusColumn.getAutoId() != null) { + addFieldReq.setAutoID(milvusColumn.getAutoId()); + } + if (milvusColumn.getMaxCapacity() != null) { + addFieldReq.setMaxCapacity(milvusColumn.getMaxCapacity()); + } + if (milvusColumn.getElementType() != null) { + 
addFieldReq.setElementType(DataType.valueOf(milvusColumn.getElementType())); + } + try { + collectionSchema.addField(addFieldReq); + } catch (Exception e) { + log.error("add filed[{}] error", milvusColumn.getName()); + throw e; + } + } + Boolean enableDynamic = config.getBool(KeyConstant.ENABLE_DYNAMIC_SCHEMA); + if (enableDynamic != null) { + collectionSchema.setEnableDynamicField(enableDynamic); + } + return collectionSchema; + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java new file mode 100644 index 00000000..764a9d94 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java @@ -0,0 +1,110 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; + +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +public class MilvusWriter extends Writer { + public static class Job extends Writer.Job { + private Configuration originalConfig = null; + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + originalConfig.getNecessaryValue(KeyConstant.ENDPOINT, MilvusWriterErrorCode.REQUIRED_VALUE); + originalConfig.getNecessaryValue(KeyConstant.COLUMN, MilvusWriterErrorCode.REQUIRED_VALUE); + originalConfig.getNecessaryValue(KeyConstant.COLLECTION, MilvusWriterErrorCode.REQUIRED_VALUE); + } + + @Override + public void prepare() { + //collection create process + MilvusClient milvusClient = new MilvusClient(originalConfig); + try { + MilvusCreateCollection milvusCreateCollection = new MilvusCreateCollection(originalConfig); + 
milvusCreateCollection.createCollectionByMode(milvusClient); + String collection = originalConfig.getString(KeyConstant.COLLECTION); + String partition = originalConfig.getString(KeyConstant.PARTITION); + if (partition != null && !milvusClient.hasPartition(collection, partition)) { + log.info("collection[{}] not contain partition[{}],try to create partition", collection, partition); + milvusClient.createPartition(collection, partition); + } + } catch (Exception e) { + throw DataXException.asDataXException(MilvusWriterErrorCode.MILVUS_COLLECTION, e.getMessage(), e); + } finally { + milvusClient.close(); + } + } + + /** + * 切分任务。
+ * + * @param mandatoryNumber 为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错! + */ + @Override + public List split(int mandatoryNumber) { + List configList = new ArrayList<>(); + for (int i = 0; i < mandatoryNumber; i++) { + configList.add(this.originalConfig.clone()); + } + return configList; + } + + @Override + public void destroy() { + + } + } + + public static class Task extends Writer.Task { + + private MilvusBufferWriter milvusBufferWriter; + MilvusClient milvusClient; + + @Override + public void init() { + log.info("Initializing Milvus writer"); + // get configuration + Configuration writerSliceConfig = this.getPluginJobConf(); + this.milvusClient = new MilvusClient(writerSliceConfig); + this.milvusBufferWriter = new MilvusBufferWriter(this.milvusClient, writerSliceConfig); + log.info("Milvus writer initialized"); + } + + @Override + public void startWrite(RecordReceiver lineReceiver) { + Record record = null; + while ((record = lineReceiver.getFromReader()) != null) { + milvusBufferWriter.add(record, this.getTaskPluginCollector()); + if (milvusBufferWriter.needCommit()) { + log.info("begin committing data size[{}]", milvusBufferWriter.getDataCacheSize()); + milvusBufferWriter.commit(); + } + } + if (milvusBufferWriter.getDataCacheSize() > 0) { + log.info("begin committing data size[{}]", milvusBufferWriter.getDataCacheSize()); + milvusBufferWriter.commit(); + } + } + + @Override + public void prepare() { + super.prepare(); + } + + @Override + public void destroy() { + if (this.milvusClient != null) { + this.milvusClient.close(); + } + } + } +} \ No newline at end of file diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriterErrorCode.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriterErrorCode.java new file mode 100644 index 00000000..7264160f --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriterErrorCode.java @@ 
-0,0 +1,35 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import com.alibaba.datax.common.spi.ErrorCode; + +/** + * @author ziming(子茗) + * @date 12/27/24 + * @description + */ +public enum MilvusWriterErrorCode implements ErrorCode { + MILVUS_COLLECTION("MilvusWriter-01", "collection process error"), + REQUIRED_VALUE("MilvusWriter-02", "miss required parameter"); + private final String code; + private final String description; + + MilvusWriterErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s]. ", this.code, this.description); + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/SchemaCreateModeEnum.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/SchemaCreateModeEnum.java new file mode 100644 index 00000000..b8c88bf1 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/SchemaCreateModeEnum.java @@ -0,0 +1,34 @@ +package com.alibaba.datax.plugin.writer.milvuswriter.enums; + +import lombok.extern.slf4j.Slf4j; + +/** + * @author ziming(子茗) + * @date 12/27/24 + * @description + */ +@Slf4j +public enum SchemaCreateModeEnum { + CREATEIFNOTEXIT("createIfNotExist"), + IGNORE("ignore"), + RECREATE("recreate"); + String type; + + SchemaCreateModeEnum(String type) { + this.type = type; + } + + public String getType() { + return type; + } + + public static SchemaCreateModeEnum getEnum(String name) { + for (SchemaCreateModeEnum value : SchemaCreateModeEnum.values()) { + if (value.getType().equalsIgnoreCase(name)) { + return value; + } + } + log.info("use default CREATEIFNOTEXIT schame create mode"); + return CREATEIFNOTEXIT; + } +} \ No newline at end 
of file diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/WriteModeEnum.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/WriteModeEnum.java new file mode 100644 index 00000000..0098dbad --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/enums/WriteModeEnum.java @@ -0,0 +1,28 @@ +package com.alibaba.datax.plugin.writer.milvuswriter.enums; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public enum WriteModeEnum { + INSERT("insert"), + UPSERT("upsert"); + String mode; + + public String getMode() { + return mode; + } + + WriteModeEnum(String mode) { + this.mode = mode; + } + + public static WriteModeEnum getEnum(String mode) { + for (WriteModeEnum writeModeEnum : WriteModeEnum.values()) { + if (writeModeEnum.getMode().equalsIgnoreCase(mode)) { + return writeModeEnum; + } + } + log.info("use default write mode upsert"); + return UPSERT; + } +} diff --git a/milvuswriter/src/main/resources/plugin.json b/milvuswriter/src/main/resources/plugin.json new file mode 100644 index 00000000..8b912309 --- /dev/null +++ b/milvuswriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "milvuswriter", + "class": "com.alibaba.datax.plugin.writer.milvuswriter.MilvusWriter", + "description": "useScene: prod. 
mechanism: via milvusclient connect milvus write data concurrent.", + "developer": "nianliuu" +} diff --git a/milvuswriter/src/main/resources/plugin_job_template.json b/milvuswriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..33bd941a --- /dev/null +++ b/milvuswriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,12 @@ +{ + "name": "milvuswriter", + "parameter": { + "endpoint": "", + "username": "", + "password": "", + "database": "", + "collection": "", + "column": [], + "enableDynamicSchema": "" + } +} \ No newline at end of file diff --git a/package.xml b/package.xml index 624109f7..c0f9cdf4 100644 --- a/package.xml +++ b/package.xml @@ -546,5 +546,12 @@ datax + + milvuswriter/target/datax/ + + **/*.* + + datax + diff --git a/pom.xml b/pom.xml index c7f43f17..1b364a75 100644 --- a/pom.xml +++ b/pom.xml @@ -129,6 +129,7 @@ adbmysqlwriter sybasewriter neo4jwriter + milvuswriter plugin-rdbms-util plugin-unstructured-storage-util