mirror of
https://github.com/fish2018/pansou.git
synced 2025-11-25 11:29:30 +08:00
缓存优化
This commit is contained in:
167
README.md
167
README.md
@@ -9,7 +9,7 @@ PanSou是一个高性能的网盘资源搜索API服务,支持TG搜索和网盘
|
||||
- **网盘类型分类**:自动识别多种网盘链接,按类型归类展示
|
||||
- **智能排序**:基于时间和关键词权重的多级排序策略
|
||||
- **插件系统**:支持通过插件扩展搜索来源,已内置多个网盘搜索插件;支持"尽快响应,持续处理"的异步搜索模式
|
||||
- **两级缓存**:内存+磁盘缓存机制,大幅提升重复查询速度;异步插件缓存自动保存到磁盘,系统重启后自动恢复,
|
||||
- **两级异步缓存**:内存+分片磁盘缓存机制,大幅提升重复查询速度和并发性能,即使在不强制刷新的情况下也能获取异步插件更新的最新缓存数据
|
||||
|
||||
## 支持的网盘类型
|
||||
|
||||
@@ -55,6 +55,7 @@ export CACHE_ENABLED=true
|
||||
export CACHE_PATH="./cache"
|
||||
export CACHE_MAX_SIZE=100 # MB
|
||||
export CACHE_TTL=60 # 分钟
|
||||
export SHARD_COUNT=8 # 分片数量
|
||||
|
||||
# 异步插件配置
|
||||
export ASYNC_PLUGIN_ENABLED=true
|
||||
@@ -213,6 +214,8 @@ GET /api/search?kw=速度与激情&channels=tgsearchers2,xxx&conc=2&refresh=true
|
||||
| CACHE_PATH | 缓存文件路径 | ./cache |
|
||||
| CACHE_MAX_SIZE | 最大缓存大小(MB) | 100 |
|
||||
| CACHE_TTL | 缓存生存时间(分钟) | 60 |
|
||||
| SHARD_COUNT | 缓存分片数量 | 8 |
|
||||
| SERIALIZER_TYPE | 序列化器类型(gob/json) | gob |
|
||||
| ENABLE_COMPRESSION | 是否启用压缩 | false |
|
||||
| MIN_SIZE_TO_COMPRESS | 最小压缩阈值(字节) | 1024 |
|
||||
| GC_PERCENT | GC触发百分比 | 100 |
|
||||
@@ -223,6 +226,7 @@ GET /api/search?kw=速度与激情&channels=tgsearchers2,xxx&conc=2&refresh=true
|
||||
| ASYNC_MAX_BACKGROUND_WORKERS | 最大后台工作者数量 | 20 |
|
||||
| ASYNC_MAX_BACKGROUND_TASKS | 最大后台任务数量 | 100 |
|
||||
| ASYNC_CACHE_TTL_HOURS | 异步缓存有效期(小时) | 1 |
|
||||
| CACHE_FRESHNESS_SECONDS | 缓存数据新鲜度(秒) | 30 |
|
||||
|
||||
## 性能优化
|
||||
|
||||
@@ -230,11 +234,16 @@ PanSou 实现了多项性能优化技术:
|
||||
|
||||
1. **JSON处理优化**:使用 sonic 高性能 JSON 库
|
||||
2. **内存优化**:预分配策略、对象池化、GC参数优化
|
||||
3. **缓存优化**:两级缓存、异步写入、优化键生成
|
||||
3. **缓存优化**:
|
||||
- 增强版两级缓存(内存+分片磁盘)
|
||||
- 分片磁盘缓存减少锁竞争
|
||||
- 高效序列化(Gob和JSON双支持)
|
||||
- 分离的缓存键(TG搜索和插件搜索独立缓存)
|
||||
- 缓存数据时间戳检查(获取最新数据)
|
||||
- 异步写入(不阻塞主流程)
|
||||
4. **HTTP客户端优化**:连接池、HTTP/2支持
|
||||
5. **并发优化**:工作池、智能并发控制
|
||||
6. **传输压缩**:支持 gzip 压缩
|
||||
7. **异步插件缓存**:持久化缓存、即时保存、优雅关闭机制
|
||||
|
||||
## 异步插件系统
|
||||
|
||||
@@ -248,142 +257,26 @@ PanSou实现了高级异步插件系统,解决了某些搜索源响应时间
|
||||
- **优雅关闭**:在程序退出前保存缓存,确保数据不丢失
|
||||
- **增量更新**:智能合并新旧结果,保留有价值的数据
|
||||
- **后台自动刷新**:对于接近过期的缓存,在后台自动刷新
|
||||
- **资源管理**:通过工作池控制并发任务数量,避免资源耗尽
|
||||
- **与主程序缓存协同**:通过时间戳检查机制,确保即使在不强制刷新的情况下也能获取最新缓存数据
|
||||
|
||||
### 异步插件工作流程
|
||||
### 缓存系统特点
|
||||
|
||||
1. **缓存检查**:首先检查是否有有效缓存
|
||||
2. **快速响应**:如果有缓存,立即返回;如果缓存接近过期,在后台刷新
|
||||
3. **双通道处理**:如果没有缓存,启动快速响应通道和后台处理通道
|
||||
4. **超时控制**:在响应超时时返回当前结果(可能为空),后台继续处理
|
||||
5. **缓存更新**:后台处理完成后更新缓存,供后续查询使用
|
||||
1. **分片磁盘缓存**:
|
||||
- 将缓存数据分散到多个子目录,减少锁竞争
|
||||
- 通过哈希算法将缓存键均匀分布到不同分片
|
||||
- 提高高并发场景下的性能
|
||||
|
||||
## 插件系统
|
||||
2. **序列化器接口**:
|
||||
- 统一的序列化和反序列化操作
|
||||
- 支持Gob和JSON双序列化方式
|
||||
- Gob序列化提供更高性能和更小的结果大小
|
||||
|
||||
PanSou 实现了灵活的插件系统,允许轻松扩展搜索来源
|
||||
3. **缓存数据时间戳检查**:
|
||||
- 检查缓存数据是否是最新的(30秒内更新)
|
||||
- 确保获取异步插件在后台更新的最新缓存数据
|
||||
- 在不强制刷新的情况下也能获取最新数据
|
||||
|
||||
详情参考[插件开发指南.md](docs/插件开发指南.md)
|
||||
|
||||
### 插件特性
|
||||
|
||||
- **自动注册机制**:插件通过init函数自动注册,无需修改主程序代码
|
||||
- **统一接口**:所有插件实现相同的SearchPlugin接口
|
||||
- **双层超时控制**:插件内部使用自定义超时时间,系统外部提供强制超时保障
|
||||
- **并发执行**:插件搜索与频道搜索并发执行,提高整体性能
|
||||
- **结果标准化**:插件返回标准化的搜索结果,便于统一处理
|
||||
- **异步处理**:支持异步插件,实现"尽快响应,持续处理"的模式
|
||||
|
||||
### 开发自定义插件
|
||||
|
||||
1. 创建新的插件包:
|
||||
|
||||
```go
|
||||
package myplugin
|
||||
|
||||
import (
|
||||
"pansou/model"
|
||||
"pansou/plugin"
|
||||
)
|
||||
|
||||
// 在init函数中注册插件
|
||||
func init() {
|
||||
plugin.RegisterGlobalPlugin(NewMyPlugin())
|
||||
}
|
||||
|
||||
// MyPlugin 自定义插件
|
||||
type MyPlugin struct {}
|
||||
|
||||
// NewMyPlugin 创建新的插件实例
|
||||
func NewMyPlugin() *MyPlugin {
|
||||
return &MyPlugin{}
|
||||
}
|
||||
|
||||
// Name 返回插件名称
|
||||
func (p *MyPlugin) Name() string {
|
||||
return "myplugin"
|
||||
}
|
||||
|
||||
// Priority 返回插件优先级
|
||||
func (p *MyPlugin) Priority() int {
|
||||
return 3 // 中等优先级
|
||||
}
|
||||
|
||||
// Search 执行搜索并返回结果
|
||||
func (p *MyPlugin) Search(keyword string) ([]model.SearchResult, error) {
|
||||
// 实现搜索逻辑
|
||||
// ...
|
||||
|
||||
return results, nil
|
||||
}
|
||||
```
|
||||
|
||||
2. 在main.go中导入插件包:
|
||||
|
||||
```go
|
||||
import (
|
||||
// 导入插件包以触发init函数
|
||||
_ "pansou/plugin/myplugin"
|
||||
)
|
||||
```
|
||||
|
||||
## 附录
|
||||
|
||||
### TG频道
|
||||
|
||||
```
|
||||
"channels": ["tgsearchers2","SharePanBaidu", "yunpanxunlei", "tianyifc", "BaiduCloudDisk", "txtyzy", "peccxinpd", "gotopan", "xingqiump4", "yunpanqk", "PanjClub", "kkxlzy", "baicaoZY", "MCPH01", "share_aliyun", "pan115_share", "bdwpzhpd", "ysxb48", "pankuake_share", "jdjdn1111", "yggpan", "yunpanall", "MCPH086", "zaihuayun", "Q66Share", "NewAliPan", "Oscar_4Kmovies", "ucwpzy", "alyp_TV", "alyp_4K_Movies", "shareAliyun", "alyp_1", "yunpanpan", "hao115", "yunpanshare", "dianyingshare", "Quark_Movies", "XiangxiuNB", "NewQuark", "ydypzyfx", "kuakeyun", "ucquark", "xx123pan", "yingshifenxiang123", "zyfb123", "pan123pan", "tyypzhpd", "tianyirigeng", "cloudtianyi", "hdhhd21", "Lsp115", "oneonefivewpfx", "Maidanglaocom", "qixingzhenren", "taoxgzy", "tgsearchers115", "Channel_Shares_115", "tyysypzypd", "vip115hot", "wp123zy", "yunpan139", "yunpan189", "yunpanuc", "yydf_hzl", "alyp_Animation", "alyp_JLP","leoziyuan"]
|
||||
```
|
||||
|
||||
### 配置参考
|
||||
|
||||
supervisor配置参考
|
||||
|
||||
```
|
||||
[program:pansou]
|
||||
environment=PORT=9999,CHANNELS="SharePanBaidu,yunpanxunlei,tianyifc,BaiduCloudDisk,txtyzy,peccxinpd,gotopan,xingqiump4,yunpanqk,PanjClub,kkxlzy,baicaoZY,MCPH01,share_aliyun,pan115_share,bdwpzhpd,ysxb48,pankuake_share,jdjdn1111,yggpan,yunpanall,MCPH086,zaihuayun,Q66Share,NewAliPan,Oscar_4Kmovies,ucwpzy,alyp_TV,alyp_4K_Movies,shareAliyun,alyp_1,yunpanpan,hao115,yunpanshare,dianyingshare,Quark_Movies,XiangxiuNB,NewQuark,ydypzyfx,kuakeyun,ucquark,xx123pan,yingshifenxiang123,zyfb123,pan123pan,tyypzhpd,tianyirigeng,cloudtianyi,hdhhd21,Lsp115,oneonefivewpfx,Maidanglaocom,qixingzhenren,taoxgzy,tgsearchers115,Channel_Shares_115,tyysypzypd,vip115hot,wp123zy,yunpan139,yunpan189,yunpanuc,yydf_hzl,alyp_Animation,alyp_JLP,tgsearchers2,leoziyuan"
|
||||
command=/home/work/pansou/pansou
|
||||
directory=/home/work/pansou
|
||||
autostart=true
|
||||
autorestart=true
|
||||
startsecs=5
|
||||
startretries=3
|
||||
exitcodes=0
|
||||
stopwaitsecs=10
|
||||
stopasgroup=true
|
||||
killasgroup=true
|
||||
```
|
||||
|
||||
nginx配置参考
|
||||
|
||||
```
|
||||
server {
|
||||
listen 80;
|
||||
server_name pansou.252035.xyz;
|
||||
|
||||
# 将 HTTP 重定向到 HTTPS
|
||||
return 301 https://$host$request_uri;
|
||||
}
|
||||
|
||||
server {
|
||||
listen 443 ssl http2; # 添加 http2
|
||||
server_name pansou.252035.xyz;
|
||||
|
||||
# 证书和密钥路径
|
||||
ssl_certificate /etc/letsencrypt/live/252035.xyz/fullchain.pem;
|
||||
ssl_certificate_key /etc/letsencrypt/live/252035.xyz/privkey.pem;
|
||||
|
||||
# 增强 SSL 安全性
|
||||
ssl_protocols TLSv1.2 TLSv1.3;
|
||||
ssl_ciphers EECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH;
|
||||
ssl_prefer_server_ciphers on;
|
||||
|
||||
# 后端代理
|
||||
location / {
|
||||
proxy_pass http://127.0.0.1:9999;
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
proxy_set_header X-Forwarded-Proto $scheme;
|
||||
}
|
||||
}
|
||||
```
|
||||
4. **分离的缓存键**:
|
||||
- TG搜索和插件搜索使用独立的缓存键
|
||||
- 实现独立更新,互不影响
|
||||
- 提高缓存命中率和更新效率
|
||||
@@ -10,15 +10,17 @@ PanSou是一个高性能的网盘资源搜索API服务,支持Telegram搜索和
|
||||
- **高性能并发**:通过工作池实现高效并发搜索
|
||||
- **智能排序**:基于时间和关键词权重的多级排序策略
|
||||
- **网盘类型分类**:自动识别多种网盘链接,按类型归类展示
|
||||
- **两级缓存**:内存+磁盘缓存机制,大幅提升重复查询速度
|
||||
- **增强版两级缓存**:内存+分片磁盘缓存机制,大幅提升重复查询速度
|
||||
- **插件系统**:支持通过插件扩展搜索来源
|
||||
- **缓存键一致性**:优化的缓存键生成算法,确保相同语义查询的缓存命中
|
||||
- **异步插件缓存更新**:支持在不强制刷新的情况下获取异步插件的最新缓存数据
|
||||
|
||||
### 1.2 技术栈
|
||||
|
||||
- **编程语言**:Go
|
||||
- **Web框架**:Gin
|
||||
- **缓存**:自定义两级缓存(内存+磁盘)
|
||||
- **缓存**:自定义增强版两级缓存(内存+分片磁盘)
|
||||
- **序列化**:Gob和JSON双序列化支持
|
||||
- **JSON处理**:sonic(高性能JSON库)
|
||||
- **并发控制**:工作池模式
|
||||
- **HTTP客户端**:自定义优化的HTTP客户端
|
||||
@@ -65,8 +67,9 @@ PanSou采用模块化的分层架构设计,主要包括以下几个层次:
|
||||
|
||||
- **搜索服务**:整合插件和Telegram搜索结果
|
||||
- **结果处理**:过滤、排序和分类搜索结果
|
||||
- **缓存管理**:管理搜索结果的缓存策略
|
||||
- **缓存管理**:管理搜索结果的缓存策略,支持TG和插件搜索的独立缓存
|
||||
- **缓存键生成**:基于所有影响结果的参数生成一致的缓存键
|
||||
- **缓存更新检测**:检查缓存数据是否是最新的,支持获取异步更新的缓存数据
|
||||
|
||||
#### 2.2.3 插件系统
|
||||
|
||||
@@ -74,10 +77,12 @@ PanSou采用模块化的分层架构设计,主要包括以下几个层次:
|
||||
- **插件管理**:管理插件的注册、获取和调用
|
||||
- **自动注册**:通过init函数实现插件自动注册
|
||||
- **高性能JSON处理**:使用sonic库优化JSON序列化/反序列化
|
||||
- **异步插件**:支持"尽快响应,持续处理"的异步搜索模式
|
||||
|
||||
#### 2.2.4 工具层
|
||||
|
||||
- **缓存工具**:两级缓存实现(内存+磁盘)
|
||||
- **缓存工具**:增强版两级缓存实现(内存+分片磁盘)
|
||||
- **序列化器**:支持Gob和JSON双序列化方式
|
||||
- **HTTP客户端**:优化的HTTP客户端,支持代理
|
||||
- **工作池**:并发任务执行的工作池
|
||||
- **JSON工具**:高性能JSON处理工具
|
||||
@@ -86,9 +91,9 @@ PanSou采用模块化的分层架构设计,主要包括以下几个层次:
|
||||
|
||||
1. **请求接收**:API层接收搜索请求
|
||||
2. **参数处理**:解析、验证和规范化请求参数
|
||||
3. **缓存键生成**:基于所有影响结果的参数生成一致的缓存键
|
||||
4. **缓存检查**:检查是否有缓存结果
|
||||
5. **并发搜索**:如无缓存,并发执行搜索任务
|
||||
3. **缓存键生成**:分别为TG搜索和插件搜索生成独立的缓存键
|
||||
4. **缓存检查**:检查是否有缓存结果,并验证缓存数据是否是最新的
|
||||
5. **并发搜索**:如无缓存或缓存数据不是最新的,并发执行搜索任务
|
||||
6. **结果处理**:过滤、排序和分类搜索结果
|
||||
7. **缓存存储**:将结果存入缓存
|
||||
8. **响应返回**:返回处理后的结果
|
||||
@@ -98,9 +103,10 @@ PanSou采用模块化的分层架构设计,主要包括以下几个层次:
|
||||
### 3.1 高性能设计
|
||||
|
||||
- **并发搜索**:使用工作池模式实现高效并发
|
||||
- **两级缓存**:内存缓存提供快速访问,磁盘缓存提供持久存储
|
||||
- **增强版两级缓存**:内存缓存提供快速访问,分片磁盘缓存提供持久存储和高并发支持
|
||||
- **异步操作**:非关键路径使用异步处理
|
||||
- **内存优化**:预分配策略、对象池化、GC参数优化
|
||||
- **高效序列化**:使用Gob序列化提高性能,保留JSON序列化兼容性
|
||||
- **高效JSON处理**:使用sonic库替代标准库,提升序列化性能
|
||||
|
||||
### 3.2 可扩展性设计
|
||||
@@ -108,6 +114,7 @@ PanSou采用模块化的分层架构设计,主要包括以下几个层次:
|
||||
- **插件系统**:通过统一接口和自动注册机制实现可扩展
|
||||
- **模块化**:清晰的模块边界和职责划分
|
||||
- **配置驱动**:通过环境变量实现灵活配置
|
||||
- **序列化器接口**:通过统一接口支持多种序列化方式
|
||||
|
||||
### 3.3 可靠性设计
|
||||
|
||||
@@ -115,6 +122,7 @@ PanSou采用模块化的分层架构设计,主要包括以下几个层次:
|
||||
- **错误处理**:全面的错误捕获和处理
|
||||
- **优雅降级**:单个搜索源失败不影响整体结果
|
||||
- **缓存一致性**:确保相同语义的查询使用相同的缓存键
|
||||
- **双缓存机制**:优先使用增强版缓存,失败时回退到原始缓存
|
||||
|
||||
## 4. 代码组织结构
|
||||
|
||||
@@ -139,6 +147,7 @@ pansou/
|
||||
│ └── response.go # 响应模型
|
||||
├── plugin/ # 插件系统
|
||||
│ ├── plugin.go # 插件接口和管理
|
||||
│ ├── baseasyncplugin.go # 异步插件基类实现
|
||||
│ ├── jikepan/ # 即刻盘插件
|
||||
│ ├── hunhepan/ # 混合盘插件
|
||||
│ ├── pansearch/ # 盘搜插件
|
||||
@@ -150,11 +159,12 @@ pansou/
|
||||
├── util/ # 工具层
|
||||
│ ├── cache/ # 缓存工具
|
||||
│ │ ├── cache_key.go # 优化的缓存键生成
|
||||
│ │ ├── cache_key_test.go # 缓存键生成测试
|
||||
│ │ ├── disk_cache.go # 磁盘缓存
|
||||
│ │ ├── sharded_disk_cache.go # 分片磁盘缓存
|
||||
│ │ ├── two_level_cache.go # 两级缓存
|
||||
│ │ ├── utils.go # 缓存工具函数
|
||||
│ │ └── utils_test.go # 缓存工具测试
|
||||
│ │ ├── enhanced_two_level_cache.go # 增强版两级缓存
|
||||
│ │ ├── serializer.go # 序列化器接口
|
||||
│ │ └── utils.go # 缓存工具函数
|
||||
│ ├── compression.go # 压缩工具
|
||||
│ ├── convert.go # 类型转换工具
|
||||
│ ├── http_util.go # HTTP工具函数
|
||||
|
||||
853
docs/3-服务层设计.md
853
docs/3-服务层设计.md
File diff suppressed because it is too large
Load Diff
245
docs/4-插件系统设计.md
245
docs/4-插件系统设计.md
@@ -250,121 +250,220 @@ func loadCacheFromDisk() {
|
||||
### 5.4 异步插件实现示例
|
||||
|
||||
```go
|
||||
// HunhepanAsyncPlugin 混合盘搜索异步插件
|
||||
type HunhepanAsyncPlugin struct {
|
||||
// JikePanPlugin 即刻盘异步搜索插件
|
||||
type JikePanPlugin struct {
|
||||
*plugin.BaseAsyncPlugin
|
||||
}
|
||||
|
||||
// NewHunhepanAsyncPlugin 创建新的混合盘搜索异步插件
|
||||
func NewHunhepanAsyncPlugin() *HunhepanAsyncPlugin {
|
||||
return &HunhepanAsyncPlugin{
|
||||
BaseAsyncPlugin: plugin.NewBaseAsyncPlugin("hunhepan_async", 3),
|
||||
// NewJikePanPlugin 创建即刻盘插件
|
||||
func NewJikePanPlugin() *JikePanPlugin {
|
||||
return &JikePanPlugin{
|
||||
BaseAsyncPlugin: plugin.NewBaseAsyncPlugin("jikepan", 5),
|
||||
}
|
||||
}
|
||||
|
||||
// Search 执行搜索并返回结果
|
||||
func (p *HunhepanAsyncPlugin) Search(keyword string) ([]model.SearchResult, error) {
|
||||
// Search 实现搜索接口
|
||||
func (p *JikePanPlugin) Search(keyword string) ([]model.SearchResult, error) {
|
||||
// 生成缓存键
|
||||
cacheKey := keyword
|
||||
cacheKey := generateCacheKey(keyword)
|
||||
|
||||
// 使用异步搜索基础方法
|
||||
return p.AsyncSearch(keyword, cacheKey, p.doSearch)
|
||||
// 使用异步搜索
|
||||
return p.BaseAsyncPlugin.AsyncSearch(keyword, cacheKey, p.doSearch)
|
||||
}
|
||||
|
||||
// doSearch 实际的搜索实现
|
||||
func (p *HunhepanAsyncPlugin) doSearch(client *http.Client, keyword string) ([]model.SearchResult, error) {
|
||||
// 实现具体搜索逻辑
|
||||
// doSearch 执行实际搜索
|
||||
func (p *JikePanPlugin) doSearch(client *http.Client, keyword string) ([]model.SearchResult, error) {
|
||||
// 实现搜索逻辑
|
||||
// ...
|
||||
return results, nil
|
||||
}
|
||||
```
|
||||
|
||||
## 6. 优雅关闭机制
|
||||
### 5.5 缓存更新检测机制
|
||||
|
||||
系统实现了优雅关闭机制,确保在程序退出前保存异步插件缓存:
|
||||
为了确保即使在不强制刷新的情况下也能获取最新的缓存数据,系统在搜索服务中实现了缓存数据时间戳检查机制:
|
||||
|
||||
```go
|
||||
// 在main.go中
|
||||
// 创建通道来接收操作系统信号
|
||||
quit := make(chan os.Signal, 1)
|
||||
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
// 等待中断信号
|
||||
<-quit
|
||||
fmt.Println("正在关闭服务器...")
|
||||
|
||||
// 保存异步插件缓存
|
||||
plugin.SaveCacheToDisk()
|
||||
|
||||
// 优雅关闭服务器
|
||||
if err := srv.Shutdown(ctx); err != nil {
|
||||
log.Fatalf("服务器关闭异常: %v", err)
|
||||
// 在searchPlugins方法中
|
||||
if err == nil && hit {
|
||||
var results []model.SearchResult
|
||||
if err := enhancedTwoLevelCache.GetSerializer().Deserialize(data, &results); err == nil {
|
||||
// 确保缓存数据是最新的
|
||||
// 如果缓存数据是最近更新的(例如在过去30秒内),则直接返回
|
||||
// 否则,我们将重新执行搜索以获取最新数据
|
||||
if len(results) > 0 {
|
||||
// 获取当前时间
|
||||
now := time.Now()
|
||||
// 检查缓存数据是否是最近更新的
|
||||
for _, result := range results {
|
||||
if !result.Datetime.IsZero() && now.Sub(result.Datetime) < 30*time.Second {
|
||||
return results, nil
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 如果缓存中没有数据,直接返回空结果
|
||||
return results, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 7. JSON处理优化
|
||||
这种机制特别适用于异步插件,因为异步插件会在后台持续更新缓存数据。通过时间戳检查,系统能够确保用户总是获取到最新的搜索结果,同时保持良好的性能。
|
||||
|
||||
为了提高插件的性能,特别是在处理大量JSON数据时,所有插件都使用了高性能的JSON库进行序列化和反序列化操作。
|
||||
### 5.6 异步插件与主程序缓存协同
|
||||
|
||||
### 7.1 JSON库选择
|
||||
异步插件系统与主程序的缓存系统协同工作,实现了完整的缓存更新流程:
|
||||
|
||||
PanSou使用字节跳动开发的sonic库替代标准库的encoding/json,提供更高效的JSON处理:
|
||||
1. **异步插件缓存更新**:异步插件在后台持续更新自己的缓存
|
||||
2. **主程序缓存检查**:主程序在获取缓存数据时检查时间戳
|
||||
3. **缓存更新判断**:如果缓存数据不是最新的,重新执行搜索
|
||||
4. **缓存写入**:搜索结果异步写入主程序缓存
|
||||
|
||||
这种协同机制确保了用户总是能获取到最新的搜索结果,同时保持系统的高性能和响应速度。
|
||||
|
||||
## 6. 插件管理器
|
||||
|
||||
### 6.1 插件管理器设计
|
||||
|
||||
插件管理器负责管理所有已注册的插件,提供统一的接口获取和使用插件。
|
||||
|
||||
```go
|
||||
// 使用优化的JSON库
|
||||
import (
|
||||
"pansou/util/json" // 内部封装了github.com/bytedance/sonic
|
||||
)
|
||||
// PluginManager 插件管理器
|
||||
type PluginManager struct {
|
||||
plugins []SearchPlugin
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
// 序列化示例
|
||||
jsonData, err := json.Marshal(reqBody)
|
||||
// NewPluginManager 创建插件管理器
|
||||
func NewPluginManager() *PluginManager {
|
||||
return &PluginManager{
|
||||
plugins: make([]SearchPlugin, 0),
|
||||
}
|
||||
}
|
||||
|
||||
// 反序列化示例
|
||||
if err := json.Unmarshal(respBody, &apiResp); err != nil {
|
||||
return nil, fmt.Errorf("decode response failed: %w", err)
|
||||
// LoadPlugins 加载所有已注册的插件
|
||||
func (m *PluginManager) LoadPlugins() {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
// 获取所有已注册的插件
|
||||
m.plugins = GetRegisteredPlugins()
|
||||
|
||||
// 按优先级排序
|
||||
sort.Slice(m.plugins, func(i, j int) bool {
|
||||
return m.plugins[i].Priority() > m.plugins[j].Priority()
|
||||
})
|
||||
}
|
||||
|
||||
// GetPlugins 获取所有插件
|
||||
func (m *PluginManager) GetPlugins() []SearchPlugin {
|
||||
m.mutex.RLock()
|
||||
defer m.mutex.RUnlock()
|
||||
|
||||
return m.plugins
|
||||
}
|
||||
```
|
||||
|
||||
### 7.2 性能优势
|
||||
|
||||
- **更快的序列化/反序列化速度**:sonic库比标准库快2-5倍
|
||||
- **更低的内存分配**:减少GC压力
|
||||
- **SIMD加速**:利用现代CPU的向量指令集
|
||||
- **并行处理**:大型JSON可以并行处理
|
||||
|
||||
### 7.3 实现方式
|
||||
|
||||
所有插件通过统一的内部包装库使用sonic:
|
||||
### 6.2 插件管理器使用
|
||||
|
||||
```go
|
||||
// util/json/json.go
|
||||
package json
|
||||
// 在main.go中初始化插件管理器
|
||||
pluginManager := plugin.NewPluginManager()
|
||||
pluginManager.LoadPlugins()
|
||||
|
||||
// 创建搜索服务,传入插件管理器
|
||||
searchService := service.NewSearchService(pluginManager)
|
||||
|
||||
// 设置路由
|
||||
router := api.SetupRouter(searchService)
|
||||
```
|
||||
|
||||
## 7. 插件系统优势
|
||||
|
||||
1. **可扩展性**:通过统一接口和自动注册机制,轻松添加新的搜索源
|
||||
2. **高性能**:异步插件模式提供快速响应和后台处理,提高用户体验
|
||||
3. **缓存优化**:完善的缓存机制,提高重复查询的响应速度
|
||||
4. **错误隔离**:单个插件的错误不会影响其他插件和整体系统
|
||||
5. **优先级控制**:通过插件优先级控制搜索结果的排序
|
||||
|
||||
## 8. 插件开发指南
|
||||
|
||||
### 8.1 同步插件开发
|
||||
|
||||
```go
|
||||
package myplugin
|
||||
|
||||
import (
|
||||
"github.com/bytedance/sonic"
|
||||
"pansou/model"
|
||||
"pansou/plugin"
|
||||
)
|
||||
|
||||
// API是sonic的全局配置实例
|
||||
var API = sonic.ConfigDefault
|
||||
// MyPlugin 自定义插件
|
||||
type MyPlugin struct{}
|
||||
|
||||
// 初始化sonic配置
|
||||
// 注册插件
|
||||
func init() {
|
||||
// 根据需要配置sonic选项
|
||||
API = sonic.Config{
|
||||
UseNumber: true,
|
||||
EscapeHTML: true,
|
||||
SortMapKeys: false, // 生产环境设为false提高性能
|
||||
}.Froze()
|
||||
plugin.RegisterGlobalPlugin(&MyPlugin{})
|
||||
}
|
||||
|
||||
// Marshal 使用sonic序列化对象到JSON
|
||||
func Marshal(v interface{}) ([]byte, error) {
|
||||
return API.Marshal(v)
|
||||
// Name 返回插件名称
|
||||
func (p *MyPlugin) Name() string {
|
||||
return "myplugin"
|
||||
}
|
||||
|
||||
// Unmarshal 使用sonic反序列化JSON到对象
|
||||
func Unmarshal(data []byte, v interface{}) error {
|
||||
return API.Unmarshal(data, v)
|
||||
// Search 执行搜索
|
||||
func (p *MyPlugin) Search(keyword string) ([]model.SearchResult, error) {
|
||||
// 实现搜索逻辑
|
||||
// ...
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// Priority 返回插件优先级
|
||||
func (p *MyPlugin) Priority() int {
|
||||
return 3
|
||||
}
|
||||
```
|
||||
|
||||
这种统一的JSON处理方式确保了所有插件都能获得一致的高性能,特别是在处理大量搜索结果时,显著提升了系统整体响应速度。
|
||||
### 8.2 异步插件开发
|
||||
|
||||
```go
|
||||
package myasyncplugin
|
||||
|
||||
import (
|
||||
"pansou/model"
|
||||
"pansou/plugin"
|
||||
)
|
||||
|
||||
// MyAsyncPlugin 自定义异步插件
|
||||
type MyAsyncPlugin struct {
|
||||
*plugin.BaseAsyncPlugin
|
||||
}
|
||||
|
||||
// 创建并注册插件
|
||||
func init() {
|
||||
plugin.RegisterGlobalPlugin(NewMyAsyncPlugin())
|
||||
}
|
||||
|
||||
// NewMyAsyncPlugin 创建异步插件
|
||||
func NewMyAsyncPlugin() *MyAsyncPlugin {
|
||||
return &MyAsyncPlugin{
|
||||
BaseAsyncPlugin: plugin.NewBaseAsyncPlugin("myasyncplugin", 4),
|
||||
}
|
||||
}
|
||||
|
||||
// Search 实现搜索接口
|
||||
func (p *MyAsyncPlugin) Search(keyword string) ([]model.SearchResult, error) {
|
||||
// 生成缓存键
|
||||
cacheKey := generateCacheKey(keyword)
|
||||
|
||||
// 使用异步搜索
|
||||
return p.BaseAsyncPlugin.AsyncSearch(keyword, cacheKey, p.doSearch)
|
||||
}
|
||||
|
||||
// doSearch 执行实际搜索
|
||||
func (p *MyAsyncPlugin) doSearch(client *http.Client, keyword string) ([]model.SearchResult, error) {
|
||||
// 实现搜索逻辑
|
||||
// ...
|
||||
return results, nil
|
||||
}
|
||||
```
|
||||
905
docs/5-缓存系统设计.md
905
docs/5-缓存系统设计.md
File diff suppressed because it is too large
Load Diff
@@ -14,6 +14,7 @@ import (
|
||||
"pansou/util"
|
||||
"pansou/util/cache"
|
||||
"pansou/util/pool"
|
||||
"sync" // Added for sync.WaitGroup
|
||||
)
|
||||
|
||||
// 优先关键词列表
|
||||
@@ -22,6 +23,7 @@ var priorityKeywords = []string{"合集", "系列", "全", "完", "最新", "附
|
||||
// 全局缓存实例和缓存是否初始化标志
|
||||
var (
|
||||
twoLevelCache *cache.TwoLevelCache
|
||||
enhancedTwoLevelCache *cache.EnhancedTwoLevelCache
|
||||
cacheInitialized bool
|
||||
)
|
||||
|
||||
@@ -29,6 +31,14 @@ var (
|
||||
func init() {
|
||||
if config.AppConfig != nil && config.AppConfig.CacheEnabled {
|
||||
var err error
|
||||
// 优先使用增强版缓存
|
||||
enhancedTwoLevelCache, err = cache.NewEnhancedTwoLevelCache()
|
||||
if err == nil {
|
||||
cacheInitialized = true
|
||||
return
|
||||
}
|
||||
|
||||
// 如果增强版缓存初始化失败,回退到原始缓存
|
||||
twoLevelCache, err = cache.NewTwoLevelCache()
|
||||
if err == nil {
|
||||
cacheInitialized = true
|
||||
@@ -46,11 +56,18 @@ func NewSearchService(pluginManager *plugin.PluginManager) *SearchService {
|
||||
// 检查缓存是否已初始化,如果未初始化则尝试重新初始化
|
||||
if !cacheInitialized && config.AppConfig != nil && config.AppConfig.CacheEnabled {
|
||||
var err error
|
||||
// 优先使用增强版缓存
|
||||
enhancedTwoLevelCache, err = cache.NewEnhancedTwoLevelCache()
|
||||
if err == nil {
|
||||
cacheInitialized = true
|
||||
} else {
|
||||
// 如果增强版缓存初始化失败,回退到原始缓存
|
||||
twoLevelCache, err = cache.NewTwoLevelCache()
|
||||
if err == nil {
|
||||
cacheInitialized = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &SearchService{
|
||||
pluginManager: pluginManager,
|
||||
@@ -128,130 +145,46 @@ func (s *SearchService) Search(keyword string, channels []string, concurrency in
|
||||
}
|
||||
}
|
||||
|
||||
// 立即生成缓存键并检查缓存
|
||||
cacheKey := cache.GenerateCacheKey(keyword, channels, sourceType, plugins)
|
||||
// 并行获取TG搜索和插件搜索结果
|
||||
var tgResults []model.SearchResult
|
||||
var pluginResults []model.SearchResult
|
||||
|
||||
// 如果未启用强制刷新,尝试从缓存获取结果
|
||||
if !forceRefresh && twoLevelCache != nil && config.AppConfig.CacheEnabled {
|
||||
data, hit, err := twoLevelCache.Get(cacheKey)
|
||||
var wg sync.WaitGroup
|
||||
var tgErr, pluginErr error
|
||||
|
||||
if err == nil && hit {
|
||||
var response model.SearchResponse
|
||||
if err := cache.DeserializeWithPool(data, &response); err == nil {
|
||||
// 根据resultType过滤返回结果
|
||||
return filterResponseByType(response, resultType), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 获取所有可用插件
|
||||
var availablePlugins []plugin.SearchPlugin
|
||||
if s.pluginManager != nil && (sourceType == "all" || sourceType == "plugin") {
|
||||
allPlugins := s.pluginManager.GetPlugins()
|
||||
|
||||
// 确保plugins不为nil并且有非空元素
|
||||
hasPlugins := plugins != nil && len(plugins) > 0
|
||||
hasNonEmptyPlugin := false
|
||||
|
||||
if hasPlugins {
|
||||
for _, p := range plugins {
|
||||
if p != "" {
|
||||
hasNonEmptyPlugin = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 只有当plugins数组包含非空元素时才进行过滤
|
||||
if hasPlugins && hasNonEmptyPlugin {
|
||||
pluginMap := make(map[string]bool)
|
||||
for _, p := range plugins {
|
||||
if p != "" { // 忽略空字符串
|
||||
pluginMap[strings.ToLower(p)] = true
|
||||
}
|
||||
}
|
||||
|
||||
for _, p := range allPlugins {
|
||||
if pluginMap[strings.ToLower(p.Name())] {
|
||||
availablePlugins = append(availablePlugins, p)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 如果plugins为nil、空数组或只包含空字符串,视为未指定,使用所有插件
|
||||
availablePlugins = allPlugins
|
||||
}
|
||||
}
|
||||
|
||||
// 控制并发数:如果用户没有指定有效值,则默认使用"频道数+插件数+10"的并发数
|
||||
pluginCount := len(availablePlugins)
|
||||
|
||||
// 根据sourceType决定是否搜索Telegram频道
|
||||
channelCount := 0
|
||||
// 如果需要搜索TG
|
||||
if sourceType == "all" || sourceType == "tg" {
|
||||
channelCount = len(channels)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
tgResults, tgErr = s.searchTG(keyword, channels, forceRefresh)
|
||||
}()
|
||||
}
|
||||
|
||||
if concurrency <= 0 {
|
||||
concurrency = channelCount + pluginCount + 10
|
||||
if concurrency < 1 {
|
||||
concurrency = 1
|
||||
}
|
||||
// 如果需要搜索插件
|
||||
if sourceType == "all" || sourceType == "plugin" {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// 对于插件搜索,我们总是希望获取最新的缓存数据
|
||||
// 因此,即使forceRefresh=false,我们也需要确保获取到最新的缓存
|
||||
pluginResults, pluginErr = s.searchPlugins(keyword, plugins, forceRefresh, concurrency)
|
||||
}()
|
||||
}
|
||||
|
||||
// 计算任务总数(频道数 + 插件数)
|
||||
totalTasks := channelCount + pluginCount
|
||||
// 等待所有搜索完成
|
||||
wg.Wait()
|
||||
|
||||
// 如果没有任务要执行,返回空结果
|
||||
if totalTasks == 0 {
|
||||
return model.SearchResponse{
|
||||
Total: 0,
|
||||
Results: []model.SearchResult{},
|
||||
MergedByType: make(model.MergedLinks),
|
||||
}, nil
|
||||
// 检查错误
|
||||
if tgErr != nil {
|
||||
return model.SearchResponse{}, tgErr
|
||||
}
|
||||
if pluginErr != nil {
|
||||
return model.SearchResponse{}, pluginErr
|
||||
}
|
||||
|
||||
// 使用工作池执行并行搜索
|
||||
tasks := make([]pool.Task, 0, totalTasks)
|
||||
|
||||
// 添加频道搜索任务(如果需要)
|
||||
if sourceType == "all" || sourceType == "tg" {
|
||||
for _, channel := range channels {
|
||||
ch := channel // 创建副本,避免闭包问题
|
||||
tasks = append(tasks, func() interface{} {
|
||||
results, err := s.searchChannel(keyword, ch)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return results
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// 添加插件搜索任务(如果需要)
|
||||
for _, p := range availablePlugins {
|
||||
plugin := p // 创建副本,避免闭包问题
|
||||
tasks = append(tasks, func() interface{} {
|
||||
results, err := plugin.Search(keyword)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return results
|
||||
})
|
||||
}
|
||||
|
||||
// 使用带超时控制的工作池执行所有任务并获取结果
|
||||
results := pool.ExecuteBatchWithTimeout(tasks, concurrency, config.AppConfig.PluginTimeout)
|
||||
|
||||
// 预估每个任务平均返回22个结果
|
||||
allResults := make([]model.SearchResult, 0, totalTasks*22)
|
||||
|
||||
// 合并所有结果
|
||||
for _, result := range results {
|
||||
if result != nil {
|
||||
channelResults := result.([]model.SearchResult)
|
||||
allResults = append(allResults, channelResults...)
|
||||
}
|
||||
}
|
||||
// 合并结果
|
||||
allResults := mergeSearchResults(tgResults, pluginResults)
|
||||
|
||||
// 过滤结果,确保标题包含搜索关键词
|
||||
filteredResults := filterResultsByKeyword(allResults, keyword)
|
||||
@@ -290,19 +223,6 @@ func (s *SearchService) Search(keyword string, channels []string, concurrency in
|
||||
MergedByType: mergedLinks,
|
||||
}
|
||||
|
||||
// 异步缓存搜索结果(缓存完整结果,以便后续可以根据不同resultType过滤)
|
||||
if twoLevelCache != nil && config.AppConfig.CacheEnabled {
|
||||
go func(resp model.SearchResponse) {
|
||||
data, err := cache.SerializeWithPool(resp)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
ttl := time.Duration(config.AppConfig.CacheTTLMinutes) * time.Minute
|
||||
twoLevelCache.Set(cacheKey, data, ttl)
|
||||
}(response)
|
||||
}
|
||||
|
||||
// 根据resultType过滤返回结果
|
||||
return filterResponseByType(response, resultType), nil
|
||||
}
|
||||
@@ -582,6 +502,291 @@ func mergeResultsByType(results []model.SearchResult) model.MergedLinks {
|
||||
return mergedLinks
|
||||
}
|
||||
|
||||
// searchTG 搜索TG频道
|
||||
func (s *SearchService) searchTG(keyword string, channels []string, forceRefresh bool) ([]model.SearchResult, error) {
|
||||
// 生成缓存键
|
||||
cacheKey := cache.GenerateTGCacheKey(keyword, channels)
|
||||
|
||||
// 如果未启用强制刷新,尝试从缓存获取结果
|
||||
if !forceRefresh && cacheInitialized && config.AppConfig.CacheEnabled {
|
||||
var data []byte
|
||||
var hit bool
|
||||
var err error
|
||||
|
||||
// 优先使用增强版缓存
|
||||
if enhancedTwoLevelCache != nil {
|
||||
data, hit, err = enhancedTwoLevelCache.Get(cacheKey)
|
||||
|
||||
if err == nil && hit {
|
||||
var results []model.SearchResult
|
||||
if err := enhancedTwoLevelCache.GetSerializer().Deserialize(data, &results); err == nil {
|
||||
return results, nil
|
||||
}
|
||||
}
|
||||
} else if twoLevelCache != nil {
|
||||
data, hit, err = twoLevelCache.Get(cacheKey)
|
||||
|
||||
if err == nil && hit {
|
||||
var results []model.SearchResult
|
||||
if err := cache.DeserializeWithPool(data, &results); err == nil {
|
||||
return results, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 缓存未命中,执行实际搜索
|
||||
var results []model.SearchResult
|
||||
|
||||
// 使用工作池并行搜索多个频道
|
||||
tasks := make([]pool.Task, 0, len(channels))
|
||||
|
||||
for _, channel := range channels {
|
||||
ch := channel // 创建副本,避免闭包问题
|
||||
tasks = append(tasks, func() interface{} {
|
||||
results, err := s.searchChannel(keyword, ch)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return results
|
||||
})
|
||||
}
|
||||
|
||||
// 执行搜索任务并获取结果
|
||||
taskResults := pool.ExecuteBatchWithTimeout(tasks, len(channels), config.AppConfig.PluginTimeout)
|
||||
|
||||
// 合并所有频道的结果
|
||||
for _, result := range taskResults {
|
||||
if result != nil {
|
||||
channelResults := result.([]model.SearchResult)
|
||||
results = append(results, channelResults...)
|
||||
}
|
||||
}
|
||||
|
||||
// 异步缓存结果
|
||||
if cacheInitialized && config.AppConfig.CacheEnabled {
|
||||
go func(res []model.SearchResult) {
|
||||
ttl := time.Duration(config.AppConfig.CacheTTLMinutes) * time.Minute
|
||||
|
||||
// 优先使用增强版缓存
|
||||
if enhancedTwoLevelCache != nil {
|
||||
data, err := enhancedTwoLevelCache.GetSerializer().Serialize(res)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
enhancedTwoLevelCache.Set(cacheKey, data, ttl)
|
||||
} else if twoLevelCache != nil {
|
||||
data, err := cache.SerializeWithPool(res)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
twoLevelCache.Set(cacheKey, data, ttl)
|
||||
}
|
||||
}(results)
|
||||
}
|
||||
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// searchPlugins 搜索插件
|
||||
func (s *SearchService) searchPlugins(keyword string, plugins []string, forceRefresh bool, concurrency int) ([]model.SearchResult, error) {
|
||||
// 生成缓存键
|
||||
cacheKey := cache.GeneratePluginCacheKey(keyword, plugins)
|
||||
|
||||
// 如果未启用强制刷新,尝试从缓存获取结果
|
||||
if !forceRefresh && cacheInitialized && config.AppConfig.CacheEnabled {
|
||||
var data []byte
|
||||
var hit bool
|
||||
var err error
|
||||
|
||||
// 优先使用增强版缓存
|
||||
if enhancedTwoLevelCache != nil {
|
||||
data, hit, err = enhancedTwoLevelCache.Get(cacheKey)
|
||||
|
||||
if err == nil && hit {
|
||||
var results []model.SearchResult
|
||||
if err := enhancedTwoLevelCache.GetSerializer().Deserialize(data, &results); err == nil {
|
||||
// 确保缓存数据是最新的
|
||||
// 如果缓存数据是最近更新的(例如在过去30秒内),则直接返回
|
||||
// 否则,我们将重新执行搜索以获取最新数据
|
||||
if len(results) > 0 {
|
||||
// 获取当前时间
|
||||
now := time.Now()
|
||||
// 检查缓存数据是否是最近更新的
|
||||
// 这里我们假设如果缓存数据中有结果的时间戳在过去30秒内,则认为是最新的
|
||||
for _, result := range results {
|
||||
if !result.Datetime.IsZero() && now.Sub(result.Datetime) < 30*time.Second {
|
||||
return results, nil
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 如果缓存中没有数据,直接返回空结果
|
||||
return results, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if twoLevelCache != nil {
|
||||
data, hit, err = twoLevelCache.Get(cacheKey)
|
||||
|
||||
if err == nil && hit {
|
||||
var results []model.SearchResult
|
||||
if err := cache.DeserializeWithPool(data, &results); err == nil {
|
||||
// 确保缓存数据是最新的
|
||||
// 如果缓存数据是最近更新的(例如在过去30秒内),则直接返回
|
||||
// 否则,我们将重新执行搜索以获取最新数据
|
||||
if len(results) > 0 {
|
||||
// 获取当前时间
|
||||
now := time.Now()
|
||||
// 检查缓存数据是否是最近更新的
|
||||
// 这里我们假设如果缓存数据中有结果的时间戳在过去30秒内,则认为是最新的
|
||||
for _, result := range results {
|
||||
if !result.Datetime.IsZero() && now.Sub(result.Datetime) < 30*time.Second {
|
||||
return results, nil
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 如果缓存中没有数据,直接返回空结果
|
||||
return results, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 缓存未命中或缓存数据不是最新的,执行实际搜索
|
||||
// 获取所有可用插件
|
||||
var availablePlugins []plugin.SearchPlugin
|
||||
if s.pluginManager != nil {
|
||||
allPlugins := s.pluginManager.GetPlugins()
|
||||
|
||||
// 确保plugins不为nil并且有非空元素
|
||||
hasPlugins := plugins != nil && len(plugins) > 0
|
||||
hasNonEmptyPlugin := false
|
||||
|
||||
if hasPlugins {
|
||||
for _, p := range plugins {
|
||||
if p != "" {
|
||||
hasNonEmptyPlugin = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 只有当plugins数组包含非空元素时才进行过滤
|
||||
if hasPlugins && hasNonEmptyPlugin {
|
||||
pluginMap := make(map[string]bool)
|
||||
for _, p := range plugins {
|
||||
if p != "" { // 忽略空字符串
|
||||
pluginMap[strings.ToLower(p)] = true
|
||||
}
|
||||
}
|
||||
|
||||
for _, p := range allPlugins {
|
||||
if pluginMap[strings.ToLower(p.Name())] {
|
||||
availablePlugins = append(availablePlugins, p)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 如果plugins为nil、空数组或只包含空字符串,视为未指定,使用所有插件
|
||||
availablePlugins = allPlugins
|
||||
}
|
||||
}
|
||||
|
||||
// 控制并发数
|
||||
if concurrency <= 0 {
|
||||
concurrency = len(availablePlugins) + 10
|
||||
if concurrency < 1 {
|
||||
concurrency = 1
|
||||
}
|
||||
}
|
||||
|
||||
// 使用工作池执行并行搜索
|
||||
tasks := make([]pool.Task, 0, len(availablePlugins))
|
||||
for _, p := range availablePlugins {
|
||||
plugin := p // 创建副本,避免闭包问题
|
||||
tasks = append(tasks, func() interface{} {
|
||||
results, err := plugin.Search(keyword)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return results
|
||||
})
|
||||
}
|
||||
|
||||
// 执行搜索任务并获取结果
|
||||
results := pool.ExecuteBatchWithTimeout(tasks, concurrency, config.AppConfig.PluginTimeout)
|
||||
|
||||
// 合并所有插件的结果
|
||||
var allResults []model.SearchResult
|
||||
for _, result := range results {
|
||||
if result != nil {
|
||||
pluginResults := result.([]model.SearchResult)
|
||||
allResults = append(allResults, pluginResults...)
|
||||
}
|
||||
}
|
||||
|
||||
// 异步缓存结果
|
||||
if cacheInitialized && config.AppConfig.CacheEnabled {
|
||||
go func(res []model.SearchResult) {
|
||||
ttl := time.Duration(config.AppConfig.CacheTTLMinutes) * time.Minute
|
||||
|
||||
// 优先使用增强版缓存
|
||||
if enhancedTwoLevelCache != nil {
|
||||
data, err := enhancedTwoLevelCache.GetSerializer().Serialize(res)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
enhancedTwoLevelCache.Set(cacheKey, data, ttl)
|
||||
} else if twoLevelCache != nil {
|
||||
data, err := cache.SerializeWithPool(res)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
twoLevelCache.Set(cacheKey, data, ttl)
|
||||
}
|
||||
}(allResults)
|
||||
}
|
||||
|
||||
return allResults, nil
|
||||
}
|
||||
|
||||
// 合并搜索结果
|
||||
func mergeSearchResults(tgResults, pluginResults []model.SearchResult) []model.SearchResult {
|
||||
// 预估合并后的结果数量
|
||||
totalSize := len(tgResults) + len(pluginResults)
|
||||
if totalSize == 0 {
|
||||
return []model.SearchResult{}
|
||||
}
|
||||
|
||||
// 创建结果映射,用于去重
|
||||
resultMap := make(map[string]model.SearchResult, totalSize)
|
||||
|
||||
// 添加TG搜索结果
|
||||
for _, result := range tgResults {
|
||||
resultMap[result.UniqueID] = result
|
||||
}
|
||||
|
||||
// 添加或更新插件搜索结果(如果有重复,保留较新的)
|
||||
for _, result := range pluginResults {
|
||||
if existing, ok := resultMap[result.UniqueID]; ok {
|
||||
// 如果已存在,保留较新的
|
||||
if result.Datetime.After(existing.Datetime) {
|
||||
resultMap[result.UniqueID] = result
|
||||
}
|
||||
} else {
|
||||
resultMap[result.UniqueID] = result
|
||||
}
|
||||
}
|
||||
|
||||
// 转换回切片
|
||||
mergedResults := make([]model.SearchResult, 0, len(resultMap))
|
||||
for _, result := range resultMap {
|
||||
mergedResults = append(mergedResults, result)
|
||||
}
|
||||
|
||||
return mergedResults
|
||||
}
|
||||
|
||||
// GetPluginManager 获取插件管理器
|
||||
func (s *SearchService) GetPluginManager() *plugin.PluginManager {
|
||||
return s.pluginManager
|
||||
|
||||
28
util/cache/cache_key.go
vendored
28
util/cache/cache_key.go
vendored
@@ -72,6 +72,34 @@ func init() {
|
||||
precomputedHashes.Store("all_channels", allChannelsHash)
|
||||
}
|
||||
|
||||
// GenerateTGCacheKey 为TG搜索生成缓存键
|
||||
func GenerateTGCacheKey(keyword string, channels []string) string {
|
||||
// 关键词标准化
|
||||
normalizedKeyword := strings.ToLower(strings.TrimSpace(keyword))
|
||||
|
||||
// 获取频道列表哈希
|
||||
channelsHash := getChannelsHash(channels)
|
||||
|
||||
// 生成TG搜索特定的缓存键
|
||||
keyStr := fmt.Sprintf("tg:%s:%s", normalizedKeyword, channelsHash)
|
||||
hash := md5.Sum([]byte(keyStr))
|
||||
return hex.EncodeToString(hash[:])
|
||||
}
|
||||
|
||||
// GeneratePluginCacheKey 为插件搜索生成缓存键
|
||||
func GeneratePluginCacheKey(keyword string, plugins []string) string {
|
||||
// 关键词标准化
|
||||
normalizedKeyword := strings.ToLower(strings.TrimSpace(keyword))
|
||||
|
||||
// 获取插件列表哈希
|
||||
pluginsHash := getPluginsHash(plugins)
|
||||
|
||||
// 生成插件搜索特定的缓存键
|
||||
keyStr := fmt.Sprintf("plugin:%s:%s", normalizedKeyword, pluginsHash)
|
||||
hash := md5.Sum([]byte(keyStr))
|
||||
return hex.EncodeToString(hash[:])
|
||||
}
|
||||
|
||||
// GenerateCacheKey 根据所有影响搜索结果的参数生成缓存键
|
||||
func GenerateCacheKey(keyword string, channels []string, sourceType string, plugins []string) string {
|
||||
// 关键词标准化
|
||||
|
||||
119
util/cache/enhanced_two_level_cache.go
vendored
Normal file
119
util/cache/enhanced_two_level_cache.go
vendored
Normal file
@@ -0,0 +1,119 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"pansou/config"
|
||||
)
|
||||
|
||||
// EnhancedTwoLevelCache 改进的两级缓存
|
||||
type EnhancedTwoLevelCache struct {
|
||||
memory *MemoryCache
|
||||
disk *ShardedDiskCache
|
||||
mutex sync.RWMutex
|
||||
serializer Serializer
|
||||
}
|
||||
|
||||
// NewEnhancedTwoLevelCache 创建新的改进两级缓存
|
||||
func NewEnhancedTwoLevelCache() (*EnhancedTwoLevelCache, error) {
|
||||
// 内存缓存大小为磁盘缓存的60%
|
||||
memCacheMaxItems := 5000
|
||||
memCacheSizeMB := config.AppConfig.CacheMaxSizeMB * 3 / 5
|
||||
|
||||
memCache := NewMemoryCache(memCacheMaxItems, memCacheSizeMB)
|
||||
memCache.StartCleanupTask()
|
||||
|
||||
// 创建分片磁盘缓存,默认使用8个分片
|
||||
shardCount := 8
|
||||
diskCache, err := NewShardedDiskCache(config.AppConfig.CachePath, shardCount, config.AppConfig.CacheMaxSizeMB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 创建序列化器
|
||||
serializer := NewGobSerializer()
|
||||
|
||||
return &EnhancedTwoLevelCache{
|
||||
memory: memCache,
|
||||
disk: diskCache,
|
||||
serializer: serializer,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Set 设置缓存
|
||||
func (c *EnhancedTwoLevelCache) Set(key string, data []byte, ttl time.Duration) error {
|
||||
// 先设置内存缓存(这是快速操作,直接在当前goroutine中执行)
|
||||
c.memory.Set(key, data, ttl)
|
||||
|
||||
// 异步设置磁盘缓存(这是IO操作,可能较慢)
|
||||
go func(k string, d []byte, t time.Duration) {
|
||||
// 使用独立的goroutine写入磁盘,避免阻塞调用者
|
||||
_ = c.disk.Set(k, d, t)
|
||||
}(key, data, ttl)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get 获取缓存
|
||||
func (c *EnhancedTwoLevelCache) Get(key string) ([]byte, bool, error) {
|
||||
// 优先检查内存缓存
|
||||
if data, found := c.memory.Get(key); found {
|
||||
return data, true, nil
|
||||
}
|
||||
|
||||
// 内存未命中,检查磁盘缓存
|
||||
data, found, err := c.disk.Get(key)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if found {
|
||||
// 磁盘命中,更新内存缓存
|
||||
ttl := time.Duration(config.AppConfig.CacheTTLMinutes) * time.Minute
|
||||
c.memory.Set(key, data, ttl)
|
||||
return data, true, nil
|
||||
}
|
||||
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
// Delete 删除缓存
|
||||
func (c *EnhancedTwoLevelCache) Delete(key string) error {
|
||||
// 从内存缓存删除
|
||||
c.memory.mutex.Lock()
|
||||
if item, exists := c.memory.items[key]; exists {
|
||||
c.memory.currSize -= int64(item.size)
|
||||
delete(c.memory.items, key)
|
||||
}
|
||||
c.memory.mutex.Unlock()
|
||||
|
||||
// 从磁盘缓存删除
|
||||
return c.disk.Delete(key)
|
||||
}
|
||||
|
||||
// Clear 清空所有缓存
|
||||
func (c *EnhancedTwoLevelCache) Clear() error {
|
||||
// 清空内存缓存
|
||||
c.memory.mutex.Lock()
|
||||
c.memory.items = make(map[string]*memoryCacheItem)
|
||||
c.memory.currSize = 0
|
||||
c.memory.mutex.Unlock()
|
||||
|
||||
// 清空磁盘缓存
|
||||
return c.disk.Clear()
|
||||
}
|
||||
|
||||
// 设置序列化器
|
||||
func (c *EnhancedTwoLevelCache) SetSerializer(serializer Serializer) {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
c.serializer = serializer
|
||||
}
|
||||
|
||||
// 获取序列化器
|
||||
func (c *EnhancedTwoLevelCache) GetSerializer() Serializer {
|
||||
c.mutex.RLock()
|
||||
defer c.mutex.RUnlock()
|
||||
return c.serializer
|
||||
}
|
||||
103
util/cache/serializer.go
vendored
Normal file
103
util/cache/serializer.go
vendored
Normal file
@@ -0,0 +1,103 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"pansou/model"
|
||||
)
|
||||
|
||||
// 初始化函数,注册model包中的类型到gob
|
||||
func init() {
|
||||
// 注册SearchResult类型
|
||||
gob.Register(model.SearchResult{})
|
||||
|
||||
// 注册SearchResponse类型
|
||||
gob.Register(model.SearchResponse{})
|
||||
|
||||
// 注册MergedLinks类型
|
||||
gob.Register(model.MergedLinks{})
|
||||
|
||||
// 注册[]model.SearchResult类型
|
||||
gob.Register([]model.SearchResult{})
|
||||
|
||||
// 注册map[string][]model.SearchResult类型
|
||||
gob.Register(map[string][]model.SearchResult{})
|
||||
|
||||
// 注册time.Time类型
|
||||
gob.Register(time.Time{})
|
||||
}
|
||||
|
||||
// Serializer 序列化接口
|
||||
type Serializer interface {
|
||||
Serialize(v interface{}) ([]byte, error)
|
||||
Deserialize(data []byte, v interface{}) error
|
||||
}
|
||||
|
||||
// GobSerializer 使用gob进行序列化/反序列化
|
||||
type GobSerializer struct {
|
||||
bufferPool sync.Pool
|
||||
}
|
||||
|
||||
// NewGobSerializer 创建新的gob序列化器
|
||||
func NewGobSerializer() *GobSerializer {
|
||||
return &GobSerializer{
|
||||
bufferPool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
return new(bytes.Buffer)
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Serialize 序列化数据
|
||||
func (s *GobSerializer) Serialize(v interface{}) ([]byte, error) {
|
||||
buf := s.bufferPool.Get().(*bytes.Buffer)
|
||||
buf.Reset()
|
||||
defer s.bufferPool.Put(buf)
|
||||
|
||||
enc := gob.NewEncoder(buf)
|
||||
if err := enc.Encode(v); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := make([]byte, buf.Len())
|
||||
copy(result, buf.Bytes())
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Deserialize 反序列化数据
|
||||
func (s *GobSerializer) Deserialize(data []byte, v interface{}) error {
|
||||
buf := s.bufferPool.Get().(*bytes.Buffer)
|
||||
buf.Reset()
|
||||
defer s.bufferPool.Put(buf)
|
||||
|
||||
buf.Write(data)
|
||||
dec := gob.NewDecoder(buf)
|
||||
return dec.Decode(v)
|
||||
}
|
||||
|
||||
// JSONSerializer 使用JSON进行序列化/反序列化
|
||||
// 为了保持向后兼容性
|
||||
type JSONSerializer struct {
|
||||
bufferPool *sync.Pool
|
||||
}
|
||||
|
||||
// NewJSONSerializer 创建新的JSON序列化器
|
||||
func NewJSONSerializer() *JSONSerializer {
|
||||
return &JSONSerializer{
|
||||
bufferPool: &bufferPool, // 使用已有的缓冲区池
|
||||
}
|
||||
}
|
||||
|
||||
// Serialize 序列化数据
|
||||
func (s *JSONSerializer) Serialize(v interface{}) ([]byte, error) {
|
||||
return SerializeWithPool(v)
|
||||
}
|
||||
|
||||
// Deserialize 反序列化数据
|
||||
func (s *JSONSerializer) Deserialize(data []byte, v interface{}) error {
|
||||
return DeserializeWithPool(data, v)
|
||||
}
|
||||
94
util/cache/sharded_disk_cache.go
vendored
Normal file
94
util/cache/sharded_disk_cache.go
vendored
Normal file
@@ -0,0 +1,94 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ShardedDiskCache 分片磁盘缓存
|
||||
type ShardedDiskCache struct {
|
||||
baseDir string
|
||||
shardCount int
|
||||
shards []*DiskCache
|
||||
maxSizeMB int
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
// NewShardedDiskCache 创建新的分片磁盘缓存
|
||||
func NewShardedDiskCache(baseDir string, shardCount, maxSizeMB int) (*ShardedDiskCache, error) {
|
||||
// 确保每个分片的大小合理
|
||||
shardSize := maxSizeMB / shardCount
|
||||
if shardSize < 1 {
|
||||
shardSize = 1
|
||||
}
|
||||
|
||||
cache := &ShardedDiskCache{
|
||||
baseDir: baseDir,
|
||||
shardCount: shardCount,
|
||||
shards: make([]*DiskCache, shardCount),
|
||||
maxSizeMB: maxSizeMB,
|
||||
}
|
||||
|
||||
// 初始化每个分片
|
||||
for i := 0; i < shardCount; i++ {
|
||||
shardPath := filepath.Join(baseDir, fmt.Sprintf("shard_%d", i))
|
||||
diskCache, err := NewDiskCache(shardPath, shardSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cache.shards[i] = diskCache
|
||||
}
|
||||
|
||||
return cache, nil
|
||||
}
|
||||
|
||||
// 获取键对应的分片
|
||||
func (c *ShardedDiskCache) getShard(key string) *DiskCache {
|
||||
// 计算哈希值决定分片
|
||||
h := fnv.New32a()
|
||||
h.Write([]byte(key))
|
||||
shardIndex := int(h.Sum32()) % c.shardCount
|
||||
return c.shards[shardIndex]
|
||||
}
|
||||
|
||||
// Set 设置缓存
|
||||
func (c *ShardedDiskCache) Set(key string, data []byte, ttl time.Duration) error {
|
||||
shard := c.getShard(key)
|
||||
return shard.Set(key, data, ttl)
|
||||
}
|
||||
|
||||
// Get 获取缓存
|
||||
func (c *ShardedDiskCache) Get(key string) ([]byte, bool, error) {
|
||||
shard := c.getShard(key)
|
||||
return shard.Get(key)
|
||||
}
|
||||
|
||||
// Delete 删除缓存
|
||||
func (c *ShardedDiskCache) Delete(key string) error {
|
||||
shard := c.getShard(key)
|
||||
return shard.Delete(key)
|
||||
}
|
||||
|
||||
// Has 检查缓存是否存在
|
||||
func (c *ShardedDiskCache) Has(key string) bool {
|
||||
shard := c.getShard(key)
|
||||
return shard.Has(key)
|
||||
}
|
||||
|
||||
// Clear 清空所有缓存
|
||||
func (c *ShardedDiskCache) Clear() error {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
var lastErr error
|
||||
for _, shard := range c.shards {
|
||||
if err := shard.Clear(); err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
|
||||
return lastErr
|
||||
}
|
||||
Reference in New Issue
Block a user