本文目录导读:
我来帮你实现一个Python端口扫描器,这里提供两个版本:基础版和进阶版。
基础版端口扫描器
import socket
import threading
from datetime import datetime
class PortScanner:
def __init__(self, target, start_port=1, end_port=1024, threads=100):
self.target = target
self.start_port = start_port
self.end_port = end_port
self.threads = threads
self.open_ports = []
self.lock = threading.Lock()
def scan_port(self, port):
"""扫描单个端口"""
try:
# 创建socket连接
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1) # 设置超时时间
# 尝试连接
result = sock.connect_ex((self.target, port))
if result == 0: # 端口开放
with self.lock:
self.open_ports.append(port)
sock.close()
except socket.gaierror:
print("主机名解析错误")
except socket.error:
print("连接错误")
def scan_ports(self):
"""并行扫描端口"""
print(f"开始扫描 {self.target}:{self.start_port}-{self.end_port}")
print(f"扫描开始时间: {datetime.now()}")
# 创建线程
threads = []
for port in range(self.start_port, self.end_port + 1):
thread = threading.Thread(target=self.scan_port, args=(port,))
threads.append(thread)
# 控制线程数量
if len(threads) >= self.threads:
for t in threads:
t.start()
for t in threads:
t.join()
threads = []
# 处理剩余线程
for t in threads:
t.start()
for t in threads:
t.join()
print(f"扫描结束时间: {datetime.now()}")
return self.open_ports
# 使用示例
if __name__ == "__main__":
# 扫描目标
target = "127.0.0.1" # 可改为实际IP
scanner = PortScanner(target, 1, 1000, 50)
open_ports = scanner.scan_ports()
if open_ports:
print(f"开放的端口: {sorted(open_ports)}")
else:
print("没有发现开放端口")
进阶版端口扫描器
import socket
import threading
import queue
import time
import sys
from datetime import datetime
from typing import List, Optional
class AdvancedPortScanner:
"""高级端口扫描器"""
def __init__(self, target: str, timeout: float = 1.0, max_threads: int = 200):
self.target = target
self.timeout = timeout
self.max_threads = max_threads
self.open_ports = []
self.port_queue = queue.Queue()
self.lock = threading.Lock()
self.progress = 0
def get_service_name(self, port: int) -> str:
"""获取常见端口服务名称"""
common_ports = {
21: 'FTP', 22: 'SSH', 23: 'Telnet', 25: 'SMTP',
53: 'DNS', 80: 'HTTP', 110: 'POP3', 143: 'IMAP',
443: 'HTTPS', 445: 'SMB', 993: 'IMAPS', 995: 'POP3S',
1433: 'MSSQL', 1521: 'Oracle', 3306: 'MySQL',
3389: 'RDP', 5432: 'PostgreSQL', 6379: 'Redis',
8080: 'HTTP-Proxy', 8443: 'HTTPS-Alt'
}
return common_ports.get(port, f'Port-{port}')
def scan_single_port(self, port: int) -> Optional[int]:
"""扫描单个端口"""
try:
# TCP连接扫描
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
# 尝试TCP连接
result = sock.connect_ex((self.target, port))
sock.close()
if result == 0:
# 获取服务banner
banner = self.get_banner(port)
return (port, banner)
except Exception as e:
return None
return None
def get_banner(self, port: int) -> Optional[str]:
"""获取服务banner"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
sock.connect((self.target, port))
# 发送探针
sock.send(b"\r\n")
# 接收banner
banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
sock.close()
return banner[:50] if banner else None
except:
return None
def worker(self):
"""工作线程"""
while True:
try:
port = self.port_queue.get_nowait()
result = self.scan_single_port(port)
if result:
port_num, banner = result
service = self.get_service_name(port_num)
with self.lock:
self.open_ports.append({
'port': port_num,
'service': service,
'banner': banner
})
# 实时输出
print(f"\r[+] 发现开放端口: {port_num}/{service}" +
(f" (Banner: {banner})" if banner else ""))
# 更新进度
with self.lock:
self.progress += 1
self.print_progress()
self.port_queue.task_done()
except queue.Empty:
break
except Exception as e:
with self.lock:
self.port_queue.task_done()
def print_progress(self):
"""打印扫描进度"""
sys.stdout.write(f"\r扫描进度: {self.progress}/{self.total_ports} "
f"({(self.progress/self.total_ports)*100:.1f}%)")
sys.stdout.flush()
def scan_range(self, start_port: int = 1, end_port: int = 1024):
"""扫描端口范围"""
print(f"===== 端口扫描器 =====")
print(f"目标: {self.target}")
print(f"端口范围: {start_port}-{end_port}")
print(f"最大线程数: {self.max_threads}")
print(f"开始时间: {datetime.now()}\n")
# 压入端口到队列
self.total_ports = end_port - start_port + 1
for port in range(start_port, end_port + 1):
self.port_queue.put(port)
# 启动线程
threads = []
for _ in range(min(self.max_threads, self.total_ports)):
thread = threading.Thread(target=self.worker)
thread.daemon = True
thread.start()
threads.append(thread)
# 等待队列处理完成
self.port_queue.join()
# 等待所有线程结束
for thread in threads:
thread.join()
print(f"\n\n扫描结束时间: {datetime.now()}")
# 输出结果
if self.open_ports:
print("\n=== 扫描结果 ===")
print(f"{'端口':<8} {'服务':<15} {'Banner':<30}")
print("-" * 55)
for port_info in sorted(self.open_ports, key=lambda x: x['port']):
print(f"{port_info['port']:<8} {port_info['service']:<15} "
f"{str(port_info['banner'] or ''):<30}")
else:
print("\n没有发现开放端口")
return self.open_ports
def scan_specific_ports(self, ports: List[int]):
"""扫描特定端口列表"""
return self.scan_range(min(ports), max(ports))
# 使用示例
def main():
# 基础扫描
print("1. 基础端口扫描")
basic_scanner = PortScanner("127.0.0.1", 1, 500)
open_ports = basic_scanner.scan_ports()
print(f"开放端口: {open_ports}")
print("\n" + "="*50 + "\n")
# 高级扫描
print("2. 高级端口扫描")
advanced_scanner = AdvancedPortScanner(
target="127.0.0.1",
timeout=1.0,
max_threads=100
)
# 扫描常见端口
common_ports = [21, 22, 23, 25, 53, 80, 110, 143, 443, 445,
993, 995, 1433, 1521, 3306, 3389, 5432, 6379, 8080, 8443]
print("扫描常见端口:")
advanced_scanner.scan_specific_ports(common_ports)
if __name__ == "__main__":
main()
使用说明
安装依赖
基础版不需要额外安装,使用标准库即可。
基本用法
# 基础扫描
scanner = PortScanner("192.168.1.1", 1, 1000, 50)
open_ports = scanner.scan_ports()
# 高级扫描
scanner = AdvancedPortScanner("example.com", timeout=2.0)
results = scanner.scan_range(1, 1024)
主要特性
- 多线程扫描:提高扫描速度
- 超时控制:避免卡死
- 服务识别:自动识别常见服务
- 进度显示:实时显示扫描进度
- Banner抓取:获取服务信息
注意事项
- 合法性:只扫描自己有权限的系统
- 网络影响:大量端口扫描可能被视为攻击
- 性能考虑:根据网络环境调整线程数
- 隐私保护:不要对未经授权的系统进行扫描
这个端口扫描器适合学习和授权的渗透测试使用。