mirror of
https://github.com/langbot-app/LangBot.git
synced 2025-11-25 19:37:36 +08:00
Compare commits
19 Commits
v4.3.5
...
feature/we
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a4995c8cd9 | ||
|
|
127a38b15c | ||
|
|
e4729337c8 | ||
|
|
5fa75330cf | ||
|
|
547e3d098e | ||
|
|
f1ddddfe00 | ||
|
|
4e61302156 | ||
|
|
9e3cf418ba | ||
|
|
3e29ec7892 | ||
|
|
f452742cd2 | ||
|
|
b560432b0b | ||
|
|
99e5478ced | ||
|
|
09dba91a37 | ||
|
|
18ec4adac9 | ||
|
|
8bedaa468a | ||
|
|
0ab366fcac | ||
|
|
d664039e54 | ||
|
|
6535ba4f72 | ||
|
|
3b181cff93 |
4
codecov.yml
Normal file
4
codecov.yml
Normal file
@@ -0,0 +1,4 @@
|
||||
coverage:
|
||||
status:
|
||||
project: off
|
||||
patch: off
|
||||
0
libs/coze_server_api/__init__.py
Normal file
0
libs/coze_server_api/__init__.py
Normal file
192
libs/coze_server_api/client.py
Normal file
192
libs/coze_server_api/client.py
Normal file
@@ -0,0 +1,192 @@
|
||||
import json
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import io
|
||||
from typing import Dict, List, Any, AsyncGenerator
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
|
||||
|
||||
class AsyncCozeAPIClient:
|
||||
def __init__(self, api_key: str, api_base: str = "https://api.coze.cn"):
|
||||
self.api_key = api_key
|
||||
self.api_base = api_base
|
||||
self.session = None
|
||||
|
||||
async def __aenter__(self):
|
||||
"""支持异步上下文管理器"""
|
||||
await self.coze_session()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
"""退出时自动关闭会话"""
|
||||
await self.close()
|
||||
|
||||
|
||||
|
||||
async def coze_session(self):
|
||||
"""确保HTTP session存在"""
|
||||
if self.session is None:
|
||||
connector = aiohttp.TCPConnector(
|
||||
ssl=False if self.api_base.startswith("http://") else True,
|
||||
limit=100,
|
||||
limit_per_host=30,
|
||||
keepalive_timeout=30,
|
||||
enable_cleanup_closed=True,
|
||||
)
|
||||
timeout = aiohttp.ClientTimeout(
|
||||
total=120, # 默认超时时间
|
||||
connect=30,
|
||||
sock_read=120,
|
||||
)
|
||||
headers = {
|
||||
"Authorization": f"Bearer {self.api_key}",
|
||||
"Accept": "text/event-stream",
|
||||
}
|
||||
self.session = aiohttp.ClientSession(
|
||||
headers=headers, timeout=timeout, connector=connector
|
||||
)
|
||||
return self.session
|
||||
|
||||
async def close(self):
|
||||
"""显式关闭会话"""
|
||||
if self.session and not self.session.closed:
|
||||
await self.session.close()
|
||||
self.session = None
|
||||
|
||||
async def upload(
|
||||
self,
|
||||
file,
|
||||
) -> str:
|
||||
# 处理 Path 对象
|
||||
if isinstance(file, Path):
|
||||
if not file.exists():
|
||||
raise ValueError(f"File not found: {file}")
|
||||
with open(file, "rb") as f:
|
||||
file = f.read()
|
||||
|
||||
# 处理文件路径字符串
|
||||
elif isinstance(file, str):
|
||||
if not os.path.isfile(file):
|
||||
raise ValueError(f"File not found: {file}")
|
||||
with open(file, "rb") as f:
|
||||
file = f.read()
|
||||
|
||||
# 处理文件对象
|
||||
elif hasattr(file, 'read'):
|
||||
file = file.read()
|
||||
|
||||
session = await self.coze_session()
|
||||
url = f"{self.api_base}/v1/files/upload"
|
||||
|
||||
try:
|
||||
file_io = io.BytesIO(file)
|
||||
async with session.post(
|
||||
url,
|
||||
data={
|
||||
"file": file_io,
|
||||
},
|
||||
timeout=aiohttp.ClientTimeout(total=60),
|
||||
) as response:
|
||||
if response.status == 401:
|
||||
raise Exception("Coze API 认证失败,请检查 API Key 是否正确")
|
||||
|
||||
response_text = await response.text()
|
||||
|
||||
|
||||
if response.status != 200:
|
||||
raise Exception(
|
||||
f"文件上传失败,状态码: {response.status}, 响应: {response_text}"
|
||||
)
|
||||
try:
|
||||
result = await response.json()
|
||||
except json.JSONDecodeError:
|
||||
raise Exception(f"文件上传响应解析失败: {response_text}")
|
||||
|
||||
if result.get("code") != 0:
|
||||
raise Exception(f"文件上传失败: {result.get('msg', '未知错误')}")
|
||||
|
||||
file_id = result["data"]["id"]
|
||||
return file_id
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
raise Exception("文件上传超时")
|
||||
except Exception as e:
|
||||
raise Exception(f"文件上传失败: {str(e)}")
|
||||
|
||||
|
||||
async def chat_messages(
|
||||
self,
|
||||
bot_id: str,
|
||||
user_id: str,
|
||||
additional_messages: List[Dict] | None = None,
|
||||
conversation_id: str | None = None,
|
||||
auto_save_history: bool = True,
|
||||
stream: bool = True,
|
||||
timeout: float = 120,
|
||||
) -> AsyncGenerator[Dict[str, Any], None]:
|
||||
"""发送聊天消息并返回流式响应
|
||||
|
||||
Args:
|
||||
bot_id: Bot ID
|
||||
user_id: 用户ID
|
||||
additional_messages: 额外消息列表
|
||||
conversation_id: 会话ID
|
||||
auto_save_history: 是否自动保存历史
|
||||
stream: 是否流式响应
|
||||
timeout: 超时时间
|
||||
"""
|
||||
session = await self.coze_session()
|
||||
url = f"{self.api_base}/v3/chat"
|
||||
|
||||
payload = {
|
||||
"bot_id": bot_id,
|
||||
"user_id": user_id,
|
||||
"stream": stream,
|
||||
"auto_save_history": auto_save_history,
|
||||
}
|
||||
|
||||
if additional_messages:
|
||||
payload["additional_messages"] = additional_messages
|
||||
|
||||
params = {}
|
||||
if conversation_id:
|
||||
params["conversation_id"] = conversation_id
|
||||
|
||||
|
||||
try:
|
||||
async with session.post(
|
||||
url,
|
||||
json=payload,
|
||||
params=params,
|
||||
timeout=aiohttp.ClientTimeout(total=timeout),
|
||||
) as response:
|
||||
if response.status == 401:
|
||||
raise Exception("Coze API 认证失败,请检查 API Key 是否正确")
|
||||
|
||||
if response.status != 200:
|
||||
raise Exception(f"Coze API 流式请求失败,状态码: {response.status}")
|
||||
|
||||
|
||||
async for chunk in response.content:
|
||||
chunk = chunk.decode("utf-8")
|
||||
if chunk != '\n':
|
||||
if chunk.startswith("event:"):
|
||||
chunk_type = chunk.replace("event:", "", 1).strip()
|
||||
elif chunk.startswith("data:"):
|
||||
chunk_data = chunk.replace("data:", "", 1).strip()
|
||||
else:
|
||||
yield {"event": chunk_type, "data": json.loads(chunk_data)}
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
raise Exception(f"Coze API 流式请求超时 ({timeout}秒)")
|
||||
except Exception as e:
|
||||
raise Exception(f"Coze API 流式请求失败: {str(e)}")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
229
pkg/api/http/controller/groups/pipelines/websocket.py
Normal file
229
pkg/api/http/controller/groups/pipelines/websocket.py
Normal file
@@ -0,0 +1,229 @@
|
||||
"""流水线调试 WebSocket 路由
|
||||
|
||||
提供基于 WebSocket 的实时双向通信,用于流水线调试。
|
||||
支持 person 和 group 两种会话类型的隔离。
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
|
||||
import quart
|
||||
|
||||
from ... import group
|
||||
from ....service.websocket_pool import WebSocketConnection
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def handle_client_event(connection: WebSocketConnection, message: dict, ap):
|
||||
"""处理客户端发送的事件
|
||||
|
||||
Args:
|
||||
connection: WebSocket 连接对象
|
||||
message: 客户端消息 {'type': 'xxx', 'data': {...}}
|
||||
ap: Application 实例
|
||||
"""
|
||||
event_type = message.get('type')
|
||||
data = message.get('data', {})
|
||||
|
||||
pipeline_uuid = connection.pipeline_uuid
|
||||
session_type = connection.session_type
|
||||
|
||||
try:
|
||||
webchat_adapter = ap.platform_mgr.webchat_proxy_bot.adapter
|
||||
|
||||
if event_type == 'send_message':
|
||||
# 发送消息到指定会话
|
||||
message_chain_obj = data.get('message_chain', [])
|
||||
client_message_id = data.get('client_message_id')
|
||||
|
||||
if not message_chain_obj:
|
||||
await connection.send('error', {'error': 'message_chain is required', 'error_code': 'INVALID_REQUEST'})
|
||||
return
|
||||
|
||||
logger.info(
|
||||
f"Received send_message: pipeline={pipeline_uuid}, "
|
||||
f"session={session_type}, "
|
||||
f"client_msg_id={client_message_id}"
|
||||
)
|
||||
|
||||
# 调用 webchat_adapter.send_webchat_message
|
||||
# 消息将通过 reply_message_chunk 自动推送到 WebSocket
|
||||
result = None
|
||||
async for msg in webchat_adapter.send_webchat_message(
|
||||
pipeline_uuid=pipeline_uuid, session_type=session_type, message_chain_obj=message_chain_obj, is_stream=True
|
||||
):
|
||||
result = msg
|
||||
|
||||
# 发送确认
|
||||
if result:
|
||||
await connection.send(
|
||||
'message_sent',
|
||||
{
|
||||
'client_message_id': client_message_id,
|
||||
'server_message_id': result.get('id'),
|
||||
'timestamp': result.get('timestamp'),
|
||||
},
|
||||
)
|
||||
|
||||
elif event_type == 'load_history':
|
||||
# 加载指定会话的历史消息
|
||||
before_message_id = data.get('before_message_id')
|
||||
limit = data.get('limit', 50)
|
||||
|
||||
logger.info(f"Loading history: pipeline={pipeline_uuid}, session={session_type}, limit={limit}")
|
||||
|
||||
# 从对应会话获取历史消息
|
||||
messages = webchat_adapter.get_webchat_messages(pipeline_uuid, session_type)
|
||||
|
||||
# 简单分页:返回最后 limit 条
|
||||
if before_message_id:
|
||||
# TODO: 实现基于 message_id 的分页
|
||||
history_messages = messages[-limit:]
|
||||
else:
|
||||
history_messages = messages[-limit:] if len(messages) > limit else messages
|
||||
|
||||
await connection.send(
|
||||
'history', {'messages': history_messages, 'has_more': len(messages) > len(history_messages)}
|
||||
)
|
||||
|
||||
elif event_type == 'interrupt':
|
||||
# 中断消息
|
||||
message_id = data.get('message_id')
|
||||
logger.info(f"Interrupt requested: message_id={message_id}")
|
||||
|
||||
# TODO: 实现中断逻辑
|
||||
await connection.send('interrupted', {'message_id': message_id, 'partial_content': ''})
|
||||
|
||||
elif event_type == 'ping':
|
||||
# 心跳
|
||||
connection.last_ping = datetime.now()
|
||||
await connection.send('pong', {'timestamp': data.get('timestamp')})
|
||||
|
||||
else:
|
||||
logger.warning(f"Unknown event type: {event_type}")
|
||||
await connection.send('error', {'error': f'Unknown event type: {event_type}', 'error_code': 'UNKNOWN_EVENT'})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling event {event_type}: {e}", exc_info=True)
|
||||
await connection.send(
|
||||
'error',
|
||||
{'error': f'Internal server error: {str(e)}', 'error_code': 'INTERNAL_ERROR', 'details': {'event_type': event_type}},
|
||||
)
|
||||
|
||||
|
||||
@group.group_class('pipeline-websocket', '/api/v1/pipelines/<pipeline_uuid>/chat')
|
||||
class PipelineWebSocketRouterGroup(group.RouterGroup):
|
||||
"""流水线调试 WebSocket 路由组"""
|
||||
|
||||
async def initialize(self) -> None:
|
||||
@self.route('/ws')
|
||||
async def websocket_handler(pipeline_uuid: str):
|
||||
"""WebSocket 连接处理 - 会话隔离
|
||||
|
||||
连接流程:
|
||||
1. 客户端建立 WebSocket 连接
|
||||
2. 客户端发送 connect 事件(携带 session_type 和 token)
|
||||
3. 服务端验证并创建连接对象
|
||||
4. 进入消息循环,处理客户端事件
|
||||
5. 断开时清理连接
|
||||
|
||||
Args:
|
||||
pipeline_uuid: 流水线 UUID
|
||||
"""
|
||||
websocket = quart.websocket._get_current_object()
|
||||
connection_id = str(uuid.uuid4())
|
||||
session_key = None
|
||||
connection = None
|
||||
|
||||
try:
|
||||
# 1. 等待客户端发送 connect 事件
|
||||
first_message = await websocket.receive_json()
|
||||
|
||||
if first_message.get('type') != 'connect':
|
||||
await websocket.send_json(
|
||||
{'type': 'error', 'data': {'error': 'First message must be connect event', 'error_code': 'INVALID_HANDSHAKE'}}
|
||||
)
|
||||
await websocket.close(1008)
|
||||
return
|
||||
|
||||
connect_data = first_message.get('data', {})
|
||||
session_type = connect_data.get('session_type')
|
||||
token = connect_data.get('token')
|
||||
|
||||
# 验证参数
|
||||
if session_type not in ['person', 'group']:
|
||||
await websocket.send_json(
|
||||
{'type': 'error', 'data': {'error': 'session_type must be person or group', 'error_code': 'INVALID_SESSION_TYPE'}}
|
||||
)
|
||||
await websocket.close(1008)
|
||||
return
|
||||
|
||||
# 验证 token
|
||||
if not token:
|
||||
await websocket.send_json(
|
||||
{'type': 'error', 'data': {'error': 'token is required', 'error_code': 'MISSING_TOKEN'}}
|
||||
)
|
||||
await websocket.close(1008)
|
||||
return
|
||||
|
||||
# 验证用户身份
|
||||
try:
|
||||
user = await self.ap.user_service.verify_token(token)
|
||||
if not user:
|
||||
await websocket.send_json({'type': 'error', 'data': {'error': 'Unauthorized', 'error_code': 'UNAUTHORIZED'}})
|
||||
await websocket.close(1008)
|
||||
return
|
||||
except Exception as e:
|
||||
logger.error(f"Token verification failed: {e}")
|
||||
await websocket.send_json(
|
||||
{'type': 'error', 'data': {'error': 'Token verification failed', 'error_code': 'AUTH_ERROR'}}
|
||||
)
|
||||
await websocket.close(1008)
|
||||
return
|
||||
|
||||
# 2. 创建连接对象并加入连接池
|
||||
connection = WebSocketConnection(
|
||||
connection_id=connection_id,
|
||||
websocket=websocket,
|
||||
pipeline_uuid=pipeline_uuid,
|
||||
session_type=session_type,
|
||||
created_at=datetime.now(),
|
||||
last_ping=datetime.now(),
|
||||
)
|
||||
|
||||
session_key = connection.session_key
|
||||
ws_pool = self.ap.ws_pool
|
||||
ws_pool.add_connection(connection)
|
||||
|
||||
# 3. 发送连接成功事件
|
||||
await connection.send(
|
||||
'connected', {'connection_id': connection_id, 'session_type': session_type, 'pipeline_uuid': pipeline_uuid}
|
||||
)
|
||||
|
||||
logger.info(f"WebSocket connected: {connection_id} [pipeline={pipeline_uuid}, session={session_type}]")
|
||||
|
||||
# 4. 进入消息处理循环
|
||||
while True:
|
||||
try:
|
||||
message = await websocket.receive_json()
|
||||
await handle_client_event(connection, message, self.ap)
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"WebSocket connection cancelled: {connection_id}")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error receiving message from {connection_id}: {e}")
|
||||
break
|
||||
|
||||
except quart.exceptions.WebsocketDisconnected:
|
||||
logger.info(f"WebSocket disconnected: {connection_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"WebSocket error for {connection_id}: {e}", exc_info=True)
|
||||
finally:
|
||||
# 清理连接
|
||||
if connection and session_key:
|
||||
ws_pool = self.ap.ws_pool
|
||||
await ws_pool.remove_connection(connection_id, session_key)
|
||||
logger.info(f"WebSocket connection cleaned up: {connection_id}")
|
||||
211
pkg/api/http/service/websocket_pool.py
Normal file
211
pkg/api/http/service/websocket_pool.py
Normal file
@@ -0,0 +1,211 @@
|
||||
"""WebSocket 连接池管理
|
||||
|
||||
用于管理流水线调试的 WebSocket 连接,支持会话隔离和消息广播。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
import quart
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class WebSocketConnection:
|
||||
"""单个 WebSocket 连接"""
|
||||
|
||||
connection_id: str
|
||||
websocket: quart.websocket.WebSocket
|
||||
pipeline_uuid: str
|
||||
session_type: str # 'person' 或 'group'
|
||||
created_at: datetime
|
||||
last_ping: datetime
|
||||
|
||||
@property
|
||||
def session_key(self) -> str:
|
||||
"""会话唯一标识: pipeline_uuid:session_type"""
|
||||
return f"{self.pipeline_uuid}:{self.session_type}"
|
||||
|
||||
async def send(self, event_type: str, data: dict):
|
||||
"""发送事件到客户端
|
||||
|
||||
Args:
|
||||
event_type: 事件类型
|
||||
data: 事件数据
|
||||
"""
|
||||
try:
|
||||
await self.websocket.send_json({"type": event_type, "data": data})
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to send message to {self.connection_id}: {e}")
|
||||
raise
|
||||
|
||||
|
||||
class WebSocketConnectionPool:
|
||||
"""WebSocket 连接池 - 按会话隔离
|
||||
|
||||
连接池结构:
|
||||
connections[session_key][connection_id] = WebSocketConnection
|
||||
其中 session_key = f"{pipeline_uuid}:{session_type}"
|
||||
|
||||
这样可以确保:
|
||||
- person 和 group 会话完全隔离
|
||||
- 不同 pipeline 的会话隔离
|
||||
- 同一会话的多个连接可以同步接收消息(多标签页)
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.connections: dict[str, dict[str, WebSocketConnection]] = {}
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
def add_connection(self, conn: WebSocketConnection):
|
||||
"""添加连接到指定会话
|
||||
|
||||
Args:
|
||||
conn: WebSocket 连接对象
|
||||
"""
|
||||
session_key = conn.session_key
|
||||
|
||||
if session_key not in self.connections:
|
||||
self.connections[session_key] = {}
|
||||
|
||||
self.connections[session_key][conn.connection_id] = conn
|
||||
|
||||
logger.info(
|
||||
f"WebSocket connection added: {conn.connection_id} "
|
||||
f"to session {session_key} "
|
||||
f"(total: {len(self.connections[session_key])} connections)"
|
||||
)
|
||||
|
||||
async def remove_connection(self, connection_id: str, session_key: str):
|
||||
"""从指定会话移除连接
|
||||
|
||||
Args:
|
||||
connection_id: 连接 ID
|
||||
session_key: 会话标识
|
||||
"""
|
||||
async with self._lock:
|
||||
if session_key in self.connections:
|
||||
conn = self.connections[session_key].pop(connection_id, None)
|
||||
|
||||
# 如果该会话没有连接了,清理会话
|
||||
if not self.connections[session_key]:
|
||||
del self.connections[session_key]
|
||||
|
||||
if conn:
|
||||
logger.info(
|
||||
f"WebSocket connection removed: {connection_id} "
|
||||
f"from session {session_key} "
|
||||
f"(remaining: {len(self.connections.get(session_key, {}))} connections)"
|
||||
)
|
||||
|
||||
def get_connection(self, connection_id: str, session_key: str) -> Optional[WebSocketConnection]:
|
||||
"""获取指定连接
|
||||
|
||||
Args:
|
||||
connection_id: 连接 ID
|
||||
session_key: 会话标识
|
||||
|
||||
Returns:
|
||||
WebSocketConnection 或 None
|
||||
"""
|
||||
return self.connections.get(session_key, {}).get(connection_id)
|
||||
|
||||
def get_connections_by_session(self, pipeline_uuid: str, session_type: str) -> list[WebSocketConnection]:
|
||||
"""获取指定会话的所有连接
|
||||
|
||||
Args:
|
||||
pipeline_uuid: 流水线 UUID
|
||||
session_type: 会话类型 ('person' 或 'group')
|
||||
|
||||
Returns:
|
||||
连接列表
|
||||
"""
|
||||
session_key = f"{pipeline_uuid}:{session_type}"
|
||||
return list(self.connections.get(session_key, {}).values())
|
||||
|
||||
async def broadcast_to_session(self, pipeline_uuid: str, session_type: str, event_type: str, data: dict):
|
||||
"""广播消息到指定会话的所有连接
|
||||
|
||||
Args:
|
||||
pipeline_uuid: 流水线 UUID
|
||||
session_type: 会话类型 ('person' 或 'group')
|
||||
event_type: 事件类型
|
||||
data: 事件数据
|
||||
"""
|
||||
connections = self.get_connections_by_session(pipeline_uuid, session_type)
|
||||
|
||||
if not connections:
|
||||
logger.debug(f"No connections for session {pipeline_uuid}:{session_type}, skipping broadcast")
|
||||
return
|
||||
|
||||
logger.debug(
|
||||
f"Broadcasting {event_type} to session {pipeline_uuid}:{session_type}, " f"{len(connections)} connections"
|
||||
)
|
||||
|
||||
# 并发发送到所有连接,忽略失败的连接
|
||||
results = await asyncio.gather(*[conn.send(event_type, data) for conn in connections], return_exceptions=True)
|
||||
|
||||
# 统计失败的连接
|
||||
failed_count = sum(1 for result in results if isinstance(result, Exception))
|
||||
if failed_count > 0:
|
||||
logger.warning(f"Failed to send to {failed_count}/{len(connections)} connections")
|
||||
|
||||
def get_all_sessions(self) -> list[str]:
|
||||
"""获取所有活跃会话的 session_key 列表
|
||||
|
||||
Returns:
|
||||
会话标识列表
|
||||
"""
|
||||
return list(self.connections.keys())
|
||||
|
||||
def get_connection_count(self, pipeline_uuid: str, session_type: str) -> int:
|
||||
"""获取指定会话的连接数量
|
||||
|
||||
Args:
|
||||
pipeline_uuid: 流水线 UUID
|
||||
session_type: 会话类型
|
||||
|
||||
Returns:
|
||||
连接数量
|
||||
"""
|
||||
session_key = f"{pipeline_uuid}:{session_type}"
|
||||
return len(self.connections.get(session_key, {}))
|
||||
|
||||
async def cleanup_stale_connections(self, timeout_seconds: int = 120):
|
||||
"""清理超时的连接
|
||||
|
||||
Args:
|
||||
timeout_seconds: 超时时间(秒)
|
||||
"""
|
||||
now = datetime.now()
|
||||
stale_connections = []
|
||||
|
||||
# 查找超时连接
|
||||
for session_key, session_conns in self.connections.items():
|
||||
for conn_id, conn in session_conns.items():
|
||||
elapsed = (now - conn.last_ping).total_seconds()
|
||||
if elapsed > timeout_seconds:
|
||||
stale_connections.append((conn_id, session_key))
|
||||
|
||||
# 移除超时连接
|
||||
for conn_id, session_key in stale_connections:
|
||||
logger.warning(f"Removing stale connection: {conn_id} from {session_key}")
|
||||
await self.remove_connection(conn_id, session_key)
|
||||
|
||||
# 尝试关闭 WebSocket
|
||||
try:
|
||||
conn = self.get_connection(conn_id, session_key)
|
||||
if conn:
|
||||
await conn.websocket.close(1000, "Connection timeout")
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing stale connection {conn_id}: {e}")
|
||||
|
||||
if stale_connections:
|
||||
logger.info(f"Cleaned up {len(stale_connections)} stale connections")
|
||||
@@ -105,6 +105,10 @@ class Application:
|
||||
|
||||
storage_mgr: storagemgr.StorageMgr = None
|
||||
|
||||
# ========= WebSocket =========
|
||||
|
||||
ws_pool = None # WebSocketConnectionPool
|
||||
|
||||
# ========= HTTP Services =========
|
||||
|
||||
user_service: user_service.UserService = None
|
||||
|
||||
@@ -19,6 +19,7 @@ from ...api.http.service import model as model_service
|
||||
from ...api.http.service import pipeline as pipeline_service
|
||||
from ...api.http.service import bot as bot_service
|
||||
from ...api.http.service import knowledge as knowledge_service
|
||||
from ...api.http.service import websocket_pool
|
||||
from ...discover import engine as discover_engine
|
||||
from ...storage import mgr as storagemgr
|
||||
from ...utils import logcache
|
||||
@@ -87,10 +88,18 @@ class BuildAppStage(stage.BootingStage):
|
||||
await llm_tool_mgr_inst.initialize()
|
||||
ap.tool_mgr = llm_tool_mgr_inst
|
||||
|
||||
# Initialize WebSocket connection pool
|
||||
ws_pool_inst = websocket_pool.WebSocketConnectionPool()
|
||||
ap.ws_pool = ws_pool_inst
|
||||
|
||||
im_mgr_inst = im_mgr.PlatformManager(ap=ap)
|
||||
await im_mgr_inst.initialize()
|
||||
ap.platform_mgr = im_mgr_inst
|
||||
|
||||
# Inject WebSocket pool into WebChatAdapter
|
||||
if hasattr(ap.platform_mgr, 'webchat_proxy_bot') and ap.platform_mgr.webchat_proxy_bot:
|
||||
ap.platform_mgr.webchat_proxy_bot.adapter.set_ws_pool(ws_pool_inst)
|
||||
|
||||
pipeline_mgr = pipelinemgr.PipelineManager(ap)
|
||||
await pipeline_mgr.initialize()
|
||||
ap.pipeline_mgr = pipeline_mgr
|
||||
|
||||
@@ -213,7 +213,7 @@ class RuntimePipeline:
|
||||
await self._execute_from_stage(0, query)
|
||||
except Exception as e:
|
||||
inst_name = query.current_stage_name if query.current_stage_name else 'unknown'
|
||||
self.ap.logger.error(f'处理请求时出错 query_id={query.query_id} stage={inst_name} : {e}')
|
||||
self.ap.logger.error(f'Error processing query {query.query_id} stage={inst_name} : {e}')
|
||||
self.ap.logger.error(f'Traceback: {traceback.format_exc()}')
|
||||
finally:
|
||||
self.ap.logger.debug(f'Query {query.query_id} processed')
|
||||
|
||||
@@ -35,11 +35,17 @@ class PreProcessor(stage.PipelineStage):
|
||||
session = await self.ap.sess_mgr.get_session(query)
|
||||
|
||||
# When not local-agent, llm_model is None
|
||||
llm_model = (
|
||||
await self.ap.model_mgr.get_model_by_uuid(query.pipeline_config['ai']['local-agent']['model'])
|
||||
if selected_runner == 'local-agent'
|
||||
else None
|
||||
)
|
||||
try:
|
||||
llm_model = (
|
||||
await self.ap.model_mgr.get_model_by_uuid(query.pipeline_config['ai']['local-agent']['model'])
|
||||
if selected_runner == 'local-agent'
|
||||
else None
|
||||
)
|
||||
except ValueError:
|
||||
self.ap.logger.warning(
|
||||
f'LLM model {query.pipeline_config["ai"]["local-agent"]["model"] + " "}not found or not configured'
|
||||
)
|
||||
llm_model = None
|
||||
|
||||
conversation = await self.ap.sess_mgr.get_conversation(
|
||||
query,
|
||||
@@ -54,7 +60,7 @@ class PreProcessor(stage.PipelineStage):
|
||||
query.prompt = conversation.prompt.copy()
|
||||
query.messages = conversation.messages.copy()
|
||||
|
||||
if selected_runner == 'local-agent':
|
||||
if selected_runner == 'local-agent' and llm_model:
|
||||
query.use_funcs = []
|
||||
query.use_llm_model_uuid = llm_model.model_entity.uuid
|
||||
|
||||
@@ -72,7 +78,11 @@ class PreProcessor(stage.PipelineStage):
|
||||
|
||||
# Check if this model supports vision, if not, remove all images
|
||||
# TODO this checking should be performed in runner, and in this stage, the image should be reserved
|
||||
if selected_runner == 'local-agent' and not llm_model.model_entity.abilities.__contains__('vision'):
|
||||
if (
|
||||
selected_runner == 'local-agent'
|
||||
and llm_model
|
||||
and not llm_model.model_entity.abilities.__contains__('vision')
|
||||
):
|
||||
for msg in query.messages:
|
||||
if isinstance(msg.content, list):
|
||||
for me in msg.content:
|
||||
@@ -89,7 +99,9 @@ class PreProcessor(stage.PipelineStage):
|
||||
content_list.append(provider_message.ContentElement.from_text(me.text))
|
||||
plain_text += me.text
|
||||
elif isinstance(me, platform_message.Image):
|
||||
if selected_runner != 'local-agent' or llm_model.model_entity.abilities.__contains__('vision'):
|
||||
if selected_runner != 'local-agent' or (
|
||||
llm_model and llm_model.model_entity.abilities.__contains__('vision')
|
||||
):
|
||||
if me.base64 is not None:
|
||||
content_list.append(provider_message.ContentElement.from_image_base64(me.base64))
|
||||
elif isinstance(me, platform_message.File):
|
||||
@@ -100,7 +112,9 @@ class PreProcessor(stage.PipelineStage):
|
||||
if isinstance(msg, platform_message.Plain):
|
||||
content_list.append(provider_message.ContentElement.from_text(msg.text))
|
||||
elif isinstance(msg, platform_message.Image):
|
||||
if selected_runner != 'local-agent' or llm_model.model_entity.abilities.__contains__('vision'):
|
||||
if selected_runner != 'local-agent' or (
|
||||
llm_model and llm_model.model_entity.abilities.__contains__('vision')
|
||||
):
|
||||
if msg.base64 is not None:
|
||||
content_list.append(provider_message.ContentElement.from_image_base64(msg.base64))
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@ from .. import handler
|
||||
from ... import entities
|
||||
from ....provider import runner as runner_module
|
||||
|
||||
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||
import langbot_plugin.api.entities.events as events
|
||||
from ....utils import importutil
|
||||
from ....provider import runners
|
||||
@@ -47,18 +46,19 @@ class ChatMessageHandler(handler.MessageHandler):
|
||||
event_ctx = await self.ap.plugin_connector.emit_event(event)
|
||||
|
||||
is_create_card = False # 判断下是否需要创建流式卡片
|
||||
|
||||
if event_ctx.is_prevented_default():
|
||||
if event_ctx.event.reply is not None:
|
||||
mc = platform_message.MessageChain(event_ctx.event.reply)
|
||||
if event_ctx.event.reply_message_chain is not None:
|
||||
mc = event_ctx.event.reply_message_chain
|
||||
query.resp_messages.append(mc)
|
||||
|
||||
yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
|
||||
else:
|
||||
yield entities.StageProcessResult(result_type=entities.ResultType.INTERRUPT, new_query=query)
|
||||
else:
|
||||
if event_ctx.event.alter is not None:
|
||||
if event_ctx.event.user_message_alter is not None:
|
||||
# if isinstance(event_ctx.event, str): # 现在暂时不考虑多模态alter
|
||||
query.user_message.content = event_ctx.event.alter
|
||||
query.user_message.content = event_ctx.event.user_message_alter
|
||||
|
||||
text_length = 0
|
||||
try:
|
||||
|
||||
@@ -5,7 +5,6 @@ import typing
|
||||
from .. import handler
|
||||
from ... import entities
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.events as events
|
||||
@@ -49,8 +48,8 @@ class CommandHandler(handler.MessageHandler):
|
||||
event_ctx = await self.ap.plugin_connector.emit_event(event)
|
||||
|
||||
if event_ctx.is_prevented_default():
|
||||
if event_ctx.event.reply is not None:
|
||||
mc = platform_message.MessageChain(event_ctx.event.reply)
|
||||
if event_ctx.event.reply_message_chain is not None:
|
||||
mc = event_ctx.event.reply_message_chain
|
||||
|
||||
query.resp_messages.append(mc)
|
||||
|
||||
@@ -59,11 +58,6 @@ class CommandHandler(handler.MessageHandler):
|
||||
yield entities.StageProcessResult(result_type=entities.ResultType.INTERRUPT, new_query=query)
|
||||
|
||||
else:
|
||||
if event_ctx.event.alter is not None:
|
||||
query.message_chain = platform_message.MessageChain(
|
||||
[platform_message.Plain(text=event_ctx.event.alter)]
|
||||
)
|
||||
|
||||
session = await self.ap.sess_mgr.get_session(query)
|
||||
|
||||
async for ret in self.ap.cmd_mgr.execute(
|
||||
@@ -80,8 +74,12 @@ class CommandHandler(handler.MessageHandler):
|
||||
self.ap.logger.info(f'Command({query.query_id}) error: {self.cut_str(str(ret.error))}')
|
||||
|
||||
yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
|
||||
elif (ret.text is not None or ret.image_url is not None or ret.image_base64 is not None
|
||||
or ret.file_url is not None):
|
||||
elif (
|
||||
ret.text is not None
|
||||
or ret.image_url is not None
|
||||
or ret.image_base64 is not None
|
||||
or ret.file_url is not None
|
||||
):
|
||||
content: list[provider_message.ContentElement] = []
|
||||
|
||||
if ret.text is not None:
|
||||
|
||||
@@ -21,7 +21,7 @@ class AtBotRule(rule_model.GroupRespondRule):
|
||||
def remove_at(message_chain: platform_message.MessageChain):
|
||||
nonlocal found
|
||||
for component in message_chain.root:
|
||||
if isinstance(component, platform_message.At) and component.target == query.adapter.bot_account_id:
|
||||
if isinstance(component, platform_message.At) and str(component.target) == str(query.adapter.bot_account_id):
|
||||
message_chain.remove(component)
|
||||
found = True
|
||||
break
|
||||
|
||||
@@ -80,8 +80,8 @@ class ResponseWrapper(stage.PipelineStage):
|
||||
new_query=query,
|
||||
)
|
||||
else:
|
||||
if event_ctx.event.reply is not None:
|
||||
query.resp_message_chain.append(platform_message.MessageChain(event_ctx.event.reply))
|
||||
if event_ctx.event.reply_message_chain is not None:
|
||||
query.resp_message_chain.append(event_ctx.event.reply_message_chain)
|
||||
|
||||
else:
|
||||
query.resp_message_chain.append(result.get_content_platform_message_chain())
|
||||
@@ -123,10 +123,8 @@ class ResponseWrapper(stage.PipelineStage):
|
||||
new_query=query,
|
||||
)
|
||||
else:
|
||||
if event_ctx.event.reply is not None:
|
||||
query.resp_message_chain.append(
|
||||
platform_message.MessageChain(text=event_ctx.event.reply)
|
||||
)
|
||||
if event_ctx.event.reply_message_chain is not None:
|
||||
query.resp_message_chain.append(event_ctx.event.reply_message_chain)
|
||||
|
||||
else:
|
||||
query.resp_message_chain.append(
|
||||
|
||||
@@ -139,19 +139,15 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter
|
||||
event_converter: QQOfficialEventConverter = QQOfficialEventConverter()
|
||||
|
||||
def __init__(self, config: dict, logger: EventLogger):
|
||||
self.config = config
|
||||
self.logger = logger
|
||||
bot = QQOfficialClient(
|
||||
app_id=config['appid'], secret=config['secret'], token=config['token'], logger=logger
|
||||
)
|
||||
|
||||
required_keys = [
|
||||
'appid',
|
||||
'secret',
|
||||
]
|
||||
missing_keys = [key for key in required_keys if key not in config]
|
||||
if missing_keys:
|
||||
raise command_errors.ParamNotEnoughError('QQ官方机器人缺少相关配置项,请查看文档或联系管理员')
|
||||
|
||||
self.bot = QQOfficialClient(
|
||||
app_id=config['appid'], secret=config['secret'], token=config['token'], logger=self.logger
|
||||
super().__init__(
|
||||
config=config,
|
||||
logger=logger,
|
||||
bot=bot,
|
||||
bot_account_id=config['appid'],
|
||||
)
|
||||
|
||||
async def reply_message(
|
||||
|
||||
@@ -58,6 +58,7 @@ class WebChatAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
debug_messages: dict[str, list[dict]] = pydantic.Field(default_factory=dict, exclude=True)
|
||||
|
||||
ap: app.Application = pydantic.Field(exclude=True)
|
||||
ws_pool: typing.Any = pydantic.Field(exclude=True, default=None) # WebSocketConnectionPool
|
||||
|
||||
def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger, **kwargs):
|
||||
super().__init__(
|
||||
@@ -72,6 +73,15 @@ class WebChatAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
self.bot_account_id = 'webchatbot'
|
||||
|
||||
self.debug_messages = {}
|
||||
self.ws_pool = None
|
||||
|
||||
def set_ws_pool(self, ws_pool):
|
||||
"""设置 WebSocket 连接池
|
||||
|
||||
Args:
|
||||
ws_pool: WebSocketConnectionPool 实例
|
||||
"""
|
||||
self.ws_pool = ws_pool
|
||||
|
||||
async def send_message(
|
||||
self,
|
||||
@@ -130,7 +140,7 @@ class WebChatAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
quote_origin: bool = False,
|
||||
is_final: bool = False,
|
||||
) -> dict:
|
||||
"""回复消息"""
|
||||
"""Reply message chunk - supports both SSE (legacy) and WebSocket"""
|
||||
message_data = WebChatMessage(
|
||||
id=-1,
|
||||
role='assistant',
|
||||
@@ -139,24 +149,32 @@ class WebChatAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
timestamp=datetime.now().isoformat(),
|
||||
)
|
||||
|
||||
# notify waiter
|
||||
session = (
|
||||
self.webchat_group_session
|
||||
if isinstance(message_source, platform_events.GroupMessage)
|
||||
else self.webchat_person_session
|
||||
)
|
||||
if message_source.message_chain.message_id not in session.resp_waiters:
|
||||
# session.resp_waiters[message_source.message_chain.message_id] = asyncio.Queue()
|
||||
queue = session.resp_queues[message_source.message_chain.message_id]
|
||||
# Determine session type
|
||||
if isinstance(message_source, platform_events.GroupMessage):
|
||||
session_type = 'group'
|
||||
session = self.webchat_group_session
|
||||
else: # FriendMessage
|
||||
session_type = 'person'
|
||||
session = self.webchat_person_session
|
||||
|
||||
# if isinstance(message_source, platform_events.FriendMessage):
|
||||
# queue = self.webchat_person_session.resp_queues[message_source.message_chain.message_id]
|
||||
# elif isinstance(message_source, platform_events.GroupMessage):
|
||||
# queue = self.webchat_group_session.resp_queues[message_source.message_chain.message_id]
|
||||
if is_final and bot_message.tool_calls is None:
|
||||
message_data.is_final = True
|
||||
# print(message_data)
|
||||
await queue.put(message_data)
|
||||
# Legacy SSE support: put message into queue
|
||||
if message_source.message_chain.message_id in session.resp_queues:
|
||||
queue = session.resp_queues[message_source.message_chain.message_id]
|
||||
if is_final and bot_message.tool_calls is None:
|
||||
message_data.is_final = True
|
||||
await queue.put(message_data)
|
||||
|
||||
# WebSocket support: broadcast to all connections
|
||||
if self.ws_pool:
|
||||
pipeline_uuid = self.ap.platform_mgr.webchat_proxy_bot.bot_entity.use_pipeline_uuid
|
||||
|
||||
# Determine event type
|
||||
event_type = 'message_complete' if (is_final and bot_message.tool_calls is None) else 'message_chunk'
|
||||
|
||||
# Broadcast to specified session only
|
||||
await self.ws_pool.broadcast_to_session(
|
||||
pipeline_uuid=pipeline_uuid, session_type=session_type, event_type=event_type, data=message_data.model_dump()
|
||||
)
|
||||
|
||||
return message_data.model_dump()
|
||||
|
||||
|
||||
@@ -139,7 +139,7 @@ class WeChatPadMessageConverter(abstract_platform_adapter.AbstractMessageConvert
|
||||
pattern = r'@\S{1,20}'
|
||||
content_no_preifx = re.sub(pattern, '', content_no_preifx)
|
||||
|
||||
return platform_message.MessageChain([platform_message.Plain(content_no_preifx)])
|
||||
return platform_message.MessageChain([platform_message.Plain(text=content_no_preifx)])
|
||||
|
||||
async def _handler_image(self, message: Optional[dict], content_no_preifx: str) -> platform_message.MessageChain:
|
||||
"""处理图像消息 (msg_type=3)"""
|
||||
@@ -265,7 +265,7 @@ class WeChatPadMessageConverter(abstract_platform_adapter.AbstractMessageConvert
|
||||
# 文本消息
|
||||
try:
|
||||
if '<msg>' not in quote_data:
|
||||
quote_data_message_list.append(platform_message.Plain(quote_data))
|
||||
quote_data_message_list.append(platform_message.Plain(text=quote_data))
|
||||
else:
|
||||
# 引用消息展开
|
||||
quote_data_xml = ET.fromstring(quote_data)
|
||||
@@ -280,7 +280,7 @@ class WeChatPadMessageConverter(abstract_platform_adapter.AbstractMessageConvert
|
||||
quote_data_message_list.extend(await self._handler_compound(None, quote_data))
|
||||
except Exception as e:
|
||||
self.logger.error(f'处理引用消息异常 expcetion:{e}')
|
||||
quote_data_message_list.append(platform_message.Plain(quote_data))
|
||||
quote_data_message_list.append(platform_message.Plain(text=quote_data))
|
||||
message_list.append(
|
||||
platform_message.Quote(
|
||||
sender_id=sender_id,
|
||||
@@ -290,7 +290,7 @@ class WeChatPadMessageConverter(abstract_platform_adapter.AbstractMessageConvert
|
||||
if len(user_data) > 0:
|
||||
pattern = r'@\S{1,20}'
|
||||
user_data = re.sub(pattern, '', user_data)
|
||||
message_list.append(platform_message.Plain(user_data))
|
||||
message_list.append(platform_message.Plain(text=user_data))
|
||||
|
||||
return platform_message.MessageChain(message_list)
|
||||
|
||||
@@ -543,7 +543,6 @@ class WeChatPadAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter)
|
||||
] = {}
|
||||
|
||||
def __init__(self, config: dict, logger: EventLogger):
|
||||
|
||||
quart_app = quart.Quart(__name__)
|
||||
|
||||
message_converter = WeChatPadMessageConverter(config, logger)
|
||||
@@ -551,15 +550,14 @@ class WeChatPadAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter)
|
||||
bot = WeChatPadClient(config['wechatpad_url'], config['token'])
|
||||
super().__init__(
|
||||
config=config,
|
||||
logger = logger,
|
||||
quart_app = quart_app,
|
||||
message_converter =message_converter,
|
||||
event_converter = event_converter,
|
||||
logger=logger,
|
||||
quart_app=quart_app,
|
||||
message_converter=message_converter,
|
||||
event_converter=event_converter,
|
||||
listeners={},
|
||||
bot_account_id ='',
|
||||
name="WeChatPad",
|
||||
bot_account_id='',
|
||||
name='WeChatPad',
|
||||
bot=bot,
|
||||
|
||||
)
|
||||
|
||||
async def ws_message(self, data):
|
||||
|
||||
@@ -18,7 +18,7 @@ from langbot_plugin.api.entities import events
|
||||
from langbot_plugin.api.entities import context
|
||||
import langbot_plugin.runtime.io.connection as base_connection
|
||||
from langbot_plugin.api.definition.components.manifest import ComponentManifest
|
||||
from langbot_plugin.api.entities.builtin.command import context as command_context
|
||||
from langbot_plugin.api.entities.builtin.command import context as command_context, errors as command_errors
|
||||
from langbot_plugin.runtime.plugin.mgr import PluginInstallSource
|
||||
from ..core import taskmgr
|
||||
|
||||
@@ -191,6 +191,9 @@ class PluginRuntimeConnector:
|
||||
task_context.trace(trace)
|
||||
|
||||
async def list_plugins(self) -> list[dict[str, Any]]:
|
||||
if not self.is_enable_plugin:
|
||||
return []
|
||||
|
||||
return await self.handler.list_plugins()
|
||||
|
||||
async def get_plugin_info(self, author: str, plugin_name: str) -> dict[str, Any]:
|
||||
@@ -211,21 +214,31 @@ class PluginRuntimeConnector:
|
||||
|
||||
if not self.is_enable_plugin:
|
||||
return event_ctx
|
||||
event_ctx_result = await self.handler.emit_event(event_ctx.model_dump(serialize_as_any=True))
|
||||
|
||||
event_ctx_result = await self.handler.emit_event(event_ctx.model_dump(serialize_as_any=False))
|
||||
|
||||
event_ctx = context.EventContext.model_validate(event_ctx_result['event_context'])
|
||||
|
||||
return event_ctx
|
||||
|
||||
async def list_tools(self) -> list[ComponentManifest]:
|
||||
if not self.is_enable_plugin:
|
||||
return []
|
||||
|
||||
list_tools_data = await self.handler.list_tools()
|
||||
|
||||
return [ComponentManifest.model_validate(tool) for tool in list_tools_data]
|
||||
|
||||
async def call_tool(self, tool_name: str, parameters: dict[str, Any]) -> dict[str, Any]:
|
||||
if not self.is_enable_plugin:
|
||||
return {'error': 'Tool not found: plugin system is disabled'}
|
||||
|
||||
return await self.handler.call_tool(tool_name, parameters)
|
||||
|
||||
async def list_commands(self) -> list[ComponentManifest]:
|
||||
if not self.is_enable_plugin:
|
||||
return []
|
||||
|
||||
list_commands_data = await self.handler.list_commands()
|
||||
|
||||
return [ComponentManifest.model_validate(command) for command in list_commands_data]
|
||||
@@ -233,6 +246,9 @@ class PluginRuntimeConnector:
|
||||
async def execute_command(
|
||||
self, command_ctx: command_context.ExecuteContext
|
||||
) -> typing.AsyncGenerator[command_context.CommandReturn, None]:
|
||||
if not self.is_enable_plugin:
|
||||
yield command_context.CommandReturn(error=command_errors.CommandNotFoundError(command_ctx.command))
|
||||
|
||||
gen = self.handler.execute_command(command_ctx.model_dump(serialize_as_any=True))
|
||||
|
||||
async for ret in gen:
|
||||
|
||||
312
pkg/provider/runners/cozeapi.py
Normal file
312
pkg/provider/runners/cozeapi.py
Normal file
@@ -0,0 +1,312 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import json
|
||||
import uuid
|
||||
import base64
|
||||
|
||||
from .. import runner
|
||||
from ...core import app
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
from ...utils import image
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
from libs.coze_server_api.client import AsyncCozeAPIClient
|
||||
|
||||
@runner.runner_class('coze-api')
|
||||
class CozeAPIRunner(runner.RequestRunner):
|
||||
"""Coze API 对话请求器"""
|
||||
|
||||
def __init__(self, ap: app.Application, pipeline_config: dict):
|
||||
self.pipeline_config = pipeline_config
|
||||
self.ap = ap
|
||||
self.agent_token = pipeline_config["ai"]['coze-api']['api-key']
|
||||
self.bot_id = pipeline_config["ai"]['coze-api'].get('bot-id')
|
||||
self.chat_timeout = pipeline_config["ai"]['coze-api'].get('timeout')
|
||||
self.auto_save_history = pipeline_config["ai"]['coze-api'].get('auto_save_history')
|
||||
self.api_base = pipeline_config["ai"]['coze-api'].get('api-base')
|
||||
|
||||
self.coze = AsyncCozeAPIClient(
|
||||
self.agent_token,
|
||||
self.api_base
|
||||
)
|
||||
|
||||
def _process_thinking_content(
|
||||
self,
|
||||
content: str,
|
||||
) -> tuple[str, str]:
|
||||
"""处理思维链内容
|
||||
|
||||
Args:
|
||||
content: 原始内容
|
||||
Returns:
|
||||
(处理后的内容, 提取的思维链内容)
|
||||
"""
|
||||
remove_think = self.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False)
|
||||
thinking_content = ''
|
||||
# 从 content 中提取 <think> 标签内容
|
||||
if content and '<think>' in content and '</think>' in content:
|
||||
import re
|
||||
|
||||
think_pattern = r'<think>(.*?)</think>'
|
||||
think_matches = re.findall(think_pattern, content, re.DOTALL)
|
||||
if think_matches:
|
||||
thinking_content = '\n'.join(think_matches)
|
||||
# 移除 content 中的 <think> 标签
|
||||
content = re.sub(think_pattern, '', content, flags=re.DOTALL).strip()
|
||||
|
||||
# 根据 remove_think 参数决定是否保留思维链
|
||||
if remove_think:
|
||||
return content, ''
|
||||
else:
|
||||
# 如果有思维链内容,将其以 <think> 格式添加到 content 开头
|
||||
if thinking_content:
|
||||
content = f'<think>\n{thinking_content}\n</think>\n{content}'.strip()
|
||||
return content, thinking_content
|
||||
|
||||
async def _preprocess_user_message(self, query: pipeline_query.Query) -> list[dict]:
|
||||
"""预处理用户消息,转换为Coze消息格式
|
||||
|
||||
Returns:
|
||||
list[dict]: Coze消息列表
|
||||
"""
|
||||
messages = []
|
||||
|
||||
if isinstance(query.user_message.content, list):
|
||||
# 多模态消息处理
|
||||
content_parts = []
|
||||
|
||||
for ce in query.user_message.content:
|
||||
if ce.type == 'text':
|
||||
content_parts.append({"type": "text", "text": ce.text})
|
||||
elif ce.type == 'image_base64':
|
||||
image_b64, image_format = await image.extract_b64_and_format(ce.image_base64)
|
||||
file_bytes = base64.b64decode(image_b64)
|
||||
file_id = await self._get_file_id(file_bytes)
|
||||
content_parts.append({"type": "image", "file_id": file_id})
|
||||
elif ce.type == 'file':
|
||||
# 处理文件,上传到Coze
|
||||
file_id = await self._get_file_id(ce.file)
|
||||
content_parts.append({"type": "file", "file_id": file_id})
|
||||
|
||||
# 创建多模态消息
|
||||
if content_parts:
|
||||
messages.append({
|
||||
"role": "user",
|
||||
"content": json.dumps(content_parts),
|
||||
"content_type": "object_string",
|
||||
"meta_data": None
|
||||
})
|
||||
|
||||
elif isinstance(query.user_message.content, str):
|
||||
# 纯文本消息
|
||||
messages.append({
|
||||
"role": "user",
|
||||
"content": query.user_message.content,
|
||||
"content_type": "text",
|
||||
"meta_data": None
|
||||
})
|
||||
|
||||
return messages
|
||||
|
||||
async def _get_file_id(self, file) -> str:
|
||||
"""上传文件到Coze服务
|
||||
Args:
|
||||
file: 文件
|
||||
Returns:
|
||||
str: 文件ID
|
||||
"""
|
||||
file_id = await self.coze.upload(file=file)
|
||||
return file_id
|
||||
|
||||
async def _chat_messages(
|
||||
self, query: pipeline_query.Query
|
||||
) -> typing.AsyncGenerator[provider_message.Message, None]:
|
||||
"""调用聊天助手(非流式)
|
||||
|
||||
注意:由于cozepy没有提供非流式API,这里使用流式API并在结束后一次性返回完整内容
|
||||
"""
|
||||
user_id = f'{query.launcher_id}_{query.sender_id}'
|
||||
|
||||
# 预处理用户消息
|
||||
additional_messages = await self._preprocess_user_message(query)
|
||||
|
||||
# 获取会话ID
|
||||
conversation_id = None
|
||||
|
||||
# 收集完整内容
|
||||
full_content = ''
|
||||
full_reasoning = ''
|
||||
|
||||
try:
|
||||
# 调用Coze API流式接口
|
||||
async for chunk in self.coze.chat_messages(
|
||||
bot_id=self.bot_id,
|
||||
user_id=user_id,
|
||||
additional_messages=additional_messages,
|
||||
conversation_id=conversation_id,
|
||||
timeout=self.chat_timeout,
|
||||
auto_save_history=self.auto_save_history,
|
||||
stream=True
|
||||
):
|
||||
self.ap.logger.debug(f'coze-chat-stream: {chunk}')
|
||||
|
||||
event_type = chunk.get('event')
|
||||
data = chunk.get('data', {})
|
||||
|
||||
if event_type == 'conversation.message.delta':
|
||||
# 收集内容
|
||||
if 'content' in data:
|
||||
full_content += data.get('content', '')
|
||||
|
||||
# 收集推理内容(如果有)
|
||||
if 'reasoning_content' in data:
|
||||
full_reasoning += data.get('reasoning_content', '')
|
||||
|
||||
elif event_type == 'done':
|
||||
# 保存会话ID
|
||||
if 'conversation_id' in data:
|
||||
conversation_id = data.get('conversation_id')
|
||||
|
||||
elif event_type == 'error':
|
||||
# 处理错误
|
||||
error_msg = f"Coze API错误: {data.get('message', '未知错误')}"
|
||||
yield provider_message.Message(
|
||||
role='assistant',
|
||||
content=error_msg,
|
||||
)
|
||||
return
|
||||
|
||||
# 处理思维链内容
|
||||
content, thinking_content = self._process_thinking_content(full_content)
|
||||
if full_reasoning:
|
||||
remove_think = self.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False)
|
||||
if not remove_think:
|
||||
content = f'<think>\n{full_reasoning}\n</think>\n{content}'.strip()
|
||||
|
||||
# 一次性返回完整内容
|
||||
yield provider_message.Message(
|
||||
role='assistant',
|
||||
content=content,
|
||||
)
|
||||
|
||||
# 保存会话ID
|
||||
if conversation_id and query.session.using_conversation:
|
||||
query.session.using_conversation.uuid = conversation_id
|
||||
|
||||
except Exception as e:
|
||||
self.ap.logger.error(f'Coze API错误: {str(e)}')
|
||||
yield provider_message.Message(
|
||||
role='assistant',
|
||||
content=f'Coze API调用失败: {str(e)}',
|
||||
)
|
||||
|
||||
|
||||
async def _chat_messages_chunk(
|
||||
self, query: pipeline_query.Query
|
||||
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
|
||||
"""调用聊天助手(流式)"""
|
||||
user_id = f'{query.launcher_id}_{query.sender_id}'
|
||||
|
||||
# 预处理用户消息
|
||||
additional_messages = await self._preprocess_user_message(query)
|
||||
|
||||
# 获取会话ID
|
||||
conversation_id = None
|
||||
|
||||
start_reasoning = False
|
||||
stop_reasoning = False
|
||||
message_idx = 1
|
||||
is_final = False
|
||||
full_content = ''
|
||||
remove_think = self.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False)
|
||||
|
||||
|
||||
|
||||
try:
|
||||
# 调用Coze API流式接口
|
||||
async for chunk in self.coze.chat_messages(
|
||||
bot_id=self.bot_id,
|
||||
user_id=user_id,
|
||||
additional_messages=additional_messages,
|
||||
conversation_id=conversation_id,
|
||||
timeout=self.chat_timeout,
|
||||
auto_save_history=self.auto_save_history,
|
||||
stream=True
|
||||
):
|
||||
self.ap.logger.debug(f'coze-chat-stream-chunk: {chunk}')
|
||||
|
||||
event_type = chunk.get('event')
|
||||
data = chunk.get('data', {})
|
||||
content = ""
|
||||
|
||||
if event_type == 'conversation.message.delta':
|
||||
message_idx += 1
|
||||
# 处理内容增量
|
||||
if "reasoning_content" in data and not remove_think:
|
||||
|
||||
reasoning_content = data.get('reasoning_content', '')
|
||||
if reasoning_content and not start_reasoning:
|
||||
content = f"<think/>\n"
|
||||
start_reasoning = True
|
||||
content += reasoning_content
|
||||
|
||||
if 'content' in data:
|
||||
if data.get('content', ''):
|
||||
content += data.get('content', '')
|
||||
if not stop_reasoning and start_reasoning:
|
||||
content = f"</think>\n{content}"
|
||||
stop_reasoning = True
|
||||
|
||||
|
||||
elif event_type == 'done':
|
||||
# 保存会话ID
|
||||
if 'conversation_id' in data:
|
||||
conversation_id = data.get('conversation_id')
|
||||
if query.session.using_conversation:
|
||||
query.session.using_conversation.uuid = conversation_id
|
||||
is_final = True
|
||||
|
||||
|
||||
elif event_type == 'error':
|
||||
# 处理错误
|
||||
error_msg = f"Coze API错误: {data.get('message', '未知错误')}"
|
||||
yield provider_message.MessageChunk(
|
||||
role='assistant',
|
||||
content=error_msg,
|
||||
finish_reason='error'
|
||||
)
|
||||
return
|
||||
full_content += content
|
||||
if message_idx % 8 == 0 or is_final:
|
||||
if full_content:
|
||||
yield provider_message.MessageChunk(
|
||||
role='assistant',
|
||||
content=full_content,
|
||||
is_final=is_final
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
self.ap.logger.error(f'Coze API流式调用错误: {str(e)}')
|
||||
yield provider_message.MessageChunk(
|
||||
role='assistant',
|
||||
content=f'Coze API流式调用失败: {str(e)}',
|
||||
finish_reason='error'
|
||||
)
|
||||
|
||||
|
||||
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
|
||||
"""运行"""
|
||||
msg_seq = 0
|
||||
if await query.adapter.is_stream_output_supported():
|
||||
async for msg in self._chat_messages_chunk(query):
|
||||
if isinstance(msg, provider_message.MessageChunk):
|
||||
msg_seq += 1
|
||||
msg.msg_sequence = msg_seq
|
||||
yield msg
|
||||
else:
|
||||
async for msg in self._chat_messages(query):
|
||||
yield msg
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
semantic_version = 'v4.3.5'
|
||||
semantic_version = 'v4.3.9'
|
||||
|
||||
required_database_version = 8
|
||||
"""Tag the version of the database schema, used to check if the database needs to be migrated"""
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "langbot"
|
||||
version = "4.3.5"
|
||||
version = "4.3.9"
|
||||
description = "Easy-to-use global IM bot platform designed for LLM era"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10.1,<4.0"
|
||||
@@ -62,7 +62,7 @@ dependencies = [
|
||||
"langchain>=0.2.0",
|
||||
"chromadb>=0.4.24",
|
||||
"qdrant-client (>=1.15.1,<2.0.0)",
|
||||
"langbot-plugin==0.1.3",
|
||||
"langbot-plugin==0.1.4",
|
||||
"asyncpg>=0.30.0",
|
||||
"line-bot-sdk>=3.19.0",
|
||||
"tboxsdk>=0.0.10",
|
||||
|
||||
@@ -41,4 +41,4 @@ plugin:
|
||||
enable: true
|
||||
runtime_ws_url: 'ws://langbot_plugin_runtime:5400/control/ws'
|
||||
enable_marketplace: true
|
||||
cloud_service_url: 'https://space.langbot.app'
|
||||
cloud_service_url: 'https://space.langbot.app'
|
||||
|
||||
@@ -43,6 +43,10 @@ stages:
|
||||
label:
|
||||
en_US: Langflow API
|
||||
zh_Hans: Langflow API
|
||||
- name: coze-api
|
||||
label:
|
||||
en_US: Coze API
|
||||
zh_Hans: 扣子 API
|
||||
- name: local-agent
|
||||
label:
|
||||
en_US: Local Agent
|
||||
@@ -380,4 +384,57 @@ stages:
|
||||
zh_Hans: 可选的流程调整参数
|
||||
type: json
|
||||
required: false
|
||||
default: '{}'
|
||||
default: '{}'
|
||||
- name: coze-api
|
||||
label:
|
||||
en_US: coze API
|
||||
zh_Hans: 扣子 API
|
||||
description:
|
||||
en_US: Configure the Coze API of the pipeline
|
||||
zh_Hans: 配置Coze API
|
||||
config:
|
||||
- name: api-key
|
||||
label:
|
||||
en_US: API Key
|
||||
zh_Hans: API 密钥
|
||||
description:
|
||||
en_US: The API key for the Coze server
|
||||
zh_Hans: Coze服务器的 API 密钥
|
||||
type: string
|
||||
required: true
|
||||
- name: bot-id
|
||||
label:
|
||||
en_US: Bot ID
|
||||
zh_Hans: 机器人 ID
|
||||
description:
|
||||
en_US: The ID of the bot to run
|
||||
zh_Hans: 要运行的机器人 ID
|
||||
type: string
|
||||
required: true
|
||||
- name: api-base
|
||||
label:
|
||||
en_US: API Base URL
|
||||
zh_Hans: API 基础 URL
|
||||
description:
|
||||
en_US: The base URL for the Coze API, please use https://api.coze.com for global Coze edition(coze.com).
|
||||
zh_Hans: Coze API 的基础 URL,请使用 https://api.coze.com 用于全球 Coze 版(coze.com)
|
||||
type: string
|
||||
default: "https://api.coze.cn"
|
||||
- name: auto-save-history
|
||||
label:
|
||||
en_US: Auto Save History
|
||||
zh_Hans: 自动保存历史
|
||||
description:
|
||||
en_US: Whether to automatically save conversation history
|
||||
zh_Hans: 是否自动保存对话历史
|
||||
type: boolean
|
||||
default: true
|
||||
- name: timeout
|
||||
label:
|
||||
en_US: Request Timeout
|
||||
zh_Hans: 请求超时
|
||||
description:
|
||||
en_US: Timeout in seconds for API requests
|
||||
zh_Hans: API 请求超时时间(秒)
|
||||
type: number
|
||||
default: 120
|
||||
@@ -11,6 +11,10 @@ import { Message } from '@/app/infra/entities/message';
|
||||
import { toast } from 'sonner';
|
||||
import AtBadge from './AtBadge';
|
||||
import { Switch } from '@/components/ui/switch';
|
||||
import {
|
||||
PipelineWebSocketClient,
|
||||
SessionType,
|
||||
} from '@/app/infra/websocket/PipelineWebSocketClient';
|
||||
|
||||
interface MessageComponent {
|
||||
type: 'At' | 'Plain';
|
||||
@@ -31,17 +35,27 @@ export default function DebugDialog({
|
||||
}: DebugDialogProps) {
|
||||
const { t } = useTranslation();
|
||||
const [selectedPipelineId, setSelectedPipelineId] = useState(pipelineId);
|
||||
const [sessionType, setSessionType] = useState<'person' | 'group'>('person');
|
||||
const [sessionType, setSessionType] = useState<SessionType>('person');
|
||||
const [messages, setMessages] = useState<Message[]>([]);
|
||||
const [inputValue, setInputValue] = useState('');
|
||||
const [showAtPopover, setShowAtPopover] = useState(false);
|
||||
const [hasAt, setHasAt] = useState(false);
|
||||
const [isHovering, setIsHovering] = useState(false);
|
||||
const [isStreaming, setIsStreaming] = useState(true);
|
||||
const messagesEndRef = useRef<HTMLDivElement>(null);
|
||||
const inputRef = useRef<HTMLInputElement>(null);
|
||||
const popoverRef = useRef<HTMLDivElement>(null);
|
||||
|
||||
// WebSocket states
|
||||
const [wsClient, setWsClient] = useState<PipelineWebSocketClient | null>(
|
||||
null,
|
||||
);
|
||||
const [connectionStatus, setConnectionStatus] = useState<
|
||||
'disconnected' | 'connecting' | 'connected'
|
||||
>('disconnected');
|
||||
const [pendingMessages, setPendingMessages] = useState<
|
||||
Map<string, Message>
|
||||
>(new Map());
|
||||
|
||||
const scrollToBottom = useCallback(() => {
|
||||
// 使用setTimeout确保在DOM更新后执行滚动
|
||||
setTimeout(() => {
|
||||
@@ -57,37 +71,161 @@ export default function DebugDialog({
|
||||
}, 0);
|
||||
}, []);
|
||||
|
||||
const loadMessages = useCallback(
|
||||
async (pipelineId: string) => {
|
||||
try {
|
||||
const response = await httpClient.getWebChatHistoryMessages(
|
||||
pipelineId,
|
||||
sessionType,
|
||||
);
|
||||
setMessages(response.messages);
|
||||
} catch (error) {
|
||||
console.error('Failed to load messages:', error);
|
||||
}
|
||||
},
|
||||
[sessionType],
|
||||
);
|
||||
// 在useEffect中监听messages变化时滚动
|
||||
// Scroll to bottom when messages change
|
||||
useEffect(() => {
|
||||
scrollToBottom();
|
||||
}, [messages, scrollToBottom]);
|
||||
|
||||
// WebSocket connection setup
|
||||
useEffect(() => {
|
||||
if (open) {
|
||||
setSelectedPipelineId(pipelineId);
|
||||
loadMessages(pipelineId);
|
||||
}
|
||||
}, [open, pipelineId]);
|
||||
if (!open) return;
|
||||
|
||||
useEffect(() => {
|
||||
if (open) {
|
||||
loadMessages(selectedPipelineId);
|
||||
}
|
||||
}, [sessionType, selectedPipelineId, open, loadMessages]);
|
||||
const client = new PipelineWebSocketClient(
|
||||
selectedPipelineId,
|
||||
sessionType,
|
||||
);
|
||||
|
||||
// Setup event handlers
|
||||
client.onConnected = (data) => {
|
||||
console.log('[DebugDialog] WebSocket connected:', data);
|
||||
setConnectionStatus('connected');
|
||||
// Load history messages after connection
|
||||
client.loadHistory();
|
||||
};
|
||||
|
||||
client.onHistory = (data) => {
|
||||
console.log('[DebugDialog] History loaded:', data?.messages.length);
|
||||
if (data) {
|
||||
setMessages(data.messages);
|
||||
}
|
||||
};
|
||||
|
||||
client.onMessageSent = (data) => {
|
||||
console.log('[DebugDialog] Message sent confirmed:', data);
|
||||
if (data) {
|
||||
// Update client message ID to server message ID
|
||||
const clientMsgId = data.client_message_id;
|
||||
const serverMsgId = data.server_message_id;
|
||||
|
||||
setMessages((prev) =>
|
||||
prev.map((msg) =>
|
||||
msg.id === -1 && pendingMessages.has(clientMsgId)
|
||||
? { ...msg, id: serverMsgId }
|
||||
: msg,
|
||||
),
|
||||
);
|
||||
|
||||
setPendingMessages((prev) => {
|
||||
const newMap = new Map(prev);
|
||||
newMap.delete(clientMsgId);
|
||||
return newMap;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
client.onMessageStart = (data) => {
|
||||
console.log('[DebugDialog] Message start:', data);
|
||||
if (data) {
|
||||
const placeholderMessage: Message = {
|
||||
id: data.message_id,
|
||||
role: 'assistant',
|
||||
content: '',
|
||||
message_chain: [],
|
||||
timestamp: data.timestamp,
|
||||
};
|
||||
setMessages((prev) => [...prev, placeholderMessage]);
|
||||
}
|
||||
};
|
||||
|
||||
client.onMessageChunk = (data) => {
|
||||
if (data) {
|
||||
// Update streaming message (content is cumulative)
|
||||
setMessages((prev) =>
|
||||
prev.map((msg) =>
|
||||
msg.id === data.message_id
|
||||
? {
|
||||
...msg,
|
||||
content: data.content,
|
||||
message_chain: data.message_chain,
|
||||
}
|
||||
: msg,
|
||||
),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
client.onMessageComplete = (data) => {
|
||||
console.log('[DebugDialog] Message complete:', data);
|
||||
if (data) {
|
||||
// Mark message as complete
|
||||
setMessages((prev) =>
|
||||
prev.map((msg) =>
|
||||
msg.id === data.message_id
|
||||
? {
|
||||
...msg,
|
||||
content: data.final_content,
|
||||
message_chain: data.message_chain,
|
||||
}
|
||||
: msg,
|
||||
),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
client.onMessageError = (data) => {
|
||||
console.error('[DebugDialog] Message error:', data);
|
||||
if (data) {
|
||||
toast.error(`Message error: ${data.error}`);
|
||||
}
|
||||
};
|
||||
|
||||
client.onPluginMessage = (data) => {
|
||||
console.log('[DebugDialog] Plugin message:', data);
|
||||
if (data) {
|
||||
const pluginMessage: Message = {
|
||||
id: data.message_id,
|
||||
role: 'assistant',
|
||||
content: data.content,
|
||||
message_chain: data.message_chain,
|
||||
timestamp: data.timestamp,
|
||||
};
|
||||
setMessages((prev) => [...prev, pluginMessage]);
|
||||
}
|
||||
};
|
||||
|
||||
client.onError = (data) => {
|
||||
console.error('[DebugDialog] WebSocket error:', data);
|
||||
if (data) {
|
||||
toast.error(`WebSocket error: ${data.error}`);
|
||||
}
|
||||
};
|
||||
|
||||
client.onDisconnected = () => {
|
||||
console.log('[DebugDialog] WebSocket disconnected');
|
||||
setConnectionStatus('disconnected');
|
||||
};
|
||||
|
||||
// Connect to WebSocket
|
||||
setConnectionStatus('connecting');
|
||||
client
|
||||
.connect(httpClient.getSessionSync())
|
||||
.then(() => {
|
||||
console.log('[DebugDialog] WebSocket connection established');
|
||||
})
|
||||
.catch((err) => {
|
||||
console.error('[DebugDialog] Failed to connect WebSocket:', err);
|
||||
toast.error('Failed to connect to server');
|
||||
setConnectionStatus('disconnected');
|
||||
});
|
||||
|
||||
setWsClient(client);
|
||||
|
||||
// Cleanup on unmount or session type change
|
||||
return () => {
|
||||
console.log('[DebugDialog] Cleaning up WebSocket connection');
|
||||
client.disconnect();
|
||||
};
|
||||
}, [open, selectedPipelineId, sessionType]); // Reconnect when session type changes
|
||||
|
||||
useEffect(() => {
|
||||
const handleClickOutside = (event: MouseEvent) => {
|
||||
@@ -150,6 +288,12 @@ export default function DebugDialog({
|
||||
const sendMessage = async () => {
|
||||
if (!inputValue.trim() && !hasAt) return;
|
||||
|
||||
// Check WebSocket connection
|
||||
if (!wsClient || connectionStatus !== 'connected') {
|
||||
toast.error('Not connected to server');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const messageChain = [];
|
||||
|
||||
@@ -170,7 +314,7 @@ export default function DebugDialog({
|
||||
});
|
||||
|
||||
if (hasAt) {
|
||||
// for showing
|
||||
// For display
|
||||
text_content = '@webchatbot' + text_content;
|
||||
}
|
||||
|
||||
@@ -181,97 +325,26 @@ export default function DebugDialog({
|
||||
timestamp: new Date().toISOString(),
|
||||
message_chain: messageChain,
|
||||
};
|
||||
// 根据isStreaming状态决定使用哪种传输方式
|
||||
if (isStreaming) {
|
||||
// streaming
|
||||
// 创建初始bot消息
|
||||
const placeholderRandomId = Math.floor(Math.random() * 1000000);
|
||||
const botMessagePlaceholder: Message = {
|
||||
id: placeholderRandomId,
|
||||
role: 'assistant',
|
||||
content: 'Generating...',
|
||||
timestamp: new Date().toISOString(),
|
||||
message_chain: [{ type: 'Plain', text: 'Generating...' }],
|
||||
};
|
||||
|
||||
// 添加用户消息和初始bot消息到状态
|
||||
// Add user message to UI immediately
|
||||
setMessages((prevMessages) => [...prevMessages, userMessage]);
|
||||
setInputValue('');
|
||||
setHasAt(false);
|
||||
|
||||
setMessages((prevMessages) => [
|
||||
...prevMessages,
|
||||
userMessage,
|
||||
botMessagePlaceholder,
|
||||
]);
|
||||
setInputValue('');
|
||||
setHasAt(false);
|
||||
try {
|
||||
await httpClient.sendStreamingWebChatMessage(
|
||||
sessionType,
|
||||
messageChain,
|
||||
selectedPipelineId,
|
||||
(data) => {
|
||||
// 处理流式响应数据
|
||||
console.log('data', data);
|
||||
if (data.message) {
|
||||
// 更新完整内容
|
||||
// Send via WebSocket
|
||||
const clientMessageId = wsClient.sendMessage(messageChain);
|
||||
|
||||
setMessages((prevMessages) => {
|
||||
const updatedMessages = [...prevMessages];
|
||||
const botMessageIndex = updatedMessages.findIndex(
|
||||
(message) => message.id === placeholderRandomId,
|
||||
);
|
||||
if (botMessageIndex !== -1) {
|
||||
updatedMessages[botMessageIndex] = {
|
||||
...updatedMessages[botMessageIndex],
|
||||
content: data.message.content,
|
||||
message_chain: [
|
||||
{ type: 'Plain', text: data.message.content },
|
||||
],
|
||||
};
|
||||
}
|
||||
return updatedMessages;
|
||||
});
|
||||
}
|
||||
},
|
||||
() => {},
|
||||
(error) => {
|
||||
// 处理错误
|
||||
console.error('Streaming error:', error);
|
||||
if (sessionType === 'person') {
|
||||
toast.error(t('pipelines.debugDialog.sendFailed'));
|
||||
}
|
||||
},
|
||||
);
|
||||
} catch (error) {
|
||||
console.error('Failed to send streaming message:', error);
|
||||
if (sessionType === 'person') {
|
||||
toast.error(t('pipelines.debugDialog.sendFailed'));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// non-streaming
|
||||
setMessages((prevMessages) => [...prevMessages, userMessage]);
|
||||
setInputValue('');
|
||||
setHasAt(false);
|
||||
// Track pending message for ID mapping
|
||||
setPendingMessages((prev) => {
|
||||
const newMap = new Map(prev);
|
||||
newMap.set(clientMessageId, userMessage);
|
||||
return newMap;
|
||||
});
|
||||
|
||||
const response = await httpClient.sendWebChatMessage(
|
||||
sessionType,
|
||||
messageChain,
|
||||
selectedPipelineId,
|
||||
180000,
|
||||
);
|
||||
|
||||
setMessages((prevMessages) => [...prevMessages, response.message]);
|
||||
}
|
||||
} catch (
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
error: any
|
||||
) {
|
||||
console.log(error, 'type of error', typeof error);
|
||||
console.error('Failed to send message:', error);
|
||||
|
||||
if (!error.message.includes('timeout') && sessionType === 'person') {
|
||||
toast.error(t('pipelines.debugDialog.sendFailed'));
|
||||
}
|
||||
console.log('[DebugDialog] Message sent:', clientMessageId);
|
||||
} catch (error) {
|
||||
console.error('[DebugDialog] Failed to send message:', error);
|
||||
toast.error(t('pipelines.debugDialog.sendFailed'));
|
||||
} finally {
|
||||
inputRef.current?.focus();
|
||||
}
|
||||
@@ -390,12 +463,6 @@ export default function DebugDialog({
|
||||
</ScrollArea>
|
||||
|
||||
<div className="p-4 pb-0 bg-white dark:bg-black flex gap-2">
|
||||
<div className="flex items-center gap-2">
|
||||
<span className="text-sm text-gray-600">
|
||||
{t('pipelines.debugDialog.streaming')}
|
||||
</span>
|
||||
<Switch checked={isStreaming} onCheckedChange={setIsStreaming} />
|
||||
</div>
|
||||
<div className="flex-1 flex items-center gap-2">
|
||||
{hasAt && (
|
||||
<AtBadge targetName="webchatbot" onRemove={handleAtRemove} />
|
||||
|
||||
@@ -29,7 +29,17 @@ export default function PluginConfigPage() {
|
||||
const [sortOrderValue, setSortOrderValue] = useState<string>('DESC');
|
||||
|
||||
useEffect(() => {
|
||||
getPipelines();
|
||||
// Load sort preference from localStorage
|
||||
const savedSortBy = localStorage.getItem('pipeline_sort_by');
|
||||
const savedSortOrder = localStorage.getItem('pipeline_sort_order');
|
||||
|
||||
if (savedSortBy && savedSortOrder) {
|
||||
setSortByValue(savedSortBy);
|
||||
setSortOrderValue(savedSortOrder);
|
||||
getPipelines(savedSortBy, savedSortOrder);
|
||||
} else {
|
||||
getPipelines();
|
||||
}
|
||||
}, []);
|
||||
|
||||
function getPipelines(
|
||||
@@ -91,6 +101,11 @@ export default function PluginConfigPage() {
|
||||
const [newSortBy, newSortOrder] = value.split(',').map((s) => s.trim());
|
||||
setSortByValue(newSortBy);
|
||||
setSortOrderValue(newSortOrder);
|
||||
|
||||
// Save sort preference to localStorage
|
||||
localStorage.setItem('pipeline_sort_by', newSortBy);
|
||||
localStorage.setItem('pipeline_sort_order', newSortOrder);
|
||||
|
||||
getPipelines(newSortBy, newSortOrder);
|
||||
}
|
||||
|
||||
@@ -135,6 +150,12 @@ export default function PluginConfigPage() {
|
||||
>
|
||||
{t('pipelines.newestCreated')}
|
||||
</SelectItem>
|
||||
<SelectItem
|
||||
value="created_at,ASC"
|
||||
className="text-gray-900 dark:text-gray-100"
|
||||
>
|
||||
{t('pipelines.earliestCreated')}
|
||||
</SelectItem>
|
||||
<SelectItem
|
||||
value="updated_at,DESC"
|
||||
className="text-gray-900 dark:text-gray-100"
|
||||
|
||||
@@ -0,0 +1,75 @@
|
||||
import { PluginComponent } from '@/app/infra/entities/plugin';
|
||||
import { TFunction } from 'i18next';
|
||||
import { Wrench, AudioWaveform, Hash } from 'lucide-react';
|
||||
import { Badge } from '@/components/ui/badge';
|
||||
|
||||
export default function PluginComponentList({
|
||||
components,
|
||||
showComponentName,
|
||||
showTitle,
|
||||
useBadge,
|
||||
t,
|
||||
}: {
|
||||
components: PluginComponent[];
|
||||
showComponentName: boolean;
|
||||
showTitle: boolean;
|
||||
useBadge: boolean;
|
||||
t: TFunction;
|
||||
}) {
|
||||
const componentKindCount: Record<string, number> = {};
|
||||
|
||||
for (const component of components) {
|
||||
const kind = component.manifest.manifest.kind;
|
||||
if (componentKindCount[kind]) {
|
||||
componentKindCount[kind]++;
|
||||
} else {
|
||||
componentKindCount[kind] = 1;
|
||||
}
|
||||
}
|
||||
|
||||
const kindIconMap: Record<string, React.ReactNode> = {
|
||||
Tool: <Wrench className="w-5 h-5" />,
|
||||
EventListener: <AudioWaveform className="w-5 h-5" />,
|
||||
Command: <Hash className="w-5 h-5" />,
|
||||
};
|
||||
|
||||
const componentKindList = Object.keys(componentKindCount);
|
||||
|
||||
return (
|
||||
<>
|
||||
{showTitle && <div>{t('plugins.componentsList')}</div>}
|
||||
{componentKindList.length > 0 && (
|
||||
<>
|
||||
{componentKindList.map((kind) => {
|
||||
return (
|
||||
<>
|
||||
{useBadge && (
|
||||
<Badge variant="outline">
|
||||
{kindIconMap[kind]}
|
||||
{showComponentName &&
|
||||
t('plugins.componentName.' + kind) + ' '}
|
||||
{componentKindCount[kind]}
|
||||
</Badge>
|
||||
)}
|
||||
|
||||
{!useBadge && (
|
||||
<div
|
||||
key={kind}
|
||||
className="flex flex-row items-center justify-start gap-[0.2rem]"
|
||||
>
|
||||
{kindIconMap[kind]}
|
||||
{showComponentName &&
|
||||
t('plugins.componentName.' + kind) + ' '}
|
||||
{componentKindCount[kind]}
|
||||
</div>
|
||||
)}
|
||||
</>
|
||||
);
|
||||
})}
|
||||
</>
|
||||
)}
|
||||
|
||||
{componentKindList.length === 0 && <div>{t('plugins.noComponents')}</div>}
|
||||
</>
|
||||
);
|
||||
}
|
||||
@@ -1,9 +1,9 @@
|
||||
'use client';
|
||||
|
||||
import { useState, useEffect, forwardRef, useImperativeHandle } from 'react';
|
||||
import { PluginCardVO } from '@/app/home/plugins/plugin-installed/PluginCardVO';
|
||||
import PluginCardComponent from '@/app/home/plugins/plugin-installed/plugin-card/PluginCardComponent';
|
||||
import PluginForm from '@/app/home/plugins/plugin-installed/plugin-form/PluginForm';
|
||||
import { PluginCardVO } from '@/app/home/plugins/components/plugin-installed/PluginCardVO';
|
||||
import PluginCardComponent from '@/app/home/plugins/components/plugin-installed/plugin-card/PluginCardComponent';
|
||||
import PluginForm from '@/app/home/plugins/components/plugin-installed/plugin-form/PluginForm';
|
||||
import styles from '@/app/home/plugins/plugins.module.css';
|
||||
import { httpClient } from '@/app/infra/http/HttpClient';
|
||||
import {
|
||||
@@ -1,21 +1,10 @@
|
||||
import { PluginCardVO } from '@/app/home/plugins/plugin-installed/PluginCardVO';
|
||||
import { PluginCardVO } from '@/app/home/plugins/components/plugin-installed/PluginCardVO';
|
||||
import { useState } from 'react';
|
||||
import { Badge } from '@/components/ui/badge';
|
||||
import { useTranslation } from 'react-i18next';
|
||||
import { TFunction } from 'i18next';
|
||||
import {
|
||||
AudioWaveform,
|
||||
Wrench,
|
||||
Hash,
|
||||
BugIcon,
|
||||
ExternalLink,
|
||||
Ellipsis,
|
||||
Trash,
|
||||
ArrowUp,
|
||||
} from 'lucide-react';
|
||||
import { BugIcon, ExternalLink, Ellipsis, Trash, ArrowUp } from 'lucide-react';
|
||||
import { getCloudServiceClientSync } from '@/app/infra/http';
|
||||
import { httpClient } from '@/app/infra/http/HttpClient';
|
||||
import { PluginComponent } from '@/app/infra/entities/plugin';
|
||||
import { Button } from '@/components/ui/button';
|
||||
import {
|
||||
DropdownMenu,
|
||||
@@ -23,49 +12,7 @@ import {
|
||||
DropdownMenuItem,
|
||||
DropdownMenuTrigger,
|
||||
} from '@/components/ui/dropdown-menu';
|
||||
|
||||
function getComponentList(components: PluginComponent[], t: TFunction) {
|
||||
const componentKindCount: Record<string, number> = {};
|
||||
|
||||
for (const component of components) {
|
||||
const kind = component.manifest.manifest.kind;
|
||||
if (componentKindCount[kind]) {
|
||||
componentKindCount[kind]++;
|
||||
} else {
|
||||
componentKindCount[kind] = 1;
|
||||
}
|
||||
}
|
||||
|
||||
const kindIconMap: Record<string, React.ReactNode> = {
|
||||
Tool: <Wrench className="w-5 h-5" />,
|
||||
EventListener: <AudioWaveform className="w-5 h-5" />,
|
||||
Command: <Hash className="w-5 h-5" />,
|
||||
};
|
||||
|
||||
const componentKindList = Object.keys(componentKindCount);
|
||||
|
||||
return (
|
||||
<>
|
||||
<div>{t('plugins.componentsList')}</div>
|
||||
{componentKindList.length > 0 && (
|
||||
<>
|
||||
{componentKindList.map((kind) => {
|
||||
return (
|
||||
<div
|
||||
key={kind}
|
||||
className="flex flex-row items-center justify-start gap-[0.4rem]"
|
||||
>
|
||||
{kindIconMap[kind]} {componentKindCount[kind]}
|
||||
</div>
|
||||
);
|
||||
})}
|
||||
</>
|
||||
)}
|
||||
|
||||
{componentKindList.length === 0 && <div>{t('plugins.noComponents')}</div>}
|
||||
</>
|
||||
);
|
||||
}
|
||||
import PluginComponentList from '@/app/home/plugins/components/plugin-installed/PluginComponentList';
|
||||
|
||||
export default function PluginCardComponent({
|
||||
cardVO,
|
||||
@@ -180,7 +127,13 @@ export default function PluginCardComponent({
|
||||
</div>
|
||||
|
||||
<div className="w-full flex flex-row items-start justify-start gap-[0.6rem]">
|
||||
{getComponentList(cardVO.components, t)}
|
||||
<PluginComponentList
|
||||
components={cardVO.components}
|
||||
showComponentName={false}
|
||||
showTitle={true}
|
||||
useBadge={false}
|
||||
t={t}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -7,6 +7,7 @@ import { Button } from '@/components/ui/button';
|
||||
import { toast } from 'sonner';
|
||||
import { extractI18nObject } from '@/i18n/I18nProvider';
|
||||
import { useTranslation } from 'react-i18next';
|
||||
import PluginComponentList from '@/app/home/plugins/components/plugin-installed/PluginComponentList';
|
||||
|
||||
export default function PluginForm({
|
||||
pluginAuthor,
|
||||
@@ -78,6 +79,17 @@ export default function PluginForm({
|
||||
},
|
||||
)}
|
||||
</div>
|
||||
|
||||
<div className="mb-4 flex flex-row items-center justify-start gap-[0.4rem]">
|
||||
<PluginComponentList
|
||||
components={pluginInfo.components}
|
||||
showComponentName={true}
|
||||
showTitle={false}
|
||||
useBadge={true}
|
||||
t={t}
|
||||
/>
|
||||
</div>
|
||||
|
||||
{pluginInfo.manifest.manifest.spec.config.length > 0 && (
|
||||
<DynamicFormComponent
|
||||
itemConfigList={pluginInfo.manifest.manifest.spec.config}
|
||||
@@ -1,8 +1,8 @@
|
||||
'use client';
|
||||
import PluginInstalledComponent, {
|
||||
PluginInstalledComponentRef,
|
||||
} from '@/app/home/plugins/plugin-installed/PluginInstalledComponent';
|
||||
import MarketPage from '@/app/home/plugins/plugin-market/PluginMarketComponent';
|
||||
} from '@/app/home/plugins/components/plugin-installed/PluginInstalledComponent';
|
||||
import MarketPage from '@/app/home/plugins/components/plugin-market/PluginMarketComponent';
|
||||
// import PluginSortDialog from '@/app/home/plugins/plugin-sort/PluginSortDialog';
|
||||
import styles from './plugins.module.css';
|
||||
import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs';
|
||||
|
||||
394
web/src/app/infra/websocket/PipelineWebSocketClient.ts
Normal file
394
web/src/app/infra/websocket/PipelineWebSocketClient.ts
Normal file
@@ -0,0 +1,394 @@
|
||||
/**
|
||||
* Pipeline WebSocket Client
|
||||
*
|
||||
* Provides real-time bidirectional communication for pipeline debugging.
|
||||
* Supports person and group session isolation.
|
||||
*/
|
||||
|
||||
import { Message } from '@/app/infra/entities/message';
|
||||
|
||||
export type SessionType = 'person' | 'group';
|
||||
|
||||
export interface WebSocketEventData {
|
||||
// Connected event
|
||||
connected?: {
|
||||
connection_id: string;
|
||||
session_type: SessionType;
|
||||
pipeline_uuid: string;
|
||||
};
|
||||
|
||||
// History event
|
||||
history?: {
|
||||
messages: Message[];
|
||||
has_more: boolean;
|
||||
};
|
||||
|
||||
// Message sent confirmation
|
||||
message_sent?: {
|
||||
client_message_id: string;
|
||||
server_message_id: number;
|
||||
timestamp: string;
|
||||
};
|
||||
|
||||
// Message start
|
||||
message_start?: {
|
||||
message_id: number;
|
||||
role: 'assistant';
|
||||
timestamp: string;
|
||||
reply_to: number;
|
||||
};
|
||||
|
||||
// Message chunk
|
||||
message_chunk?: {
|
||||
message_id: number;
|
||||
content: string;
|
||||
message_chain: object[];
|
||||
timestamp: string;
|
||||
};
|
||||
|
||||
// Message complete
|
||||
message_complete?: {
|
||||
message_id: number;
|
||||
final_content: string;
|
||||
message_chain: object[];
|
||||
timestamp: string;
|
||||
};
|
||||
|
||||
// Message error
|
||||
message_error?: {
|
||||
message_id: number;
|
||||
error: string;
|
||||
error_code?: string;
|
||||
};
|
||||
|
||||
// Interrupted
|
||||
interrupted?: {
|
||||
message_id: number;
|
||||
partial_content: string;
|
||||
};
|
||||
|
||||
// Plugin message
|
||||
plugin_message?: {
|
||||
message_id: number;
|
||||
role: 'assistant';
|
||||
content: string;
|
||||
message_chain: object[];
|
||||
timestamp: string;
|
||||
source: 'plugin';
|
||||
};
|
||||
|
||||
// Error
|
||||
error?: {
|
||||
error: string;
|
||||
error_code: string;
|
||||
details?: object;
|
||||
};
|
||||
|
||||
// Pong
|
||||
pong?: {
|
||||
timestamp: number;
|
||||
};
|
||||
}
|
||||
|
||||
export class PipelineWebSocketClient {
|
||||
private ws: WebSocket | null = null;
|
||||
private pipelineId: string;
|
||||
private sessionType: SessionType;
|
||||
private reconnectAttempts = 0;
|
||||
private maxReconnectAttempts = 5;
|
||||
private pingInterval: NodeJS.Timeout | null = null;
|
||||
private reconnectTimeout: NodeJS.Timeout | null = null;
|
||||
private isManualDisconnect = false;
|
||||
|
||||
// Event callbacks
|
||||
public onConnected?: (data: WebSocketEventData['connected']) => void;
|
||||
public onHistory?: (data: WebSocketEventData['history']) => void;
|
||||
public onMessageSent?: (data: WebSocketEventData['message_sent']) => void;
|
||||
public onMessageStart?: (data: WebSocketEventData['message_start']) => void;
|
||||
public onMessageChunk?: (data: WebSocketEventData['message_chunk']) => void;
|
||||
public onMessageComplete?: (
|
||||
data: WebSocketEventData['message_complete'],
|
||||
) => void;
|
||||
public onMessageError?: (data: WebSocketEventData['message_error']) => void;
|
||||
public onInterrupted?: (data: WebSocketEventData['interrupted']) => void;
|
||||
public onPluginMessage?: (data: WebSocketEventData['plugin_message']) => void;
|
||||
public onError?: (data: WebSocketEventData['error']) => void;
|
||||
public onDisconnected?: () => void;
|
||||
|
||||
constructor(pipelineId: string, sessionType: SessionType) {
|
||||
this.pipelineId = pipelineId;
|
||||
this.sessionType = sessionType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to WebSocket server
|
||||
*/
|
||||
connect(token: string): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
this.isManualDisconnect = false;
|
||||
|
||||
const wsUrl = this.buildWebSocketUrl();
|
||||
console.log(`[WebSocket] Connecting to ${wsUrl}...`);
|
||||
|
||||
try {
|
||||
this.ws = new WebSocket(wsUrl);
|
||||
} catch (error) {
|
||||
console.error('[WebSocket] Failed to create WebSocket:', error);
|
||||
reject(error);
|
||||
return;
|
||||
}
|
||||
|
||||
this.ws.onopen = () => {
|
||||
console.log('[WebSocket] Connection opened');
|
||||
|
||||
// Send connect event with session type and token
|
||||
this.send('connect', {
|
||||
pipeline_uuid: this.pipelineId,
|
||||
session_type: this.sessionType,
|
||||
token,
|
||||
});
|
||||
|
||||
// Start ping interval
|
||||
this.startPing();
|
||||
|
||||
// Reset reconnect attempts on successful connection
|
||||
this.reconnectAttempts = 0;
|
||||
|
||||
resolve();
|
||||
};
|
||||
|
||||
this.ws.onmessage = (event) => {
|
||||
this.handleMessage(event);
|
||||
};
|
||||
|
||||
this.ws.onerror = (error) => {
|
||||
console.error('[WebSocket] Error:', error);
|
||||
reject(error);
|
||||
};
|
||||
|
||||
this.ws.onclose = (event) => {
|
||||
console.log(
|
||||
`[WebSocket] Connection closed: code=${event.code}, reason=${event.reason}`,
|
||||
);
|
||||
this.handleDisconnect();
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming WebSocket message
|
||||
*/
|
||||
private handleMessage(event: MessageEvent) {
|
||||
try {
|
||||
const message = JSON.parse(event.data);
|
||||
const { type, data } = message;
|
||||
|
||||
console.log(`[WebSocket] Received: ${type}`, data);
|
||||
|
||||
switch (type) {
|
||||
case 'connected':
|
||||
this.onConnected?.(data);
|
||||
break;
|
||||
case 'history':
|
||||
this.onHistory?.(data);
|
||||
break;
|
||||
case 'message_sent':
|
||||
this.onMessageSent?.(data);
|
||||
break;
|
||||
case 'message_start':
|
||||
this.onMessageStart?.(data);
|
||||
break;
|
||||
case 'message_chunk':
|
||||
this.onMessageChunk?.(data);
|
||||
break;
|
||||
case 'message_complete':
|
||||
this.onMessageComplete?.(data);
|
||||
break;
|
||||
case 'message_error':
|
||||
this.onMessageError?.(data);
|
||||
break;
|
||||
case 'interrupted':
|
||||
this.onInterrupted?.(data);
|
||||
break;
|
||||
case 'plugin_message':
|
||||
this.onPluginMessage?.(data);
|
||||
break;
|
||||
case 'error':
|
||||
this.onError?.(data);
|
||||
break;
|
||||
case 'pong':
|
||||
// Heartbeat response, no action needed
|
||||
break;
|
||||
default:
|
||||
console.warn(`[WebSocket] Unknown message type: ${type}`);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[WebSocket] Failed to parse message:', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send message to server
|
||||
*/
|
||||
sendMessage(messageChain: object[]): string {
|
||||
const clientMessageId = this.generateMessageId();
|
||||
this.send('send_message', {
|
||||
message_chain: messageChain,
|
||||
client_message_id: clientMessageId,
|
||||
});
|
||||
return clientMessageId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load history messages
|
||||
*/
|
||||
loadHistory(beforeMessageId?: number, limit?: number) {
|
||||
this.send('load_history', {
|
||||
before_message_id: beforeMessageId,
|
||||
limit,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Interrupt streaming message
|
||||
*/
|
||||
interrupt(messageId: number) {
|
||||
this.send('interrupt', { message_id: messageId });
|
||||
}
|
||||
|
||||
/**
|
||||
* Send event to server
|
||||
*/
|
||||
private send(type: string, data: object) {
|
||||
if (this.ws?.readyState === WebSocket.OPEN) {
|
||||
const message = JSON.stringify({ type, data });
|
||||
console.log(`[WebSocket] Sending: ${type}`, data);
|
||||
this.ws.send(message);
|
||||
} else {
|
||||
console.warn(
|
||||
`[WebSocket] Cannot send message, connection not open (state: ${this.ws?.readyState})`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start ping interval (heartbeat)
|
||||
*/
|
||||
private startPing() {
|
||||
this.stopPing();
|
||||
this.pingInterval = setInterval(() => {
|
||||
this.send('ping', { timestamp: Date.now() });
|
||||
}, 30000); // Ping every 30 seconds
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop ping interval
|
||||
*/
|
||||
private stopPing() {
|
||||
if (this.pingInterval) {
|
||||
clearInterval(this.pingInterval);
|
||||
this.pingInterval = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle disconnection
|
||||
*/
|
||||
private handleDisconnect() {
|
||||
this.stopPing();
|
||||
this.onDisconnected?.();
|
||||
|
||||
// Auto reconnect if not manual disconnect
|
||||
if (
|
||||
!this.isManualDisconnect &&
|
||||
this.reconnectAttempts < this.maxReconnectAttempts
|
||||
) {
|
||||
const delay = Math.min(2000 * Math.pow(2, this.reconnectAttempts), 30000);
|
||||
console.log(
|
||||
`[WebSocket] Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts + 1}/${this.maxReconnectAttempts})`,
|
||||
);
|
||||
|
||||
this.reconnectTimeout = setTimeout(() => {
|
||||
this.reconnectAttempts++;
|
||||
// Note: Need to get token again, should be handled by caller
|
||||
console.warn(
|
||||
'[WebSocket] Auto-reconnect requires token, please reconnect manually',
|
||||
);
|
||||
}, delay);
|
||||
} else if (this.reconnectAttempts >= this.maxReconnectAttempts) {
|
||||
console.error(
|
||||
'[WebSocket] Max reconnect attempts reached, giving up',
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect from server
|
||||
*/
|
||||
disconnect() {
|
||||
this.isManualDisconnect = true;
|
||||
this.stopPing();
|
||||
|
||||
if (this.reconnectTimeout) {
|
||||
clearTimeout(this.reconnectTimeout);
|
||||
this.reconnectTimeout = null;
|
||||
}
|
||||
|
||||
if (this.ws) {
|
||||
this.ws.close(1000, 'Client disconnect');
|
||||
this.ws = null;
|
||||
}
|
||||
|
||||
console.log('[WebSocket] Disconnected');
|
||||
}
|
||||
|
||||
/**
|
||||
* Build WebSocket URL
|
||||
*/
|
||||
private buildWebSocketUrl(): string {
|
||||
// Get current base URL
|
||||
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
|
||||
const host = window.location.host;
|
||||
|
||||
return `${protocol}//${host}/api/v1/pipelines/${this.pipelineId}/chat/ws`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate unique client message ID
|
||||
*/
|
||||
private generateMessageId(): string {
|
||||
return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get connection state
|
||||
*/
|
||||
getState():
|
||||
| 'CONNECTING'
|
||||
| 'OPEN'
|
||||
| 'CLOSING'
|
||||
| 'CLOSED'
|
||||
| 'DISCONNECTED' {
|
||||
if (!this.ws) return 'DISCONNECTED';
|
||||
|
||||
switch (this.ws.readyState) {
|
||||
case WebSocket.CONNECTING:
|
||||
return 'CONNECTING';
|
||||
case WebSocket.OPEN:
|
||||
return 'OPEN';
|
||||
case WebSocket.CLOSING:
|
||||
return 'CLOSING';
|
||||
case WebSocket.CLOSED:
|
||||
return 'CLOSED';
|
||||
default:
|
||||
return 'DISCONNECTED';
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if connected
|
||||
*/
|
||||
isConnected(): boolean {
|
||||
return this.ws?.readyState === WebSocket.OPEN;
|
||||
}
|
||||
}
|
||||
@@ -206,9 +206,11 @@ const enUS = {
|
||||
deleteConfirm: 'Delete Confirmation',
|
||||
deleteSuccess: 'Delete successful',
|
||||
modifyFailed: 'Modify failed: ',
|
||||
eventCount: 'Events: {{count}}',
|
||||
toolCount: 'Tools: {{count}}',
|
||||
starCount: 'Stars: {{count}}',
|
||||
componentName: {
|
||||
Tool: 'Tool',
|
||||
EventListener: 'Event Listener',
|
||||
Command: 'Command',
|
||||
},
|
||||
uploadLocal: 'Upload Local',
|
||||
debugging: 'Debugging',
|
||||
uploadLocalPlugin: 'Upload Local Plugin',
|
||||
@@ -298,6 +300,7 @@ const enUS = {
|
||||
defaultBadge: 'Default',
|
||||
sortBy: 'Sort by',
|
||||
newestCreated: 'Newest Created',
|
||||
earliestCreated: 'Earliest Created',
|
||||
recentlyEdited: 'Recently Edited',
|
||||
earliestEdited: 'Earliest Edited',
|
||||
basicInfo: 'Basic',
|
||||
|
||||
@@ -207,9 +207,11 @@ const jaJP = {
|
||||
deleteConfirm: '削除の確認',
|
||||
deleteSuccess: '削除に成功しました',
|
||||
modifyFailed: '変更に失敗しました:',
|
||||
eventCount: 'イベント:{{count}}',
|
||||
toolCount: 'ツール:{{count}}',
|
||||
starCount: 'スター:{{count}}',
|
||||
componentName: {
|
||||
Tool: 'ツール',
|
||||
EventListener: 'イベント監視器',
|
||||
Command: 'コマンド',
|
||||
},
|
||||
uploadLocal: 'ローカルアップロード',
|
||||
debugging: 'デバッグ中',
|
||||
uploadLocalPlugin: 'ローカルプラグインのアップロード',
|
||||
@@ -300,6 +302,7 @@ const jaJP = {
|
||||
defaultBadge: 'デフォルト',
|
||||
sortBy: '並び順',
|
||||
newestCreated: '最新作成',
|
||||
earliestCreated: '最古作成',
|
||||
recentlyEdited: '最近編集',
|
||||
earliestEdited: '最古編集',
|
||||
basicInfo: '基本情報',
|
||||
|
||||
@@ -199,9 +199,11 @@ const zhHans = {
|
||||
deleteConfirm: '删除确认',
|
||||
deleteSuccess: '删除成功',
|
||||
modifyFailed: '修改失败:',
|
||||
eventCount: '事件:{{count}}',
|
||||
toolCount: '工具:{{count}}',
|
||||
starCount: '星标:{{count}}',
|
||||
componentName: {
|
||||
Tool: '工具',
|
||||
EventListener: '事件监听器',
|
||||
Command: '命令',
|
||||
},
|
||||
uploadLocal: '本地上传',
|
||||
debugging: '调试中',
|
||||
uploadLocalPlugin: '上传本地插件',
|
||||
@@ -285,6 +287,7 @@ const zhHans = {
|
||||
defaultBadge: '默认',
|
||||
sortBy: '排序方式',
|
||||
newestCreated: '最新创建',
|
||||
earliestCreated: '最早创建',
|
||||
recentlyEdited: '最近编辑',
|
||||
earliestEdited: '最早编辑',
|
||||
basicInfo: '基础信息',
|
||||
|
||||
@@ -197,9 +197,11 @@ const zhHant = {
|
||||
close: '關閉',
|
||||
deleteConfirm: '刪除確認',
|
||||
modifyFailed: '修改失敗:',
|
||||
eventCount: '事件:{{count}}',
|
||||
toolCount: '工具:{{count}}',
|
||||
starCount: '星標:{{count}}',
|
||||
componentName: {
|
||||
Tool: '工具',
|
||||
EventListener: '事件監聽器',
|
||||
Command: '命令',
|
||||
},
|
||||
uploadLocal: '本地上傳',
|
||||
debugging: '調試中',
|
||||
uploadLocalPlugin: '上傳本地插件',
|
||||
@@ -283,6 +285,7 @@ const zhHant = {
|
||||
defaultBadge: '預設',
|
||||
sortBy: '排序方式',
|
||||
newestCreated: '最新建立',
|
||||
earliestCreated: '最早建立',
|
||||
recentlyEdited: '最近編輯',
|
||||
earliestEdited: '最早編輯',
|
||||
basicInfo: '基本資訊',
|
||||
|
||||
Reference in New Issue
Block a user