# proxmox/plugins/module_utils/proxmox_api.py
#
# 77 lines
# 2.7 KiB
# Python

from dataclasses import dataclass, field
from typing import List, Optional, Tuple

import requests
# Path prefix inserted between the instance URL and the endpoint of every request.
API_BASE_PATH: str = "/api2/json"
@dataclass
class ProxmoxAuthInfo:
    """Connection and API-token details used to authenticate a request.

    Attributes:
        instance_url: Base URL of the instance; the API path is appended to it.
        token_id: Identifier part of the API token.
        secret: Secret part of the API token. Left out of ``repr()`` so the
            credential does not end up in logs or tracebacks.
        verify_cert: Passed to ``requests`` as ``verify``; defaults to ``True``.
    """

    instance_url: str
    token_id: str
    # repr=False: the generated __repr__ would otherwise print the credential.
    secret: str = field(repr=False)
    verify_cert: bool = True
@dataclass(frozen=True)
class ProxmoxRealm:
    """Immutable description of one authentication realm.

    Attributes:
        type: Realm type as reported by the API.
        realm: Realm name.
        comment: Optional free-text comment; ``None`` when absent.
        default: Whether the realm is flagged as default; ``False`` when absent.
    """

    type: str
    realm: str
    comment: Optional[str] = None
    default: bool = False
def get_realms(auth_info: ProxmoxAuthInfo) -> List['ProxmoxRealm']:
    """Fetch all authentication realms from ``/access/domains``.

    Args:
        auth_info: Connection and API-token details for the target instance.

    Returns:
        One ``ProxmoxRealm`` per entry of the response's ``data`` list.
        ``comment`` falls back to ``None`` and ``default`` to ``False`` when
        an entry does not carry them.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
    """
    response = _proxmox_request('get', '/access/domains', auth_info)
    # Surface HTTP failures directly, as set_realm_data does, instead of
    # failing later while trying to parse an error body.
    response.raise_for_status()
    return [
        ProxmoxRealm(
            entry['type'],
            entry['realm'],
            entry.get('comment'),
            entry.get('default', False),
        )
        for entry in response.json()['data']
    ]
def set_realm_data(auth_info: ProxmoxAuthInfo, realm: str, config: dict[str, str], dry_run: bool, state: str = 'present') -> Tuple[Optional[dict], Optional[dict]]:
    """Create, update or delete a realm and report its state before and after.

    The realm is looked up first; any non-OK answer to that lookup is treated
    as "the realm does not exist".

    Args:
        auth_info: Connection and API-token details for the target instance.
        realm: Name of the realm to manage.
        config: Realm settings sent as the request body on create/update.
        dry_run: When true, no modifying request is sent.
        state: ``'present'`` creates or updates the realm; any other value
            removes it.

    Returns:
        ``(existing_realm_data, new_realm_data)``. The first element is the
        realm's data before the call, or ``None`` if the lookup was not OK.
        The second is the data re-read after a create/update, ``config`` plus
        a ``'name'`` key in a dry run, or ``None`` when the realm is removed.

    Raises:
        requests.HTTPError: If a create, update, delete or the read-back
            request answers with a 4xx/5xx status.
    """
    realm_path = f"/access/domains/{realm}"
    lookup = _proxmox_request('get', realm_path, auth_info)
    realm_exists = lookup.ok
    existing_realm_data = lookup.json()['data'] if realm_exists else None

    if state != 'present':
        # Removal: the realm is gone afterwards whether or not it existed,
        # so the "after" state is always None.
        if realm_exists and not dry_run:
            _proxmox_request('delete', realm_path, auth_info).raise_for_status()
        return existing_realm_data, None

    if dry_run:
        # Nothing is sent; report the requested configuration instead.
        return existing_realm_data, config | {'name': realm}

    if realm_exists:
        write_res = _proxmox_request('put', realm_path, auth_info, data=config)
    else:
        write_res = _proxmox_request('post', '/access/domains', auth_info, data=(config | {'realm': realm}))
    # raise_for_status() guarantees success from here on, so no further
    # `.ok` check is needed before reading the realm back.
    write_res.raise_for_status()

    readback = _proxmox_request('get', realm_path, auth_info)
    readback.raise_for_status()
    return existing_realm_data, readback.json()['data']
def _get_token_from_auth_info(auth_info: ProxmoxAuthInfo) -> str:
    """Build the ``Authorization`` header value from the token id and secret.

    Args:
        auth_info: Object providing ``token_id`` and ``secret``.

    Returns:
        The string ``PVEAPIToken=<token_id>=<secret>``.
    """
    return "PVEAPIToken={}={}".format(auth_info.token_id, auth_info.secret)
def _proxmox_request(method: str, endpoint: str, auth_info: ProxmoxAuthInfo, **params):
    """Send one authenticated HTTP request to the API.

    Args:
        method: HTTP method name handed to ``requests.request``.
        endpoint: Path below ``API_BASE_PATH``, starting with ``/``.
        auth_info: Supplies the instance URL, the token for the
            ``Authorization`` header and the certificate-verification flag.
        **params: Extra keyword arguments forwarded to ``requests.request``
            (for example ``data`` or ``timeout``).

    Returns:
        The ``requests.Response`` object; its status is not checked here.
    """
    # requests waits indefinitely unless told otherwise; bound the wait so a
    # stalled host cannot hang the caller. An explicit `timeout` still wins.
    params.setdefault('timeout', 30)
    # Tolerate a trailing slash on the configured URL to avoid "//api2/json".
    base_url = auth_info.instance_url.rstrip('/')
    return requests.request(
        method,
        f"{base_url}{API_BASE_PATH}{endpoint}",
        headers={'Authorization': _get_token_from_auth_info(auth_info)},
        verify=auth_info.verify_cert,
        **params,
    )