如何用Python实现一个能检测开放端口的扫描器

访客 网络编程 1

本文目录导读:

  1. 基础版端口扫描器
  2. 进阶版端口扫描器
  3. 使用说明

我来帮你实现一个Python端口扫描器,这里提供两个版本:基础版和进阶版。

基础版端口扫描器

import socket
import threading
from datetime import datetime
class PortScanner:
    def __init__(self, target, start_port=1, end_port=1024, threads=100):
        self.target = target
        self.start_port = start_port
        self.end_port = end_port
        self.threads = threads
        self.open_ports = []
        self.lock = threading.Lock()
    def scan_port(self, port):
        """扫描单个端口"""
        try:
            # 创建socket连接
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)  # 设置超时时间
            # 尝试连接
            result = sock.connect_ex((self.target, port))
            if result == 0:  # 端口开放
                with self.lock:
                    self.open_ports.append(port)
            sock.close()
        except socket.gaierror:
            print("主机名解析错误")
        except socket.error:
            print("连接错误")
    def scan_ports(self):
        """并行扫描端口"""
        print(f"开始扫描 {self.target}:{self.start_port}-{self.end_port}")
        print(f"扫描开始时间: {datetime.now()}")
        # 创建线程
        threads = []
        for port in range(self.start_port, self.end_port + 1):
            thread = threading.Thread(target=self.scan_port, args=(port,))
            threads.append(thread)
            # 控制线程数量
            if len(threads) >= self.threads:
                for t in threads:
                    t.start()
                for t in threads:
                    t.join()
                threads = []
        # 处理剩余线程
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print(f"扫描结束时间: {datetime.now()}")
        return self.open_ports
# 使用示例
if __name__ == "__main__":
    # 扫描目标
    target = "127.0.0.1"  # 可改为实际IP
    scanner = PortScanner(target, 1, 1000, 50)
    open_ports = scanner.scan_ports()
    if open_ports:
        print(f"开放的端口: {sorted(open_ports)}")
    else:
        print("没有发现开放端口")

进阶版端口扫描器

import socket
import threading
import queue
import time
import sys
from datetime import datetime
from typing import List, Optional
class AdvancedPortScanner:
    """高级端口扫描器"""
    def __init__(self, target: str, timeout: float = 1.0, max_threads: int = 200):
        self.target = target
        self.timeout = timeout
        self.max_threads = max_threads
        self.open_ports = []
        self.port_queue = queue.Queue()
        self.lock = threading.Lock()
        self.progress = 0
    def get_service_name(self, port: int) -> str:
        """获取常见端口服务名称"""
        common_ports = {
            21: 'FTP', 22: 'SSH', 23: 'Telnet', 25: 'SMTP',
            53: 'DNS', 80: 'HTTP', 110: 'POP3', 143: 'IMAP',
            443: 'HTTPS', 445: 'SMB', 993: 'IMAPS', 995: 'POP3S',
            1433: 'MSSQL', 1521: 'Oracle', 3306: 'MySQL', 
            3389: 'RDP', 5432: 'PostgreSQL', 6379: 'Redis',
            8080: 'HTTP-Proxy', 8443: 'HTTPS-Alt'
        }
        return common_ports.get(port, f'Port-{port}')
    def scan_single_port(self, port: int) -> Optional[int]:
        """扫描单个端口"""
        try:
            # TCP连接扫描
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            # 尝试TCP连接
            result = sock.connect_ex((self.target, port))
            sock.close()
            if result == 0:
                # 获取服务banner
                banner = self.get_banner(port)
                return (port, banner)
        except Exception as e:
            return None
        return None
    def get_banner(self, port: int) -> Optional[str]:
        """获取服务banner"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            sock.connect((self.target, port))
            # 发送探针
            sock.send(b"\r\n")
            # 接收banner
            banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
            sock.close()
            return banner[:50] if banner else None
        except:
            return None
    def worker(self):
        """工作线程"""
        while True:
            try:
                port = self.port_queue.get_nowait()
                result = self.scan_single_port(port)
                if result:
                    port_num, banner = result
                    service = self.get_service_name(port_num)
                    with self.lock:
                        self.open_ports.append({
                            'port': port_num,
                            'service': service,
                            'banner': banner
                        })
                        # 实时输出
                        print(f"\r[+] 发现开放端口: {port_num}/{service}" + 
                              (f" (Banner: {banner})" if banner else ""))
                # 更新进度
                with self.lock:
                    self.progress += 1
                    self.print_progress()
                self.port_queue.task_done()
            except queue.Empty:
                break
            except Exception as e:
                with self.lock:
                    self.port_queue.task_done()
    def print_progress(self):
        """打印扫描进度"""
        sys.stdout.write(f"\r扫描进度: {self.progress}/{self.total_ports} "
                        f"({(self.progress/self.total_ports)*100:.1f}%)")
        sys.stdout.flush()
    def scan_range(self, start_port: int = 1, end_port: int = 1024):
        """扫描端口范围"""
        print(f"===== 端口扫描器 =====")
        print(f"目标: {self.target}")
        print(f"端口范围: {start_port}-{end_port}")
        print(f"最大线程数: {self.max_threads}")
        print(f"开始时间: {datetime.now()}\n")
        # 压入端口到队列
        self.total_ports = end_port - start_port + 1
        for port in range(start_port, end_port + 1):
            self.port_queue.put(port)
        # 启动线程
        threads = []
        for _ in range(min(self.max_threads, self.total_ports)):
            thread = threading.Thread(target=self.worker)
            thread.daemon = True
            thread.start()
            threads.append(thread)
        # 等待队列处理完成
        self.port_queue.join()
        # 等待所有线程结束
        for thread in threads:
            thread.join()
        print(f"\n\n扫描结束时间: {datetime.now()}")
        # 输出结果
        if self.open_ports:
            print("\n=== 扫描结果 ===")
            print(f"{'端口':<8} {'服务':<15} {'Banner':<30}")
            print("-" * 55)
            for port_info in sorted(self.open_ports, key=lambda x: x['port']):
                print(f"{port_info['port']:<8} {port_info['service']:<15} "
                      f"{str(port_info['banner'] or ''):<30}")
        else:
            print("\n没有发现开放端口")
        return self.open_ports
    def scan_specific_ports(self, ports: List[int]):
        """扫描特定端口列表"""
        return self.scan_range(min(ports), max(ports))
# 使用示例
def main():
    # 基础扫描
    print("1. 基础端口扫描")
    basic_scanner = PortScanner("127.0.0.1", 1, 500)
    open_ports = basic_scanner.scan_ports()
    print(f"开放端口: {open_ports}")
    print("\n" + "="*50 + "\n")
    # 高级扫描
    print("2. 高级端口扫描")
    advanced_scanner = AdvancedPortScanner(
        target="127.0.0.1",
        timeout=1.0,
        max_threads=100
    )
    # 扫描常见端口
    common_ports = [21, 22, 23, 25, 53, 80, 110, 143, 443, 445, 
                    993, 995, 1433, 1521, 3306, 3389, 5432, 6379, 8080, 8443]
    print("扫描常见端口:")
    advanced_scanner.scan_specific_ports(common_ports)
if __name__ == "__main__":
    main()

使用说明

安装依赖

基础版不需要额外安装,使用标准库即可。

基本用法

# 基础扫描
scanner = PortScanner("192.168.1.1", 1, 1000, 50)
open_ports = scanner.scan_ports()
# 高级扫描
scanner = AdvancedPortScanner("example.com", timeout=2.0)
results = scanner.scan_range(1, 1024)

主要特性

  1. 多线程扫描:提高扫描速度
  2. 超时控制:避免卡死
  3. 服务识别:自动识别常见服务
  4. 进度显示:实时显示扫描进度
  5. Banner抓取:获取服务信息

注意事项

  1. 合法性:只扫描自己有权限的系统
  2. 网络影响:大量端口扫描可能被视为攻击
  3. 性能考虑:根据网络环境调整线程数
  4. 隐私保护:不要对未经授权的系统进行扫描

这个端口扫描器适合学习和授权的渗透测试使用。

标签: 端口扫描 Python实现

抱歉,评论功能暂时关闭!