fix: qqo webhook

This commit is contained in:
wangcham
2025-11-17 16:21:01 +08:00
parent 2ef47ebfb1
commit 403a721b94

View File

@@ -10,73 +10,6 @@ import traceback
from cryptography.hazmat.primitives.asymmetric import ed25519
def handle_validation(body: dict, bot_secret: str):
"""
处理 QQ 官方机器人的回调验证请求
Args:
body: 包含验证数据的请求体
bot_secret: 机器人密钥
Returns:
包含签名的验证响应
"""
try:
# 解析验证数据
validation_data = body.get('d')
if not validation_data:
print("parse http payload failed: missing 'd' field")
return None
event_ts = validation_data.get('event_ts')
plain_token = validation_data.get('plain_token')
if not event_ts or not plain_token:
print("parse http payload failed: missing event_ts or plain_token")
return None
# 处理 bot_secret确保长度达到 32 字节ed25519.SeedSize
seed = bot_secret
while len(seed) < 32:
seed = seed * 2
seed = seed[:32]
# 将 seed 转换为字节
seed_bytes = seed.encode()
# 从 seed 生成 ed25519 私钥
private_key = ed25519.Ed25519PrivateKey.from_private_bytes(seed_bytes)
msg = event_ts + plain_token
msg_bytes = msg.encode()
signature = private_key.sign(msg_bytes)
# 将签名转换为十六进制字符串
signature_hex = signature.hex()
# 构建验证响应
response = {
'plain_token': plain_token,
'signature': signature_hex
}
# 打印调试信息
print(f'[QQ Official Validation]')
print(f' event_ts: {event_ts}')
print(f' plain_token: {plain_token}')
print(f' Message to sign: {msg}')
print(f' Signature: {signature_hex}')
return response
except Exception as e:
print(f"handle validation failed: {e}")
import traceback
traceback.print_exc()
return None
class QQOfficialClient:
def __init__(self, secret: str, token: str, app_id: str, logger: None, unified_mode: bool = False):
self.unified_mode = unified_mode
@@ -157,16 +90,19 @@ class QQOfficialClient:
print(f'[QQ Official] Received request, body length: {len(body)}')
if not body or len(body) == 0:
print('[QQ Official] Received empty body, might be health check or GET request')
return {'code': 0, 'message': 'ok'}, 200
payload = json.loads(body)
# 验证是否为回调验证请求
if payload.get('op') == 13:
print(f'[QQ Official] Received callback validation request (op=13)')
# 生成签名
response = handle_validation(payload, self.secret)
print(response)
print(f'[QQ Official] Returning validation response')
return response
validation_data = payload.get('d')
if not validation_data:
return {'error': "missing 'd' field"}, 400
response = await self.verify(validation_data)
return response, 200
if payload.get('op') == 0:
message_data = await self.get_message(payload)
@@ -335,3 +271,26 @@ class QQOfficialClient:
if self.access_token_expiry_time is None:
return True
return time.time() > self.access_token_expiry_time
async def repeat_seed(self, bot_secret: str, target_size: int = 32) -> bytes:
seed = bot_secret
while len(seed) < target_size:
seed *= 2
return seed[:target_size].encode("utf-8")
async def verify(self, validation_payload: dict):
seed = await self.repeat_seed(self.secret)
private_key = ed25519.Ed25519PrivateKey.from_private_bytes(seed)
event_ts = validation_payload.get("event_ts", "")
plain_token = validation_payload.get("plain_token", "")
msg = event_ts + plain_token
# sign
signature = private_key.sign(msg.encode()).hex()
response = {
"plain_token": plain_token,
"signature": signature,
}
return response