本文目录导读:
我来介绍几种Python增量爬取的实现方法,从简单到复杂逐步说明。
基于时间戳的增量爬取
基础实现
import requests
import json
from datetime import datetime
import time
class TimestampIncrementalSpider:
    """Incremental spider that keeps only items newer than the last crawl.

    The timestamp of the newest item seen so far is persisted to
    ``state_file`` as a ``TIME_FORMAT`` string and restored on start-up.
    """

    # Format of the item timestamps and of the persisted state value.
    TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self):
        self.last_crawl_time = None  # datetime of the newest item seen, or None
        self.state_file = 'crawl_state.json'
        self.load_state()

    def load_state(self):
        """Load the last crawl time from ``state_file``.

        A missing or unreadable (e.g. half-written) state file is treated
        like a first run: ``last_crawl_time`` becomes None.
        """
        try:
            with open(self.state_file, 'r', encoding='utf-8') as f:
                state = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            self.last_crawl_time = None
            return
        saved = state.get('last_crawl_time')
        # JSON can only hold a string; convert back so comparisons against the
        # datetime values in crawl_news work.
        self.last_crawl_time = (
            datetime.strptime(saved, self.TIME_FORMAT) if saved else None
        )

    def save_state(self):
        """Persist ``last_crawl_time`` to ``state_file`` as a string (or null)."""
        saved = (
            self.last_crawl_time.strftime(self.TIME_FORMAT)
            if self.last_crawl_time is not None
            else None
        )
        with open(self.state_file, 'w', encoding='utf-8') as f:
            json.dump({'last_crawl_time': saved}, f)

    def crawl_news(self, limit=10):
        """Return the news items published after ``last_crawl_time``.

        Uses a hard-coded sample list in place of a real API call. When new
        items are found, ``last_crawl_time`` advances to the newest of them
        and the state file is rewritten.

        Args:
            limit: currently unused; presumably intended as the page size of
                the real API request (NOTE(review): confirm).

        Returns:
            The list of item dicts newer than the previous crawl.
        """
        # Simulated API response
        news_list = [
            {'id': 1, 'title': '新闻1', 'time': '2024-01-20 10:00:00'},
            {'id': 2, 'title': '新闻2', 'time': '2024-01-20 11:00:00'},
        ]
        new_items = []
        newest = None
        for news in news_list:
            news_time = datetime.strptime(news['time'], self.TIME_FORMAT)
            # Keep only items strictly newer than the previous crawl.
            if self.last_crawl_time is None or news_time > self.last_crawl_time:
                new_items.append(news)
                if newest is None or news_time > newest:
                    newest = news_time
        if newest is not None:
            self.last_crawl_time = newest
            self.save_state()
        return new_items
# Usage example: run one incremental pass and report how many items were new.
spider = TimestampIncrementalSpider()
new_news = spider.crawl_news()
print(f"新增新闻: {len(new_news)} 条")
基于ID的增量爬取
带ID追踪的实现
import redis
import requests
from bs4 import BeautifulSoup
import json
class IDBasedIncrementalSpider:
    """Incremental spider that skips list items whose ``data-id`` was seen before.

    Seen IDs are stored in a Redis set (``crawled_key``). The dedup state
    therefore lives outside the Python process and survives restarts.
    """

    def __init__(self, host='localhost', port=6379, crawled_key='crawled_ids'):
        self.redis_client = redis.StrictRedis(
            host=host,
            port=port,
            decode_responses=True,
        )
        self.crawled_key = crawled_key  # Redis set key holding crawled IDs

    def is_crawled(self, item_id):
        """Return True if *item_id* is already in the crawled-ID set."""
        return self.redis_client.sismember(self.crawled_key, str(item_id))

    def mark_crawled(self, item_id):
        """Add *item_id* to the crawled-ID set."""
        self.redis_client.sadd(self.crawled_key, str(item_id))

    def crawl_new_items(self, url, timeout=10):
        """Fetch *url* and return the ``.item`` entries not crawled before.

        Args:
            url: List page to fetch.
            timeout: Seconds passed to ``requests.get``. Without a timeout, a
                stalled server would block the call indefinitely.

        Returns:
            A list of ``{'id', 'title', 'content'}`` dicts. ``title`` or
            ``content`` is None when that element is missing from the item.
            Items without a ``data-id`` attribute are skipped.

        Raises:
            requests.HTTPError: For 4xx/5xx responses. Without this, an error
                page would be parsed silently and look like "no new items".
        """
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        new_items = []
        for item in soup.select('.item'):  # assumed CSS selector for list entries
            item_id = item.get('data-id')
            if not item_id:
                # Without an ID the item cannot be deduplicated. str(None) would
                # otherwise merge every ID-less item into a single 'None' entry.
                continue
            if self.is_crawled(item_id):
                continue
            new_items.append({
                'id': item_id,
                'title': self._text_of(item, '.title'),
                'content': self._text_of(item, '.content'),
            })
            self.mark_crawled(item_id)
        return new_items

    @staticmethod
    def _text_of(node, selector):
        """Return the text of the first *selector* match under *node*, or None."""
        found = node.select_one(selector)
        return found.text if found is not None else None
# Usage example: collect the items on the list page whose IDs are not yet in Redis.
# NOTE: executes at import time and performs a network request.
spider = IDBasedIncrementalSpider()
new_data = spider.crawl_new_items('http://example.com/list')
完整的增量爬虫框架
使用Scrapy实现增量爬取
import scrapy
from scrapy_redis.spiders import RedisSpider
import hashlib
import json
class IncrementalSpider(RedisSpider):
    """Scrapy-Redis spider that only yields items it has not emitted before.

    Item fingerprints are collected in ``duplicate_filter`` during a run.
    They are written to ``fingerprint_file`` when the spider closes and
    loaded back when the spider is created, so a later run skips items
    emitted by earlier runs.

    NOTE(review): the filter is an in-process set, so it is not shared between
    several workers consuming the same ``redis_key``. A Redis set would be
    needed for that.
    """
    name = 'incremental_spider'
    redis_key = 'incremental:start_urls'
    fingerprint_file = 'crawled_fingerprints.json'  # persisted between runs

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Restore fingerprints saved by closed(); without this the saved file
        # was never read and every run started from scratch.
        self.duplicate_filter = self._load_fingerprints()

    def _load_fingerprints(self):
        """Return the fingerprints saved by a previous run (empty set if none)."""
        try:
            with open(self.fingerprint_file, 'r', encoding='utf-8') as f:
                return set(json.load(f))
        except FileNotFoundError:
            return set()
        except json.JSONDecodeError as e:
            # A truncated/corrupt file should not stop the spider: start fresh,
            # but leave a trace in the log.
            self.logger.warning(
                'Ignoring unreadable fingerprint file %s: %s',
                self.fingerprint_file, e,
            )
            return set()

    def generate_fingerprint(self, item):
        """Return the MD5 hex digest of the item's title followed by its url."""
        text = f"{item['title']}{item['url']}"
        return hashlib.md5(text.encode()).hexdigest()

    def parse(self, response):
        """Yield one dict per ``.item`` element whose fingerprint is new."""
        for item in response.css('.item'):
            data = {
                'title': item.css('.title::text').get(),
                'url': response.url,
                'content': item.css('.content::text').get(),
                'time': item.css('.time::text').get()
            }
            fingerprint = self.generate_fingerprint(data)
            # Skip items already seen in this or an earlier run.
            if fingerprint not in self.duplicate_filter:
                self.duplicate_filter.add(fingerprint)
                yield data

    def closed(self, reason):
        """Persist the fingerprint set so the next run can skip these items."""
        with open(self.fingerprint_file, 'w', encoding='utf-8') as f:
            json.dump(sorted(self.duplicate_filter), f)
基于数据库的增量方案
使用SQLite存储状态
import sqlite3
import requests
from datetime import datetime
import hashlib
class DatabaseIncrementalSpider:
    """Incremental spider that keeps crawl state and crawled items in SQLite.

    Tables:
        crawl_state: One row per source, holding its last crawl time and
            last item ID.
        crawled_items: One row per stored item, keyed by the MD5 fingerprint
            of ``title + url``.
    """

    def __init__(self, db_path='crawl_state.db'):
        self.conn = sqlite3.connect(db_path)
        self.create_tables()

    def close(self):
        """Close the SQLite connection; the object is unusable afterwards."""
        self.conn.close()

    def create_tables(self):
        """Create both tables if they do not exist yet."""
        cursor = self.conn.cursor()
        # Crawl state, one row per source
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS crawl_state (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source TEXT UNIQUE,
                last_crawl_time TEXT,
                last_item_id TEXT
            )
        ''')
        # Items already crawled
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS crawled_items (
                fingerprint TEXT PRIMARY KEY,
                title TEXT,
                url TEXT,
                content TEXT,
                crawl_time TEXT
            )
        ''')
        self.conn.commit()

    def get_state(self, source):
        """Return ``(last_crawl_time, last_item_id)`` for *source*, or None."""
        cursor = self.conn.cursor()
        cursor.execute(
            'SELECT last_crawl_time, last_item_id FROM crawl_state WHERE source = ?',
            (source,)
        )
        return cursor.fetchone()

    def update_state(self, source, last_crawl_time=None, last_item_id=None):
        """Create or update the state row for *source*.

        Arguments left as None keep their previously stored value. A call
        that only records the crawl time (as ``crawl`` does) therefore no
        longer wipes ``last_item_id``. The previous INSERT OR REPLACE
        rewrote the whole row with NULLs.
        """
        cursor = self.conn.cursor()
        # COALESCE(?, column): a NULL parameter keeps the existing column value.
        cursor.execute('''
            UPDATE crawl_state
            SET last_crawl_time = COALESCE(?, last_crawl_time),
                last_item_id = COALESCE(?, last_item_id)
            WHERE source = ?
        ''', (last_crawl_time, last_item_id, source))
        if cursor.rowcount == 0:
            # No row for this source yet.
            cursor.execute('''
                INSERT INTO crawl_state (source, last_crawl_time, last_item_id)
                VALUES (?, ?, ?)
            ''', (source, last_crawl_time, last_item_id))
        self.conn.commit()

    def item_exists(self, fingerprint):
        """Return True if an item with *fingerprint* is already stored."""
        cursor = self.conn.cursor()
        cursor.execute(
            'SELECT 1 FROM crawled_items WHERE fingerprint = ?',
            (fingerprint,)
        )
        return cursor.fetchone() is not None

    @staticmethod
    def _fingerprint(item):
        """Return the MD5 hex digest of ``item['title'] + item['url']``."""
        return hashlib.md5(f"{item['title']}{item['url']}".encode()).hexdigest()

    def save_item(self, item):
        """Store *item* if it is new.

        Args:
            item: Dict with 'title', 'url' and 'content' keys.

        Returns:
            True if the item was inserted, False if it was already stored.
        """
        fingerprint = self._fingerprint(item)
        if self.item_exists(fingerprint):
            return False
        cursor = self.conn.cursor()
        cursor.execute('''
            INSERT INTO crawled_items
            (fingerprint, title, url, content, crawl_time)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            fingerprint,
            item['title'],
            item['url'],
            item['content'],
            datetime.now().isoformat()
        ))
        self.conn.commit()
        return True

    def crawl(self, source_url, parser_func, timeout=10):
        """Fetch *source_url*, store the new items and record the crawl time.

        Args:
            source_url: Page to fetch; also used as the state key.
            parser_func: Callable taking the response and returning item dicts.
            timeout: Seconds passed to ``requests.get`` so a stalled server
                cannot block forever.

        Returns:
            Number of items that were not stored before.

        Raises:
            requests.HTTPError: For 4xx/5xx responses. In that case the crawl
                time is not recorded.
        """
        response = requests.get(source_url, timeout=timeout)
        response.raise_for_status()
        new_count = sum(1 for item in parser_func(response) if self.save_item(item))
        self.update_state(
            source=source_url,
            last_crawl_time=datetime.now().isoformat()
        )
        return new_count
# Usage example
def parse_news(response):
    """Turn a news listing response into a list of item dicts.

    Placeholder parser: the real extraction logic belongs here. As written,
    it ignores *response* and returns a new empty list. The items it should
    produce are dicts with 'title', 'url' and 'content' keys, which is what
    DatabaseIncrementalSpider.save_item reads.
    """
    parsed = []
    # TODO: extract items from `response` and append them to `parsed`.
    return parsed
# Run one crawl of the news page with parse_news and report how many new items were stored.
# NOTE: executes at import time, creates/opens crawl_state.db and performs a network request.
spider = DatabaseIncrementalSpider()
new_count = spider.crawl('http://example.com/news', parse_news)
print(f"新增 {new_count} 条内容")
高级增量爬取(支持断点续爬)
import asyncio
import aiohttp
from typing import Set, Dict
import pickle
import os
class AsyncIncrementalSpider:
    """Asynchronous incremental spider with resumable (checkpointed) state.

    ``visited_urls``, ``pending_urls`` and ``failed_urls`` are pickled to
    ``state_file`` after every batch. A new instance restores them, so an
    interrupted crawl can resume.
    """

    def __init__(self, state_file='spider_state.pkl', max_retries=3):
        self.state_file = state_file
        # A failing URL is attempted at most max_retries + 1 times before it
        # is moved to failed_urls.
        self.max_retries = max_retries
        self.visited_urls: Set[str] = set()
        self.pending_urls: Set[str] = set()
        self.failed_urls: Set[str] = set()  # URLs given up on after max_retries
        self.retry_counts: Dict[str, int] = {}  # failures per URL, this process only
        self.load_state()

    def load_state(self):
        """Restore the URL sets from ``state_file`` if that file exists."""
        if not os.path.exists(self.state_file):
            return
        try:
            # NOTE: pickle can execute arbitrary code while loading. Only load
            # state files written by this spider, never untrusted ones.
            with open(self.state_file, 'rb') as f:
                state = pickle.load(f)
            self.visited_urls = state.get('visited', set())
            self.pending_urls = state.get('pending', set())
            # Older state files have no 'failed' entry.
            self.failed_urls = state.get('failed', set())
            print(f"恢复状态: 已访问 {len(self.visited_urls)}, 待处理 {len(self.pending_urls)}")
        except Exception as e:
            print(f"加载状态失败: {e}")

    def save_state(self):
        """Checkpoint the URL sets to ``state_file``."""
        import time

        state = {
            'visited': self.visited_urls,
            'pending': self.pending_urls,
            'failed': self.failed_urls,
            'timestamp': time.time()
        }
        tmp_path = self.state_file + '.tmp'
        with open(tmp_path, 'wb') as f:
            pickle.dump(state, f)
        # Swap the new checkpoint in with one rename. An interruption
        # mid-write then leaves the previous checkpoint intact instead of a
        # truncated file.
        os.replace(tmp_path, self.state_file)

    async def fetch(self, session, url):
        """Return the body of *url* on HTTP 200, otherwise None.

        Request errors are printed and reported as None; retry bookkeeping is
        done by process_page.
        """
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                if response.status == 200:
                    return await response.text()
                return None
        except Exception as e:
            print(f"请求失败 {url}: {e}")
            return None

    async def process_page(self, session, url):
        """Fetch and parse *url*.

        Always returns a ``(new_urls, new_items)`` pair so callers can unpack
        it. Both lists are empty when the page was skipped or failed. (The
        previous ``[]`` return broke the unpacking in crawl.)
        """
        if url in self.visited_urls:
            # Already done, e.g. re-submitted as a start URL. Drop it so the
            # crawl loop does not keep picking it up forever.
            self.pending_urls.discard(url)
            return [], []

        html = await self.fetch(session, url)
        if not html:
            self._record_failure(url)
            return [], []

        new_urls = self.extract_urls(html, url)
        new_items = self.extract_items(html)

        self.visited_urls.add(url)
        self.pending_urls.discard(url)
        self.retry_counts.pop(url, None)
        return new_urls, new_items

    def _record_failure(self, url):
        """Count a failed attempt; give up on *url* once max_retries is exceeded."""
        attempts = self.retry_counts.get(url, 0) + 1
        if attempts > self.max_retries:
            # Stop retrying, otherwise a permanently broken URL keeps the
            # crawl loop running forever.
            self.pending_urls.discard(url)
            self.failed_urls.add(url)
            self.retry_counts.pop(url, None)
        else:
            # Leave it in pending_urls so a later batch retries it.
            self.retry_counts[url] = attempts

    def extract_urls(self, html, base_url):
        """Extract the URLs found on a page."""
        # Simplified: a real implementation would use BeautifulSoup or similar.
        urls = []
        # parsing logic...
        return urls

    def extract_items(self, html):
        """Extract the data items found on a page."""
        # Simplified implementation
        items = []
        # parsing logic...
        return items

    async def crawl(self, start_urls: list, max_concurrent=10):
        """Crawl from *start_urls* until no pending URLs remain.

        Pages are processed in batches of at most *max_concurrent* concurrent
        requests, and state is checkpointed after each batch.
        """
        for url in start_urls:
            # Explicitly requested URLs get a fresh set of retries.
            self.failed_urls.discard(url)
            self.retry_counts.pop(url, None)
        self.pending_urls.update(start_urls)

        connector = aiohttp.TCPConnector(limit=max_concurrent)
        async with aiohttp.ClientSession(connector=connector) as session:
            while self.pending_urls:
                current_batch = list(self.pending_urls)[:max_concurrent]
                results = await asyncio.gather(
                    *(self.process_page(session, url) for url in current_batch)
                )
                for new_urls, _items in results:
                    # NOTE(review): extracted items are currently discarded;
                    # store them here.
                    for found in new_urls:
                        if found not in self.visited_urls and found not in self.failed_urls:
                            self.pending_urls.add(found)

                # Checkpoint after every batch
                self.save_state()
                print(f"进度: 已访问 {len(self.visited_urls)}, 待处理 {len(self.pending_urls)}")

        self.save_state()
        print("爬取完成!")
# Usage example
async def main():
    """Run two crawls with the same spider instance.

    The second call reuses the visited/pending sets left by the first, so
    pages already visited are not fetched again.
    """
    spider = AsyncIncrementalSpider()
    seed_batches = (['http://example.com/page1'], ['http://example.com/page2'])
    for seeds in seed_batches:
        await spider.crawl(seeds)
# asyncio.run(main())
配置化的增量爬虫
# Example config.yaml, kept here as a string literal for reference only (never
# executed or parsed). ConfigurableIncrementalSpider reads
# spider.incremental.type and spider.incremental.storage from this layout.
"""
spider:
  name: news_spider
  start_urls:
    - http://example.com/news
  incremental:
    type: id  # timestamp, id, fingerprint
    storage: redis  # redis, sqlite, file
    check_interval: 300  # seconds
  extraction:
    title: h1.title
    content: div.content
    time: span.time
    id: attribute(data-id)
"""
import yaml
from typing import Dict, Any
class ConfigurableIncrementalSpider:
    """Incremental spider whose dedup strategy and storage backend come from YAML.

    Keys read by this class::

        spider:
          incremental:
            type: id | timestamp | fingerprint
            storage: redis | sqlite | anything else -> file

    NOTE(review): RedisStorage, SQLiteStorage and FileStorage are not defined
    in this file. They must be provided elsewhere before this class is
    instantiated.
    """

    def __init__(self, config_path='config.yaml'):
        # Open in binary mode so PyYAML detects the encoding (UTF-8/UTF-16)
        # itself, instead of decoding with the platform-default locale
        # encoding, which can fail on non-ASCII config files.
        with open(config_path, 'rb') as f:
            self.config = yaml.safe_load(f)
        self.setup_storage()

    def setup_storage(self):
        """Create ``self.storage`` according to ``spider.incremental.storage``."""
        storage_type = self.config['spider']['incremental']['storage']
        if storage_type == 'redis':
            self.storage = RedisStorage()
        elif storage_type == 'sqlite':
            self.storage = SQLiteStorage()
        else:
            # Any other value falls back to file storage.
            self.storage = FileStorage()

    def check_if_new(self, item):
        """Return whether *item* is new according to ``spider.incremental.type``.

        Dispatches to the storage's ``is_new_id`` (uses ``item['id']``),
        ``is_new_time`` (uses ``item['time']``) or ``is_new_fingerprint``
        (uses the whole item). Unknown types treat every item as new.
        """
        check_type = self.config['spider']['incremental']['type']
        if check_type == 'id':
            return self.storage.is_new_id(item['id'])
        elif check_type == 'timestamp':
            return self.storage.is_new_time(item['time'])
        elif check_type == 'fingerprint':
            return self.storage.is_new_fingerprint(item)
        return True
# Minimal usage
class SimpleIncrementalSpider:
    """Smallest possible incremental spider: remembers every URL it has accepted."""

    def __init__(self):
        self.seen = set()  # URLs already accepted for crawling (in memory only)

    def should_crawl(self, url):
        """Return True the first time *url* is offered, False on every later call."""
        if url in self.seen:
            return False
        self.seen.add(url)
        return True

    def crawl(self, url):
        """Crawl *url* unless it was seen before (the crawl itself is a placeholder)."""
        if not self.should_crawl(url):
            return
        # Actual fetching/parsing would go here.
# Usage example: 'page1' appears twice but passes should_crawl only the first time.
spider = SimpleIncrementalSpider()
urls = ['page1', 'page2', 'page1']  # page1 is duplicated
for url in urls:
    spider.crawl(url)  # page1 is crawled only once
增量爬取的关键要点:
- 去重策略:选择合适的方法(ID、时间戳、指纹)
- 状态持久化:保存爬取状态到文件或数据库
- 异常处理:爬取失败时的重试机制
- 性能优化:批量处理、异步请求
- 断点续爬:支持中断后继续爬取
选择哪种实现方式取决于你的具体需求:
- 简单场景:时间戳+文件存储
- 数据量大:Redis去重
- 需要持久化保存爬取到的内容:数据库方案
- 大规模爬取:Scrapy-Redis框架
标签: 案例实现