robotframework-demo/Project/extend/SangforSCPRobotLibrary.py

153 lines
6.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import requests
import json
from binascii import a2b_hex, b2a_hex
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5
from Crypto.Util.number import bytes_to_long
from pathlib import Path
from datetime import datetime, timezone
class SangforSCPClient:
"""底层SCP-API客户端实现"""
def __init__(self, scp_http, scp_user, scp_pwd,scp_azid="default"):
self.SCP_HTTP = scp_http
self.SCP_USER = scp_user
self.SCP_PWD = scp_pwd
self.SCP_AZ_ID = scp_azid
self.token_file = Path(__file__).parent / "scp_token.json"
self.session = requests.Session()
self.session.verify = False # 禁用SSL验证
def encrypt_with_modulus(self, content, modulus):
"""使用模数加密内容"""
content = content.encode("utf-8")
e = int(0x10001)
n = bytes_to_long(a2b_hex(modulus))
rsa_key = RSA.construct((n, e))
public_key = rsa_key.publickey()
cipher = PKCS1_v1_5.new(public_key)
content = cipher.encrypt(content)
content = b2a_hex(content)
return str(content.decode("utf-8"))
def get_public_key(self):
"""获取公钥模数"""
url = f"{self.SCP_HTTP}/janus/public-key"
response = self.session.get(url, headers={"Content-Type": "application/json"})
if response.status_code == 200:
return response.json()['data']['public_key']
raise Exception(f"获取公钥失败: {response.text}")
def get_token(self):
"""获取认证Token"""
if self.token_file.exists():
with open(self.token_file, "r", encoding="utf-8") as f:
data = json.load(f)
expires_str = data["data"]["access"]["token"]["expires"]
expires = datetime.fromisoformat(expires_str.replace("Z", "+00:00"))
if datetime.now(timezone.utc) < expires:
return data["data"]["access"]["token"]["id"]
url = f"{self.SCP_HTTP}/janus/authenticate"
key = self.get_public_key().rstrip('\n')
password = self.encrypt_with_modulus(self.SCP_PWD, modulus=key)
payload = {
"auth": {"passwordCredentials": {"username": self.SCP_USER, "password": password}}
}
response = self.session.post(url, json=payload, headers={
"Content-Type": "application/json",
"Cookie": "aCMPAuthToken=56debefee2c54355a49fc73dfaaa9466"
})
if response.status_code == 200:
with open(self.token_file, "w", encoding="utf-8") as f:
json.dump(response.json(), f)
return response.json()["data"]["access"]["token"]["id"]
raise Exception(f"获取Token失败: {response.text}")
def get_servers_by_pool(self):
"""获取资源池中的虚拟机列表"""
url = f"{self.SCP_HTTP}/janus/20180725/servers?az_id={self.SCP_AZ_ID}"
response = self.session.get(url, headers={
"Content-Type": "application/json",
"Authorization": f"Token {self.get_token()}"
})
if response.status_code == 200:
return response.json()["data"]["data"]
raise Exception(f"获取虚拟机列表失败: {response.text}")
def get_serverid_by_name(self, vm_name):
"""通过名称获取虚拟机ID"""
servers = self.get_servers_by_pool()
for vm in servers:
if vm["name"] == vm_name:
return vm["id"]
raise Exception(f"虚拟机'{vm_name}'未找到")
def power_off(self, vm_id, force=False):
"""关闭虚拟机"""
url = f"{self.SCP_HTTP}/janus/20180725/servers/{vm_id}/stop"
response = self.session.post(url, json={"force": 1 if force else 0}, headers={
"Content-Type": "application/json",
"Authorization": f"Token {self.get_token()}"
})
if response.status_code != 201:
raise Exception(f"关机失败: {response.text}")
def power_on(self, vm_id):
"""启动虚拟机"""
url = f"{self.SCP_HTTP}/janus/20180725/servers/{vm_id}/start"
response = self.session.post(url, headers={
"Content-Type": "application/json",
"Authorization": f"Token {self.get_token()}"
})
if response.status_code != 201:
raise Exception(f"开机失败: {response.text}")
class SangforSCPRobotLibrary:
"""Robot Framework关键字库"""
def __init__(self):
self.client = None
def set_scp_connection(self, scp_http, scp_user, scp_pwd,scp_azid="3b706a1b-9fbc-4b5f-8768-f8260b081d7b"):
"""设置SCP连接参数测试用例中调用
示例:
| Set SCP Connection | https://10.1.100.1:4430 | admin | admin |
"""
self.client = SangforSCPClient(scp_http, scp_user, scp_pwd,scp_azid)
return "SCP连接已建立"
def power_off_vm_by_name(self, vm_name, force=False):
"""通过名称关闭虚拟机
示例:
| Power Off VM By Name | MyVM |
| Power Off VM By Name | MyVM | force=${True} |
"""
if not self.client:
raise Exception("未初始化SCP连接请先调用'Set SCP Connection'")
self.client.power_off(self.client.get_serverid_by_name(vm_name), force)
return f"已发送关机指令: {vm_name}"
def power_on_vm_by_name(self, vm_name):
"""通过名称启动虚拟机
示例:
| Power On VM By Name | MyVM |
"""
if not self.client:
raise Exception("未初始化SCP连接请先调用'Set SCP Connection'")
self.client.power_on(self.client.get_serverid_by_name(vm_name))
return f"已发送开机指令: {vm_name}"
def get_vm_id_by_name(self, vm_name):
"""通过名称获取虚拟机ID
示例:
| ${vm_id}= | Get VM ID By Name | MyVM |
"""
if not self.client:
raise Exception("未初始化SCP连接请先调用'Set SCP Connection'")
return self.client.get_serverid_by_name(vm_name)