check-system-info/bin/get_system_info.py

239 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import _load
import psutil
import smtplib
import argparse
import yaml
from datetime import datetime
from pathlib import Path
from email.mime.text import MIMEText
from email.header import Header
from email.mime.multipart import MIMEMultipart
# 定义公共的部分
# Absolute location of this script. resolve() matters: when __file__ is a bare
# relative name (e.g. the script is started from inside its own directory on
# an interpreter that does not absolutise __file__), `.parent.parent` would
# collapse to "." and the report/config paths would point at the wrong place.
path = Path(__file__).resolve()
# <project root>/report holds the generated reports,
# <project root>/etc/check_config.yml holds thresholds and mail settings.
report_dir = path.parent.parent / "report"
config_file_patch = path.parent.parent / "etc/check_config.yml"
# Create the report directory if it is missing. exist_ok=True replaces the
# former exists()-then-mkdir() pair, which could fail if the directory
# appeared between the check and the creation.
report_dir.mkdir(exist_ok=True)
# Report files: one per ISO week (accumulated) and one for the current run.
week = str(datetime.now().isocalendar()[1])
file_name_week = "ComputerInfos-week-" + week + ".txt"
file_name_today = "ComputerInfos-today.txt"
file_today_path = report_dir/file_name_today
file_week_path = report_dir/file_name_week
# Load the YAML configuration (keys read in this file: "receivers",
# "Disk_MAX", "CPU_MAX", "Memory_MAX", "agent_dir").
with open(config_file_patch, "r", encoding='utf-8') as fy:
    config = yaml.safe_load(fy)
# Set to True by the disk/CPU/memory checks when a threshold is exceeded;
# it forces a mail to be sent and adds a warning tag to the subject.
need_sendmail = False
# 发邮件
def send_mail():
    """Mail today's report as the body and this week's report as attachment.

    The first entry of ``config["receivers"]`` is used both as the sender
    address and as the SMTP login name; the message is delivered to every
    configured receiver. The subject carries a warning tag when the
    module-level ``need_sendmail`` flag is set.

    Failures while connecting to or talking to the SMTP server are reported
    on stdout and do not raise.
    """
    receivers = config["receivers"]
    # The first configured receiver doubles as the sender account
    sender = receivers[0]

    msg = MIMEMultipart()

    # Body: a short summary line followed by the content of today's report
    with open(str(file_today_path), "r", encoding='utf-8') as f:
        msgdata = f.read()
    # Look the address up once; it is needed for both body and subject
    host_ip = get_IP()
    message = "{0}周,{1}资源使用情况检测已完成,本次检测内容如下所示,本周的所有信息请查看附件!\n{2}".format(
        week, host_ip, msgdata)
    msg.attach(MIMEText(message, 'plain', _charset="utf-8"))

    msg['From'] = Header(sender, 'utf-8')
    # Only the first receiver is shown in the To header
    msg['To'] = Header(receivers[0], 'utf-8')

    # Subject, tagged when any threshold was exceeded during this run
    error_msg = ""
    if need_sendmail:
        error_msg = "【资源警告】"
    subject = '【长期任务】{0}{1}{2}系统运行信息'.format(error_msg, week, host_ip)
    msg['Subject'] = Header(subject, 'utf-8')

    # Attachment: the accumulated weekly report. The file is read inside a
    # `with` block so the handle is closed deterministically.
    with open(str(file_week_path), 'rb') as week_file:
        att = MIMEText(week_file.read(), 'base64', 'utf-8')
    att["Content-Type"] = 'application/octet-stream'
    att["Content-Disposition"] = 'attachment; filename="{}"'.format(
        file_name_week)
    msg.attach(att)

    try:
        # The context manager sends QUIT and closes the socket on exit
        with smtplib.SMTP('10.10.110.102') as smtpObj:
            # NOTE(review): the password is hard-coded in the source; it
            # should come from the configuration or the environment.
            smtpObj.login(sender, "111111")
            smtpObj.sendmail(sender, receivers, msg.as_string())
    except OSError:
        # smtplib.SMTPException derives from OSError, so this covers protocol
        # errors as well as refused connections and timeouts.
        print("Error: 无法发送邮件")
    else:
        print("邮件发送成功")
# 写入文件
def save_txt(datas):
    """Append every item of *datas* to today's report, one item per line.

    Args:
        datas: iterable of values; each is converted with ``str()`` and
            written followed by a newline character.
    """
    with open(str(file_today_path), 'a+', encoding='utf-8', newline='') as file_txt:
        # writelines consumes the generator directly instead of building a
        # throwaway list of write() return values
        file_txt.writelines(str(item) + '\n' for item in datas)
# 合并文件
def save_all_tex():
    """Append the raw bytes of today's report to the weekly report file."""
    today_bytes = file_today_path.read_bytes()
    with open(file_week_path, 'ab+') as weekly:
        weekly.write(today_bytes)
# 获取本机磁盘使用率和剩余空间G信息
def get_disk_info():
    """Report usage, free space and total size of every mounted partition.

    Partitions whose mount options contain 'cdrom' or whose filesystem type
    is empty are skipped. A warning is appended to a partition's line and the
    module-level ``need_sendmail`` flag is raised when its usage percentage
    exceeds ``config["Disk_MAX"]``. The collected lines are printed and
    appended to today's report.
    """
    global need_sendmail
    gib = 1024 ** 3  # bytes per GiB; sizes are reported as whole GiB
    lines = []
    for part in psutil.disk_partitions():
        # Skip optical drives and entries without a filesystem
        if 'cdrom' in part.opts or not part.fstype:
            continue
        mount = part.mountpoint
        usage = psutil.disk_usage(mount)
        free_gb = usage.free // gib
        total_gb = usage.total // gib
        # Threshold check
        warning = ""
        if usage.percent > config["Disk_MAX"]:
            warning = "\n\t警告: {0}使用率超过{1},请及时处理!".format(
                mount, str(config["Disk_MAX"]))
            need_sendmail = True
        # One report line per partition
        lines.append(
            "\t{0}\t使用率:{1}% 剩余空间:{2}G 总大小:{3}G {4}".format(
                mount, str(usage.percent), free_gb, total_gb, warning))
    print(lines)
    save_txt(lines)
# 获取某个目录的大小
def get_dir_size(path):
    """Report the size of every file under *path* and the directory total.

    Each regular file gets its own report line; sub-directories are walked
    recursively (preceded by a separator line) and their totals are included
    in the total of the enclosing directory. All lines are printed and
    appended to today's report.

    Args:
        path: ``pathlib.Path`` of the directory to measure.

    Returns:
        Total size in bytes of all files found under *path*.
    """
    total_bytes = 0
    for item in path.iterdir():
        if item.is_file():
            # Size of the file being visited (previously the size of
            # today's report file was recorded for every entry)
            file_size = item.stat().st_size
            total_bytes += file_size
            str_tex = f"\t{item}的大小是{file_size}字节"
            print(str_tex)
            save_txt([str_tex])
        elif item.is_dir():
            save_txt(['\t--------------'])
            # Count nested content towards this directory's total
            total_bytes += get_dir_size(item)
    str_dir_tex = '\t{0} 的大小为: {1:.4f} MB'.format(path, (total_bytes/1024/1024))
    print(str_dir_tex)
    save_txt([str_dir_tex])
    return total_bytes
# cpu信息
def get_cpu_info():
    """Sample CPU utilisation over one second and append it to the report.

    When the sampled percentage exceeds ``config["CPU_MAX"]`` a warning is
    added to the line and the module-level ``need_sendmail`` flag is raised.
    """
    global need_sendmail
    usage = psutil.cpu_percent(interval=1)
    # Threshold check
    warning = ""
    if usage > config["CPU_MAX"]:
        warning = "\n\t警告: CPU使用率超过{0},请及时处理!".format(str(config["CPU_MAX"]))
        need_sendmail = True
    # Blank entries before and after separate this section in the report
    report_lines = [
        "",
        "CPU使用率{0}% {1}".format(usage, warning),
        "",
    ]
    print(report_lines)
    save_txt(report_lines)
# 内存信息
def get_memory_info():
    """Append used/free memory (GiB) and the usage percentage to the report.

    Values come from ``psutil.virtual_memory()`` (fields ``used``, ``free``
    and ``percent``). When the percentage exceeds ``config["Memory_MAX"]`` a
    warning is added and the module-level ``need_sendmail`` flag is raised.
    """
    global need_sendmail
    gib = 1024 ** 3  # bytes per GiB
    stats = psutil.virtual_memory()
    used_gb = stats.used / gib
    free_gb = stats.free / gib
    percent = stats.percent
    # Threshold check
    warning = ""
    if percent > config["Memory_MAX"]:
        warning = "\n\t警告: 内存使用率超过{0},请及时处理!".format(
            str(config["Memory_MAX"]))
        need_sendmail = True
    summary = "内存使用:{0:0.2f}G使用率{1:0.1f}%,剩余内存:{2:0.2f}G {3}".format(
        used_gb, percent, free_gb, warning)
    # Trailing empty entry leaves a blank line after this section
    report_lines = [summary, ""]
    print(report_lines)
    save_txt(report_lines)
def get_IP():
    """Return the first non-loopback IPv4 address found on this host.

    All interfaces reported by ``psutil.net_if_addrs()`` are scanned and the
    collected address list is printed. Addresses in 127.0.0.0/8 are ignored.

    Returns:
        The first matching address as a string, or ``'127.0.0.1'`` when the
        host has no non-loopback IPv4 address (previously this case raised
        ``IndexError``).
    """
    ipv4_list = []
    for snic_list in psutil.net_if_addrs().values():
        for snic in snic_list:
            if snic.family.name != 'AF_INET':
                continue
            # The whole 127.0.0.0/8 block is loopback, not just 127.0.0.1
            if not snic.address.startswith('127.'):
                ipv4_list.append(snic.address)
    print(ipv4_list)
    if not ipv4_list:
        # Keep the report usable on hosts without a routable IPv4 address
        return '127.0.0.1'
    return ipv4_list[0]
def main():
    """Build today's report: header, disk, CPU, memory and directory sizes.

    Any previous "today" report is removed first. The week banner is only
    written when this week's accumulated report does not exist yet.
    """
    # Start from a clean file for this run
    if file_today_path.exists():
        file_today_path.unlink()

    # Header lines
    header = []
    if not file_week_path.exists():
        header += ["本周是第{0}".format(week), ""]
    header += [
        "--------------------START--------------------------",
        "检测时间: {0}".format(datetime.now()),
        "检测IP {0}".format(get_IP()),
    ]
    save_txt(header)

    # Resource sections
    save_txt(["", "磁盘使用情况:", ""])
    get_disk_info()
    get_cpu_info()
    get_memory_info()

    # Size of the configured agent directory
    agent_dir = config["agent_dir"]
    agent_path = Path(agent_dir)
    save_txt(["{0}目录大小:".format(agent_dir)])
    get_dir_size(agent_path)

    save_txt(['----------------------END----------------------------'])
def str_to_bool(value):
    """Convert a command-line token to a real boolean.

    ``type=bool`` cannot be used with argparse for this: ``bool("False")``
    is ``True`` because every non-empty string is truthy.

    Args:
        value: a bool, or a string such as "true"/"false", "yes"/"no",
            "1"/"0", "on"/"off" (case-insensitive). The empty string is
            treated as False.

    Returns:
        The parsed boolean.

    Raises:
        argparse.ArgumentTypeError: if the text is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if text in ('', '0', 'false', 'f', 'no', 'n', 'off'):
        return False
    raise argparse.ArgumentTypeError(
        'expected a boolean value, got {!r}'.format(value))


if __name__ == '__main__':
    # Mail is off by default and enabled with --send-mail (optionally followed
    # by an explicit true/false value). Arguments are parsed before any work
    # is done so that --help or a bad argument does not trigger a full check.
    parser = argparse.ArgumentParser(description='send email')
    parser.add_argument("--send-mail", type=str_to_bool, nargs='?',
                        const=True, default=False)
    args = parser.parse_args()
    main()
    save_all_tex()
    # Send when requested explicitly or when any threshold was exceeded
    if args.send_mail or need_sendmail:
        send_mail()