共计 2657 个字符,预计需要花费 7 分钟才能阅读完成。
真实案例:低效爬虫引发的血泪史
去年我们团队接手了一个电商价格监控项目,初期使用简单 Requests+BeautifulSoup 方案,结果三天内遭遇:

- 目标网站封禁整个 C 段 IP,导致公司内网无法访问
- 关键商品价格漏抓率高达 37%(动态加载未处理)
- 单机日均仅能采集 2 万条数据(竞品能做到 20 万 +)
这个教训让我们意识到: 没有工程化的爬虫系统就是定时炸弹 。下面分享重构过程中总结的实战经验。
高并发请求管理
同步 vs 异步性能实测
测试环境:阿里云 2 核 4G 服务器,目标站点允许每秒 5 请求
# 同步方案(requests)import requests
def sync_fetch(urls):
for url in urls:
resp = requests.get(url)
# 处理响应...
# 异步方案(aiohttp)import aiohttp
async def async_fetch(session, url):
async with session.get(url) as resp:
return await resp.text()
性能对比(1000 次请求):
| 方案 | 耗时 (s) | CPU 占用 | 网络 IO 等待 |
|---|---|---|---|
| requests | 218 | 15% | 92% |
| aiohttp | 47 | 68% | 31% |
协程池最佳实践
import asyncio
from aiohttp import TCPConnector
class AsyncFetcher:
def __init__(self, max_conn=100):
self.semaphore = asyncio.Semaphore(max_conn)
async def _request(self, session, url):
async with self.semaphore:
try:
async with session.get(url, timeout=10) as resp:
if resp.status == 200:
return await resp.text()
except Exception as e:
print(f"Request failed: {url} - {str(e)}")
return None
async def batch_fetch(self, urls):
connector = TCPConnector(limit=0) # 禁用连接数限制
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [self._request(session, url) for url in urls]
return await asyncio.gather(*tasks, return_exceptions=True)
关键优化点:
- 使用 Semaphore 控制并发度,避免瞬间爆发请求
- TCPConnector 调整连接池参数
- 统一的超时和异常处理
反爬虫对抗体系
动态 Header 生成器
from faker import Faker
import random
class HeaderFactory:
def __init__(self):
self.faker = Faker()
with open('user_agents.txt') as f:
self.user_agents = [line.strip() for line in f]
def generate(self):
return {'User-Agent': random.choice(self.user_agents),
'Referer': self.faker.url(),
'Accept-Language': 'en-US,en;q=0.9',
'X-Forwarded-For': self.faker.ipv4()}
配套策略:
- 维护 2000+ 条真实 User-Agent
- 每 10 次请求更换 IP(代理池实现)
- 随机请求间隔(0.5s~3s)
数据解析优化
XPath+ 正则混合解析
from lxml import etree
import re
def parse_product(html):
tree = etree.HTML(html)
# XPath 提取主体内容
name = tree.xpath('//h1[@class="title"]/text()')[0].strip()
# 正则处理特殊格式
price_text = tree.xpath('//span[@class="price"]/text()')[0]
price = re.search(r'\d+\.\d{2}', price_text).group()
# 处理 JS 动态数据
script = tree.xpath('//script[contains(.,"window.__DATA__")]/text()')[0]
stock = re.search(r'"stock":(\d+)', script).group(1)
return {
'name': name,
'price': float(price),
'stock': int(stock)
}
生产环境避坑指南
IP 被封应急方案
- 立即切换代理 IP 池
- 降低请求频率至原 1 /10
- 检查是否触犯 robots.txt 规则
- 模拟人工操作(鼠标移动、页面停留)
验证码识别接入
推荐方案优先级:
- 打码平台(如超级鹰)
- 自建 CNN 模型(适合固定样式)
- 人工打码队列(备用)
断点续爬实现
import pickle
class StateManager:
def __init__(self, state_file='crawl_state.pkl'):
self.state_file = state_file
def save(self, urls_done, urls_todo):
with open(self.state_file, 'wb') as f:
pickle.dump({
'done': urls_done,
'todo': urls_todo
}, f)
def load(self):
try:
with open(self.state_file, 'rb') as f:
return pickle.load(f)
except FileNotFoundError:
return {'done': [], 'todo': []}
开放性问题思考
- 分布式调度系统设计要点:
- 如何避免重复抓取?
- 怎样动态调整节点负载?
-
失败任务的重试策略
-
动态页面解决方案:
- Puppeteer/Playwright 无头浏览器
- 逆向分析接口加密逻辑
- WebAssembly hook 技术
最后建议:先用本文方案搭建基础框架,再逐步扩展分布式能力。记住,好的爬虫应该像绅士一样 – 既拿到需要的数据,又不给服务器造成负担。
正文完
