Mirror of https://github.com/langbot-app/LangBot.git, synced 2025-11-25 03:15:06 +08:00
fix: fix n8n streaming support issue (#1787)
* fix: fix n8n streaming support issue

  Add streaming support detection and proper message type handling to the n8n service API runner. Previously, when streaming was enabled, the n8n integration would fail due to incorrect message type usage.

  1. Added streaming capability detection by checking the adapter's is_stream_output_supported method
  2. Implemented conditional message generation, using MessageChunk in streaming mode and Message in non-streaming mode
  3. Added proper error handling for adapters that don't support streaming detection

* fix: add n8n webhook streaming model; optimized the streaming output when calling n8n

---------

Co-authored-by: Dong_master <2213070223@qq.com>
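Before the diff, a minimal, self-contained sketch of the pattern the commit message describes: probe the adapter for streaming support and fall back to non-streaming if the method is missing. LegacyAdapter, StreamingAdapter, and detect_streaming are illustrative stand-ins, not LangBot's real classes.

import asyncio


class LegacyAdapter:
    """An adapter that predates streaming support (no is_stream_output_supported)."""


class StreamingAdapter:
    async def is_stream_output_supported(self) -> bool:
        return True


async def detect_streaming(adapter) -> bool:
    # Older adapters may not implement the capability check at all, so fall back
    # to non-streaming instead of failing the whole request.
    try:
        return await adapter.is_stream_output_supported()
    except AttributeError:
        return False


async def main() -> None:
    for adapter in (LegacyAdapter(), StreamingAdapter()):
        is_stream = await detect_streaming(adapter)
        # In the runner, is_stream decides between yielding incremental
        # MessageChunk objects and a single final Message.
        print(type(adapter).__name__, 'streaming' if is_stream else 'non-streaming')


asyncio.run(main())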
@@ -68,6 +68,33 @@ class N8nServiceAPIRunner(runner.RequestRunner):
         return plain_text
 
+    async def _process_stream_response(
+        self, response: aiohttp.ClientResponse
+    ) -> typing.AsyncGenerator[provider_message.Message, None]:
+        """Process the streaming response."""
+        full_content = ''
+        message_idx = 0
+        is_final = False
+        async for chunk in response.content.iter_chunked(1024):
+            if not chunk:
+                continue
+
+            try:
+                data = json.loads(chunk)
+                if data.get('type') == 'item' and 'content' in data:
+                    message_idx += 1
+                    content = data['content']
+                    full_content += content
+                elif data.get('type') == 'end':
+                    is_final = True
+                if is_final or message_idx % 8 == 0:
+                    yield provider_message.MessageChunk(
+                        role='assistant',
+                        content=full_content,
+                        is_final=is_final,
+                    )
+            except json.JSONDecodeError:
+                self.ap.logger.warning(f'Failed to parse streamed JSON chunk: {chunk!r}')
+
     async def _call_webhook(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
         """Call the n8n webhook."""
         # Generate a conversation ID (if one does not exist)
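The parser above assumes the n8n webhook streams JSON objects of the form {"type": "item", "content": ...} followed by a final {"type": "end"}; that format is inferred from the parsing code, not from n8n documentation. A small offline illustration of the assumed chunk format and the same parsing rules:

import json

# Assumed chunk format, inferred from _process_stream_response above.
sample_chunks = [
    b'{"type": "item", "content": "Hello"}',
    b'{"type": "item", "content": ", n8n"}',
    b'{"type": "end"}',
]

full_content = ''
for raw in sample_chunks:
    data = json.loads(raw)
    if data.get('type') == 'item' and 'content' in data:
        full_content += data['content']
    elif data.get('type') == 'end':
        print('final content:', full_content)  # -> final content: Hello, n8n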
@@ -80,6 +107,7 @@ class N8nServiceAPIRunner(runner.RequestRunner):
         # Prepare the request data
         payload = {
             # Basic message content
+            'chatInput': plain_text,  # New key, added because earlier users consumed the message model directly
             'message': plain_text,
             'user_message_text': plain_text,
             'conversation_id': query.session.using_conversation.uuid,
@@ -91,6 +119,11 @@ class N8nServiceAPIRunner(runner.RequestRunner):
         # Add all variables to the payload
         payload.update(query.variables)
 
+        try:
+            is_stream = await query.adapter.is_stream_output_supported()
+        except AttributeError:
+            is_stream = False
+
         try:
             # Prepare request headers and authentication info
             headers = {}
@@ -126,30 +159,57 @@ class N8nServiceAPIRunner(runner.RequestRunner):
 
             # Call the webhook
             async with aiohttp.ClientSession() as session:
-                async with session.post(
-                    self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout
-                ) as response:
-                    if response.status != 200:
-                        error_text = await response.text()
-                        self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
-                        raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
-
-                    # Parse the response
-                    response_data = await response.json()
-                    self.ap.logger.debug(f'n8n webhook response: {response_data}')
-
-                    # Extract the output from the response
-                    if self.output_key in response_data:
-                        output_content = response_data[self.output_key]
-                    else:
-                        # If the configured output key is absent, use the whole response
-                        output_content = json.dumps(response_data, ensure_ascii=False)
-
-                    # Return the message
-                    yield provider_message.Message(
-                        role='assistant',
-                        content=output_content,
-                    )
+                if is_stream:
+                    # Streaming request
+                    async with session.post(
+                        self.webhook_url,
+                        json=payload,
+                        headers=headers,
+                        auth=auth,
+                        timeout=self.timeout
+                    ) as response:
+                        if response.status != 200:
+                            error_text = await response.text()
+                            self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
+                            raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
+
+                        # Process the streaming response
+                        async for chunk in self._process_stream_response(response):
+                            yield chunk
+                else:
+                    # Non-streaming request (keeps the original logic)
+                    async with session.post(
+                        self.webhook_url,
+                        json=payload,
+                        headers=headers,
+                        auth=auth,
+                        timeout=self.timeout
+                    ) as response:
+                        if response.status != 200:
+                            error_text = await response.text()
+                            self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
+                            raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
+
+                        # Parse the response
+                        response_data = await response.json()
+                        self.ap.logger.debug(f'n8n webhook response: {response_data}')
+
+                        # Extract the output from the response
+                        if self.output_key in response_data:
+                            output_content = response_data[self.output_key]
+                        else:
+                            # If the configured output key is absent, use the whole response
+                            output_content = json.dumps(response_data, ensure_ascii=False)
+
+                        # Return the message
+                        yield provider_message.Message(
+                            role='assistant',
+                            content=output_content,
+                        )
         except Exception as e:
             self.ap.logger.error(f'n8n webhook call exception: {str(e)}')
             raise N8nAPIError(f'n8n webhook call exception: {str(e)}')
@@ -157,4 +217,4 @@ class N8nServiceAPIRunner(runner.RequestRunner):
     async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
         """Run the request."""
         async for msg in self._call_webhook(query):
-            yield msg
+            yield msg
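Because run() is an async generator, callers drain it the same way in both modes; since each streamed chunk carries the accumulated content, keeping the last chunk is enough. A toy consumer under that assumption, with FakeChunk and fake_run standing in for provider_message.MessageChunk and the runner:

import asyncio
from dataclasses import dataclass


@dataclass
class FakeChunk:
    content: str
    is_final: bool = False


async def fake_run(query: str):
    # Stand-in for N8nServiceAPIRunner.run(): emits growing chunks, then a final one.
    for text, final in (('Hel', False), ('Hello', False), ('Hello!', True)):
        yield FakeChunk(content=text, is_final=final)


async def collect_reply(query: str) -> str:
    reply_text = ''
    async for msg in fake_run(query):
        # Each chunk carries the accumulated content so far, so keep the latest one.
        reply_text = msg.content
    return reply_text


print(asyncio.run(collect_reply('hi')))  # -> Hello!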