#!/usr/bin/env python

__metaclass__ = type

import os
import random
import string
import traceback
from dataclasses import asdict

from ansible.module_utils.basic import AnsibleModule, missing_required_lib

# signedjson is an optional dependency at import time: the failure is recorded
# here and reported through module.fail_json() once the module object exists.
LIB_IMPORT_ERR = None
try:
    from signedjson.key import (
        encode_signing_key_base64,
        generate_signing_key,
        read_signing_keys,
        write_signing_keys,
    )

    HAS_LIB = True
except ImportError:
    HAS_LIB = False
    LIB_IMPORT_ERR = traceback.format_exc()

DOCUMENTATION = r"""
---
module: synapse_signing_key
author:
    - transcaffeine (transcaffeine@finally.coffee)
requirements:
    - python >= 3.9
    - signedjson >= 1.1.4
short_description: Generate and persist a synapse signing key
description:
    - "Allows to generate, save and manipulate synapse signing keys"
options:
    path:
        description: "Where the signing key is saved or read from"
        type: str
        required: false
    state:
        description: "State of the key to enforce, can be used to remove keys or generate them"
        type: str
        choices: [present, absent]
        default: present
        required: false
"""

EXAMPLES = r"""
- name: Read existing signing key from filesystem
  finallycoffee.matrix.synapse_signing_key:
    path: "/path/to/existing-synapse.signing.key"

- name: Delete existing signing key
  finallycoffee.matrix.synapse_signing_key:
    path: "/this/signing/key/will/be/deleted.signing.key"
    state: absent
"""

RETURN = r"""
signing_key:
    description: Complete synapse signing key
    returned: always
    type: str
"""


def main() -> None:
    """Run the module: read, generate, persist or remove a signing key.

    Behaviour by ``state``:

    * ``present``: if ``path`` names a readable key file, its keys are
      returned unchanged. Otherwise a new key is generated (``changed`` is
      true) and, when ``path`` is set and check mode is off, written there.
    * ``absent``: if a key file exists at ``path`` it is removed (``changed``
      is true); in check mode the removal is only reported. If no key file
      exists nothing changes.

    The result always carries ``signing_key`` (the rendered key set that was
    read or generated) and a ``diff`` with the rendered before/after content.
    """
    module = AnsibleModule(
        argument_spec={
            "path": {"required": False, "type": "str"},
            "state": {
                "required": False,
                "type": "str",
                "default": "present",
                "choices": ["present", "absent"],
            },
        },
        supports_check_mode=True,
    )

    if not HAS_LIB:
        module.fail_json(
            msg=missing_required_lib("signedjson"), exception=LIB_IMPORT_ERR
        )

    path = module.params["path"]
    state = module.params["state"]

    existing_key_found = False
    keys = None
    if path:
        try:
            keys = _read_signing_keys(path)
            existing_key_found = True
        except FileNotFoundError:
            # A missing key file is an expected state, not an error.
            existing_key_found = False

    if not existing_key_found:
        # Generated even for state=absent so that 'signing_key' is always
        # present in the result, as declared in RETURN.
        keys = _generate_signing_key()

    # Decide what would change independently of check mode, so that check
    # mode reports the same 'changed' value a real run would.
    if state == "present":
        changed = not existing_key_found
    else:
        changed = existing_key_found

    if changed and not module.check_mode:
        if state == "present":
            # Without a path the generated key is only returned, not stored.
            if path:
                _write_signing_keys(path, keys)
        else:
            os.remove(path)

    rendered = _render_signing_keys(keys)
    result = {
        "changed": changed,
        "diff": {
            "before_header": path,
            "after_header": path,
            "before": rendered if existing_key_found else "",
            "after": "" if state == "absent" else rendered,
        },
        "msg": "",
        "signing_key": rendered,
    }
    module.exit_json(**result)


def _render_signing_keys(keys) -> str:
    """Render every key in ``keys`` on its own line, joined by newlines."""
    return "\n".join(_render_signing_key(key) for key in keys)


def _render_signing_key(key) -> str:
    """Render one key as ``"<alg> <version> <base64 key>"``."""
    key_b64 = encode_signing_key_base64(key)
    return f"{key.alg} {key.version} {key_b64}"


def _read_signing_keys(file):
    """Return the result of ``read_signing_keys`` for the file at ``file``.

    Raises:
        FileNotFoundError: if ``file`` does not exist.
    """
    with open(file, "r", encoding="utf-8") as stream:
        return read_signing_keys(stream)


def _write_signing_keys(file, keys) -> None:
    """Write ``keys`` to ``file`` using ``write_signing_keys``.

    The custom opener passes mode 0o640 to ``os.open`` so that a newly created
    key file is not world-readable (the process umask still applies).
    """

    def _restricted_opener(target, flags):
        return os.open(target, flags, mode=0o640)

    with open(file, "w", encoding="utf-8", opener=_restricted_opener) as stream:
        write_signing_keys(stream, keys)


def _generate_signing_key():
    """Generate one new signing key and return it in a single-element list.

    The key version is ``a_`` followed by four random ASCII letters.
    """
    suffix = "".join(random.choice(string.ascii_letters) for _ in range(4))
    return [generate_signing_key(f"a_{suffix}")]


if __name__ == "__main__":
    main()