llamaindex 最新发布Workflows 1.0轻量级编排框架,不用依赖llamindex本身。创建事件驱动型自定义代理工作流的方法。

框架的主要特点
-
async-first : -
工作流围绕 python 的 async 函数功能构建 -
处理来自 asyncio 队列的传入事件,并将新事件发射到其他队列。 -
这也意味着工作流在您的异步应用程序中效果最好,如FastAPI,Jupyter Notebooks等。 -
事件驱动 : -
工作流由步骤和事件组成。 -
围绕事件和步骤组织代码,可以更容易地进行推理和测试。 -
状态管理: -
工作流的每次运行都是自包含的,这意味着您可以启动工作流,在其中保存信息,序列化工作流的状态并在以后恢复。 -
可观察性 : -
工作流自动用于可观察性,这意味着您可以使用类似工具 Arize Phoenix和OpenTelemetry从盒子里出来。
应用
-
AI Agents:创建智能系统,可以推理,决策和跨多个步骤采取行动 -
文档处理管道:构建通过各种处理阶段摄取、分析、总结和路由文档的系统 -
多模态AI应用 :协调不同的AI模型(LLM,视觉模型等)和数据格式,以解决复杂的任务 -
研究助理 :开发可以搜索、分析、综合信息并提供全面答案的工作流程 -
基于角色的多代理系统 :构建代理系统,可以通过在需要时相互移交控制来协调多个代理。 -
内容生成系统 :创建管道,通过人工在线审批生成、审查、编辑和发布内容 -
**客户支持自动化 **:构建能够理解、分类和响应客户查询的智能路由系统
异步优先的事件驱动架构可以轻松构建在不同功能之间路由的应用程序,实现并行处理模式,在复杂序列上循环,并在多个步骤中保持状态。使您的AI应用程序生产就绪所需的所有功能。
Workflows1.0实战
import asyncio
from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StopEvent
class MyEvent(Event):
msg: list[str]
class MyWorkflow(Workflow):
@step
async def start(self, ctx: Context, ev: StartEvent) -> MyEvent:
num_runs = await ctx.get("num_runs", default=0)
num_runs += 1
await ctx.set("num_runs", num_runs)
return MyEvent(msg=[ev.input_msg] * num_runs)
@step
async def process(self, ctx: Context, ev: MyEvent) -> StopEvent:
data_length = len("".join(ev.msg))
new_msg = f"Processed {len(ev.msg)} times, data length: {data_length}"
return StopEvent(result=new_msg)
async def main():
workflow = MyWorkflow()
# [optional] provide a context object to the workflow
ctx = Context(workflow)
result = await workflow.run(input_msg="Hello, world!", ctx=ctx)
print("Workflow result:", result)
# re-running with the same context will retain the state
result = await workflow.run(input_msg="Hello, world!", ctx=ctx)
print("Workflow result:", result)
if __name__ == "__main__":
asyncio.run(main())
-
StartEvent:表示编排状态的开始
-
StopEvent:表示编排状态的结束
-
Context:用于共享编排中每一步的信息
https://github.com/run-llama/workflows-py


