import quart import asyncio from .....core import app, taskmgr from .. import group from .....utils import constants @group.group_class('system', '/api/v1/system') class SystemRouterGroup(group.RouterGroup): async def initialize(self) -> None: @self.route('/info', methods=['GET'], auth_type=group.AuthType.NONE) async def _() -> str: return self.success( data={ "version": constants.semantic_version, "debug": constants.debug_mode, "enabled_platform_count": len(self.ap.platform_mgr.adapters) } ) @self.route('/tasks', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) async def _() -> str: task_type = quart.request.args.get("type") if task_type == '': task_type = None return self.success( data=self.ap.task_mgr.get_tasks_dict(task_type) ) @self.route('/tasks/', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) async def _(task_id: str) -> str: task = self.ap.task_mgr.get_task_by_id(int(task_id)) if task is None: return self.http_status(404, 404, "Task not found") return self.success(data=task.to_dict()) @self.route('/reload', methods=['POST'], auth_type=group.AuthType.USER_TOKEN) async def _() -> str: json_data = await quart.request.json scope = json_data.get("scope") await self.ap.reload( scope=scope ) return self.success() @self.route('/_debug/exec', methods=['POST'], auth_type=group.AuthType.USER_TOKEN) async def _() -> str: if not constants.debug_mode: return self.http_status(403, 403, "Forbidden") py_code = await quart.request.data ap = self.ap return self.success(data=exec(py_code, {"ap": ap}))