共计 3503 个字符,预计需要花费 9 分钟才能阅读完成。
背景痛点:为什么需要模块化 Agent
最近在开发一个电商客服 Agent 时,发现当功能越来越多后,代码变得难以维护。比如处理退货请求时,需要同时调用:

- 订单验证(检查是否在退货期内)
- 库存同步(生成补货任务)
- 风控检测(判断是否存在欺诈模式)
传统单体 Agent 的典型问题:
- 代码臃肿 :所有逻辑堆在一个类里,单个文件超过 3000 行
- 技能耦合 :修改库存查询逻辑可能意外影响风控模块
- 扩展困难 :每新增一个功能都要修改核心调度代码
架构方案对比
尝试过三种解决方案:
| 方案 | 扩展性 | 维护成本 | 执行效率 | 适用场景 |
|---|---|---|---|---|
| 纯函数调用 | ★★☆ | ★★★ | ★★★★★ | 简单流程(<5 个步骤) |
| 消息队列(如 RabbitMQ) | ★★★★ | ★★☆ | ★★☆ | 跨服务通信 |
| Skill-Subagent | ★★★★★ | ★★☆ | ★★★★ | 复杂业务流 |
最终选择分层模型的核心优势:
- 每个 Skill 可以独立开发测试
- Subagent 按需启停节省资源
- 依赖关系可视化(通过 DAG)
核心实现详解
Skill 基类设计
from abc import ABC, abstractmethod
from typing import Any, Dict
from enum import Enum
class SkillPriority(Enum):
HIGH = 0
NORMAL = 1
LOW = 2
class BaseSkill(ABC):
def __init__(self):
self.metadata = {
'author': 'unknown',
'version': '1.0',
'timeout': 5.0, # 默认超时 5 秒
'priority': SkillPriority.NORMAL
}
@abstractmethod
def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""必须实现的技能逻辑"""
pass
def validate_input(self, input_data: Dict) -> bool:
"""默认的参数校验方法"""
return True
Subagent 生命周期管理
关键实现点:
- 使用上下文管理器实现资源自动清理
- 心跳检测防止僵尸进程
import threading
import time
class Subagent:
def __init__(self, skill: BaseSkill):
self.skill = skill
self._active = False
self._heartbeat_thread = None
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
def start(self):
self._active = True
self._heartbeat_thread = threading.Thread(
target=self._heartbeat,
daemon=True
)
self._heartbeat_thread.start()
def stop(self):
self._active = False
if self._heartbeat_thread:
self._heartbeat_thread.join(timeout=1.0)
def _heartbeat(self):
while self._active:
print(f"{self.skill.__class__.__name__} is alive")
time.sleep(3)
DAG 依赖解析
典型电商退货场景的依赖关系:
graph TD
A[订单验证] --> B[风控检测]
A --> C[库存同步]
B --> D[退款执行]
C --> D
实现拓扑排序的核心代码:
from collections import defaultdict, deque
def resolve_dependencies(skills: List[BaseSkill]) -> List[BaseSkill]:
"""返回排好序的技能执行顺序"""
graph = defaultdict(list)
in_degree = {}
# 构建图结构
for skill in skills:
deps = getattr(skill, 'dependencies', [])
in_degree[skill] = len(deps)
for dep in deps:
graph[dep].append(skill)
# 拓扑排序
queue = deque([s for s in in_degree if in_degree[s] == 0])
result = []
while queue:
current = queue.popleft()
result.append(current)
for neighbor in graph[current]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
if len(result) != len(skills):
raise ValueError("存在循环依赖!")
return result
生产环境优化
性能优化技巧
-
Subagent 预热 :高频技能保持常驻
class SubagentPool: def __init__(self): self._pool = {} def get(self, skill_cls: Type[BaseSkill]) -> Subagent: if skill_cls not in self._pool: self._pool[skill_cls] = Subagent(skill_cls()) return self._pool[skill_cls] -
结果缓存 :用 LRU 缓存重复查询
from functools import lru_cache class ProductQuerySkill(BaseSkill): @lru_cache(maxsize=100) def execute(self, context): # 数据库查询逻辑 return db.query(context['product_id'])
安全防护措施
-
参数沙箱校验示例:
def validate_input(self, input_data): if not isinstance(input_data.get('user_id'), int): raise ValueError("user_id 必须是整数") return super().validate_input(input_data) -
权限控制矩阵设计:
class Authorization: SKILL_PERMISSIONS = {'RefundSkill': ['finance'], 'InventorySkill': ['warehouse'] } @classmethod def check(cls, skill_name: str, role: str) -> bool: return role in cls.SKILL_PERMISSIONS.get(skill_name, [])
常见问题解决方案
循环依赖检测
在 DAG 解析阶段会抛出异常,但开发阶段建议使用:
import networkx as nx
def check_circular_dependency(skills):
g = nx.DiGraph()
for skill in skills:
g.add_node(skill)
for dep in getattr(skill, 'dependencies', []):
g.add_edge(dep, skill)
try:
nx.find_cycle(g)
return True # 存在循环
except nx.NetworkXNoCycle:
return False
技能版本管理
推荐语义化版本 + 接口兼容检查:
def can_upgrade(old: BaseSkill, new: BaseSkill) -> bool:
return (old.metadata['version'].split('.')[0]
== new.metadata['version'].split('.')[0]
)
延伸思考方向
- 动态热加载 :结合 importlib 实现技能热更新
- 跨语言调度 :通过 gRPC 暴露技能接口
- LLM 生成技能 :用自然语言描述自动生成 Skill 类
实践建议
建议从简单场景开始实践:
- 先实现 2 - 3 个基础 Skill(如问候、时间查询)
- 添加一个 Subagent 管理器
- 逐步引入 DAG 调度
- 最后考虑性能优化和安全控制
当系统发展到 20+ 技能时,模块化架构的优势会非常明显。最近我们将客服 Agent 的代码量减少了 60%,而功能扩展速度提升了 3 倍。这种架构特别适合需求频繁变化的业务场景。
正文完
