286 lines
10 KiB
Python
286 lines
10 KiB
Python
import subprocess
|
||
import platform
|
||
import os
|
||
import time
|
||
import json
|
||
import yaml
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from colorama import Fore, Style, init
|
||
|
||
# Initialise colorama with autoreset enabled before any coloured output is printed.
init(autoreset=True)
|
||
|
||
|
||
def _clamp_int(value, default, low, high):
    """Coerce *value* to an int clamped to [low, high]; use *default* if it is not numeric."""
    try:
        number = int(value)
    except (TypeError, ValueError, OverflowError):
        number = default
    return max(low, min(high, number))


def load_config(file_path='config.yml', base_dir=None):
    """Load the ping targets, their comments and the tuning options from a YAML file.

    Args:
        file_path: Path of the YAML file; joined onto ``base_dir`` when that is given.
        base_dir: Optional directory that ``file_path`` is relative to.

    Returns:
        A tuple ``(ip_list, comment_dict, optional_params)`` where ``ip_list`` is
        the list of targets in file order, ``comment_dict`` maps each target to
        its (possibly empty) comment, and ``optional_params`` holds the clamped
        integer settings ``count`` (1-10), ``timeout`` (1-10) and
        ``max_workers`` (1-50).

    Raises:
        FileNotFoundError: The configuration file does not exist.
        ValueError: The file is not valid YAML, or it contains no usable
            ``IP_LIST`` entries.
    """
    if base_dir:
        file_path = os.path.join(base_dir, file_path)

    if not os.path.exists(file_path):
        raise FileNotFoundError(f"配置文件不存在: {file_path}")

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
    except yaml.YAMLError as e:
        raise ValueError(f"YAML格式错误: {e}") from e

    # An empty document loads as None and a bare list/scalar is not a mapping;
    # treat both as "no configuration" so the ValueError below is raised
    # instead of a TypeError/AttributeError.
    if not isinstance(config, dict):
        config = {}

    ip_comments = []
    entries = config.get('IP_LIST')
    if isinstance(entries, dict):
        for ip, comment in entries.items():
            if ip is None:
                # A null YAML key would otherwise become the literal target "None".
                continue
            ip = str(ip).strip()
            comment = str(comment).strip() if comment else ""

            # Keep non-empty entries that are not commented out with '#'.
            if ip and not ip.startswith('#'):
                ip_comments.append((ip, comment))

    if not ip_comments:
        raise ValueError("未找到有效的IP地址配置,请检查config.yml中的IP_LIST部分")

    ip_list = [item[0] for item in ip_comments]
    comment_dict = {item[0]: item[1] for item in ip_comments}

    settings = config.get('SETTINGS', {})
    if not isinstance(settings, dict):
        settings = {}

    # Settings come from a hand-edited file, so coerce them to int and clamp
    # them; a non-numeric value falls back to the default.
    optional_params = {
        'count': _clamp_int(settings.get('count', 2), 2, 1, 10),
        'timeout': _clamp_int(settings.get('timeout', 2), 2, 1, 10),
        'max_workers': _clamp_int(settings.get('max_workers', 10), 10, 1, 50),
    }

    return ip_list, comment_dict, optional_params
|
||
|
||
|
||
def ping_ip(ip, count=2, timeout=2):
    """Ping a single target with the system ``ping`` command.

    Args:
        ip: IP address or host name to ping.
        count: Number of echo requests to send.
        timeout: Per-reply wait time in seconds.

    Returns:
        A tuple ``(ip, reachable)`` where ``reachable`` is True only when the
        ping command exits with status 0.
    """
    target = str(ip).strip()
    # An empty target cannot be pinged, and one starting with '-' would be
    # parsed by ping as a command-line option rather than a host.
    if not target or target.startswith('-'):
        return ip, False

    system = platform.system().lower()
    if system == 'windows':
        # Windows: -n <count>, -w <milliseconds>.
        command = ['ping', '-n', str(count), '-w', str(int(timeout * 1000))]
    elif system in ('darwin', 'freebsd'):
        # macOS / FreeBSD: -W is expressed in milliseconds.
        command = ['ping', '-c', str(count), '-W', str(int(timeout * 1000))]
    else:
        # Linux (iputils / busybox): -W is expressed in seconds.
        command = ['ping', '-c', str(count), '-W', str(max(1, int(timeout)))]
    command.append(target)

    try:
        subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            # Overall deadline for the whole ping run, on top of the per-reply wait.
            timeout=timeout + count,
            check=True,
        )
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError):
        # OSError covers a missing or non-executable ping binary.
        return ip, False
    return ip, True
|
||
|
||
|
||
def check_ips(ip_list, comment_dict, count=2, timeout=2, max_workers=10):
    """Ping every target concurrently and print progress as results arrive.

    Args:
        ip_list: Targets to check.
        comment_dict: Mapping of target to a comment shown next to it.
        count: Echo requests per target, forwarded to ``ping_ip``.
        timeout: Per-reply wait in seconds, forwarded to ``ping_ip``.
        max_workers: Size of the thread pool.

    Returns:
        A dict mapping each target to True (reachable) or False. A target
        whose check raised an exception is recorded as False.
    """
    results = {}
    total = len(ip_list)
    completed = 0

    print(f"\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}开始检测 {total} 个IP地址的连通性...{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{'='*60}{Style.RESET_ALL}\n")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(ping_ip, ip, count, timeout): ip for ip in ip_list}

        for future in as_completed(futures):
            # Look the target up from the future itself so that the error
            # branch reports the right address even when result() raises.
            ip = futures[future]
            completed += 1

            try:
                _, status = future.result()
            except Exception as e:
                # Record the failure so the target still appears in the results.
                results[ip] = False
                print(f" [{completed}/{total}] {Fore.RED}✗{Style.RESET_ALL} {ip:<20} - {Fore.YELLOW}错误: {str(e)}{Style.RESET_ALL}")
                continue

            results[ip] = status
            if status:
                status_icon = f"{Fore.GREEN}✓{Style.RESET_ALL}"
            else:
                status_icon = f"{Fore.RED}✗{Style.RESET_ALL}"

            comment = comment_dict.get(ip, "")
            comment_str = f" - {comment}" if comment else ""

            # Live progress line for this target.
            print(f" [{completed}/{total}] {status_icon} {ip:<20}{comment_str}")

    print(f"\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}检测完成!{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{'='*60}{Style.RESET_ALL}\n")

    return results
|
||
|
||
|
||
def save_results(results, comment_dict, file_path='ping_results.json', base_dir=None):
    """Write the check results to a JSON file.

    The file holds a ``timestamp`` string and a ``results`` list with one
    object per target (``ip``, ``status`` as "Pass"/"Fail", ``comment``).

    Args:
        results: Mapping of target to a truthy (reachable) or falsy value.
        comment_dict: Mapping of target to its comment; missing targets get "".
        file_path: Output file name; joined onto ``base_dir`` when that is given.
        base_dir: Optional directory for the output file.
    """
    destination = os.path.join(base_dir, file_path) if base_dir else file_path

    with open(destination, 'w', encoding='utf-8') as handle:
        stamp = time.strftime("%Y-%m-%d %H:%M:%S")
        records = []
        for address, is_up in results.items():
            verdict = "Pass" if is_up else "Fail"
            records.append({
                'ip': address,
                'status': verdict,
                'comment': comment_dict.get(address, ""),
            })
        payload = {'timestamp': stamp, 'results': records}
        json.dump(payload, handle, indent=2, ensure_ascii=False)
|
||
|
||
|
||
# Template written by create_sample_config. The entries under IP_LIST and
# SETTINGS must be indented so that YAML parses them as nested mappings.
_SAMPLE_CONFIG = """# Ping多多配置文件 - YAML格式
# 每行一个IP地址,冒号后添加注释(可选)

IP_LIST:
  # DNS服务器
  8.8.8.8: "Google DNS"
  192.168.1.1: "路由器"

  # 其他示例
  example.com: "示例网站"
  10.0.0.1: ""

# 可选参数配置
SETTINGS:
  count: 2 # ping次数
  timeout: 2 # 超时时间(秒)
  max_workers: 10 # 并发线程数
"""


def create_sample_config(file_path='config.yml', base_dir=None):
    """Create a sample YAML configuration file unless one already exists.

    Args:
        file_path: Name of the configuration file; joined onto ``base_dir``
            when that is given.
        base_dir: Optional directory in which to create the file.
    """
    if base_dir:
        file_path = os.path.join(base_dir, file_path)

    try:
        # Exclusive-create mode never overwrites an existing file and has no
        # window between the existence check and the write.
        with open(file_path, 'x', encoding='utf-8') as f:
            f.write(_SAMPLE_CONFIG)
    except FileExistsError:
        # A configuration is already present; leave it untouched.
        return
|
||
|
||
|
||
if __name__ == "__main__":
    # Welcome banner.
    print(f"\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}🔍 Ping多多 - IP连通性检测工具 v2.0{Style.RESET_ALL}")
    print(f"{Fore.CYAN}{'='*60}{Style.RESET_ALL}\n")

    # All files are read from and written to the directory of this script.
    script_dir = os.path.dirname(os.path.abspath(__file__))

    # Write a sample configuration on first run (no-op when one exists).
    create_sample_config(base_dir=script_dir)

    try:
        # Use config.yml when present, otherwise fall back to the legacy
        # config.ini file name.
        yaml_path = os.path.join(script_dir, 'config.yml')
        if os.path.exists(yaml_path):
            config_file = yaml_path
        else:
            config_file = os.path.join(script_dir, 'config.ini')

        ip_list, comment_dict, optional_params = load_config(file_path=config_file)
        count = optional_params['count']
        timeout = optional_params['timeout']
        max_workers = optional_params['max_workers']

        # Run the checks and time the whole batch.
        print(f"正在测试 {len(ip_list)} 个IP地址的连通性...")
        print(f"参数: count={count}, timeout={timeout}s, max_workers={max_workers}")
        start_time = time.time()
        results = check_ips(ip_list, comment_dict, count, timeout, max_workers)
        elapsed = time.time() - start_time

        # Summary header.
        print(f"\n{Fore.YELLOW}{'='*60}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}📊 检测结果汇总{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}{'='*60}{Style.RESET_ALL}\n")

        # Reachable targets are listed first, then unreachable ones. Each
        # group is described by (wanted status, heading line, per-item icon).
        groups = (
            (True, f"{Fore.GREEN}【开机】:{Style.RESET_ALL}", f"{Fore.GREEN}✓{Style.RESET_ALL}"),
            (False, f"\n{Fore.RED}【关机】:{Style.RESET_ALL}", f"{Fore.RED}✗{Style.RESET_ALL}"),
        )
        for wanted, heading, icon in groups:
            print(heading)
            members = [addr for addr, state in results.items() if bool(state) == wanted]
            for addr in members:
                note = comment_dict.get(addr, "")
                display_text = f"{addr} ({note})" if note else addr
                print(f" {icon} {display_text}")
            if not members:
                # Placeholder for an empty group.
                print(f" {Fore.LIGHTBLACK_EX}无{Style.RESET_ALL}")

        print(f"\n{Fore.YELLOW}{'='*60}{Style.RESET_ALL}")
        success_count = sum(1 for state in results.values() if state)
        fail_count = len(results) - success_count
        total_count = len(ip_list)

        # Success rate in percent; 0 when there are no targets.
        if total_count > 0:
            success_rate = success_count / total_count * 100
        else:
            success_rate = 0

        print(f"{Fore.CYAN}📈 统计信息:{Style.RESET_ALL}")
        print(f" {Fore.WHITE}服务器总数 : {total_count}{Style.RESET_ALL}")
        print(f" {Fore.GREEN}✓ 开机: {success_count}{Style.RESET_ALL} {Fore.RED}✗ 关机: {fail_count}{Style.RESET_ALL}")
        print(f" {Fore.BLUE}⏱ 耗时: {elapsed:.2f}秒{Style.RESET_ALL}")
        print(f" {Fore.YELLOW}📊 成功率: {success_rate:.1f}%{Style.RESET_ALL}")

        print(f"\n{Fore.YELLOW}{'='*60}{Style.RESET_ALL}\n")

        # Save the results next to the script.
        save_results(results, comment_dict, base_dir=script_dir)
        print(f"{Fore.GREEN}✓ 结果已保存到 ping_results.json{Style.RESET_ALL}\n")

    except FileNotFoundError as e:
        print(f"\n{Fore.RED}✗ 错误: {e}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}提示: 首次运行会自动创建示例配置文件{Style.RESET_ALL}\n")
    except ValueError as e:
        print(f"\n{Fore.RED}✗ 配置错误: {e}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}请检查config.yml文件格式是否正确{Style.RESET_ALL}\n")
    except Exception as e:
        # Last-resort handler: report the error and show the traceback.
        print(f"\n{Fore.RED}✗ 未知错误: {e}{Style.RESET_ALL}")
        import traceback
        traceback.print_exc()
        print()
|