01
单 Agent 与多 Agent 的读写设计有何异同?

02
Milvus如何用四档一致性控制数据对外可见的时机


03
实验:Bounded 查不到,Strong 查得到
实验设计思路
- preload预写:提前写入大批量数据,制造WAL(Write-Ahead Log)历史积压;
- storm writers后台写入:用多个后台线程持续高速写入数据,维持Query Node的追赶压力。
#!/usr/bin/env python3
"""Probe for the visibility gap between Bounded and Strong consistency in Milvus.

The script inserts a uniquely marked row and immediately searches for it twice:
once with ``consistency_level="Bounded"`` and once with ``"Strong"``. It stops
as soon as the Bounded search misses a row that the Strong search finds.
Optional preload data and background "storm" writer threads add write load
while the probe runs.
"""
import argparse
import itertools
import random
import threading
import time
import uuid
from contextlib import suppress

from pymilvus import DataType, MilvusClient


def make_vector(seed, dim):
    """Return a deterministic, L2-normalised vector of ``dim`` floats.

    The same ``seed`` always yields the same vector, so a row can later be
    searched for with exactly the vector it was inserted with.
    """
    rng = random.Random(seed)
    vec = [rng.uniform(-1.0, 1.0) for _ in range(dim)]
    # ``or 1.0`` avoids a division by zero when the norm is 0 (e.g. dim == 0).
    norm = sum(x * x for x in vec) ** 0.5 or 1.0
    return [x / norm for x in vec]


def make_records(start_id, count, dim, marker, round_no):
    """Build ``count`` row dicts with consecutive ids starting at ``start_id``.

    Every row carries the same ``marker`` and ``round_no``; each vector is
    derived from the row's own id via :func:`make_vector`.
    """
    return [
        {
            "id": start_id + i,
            "vector": make_vector(start_id + i, dim),
            "marker": marker,
            "round": round_no,
        }
        for i in range(count)
    ]


def create_collection(client, name, dim):
    """(Re)create collection ``name`` with the probe schema and load it.

    An existing collection with the same name is dropped first. The collection
    is created with ``consistency_level="Bounded"``; individual searches
    pass their own level explicitly (see :func:`search_marker`).
    """
    if client.has_collection(name):
        client.drop_collection(name)

    schema = client.create_schema(auto_id=False, enable_dynamic_field=False)
    schema.add_field("id", DataType.INT64, is_primary=True)
    schema.add_field("vector", DataType.FLOAT_VECTOR, dim=dim)
    schema.add_field("marker", DataType.VARCHAR, max_length=128)
    schema.add_field("round", DataType.INT64)

    index_params = client.prepare_index_params()
    index_params.add_index(
        field_name="vector",
        index_type="AUTOINDEX",
        metric_type="COSINE",
    )

    client.create_collection(
        collection_name=name,
        schema=schema,
        index_params=index_params,
        consistency_level="Bounded",
    )
    client.load_collection(name)


def search_marker(client, name, vector, marker, consistency, timeout):
    """Search for the row tagged ``marker`` at the given consistency level.

    Returns a ``(hit_count, hits)`` tuple. ``hits`` is the result list for the
    single query vector, or an empty list when the search returned nothing.
    """
    result = client.search(
        collection_name=name,
        data=[vector],
        anns_field="vector",
        search_params={"metric_type": "COSINE"},
        filter=f'marker == "{marker}"',
        limit=1,
        output_fields=["id", "marker", "round"],
        consistency_level=consistency,
        timeout=timeout,
    )
    hits = result[0] if result else []
    return len(hits), hits


def writer_storm(uri, name, dim, stop_event, id_counter, batch_size, sleep_seconds):
    """Insert "storm" batches in a loop until ``stop_event`` is set.

    Intended to run in a background thread. Each iteration takes the next
    start id from the shared ``id_counter`` and inserts ``batch_size`` rows.
    """
    # Each writer thread opens its own client.
    client = MilvusClient(uri=uri)
    while not stop_event.is_set():
        start_id = next(id_counter)
        records = make_records(start_id, batch_size, dim, "storm", -1)
        # Best effort: the storm only generates load, so a failed batch is
        # ignored and the writer keeps going.
        with suppress(Exception):
            client.insert(collection_name=name, data=records)
        if sleep_seconds > 0:
            time.sleep(sleep_seconds)


def main():
    """Parse CLI options, run the probe loop, and clean up afterwards."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--uri", default="http://localhost:19530")
    parser.add_argument("--collection", default="")
    parser.add_argument("--dim", type=int, default=16)
    parser.add_argument("--attempts", type=int, default=200)
    parser.add_argument("--bounded-timeout", type=float, default=2.0)
    parser.add_argument("--strong-timeout", type=float, default=30.0)
    parser.add_argument("--storm-writers", type=int, default=2)
    parser.add_argument("--storm-batch-size", type=int, default=2000)
    parser.add_argument("--storm-sleep", type=float, default=0.0)
    parser.add_argument("--preload", type=int, default=5000)
    parser.add_argument("--keep", action="store_true")
    args = parser.parse_args()

    # Without --collection, generate a unique throwaway name per run.
    collection = args.collection or f"consistency_probe_{int(time.time())}_{uuid.uuid4().hex[:8]}"

    # Separate clients for writing and for each consistency level.
    writer = MilvusClient(uri=args.uri)
    bounded_reader = MilvusClient(uri=args.uri)
    strong_reader = MilvusClient(uri=args.uri)

    stop_event = threading.Event()
    storm_threads = []
    # Id ranges are kept apart: probe rows start at 1, preload rows at
    # 1_000_000, storm rows at 10_000_000. Stepping by the batch size hands
    # every storm batch its own block of ids.
    storm_id_counter = itertools.count(10_000_000, args.storm_batch_size)

    print(f"uri={args.uri}")
    print(f"collection={collection}")

    try:
        create_collection(writer, collection, args.dim)

        if args.preload > 0:
            print(f"preload {args.preload} rows")
            writer.insert(
                collection_name=collection,
                data=make_records(1_000_000, args.preload, args.dim, "preload", -2),
            )
            # Issue one Strong search over the preload data before the probe
            # starts; the result itself is not needed.
            search_marker(
                strong_reader,
                collection,
                make_vector(1_000_000, args.dim),
                "preload",
                "Strong",
                args.strong_timeout,
            )

        for _ in range(args.storm_writers):
            thread = threading.Thread(
                target=writer_storm,
                args=(
                    args.uri,
                    collection,
                    args.dim,
                    stop_event,
                    storm_id_counter,
                    args.storm_batch_size,
                    args.storm_sleep,
                ),
                daemon=True,
            )
            thread.start()
            storm_threads.append(thread)

        for attempt in range(args.attempts):
            # A fresh random marker per attempt makes the filter match only
            # the row inserted in this iteration.
            marker = f"probe_{attempt}_{uuid.uuid4().hex[:12]}"
            record_id = attempt + 1
            vector = make_vector(record_id, args.dim)
            record = {
                "id": record_id,
                "vector": vector,
                "marker": marker,
                "round": attempt,
            }

            insert_start = time.perf_counter()
            writer.insert(collection_name=collection, data=[record])
            insert_ms = (time.perf_counter() - insert_start) * 1000

            # Bounded goes first, right after the insert, so it has the best
            # chance of running before the row becomes visible.
            bounded_start = time.perf_counter()
            bounded_count, bounded_hits = search_marker(
                bounded_reader,
                collection,
                vector,
                marker,
                "Bounded",
                args.bounded_timeout,
            )
            bounded_ms = (time.perf_counter() - bounded_start) * 1000

            strong_start = time.perf_counter()
            strong_count, strong_hits = search_marker(
                strong_reader,
                collection,
                vector,
                marker,
                "Strong",
                args.strong_timeout,
            )
            strong_ms = (time.perf_counter() - strong_start) * 1000

            print(
                f"attempt={attempt:03d} insert={insert_ms:.1f}ms "
                f"bounded={bounded_count}({bounded_ms:.1f}ms) "
                f"strong={strong_count}({strong_ms:.1f}ms)"
            )

            if bounded_count == 0 and strong_count > 0:
                print("\nREPRODUCED: Bounded missed the just-inserted row, Strong found it.")
                print(f"marker={marker}")
                print(f"strong_hit={strong_hits[0] if strong_hits else None}")
                return

            if bounded_hits and not strong_hits:
                print("Unexpected: Bounded found the row but Strong did not; check service config.")

        print("\nNot reproduced. QueryNode likely consumed the insert before each Bounded search.")
        print("Try increasing --storm-writers/--storm-batch-size/--attempts, or run against a cluster under write load.")
    finally:
        # Runs on success, on the early return above, and on errors alike.
        stop_event.set()
        for thread in storm_threads:
            # Writers are daemon threads, so a short join is enough; a writer
            # stuck in an insert cannot keep the process alive.
            thread.join(timeout=1)
        if args.keep:
            print(f"kept collection={collection}")
        else:
            # Best-effort cleanup: a failed drop must not mask the real outcome.
            with suppress(Exception):
                writer.drop_collection(collection)
            print(f"dropped collection={collection}")


if __name__ == "__main__":
    main()
运行命令(替换uri为自身Milvus服务地址):
python probe.py --uri http://localhost:19530 --storm-writers 2 --storm-batch-size 2000 --preload 5000
运行结果(与文档中报错URL对应的服务地址一致):
uri=http://192.168.4.115:19530
collection=consistency_probe_1777278755_71fb2959
preload 5000 rows
attempt=000 insert=47.7ms bounded=0(100.7ms) strong=1(171.7ms)

REPRODUCED: Bounded missed the just-inserted row, Strong found it.
marker=probe_0_96fadc07d29e
strong_hit={'id': 1, 'distance': 1.0, 'entity': {'marker': 'probe_0_96fadc07d29e', 'round': 0, 'id': 1}}
dropped collection=consistency_probe_1777278755_71fb2959
04
不是所有场景都需要 Strong



