From c151665419f65bd23c203b52a42c65ad041796dc Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Tue, 22 Oct 2024 18:09:18 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/api/http/controller/main.py | 10 ++++- pkg/audit/center/apigroup.py | 7 ++-- pkg/core/app.py | 40 ++++++++---------- pkg/core/boot.py | 10 +++++ pkg/core/bootutils/log.py | 4 +- pkg/core/bootutils/misc.py | 0 pkg/core/stages/build_app.py | 2 + pkg/core/taskmgr.py | 73 +++++++++++++++++++++++++++++++++ pkg/pipeline/controller.py | 5 ++- pkg/platform/manager.py | 6 +-- 10 files changed, 121 insertions(+), 36 deletions(-) delete mode 100644 pkg/core/bootutils/misc.py create mode 100644 pkg/core/taskmgr.py diff --git a/pkg/api/http/controller/main.py b/pkg/api/http/controller/main.py index 99eb28e2..1e48b094 100644 --- a/pkg/api/http/controller/main.py +++ b/pkg/api/http/controller/main.py @@ -30,12 +30,18 @@ class HTTPController: while True: await asyncio.sleep(1) - task = asyncio.create_task(self.quart_app.run_task( + # task = asyncio.create_task(self.quart_app.run_task( + # host=self.ap.system_cfg.data['http-api']['host'], + # port=self.ap.system_cfg.data['http-api']['port'], + # shutdown_trigger=shutdown_trigger_placeholder + # )) + # self.ap.asyncio_tasks.append(task) + self.ap.task_mgr.create_task(self.quart_app.run_task( host=self.ap.system_cfg.data['http-api']['host'], port=self.ap.system_cfg.data['http-api']['port'], shutdown_trigger=shutdown_trigger_placeholder )) - self.ap.asyncio_tasks.append(task) + async def register_routes(self) -> None: diff --git a/pkg/audit/center/apigroup.py b/pkg/audit/center/apigroup.py index 6ae8be5d..fe7ab4f6 100644 --- a/pkg/audit/center/apigroup.py +++ b/pkg/audit/center/apigroup.py @@ -69,11 +69,12 @@ class APIGroup(metaclass=abc.ABCMeta): **kwargs ) -> asyncio.Task: """执行请求""" - task = asyncio.create_task(self._do(method, path, data, params, headers, **kwargs)) + # task = asyncio.create_task(self._do(method, path, data, params, headers, **kwargs)) - self.ap.asyncio_tasks.append(task) + # self.ap.asyncio_tasks.append(task) - return task + + return self.ap.task_mgr.create_task(self._do(method, path, data, params, headers, **kwargs)).task def gen_rid( self diff --git a/pkg/core/app.py b/pkg/core/app.py index c67cb082..c7364f30 100644 --- a/pkg/core/app.py +++ b/pkg/core/app.py @@ -2,6 +2,7 @@ from __future__ import annotations import logging import asyncio +import threading import traceback from ..platform import manager as im_mgr @@ -21,6 +22,7 @@ from ..utils import version as version_mgr, proxy as proxy_mgr, announce as anno from ..persistence import mgr as persistencemgr from ..api.http.controller import main as http_controller from ..utils import logcache +from . import taskmgr class Application: @@ -28,7 +30,8 @@ class Application: event_loop: asyncio.AbstractEventLoop = None - asyncio_tasks: list[asyncio.Task] = [] + # asyncio_tasks: list[asyncio.Task] = [] + task_mgr: taskmgr.AsyncTaskManager = None platform_mgr: im_mgr.PlatformManager = None @@ -103,8 +106,6 @@ class Application: async def run(self): await self.plugin_mgr.initialize_plugins() - tasks = [] - try: # 后续可能会允许动态重启其他任务 @@ -113,28 +114,19 @@ class Application: while True: await asyncio.sleep(1) - tasks = [ - asyncio.create_task(self.platform_mgr.run()), # 消息平台 - asyncio.create_task(self.ctrl.run()), # 消息处理循环 - asyncio.create_task(self.http_ctrl.run()), # http 接口服务 - asyncio.create_task(never_ending()) - ] - self.asyncio_tasks.extend(tasks) + # tasks = [ + # asyncio.create_task(self.platform_mgr.run()), # 消息平台 + # asyncio.create_task(self.ctrl.run()), # 消息处理循环 + # asyncio.create_task(self.http_ctrl.run()), # http 接口服务 + # asyncio.create_task(never_ending()) + # ] + # self.asyncio_tasks.extend(tasks) + self.task_mgr.create_task(self.platform_mgr.run()) + self.task_mgr.create_task(self.ctrl.run()) + self.task_mgr.create_task(self.http_ctrl.run()) + self.task_mgr.create_task(never_ending()) - # 挂系统信号处理 - import signal - - def signal_handler(sig, frame): - for task in self.asyncio_tasks: - task.cancel() - self.logger.info("程序退出.") - # 结束当前事件循环 - self.event_loop.stop() - exit(0) - - signal.signal(signal.SIGINT, signal_handler) - - await asyncio.gather(*tasks, return_exceptions=True) + await self.task_mgr.wait_all() except asyncio.CancelledError: pass except Exception as e: diff --git a/pkg/core/boot.py b/pkg/core/boot.py index 0f0ab3ae..dff772d9 100644 --- a/pkg/core/boot.py +++ b/pkg/core/boot.py @@ -49,6 +49,16 @@ async def make_app(loop: asyncio.AbstractEventLoop) -> app.Application: async def main(loop: asyncio.AbstractEventLoop): try: + + # 挂系统信号处理 + import signal + + def signal_handler(sig, frame): + print("[Signal] 程序退出.") + os._exit(0) + + signal.signal(signal.SIGINT, signal_handler) + app_inst = await make_app(loop) await app_inst.run() except Exception as e: diff --git a/pkg/core/bootutils/log.py b/pkg/core/bootutils/log.py index 36bc51a0..dea961b5 100644 --- a/pkg/core/bootutils/log.py +++ b/pkg/core/bootutils/log.py @@ -27,8 +27,8 @@ async def init_logging(extra_handlers: list[logging.Handler] = None) -> logging. if constants.debug_mode: level = logging.DEBUG - log_file_name = "data/logs/qcg-%s.log" % time.strftime( - "%Y-%m-%d-%H-%M-%S", time.localtime() + log_file_name = "data/logs/langbot-%s.log" % time.strftime( + "%Y-%m-%d", time.localtime() ) qcg_logger = logging.getLogger("qcg") diff --git a/pkg/core/bootutils/misc.py b/pkg/core/bootutils/misc.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pkg/core/stages/build_app.py b/pkg/core/stages/build_app.py index 8c63f7dc..bc169b17 100644 --- a/pkg/core/stages/build_app.py +++ b/pkg/core/stages/build_app.py @@ -18,6 +18,7 @@ from ...platform import manager as im_mgr from ...persistence import mgr as persistencemgr from ...api.http.controller import main as http_controller from ...utils import logcache +from .. import taskmgr @stage.stage_class("BuildAppStage") @@ -28,6 +29,7 @@ class BuildAppStage(stage.BootingStage): async def run(self, ap: app.Application): """构建app对象的各个组件对象并初始化 """ + ap.task_mgr = taskmgr.AsyncTaskManager(ap) proxy_mgr = proxy.ProxyManager(ap) await proxy_mgr.initialize() diff --git a/pkg/core/taskmgr.py b/pkg/core/taskmgr.py new file mode 100644 index 00000000..ffe15e50 --- /dev/null +++ b/pkg/core/taskmgr.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +import asyncio +import typing + +from . import app + + +class TaskContext: + """任务跟踪上下文""" + + current_action: str + """当前正在执行的动作""" + + log: str + """记录日志""" + + def __init__(self): + self.current_action = "" + self.log = "" + + def log(self, msg: str): + self.log += msg + "\n" + + def set_current_action(self, action: str): + self.current_action = action + + +class TaskWrapper: + """任务包装器""" + + task_type: str = "system" # 任务类型: system 或 user + """任务类型""" + + task_context: TaskContext + """任务上下文""" + + task: asyncio.Task + """任务""" + + ap: app.Application + """应用实例""" + + def __init__(self, ap: app.Application, coro: typing.Coroutine, task_type: str = "system", context: TaskContext = None): + self.ap = ap + self.task_context = context or TaskContext() + self.task = self.ap.event_loop.create_task(coro) + self.task_type = task_type + + +class AsyncTaskManager: + """保存app中的所有异步任务 + 包含系统级的和用户级(插件安装、更新等由用户直接发起的)的""" + + ap: app.Application + + tasks: list[TaskWrapper] + """所有任务""" + + def __init__(self, ap: app.Application): + self.ap = ap + self.tasks = [] + + def create_task(self, coro: typing.Coroutine, task_type: str = "system", context: TaskContext = None) -> TaskWrapper: + wrapper = TaskWrapper(self.ap, coro, task_type, context) + self.tasks.append(wrapper) + return wrapper + + async def wait_all(self): + await asyncio.gather(*[t.task for t in self.tasks], return_exceptions=True) + + def get_all_tasks(self) -> list[TaskWrapper]: + return self.tasks diff --git a/pkg/pipeline/controller.py b/pkg/pipeline/controller.py index 3b4e2c64..29d53d71 100644 --- a/pkg/pipeline/controller.py +++ b/pkg/pipeline/controller.py @@ -60,8 +60,9 @@ class Controller: # 通知其他协程,有新的请求可以处理了 self.ap.query_pool.condition.notify_all() - task = asyncio.create_task(_process_query(selected_query)) - self.ap.asyncio_tasks.append(task) + # task = asyncio.create_task(_process_query(selected_query)) + # self.ap.asyncio_tasks.append(task) + self.ap.task_mgr.create_task(_process_query(selected_query)) except Exception as e: # traceback.print_exc() diff --git a/pkg/platform/manager.py b/pkg/platform/manager.py index 4b22d5c9..47822c28 100644 --- a/pkg/platform/manager.py +++ b/pkg/platform/manager.py @@ -184,10 +184,10 @@ class PlatformManager: tasks.append(exception_wrapper(adapter)) for task in tasks: - async_task = asyncio.create_task(task) - self.ap.asyncio_tasks.append(async_task) + # async_task = asyncio.create_task(task) + # self.ap.asyncio_tasks.append(async_task) + self.ap.task_mgr.create_task(task) except Exception as e: self.ap.logger.error('平台适配器运行出错: ' + str(e)) self.ap.logger.debug(f"Traceback: {traceback.format_exc()}") -