共计 2206 个字符,预计需要花费 6 分钟才能阅读完成。
传统架构的痛点分析
在开发 Agentic Skill 系统时,很多团队最初会采用传统的分层架构。这种架构看似清晰,但随着技能数量的增加,会遇到两个致命问题:

-
状态爆炸(State Explosion): 每个技能都需要维护自己的状态,当多个技能组合使用时,状态组合呈指数级增长。例如一个对话系统同时处理 NLU、数据库查询和推荐三个技能,就需要管理 3! 种状态流转路径
-
技能耦合(Skill Coupling): 技能之间直接通过函数调用交互,修改一个技能会引发连锁反应。我们曾遇到修改支付技能导致附近 3 个关联技能报错的情况
事件驱动架构设计
核心组件
采用事件总线 (Event Bus) 作为中枢神经,配合三个关键组件:
@startuml
component "Skill Router" as router
component "State Store" as store
component "Event Bus" as bus
database "Skills" as skills
router -> bus : 订阅 / 发布事件
store -> bus : 状态变更事件
bus -> skills : 事件分发
@enduml
- Skill Router:负责技能的路由选择和优先级管理,采用策略模式实现
- State Store:基于 Redis 的有限状态机(FSM),使用 Lua 脚本保证原子性
- Event Bus:选用 NATS 而非 Kafka,因其更适合实时性要求高的场景
Python 实现示例
事件处理器核心代码
from typing import Callable, Any
import asyncio
from dataclasses import dataclass
@dataclass
class SkillEvent:
skill_id: str
payload: bytes
correlation_id: str
class EventHandler:
def __init__(self):
self._listeners: dict[str, list[Callable]] = {}
def add_listener(self,
event_type: str,
callback: Callable[[SkillEvent], Any]) -> None:
"""注册事件监听器,支持类型注解"""
if event_type not in self._listeners:
self._listeners[event_type] = []
self._listeners[event_type].append(callback)
async def dispatch(self, event: SkillEvent) -> None:
"""异步派发事件,包含基础错误处理"""
try:
if event.skill_id in self._listeners:
await asyncio.gather(*[cb(event) for cb in self._listeners[event.skill_id]]
)
except Exception as e:
print(f"Event dispatch failed: {str(e)}")
Protocol Buffers 定义
syntax = "proto3";
message SkillRequest {
string skill_id = 1;
bytes input_payload = 2;
map<string, string> metadata = 3;
}
message SkillResponse {
enum Status {
SUCCESS = 0;
FAILED = 1;
RETRYABLE = 2;
}
Status status = 1;
bytes output = 2;
string error_msg = 3;
}
生产环境实践
性能压测方案
使用 Locust 模拟高并发场景,重点测试事件总线的吞吐量:
from locust import HttpUser, task
class SkillUser(HttpUser):
@task
def trigger_skill(self):
self.client.post("/event", json={
"skill_id": "weather",
"payload": {"city": "Beijing"}
})
def on_start(self):
# 初始化用户会话
self.client.put("/session", json={"user_id": "test"})
技能隔离方案对比
| 方案 | 启动时间 | 安全性 | 资源开销 |
|---|---|---|---|
| Docker | 500ms-2s | 高 | 高 |
| WebAssembly | 50-100ms | 中 | 低 |
| gVisor | 100-300ms | 极高 | 中 |
关键避坑经验
- 事件循环阻塞:
- 避免在回调中执行 CPU 密集型操作
- 使用
loop.run_in_executor处理阻塞 IO - 设置合理的 asyncio 超时(timeout)
- 监控事件循环延迟(loop.slow_callback_duration)
-
定期调用
loop.call_soon保持响应性 -
RBAC 实现要点:
- 技能权限采用最小权限原则
- 使用 JWT claims 携带角色信息
- 资源命名遵循
skill:action:resource格式 - 审计日志记录所有敏感操作
开放性问题
当技能需要跨多个 Kubernetes 集群调度时,我们面临一个经典权衡:
– 强一致性 (Strong Consistency) 会显著增加延迟
– 最终一致性 (Eventual Consistency) 可能导致状态冲突
你们团队是如何解决这个问题的?欢迎在评论区分享实践经验。
正文完
