diff --git a/libs/dingtalk_api/api.py b/libs/dingtalk_api/api.py index d323df1e..d1c7065f 100644 --- a/libs/dingtalk_api/api.py +++ b/libs/dingtalk_api/api.py @@ -3,6 +3,7 @@ import json import time from typing import Callable import dingtalk_stream # type: ignore +from dingtalk_stream import AckMessage, ChatbotHandler, CallbackHandler, CallbackMessage, ChatbotMessage, AICardReplier from .EchoHandler import EchoTextHandler from .dingtalkevent import DingTalkEvent import httpx @@ -253,6 +254,48 @@ class DingTalkClient: await self.logger.error(f'failed to send proactive massage to group: {traceback.format_exc()}') raise Exception(f'failed to send proactive massage to group: {traceback.format_exc()}') + async def create_and_card(self, temp_card_id: str, incoming_message: dingtalk_stream.ChatbotMessage,quote_origin:bool=False): + content_key = "content" + card_data = {content_key: ""} + + card_instance = dingtalk_stream.AICardReplier( + self.client, incoming_message + ) + # print(card_instance) + # 先投放卡片: https://open.dingtalk.com/document/orgapp/create-and-deliver-cards + card_instance_id = await card_instance.async_create_and_deliver_card( + temp_card_id, card_data, + ) + return card_instance,card_instance_id + + async def send_card_message(self, + card_instance, + card_instance_id: str,content: str,is_final: bool): + content_key = "content" + try: + await card_instance.async_streaming( + card_instance_id, + content_key=content_key, + content_value=content, + append=False, + finished=is_final, + failed=False, + ) + except Exception as e: + self.logger.exception(e) + await card_instance.async_streaming( + card_instance_id, + content_key=content_key, + content_value="", + append=False, + finished=is_final, + failed=True, + ) + + + + + async def start(self): """启动 WebSocket 连接,监听消息""" await self.client.start() diff --git a/pkg/pipeline/process/handlers/chat.py b/pkg/pipeline/process/handlers/chat.py index 1ccf9bb9..0fe7f868 100644 --- a/pkg/pipeline/process/handlers/chat.py +++ b/pkg/pipeline/process/handlers/chat.py @@ -23,11 +23,11 @@ class ChatMessageHandler(handler.MessageHandler): self, query: core_entities.Query, ) -> typing.AsyncGenerator[entities.StageProcessResult, None]: - """Process""" - # Call API - # generator + """处理""" + # 调API + # 生成器 - # Trigger plugin event + # 触发插件事件 event_class = ( events.PersonNormalMessageReceived if query.launcher_type == core_entities.LauncherTypes.PERSON @@ -47,7 +47,6 @@ class ChatMessageHandler(handler.MessageHandler): if event_ctx.is_prevented_default(): if event_ctx.event.reply is not None: mc = platform_message.MessageChain(event_ctx.event.reply) - query.resp_messages.append(mc) yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query) @@ -55,15 +54,14 @@ class ChatMessageHandler(handler.MessageHandler): yield entities.StageProcessResult(result_type=entities.ResultType.INTERRUPT, new_query=query) else: if event_ctx.event.alter is not None: - # if isinstance(event_ctx.event, str): # Currently not considering multi-modal alter + # if isinstance(event_ctx.event, str): # 现在暂时不考虑多模态alter query.user_message.content = event_ctx.event.alter text_length = 0 try: - is_stream = query.adapter.is_stream + is_stream = await query.adapter.is_stream_output_supported() except AttributeError: is_stream = False - print(is_stream) try: for r in runner_module.preregistered_runners: @@ -107,7 +105,8 @@ class ChatMessageHandler(handler.MessageHandler): query.session.using_conversation.messages.extend(query.resp_messages) except Exception as e: - self.ap.logger.error(f'Request failed({query.query_id}): {type(e).__name__} {str(e)}') + self.ap.logger.error(f'对话({query.query_id})请求失败: {type(e).__name__} {str(e)}') + traceback.print_exc() hide_exception_info = query.pipeline_config['output']['misc']['hide-exception'] @@ -120,4 +119,4 @@ class ChatMessageHandler(handler.MessageHandler): ) finally: # TODO statistics - pass + pass \ No newline at end of file diff --git a/pkg/pipeline/respback/respback.py b/pkg/pipeline/respback/respback.py index 52714ce2..9a410b3f 100644 --- a/pkg/pipeline/respback/respback.py +++ b/pkg/pipeline/respback/respback.py @@ -46,7 +46,7 @@ class SendResponseBackStage(stage.PipelineStage): print(is_final) await query.adapter.reply_message_chunk( message_source=query.message_event, - message_id=query.message_event.message_chain.message_id, + message_id=query.resp_messages[-1].resp_message_id, message=query.resp_message_chain[-1], quote_origin=quote_origin, is_final=is_final, diff --git a/pkg/platform/adapter.py b/pkg/platform/adapter.py index 3951326c..d4b48ef6 100644 --- a/pkg/platform/adapter.py +++ b/pkg/platform/adapter.py @@ -80,6 +80,10 @@ class MessagePlatformAdapter(metaclass=abc.ABCMeta): """ raise NotImplementedError + async def create_message_card(self,message_id,event): + '''创建卡片消息''' + return False + async def is_muted(self, group_id: int) -> bool: """获取账号是否在指定群被禁言""" raise NotImplementedError diff --git a/pkg/platform/sources/dingtalk.py b/pkg/platform/sources/dingtalk.py index a669a599..4a312063 100644 --- a/pkg/platform/sources/dingtalk.py +++ b/pkg/platform/sources/dingtalk.py @@ -154,19 +154,15 @@ class DingTalkAdapter(adapter.MessagePlatformAdapter): ) incoming_message = event.incoming_message - msg_id = incoming_message.message_id + # msg_id = incoming_message.message_id content, at = await DingTalkMessageConverter.yiri2target(message) - # is_stream = self.config['enable-stream-reply'] - # print(content) - card_template_id = self.config['card_template_id'] - if msg_id not in self.card_instance_id_dict: - card_instance,card_instance_id = await self.bot.create_and_card(card_template_id,incoming_message,at) - self.card_instance_id_dict[msg_id] = (card_instance,card_instance_id) - else: - card_instance,card_instance_id = self.card_instance_id_dict[msg_id] + + card_instance,card_instance_id = self.card_instance_id_dict[message_id] # print(card_instance_id) await self.bot.send_card_message(card_instance,card_instance_id,content,is_final) + if is_final: + self.card_instance_id_dict.pop(message_id) async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain): @@ -180,15 +176,15 @@ class DingTalkAdapter(adapter.MessagePlatformAdapter): is_stream = False if self.config.get("enable-stream-reply", None): is_stream = True - self.is_stream = is_stream - return is_stream - async def create_message_card(self,message_id: str, incoming_message): + async def create_message_card(self,message_id,event): card_template_id = self.config['card_template_id'] - + incoming_message = event.incoming_message + # message_id = incoming_message.message_id card_instance, card_instance_id = await self.bot.create_and_card(card_template_id, incoming_message) self.card_instance_id_dict[message_id] = (card_instance, card_instance_id) + return True def register_listener( diff --git a/pkg/platform/sources/lark.py b/pkg/platform/sources/lark.py index 2fd0a081..fb3d0c48 100644 --- a/pkg/platform/sources/lark.py +++ b/pkg/platform/sources/lark.py @@ -360,6 +360,7 @@ class LarkAdapter(adapter.MessagePlatformAdapter): self.message_id_to_card_id = {} self.card_id_dict = {} self.seq = 1 + self.card_id_time = {} @self.quart_app.route('/lark/callback', methods=['POST']) async def lark_callback(): @@ -405,101 +406,13 @@ class LarkAdapter(adapter.MessagePlatformAdapter): return {'code': 500, 'message': 'error'} - async def is_stream_output_supported() -> bool: - is_stream = False - if self.config.get("enable-stream-reply",None): - is_stream = True - self.is_stream = is_stream - return is_stream - - async def create_card_id(message_id): - try: - is_stream = await is_stream_output_supported() - if is_stream: - self.ap.logger.debug('飞书支持stream输出,创建卡片......') - # card_id = '' - # # if self.card_id_dict: - # # card_id = [k for k,v in self.card_id_dict.items() if (v+datetime.timedelta(days=14))< datetime.datetime.now()][0] - # - # if self.card_id_dict is None: - # # content = { - # # "type": "card_json", - # # "data": {"schema":"2.0","header":{"title":{"content":"bot","tag":"plain_text"}},"body":{"elements":[{"tag":"markdown","content":""}]}} - # # } - # card_data = {"schema":"2.0","header":{"title":{"content":"bot","tag":"plain_text"}}, - # "body":{"elements":[{"tag":"markdown","content":""}]},"config": {"streaming_mode": True, - # "streaming_config": {"print_strategy": "fast"}}} - # - # request: CreateCardRequest = CreateCardRequest.builder() \ - # .request_body( - # CreateCardRequestBody.builder() - # .type("card_json") - # .data(json.dumps(card_data)) \ - # .build() - # ).build() - # - # # 发起请求 - # response: CreateCardResponse = self.api_client.cardkit.v1.card.create(request) - # - # - # # 处理失败返回 - # if not response.success(): - # raise Exception( - # f"client.cardkit.v1.card.create failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}") - # - # self.ap.logger.debug(f'飞书卡片创建成功,卡片ID: {response.data.card_id}') - # self.card_id_dict[response.data.card_id] = datetime.datetime.now() - # - # card_id = response.data.card_id - card_data = {"schema": "2.0", "header": {"title": {"content": "bot", "tag": "plain_text"}}, - "body": {"elements": [{"tag": "markdown", "content": "[思考中.....]","element_id":"markdown_1"}]}, - "config": {"streaming_mode": True, - "streaming_config": {"print_strategy": "delay"}}} # delay / fast - request: CreateCardRequest = CreateCardRequest.builder() \ - .request_body( - CreateCardRequestBody.builder() - .type("card_json") - .data(json.dumps(card_data)) \ - .build() - ).build() - - # 发起请求 - response: CreateCardResponse = self.api_client.cardkit.v1.card.create(request) - - # 处理失败返回 - if not response.success(): - raise Exception( - f"client.cardkit.v1.card.create failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}") - - self.ap.logger.debug(f'飞书卡片创建成功,卡片ID: {response.data.card_id}') - self.card_id_dict[message_id] = response.data.card_id - - card_id = response.data.card_id - return card_id - - except Exception as e: - self.ap.logger.error(f'飞书卡片创建失败,错误信息: {e}') - async def on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1): - if await is_stream_output_supported(): - self.ap.logger.debug('卡片回复模式开启') - # 开启卡片回复模式. 这里可以实现飞书一发消息,马上创建卡片进行回复"思考中..." - card_id = await create_card_id(event.event.message.message_id) - reply_message_id = await self.create_message_card(card_id, event.event.message.message_id) - self.message_id_to_card_id[event.event.message.message_id] = (reply_message_id, time.time()) - - if len(self.message_id_to_card_id) > CARD_ID_CACHE_SIZE: - self.message_id_to_card_id = { - k: v - for k, v in self.message_id_to_card_id.items() - if v[1] > time.time() - CARD_ID_CACHE_MAX_LIFETIME - } lb_event = await self.event_converter.target2yiri(event, self.api_client) @@ -520,21 +433,64 @@ class LarkAdapter(adapter.MessagePlatformAdapter): async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain): pass - async def create_message_card(self, card_id: str, message_id: str) -> str: + async def is_stream_output_supported(self) -> bool: + is_stream = False + if self.config.get("enable-stream-reply", None): + is_stream = True + return is_stream + + async def create_card_id(self,message_id): + try: + is_stream = await self.is_stream_output_supported() + if is_stream: + self.ap.logger.debug('飞书支持stream输出,创建卡片......') + + card_data = {"schema": "2.0", "header": {"title": {"content": "bot", "tag": "plain_text"}}, + "body": {"elements": [ + {"tag": "markdown", "content": "[思考中.....]", "element_id": "markdown_1"}]}, + "config": {"streaming_mode": True, + "streaming_config": {"print_strategy": "delay"}}} # delay / fast + + request: CreateCardRequest = CreateCardRequest.builder() \ + .request_body( + CreateCardRequestBody.builder() + .type("card_json") + .data(json.dumps(card_data)) \ + .build() + ).build() + + # 发起请求 + response: CreateCardResponse = self.api_client.cardkit.v1.card.create(request) + + # 处理失败返回 + if not response.success(): + raise Exception( + f"client.cardkit.v1.card.create failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}") + + self.ap.logger.debug(f'飞书卡片创建成功,卡片ID: {response.data.card_id}') + self.card_id_dict[message_id] = response.data.card_id + + card_id = response.data.card_id + return card_id + + except Exception as e: + self.ap.logger.error(f'飞书卡片创建失败,错误信息: {e}') + + async def create_message_card(self,message_id,event) -> str: """ 创建卡片消息。 使用卡片消息是因为普通消息更新次数有限制,而大模型流式返回结果可能很多而超过限制,而飞书卡片没有这个限制 """ + # message_id = event.message_chain.message_id - # TODO 目前只支持卡片模板方式,且卡片变量一定是content,未来这块要做成可配置 - # 发消息马上就会回复显示初始化的content信息,即思考中 + card_id = await self.create_card_id(message_id) content = { 'type': 'card', 'data': {'card_id': card_id, 'template_variable': {'content': 'Thinking...'}}, } request: ReplyMessageRequest = ( ReplyMessageRequest.builder() - .message_id(message_id) + .message_id(event.message_chain.message_id) .request_body( ReplyMessageRequestBody.builder().content(json.dumps(content)).msg_type('interactive').build() ) @@ -549,7 +505,7 @@ class LarkAdapter(adapter.MessagePlatformAdapter): raise Exception( f'client.im.v1.message.reply failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}' ) - return response.data.message_id + return True async def reply_message( self, @@ -633,6 +589,7 @@ class LarkAdapter(adapter.MessagePlatformAdapter): if is_final: self.seq = 1 + self.card_id_dict.pop(message_id) # 发起请求 response: ContentCardElementResponse = self.api_client.cardkit.v1.card_element.content(request) diff --git a/pkg/provider/entities.py b/pkg/provider/entities.py index e8037e68..df2b5487 100644 --- a/pkg/provider/entities.py +++ b/pkg/provider/entities.py @@ -127,6 +127,8 @@ class Message(pydantic.BaseModel): class MessageChunk(pydantic.BaseModel): """消息""" + resp_message_id: typing.Optional[str] = None + """消息id""" role: str # user, system, assistant, tool, command, plugin """消息的角色""" diff --git a/pkg/provider/runners/difysvapi.py b/pkg/provider/runners/difysvapi.py index 24318716..f0c36ca1 100644 --- a/pkg/provider/runners/difysvapi.py +++ b/pkg/provider/runners/difysvapi.py @@ -96,7 +96,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): query.variables['conversation_id'] = cov_id try: - is_stream = query.adapter.is_stream + is_stream = await query.adapter.is_stream_output_supported() except AttributeError: is_stream = False @@ -209,7 +209,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): query.variables['conversation_id'] = cov_id try: - is_stream = query.adapter.is_stream + is_stream = await query.adapter.is_stream_output_supported() except AttributeError: is_stream = False @@ -346,7 +346,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): query.variables['conversation_id'] = query.session.using_conversation.uuid try: - is_stream = query.adapter.is_stream + is_stream = await query.adapter.is_stream_output_supported() except AttributeError: is_stream = False diff --git a/pkg/provider/runners/localagent.py b/pkg/provider/runners/localagent.py index 79de89a4..6b4da90b 100644 --- a/pkg/provider/runners/localagent.py +++ b/pkg/provider/runners/localagent.py @@ -89,7 +89,9 @@ class LocalAgentRunner(runner.RequestRunner): is_stream = query.adapter.is_stream_output_supported() try: - is_stream = query.adapter.is_stream + # print(await query.adapter.is_stream_output_supported()) + is_stream = await query.adapter.is_stream_output_supported() + except AttributeError: is_stream = False # while True: