mirror of
https://github.com/langbot-app/LangBot.git
synced 2025-11-26 03:44:58 +08:00
* feat: add WebChat adapter for pipeline debugging - Create WebChatAdapter for handling debug messages in pipeline testing - Add HTTP API endpoints for debug message sending and retrieval - Implement frontend debug dialog with session switching (private/group chat) - Add Chinese i18n translations for debug interface - Auto-create default WebChat bot during database initialization - Support fixed session IDs: webchatperson and webchatgroup for testing Co-Authored-By: Junyan Qin <Chin>, 秦骏言 in Chinese, you can call me my english name Rock Chin. <rockchinq@gmail.com> * perf: ui for webchat * feat: complete webchat backend * feat: core chat apis * perf: button style in pipeline card * perf: log btn in bot card * perf: webchat entities definition * fix: bugs * perf: web chat * perf: dialog styles * perf: styles * perf: styles * fix: group invalid in webchat * perf: simulate real im message * perf: group timeout toast * feat(webchat): add supports for mentioning bot in group * perf(webchat): at component styles * perf: at badge display in message * fix: linter errors * fix: webchat was listed on adapter list --------- Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Junyan Qin <Chin>, 秦骏言 in Chinese, you can call me my english name Rock Chin. <rockchinq@gmail.com>
84 lines
3.3 KiB
Python
84 lines
3.3 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import traceback
|
|
|
|
from ..core import app, entities
|
|
|
|
|
|
class Controller:
|
|
"""总控制器"""
|
|
|
|
ap: app.Application
|
|
|
|
semaphore: asyncio.Semaphore = None
|
|
"""请求并发控制信号量"""
|
|
|
|
def __init__(self, ap: app.Application):
|
|
self.ap = ap
|
|
self.semaphore = asyncio.Semaphore(self.ap.instance_config.data['concurrency']['pipeline'])
|
|
|
|
async def consumer(self):
|
|
"""事件处理循环"""
|
|
try:
|
|
while True:
|
|
selected_query: entities.Query = None
|
|
|
|
# 取请求
|
|
async with self.ap.query_pool:
|
|
queries: list[entities.Query] = self.ap.query_pool.queries
|
|
|
|
for query in queries:
|
|
session = await self.ap.sess_mgr.get_session(query)
|
|
self.ap.logger.debug(f'Checking query {query} session {session}')
|
|
|
|
if not session.semaphore.locked():
|
|
selected_query = query
|
|
await session.semaphore.acquire()
|
|
|
|
break
|
|
|
|
if selected_query: # 找到了
|
|
queries.remove(selected_query)
|
|
else: # 没找到 说明:没有请求 或者 所有query对应的session都已达到并发上限
|
|
await self.ap.query_pool.condition.wait()
|
|
continue
|
|
|
|
if selected_query:
|
|
|
|
async def _process_query(selected_query: entities.Query):
|
|
async with self.semaphore: # 总并发上限
|
|
# find pipeline
|
|
# Here firstly find the bot, then find the pipeline, in case the bot adapter's config is not the latest one.
|
|
# Like aiocqhttp, once a client is connected, even the adapter was updated and restarted, the existing client connection will not be affected.
|
|
pipeline_uuid = selected_query.pipeline_uuid
|
|
|
|
if pipeline_uuid:
|
|
pipeline = await self.ap.pipeline_mgr.get_pipeline_by_uuid(pipeline_uuid)
|
|
if pipeline:
|
|
await pipeline.run(selected_query)
|
|
|
|
async with self.ap.query_pool:
|
|
(await self.ap.sess_mgr.get_session(selected_query)).semaphore.release()
|
|
# 通知其他协程,有新的请求可以处理了
|
|
self.ap.query_pool.condition.notify_all()
|
|
|
|
self.ap.task_mgr.create_task(
|
|
_process_query(selected_query),
|
|
kind='query',
|
|
name=f'query-{selected_query.query_id}',
|
|
scopes=[
|
|
entities.LifecycleControlScope.APPLICATION,
|
|
entities.LifecycleControlScope.PLATFORM,
|
|
],
|
|
)
|
|
|
|
except Exception as e:
|
|
# traceback.print_exc()
|
|
self.ap.logger.error(f'控制器循环出错: {e}')
|
|
self.ap.logger.error(f'Traceback: {traceback.format_exc()}')
|
|
|
|
async def run(self):
|
|
"""运行控制器"""
|
|
await self.consumer()
|