Python断点续传案例有哪些?

wen python案例 2

本文目录导读:

  1. HTTP 文件下载断点续传
  2. 使用 urllib 标准库实现
  3. 多线程/多连接断点续传
  4. 文件上传断点续传(大文件分片上传)
  5. 实际生产中的成熟工具/库

Python 断点续传的常见应用场景主要包括文件下载(从 HTTP/FTP 服务器)和文件上传(到支持分片上传的云存储或自定义服务),以下是几个典型的案例和实现方式:

HTTP 文件下载断点续传

这是最经典的场景,核心是利用 HTTP 协议的 Range 头部,告诉服务器只返回文件的一部分。

案例:使用 requests 库实现

import os
import requests
def download_with_resume(url, local_filename):
    """
    支持断点续传的HTTP文件下载
    """
    # 获取本地已下载部分的大小(如果存在)
    first_byte = 0
    if os.path.exists(local_filename):
        first_byte = os.path.getsize(local_filename)
        print(f"发现已下载 {first_byte} 字节,继续断点续传...")
    # 设置Range头部
    headers = {'Range': f'bytes={first_byte}-'}
    try:
        # 发起带Range的请求
        response = requests.get(url, headers=headers, stream=True, timeout=30)
        # 检查服务器是否支持断点续传 (206 Partial Content)
        if response.status_code == 206:
            total_size = int(response.headers.get('content-range', '').split('/')[-1])
            mode = 'ab'  # 追加模式写入
        elif response.status_code == 200:
            # 服务器不支持Range,只能从头下载
            total_size = int(response.headers.get('content-length', 0))
            first_byte = 0
            mode = 'wb'  # 覆盖写入
            print("服务器不支持断点续传,从头开始下载...")
        else:
            response.raise_for_status()
            return
        # 以追加或覆盖模式写入文件
        with open(local_filename, mode) as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
    except requests.exceptions.RequestException as e:
        print(f"下载中断:{e},已下载 {os.path.getsize(local_filename)} 字节,下次可续传")
        return False
    except Exception as e:
        print(f"发生错误:{e}")
        return False
    print("下载完成!")
    return True
# 使用示例
url = "https://example.com/largefile.zip"
download_with_resume(url, "largefile.zip")

关键点:

  • 检查 Content-Range 响应头确认服务器支持。
  • 状态码 206 Partial Content 表示部分内容,200 表示不支持。
  • 使用 'ab' 模式追加写入已存在的部分。

使用 urllib 标准库实现

不使用第三方库,仅用标准库:

import os
import urllib.request
import urllib.error
def download_with_urllib(url, local_filename):
    if os.path.exists(local_filename):
        downloaded = os.path.getsize(local_filename)
    else:
        downloaded = 0
    headers = {'Range': f'bytes={downloaded}-'}
    req = urllib.request.Request(url, headers=headers)
    try:
        response = urllib.request.urlopen(req)
        # 判断支持情况
        if response.status == 206:
            mode = 'ab'
        elif response.status == 200:
            mode = 'wb'
            downloaded = 0
        else:
            raise Exception(f"Unexpected status: {response.status}")
        with open(local_filename, mode) as f:
            while True:
                chunk = response.read(8192)
                if not chunk:
                    break
                f.write(chunk)
    except urllib.error.HTTPError as e:
        print(f"HTTP Error: {e.code}")
        return False
    except Exception as e:
        print(f"Error: {e}")
        return False
    return True

多线程/多连接断点续传

适合大文件,将文件分成多个段,每个线程下载一段,最后合并,这也是很多下载工具(如aria2、IDM)的核心思想。

import os
import requests
import threading
class MultiThreadDownloader:
    def __init__(self, url, filename, num_threads=4):
        self.url = url
        self.filename = filename
        self.num_threads = num_threads
        self.total_size = 0
        self.downloaded = [0] * num_threads
        self.lock = threading.Lock()
    def get_file_size(self):
        response = requests.head(self.url)
        self.total_size = int(response.headers.get('content-length', 0))
        return self.total_size
    def download_part(self, start, end, part_num):
        headers = {'Range': f'bytes={start}-{end}'}
        response = requests.get(self.url, headers=headers, stream=True)
        # 临时文件名
        temp_file = f"{self.filename}.part{part_num}"
        with open(temp_file, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
                    with self.lock:
                        self.downloaded[part_num] += len(chunk)
        progress = sum(self.downloaded) / self.total_size * 100
        print(f"Part {part_num} finished. Overall: {progress:.2f}%")
    def merge_files(self):
        with open(self.filename, 'wb') as final:
            for i in range(self.num_threads):
                temp_file = f"{self.filename}.part{i}"
                with open(temp_file, 'rb') as part:
                    final.write(part.read())
                os.remove(temp_file)
        print("合并完成!")
    def start(self):
        total_size = self.get_file_size()
        part_size = total_size // self.num_threads
        threads = []
        for i in range(self.num_threads):
            start = i * part_size
            end = (i + 1) * part_size - 1 if i < self.num_threads - 1 else total_size - 1
            t = threading.Thread(target=self.download_part, 
                               args=(start, end, i))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        self.merge_files()
# 使用
downloader = MultiThreadDownloader("https://example.com/bigfile.iso", "bigfile.iso", 4)
downloader.start()

注意: 多线程版本需要在代码中加入断点续传的状态保存(例如记录每个分片已下载的字节数到文件),这里为了简化没有完全实现,真正常生产环境需要配合状态文件记录每个分片的进度。


文件上传断点续传(大文件分片上传)

上传场景通常配合云存储服务(如阿里云OSS、腾讯云COS、AWS S3)的官方SDK,或者自定义的分片上传API。

简化版分片上传逻辑:

import os
import hashlib
import requests
def upload_with_resume(file_path, upload_url, chunk_size=5*1024*1024):
    """
    模拟大文件分片上传(需要服务端支持)
    客户端记录已上传的分片,跳过已完成的分片。
    """
    file_size = os.path.getsize(file_path)
    total_chunks = (file_size + chunk_size - 1) // chunk_size
    # 假设我们从服务端获取已上传的分片列表
    uploaded_chunks = []  # 实际中需要调用接口获取 [1, 3, 5 ...]
    with open(file_path, 'rb') as f:
        for chunk_index in range(total_chunks):
            if chunk_index in uploaded_chunks:
                continue  # 跳过已上传的分片
            # 计算分片偏移量
            start = chunk_index * chunk_size
            f.seek(start)
            # 读取当前分片数据
            if chunk_index == total_chunks - 1:
                data = f.read()  # 最后一片
            else:
                data = f.read(chunk_size)
            # 计算MD5用于校验
            md5 = hashlib.md5(data).hexdigest()
            # 构造分片上传请求
            files = {'file': (f'chunk_{chunk_index}', data)}
            params = {
                'chunk': chunk_index,
                'md5': md5,
                'total_chunks': total_chunks,
                'filename': os.path.basename(file_path)
            }
            try:
                resp = requests.post(upload_url, files=files, data=params, timeout=60)
                if resp.status_code == 200:
                    print(f"分片 {chunk_index+1}/{total_chunks} 上传成功")
                else:
                    print(f"分片 {chunk_index+1} 失败: {resp.text}")
                    # 记录失败,可以重试
                    raise Exception("上传失败")
            except Exception as e:
                print(f"连接中断,分片 {chunk_index+1} 未完成")
                return False  # 下次启动会重新上传未完成的分片
    # 通知服务器合并分片
    requests.post(upload_url + "/merge", params={'filename': os.path.basename(file_path)})
    print("上传完成!")
    return True

实际生产中的成熟工具/库

如果需要快速实现断点续传,可以直接使用成熟的下载工具或库:

工具/库 场景 特点
aria2 (命令行) 下载 支持多连接、断点续传、BT协议
pySmartDL 下载 Python库,多线程,自动断点续传
wget 下载 -c 参数直接支持
boto3 (AWS SDK) S3上传 自动分片上传、断点续传
oss2 (阿里云OSS SDK) OSS上传/下载 内置断点续传功能

aria2示例:

# 直接支持断点续传
aria2c -c -x 4 -s 4 "https://example.com/file.zip"
# -c: 续传  -x: 连接数  -s: 分片数

pySmartDL示例:

from pySmartDL import SmartDL
obj = SmartDL("https://example.com/bigfile.iso", "bigfile.iso", threads=10)
obj.start()

场景 核心原理 建议方案
HTTP下载 Range 头部 requests + Range
多线程下载 分片 + 合片 上面第3个案例
云存储上传 服务端分片API 使用官方SDK
快速开发 现有工具 aria2 / pySmartDL

实际开发中,建议优先使用成熟库(如 pySmartDL)或系统工具(aria2),避免重复造轮子,如果必须自定义,记得处理好状态保存(记录已下载/上传的偏移量)、异常恢复并发控制

标签: Python

抱歉,评论功能暂时关闭!