from __future__ import annotations import typing import asyncio import traceback import time import datetime import aiocqhttp import aiohttp from .. import adapter from ...pipeline.longtext.strategies import forward from ...core import app from ..types import message as platform_message from ..types import events as platform_events from ..types import entities as platform_entities from ...utils import image class AiocqhttpMessageConverter(adapter.MessageConverter): @staticmethod async def yiri2target(message_chain: platform_message.MessageChain) -> typing.Tuple[list, int, datetime.datetime]: msg_list = aiocqhttp.Message() msg_id = 0 msg_time = None for msg in message_chain: if type(msg) is platform_message.Plain: msg_list.append(aiocqhttp.MessageSegment.text(msg.text)) elif type(msg) is platform_message.Source: msg_id = msg.id msg_time = msg.time elif type(msg) is platform_message.Image: arg = '' if msg.base64: arg = msg.base64 msg_list.append(aiocqhttp.MessageSegment.image(f"base64://{arg}")) elif msg.url: arg = msg.url msg_list.append(aiocqhttp.MessageSegment.image(arg)) elif msg.path: arg = msg.path msg_list.append(aiocqhttp.MessageSegment.image(arg)) elif type(msg) is platform_message.At: msg_list.append(aiocqhttp.MessageSegment.at(msg.target)) elif type(msg) is platform_message.AtAll: msg_list.append(aiocqhttp.MessageSegment.at("all")) elif type(msg) is platform_message.Voice: arg = '' if msg.base64: arg = msg.base64 msg_list.append(aiocqhttp.MessageSegment.record(f"base64://{arg}")) elif msg.url: arg = msg.url msg_list.append(aiocqhttp.MessageSegment.record(arg)) elif msg.path: arg = msg.path msg_list.append(aiocqhttp.MessageSegment.record(msg.path)) elif type(msg) is forward.Forward: for node in msg.node_list: msg_list.extend((await AiocqhttpMessageConverter.yiri2target(node.message_chain))[0]) else: msg_list.append(aiocqhttp.MessageSegment.text(str(msg))) return msg_list, msg_id, msg_time @staticmethod async def target2yiri(message: str, message_id: int = -1): message = aiocqhttp.Message(message) yiri_msg_list = [] yiri_msg_list.append( platform_message.Source(id=message_id, time=datetime.datetime.now()) ) for msg in message: if msg.type == "at": if msg.data["qq"] == "all": yiri_msg_list.append(platform_message.AtAll()) else: yiri_msg_list.append( platform_message.At( target=msg.data["qq"], ) ) elif msg.type == "text": yiri_msg_list.append(platform_message.Plain(text=msg.data["text"])) elif msg.type == "image": image_base64, image_format = await image.qq_image_url_to_base64(msg.data['url']) yiri_msg_list.append(platform_message.Image(base64=f"data:image/{image_format};base64,{image_base64}")) chain = platform_message.MessageChain(yiri_msg_list) return chain class AiocqhttpEventConverter(adapter.EventConverter): @staticmethod async def yiri2target(event: platform_events.Event, bot_account_id: int): msg, msg_id, msg_time = await AiocqhttpMessageConverter.yiri2target(event.message_chain) if type(event) is platform_events.GroupMessage: role = "member" if event.sender.permission == "ADMINISTRATOR": role = "admin" elif event.sender.permission == "OWNER": role = "owner" payload = { "post_type": "message", "message_type": "group", "time": int(msg_time.timestamp()), "self_id": bot_account_id, "sub_type": "normal", "anonymous": None, "font": 0, "message": str(msg), "raw_message": str(msg), "sender": { "age": 0, "area": "", "card": "", "level": "", "nickname": event.sender.member_name, "role": role, "sex": "unknown", "title": "", "user_id": event.sender.id, }, "user_id": event.sender.id, "message_id": msg_id, "group_id": event.group.id, "message_seq": 0, } return aiocqhttp.Event.from_payload(payload) elif type(event) is platform_events.FriendMessage: payload = { "post_type": "message", "message_type": "private", "time": int(msg_time.timestamp()), "self_id": bot_account_id, "sub_type": "friend", "target_id": bot_account_id, "message": str(msg), "raw_message": str(msg), "font": 0, "sender": { "age": 0, "nickname": event.sender.nickname, "sex": "unknown", "user_id": event.sender.id, }, "message_id": msg_id, "user_id": event.sender.id, } return aiocqhttp.Event.from_payload(payload) @staticmethod async def target2yiri(event: aiocqhttp.Event): yiri_chain = await AiocqhttpMessageConverter.target2yiri( event.message, event.message_id ) if event.message_type == "group": permission = "MEMBER" if "role" in event.sender: if event.sender["role"] == "admin": permission = "ADMINISTRATOR" elif event.sender["role"] == "owner": permission = "OWNER" converted_event = platform_events.GroupMessage( sender=platform_entities.GroupMember( id=event.sender["user_id"], # message_seq 放哪? member_name=event.sender["nickname"], permission=permission, group=platform_entities.Group( id=event.group_id, name=event.sender["nickname"], permission=platform_entities.Permission.Member, ), special_title=event.sender["title"] if "title" in event.sender else "", join_timestamp=0, last_speak_timestamp=0, mute_time_remaining=0, ), message_chain=yiri_chain, time=event.time, ) return converted_event elif event.message_type == "private": return platform_events.FriendMessage( sender=platform_entities.Friend( id=event.sender["user_id"], nickname=event.sender["nickname"], remark="", ), message_chain=yiri_chain, time=event.time, ) class AiocqhttpAdapter(adapter.MessagePlatformAdapter): bot: aiocqhttp.CQHttp bot_account_id: int message_converter: AiocqhttpMessageConverter = AiocqhttpMessageConverter() event_converter: AiocqhttpEventConverter = AiocqhttpEventConverter() config: dict ap: app.Application def __init__(self, config: dict, ap: app.Application): self.config = config async def shutdown_trigger_placeholder(): while True: await asyncio.sleep(1) self.config['shutdown_trigger'] = shutdown_trigger_placeholder self.ap = ap if "access-token" in config: self.bot = aiocqhttp.CQHttp(access_token=config["access-token"]) del self.config["access-token"] else: self.bot = aiocqhttp.CQHttp() async def send_message( self, target_type: str, target_id: str, message: platform_message.MessageChain ): aiocq_msg = (await AiocqhttpMessageConverter.yiri2target(message))[0] if target_type == "group": await self.bot.send_group_msg(group_id=int(target_id), message=aiocq_msg) elif target_type == "person": await self.bot.send_private_msg(user_id=int(target_id), message=aiocq_msg) async def reply_message( self, message_source: platform_events.MessageEvent, message: platform_message.MessageChain, quote_origin: bool = False, ): aiocq_event = await AiocqhttpEventConverter.yiri2target(message_source, self.bot_account_id) aiocq_msg = (await AiocqhttpMessageConverter.yiri2target(message))[0] if quote_origin: aiocq_msg = aiocqhttp.MessageSegment.reply(aiocq_event.message_id) + aiocq_msg return await self.bot.send( aiocq_event, aiocq_msg ) async def is_muted(self, group_id: int) -> bool: return False def register_listener( self, event_type: typing.Type[platform_events.Event], callback: typing.Callable[[platform_events.Event, adapter.MessagePlatformAdapter], None], ): async def on_message(event: aiocqhttp.Event): self.bot_account_id = event.self_id try: return await callback(await self.event_converter.target2yiri(event), self) except: traceback.print_exc() if event_type == platform_events.GroupMessage: self.bot.on_message("group")(on_message) elif event_type == platform_events.FriendMessage: self.bot.on_message("private")(on_message) def unregister_listener( self, event_type: typing.Type[platform_events.Event], callback: typing.Callable[[platform_events.Event, adapter.MessagePlatformAdapter], None], ): return super().unregister_listener(event_type, callback) async def run_async(self): await self.bot._server_app.run_task(**self.config) async def kill(self) -> bool: return False