fix: ruff linter error in libs

This commit is contained in:
Junyan Qin
2025-06-09 17:56:21 +08:00
parent f5a4503610
commit ad3a163d82
5 changed files with 32 additions and 49 deletions

View File

@@ -1,4 +1,4 @@
from v1 import client
from v1 import client # type: ignore
import asyncio
@@ -8,19 +8,13 @@ import json
class TestDifyClient:
async def test_chat_messages(self):
cln = client.AsyncDifyServiceClient(
api_key=os.getenv('DIFY_API_KEY'), base_url=os.getenv('DIFY_BASE_URL')
)
cln = client.AsyncDifyServiceClient(api_key=os.getenv('DIFY_API_KEY'), base_url=os.getenv('DIFY_BASE_URL'))
async for chunk in cln.chat_messages(
inputs={}, query='调用工具查看现在几点?', user='test'
):
async for chunk in cln.chat_messages(inputs={}, query='调用工具查看现在几点?', user='test'):
print(json.dumps(chunk, ensure_ascii=False, indent=4))
async def test_upload_file(self):
cln = client.AsyncDifyServiceClient(
api_key=os.getenv('DIFY_API_KEY'), base_url=os.getenv('DIFY_BASE_URL')
)
cln = client.AsyncDifyServiceClient(api_key=os.getenv('DIFY_API_KEY'), base_url=os.getenv('DIFY_BASE_URL'))
file_bytes = open('img.png', 'rb').read()
@@ -32,9 +26,7 @@ class TestDifyClient:
print(json.dumps(resp, ensure_ascii=False, indent=4))
async def test_workflow_run(self):
cln = client.AsyncDifyServiceClient(
api_key=os.getenv('DIFY_API_KEY'), base_url=os.getenv('DIFY_BASE_URL')
)
cln = client.AsyncDifyServiceClient(api_key=os.getenv('DIFY_API_KEY'), base_url=os.getenv('DIFY_BASE_URL'))
# resp = await cln.workflow_run(inputs={}, user="test")
# # print(json.dumps(resp, ensure_ascii=False, indent=4))

View File

@@ -1,5 +1,5 @@
import asyncio
import dingtalk_stream
import dingtalk_stream # type: ignore
from dingtalk_stream import AckMessage
@@ -27,9 +27,3 @@ class EchoTextHandler(dingtalk_stream.ChatbotHandler):
await asyncio.sleep(0.1) # 异步等待,避免阻塞
return self.incoming_message
async def get_dingtalk_client(client_id, client_secret):
from api import DingTalkClient # 延迟导入,避免循环导入
return DingTalkClient(client_id, client_secret)

View File

@@ -2,7 +2,7 @@ import base64
import json
import time
from typing import Callable
import dingtalk_stream
import dingtalk_stream # type: ignore
from .EchoHandler import EchoTextHandler
from .dingtalkevent import DingTalkEvent
import httpx
@@ -49,8 +49,8 @@ class DingTalkClient:
self.access_token = response_data.get('accessToken')
expires_in = int(response_data.get('expireIn', 7200))
self.access_token_expiry_time = time.time() + expires_in - 60
except Exception as e:
await self.logger.error("failed to get access token in dingtalk")
except Exception:
await self.logger.error('failed to get access token in dingtalk')
async def is_token_expired(self):
"""检查token是否过期"""
@@ -75,24 +75,22 @@ class DingTalkClient:
result = response.json()
download_url = result.get('downloadUrl')
else:
await self.logger.error(f"failed to get download url: {response.json()}")
await self.logger.error(f'failed to get download url: {response.json()}')
if download_url:
return await self.download_url_to_base64(download_url)
async def download_url_to_base64(self, download_url):
async with httpx.AsyncClient() as client:
response = await client.get(download_url)
if response.status_code == 200:
file_bytes = response.content
mime_type = response.headers.get("Content-Type", "application/octet-stream")
base64_str = base64.b64encode(file_bytes).decode("utf-8")
return f"data:{mime_type};base64,{base64_str}"
mime_type = response.headers.get('Content-Type', 'application/octet-stream')
base64_str = base64.b64encode(file_bytes).decode('utf-8')
return f'data:{mime_type};base64,{base64_str}'
else:
await self.logger.error(f"failed to get files: {response.json()}")
await self.logger.error(f'failed to get files: {response.json()}')
async def get_audio_url(self, download_code: str):
if not await self.check_access_token():
@@ -108,7 +106,7 @@ class DingTalkClient:
if download_url:
return await self.download_url_to_base64(download_url)
else:
await self.logger.error(f"failed to get audio: {response.json()}")
await self.logger.error(f'failed to get audio: {response.json()}')
else:
raise Exception(f'Error: {response.status_code}, {response.text}')
@@ -120,12 +118,12 @@ class DingTalkClient:
if event:
await self._handle_message(event)
async def send_message(self, content: str, incoming_message,at:bool):
async def send_message(self, content: str, incoming_message, at: bool):
if self.markdown_card:
if at:
self.EchoTextHandler.reply_markdown(
title='@'+incoming_message.sender_nick+' '+content,
text='@'+incoming_message.sender_nick+' '+content,
title='@' + incoming_message.sender_nick + ' ' + content,
text='@' + incoming_message.sender_nick + ' ' + content,
incoming_message=incoming_message,
)
else:
@@ -195,9 +193,9 @@ class DingTalkClient:
copy_message_data = message_data.copy()
del copy_message_data['IncomingMessage']
# print("message_data:", json.dumps(copy_message_data, indent=4, ensure_ascii=False))
except Exception as e:
except Exception:
if self.logger:
await self.logger.error(f"Error in get_message: {traceback.format_exc()}")
await self.logger.error(f'Error in get_message: {traceback.format_exc()}')
else:
traceback.print_exc()
@@ -226,8 +224,8 @@ class DingTalkClient:
if response.status_code == 200:
return
except Exception:
await self.logger.error(f"failed to send proactive massage to person: {traceback.format_exc()}")
raise Exception(f"failed to send proactive massage to person: {traceback.format_exc()}")
await self.logger.error(f'failed to send proactive massage to person: {traceback.format_exc()}')
raise Exception(f'failed to send proactive massage to person: {traceback.format_exc()}')
async def send_proactive_message_to_group(self, target_id: str, content: str):
if not await self.check_access_token():
@@ -252,8 +250,8 @@ class DingTalkClient:
if response.status_code == 200:
return
except Exception:
await self.logger.error(f"failed to send proactive massage to group: {traceback.format_exc()}")
raise Exception(f"failed to send proactive massage to group: {traceback.format_exc()}")
await self.logger.error(f'failed to send proactive massage to group: {traceback.format_exc()}')
raise Exception(f'failed to send proactive massage to group: {traceback.format_exc()}')
async def start(self):
"""启动 WebSocket 连接,监听消息"""

View File

@@ -1,5 +1,5 @@
from typing import Dict, Any, Optional
import dingtalk_stream
import dingtalk_stream # type: ignore
class DingTalkEvent(dict):

View File

@@ -1,7 +1,7 @@
# 微信公众号的加解密算法与企业微信一样,所以直接使用企业微信的加解密算法文件
import time
import traceback
from ..wecom_api.WXBizMsgCrypt3 import WXBizMsgCrypt
from libs.wecom_api.WXBizMsgCrypt3 import WXBizMsgCrypt
import xml.etree.ElementTree as ET
from quart import Quart, request
import hashlib
@@ -55,7 +55,7 @@ class OAClient:
echostr = request.args.get('echostr', '')
msg_signature = request.args.get('msg_signature', '')
if msg_signature is None:
await self.logger.error(f'msg_signature不在请求体中')
await self.logger.error('msg_signature不在请求体中')
raise Exception('msg_signature不在请求体中')
if request.method == 'GET':
@@ -66,7 +66,7 @@ class OAClient:
if check_signature == signature:
return echostr # 验证成功返回echostr
else:
await self.logger.error(f'拒绝请求')
await self.logger.error('拒绝请求')
raise Exception('拒绝请求')
elif request.method == 'POST':
encryt_msg = await request.data
@@ -75,9 +75,9 @@ class OAClient:
xml_msg = xml_msg.decode('utf-8')
if ret != 0:
await self.logger.error(f'消息解密失败')
await self.logger.error('消息解密失败')
raise Exception('消息解密失败')
message_data = await self.get_message(xml_msg)
if message_data:
event = OAEvent.from_payload(message_data)
@@ -214,7 +214,7 @@ class OAClientForLongerResponse:
msg_signature = request.args.get('msg_signature', '')
if msg_signature is None:
await self.logger.error(f'msg_signature不在请求体中')
await self.logger.error('msg_signature不在请求体中')
raise Exception('msg_signature不在请求体中')
if request.method == 'GET':
@@ -229,9 +229,8 @@ class OAClientForLongerResponse:
xml_msg = xml_msg.decode('utf-8')
if ret != 0:
await self.logger.error(f'消息解密失败')
await self.logger.error('消息解密失败')
raise Exception('消息解密失败')
# 解析 XML
root = ET.fromstring(xml_msg)