Python磁盘监控案例有哪些?一文掌握8个实战方案与最佳实践
文章目录导读
- 为何要用Python做磁盘监控?
- 核心监控指标与底层原理
- 基于psutil的轻量级监控
- 跨平台磁盘IO监控(Windows+Linux)
- 实时告警脚本(邮件+短信)
- Web版可视化磁盘仪表盘(Flask+ECharts)
- 日志分析与历史趋势存储(SQLite+CSV)
- 分布式集群磁盘监控(SSH+Paramiko)
- 容器环境下的磁盘监控(Docker API)
- 综合监控平台搭建(Prometheus+Grafana对接)
- 常见问题FAQ
- 总结与最佳实践建议
为何要用Python做磁盘监控?
在现代运维与DevOps体系中,磁盘空间不足或IO性能瓶颈是导致服务宕机的主要原因之一,Python凭借其跨平台兼容性(Windows/Linux/macOS)、丰富的第三方库(如psutil、shutil、os)以及快速开发能力,成为实现磁盘监控的首选语言,根据Stack Overflow 2024调查报告,Python在运维自动化领域的采用率高达68.3%,远超Bash(31.5%)和Go(22.1%)。
问:为什么不直接用系统自带的df或iostat命令?
答:Python方案的优势在于:①与现有监控系统无缝整合 ②支持自定义告警逻辑(如连续三次超过阈值才告警) ③可输出结构化数据(JSON/数据库)供分析工具使用。
核心监控指标与底层原理
在编写脚本前,需要明确磁盘监控的核心维度:
| 指标类型 | 具体参数 | 数据来源(Python库) |
|---|---|---|
| 空间占用 | 总容量、已用容量、可用容量、使用率(%) | psutil.disk_usage()、os.statvfs() |
| IO性能 | 读写速率、IOPS、等待时间 | psutil.disk_io_counters() |
| 文件系统 | inode使用率、挂载点状态 | os.stat()、psutil.disk_partitions() |
| 健康状态 | SMART数据(需额外库pySMART) | pySMART、subprocess调用smartctl |
底层原理:psutil库通过调用系统底层的/proc/diskstats(Linux)或Win32_PerfFormattedData_PerfDisk(Windows)获取原始计数器数据,再计算时间差得到速率值。
案例一:基于psutil的轻量级监控
场景:单机版快速检查磁盘空间,适合集成到Cron任务或脚本中。
import psutil
def disk_monitor(threshold=80):
partitions = psutil.disk_partitions()
for part in partitions:
usage = psutil.disk_usage(part.mountpoint)
percent = usage.percent
if percent > threshold:
print(f"[WARNING] {part.mountpoint} 使用率 {percent}% 超过阈值 {threshold}%")
else:
print(f"[OK] {part.mountpoint}: {percent}%")
# 详细IO情况
io_before = psutil.disk_io_counters(perdisk=True)
time.sleep(1)
io_after = psutil.disk_io_counters(perdisk=True)
for disk, counters in io_after.items():
read_speed = (counters.read_bytes - io_before[disk].read_bytes) / 1024 / 1024
write_speed = (counters.write_bytes - io_before[disk].write_bytes) / 1024 / 1024
print(f"{disk}: 读 {read_speed:.2f} MB/s, 写 {write_speed:.2f} MB/s")
核心点:使用perdisk=True可以获取每块磁盘的独立数据,而不是总和。
问:如何避免在容器中误报?
答:如果运行在Docker容器内,disk_partitions()可能只返回容器挂载点,此时应排除overlay或tmpfs类型分区:[p for p in partitions if 'overlay' not in p.fstype]。
案例二:跨平台磁盘IO监控(Windows+Linux)
场景:跨平台生产环境,统一采集IO数据并输出标准格式。
import psutil, platform
def get_disk_io(interval=1):
system = platform.system()
# 初始统计
prev = psutil.disk_io_counters(perdisk=True)
time.sleep(interval)
cur = psutil.disk_io_counters(perdisk=True)
result = {}
for disk in cur:
# 过滤掉虚拟磁盘(如Linux的loop设备)
if 'loop' in disk: continue
if 'dm-' in disk: continue
read_diff = cur[disk].read_bytes - prev[disk].read_bytes
write_diff = cur[disk].write_bytes - prev[disk].write_bytes
iops_read = cur[disk].read_count - prev[disk].read_count
iops_write = cur[disk].write_count - prev[disk].write_count
result[disk] = {
'read_mb_s': read_diff / (1024*1024) / interval,
'write_mb_s': write_diff / (1024*1024) / interval,
'iops_read': iops_read / interval,
'iops_write': iops_write / interval,
'avg_wait_ms': calculate_wait(cur[disk], prev[disk], interval) # 自定义函数
}
return result
跨平台适配:Linux下需解析/proc/diskstats,Windows下使用WMI,但psutil已统一接口。
案例三:实时告警脚本(邮件+短信)
场景:当磁盘使用率>90%或IO延迟>500ms时,通过SMTP邮件+企业微信机器人告警。
import smtplib, requests
from email.mime.text import MIMEText
def send_alert(subject, body, level='warning'):
# 邮件发送
msg = MIMEText(body)
msg['Subject'] = f'[Disk {level}] {subject}'
with smtplib.SMTP('smtp.xxx.com', 587) as server:
server.login('user@xxx.com', 'password')
server.send_message(msg, from_addr='monitor@xxx.com', to_addrs=['admin@xxx.com'])
# 企业微信机器人(Webhook方式)
webhook_url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx'
data = {"msgtype": "text", "text": {"content": f"磁盘告警:{subject}\n{body}"}}
requests.post(webhook_url, json=data)
优化建议:加入“静默期”机制,防止同一问题重复告警:if last_alert_time and (time.time()-last_alert_time) < 300: return。
案例四:Web版可视化磁盘仪表盘(Flask+ECharts)
场景:开发一个轻量级Web页面,展示磁盘实时使用率曲线和IO趋势。
from flask import Flask, render_template, jsonify
import psutil, time
app = Flask(__name__)
@app.route('/api/disk_data')
def disk_data():
# 采集5组数据点
data = []
for _ in range(5):
usage = psutil.disk_usage('/')
io = psutil.disk_io_counters()
data.append({
'time': int(time.time()*1000),
'usage': usage.percent,
'io_read': io.read_bytes/1024/1024,
'io_write': io.write_bytes/1024/1024
})
time.sleep(1)
return jsonify(data)
@app.route('/')
def index():
return render_template('dashboard.html')
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
前端使用ECharts折线图展示数据,关键代码示例(dashboard.html片段):
setInterval(async () => {
const resp = await fetch('/api/disk_data');
const points = await resp.json();
myChart.setOption({ // 更新图表数据
series: [{
data: points.map(p => p.usage)
}]
});
}, 5000); // 每5秒轮询一次
案例五:日志分析与历史趋势存储(SQLite+CSV)
场景:每10分钟记录磁盘状态到SQLite数据库,用于生成周报。
import sqlite3, csv, datetime
def init_db():
conn = sqlite3.connect('disk_monitor.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS disk_stats
(id INTEGER PRIMARY KEY, timestamp TEXT, mount_point TEXT,
total_gb REAL, used_gb REAL, percent REAL)''')
conn.commit()
return conn
def record_stats(conn):
for part in psutil.disk_partitions():
usage = psutil.disk_usage(part.mountpoint)
c = conn.cursor()
c.execute("INSERT INTO disk_stats VALUES (?,?,?,?,?,?,?)",
(None, datetime.now().isoformat(), part.mountpoint,
usage.total/1024**3, usage.used/1024**3, usage.percent))
conn.commit()
# 导出CSV进行离线分析
def export_csv(conn, filename='disk_report.csv'):
c = conn.cursor()
c.execute("SELECT * FROM disk_stats WHERE timestamp > datetime('now', '-7 days')")
with open(filename, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['id','timestamp','mount','total_gb','used_gb','percent'])
writer.writerows(c.fetchall())
问:SQLite能否承受频繁写入?
答:10分钟一次写入对小规模环境完全足够,若需更高频率(如秒级),建议改用InfluxDB或TimescaleDB。
案例六:分布式集群磁盘监控(SSH+Paramiko)
场景:通过SSH批量获取10台以上服务器的磁盘信息,使用Paramiko库。
import paramiko
def remote_disk_check(hosts, username, pkey_path='/home/ops/.ssh/id_rsa'):
script = '''
import psutil, json
result = [{'mount': p.mountpoint, 'percent': psutil.disk_usage(p.mountpoint).percent} for p in psutil.disk_partitions()]
print(json.dumps(result))
'''
results = {}
for host in hosts:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname=host, username=username, key_filename=pkey_path)
stdin, stdout, stderr = client.exec_command(f"python3 -c '{script}'")
output = stdout.read().decode()
results[host] = json.loads(output) if output else []
client.close()
return results
注意:使用python3 -c直接执行字符串,避免在远程服务器创建临时文件,生产环境建议用Fabric或Ansible替代原生Paramiko。
案例七:容器环境下的磁盘监控(Docker API)
场景:监控Docker容器内部磁盘使用情况(非宿主机)。
import docker
client = docker.from_env()
for container in client.containers.list():
stats = container.stats(stream=False) # 单次统计
blkio_stats = stats.get('blkio_stats', {})
if blkio_stats:
# 读取每个device的IO累计值
for device in blkio_stats.get('io_service_bytes_recursive', []):
print(f"Container {container.short_id} - Device {device['op']}: {device['value']} bytes")
# 注意:容器内磁盘使用需通过exec执行df命令
exit_code, output = container.exec_run('df -h /data')
if exit_code == 0:
print(output.decode())
坑点:容器的统计数据是自容器启动以来的累计值,需要自己计算速率差。
案例八:综合监控平台搭建(Prometheus+Grafana对接)
场景:将Python采集的数据暴露为Prometheus Metrics,用Grafana展示。
from prometheus_client import start_http_server, Gauge
import psutil, time
# 定义指标
disk_usage = Gauge('disk_usage_percent', 'Disk usage %', ['mountpoint'])
disk_read_speed = Gauge('disk_read_bytes_per_sec', 'Read speed', ['device'])
disk_write_speed = Gauge('disk_write_bytes_per_sec', 'Write speed', ['device'])
def collect():
# 采集空间
for part in psutil.disk_partitions():
usage = psutil.disk_usage(part.mountpoint)
disk_usage.labels(mountpoint=part.mountpoint).set(usage.percent)
if __name__ == '__main__':
start_http_server(8000) # 暴露metrics接口
while True:
collect()
time.sleep(10)
之后在prometheus.yml中加入- targets: ['你的IP:8000'],Grafana中导入ID为1860的磁盘监控模板(官方推荐)。
常见问题FAQ
Q1:如何在Windows上获取磁盘型号和序列号?
A:使用wmi库:import wmi; c = wmi.WMI(); for disk in c.Win32_DiskDrive(): print(disk.Model, disk.SerialNumber)。
Q2:采集数据占用CPU过高怎么办?
A:psutil.disk_io_counters()本身不占用资源,但disk_partitions()在NFS挂载点过多时可能阻塞,可增加超时参数:psutil.disk_partitions(all=False)。
Q3:怎么监控网络文件系统(NFS/CIFS)的延迟?
A:使用subprocess执行mountstats命令,或挂载时启用stat=0(Linux)禁用客户端延迟统计。
Q4:是否有现成的Python磁盘监控开源项目?
A:推荐psutil官方示例(GitHub psutil/examples)、Telegraf(Go编写,但支持Python插件)、Glances(Python全栈监控)。
总结与最佳实践建议
综合以上8个案例,我们覆盖了从单机脚本到分布式集群、从终端告警到Grafana可视化的完整链路,在生成文章时,建议遵循以下原则:
- 轻量化优先:除非需要长时间历史分析,否则避免使用重量级数据库,SQLite或InfluxDB足够。
- 异常降级:当远程服务器SSH连接失败时,记录日志而非中断整个监控流程。
- 安全合规:不要在代码中硬编码密码,使用环境变量或Vault。
- IO采样时机:IO统计需要至少间隔0.5秒以上采样两次计算差值,单次数据意义不大。
- 缓存挂载点列表:频繁调用
disk_partitions()可能产生系统调用开销,可每5分钟缓存一次。
一个生产级别的磁盘监控系统,不应该只关注“报警时”的处理,更应该通过历史趋势分析预测磁盘扩容时间点——这正是Python数据分析库(如Pandas、NumPy)可以大显身手的地方,希望本文整理的8个案例能帮助你快速搭建适合自己业务的磁盘监控体系。
标签: Python