Files
LangBot/pkg/storage/providers/localstorage.py
Junyan Qin (Chin) f1e9f46af1 feat: event log of bots (#1441)
* feat: basic arch of event log

* feat: complete event log framework

* fix: bad struct in bot log api

* feat: add event logging to all platform adapters

Co-Authored-By: wangcham233@gmail.com <651122857@qq.com>

* feat: add event logging to client classes

Co-Authored-By: wangcham233@gmail.com <651122857@qq.com>

* refactor: bot log getting api

* perf: logger for aiocqhttp and gewechat

* fix: add ignored logger in dingtalk

* fix: seq id bug in log getting

* feat: add logger in dingtalk,QQ official,Slack, wxoa

* feat: add logger for wecom

* feat: add logger for wecomcs

* perf(event logger): image processing

* 完成机器人日志的前端部分 (#1479)

* feat: webui  bot log framework done

* feat: bot log complete

* perf(bot-log): style

* chore: fix incompleted i18n

* feat: support message session copy

* fix: filter and badge text

* perf: styles

* feat: add bot toggle switch in bot card

* fix: linter errors

---------

Co-authored-by: Junyan Qin <rockchinq@gmail.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: wangcham233@gmail.com <651122857@qq.com>
Co-authored-by: HYana <65863826+KaedeSAMA@users.noreply.github.com>
2025-05-27 22:36:50 +08:00

46 lines
1.0 KiB
Python

from __future__ import annotations
import os
import aiofiles
from ...core import app
from .. import provider
LOCAL_STORAGE_PATH = os.path.join('data', 'storage')
class LocalStorageProvider(provider.StorageProvider):
def __init__(self, ap: app.Application):
super().__init__(ap)
if not os.path.exists(LOCAL_STORAGE_PATH):
os.makedirs(LOCAL_STORAGE_PATH)
async def save(
self,
key: str,
value: bytes,
):
async with aiofiles.open(os.path.join(LOCAL_STORAGE_PATH, f'{key}'), 'wb') as f:
await f.write(value)
async def load(
self,
key: str,
) -> bytes:
async with aiofiles.open(os.path.join(LOCAL_STORAGE_PATH, f'{key}'), 'rb') as f:
return await f.read()
async def exists(
self,
key: str,
) -> bool:
return os.path.exists(os.path.join(LOCAL_STORAGE_PATH, f'{key}'))
async def delete(
self,
key: str,
):
os.remove(os.path.join(LOCAL_STORAGE_PATH, f'{key}'))