Implementing Asynchronous Requests with aiohttp and asyncio

OpenClaw is a web crawler/scraping tool, and its speed can be optimized at several levels. The key strategies are below:

Concurrency and Async Optimization

import asyncio
import aiohttp
from aiohttp import TCPConnector

async def fetch(session, url, semaphore):
    async with semaphore:  # cap the number of in-flight requests
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            return await response.text()

async def main(urls):
    connector = TCPConnector(
        limit=100,            # total connection-pool limit
        limit_per_host=20,    # per-host connection limit
        enable_cleanup_closed=True
    )
    semaphore = asyncio.Semaphore(50)  # concurrency cap
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
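
A minimal driver for main(), as a sketch with placeholder URLs; since gather() was called with return_exceptions=True, failed fetches come back as exception objects and should be filtered out:

if __name__ == "__main__":
    urls = [f"https://example.com/page/{i}" for i in range(200)]  # placeholder URLs
    results = asyncio.run(main(urls))
    pages = [r for r in results if not isinstance(r, Exception)]
    print(f"fetched {len(pages)} of {len(urls)} pages")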

Connection Pooling and Reuse

import requests
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

session = requests.Session()
retry = Retry(total=3, backoff_factor=0.3)  # retry policy: 3 attempts with backoff
adapter = HTTPAdapter(
    pool_connections=100,    # number of connection pools
    pool_maxsize=100,        # max connections per pool
    max_retries=retry,       # attach the retry policy
    pool_block=False         # don't block when the pool is exhausted
)
session.mount('http://', adapter)
session.mount('https://', adapter)
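
The payoff comes from reusing the mounted session, which keeps TCP connections alive across requests; a usage sketch with a placeholder URL:

# Reuse the pooled session across requests; example.com is a placeholder.
for path in ("a", "b", "c"):
    resp = session.get(f"https://example.com/{path}", timeout=10)
    resp.raise_for_status()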

DNS Cache Optimization

# Cache DNS lookups with dnspython
import dns.resolver
import functools

@functools.lru_cache(maxsize=1024)
def cached_dns_lookup(domain):
    resolver = dns.resolver.Resolver()
    resolver.timeout = 2
    resolver.lifetime = 2
    return [str(r) for r in resolver.resolve(domain, 'A')]

# Alternatively, cache DNS at the system level:
# /etc/hosts entries or a local caching DNS service
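
If you are on aiohttp anyway, its TCPConnector already caches DNS lookups; a minimal sketch raising the cache TTL:

import aiohttp

# aiohttp caches resolved addresses per connector; re-resolve less often.
connector = aiohttp.TCPConnector(
    use_dns_cache=True,   # on by default
    ttl_dns_cache=300     # keep entries for 5 minutes
)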

Batched Request Processing

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

class BatchProcessor:
    def __init__(self, max_workers=10, batch_size=100):
        self.max_workers = max_workers
        self.batch_size = batch_size

    def fetch_url(self, url):
        """Fetch a single URL; swap in your own download logic here."""
        return requests.get(url, timeout=10).text

    def process_batch(self, urls):
        """Process URLs in batches to amortize connection-setup overhead."""
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self.fetch_url, url): url for url in urls}
            results = []
            for future in as_completed(futures):
                results.append(future.result())
        return results
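
Usage sketch: split the URL list into chunks of batch_size and feed each chunk to the pool (urls is a placeholder list):

processor = BatchProcessor(max_workers=10, batch_size=100)
all_results = []
for i in range(0, len(urls), processor.batch_size):
    all_results.extend(processor.process_batch(urls[i:i + processor.batch_size]))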

Caching Strategy

# Cache already-visited URLs in Redis (or an in-memory store)
import redis
from hashlib import md5

class URLCache:
    def __init__(self, redis_host='localhost', ttl=3600):
        self.redis = redis.Redis(host=redis_host, decode_responses=True)
        self.ttl = ttl

    def get_cache_key(self, url):
        return f"url_cache:{md5(url.encode()).hexdigest()}"

    def should_skip(self, url):
        """Return True if the URL has already been processed."""
        key = self.get_cache_key(url)
        return self.redis.exists(key)

    def mark_processed(self, url):
        """Mark the URL as processed, with an expiry."""
        key = self.get_cache_key(url)
        self.redis.setex(key, self.ttl, '1')
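
Usage sketch, assuming a Redis server on localhost and a placeholder URL:

cache = URLCache(ttl=3600)
url = "https://example.com/item/1"  # placeholder
if not cache.should_skip(url):
    # ... fetch and parse the page here ...
    cache.mark_processed(url)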

Parsing Optimization

# Use lxml instead of BeautifulSoup (roughly 5-10x faster)
from lxml import html
import cchardet  # faster than chardet

def fast_parse(html_content):
    # Quickly detect the encoding (expects raw bytes)
    encoding = cchardet.detect(html_content)['encoding']
    if encoding:
        html_content = html_content.decode(encoding, errors='ignore')
    # Parse with lxml
    tree = html.fromstring(html_content)
    # XPath is faster than CSS selectors
    links = tree.xpath('//a[@href]/@href')
    titles = tree.xpath('//title/text()')
    return links, titles
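
Usage sketch: pass fast_parse the raw response bytes, since cchardet detects encodings from bytes rather than decoded text (the URL is a placeholder):

import requests

raw = requests.get("https://example.com", timeout=10).content  # bytes, not .text
links, titles = fast_parse(raw)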

Configuration File Optimization

# config.yaml
crawler:
  max_concurrent: 50
  delay: 0.1  # delay between requests (seconds)
  timeout: 10
  retry_times: 3
  user_agent: "Mozilla/5.0"
  # connection-pool settings
  connection_pool:
    size: 100
    max_per_host: 20
  # cache settings
  cache:
    enabled: true
    ttl: 3600
    backend: "redis"
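
A minimal sketch for loading these settings with PyYAML; the key names follow the file above:

import yaml

with open("config.yaml") as f:
    cfg = yaml.safe_load(f)
max_concurrent = cfg["crawler"]["max_concurrent"]  # -> 50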

Performance Monitoring and Tuning

import time
from collections import defaultdict
import psutil

class PerformanceMonitor:
    def __init__(self):
        self.stats = defaultdict(list)
        self.start_time = time.time()

    def record(self, metric, value):
        self.stats[metric].append(value)

    def get_report(self):
        report = {
            'total_requests': len(self.stats['request_time']),
            'avg_response_time': sum(self.stats['request_time']) / max(len(self.stats['request_time']), 1),
            'success_rate': sum(self.stats['success']) / max(len(self.stats['success']), 1),
            'memory_usage': psutil.Process().memory_info().rss / 1024 / 1024,  # MB
            'total_time': time.time() - self.start_time
        }
        return report
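
Usage sketch: time one request and record its outcome (the URL is a placeholder):

import requests

monitor = PerformanceMonitor()
start = time.time()
try:
    requests.get("https://example.com", timeout=10)  # placeholder request
    monitor.record('success', 1)
except Exception:
    monitor.record('success', 0)
monitor.record('request_time', time.time() - start)
print(monitor.get_report())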

Distributed Crawler Architecture

# Distribute work with Celery (or Redis Queue)
from celery import Celery

app = Celery('crawler', broker='redis://localhost:6379/0')

@app.task(queue='crawler_tasks', rate_limit='100/m')
def crawl_task(url):
    # the crawl itself; fetch_and_parse is defined elsewhere in the project
    return fetch_and_parse(url)

# Start multiple workers to process tasks in parallel:
# celery -A crawler worker --concurrency=10 -Q crawler_tasks
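
From a producer process, jobs are enqueued with delay(); a sketch with placeholder URLs:

# Workers started with the command above pick these up from the queue.
for url in ("https://example.com/1", "https://example.com/2"):  # placeholders
    crawl_task.delay(url)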

Practical Optimization Tips

Delay Optimization

import random
from asyncio import sleep

# Randomize the delay to avoid being blocked
async def smart_delay():
    base_delay = 0.1
    jitter = random.uniform(-0.05, 0.05)
    await sleep(max(base_delay + jitter, 0.05))

Header Optimization

headers = {
    'Accept-Encoding': 'gzip, deflate, br',  # enable compressed responses
    'Connection': 'keep-alive',              # reuse the TCP connection
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}

Logging Optimization

import logging

logging.getLogger("urllib3").setLevel(logging.WARNING)  # silence noisy library logs
logging.getLogger("asyncio").setLevel(logging.WARNING)

Suggested optimization priorities:

  1. First: enable concurrency/async (typically a 10-100x speedup)
  2. Second: optimize the parser (lxml instead of BeautifulSoup)
  3. Third: implement a caching strategy
  4. Fourth: optimize network connections (connection pooling, DNS caching)
  5. Fifth: distributed deployment

Pick the combination of optimizations that fits your scenario (single-machine vs. distributed crawling, data volume, target-site restrictions, and so on). Run a performance test first to find the bottleneck, then optimize that specifically; one way to profile a run is sketched below.
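
A bottleneck-finding sketch using the standard-library profiler; crawl_all and urls are hypothetical names for your entry point and input:

import cProfile
import pstats

# Profile one run, then print the ten biggest hotspots.
cProfile.run("crawl_all(urls)", "crawl.prof")
pstats.Stats("crawl.prof").sort_stats("cumulative").print_stats(10)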
