多Skill协同开发实战:从零构建高效任务编排系统

2次阅读
没有评论

共计 2475 个字符,预计需要花费 7 分钟才能阅读完成。

image.webp

核心挑战解析

第一次设计多 Skill 系统时,最让我头疼的是这三个问题:

多 Skill 协同开发实战:从零构建高效任务编排系统

  • 依赖管理 :SkillA 需要 SkillB 的输出,但 SkillB 又在等 SkillC,就像团队里互相甩锅的同事
  • 上下文共享 :全局变量乱飞,改个字段名能让五个技能崩溃
  • 异常处理 :一个技能挂掉导致整个流程卡住,还很难定位问题点

技术方案选型

试过三种主流方案后,我的对比结论:

  1. 观察者模式 :适合简单事件通知,但复杂依赖会变成 ” 回调地狱 ”
  2. 工作流引擎 :太重,90% 的功能用不上还影响性能
  3. 状态机 + 事件总线 :最终选择,用状态控制流程,用事件解耦技能

Python 实现关键代码

技能注册中心(线程安全版)

from typing import Dict, Type
from threading import Lock

class SkillRegistry:
    _instance = None
    _lock = Lock()

    def __new__(cls):
        if not cls._instance:
            with cls._lock:
                if not cls._instance:
                    cls._instance = super().__new__(cls)
                    cls._skills: Dict[str, Type[BaseSkill]] = {}
        return cls._instance

    def register(self, name: str, skill_cls):
        with self._lock:
            self._skills[name] = skill_cls

使用 Protocol 定义接口

from typing import Protocol, runtime_checkable

@runtime_checkable
class BaseSkill(Protocol):
    async def execute(self, ctx: dict) -> dict:
        ...

    @property
    def timeout(self) -> int:
        ...

上下文深拷贝保护

import copy

class Context:
    def __init__(self, initial: dict):
        self._data = copy.deepcopy(initial)

    def get(self, key: str, default=None):
        return copy.deepcopy(self._data.get(key, default))

    def set(self, key: str, value):
        self._data[key] = copy.deepcopy(value)

性能优化实战

熔断策略实现

from datetime import datetime, timedelta

class CircuitBreaker:
    def __init__(self, max_failures=3):
        self.failures = 0
        self.last_failure = None

    async def __call__(self, skill_call):
        if self.last_failure and datetime.now() - self.last_failure < timedelta(minutes=5):
            raise SkillTimeout("Circuit breaker tripped")

        try:
            return await skill_call()
        except Exception:
            self.failures += 1
            self.last_failure = datetime.now()
            if self.failures >= 3:
                raise

执行轨迹优化

改用位图记录执行路径,相比原始 JSON 日志内存占用减少 87%:

class ExecutionTracker:
    def __init__(self, skill_count: int):
        self.bitmap = 0
        self.mask = (1 << skill_count) - 1

    def mark_executed(self, skill_id: int):
        self.bitmap |= (1 << skill_id)

    def is_complete(self) -> bool:
        return (self.bitmap & self.mask) == self.mask

避坑指南

循环依赖检测

用拓扑排序检测技能依赖图中的环:

def check_cyclic(skill_deps: Dict[str, List[str]]) -> bool:
    visited = set()
    path = set()

    def dfs(skill):
        if skill in visited:
            return False
        visited.add(skill)
        path.add(skill)

        for neighbor in skill_deps.get(skill, []):
            if neighbor in path or dfs(neighbor):
                return True

        path.remove(skill)
        return False

    return any(dfs(skill) for skill in skill_deps)

上下文序列化陷阱

实测发现 pickle 比 JSON 慢 15 倍,改用 MessagePack 后的优化方案:

import msgpack

class ContextSerializer:
    @staticmethod
    def serialize(ctx: dict) -> bytes:
        # 过滤不可序列化对象
        clean_ctx = {k: v for k,v in ctx.items() 
                    if isinstance(v, (str, int, float, bool, list, dict))}
        return msgpack.packb(clean_ctx)

    @staticmethod
    def deserialize(data: bytes) -> dict:
        return msgpack.unpackb(data)

开放性问题

目前我们的技能都是启动时加载,如果要实现类似 ” 热插拔 ” 的动态加载,需要考虑:

  1. 如何安全卸载技能而不影响正在执行的流程?
  2. 版本冲突时如何回滚?
  3. 怎样设计权限隔离机制?

欢迎在评论区分享你的解决方案。在实际项目中,我们最终采用了类加载器隔离 + 版本快照的方案,但这又带来了新的 GC 挑战 …

正文完
 0
评论(没有评论)