几年前初识 Milvus 的契机,来源于开发一个图像相似检索的应用,当时市面上向量库的可选择项并不像现在这么多,且功能也仅限于单纯的向量检索。
鉴于最近有业务更新的需要,和打算重构一下之前做的RAG项目,再次有机会深入学习 Milvus,探索其在新功能和实际应用上的更多可能性。
本篇笔记,仅作为学习笔记,更多是记录一些 Milvus从2.0 到2.5 的变化, 和一些动手实践的记录,便于之后的查阅。
Collection是Milvus中的一个二维表格,具有固定的列和可变的行。每一列代表一个字段(field),每一行代表一个实体(entity)
Schema和字段 Collection需要一个schema来定义其结构
索引 在特定字段上创建索引可以提高搜索效率。建议为服务所依赖的所有字段创建索引,其中向量字段的索引是必需的。
分区(Partition) 分区是Collection的子集,与其父Collection共享相同的字段集
分片(Shard) 分片是Collection的水平切片。每个分片对应一个数据输入通道。
Shard vs Partition的区别 :
MilvusClient
定位:更高级封装,提供一体化的操作接口。简化了与 Milvus 的交互流程, 提供更直观和结构化的操作方式,便于新手快速上手,内置了对连接的管理和操作,减少手动处理的复杂性。
from pymilvus import MilvusClient
# 创建客户端并连接到 Milvus
client = MilvusClient(uri="http://localhost:19530")
# 创建集合
client.create_collection(
name="example_collection",
schema={"fields": [{"name": "vector", "type": "FLOAT_VECTOR", "params": {"dim": 128}}]}
)
# 插入数据
client.insert("example_collection", data={"vector": [[0.1] * 128, [0.2] * 128]})
# 搜索
results = client.search("example_collection", data=[[0.1] * 128])
print(results)
Connection
定位:基础连接操作,需要通过 connect 方法创建并维护连接。提供更底层的控制,适合灵活、自定义的操作。
from pymilvus import connections, Collection, FieldSchema, CollectionSchema
# 创建连接
connections.connect(alias="default", host="localhost", port="19530")
# 创建集合
fields = [
FieldSchema(name="vector", dtype="FLOAT_VECTOR", dim=128)
]
schema = CollectionSchema(fields, description="example collection")
collection = Collection(name="example_collection", schema=schema)
# 插入数据
collection.insert([[0.1] * 128, [0.2] * 128])
# 搜索
collection.load()
results = collection.search([[0.1] * 128], anns_field="vector", limit=10)
print(results)
Schema 用于定义collection及其字段的属性
设定 Schema 的目的是为了定义数据的逻辑结构和存储规则。Schema 是集合(Collection)和字段(Field)的元数据定义,它决定了如何存储和管理数据,同时保证数据操作的一致性、灵活性和高效性。
字段属性(Field Properties)
属性名称
描述
注意事项
name
字段名称
必须字段,类型为字符串。
dtype
字段的数据类型
必须字段。
description
字段的描述
可选字段,类型为字符串。
is_primary
是否将字段设为主键
类型为布尔值(true 或 false),主键字段必须指定。
auto_id
是否启用自动 ID 分配
必须为主键字段设置,类型为布尔值。
max_length
VARCHAR 字段的最大长度(以字节为单位)
必须为 VARCHAR 字段指定,范围为 1, 65,535。
dim
向量的维度
对于稠密向量字段,必须指定;对稀疏向量字段可以省略。范围为 1, 32,768。
is_partition_key
是否为分区键字段
类型为布尔值(true 或 false)。
Milvus 支持以下三类字段:
1. 主键字段
主键字段是集合的唯一标识,用于标识集合中的每条记录。
属性
描述
name
主键字段的名称
dtype
支持 INT64(整数主键)或 VARCHAR(字符串主键)
is_primary
必须设为 True,表示该字段是主键
auto_id
是否自动生成主键值(布尔值:True 或 False)
2. 标量字段
标量字段用于存储布尔值、整数、浮点数、字符串、JSON 数据或数组等。
数据类型
描述
BOOL
布尔值,支持 true 或 false
INT8
8 位整数
INT16
16 位整数
INT32
32 位整数
INT64
64 位整数
FLOAT
32 位单精度浮点数
DOUBLE
64 位双精度浮点数
VARCHAR
可变长度字符串,需指定最大长度(max_length)
JSON
JSON 数据类型,存储键值对
Array
数组类型,用于存储多个相同类型的数据
3. 向量字段
向量字段用于存储高维特征数据,是 Milvus 支持向量检索的基础。
数据类型
描述
FLOAT_VECTOR
32 位浮点型向量,常用于机器学习和深度学习模型的特征表示
BINARY_VECTOR
二进制向量,存储由 0 和 1 组成的序列,用于图像处理或紧凑特征表示
FLOAT16_VECTOR
16 位浮点型向量,用于深度学习和 GPU 计算的内存优化
BFLOAT16_VECTOR
16 位浮点型向量,具有与 FLOAT32 相同的指数范围,常用于深度学习优化
SPARSE_FLOAT_VECTOR
稀疏浮点型向量,存储非零元素及其索引,用于稀疏向量场景
其他支持的字段属性
is_partition_key
: 是否将字段作为分区键,支持 true
或 false
。dim
: 向量字段的维度,必需为向量字段定义。max_length
: VARCHAR
类型字段的最大字节长度。动态字段允许在插入数据时不提前定义字段,可以通过 enable_dynamic_field=True
在collection Schema 中启用
属性名称
描述
注意事项
fields
字段的集合
必须字段。
description
集合的描述
可选字段,类型为字符串。
partition_key_field
作为分区键的字段名
可选字段,类型为字符串。
enable_dynamic_field
是否启用动态字段
可选字段,类型为布尔值,默认值为 false。
为什么需要构造索引?
加速搜索:向量搜索的主要瓶颈在于计算高维向量之间的相似性。如果不使用索引,检索可能需要线性扫描整个数据集,耗时且资源消耗大;
支持大规模数据: 在百万甚至十亿级别的数据量中,直接搜索变得不可行。索引能够分层次或分块处理数据,从而支持大规模数据场景
资源效率:使用索引可以显著减少内存和计算资源的占用,尤其在处理频繁的实时查询时更加重要。
几种主要的索引类型:
密集向量(Dense Vectors)
索引名称
适合场景
优点
缺点
参数解释
原理
FLAT
小数据集,精确搜索
搜索结果精确,简单易用
随数据量增加,速度显著下降
无需额外参数
对所有向量进行线性扫描,计算查询向量与每个向量的距离,从而找到最相似的结果。
IVF_FLAT
中等规模数据集
搜索效率高,支持近似搜索
搜索精度依赖参数调整
nlist:桶的数量,影响召回率和性能;nprobe:查询时访问的桶数量,影响搜索的准确性和速度
将向量划分为多个簇(cluster),通过聚类算法(如 k-means)预先构建倒排文件,仅在相关簇中执行搜索,减少比较次数,提高效率。
IVF_SQ8
需要平衡存储和性能的场景
存储需求低
搜索精度较低
与 IVF_FLAT 相同
在 IVF_FLAT 的基础上对每个向量进行标量量化(Scalar Quantization, SQ),将原始浮点向量压缩为整数形式以节省存储空间。
IVF_PQ
超大规模数据集,存储敏感场景
存储需求显著降低
搜索精度较低
nlist:桶的数量;m:子向量的数量,影响压缩比和性能;nbits:每个子向量的编码位数,影响压缩质量
基于 IVF_FLAT,使用产品量化(Product Quantization, PQ)将向量分为多个子空间,并分别进行量化,从而显著减少存储需求。
中小规模数据集,高搜索精度场景
高性能,高维数据效果良好
索引构建时间长,占用内存大
efConstruction:控制构建时的候选集大小,影响构建速度和搜索精度;ef:搜索时的候选集大小,影响搜索性能和精度
使用分层的近似图(Navigable Small World Graph, NSW)表示向量之间的关系,通过图的导航找到查询向量的近似最近邻。
磁盘存储超大规模数据集
支持超大规模数据检索
查询速度较慢
search_list:控制查询范围,影响精度;pq_code_budget_gb:控制压缩的预算大小;build_threads:控制索引构建的线程数
结合倒排文件和磁盘访问优化,将部分索引和数据保存在磁盘上,通过减少内存占用实现超大规模数据检索。
支持的度量类型有欧几里得距离(L2)、内积(IP)、余弦相似度(COSINE)。
二进制向量(Binary Vectors)
二进制向量(布尔值、哈希签名),占用资源低,对低维数据搜索快,精度不太高的场景
索引名称
适合场景
优点
缺点
参数解释
原理
BIN_FLAT
小规模数据集,精确搜索
搜索结果精确,易于实现
随数据量增加,速度显著下降
无需额外参数
对所有二进制向量进行线性扫描,逐一计算查询向量与每个向量的距离(如汉明距离),从而找到最相似的结果。
BIN_IVF_FLAT
中等规模数据集
搜索效率高,支持近似搜索
搜索精度依赖参数调整
- nlist:桶的数量,影响召回率和性能
- nprobe:查询时访问的桶数量,影响搜索的准确性和速度
结合倒排文件和线性扫描,将二进制向量划分到不同的桶中,通过访问部分桶来加速检索过程。
支持的度量类型有 Jaccard 和 Hamming
稀疏向量(Sparse Vectors)
常用于文档搜索
索引名称
适合场景
优点
缺点
参数解释
原理
SPARSE_INVERTED_INDEX
文档检索、推荐系统
高效支持稀疏数据的相似性搜索
仅支持稀疏向量
- 无需额外参数
基于倒排索引(Inverted Index),将稀疏向量的非零值索引到特定位置,通过快速查找相关文档加速检索过程。
SPARSE_WAND
高性能文档检索、稀疏向量场景
支持稀疏数据,搜索效率更高
构建复杂度较高
- top_k:控制返回的搜索结果数量
基于 WAND(Weighted AND)算法优化倒排索引,对查询进行剪枝优化,仅访问最相关的文档,提高搜索性能。
支持内积(IP)度量
如何选择合适的index
from RAG搭建中,如何选择最合适的向量索引?
GPU-index
索引名称
适合场景
优点
缺点
参数解释
原理
GPU_BRUTE_FORCE
小批量搜索,高性能需求场景
结果精确,性能优于 CPU 的线性扫描
不支持范围搜索,资源消耗较高
无需额外参数
使用 GPU 并行计算能力对所有向量执行线性扫描,计算查询向量与每个向量的相似性,从而获得结果。
GPU_IVF_FLAT
大规模数据集搜索,高吞吐量需求场景
搜索效率高,结合 GPU 加速
搜索精度依赖参数调整
- nlist:桶的数量,影响召回率和性能
- nprobe:查询时访问的桶数量,影响搜索的准确性和速度
在 CPU IVF_FLAT 的基础上,利用 GPU 加速桶的分配和搜索过程,大幅提高检索速度。
GPU_IVF_PQ
大规模数据存储敏感场景,高吞吐量需求场景
存储需求显著降低,支持近似搜索
搜索精度较低
- nlist:桶的数量
- m:子向量数量,影响压缩比和性能
- nbits:每个子向量的编码位数,影响压缩质量
结合倒排文件与产品量化(PQ)技术,利用 GPU 提高量化和检索过程的速度,适合超大规模数据检索。
GPU_CAGRA
超高吞吐量和高召回率场景
高性能,高召回率,适合大批量查询
消耗更多内存,索引构建时间长
- intermediate_graph_degree:构建时图的中间连接数量,影响构建时间和召回率
- graph_degree:最终图的连接数量,影响搜索性能和内存占用
基于图的搜索算法(如 CAGRA),结合 GPU 的并行计算能力构建分层图结构,提高搜索的速度和精度。
用于将原始文本转换为结构化和可搜索的格式。它的主要作用是实现文本处理、索引构建和检索优化。
在本质上,analyzer 包含了一个tokenizer(分词器)和可选的多个filters(过滤器)。
用途: 将输入文本分解为更小的单元(tokens); 通过过滤器进一步处理分词后的 token,例如转为小写、移除停用词、仅保留特定字符等; 将处理后的文本用于全文检索和关键词匹配。
how to use:
schema.add_field(
field_name='text_field',
datatype=DataType.VARCHAR,
max_length=1000,
enable_analyzer=True,
analyzer_params={
"type": "standard", # 使用内置 analyzer
"stop_words": ["a", "an", "the"]
},
enable_match=True, # 启用关键词匹配
)
内置 Analyzer: 提供快速配置,适用于通用场景,如:
standard(标准分词器)/english(针对英文优化)/ chinese(针对中文优化)
自定义 Analyzer, 如:
analyzer_params = {
"tokenizer": "standard",
"filter": [
"lowercase", # 转为小写
{
"type": "stop", # 自定义停用词
"stop_words": ["of", "to", "the"]
}
]
}
不同于传统数据库的精确匹配查询,向量库主要用于相似性搜索。这种搜索基于向量间的距离,通常使用欧几里得距离或余弦相似度。以暴力遍历的最近邻查询为例(Nearest neighbor search),假设我们有n个d维向量,查询复杂度为$O(nd)$。当n和d都很大时,这种方法变得非常耗时。
而近似最近邻搜索(Approximate Nearest Neighbor search, ANN)则能将时间复杂度降低到亚线性,通常为$O(log n)$或更优。
how to ANN-search
构造索引
from pymilvus import MilvusClient, DataType
import numpy as np
# 创建Milvus客户端
client = MilvusClient(
uri="http://localhost:19530"
)
# 创建schema
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True,
)
# 添加字段到schema
schema.add_field(field_name="my_id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="my_vector", datatype=DataType.FLOAT_VECTOR, dim=5)
schema.add_field(field_name="my_varchar", datatype=DataType.VARCHAR, max_length=512)
# 准备索引参数
index_params = client.prepare_index_params()
# 添加索引
index_params.add_index(
field_name="my_id",
index_type="STL_SORT"
)
index_params.add_index(
field_name="my_vector",
index_type="AUTOINDEX",
metric_type="COSINE"
)
# 创建collection
client.create_collection(
collection_name="customized_setup_1",
schema=schema,
index_params=index_params
)`
数据插入
data = [
{
"my_id": i,
"my_vector": np.random.random(5).tolist(),
"my_varchar": f"example_text_{i}"
}
for i in range(100)
]
# 插入数据
client.insert(collection_name="customized_setup_1", data=data)
# 加载集合到内存
client.load_collection("customized_setup_1")
# 查看集合统计信息
stats = client.get_collection_stats("customized_setup_1")
print(f"Collection stats: {stats}")
基础ANN-search
# 查询向量
query_vector = np.random.random(5).tolist() # 模拟一个随机向量作为查询向量
# 搜索参数
search_params = {
"metric_type": "COSINE", # 与创建索引时保持一致
"params": {"nprobe": 10} # nprobe 决定搜索范围,值越高精度越高,但速度可能变慢
}
# 执行搜索
res = client.search(
collection_name="customized_setup_1",
anns_field="my_vector", # 指定向量字段
data=[query_vector], # 查询向量
limit=5, # 返回前 5 个结果
search_params=search_params
)
# 打印搜索结果
print("Search Results:")
for hits in res:
for hit in hits:
print(hit)
在指定的partition中search
# 构建partition,将数据插入partition
client.create_partition(
collection_name="customized_setup_1",
partition_name="partitionA"
)
res = client.list_partitions(
collection_name="customized_setup_1"
)
print(res)
"""
['_default', 'partitionA']
"""
data2 = [
{
"my_id": i,
"my_vector": np.random.random(5).tolist(), # 生成随机向量
"my_varchar": f"example_text_{i}"
}
for i in range(50)
]
client.insert(collection_name="customized_setup_1",partition_name="partitionA", data=data2)
# 在partition 中查询
res = client.search(
collection_name="customized_setup_1",
anns_field="my_vector",
data=[query_vector],
partition_names=["partitionA"],
limit=3,
search_params=search_params
)
print("Search Results:")
for hits in res:
for hit in hits:
print(hit)
通过 output Fields
指定输出的内容,在数据插入时,定义了my_vector
和my_varchar;
(默认只展示id)
# Use Output Fields
res = client.search(
collection_name="customized_setup_1",
anns_field="my_vector",
data=[query_vector],
limit=3,
search_params=search_params,
output_fields=["my_vector","my_varchar"]
)
print("Search Results:")
for hits in res:
for hit in hits:
print(hit)
"""
Search Results:
{'id': 7, 'distance': 0.9791999459266663, 'entity': {'my_vector': [0.7273301482200623, 0.25300344824790955, 0.8510549664497375, 0.6126695871353149, 0.5709182024002075], 'my_varchar': 'example_text_7'}}
{'id': 84, 'distance': 0.9768796563148499, 'entity': {'my_vector': [0.6703248620033264, 0.1109713464975357, 0.5905848145484924, 0.372805655002594, 0.4945228397846222], 'my_varchar': 'example_text_84'}}
{'id': 27, 'distance': 0.9755662083625793, 'entity': {'my_vector': [0.8758077025413513, 0.13442707061767578, 0.968295156955719, 0.25122591853141785, 0.7256166934967041], 'my_varchar': 'example_text_27'}}
"""
Limit and Offset
这两个参数结合使用时可以实现分页查询
# 第 1 页查询:获取第 1-10 个结果
res_page_1 = client.search(
collection_name="customized_setup_2",
anns_field="my_vector",
data=[query_vector],
limit=10, # 每页 10 条
offset=0, # 跳过 0 条
search_params=search_params
)
print("Page 1 Results:")
for hits in res_page_1:
for hit in hits:
print(hit)
# 第 2 页查询:获取第 11-20 个结果
res_page_2 = client.search(
collection_name="customized_setup_2",
anns_field="my_vector",
data=[query_vector],
limit=10, # 每页 10 条
offset=10, # 跳过前 10 条
search_params=search_params
)
print("Page 2 Results:")
for hits in res_page_2:
for hit in hits:
print(hit)
"""
Page 1 Results:
{'id': 7, 'distance': 0.9791999459266663, 'entity': {}}
{'id': 84, 'distance': 0.9768796563148499, 'entity': {}}
{'id': 27, 'distance': 0.9755662083625793, 'entity': {}}
{'id': 28, 'distance': 0.9715055227279663, 'entity': {}}
{'id': 38, 'distance': 0.9661443829536438, 'entity': {}}
{'id': 40, 'distance': 0.9624771475791931, 'entity': {}}
{'id': 93, 'distance': 0.9594350457191467, 'entity': {}}
{'id': 0, 'distance': 0.9587700366973877, 'entity': {}}
{'id': 73, 'distance': 0.9547507166862488, 'entity': {}}
{'id': 32, 'distance': 0.9311172366142273, 'entity': {}}
Page 2 Results:
{'id': 52, 'distance': 0.9296990036964417, 'entity': {}}
{'id': 26, 'distance': 0.9230980277061462, 'entity': {}}
{'id': 33, 'distance': 0.922787606716156, 'entity': {}}
{'id': 55, 'distance': 0.9211370348930359, 'entity': {}}
{'id': 30, 'distance': 0.9198154211044312, 'entity': {}}
{'id': 63, 'distance': 0.918975830078125, 'entity': {}}
{'id': 3, 'distance': 0.9085606932640076, 'entity': {}}
{'id': 37, 'distance': 0.9077025651931763, 'entity': {}}
{'id': 67, 'distance': 0.9050586223602295, 'entity': {}}
{'id': 4, 'distance': 0.9049072861671448, 'entity': {}}
"""
Filtered Search 是在 ANN 方法的基础上添加过滤条件的搜索模式,结合了元数据的过滤功能,能够缩小检索范围或实现一些复杂逻辑。
Filtered Search
数据准备
client = MilvusClient(uri="http://localhost:19530")
# 创建 schema
schema = client.create_schema(
auto_id=False,
enable_dynamic_field=True,
)
index_params = client.prepare_index_params()
# 添加字段到 schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=5)
schema.add_field(field_name="color", datatype=DataType.VARCHAR, max_length=50)
schema.add_field(field_name="likes", datatype=DataType.INT64)
index_params.add_index(
field_name="vector",
index_type="AUTOINDEX", # Automatically choose the best index type
metric_type="L2" # Use L2 distance as the metric
)
# Add a sort index for the id field
index_params.add_index(
field_name="id",
index_type="STL_SORT"
)
collection_name = "customized_setup_5"
client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params
)
# 准备数据
data = [
{"id": 0, "vector": [0.3580376395471989, -0.6023495712049978, 0.18414012509913835, -0.26286205330961354, 0.9029438446296592], "color": "pink_8682", "likes": 165},
{"id": 1, "vector": [0.19886812562848388, 0.06023560599112088, 0.6976963061752597, 0.2614474506242501, 0.838729485096104], "color": "red_7025", "likes": 25},
{"id": 2, "vector": [0.43742130801983836, -0.5597502546264526, 0.6457887650909682, 0.7894058910881185, 0.20785793220625592], "color": "orange_6781", "likes": 764},
{"id": 3, "vector": [0.3172005263489739, 0.9719044792798428, -0.36981146090600725, -0.4860894583077995, 0.95791889146345], "color": "pink_9298", "likes": 234},
{"id": 4, "vector": [0.4452349528804562, -0.8757026943054742, 0.8220779437047674, 0.46406290649483184, 0.30337481143159106], "color": "red_4794", "likes": 122},
{"id": 5, "vector": [0.985825131989184, -0.8144651566660419, 0.6299267002202009, 0.1206906911183383, -0.1446277761879955], "color": "yellow_4222", "likes": 12},
{"id": 6, "vector": [0.8371977790571115, -0.015764369584852833, -0.31062937026679327, -0.562666951622192, -0.8984947637863987], "color": "red_9392", "likes": 58},
{"id": 7, "vector": [-0.33445148015177995, -0.2567135004164067, 0.8987539745369246, 0.9402995886420709, 0.5378064918413052], "color": "grey_8510", "likes": 775},
{"id": 8, "vector": [0.39524717779832685, 0.4000257286739164, -0.5890507376891594, -0.8650502298996872, -0.6140360785406336], "color": "white_9381", "likes": 876},
{"id": 9, "vector": [0.5718280481994695, 0.24070317428066512, -0.3737913482606834, -0.06726932177492717, -0.6980531615588608], "color": "purple_4976", "likes": 765},
]
# 插入数据
client.insert(
collection_name=collection_name,
data=data
)
# {'insert_count': 10, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 'cost': 0}
filter-search
res_filtered = client.search(
collection_name=collection_name,
anns_field="vector",
data=[query_vector],
limit=5,
filter="color like 'red%' and likes > 50",
output_fields=["color", "likes"]
)
for hits in res_filtered:
for hit in hits:
print(hit)
"""
{'id': 4, 'distance': 1.4699335098266602, 'entity': {'color': 'red_4794', 'likes': 122}}
{'id': 6, 'distance': 3.989945888519287, 'entity': {'color': 'red_9392', 'likes': 58}}
"""
支持的metadata 过滤方式:
过滤类型
描述
示例
比较操作符
用于数值比较
filter="price > 500"
- >:大于
filter="inventory['quantity'] >= 250"
- <:小于
filter="sales_volume[0] < 100"
- ==:等于
filter="color == 'red_7025'"
- <=:小于等于
filter="price <= 900"
- >=:大于等于
filter="sales_volume[0] >= 150"
- !=:不等于
filter="color != 'yellow_4222'"
Term操作符
精确匹配或排除
filter='color in ["red_7025", "red_4794"]'
- in:匹配指定集合
filter='inventory["brand"] in ["Apple"]'
- not in:排除指定集合
filter='color not in ["yellow_4222", "grey_8510"]'
Match操作符
字符串匹配
filter='color like "red%"'
- like:通配符匹配
filter='inventory["brand"] like "S%"'
- TEXT_MATCH:文本高效匹配
filter='TEXT_MATCH(description, "Apple iPhone")'
算术操作符
数值计算
filter="price * 0.5 < 300"
- +:加
filter="sales_volume[0] + 50 > 200"
- -:减
filter="price - 100 > 500"
- *:乘
filter="price * 2 > 1000"
- /:除
filter="inventory['quantity'] / 2 > 100"
- **:幂运算
filter="price ** 2 > 250000"
- %:取余
filter="price % 100 == 0"
JSON操作符
用于JSON字段的高级过滤
filter='JSON_CONTAINS(inventory["previous_sales"], 232)'
- JSON_CONTAINS:包含指定元素
filter='JSON_CONTAINS_ALL(inventory["previous_sales"], [232, 254])'
- JSON_CONTAINS_ALL:包含所有指定元素
filter='JSON_CONTAINS_ANY(inventory["previous_sales"], [232, 275])'
- JSON_CONTAINS_ANY:包含任一元素
Array操作符
用于数组字段的高级过滤
filter='ARRAY_CONTAINS(sales_volume, 150)'
- ARRAY_CONTAINS:包含指定元素
filter='ARRAY_CONTAINS_ALL(sales_volume, [150, 150])'
- ARRAY_CONTAINS_ALL:包含所有指定元素
filter='ARRAY_CONTAINS_ANY(sales_volume, [150, 190])'
- ARRAY_CONTAINS_ANY:包含任一指定元素
- ARRAY_LENGTH:检查数组长度
filter='ARRAY_LENGTH(sales_volume) == 3'
逻辑操作符
组合多个过滤条件
filter='color like "red%" and price < 500'
- and 或 &&:所有条件均需满足
filter='price < 500 and sales_volume[0] > 100'
- or 或 `
- not:逻辑非
filter='not (price > 500)'
复杂表达式
通过括号调整优先级
filter='(price > 500 and inventory["brand"] in ["Apple"]) or color == "red_7025"'
Range search
通过设置一个范围(半径或区间),筛选出符合距离/相似度范围的向量
执行范围搜索请求时,Milvus 以 ANN 搜索结果中与查询向量最相似的向量为圆心,以搜索请求中指定的半径为外圈半径,以range_filter为内圈半径,画出两个同心圆。所有相似度得分在这两个同心圆形成的环形区域内的向量都将被返回
可能的使用场景:
Metric Type
距离范围
L2
range_filter <= 距离 < radius
IP
radius < 距离 <= range_filter
COSINE
radius < 距离 <= range_filter
JACCARD
range_filter <= 距离 < radius
HAMMING
range_filter <= 距离 < radius
query_vector = [0.5580376395471989, -0.8023495712049978, 0.38414012509913835, -0.36286205330961354, 0.9029438446296591]
# metric_type="L2"
res = client.search(
collection_name=collection_name,
data=[query_vector],
limit=10,
search_params={
"params": {
"radius": 0.9,
"range_filter": 0.1
}
}
)
for hits in res:
for hit in hits:
print(hit)
"""
{'id': 0, 'distance': 0.12999999523162842, 'entity': {}}
"""
将搜索结果根据某些属性或特征分组(group)
搜索结果中,基于特定字段或条件,将相似或具有共同特征的记录聚合在一起
Grouping Search
举个具体的例子,在 RAG(Retrieval-Augmented Generation)任务中,我们通常会将文档拆分成一定大小的文本块(chunks),以便在检索过程中实现更精准的语义匹配。然而,在生成回答时,我们并不总是局限于直接使用召回的这些文本块,而是希望尽可能覆盖更多的文档或是包含更多的文档上下文。因为检索阶段可能会从同一文档中召回多个文本块,而忽略其他文档的内容,从而导致搜索结果的多样性降低,影响答案的全面性和准确性。
数据准备
client = MilvusClient(uri="http://localhost:19530")
# 创建 schema
schema = client.create_schema(
auto_id=False,
enable_dynamic_field=True,
)
index_params = client.prepare_index_params()
# 添加字段到 schema
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=5)
schema.add_field(field_name="chunk", datatype=DataType.VARCHAR, max_length=50)
schema.add_field(field_name="docId", datatype=DataType.INT64)
index_params.add_index(
field_name="vector",
index_type="AUTOINDEX", # Automatically choose the best index type
metric_type="L2" # Use L2 distance as the metric
)
# Add a sort index for the id field
index_params.add_index(
field_name="id",
index_type="STL_SORT"
)
collection_name = "group_search_collection"
client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params
)
data = [
{"id": 0, "vector": [0.3580376395471989, -0.6023495712049978, 0.18414012509913835, -0.26286205330961354, 0.9029438446296592], "chunk": "pink_8682", "docId": 1},
{"id": 1, "vector": [0.19886812562848388, 0.06023560599112088, 0.6976963061752597, 0.2614474506242501, 0.838729485096104], "chunk": "red_7025", "docId": 5},
{"id": 2, "vector": [0.43742130801983836, -0.5597502546264526, 0.6457887650909682, 0.7894058910881185, 0.20785793220625592], "chunk": "orange_6781", "docId": 2},
{"id": 3, "vector": [0.3172005263489739, 0.9719044792798428, -0.36981146090600725, -0.4860894583077995, 0.95791889146345], "chunk": "pink_9298", "docId": 3},
{"id": 4, "vector": [0.4452349528804562, -0.8757026943054742, 0.8220779437047674, 0.46406290649483184, 0.30337481143159106], "chunk": "red_4794", "docId": 3},
{"id": 5, "vector": [0.985825131989184, -0.8144651566660419, 0.6299267002202009, 0.1206906911183383, -0.1446277761879955], "chunk": "yellow_4222", "docId": 4},
{"id": 6, "vector": [0.8371977790571115, -0.015764369584852833, -0.31062937026679327, -0.562666951622192, -0.8984947637863987], "chunk": "red_9392", "docId": 1},
{"id": 7, "vector": [-0.33445148015177995, -0.2567135004164067, 0.8987539745369246, 0.9402995886420709, 0.5378064918413052], "chunk": "grey_8510", "docId": 2},
{"id": 8, "vector": [0.39524717779832685, 0.4000257286739164, -0.5890507376891594, -0.8650502298996872, -0.6140360785406336], "chunk": "white_9381", "docId": 5},
{"id": 9, "vector": [0.5718280481994695, 0.24070317428066512, -0.3737913482606834, -0.06726932177492717, -0.6980531615588608], "chunk": "purple_4976", "docId": 3},
]
client.insert(
collection_name=collection_name,
data=data
)
查询
query_vectors = [
[0.14529211512077012, 0.9147257273453546, 0.7965055218724449, 0.7009258593102812, 0.5605206522382088]]
# Group search results
res = client.search(
collection_name=collection_name,
data=query_vectors,
limit=3,
group_by_field="docId",
output_fields=["docId"]
)
# Retrieve the values in the `docId` column
doc_ids = [result['entity']['docId'] for result in res[0]]
doc_ids
# [5, 2, 3]
只返回top-3的doc结果
允许在一个collection中对多个向量字段(例如稠密向量和稀疏向量)同时进行搜索,并将不同搜索结果重新排序为单一结果集。
Hybrid Search refers to a search method that conducts multiple ANN searches simultaneously, reranks multiple sets of results from these ANN searches, and ultimately returns a single set of results.
Hybrid Search
主要工作流程
from pymilvus import (
MilvusClient, DataType
)
client = MilvusClient(
uri="http://localhost:19530"
)
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True,
)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=1000)
schema.add_field(field_name="sparse", datatype=DataType.SPARSE_FLOAT_VECTOR)
schema.add_field(field_name="dense", datatype=DataType.FLOAT_VECTOR, dim=5)
index_params = client.prepare_index_params()
index_params.add_index(
field_name="dense",
index_name="dense_index",
index_type="IVF_FLAT",
metric_type="IP",
params={"nlist": 128},
)
index_params.add_index(
field_name="sparse",
index_name="sparse_index",
index_type="SPARSE_INVERTED_INDEX", # Index type for sparse vectors
metric_type="IP", # Currently, only IP (Inner Product) is supported for sparse vectors
params={"drop_ratio_build": 0.2}, # The ratio of small vector values to be dropped during indexing
)
client.create_collection(
collection_name="hybrid_search_collection",
schema=schema,
index_params=index_params
)
数据插入
# 插入数据
import random
# 随机生成稠密向量
def generate_random_dense_vector(dim):
return [random.uniform(-1, 1) for _ in range(dim)]
# 随机生成稀疏向量
def generate_random_sparse_vector(max_dim, num_non_zero):
indices = random.sample(range(max_dim), num_non_zero)
values = [random.uniform(0, 1) for _ in range(num_non_zero)]
return {idx: val for idx, val in zip(indices, values)}
# 数据
data = [
{
"id": 0,
"text": "Artificial intelligence was founded as an academic discipline in 1956.",
"sparse": generate_random_sparse_vector(10000, 5), # 稀疏向量
"dense": generate_random_dense_vector(5) # 稠密向量
},
{
"id": 1,
"text": "Alan Turing was the first person to conduct substantial research in AI.",
"sparse": generate_random_sparse_vector(10000, 5),
"dense": generate_random_dense_vector(5)
},
{
"id": 2,
"text": "Born in Maida Vale, London, Turing was raised in southern England.",
"sparse": generate_random_sparse_vector(10000, 5),
"dense": generate_random_dense_vector(5)
},
]
# 插入数据
res = client.insert(
collection_name="hybrid_search_collection",
data=data
)
混合搜索是通过在hybrid_search()
函数中创建多个AnnSearchRequest
来实现的,其中每个AnnSearchRequest
代表一个特定向量场的基本 ANN 搜索请求。
在混合搜索中,每个AnnSearchRequest
只支持一个查询向量。
from pymilvus import AnnSearchRequest
query_dense_vector = [0.3580376395471989, -0.6023495712049978, 0.18414012509913835, -0.26286205330961354, 0.9029438446296592]
search_param_1 = {
"data": [query_dense_vector],
"anns_field": "dense",
"param": {
"metric_type": "IP",
"params": {"nprobe": 10}
},
"limit": 2
}
request_1 = AnnSearchRequest(**search_param_1)
query_sparse_vector = {1609: 0.6255744191385809,
7058: 0.20234890590382482,
449: 0.01787891574781786,
9702: 0.710732808143222,
8827: 0.3979309078494506}
search_param_2 = {
"data": [query_sparse_vector],
"anns_field": "sparse",
"param": {
"metric_type": "IP",
"params": {"drop_ratio_build": 0.2}
},
"limit": 2
}
request_2 = AnnSearchRequest(**search_param_2)
reqs = [request_1, request_2]
采用加权排名
from pymilvus import WeightedRanker
ranker= WeightedRanker(0.8, 0.3)
res = client.hybrid_search(
collection_name="hybrid_search_collection",
reqs=reqs,
ranker=ranker,
limit=2
)
for hits in res:
print("TopK results:")
for hit in hits:
print(hit)
"""
TopK results:
{'id': 1, 'distance': 0.5914705991744995, 'entity': {}}
{'id': 0, 'distance': 0.27949291467666626, 'entity': {}}
"""
采用RRF(Reciprocal Rank Fusion) ranker
from pymilvus import RRFRanker
# Default k value is 60
ranker = RRFRanker(k=100)
res = client.hybrid_search(
collection_name="hybrid_search_collection",
reqs=reqs,
ranker=ranker,
limit=2
)
for hits in res:
print("TopK results:")
for hit in hits:
print(hit)
"""
TopK results:
{'id': 1, 'distance': 0.009900989942252636, 'entity': {}}
{'id': 0, 'distance': 0.009803921915590763, 'entity': {}}
"""
Milvus 在 2.5 版本中引入了全文检索, 能够基于关键字或短语高效地搜索文本数据。
全文检索功能在 Milvus Standalone 和 Milvus Distributed 中可用,但在 Milvus Lite 中不可用
步骤:
Full Text Search
创建 Schema
from pymilvus import MilvusClient, DataType, Function, FunctionType
client = MilvusClient(uri="http://localhost:19530")
schema = client.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True, auto_id=True)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=1000, enable_analyzer=True)
schema.add_field(field_name="sparse", datatype=DataType.SPARSE_FLOAT_VECTOR)
SPARSE_FLOAT_VECTOR
字段,预留用于存储稀疏嵌入
定义一个将文本转换为稀疏向量表示的函数,然后将其添加到 Schema 中
bm25_function = Function(
name="text_bm25_emb", # Function name
input_field_names=["text"], # Name of the VARCHAR field containing raw text data
output_field_names=["sparse"], # Name of the SPARSE_FLOAT_VECTOR field reserved to store generated embeddings
function_type=FunctionType.BM25,
)
schema.add_function(bm25_function)
设置索引 create collection
index_params = client.prepare_index_params()
index_params.add_index(
field_name="sparse",
index_type="AUTOINDEX",
metric_type="BM25"
)
client.create_collection(
collection_name='demo',
schema=schema,
index_params=index_params
)
插入数据
client.insert('demo', [
{'text': 'information retrieval is a field of study.'},
{'text': 'information retrieval focuses on finding relevant information in large datasets.'},
{'text': 'data mining and information retrieval overlap in research.'},
])
执行全文查询
search_params = {
'params': {'drop_ratio_search': 0.2},
}
res = client.search(
collection_name='demo',
data=['whats the focus of information retrieval?'],
anns_field='sparse',
limit=2,
search_params=search_params,
output_fields=["text"]
)
for hits in res:
print("TopK results:")
for hit in hits:
print(hit)
"""
TopK results:
{'id': 454187296102665084, 'distance': 1.3352930545806885, 'entity': {'text': 'information retrieval is a field of study.'}}
{'id': 454187296102665085, 'distance': 0.29726022481918335, 'entity': {'text': 'information retrieval focuses on finding relevant information in large datasets.'}}
"""
Milvus 还支持通过查询进行元数据过滤,即类似于关系型数据库的数据查询和过滤
get
from pymilvus import MilvusClient
client = MilvusClient(
uri="http://localhost:19530"
)
res = client.get(
collection_name="query_collection",
ids=[0, 1, 2],
output_fields=["vector", "color"]
)
print(res)
"""
data: ["{'vector': [0.35803765, -0.6023496, 0.18414013, -0.26286206, 0.90294385], 'color': 'pink_8682', 'id': 0}", "{'vector': [0.19886813, 0.060235605, 0.6976963, 0.26144746, 0.8387295], 'color': 'red_7025', 'id': 1}", "{'vector': [0.43742132, -0.55975026, 0.6457888, 0.7894059, 0.20785794], 'color': 'orange_6781', 'id': 2}"]
"""
query
res = client.query(
collection_name="query_collection",
filter="color like \"red%\"",
output_fields=["vector", "color"],
limit=3
)
print(res)
"""
data: ["{'id': 1, 'vector': [0.19886813, 0.060235605, 0.6976963, 0.26144746, 0.8387295], 'color': 'red_7025'}", "{'id': 4, 'vector': [0.44523495, -0.8757027, 0.82207793, 0.4640629, 0.3033748], 'color': 'red_4794'}", "{'id': 6, 'vector': [0.8371978, -0.015764369, -0.31062937, -0.56266695, -0.8984948], 'color': 'red_9392'}"]
"""
QueryIterator
from pymilvus import connections, Collection
connections.connect(
uri="http://localhost:19530",
)
collection = Collection("query_collection")
iterator = collection.query_iterator(
batch_size=10,
expr="color like \"red%\"",
output_fields=["color"]
)
results = []
while True:
result = iterator.next()
if not result:
iterator.close()
break
print(result)
results += result
"""
[{'color': 'red_7025', 'id': 1}, {'color': 'red_4794', 'id': 4}, {'color': 'red_9392', 'id': 6}]
"""
在 Milvus 中,partition-key(分区键)是一种基于特定标量字段的搜索优化方案。通过将某个字段指定为分区键,Milvus 会根据该字段的值将数据自动分配到不同的分区中,与手动管理分区不同,使用分区键可以克服集合中分区数量的限制(最多 1,024 个)。
将数据分布到独立的分区中,同时支持对分区范围的查询限定,从而实现数据隔离,可以有效避免对非目标分区的数据进行查询操作。
自动管理分区 vs 手动创建和指定分区
特性
partition-key
手动分区
分区管理
自动生成,动态分区
必须手动创建,分区结构固定
分区数量限制
无理论限制,分区数量随字段值动态变化
最多 1,024 个分区
便捷性
插入和查询都更加自动化
插入和查询需要手动指定分区名称
适用场景
动态、多分区场景,如用户 ID、类别分类
静态、少量分区场景,如地理区域划分
Partition-key vs Meta-data Filtering
特性
partition-key
Meta-data Filtering
存储方式
数据物理隔离,按分区键存储
数据逻辑统一,存储在一个集合中
查询效率
通过分区键快速定位目标分区,效率高
全局扫描,效率较低
管理复杂性
自动管理,无需显式定义或维护分区
无需预定义分区,依赖查询时的过滤
动态性
分区结构相对稳定,适合分区逻辑较简单场景
更灵活,适合复杂和多变的查询条件
有无分区差异
通过partition-key 实现数据隔离查询的简单例子;
在这个例子中,每一个user只有查询自己数据的权限,当user选择将数据公开share,此时partition-key会被改成public
,作为单独的partition存在,从而实现数据隔离查询。
数据准备:
from pymilvus import MilvusClient, DataType, Function, FunctionType
client = MilvusClient(uri="http://localhost:19530")
schema = client.create_schema()
index_params = client.prepare_index_params()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True, auto_id=True)
schema.add_field(field_name="partition_key", datatype=DataType.VARCHAR, max_length=512, is_partition_key=True,description="Partition key for data isolation"),
schema.add_field(field_name="data_id", datatype=DataType.VARCHAR, max_length=128, description="Unique data identifier"),
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=128, description="Vector representation"),
schema.add_field(field_name="metadata", datatype=DataType.JSON, description="Additional filtering metadata"),
index_params.add_index(
field_name="vector",
index_type="AUTOINDEX", # Automatically choose the best index type
metric_type="L2" # Use L2 distance as the metric
)
# Add a sort index for the id field
index_params.add_index(
field_name="id",
index_type="STL_SORT"
)
client.create_collection(
collection_name="partitionkey_collection",
schema=schema,
index_params=index_params,
num_partitions=1024
)
构造一些假数据
import json
def generate_test_data(num_records, num_users, public_probability=0.2):
"""
生成测试数据,支持一定概率生成公共数据。
Args:
num_records (int): 数据总数
num_users (int): 用户数量
public_probability (float): 生成公共数据的概率 (0.0 到 1.0)
Returns:
list: 生成的测试数据,每条数据为一个字典
"""
data = []
for _ in range(num_records):
# 根据概率选择 partition_key 为 "public" 或随机用户 ID
if random.random() < public_probability:
partition_key = "public"
else:
partition_key = f"user_{random.randint(1, num_users)}"
record = {
"partition_key": partition_key, # 设置分区键
"data_id": f"data_{random.randint(1000, 9999)}", # 随机生成数据 ID
"vector": [random.uniform(0, 1) for _ in range(128)], # 随机生成128维向量
"metadata": json.dumps({
"category": random.choice(["image", "text", "video"]),
"tags": [random.choice(["AI", "ML", "NLP", "CV"]) for _ in range(2)],
"rating": random.uniform(1, 5),
})
}
data.append(record)
return data
data = generate_test_data(500,20)
client.insert(
collection_name="partitionkey_collection",
data=data
)
仅查询user_10
的数据
query_result = client.query(
collection_name = "partitionkey_collection",
filter='partition_key == "user_10"',
output_fields=["data_id", "partition_key"]
)
print(query_result)
"""
data: ["{'data_id': 'data_7887', 'partition_key': 'user_10', 'id': 454187296102665088}", "{'data_id': 'data_9004', 'partition_key': 'user_10', 'id': 454187296102665090}", "{'data_id': 'data_8970', 'partition_key': 'user_10', 'id': 454187296102665092}", "{'data_id': 'data_2031', 'partition_key': 'user_10', 'id': 454187296102665098}", "{'data_id': 'data_6972', 'partition_key': 'user_10', 'id': 454187296102665111}", "{'data_id': 'data_3463', 'partition_key': 'user_10', 'id': 454187296102665118}", "{'data_id': 'data_9998', 'partition_key': 'user_10', 'id': 454187296102665123}", "{'data_id': 'data_5411', 'partition_key': 'user_10', 'id': 454187296102665133}", "{'data_id': 'data_9952', 'partition_key': 'user_10', 'id': 454187296102665205}", "{'data_id': 'data_9002', 'partition_key': 'user_10', 'id': 454187296102665257}"] ...
"""
user_10 所能ANN搜索的内容(ppublic
and user_10
)
query_vectors = [random.uniform(0, 1) for _ in range(128)]
res = client.search(
collection_name = "partitionkey_collection",
data=[query_vectors],
filter='partition_key in ["public","user_10"]',
output_fields=["data_id", "partition_key"]
)
for hits in res:
for hit in hits:
print(hit)
"""
{'id': 454187296102665637, 'distance': 15.168614387512207, 'entity': {'partition_key': 'public', 'data_id': 'data_9024'}}
{'id': 454187296102665532, 'distance': 15.173531532287598, 'entity': {'partition_key': 'user_10', 'data_id': 'data_8461'}}
{'id': 454187296102665598, 'distance': 15.250306129455566, 'entity': {'partition_key': 'user_10', 'data_id': 'data_7539'}}
{'id': 454187296102665323, 'distance': 15.833619117736816, 'entity': {'partition_key': 'user_10', 'data_id': 'data_8433'}}
{'id': 454187296102665234, 'distance': 16.50188446044922, 'entity': {'partition_key': 'public', 'data_id': 'data_1534'}}
{'id': 454187296102665501, 'distance': 16.521953582763672, 'entity': {'partition_key': 'public', 'data_id': 'data_9944'}}
{'id': 454187296102665463, 'distance': 16.524085998535156, 'entity': {'partition_key': 'public', 'data_id': 'data_6430'}}
{'id': 454187296102665453, 'distance': 16.64727020263672, 'entity': {'partition_key': 'public', 'data_id': 'data_8403'}}
{'id': 454187296102665629, 'distance': 16.649581909179688, 'entity': {'partition_key': 'public', 'data_id': 'data_5141'}}
{'id': 454187296102665631, 'distance': 16.809789657592773, 'entity': {'partition_key': 'public', 'data_id': 'data_3505'}}
"""
在2.4 版本之后支持了对于embedding模型和reranking模型的集成。
个人认为没有太大必要。
之前使用的是milvus-insight,不过不再维护了。
官方主推的是attu。
(待更新)
原网址: 访问
创建于: 2025-08-27 17:18:49
目录: default
标签: 无
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论