共计 2367 个字符,预计需要花费 6 分钟才能阅读完成。
背景痛点:为什么你的脚本总在深夜报警?
经历过脚本在凌晨三点崩溃的运维同学都懂,低质量脚本带来的灾难往往是连锁反应。总结这些生产事故,主要存在三类高频问题:

-
硬编码参数泛滥 :IP 地址、API 密钥直接写在脚本里,更换环境时需要全文搜索替换
-
异常处理缺失 :网络波动导致任务失败后,脚本继续执行产生脏数据
-
性能黑洞 :用 Python 循环处理百万行 CSV 文件,却不使用 pandas 向量化操作
更可怕的是,这些脚本往往以 ” 临时解决方案 ” 的名义存活数年,最终成为技术债的定时炸弹。
技术选型:Shell/Python/Node.js 怎么选?
不同场景需要不同武器,这是我们的语言选择矩阵:
- Shell(Bash):适合 200 行内的系统级操作(如日志轮转、服务监控)
- 优势:直接调用系统命令,启动速度快
-
劣势:复杂数据结构处理困难
-
Python:适合数据处理 /API 交互等复杂场景
- 优势:丰富的第三方库,可读性极佳
-
劣势:启动时间较长(约 100ms)
-
Node.js:适合高并发 IO 操作(如批量请求 API)
- 优势:异步非阻塞特性
- 劣势:回调地狱风险
实际项目中,我们常看到 Python 调用 Shell 命令的混合模式,此时务必注意子进程的超时控制。
模块化设计实战
配置文件分离
将变量提取到 YAML 配置文件中,并通过环境变量指定运行环境:
# config.prod.yaml
database:
host: ${DB_HOST}
port: 5432
加载时使用动态替换:
import yaml
from os import environ
config = yaml.safe_load(open('config.prod.yaml'))
db_host = config['database']['host'].replace('${DB_HOST}', environ.get('DB_HOST'))
函数拆分原则
按单一职责拆分函数,每个函数不超过屏幕高度(约 50 行)。例如备份脚本应拆分为:
def connect_db():
# 数据库连接逻辑
def export_data():
# 数据导出逻辑
def upload_to_s3():
# 上传逻辑
def cleanup():
# 临时文件清理
生产级代码示例
Python 重试模板
import time
import random
from typing import Callable
# 装饰器实现指数退避重试
def retry(max_attempts: int = 3, base_delay: float = 1.0):
def decorator(func: Callable):
def wrapper(*args, **kwargs):
attempt = 0
while attempt < max_attempts:
try:
return func(*args, **kwargs)
except Exception as e:
attempt += 1
if attempt == max_attempts:
raise
delay = base_delay * (2 ** attempt) + random.uniform(0, 0.2)
print(f"Attempt {attempt} failed, retrying in {delay:.2f}s")
time.sleep(delay)
return wrapper
return decorator
@retry(max_attempts=5)
def call_unstable_api():
# 模拟 API 调用
if random.random() > 0.7:
raise ConnectionError("API timeout")
return "success"
Bash 信号处理
#!/bin/bash
cleanup() {
echo "捕获终止信号,清理临时文件..."
rm -f /tmp/temp_*.log
exit 1
}
# 捕获 SIGTERM 和 SIGINT
trap cleanup SIGTERM SIGINT
# 设置 30 秒超时
timeout 30s ./sub_process.sh
if [$? -eq 124]; then
echo "任务执行超时"
cleanup
fi
性能优化对比
测试处理 10 万行 CSV 数据时不同方法的耗时(MBP M1):
| 方法 | 耗时 (s) | 内存峰值 (MB) |
|---|---|---|
| Python 逐行读取 | 3.21 | 45 |
| Pandas chunk 读取 | 1.07 | 120 |
| Rust 实现 | 0.38 | 28 |
对于 CPU 密集型任务,推荐使用 multiprocessing.Pool:
from multiprocessing import Pool
def process_item(item):
# 耗 CPU 的计算
return item * 2
if __name__ == '__main__':
with Pool(4) as p:
results = p.map(process_item, range(100000))
容器化特别注意事项
在 Docker 中运行时,Shell 脚本可能无法正确处理信号,因为:
- 容器内 PID 1 进程需要特殊处理 SIGTERM
- 子进程可能成为僵尸进程
解决方案是使用 dumb-init 或 tini 作为入口点:
FROM alpine
RUN apk add --no-cache dumb-init
ENTRYPOINT ["dumb-init", "--"]
CMD ["/your/script.sh"]
五大常见反模式
- 过度依赖 cron:没有考虑任务执行时长重叠问题
- 忽略僵尸进程 :未使用 wait 清理子进程
- 敏感信息硬编码 :将密钥提交到 Git 仓库
- 无状态检查 :重复运行导致数据重复
- 无资源限制 :内存泄漏拖垮整个服务器
开放性问题
当我们需要在混合环境(Windows/Linux/macOS)中分发脚本时,如何设计统一的执行方案?可以考虑以下方向:
- 使用 Docker 容器封装环境差异
- 开发 CLI 工具自动转换路径分隔符
- 采用 Ansible 等配置管理工具
期待你在实践中找到更适合自己团队的解决方案。
