共计 2027 个字符,预计需要花费 6 分钟才能阅读完成。
背景痛点:为什么需要重构 Agent 架构?
最近在开发智能 Agent 时,我发现随着功能增加,代码逐渐变成一团乱麻。具体表现为:

- Skill 耦合度过高:比如天气查询 Skill 直接调用了地图服务的内部方法,一旦地图 API 变更,所有相关 Skill 都需要修改
- 任务流僵化 :原系统采用硬编码流程,类似
if-else瀑布流,增加新业务必须修改核心调度代码 - 维护成本飙升:团队每新增一个 Skill,都可能导致现有功能异常
技术方案:DAG+ 事件总线的黄金组合
1. 用 DAG 管理 Skill 依赖
DAG(有向无环图)能完美表示 Skill 间的依赖关系。例如点餐 Agent 中:
graph LR
A[菜单查询] --> B[优惠计算]
B --> C[支付处理]
D[会员识别] --> B
2. 事件总线实现松耦合
采用发布 / 订阅模式后,Skill 只需声明自己产生和消费的事件类型。比如:
- 支付 Skill 发布
PaymentDone事件 - 订单系统订阅该事件触发后续操作
核心代码实现
Skill 基类设计(Python 3.10+)
from abc import ABC, abstractmethod
from typing import Any, Dict
class BaseSkill(ABC):
@property
@abstractmethod
def output_schema(self) -> Dict[str, Any]:
"""定义 Skill 的输出格式"""
@abstractmethod
def execute(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""必须实现的执行方法"""
def __repr__(self) -> str:
return f"<{self.__class__.__name__}>"
DAG 调度器关键代码
from collections import deque
def topological_sort(skills: Dict[str, BaseSkill]) -> List[str]:
"""拓扑排序确保执行顺序正确"""
in_degree = {name: 0 for name in skills}
graph = {name: [] for name in skills}
# 构建图结构
for name, skill in skills.items():
for dep in skill.dependencies:
graph[dep].append(name)
in_degree[name] += 1
# Kahn 算法实现
queue = deque([name for name, degree in in_degree.items() if degree == 0])
result = []
while queue:
current = queue.popleft()
result.append(current)
for neighbor in graph[current]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
if len(result) != len(skills):
raise ValueError("存在循环依赖!")
return result
生产环境关键设计
超时熔断机制
import signal
from contextlib import contextmanager
@contextmanager
def timeout_handler(seconds: int, skill_name: str):
def raise_timeout(signum, frame):
raise TimeoutError(f"{skill_name}执行超时")
signal.signal(signal.SIGALRM, raise_timeout)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0) # 取消定时器
性能优化数据
我们在电商客服 Agent 上测试发现:
| 调度模式 | 100 个任务耗时(ms) |
|---|---|
| 串行执行 | 2450 |
| 并行 DAG | 620 |
避坑指南
- 循环依赖检测:
- 在 Skill 注册时强制执行拓扑排序
-
使用
pydantic校验依赖声明格式 -
版本兼容性:
@skill_registry.register(version="1.2.0") class NewPaymentSkill(BaseSkill): min_compatible_version = "1.1.0" -
内存泄漏排查:
- 使用
tracemalloc监控 Skill 内存占用 - 为每个 Skill 创建独立虚拟环境
思考题
- 如何设计 Skill 的热加载机制,实现不停机更新?
- 当多个 Skill 产生同名事件时,如何设计优先级机制?
- 对于超大规模 Skill 库(1000+),如何优化 DAG 的构建速度?
经过这次重构,我们的 Agent 系统终于实现了:新增 Skill 无需修改核心代码、依赖关系可视化、执行效率提升 3 倍。建议大家在设计初期就采用这种架构,避免后期重构的痛苦。
正文完
