mirror of
https://github.com/langbot-app/LangBot.git
synced 2025-11-25 03:15:06 +08:00
Compare commits
1 Commits
763c1a885c
...
feature/we
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a4995c8cd9 |
229
pkg/api/http/controller/groups/pipelines/websocket.py
Normal file
229
pkg/api/http/controller/groups/pipelines/websocket.py
Normal file
@@ -0,0 +1,229 @@
|
||||
"""流水线调试 WebSocket 路由
|
||||
|
||||
提供基于 WebSocket 的实时双向通信,用于流水线调试。
|
||||
支持 person 和 group 两种会话类型的隔离。
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
|
||||
import quart
|
||||
|
||||
from ... import group
|
||||
from ....service.websocket_pool import WebSocketConnection
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def handle_client_event(connection: WebSocketConnection, message: dict, ap):
|
||||
"""处理客户端发送的事件
|
||||
|
||||
Args:
|
||||
connection: WebSocket 连接对象
|
||||
message: 客户端消息 {'type': 'xxx', 'data': {...}}
|
||||
ap: Application 实例
|
||||
"""
|
||||
event_type = message.get('type')
|
||||
data = message.get('data', {})
|
||||
|
||||
pipeline_uuid = connection.pipeline_uuid
|
||||
session_type = connection.session_type
|
||||
|
||||
try:
|
||||
webchat_adapter = ap.platform_mgr.webchat_proxy_bot.adapter
|
||||
|
||||
if event_type == 'send_message':
|
||||
# 发送消息到指定会话
|
||||
message_chain_obj = data.get('message_chain', [])
|
||||
client_message_id = data.get('client_message_id')
|
||||
|
||||
if not message_chain_obj:
|
||||
await connection.send('error', {'error': 'message_chain is required', 'error_code': 'INVALID_REQUEST'})
|
||||
return
|
||||
|
||||
logger.info(
|
||||
f"Received send_message: pipeline={pipeline_uuid}, "
|
||||
f"session={session_type}, "
|
||||
f"client_msg_id={client_message_id}"
|
||||
)
|
||||
|
||||
# 调用 webchat_adapter.send_webchat_message
|
||||
# 消息将通过 reply_message_chunk 自动推送到 WebSocket
|
||||
result = None
|
||||
async for msg in webchat_adapter.send_webchat_message(
|
||||
pipeline_uuid=pipeline_uuid, session_type=session_type, message_chain_obj=message_chain_obj, is_stream=True
|
||||
):
|
||||
result = msg
|
||||
|
||||
# 发送确认
|
||||
if result:
|
||||
await connection.send(
|
||||
'message_sent',
|
||||
{
|
||||
'client_message_id': client_message_id,
|
||||
'server_message_id': result.get('id'),
|
||||
'timestamp': result.get('timestamp'),
|
||||
},
|
||||
)
|
||||
|
||||
elif event_type == 'load_history':
|
||||
# 加载指定会话的历史消息
|
||||
before_message_id = data.get('before_message_id')
|
||||
limit = data.get('limit', 50)
|
||||
|
||||
logger.info(f"Loading history: pipeline={pipeline_uuid}, session={session_type}, limit={limit}")
|
||||
|
||||
# 从对应会话获取历史消息
|
||||
messages = webchat_adapter.get_webchat_messages(pipeline_uuid, session_type)
|
||||
|
||||
# 简单分页:返回最后 limit 条
|
||||
if before_message_id:
|
||||
# TODO: 实现基于 message_id 的分页
|
||||
history_messages = messages[-limit:]
|
||||
else:
|
||||
history_messages = messages[-limit:] if len(messages) > limit else messages
|
||||
|
||||
await connection.send(
|
||||
'history', {'messages': history_messages, 'has_more': len(messages) > len(history_messages)}
|
||||
)
|
||||
|
||||
elif event_type == 'interrupt':
|
||||
# 中断消息
|
||||
message_id = data.get('message_id')
|
||||
logger.info(f"Interrupt requested: message_id={message_id}")
|
||||
|
||||
# TODO: 实现中断逻辑
|
||||
await connection.send('interrupted', {'message_id': message_id, 'partial_content': ''})
|
||||
|
||||
elif event_type == 'ping':
|
||||
# 心跳
|
||||
connection.last_ping = datetime.now()
|
||||
await connection.send('pong', {'timestamp': data.get('timestamp')})
|
||||
|
||||
else:
|
||||
logger.warning(f"Unknown event type: {event_type}")
|
||||
await connection.send('error', {'error': f'Unknown event type: {event_type}', 'error_code': 'UNKNOWN_EVENT'})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling event {event_type}: {e}", exc_info=True)
|
||||
await connection.send(
|
||||
'error',
|
||||
{'error': f'Internal server error: {str(e)}', 'error_code': 'INTERNAL_ERROR', 'details': {'event_type': event_type}},
|
||||
)
|
||||
|
||||
|
||||
@group.group_class('pipeline-websocket', '/api/v1/pipelines/<pipeline_uuid>/chat')
|
||||
class PipelineWebSocketRouterGroup(group.RouterGroup):
|
||||
"""流水线调试 WebSocket 路由组"""
|
||||
|
||||
async def initialize(self) -> None:
|
||||
@self.route('/ws')
|
||||
async def websocket_handler(pipeline_uuid: str):
|
||||
"""WebSocket 连接处理 - 会话隔离
|
||||
|
||||
连接流程:
|
||||
1. 客户端建立 WebSocket 连接
|
||||
2. 客户端发送 connect 事件(携带 session_type 和 token)
|
||||
3. 服务端验证并创建连接对象
|
||||
4. 进入消息循环,处理客户端事件
|
||||
5. 断开时清理连接
|
||||
|
||||
Args:
|
||||
pipeline_uuid: 流水线 UUID
|
||||
"""
|
||||
websocket = quart.websocket._get_current_object()
|
||||
connection_id = str(uuid.uuid4())
|
||||
session_key = None
|
||||
connection = None
|
||||
|
||||
try:
|
||||
# 1. 等待客户端发送 connect 事件
|
||||
first_message = await websocket.receive_json()
|
||||
|
||||
if first_message.get('type') != 'connect':
|
||||
await websocket.send_json(
|
||||
{'type': 'error', 'data': {'error': 'First message must be connect event', 'error_code': 'INVALID_HANDSHAKE'}}
|
||||
)
|
||||
await websocket.close(1008)
|
||||
return
|
||||
|
||||
connect_data = first_message.get('data', {})
|
||||
session_type = connect_data.get('session_type')
|
||||
token = connect_data.get('token')
|
||||
|
||||
# 验证参数
|
||||
if session_type not in ['person', 'group']:
|
||||
await websocket.send_json(
|
||||
{'type': 'error', 'data': {'error': 'session_type must be person or group', 'error_code': 'INVALID_SESSION_TYPE'}}
|
||||
)
|
||||
await websocket.close(1008)
|
||||
return
|
||||
|
||||
# 验证 token
|
||||
if not token:
|
||||
await websocket.send_json(
|
||||
{'type': 'error', 'data': {'error': 'token is required', 'error_code': 'MISSING_TOKEN'}}
|
||||
)
|
||||
await websocket.close(1008)
|
||||
return
|
||||
|
||||
# 验证用户身份
|
||||
try:
|
||||
user = await self.ap.user_service.verify_token(token)
|
||||
if not user:
|
||||
await websocket.send_json({'type': 'error', 'data': {'error': 'Unauthorized', 'error_code': 'UNAUTHORIZED'}})
|
||||
await websocket.close(1008)
|
||||
return
|
||||
except Exception as e:
|
||||
logger.error(f"Token verification failed: {e}")
|
||||
await websocket.send_json(
|
||||
{'type': 'error', 'data': {'error': 'Token verification failed', 'error_code': 'AUTH_ERROR'}}
|
||||
)
|
||||
await websocket.close(1008)
|
||||
return
|
||||
|
||||
# 2. 创建连接对象并加入连接池
|
||||
connection = WebSocketConnection(
|
||||
connection_id=connection_id,
|
||||
websocket=websocket,
|
||||
pipeline_uuid=pipeline_uuid,
|
||||
session_type=session_type,
|
||||
created_at=datetime.now(),
|
||||
last_ping=datetime.now(),
|
||||
)
|
||||
|
||||
session_key = connection.session_key
|
||||
ws_pool = self.ap.ws_pool
|
||||
ws_pool.add_connection(connection)
|
||||
|
||||
# 3. 发送连接成功事件
|
||||
await connection.send(
|
||||
'connected', {'connection_id': connection_id, 'session_type': session_type, 'pipeline_uuid': pipeline_uuid}
|
||||
)
|
||||
|
||||
logger.info(f"WebSocket connected: {connection_id} [pipeline={pipeline_uuid}, session={session_type}]")
|
||||
|
||||
# 4. 进入消息处理循环
|
||||
while True:
|
||||
try:
|
||||
message = await websocket.receive_json()
|
||||
await handle_client_event(connection, message, self.ap)
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"WebSocket connection cancelled: {connection_id}")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error receiving message from {connection_id}: {e}")
|
||||
break
|
||||
|
||||
except quart.exceptions.WebsocketDisconnected:
|
||||
logger.info(f"WebSocket disconnected: {connection_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"WebSocket error for {connection_id}: {e}", exc_info=True)
|
||||
finally:
|
||||
# 清理连接
|
||||
if connection and session_key:
|
||||
ws_pool = self.ap.ws_pool
|
||||
await ws_pool.remove_connection(connection_id, session_key)
|
||||
logger.info(f"WebSocket connection cleaned up: {connection_id}")
|
||||
211
pkg/api/http/service/websocket_pool.py
Normal file
211
pkg/api/http/service/websocket_pool.py
Normal file
@@ -0,0 +1,211 @@
|
||||
"""WebSocket 连接池管理
|
||||
|
||||
用于管理流水线调试的 WebSocket 连接,支持会话隔离和消息广播。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
import quart
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class WebSocketConnection:
|
||||
"""单个 WebSocket 连接"""
|
||||
|
||||
connection_id: str
|
||||
websocket: quart.websocket.WebSocket
|
||||
pipeline_uuid: str
|
||||
session_type: str # 'person' 或 'group'
|
||||
created_at: datetime
|
||||
last_ping: datetime
|
||||
|
||||
@property
|
||||
def session_key(self) -> str:
|
||||
"""会话唯一标识: pipeline_uuid:session_type"""
|
||||
return f"{self.pipeline_uuid}:{self.session_type}"
|
||||
|
||||
async def send(self, event_type: str, data: dict):
|
||||
"""发送事件到客户端
|
||||
|
||||
Args:
|
||||
event_type: 事件类型
|
||||
data: 事件数据
|
||||
"""
|
||||
try:
|
||||
await self.websocket.send_json({"type": event_type, "data": data})
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to send message to {self.connection_id}: {e}")
|
||||
raise
|
||||
|
||||
|
||||
class WebSocketConnectionPool:
|
||||
"""WebSocket 连接池 - 按会话隔离
|
||||
|
||||
连接池结构:
|
||||
connections[session_key][connection_id] = WebSocketConnection
|
||||
其中 session_key = f"{pipeline_uuid}:{session_type}"
|
||||
|
||||
这样可以确保:
|
||||
- person 和 group 会话完全隔离
|
||||
- 不同 pipeline 的会话隔离
|
||||
- 同一会话的多个连接可以同步接收消息(多标签页)
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.connections: dict[str, dict[str, WebSocketConnection]] = {}
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
def add_connection(self, conn: WebSocketConnection):
|
||||
"""添加连接到指定会话
|
||||
|
||||
Args:
|
||||
conn: WebSocket 连接对象
|
||||
"""
|
||||
session_key = conn.session_key
|
||||
|
||||
if session_key not in self.connections:
|
||||
self.connections[session_key] = {}
|
||||
|
||||
self.connections[session_key][conn.connection_id] = conn
|
||||
|
||||
logger.info(
|
||||
f"WebSocket connection added: {conn.connection_id} "
|
||||
f"to session {session_key} "
|
||||
f"(total: {len(self.connections[session_key])} connections)"
|
||||
)
|
||||
|
||||
async def remove_connection(self, connection_id: str, session_key: str):
|
||||
"""从指定会话移除连接
|
||||
|
||||
Args:
|
||||
connection_id: 连接 ID
|
||||
session_key: 会话标识
|
||||
"""
|
||||
async with self._lock:
|
||||
if session_key in self.connections:
|
||||
conn = self.connections[session_key].pop(connection_id, None)
|
||||
|
||||
# 如果该会话没有连接了,清理会话
|
||||
if not self.connections[session_key]:
|
||||
del self.connections[session_key]
|
||||
|
||||
if conn:
|
||||
logger.info(
|
||||
f"WebSocket connection removed: {connection_id} "
|
||||
f"from session {session_key} "
|
||||
f"(remaining: {len(self.connections.get(session_key, {}))} connections)"
|
||||
)
|
||||
|
||||
def get_connection(self, connection_id: str, session_key: str) -> Optional[WebSocketConnection]:
|
||||
"""获取指定连接
|
||||
|
||||
Args:
|
||||
connection_id: 连接 ID
|
||||
session_key: 会话标识
|
||||
|
||||
Returns:
|
||||
WebSocketConnection 或 None
|
||||
"""
|
||||
return self.connections.get(session_key, {}).get(connection_id)
|
||||
|
||||
def get_connections_by_session(self, pipeline_uuid: str, session_type: str) -> list[WebSocketConnection]:
|
||||
"""获取指定会话的所有连接
|
||||
|
||||
Args:
|
||||
pipeline_uuid: 流水线 UUID
|
||||
session_type: 会话类型 ('person' 或 'group')
|
||||
|
||||
Returns:
|
||||
连接列表
|
||||
"""
|
||||
session_key = f"{pipeline_uuid}:{session_type}"
|
||||
return list(self.connections.get(session_key, {}).values())
|
||||
|
||||
async def broadcast_to_session(self, pipeline_uuid: str, session_type: str, event_type: str, data: dict):
|
||||
"""广播消息到指定会话的所有连接
|
||||
|
||||
Args:
|
||||
pipeline_uuid: 流水线 UUID
|
||||
session_type: 会话类型 ('person' 或 'group')
|
||||
event_type: 事件类型
|
||||
data: 事件数据
|
||||
"""
|
||||
connections = self.get_connections_by_session(pipeline_uuid, session_type)
|
||||
|
||||
if not connections:
|
||||
logger.debug(f"No connections for session {pipeline_uuid}:{session_type}, skipping broadcast")
|
||||
return
|
||||
|
||||
logger.debug(
|
||||
f"Broadcasting {event_type} to session {pipeline_uuid}:{session_type}, " f"{len(connections)} connections"
|
||||
)
|
||||
|
||||
# 并发发送到所有连接,忽略失败的连接
|
||||
results = await asyncio.gather(*[conn.send(event_type, data) for conn in connections], return_exceptions=True)
|
||||
|
||||
# 统计失败的连接
|
||||
failed_count = sum(1 for result in results if isinstance(result, Exception))
|
||||
if failed_count > 0:
|
||||
logger.warning(f"Failed to send to {failed_count}/{len(connections)} connections")
|
||||
|
||||
def get_all_sessions(self) -> list[str]:
|
||||
"""获取所有活跃会话的 session_key 列表
|
||||
|
||||
Returns:
|
||||
会话标识列表
|
||||
"""
|
||||
return list(self.connections.keys())
|
||||
|
||||
def get_connection_count(self, pipeline_uuid: str, session_type: str) -> int:
|
||||
"""获取指定会话的连接数量
|
||||
|
||||
Args:
|
||||
pipeline_uuid: 流水线 UUID
|
||||
session_type: 会话类型
|
||||
|
||||
Returns:
|
||||
连接数量
|
||||
"""
|
||||
session_key = f"{pipeline_uuid}:{session_type}"
|
||||
return len(self.connections.get(session_key, {}))
|
||||
|
||||
async def cleanup_stale_connections(self, timeout_seconds: int = 120):
|
||||
"""清理超时的连接
|
||||
|
||||
Args:
|
||||
timeout_seconds: 超时时间(秒)
|
||||
"""
|
||||
now = datetime.now()
|
||||
stale_connections = []
|
||||
|
||||
# 查找超时连接
|
||||
for session_key, session_conns in self.connections.items():
|
||||
for conn_id, conn in session_conns.items():
|
||||
elapsed = (now - conn.last_ping).total_seconds()
|
||||
if elapsed > timeout_seconds:
|
||||
stale_connections.append((conn_id, session_key))
|
||||
|
||||
# 移除超时连接
|
||||
for conn_id, session_key in stale_connections:
|
||||
logger.warning(f"Removing stale connection: {conn_id} from {session_key}")
|
||||
await self.remove_connection(conn_id, session_key)
|
||||
|
||||
# 尝试关闭 WebSocket
|
||||
try:
|
||||
conn = self.get_connection(conn_id, session_key)
|
||||
if conn:
|
||||
await conn.websocket.close(1000, "Connection timeout")
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing stale connection {conn_id}: {e}")
|
||||
|
||||
if stale_connections:
|
||||
logger.info(f"Cleaned up {len(stale_connections)} stale connections")
|
||||
@@ -105,6 +105,10 @@ class Application:
|
||||
|
||||
storage_mgr: storagemgr.StorageMgr = None
|
||||
|
||||
# ========= WebSocket =========
|
||||
|
||||
ws_pool = None # WebSocketConnectionPool
|
||||
|
||||
# ========= HTTP Services =========
|
||||
|
||||
user_service: user_service.UserService = None
|
||||
|
||||
@@ -19,6 +19,7 @@ from ...api.http.service import model as model_service
|
||||
from ...api.http.service import pipeline as pipeline_service
|
||||
from ...api.http.service import bot as bot_service
|
||||
from ...api.http.service import knowledge as knowledge_service
|
||||
from ...api.http.service import websocket_pool
|
||||
from ...discover import engine as discover_engine
|
||||
from ...storage import mgr as storagemgr
|
||||
from ...utils import logcache
|
||||
@@ -87,10 +88,18 @@ class BuildAppStage(stage.BootingStage):
|
||||
await llm_tool_mgr_inst.initialize()
|
||||
ap.tool_mgr = llm_tool_mgr_inst
|
||||
|
||||
# Initialize WebSocket connection pool
|
||||
ws_pool_inst = websocket_pool.WebSocketConnectionPool()
|
||||
ap.ws_pool = ws_pool_inst
|
||||
|
||||
im_mgr_inst = im_mgr.PlatformManager(ap=ap)
|
||||
await im_mgr_inst.initialize()
|
||||
ap.platform_mgr = im_mgr_inst
|
||||
|
||||
# Inject WebSocket pool into WebChatAdapter
|
||||
if hasattr(ap.platform_mgr, 'webchat_proxy_bot') and ap.platform_mgr.webchat_proxy_bot:
|
||||
ap.platform_mgr.webchat_proxy_bot.adapter.set_ws_pool(ws_pool_inst)
|
||||
|
||||
pipeline_mgr = pipelinemgr.PipelineManager(ap)
|
||||
await pipeline_mgr.initialize()
|
||||
ap.pipeline_mgr = pipeline_mgr
|
||||
|
||||
@@ -58,6 +58,7 @@ class WebChatAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
debug_messages: dict[str, list[dict]] = pydantic.Field(default_factory=dict, exclude=True)
|
||||
|
||||
ap: app.Application = pydantic.Field(exclude=True)
|
||||
ws_pool: typing.Any = pydantic.Field(exclude=True, default=None) # WebSocketConnectionPool
|
||||
|
||||
def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger, **kwargs):
|
||||
super().__init__(
|
||||
@@ -72,6 +73,15 @@ class WebChatAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
self.bot_account_id = 'webchatbot'
|
||||
|
||||
self.debug_messages = {}
|
||||
self.ws_pool = None
|
||||
|
||||
def set_ws_pool(self, ws_pool):
|
||||
"""设置 WebSocket 连接池
|
||||
|
||||
Args:
|
||||
ws_pool: WebSocketConnectionPool 实例
|
||||
"""
|
||||
self.ws_pool = ws_pool
|
||||
|
||||
async def send_message(
|
||||
self,
|
||||
@@ -130,7 +140,7 @@ class WebChatAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
quote_origin: bool = False,
|
||||
is_final: bool = False,
|
||||
) -> dict:
|
||||
"""回复消息"""
|
||||
"""Reply message chunk - supports both SSE (legacy) and WebSocket"""
|
||||
message_data = WebChatMessage(
|
||||
id=-1,
|
||||
role='assistant',
|
||||
@@ -139,24 +149,32 @@ class WebChatAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
timestamp=datetime.now().isoformat(),
|
||||
)
|
||||
|
||||
# notify waiter
|
||||
session = (
|
||||
self.webchat_group_session
|
||||
if isinstance(message_source, platform_events.GroupMessage)
|
||||
else self.webchat_person_session
|
||||
)
|
||||
if message_source.message_chain.message_id not in session.resp_waiters:
|
||||
# session.resp_waiters[message_source.message_chain.message_id] = asyncio.Queue()
|
||||
queue = session.resp_queues[message_source.message_chain.message_id]
|
||||
# Determine session type
|
||||
if isinstance(message_source, platform_events.GroupMessage):
|
||||
session_type = 'group'
|
||||
session = self.webchat_group_session
|
||||
else: # FriendMessage
|
||||
session_type = 'person'
|
||||
session = self.webchat_person_session
|
||||
|
||||
# if isinstance(message_source, platform_events.FriendMessage):
|
||||
# queue = self.webchat_person_session.resp_queues[message_source.message_chain.message_id]
|
||||
# elif isinstance(message_source, platform_events.GroupMessage):
|
||||
# queue = self.webchat_group_session.resp_queues[message_source.message_chain.message_id]
|
||||
if is_final and bot_message.tool_calls is None:
|
||||
message_data.is_final = True
|
||||
# print(message_data)
|
||||
await queue.put(message_data)
|
||||
# Legacy SSE support: put message into queue
|
||||
if message_source.message_chain.message_id in session.resp_queues:
|
||||
queue = session.resp_queues[message_source.message_chain.message_id]
|
||||
if is_final and bot_message.tool_calls is None:
|
||||
message_data.is_final = True
|
||||
await queue.put(message_data)
|
||||
|
||||
# WebSocket support: broadcast to all connections
|
||||
if self.ws_pool:
|
||||
pipeline_uuid = self.ap.platform_mgr.webchat_proxy_bot.bot_entity.use_pipeline_uuid
|
||||
|
||||
# Determine event type
|
||||
event_type = 'message_complete' if (is_final and bot_message.tool_calls is None) else 'message_chunk'
|
||||
|
||||
# Broadcast to specified session only
|
||||
await self.ws_pool.broadcast_to_session(
|
||||
pipeline_uuid=pipeline_uuid, session_type=session_type, event_type=event_type, data=message_data.model_dump()
|
||||
)
|
||||
|
||||
return message_data.model_dump()
|
||||
|
||||
|
||||
@@ -11,6 +11,10 @@ import { Message } from '@/app/infra/entities/message';
|
||||
import { toast } from 'sonner';
|
||||
import AtBadge from './AtBadge';
|
||||
import { Switch } from '@/components/ui/switch';
|
||||
import {
|
||||
PipelineWebSocketClient,
|
||||
SessionType,
|
||||
} from '@/app/infra/websocket/PipelineWebSocketClient';
|
||||
|
||||
interface MessageComponent {
|
||||
type: 'At' | 'Plain';
|
||||
@@ -31,17 +35,27 @@ export default function DebugDialog({
|
||||
}: DebugDialogProps) {
|
||||
const { t } = useTranslation();
|
||||
const [selectedPipelineId, setSelectedPipelineId] = useState(pipelineId);
|
||||
const [sessionType, setSessionType] = useState<'person' | 'group'>('person');
|
||||
const [sessionType, setSessionType] = useState<SessionType>('person');
|
||||
const [messages, setMessages] = useState<Message[]>([]);
|
||||
const [inputValue, setInputValue] = useState('');
|
||||
const [showAtPopover, setShowAtPopover] = useState(false);
|
||||
const [hasAt, setHasAt] = useState(false);
|
||||
const [isHovering, setIsHovering] = useState(false);
|
||||
const [isStreaming, setIsStreaming] = useState(true);
|
||||
const messagesEndRef = useRef<HTMLDivElement>(null);
|
||||
const inputRef = useRef<HTMLInputElement>(null);
|
||||
const popoverRef = useRef<HTMLDivElement>(null);
|
||||
|
||||
// WebSocket states
|
||||
const [wsClient, setWsClient] = useState<PipelineWebSocketClient | null>(
|
||||
null,
|
||||
);
|
||||
const [connectionStatus, setConnectionStatus] = useState<
|
||||
'disconnected' | 'connecting' | 'connected'
|
||||
>('disconnected');
|
||||
const [pendingMessages, setPendingMessages] = useState<
|
||||
Map<string, Message>
|
||||
>(new Map());
|
||||
|
||||
const scrollToBottom = useCallback(() => {
|
||||
// 使用setTimeout确保在DOM更新后执行滚动
|
||||
setTimeout(() => {
|
||||
@@ -57,37 +71,161 @@ export default function DebugDialog({
|
||||
}, 0);
|
||||
}, []);
|
||||
|
||||
const loadMessages = useCallback(
|
||||
async (pipelineId: string) => {
|
||||
try {
|
||||
const response = await httpClient.getWebChatHistoryMessages(
|
||||
pipelineId,
|
||||
sessionType,
|
||||
);
|
||||
setMessages(response.messages);
|
||||
} catch (error) {
|
||||
console.error('Failed to load messages:', error);
|
||||
}
|
||||
},
|
||||
[sessionType],
|
||||
);
|
||||
// 在useEffect中监听messages变化时滚动
|
||||
// Scroll to bottom when messages change
|
||||
useEffect(() => {
|
||||
scrollToBottom();
|
||||
}, [messages, scrollToBottom]);
|
||||
|
||||
// WebSocket connection setup
|
||||
useEffect(() => {
|
||||
if (open) {
|
||||
setSelectedPipelineId(pipelineId);
|
||||
loadMessages(pipelineId);
|
||||
}
|
||||
}, [open, pipelineId]);
|
||||
if (!open) return;
|
||||
|
||||
useEffect(() => {
|
||||
if (open) {
|
||||
loadMessages(selectedPipelineId);
|
||||
}
|
||||
}, [sessionType, selectedPipelineId, open, loadMessages]);
|
||||
const client = new PipelineWebSocketClient(
|
||||
selectedPipelineId,
|
||||
sessionType,
|
||||
);
|
||||
|
||||
// Setup event handlers
|
||||
client.onConnected = (data) => {
|
||||
console.log('[DebugDialog] WebSocket connected:', data);
|
||||
setConnectionStatus('connected');
|
||||
// Load history messages after connection
|
||||
client.loadHistory();
|
||||
};
|
||||
|
||||
client.onHistory = (data) => {
|
||||
console.log('[DebugDialog] History loaded:', data?.messages.length);
|
||||
if (data) {
|
||||
setMessages(data.messages);
|
||||
}
|
||||
};
|
||||
|
||||
client.onMessageSent = (data) => {
|
||||
console.log('[DebugDialog] Message sent confirmed:', data);
|
||||
if (data) {
|
||||
// Update client message ID to server message ID
|
||||
const clientMsgId = data.client_message_id;
|
||||
const serverMsgId = data.server_message_id;
|
||||
|
||||
setMessages((prev) =>
|
||||
prev.map((msg) =>
|
||||
msg.id === -1 && pendingMessages.has(clientMsgId)
|
||||
? { ...msg, id: serverMsgId }
|
||||
: msg,
|
||||
),
|
||||
);
|
||||
|
||||
setPendingMessages((prev) => {
|
||||
const newMap = new Map(prev);
|
||||
newMap.delete(clientMsgId);
|
||||
return newMap;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
client.onMessageStart = (data) => {
|
||||
console.log('[DebugDialog] Message start:', data);
|
||||
if (data) {
|
||||
const placeholderMessage: Message = {
|
||||
id: data.message_id,
|
||||
role: 'assistant',
|
||||
content: '',
|
||||
message_chain: [],
|
||||
timestamp: data.timestamp,
|
||||
};
|
||||
setMessages((prev) => [...prev, placeholderMessage]);
|
||||
}
|
||||
};
|
||||
|
||||
client.onMessageChunk = (data) => {
|
||||
if (data) {
|
||||
// Update streaming message (content is cumulative)
|
||||
setMessages((prev) =>
|
||||
prev.map((msg) =>
|
||||
msg.id === data.message_id
|
||||
? {
|
||||
...msg,
|
||||
content: data.content,
|
||||
message_chain: data.message_chain,
|
||||
}
|
||||
: msg,
|
||||
),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
client.onMessageComplete = (data) => {
|
||||
console.log('[DebugDialog] Message complete:', data);
|
||||
if (data) {
|
||||
// Mark message as complete
|
||||
setMessages((prev) =>
|
||||
prev.map((msg) =>
|
||||
msg.id === data.message_id
|
||||
? {
|
||||
...msg,
|
||||
content: data.final_content,
|
||||
message_chain: data.message_chain,
|
||||
}
|
||||
: msg,
|
||||
),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
client.onMessageError = (data) => {
|
||||
console.error('[DebugDialog] Message error:', data);
|
||||
if (data) {
|
||||
toast.error(`Message error: ${data.error}`);
|
||||
}
|
||||
};
|
||||
|
||||
client.onPluginMessage = (data) => {
|
||||
console.log('[DebugDialog] Plugin message:', data);
|
||||
if (data) {
|
||||
const pluginMessage: Message = {
|
||||
id: data.message_id,
|
||||
role: 'assistant',
|
||||
content: data.content,
|
||||
message_chain: data.message_chain,
|
||||
timestamp: data.timestamp,
|
||||
};
|
||||
setMessages((prev) => [...prev, pluginMessage]);
|
||||
}
|
||||
};
|
||||
|
||||
client.onError = (data) => {
|
||||
console.error('[DebugDialog] WebSocket error:', data);
|
||||
if (data) {
|
||||
toast.error(`WebSocket error: ${data.error}`);
|
||||
}
|
||||
};
|
||||
|
||||
client.onDisconnected = () => {
|
||||
console.log('[DebugDialog] WebSocket disconnected');
|
||||
setConnectionStatus('disconnected');
|
||||
};
|
||||
|
||||
// Connect to WebSocket
|
||||
setConnectionStatus('connecting');
|
||||
client
|
||||
.connect(httpClient.getSessionSync())
|
||||
.then(() => {
|
||||
console.log('[DebugDialog] WebSocket connection established');
|
||||
})
|
||||
.catch((err) => {
|
||||
console.error('[DebugDialog] Failed to connect WebSocket:', err);
|
||||
toast.error('Failed to connect to server');
|
||||
setConnectionStatus('disconnected');
|
||||
});
|
||||
|
||||
setWsClient(client);
|
||||
|
||||
// Cleanup on unmount or session type change
|
||||
return () => {
|
||||
console.log('[DebugDialog] Cleaning up WebSocket connection');
|
||||
client.disconnect();
|
||||
};
|
||||
}, [open, selectedPipelineId, sessionType]); // Reconnect when session type changes
|
||||
|
||||
useEffect(() => {
|
||||
const handleClickOutside = (event: MouseEvent) => {
|
||||
@@ -150,6 +288,12 @@ export default function DebugDialog({
|
||||
const sendMessage = async () => {
|
||||
if (!inputValue.trim() && !hasAt) return;
|
||||
|
||||
// Check WebSocket connection
|
||||
if (!wsClient || connectionStatus !== 'connected') {
|
||||
toast.error('Not connected to server');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const messageChain = [];
|
||||
|
||||
@@ -170,7 +314,7 @@ export default function DebugDialog({
|
||||
});
|
||||
|
||||
if (hasAt) {
|
||||
// for showing
|
||||
// For display
|
||||
text_content = '@webchatbot' + text_content;
|
||||
}
|
||||
|
||||
@@ -181,97 +325,26 @@ export default function DebugDialog({
|
||||
timestamp: new Date().toISOString(),
|
||||
message_chain: messageChain,
|
||||
};
|
||||
// 根据isStreaming状态决定使用哪种传输方式
|
||||
if (isStreaming) {
|
||||
// streaming
|
||||
// 创建初始bot消息
|
||||
const placeholderRandomId = Math.floor(Math.random() * 1000000);
|
||||
const botMessagePlaceholder: Message = {
|
||||
id: placeholderRandomId,
|
||||
role: 'assistant',
|
||||
content: 'Generating...',
|
||||
timestamp: new Date().toISOString(),
|
||||
message_chain: [{ type: 'Plain', text: 'Generating...' }],
|
||||
};
|
||||
|
||||
// 添加用户消息和初始bot消息到状态
|
||||
// Add user message to UI immediately
|
||||
setMessages((prevMessages) => [...prevMessages, userMessage]);
|
||||
setInputValue('');
|
||||
setHasAt(false);
|
||||
|
||||
setMessages((prevMessages) => [
|
||||
...prevMessages,
|
||||
userMessage,
|
||||
botMessagePlaceholder,
|
||||
]);
|
||||
setInputValue('');
|
||||
setHasAt(false);
|
||||
try {
|
||||
await httpClient.sendStreamingWebChatMessage(
|
||||
sessionType,
|
||||
messageChain,
|
||||
selectedPipelineId,
|
||||
(data) => {
|
||||
// 处理流式响应数据
|
||||
console.log('data', data);
|
||||
if (data.message) {
|
||||
// 更新完整内容
|
||||
// Send via WebSocket
|
||||
const clientMessageId = wsClient.sendMessage(messageChain);
|
||||
|
||||
setMessages((prevMessages) => {
|
||||
const updatedMessages = [...prevMessages];
|
||||
const botMessageIndex = updatedMessages.findIndex(
|
||||
(message) => message.id === placeholderRandomId,
|
||||
);
|
||||
if (botMessageIndex !== -1) {
|
||||
updatedMessages[botMessageIndex] = {
|
||||
...updatedMessages[botMessageIndex],
|
||||
content: data.message.content,
|
||||
message_chain: [
|
||||
{ type: 'Plain', text: data.message.content },
|
||||
],
|
||||
};
|
||||
}
|
||||
return updatedMessages;
|
||||
});
|
||||
}
|
||||
},
|
||||
() => {},
|
||||
(error) => {
|
||||
// 处理错误
|
||||
console.error('Streaming error:', error);
|
||||
if (sessionType === 'person') {
|
||||
toast.error(t('pipelines.debugDialog.sendFailed'));
|
||||
}
|
||||
},
|
||||
);
|
||||
} catch (error) {
|
||||
console.error('Failed to send streaming message:', error);
|
||||
if (sessionType === 'person') {
|
||||
toast.error(t('pipelines.debugDialog.sendFailed'));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// non-streaming
|
||||
setMessages((prevMessages) => [...prevMessages, userMessage]);
|
||||
setInputValue('');
|
||||
setHasAt(false);
|
||||
// Track pending message for ID mapping
|
||||
setPendingMessages((prev) => {
|
||||
const newMap = new Map(prev);
|
||||
newMap.set(clientMessageId, userMessage);
|
||||
return newMap;
|
||||
});
|
||||
|
||||
const response = await httpClient.sendWebChatMessage(
|
||||
sessionType,
|
||||
messageChain,
|
||||
selectedPipelineId,
|
||||
180000,
|
||||
);
|
||||
|
||||
setMessages((prevMessages) => [...prevMessages, response.message]);
|
||||
}
|
||||
} catch (
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
error: any
|
||||
) {
|
||||
console.log(error, 'type of error', typeof error);
|
||||
console.error('Failed to send message:', error);
|
||||
|
||||
if (!error.message.includes('timeout') && sessionType === 'person') {
|
||||
toast.error(t('pipelines.debugDialog.sendFailed'));
|
||||
}
|
||||
console.log('[DebugDialog] Message sent:', clientMessageId);
|
||||
} catch (error) {
|
||||
console.error('[DebugDialog] Failed to send message:', error);
|
||||
toast.error(t('pipelines.debugDialog.sendFailed'));
|
||||
} finally {
|
||||
inputRef.current?.focus();
|
||||
}
|
||||
@@ -390,12 +463,6 @@ export default function DebugDialog({
|
||||
</ScrollArea>
|
||||
|
||||
<div className="p-4 pb-0 bg-white dark:bg-black flex gap-2">
|
||||
<div className="flex items-center gap-2">
|
||||
<span className="text-sm text-gray-600">
|
||||
{t('pipelines.debugDialog.streaming')}
|
||||
</span>
|
||||
<Switch checked={isStreaming} onCheckedChange={setIsStreaming} />
|
||||
</div>
|
||||
<div className="flex-1 flex items-center gap-2">
|
||||
{hasAt && (
|
||||
<AtBadge targetName="webchatbot" onRemove={handleAtRemove} />
|
||||
|
||||
394
web/src/app/infra/websocket/PipelineWebSocketClient.ts
Normal file
394
web/src/app/infra/websocket/PipelineWebSocketClient.ts
Normal file
@@ -0,0 +1,394 @@
|
||||
/**
|
||||
* Pipeline WebSocket Client
|
||||
*
|
||||
* Provides real-time bidirectional communication for pipeline debugging.
|
||||
* Supports person and group session isolation.
|
||||
*/
|
||||
|
||||
import { Message } from '@/app/infra/entities/message';
|
||||
|
||||
export type SessionType = 'person' | 'group';
|
||||
|
||||
export interface WebSocketEventData {
|
||||
// Connected event
|
||||
connected?: {
|
||||
connection_id: string;
|
||||
session_type: SessionType;
|
||||
pipeline_uuid: string;
|
||||
};
|
||||
|
||||
// History event
|
||||
history?: {
|
||||
messages: Message[];
|
||||
has_more: boolean;
|
||||
};
|
||||
|
||||
// Message sent confirmation
|
||||
message_sent?: {
|
||||
client_message_id: string;
|
||||
server_message_id: number;
|
||||
timestamp: string;
|
||||
};
|
||||
|
||||
// Message start
|
||||
message_start?: {
|
||||
message_id: number;
|
||||
role: 'assistant';
|
||||
timestamp: string;
|
||||
reply_to: number;
|
||||
};
|
||||
|
||||
// Message chunk
|
||||
message_chunk?: {
|
||||
message_id: number;
|
||||
content: string;
|
||||
message_chain: object[];
|
||||
timestamp: string;
|
||||
};
|
||||
|
||||
// Message complete
|
||||
message_complete?: {
|
||||
message_id: number;
|
||||
final_content: string;
|
||||
message_chain: object[];
|
||||
timestamp: string;
|
||||
};
|
||||
|
||||
// Message error
|
||||
message_error?: {
|
||||
message_id: number;
|
||||
error: string;
|
||||
error_code?: string;
|
||||
};
|
||||
|
||||
// Interrupted
|
||||
interrupted?: {
|
||||
message_id: number;
|
||||
partial_content: string;
|
||||
};
|
||||
|
||||
// Plugin message
|
||||
plugin_message?: {
|
||||
message_id: number;
|
||||
role: 'assistant';
|
||||
content: string;
|
||||
message_chain: object[];
|
||||
timestamp: string;
|
||||
source: 'plugin';
|
||||
};
|
||||
|
||||
// Error
|
||||
error?: {
|
||||
error: string;
|
||||
error_code: string;
|
||||
details?: object;
|
||||
};
|
||||
|
||||
// Pong
|
||||
pong?: {
|
||||
timestamp: number;
|
||||
};
|
||||
}
|
||||
|
||||
export class PipelineWebSocketClient {
|
||||
private ws: WebSocket | null = null;
|
||||
private pipelineId: string;
|
||||
private sessionType: SessionType;
|
||||
private reconnectAttempts = 0;
|
||||
private maxReconnectAttempts = 5;
|
||||
private pingInterval: NodeJS.Timeout | null = null;
|
||||
private reconnectTimeout: NodeJS.Timeout | null = null;
|
||||
private isManualDisconnect = false;
|
||||
|
||||
// Event callbacks
|
||||
public onConnected?: (data: WebSocketEventData['connected']) => void;
|
||||
public onHistory?: (data: WebSocketEventData['history']) => void;
|
||||
public onMessageSent?: (data: WebSocketEventData['message_sent']) => void;
|
||||
public onMessageStart?: (data: WebSocketEventData['message_start']) => void;
|
||||
public onMessageChunk?: (data: WebSocketEventData['message_chunk']) => void;
|
||||
public onMessageComplete?: (
|
||||
data: WebSocketEventData['message_complete'],
|
||||
) => void;
|
||||
public onMessageError?: (data: WebSocketEventData['message_error']) => void;
|
||||
public onInterrupted?: (data: WebSocketEventData['interrupted']) => void;
|
||||
public onPluginMessage?: (data: WebSocketEventData['plugin_message']) => void;
|
||||
public onError?: (data: WebSocketEventData['error']) => void;
|
||||
public onDisconnected?: () => void;
|
||||
|
||||
constructor(pipelineId: string, sessionType: SessionType) {
|
||||
this.pipelineId = pipelineId;
|
||||
this.sessionType = sessionType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to WebSocket server
|
||||
*/
|
||||
connect(token: string): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
this.isManualDisconnect = false;
|
||||
|
||||
const wsUrl = this.buildWebSocketUrl();
|
||||
console.log(`[WebSocket] Connecting to ${wsUrl}...`);
|
||||
|
||||
try {
|
||||
this.ws = new WebSocket(wsUrl);
|
||||
} catch (error) {
|
||||
console.error('[WebSocket] Failed to create WebSocket:', error);
|
||||
reject(error);
|
||||
return;
|
||||
}
|
||||
|
||||
this.ws.onopen = () => {
|
||||
console.log('[WebSocket] Connection opened');
|
||||
|
||||
// Send connect event with session type and token
|
||||
this.send('connect', {
|
||||
pipeline_uuid: this.pipelineId,
|
||||
session_type: this.sessionType,
|
||||
token,
|
||||
});
|
||||
|
||||
// Start ping interval
|
||||
this.startPing();
|
||||
|
||||
// Reset reconnect attempts on successful connection
|
||||
this.reconnectAttempts = 0;
|
||||
|
||||
resolve();
|
||||
};
|
||||
|
||||
this.ws.onmessage = (event) => {
|
||||
this.handleMessage(event);
|
||||
};
|
||||
|
||||
this.ws.onerror = (error) => {
|
||||
console.error('[WebSocket] Error:', error);
|
||||
reject(error);
|
||||
};
|
||||
|
||||
this.ws.onclose = (event) => {
|
||||
console.log(
|
||||
`[WebSocket] Connection closed: code=${event.code}, reason=${event.reason}`,
|
||||
);
|
||||
this.handleDisconnect();
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming WebSocket message
|
||||
*/
|
||||
private handleMessage(event: MessageEvent) {
|
||||
try {
|
||||
const message = JSON.parse(event.data);
|
||||
const { type, data } = message;
|
||||
|
||||
console.log(`[WebSocket] Received: ${type}`, data);
|
||||
|
||||
switch (type) {
|
||||
case 'connected':
|
||||
this.onConnected?.(data);
|
||||
break;
|
||||
case 'history':
|
||||
this.onHistory?.(data);
|
||||
break;
|
||||
case 'message_sent':
|
||||
this.onMessageSent?.(data);
|
||||
break;
|
||||
case 'message_start':
|
||||
this.onMessageStart?.(data);
|
||||
break;
|
||||
case 'message_chunk':
|
||||
this.onMessageChunk?.(data);
|
||||
break;
|
||||
case 'message_complete':
|
||||
this.onMessageComplete?.(data);
|
||||
break;
|
||||
case 'message_error':
|
||||
this.onMessageError?.(data);
|
||||
break;
|
||||
case 'interrupted':
|
||||
this.onInterrupted?.(data);
|
||||
break;
|
||||
case 'plugin_message':
|
||||
this.onPluginMessage?.(data);
|
||||
break;
|
||||
case 'error':
|
||||
this.onError?.(data);
|
||||
break;
|
||||
case 'pong':
|
||||
// Heartbeat response, no action needed
|
||||
break;
|
||||
default:
|
||||
console.warn(`[WebSocket] Unknown message type: ${type}`);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[WebSocket] Failed to parse message:', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send message to server
|
||||
*/
|
||||
sendMessage(messageChain: object[]): string {
|
||||
const clientMessageId = this.generateMessageId();
|
||||
this.send('send_message', {
|
||||
message_chain: messageChain,
|
||||
client_message_id: clientMessageId,
|
||||
});
|
||||
return clientMessageId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load history messages
|
||||
*/
|
||||
loadHistory(beforeMessageId?: number, limit?: number) {
|
||||
this.send('load_history', {
|
||||
before_message_id: beforeMessageId,
|
||||
limit,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Interrupt streaming message
|
||||
*/
|
||||
interrupt(messageId: number) {
|
||||
this.send('interrupt', { message_id: messageId });
|
||||
}
|
||||
|
||||
/**
|
||||
* Send event to server
|
||||
*/
|
||||
private send(type: string, data: object) {
|
||||
if (this.ws?.readyState === WebSocket.OPEN) {
|
||||
const message = JSON.stringify({ type, data });
|
||||
console.log(`[WebSocket] Sending: ${type}`, data);
|
||||
this.ws.send(message);
|
||||
} else {
|
||||
console.warn(
|
||||
`[WebSocket] Cannot send message, connection not open (state: ${this.ws?.readyState})`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start ping interval (heartbeat)
|
||||
*/
|
||||
private startPing() {
|
||||
this.stopPing();
|
||||
this.pingInterval = setInterval(() => {
|
||||
this.send('ping', { timestamp: Date.now() });
|
||||
}, 30000); // Ping every 30 seconds
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop ping interval
|
||||
*/
|
||||
private stopPing() {
|
||||
if (this.pingInterval) {
|
||||
clearInterval(this.pingInterval);
|
||||
this.pingInterval = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle disconnection
|
||||
*/
|
||||
private handleDisconnect() {
|
||||
this.stopPing();
|
||||
this.onDisconnected?.();
|
||||
|
||||
// Auto reconnect if not manual disconnect
|
||||
if (
|
||||
!this.isManualDisconnect &&
|
||||
this.reconnectAttempts < this.maxReconnectAttempts
|
||||
) {
|
||||
const delay = Math.min(2000 * Math.pow(2, this.reconnectAttempts), 30000);
|
||||
console.log(
|
||||
`[WebSocket] Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts + 1}/${this.maxReconnectAttempts})`,
|
||||
);
|
||||
|
||||
this.reconnectTimeout = setTimeout(() => {
|
||||
this.reconnectAttempts++;
|
||||
// Note: Need to get token again, should be handled by caller
|
||||
console.warn(
|
||||
'[WebSocket] Auto-reconnect requires token, please reconnect manually',
|
||||
);
|
||||
}, delay);
|
||||
} else if (this.reconnectAttempts >= this.maxReconnectAttempts) {
|
||||
console.error(
|
||||
'[WebSocket] Max reconnect attempts reached, giving up',
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect from server
|
||||
*/
|
||||
disconnect() {
|
||||
this.isManualDisconnect = true;
|
||||
this.stopPing();
|
||||
|
||||
if (this.reconnectTimeout) {
|
||||
clearTimeout(this.reconnectTimeout);
|
||||
this.reconnectTimeout = null;
|
||||
}
|
||||
|
||||
if (this.ws) {
|
||||
this.ws.close(1000, 'Client disconnect');
|
||||
this.ws = null;
|
||||
}
|
||||
|
||||
console.log('[WebSocket] Disconnected');
|
||||
}
|
||||
|
||||
/**
|
||||
* Build WebSocket URL
|
||||
*/
|
||||
private buildWebSocketUrl(): string {
|
||||
// Get current base URL
|
||||
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
|
||||
const host = window.location.host;
|
||||
|
||||
return `${protocol}//${host}/api/v1/pipelines/${this.pipelineId}/chat/ws`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate unique client message ID
|
||||
*/
|
||||
private generateMessageId(): string {
|
||||
return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get connection state
|
||||
*/
|
||||
getState():
|
||||
| 'CONNECTING'
|
||||
| 'OPEN'
|
||||
| 'CLOSING'
|
||||
| 'CLOSED'
|
||||
| 'DISCONNECTED' {
|
||||
if (!this.ws) return 'DISCONNECTED';
|
||||
|
||||
switch (this.ws.readyState) {
|
||||
case WebSocket.CONNECTING:
|
||||
return 'CONNECTING';
|
||||
case WebSocket.OPEN:
|
||||
return 'OPEN';
|
||||
case WebSocket.CLOSING:
|
||||
return 'CLOSING';
|
||||
case WebSocket.CLOSED:
|
||||
return 'CLOSED';
|
||||
default:
|
||||
return 'DISCONNECTED';
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if connected
|
||||
*/
|
||||
isConnected(): boolean {
|
||||
return this.ws?.readyState === WebSocket.OPEN;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user