Python数据去重爬取实操?

wen python案例 1

我来详细介绍Python数据去重爬取的实操方法,包括多种去重策略的实现。

基础去重方案

使用集合(Set)去重

import requests
from urllib.parse import urlparse
class BasicDedupCrawler:
    """Single-threaded crawler that de-duplicates with two in-memory sets.

    ``visited_urls`` holds every URL fetched successfully;
    ``visited_contents`` holds the built-in ``hash()`` of every page body
    already returned by :meth:`crawl_dedup_content`.
    """

    def __init__(self):
        self.visited_urls = set()
        self.visited_contents = set()

    def crawl(self, url):
        """Fetch *url* unless it was fetched before.

        Returns the response text, or ``None`` when the URL is already known
        or the request fails. A failed URL is not recorded as visited.
        """
        already_seen = url in self.visited_urls
        if already_seen:
            print(f"跳过已访问URL: {url}")
            return None
        try:
            reply = requests.get(url, timeout=10)
            reply.raise_for_status()
            # Only a successful response marks the URL as visited.
            self.visited_urls.add(url)
            body = reply.text
        except Exception as err:
            print(f"爬取失败 {url}: {err}")
            return None
        return body

    def crawl_dedup_content(self, url):
        """Fetch *url* and return its text only if that text is new.

        Returns ``None`` when the fetch yields nothing (or an empty body) or
        when a page with the same ``hash()`` value was returned earlier.
        """
        page = self.crawl(url)
        if not page:
            return None
        fingerprint = hash(page)
        if fingerprint in self.visited_contents:
            return None
        self.visited_contents.add(fingerprint)
        return page

基于Bloom Filter的高效去重

import hashlib
import mmh3
from bitarray import bitarray
class BloomFilter:
    """Fixed-size Bloom filter backed by a ``bitarray``.

    Each item is mapped to ``hash_count`` bit positions by hashing the item's
    string form concatenated with the probe number using MD5. Membership
    answers can be false positives but never false negatives.

    Args:
        size: Number of bits in the filter; must be positive.
        hash_count: Number of bit positions set per item; must be positive.

    Raises:
        ValueError: If ``size`` or ``hash_count`` is not positive.
    """

    def __init__(self, size=1000000, hash_count=7):
        # With hash_count == 0 every lookup would report "present", and
        # size == 0 would fail later with ZeroDivisionError; reject both early.
        if size <= 0:
            raise ValueError("size must be a positive integer")
        if hash_count <= 0:
            raise ValueError("hash_count must be a positive integer")
        self.size = size
        self.hash_count = hash_count
        self.bit_array = bitarray(size)
        self.bit_array.setall(0)

    def _indexes(self, item):
        """Yield the ``hash_count`` bit positions that belong to *item*."""
        for probe in range(self.hash_count):
            digest = hashlib.md5(f"{item}{probe}".encode()).digest()
            # Same value as int(hexdigest, 16), without the hex round trip.
            yield int.from_bytes(digest, "big") % self.size

    def add(self, item):
        """Set every bit position that belongs to *item*."""
        for index in self._indexes(item):
            self.bit_array[index] = 1

    def contains(self, item):
        """Return True if every bit position of *item* is set."""
        return all(self.bit_array[index] for index in self._indexes(item))

    def __contains__(self, item):
        """Support ``item in bloom_filter`` as an alias for :meth:`contains`."""
        return self.contains(item)
class BloomFilterCrawler:
    """Decides whether a URL should be crawled, using a Bloom filter.

    ``url_count`` counts the URLs that were accepted for crawling.
    """

    def __init__(self):
        self.bloom = BloomFilter()
        self.url_count = 0

    def should_crawl(self, url):
        """Return True the first time *url* is offered, False afterwards.

        A URL the filter does not yet contain is added to it and counted.
        A URL the filter reports as present is rejected.
        """
        seen_before = self.bloom.contains(url)
        if not seen_before:
            self.bloom.add(url)
            self.url_count += 1
        return not seen_before

基于数据库的持久化去重

import sqlite3
import hashlib
from datetime import datetime
class DatabaseDedupCrawler:
    """Persistent URL and content de-duplication backed by SQLite.

    Two tables are kept: ``visited_urls`` (keyed by the MD5 of the URL) and
    ``content_hashes`` (keyed by the MD5 of the page text). The instance can
    be used as a context manager to close the connection automatically.

    Args:
        db_path: Path of the SQLite database file (``':memory:'`` works too).
    """

    def __init__(self, db_path='crawler.db'):
        self.conn = sqlite3.connect(db_path)
        self.create_tables()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()

    @staticmethod
    def _digest(text):
        """Return the hexadecimal MD5 digest of *text*."""
        return hashlib.md5(text.encode()).hexdigest()

    @staticmethod
    def _timestamp():
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS.ffffff``.

        The value is passed as a string because binding a ``datetime`` relies
        on sqlite3's default adapter, which is deprecated since Python 3.12.
        The text format is the one that adapter produced.
        """
        return datetime.now().isoformat(sep=" ")

    def create_tables(self):
        """Create the two de-duplication tables if they do not exist yet."""
        cursor = self.conn.cursor()
        # URL de-duplication table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS visited_urls (
                url_hash TEXT PRIMARY KEY,
                url TEXT NOT NULL,
                visited_time TIMESTAMP
            )
        ''')
        # Content de-duplication table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS content_hashes (
                content_hash TEXT PRIMARY KEY,
                url TEXT,
                first_seen TIMESTAMP
            )
        ''')
        self.conn.commit()

    def is_url_visited(self, url):
        """Return True if *url* has a row in ``visited_urls``."""
        cursor = self.conn.execute(
            "SELECT 1 FROM visited_urls WHERE url_hash = ?",
            (self._digest(url),)
        )
        return cursor.fetchone() is not None

    def mark_url_visited(self, url):
        """Record *url* as visited; a URL that is already stored is left as is."""
        self.conn.execute(
            "INSERT OR IGNORE INTO visited_urls (url_hash, url, visited_time) "
            "VALUES (?, ?, ?)",
            (self._digest(url), url, self._timestamp())
        )
        self.conn.commit()

    def is_content_duplicate(self, content):
        """Return True if the MD5 of *content* has a row in ``content_hashes``."""
        cursor = self.conn.execute(
            "SELECT 1 FROM content_hashes WHERE content_hash = ?",
            (self._digest(content),)
        )
        return cursor.fetchone() is not None

    def mark_content_visited(self, content, url=None):
        """Record the MD5 of *content* so later copies count as duplicates.

        Args:
            content: Page text whose hash is stored.
            url: Optional URL where the content was first seen.
        """
        self.conn.execute(
            "INSERT OR IGNORE INTO content_hashes (content_hash, url, first_seen) "
            "VALUES (?, ?, ?)",
            (self._digest(content), url, self._timestamp())
        )
        self.conn.commit()

完整的多线程去重爬虫示例

import requests
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from collections import deque
import hashlib
import time
class AdvancedDedupCrawler:
    """Multi-threaded, same-host crawler with URL and content de-duplication.

    Pages are fetched by a thread pool. A URL is queued at most once, and a
    page is kept only if neither its URL nor the MD5 of its text has been
    recorded before.

    Args:
        max_threads: Number of worker threads and of pages fetched at once.
        max_pages: Upper bound on the number of pages kept by :meth:`crawl`.
    """

    def __init__(self, max_threads=5, max_pages=100):
        self.max_threads = max_threads
        self.max_pages = max_pages
        self.visited_urls = set()
        self.visited_content_hashes = set()
        self.url_queue = deque()
        # Every URL that has ever been queued, so the same link found on
        # several pages is fetched only once. Guarded by url_lock.
        self.queued_urls = set()
        self.url_lock = threading.Lock()
        self.content_lock = threading.Lock()
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def _enqueue(self, urls):
        """Queue each URL in *urls* that is neither visited nor already queued."""
        with self.url_lock:
            for url in urls:
                if url in self.visited_urls or url in self.queued_urls:
                    continue
                self.queued_urls.add(url)
                self.url_queue.append(url)

    def _register(self, url, content_hash):
        """Check and record a page in one critical section.

        Returns ``(reason, count)``: *reason* is the skip message when the URL
        or the content hash is already known (nothing is recorded then),
        otherwise ``None`` with *count* being the number of visited URLs
        after recording this one.
        """
        # Locks are always taken in this order (url, then content); no other
        # method holds both at once.
        with self.url_lock, self.content_lock:
            if url in self.visited_urls:
                return "URL已访问", None
            if content_hash in self.visited_content_hashes:
                return "内容重复", None
            self.visited_urls.add(url)
            self.visited_content_hashes.add(content_hash)
            return None, len(self.visited_urls)

    def extract_links(self, html, base_url):
        """Return the absolute links in *html* that share *base_url*'s host.

        Relative ``href`` values are resolved against *base_url*. The list
        keeps document order and may contain repeats.
        """
        soup = BeautifulSoup(html, 'html.parser')
        base_host = urlparse(base_url).netloc
        links = []
        for a_tag in soup.find_all('a', href=True):
            full_url = urljoin(base_url, a_tag['href'])
            # Keep only links on the same host as the page they came from.
            if urlparse(full_url).netloc == base_host:
                links.append(full_url)
        return links

    def is_duplicate(self, url, content):
        """Report whether *url* or *content* has been recorded already.

        Returns ``(True, reason)`` for a known URL or known content hash and
        ``(False, None)`` otherwise. Nothing is recorded by this call.
        """
        with self.url_lock:
            if url in self.visited_urls:
                return True, "URL已访问"
        content_hash = hashlib.md5(content.encode()).hexdigest()
        with self.content_lock:
            if content_hash in self.visited_content_hashes:
                return True, "内容重复"
        return False, None

    def process_page(self, url):
        """Fetch *url*, record it if new, and queue the links it contains.

        Returns the page text, or ``None`` when the status is not 200, the
        page is a duplicate, or any error occurs (the error is printed).
        """
        try:
            response = self.session.get(url, timeout=10)
            response.encoding = response.apparent_encoding
            if response.status_code != 200:
                return None
            content = response.text
            content_hash = hashlib.md5(content.encode()).hexdigest()
            # Check and mark in one step so two workers cannot both accept
            # the same URL or the same content.
            reason, position = self._register(url, content_hash)
            if reason is not None:
                print(f"跳过 [{reason}]: {url}")
                return None
            print(f"爬取成功 [{position}]: {url}")
            self._enqueue(self.extract_links(content, url))
            return content
        except Exception as e:
            # Worker boundary: one bad page must not stop the crawl.
            print(f"爬取失败 {url}: {e}")
            return None

    def crawl(self, start_url):
        """Crawl from *start_url* until the queue drains or max_pages is hit."""
        from concurrent.futures import FIRST_COMPLETED, wait

        self._enqueue([start_url])
        crawled_count = 0
        with ThreadPoolExecutor(max_workers=self.max_threads) as executor:
            future_to_url = {}
            while (self.url_queue or future_to_url) and crawled_count < self.max_pages:
                # Never keep more tasks in flight than pages still allowed,
                # so the final count cannot exceed max_pages.
                capacity = min(self.max_threads, self.max_pages - crawled_count)
                while self.url_queue and len(future_to_url) < capacity:
                    url = self.url_queue.popleft()
                    future = executor.submit(self.process_page, url)
                    future_to_url[future] = url
                # wait() returns an empty "done" set on timeout instead of
                # raising, so a slow request only delays the next round.
                done, _ = wait(future_to_url, timeout=5, return_when=FIRST_COMPLETED)
                for future in done:
                    url = future_to_url.pop(future)
                    try:
                        # An empty page is still a crawled page.
                        if future.result() is not None:
                            crawled_count += 1
                    except Exception as e:
                        print(f"任务异常 {url}: {e}")
        print(f"爬取完成,共爬取 {crawled_count} 个页面")

    def get_statistics(self):
        """Return counts of visited URLs, unique contents and queued URLs."""
        return {
            "visited_urls": len(self.visited_urls),
            "unique_contents": len(self.visited_content_hashes),
            "queue_size": len(self.url_queue)
        }
# Usage example
if __name__ == "__main__":
    # Crawler limited to 3 worker threads and 50 pages.
    demo_crawler = AdvancedDedupCrawler(max_pages=50, max_threads=3)
    # Crawl from the entry page, then print the de-duplication counters.
    demo_crawler.crawl("https://example.com")
    print(f"统计信息: {demo_crawler.get_statistics()}")

基于Redis的分布式去重

import redis
import hashlib
import json
class RedisDedupCrawler:
    """URL and content de-duplication stored in two Redis sets.

    URLs are stored as SHA-256 hex digests under ``crawler:visited_urls``;
    page contents as MD5 hex digests under ``crawler:content_hashes``.
    """

    def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
        connection_options = {
            'host': redis_host,
            'port': redis_port,
            'db': redis_db,
            'decode_responses': True,
        }
        self.redis_client = redis.StrictRedis(**connection_options)
        self.url_set_key = 'crawler:visited_urls'
        self.content_set_key = 'crawler:content_hashes'

    def add_url(self, url):
        """Add the SHA-256 of *url* to the URL set; returns the client's SADD result."""
        digest = hashlib.sha256(url.encode()).hexdigest()
        return self.redis_client.sadd(self.url_set_key, digest)

    def is_url_visited(self, url):
        """Return the client's SISMEMBER result for the SHA-256 of *url*."""
        digest = hashlib.sha256(url.encode()).hexdigest()
        return self.redis_client.sismember(self.url_set_key, digest)

    def add_content_hash(self, content):
        """Add the MD5 of *content* to the content set; returns the client's SADD result."""
        digest = hashlib.md5(content.encode()).hexdigest()
        return self.redis_client.sadd(self.content_set_key, digest)

    def is_content_duplicate(self, content):
        """Return the client's SISMEMBER result for the MD5 of *content*."""
        digest = hashlib.md5(content.encode()).hexdigest()
        return self.redis_client.sismember(self.content_set_key, digest)

    def get_crawled_count(self):
        """Return the number of entries in the URL set (SCARD)."""
        return self.redis_client.scard(self.url_set_key)

去重策略最佳实践

多层级去重

class MultiLayerDedup:
    """De-duplication that layers a memory set, a Bloom filter and SQLite.

    Lookups go from the cheapest layer to the most expensive one; the
    database is consulted for a URL only after the Bloom filter reports a
    possible hit.

    Args:
        db_path: SQLite file handed to ``DatabaseDedupCrawler``.
    """

    def __init__(self, db_path='crawler.db'):
        # In-memory cache (fastest)
        self.memory_cache = set()
        # Bloom filter (memory-efficient pre-check)
        self.bloom_filter = BloomFilter()
        # Database (persistent)
        self.db = DatabaseDedupCrawler(db_path)
        # Redis (distributed), optional
        self.redis = None
        self._warm_bloom_filter()

    def _warm_bloom_filter(self):
        """Load URLs already stored in the database into the Bloom filter.

        The filter starts empty in every process. Without this step a URL
        persisted by an earlier run would fail the filter check and the
        database would never be asked about it.
        """
        for (stored_url,) in self.db.conn.execute("SELECT url FROM visited_urls"):
            self.bloom_filter.add(stored_url)

    def _record_content(self, url, content):
        """Persist the hash of *content* so later copies count as duplicates."""
        recorder = getattr(self.db, "mark_content_visited", None)
        if recorder is not None:
            recorder(content)
            return
        # The database layer may not offer a content-marking method; write the
        # row directly, using the MD5 key that is_content_duplicate looks up.
        import hashlib
        from datetime import datetime

        self.db.conn.execute(
            "INSERT OR IGNORE INTO content_hashes (content_hash, url, first_seen) "
            "VALUES (?, ?, ?)",
            (
                hashlib.md5(content.encode()).hexdigest(),
                url,
                datetime.now().isoformat(sep=" "),
            )
        )
        self.db.conn.commit()

    def check_all(self, url, content=None):
        """Return ``(is_duplicate, reason)`` for *url* and optional *content*.

        ``reason`` names the layer that matched, or is ``None`` when the page
        is new. Empty or missing *content* skips the content check.
        """
        # 1. Memory cache (fastest)
        if url in self.memory_cache:
            return True, "内存缓存命中"
        # 2. Bloom filter; a hit may be a false positive, so confirm it
        #    against the database before trusting it.
        if self.bloom_filter.contains(url) and self.db.is_url_visited(url):
            self.memory_cache.add(url)
            return True, "Bloom Filter命中"
        # 3. Content de-duplication
        if content and self.db.is_content_duplicate(content):
            return True, "内容重复"
        return False, None

    def mark_visited(self, url, content=None):
        """Record *url* (and *content*, when given and non-empty) in every layer."""
        # 1. Memory cache
        self.memory_cache.add(url)
        # 2. Bloom filter
        self.bloom_filter.add(url)
        # 3. Persistent database
        self.db.mark_url_visited(url)
        if content:
            self._record_content(url, content)

实用建议

  1. 选择合适的去重策略

    • 小规模爬虫:使用集合(Set)即可
    • 大规模爬虫:Bloom Filter + 数据库
    • 分布式系统:Redis + Bloom Filter
  2. URL规范化

    from urllib.parse import urlparse, urlunparse

def normalize_url(url):
    """Return *url* in a canonical form suitable for de-duplication.

    The fragment is dropped, trailing slashes are removed from the path, and
    the scheme and host are lower-cased. The path, query string and any
    ``user:password@`` prefix keep their case, because they are
    case-sensitive and lower-casing them would merge distinct URLs.
    """
    parsed = urlparse(url)
    # Split off optional credentials so only the host[:port] part is lowered.
    userinfo, at_sign, host_port = parsed.netloc.rpartition('@')
    normalized = parsed._replace(
        scheme=parsed.scheme.lower(),
        netloc=f"{userinfo}{at_sign}{host_port.lower()}",
        # Remove trailing slashes
        path=parsed.path.rstrip('/'),
        # Remove the fragment
        fragment='',
    )
    return urlunparse(normalized)

  3. 性能优化

    • 使用缓冲和批量操作
    • 选择合适的哈希算法
    • 合理设置Bloom Filter参数

这个实操指南提供了从简单到复杂的去重方案，你可以根据实际需求选择合适的策略。

标签: 爬虫实操

抱歉,评论功能暂时关闭!