diff --git a/pkg/provider/runners/dashscopeapi.py b/pkg/provider/runners/dashscopeapi.py index 7f66e6f0..f488d23c 100644 --- a/pkg/provider/runners/dashscopeapi.py +++ b/pkg/provider/runners/dashscopeapi.py @@ -99,8 +99,14 @@ class DashScopeAPIRunner(runner.RequestRunner): plain_text = '' # 用户输入的纯文本信息 image_ids = [] # 用户输入的图片ID列表 (暂不支持) - plain_text, image_ids = await self._preprocess_user_message(query) + think_start = False + think_end = False + plain_text, image_ids = await self._preprocess_user_message(query) + has_thoughts = True # 获取思考过程 + remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think') + if remove_think: + has_thoughts = False # 发送对话请求 response = dashscope.Application.call( api_key=self.api_key, # 智能体应用的API Key @@ -109,6 +115,7 @@ class DashScopeAPIRunner(runner.RequestRunner): stream=True, # 流式输出 incremental_output=True, # 增量输出,使用流式输出需要开启增量输出 session_id=query.session.using_conversation.uuid, # 会话ID用于,多轮对话 + has_thoughts=has_thoughts, # rag_options={ # 主要用于文件交互,暂不支持 # "session_file_ids": ["FILE_ID1"], # FILE_ID1 替换为实际的临时文件ID,逗号隔开多个 # } @@ -131,6 +138,17 @@ class DashScopeAPIRunner(runner.RequestRunner): idx_chunk += 1 # 获取流式传输的output stream_output = chunk.get('output', {}) + stream_think = stream_output.get('thoughts', []) + if stream_think[0].get('thought'): + if not think_start: + think_start = True + pending_content += f"\n{stream_think[0].get('thought')}" + else: + # 继续输出 reasoning_content + pending_content += stream_think[0].get('thought') + elif stream_think[0].get('thought') == "" and not think_end: + think_end = True + pending_content += "\n\n" if stream_output.get('text') is not None: pending_content += stream_output.get('text') # 是否是流式最后一个chunk @@ -167,6 +185,17 @@ class DashScopeAPIRunner(runner.RequestRunner): idx_chunk += 1 # 获取流式传输的output stream_output = chunk.get('output', {}) + stream_think = stream_output.get('thoughts', []) + if stream_think[0].get('thought'): + if not think_start: + think_start = True + pending_content += f"\n{stream_think[0].get('thought')}" + else: + # 继续输出 reasoning_content + pending_content += stream_think[0].get('thought') + elif stream_think[0].get('thought') == "" and not think_end: + think_end = True + pending_content += "\n\n" if stream_output.get('text') is not None: pending_content += stream_output.get('text') @@ -302,11 +331,18 @@ class DashScopeAPIRunner(runner.RequestRunner): async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: """运行""" + msg_seq = 0 if self.app_type == 'agent': async for msg in self._agent_messages(query): + if isinstance(msg, llm_entities.MessageChunk): + msg_seq += 1 + msg.msg_sequence = msg_seq yield msg elif self.app_type == 'workflow': async for msg in self._workflow_messages(query): + if isinstance(msg, llm_entities.MessageChunk): + msg_seq += 1 + msg.msg_sequence = msg_seq yield msg else: raise DashscopeAPIError(f'不支持的 Dashscope 应用类型: {self.app_type}')