robotframework-demo/Project/extend/SangforSCPClient.py

180 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import requests
import json
import sys
from binascii import a2b_hex, b2a_hex
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5
from Crypto.Util.number import bytes_to_long
from pathlib import Path
from datetime import datetime, timezone
class SangforSCPClient:
    """Small HTTP client for the Sangfor SCP ``/janus`` REST endpoints.

    The client authenticates with an RSA-encrypted password, caches the
    authentication response in ``scp_token.json`` next to this module, and
    exposes helpers to list virtual machines and power them on or off.
    """

    # Cookie value sent verbatim with every request except the public-key fetch.
    # NOTE(review): this looks like a captured session id hard-coded in
    # source -- confirm whether the API really requires it.
    AUTH_COOKIE = "aCMPAuthToken=56debefee2c54355a49fc73dfaaa9466"
    # Resource pool (availability zone) queried when the caller names none.
    DEFAULT_AZ_ID = "3b706a1b-9fbc-4b5f-8768-f8260b081d7b"
    # Public exponent paired with the modulus returned by the public-key API.
    RSA_PUBLIC_EXPONENT = 0x10001

    def __init__(self, scp_http, scp_user, scp_pwd, verify=False, timeout=30):
        """Initialise the SCP client.

        :param scp_http: base URL of the SCP API, e.g. "https://10.1.100.1:4430"
        :param scp_user: user name
        :param scp_pwd: plain-text password (encrypted before it is sent)
        :param verify: value assigned to ``requests.Session.verify``; defaults
            to ``False`` (TLS certificate verification disabled), which is
            what this client has always done
        :param timeout: timeout in seconds applied to every HTTP request so a
            stalled server cannot block the caller forever
        """
        # A trailing slash would otherwise produce "//janus/..." URLs.
        self.SCP_HTTP = scp_http.rstrip("/")
        self.SCP_USER = scp_user
        self.SCP_PWD = scp_pwd
        self.timeout = timeout
        file_path = Path(__file__).resolve()
        self.now_dir = file_path.parent
        self.token_file = self.now_dir / "scp_token.json"
        self.token = None
        self.session = requests.Session()
        self.session.verify = verify

    def _headers(self, with_token=True):
        """Build the request headers shared by the authenticated endpoints."""
        headers = {
            "Content-Type": "application/json",
            "Cookie": self.AUTH_COOKIE,
        }
        if with_token:
            headers["Authorization"] = "Token " + self.get_token()
        return headers

    def _load_cached_token(self):
        """Return the cached token id if the cache file holds an unexpired
        token, otherwise ``None``.

        A missing, unreadable or malformed cache file is treated the same as
        an expired token so the caller simply re-authenticates.
        """
        try:
            with open(self.token_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            token_info = data["data"]["access"]["token"]
            expires = datetime.fromisoformat(
                token_info["expires"].replace("Z", "+00:00")
            )
            token_id = token_info["id"]
        except (OSError, ValueError, KeyError, TypeError, AttributeError):
            return None
        if expires.tzinfo is None:
            # Comparing a naive timestamp with an aware "now" raises TypeError.
            # NOTE(review): assumes a timestamp without an offset is UTC.
            expires = expires.replace(tzinfo=timezone.utc)
        if datetime.now(timezone.utc) < expires:
            return token_id
        return None

    def encrypt_with_modulus(self, content, modulus=None):
        """Encrypt text with an RSA public key given by its modulus.

        :param content: text to encrypt, e.g. the password
        :param modulus: hex string of the public-key modulus; when omitted it
            is fetched with :meth:`get_public_key`
        :return: hex string of the PKCS#1 v1.5 ciphertext
        """
        if modulus is None:
            modulus = self.get_public_key()
        n = bytes_to_long(a2b_hex(modulus.strip()))
        # Constructing from (n, e) already yields a public key.
        rsa_key = RSA.construct((n, self.RSA_PUBLIC_EXPONENT))
        cipher = PKCS1_v1_5.new(rsa_key)
        encrypted = cipher.encrypt(content.encode("utf-8"))
        return b2a_hex(encrypted).decode("utf-8")

    def get_public_key(self):
        """Fetch the public-key modulus from ``/janus/public-key``.

        :return: the ``data.public_key`` field of the JSON response
        :raises Exception: if the response status is not 200
        """
        url = f"{self.SCP_HTTP}/janus/public-key"
        headers = {"Content-Type": "application/json"}
        response = self.session.get(url, headers=headers, timeout=self.timeout)
        if response.status_code != 200:
            raise Exception(f"获取公钥模组失败: {response.text}")
        return response.json()['data']['public_key']

    def get_token(self):
        """Return an access token, reusing the cached one while it is valid.

        The cache file is consulted first; if it has no usable token a new
        one is requested from ``/janus/authenticate`` and the full response
        is written back to the cache file.

        :return: the token id
        :raises Exception: if the authenticate call does not return 200
        """
        cached = self._load_cached_token()
        if cached is not None:
            print("Token 未过期")
            self.token = cached
            return self.token

        url = f"{self.SCP_HTTP}/janus/authenticate"
        # The password is sent encrypted with the server's current public key.
        key = self.get_public_key().rstrip('\n')
        password = self.encrypt_with_modulus(self.SCP_PWD, modulus=key)
        payload = {
            "auth": {"passwordCredentials": {"username": self.SCP_USER, "password": password}}
        }
        response = self.session.post(
            url,
            headers=self._headers(with_token=False),
            json=payload,
            timeout=self.timeout,
        )
        if response.status_code != 200:
            raise Exception(f"获取token失败: {response.text}")
        response_data = response.json()
        # Extract the id before writing so a response without a token is
        # never persisted as the cache.
        token_id = response_data["data"]["access"]["token"]["id"]
        with open(self.token_file, "w", encoding="utf-8") as fw:
            json.dump(response_data, fw, indent=4)
        self.token = token_id
        return self.token

    def get_servers_by_pool(self, az_id=DEFAULT_AZ_ID):
        """List all virtual machines in the given resource pool.

        :param az_id: id of the resource pool to query
        :return: the ``data.data`` field of the JSON response
        :raises Exception: if the response status is not 200
        """
        url = f"{self.SCP_HTTP}/janus/20180725/servers"
        response = self.session.get(
            url,
            headers=self._headers(),
            params={"az_id": az_id},  # let requests do the URL encoding
            timeout=self.timeout,
        )
        if response.status_code != 200:
            raise Exception(f"获取资源池中虚拟机失败: {response.text}")
        return response.json()["data"]["data"]

    def get_serverid_by_name(self, vm_name, az_id=None):
        """Look up the id of the virtual machine called ``vm_name``.

        :param vm_name: exact name of the virtual machine
        :param az_id: resource pool to search; ``None`` uses the default of
            :meth:`get_servers_by_pool`
        :return: the ``id`` of the first machine whose ``name`` matches
        :raises Exception: if no machine has that name
        """
        if az_id is None:
            servers = self.get_servers_by_pool()
        else:
            servers = self.get_servers_by_pool(az_id)
        for vm in servers:
            if vm["name"] == vm_name:
                return vm["id"]
        raise Exception(f"{vm_name} 没有找到,请确认是否存在")

    def power_off(self, vm_id, force=False):
        """Request shutdown of a virtual machine.

        :param vm_id: id of the virtual machine
        :param force: send ``force=1`` (forced shutdown) instead of ``force=0``
        :return: ``True`` when the server answers with status 201
        :raises Exception: for any other status code
        """
        url = f"{self.SCP_HTTP}/janus/20180725/servers/{vm_id}/stop"
        payload = {"force": 1 if force else 0}
        response = self.session.post(
            url, headers=self._headers(), json=payload, timeout=self.timeout
        )
        if response.status_code != 201:
            raise Exception(f"云主机无法关机: {response.text}")
        print("云主机关闭中")
        return True

    def power_on(self, vm_id):
        """Request start of a virtual machine.

        :param vm_id: id of the virtual machine
        :return: ``True`` when the server answers with status 201
        :raises Exception: for any other status code
        """
        url = f"{self.SCP_HTTP}/janus/20180725/servers/{vm_id}/start"
        response = self.session.post(
            url, headers=self._headers(), timeout=self.timeout
        )
        if response.status_code != 201:
            raise Exception(f"云主机无法开启: {response.text}")
        print("云主机开机中")
        return True
def main():
    """Usage example: look up a virtual machine by name and power it on.

    Prints the failure and exits with status 1 if any step raises, so a
    calling shell or CI job can tell the operation did not succeed.
    """
    scp_client = SangforSCPClient(
        scp_http="https://ip:4430",
        scp_user="admin",
        scp_pwd="password",
    )
    # Name of the virtual machine to operate on.
    vm_name = "DPM_AUTO_TEST"
    try:
        vm_id = scp_client.get_serverid_by_name(vm_name)
        # To shut the machine down instead, call scp_client.power_off(vm_id).
        scp_client.power_on(vm_id)
    except Exception as e:
        # Top-level boundary: report any failure and signal it to the caller.
        print(f"操作失败: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()