共计 2475 个字符,预计需要花费 7 分钟才能阅读完成。
核心挑战解析
第一次设计多 Skill 系统时,最让我头疼的是这三个问题:

- 依赖管理 :SkillA 需要 SkillB 的输出,但 SkillB 又在等 SkillC,就像团队里互相甩锅的同事
- 上下文共享 :全局变量乱飞,改个字段名能让五个技能崩溃
- 异常处理 :一个技能挂掉导致整个流程卡住,还很难定位问题点
技术方案选型
试过三种主流方案后,我的对比结论:
- 观察者模式 :适合简单事件通知,但复杂依赖会变成 ” 回调地狱 ”
- 工作流引擎 :太重,90% 的功能用不上还影响性能
- 状态机 + 事件总线 :最终选择,用状态控制流程,用事件解耦技能
Python 实现关键代码
技能注册中心(线程安全版)
from typing import Dict, Type
from threading import Lock
class SkillRegistry:
_instance = None
_lock = Lock()
def __new__(cls):
if not cls._instance:
with cls._lock:
if not cls._instance:
cls._instance = super().__new__(cls)
cls._skills: Dict[str, Type[BaseSkill]] = {}
return cls._instance
def register(self, name: str, skill_cls):
with self._lock:
self._skills[name] = skill_cls
使用 Protocol 定义接口
from typing import Protocol, runtime_checkable
@runtime_checkable
class BaseSkill(Protocol):
async def execute(self, ctx: dict) -> dict:
...
@property
def timeout(self) -> int:
...
上下文深拷贝保护
import copy
class Context:
def __init__(self, initial: dict):
self._data = copy.deepcopy(initial)
def get(self, key: str, default=None):
return copy.deepcopy(self._data.get(key, default))
def set(self, key: str, value):
self._data[key] = copy.deepcopy(value)
性能优化实战
熔断策略实现
from datetime import datetime, timedelta
class CircuitBreaker:
def __init__(self, max_failures=3):
self.failures = 0
self.last_failure = None
async def __call__(self, skill_call):
if self.last_failure and datetime.now() - self.last_failure < timedelta(minutes=5):
raise SkillTimeout("Circuit breaker tripped")
try:
return await skill_call()
except Exception:
self.failures += 1
self.last_failure = datetime.now()
if self.failures >= 3:
raise
执行轨迹优化
改用位图记录执行路径,相比原始 JSON 日志内存占用减少 87%:
class ExecutionTracker:
def __init__(self, skill_count: int):
self.bitmap = 0
self.mask = (1 << skill_count) - 1
def mark_executed(self, skill_id: int):
self.bitmap |= (1 << skill_id)
def is_complete(self) -> bool:
return (self.bitmap & self.mask) == self.mask
避坑指南
循环依赖检测
用拓扑排序检测技能依赖图中的环:
def check_cyclic(skill_deps: Dict[str, List[str]]) -> bool:
visited = set()
path = set()
def dfs(skill):
if skill in visited:
return False
visited.add(skill)
path.add(skill)
for neighbor in skill_deps.get(skill, []):
if neighbor in path or dfs(neighbor):
return True
path.remove(skill)
return False
return any(dfs(skill) for skill in skill_deps)
上下文序列化陷阱
实测发现 pickle 比 JSON 慢 15 倍,改用 MessagePack 后的优化方案:
import msgpack
class ContextSerializer:
@staticmethod
def serialize(ctx: dict) -> bytes:
# 过滤不可序列化对象
clean_ctx = {k: v for k,v in ctx.items()
if isinstance(v, (str, int, float, bool, list, dict))}
return msgpack.packb(clean_ctx)
@staticmethod
def deserialize(data: bytes) -> dict:
return msgpack.unpackb(data)
开放性问题
目前我们的技能都是启动时加载,如果要实现类似 ” 热插拔 ” 的动态加载,需要考虑:
- 如何安全卸载技能而不影响正在执行的流程?
- 版本冲突时如何回滚?
- 怎样设计权限隔离机制?
欢迎在评论区分享你的解决方案。在实际项目中,我们最终采用了类加载器隔离 + 版本快照的方案,但这又带来了新的 GC 挑战 …
正文完
