Files
LangBot/pkg/platform/sources/dingtalk.py
2025-06-30 21:49:59 +08:00

180 lines
6.8 KiB
Python

import traceback
import typing
from libs.dingtalk_api.dingtalkevent import DingTalkEvent
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
from libs.dingtalk_api.api import DingTalkClient
import datetime
from ..logger import EventLogger
class DingTalkMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
@staticmethod
async def yiri2target(message_chain: platform_message.MessageChain):
content = ''
at = False
for msg in message_chain:
if type(msg) is platform_message.At:
at = True
if type(msg) is platform_message.Plain:
content += msg.text
return content, at
@staticmethod
async def target2yiri(event: DingTalkEvent, bot_name: str):
yiri_msg_list = []
yiri_msg_list.append(
platform_message.Source(id=event.incoming_message.message_id, time=datetime.datetime.now())
)
for atUser in event.incoming_message.at_users:
if atUser.dingtalk_id == event.incoming_message.chatbot_user_id:
yiri_msg_list.append(platform_message.At(target=bot_name))
if event.content:
text_content = event.content.replace('@' + bot_name, '')
yiri_msg_list.append(platform_message.Plain(text=text_content))
if event.picture:
yiri_msg_list.append(platform_message.Image(base64=event.picture))
if event.audio:
yiri_msg_list.append(platform_message.Voice(base64=event.audio))
chain = platform_message.MessageChain(yiri_msg_list)
return chain
class DingTalkEventConverter(abstract_platform_adapter.AbstractEventConverter):
@staticmethod
async def yiri2target(event: platform_events.MessageEvent):
return event.source_platform_object
@staticmethod
async def target2yiri(event: DingTalkEvent, bot_name: str):
message_chain = await DingTalkMessageConverter.target2yiri(event, bot_name)
if event.conversation == 'FriendMessage':
return platform_events.FriendMessage(
sender=platform_entities.Friend(
id=event.incoming_message.sender_id,
nickname=event.incoming_message.sender_nick,
remark='',
),
message_chain=message_chain,
time=event.incoming_message.create_at,
source_platform_object=event,
)
elif event.conversation == 'GroupMessage':
sender = platform_entities.GroupMember(
id=event.incoming_message.sender_id,
member_name=event.incoming_message.sender_nick,
permission='MEMBER',
group=platform_entities.Group(
id=event.incoming_message.conversation_id,
name=event.incoming_message.conversation_title,
permission=platform_entities.Permission.Member,
),
special_title='',
join_timestamp=0,
last_speak_timestamp=0,
mute_time_remaining=0,
)
time = event.incoming_message.create_at
return platform_events.GroupMessage(
sender=sender,
message_chain=message_chain,
time=time,
source_platform_object=event,
)
class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
bot: DingTalkClient
bot_account_id: str
message_converter: DingTalkMessageConverter = DingTalkMessageConverter()
event_converter: DingTalkEventConverter = DingTalkEventConverter()
config: dict
def __init__(self, config: dict, logger: EventLogger):
self.config = config
self.logger = logger
required_keys = [
'client_id',
'client_secret',
'robot_name',
'robot_code',
]
missing_keys = [key for key in required_keys if key not in config]
if missing_keys:
raise Exception('钉钉缺少相关配置项,请查看文档或联系管理员')
self.bot_account_id = self.config['robot_name']
async def reply_message(
self,
message_source: platform_events.MessageEvent,
message: platform_message.MessageChain,
quote_origin: bool = False,
):
event = await DingTalkEventConverter.yiri2target(
message_source,
)
incoming_message = event.incoming_message
content, at = await DingTalkMessageConverter.yiri2target(message)
await self.bot.send_message(content, incoming_message, at)
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
content = await DingTalkMessageConverter.yiri2target(message)
if target_type == 'person':
await self.bot.send_proactive_message_to_one(target_id, content)
if target_type == 'group':
await self.bot.send_proactive_message_to_group(target_id, content)
def register_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None
],
):
async def on_message(event: DingTalkEvent):
try:
return await callback(
await self.event_converter.target2yiri(event, self.config['robot_name']),
self,
)
except Exception:
await self.logger.error(f'Error in dingtalk callback: {traceback.format_exc()}')
if event_type == platform_events.FriendMessage:
self.bot.on_message('FriendMessage')(on_message)
elif event_type == platform_events.GroupMessage:
self.bot.on_message('GroupMessage')(on_message)
async def run_async(self):
config = self.config
self.bot = DingTalkClient(
client_id=config['client_id'],
client_secret=config['client_secret'],
robot_name=config['robot_name'],
robot_code=config['robot_code'],
markdown_card=config['markdown_card'],
logger=self.logger,
)
await self.bot.start()
async def kill(self) -> bool:
return False
async def unregister_listener(
self,
event_type: type,
callback: typing.Callable[
[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None
],
):
return super().unregister_listener(event_type, callback)