# File listing metadata: 153 lines, 6.0 KiB, Python
# -*- coding: utf-8 -*-
|
||
import requests
|
||
import json
|
||
from binascii import a2b_hex, b2a_hex
|
||
from Crypto.PublicKey import RSA
|
||
from Crypto.Cipher import PKCS1_v1_5
|
||
from Crypto.Util.number import bytes_to_long
|
||
from pathlib import Path
|
||
from datetime import datetime, timezone
|
||
|
||
|
||
class SangforSCPClient:
    """Low-level client for the Sangfor SCP HTTP API (``/janus`` endpoints).

    The client authenticates with an RSA-encrypted password, caches the
    authentication response in ``scp_token.json`` next to this module, and
    sends the token in an ``Authorization: Token <id>`` header on every call.
    """

    def __init__(self, scp_http, scp_user, scp_pwd, scp_azid="default", timeout=30):
        """Store the connection settings and create the HTTP session.

        Args:
            scp_http: Base URL of the SCP, e.g. ``https://10.1.100.1:4430``.
            scp_user: Login user name.
            scp_pwd: Plain-text password; it is RSA-encrypted before sending.
            scp_azid: Value sent as the ``az_id`` query parameter when
                listing servers.
            timeout: Seconds passed as ``timeout=`` to every HTTP request so
                an unreachable SCP cannot block the caller forever. ``None``
                disables the limit.
        """
        self.SCP_HTTP = scp_http
        self.SCP_USER = scp_user
        self.SCP_PWD = scp_pwd
        self.SCP_AZ_ID = scp_azid
        self.timeout = timeout
        # NOTE(review): the token is a credential and is stored in plain text
        # beside the source file; consider a per-user location.
        self.token_file = Path(__file__).parent / "scp_token.json"
        self.session = requests.Session()
        # TLS certificate verification is switched off for every request made
        # through this session.
        self.session.verify = False

    def _url(self, path):
        """Join *path* onto the base URL without producing a double slash."""
        return f"{self.SCP_HTTP.rstrip('/')}{path}"

    def _auth_headers(self):
        """Return the JSON content-type header plus the token header."""
        return {
            "Content-Type": "application/json",
            "Authorization": f"Token {self.get_token()}",
        }

    def _cache_owner(self):
        """Identify which endpoint/user a cached token belongs to."""
        return {"http": self.SCP_HTTP, "user": self.SCP_USER}

    def _load_cached_token(self):
        """Return the cached token id, or ``None`` when it cannot be used.

        The cache is ignored (instead of raising) when the file is missing,
        unreadable, not valid JSON, lacks the expected keys, was written for
        a different host/user, has an unparsable or timezone-naive expiry,
        or has already expired.
        """
        try:
            with open(self.token_file, "r", encoding="utf-8") as f:
                cached = json.load(f)
            owner = cached.get("_cache_owner")
            # Files written before the owner stamp existed have no owner and
            # are still accepted.
            if owner is not None and owner != self._cache_owner():
                return None
            token = cached["data"]["access"]["token"]
            token_id = token["id"]
            expires = datetime.fromisoformat(token["expires"].replace("Z", "+00:00"))
        except (OSError, ValueError, KeyError, TypeError, AttributeError):
            return None
        if expires.tzinfo is None:
            # A naive timestamp cannot be compared with an aware "now"; its
            # zone is unknown, so re-authenticate rather than guess.
            return None
        if datetime.now(timezone.utc) < expires:
            return token_id
        return None

    def encrypt_with_modulus(self, content, modulus):
        """Encrypt *content* with an RSA public key given by its modulus.

        Args:
            content: Text to encrypt; it is encoded as UTF-8 first.
            modulus: RSA modulus as a hexadecimal string. The public
                exponent is fixed to 0x10001.

        Returns:
            The PKCS#1 v1.5 ciphertext as a hexadecimal string.
        """
        n = bytes_to_long(a2b_hex(modulus))
        # Constructing from (n, e) already yields a public key, so no
        # separate public-key extraction step is needed.
        public_key = RSA.construct((n, 0x10001))
        cipher = PKCS1_v1_5.new(public_key)
        encrypted = cipher.encrypt(content.encode("utf-8"))
        return b2a_hex(encrypted).decode("utf-8")

    def get_public_key(self):
        """Fetch the public-key modulus from ``/janus/public-key``.

        Returns:
            The ``data.public_key`` field of the JSON response.

        Raises:
            Exception: If the response status is not 200.
        """
        response = self.session.get(
            self._url("/janus/public-key"),
            headers={"Content-Type": "application/json"},
            timeout=self.timeout,
        )
        if response.status_code == 200:
            return response.json()['data']['public_key']
        raise Exception(f"获取公钥失败: {response.text}")

    def get_token(self):
        """Return an authentication token id, re-using the cache if valid.

        When no usable cached token exists, the password is encrypted with
        the server's public key, posted to ``/janus/authenticate``, and the
        JSON response is written to ``self.token_file``.

        Raises:
            Exception: If the authenticate call does not return status 200.
        """
        cached_id = self._load_cached_token()
        if cached_id is not None:
            return cached_id

        key = self.get_public_key().rstrip('\n')
        password = self.encrypt_with_modulus(self.SCP_PWD, modulus=key)
        payload = {
            "auth": {"passwordCredentials": {"username": self.SCP_USER, "password": password}}
        }
        response = self.session.post(
            self._url("/janus/authenticate"),
            json=payload,
            headers={
                "Content-Type": "application/json",
                # NOTE(review): hard-coded cookie value kept from the original
                # code; confirm whether the server needs it at all.
                "Cookie": "aCMPAuthToken=56debefee2c54355a49fc73dfaaa9466",
            },
            timeout=self.timeout,
        )
        if response.status_code != 200:
            raise Exception(f"获取Token失败: {response.text}")

        body = response.json()
        # Extract the id before caching so a response without a token is
        # never persisted.
        token_id = body["data"]["access"]["token"]["id"]
        with open(self.token_file, "w", encoding="utf-8") as f:
            # Stamp the cache with its owner so it is not re-used for a
            # different host or user.
            json.dump(dict(body, _cache_owner=self._cache_owner()), f)
        return token_id

    def get_servers_by_pool(self):
        """List the virtual machines for the configured ``az_id``.

        Returns:
            The ``data.data`` field of the JSON response.

        Raises:
            Exception: If the response status is not 200.
        """
        # NOTE(review): only one response is read; if the endpoint paginates,
        # machines on later pages are not returned - confirm against the API.
        response = self.session.get(
            self._url("/janus/20180725/servers"),
            params={"az_id": self.SCP_AZ_ID},
            headers=self._auth_headers(),
            timeout=self.timeout,
        )
        if response.status_code == 200:
            return response.json()["data"]["data"]
        raise Exception(f"获取虚拟机列表失败: {response.text}")

    def get_serverid_by_name(self, vm_name):
        """Return the id of the first listed machine named *vm_name*.

        Raises:
            Exception: If no machine in the list has that name.
        """
        for vm in self.get_servers_by_pool():
            if vm["name"] == vm_name:
                return vm["id"]
        raise Exception(f"虚拟机'{vm_name}'未找到")

    def power_off(self, vm_id, force=False):
        """Request shutdown of the machine *vm_id*.

        Args:
            vm_id: Server id as returned by ``get_serverid_by_name``.
            force: Sent as ``{"force": 1}`` when truthy, otherwise
                ``{"force": 0}``.

        Raises:
            Exception: If the response status is not 201.
        """
        response = self.session.post(
            self._url(f"/janus/20180725/servers/{vm_id}/stop"),
            json={"force": 1 if force else 0},
            headers=self._auth_headers(),
            timeout=self.timeout,
        )
        if response.status_code != 201:
            raise Exception(f"关机失败: {response.text}")

    def power_on(self, vm_id):
        """Request start of the machine *vm_id*.

        Raises:
            Exception: If the response status is not 201.
        """
        response = self.session.post(
            self._url(f"/janus/20180725/servers/{vm_id}/start"),
            headers=self._auth_headers(),
            timeout=self.timeout,
        )
        if response.status_code != 201:
            raise Exception(f"开机失败: {response.text}")
|
||
|
||
class SangforSCPRobotLibrary:
    """Robot Framework keyword library wrapping ``SangforSCPClient``.

    ``Set SCP Connection`` must be called before any other keyword; the
    other keywords raise if no client has been created yet.
    """

    # String values that count as false, compared case-insensitively. Robot
    # Framework passes keyword arguments as strings unless they are converted,
    # and every non-empty string is truthy in Python.
    _FALSE_STRINGS = ("FALSE", "NO", "OFF", "0", "NONE", "")

    def __init__(self):
        """Start without a client; one is created by ``set_scp_connection``."""
        self.client = None

    @classmethod
    def _is_truthy(cls, value):
        """Interpret *value* as a boolean, treating "false"-like strings as False."""
        if isinstance(value, str):
            return value.strip().upper() not in cls._FALSE_STRINGS
        return bool(value)

    def _require_client(self):
        """Return the active client or raise if the connection is not set."""
        if not self.client:
            raise Exception("未初始化SCP连接!请先调用'Set SCP Connection'")
        return self.client

    def set_scp_connection(self, scp_http, scp_user, scp_pwd,scp_azid="3b706a1b-9fbc-4b5f-8768-f8260b081d7b"):
        """Set the SCP connection parameters (call this from the test case).

        Creates a new ``SangforSCPClient`` from the given arguments and
        returns a confirmation message.

        NOTE(review): the default ``scp_azid`` is a fixed UUID, unlike the
        client's own default of ``"default"`` - confirm it is intended.

        Example:
        | Set SCP Connection | https://10.1.100.1:4430 | admin | admin |
        """
        self.client = SangforSCPClient(scp_http, scp_user, scp_pwd,scp_azid)
        return "SCP连接已建立"

    def power_off_vm_by_name(self, vm_name, force=False):
        """Power off the virtual machine with the given name.

        ``force`` may be a boolean or a string; the strings ``false``,
        ``no``, ``off``, ``0``, ``none`` and the empty string (any case) are
        treated as false so that ``force=False`` written as plain text does
        not trigger a forced shutdown.

        Raises an exception if the connection has not been set.

        Example:
        | Power Off VM By Name | MyVM |
        | Power Off VM By Name | MyVM | force=${True} |
        """
        client = self._require_client()
        client.power_off(client.get_serverid_by_name(vm_name), self._is_truthy(force))
        return f"已发送关机指令: {vm_name}"

    def power_on_vm_by_name(self, vm_name):
        """Power on the virtual machine with the given name.

        Raises an exception if the connection has not been set.

        Example:
        | Power On VM By Name | MyVM |
        """
        client = self._require_client()
        client.power_on(client.get_serverid_by_name(vm_name))
        return f"已发送开机指令: {vm_name}"

    def get_vm_id_by_name(self, vm_name):
        """Return the id of the virtual machine with the given name.

        Raises an exception if the connection has not been set.

        Example:
        | ${vm_id}= | Get VM ID By Name | MyVM |
        """
        return self._require_client().get_serverid_by_name(vm_name)