From f9a5507029095651bc9c396810475ec0bd53d107 Mon Sep 17 00:00:00 2001
From: Dong_master <2213070223@qq.com>
Date: Sun, 13 Jul 2025 22:41:39 +0800
Subject: [PATCH] fix: pushing every streamed iteration into resq_messages and
 resq_message_chain bloated the in-memory cache and the log output; also handle
 the think output of reasoning models
 feat: add an option to strip the think content of deep-reasoning models
 feat: streaming (chatflow) support for the Dify chatbot
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 pkg/platform/sources/lark.py                 |  8 +--
 pkg/provider/modelmgr/requesters/chatcmpl.py | 66 +++++++++++++-----
 pkg/provider/runners/difysvapi.py            | 73 ++++++++++++++------
 templates/metadata/pipeline/trigger.yaml     |  7 ++
 4 files changed, 113 insertions(+), 41 deletions(-)

diff --git a/pkg/platform/sources/lark.py b/pkg/platform/sources/lark.py
index 503ef225..1816db8f 100644
--- a/pkg/platform/sources/lark.py
+++ b/pkg/platform/sources/lark.py
@@ -359,7 +359,7 @@ class LarkAdapter(adapter.MessagePlatformAdapter):
         self.listeners = {}
         self.message_id_to_card_id = {}
         self.card_id_dict = {}
-        self.seq = 0
+        self.seq = 1
 
         @self.quart_app.route('/lark/callback', methods=['POST'])
         async def lark_callback():
@@ -456,7 +456,7 @@ class LarkAdapter(adapter.MessagePlatformAdapter):
         card_data = {"schema": "2.0", "header": {"title": {"content": "bot", "tag": "plain_text"}},
                      "body": {"elements": [{"tag": "markdown", "content": "[思考中.....]", "element_id": "markdown_1"}]},
                      "config": {"streaming_mode": True,
-                                "streaming_config": {"print_strategy": "fast"}}}
+                                "streaming_config": {"print_strategy": "delay"}}}  # delay / fast
 
         request: CreateCardRequest = CreateCardRequest.builder() \
             .request_body(
@@ -620,7 +620,7 @@ class LarkAdapter(adapter.MessagePlatformAdapter):
             'type': 'card_json',
             'data': {'card_id': self.card_id_dict[message_id], 'elements': {'content': text_message}},
         }
-
+        print(self.seq)
         request: ContentCardElementRequest = ContentCardElementRequest.builder() \
             .card_id(self.card_id_dict[message_id]) \
             .element_id("markdown_1") \
             .build()
 
         if is_final:
-            self.seq = 0
+            self.seq = 1
         # send the request
         response: ContentCardElementResponse = self.api_client.cardkit.v1.card_element.content(request)
 
diff --git a/pkg/provider/modelmgr/requesters/chatcmpl.py b/pkg/provider/modelmgr/requesters/chatcmpl.py
index 844aa83f..69c09b8e 100644
--- a/pkg/provider/modelmgr/requesters/chatcmpl.py
+++ b/pkg/provider/modelmgr/requesters/chatcmpl.py
@@ -8,7 +8,7 @@ import openai.types.chat.chat_completion as chat_completion
 import httpx
 
 from .. import errors, requester
-from ....core import entities as core_entities
+from ....core import entities as core_entities, app
 from ... import entities as llm_entities
 from ...tools import entities as tools_entities
 
@@ -25,7 +25,6 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
     }
 
-
     async def initialize(self):
         self.client = openai.AsyncClient(
             api_key='',
@@ -53,6 +52,7 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
 
     async def _make_msg(
         self,
+        pipeline_config: dict[str, typing.Any],
         chat_completion: chat_completion.ChatCompletion,
     ) -> llm_entities.Message:
         chatcmpl_message = chat_completion.choices[0].message.model_dump()
@@ -64,8 +64,12 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
         reasoning_content = chatcmpl_message['reasoning_content'] if 'reasoning_content' in chatcmpl_message else None
 
         # DeepSeek reasoner models
-        if reasoning_content is not None:
-            chatcmpl_message['content'] = '<think>\n' + reasoning_content + '\n</think>\n\n' + chatcmpl_message['content']
+        if not pipeline_config['trigger'].get('misc', {}).get('remove_think'):
+            if reasoning_content is not None:
+                chatcmpl_message['content'] = '<think>\n' + reasoning_content + '\n</think>\n\n' + chatcmpl_message['content']
 
         message = llm_entities.Message(**chatcmpl_message)
 
@@ -73,7 +77,7 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
 
     async def _make_msg_chunk(
         self,
-        index:int,
+        pipeline_config: dict[str, typing.Any],
         chat_completion: chat_completion.ChatCompletion,
     ) -> llm_entities.MessageChunk:
@@ -96,16 +100,22 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
         reasoning_content = delta['reasoning_content'] if 'reasoning_content' in delta else None
 
         # DeepSeek reasoner models
-        if reasoning_content is not None and index == 0:
-            delta['content'] += f'<think>\n{reasoning_content}'
-        elif reasoning_content is None:
-            if self.is_content:
-                delta['content'] = delta['content']
+        if pipeline_config['trigger'].get('misc', {}).get('remove_think'):
+            if reasoning_content is not None:
+                pass
             else:
-                delta['content'] = f'\n</think>\n\n{delta["content"]}'
-                self.is_content = True
+                delta['content'] = delta['content']
         else:
-            delta['content'] += reasoning_content
+            if reasoning_content is not None:
+                delta['content'] += f'<think>\n{reasoning_content}'
+            elif reasoning_content is None:
+                if self.is_content:
+                    delta['content'] = delta['content']
+                else:
+                    delta['content'] = f'\n</think>\n\n{delta["content"]}'
+                    self.is_content = True
+            else:
+                delta['content'] += reasoning_content
 
         message = llm_entities.MessageChunk(**delta)
 
@@ -151,20 +161,41 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
             args["stream"] = True
             chunk_idx = 0
             self.is_content = False
+            tool_calls_map: dict[str, llm_entities.ToolCall] = {}
+            pipeline_config = query.pipeline_config
             async for chunk in self._req_stream(args, extra_body=extra_args):
                 # handle each streamed chunk
-                delta_message = await self._make_msg_chunk(chunk_idx,chunk)
-                # print(delta_message)
+                delta_message = await self._make_msg_chunk(pipeline_config, chunk)
                 if delta_message.content:
                     current_content += delta_message.content
                     delta_message.content = current_content
                 # delta_message.all_content = current_content
+                if delta_message.tool_calls:
+                    for tool_call in delta_message.tool_calls:
+                        if tool_call.id not in tool_calls_map:
+                            tool_calls_map[tool_call.id] = llm_entities.ToolCall(
+                                id=tool_call.id,
+                                type=tool_call.type,
+                                function=llm_entities.FunctionCall(
+                                    name=tool_call.function.name if tool_call.function else '',
+                                    arguments='',
+                                ),
+                            )
+                        if tool_call.function and tool_call.function.arguments:
+                            # in streaming mode the tool-call arguments may arrive split across chunks, so append instead of overwriting
+                            tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments
+
                 chunk_idx += 1
                 chunk_choices = getattr(chunk, 'choices', None)
                 if chunk_choices and getattr(chunk_choices[0], 'finish_reason', None):
                     delta_message.is_final = True
+                    delta_message.content = current_content
 
-                yield delta_message
+                if chunk_idx % 64 == 0 or delta_message.is_final:
+                    yield delta_message
 
             # return
@@ -208,7 +239,8 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
             resp = await self._req(args, extra_body=extra_args)
 
             # process the response
-            message = await self._make_msg(resp)
+            pipeline_config = query.pipeline_config
+            message = await self._make_msg(pipeline_config, resp)
 
             return message
 
diff --git a/pkg/provider/runners/difysvapi.py b/pkg/provider/runners/difysvapi.py
index 98b50f86..1dfde547 100644
--- a/pkg/provider/runners/difysvapi.py
+++ b/pkg/provider/runners/difysvapi.py
@@ -95,6 +95,11 @@ class DifyServiceAPIRunner(runner.RequestRunner):
         cov_id = query.session.using_conversation.uuid or ''
         query.variables['conversation_id'] = cov_id
 
+        try:
+            is_stream = query.adapter.is_stream
+        except AttributeError:
+            is_stream = False
+
         plain_text, image_ids = await self._preprocess_user_message(query)
 
         files = [
@@ -144,40 +149,54 @@
             if mode == 'workflow':
                 if chunk['event'] == 'node_finished':
-                    if chunk['data']['node_type'] == 'answer':
-                        yield llm_entities.Message(
-                            role='assistant',
-                            content=self._try_convert_thinking(chunk['data']['outputs']['answer']),
-                        )
+                    if not is_stream:
+                        if chunk['data']['node_type'] == 'answer':
+                            yield llm_entities.Message(
+                                role='assistant',
+                                content=self._try_convert_thinking(chunk['data']['outputs']['answer']),
+                            )
+                    else:
+                        if chunk['data']['node_type'] == 'answer':
+                            yield llm_entities.MessageChunk(
+                                role='assistant',
+                                content=self._try_convert_thinking(chunk['data']['outputs']['answer']),
+                                is_final=True,
+                            )
                 elif chunk['event'] == 'message':
                     stream_output_pending_chunk += chunk['answer']
-                    if self.pipeline_config['ai']['dify-service-api'].get('enable-streaming', False):
+                    if is_stream:
                         # flush once enough chunks have accumulated, to get a streaming effect
                         batch_pending_index += 1
                         if batch_pending_index >= batch_pending_max_size:
-                            yield llm_entities.Message(
+                            yield llm_entities.MessageChunk(
                                 role='assistant',
                                 content=self._try_convert_thinking(stream_output_pending_chunk),
                             )
                             batch_pending_index = 0
             elif mode == 'basic':
-                if chunk['event'] == 'message':
-                    stream_output_pending_chunk += chunk['answer']
-                    if self.pipeline_config['ai']['dify-service-api'].get('enable-streaming', False):
-                        # flush once enough chunks have accumulated, to get a streaming effect
-                        batch_pending_index += 1
-                        if batch_pending_index >= batch_pending_max_size:
+                if chunk['event'] == 'message' or chunk['event'] == 'message_end':
+                    if chunk['event'] == 'message_end':
+                        is_final = True
+                        if is_stream and batch_pending_index % batch_pending_max_size == 0:
+                            # flush once enough chunks have accumulated, to get a streaming effect
+                            batch_pending_index += 1
+                            # if batch_pending_index >= batch_pending_max_size:
+                            yield llm_entities.MessageChunk(
+                                role='assistant',
+                                content=self._try_convert_thinking(stream_output_pending_chunk),
+                                is_final=is_final,
+                            )
+                            # batch_pending_index = 0
+                        elif not is_stream:
                             yield llm_entities.Message(
                                 role='assistant',
                                 content=self._try_convert_thinking(stream_output_pending_chunk),
                             )
-                            batch_pending_index = 0
-                elif chunk['event'] == 'message_end':
-                    yield llm_entities.Message(
-                        role='assistant',
-                        content=self._try_convert_thinking(stream_output_pending_chunk),
-                    )
-                    stream_output_pending_chunk = ''
+                            stream_output_pending_chunk = ''
+                    else:
+                        stream_output_pending_chunk += chunk['answer']
+                        is_final = False
 
             if chunk is None:
                 raise errors.DifyAPIError('Dify API 没有返回任何响应,请检查网络连接和API配置')
 
@@ -191,6 +210,13 @@ class DifyServiceAPIRunner(runner.RequestRunner):
         cov_id = query.session.using_conversation.uuid or ''
         query.variables['conversation_id'] = cov_id
 
+        try:
+            is_stream = query.adapter.is_stream
+        except AttributeError:
+            is_stream = False
+
+        batch_pending_index = 0
+
         plain_text, image_ids = await self._preprocess_user_message(query)
 
         files = [
@@ -285,6 +311,13 @@ class DifyServiceAPIRunner(runner.RequestRunner):
 
         query.variables['conversation_id'] = query.session.using_conversation.uuid
 
+        try:
+            is_stream = query.adapter.is_stream
+        except AttributeError:
+            is_stream = False
+
+        batch_pending_index = 0
+
         plain_text, image_ids = await self._preprocess_user_message(query)
 
         files = [
diff --git a/templates/metadata/pipeline/trigger.yaml b/templates/metadata/pipeline/trigger.yaml
index 949b2698..165e488e 100644
--- a/templates/metadata/pipeline/trigger.yaml
+++ b/templates/metadata/pipeline/trigger.yaml
@@ -132,3 +132,10 @@ stages:
         type: boolean
         required: true
         default: true
+      - name: remove_think
+        label:
+          en_US: remove think
+          zh_Hans: 删除深度思考消息
+        type: boolean
+        required: true
+        default: true
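
Note for reviewers: a minimal sketch of how a consumer might honor the new
trigger.misc.remove_think switch added in trigger.yaml above. This is an
illustration only, not code from this patch; the helper name strip_think and
the standalone config dict are assumptions, while the <think>...</think>
convention matches what chatcmpl.py wraps around reasoning output.

    import re

    def strip_think(content: str, pipeline_config: dict) -> str:
        """Drop <think>...</think> blocks when trigger.misc.remove_think is on."""
        remove_think = pipeline_config.get('trigger', {}).get('misc', {}).get('remove_think', False)
        if not remove_think:
            return content
        # remove complete reasoning blocks, then any unterminated block left by a truncated stream
        content = re.sub(r'<think>.*?</think>\s*', '', content, flags=re.DOTALL)
        return re.sub(r'<think>.*\Z', '', content, flags=re.DOTALL)

    # usage sketch
    cfg = {'trigger': {'misc': {'remove_think': True}}}
    print(strip_think('<think>\nchain of thought\n</think>\n\nfinal answer', cfg))  # -> 'final answer'

Chaining .get() with dict defaults, as above, avoids the AttributeError that a
string default such as .get('misc', '') would raise when the misc section is
absent from the pipeline config.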