Compare commits

..

19 Commits

Author SHA1 Message Date
RockChinQ
a4995c8cd9 feat: Migrate pipeline debug from SSE to WebSocket
Replace Server-Sent Events (SSE) with WebSocket for pipeline debugging
to support bidirectional real-time communication and resolve timeout
limitations.

## Backend Changes

- Add WebSocketConnectionPool for managing client connections
- Implement WebSocket route handler at /api/v1/pipelines/<uuid>/chat/ws
- Modify WebChatAdapter to broadcast messages via WebSocket
- Support both legacy SSE and new WebSocket simultaneously
- Maintain person/group session isolation

## Frontend Changes

- Create PipelineWebSocketClient for WebSocket communication
- Update DebugDialog to use WebSocket instead of SSE
- Support auto-reconnection and connection status tracking
- Remove streaming toggle (WebSocket always supports streaming)

## Key Features

- Eliminates 120-second timeout limitation
- Supports multiple messages per request
- Enables backend push notifications (plugins)
- Maintains session isolation (person vs group)
- Backward compatible with SSE during transition

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-22 12:50:18 +00:00
Junyan Qin
127a38b15c chore: bump version 4.3.9 2025-10-22 18:52:45 +08:00
fdc310
e4729337c8 Feat/coze runner (#1714)
* feat:add coze api client and coze runner and coze config

* del print

* fix:Change the default setting of the plugin system to true

* fix:del multimodal-support config, default multimodal-support,and in cozeapi.py Obtain timeout and auto-save-history config

* chore: add comment for coze.com

---------

Co-authored-by: Junyan Qin <rockchinq@gmail.com>
2025-10-17 18:13:03 +08:00
Junyan Qin
5fa75330cf perf: store pipeline sort method 2025-10-12 21:11:30 +08:00
Junyan Qin
547e3d098e perf: add component list in plugin detail dialog 2025-10-12 19:57:42 +08:00
Junyan Qin
f1ddddfe00 chore: bump version 4.3.8 2025-10-10 22:50:57 +08:00
Junyan Qin
4e61302156 fix: datetime serialization error in emit_event (#1713) 2025-10-10 22:37:39 +08:00
Junyan Qin
9e3cf418ba perf: output pipeline error in en 2025-10-10 17:55:49 +08:00
Junyan Qin
3e29ec7892 perf: allow not set llm model (#1703) 2025-10-10 16:34:01 +08:00
Junyan Qin
f452742cd2 fix: bad Plain component init in wechatpad (#1712) 2025-10-10 14:48:21 +08:00
Junyan Qin
b560432b0b chore: bump version 4.3.7 2025-10-08 14:36:48 +08:00
Junyan Qin (Chin)
99e5478ced fix: return empty data when plugin system disabled (#1710) 2025-10-07 16:24:38 +08:00
Junyan Qin
09dba91a37 chore: bump version 4.3.7b1 2025-10-07 15:30:33 +08:00
Junyan Qin
18ec4adac9 chore: bump langbot-plugin to 0.1.4b2 2025-10-07 15:25:49 +08:00
Junyan Qin
8bedaa468a chore: add codecov.yml 2025-10-07 00:15:56 +08:00
Guanchao Wang
0ab366fcac Fix/qqo (#1709)
* fix: qq official

* fix: appid
2025-10-07 00:06:07 +08:00
Junyan Qin
d664039e54 feat: update for new events fields 2025-10-06 23:22:38 +08:00
Junyan Qin
6535ba4f72 chore: bump version 4.3.6 2025-10-04 00:22:08 +08:00
Thetail001
3b181cff93 Fix: Correct data type mismatch in AtBotRule (#1705)
Fix can't '@' in QQ group.
2025-10-04 00:20:27 +08:00
40 changed files with 1867 additions and 277 deletions

4
codecov.yml Normal file
View File

@@ -0,0 +1,4 @@
coverage:
status:
project: off
patch: off

View File

View File

@@ -0,0 +1,192 @@
import json
import asyncio
import aiohttp
import io
from typing import Dict, List, Any, AsyncGenerator
import os
from pathlib import Path
class AsyncCozeAPIClient:
def __init__(self, api_key: str, api_base: str = "https://api.coze.cn"):
self.api_key = api_key
self.api_base = api_base
self.session = None
async def __aenter__(self):
"""支持异步上下文管理器"""
await self.coze_session()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""退出时自动关闭会话"""
await self.close()
async def coze_session(self):
"""确保HTTP session存在"""
if self.session is None:
connector = aiohttp.TCPConnector(
ssl=False if self.api_base.startswith("http://") else True,
limit=100,
limit_per_host=30,
keepalive_timeout=30,
enable_cleanup_closed=True,
)
timeout = aiohttp.ClientTimeout(
total=120, # 默认超时时间
connect=30,
sock_read=120,
)
headers = {
"Authorization": f"Bearer {self.api_key}",
"Accept": "text/event-stream",
}
self.session = aiohttp.ClientSession(
headers=headers, timeout=timeout, connector=connector
)
return self.session
async def close(self):
"""显式关闭会话"""
if self.session and not self.session.closed:
await self.session.close()
self.session = None
async def upload(
self,
file,
) -> str:
# 处理 Path 对象
if isinstance(file, Path):
if not file.exists():
raise ValueError(f"File not found: {file}")
with open(file, "rb") as f:
file = f.read()
# 处理文件路径字符串
elif isinstance(file, str):
if not os.path.isfile(file):
raise ValueError(f"File not found: {file}")
with open(file, "rb") as f:
file = f.read()
# 处理文件对象
elif hasattr(file, 'read'):
file = file.read()
session = await self.coze_session()
url = f"{self.api_base}/v1/files/upload"
try:
file_io = io.BytesIO(file)
async with session.post(
url,
data={
"file": file_io,
},
timeout=aiohttp.ClientTimeout(total=60),
) as response:
if response.status == 401:
raise Exception("Coze API 认证失败,请检查 API Key 是否正确")
response_text = await response.text()
if response.status != 200:
raise Exception(
f"文件上传失败,状态码: {response.status}, 响应: {response_text}"
)
try:
result = await response.json()
except json.JSONDecodeError:
raise Exception(f"文件上传响应解析失败: {response_text}")
if result.get("code") != 0:
raise Exception(f"文件上传失败: {result.get('msg', '未知错误')}")
file_id = result["data"]["id"]
return file_id
except asyncio.TimeoutError:
raise Exception("文件上传超时")
except Exception as e:
raise Exception(f"文件上传失败: {str(e)}")
async def chat_messages(
self,
bot_id: str,
user_id: str,
additional_messages: List[Dict] | None = None,
conversation_id: str | None = None,
auto_save_history: bool = True,
stream: bool = True,
timeout: float = 120,
) -> AsyncGenerator[Dict[str, Any], None]:
"""发送聊天消息并返回流式响应
Args:
bot_id: Bot ID
user_id: 用户ID
additional_messages: 额外消息列表
conversation_id: 会话ID
auto_save_history: 是否自动保存历史
stream: 是否流式响应
timeout: 超时时间
"""
session = await self.coze_session()
url = f"{self.api_base}/v3/chat"
payload = {
"bot_id": bot_id,
"user_id": user_id,
"stream": stream,
"auto_save_history": auto_save_history,
}
if additional_messages:
payload["additional_messages"] = additional_messages
params = {}
if conversation_id:
params["conversation_id"] = conversation_id
try:
async with session.post(
url,
json=payload,
params=params,
timeout=aiohttp.ClientTimeout(total=timeout),
) as response:
if response.status == 401:
raise Exception("Coze API 认证失败,请检查 API Key 是否正确")
if response.status != 200:
raise Exception(f"Coze API 流式请求失败,状态码: {response.status}")
async for chunk in response.content:
chunk = chunk.decode("utf-8")
if chunk != '\n':
if chunk.startswith("event:"):
chunk_type = chunk.replace("event:", "", 1).strip()
elif chunk.startswith("data:"):
chunk_data = chunk.replace("data:", "", 1).strip()
else:
yield {"event": chunk_type, "data": json.loads(chunk_data)}
except asyncio.TimeoutError:
raise Exception(f"Coze API 流式请求超时 ({timeout}秒)")
except Exception as e:
raise Exception(f"Coze API 流式请求失败: {str(e)}")

View File

@@ -0,0 +1,229 @@
"""流水线调试 WebSocket 路由
提供基于 WebSocket 的实时双向通信,用于流水线调试。
支持 person 和 group 两种会话类型的隔离。
"""
import asyncio
import logging
import uuid
from datetime import datetime
import quart
from ... import group
from ....service.websocket_pool import WebSocketConnection
logger = logging.getLogger(__name__)
async def handle_client_event(connection: WebSocketConnection, message: dict, ap):
"""处理客户端发送的事件
Args:
connection: WebSocket 连接对象
message: 客户端消息 {'type': 'xxx', 'data': {...}}
ap: Application 实例
"""
event_type = message.get('type')
data = message.get('data', {})
pipeline_uuid = connection.pipeline_uuid
session_type = connection.session_type
try:
webchat_adapter = ap.platform_mgr.webchat_proxy_bot.adapter
if event_type == 'send_message':
# 发送消息到指定会话
message_chain_obj = data.get('message_chain', [])
client_message_id = data.get('client_message_id')
if not message_chain_obj:
await connection.send('error', {'error': 'message_chain is required', 'error_code': 'INVALID_REQUEST'})
return
logger.info(
f"Received send_message: pipeline={pipeline_uuid}, "
f"session={session_type}, "
f"client_msg_id={client_message_id}"
)
# 调用 webchat_adapter.send_webchat_message
# 消息将通过 reply_message_chunk 自动推送到 WebSocket
result = None
async for msg in webchat_adapter.send_webchat_message(
pipeline_uuid=pipeline_uuid, session_type=session_type, message_chain_obj=message_chain_obj, is_stream=True
):
result = msg
# 发送确认
if result:
await connection.send(
'message_sent',
{
'client_message_id': client_message_id,
'server_message_id': result.get('id'),
'timestamp': result.get('timestamp'),
},
)
elif event_type == 'load_history':
# 加载指定会话的历史消息
before_message_id = data.get('before_message_id')
limit = data.get('limit', 50)
logger.info(f"Loading history: pipeline={pipeline_uuid}, session={session_type}, limit={limit}")
# 从对应会话获取历史消息
messages = webchat_adapter.get_webchat_messages(pipeline_uuid, session_type)
# 简单分页:返回最后 limit 条
if before_message_id:
# TODO: 实现基于 message_id 的分页
history_messages = messages[-limit:]
else:
history_messages = messages[-limit:] if len(messages) > limit else messages
await connection.send(
'history', {'messages': history_messages, 'has_more': len(messages) > len(history_messages)}
)
elif event_type == 'interrupt':
# 中断消息
message_id = data.get('message_id')
logger.info(f"Interrupt requested: message_id={message_id}")
# TODO: 实现中断逻辑
await connection.send('interrupted', {'message_id': message_id, 'partial_content': ''})
elif event_type == 'ping':
# 心跳
connection.last_ping = datetime.now()
await connection.send('pong', {'timestamp': data.get('timestamp')})
else:
logger.warning(f"Unknown event type: {event_type}")
await connection.send('error', {'error': f'Unknown event type: {event_type}', 'error_code': 'UNKNOWN_EVENT'})
except Exception as e:
logger.error(f"Error handling event {event_type}: {e}", exc_info=True)
await connection.send(
'error',
{'error': f'Internal server error: {str(e)}', 'error_code': 'INTERNAL_ERROR', 'details': {'event_type': event_type}},
)
@group.group_class('pipeline-websocket', '/api/v1/pipelines/<pipeline_uuid>/chat')
class PipelineWebSocketRouterGroup(group.RouterGroup):
"""流水线调试 WebSocket 路由组"""
async def initialize(self) -> None:
@self.route('/ws')
async def websocket_handler(pipeline_uuid: str):
"""WebSocket 连接处理 - 会话隔离
连接流程:
1. 客户端建立 WebSocket 连接
2. 客户端发送 connect 事件(携带 session_type 和 token
3. 服务端验证并创建连接对象
4. 进入消息循环,处理客户端事件
5. 断开时清理连接
Args:
pipeline_uuid: 流水线 UUID
"""
websocket = quart.websocket._get_current_object()
connection_id = str(uuid.uuid4())
session_key = None
connection = None
try:
# 1. 等待客户端发送 connect 事件
first_message = await websocket.receive_json()
if first_message.get('type') != 'connect':
await websocket.send_json(
{'type': 'error', 'data': {'error': 'First message must be connect event', 'error_code': 'INVALID_HANDSHAKE'}}
)
await websocket.close(1008)
return
connect_data = first_message.get('data', {})
session_type = connect_data.get('session_type')
token = connect_data.get('token')
# 验证参数
if session_type not in ['person', 'group']:
await websocket.send_json(
{'type': 'error', 'data': {'error': 'session_type must be person or group', 'error_code': 'INVALID_SESSION_TYPE'}}
)
await websocket.close(1008)
return
# 验证 token
if not token:
await websocket.send_json(
{'type': 'error', 'data': {'error': 'token is required', 'error_code': 'MISSING_TOKEN'}}
)
await websocket.close(1008)
return
# 验证用户身份
try:
user = await self.ap.user_service.verify_token(token)
if not user:
await websocket.send_json({'type': 'error', 'data': {'error': 'Unauthorized', 'error_code': 'UNAUTHORIZED'}})
await websocket.close(1008)
return
except Exception as e:
logger.error(f"Token verification failed: {e}")
await websocket.send_json(
{'type': 'error', 'data': {'error': 'Token verification failed', 'error_code': 'AUTH_ERROR'}}
)
await websocket.close(1008)
return
# 2. 创建连接对象并加入连接池
connection = WebSocketConnection(
connection_id=connection_id,
websocket=websocket,
pipeline_uuid=pipeline_uuid,
session_type=session_type,
created_at=datetime.now(),
last_ping=datetime.now(),
)
session_key = connection.session_key
ws_pool = self.ap.ws_pool
ws_pool.add_connection(connection)
# 3. 发送连接成功事件
await connection.send(
'connected', {'connection_id': connection_id, 'session_type': session_type, 'pipeline_uuid': pipeline_uuid}
)
logger.info(f"WebSocket connected: {connection_id} [pipeline={pipeline_uuid}, session={session_type}]")
# 4. 进入消息处理循环
while True:
try:
message = await websocket.receive_json()
await handle_client_event(connection, message, self.ap)
except asyncio.CancelledError:
logger.info(f"WebSocket connection cancelled: {connection_id}")
break
except Exception as e:
logger.error(f"Error receiving message from {connection_id}: {e}")
break
except quart.exceptions.WebsocketDisconnected:
logger.info(f"WebSocket disconnected: {connection_id}")
except Exception as e:
logger.error(f"WebSocket error for {connection_id}: {e}", exc_info=True)
finally:
# 清理连接
if connection and session_key:
ws_pool = self.ap.ws_pool
await ws_pool.remove_connection(connection_id, session_key)
logger.info(f"WebSocket connection cleaned up: {connection_id}")

View File

@@ -0,0 +1,211 @@
"""WebSocket 连接池管理
用于管理流水线调试的 WebSocket 连接,支持会话隔离和消息广播。
"""
from __future__ import annotations
import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import quart
logger = logging.getLogger(__name__)
@dataclass
class WebSocketConnection:
"""单个 WebSocket 连接"""
connection_id: str
websocket: quart.websocket.WebSocket
pipeline_uuid: str
session_type: str # 'person' 或 'group'
created_at: datetime
last_ping: datetime
@property
def session_key(self) -> str:
"""会话唯一标识: pipeline_uuid:session_type"""
return f"{self.pipeline_uuid}:{self.session_type}"
async def send(self, event_type: str, data: dict):
"""发送事件到客户端
Args:
event_type: 事件类型
data: 事件数据
"""
try:
await self.websocket.send_json({"type": event_type, "data": data})
except Exception as e:
logger.error(f"Failed to send message to {self.connection_id}: {e}")
raise
class WebSocketConnectionPool:
"""WebSocket 连接池 - 按会话隔离
连接池结构:
connections[session_key][connection_id] = WebSocketConnection
其中 session_key = f"{pipeline_uuid}:{session_type}"
这样可以确保:
- person 和 group 会话完全隔离
- 不同 pipeline 的会话隔离
- 同一会话的多个连接可以同步接收消息(多标签页)
"""
def __init__(self):
self.connections: dict[str, dict[str, WebSocketConnection]] = {}
self._lock = asyncio.Lock()
def add_connection(self, conn: WebSocketConnection):
"""添加连接到指定会话
Args:
conn: WebSocket 连接对象
"""
session_key = conn.session_key
if session_key not in self.connections:
self.connections[session_key] = {}
self.connections[session_key][conn.connection_id] = conn
logger.info(
f"WebSocket connection added: {conn.connection_id} "
f"to session {session_key} "
f"(total: {len(self.connections[session_key])} connections)"
)
async def remove_connection(self, connection_id: str, session_key: str):
"""从指定会话移除连接
Args:
connection_id: 连接 ID
session_key: 会话标识
"""
async with self._lock:
if session_key in self.connections:
conn = self.connections[session_key].pop(connection_id, None)
# 如果该会话没有连接了,清理会话
if not self.connections[session_key]:
del self.connections[session_key]
if conn:
logger.info(
f"WebSocket connection removed: {connection_id} "
f"from session {session_key} "
f"(remaining: {len(self.connections.get(session_key, {}))} connections)"
)
def get_connection(self, connection_id: str, session_key: str) -> Optional[WebSocketConnection]:
"""获取指定连接
Args:
connection_id: 连接 ID
session_key: 会话标识
Returns:
WebSocketConnection 或 None
"""
return self.connections.get(session_key, {}).get(connection_id)
def get_connections_by_session(self, pipeline_uuid: str, session_type: str) -> list[WebSocketConnection]:
"""获取指定会话的所有连接
Args:
pipeline_uuid: 流水线 UUID
session_type: 会话类型 ('person''group')
Returns:
连接列表
"""
session_key = f"{pipeline_uuid}:{session_type}"
return list(self.connections.get(session_key, {}).values())
async def broadcast_to_session(self, pipeline_uuid: str, session_type: str, event_type: str, data: dict):
"""广播消息到指定会话的所有连接
Args:
pipeline_uuid: 流水线 UUID
session_type: 会话类型 ('person''group')
event_type: 事件类型
data: 事件数据
"""
connections = self.get_connections_by_session(pipeline_uuid, session_type)
if not connections:
logger.debug(f"No connections for session {pipeline_uuid}:{session_type}, skipping broadcast")
return
logger.debug(
f"Broadcasting {event_type} to session {pipeline_uuid}:{session_type}, " f"{len(connections)} connections"
)
# 并发发送到所有连接,忽略失败的连接
results = await asyncio.gather(*[conn.send(event_type, data) for conn in connections], return_exceptions=True)
# 统计失败的连接
failed_count = sum(1 for result in results if isinstance(result, Exception))
if failed_count > 0:
logger.warning(f"Failed to send to {failed_count}/{len(connections)} connections")
def get_all_sessions(self) -> list[str]:
"""获取所有活跃会话的 session_key 列表
Returns:
会话标识列表
"""
return list(self.connections.keys())
def get_connection_count(self, pipeline_uuid: str, session_type: str) -> int:
"""获取指定会话的连接数量
Args:
pipeline_uuid: 流水线 UUID
session_type: 会话类型
Returns:
连接数量
"""
session_key = f"{pipeline_uuid}:{session_type}"
return len(self.connections.get(session_key, {}))
async def cleanup_stale_connections(self, timeout_seconds: int = 120):
"""清理超时的连接
Args:
timeout_seconds: 超时时间(秒)
"""
now = datetime.now()
stale_connections = []
# 查找超时连接
for session_key, session_conns in self.connections.items():
for conn_id, conn in session_conns.items():
elapsed = (now - conn.last_ping).total_seconds()
if elapsed > timeout_seconds:
stale_connections.append((conn_id, session_key))
# 移除超时连接
for conn_id, session_key in stale_connections:
logger.warning(f"Removing stale connection: {conn_id} from {session_key}")
await self.remove_connection(conn_id, session_key)
# 尝试关闭 WebSocket
try:
conn = self.get_connection(conn_id, session_key)
if conn:
await conn.websocket.close(1000, "Connection timeout")
except Exception as e:
logger.error(f"Error closing stale connection {conn_id}: {e}")
if stale_connections:
logger.info(f"Cleaned up {len(stale_connections)} stale connections")

View File

@@ -105,6 +105,10 @@ class Application:
storage_mgr: storagemgr.StorageMgr = None
# ========= WebSocket =========
ws_pool = None # WebSocketConnectionPool
# ========= HTTP Services =========
user_service: user_service.UserService = None

View File

@@ -19,6 +19,7 @@ from ...api.http.service import model as model_service
from ...api.http.service import pipeline as pipeline_service
from ...api.http.service import bot as bot_service
from ...api.http.service import knowledge as knowledge_service
from ...api.http.service import websocket_pool
from ...discover import engine as discover_engine
from ...storage import mgr as storagemgr
from ...utils import logcache
@@ -87,10 +88,18 @@ class BuildAppStage(stage.BootingStage):
await llm_tool_mgr_inst.initialize()
ap.tool_mgr = llm_tool_mgr_inst
# Initialize WebSocket connection pool
ws_pool_inst = websocket_pool.WebSocketConnectionPool()
ap.ws_pool = ws_pool_inst
im_mgr_inst = im_mgr.PlatformManager(ap=ap)
await im_mgr_inst.initialize()
ap.platform_mgr = im_mgr_inst
# Inject WebSocket pool into WebChatAdapter
if hasattr(ap.platform_mgr, 'webchat_proxy_bot') and ap.platform_mgr.webchat_proxy_bot:
ap.platform_mgr.webchat_proxy_bot.adapter.set_ws_pool(ws_pool_inst)
pipeline_mgr = pipelinemgr.PipelineManager(ap)
await pipeline_mgr.initialize()
ap.pipeline_mgr = pipeline_mgr

View File

@@ -213,7 +213,7 @@ class RuntimePipeline:
await self._execute_from_stage(0, query)
except Exception as e:
inst_name = query.current_stage_name if query.current_stage_name else 'unknown'
self.ap.logger.error(f'处理请求时出错 query_id={query.query_id} stage={inst_name} : {e}')
self.ap.logger.error(f'Error processing query {query.query_id} stage={inst_name} : {e}')
self.ap.logger.error(f'Traceback: {traceback.format_exc()}')
finally:
self.ap.logger.debug(f'Query {query.query_id} processed')

View File

@@ -35,11 +35,17 @@ class PreProcessor(stage.PipelineStage):
session = await self.ap.sess_mgr.get_session(query)
# When not local-agent, llm_model is None
llm_model = (
await self.ap.model_mgr.get_model_by_uuid(query.pipeline_config['ai']['local-agent']['model'])
if selected_runner == 'local-agent'
else None
)
try:
llm_model = (
await self.ap.model_mgr.get_model_by_uuid(query.pipeline_config['ai']['local-agent']['model'])
if selected_runner == 'local-agent'
else None
)
except ValueError:
self.ap.logger.warning(
f'LLM model {query.pipeline_config["ai"]["local-agent"]["model"] + " "}not found or not configured'
)
llm_model = None
conversation = await self.ap.sess_mgr.get_conversation(
query,
@@ -54,7 +60,7 @@ class PreProcessor(stage.PipelineStage):
query.prompt = conversation.prompt.copy()
query.messages = conversation.messages.copy()
if selected_runner == 'local-agent':
if selected_runner == 'local-agent' and llm_model:
query.use_funcs = []
query.use_llm_model_uuid = llm_model.model_entity.uuid
@@ -72,7 +78,11 @@ class PreProcessor(stage.PipelineStage):
# Check if this model supports vision, if not, remove all images
# TODO this checking should be performed in runner, and in this stage, the image should be reserved
if selected_runner == 'local-agent' and not llm_model.model_entity.abilities.__contains__('vision'):
if (
selected_runner == 'local-agent'
and llm_model
and not llm_model.model_entity.abilities.__contains__('vision')
):
for msg in query.messages:
if isinstance(msg.content, list):
for me in msg.content:
@@ -89,7 +99,9 @@ class PreProcessor(stage.PipelineStage):
content_list.append(provider_message.ContentElement.from_text(me.text))
plain_text += me.text
elif isinstance(me, platform_message.Image):
if selected_runner != 'local-agent' or llm_model.model_entity.abilities.__contains__('vision'):
if selected_runner != 'local-agent' or (
llm_model and llm_model.model_entity.abilities.__contains__('vision')
):
if me.base64 is not None:
content_list.append(provider_message.ContentElement.from_image_base64(me.base64))
elif isinstance(me, platform_message.File):
@@ -100,7 +112,9 @@ class PreProcessor(stage.PipelineStage):
if isinstance(msg, platform_message.Plain):
content_list.append(provider_message.ContentElement.from_text(msg.text))
elif isinstance(msg, platform_message.Image):
if selected_runner != 'local-agent' or llm_model.model_entity.abilities.__contains__('vision'):
if selected_runner != 'local-agent' or (
llm_model and llm_model.model_entity.abilities.__contains__('vision')
):
if msg.base64 is not None:
content_list.append(provider_message.ContentElement.from_image_base64(msg.base64))

View File

@@ -9,7 +9,6 @@ from .. import handler
from ... import entities
from ....provider import runner as runner_module
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.events as events
from ....utils import importutil
from ....provider import runners
@@ -47,18 +46,19 @@ class ChatMessageHandler(handler.MessageHandler):
event_ctx = await self.ap.plugin_connector.emit_event(event)
is_create_card = False # 判断下是否需要创建流式卡片
if event_ctx.is_prevented_default():
if event_ctx.event.reply is not None:
mc = platform_message.MessageChain(event_ctx.event.reply)
if event_ctx.event.reply_message_chain is not None:
mc = event_ctx.event.reply_message_chain
query.resp_messages.append(mc)
yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
else:
yield entities.StageProcessResult(result_type=entities.ResultType.INTERRUPT, new_query=query)
else:
if event_ctx.event.alter is not None:
if event_ctx.event.user_message_alter is not None:
# if isinstance(event_ctx.event, str): # 现在暂时不考虑多模态alter
query.user_message.content = event_ctx.event.alter
query.user_message.content = event_ctx.event.user_message_alter
text_length = 0
try:

View File

@@ -5,7 +5,6 @@ import typing
from .. import handler
from ... import entities
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.provider.session as provider_session
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.events as events
@@ -49,8 +48,8 @@ class CommandHandler(handler.MessageHandler):
event_ctx = await self.ap.plugin_connector.emit_event(event)
if event_ctx.is_prevented_default():
if event_ctx.event.reply is not None:
mc = platform_message.MessageChain(event_ctx.event.reply)
if event_ctx.event.reply_message_chain is not None:
mc = event_ctx.event.reply_message_chain
query.resp_messages.append(mc)
@@ -59,11 +58,6 @@ class CommandHandler(handler.MessageHandler):
yield entities.StageProcessResult(result_type=entities.ResultType.INTERRUPT, new_query=query)
else:
if event_ctx.event.alter is not None:
query.message_chain = platform_message.MessageChain(
[platform_message.Plain(text=event_ctx.event.alter)]
)
session = await self.ap.sess_mgr.get_session(query)
async for ret in self.ap.cmd_mgr.execute(
@@ -80,8 +74,12 @@ class CommandHandler(handler.MessageHandler):
self.ap.logger.info(f'Command({query.query_id}) error: {self.cut_str(str(ret.error))}')
yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
elif (ret.text is not None or ret.image_url is not None or ret.image_base64 is not None
or ret.file_url is not None):
elif (
ret.text is not None
or ret.image_url is not None
or ret.image_base64 is not None
or ret.file_url is not None
):
content: list[provider_message.ContentElement] = []
if ret.text is not None:

View File

@@ -21,7 +21,7 @@ class AtBotRule(rule_model.GroupRespondRule):
def remove_at(message_chain: platform_message.MessageChain):
nonlocal found
for component in message_chain.root:
if isinstance(component, platform_message.At) and component.target == query.adapter.bot_account_id:
if isinstance(component, platform_message.At) and str(component.target) == str(query.adapter.bot_account_id):
message_chain.remove(component)
found = True
break

View File

@@ -80,8 +80,8 @@ class ResponseWrapper(stage.PipelineStage):
new_query=query,
)
else:
if event_ctx.event.reply is not None:
query.resp_message_chain.append(platform_message.MessageChain(event_ctx.event.reply))
if event_ctx.event.reply_message_chain is not None:
query.resp_message_chain.append(event_ctx.event.reply_message_chain)
else:
query.resp_message_chain.append(result.get_content_platform_message_chain())
@@ -123,10 +123,8 @@ class ResponseWrapper(stage.PipelineStage):
new_query=query,
)
else:
if event_ctx.event.reply is not None:
query.resp_message_chain.append(
platform_message.MessageChain(text=event_ctx.event.reply)
)
if event_ctx.event.reply_message_chain is not None:
query.resp_message_chain.append(event_ctx.event.reply_message_chain)
else:
query.resp_message_chain.append(

View File

@@ -139,19 +139,15 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter
event_converter: QQOfficialEventConverter = QQOfficialEventConverter()
def __init__(self, config: dict, logger: EventLogger):
self.config = config
self.logger = logger
bot = QQOfficialClient(
app_id=config['appid'], secret=config['secret'], token=config['token'], logger=logger
)
required_keys = [
'appid',
'secret',
]
missing_keys = [key for key in required_keys if key not in config]
if missing_keys:
raise command_errors.ParamNotEnoughError('QQ官方机器人缺少相关配置项请查看文档或联系管理员')
self.bot = QQOfficialClient(
app_id=config['appid'], secret=config['secret'], token=config['token'], logger=self.logger
super().__init__(
config=config,
logger=logger,
bot=bot,
bot_account_id=config['appid'],
)
async def reply_message(

View File

@@ -58,6 +58,7 @@ class WebChatAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
debug_messages: dict[str, list[dict]] = pydantic.Field(default_factory=dict, exclude=True)
ap: app.Application = pydantic.Field(exclude=True)
ws_pool: typing.Any = pydantic.Field(exclude=True, default=None) # WebSocketConnectionPool
def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger, **kwargs):
super().__init__(
@@ -72,6 +73,15 @@ class WebChatAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
self.bot_account_id = 'webchatbot'
self.debug_messages = {}
self.ws_pool = None
def set_ws_pool(self, ws_pool):
"""设置 WebSocket 连接池
Args:
ws_pool: WebSocketConnectionPool 实例
"""
self.ws_pool = ws_pool
async def send_message(
self,
@@ -130,7 +140,7 @@ class WebChatAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
quote_origin: bool = False,
is_final: bool = False,
) -> dict:
"""回复消息"""
"""Reply message chunk - supports both SSE (legacy) and WebSocket"""
message_data = WebChatMessage(
id=-1,
role='assistant',
@@ -139,24 +149,32 @@ class WebChatAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
timestamp=datetime.now().isoformat(),
)
# notify waiter
session = (
self.webchat_group_session
if isinstance(message_source, platform_events.GroupMessage)
else self.webchat_person_session
)
if message_source.message_chain.message_id not in session.resp_waiters:
# session.resp_waiters[message_source.message_chain.message_id] = asyncio.Queue()
queue = session.resp_queues[message_source.message_chain.message_id]
# Determine session type
if isinstance(message_source, platform_events.GroupMessage):
session_type = 'group'
session = self.webchat_group_session
else: # FriendMessage
session_type = 'person'
session = self.webchat_person_session
# if isinstance(message_source, platform_events.FriendMessage):
# queue = self.webchat_person_session.resp_queues[message_source.message_chain.message_id]
# elif isinstance(message_source, platform_events.GroupMessage):
# queue = self.webchat_group_session.resp_queues[message_source.message_chain.message_id]
if is_final and bot_message.tool_calls is None:
message_data.is_final = True
# print(message_data)
await queue.put(message_data)
# Legacy SSE support: put message into queue
if message_source.message_chain.message_id in session.resp_queues:
queue = session.resp_queues[message_source.message_chain.message_id]
if is_final and bot_message.tool_calls is None:
message_data.is_final = True
await queue.put(message_data)
# WebSocket support: broadcast to all connections
if self.ws_pool:
pipeline_uuid = self.ap.platform_mgr.webchat_proxy_bot.bot_entity.use_pipeline_uuid
# Determine event type
event_type = 'message_complete' if (is_final and bot_message.tool_calls is None) else 'message_chunk'
# Broadcast to specified session only
await self.ws_pool.broadcast_to_session(
pipeline_uuid=pipeline_uuid, session_type=session_type, event_type=event_type, data=message_data.model_dump()
)
return message_data.model_dump()

View File

@@ -139,7 +139,7 @@ class WeChatPadMessageConverter(abstract_platform_adapter.AbstractMessageConvert
pattern = r'@\S{1,20}'
content_no_preifx = re.sub(pattern, '', content_no_preifx)
return platform_message.MessageChain([platform_message.Plain(content_no_preifx)])
return platform_message.MessageChain([platform_message.Plain(text=content_no_preifx)])
async def _handler_image(self, message: Optional[dict], content_no_preifx: str) -> platform_message.MessageChain:
"""处理图像消息 (msg_type=3)"""
@@ -265,7 +265,7 @@ class WeChatPadMessageConverter(abstract_platform_adapter.AbstractMessageConvert
# 文本消息
try:
if '<msg>' not in quote_data:
quote_data_message_list.append(platform_message.Plain(quote_data))
quote_data_message_list.append(platform_message.Plain(text=quote_data))
else:
# 引用消息展开
quote_data_xml = ET.fromstring(quote_data)
@@ -280,7 +280,7 @@ class WeChatPadMessageConverter(abstract_platform_adapter.AbstractMessageConvert
quote_data_message_list.extend(await self._handler_compound(None, quote_data))
except Exception as e:
self.logger.error(f'处理引用消息异常 expcetion:{e}')
quote_data_message_list.append(platform_message.Plain(quote_data))
quote_data_message_list.append(platform_message.Plain(text=quote_data))
message_list.append(
platform_message.Quote(
sender_id=sender_id,
@@ -290,7 +290,7 @@ class WeChatPadMessageConverter(abstract_platform_adapter.AbstractMessageConvert
if len(user_data) > 0:
pattern = r'@\S{1,20}'
user_data = re.sub(pattern, '', user_data)
message_list.append(platform_message.Plain(user_data))
message_list.append(platform_message.Plain(text=user_data))
return platform_message.MessageChain(message_list)
@@ -543,7 +543,6 @@ class WeChatPadAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter)
] = {}
def __init__(self, config: dict, logger: EventLogger):
quart_app = quart.Quart(__name__)
message_converter = WeChatPadMessageConverter(config, logger)
@@ -551,15 +550,14 @@ class WeChatPadAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter)
bot = WeChatPadClient(config['wechatpad_url'], config['token'])
super().__init__(
config=config,
logger = logger,
quart_app = quart_app,
message_converter =message_converter,
event_converter = event_converter,
logger=logger,
quart_app=quart_app,
message_converter=message_converter,
event_converter=event_converter,
listeners={},
bot_account_id ='',
name="WeChatPad",
bot_account_id='',
name='WeChatPad',
bot=bot,
)
async def ws_message(self, data):

View File

@@ -18,7 +18,7 @@ from langbot_plugin.api.entities import events
from langbot_plugin.api.entities import context
import langbot_plugin.runtime.io.connection as base_connection
from langbot_plugin.api.definition.components.manifest import ComponentManifest
from langbot_plugin.api.entities.builtin.command import context as command_context
from langbot_plugin.api.entities.builtin.command import context as command_context, errors as command_errors
from langbot_plugin.runtime.plugin.mgr import PluginInstallSource
from ..core import taskmgr
@@ -191,6 +191,9 @@ class PluginRuntimeConnector:
task_context.trace(trace)
async def list_plugins(self) -> list[dict[str, Any]]:
if not self.is_enable_plugin:
return []
return await self.handler.list_plugins()
async def get_plugin_info(self, author: str, plugin_name: str) -> dict[str, Any]:
@@ -211,21 +214,31 @@ class PluginRuntimeConnector:
if not self.is_enable_plugin:
return event_ctx
event_ctx_result = await self.handler.emit_event(event_ctx.model_dump(serialize_as_any=True))
event_ctx_result = await self.handler.emit_event(event_ctx.model_dump(serialize_as_any=False))
event_ctx = context.EventContext.model_validate(event_ctx_result['event_context'])
return event_ctx
async def list_tools(self) -> list[ComponentManifest]:
if not self.is_enable_plugin:
return []
list_tools_data = await self.handler.list_tools()
return [ComponentManifest.model_validate(tool) for tool in list_tools_data]
async def call_tool(self, tool_name: str, parameters: dict[str, Any]) -> dict[str, Any]:
if not self.is_enable_plugin:
return {'error': 'Tool not found: plugin system is disabled'}
return await self.handler.call_tool(tool_name, parameters)
async def list_commands(self) -> list[ComponentManifest]:
if not self.is_enable_plugin:
return []
list_commands_data = await self.handler.list_commands()
return [ComponentManifest.model_validate(command) for command in list_commands_data]
@@ -233,6 +246,9 @@ class PluginRuntimeConnector:
async def execute_command(
self, command_ctx: command_context.ExecuteContext
) -> typing.AsyncGenerator[command_context.CommandReturn, None]:
if not self.is_enable_plugin:
yield command_context.CommandReturn(error=command_errors.CommandNotFoundError(command_ctx.command))
gen = self.handler.execute_command(command_ctx.model_dump(serialize_as_any=True))
async for ret in gen:

View File

@@ -0,0 +1,312 @@
from __future__ import annotations
import typing
import json
import uuid
import base64
from .. import runner
from ...core import app
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from ...utils import image
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
from libs.coze_server_api.client import AsyncCozeAPIClient
@runner.runner_class('coze-api')
class CozeAPIRunner(runner.RequestRunner):
"""Coze API 对话请求器"""
def __init__(self, ap: app.Application, pipeline_config: dict):
self.pipeline_config = pipeline_config
self.ap = ap
self.agent_token = pipeline_config["ai"]['coze-api']['api-key']
self.bot_id = pipeline_config["ai"]['coze-api'].get('bot-id')
self.chat_timeout = pipeline_config["ai"]['coze-api'].get('timeout')
self.auto_save_history = pipeline_config["ai"]['coze-api'].get('auto_save_history')
self.api_base = pipeline_config["ai"]['coze-api'].get('api-base')
self.coze = AsyncCozeAPIClient(
self.agent_token,
self.api_base
)
def _process_thinking_content(
self,
content: str,
) -> tuple[str, str]:
"""处理思维链内容
Args:
content: 原始内容
Returns:
(处理后的内容, 提取的思维链内容)
"""
remove_think = self.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False)
thinking_content = ''
# 从 content 中提取 <think> 标签内容
if content and '<think>' in content and '</think>' in content:
import re
think_pattern = r'<think>(.*?)</think>'
think_matches = re.findall(think_pattern, content, re.DOTALL)
if think_matches:
thinking_content = '\n'.join(think_matches)
# 移除 content 中的 <think> 标签
content = re.sub(think_pattern, '', content, flags=re.DOTALL).strip()
# 根据 remove_think 参数决定是否保留思维链
if remove_think:
return content, ''
else:
# 如果有思维链内容,将其以 <think> 格式添加到 content 开头
if thinking_content:
content = f'<think>\n{thinking_content}\n</think>\n{content}'.strip()
return content, thinking_content
async def _preprocess_user_message(self, query: pipeline_query.Query) -> list[dict]:
"""预处理用户消息转换为Coze消息格式
Returns:
list[dict]: Coze消息列表
"""
messages = []
if isinstance(query.user_message.content, list):
# 多模态消息处理
content_parts = []
for ce in query.user_message.content:
if ce.type == 'text':
content_parts.append({"type": "text", "text": ce.text})
elif ce.type == 'image_base64':
image_b64, image_format = await image.extract_b64_and_format(ce.image_base64)
file_bytes = base64.b64decode(image_b64)
file_id = await self._get_file_id(file_bytes)
content_parts.append({"type": "image", "file_id": file_id})
elif ce.type == 'file':
# 处理文件上传到Coze
file_id = await self._get_file_id(ce.file)
content_parts.append({"type": "file", "file_id": file_id})
# 创建多模态消息
if content_parts:
messages.append({
"role": "user",
"content": json.dumps(content_parts),
"content_type": "object_string",
"meta_data": None
})
elif isinstance(query.user_message.content, str):
# 纯文本消息
messages.append({
"role": "user",
"content": query.user_message.content,
"content_type": "text",
"meta_data": None
})
return messages
async def _get_file_id(self, file) -> str:
"""上传文件到Coze服务
Args:
file: 文件
Returns:
str: 文件ID
"""
file_id = await self.coze.upload(file=file)
return file_id
async def _chat_messages(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用聊天助手(非流式)
注意由于cozepy没有提供非流式API这里使用流式API并在结束后一次性返回完整内容
"""
user_id = f'{query.launcher_id}_{query.sender_id}'
# 预处理用户消息
additional_messages = await self._preprocess_user_message(query)
# 获取会话ID
conversation_id = None
# 收集完整内容
full_content = ''
full_reasoning = ''
try:
# 调用Coze API流式接口
async for chunk in self.coze.chat_messages(
bot_id=self.bot_id,
user_id=user_id,
additional_messages=additional_messages,
conversation_id=conversation_id,
timeout=self.chat_timeout,
auto_save_history=self.auto_save_history,
stream=True
):
self.ap.logger.debug(f'coze-chat-stream: {chunk}')
event_type = chunk.get('event')
data = chunk.get('data', {})
if event_type == 'conversation.message.delta':
# 收集内容
if 'content' in data:
full_content += data.get('content', '')
# 收集推理内容(如果有)
if 'reasoning_content' in data:
full_reasoning += data.get('reasoning_content', '')
elif event_type == 'done':
# 保存会话ID
if 'conversation_id' in data:
conversation_id = data.get('conversation_id')
elif event_type == 'error':
# 处理错误
error_msg = f"Coze API错误: {data.get('message', '未知错误')}"
yield provider_message.Message(
role='assistant',
content=error_msg,
)
return
# 处理思维链内容
content, thinking_content = self._process_thinking_content(full_content)
if full_reasoning:
remove_think = self.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False)
if not remove_think:
content = f'<think>\n{full_reasoning}\n</think>\n{content}'.strip()
# 一次性返回完整内容
yield provider_message.Message(
role='assistant',
content=content,
)
# 保存会话ID
if conversation_id and query.session.using_conversation:
query.session.using_conversation.uuid = conversation_id
except Exception as e:
self.ap.logger.error(f'Coze API错误: {str(e)}')
yield provider_message.Message(
role='assistant',
content=f'Coze API调用失败: {str(e)}',
)
async def _chat_messages_chunk(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""调用聊天助手(流式)"""
user_id = f'{query.launcher_id}_{query.sender_id}'
# 预处理用户消息
additional_messages = await self._preprocess_user_message(query)
# 获取会话ID
conversation_id = None
start_reasoning = False
stop_reasoning = False
message_idx = 1
is_final = False
full_content = ''
remove_think = self.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False)
try:
# 调用Coze API流式接口
async for chunk in self.coze.chat_messages(
bot_id=self.bot_id,
user_id=user_id,
additional_messages=additional_messages,
conversation_id=conversation_id,
timeout=self.chat_timeout,
auto_save_history=self.auto_save_history,
stream=True
):
self.ap.logger.debug(f'coze-chat-stream-chunk: {chunk}')
event_type = chunk.get('event')
data = chunk.get('data', {})
content = ""
if event_type == 'conversation.message.delta':
message_idx += 1
# 处理内容增量
if "reasoning_content" in data and not remove_think:
reasoning_content = data.get('reasoning_content', '')
if reasoning_content and not start_reasoning:
content = f"<think/>\n"
start_reasoning = True
content += reasoning_content
if 'content' in data:
if data.get('content', ''):
content += data.get('content', '')
if not stop_reasoning and start_reasoning:
content = f"</think>\n{content}"
stop_reasoning = True
elif event_type == 'done':
# 保存会话ID
if 'conversation_id' in data:
conversation_id = data.get('conversation_id')
if query.session.using_conversation:
query.session.using_conversation.uuid = conversation_id
is_final = True
elif event_type == 'error':
# 处理错误
error_msg = f"Coze API错误: {data.get('message', '未知错误')}"
yield provider_message.MessageChunk(
role='assistant',
content=error_msg,
finish_reason='error'
)
return
full_content += content
if message_idx % 8 == 0 or is_final:
if full_content:
yield provider_message.MessageChunk(
role='assistant',
content=full_content,
is_final=is_final
)
except Exception as e:
self.ap.logger.error(f'Coze API流式调用错误: {str(e)}')
yield provider_message.MessageChunk(
role='assistant',
content=f'Coze API流式调用失败: {str(e)}',
finish_reason='error'
)
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""运行"""
msg_seq = 0
if await query.adapter.is_stream_output_supported():
async for msg in self._chat_messages_chunk(query):
if isinstance(msg, provider_message.MessageChunk):
msg_seq += 1
msg.msg_sequence = msg_seq
yield msg
else:
async for msg in self._chat_messages(query):
yield msg

View File

@@ -1,4 +1,4 @@
semantic_version = 'v4.3.5'
semantic_version = 'v4.3.9'
required_database_version = 8
"""Tag the version of the database schema, used to check if the database needs to be migrated"""

View File

@@ -1,6 +1,6 @@
[project]
name = "langbot"
version = "4.3.5"
version = "4.3.9"
description = "Easy-to-use global IM bot platform designed for LLM era"
readme = "README.md"
requires-python = ">=3.10.1,<4.0"
@@ -62,7 +62,7 @@ dependencies = [
"langchain>=0.2.0",
"chromadb>=0.4.24",
"qdrant-client (>=1.15.1,<2.0.0)",
"langbot-plugin==0.1.3",
"langbot-plugin==0.1.4",
"asyncpg>=0.30.0",
"line-bot-sdk>=3.19.0",
"tboxsdk>=0.0.10",

View File

@@ -41,4 +41,4 @@ plugin:
enable: true
runtime_ws_url: 'ws://langbot_plugin_runtime:5400/control/ws'
enable_marketplace: true
cloud_service_url: 'https://space.langbot.app'
cloud_service_url: 'https://space.langbot.app'

View File

@@ -43,6 +43,10 @@ stages:
label:
en_US: Langflow API
zh_Hans: Langflow API
- name: coze-api
label:
en_US: Coze API
zh_Hans: 扣子 API
- name: local-agent
label:
en_US: Local Agent
@@ -380,4 +384,57 @@ stages:
zh_Hans: 可选的流程调整参数
type: json
required: false
default: '{}'
default: '{}'
- name: coze-api
label:
en_US: coze API
zh_Hans: 扣子 API
description:
en_US: Configure the Coze API of the pipeline
zh_Hans: 配置Coze API
config:
- name: api-key
label:
en_US: API Key
zh_Hans: API 密钥
description:
en_US: The API key for the Coze server
zh_Hans: Coze服务器的 API 密钥
type: string
required: true
- name: bot-id
label:
en_US: Bot ID
zh_Hans: 机器人 ID
description:
en_US: The ID of the bot to run
zh_Hans: 要运行的机器人 ID
type: string
required: true
- name: api-base
label:
en_US: API Base URL
zh_Hans: API 基础 URL
description:
en_US: The base URL for the Coze API, please use https://api.coze.com for global Coze edition(coze.com).
zh_Hans: Coze API 的基础 URL请使用 https://api.coze.com 用于全球 Coze 版coze.com
type: string
default: "https://api.coze.cn"
- name: auto-save-history
label:
en_US: Auto Save History
zh_Hans: 自动保存历史
description:
en_US: Whether to automatically save conversation history
zh_Hans: 是否自动保存对话历史
type: boolean
default: true
- name: timeout
label:
en_US: Request Timeout
zh_Hans: 请求超时
description:
en_US: Timeout in seconds for API requests
zh_Hans: API 请求超时时间(秒)
type: number
default: 120

View File

@@ -11,6 +11,10 @@ import { Message } from '@/app/infra/entities/message';
import { toast } from 'sonner';
import AtBadge from './AtBadge';
import { Switch } from '@/components/ui/switch';
import {
PipelineWebSocketClient,
SessionType,
} from '@/app/infra/websocket/PipelineWebSocketClient';
interface MessageComponent {
type: 'At' | 'Plain';
@@ -31,17 +35,27 @@ export default function DebugDialog({
}: DebugDialogProps) {
const { t } = useTranslation();
const [selectedPipelineId, setSelectedPipelineId] = useState(pipelineId);
const [sessionType, setSessionType] = useState<'person' | 'group'>('person');
const [sessionType, setSessionType] = useState<SessionType>('person');
const [messages, setMessages] = useState<Message[]>([]);
const [inputValue, setInputValue] = useState('');
const [showAtPopover, setShowAtPopover] = useState(false);
const [hasAt, setHasAt] = useState(false);
const [isHovering, setIsHovering] = useState(false);
const [isStreaming, setIsStreaming] = useState(true);
const messagesEndRef = useRef<HTMLDivElement>(null);
const inputRef = useRef<HTMLInputElement>(null);
const popoverRef = useRef<HTMLDivElement>(null);
// WebSocket states
const [wsClient, setWsClient] = useState<PipelineWebSocketClient | null>(
null,
);
const [connectionStatus, setConnectionStatus] = useState<
'disconnected' | 'connecting' | 'connected'
>('disconnected');
const [pendingMessages, setPendingMessages] = useState<
Map<string, Message>
>(new Map());
const scrollToBottom = useCallback(() => {
// 使用setTimeout确保在DOM更新后执行滚动
setTimeout(() => {
@@ -57,37 +71,161 @@ export default function DebugDialog({
}, 0);
}, []);
const loadMessages = useCallback(
async (pipelineId: string) => {
try {
const response = await httpClient.getWebChatHistoryMessages(
pipelineId,
sessionType,
);
setMessages(response.messages);
} catch (error) {
console.error('Failed to load messages:', error);
}
},
[sessionType],
);
// 在useEffect中监听messages变化时滚动
// Scroll to bottom when messages change
useEffect(() => {
scrollToBottom();
}, [messages, scrollToBottom]);
// WebSocket connection setup
useEffect(() => {
if (open) {
setSelectedPipelineId(pipelineId);
loadMessages(pipelineId);
}
}, [open, pipelineId]);
if (!open) return;
useEffect(() => {
if (open) {
loadMessages(selectedPipelineId);
}
}, [sessionType, selectedPipelineId, open, loadMessages]);
const client = new PipelineWebSocketClient(
selectedPipelineId,
sessionType,
);
// Setup event handlers
client.onConnected = (data) => {
console.log('[DebugDialog] WebSocket connected:', data);
setConnectionStatus('connected');
// Load history messages after connection
client.loadHistory();
};
client.onHistory = (data) => {
console.log('[DebugDialog] History loaded:', data?.messages.length);
if (data) {
setMessages(data.messages);
}
};
client.onMessageSent = (data) => {
console.log('[DebugDialog] Message sent confirmed:', data);
if (data) {
// Update client message ID to server message ID
const clientMsgId = data.client_message_id;
const serverMsgId = data.server_message_id;
setMessages((prev) =>
prev.map((msg) =>
msg.id === -1 && pendingMessages.has(clientMsgId)
? { ...msg, id: serverMsgId }
: msg,
),
);
setPendingMessages((prev) => {
const newMap = new Map(prev);
newMap.delete(clientMsgId);
return newMap;
});
}
};
client.onMessageStart = (data) => {
console.log('[DebugDialog] Message start:', data);
if (data) {
const placeholderMessage: Message = {
id: data.message_id,
role: 'assistant',
content: '',
message_chain: [],
timestamp: data.timestamp,
};
setMessages((prev) => [...prev, placeholderMessage]);
}
};
client.onMessageChunk = (data) => {
if (data) {
// Update streaming message (content is cumulative)
setMessages((prev) =>
prev.map((msg) =>
msg.id === data.message_id
? {
...msg,
content: data.content,
message_chain: data.message_chain,
}
: msg,
),
);
}
};
client.onMessageComplete = (data) => {
console.log('[DebugDialog] Message complete:', data);
if (data) {
// Mark message as complete
setMessages((prev) =>
prev.map((msg) =>
msg.id === data.message_id
? {
...msg,
content: data.final_content,
message_chain: data.message_chain,
}
: msg,
),
);
}
};
client.onMessageError = (data) => {
console.error('[DebugDialog] Message error:', data);
if (data) {
toast.error(`Message error: ${data.error}`);
}
};
client.onPluginMessage = (data) => {
console.log('[DebugDialog] Plugin message:', data);
if (data) {
const pluginMessage: Message = {
id: data.message_id,
role: 'assistant',
content: data.content,
message_chain: data.message_chain,
timestamp: data.timestamp,
};
setMessages((prev) => [...prev, pluginMessage]);
}
};
client.onError = (data) => {
console.error('[DebugDialog] WebSocket error:', data);
if (data) {
toast.error(`WebSocket error: ${data.error}`);
}
};
client.onDisconnected = () => {
console.log('[DebugDialog] WebSocket disconnected');
setConnectionStatus('disconnected');
};
// Connect to WebSocket
setConnectionStatus('connecting');
client
.connect(httpClient.getSessionSync())
.then(() => {
console.log('[DebugDialog] WebSocket connection established');
})
.catch((err) => {
console.error('[DebugDialog] Failed to connect WebSocket:', err);
toast.error('Failed to connect to server');
setConnectionStatus('disconnected');
});
setWsClient(client);
// Cleanup on unmount or session type change
return () => {
console.log('[DebugDialog] Cleaning up WebSocket connection');
client.disconnect();
};
}, [open, selectedPipelineId, sessionType]); // Reconnect when session type changes
useEffect(() => {
const handleClickOutside = (event: MouseEvent) => {
@@ -150,6 +288,12 @@ export default function DebugDialog({
const sendMessage = async () => {
if (!inputValue.trim() && !hasAt) return;
// Check WebSocket connection
if (!wsClient || connectionStatus !== 'connected') {
toast.error('Not connected to server');
return;
}
try {
const messageChain = [];
@@ -170,7 +314,7 @@ export default function DebugDialog({
});
if (hasAt) {
// for showing
// For display
text_content = '@webchatbot' + text_content;
}
@@ -181,97 +325,26 @@ export default function DebugDialog({
timestamp: new Date().toISOString(),
message_chain: messageChain,
};
// 根据isStreaming状态决定使用哪种传输方式
if (isStreaming) {
// streaming
// 创建初始bot消息
const placeholderRandomId = Math.floor(Math.random() * 1000000);
const botMessagePlaceholder: Message = {
id: placeholderRandomId,
role: 'assistant',
content: 'Generating...',
timestamp: new Date().toISOString(),
message_chain: [{ type: 'Plain', text: 'Generating...' }],
};
// 添加用户消息和初始bot消息到状态
// Add user message to UI immediately
setMessages((prevMessages) => [...prevMessages, userMessage]);
setInputValue('');
setHasAt(false);
setMessages((prevMessages) => [
...prevMessages,
userMessage,
botMessagePlaceholder,
]);
setInputValue('');
setHasAt(false);
try {
await httpClient.sendStreamingWebChatMessage(
sessionType,
messageChain,
selectedPipelineId,
(data) => {
// 处理流式响应数据
console.log('data', data);
if (data.message) {
// 更新完整内容
// Send via WebSocket
const clientMessageId = wsClient.sendMessage(messageChain);
setMessages((prevMessages) => {
const updatedMessages = [...prevMessages];
const botMessageIndex = updatedMessages.findIndex(
(message) => message.id === placeholderRandomId,
);
if (botMessageIndex !== -1) {
updatedMessages[botMessageIndex] = {
...updatedMessages[botMessageIndex],
content: data.message.content,
message_chain: [
{ type: 'Plain', text: data.message.content },
],
};
}
return updatedMessages;
});
}
},
() => {},
(error) => {
// 处理错误
console.error('Streaming error:', error);
if (sessionType === 'person') {
toast.error(t('pipelines.debugDialog.sendFailed'));
}
},
);
} catch (error) {
console.error('Failed to send streaming message:', error);
if (sessionType === 'person') {
toast.error(t('pipelines.debugDialog.sendFailed'));
}
}
} else {
// non-streaming
setMessages((prevMessages) => [...prevMessages, userMessage]);
setInputValue('');
setHasAt(false);
// Track pending message for ID mapping
setPendingMessages((prev) => {
const newMap = new Map(prev);
newMap.set(clientMessageId, userMessage);
return newMap;
});
const response = await httpClient.sendWebChatMessage(
sessionType,
messageChain,
selectedPipelineId,
180000,
);
setMessages((prevMessages) => [...prevMessages, response.message]);
}
} catch (
// eslint-disable-next-line @typescript-eslint/no-explicit-any
error: any
) {
console.log(error, 'type of error', typeof error);
console.error('Failed to send message:', error);
if (!error.message.includes('timeout') && sessionType === 'person') {
toast.error(t('pipelines.debugDialog.sendFailed'));
}
console.log('[DebugDialog] Message sent:', clientMessageId);
} catch (error) {
console.error('[DebugDialog] Failed to send message:', error);
toast.error(t('pipelines.debugDialog.sendFailed'));
} finally {
inputRef.current?.focus();
}
@@ -390,12 +463,6 @@ export default function DebugDialog({
</ScrollArea>
<div className="p-4 pb-0 bg-white dark:bg-black flex gap-2">
<div className="flex items-center gap-2">
<span className="text-sm text-gray-600">
{t('pipelines.debugDialog.streaming')}
</span>
<Switch checked={isStreaming} onCheckedChange={setIsStreaming} />
</div>
<div className="flex-1 flex items-center gap-2">
{hasAt && (
<AtBadge targetName="webchatbot" onRemove={handleAtRemove} />

View File

@@ -29,7 +29,17 @@ export default function PluginConfigPage() {
const [sortOrderValue, setSortOrderValue] = useState<string>('DESC');
useEffect(() => {
getPipelines();
// Load sort preference from localStorage
const savedSortBy = localStorage.getItem('pipeline_sort_by');
const savedSortOrder = localStorage.getItem('pipeline_sort_order');
if (savedSortBy && savedSortOrder) {
setSortByValue(savedSortBy);
setSortOrderValue(savedSortOrder);
getPipelines(savedSortBy, savedSortOrder);
} else {
getPipelines();
}
}, []);
function getPipelines(
@@ -91,6 +101,11 @@ export default function PluginConfigPage() {
const [newSortBy, newSortOrder] = value.split(',').map((s) => s.trim());
setSortByValue(newSortBy);
setSortOrderValue(newSortOrder);
// Save sort preference to localStorage
localStorage.setItem('pipeline_sort_by', newSortBy);
localStorage.setItem('pipeline_sort_order', newSortOrder);
getPipelines(newSortBy, newSortOrder);
}
@@ -135,6 +150,12 @@ export default function PluginConfigPage() {
>
{t('pipelines.newestCreated')}
</SelectItem>
<SelectItem
value="created_at,ASC"
className="text-gray-900 dark:text-gray-100"
>
{t('pipelines.earliestCreated')}
</SelectItem>
<SelectItem
value="updated_at,DESC"
className="text-gray-900 dark:text-gray-100"

View File

@@ -0,0 +1,75 @@
import { PluginComponent } from '@/app/infra/entities/plugin';
import { TFunction } from 'i18next';
import { Wrench, AudioWaveform, Hash } from 'lucide-react';
import { Badge } from '@/components/ui/badge';
export default function PluginComponentList({
components,
showComponentName,
showTitle,
useBadge,
t,
}: {
components: PluginComponent[];
showComponentName: boolean;
showTitle: boolean;
useBadge: boolean;
t: TFunction;
}) {
const componentKindCount: Record<string, number> = {};
for (const component of components) {
const kind = component.manifest.manifest.kind;
if (componentKindCount[kind]) {
componentKindCount[kind]++;
} else {
componentKindCount[kind] = 1;
}
}
const kindIconMap: Record<string, React.ReactNode> = {
Tool: <Wrench className="w-5 h-5" />,
EventListener: <AudioWaveform className="w-5 h-5" />,
Command: <Hash className="w-5 h-5" />,
};
const componentKindList = Object.keys(componentKindCount);
return (
<>
{showTitle && <div>{t('plugins.componentsList')}</div>}
{componentKindList.length > 0 && (
<>
{componentKindList.map((kind) => {
return (
<>
{useBadge && (
<Badge variant="outline">
{kindIconMap[kind]}
{showComponentName &&
t('plugins.componentName.' + kind) + ' '}
{componentKindCount[kind]}
</Badge>
)}
{!useBadge && (
<div
key={kind}
className="flex flex-row items-center justify-start gap-[0.2rem]"
>
{kindIconMap[kind]}
{showComponentName &&
t('plugins.componentName.' + kind) + ' '}
{componentKindCount[kind]}
</div>
)}
</>
);
})}
</>
)}
{componentKindList.length === 0 && <div>{t('plugins.noComponents')}</div>}
</>
);
}

View File

@@ -1,9 +1,9 @@
'use client';
import { useState, useEffect, forwardRef, useImperativeHandle } from 'react';
import { PluginCardVO } from '@/app/home/plugins/plugin-installed/PluginCardVO';
import PluginCardComponent from '@/app/home/plugins/plugin-installed/plugin-card/PluginCardComponent';
import PluginForm from '@/app/home/plugins/plugin-installed/plugin-form/PluginForm';
import { PluginCardVO } from '@/app/home/plugins/components/plugin-installed/PluginCardVO';
import PluginCardComponent from '@/app/home/plugins/components/plugin-installed/plugin-card/PluginCardComponent';
import PluginForm from '@/app/home/plugins/components/plugin-installed/plugin-form/PluginForm';
import styles from '@/app/home/plugins/plugins.module.css';
import { httpClient } from '@/app/infra/http/HttpClient';
import {

View File

@@ -1,21 +1,10 @@
import { PluginCardVO } from '@/app/home/plugins/plugin-installed/PluginCardVO';
import { PluginCardVO } from '@/app/home/plugins/components/plugin-installed/PluginCardVO';
import { useState } from 'react';
import { Badge } from '@/components/ui/badge';
import { useTranslation } from 'react-i18next';
import { TFunction } from 'i18next';
import {
AudioWaveform,
Wrench,
Hash,
BugIcon,
ExternalLink,
Ellipsis,
Trash,
ArrowUp,
} from 'lucide-react';
import { BugIcon, ExternalLink, Ellipsis, Trash, ArrowUp } from 'lucide-react';
import { getCloudServiceClientSync } from '@/app/infra/http';
import { httpClient } from '@/app/infra/http/HttpClient';
import { PluginComponent } from '@/app/infra/entities/plugin';
import { Button } from '@/components/ui/button';
import {
DropdownMenu,
@@ -23,49 +12,7 @@ import {
DropdownMenuItem,
DropdownMenuTrigger,
} from '@/components/ui/dropdown-menu';
function getComponentList(components: PluginComponent[], t: TFunction) {
const componentKindCount: Record<string, number> = {};
for (const component of components) {
const kind = component.manifest.manifest.kind;
if (componentKindCount[kind]) {
componentKindCount[kind]++;
} else {
componentKindCount[kind] = 1;
}
}
const kindIconMap: Record<string, React.ReactNode> = {
Tool: <Wrench className="w-5 h-5" />,
EventListener: <AudioWaveform className="w-5 h-5" />,
Command: <Hash className="w-5 h-5" />,
};
const componentKindList = Object.keys(componentKindCount);
return (
<>
<div>{t('plugins.componentsList')}</div>
{componentKindList.length > 0 && (
<>
{componentKindList.map((kind) => {
return (
<div
key={kind}
className="flex flex-row items-center justify-start gap-[0.4rem]"
>
{kindIconMap[kind]} {componentKindCount[kind]}
</div>
);
})}
</>
)}
{componentKindList.length === 0 && <div>{t('plugins.noComponents')}</div>}
</>
);
}
import PluginComponentList from '@/app/home/plugins/components/plugin-installed/PluginComponentList';
export default function PluginCardComponent({
cardVO,
@@ -180,7 +127,13 @@ export default function PluginCardComponent({
</div>
<div className="w-full flex flex-row items-start justify-start gap-[0.6rem]">
{getComponentList(cardVO.components, t)}
<PluginComponentList
components={cardVO.components}
showComponentName={false}
showTitle={true}
useBadge={false}
t={t}
/>
</div>
</div>

View File

@@ -7,6 +7,7 @@ import { Button } from '@/components/ui/button';
import { toast } from 'sonner';
import { extractI18nObject } from '@/i18n/I18nProvider';
import { useTranslation } from 'react-i18next';
import PluginComponentList from '@/app/home/plugins/components/plugin-installed/PluginComponentList';
export default function PluginForm({
pluginAuthor,
@@ -78,6 +79,17 @@ export default function PluginForm({
},
)}
</div>
<div className="mb-4 flex flex-row items-center justify-start gap-[0.4rem]">
<PluginComponentList
components={pluginInfo.components}
showComponentName={true}
showTitle={false}
useBadge={true}
t={t}
/>
</div>
{pluginInfo.manifest.manifest.spec.config.length > 0 && (
<DynamicFormComponent
itemConfigList={pluginInfo.manifest.manifest.spec.config}

View File

@@ -1,8 +1,8 @@
'use client';
import PluginInstalledComponent, {
PluginInstalledComponentRef,
} from '@/app/home/plugins/plugin-installed/PluginInstalledComponent';
import MarketPage from '@/app/home/plugins/plugin-market/PluginMarketComponent';
} from '@/app/home/plugins/components/plugin-installed/PluginInstalledComponent';
import MarketPage from '@/app/home/plugins/components/plugin-market/PluginMarketComponent';
// import PluginSortDialog from '@/app/home/plugins/plugin-sort/PluginSortDialog';
import styles from './plugins.module.css';
import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs';

View File

@@ -0,0 +1,394 @@
/**
* Pipeline WebSocket Client
*
* Provides real-time bidirectional communication for pipeline debugging.
* Supports person and group session isolation.
*/
import { Message } from '@/app/infra/entities/message';
export type SessionType = 'person' | 'group';
export interface WebSocketEventData {
// Connected event
connected?: {
connection_id: string;
session_type: SessionType;
pipeline_uuid: string;
};
// History event
history?: {
messages: Message[];
has_more: boolean;
};
// Message sent confirmation
message_sent?: {
client_message_id: string;
server_message_id: number;
timestamp: string;
};
// Message start
message_start?: {
message_id: number;
role: 'assistant';
timestamp: string;
reply_to: number;
};
// Message chunk
message_chunk?: {
message_id: number;
content: string;
message_chain: object[];
timestamp: string;
};
// Message complete
message_complete?: {
message_id: number;
final_content: string;
message_chain: object[];
timestamp: string;
};
// Message error
message_error?: {
message_id: number;
error: string;
error_code?: string;
};
// Interrupted
interrupted?: {
message_id: number;
partial_content: string;
};
// Plugin message
plugin_message?: {
message_id: number;
role: 'assistant';
content: string;
message_chain: object[];
timestamp: string;
source: 'plugin';
};
// Error
error?: {
error: string;
error_code: string;
details?: object;
};
// Pong
pong?: {
timestamp: number;
};
}
export class PipelineWebSocketClient {
private ws: WebSocket | null = null;
private pipelineId: string;
private sessionType: SessionType;
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
private pingInterval: NodeJS.Timeout | null = null;
private reconnectTimeout: NodeJS.Timeout | null = null;
private isManualDisconnect = false;
// Event callbacks
public onConnected?: (data: WebSocketEventData['connected']) => void;
public onHistory?: (data: WebSocketEventData['history']) => void;
public onMessageSent?: (data: WebSocketEventData['message_sent']) => void;
public onMessageStart?: (data: WebSocketEventData['message_start']) => void;
public onMessageChunk?: (data: WebSocketEventData['message_chunk']) => void;
public onMessageComplete?: (
data: WebSocketEventData['message_complete'],
) => void;
public onMessageError?: (data: WebSocketEventData['message_error']) => void;
public onInterrupted?: (data: WebSocketEventData['interrupted']) => void;
public onPluginMessage?: (data: WebSocketEventData['plugin_message']) => void;
public onError?: (data: WebSocketEventData['error']) => void;
public onDisconnected?: () => void;
constructor(pipelineId: string, sessionType: SessionType) {
this.pipelineId = pipelineId;
this.sessionType = sessionType;
}
/**
* Connect to WebSocket server
*/
connect(token: string): Promise<void> {
return new Promise((resolve, reject) => {
this.isManualDisconnect = false;
const wsUrl = this.buildWebSocketUrl();
console.log(`[WebSocket] Connecting to ${wsUrl}...`);
try {
this.ws = new WebSocket(wsUrl);
} catch (error) {
console.error('[WebSocket] Failed to create WebSocket:', error);
reject(error);
return;
}
this.ws.onopen = () => {
console.log('[WebSocket] Connection opened');
// Send connect event with session type and token
this.send('connect', {
pipeline_uuid: this.pipelineId,
session_type: this.sessionType,
token,
});
// Start ping interval
this.startPing();
// Reset reconnect attempts on successful connection
this.reconnectAttempts = 0;
resolve();
};
this.ws.onmessage = (event) => {
this.handleMessage(event);
};
this.ws.onerror = (error) => {
console.error('[WebSocket] Error:', error);
reject(error);
};
this.ws.onclose = (event) => {
console.log(
`[WebSocket] Connection closed: code=${event.code}, reason=${event.reason}`,
);
this.handleDisconnect();
};
});
}
/**
* Handle incoming WebSocket message
*/
private handleMessage(event: MessageEvent) {
try {
const message = JSON.parse(event.data);
const { type, data } = message;
console.log(`[WebSocket] Received: ${type}`, data);
switch (type) {
case 'connected':
this.onConnected?.(data);
break;
case 'history':
this.onHistory?.(data);
break;
case 'message_sent':
this.onMessageSent?.(data);
break;
case 'message_start':
this.onMessageStart?.(data);
break;
case 'message_chunk':
this.onMessageChunk?.(data);
break;
case 'message_complete':
this.onMessageComplete?.(data);
break;
case 'message_error':
this.onMessageError?.(data);
break;
case 'interrupted':
this.onInterrupted?.(data);
break;
case 'plugin_message':
this.onPluginMessage?.(data);
break;
case 'error':
this.onError?.(data);
break;
case 'pong':
// Heartbeat response, no action needed
break;
default:
console.warn(`[WebSocket] Unknown message type: ${type}`);
}
} catch (error) {
console.error('[WebSocket] Failed to parse message:', error);
}
}
/**
* Send message to server
*/
sendMessage(messageChain: object[]): string {
const clientMessageId = this.generateMessageId();
this.send('send_message', {
message_chain: messageChain,
client_message_id: clientMessageId,
});
return clientMessageId;
}
/**
* Load history messages
*/
loadHistory(beforeMessageId?: number, limit?: number) {
this.send('load_history', {
before_message_id: beforeMessageId,
limit,
});
}
/**
* Interrupt streaming message
*/
interrupt(messageId: number) {
this.send('interrupt', { message_id: messageId });
}
/**
* Send event to server
*/
private send(type: string, data: object) {
if (this.ws?.readyState === WebSocket.OPEN) {
const message = JSON.stringify({ type, data });
console.log(`[WebSocket] Sending: ${type}`, data);
this.ws.send(message);
} else {
console.warn(
`[WebSocket] Cannot send message, connection not open (state: ${this.ws?.readyState})`,
);
}
}
/**
* Start ping interval (heartbeat)
*/
private startPing() {
this.stopPing();
this.pingInterval = setInterval(() => {
this.send('ping', { timestamp: Date.now() });
}, 30000); // Ping every 30 seconds
}
/**
* Stop ping interval
*/
private stopPing() {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
}
/**
* Handle disconnection
*/
private handleDisconnect() {
this.stopPing();
this.onDisconnected?.();
// Auto reconnect if not manual disconnect
if (
!this.isManualDisconnect &&
this.reconnectAttempts < this.maxReconnectAttempts
) {
const delay = Math.min(2000 * Math.pow(2, this.reconnectAttempts), 30000);
console.log(
`[WebSocket] Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts + 1}/${this.maxReconnectAttempts})`,
);
this.reconnectTimeout = setTimeout(() => {
this.reconnectAttempts++;
// Note: Need to get token again, should be handled by caller
console.warn(
'[WebSocket] Auto-reconnect requires token, please reconnect manually',
);
}, delay);
} else if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error(
'[WebSocket] Max reconnect attempts reached, giving up',
);
}
}
/**
* Disconnect from server
*/
disconnect() {
this.isManualDisconnect = true;
this.stopPing();
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
if (this.ws) {
this.ws.close(1000, 'Client disconnect');
this.ws = null;
}
console.log('[WebSocket] Disconnected');
}
/**
* Build WebSocket URL
*/
private buildWebSocketUrl(): string {
// Get current base URL
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const host = window.location.host;
return `${protocol}//${host}/api/v1/pipelines/${this.pipelineId}/chat/ws`;
}
/**
* Generate unique client message ID
*/
private generateMessageId(): string {
return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
/**
* Get connection state
*/
getState():
| 'CONNECTING'
| 'OPEN'
| 'CLOSING'
| 'CLOSED'
| 'DISCONNECTED' {
if (!this.ws) return 'DISCONNECTED';
switch (this.ws.readyState) {
case WebSocket.CONNECTING:
return 'CONNECTING';
case WebSocket.OPEN:
return 'OPEN';
case WebSocket.CLOSING:
return 'CLOSING';
case WebSocket.CLOSED:
return 'CLOSED';
default:
return 'DISCONNECTED';
}
}
/**
* Check if connected
*/
isConnected(): boolean {
return this.ws?.readyState === WebSocket.OPEN;
}
}

View File

@@ -206,9 +206,11 @@ const enUS = {
deleteConfirm: 'Delete Confirmation',
deleteSuccess: 'Delete successful',
modifyFailed: 'Modify failed: ',
eventCount: 'Events: {{count}}',
toolCount: 'Tools: {{count}}',
starCount: 'Stars: {{count}}',
componentName: {
Tool: 'Tool',
EventListener: 'Event Listener',
Command: 'Command',
},
uploadLocal: 'Upload Local',
debugging: 'Debugging',
uploadLocalPlugin: 'Upload Local Plugin',
@@ -298,6 +300,7 @@ const enUS = {
defaultBadge: 'Default',
sortBy: 'Sort by',
newestCreated: 'Newest Created',
earliestCreated: 'Earliest Created',
recentlyEdited: 'Recently Edited',
earliestEdited: 'Earliest Edited',
basicInfo: 'Basic',

View File

@@ -207,9 +207,11 @@ const jaJP = {
deleteConfirm: '削除の確認',
deleteSuccess: '削除に成功しました',
modifyFailed: '変更に失敗しました:',
eventCount: 'イベント:{{count}}',
toolCount: 'ツール:{{count}}',
starCount: 'スター:{{count}}',
componentName: {
Tool: 'ツール',
EventListener: 'イベント監視器',
Command: 'コマンド',
},
uploadLocal: 'ローカルアップロード',
debugging: 'デバッグ中',
uploadLocalPlugin: 'ローカルプラグインのアップロード',
@@ -300,6 +302,7 @@ const jaJP = {
defaultBadge: 'デフォルト',
sortBy: '並び順',
newestCreated: '最新作成',
earliestCreated: '最古作成',
recentlyEdited: '最近編集',
earliestEdited: '最古編集',
basicInfo: '基本情報',

View File

@@ -199,9 +199,11 @@ const zhHans = {
deleteConfirm: '删除确认',
deleteSuccess: '删除成功',
modifyFailed: '修改失败:',
eventCount: '事件:{{count}}',
toolCount: '工具:{{count}}',
starCount: '星标:{{count}}',
componentName: {
Tool: '工具',
EventListener: '事件监听器',
Command: '命令',
},
uploadLocal: '本地上传',
debugging: '调试中',
uploadLocalPlugin: '上传本地插件',
@@ -285,6 +287,7 @@ const zhHans = {
defaultBadge: '默认',
sortBy: '排序方式',
newestCreated: '最新创建',
earliestCreated: '最早创建',
recentlyEdited: '最近编辑',
earliestEdited: '最早编辑',
basicInfo: '基础信息',

View File

@@ -197,9 +197,11 @@ const zhHant = {
close: '關閉',
deleteConfirm: '刪除確認',
modifyFailed: '修改失敗:',
eventCount: '事件:{{count}}',
toolCount: '工具:{{count}}',
starCount: '星標:{{count}}',
componentName: {
Tool: '工具',
EventListener: '事件監聽器',
Command: '命令',
},
uploadLocal: '本地上傳',
debugging: '調試中',
uploadLocalPlugin: '上傳本地插件',
@@ -283,6 +285,7 @@ const zhHant = {
defaultBadge: '預設',
sortBy: '排序方式',
newestCreated: '最新建立',
earliestCreated: '最早建立',
recentlyEdited: '最近編輯',
earliestEdited: '最早編輯',
basicInfo: '基本資訊',