From 9138792cf53dd62b0bb99c7f3209fa77d7a84bfc Mon Sep 17 00:00:00 2001 From: halliday Date: Fri, 11 Apr 2025 11:30:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Project/extend/SangforSCPClient.py | 180 +++++++++++++++++++++++ Project/extend/SangforSCPRobotLibrary.py | 153 +++++++++++++++++++ 2 files changed, 333 insertions(+) create mode 100644 Project/extend/SangforSCPClient.py create mode 100644 Project/extend/SangforSCPRobotLibrary.py diff --git a/Project/extend/SangforSCPClient.py b/Project/extend/SangforSCPClient.py new file mode 100644 index 0000000..3362f4c --- /dev/null +++ b/Project/extend/SangforSCPClient.py @@ -0,0 +1,180 @@ +#!-*- coding:utf8 -*- +import requests +import json +import sys +from binascii import a2b_hex, b2a_hex +from Crypto.PublicKey import RSA +from Crypto.Cipher import PKCS1_v1_5 +from Crypto.Util.number import bytes_to_long +from pathlib import Path +from datetime import datetime, timezone + +class SangforSCPClient: + def __init__(self, scp_http, scp_user, scp_pwd): + """ + 初始化SCP客户端 + :param scp_http: SCP接口地址,如 "https://10.1.100.1:4430" + :param scp_user: 用户名 + :param scp_pwd: 密码 + """ + self.SCP_HTTP = scp_http + self.SCP_USER = scp_user + self.SCP_PWD = scp_pwd + + file_path = Path(__file__).resolve() + self.now_dir = file_path.parent + self.token_file = self.now_dir / "scp_token.json" + self.token = None + self.session = requests.Session() + self.session.verify = False # 禁用SSL验证 + + def encrypt_with_modulus(self, content, modulus=None): + """使用模数对文本进行加密 + :param content: 待加密内容,例如密码 + :param modulus: 公钥模数,需要调用get-public—key接口获取 + :return: 加密后的content + """ + content = content.encode("utf-8") + e = int(0x10001) + n = bytes_to_long(a2b_hex(modulus)) + rsa_key = RSA.construct((n, e)) + # generate/export public key + public_key = rsa_key.publickey() + cipher = PKCS1_v1_5.new(public_key) + content = cipher.encrypt(content) + content = b2a_hex(content) + return str(content.decode("utf-8")) + + def get_public_key(self): + """获取公钥模组""" + url = f"{self.SCP_HTTP}/janus/public-key" + headers = { + "Content-Type": "application/json" + } + + response = self.session.get(url, headers=headers) + + if response.status_code == 200: + return response.json()['data']['public_key'] + else: + raise Exception(f"获取公钥模组失败: {response.text}") + + def get_token(self): + """获取token""" + # 优先先从配置文件从读取token + if self.token_file.exists(): + with open(self.token_file, "r", encoding="utf-8") as f: + data = json.load(f) + # 解析时间并比较 + expires_str = data["data"]["access"]["token"]["expires"] + expires = datetime.fromisoformat(expires_str.replace("Z", "+00:00")) + now = datetime.now(timezone.utc) + if now < expires: + print("Token 未过期") + self.token = data["data"]["access"]["token"]["id"] + return self.token + + # 获取新token + url = f"{self.SCP_HTTP}/janus/authenticate" + headers = { + "Content-Type": "application/json", + "Cookie": "aCMPAuthToken=56debefee2c54355a49fc73dfaaa9466", + } + + # 根据公钥模组获取加密后的密码 + key = self.get_public_key().rstrip('\n') + password = self.encrypt_with_modulus(self.SCP_PWD, modulus=key) + payload = { + "auth": {"passwordCredentials": {"username": self.SCP_USER, "password": password}} + } + + response = self.session.post(url, headers=headers, json=payload) + + if response.status_code == 200: + # 保存结果 + response_data = response.json() + with open(self.token_file, "w", encoding="utf-8") as fw: + json.dump(response_data, fw, indent=4) + self.token = response_data["data"]["access"]["token"]["id"] + return self.token + else: + raise Exception(f"获取token失败: {response.text}") + + def get_servers_by_pool(self, az_id="3b706a1b-9fbc-4b5f-8768-f8260b081d7b"): + """获取指定资源池中的所有虚拟机""" + url = f"{self.SCP_HTTP}/janus/20180725/servers?az_id={az_id}" + headers = { + "Content-Type": "application/json", + "Cookie": "aCMPAuthToken=56debefee2c54355a49fc73dfaaa9466", + "Authorization": "Token " + self.get_token(), + } + response = self.session.get(url, headers=headers) + + if response.status_code == 200: + vms = response.json()["data"]["data"] + return vms + else: + raise Exception(f"获取资源池中虚拟机失败: {response.text}") + + def get_serverid_by_name(self, vm_name): + """获取指定虚拟机的id""" + servers = self.get_servers_by_pool() + for vm in servers: + if vm["name"] == vm_name: + return vm["id"] + raise Exception(f"{vm_name} 没有找到,请确认是否存在") + + def power_off(self, vm_id, force=False): + """关闭虚拟机""" + url = f"{self.SCP_HTTP}/janus/20180725/servers/{vm_id}/stop" + headers = { + "Content-Type": "application/json", + "Cookie": "aCMPAuthToken=56debefee2c54355a49fc73dfaaa9466", + "Authorization": "Token " + self.get_token(), + } + # 是否强制关机,1为强制关机 + payload = { + "force": 1 if force else 0 + } + response = self.session.post(url, headers=headers, json=payload) + + if response.status_code == 201: + print("云主机关闭中") + return True + else: + raise Exception(f"云主机无法关机: {response.text}") + + def power_on(self, vm_id): + """开启虚拟机""" + url = f"{self.SCP_HTTP}/janus/20180725/servers/{vm_id}/start" + headers = { + "Content-Type": "application/json", + "Cookie": "aCMPAuthToken=56debefee2c54355a49fc73dfaaa9466", + "Authorization": "Token " + self.get_token(), + } + response = self.session.post(url, headers=headers) + + if response.status_code == 201: + print("云主机开机中") + return True + else: + raise Exception(f"云主机无法开启: {response.text}") + + +if __name__ == "__main__": + # 使用示例 + scp_client = SangforSCPClient( + scp_http="https://ip:4430", + scp_user="admin", + scp_pwd="password" + ) + # 虚拟机名称 + vm_name = "DPM_AUTO_TEST" + try: + vm_id = scp_client.get_serverid_by_name(vm_name) + # 关机 + # scp_client.power_off(vm_id) + # 开机 + scp_client.power_on(vm_id) + except Exception as e: + print(f"操作失败: {str(e)}") \ No newline at end of file diff --git a/Project/extend/SangforSCPRobotLibrary.py b/Project/extend/SangforSCPRobotLibrary.py new file mode 100644 index 0000000..0a7ea4b --- /dev/null +++ b/Project/extend/SangforSCPRobotLibrary.py @@ -0,0 +1,153 @@ +# -*- coding: utf-8 -*- +import requests +import json +from binascii import a2b_hex, b2a_hex +from Crypto.PublicKey import RSA +from Crypto.Cipher import PKCS1_v1_5 +from Crypto.Util.number import bytes_to_long +from pathlib import Path +from datetime import datetime, timezone + + +class SangforSCPClient: + """底层SCP-API客户端实现""" + def __init__(self, scp_http, scp_user, scp_pwd,scp_azid="default"): + self.SCP_HTTP = scp_http + self.SCP_USER = scp_user + self.SCP_PWD = scp_pwd + self.SCP_AZ_ID = scp_azid + self.token_file = Path(__file__).parent / "scp_token.json" + self.session = requests.Session() + self.session.verify = False # 禁用SSL验证 + + def encrypt_with_modulus(self, content, modulus): + """使用模数加密内容""" + content = content.encode("utf-8") + e = int(0x10001) + n = bytes_to_long(a2b_hex(modulus)) + rsa_key = RSA.construct((n, e)) + public_key = rsa_key.publickey() + cipher = PKCS1_v1_5.new(public_key) + content = cipher.encrypt(content) + content = b2a_hex(content) + return str(content.decode("utf-8")) + + def get_public_key(self): + """获取公钥模数""" + url = f"{self.SCP_HTTP}/janus/public-key" + response = self.session.get(url, headers={"Content-Type": "application/json"}) + if response.status_code == 200: + return response.json()['data']['public_key'] + raise Exception(f"获取公钥失败: {response.text}") + + def get_token(self): + """获取认证Token""" + if self.token_file.exists(): + with open(self.token_file, "r", encoding="utf-8") as f: + data = json.load(f) + expires_str = data["data"]["access"]["token"]["expires"] + expires = datetime.fromisoformat(expires_str.replace("Z", "+00:00")) + if datetime.now(timezone.utc) < expires: + return data["data"]["access"]["token"]["id"] + + url = f"{self.SCP_HTTP}/janus/authenticate" + key = self.get_public_key().rstrip('\n') + password = self.encrypt_with_modulus(self.SCP_PWD, modulus=key) + payload = { + "auth": {"passwordCredentials": {"username": self.SCP_USER, "password": password}} + } + response = self.session.post(url, json=payload, headers={ + "Content-Type": "application/json", + "Cookie": "aCMPAuthToken=56debefee2c54355a49fc73dfaaa9466" + }) + if response.status_code == 200: + with open(self.token_file, "w", encoding="utf-8") as f: + json.dump(response.json(), f) + return response.json()["data"]["access"]["token"]["id"] + raise Exception(f"获取Token失败: {response.text}") + + def get_servers_by_pool(self): + """获取资源池中的虚拟机列表""" + url = f"{self.SCP_HTTP}/janus/20180725/servers?az_id={self.SCP_AZ_ID}" + response = self.session.get(url, headers={ + "Content-Type": "application/json", + "Authorization": f"Token {self.get_token()}" + }) + if response.status_code == 200: + return response.json()["data"]["data"] + raise Exception(f"获取虚拟机列表失败: {response.text}") + + def get_serverid_by_name(self, vm_name): + """通过名称获取虚拟机ID""" + servers = self.get_servers_by_pool() + for vm in servers: + if vm["name"] == vm_name: + return vm["id"] + raise Exception(f"虚拟机'{vm_name}'未找到") + + def power_off(self, vm_id, force=False): + """关闭虚拟机""" + url = f"{self.SCP_HTTP}/janus/20180725/servers/{vm_id}/stop" + response = self.session.post(url, json={"force": 1 if force else 0}, headers={ + "Content-Type": "application/json", + "Authorization": f"Token {self.get_token()}" + }) + if response.status_code != 201: + raise Exception(f"关机失败: {response.text}") + + def power_on(self, vm_id): + """启动虚拟机""" + url = f"{self.SCP_HTTP}/janus/20180725/servers/{vm_id}/start" + response = self.session.post(url, headers={ + "Content-Type": "application/json", + "Authorization": f"Token {self.get_token()}" + }) + if response.status_code != 201: + raise Exception(f"开机失败: {response.text}") + +class SangforSCPRobotLibrary: + """Robot Framework关键字库""" + def __init__(self): + self.client = None + + def set_scp_connection(self, scp_http, scp_user, scp_pwd,scp_azid="3b706a1b-9fbc-4b5f-8768-f8260b081d7b"): + """设置SCP连接参数(测试用例中调用) + + 示例: + | Set SCP Connection | https://10.1.100.1:4430 | admin | admin | + """ + self.client = SangforSCPClient(scp_http, scp_user, scp_pwd,scp_azid) + return "SCP连接已建立" + + def power_off_vm_by_name(self, vm_name, force=False): + """通过名称关闭虚拟机 + + 示例: + | Power Off VM By Name | MyVM | + | Power Off VM By Name | MyVM | force=${True} | + """ + if not self.client: + raise Exception("未初始化SCP连接!请先调用'Set SCP Connection'") + self.client.power_off(self.client.get_serverid_by_name(vm_name), force) + return f"已发送关机指令: {vm_name}" + + def power_on_vm_by_name(self, vm_name): + """通过名称启动虚拟机 + + 示例: + | Power On VM By Name | MyVM | + """ + if not self.client: + raise Exception("未初始化SCP连接!请先调用'Set SCP Connection'") + self.client.power_on(self.client.get_serverid_by_name(vm_name)) + return f"已发送开机指令: {vm_name}" + + def get_vm_id_by_name(self, vm_name): + """通过名称获取虚拟机ID + + 示例: + | ${vm_id}= | Get VM ID By Name | MyVM | + """ + if not self.client: + raise Exception("未初始化SCP连接!请先调用'Set SCP Connection'") + return self.client.get_serverid_by_name(vm_name) \ No newline at end of file