script/Ping多多/MyPing配置IP.py

286 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import subprocess
import platform
import os
import time
import json
import yaml
from concurrent.futures import ThreadPoolExecutor, as_completed
from colorama import Fore, Style, init
# Initialise colorama once at import time (called with autoreset=True).
init(autoreset=True)
def _clamp_int(value, default, low, high):
    """Coerce ``value`` to an int within ``[low, high]``, using ``default`` if it is not numeric."""
    try:
        number = int(value)
    except (TypeError, ValueError):
        number = default
    return max(low, min(high, number))


def load_config(file_path='config.yml', base_dir=None):
    """Load the host list, per-host comments and ping settings from a YAML file.

    Args:
        file_path: Path of the YAML file; joined onto ``base_dir`` when given.
        base_dir: Optional directory that ``file_path`` is relative to.

    Returns:
        A tuple ``(ip_list, comment_dict, optional_params)`` where ``ip_list``
        is the list of hosts in file order, ``comment_dict`` maps each host to
        its (possibly empty) comment, and ``optional_params`` holds the integer
        settings ``count`` (1-10), ``timeout`` (1-10) and ``max_workers`` (1-50).

    Raises:
        FileNotFoundError: The configuration file does not exist.
        ValueError: The file is not valid YAML, or ``IP_LIST`` contains no
            usable host (this includes an empty file or a non-mapping document).
    """
    if base_dir:
        file_path = os.path.join(base_dir, file_path)
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"配置文件不存在: {file_path}")

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
    except yaml.YAMLError as e:
        raise ValueError(f"YAML格式错误: {e}") from e

    # An empty file parses to None and a bare list/scalar is not a mapping;
    # treat both as "nothing configured" so the caller gets the ValueError below
    # rather than a TypeError/AttributeError.
    if not isinstance(config, dict):
        config = {}

    # Collect (host, comment) pairs; blank keys and keys starting with '#' are skipped.
    ip_comments = []
    raw_hosts = config.get('IP_LIST')
    if isinstance(raw_hosts, dict):
        for ip, comment in raw_hosts.items():
            ip = str(ip).strip()
            comment = str(comment).strip() if comment else ""
            if ip and not ip.startswith('#'):
                ip_comments.append((ip, comment))
    if not ip_comments:
        raise ValueError("未找到有效的IP地址配置请检查config.yml中的IP_LIST部分")

    ip_list = [ip for ip, _ in ip_comments]
    comment_dict = dict(ip_comments)

    # Optional tuning parameters; anything missing or non-numeric falls back to
    # its default, and every value is clamped to a sane range.
    settings = config.get('SETTINGS', {})
    if not isinstance(settings, dict):
        settings = {}
    optional_params = {
        'count': _clamp_int(settings.get('count', 2), 2, 1, 10),
        'timeout': _clamp_int(settings.get('timeout', 2), 2, 1, 10),
        'max_workers': _clamp_int(settings.get('max_workers', 10), 10, 1, 50),
    }
    return ip_list, comment_dict, optional_params
def ping_ip(ip, count=2, timeout=2):
    """Ping ``ip`` with the system ``ping`` command and report whether it answered.

    Args:
        ip: Host name or address to ping.
        count: Number of echo requests to send.
        timeout: Per-reply timeout in seconds.

    Returns:
        A tuple ``(ip, reachable)``; ``reachable`` is True when ``ping`` exits
        with status 0 and False when it exits non-zero or exceeds the overall
        deadline.

    Raises:
        OSError: The ``ping`` executable could not be started (propagated from
            ``subprocess.run``).
    """
    system = platform.system().lower()
    if system == 'windows':
        # Windows ping: -n <count>, -w <per-reply timeout in milliseconds>.
        command = ['ping', '-n', str(count), '-w', str(int(timeout * 1000))]
    elif system == 'linux':
        # iputils ping takes -W in *seconds*; passing milliseconds here would
        # make each reply wait 1000x longer than requested.
        command = ['ping', '-c', str(count), '-W', str(timeout)]
    else:
        # macOS/BSD ping takes -W in milliseconds.
        command = ['ping', '-c', str(count), '-W', str(int(timeout * 1000))]
    command.append(ip)

    # Hard upper bound for the whole run: every echo may wait the full timeout
    # plus roughly one second of send interval.
    deadline = count * (timeout + 1)
    try:
        subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=deadline,
            check=True,
        )
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
        return ip, False
    return ip, True
def check_ips(ip_list, comment_dict, count=2, timeout=2, max_workers=10):
    """Ping every host in ``ip_list`` concurrently, printing each result as it arrives.

    Args:
        ip_list: Hosts to check.
        comment_dict: Mapping of host to a comment shown next to its result.
        count: Echo requests per host, forwarded to ``ping_ip``.
        timeout: Per-reply timeout in seconds, forwarded to ``ping_ip``.
        max_workers: Size of the thread pool.

    Returns:
        A dict mapping each host to True (reachable) or False. A host whose
        ping task raised an exception is reported with the error text and
        recorded as False.
    """
    results = {}
    total = len(ip_list)

    print(f"\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}开始检测 {total} 个IP地址的连通性...{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{'='*60}{Style.RESET_ALL}\n")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(ping_ip, ip, count, timeout): ip for ip in ip_list}
        for completed, future in enumerate(as_completed(futures), start=1):
            # Look the host up from the future itself: when result() raises,
            # nothing else identifies which host the failure belongs to.
            ip = futures[future]
            try:
                _, status = future.result()
            except Exception as e:
                # Keep the failed host in the results so it reaches the report.
                results[ip] = False
                print(f" [{completed}/{total}] {Fore.RED}✗{Style.RESET_ALL} {ip:<20} - {Fore.YELLOW}错误: {str(e)}{Style.RESET_ALL}")
                continue

            results[ip] = status
            # Visible markers (same glyphs as the final statistics line) so that
            # online and offline rows can be told apart.
            if status:
                status_icon = f"{Fore.GREEN}✓{Style.RESET_ALL}"
            else:
                status_icon = f"{Fore.RED}✗{Style.RESET_ALL}"
            comment = comment_dict.get(ip, "")
            comment_str = f" - {comment}" if comment else ""
            print(f" [{completed}/{total}] {status_icon} {ip:<20}{comment_str}")

    print(f"\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}检测完成!{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{'='*60}{Style.RESET_ALL}\n")
    return results
def save_results(results, comment_dict, file_path='ping_results.json', base_dir=None):
    """Write the per-host ping outcome to a JSON report.

    Args:
        results: Mapping of host to a truthy (reachable) or falsy status.
        comment_dict: Mapping of host to its comment; missing hosts get "".
        file_path: Output file name; joined onto ``base_dir`` when given.
        base_dir: Optional directory for the output file.

    The report is an object with a ``timestamp`` string and a ``results``
    list of ``{"ip", "status", "comment"}`` entries, where ``status`` is
    "Pass" or "Fail".
    """
    target = os.path.join(base_dir, file_path) if base_dir else file_path
    with open(target, 'w', encoding='utf-8') as out:
        stamp = time.strftime("%Y-%m-%d %H:%M:%S")
        entries = []
        for host, reachable in results.items():
            entry = {'ip': host}
            entry['status'] = "Pass" if reachable else "Fail"
            entry['comment'] = comment_dict.get(host, "")
            entries.append(entry)
        report = {'timestamp': stamp, 'results': entries}
        json.dump(report, out, indent=2, ensure_ascii=False)
def create_sample_config(file_path='config.yml', base_dir=None):
    """Create a sample YAML configuration file unless one already exists.

    Args:
        file_path: Name of the configuration file; joined onto ``base_dir``
            when given.
        base_dir: Optional directory in which to create the file.

    An existing file is left untouched. The entries under ``IP_LIST`` and
    ``SETTINGS`` are indented so that they parse as nested mappings, which is
    the shape ``load_config`` reads.
    """
    if base_dir:
        file_path = os.path.join(base_dir, file_path)
    if os.path.exists(file_path):
        return
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write("""# Ping多多配置文件 - YAML格式
# 每行一个IP地址冒号后添加注释可选
IP_LIST:
  # DNS服务器
  8.8.8.8: "Google DNS"
  192.168.1.1: "路由器"
  # 其他示例
  example.com: "示例网站"
  10.0.0.1: ""
# 可选参数配置
SETTINGS:
  count: 2 # ping次数
  timeout: 2 # 超时时间(秒)
  max_workers: 10 # 并发线程数
""")
def _print_status_group(results, comment_dict, wanted, colour):
    """Print every host whose truthiness equals ``wanted``, or a placeholder line if none match."""
    found = False
    for ip, status in results.items():
        if bool(status) != wanted:
            continue
        found = True
        comment = comment_dict.get(ip, "")
        display_text = f"{ip} ({comment})" if comment else ip
        print(f" {colour}{Style.RESET_ALL} {display_text}")
    if not found:
        # NOTE(review): this placeholder (and the bullet above) carries no
        # visible glyph in this source - presumably one was lost; confirm.
        print(f" {Fore.LIGHTBLACK_EX}{Style.RESET_ALL}")


def main():
    """Run the checker: load the config next to the script, ping all hosts, print and save a report."""
    # Welcome banner.
    print(f"\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}🔍 Ping多多 - IP连通性检测工具 v2.0{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{'='*60}{Style.RESET_ALL}\n")

    # All files (config, report) live next to this script.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    try:
        # Create the sample config on first run. Done inside the try so that a
        # failure here is reported like any other error. load_config only
        # parses YAML, so there is no fallback to another config format.
        create_sample_config(base_dir=script_dir)
        config_file = os.path.join(script_dir, 'config.yml')
        ip_list, comment_dict, optional_params = load_config(file_path=config_file)
        count = optional_params['count']
        timeout = optional_params['timeout']
        max_workers = optional_params['max_workers']

        # Run the ping test and time it.
        print(f"正在测试 {len(ip_list)} 个IP地址的连通性...")
        print(f"参数: count={count}, timeout={timeout}s, max_workers={max_workers}")
        start_time = time.time()
        results = check_ips(ip_list, comment_dict, count, timeout, max_workers)
        elapsed = time.time() - start_time

        # Summary: reachable hosts first, then unreachable ones.
        print(f"\n{Fore.YELLOW}{'='*60}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}📊 检测结果汇总{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}{'='*60}{Style.RESET_ALL}\n")
        print(f"{Fore.GREEN}【开机】:{Style.RESET_ALL}")
        _print_status_group(results, comment_dict, True, Fore.GREEN)
        print(f"\n{Fore.RED}【关机】:{Style.RESET_ALL}")
        _print_status_group(results, comment_dict, False, Fore.RED)
        print(f"\n{Fore.YELLOW}{'='*60}{Style.RESET_ALL}")

        # Statistics. Anything not confirmed reachable counts as down, so the
        # two figures always add up to the total.
        total_count = len(ip_list)
        success_count = sum(1 for s in results.values() if s)
        fail_count = total_count - success_count
        success_rate = (success_count / total_count * 100) if total_count > 0 else 0
        print(f"{Fore.CYAN}📈 统计信息:{Style.RESET_ALL}")
        print(f" {Fore.WHITE}服务器总数 : {total_count}{Style.RESET_ALL}")
        print(f" {Fore.GREEN}✓ 开机: {success_count}{Style.RESET_ALL} {Fore.RED}✗ 关机: {fail_count}{Style.RESET_ALL}")
        print(f" {Fore.BLUE}⏱ 耗时: {elapsed:.2f}{Style.RESET_ALL}")
        print(f" {Fore.YELLOW}📊 成功率: {success_rate:.1f}%{Style.RESET_ALL}")
        print(f"\n{Fore.YELLOW}{'='*60}{Style.RESET_ALL}\n")

        # Persist the report next to the script.
        save_results(results, comment_dict, base_dir=script_dir)
        print(f"{Fore.GREEN}✓ 结果已保存到 ping_results.json{Style.RESET_ALL}\n")
    except FileNotFoundError as e:
        print(f"\n{Fore.RED}✗ 错误: {e}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}提示: 首次运行会自动创建示例配置文件{Style.RESET_ALL}\n")
    except ValueError as e:
        print(f"\n{Fore.RED}✗ 配置错误: {e}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}请检查config.yml文件格式是否正确{Style.RESET_ALL}\n")
    except Exception as e:
        # Top-level boundary: report anything unexpected with a traceback.
        print(f"\n{Fore.RED}✗ 未知错误: {e}{Style.RESET_ALL}")
        import traceback
        traceback.print_exc()
        print()


if __name__ == "__main__":
    main()