diff --git a/src/langbot/pkg/provider/runners/n8nsvapi.py b/src/langbot/pkg/provider/runners/n8nsvapi.py index d2b5aa78..d23b18e3 100644 --- a/src/langbot/pkg/provider/runners/n8nsvapi.py +++ b/src/langbot/pkg/provider/runners/n8nsvapi.py @@ -68,6 +68,33 @@ class N8nServiceAPIRunner(runner.RequestRunner): return plain_text + async def _process_stream_response(self, response: aiohttp.ClientResponse) -> typing.AsyncGenerator[ + provider_message.Message, None]: + """处理流式响应""" + full_content = "" + message_idx = 0 + is_final = False + async for chunk in response.content.iter_chunked(1024): + if not chunk: + continue + + try: + data = json.loads(chunk) + if data.get('type') == 'item' and 'content' in data: + message_idx += 1 + content = data['content'] + full_content += content + elif data.get('type') == 'end': + is_final = True + if is_final or message_idx % 8 == 0: + yield provider_message.MessageChunk( + role='assistant', + content=full_content, + is_final=is_final, + ) + except json.JSONDecodeError: + self.ap.logger.warning(f"Failed to parse final JSON line: {response.text()}") + async def _call_webhook(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]: """调用n8n webhook""" # 生成会话ID(如果不存在) @@ -80,6 +107,7 @@ class N8nServiceAPIRunner(runner.RequestRunner): # 准备请求数据 payload = { # 基本消息内容 + 'chatInput' :plain_text, # 考虑到之前用户直接用的message model这里添加新键 'message': plain_text, 'user_message_text': plain_text, 'conversation_id': query.session.using_conversation.uuid, @@ -91,6 +119,11 @@ class N8nServiceAPIRunner(runner.RequestRunner): # 添加所有变量到payload payload.update(query.variables) + try: + is_stream = await query.adapter.is_stream_output_supported() + except AttributeError: + is_stream = False + try: # 准备请求头和认证信息 headers = {} @@ -126,30 +159,57 @@ class N8nServiceAPIRunner(runner.RequestRunner): # 调用webhook async with aiohttp.ClientSession() as session: - async with session.post( - self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout - ) as response: - if response.status != 200: - error_text = await response.text() - self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}') - raise Exception(f'n8n webhook call failed: {response.status}, {error_text}') + if is_stream: + # 流式请求 + async with session.post( + self.webhook_url, + json=payload, + headers=headers, + auth=auth, + timeout=self.timeout + ) as response: + if response.status != 200: + error_text = await response.text() + self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}') + raise Exception(f'n8n webhook call failed: {response.status}, {error_text}') - # 解析响应 - response_data = await response.json() - self.ap.logger.debug(f'n8n webhook response: {response_data}') - - # 从响应中提取输出 - if self.output_key in response_data: - output_content = response_data[self.output_key] + # 处理流式响应 + async for chunk in self._process_stream_response(response): + yield chunk else: - # 如果没有指定的输出键,则使用整个响应 - output_content = json.dumps(response_data, ensure_ascii=False) + async with session.post( + self.webhook_url, + json=payload, + headers=headers, + auth=auth, + timeout=self.timeout + ) as response: + try: + async for chunk in self._process_stream_response(response): + output_content = chunk.content if chunk.is_final else '' + except: + # 非流式请求(保持原有逻辑) + if response.status != 200: + error_text = await response.text() + self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}') + raise Exception(f'n8n webhook call failed: {response.status}, {error_text}') - # 返回消息 - yield provider_message.Message( - role='assistant', - content=output_content, - ) + # 解析响应 + response_data = await response.json() + self.ap.logger.debug(f'n8n webhook response: {response_data}') + + # 从响应中提取输出 + if self.output_key in response_data: + output_content = response_data[self.output_key] + else: + # 如果没有指定的输出键,则使用整个响应 + output_content = json.dumps(response_data, ensure_ascii=False) + + # 返回消息 + yield provider_message.Message( + role='assistant', + content=output_content, + ) except Exception as e: self.ap.logger.error(f'n8n webhook call exception: {str(e)}') raise N8nAPIError(f'n8n webhook call exception: {str(e)}') @@ -157,4 +217,4 @@ class N8nServiceAPIRunner(runner.RequestRunner): async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]: """运行请求""" async for msg in self._call_webhook(query): - yield msg + yield msg \ No newline at end of file