Files
LangBot/pkg/platform/sources/qqbotpy.py
2024-09-26 13:01:45 +08:00

596 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import logging
import typing
import datetime
import re
import traceback
import botpy
import botpy.message as botpy_message
import botpy.types.message as botpy_message_type
import pydantic
import pydantic.networks
from .. import adapter as adapter_model
from ...pipeline.longtext.strategies import forward
from ...core import app
from ...config import manager as cfg_mgr
from ...platform.types import entities as platform_entities
from ...platform.types import events as platform_events
from ...platform.types import message as platform_message
class OfficialGroupMessage(platform_events.GroupMessage):
pass
class OfficialFriendMessage(platform_events.FriendMessage):
pass
event_handler_mapping = {
platform_events.GroupMessage: ["on_at_message_create", "on_group_at_message_create"],
platform_events.FriendMessage: ["on_direct_message_create", "on_c2c_message_create"],
}
cached_message_ids = {}
"""由于QQ官方的消息id是字符串而YiriMirai的消息id是整数所以需要一个索引来进行转换"""
id_index = 0
def save_msg_id(message_id: str) -> int:
"""保存消息id"""
global id_index, cached_message_ids
crt_index = id_index
id_index += 1
cached_message_ids[str(crt_index)] = message_id
return crt_index
def char_to_value(char):
"""将单个字符转换为相应的数值。"""
if '0' <= char <= '9':
return ord(char) - ord('0')
elif 'A' <= char <= 'Z':
return ord(char) - ord('A') + 10
return ord(char) - ord('a') + 36
def digest(s: str) -> int:
"""计算字符串的hash值。"""
# 取末尾的8位
sub_s = s[-10:]
number = 0
base = 36
for i in range(len(sub_s)):
number = number * base + char_to_value(sub_s[i])
return number
K = typing.TypeVar("K")
V = typing.TypeVar("V")
class OpenIDMapping(typing.Generic[K, V]):
map: dict[K, V]
dump_func: typing.Callable
digest_func: typing.Callable[[K], V]
def __init__(self, map: dict[K, V], dump_func: typing.Callable, digest_func: typing.Callable[[K], V] = digest):
self.map = map
self.dump_func = dump_func
self.digest_func = digest_func
def __getitem__(self, key: K) -> V:
return self.map[key]
def __setitem__(self, key: K, value: V):
self.map[key] = value
self.dump_func()
def __contains__(self, key: K) -> bool:
return key in self.map
def __delitem__(self, key: K):
del self.map[key]
self.dump_func()
def getkey(self, value: V) -> K:
return list(self.map.keys())[list(self.map.values()).index(value)]
def save_openid(self, key: K) -> V:
if key in self.map:
return self.map[key]
value = self.digest_func(key)
self.map[key] = value
self.dump_func()
return value
class OfficialMessageConverter(adapter_model.MessageConverter):
"""QQ 官方消息转换器"""
@staticmethod
def yiri2target(message_chain: platform_message.MessageChain):
"""将 YiriMirai 的消息链转换为 QQ 官方消息"""
msg_list = []
if type(message_chain) is platform_message.MessageChain:
msg_list = message_chain.__root__
elif type(message_chain) is list:
msg_list = message_chain
elif type(message_chain) is str:
msg_list = [platform_message.Plain(text=message_chain)]
else:
raise Exception(
"Unknown message type: " + str(message_chain) + str(type(message_chain))
)
offcial_messages: list[dict] = []
"""
{
"type": "text",
"content": "Hello World!"
}
{
"type": "image",
"content": "https://example.com/example.jpg"
}
"""
# 遍历并转换
for component in msg_list:
if type(component) is platform_message.Plain:
offcial_messages.append({"type": "text", "content": component.text})
elif type(component) is platform_message.Image:
if component.url is not None:
offcial_messages.append({"type": "image", "content": component.url})
elif component.path is not None:
offcial_messages.append(
{"type": "file_image", "content": component.path}
)
elif type(component) is platform_message.At:
offcial_messages.append({"type": "at", "content": ""})
elif type(component) is platform_message.AtAll:
print(
"上层组件要求发送 AtAll 消息,但 QQ 官方 API 不支持此消息类型,忽略此消息。"
)
elif type(component) is platform_message.Voice:
print(
"上层组件要求发送 Voice 消息,但 QQ 官方 API 不支持此消息类型,忽略此消息。"
)
elif type(component) is forward.Forward:
# 转发消息
yiri_forward_node_list = component.node_list
# 遍历并转换
for yiri_forward_node in yiri_forward_node_list:
try:
message_chain = yiri_forward_node.message_chain
# 平铺
offcial_messages.extend(
OfficialMessageConverter.yiri2target(message_chain)
)
except Exception as e:
import traceback
traceback.print_exc()
return offcial_messages
@staticmethod
def extract_message_chain_from_obj(
message: typing.Union[botpy_message.Message, botpy_message.DirectMessage, botpy_message.GroupMessage, botpy_message.C2CMessage],
message_id: str = None,
bot_account_id: int = 0,
) -> platform_message.MessageChain:
yiri_msg_list = []
# 存id
yiri_msg_list.append(
platform_message.Source(
id=save_msg_id(message_id), time=datetime.datetime.now()
)
)
if type(message) not in [botpy_message.DirectMessage, botpy_message.C2CMessage]:
yiri_msg_list.append(platform_message.At(target=bot_account_id))
if hasattr(message, "mentions"):
for mention in message.mentions:
if mention.bot:
continue
yiri_msg_list.append(platform_message.At(target=mention.id))
for attachment in message.attachments:
if attachment.content_type.startswith("image"):
yiri_msg_list.append(platform_message.Image(url=attachment.url))
else:
logging.warning(
"不支持的附件类型:" + attachment.content_type + ",忽略此附件。"
)
content = re.sub(r"<@!\d+>", "", str(message.content))
if content.strip() != "":
yiri_msg_list.append(platform_message.Plain(text=content))
chain = platform_message.MessageChain(yiri_msg_list)
return chain
class OfficialEventConverter(adapter_model.EventConverter):
"""事件转换器"""
member_openid_mapping: OpenIDMapping[str, int]
group_openid_mapping: OpenIDMapping[str, int]
def __init__(self, member_openid_mapping: OpenIDMapping[str, int], group_openid_mapping: OpenIDMapping[str, int]):
self.member_openid_mapping = member_openid_mapping
self.group_openid_mapping = group_openid_mapping
def yiri2target(self, event: typing.Type[platform_events.Event]):
if event == platform_events.GroupMessage:
return botpy_message.Message
elif event == platform_events.FriendMessage:
return botpy_message.DirectMessage
else:
raise Exception(
"未支持转换的事件类型(YiriMirai -> Official): " + str(event)
)
def target2yiri(
self,
event: typing.Union[botpy_message.Message, botpy_message.DirectMessage, botpy_message.GroupMessage, botpy_message.C2CMessage],
) -> platform_events.Event:
if type(event) == botpy_message.Message: # 频道内,转群聊事件
permission = "MEMBER"
if "2" in event.member.roles:
permission = "ADMINISTRATOR"
elif "4" in event.member.roles:
permission = "OWNER"
return platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=event.author.id,
member_name=event.author.username,
permission=permission,
group=platform_entities.Group(
id=event.channel_id,
name=event.author.username,
permission=platform_entities.Permission.Member,
),
special_title="",
join_timestamp=int(
datetime.datetime.strptime(
event.member.joined_at, "%Y-%m-%dT%H:%M:%S%z"
).timestamp()
),
last_speak_timestamp=datetime.datetime.now().timestamp(),
mute_time_remaining=0,
),
message_chain=OfficialMessageConverter.extract_message_chain_from_obj(
event, event.id
),
time=int(
datetime.datetime.strptime(
event.timestamp, "%Y-%m-%dT%H:%M:%S%z"
).timestamp()
),
)
elif type(event) == botpy_message.DirectMessage: # 频道私聊,转私聊事件
return platform_events.FriendMessage(
sender=platform_entities.Friend(
id=event.guild_id,
nickname=event.author.username,
remark=event.author.username,
),
message_chain=OfficialMessageConverter.extract_message_chain_from_obj(
event, event.id
),
time=int(
datetime.datetime.strptime(
event.timestamp, "%Y-%m-%dT%H:%M:%S%z"
).timestamp()
),
)
elif type(event) == botpy_message.GroupMessage: # 群聊,转群聊事件
replacing_member_id = self.member_openid_mapping.save_openid(event.author.member_openid)
return OfficialGroupMessage(
sender=platform_entities.GroupMember(
id=replacing_member_id,
member_name=replacing_member_id,
permission="MEMBER",
group=platform_entities.Group(
id=self.group_openid_mapping.save_openid(event.group_openid),
name=replacing_member_id,
permission=platform_entities.Permission.Member,
),
special_title="",
join_timestamp=int(0),
last_speak_timestamp=datetime.datetime.now().timestamp(),
mute_time_remaining=0,
),
message_chain=OfficialMessageConverter.extract_message_chain_from_obj(
event, event.id
),
time=int(
datetime.datetime.strptime(
event.timestamp, "%Y-%m-%dT%H:%M:%S%z"
).timestamp()
),
)
elif type(event) == botpy_message.C2CMessage: # 私聊,转私聊事件
user_id_alter = self.member_openid_mapping.save_openid(event.author.user_openid) # 实测这里的user_openid与group的member_openid是一样的
return OfficialFriendMessage(
sender=platform_entities.Friend(
id=user_id_alter,
nickname=user_id_alter,
remark=user_id_alter,
),
message_chain=OfficialMessageConverter.extract_message_chain_from_obj(
event, event.id
),
time=int(
datetime.datetime.strptime(
event.timestamp, "%Y-%m-%dT%H:%M:%S%z"
).timestamp()
),
)
@adapter_model.adapter_class("qq-botpy")
class OfficialAdapter(adapter_model.MessageSourceAdapter):
"""QQ 官方消息适配器"""
bot: botpy.Client = None
bot_account_id: int = 0
message_converter: OfficialMessageConverter
event_converter: OfficialEventConverter
cfg: dict = None
cached_official_messages: dict = {}
"""缓存的 qq-botpy 框架消息对象
message_id: botpy_message.Message | botpy_message.DirectMessage
"""
ap: app.Application
metadata: cfg_mgr.ConfigManager = None
member_openid_mapping: OpenIDMapping[str, int] = None
group_openid_mapping: OpenIDMapping[str, int] = None
group_msg_seq = None
c2c_msg_seq = None
def __init__(self, cfg: dict, ap: app.Application):
"""初始化适配器"""
self.cfg = cfg
self.ap = ap
self.group_msg_seq = 1
self.c2c_msg_seq = 1
switchs = {}
for intent in cfg["intents"]:
switchs[intent] = True
del cfg["intents"]
intents = botpy.Intents(**switchs)
self.bot = botpy.Client(intents=intents)
async def send_message(
self, target_type: str, target_id: str, message: platform_message.MessageChain
):
message_list = self.message_converter.yiri2target(message)
for msg in message_list:
args = {}
if msg["type"] == "text":
args["content"] = msg["content"]
elif msg["type"] == "image":
args["image"] = msg["content"]
elif msg["type"] == "file_image":
args["file_image"] = msg["content"]
else:
continue
if target_type == "group":
args["channel_id"] = str(target_id)
await self.bot.api.post_message(**args)
elif target_type == "person":
args["guild_id"] = str(target_id)
await self.bot.api.post_dms(**args)
async def reply_message(
self,
message_source: platform_events.MessageEvent,
message: platform_message.MessageChain,
quote_origin: bool = False,
):
message_list = self.message_converter.yiri2target(message)
for msg in message_list:
args = {}
if msg["type"] == "text":
args["content"] = msg["content"]
elif msg["type"] == "image":
args["image"] = msg["content"]
elif msg["type"] == "file_image":
args["file_image"] = msg["content"]
else:
continue
if quote_origin:
args["message_reference"] = botpy_message_type.Reference(
message_id=cached_message_ids[
str(message_source.message_chain.message_id)
]
)
if type(message_source) == platform_events.GroupMessage:
args["channel_id"] = str(message_source.sender.group.id)
args["msg_id"] = cached_message_ids[
str(message_source.message_chain.message_id)
]
await self.bot.api.post_message(**args)
elif type(message_source) == platform_events.FriendMessage:
args["guild_id"] = str(message_source.sender.id)
args["msg_id"] = cached_message_ids[
str(message_source.message_chain.message_id)
]
await self.bot.api.post_dms(**args)
elif type(message_source) == OfficialGroupMessage:
if "file_image" in args: # 暂不支持发送文件图片
continue
args["group_openid"] = self.group_openid_mapping.getkey(
message_source.sender.group.id
)
if "image" in args:
uploadMedia = await self.bot.api.post_group_file(
group_openid=args["group_openid"],
file_type=1,
url=str(args['image'])
)
del args['image']
args['media'] = uploadMedia
args['msg_type'] = 7
args["msg_id"] = cached_message_ids[
str(message_source.message_chain.message_id)
]
args["msg_seq"] = self.group_msg_seq
self.group_msg_seq += 1
await self.bot.api.post_group_message(**args)
elif type(message_source) == OfficialFriendMessage:
if "file_image" in args:
continue
args["openid"] = self.member_openid_mapping.getkey(
message_source.sender.id
)
if "image" in args:
uploadMedia = await self.bot.api.post_c2c_file(
openid=args["openid"],
file_type=1,
url=str(args['image'])
)
del args['image']
args['media'] = uploadMedia
args['msg_type'] = 7
args["msg_id"] = cached_message_ids[
str(message_source.message_chain.message_id)
]
args["msg_seq"] = self.c2c_msg_seq
self.c2c_msg_seq += 1
await self.bot.api.post_c2c_message(**args)
async def is_muted(self, group_id: int) -> bool:
return False
def register_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, adapter_model.MessageSourceAdapter], None
],
):
try:
async def wrapper(
message: typing.Union[
botpy_message.Message,
botpy_message.DirectMessage,
botpy_message.GroupMessage,
]
):
self.cached_official_messages[str(message.id)] = message
await callback(self.event_converter.target2yiri(message), self)
for event_handler in event_handler_mapping[event_type]:
setattr(self.bot, event_handler, wrapper)
except Exception as e:
traceback.print_exc()
raise e
def unregister_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, adapter_model.MessageSourceAdapter], None
],
):
delattr(self.bot, event_handler_mapping[event_type])
async def run_async(self):
self.metadata = self.ap.adapter_qq_botpy_meta
self.member_openid_mapping = OpenIDMapping(
map=self.metadata.data["mapping"]["members"],
dump_func=self.metadata.dump_config_sync,
)
self.group_openid_mapping = OpenIDMapping(
map=self.metadata.data["mapping"]["groups"],
dump_func=self.metadata.dump_config_sync,
)
self.message_converter = OfficialMessageConverter()
self.event_converter = OfficialEventConverter(
self.member_openid_mapping, self.group_openid_mapping
)
self.ap.logger.info("运行 QQ 官方适配器")
await self.bot.start(**self.cfg)
def kill(self) -> bool:
return False