fix: add support for webhook qq official

This commit is contained in:
wangcham
2025-02-04 06:35:51 -05:00
parent e1e21c0063
commit 43ea64befa
13 changed files with 669 additions and 614 deletions

2
.gitignore vendored
View File

@@ -37,4 +37,4 @@ botpy.log*
/poc
/libs/wecom_api/test.py
/venv
/jp-tyo-churros-05.rockchin.top

View File

274
libs/qq_official_api/api.py Normal file
View File

@@ -0,0 +1,274 @@
import time
from quart import request
import base64
import binascii
import httpx
from quart import Quart
import xml.etree.ElementTree as ET
from typing import Callable, Dict, Any
from pkg.platform.types import events as platform_events, message as platform_message
import aiofiles
from .qqofficialevent import QQOfficialEvent
import json
import hmac
import base64
import hashlib
import traceback
from cryptography.hazmat.primitives.asymmetric import ed25519
from .qqofficialevent import QQOfficialEvent
def handle_validation(body: dict, bot_secret: str):
# bot正确的secert是32位的此处仅为了适配演示demo
while len(bot_secret) < 32:
bot_secret = bot_secret * 2
bot_secret = bot_secret[:32]
# 实际使用场景中以上三行内容可清除
seed_bytes = bot_secret.encode()
signing_key = ed25519.Ed25519PrivateKey.from_private_bytes(seed_bytes)
msg = body['d']['event_ts'] + body['d']['plain_token']
msg_bytes = msg.encode()
signature = signing_key.sign(msg_bytes)
signature_hex = signature.hex()
response = {
"plain_token": body['d']['plain_token'],
"signature": signature_hex
}
return response
class QQOfficialClient:
def __init__(self, secret: str, token: str, app_id: str):
self.app = Quart(__name__)
self.app.add_url_rule(
"/callback/command",
"handle_callback",
self.handle_callback_request,
methods=["GET", "POST"],
)
self.secret = secret
self.token = token
self.app_id = app_id
self._message_handlers = {
}
self.base_url = "https://api.sgroup.qq.com"
self.access_token = ""
self.access_token_expiry_time = None
async def check_access_token(self):
"""检查access_token是否存在"""
if not self.access_token or await self.is_token_expired():
return False
return bool(self.access_token and self.access_token.strip())
async def get_access_token(self):
"""获取access_token"""
url = "https://bots.qq.com/app/getAppAccessToken"
async with httpx.AsyncClient() as client:
params = {
"appId":self.app_id,
"clientSecret":self.secret,
}
headers = {
"content-type":"application/json",
}
try:
response = await client.post(url,json=params,headers=headers)
if response.status_code == 200:
response_data = response.json()
access_token = response_data.get("access_token")
expires_in = int(response_data.get("expires_in",7200))
self.access_token_expiry_time = time.time() + expires_in - 60
if access_token:
self.access_token = access_token
except Exception as e:
raise Exception(f"获取access_token失败: {e}")
async def handle_callback_request(self):
"""处理回调请求"""
try:
# 读取请求数据
body = await request.get_data()
payload = json.loads(body)
# 验证是否为回调验证请求
if payload.get("op") == 13:
# 生成签名
response = handle_validation(payload, self.secret)
return response
if payload.get("op") == 0:
message_data = await self.get_message(payload)
if message_data:
event = QQOfficialEvent.from_payload(message_data)
await self._handle_message(event)
return {"code": 0, "message": "success"}
except Exception as e:
traceback.print_exc()
return {"error": str(e)}, 400
async def run_task(self, host: str, port: int, *args, **kwargs):
"""启动 Quart 应用"""
await self.app.run_task(host=host, port=port, *args, **kwargs)
def on_message(self, msg_type: str):
"""注册消息类型处理器"""
def decorator(func: Callable[[platform_events.Event], None]):
if msg_type not in self._message_handlers:
self._message_handlers[msg_type] = []
self._message_handlers[msg_type].append(func)
return func
return decorator
async def _handle_message(self, event:QQOfficialEvent):
"""处理消息事件"""
msg_type = event.t
if msg_type in self._message_handlers:
for handler in self._message_handlers[msg_type]:
await handler(event)
async def get_message(self,msg:dict) -> Dict[str,Any]:
"""获取消息"""
message_data = {
"t": msg.get("t",{}),
"user_openid": msg.get("d",{}).get("author",{}).get("user_openid",{}),
"timestamp": msg.get("d",{}).get("timestamp",{}),
"d_author_id": msg.get("d",{}).get("author",{}).get("id",{}),
"content": msg.get("d",{}).get("content",{}),
"d_id": msg.get("d",{}).get("id",{}),
"id": msg.get("id",{}),
"channel_id": msg.get("d",{}).get("channel_id",{}),
"username": msg.get("d",{}).get("author",{}).get("username",{}),
"guild_id": msg.get("d",{}).get("guild_id",{}),
"member_openid": msg.get("d",{}).get("author",{}).get("openid",{}),
"group_openid": msg.get("d",{}).get("group_openid",{})
}
attachments = msg.get("d", {}).get("attachments", [])
image_attachments = [attachment['url'] for attachment in attachments if await self.is_image(attachment)]
image_attachments_type = [attachment['content_type'] for attachment in attachments if await self.is_image(attachment)]
if image_attachments:
message_data["image_attachments"] = image_attachments[0]
message_data["content_type"] = image_attachments_type[0]
else:
message_data["image_attachments"] = None
return message_data
async def is_image(self,attachment:dict) -> bool:
"""判断是否为图片附件"""
content_type = attachment.get("content_type","")
return content_type.startswith("image/")
async def send_private_text_msg(self,user_openid:str,content:str,msg_id:str):
"""发送私聊消息"""
if not await self.check_access_token():
await self.get_access_token()
url = self.base_url + "/v2/users/" + user_openid + "/messages"
async with httpx.AsyncClient() as client:
headers = {
"Authorization": f"QQBot {self.access_token}",
"Content-Type": "application/json",
}
data = {
"content": content,
"msg_type": 0,
"msg_id": msg_id,
}
response = await client.post(url,headers=headers,json=data)
if response.status_code == 200:
return
else:
raise ValueError(response)
async def send_group_text_msg(self,group_openid:str,content:str,msg_id:str):
"""发送群聊消息"""
if not await self.check_access_token():
await self.get_access_token()
url = self.base_url + "/v2/groups/" + group_openid + "/messages"
async with httpx.AsyncClient() as client:
headers = {
"Authorization": f"QQBot {self.access_token}",
"Content-Type": "application/json",
}
data = {
"content": content,
"msg_type": 0,
"msg_id": msg_id,
}
response = await client.post(url,headers=headers,json=data)
if response.status_code == 200:
return
else:
raise Exception(response.read().decode())
async def send_channle_group_text_msg(self,channel_id:str,content:str,msg_id:str):
"""发送频道群聊消息"""
if not await self.check_access_token():
await self.get_access_token()
url = self.base_url + "/channels/" + channel_id + "/messages"
async with httpx.AsyncClient() as client:
headers = {
"Authorization": f"QQBot {self.access_token}",
"Content-Type": "application/json",
}
params = {
"content": content,
"msg_type": 0,
"msg_id": msg_id,
}
response = await client.post(url,headers=headers,json=params)
if response.status_code == 200:
return True
else:
raise Exception(response)
async def send_channle_private_text_msg(self,guild_id:str,content:str,msg_id:str):
"""发送频道私聊消息"""
if not await self.check_access_token():
await self.get_access_token()
url = self.base_url + "/dms/" + guild_id + "/messages"
async with httpx.AsyncClient() as client:
headers = {
"Authorization": f"QQBot {self.access_token}",
"Content-Type": "application/json",
}
params = {
"content": content,
"msg_type": 0,
"msg_id": msg_id,
}
response = await client.post(url,headers=headers,json=params)
if response.status_code == 200:
return True
else:
raise Exception(response)
async def is_token_expired(self):
"""检查token是否过期"""
if self.access_token_expiry_time is None:
return True
return time.time() > self.access_token_expiry_time

View File

@@ -0,0 +1,114 @@
from typing import Dict, Any, Optional
class QQOfficialEvent(dict):
@staticmethod
def from_payload(payload: Dict[str, Any]) -> Optional["QQOfficialEvent"]:
try:
event = QQOfficialEvent(payload)
return event
except KeyError:
return None
@property
def t(self) -> str:
"""
事件类型
"""
return self.get("t", "")
@property
def user_openid(self) -> str:
"""
用户openid
"""
return self.get("user_openid",{})
@property
def timestamp(self) -> str:
"""
时间戳
"""
return self.get("timestamp",{})
@property
def d_author_id(self) -> str:
"""
作者id
"""
return self.get("id",{})
@property
def content(self) -> str:
"""
内容
"""
return self.get("content",'')
@property
def d_id(self) -> str:
"""
d_id
"""
return self.get("d_id",{})
@property
def id(self) -> str:
"""
消息idmsg_id
"""
return self.get("id",{})
@property
def channel_id(self) -> str:
"""
频道id
"""
return self.get("channel_id",{})
@property
def username(self) -> str:
"""
用户名
"""
return self.get("username",{})
@property
def guild_id(self) -> str:
"""
频道id
"""
return self.get("guild_id",{})
@property
def member_openid(self) -> str:
"""
成员openid
"""
return self.get("openid",{})
@property
def attachments(self) -> str:
"""
附件url
"""
url = self.get("image_attachments", "")
if url and not url.startswith("https://"):
url = "https://" + url
return url
@property
def group_openid(self) -> str:
"""
群组id
"""
return self.get("group_openid",{})
@property
def content_type(self) -> str:
"""
文件类型
"""
return self.get("content_type","")

View File

@@ -27,7 +27,8 @@ required_deps = {
"jwt": "pyjwt",
"Crypto": "pycryptodome",
"lark_oapi": "lark-oapi",
"discord": "discord.py"
"discord": "discord.py",
"cryptography": "cryptography"
}

View File

@@ -7,6 +7,8 @@ import logging
import asyncio
import traceback
from .sources import qqofficial
# FriendMessage, Image, MessageChain, Plain
from ..platform import adapter as msadapter
@@ -37,7 +39,7 @@ class PlatformManager:
async def initialize(self):
from .sources import nakuru, aiocqhttp, qqbotpy, wecom, lark, discord
from .sources import nakuru, aiocqhttp, qqofficial, wecom, lark, discord
async def on_friend_message(event: platform_events.FriendMessage, adapter: msadapter.MessageSourceAdapter):

View File

@@ -1,596 +0,0 @@
from __future__ import annotations
import logging
import typing
import datetime
import re
import traceback
import botpy
import botpy.message as botpy_message
import botpy.types.message as botpy_message_type
from .. import adapter as adapter_model
from ...pipeline.longtext.strategies import forward
from ...core import app
from ...config import manager as cfg_mgr
from ...platform.types import entities as platform_entities
from ...platform.types import events as platform_events
from ...platform.types import message as platform_message
class OfficialGroupMessage(platform_events.GroupMessage):
pass
class OfficialFriendMessage(platform_events.FriendMessage):
pass
event_handler_mapping = {
platform_events.GroupMessage: ["on_at_message_create", "on_group_at_message_create"],
platform_events.FriendMessage: ["on_direct_message_create", "on_c2c_message_create"],
}
cached_message_ids = {}
"""由于QQ官方的消息id是字符串而YiriMirai的消息id是整数所以需要一个索引来进行转换"""
id_index = 0
def save_msg_id(message_id: str) -> int:
"""保存消息id"""
global id_index, cached_message_ids
crt_index = id_index
id_index += 1
cached_message_ids[str(crt_index)] = message_id
return crt_index
def char_to_value(char):
"""将单个字符转换为相应的数值。"""
if '0' <= char <= '9':
return ord(char) - ord('0')
elif 'A' <= char <= 'Z':
return ord(char) - ord('A') + 10
return ord(char) - ord('a') + 36
def digest(s: str) -> int:
"""计算字符串的hash值。"""
# 取末尾的8位
sub_s = s[-10:]
number = 0
base = 36
for i in range(len(sub_s)):
number = number * base + char_to_value(sub_s[i])
return number
K = typing.TypeVar("K")
V = typing.TypeVar("V")
class OpenIDMapping(typing.Generic[K, V]):
map: dict[K, V]
dump_func: typing.Callable
digest_func: typing.Callable[[K], V]
def __init__(self, map: dict[K, V], dump_func: typing.Callable, digest_func: typing.Callable[[K], V] = digest):
self.map = map
self.dump_func = dump_func
self.digest_func = digest_func
def __getitem__(self, key: K) -> V:
return self.map[key]
def __setitem__(self, key: K, value: V):
self.map[key] = value
self.dump_func()
def __contains__(self, key: K) -> bool:
return key in self.map
def __delitem__(self, key: K):
del self.map[key]
self.dump_func()
def getkey(self, value: V) -> K:
return list(self.map.keys())[list(self.map.values()).index(value)]
def save_openid(self, key: K) -> V:
if key in self.map:
return self.map[key]
value = self.digest_func(key)
self.map[key] = value
self.dump_func()
return value
class OfficialMessageConverter(adapter_model.MessageConverter):
"""QQ 官方消息转换器"""
@staticmethod
def yiri2target(message_chain: platform_message.MessageChain):
"""将 YiriMirai 的消息链转换为 QQ 官方消息"""
msg_list = []
if type(message_chain) is platform_message.MessageChain:
msg_list = message_chain.__root__
elif type(message_chain) is list:
msg_list = message_chain
elif type(message_chain) is str:
msg_list = [platform_message.Plain(text=message_chain)]
else:
raise Exception(
"Unknown message type: " + str(message_chain) + str(type(message_chain))
)
offcial_messages: list[dict] = []
"""
{
"type": "text",
"content": "Hello World!"
}
{
"type": "image",
"content": "https://example.com/example.jpg"
}
"""
# 遍历并转换
for component in msg_list:
if type(component) is platform_message.Plain:
offcial_messages.append({"type": "text", "content": component.text})
elif type(component) is platform_message.Image:
if component.url is not None:
offcial_messages.append({"type": "image", "content": component.url})
elif component.path is not None:
offcial_messages.append(
{"type": "file_image", "content": component.path}
)
elif type(component) is platform_message.At:
offcial_messages.append({"type": "at", "content": ""})
elif type(component) is platform_message.AtAll:
print(
"上层组件要求发送 AtAll 消息,但 QQ 官方 API 不支持此消息类型,忽略此消息。"
)
elif type(component) is platform_message.Voice:
print(
"上层组件要求发送 Voice 消息,但 QQ 官方 API 不支持此消息类型,忽略此消息。"
)
elif type(component) is forward.Forward:
# 转发消息
yiri_forward_node_list = component.node_list
# 遍历并转换
for yiri_forward_node in yiri_forward_node_list:
try:
message_chain = yiri_forward_node.message_chain
# 平铺
offcial_messages.extend(
OfficialMessageConverter.yiri2target(message_chain)
)
except Exception as e:
import traceback
traceback.print_exc()
return offcial_messages
@staticmethod
def extract_message_chain_from_obj(
message: typing.Union[botpy_message.Message, botpy_message.DirectMessage, botpy_message.GroupMessage, botpy_message.C2CMessage],
message_id: str = None,
bot_account_id: int = 0,
) -> platform_message.MessageChain:
yiri_msg_list = []
# 存id
yiri_msg_list.append(
platform_message.Source(
id=save_msg_id(message_id), time=datetime.datetime.now()
)
)
if type(message) not in [botpy_message.DirectMessage, botpy_message.C2CMessage]:
yiri_msg_list.append(platform_message.At(target=bot_account_id))
if hasattr(message, "mentions"):
for mention in message.mentions:
if mention.bot:
continue
yiri_msg_list.append(platform_message.At(target=mention.id))
for attachment in message.attachments:
if attachment.content_type.startswith("image"):
yiri_msg_list.append(platform_message.Image(url=attachment.url))
else:
logging.warning(
"不支持的附件类型:" + attachment.content_type + ",忽略此附件。"
)
content = re.sub(r"<@!\d+>", "", str(message.content))
if content.strip() != "":
yiri_msg_list.append(platform_message.Plain(text=content))
chain = platform_message.MessageChain(yiri_msg_list)
return chain
class OfficialEventConverter(adapter_model.EventConverter):
"""事件转换器"""
member_openid_mapping: OpenIDMapping[str, int]
group_openid_mapping: OpenIDMapping[str, int]
def __init__(self, member_openid_mapping: OpenIDMapping[str, int], group_openid_mapping: OpenIDMapping[str, int]):
self.member_openid_mapping = member_openid_mapping
self.group_openid_mapping = group_openid_mapping
def yiri2target(self, event: typing.Type[platform_events.Event]):
if event == platform_events.GroupMessage:
return botpy_message.Message
elif event == platform_events.FriendMessage:
return botpy_message.DirectMessage
else:
raise Exception(
"未支持转换的事件类型(YiriMirai -> Official): " + str(event)
)
def target2yiri(
self,
event: typing.Union[botpy_message.Message, botpy_message.DirectMessage, botpy_message.GroupMessage, botpy_message.C2CMessage],
) -> platform_events.Event:
if type(event) == botpy_message.Message: # 频道内,转群聊事件
permission = "MEMBER"
if "2" in event.member.roles:
permission = "ADMINISTRATOR"
elif "4" in event.member.roles:
permission = "OWNER"
return platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=event.author.id,
member_name=event.author.username,
permission=permission,
group=platform_entities.Group(
id=event.channel_id,
name=event.author.username,
permission=platform_entities.Permission.Member,
),
special_title="",
join_timestamp=int(
datetime.datetime.strptime(
event.member.joined_at, "%Y-%m-%dT%H:%M:%S%z"
).timestamp()
),
last_speak_timestamp=datetime.datetime.now().timestamp(),
mute_time_remaining=0,
),
message_chain=OfficialMessageConverter.extract_message_chain_from_obj(
event, event.id
),
time=int(
datetime.datetime.strptime(
event.timestamp, "%Y-%m-%dT%H:%M:%S%z"
).timestamp()
),
)
elif type(event) == botpy_message.DirectMessage: # 频道私聊,转私聊事件
return platform_events.FriendMessage(
sender=platform_entities.Friend(
id=event.guild_id,
nickname=event.author.username,
remark=event.author.username,
),
message_chain=OfficialMessageConverter.extract_message_chain_from_obj(
event, event.id
),
time=int(
datetime.datetime.strptime(
event.timestamp, "%Y-%m-%dT%H:%M:%S%z"
).timestamp()
),
)
elif type(event) == botpy_message.GroupMessage: # 群聊,转群聊事件
replacing_member_id = self.member_openid_mapping.save_openid(event.author.member_openid)
return OfficialGroupMessage(
sender=platform_entities.GroupMember(
id=replacing_member_id,
member_name=replacing_member_id,
permission="MEMBER",
group=platform_entities.Group(
id=self.group_openid_mapping.save_openid(event.group_openid),
name=replacing_member_id,
permission=platform_entities.Permission.Member,
),
special_title="",
join_timestamp=int(0),
last_speak_timestamp=datetime.datetime.now().timestamp(),
mute_time_remaining=0,
),
message_chain=OfficialMessageConverter.extract_message_chain_from_obj(
event, event.id
),
time=int(
datetime.datetime.strptime(
event.timestamp, "%Y-%m-%dT%H:%M:%S%z"
).timestamp()
),
)
elif type(event) == botpy_message.C2CMessage: # 私聊,转私聊事件
user_id_alter = self.member_openid_mapping.save_openid(event.author.user_openid) # 实测这里的user_openid与group的member_openid是一样的
return OfficialFriendMessage(
sender=platform_entities.Friend(
id=user_id_alter,
nickname=user_id_alter,
remark=user_id_alter,
),
message_chain=OfficialMessageConverter.extract_message_chain_from_obj(
event, event.id
),
time=int(
datetime.datetime.strptime(
event.timestamp, "%Y-%m-%dT%H:%M:%S%z"
).timestamp()
),
)
@adapter_model.adapter_class("qq-botpy")
class OfficialAdapter(adapter_model.MessageSourceAdapter):
"""QQ 官方消息适配器"""
bot: botpy.Client = None
bot_account_id: int = 0
message_converter: OfficialMessageConverter
event_converter: OfficialEventConverter
cfg: dict = None
cached_official_messages: dict = {}
"""缓存的 qq-botpy 框架消息对象
message_id: botpy_message.Message | botpy_message.DirectMessage
"""
ap: app.Application
metadata: cfg_mgr.ConfigManager = None
member_openid_mapping: OpenIDMapping[str, int] = None
group_openid_mapping: OpenIDMapping[str, int] = None
group_msg_seq = None
c2c_msg_seq = None
def __init__(self, cfg: dict, ap: app.Application):
"""初始化适配器"""
self.cfg = cfg
self.ap = ap
self.group_msg_seq = 1
self.c2c_msg_seq = 1
switchs = {}
for intent in cfg["intents"]:
switchs[intent] = True
del cfg["intents"]
intents = botpy.Intents(**switchs)
self.bot = botpy.Client(intents=intents)
async def send_message(
self, target_type: str, target_id: str, message: platform_message.MessageChain
):
message_list = self.message_converter.yiri2target(message)
for msg in message_list:
args = {}
if msg["type"] == "text":
args["content"] = msg["content"]
elif msg["type"] == "image":
args["image"] = msg["content"]
elif msg["type"] == "file_image":
args["file_image"] = msg["content"]
else:
continue
if target_type == "group":
args["channel_id"] = str(target_id)
await self.bot.api.post_message(**args)
elif target_type == "person":
args["guild_id"] = str(target_id)
await self.bot.api.post_dms(**args)
async def reply_message(
self,
message_source: platform_events.MessageEvent,
message: platform_message.MessageChain,
quote_origin: bool = False,
):
message_list = self.message_converter.yiri2target(message)
for msg in message_list:
args = {}
if msg["type"] == "text":
args["content"] = msg["content"]
elif msg["type"] == "image":
args["image"] = msg["content"]
elif msg["type"] == "file_image":
args["file_image"] = msg["content"]
else:
continue
if quote_origin:
args["message_reference"] = botpy_message_type.Reference(
message_id=cached_message_ids[
str(message_source.message_chain.message_id)
]
)
if type(message_source) == platform_events.GroupMessage:
args["channel_id"] = str(message_source.sender.group.id)
args["msg_id"] = cached_message_ids[
str(message_source.message_chain.message_id)
]
await self.bot.api.post_message(**args)
elif type(message_source) == platform_events.FriendMessage:
args["guild_id"] = str(message_source.sender.id)
args["msg_id"] = cached_message_ids[
str(message_source.message_chain.message_id)
]
await self.bot.api.post_dms(**args)
elif type(message_source) == OfficialGroupMessage:
if "file_image" in args: # 暂不支持发送文件图片
continue
args["group_openid"] = self.group_openid_mapping.getkey(
message_source.sender.group.id
)
if "image" in args:
uploadMedia = await self.bot.api.post_group_file(
group_openid=args["group_openid"],
file_type=1,
url=str(args['image'])
)
del args['image']
args['media'] = uploadMedia
args['msg_type'] = 7
args["msg_id"] = cached_message_ids[
str(message_source.message_chain.message_id)
]
args["msg_seq"] = self.group_msg_seq
self.group_msg_seq += 1
await self.bot.api.post_group_message(**args)
elif type(message_source) == OfficialFriendMessage:
if "file_image" in args:
continue
args["openid"] = self.member_openid_mapping.getkey(
message_source.sender.id
)
if "image" in args:
uploadMedia = await self.bot.api.post_c2c_file(
openid=args["openid"],
file_type=1,
url=str(args['image'])
)
del args['image']
args['media'] = uploadMedia
args['msg_type'] = 7
args["msg_id"] = cached_message_ids[
str(message_source.message_chain.message_id)
]
args["msg_seq"] = self.c2c_msg_seq
self.c2c_msg_seq += 1
await self.bot.api.post_c2c_message(**args)
async def is_muted(self, group_id: int) -> bool:
return False
def register_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, adapter_model.MessageSourceAdapter], None
],
):
try:
async def wrapper(
message: typing.Union[
botpy_message.Message,
botpy_message.DirectMessage,
botpy_message.GroupMessage,
]
):
self.cached_official_messages[str(message.id)] = message
await callback(self.event_converter.target2yiri(message), self)
for event_handler in event_handler_mapping[event_type]:
setattr(self.bot, event_handler, wrapper)
except Exception as e:
traceback.print_exc()
raise e
def unregister_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, adapter_model.MessageSourceAdapter], None
],
):
delattr(self.bot, event_handler_mapping[event_type])
async def run_async(self):
self.metadata = self.ap.adapter_qq_botpy_meta
self.member_openid_mapping = OpenIDMapping(
map=self.metadata.data["mapping"]["members"],
dump_func=self.metadata.dump_config_sync,
)
self.group_openid_mapping = OpenIDMapping(
map=self.metadata.data["mapping"]["groups"],
dump_func=self.metadata.dump_config_sync,
)
self.message_converter = OfficialMessageConverter()
self.event_converter = OfficialEventConverter(
self.member_openid_mapping, self.group_openid_mapping
)
self.cfg['ret_coro'] = True
self.ap.logger.info("运行 QQ 官方适配器")
await (await self.bot.start(**self.cfg))
async def kill(self) -> bool:
if not self.bot.is_closed():
await self.bot.close()
return True

View File

@@ -0,0 +1,256 @@
from __future__ import annotations
import typing
import asyncio
import traceback
import time
import datetime
import aiocqhttp
import aiohttp
from pkg.platform.adapter import MessageSourceAdapter
from pkg.platform.types import events as platform_events, message as platform_message
from pkg.core import app
from .. import adapter
from ...pipeline.longtext.strategies import forward
from ...core import app
from ..types import message as platform_message
from ..types import events as platform_events
from ..types import entities as platform_entities
from ...command.errors import ParamNotEnoughError
from libs.qq_official_api.api import QQOfficialClient
from libs.qq_official_api.qqofficialevent import QQOfficialEvent
from ...utils import image
class QQOfficialMessageConverter(adapter.MessageConverter):
@staticmethod
async def yiri2target(message_chain: platform_message.MessageChain):
content_list = []
#只实现了发文字
for msg in message_chain:
if type(msg) is platform_message.Plain:
content_list.append({
"type":"text",
"content":msg.text,
})
return content_list
@staticmethod
async def target2yiri(message:str,message_id:str,pic_url:str,content_type):
yiri_msg_list = []
yiri_msg_list.append(
platform_message.Source(id=message_id,time=datetime.datetime.now())
)
if pic_url is not None:
base64_url = await image.get_qq_official_image_base64(pic_url=pic_url,content_type=content_type)
yiri_msg_list.append(
platform_message.Image(base64=base64_url)
)
message = ''
yiri_msg_list.append(platform_message.Plain(text=message))
chain = platform_message.MessageChain(yiri_msg_list)
return chain
class QQOfficialEventConverter(adapter.EventConverter):
@staticmethod
async def yiri2target(event:platform_events.MessageEvent) -> QQOfficialEvent:
return event.source_platform_object
@staticmethod
async def target2yiri(event:QQOfficialEvent):
"""
QQ官方消息转换为LB对象
"""
yiri_chain = await QQOfficialMessageConverter.target2yiri(
message=event.content,message_id=event.d_id,pic_url=event.attachments,content_type=event.content_type
)
if event.t == 'C2C_MESSAGE_CREATE':
friend = platform_entities.Friend(
id = event.user_openid,
nickname = event.t,
remark = "",
)
return platform_events.FriendMessage(
sender = friend,message_chain = yiri_chain,time = event.timestamp,
source_platform_object=event
)
if event.t == 'DIRECT_MESSAGE_CREATE':
friend = platform_entities.Friend(
id = event.guild_id,
nickname = event.t,
remark = "",
)
return platform_events.FriendMessage(
sender = friend,message_chain = yiri_chain,
source_platform_object=event
)
if event.t == 'GROUP_AT_MESSAGE_CREATE':
yiri_chain.insert(0, platform_message.At(target="justbot"))
sender = platform_entities.GroupMember(
id = event.group_openid,
member_name= event.t,
permission= 'MEMBER',
group = platform_entities.Group(
id = 0,
name = 'MEMBER',
permission= platform_entities.Permission.Member
),
special_title='',
join_timestamp=0,
last_speak_timestamp=0,
mute_time_remaining=0
)
time = event.timestamp
return platform_events.GroupMessage(
sender = sender,
message_chain=yiri_chain,
time = time,
source_platform_object=event
)
if event.t =='AT_MESSAGE_CREATE':
yiri_chain.insert(0, platform_message.At(target="justbot"))
sender = platform_entities.GroupMember(
id = event.channel_id,
member_name=event.t,
permission= 'MEMBER',
group = platform_entities.Group(
id = 0,
name = 'MEMBER',
permission=platform_entities.Permission.Member
),
special_title='',
join_timestamp=0,
last_speak_timestamp=0,
mute_time_remaining=0
)
time = event.timestamp,
return platform_events.GroupMessage(
sender =sender,
message_chain = yiri_chain,
time = time,
source_platform_object=event
)
@adapter.adapter_class("qqofficial")
class QQOfficialAdapter(adapter.MessageSourceAdapter):
bot:QQOfficialClient
ap:app.Application
config:dict
bot_account_id:str
message_converter: QQOfficialMessageConverter = QQOfficialMessageConverter()
event_converter: QQOfficialEventConverter = QQOfficialEventConverter()
def __init__(self, config:dict, ap:app.Application):
self.config = config
self.ap = ap
required_keys = [
"appid",
"secret",
]
missing_keys = [key for key in required_keys if key not in config]
if missing_keys:
raise ParamNotEnoughError("QQ官方机器人缺少相关配置项请查看文档或联系管理员")
self.bot = QQOfficialClient(
app_id=config["appid"],
secret=config["secret"],
token=config["token"],
)
async def reply_message(
self,
message_source: platform_events.MessageEvent,
message: platform_message.MessageChain,
quote_origin: bool = False,
):
qq_official_event = await QQOfficialEventConverter.yiri2target(
message_source,
)
content_list = await QQOfficialMessageConverter.yiri2target(message)
#私聊消息
if qq_official_event.t == 'C2C_MESSAGE_CREATE':
for content in content_list:
if content["type"] == 'text':
await self.bot.send_private_text_msg(qq_official_event.user_openid,content['content'],qq_official_event.d_id)
#群聊消息
if qq_official_event.t == 'GROUP_AT_MESSAGE_CREATE':
for content in content_list:
if content["type"] == 'text':
await self.bot.send_group_text_msg(qq_official_event.group_openid,content['content'],qq_official_event.d_id)
#频道群聊
if qq_official_event.t == 'AT_MESSAGE_CREATE':
for content in content_list:
if content["type"] == 'text':
await self.bot.send_channle_group_text_msg(qq_official_event.channel_id,content['content'],qq_official_event.d_id)
#频道私聊
if qq_official_event.t == 'DIRECT_MESSAGE_CREATE':
for content in content_list:
if content["type"] == 'text':
await self.bot.send_channle_private_text_msg(qq_official_event.guild_id,content['content'],qq_official_event.d_id)
async def send_message(
self, target_type: str, target_id: str, message: platform_message.MessageChain
):
pass
def register_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, adapter.MessageSourceAdapter], None
],
):
async def on_message(event:QQOfficialEvent):
self.bot_account_id = "justbot"
try:
return await callback(
await self.event_converter.target2yiri(event),self
)
except:
traceback.print_exc()
if event_type == platform_events.FriendMessage:
self.bot.on_message("DIRECT_MESSAGE_CREATE")(on_message)
self.bot.on_message("C2C_MESSAGE_CREATE")(on_message)
elif event_type == platform_events.GroupMessage:
self.bot.on_message("GROUP_AT_MESSAGE_CREATE")(on_message)
self.bot.on_message("AT_MESSAGE_CREATE")(on_message)
async def run_async(self):
async def shutdown_trigger_placeholder():
while True:
await asyncio.sleep(1)
await self.bot.run_task(
host=self.config["host"],
port=self.config["port"],
shutdown_trigger=shutdown_trigger_placeholder,
)
async def kill(self) -> bool:
return False
def unregister_listener(
self,
event_type: type,
callback: typing.Callable[[platform_events.Event, MessageSourceAdapter], None],
):
return super().unregister_listener(event_type, callback)

View File

@@ -29,17 +29,6 @@ class WecomMessageConverter(adapter.MessageConverter):
):
content_list = []
[
{
"type": "text",
"content": "text",
},
{
"type": "image",
"media_id": "media_id",
}
]
for msg in message_chain:
if type(msg) is platform_message.Plain:
content_list.append({
@@ -83,7 +72,7 @@ class WecomMessageConverter(adapter.MessageConverter):
image_base64, image_format = await image.get_wecom_image_base64(pic_url=picurl)
yiri_msg_list.append(platform_message.Image(base64=f"data:image/{image_format};base64,{image_base64}"))
chain = platform_message.MessageChain(yiri_msg_list)
return chain
@@ -208,7 +197,7 @@ class WecomeAdapter(adapter.MessageSourceAdapter):
await self.bot.send_private_msg(Wecom_event.user_id, Wecom_event.agent_id, content["content"])
elif content["type"] == "image":
await self.bot.send_image(Wecom_event.user_id, Wecom_event.agent_id, content["media_id"])
async def send_message(
self, target_type: str, target_id: str, message: platform_message.MessageChain
):

View File

@@ -6,6 +6,7 @@ import ssl
import aiohttp
import PIL.Image
import httpx
async def get_wecom_image_base64(pic_url: str) -> tuple[str, str]:
"""
@@ -30,7 +31,19 @@ async def get_wecom_image_base64(pic_url: str) -> tuple[str, str]:
image_base64 = base64.b64encode(image_data).decode('utf-8')
return image_base64, image_format
async def get_qq_official_image_base64(pic_url:str,content_type:str) -> tuple[str,str]:
"""
下载QQ官方图片
并且转换为base64格式
"""
async with httpx.AsyncClient() as client:
response = await client.get(pic_url)
response.raise_for_status() # 确保请求成功
image_data = response.content
base64_data = base64.b64encode(image_data).decode('utf-8')
return f"data:{content_type};base64,{base64_data}"
def get_qq_image_downloadable_url(image_url: str) -> tuple[str, dict]:

View File

@@ -27,6 +27,6 @@ pyjwt
pycryptodome
lark-oapi
discord.py
cryptography
# indirect
taskgroup==0.0.0a4

View File

@@ -0,0 +1 @@
[]

1
res/instance_id.json Normal file
View File

@@ -0,0 +1 @@
{"host_id": "host_9b4a220d-3bb6-42fc-aec3-41188ce0a41c", "instance_id": "instance_61d8f262-b98a-4165-8e77-85fb6262529e", "instance_create_ts": 1736824678}