Files
LangBot/libs/qq_official_api/api.py
2025-02-04 06:35:51 -05:00

275 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
from quart import request
import base64
import binascii
import httpx
from quart import Quart
import xml.etree.ElementTree as ET
from typing import Callable, Dict, Any
from pkg.platform.types import events as platform_events, message as platform_message
import aiofiles
from .qqofficialevent import QQOfficialEvent
import json
import hmac
import base64
import hashlib
import traceback
from cryptography.hazmat.primitives.asymmetric import ed25519
from .qqofficialevent import QQOfficialEvent
def handle_validation(body: dict, bot_secret: str):
# bot正确的secert是32位的此处仅为了适配演示demo
while len(bot_secret) < 32:
bot_secret = bot_secret * 2
bot_secret = bot_secret[:32]
# 实际使用场景中以上三行内容可清除
seed_bytes = bot_secret.encode()
signing_key = ed25519.Ed25519PrivateKey.from_private_bytes(seed_bytes)
msg = body['d']['event_ts'] + body['d']['plain_token']
msg_bytes = msg.encode()
signature = signing_key.sign(msg_bytes)
signature_hex = signature.hex()
response = {
"plain_token": body['d']['plain_token'],
"signature": signature_hex
}
return response
class QQOfficialClient:
def __init__(self, secret: str, token: str, app_id: str):
self.app = Quart(__name__)
self.app.add_url_rule(
"/callback/command",
"handle_callback",
self.handle_callback_request,
methods=["GET", "POST"],
)
self.secret = secret
self.token = token
self.app_id = app_id
self._message_handlers = {
}
self.base_url = "https://api.sgroup.qq.com"
self.access_token = ""
self.access_token_expiry_time = None
async def check_access_token(self):
"""检查access_token是否存在"""
if not self.access_token or await self.is_token_expired():
return False
return bool(self.access_token and self.access_token.strip())
async def get_access_token(self):
"""获取access_token"""
url = "https://bots.qq.com/app/getAppAccessToken"
async with httpx.AsyncClient() as client:
params = {
"appId":self.app_id,
"clientSecret":self.secret,
}
headers = {
"content-type":"application/json",
}
try:
response = await client.post(url,json=params,headers=headers)
if response.status_code == 200:
response_data = response.json()
access_token = response_data.get("access_token")
expires_in = int(response_data.get("expires_in",7200))
self.access_token_expiry_time = time.time() + expires_in - 60
if access_token:
self.access_token = access_token
except Exception as e:
raise Exception(f"获取access_token失败: {e}")
async def handle_callback_request(self):
"""处理回调请求"""
try:
# 读取请求数据
body = await request.get_data()
payload = json.loads(body)
# 验证是否为回调验证请求
if payload.get("op") == 13:
# 生成签名
response = handle_validation(payload, self.secret)
return response
if payload.get("op") == 0:
message_data = await self.get_message(payload)
if message_data:
event = QQOfficialEvent.from_payload(message_data)
await self._handle_message(event)
return {"code": 0, "message": "success"}
except Exception as e:
traceback.print_exc()
return {"error": str(e)}, 400
async def run_task(self, host: str, port: int, *args, **kwargs):
"""启动 Quart 应用"""
await self.app.run_task(host=host, port=port, *args, **kwargs)
def on_message(self, msg_type: str):
"""注册消息类型处理器"""
def decorator(func: Callable[[platform_events.Event], None]):
if msg_type not in self._message_handlers:
self._message_handlers[msg_type] = []
self._message_handlers[msg_type].append(func)
return func
return decorator
async def _handle_message(self, event:QQOfficialEvent):
"""处理消息事件"""
msg_type = event.t
if msg_type in self._message_handlers:
for handler in self._message_handlers[msg_type]:
await handler(event)
async def get_message(self,msg:dict) -> Dict[str,Any]:
"""获取消息"""
message_data = {
"t": msg.get("t",{}),
"user_openid": msg.get("d",{}).get("author",{}).get("user_openid",{}),
"timestamp": msg.get("d",{}).get("timestamp",{}),
"d_author_id": msg.get("d",{}).get("author",{}).get("id",{}),
"content": msg.get("d",{}).get("content",{}),
"d_id": msg.get("d",{}).get("id",{}),
"id": msg.get("id",{}),
"channel_id": msg.get("d",{}).get("channel_id",{}),
"username": msg.get("d",{}).get("author",{}).get("username",{}),
"guild_id": msg.get("d",{}).get("guild_id",{}),
"member_openid": msg.get("d",{}).get("author",{}).get("openid",{}),
"group_openid": msg.get("d",{}).get("group_openid",{})
}
attachments = msg.get("d", {}).get("attachments", [])
image_attachments = [attachment['url'] for attachment in attachments if await self.is_image(attachment)]
image_attachments_type = [attachment['content_type'] for attachment in attachments if await self.is_image(attachment)]
if image_attachments:
message_data["image_attachments"] = image_attachments[0]
message_data["content_type"] = image_attachments_type[0]
else:
message_data["image_attachments"] = None
return message_data
async def is_image(self,attachment:dict) -> bool:
"""判断是否为图片附件"""
content_type = attachment.get("content_type","")
return content_type.startswith("image/")
async def send_private_text_msg(self,user_openid:str,content:str,msg_id:str):
"""发送私聊消息"""
if not await self.check_access_token():
await self.get_access_token()
url = self.base_url + "/v2/users/" + user_openid + "/messages"
async with httpx.AsyncClient() as client:
headers = {
"Authorization": f"QQBot {self.access_token}",
"Content-Type": "application/json",
}
data = {
"content": content,
"msg_type": 0,
"msg_id": msg_id,
}
response = await client.post(url,headers=headers,json=data)
if response.status_code == 200:
return
else:
raise ValueError(response)
async def send_group_text_msg(self,group_openid:str,content:str,msg_id:str):
"""发送群聊消息"""
if not await self.check_access_token():
await self.get_access_token()
url = self.base_url + "/v2/groups/" + group_openid + "/messages"
async with httpx.AsyncClient() as client:
headers = {
"Authorization": f"QQBot {self.access_token}",
"Content-Type": "application/json",
}
data = {
"content": content,
"msg_type": 0,
"msg_id": msg_id,
}
response = await client.post(url,headers=headers,json=data)
if response.status_code == 200:
return
else:
raise Exception(response.read().decode())
async def send_channle_group_text_msg(self,channel_id:str,content:str,msg_id:str):
"""发送频道群聊消息"""
if not await self.check_access_token():
await self.get_access_token()
url = self.base_url + "/channels/" + channel_id + "/messages"
async with httpx.AsyncClient() as client:
headers = {
"Authorization": f"QQBot {self.access_token}",
"Content-Type": "application/json",
}
params = {
"content": content,
"msg_type": 0,
"msg_id": msg_id,
}
response = await client.post(url,headers=headers,json=params)
if response.status_code == 200:
return True
else:
raise Exception(response)
async def send_channle_private_text_msg(self,guild_id:str,content:str,msg_id:str):
"""发送频道私聊消息"""
if not await self.check_access_token():
await self.get_access_token()
url = self.base_url + "/dms/" + guild_id + "/messages"
async with httpx.AsyncClient() as client:
headers = {
"Authorization": f"QQBot {self.access_token}",
"Content-Type": "application/json",
}
params = {
"content": content,
"msg_type": 0,
"msg_id": msg_id,
}
response = await client.post(url,headers=headers,json=params)
if response.status_code == 200:
return True
else:
raise Exception(response)
async def is_token_expired(self):
"""检查token是否过期"""
if self.access_token_expiry_time is None:
return True
return time.time() > self.access_token_expiry_time