mirror of
https://github.com/langbot-app/LangBot.git
synced 2025-11-25 11:29:39 +08:00
231 lines
8.6 KiB
Python
231 lines
8.6 KiB
Python
from __future__ import annotations
|
|
import typing
|
|
import asyncio
|
|
import traceback
|
|
import datetime
|
|
|
|
import aiocqhttp
|
|
|
|
from .. import adapter
|
|
from ...core import app
|
|
from ..types import message as platform_message
|
|
from ..types import events as platform_events
|
|
from ..types import entities as platform_entities
|
|
from ...utils import image
|
|
|
|
|
|
class AiocqhttpMessageConverter(adapter.MessageConverter):
|
|
@staticmethod
|
|
async def yiri2target(
|
|
message_chain: platform_message.MessageChain,
|
|
) -> typing.Tuple[list, int, datetime.datetime]:
|
|
msg_list = aiocqhttp.Message()
|
|
|
|
msg_id = 0
|
|
msg_time = None
|
|
|
|
for msg in message_chain:
|
|
if type(msg) is platform_message.Plain:
|
|
msg_list.append(aiocqhttp.MessageSegment.text(msg.text))
|
|
elif type(msg) is platform_message.Source:
|
|
msg_id = msg.id
|
|
msg_time = msg.time
|
|
elif type(msg) is platform_message.Image:
|
|
arg = ''
|
|
if msg.base64:
|
|
arg = msg.base64
|
|
msg_list.append(aiocqhttp.MessageSegment.image(f'base64://{arg}'))
|
|
elif msg.url:
|
|
arg = msg.url
|
|
msg_list.append(aiocqhttp.MessageSegment.image(arg))
|
|
elif msg.path:
|
|
arg = msg.path
|
|
msg_list.append(aiocqhttp.MessageSegment.image(arg))
|
|
elif type(msg) is platform_message.At:
|
|
msg_list.append(aiocqhttp.MessageSegment.at(msg.target))
|
|
elif type(msg) is platform_message.AtAll:
|
|
msg_list.append(aiocqhttp.MessageSegment.at('all'))
|
|
elif type(msg) is platform_message.Voice:
|
|
arg = ''
|
|
if msg.base64:
|
|
arg = msg.base64
|
|
msg_list.append(aiocqhttp.MessageSegment.record(f'base64://{arg}'))
|
|
elif msg.url:
|
|
arg = msg.url
|
|
msg_list.append(aiocqhttp.MessageSegment.record(arg))
|
|
elif msg.path:
|
|
arg = msg.path
|
|
msg_list.append(aiocqhttp.MessageSegment.record(msg.path))
|
|
elif type(msg) is platform_message.Forward:
|
|
for node in msg.node_list:
|
|
msg_list.extend((await AiocqhttpMessageConverter.yiri2target(node.message_chain))[0])
|
|
|
|
else:
|
|
msg_list.append(aiocqhttp.MessageSegment.text(str(msg)))
|
|
|
|
return msg_list, msg_id, msg_time
|
|
|
|
@staticmethod
|
|
async def target2yiri(message: str, message_id: int = -1):
|
|
message = aiocqhttp.Message(message)
|
|
|
|
yiri_msg_list = []
|
|
|
|
yiri_msg_list.append(platform_message.Source(id=message_id, time=datetime.datetime.now()))
|
|
|
|
for msg in message:
|
|
if msg.type == 'at':
|
|
if msg.data['qq'] == 'all':
|
|
yiri_msg_list.append(platform_message.AtAll())
|
|
else:
|
|
yiri_msg_list.append(
|
|
platform_message.At(
|
|
target=msg.data['qq'],
|
|
)
|
|
)
|
|
elif msg.type == 'text':
|
|
yiri_msg_list.append(platform_message.Plain(text=msg.data['text']))
|
|
elif msg.type == 'image':
|
|
image_base64, image_format = await image.qq_image_url_to_base64(msg.data['url'])
|
|
yiri_msg_list.append(platform_message.Image(base64=f'data:image/{image_format};base64,{image_base64}'))
|
|
|
|
chain = platform_message.MessageChain(yiri_msg_list)
|
|
|
|
return chain
|
|
|
|
|
|
class AiocqhttpEventConverter(adapter.EventConverter):
|
|
@staticmethod
|
|
async def yiri2target(event: platform_events.MessageEvent, bot_account_id: int):
|
|
return event.source_platform_object
|
|
|
|
@staticmethod
|
|
async def target2yiri(event: aiocqhttp.Event):
|
|
yiri_chain = await AiocqhttpMessageConverter.target2yiri(event.message, event.message_id)
|
|
|
|
if event.message_type == 'group':
|
|
permission = 'MEMBER'
|
|
|
|
if 'role' in event.sender:
|
|
if event.sender['role'] == 'admin':
|
|
permission = 'ADMINISTRATOR'
|
|
elif event.sender['role'] == 'owner':
|
|
permission = 'OWNER'
|
|
converted_event = platform_events.GroupMessage(
|
|
sender=platform_entities.GroupMember(
|
|
id=event.sender['user_id'], # message_seq 放哪?
|
|
member_name=event.sender['nickname'],
|
|
permission=permission,
|
|
group=platform_entities.Group(
|
|
id=event.group_id,
|
|
name=event.sender['nickname'],
|
|
permission=platform_entities.Permission.Member,
|
|
),
|
|
special_title=event.sender['title'] if 'title' in event.sender else '',
|
|
join_timestamp=0,
|
|
last_speak_timestamp=0,
|
|
mute_time_remaining=0,
|
|
),
|
|
message_chain=yiri_chain,
|
|
time=event.time,
|
|
source_platform_object=event,
|
|
)
|
|
return converted_event
|
|
elif event.message_type == 'private':
|
|
return platform_events.FriendMessage(
|
|
sender=platform_entities.Friend(
|
|
id=event.sender['user_id'],
|
|
nickname=event.sender['nickname'],
|
|
remark='',
|
|
),
|
|
message_chain=yiri_chain,
|
|
time=event.time,
|
|
source_platform_object=event,
|
|
)
|
|
|
|
|
|
class AiocqhttpAdapter(adapter.MessagePlatformAdapter):
|
|
bot: aiocqhttp.CQHttp
|
|
|
|
bot_account_id: int
|
|
|
|
message_converter: AiocqhttpMessageConverter = AiocqhttpMessageConverter()
|
|
event_converter: AiocqhttpEventConverter = AiocqhttpEventConverter()
|
|
|
|
config: dict
|
|
|
|
ap: app.Application
|
|
|
|
def __init__(self, config: dict, ap: app.Application):
|
|
self.config = config
|
|
|
|
async def shutdown_trigger_placeholder():
|
|
while True:
|
|
await asyncio.sleep(1)
|
|
|
|
self.config['shutdown_trigger'] = shutdown_trigger_placeholder
|
|
|
|
self.ap = ap
|
|
|
|
if 'access-token' in config:
|
|
self.bot = aiocqhttp.CQHttp(access_token=config['access-token'])
|
|
del self.config['access-token']
|
|
else:
|
|
self.bot = aiocqhttp.CQHttp()
|
|
|
|
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
|
|
aiocq_msg = (await AiocqhttpMessageConverter.yiri2target(message))[0]
|
|
|
|
if target_type == 'group':
|
|
await self.bot.send_group_msg(group_id=int(target_id), message=aiocq_msg)
|
|
elif target_type == 'person':
|
|
await self.bot.send_private_msg(user_id=int(target_id), message=aiocq_msg)
|
|
|
|
async def reply_message(
|
|
self,
|
|
message_source: platform_events.MessageEvent,
|
|
message: platform_message.MessageChain,
|
|
quote_origin: bool = False,
|
|
):
|
|
aiocq_event = await AiocqhttpEventConverter.yiri2target(message_source, self.bot_account_id)
|
|
aiocq_msg = (await AiocqhttpMessageConverter.yiri2target(message))[0]
|
|
if quote_origin:
|
|
aiocq_msg = aiocqhttp.MessageSegment.reply(aiocq_event.message_id) + aiocq_msg
|
|
|
|
return await self.bot.send(aiocq_event, aiocq_msg)
|
|
|
|
async def is_muted(self, group_id: int) -> bool:
|
|
return False
|
|
|
|
def register_listener(
|
|
self,
|
|
event_type: typing.Type[platform_events.Event],
|
|
callback: typing.Callable[[platform_events.Event, adapter.MessagePlatformAdapter], None],
|
|
):
|
|
async def on_message(event: aiocqhttp.Event):
|
|
self.bot_account_id = event.self_id
|
|
try:
|
|
return await callback(await self.event_converter.target2yiri(event), self)
|
|
except Exception:
|
|
traceback.print_exc()
|
|
|
|
if event_type == platform_events.GroupMessage:
|
|
self.bot.on_message('group')(on_message)
|
|
elif event_type == platform_events.FriendMessage:
|
|
self.bot.on_message('private')(on_message)
|
|
|
|
def unregister_listener(
|
|
self,
|
|
event_type: typing.Type[platform_events.Event],
|
|
callback: typing.Callable[[platform_events.Event, adapter.MessagePlatformAdapter], None],
|
|
):
|
|
return super().unregister_listener(event_type, callback)
|
|
|
|
async def run_async(self):
|
|
await self.bot._server_app.run_task(**self.config)
|
|
|
|
async def kill(self) -> bool:
|
|
# Current issue: existing connection will not be closed
|
|
# self.should_shutdown = True
|
|
return False
|