From 403a721b94a499fea7f945ef162dfea316103c50 Mon Sep 17 00:00:00 2001 From: wangcham Date: Mon, 17 Nov 2025 16:21:01 +0800 Subject: [PATCH] fix: qqo webhook --- libs/qq_official_api/api.py | 107 +++++++++++------------------------- 1 file changed, 33 insertions(+), 74 deletions(-) diff --git a/libs/qq_official_api/api.py b/libs/qq_official_api/api.py index 6960a239..8e7da01f 100644 --- a/libs/qq_official_api/api.py +++ b/libs/qq_official_api/api.py @@ -10,73 +10,6 @@ import traceback from cryptography.hazmat.primitives.asymmetric import ed25519 -def handle_validation(body: dict, bot_secret: str): - """ - 处理 QQ 官方机器人的回调验证请求 - - Args: - body: 包含验证数据的请求体 - bot_secret: 机器人密钥 - - Returns: - 包含签名的验证响应 - """ - try: - # 解析验证数据 - validation_data = body.get('d') - if not validation_data: - print("parse http payload failed: missing 'd' field") - return None - - event_ts = validation_data.get('event_ts') - plain_token = validation_data.get('plain_token') - - if not event_ts or not plain_token: - print("parse http payload failed: missing event_ts or plain_token") - return None - - # 处理 bot_secret:确保长度达到 32 字节(ed25519.SeedSize) - seed = bot_secret - while len(seed) < 32: - seed = seed * 2 - seed = seed[:32] - - # 将 seed 转换为字节 - seed_bytes = seed.encode() - - # 从 seed 生成 ed25519 私钥 - private_key = ed25519.Ed25519PrivateKey.from_private_bytes(seed_bytes) - - msg = event_ts + plain_token - msg_bytes = msg.encode() - - signature = private_key.sign(msg_bytes) - - # 将签名转换为十六进制字符串 - signature_hex = signature.hex() - - # 构建验证响应 - response = { - 'plain_token': plain_token, - 'signature': signature_hex - } - - # 打印调试信息 - print(f'[QQ Official Validation]') - print(f' event_ts: {event_ts}') - print(f' plain_token: {plain_token}') - print(f' Message to sign: {msg}') - print(f' Signature: {signature_hex}') - - return response - - except Exception as e: - print(f"handle validation failed: {e}") - import traceback - traceback.print_exc() - return None - - class QQOfficialClient: def __init__(self, secret: str, token: str, app_id: str, logger: None, unified_mode: bool = False): self.unified_mode = unified_mode @@ -157,16 +90,19 @@ class QQOfficialClient: print(f'[QQ Official] Received request, body length: {len(body)}') + if not body or len(body) == 0: + print('[QQ Official] Received empty body, might be health check or GET request') + return {'code': 0, 'message': 'ok'}, 200 + payload = json.loads(body) - # 验证是否为回调验证请求 + if payload.get('op') == 13: - print(f'[QQ Official] Received callback validation request (op=13)') - # 生成签名 - response = handle_validation(payload, self.secret) - print(response) - print(f'[QQ Official] Returning validation response') - return response + validation_data = payload.get('d') + if not validation_data: + return {'error': "missing 'd' field"}, 400 + response = await self.verify(validation_data) + return response, 200 if payload.get('op') == 0: message_data = await self.get_message(payload) @@ -335,3 +271,26 @@ class QQOfficialClient: if self.access_token_expiry_time is None: return True return time.time() > self.access_token_expiry_time + + async def repeat_seed(self, bot_secret: str, target_size: int = 32) -> bytes: + seed = bot_secret + while len(seed) < target_size: + seed *= 2 + return seed[:target_size].encode("utf-8") + + async def verify(self, validation_payload: dict): + seed = await self.repeat_seed(self.secret) + private_key = ed25519.Ed25519PrivateKey.from_private_bytes(seed) + + event_ts = validation_payload.get("event_ts", "") + plain_token = validation_payload.get("plain_token", "") + msg = event_ts + plain_token + + # sign + signature = private_key.sign(msg.encode()).hex() + + response = { + "plain_token": plain_token, + "signature": signature, + } + return response