From a7d638cc9a2e6e4b05e9e96bb5cb7c7b31f5bcfd Mon Sep 17 00:00:00 2001
From: Dong_master <2213070223@qq.com>
Date: Mon, 14 Jul 2025 23:53:55 +0800
Subject: [PATCH] feat: add DeepSeek and ModelScope LLM streaming, and GiteeAI
 in-content think removal via remove_think

---
 pkg/provider/modelmgr/requesters/chatcmpl.py       |  11 +-
 .../modelmgr/requesters/deepseekchatcmpl.py        |   6 +-
 .../modelmgr/requesters/giteeaichatcmpl.py         | 167 +++++++++++++++++-
 .../modelmgr/requesters/modelscopechatcmpl.py      |  48 +++++
 4 files changed, 226 insertions(+), 6 deletions(-)

diff --git a/pkg/provider/modelmgr/requesters/chatcmpl.py b/pkg/provider/modelmgr/requesters/chatcmpl.py
index fbaf96fd..8e350bf6 100644
--- a/pkg/provider/modelmgr/requesters/chatcmpl.py
+++ b/pkg/provider/modelmgr/requesters/chatcmpl.py
@@ -52,10 +52,11 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):

     async def _make_msg(
         self,
-        pipeline_config: dict[str, typing.Any],
         chat_completion: chat_completion.ChatCompletion,
+        pipeline_config: dict[str, typing.Any] = {'trigger': {'misc': {'remove_think': False}}},
     ) -> llm_entities.Message:
         chatcmpl_message = chat_completion.choices[0].message.model_dump()
+        # print(chatcmpl_message.keys(),chatcmpl_message.values())

         # 确保 role 字段存在且不为 None
         if 'role' not in chatcmpl_message or chatcmpl_message['role'] is None:
@@ -65,6 +66,7 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):

         # deepseek的reasoner模型
         if pipeline_config['trigger'].get('misc', '').get('remove_think'):
+            pass
         else:
             if reasoning_content is not None :

@@ -92,13 +94,16 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
             delta = chat_completion.delta.model_dump() if hasattr(chat_completion, 'delta') else {}

         # 确保 role 字段存在且不为 None
-        # print(delta.values())
+        # print(delta.keys(),delta.values())
         if 'role' not in delta or delta['role'] is None:
             delta['role'] = 'assistant'

         reasoning_content = delta['reasoning_content'] if 'reasoning_content' in delta else None

+        delta['content'] = '' if delta['content'] is None else delta['content']
+        # print(reasoning_content)
+
         # deepseek的reasoner模型
         if pipeline_config['trigger'].get('misc', '').get('remove_think'):
             if reasoning_content is not None :
@@ -239,7 +244,7 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
         resp = await self._req(args, extra_body=extra_args)

         # 处理请求结果
         pipeline_config = query.pipeline_config
-        message = await self._make_msg(pipeline_config,resp)
+        message = await self._make_msg(resp,pipeline_config)

         return message

diff --git a/pkg/provider/modelmgr/requesters/deepseekchatcmpl.py b/pkg/provider/modelmgr/requesters/deepseekchatcmpl.py
index 6d664b01..f57f624f 100644
--- a/pkg/provider/modelmgr/requesters/deepseekchatcmpl.py
+++ b/pkg/provider/modelmgr/requesters/deepseekchatcmpl.py
@@ -49,10 +49,12 @@ class DeepseekChatCompletions(chatcmpl.OpenAIChatCompletions):
         # 发送请求
         resp = await self._req(args, extra_body=extra_args)

+        # print(resp)
+
         if resp is None:
             raise errors.RequesterError('接口返回为空,请确定模型提供商服务是否正常')
-
+        pipeline_config = query.pipeline_config
         # 处理请求结果
-        message = await self._make_msg(resp)
+        message = await self._make_msg(resp,pipeline_config)

         return message
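
The chatcmpl and deepseek hunks above thread query.pipeline_config into _make_msg so that the trigger.misc.remove_think switch decides whether a model's reasoning output is dropped or kept inline with the answer. A minimal standalone sketch of that intended behaviour, assuming plain content/reasoning_content strings (merge_reasoning is a hypothetical helper, not a function in this patch):

    import re

    def merge_reasoning(content: str, reasoning_content: str | None, remove_think: bool) -> str:
        if remove_think:
            # Drop reasoning, whether it came as a separate field or as an
            # embedded <think>...</think> block inside the content.
            return re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL).lstrip('\n')
        if reasoning_content is not None:
            # Keep the reasoning, wrapped in <think> tags ahead of the answer.
            return '<think>\n' + reasoning_content + '\n</think>\n' + content
        return content

    # merge_reasoning('answer', 'step by step...', remove_think=False)
    # -> '<think>\nstep by step...\n</think>\nanswer'

With remove_think disabled the reasoning is surfaced inside a <think> block; with it enabled the block never reaches the final message.
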
diff --git a/pkg/provider/modelmgr/requesters/giteeaichatcmpl.py b/pkg/provider/modelmgr/requesters/giteeaichatcmpl.py
index 3795ef99..ce1b075f 100644
--- a/pkg/provider/modelmgr/requesters/giteeaichatcmpl.py
+++ b/pkg/provider/modelmgr/requesters/giteeaichatcmpl.py
@@ -8,6 +8,9 @@
 from .. import requester
 from ....core import entities as core_entities
 from ... import entities as llm_entities
 from ...tools import entities as tools_entities
+import re
+import openai.types.chat.chat_completion as chat_completion
+


 class GiteeAIChatCompletions(chatcmpl.OpenAIChatCompletions):
@@ -17,6 +20,7 @@ class GiteeAIChatCompletions(chatcmpl.OpenAIChatCompletions):
         'base_url': 'https://ai.gitee.com/v1',
         'timeout': 120,
     }
+    is_think:bool = False

     async def _closure(
         self,
@@ -46,6 +50,167 @@ class GiteeAIChatCompletions(chatcmpl.OpenAIChatCompletions):

         resp = await self._req(args, extra_body=extra_args)

-        message = await self._make_msg(resp)
+        pipeline_config = query.pipeline_config
+
+        message = await self._make_msg(resp,pipeline_config)

         return message
+
+
+    async def _make_msg(
+        self,
+        chat_completion: chat_completion.ChatCompletion,
+        pipeline_config: dict[str, typing.Any] = {'trigger': {'misc': {'remove_think': False}}},
+    ) -> llm_entities.Message:
+        chatcmpl_message = chat_completion.choices[0].message.model_dump()
+        # print(chatcmpl_message.keys(), chatcmpl_message.values())
+
+        # 确保 role 字段存在且不为 None
+        if 'role' not in chatcmpl_message or chatcmpl_message['role'] is None:
+            chatcmpl_message['role'] = 'assistant'
+
+        reasoning_content = chatcmpl_message['reasoning_content'] if 'reasoning_content' in chatcmpl_message else None
+
+        # deepseek的reasoner模型
+        if pipeline_config['trigger'].get('misc', '').get('remove_think'):
+            chatcmpl_message['content'] = re.sub(r'<think>.*?</think>', '', chatcmpl_message['content'], flags=re.DOTALL)
+        else:
+            if reasoning_content is not None:
+                chatcmpl_message['content'] = '<think>\n' + reasoning_content + '\n</think>\n' + chatcmpl_message['content']
+
+        message = llm_entities.Message(**chatcmpl_message)
+
+        return message
+
+
+    async def _make_msg_chunk(
+        self,
+        pipeline_config: dict[str, typing.Any],
+        chat_completion: chat_completion.ChatCompletion,
+        idx: int,
+    ) -> llm_entities.MessageChunk:
+
+        # 处理流式chunk和完整响应的差异
+        # print(chat_completion.choices[0])
+        if hasattr(chat_completion, 'choices'):
+            # 完整响应模式
+            choice = chat_completion.choices[0]
+            delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump()
+        else:
+            # 流式chunk模式
+            delta = chat_completion.delta.model_dump() if hasattr(chat_completion, 'delta') else {}
+
+        # 确保 role 字段存在且不为 None
+        if 'role' not in delta or delta['role'] is None:
+            delta['role'] = 'assistant'
+
+        reasoning_content = delta['reasoning_content'] if 'reasoning_content' in delta else None
+
+        delta['content'] = '' if delta['content'] is None else delta['content']
+        # print(reasoning_content)
+
+        # deepseek的reasoner模型
+        if pipeline_config['trigger'].get('misc', '').get('remove_think'):
+            if delta['content'] == '<think>':
+                self.is_think = True
+                delta['content'] = ''
+            if delta['content'] == rf'</think>':
+                self.is_think = False
+                delta['content'] = ''
+            if not self.is_think:
+                delta['content'] = delta['content']
+            else:
+                delta['content'] = ''
+        else:
+            if reasoning_content is not None and idx == 0:
+                delta['content'] += f'<think>\n{reasoning_content}'
+            elif reasoning_content is None:
+                if self.is_content:
+                    delta['content'] = delta['content']
+                else:
+                    delta['content'] = f'\n</think>\n\n{delta["content"]}'
+                    self.is_content = True
+            else:
+                delta['content'] += reasoning_content
+
+        message = llm_entities.MessageChunk(**delta)
+
+        return message
+
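
In streaming mode, _make_msg_chunk above only ever sees one delta at a time, so remove_think is enforced with a small state machine: the is_think flag added on the class is raised when a chunk equal to '<think>' arrives, lowered at '</think>', and everything in between is blanked. A standalone sketch of that filter under the same assumption that the tags arrive as chunks of their own (ThinkFilter is illustrative, not part of the patch):

    class ThinkFilter:
        def __init__(self) -> None:
            self.is_think = False  # mirrors the is_think flag on the class above

        def feed(self, piece: str) -> str:
            if piece == '<think>':
                self.is_think = True
                return ''
            if piece == '</think>':
                self.is_think = False
                return ''
            return '' if self.is_think else piece

    # f = ThinkFilter()
    # [f.feed(p) for p in ['<think>', 'reasoning...', '</think>', 'final answer']]
    # -> ['', '', '', 'final answer']
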
+    async def _closure_stream(
+        self,
+        query: core_entities.Query,
+        req_messages: list[dict],
+        use_model: requester.RuntimeLLMModel,
+        use_funcs: list[tools_entities.LLMFunction] = None,
+        stream: bool = False,
+        extra_args: dict[str, typing.Any] = {},
+    ) -> llm_entities.Message | typing.AsyncGenerator[llm_entities.MessageChunk, None]:
+        self.client.api_key = use_model.token_mgr.get_token()
+
+        args = {}
+        args['model'] = use_model.model_entity.name
+
+        if use_funcs:
+            tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
+
+            if tools:
+                args['tools'] = tools
+
+        # 设置此次请求中的messages
+        messages = req_messages.copy()
+
+        # 检查vision
+        for msg in messages:
+            if 'content' in msg and isinstance(msg['content'], list):
+                for me in msg['content']:
+                    if me['type'] == 'image_base64':
+                        me['image_url'] = {'url': me['image_base64']}
+                        me['type'] = 'image_url'
+                        del me['image_base64']
+
+        args['messages'] = messages
+
+        if stream:
+            current_content = ''
+            args["stream"] = True
+            chunk_idx = 0
+            self.is_content = False
+            tool_calls_map: dict[str, llm_entities.ToolCall] = {}
+            pipeline_config = query.pipeline_config
+            async for chunk in self._req_stream(args, extra_body=extra_args):
+                # 处理流式消息
+                delta_message = await self._make_msg_chunk(pipeline_config,chunk,chunk_idx)
+                if delta_message.content:
+                    current_content += delta_message.content
+                    delta_message.content = current_content
+                    # delta_message.all_content = current_content
+                if delta_message.tool_calls:
+                    for tool_call in delta_message.tool_calls:
+                        if tool_call.id not in tool_calls_map:
+                            tool_calls_map[tool_call.id] = llm_entities.ToolCall(
+                                id=tool_call.id,
+                                type=tool_call.type,
+                                function=llm_entities.FunctionCall(
+                                    name=tool_call.function.name if tool_call.function else '',
+                                    arguments=''
+                                ),
+                            )
+                        if tool_call.function and tool_call.function.arguments:
+                            # 流式处理中,工具调用参数可能分多个chunk返回,需要追加而不是覆盖
+                            tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments
+
+                chunk_idx += 1
+                chunk_choices = getattr(chunk, 'choices', None)
+                if chunk_choices and getattr(chunk_choices[0], 'finish_reason', None):
+                    delta_message.is_final = True
+                    delta_message.content = current_content
+
+                if chunk_idx % 64 == 0 or delta_message.is_final:
+
+                    yield delta_message
+
+
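
Two details of the streaming loop above are worth spelling out: tool-call arguments arrive fragmented across chunks, so they are keyed by tool_call.id and appended rather than overwritten, and chunks are only yielded every 64 iterations or when finish_reason marks the final one. A small sketch of the argument-merging step (the (id, name, fragment) tuples and merge_tool_call_fragments are illustrative only):

    def merge_tool_call_fragments(fragments: list[tuple[str, str, str]]) -> dict[str, dict]:
        calls: dict[str, dict] = {}
        for call_id, name, argument_piece in fragments:
            call = calls.setdefault(call_id, {'name': name, 'arguments': ''})
            call['arguments'] += argument_piece  # append each slice, never overwrite
        return calls

    # merge_tool_call_fragments([
    #     ('call_1', 'get_weather', '{"city": "Bei'),
    #     ('call_1', 'get_weather', 'jing"}'),
    # ])
    # -> {'call_1': {'name': 'get_weather', 'arguments': '{"city": "Beijing"}'}}
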
diff --git a/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py b/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py
index 4708f671..20a315a8 100644
--- a/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py
+++ b/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py
@@ -202,3 +202,51 @@ class ModelScopeChatCompletions(requester.ProviderAPIRequester):
             raise errors.RequesterError(f'请求过于频繁或余额不足: {e.message}')
         except openai.APIError as e:
             raise errors.RequesterError(f'请求错误: {e.message}')
+
+
+    async def invoke_llm_stream(
+        self,
+        query: core_entities.Query,
+        model: requester.RuntimeLLMModel,
+        messages: typing.List[llm_entities.Message],
+        funcs: typing.List[tools_entities.LLMFunction] = None,
+        stream: bool = False,
+        extra_args: dict[str, typing.Any] = {},
+    ) -> llm_entities.MessageChunk:
+        req_messages = []  # req_messages 仅用于类内,外部同步由 query.messages 进行
+        for m in messages:
+            msg_dict = m.dict(exclude_none=True)
+            content = msg_dict.get('content')
+            if isinstance(content, list):
+                # 检查 content 列表中是否每个部分都是文本
+                if all(isinstance(part, dict) and part.get('type') == 'text' for part in content):
+                    # 将所有文本部分合并为一个字符串
+                    msg_dict['content'] = '\n'.join(part['text'] for part in content)
+            req_messages.append(msg_dict)
+
+        try:
+            async for item in self._closure_stream(
+                query=query,
+                req_messages=req_messages,
+                use_model=model,
+                use_funcs=funcs,
+                stream=stream,
+                extra_args=extra_args,
+            ):
+                yield item
+
+        except asyncio.TimeoutError:
+            raise errors.RequesterError('请求超时')
+        except openai.BadRequestError as e:
+            if 'context_length_exceeded' in e.message:
+                raise errors.RequesterError(f'上文过长,请重置会话: {e.message}')
+            else:
+                raise errors.RequesterError(f'请求参数错误: {e.message}')
+        except openai.AuthenticationError as e:
+            raise errors.RequesterError(f'无效的 api-key: {e.message}')
+        except openai.NotFoundError as e:
+            raise errors.RequesterError(f'请求路径错误: {e.message}')
+        except openai.RateLimitError as e:
+            raise errors.RequesterError(f'请求过于频繁或余额不足: {e.message}')
+        except openai.APIError as e:
+            raise errors.RequesterError(f'请求错误: {e.message}')
\ No newline at end of file
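
The new invoke_llm_stream is an async generator of MessageChunk objects; in the GiteeAI loop above each chunk's content holds the text accumulated so far and is_final marks the closing chunk. A hypothetical consumer-side sketch (requester_obj, query, model and messages stand in for the real pipeline objects):

    async def consume_stream(requester_obj, query, model, messages):
        # requester_obj is an instance of one of the requesters above; query,
        # model and messages are placeholders for the real pipeline objects.
        async for chunk in requester_obj.invoke_llm_stream(
            query=query,
            model=model,
            messages=messages,
            stream=True,
        ):
            print(chunk.content)  # accumulated text so far, per the GiteeAI loop
            if getattr(chunk, 'is_final', False):
                break
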