共计 2254 个字符,预计需要花费 6 分钟才能阅读完成。
1. 为什么需要关注工作流设计
刚开始接触 skill 开发时,我最常遇到两个问题:

- 代码像意大利面条:所有逻辑都堆在一个文件里,改个按钮逻辑要翻 300 行代码
- 状态像薛定谔的猫:永远不知道当前流程走到哪一步,用户突然退出后数据全乱
后来发现这些痛点都指向同一个核心问题——缺乏清晰的工作流管理。好的工作流应该像乐高积木,每个环节独立可替换;又要像快递追踪系统,随时能查到当前进度。
2. 技术方案选型指南
2.1 现成方案对比
| 方案 | 适合场景 | 学习成本 | 定制灵活性 |
|---|---|---|---|
| Airflow | 定时任务 / 数据管道 | 中 | 低 |
| Cadence | 复杂业务流程 | 高 | 中 |
| 自建状态机 | 轻量级交互场景 | 低 | 高 |
2.2 为什么推荐自建方案
对新手来说,从简单状态机入手更易理解本质。就像学骑车先用辅助轮,掌握原理后再上专业装备。我们先用 200 行 Python 代码实现核心逻辑,后续再考虑引入框架。
3. 手把手实现基础工作流
3.1 状态机核心结构
class SkillWorkflow:
def __init__(self):
self.current_state = 'INIT'
self.handlers = {
'INIT': self._handle_init,
'CONFIRM': self._handle_confirm,
'COMPLETE': self._handle_complete
}
def process(self, event):
try:
# 记录当前状态快照
logging.info(f'Before {self.current_state} handling')
handler = self.handlers.get(self.current_state)
if not handler:
raise ValueError(f'No handler for {self.current_state}')
# 执行状态处理
return handler(event)
except Exception as e:
logging.error(f'State {self.current_state} failed: {str(e)}')
self._retry_or_fail(event)
3.2 事件驱动实践技巧
- 事件设计原则:
- 每个事件包含
event_type和payload -
使用
uuid保证事件唯一性 -
状态转换示例:
def _handle_confirm(self, event):
if event['type'] == 'USER_APPROVE':
self.current_state = 'COMPLETE'
return {'status': 'approved'}
elif event['type'] == 'USER_REJECT':
self.current_state = 'CANCELLED'
return {'status': 'rejected'}
else:
# 保持当前状态等待有效事件
return {'status': 'waiting'}
4. 必须掌握的优化策略
4.1 日志存储方案对比
- 本地文件 :开发阶段最快,用
logging.basicConfig即可 - ELK 组合:生产环境推荐,注意日志字段设计
- S3+Athena:低成本查询方案,适合中小项目
4.2 智能重试机制
def _retry_or_fail(self, event, max_retries=3):
retry_count = event.get('retry_count', 0)
if retry_count < max_retries:
# 指数退避重试
delay = min(2 ** retry_count, 30)
time.sleep(delay)
event['retry_count'] = retry_count + 1
self.process(event)
else:
self.current_state = 'FAILED'
self._alert_admin(event)
5. 真实项目避坑经验
5.1 幂等性保障三要素
- 为每个操作生成唯一 ID
- 前置检查使用
SELECT FOR UPDATE - 操作结果缓存到 Redis(设置合理 TTL)
5.2 超时任务处理
建议采用双保险策略:
- 客户端超时:设置 10 秒请求超时
- 服务端兜底:启动后台线程扫描超时任务
def scan_timeout_tasks():
while True:
timeout_tasks = db.query(
"""SELECT task_id FROM workflows
WHERE status='RUNNING'
AND updated_at < NOW() - INTERVAL '5 minutes'""")
for task in timeout_tasks:
mark_task_failed(task['task_id'])
time.sleep(60) # 每分钟检查一次
6. 快速上手实践包
我在 GitHub 准备了开箱即用的示例项目:
-
克隆仓库:
git clone https://github.com/example/skill-workflow-demo -
本地测试:
python -m venv venv source venv/bin/activate pip install -r requirements.txt pytest tests/ -
交互演示:
python demo.py --example pizza_order
延伸思考
当你的工作流运行半年后,如何在不中断服务的情况下:
– 新增中间状态步骤?
– 修改现有状态转移逻辑?
– 回滚到上一个稳定版本?
建议从这三个维度设计方案:
1. 版本兼容性字段设计
2. 数据迁移脚本编写
3. 流量切换策略制定
下次我们可以深入探讨工作流的灰度发布方案。
正文完
