feat:add dingtalk stream

fix:adapter is_stream_output_supported bug
fix:stream message reply chunk in message_id
This commit is contained in:
Dong_master
2025-07-20 23:53:20 +08:00
committed by Junyan Qin
parent 43a259a1ae
commit 4905b5a738
9 changed files with 124 additions and 121 deletions

View File

@@ -3,6 +3,7 @@ import json
import time
from typing import Callable
import dingtalk_stream # type: ignore
from dingtalk_stream import AckMessage, ChatbotHandler, CallbackHandler, CallbackMessage, ChatbotMessage, AICardReplier
from .EchoHandler import EchoTextHandler
from .dingtalkevent import DingTalkEvent
import httpx
@@ -253,6 +254,48 @@ class DingTalkClient:
await self.logger.error(f'failed to send proactive massage to group: {traceback.format_exc()}')
raise Exception(f'failed to send proactive massage to group: {traceback.format_exc()}')
async def create_and_card(self, temp_card_id: str, incoming_message: dingtalk_stream.ChatbotMessage,quote_origin:bool=False):
content_key = "content"
card_data = {content_key: ""}
card_instance = dingtalk_stream.AICardReplier(
self.client, incoming_message
)
# print(card_instance)
# 先投放卡片: https://open.dingtalk.com/document/orgapp/create-and-deliver-cards
card_instance_id = await card_instance.async_create_and_deliver_card(
temp_card_id, card_data,
)
return card_instance,card_instance_id
async def send_card_message(self,
card_instance,
card_instance_id: str,content: str,is_final: bool):
content_key = "content"
try:
await card_instance.async_streaming(
card_instance_id,
content_key=content_key,
content_value=content,
append=False,
finished=is_final,
failed=False,
)
except Exception as e:
self.logger.exception(e)
await card_instance.async_streaming(
card_instance_id,
content_key=content_key,
content_value="",
append=False,
finished=is_final,
failed=True,
)
async def start(self):
"""启动 WebSocket 连接,监听消息"""
await self.client.start()

View File

@@ -23,11 +23,11 @@ class ChatMessageHandler(handler.MessageHandler):
self,
query: core_entities.Query,
) -> typing.AsyncGenerator[entities.StageProcessResult, None]:
"""Process"""
# Call API
# generator
"""处理"""
# API
# 生成器
# Trigger plugin event
# 触发插件事件
event_class = (
events.PersonNormalMessageReceived
if query.launcher_type == core_entities.LauncherTypes.PERSON
@@ -47,7 +47,6 @@ class ChatMessageHandler(handler.MessageHandler):
if event_ctx.is_prevented_default():
if event_ctx.event.reply is not None:
mc = platform_message.MessageChain(event_ctx.event.reply)
query.resp_messages.append(mc)
yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
@@ -55,15 +54,14 @@ class ChatMessageHandler(handler.MessageHandler):
yield entities.StageProcessResult(result_type=entities.ResultType.INTERRUPT, new_query=query)
else:
if event_ctx.event.alter is not None:
# if isinstance(event_ctx.event, str): # Currently not considering multi-modal alter
# if isinstance(event_ctx.event, str): # 现在暂时不考虑多模态alter
query.user_message.content = event_ctx.event.alter
text_length = 0
try:
is_stream = query.adapter.is_stream
is_stream = await query.adapter.is_stream_output_supported()
except AttributeError:
is_stream = False
print(is_stream)
try:
for r in runner_module.preregistered_runners:
@@ -107,7 +105,8 @@ class ChatMessageHandler(handler.MessageHandler):
query.session.using_conversation.messages.extend(query.resp_messages)
except Exception as e:
self.ap.logger.error(f'Request failed({query.query_id}): {type(e).__name__} {str(e)}')
self.ap.logger.error(f'对话({query.query_id})请求失败: {type(e).__name__} {str(e)}')
traceback.print_exc()
hide_exception_info = query.pipeline_config['output']['misc']['hide-exception']
@@ -120,4 +119,4 @@ class ChatMessageHandler(handler.MessageHandler):
)
finally:
# TODO statistics
pass
pass

View File

@@ -46,7 +46,7 @@ class SendResponseBackStage(stage.PipelineStage):
print(is_final)
await query.adapter.reply_message_chunk(
message_source=query.message_event,
message_id=query.message_event.message_chain.message_id,
message_id=query.resp_messages[-1].resp_message_id,
message=query.resp_message_chain[-1],
quote_origin=quote_origin,
is_final=is_final,

View File

@@ -80,6 +80,10 @@ class MessagePlatformAdapter(metaclass=abc.ABCMeta):
"""
raise NotImplementedError
async def create_message_card(self,message_id,event):
'''创建卡片消息'''
return False
async def is_muted(self, group_id: int) -> bool:
"""获取账号是否在指定群被禁言"""
raise NotImplementedError

View File

@@ -154,19 +154,15 @@ class DingTalkAdapter(adapter.MessagePlatformAdapter):
)
incoming_message = event.incoming_message
msg_id = incoming_message.message_id
# msg_id = incoming_message.message_id
content, at = await DingTalkMessageConverter.yiri2target(message)
# is_stream = self.config['enable-stream-reply']
# print(content)
card_template_id = self.config['card_template_id']
if msg_id not in self.card_instance_id_dict:
card_instance,card_instance_id = await self.bot.create_and_card(card_template_id,incoming_message,at)
self.card_instance_id_dict[msg_id] = (card_instance,card_instance_id)
else:
card_instance,card_instance_id = self.card_instance_id_dict[msg_id]
card_instance,card_instance_id = self.card_instance_id_dict[message_id]
# print(card_instance_id)
await self.bot.send_card_message(card_instance,card_instance_id,content,is_final)
if is_final:
self.card_instance_id_dict.pop(message_id)
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
@@ -180,15 +176,15 @@ class DingTalkAdapter(adapter.MessagePlatformAdapter):
is_stream = False
if self.config.get("enable-stream-reply", None):
is_stream = True
self.is_stream = is_stream
return is_stream
async def create_message_card(self,message_id: str, incoming_message):
async def create_message_card(self,message_id,event):
card_template_id = self.config['card_template_id']
incoming_message = event.incoming_message
# message_id = incoming_message.message_id
card_instance, card_instance_id = await self.bot.create_and_card(card_template_id, incoming_message)
self.card_instance_id_dict[message_id] = (card_instance, card_instance_id)
return True
def register_listener(

View File

@@ -360,6 +360,7 @@ class LarkAdapter(adapter.MessagePlatformAdapter):
self.message_id_to_card_id = {}
self.card_id_dict = {}
self.seq = 1
self.card_id_time = {}
@self.quart_app.route('/lark/callback', methods=['POST'])
async def lark_callback():
@@ -405,101 +406,13 @@ class LarkAdapter(adapter.MessagePlatformAdapter):
return {'code': 500, 'message': 'error'}
async def is_stream_output_supported() -> bool:
is_stream = False
if self.config.get("enable-stream-reply",None):
is_stream = True
self.is_stream = is_stream
return is_stream
async def create_card_id(message_id):
try:
is_stream = await is_stream_output_supported()
if is_stream:
self.ap.logger.debug('飞书支持stream输出,创建卡片......')
# card_id = ''
# # if self.card_id_dict:
# # card_id = [k for k,v in self.card_id_dict.items() if (v+datetime.timedelta(days=14))< datetime.datetime.now()][0]
#
# if self.card_id_dict is None:
# # content = {
# # "type": "card_json",
# # "data": {"schema":"2.0","header":{"title":{"content":"bot","tag":"plain_text"}},"body":{"elements":[{"tag":"markdown","content":""}]}}
# # }
# card_data = {"schema":"2.0","header":{"title":{"content":"bot","tag":"plain_text"}},
# "body":{"elements":[{"tag":"markdown","content":""}]},"config": {"streaming_mode": True,
# "streaming_config": {"print_strategy": "fast"}}}
#
# request: CreateCardRequest = CreateCardRequest.builder() \
# .request_body(
# CreateCardRequestBody.builder()
# .type("card_json")
# .data(json.dumps(card_data)) \
# .build()
# ).build()
#
# # 发起请求
# response: CreateCardResponse = self.api_client.cardkit.v1.card.create(request)
#
#
# # 处理失败返回
# if not response.success():
# raise Exception(
# f"client.cardkit.v1.card.create failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}")
#
# self.ap.logger.debug(f'飞书卡片创建成功,卡片ID: {response.data.card_id}')
# self.card_id_dict[response.data.card_id] = datetime.datetime.now()
#
# card_id = response.data.card_id
card_data = {"schema": "2.0", "header": {"title": {"content": "bot", "tag": "plain_text"}},
"body": {"elements": [{"tag": "markdown", "content": "[思考中.....]","element_id":"markdown_1"}]},
"config": {"streaming_mode": True,
"streaming_config": {"print_strategy": "delay"}}} # delay / fast
request: CreateCardRequest = CreateCardRequest.builder() \
.request_body(
CreateCardRequestBody.builder()
.type("card_json")
.data(json.dumps(card_data)) \
.build()
).build()
# 发起请求
response: CreateCardResponse = self.api_client.cardkit.v1.card.create(request)
# 处理失败返回
if not response.success():
raise Exception(
f"client.cardkit.v1.card.create failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}")
self.ap.logger.debug(f'飞书卡片创建成功,卡片ID: {response.data.card_id}')
self.card_id_dict[message_id] = response.data.card_id
card_id = response.data.card_id
return card_id
except Exception as e:
self.ap.logger.error(f'飞书卡片创建失败,错误信息: {e}')
async def on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1):
if await is_stream_output_supported():
self.ap.logger.debug('卡片回复模式开启')
# 开启卡片回复模式. 这里可以实现飞书一发消息,马上创建卡片进行回复"思考中..."
card_id = await create_card_id(event.event.message.message_id)
reply_message_id = await self.create_message_card(card_id, event.event.message.message_id)
self.message_id_to_card_id[event.event.message.message_id] = (reply_message_id, time.time())
if len(self.message_id_to_card_id) > CARD_ID_CACHE_SIZE:
self.message_id_to_card_id = {
k: v
for k, v in self.message_id_to_card_id.items()
if v[1] > time.time() - CARD_ID_CACHE_MAX_LIFETIME
}
lb_event = await self.event_converter.target2yiri(event, self.api_client)
@@ -520,21 +433,64 @@ class LarkAdapter(adapter.MessagePlatformAdapter):
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
pass
async def create_message_card(self, card_id: str, message_id: str) -> str:
async def is_stream_output_supported(self) -> bool:
is_stream = False
if self.config.get("enable-stream-reply", None):
is_stream = True
return is_stream
async def create_card_id(self,message_id):
try:
is_stream = await self.is_stream_output_supported()
if is_stream:
self.ap.logger.debug('飞书支持stream输出,创建卡片......')
card_data = {"schema": "2.0", "header": {"title": {"content": "bot", "tag": "plain_text"}},
"body": {"elements": [
{"tag": "markdown", "content": "[思考中.....]", "element_id": "markdown_1"}]},
"config": {"streaming_mode": True,
"streaming_config": {"print_strategy": "delay"}}} # delay / fast
request: CreateCardRequest = CreateCardRequest.builder() \
.request_body(
CreateCardRequestBody.builder()
.type("card_json")
.data(json.dumps(card_data)) \
.build()
).build()
# 发起请求
response: CreateCardResponse = self.api_client.cardkit.v1.card.create(request)
# 处理失败返回
if not response.success():
raise Exception(
f"client.cardkit.v1.card.create failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}")
self.ap.logger.debug(f'飞书卡片创建成功,卡片ID: {response.data.card_id}')
self.card_id_dict[message_id] = response.data.card_id
card_id = response.data.card_id
return card_id
except Exception as e:
self.ap.logger.error(f'飞书卡片创建失败,错误信息: {e}')
async def create_message_card(self,message_id,event) -> str:
"""
创建卡片消息。
使用卡片消息是因为普通消息更新次数有限制,而大模型流式返回结果可能很多而超过限制,而飞书卡片没有这个限制
"""
# message_id = event.message_chain.message_id
# TODO 目前只支持卡片模板方式且卡片变量一定是content未来这块要做成可配置
# 发消息马上就会回复显示初始化的content信息即思考中
card_id = await self.create_card_id(message_id)
content = {
'type': 'card',
'data': {'card_id': card_id, 'template_variable': {'content': 'Thinking...'}},
}
request: ReplyMessageRequest = (
ReplyMessageRequest.builder()
.message_id(message_id)
.message_id(event.message_chain.message_id)
.request_body(
ReplyMessageRequestBody.builder().content(json.dumps(content)).msg_type('interactive').build()
)
@@ -549,7 +505,7 @@ class LarkAdapter(adapter.MessagePlatformAdapter):
raise Exception(
f'client.im.v1.message.reply failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
)
return response.data.message_id
return True
async def reply_message(
self,
@@ -633,6 +589,7 @@ class LarkAdapter(adapter.MessagePlatformAdapter):
if is_final:
self.seq = 1
self.card_id_dict.pop(message_id)
# 发起请求
response: ContentCardElementResponse = self.api_client.cardkit.v1.card_element.content(request)

View File

@@ -127,6 +127,8 @@ class Message(pydantic.BaseModel):
class MessageChunk(pydantic.BaseModel):
"""消息"""
resp_message_id: typing.Optional[str] = None
"""消息id"""
role: str # user, system, assistant, tool, command, plugin
"""消息的角色"""

View File

@@ -96,7 +96,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
query.variables['conversation_id'] = cov_id
try:
is_stream = query.adapter.is_stream
is_stream = await query.adapter.is_stream_output_supported()
except AttributeError:
is_stream = False
@@ -209,7 +209,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
query.variables['conversation_id'] = cov_id
try:
is_stream = query.adapter.is_stream
is_stream = await query.adapter.is_stream_output_supported()
except AttributeError:
is_stream = False
@@ -346,7 +346,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
query.variables['conversation_id'] = query.session.using_conversation.uuid
try:
is_stream = query.adapter.is_stream
is_stream = await query.adapter.is_stream_output_supported()
except AttributeError:
is_stream = False

View File

@@ -89,7 +89,9 @@ class LocalAgentRunner(runner.RequestRunner):
is_stream = query.adapter.is_stream_output_supported()
try:
is_stream = query.adapter.is_stream
# print(await query.adapter.is_stream_output_supported())
is_stream = await query.adapter.is_stream_output_supported()
except AttributeError:
is_stream = False
# while True: