diff --git a/README.md b/README.md index 62dbeeb..4ff45f1 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ PanSou是一个高性能的网盘资源搜索API服务,支持TG搜索和网盘 - **网盘类型分类**:自动识别多种网盘链接,按类型归类展示 - **智能排序**:基于时间和关键词权重的多级排序策略 - **插件系统**:支持通过插件扩展搜索来源,已内置多个网盘搜索插件;支持"尽快响应,持续处理"的异步搜索模式 -- **两级缓存**:内存+磁盘缓存机制,大幅提升重复查询速度;异步插件缓存自动保存到磁盘,系统重启后自动恢复, +- **两级异步缓存**:内存+分片磁盘缓存机制,大幅提升重复查询速度和并发性能,即使在不强制刷新的情况下也能获取异步插件更新的最新缓存数据 ## 支持的网盘类型 @@ -55,6 +55,7 @@ export CACHE_ENABLED=true export CACHE_PATH="./cache" export CACHE_MAX_SIZE=100 # MB export CACHE_TTL=60 # 分钟 +export SHARD_COUNT=8 # 分片数量 # 异步插件配置 export ASYNC_PLUGIN_ENABLED=true @@ -213,6 +214,8 @@ GET /api/search?kw=速度与激情&channels=tgsearchers2,xxx&conc=2&refresh=true | CACHE_PATH | 缓存文件路径 | ./cache | | CACHE_MAX_SIZE | 最大缓存大小(MB) | 100 | | CACHE_TTL | 缓存生存时间(分钟) | 60 | +| SHARD_COUNT | 缓存分片数量 | 8 | +| SERIALIZER_TYPE | 序列化器类型(gob/json) | gob | | ENABLE_COMPRESSION | 是否启用压缩 | false | | MIN_SIZE_TO_COMPRESS | 最小压缩阈值(字节) | 1024 | | GC_PERCENT | GC触发百分比 | 100 | @@ -223,6 +226,7 @@ GET /api/search?kw=速度与激情&channels=tgsearchers2,xxx&conc=2&refresh=true | ASYNC_MAX_BACKGROUND_WORKERS | 最大后台工作者数量 | 20 | | ASYNC_MAX_BACKGROUND_TASKS | 最大后台任务数量 | 100 | | ASYNC_CACHE_TTL_HOURS | 异步缓存有效期(小时) | 1 | +| CACHE_FRESHNESS_SECONDS | 缓存数据新鲜度(秒) | 30 | ## 性能优化 @@ -230,11 +234,16 @@ PanSou 实现了多项性能优化技术: 1. **JSON处理优化**:使用 sonic 高性能 JSON 库 2. **内存优化**:预分配策略、对象池化、GC参数优化 -3. **缓存优化**:两级缓存、异步写入、优化键生成 +3. **缓存优化**: + - 增强版两级缓存(内存+分片磁盘) + - 分片磁盘缓存减少锁竞争 + - 高效序列化(Gob和JSON双支持) + - 分离的缓存键(TG搜索和插件搜索独立缓存) + - 缓存数据时间戳检查(获取最新数据) + - 异步写入(不阻塞主流程) 4. **HTTP客户端优化**:连接池、HTTP/2支持 5. **并发优化**:工作池、智能并发控制 6. **传输压缩**:支持 gzip 压缩 -7. **异步插件缓存**:持久化缓存、即时保存、优雅关闭机制 ## 异步插件系统 @@ -248,142 +257,26 @@ PanSou实现了高级异步插件系统,解决了某些搜索源响应时间 - **优雅关闭**:在程序退出前保存缓存,确保数据不丢失 - **增量更新**:智能合并新旧结果,保留有价值的数据 - **后台自动刷新**:对于接近过期的缓存,在后台自动刷新 -- **资源管理**:通过工作池控制并发任务数量,避免资源耗尽 +- **与主程序缓存协同**:通过时间戳检查机制,确保即使在不强制刷新的情况下也能获取最新缓存数据 -### 异步插件工作流程 +### 缓存系统特点 -1. **缓存检查**:首先检查是否有有效缓存 -2. **快速响应**:如果有缓存,立即返回;如果缓存接近过期,在后台刷新 -3. **双通道处理**:如果没有缓存,启动快速响应通道和后台处理通道 -4. **超时控制**:在响应超时时返回当前结果(可能为空),后台继续处理 -5. **缓存更新**:后台处理完成后更新缓存,供后续查询使用 +1. **分片磁盘缓存**: + - 将缓存数据分散到多个子目录,减少锁竞争 + - 通过哈希算法将缓存键均匀分布到不同分片 + - 提高高并发场景下的性能 -## 插件系统 +2. **序列化器接口**: + - 统一的序列化和反序列化操作 + - 支持Gob和JSON双序列化方式 + - Gob序列化提供更高性能和更小的结果大小 -PanSou 实现了灵活的插件系统,允许轻松扩展搜索来源 +3. **缓存数据时间戳检查**: + - 检查缓存数据是否是最新的(30秒内更新) + - 确保获取异步插件在后台更新的最新缓存数据 + - 在不强制刷新的情况下也能获取最新数据 -详情参考[插件开发指南.md](docs/插件开发指南.md) - -### 插件特性 - -- **自动注册机制**:插件通过init函数自动注册,无需修改主程序代码 -- **统一接口**:所有插件实现相同的SearchPlugin接口 -- **双层超时控制**:插件内部使用自定义超时时间,系统外部提供强制超时保障 -- **并发执行**:插件搜索与频道搜索并发执行,提高整体性能 -- **结果标准化**:插件返回标准化的搜索结果,便于统一处理 -- **异步处理**:支持异步插件,实现"尽快响应,持续处理"的模式 - -### 开发自定义插件 - -1. 创建新的插件包: - -```go -package myplugin - -import ( - "pansou/model" - "pansou/plugin" -) - -// 在init函数中注册插件 -func init() { - plugin.RegisterGlobalPlugin(NewMyPlugin()) -} - -// MyPlugin 自定义插件 -type MyPlugin struct {} - -// NewMyPlugin 创建新的插件实例 -func NewMyPlugin() *MyPlugin { - return &MyPlugin{} -} - -// Name 返回插件名称 -func (p *MyPlugin) Name() string { - return "myplugin" -} - -// Priority 返回插件优先级 -func (p *MyPlugin) Priority() int { - return 3 // 中等优先级 -} - -// Search 执行搜索并返回结果 -func (p *MyPlugin) Search(keyword string) ([]model.SearchResult, error) { - // 实现搜索逻辑 - // ... - - return results, nil -} -``` - -2. 在main.go中导入插件包: - -```go -import ( - // 导入插件包以触发init函数 - _ "pansou/plugin/myplugin" -) -``` - -## 附录 - -### TG频道 - -``` -"channels": ["tgsearchers2","SharePanBaidu", "yunpanxunlei", "tianyifc", "BaiduCloudDisk", "txtyzy", "peccxinpd", "gotopan", "xingqiump4", "yunpanqk", "PanjClub", "kkxlzy", "baicaoZY", "MCPH01", "share_aliyun", "pan115_share", "bdwpzhpd", "ysxb48", "pankuake_share", "jdjdn1111", "yggpan", "yunpanall", "MCPH086", "zaihuayun", "Q66Share", "NewAliPan", "Oscar_4Kmovies", "ucwpzy", "alyp_TV", "alyp_4K_Movies", "shareAliyun", "alyp_1", "yunpanpan", "hao115", "yunpanshare", "dianyingshare", "Quark_Movies", "XiangxiuNB", "NewQuark", "ydypzyfx", "kuakeyun", "ucquark", "xx123pan", "yingshifenxiang123", "zyfb123", "pan123pan", "tyypzhpd", "tianyirigeng", "cloudtianyi", "hdhhd21", "Lsp115", "oneonefivewpfx", "Maidanglaocom", "qixingzhenren", "taoxgzy", "tgsearchers115", "Channel_Shares_115", "tyysypzypd", "vip115hot", "wp123zy", "yunpan139", "yunpan189", "yunpanuc", "yydf_hzl", "alyp_Animation", "alyp_JLP","leoziyuan"] -``` - -### 配置参考 - -supervisor配置参考 - -``` -[program:pansou] -environment=PORT=9999,CHANNELS="SharePanBaidu,yunpanxunlei,tianyifc,BaiduCloudDisk,txtyzy,peccxinpd,gotopan,xingqiump4,yunpanqk,PanjClub,kkxlzy,baicaoZY,MCPH01,share_aliyun,pan115_share,bdwpzhpd,ysxb48,pankuake_share,jdjdn1111,yggpan,yunpanall,MCPH086,zaihuayun,Q66Share,NewAliPan,Oscar_4Kmovies,ucwpzy,alyp_TV,alyp_4K_Movies,shareAliyun,alyp_1,yunpanpan,hao115,yunpanshare,dianyingshare,Quark_Movies,XiangxiuNB,NewQuark,ydypzyfx,kuakeyun,ucquark,xx123pan,yingshifenxiang123,zyfb123,pan123pan,tyypzhpd,tianyirigeng,cloudtianyi,hdhhd21,Lsp115,oneonefivewpfx,Maidanglaocom,qixingzhenren,taoxgzy,tgsearchers115,Channel_Shares_115,tyysypzypd,vip115hot,wp123zy,yunpan139,yunpan189,yunpanuc,yydf_hzl,alyp_Animation,alyp_JLP,tgsearchers2,leoziyuan" -command=/home/work/pansou/pansou -directory=/home/work/pansou -autostart=true -autorestart=true -startsecs=5 -startretries=3 -exitcodes=0 -stopwaitsecs=10 -stopasgroup=true -killasgroup=true -``` - -nginx配置参考 - -``` -server { - listen 80; - server_name pansou.252035.xyz; - - # 将 HTTP 重定向到 HTTPS - return 301 https://$host$request_uri; -} - -server { - listen 443 ssl http2; # 添加 http2 - server_name pansou.252035.xyz; - - # 证书和密钥路径 - ssl_certificate /etc/letsencrypt/live/252035.xyz/fullchain.pem; - ssl_certificate_key /etc/letsencrypt/live/252035.xyz/privkey.pem; - - # 增强 SSL 安全性 - ssl_protocols TLSv1.2 TLSv1.3; - ssl_ciphers EECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH; - ssl_prefer_server_ciphers on; - - # 后端代理 - location / { - proxy_pass http://127.0.0.1:9999; - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-Forwarded-Proto $scheme; - } -} -``` \ No newline at end of file +4. **分离的缓存键**: + - TG搜索和插件搜索使用独立的缓存键 + - 实现独立更新,互不影响 + - 提高缓存命中率和更新效率 \ No newline at end of file diff --git a/docs/1-项目总体架构设计.md b/docs/1-项目总体架构设计.md index cd84880..3845754 100644 --- a/docs/1-项目总体架构设计.md +++ b/docs/1-项目总体架构设计.md @@ -10,15 +10,17 @@ PanSou是一个高性能的网盘资源搜索API服务,支持Telegram搜索和 - **高性能并发**:通过工作池实现高效并发搜索 - **智能排序**:基于时间和关键词权重的多级排序策略 - **网盘类型分类**:自动识别多种网盘链接,按类型归类展示 -- **两级缓存**:内存+磁盘缓存机制,大幅提升重复查询速度 +- **增强版两级缓存**:内存+分片磁盘缓存机制,大幅提升重复查询速度 - **插件系统**:支持通过插件扩展搜索来源 - **缓存键一致性**:优化的缓存键生成算法,确保相同语义查询的缓存命中 +- **异步插件缓存更新**:支持在不强制刷新的情况下获取异步插件的最新缓存数据 ### 1.2 技术栈 - **编程语言**:Go - **Web框架**:Gin -- **缓存**:自定义两级缓存(内存+磁盘) +- **缓存**:自定义增强版两级缓存(内存+分片磁盘) +- **序列化**:Gob和JSON双序列化支持 - **JSON处理**:sonic(高性能JSON库) - **并发控制**:工作池模式 - **HTTP客户端**:自定义优化的HTTP客户端 @@ -65,8 +67,9 @@ PanSou采用模块化的分层架构设计,主要包括以下几个层次: - **搜索服务**:整合插件和Telegram搜索结果 - **结果处理**:过滤、排序和分类搜索结果 -- **缓存管理**:管理搜索结果的缓存策略 +- **缓存管理**:管理搜索结果的缓存策略,支持TG和插件搜索的独立缓存 - **缓存键生成**:基于所有影响结果的参数生成一致的缓存键 +- **缓存更新检测**:检查缓存数据是否是最新的,支持获取异步更新的缓存数据 #### 2.2.3 插件系统 @@ -74,10 +77,12 @@ PanSou采用模块化的分层架构设计,主要包括以下几个层次: - **插件管理**:管理插件的注册、获取和调用 - **自动注册**:通过init函数实现插件自动注册 - **高性能JSON处理**:使用sonic库优化JSON序列化/反序列化 +- **异步插件**:支持"尽快响应,持续处理"的异步搜索模式 #### 2.2.4 工具层 -- **缓存工具**:两级缓存实现(内存+磁盘) +- **缓存工具**:增强版两级缓存实现(内存+分片磁盘) +- **序列化器**:支持Gob和JSON双序列化方式 - **HTTP客户端**:优化的HTTP客户端,支持代理 - **工作池**:并发任务执行的工作池 - **JSON工具**:高性能JSON处理工具 @@ -86,9 +91,9 @@ PanSou采用模块化的分层架构设计,主要包括以下几个层次: 1. **请求接收**:API层接收搜索请求 2. **参数处理**:解析、验证和规范化请求参数 -3. **缓存键生成**:基于所有影响结果的参数生成一致的缓存键 -4. **缓存检查**:检查是否有缓存结果 -5. **并发搜索**:如无缓存,并发执行搜索任务 +3. **缓存键生成**:分别为TG搜索和插件搜索生成独立的缓存键 +4. **缓存检查**:检查是否有缓存结果,并验证缓存数据是否是最新的 +5. **并发搜索**:如无缓存或缓存数据不是最新的,并发执行搜索任务 6. **结果处理**:过滤、排序和分类搜索结果 7. **缓存存储**:将结果存入缓存 8. **响应返回**:返回处理后的结果 @@ -98,9 +103,10 @@ PanSou采用模块化的分层架构设计,主要包括以下几个层次: ### 3.1 高性能设计 - **并发搜索**:使用工作池模式实现高效并发 -- **两级缓存**:内存缓存提供快速访问,磁盘缓存提供持久存储 +- **增强版两级缓存**:内存缓存提供快速访问,分片磁盘缓存提供持久存储和高并发支持 - **异步操作**:非关键路径使用异步处理 - **内存优化**:预分配策略、对象池化、GC参数优化 +- **高效序列化**:使用Gob序列化提高性能,保留JSON序列化兼容性 - **高效JSON处理**:使用sonic库替代标准库,提升序列化性能 ### 3.2 可扩展性设计 @@ -108,6 +114,7 @@ PanSou采用模块化的分层架构设计,主要包括以下几个层次: - **插件系统**:通过统一接口和自动注册机制实现可扩展 - **模块化**:清晰的模块边界和职责划分 - **配置驱动**:通过环境变量实现灵活配置 +- **序列化器接口**:通过统一接口支持多种序列化方式 ### 3.3 可靠性设计 @@ -115,6 +122,7 @@ PanSou采用模块化的分层架构设计,主要包括以下几个层次: - **错误处理**:全面的错误捕获和处理 - **优雅降级**:单个搜索源失败不影响整体结果 - **缓存一致性**:确保相同语义的查询使用相同的缓存键 +- **双缓存机制**:优先使用增强版缓存,失败时回退到原始缓存 ## 4. 代码组织结构 @@ -139,6 +147,7 @@ pansou/ │ └── response.go # 响应模型 ├── plugin/ # 插件系统 │ ├── plugin.go # 插件接口和管理 +│ ├── baseasyncplugin.go # 异步插件基类实现 │ ├── jikepan/ # 即刻盘插件 │ ├── hunhepan/ # 混合盘插件 │ ├── pansearch/ # 盘搜插件 @@ -150,11 +159,12 @@ pansou/ ├── util/ # 工具层 │ ├── cache/ # 缓存工具 │ │ ├── cache_key.go # 优化的缓存键生成 -│ │ ├── cache_key_test.go # 缓存键生成测试 │ │ ├── disk_cache.go # 磁盘缓存 +│ │ ├── sharded_disk_cache.go # 分片磁盘缓存 │ │ ├── two_level_cache.go # 两级缓存 -│ │ ├── utils.go # 缓存工具函数 -│ │ └── utils_test.go # 缓存工具测试 +│ │ ├── enhanced_two_level_cache.go # 增强版两级缓存 +│ │ ├── serializer.go # 序列化器接口 +│ │ └── utils.go # 缓存工具函数 │ ├── compression.go # 压缩工具 │ ├── convert.go # 类型转换工具 │ ├── http_util.go # HTTP工具函数 diff --git a/docs/3-服务层设计.md b/docs/3-服务层设计.md index 17a4def..913f25f 100644 --- a/docs/3-服务层设计.md +++ b/docs/3-服务层设计.md @@ -20,8 +20,9 @@ pansou/service/ ```go // 全局缓存实例和缓存是否初始化标志 var ( - twoLevelCache *cache.TwoLevelCache - cacheInitialized bool + twoLevelCache *cache.TwoLevelCache + enhancedTwoLevelCache *cache.EnhancedTwoLevelCache + cacheInitialized bool ) // 优先关键词列表 @@ -31,6 +32,14 @@ var priorityKeywords = []string{"全", "合集", "系列", "完", "最新", "附 func init() { if config.AppConfig != nil && config.AppConfig.CacheEnabled { var err error + // 优先使用增强版缓存 + enhancedTwoLevelCache, err = cache.NewEnhancedTwoLevelCache() + if err == nil { + cacheInitialized = true + return + } + + // 如果增强版缓存初始化失败,回退到原始缓存 twoLevelCache, err = cache.NewTwoLevelCache() if err == nil { cacheInitialized = true @@ -48,9 +57,16 @@ func NewSearchService(pluginManager *plugin.PluginManager) *SearchService { // 检查缓存是否已初始化,如果未初始化则尝试重新初始化 if !cacheInitialized && config.AppConfig != nil && config.AppConfig.CacheEnabled { var err error - twoLevelCache, err = cache.NewTwoLevelCache() + // 优先使用增强版缓存 + enhancedTwoLevelCache, err = cache.NewEnhancedTwoLevelCache() if err == nil { cacheInitialized = true + } else { + // 如果增强版缓存初始化失败,回退到原始缓存 + twoLevelCache, err = cache.NewTwoLevelCache() + if err == nil { + cacheInitialized = true + } } } @@ -67,25 +83,360 @@ func NewSearchService(pluginManager *plugin.PluginManager) *SearchService { ```go // Search 执行搜索 func (s *SearchService) Search(keyword string, channels []string, concurrency int, forceRefresh bool, resultType string, sourceType string, plugins []string) (model.SearchResponse, error) { - // 立即生成缓存键并检查缓存 - cacheKey := cache.GenerateCacheKey(keyword, channels, resultType, sourceType, plugins) - - // 如果未启用强制刷新,尝试从缓存获取结果 - if !forceRefresh && twoLevelCache != nil && config.AppConfig.CacheEnabled { - data, hit, err := twoLevelCache.Get(cacheKey) - - if err == nil && hit { - var response model.SearchResponse - if err := json.Unmarshal(data, &response); err == nil { - // 根据resultType过滤返回结果 - return filterResponseByType(response, resultType), nil + // 参数预处理 + // 源类型标准化 + if sourceType == "" { + sourceType = "all" + } + + // 插件参数规范化处理 + if sourceType == "tg" { + // 对于只搜索Telegram的请求,忽略插件参数 + plugins = nil + } else if sourceType == "all" || sourceType == "plugin" { + // 检查是否为空列表或只包含空字符串 + if plugins == nil || len(plugins) == 0 { + plugins = nil + } else { + // 检查是否有非空元素 + hasNonEmpty := false + for _, p := range plugins { + if p != "" { + hasNonEmpty = true + break + } + } + + // 如果全是空字符串,视为未指定 + if !hasNonEmpty { + plugins = nil + } else { + // 检查是否包含所有插件 + allPlugins := s.pluginManager.GetPlugins() + allPluginNames := make([]string, 0, len(allPlugins)) + for _, p := range allPlugins { + allPluginNames = append(allPluginNames, strings.ToLower(p.Name())) + } + + // 创建请求的插件名称集合(忽略空字符串) + requestedPlugins := make([]string, 0, len(plugins)) + for _, p := range plugins { + if p != "" { + requestedPlugins = append(requestedPlugins, strings.ToLower(p)) + } + } + + // 如果请求的插件数量与所有插件数量相同,检查是否包含所有插件 + if len(requestedPlugins) == len(allPluginNames) { + // 创建映射以便快速查找 + pluginMap := make(map[string]bool) + for _, p := range requestedPlugins { + pluginMap[p] = true + } + + // 检查是否包含所有插件 + allIncluded := true + for _, name := range allPluginNames { + if !pluginMap[name] { + allIncluded = false + break + } + } + + // 如果包含所有插件,统一设为nil + if allIncluded { + plugins = nil + } + } } } } + // 并行获取TG搜索和插件搜索结果 + var tgResults []model.SearchResult + var pluginResults []model.SearchResult + + var wg sync.WaitGroup + var tgErr, pluginErr error + + // 如果需要搜索TG + if sourceType == "all" || sourceType == "tg" { + wg.Add(1) + go func() { + defer wg.Done() + tgResults, tgErr = s.searchTG(keyword, channels, forceRefresh) + }() + } + + // 如果需要搜索插件 + if sourceType == "all" || sourceType == "plugin" { + wg.Add(1) + go func() { + defer wg.Done() + // 对于插件搜索,我们总是希望获取最新的缓存数据 + // 因此,即使forceRefresh=false,我们也需要确保获取到最新的缓存 + pluginResults, pluginErr = s.searchPlugins(keyword, plugins, forceRefresh, concurrency) + }() + } + + // 等待所有搜索完成 + wg.Wait() + + // 检查错误 + if tgErr != nil { + return model.SearchResponse{}, tgErr + } + if pluginErr != nil { + return model.SearchResponse{}, pluginErr + } + + // 合并结果 + allResults := mergeSearchResults(tgResults, pluginResults) + + // 过滤结果,确保标题包含搜索关键词 + filteredResults := filterResultsByKeyword(allResults, keyword) + + // 按照优化后的规则排序结果 + sortResultsByTimeAndKeywords(filteredResults) + + // 过滤结果,只保留有时间的结果或包含优先关键词的结果到Results中 + filteredForResults := make([]model.SearchResult, 0, len(filteredResults)) + for _, result := range filteredResults { + // 有时间的结果或包含优先关键词的结果保留在Results中 + if !result.Datetime.IsZero() || getKeywordPriority(result.Title) > 0 { + filteredForResults = append(filteredForResults, result) + } + } + + // 合并链接按网盘类型分组(使用所有过滤后的结果) + mergedLinks := mergeResultsByType(filteredResults) + + // 构建响应 + var total int + if resultType == "merged_by_type" { + // 计算所有类型链接的总数 + total = 0 + for _, links := range mergedLinks { + total += len(links) + } + } else { + // 只计算filteredForResults的数量 + total = len(filteredForResults) + } + + response := model.SearchResponse{ + Total: total, + Results: filteredForResults, // 使用进一步过滤的结果 + MergedByType: mergedLinks, + } + + // 根据resultType过滤返回结果 + return filterResponseByType(response, resultType), nil +} +``` + +### 3.3 分离的缓存键生成 + +为了支持TG搜索和插件搜索的独立缓存,系统实现了分离的缓存键生成函数: + +```go +// 在util/cache/cache_key.go中 + +// GenerateTGCacheKey 为TG搜索生成缓存键 +func GenerateTGCacheKey(keyword string, channels []string) string { + // 关键词标准化 + normalizedKeyword := strings.ToLower(strings.TrimSpace(keyword)) + + // 获取频道列表哈希 + channelsHash := getChannelsHash(channels) + + // 生成TG搜索特定的缓存键 + keyStr := fmt.Sprintf("tg:%s:%s", normalizedKeyword, channelsHash) + hash := md5.Sum([]byte(keyStr)) + return hex.EncodeToString(hash[:]) +} + +// GeneratePluginCacheKey 为插件搜索生成缓存键 +func GeneratePluginCacheKey(keyword string, plugins []string) string { + // 关键词标准化 + normalizedKeyword := strings.ToLower(strings.TrimSpace(keyword)) + + // 获取插件列表哈希 + pluginsHash := getPluginsHash(plugins) + + // 生成插件搜索特定的缓存键 + keyStr := fmt.Sprintf("plugin:%s:%s", normalizedKeyword, pluginsHash) + hash := md5.Sum([]byte(keyStr)) + return hex.EncodeToString(hash[:]) +} +``` + +### 3.4 TG搜索实现 + +TG搜索方法使用独立的缓存键,实现了TG搜索结果的缓存管理: + +```go +// searchTG 搜索TG频道 +func (s *SearchService) searchTG(keyword string, channels []string, forceRefresh bool) ([]model.SearchResult, error) { + // 生成缓存键 + cacheKey := cache.GenerateTGCacheKey(keyword, channels) + + // 如果未启用强制刷新,尝试从缓存获取结果 + if !forceRefresh && cacheInitialized && config.AppConfig.CacheEnabled { + var data []byte + var hit bool + var err error + + // 优先使用增强版缓存 + if enhancedTwoLevelCache != nil { + data, hit, err = enhancedTwoLevelCache.Get(cacheKey) + + if err == nil && hit { + var results []model.SearchResult + if err := enhancedTwoLevelCache.GetSerializer().Deserialize(data, &results); err == nil { + return results, nil + } + } + } else if twoLevelCache != nil { + data, hit, err = twoLevelCache.Get(cacheKey) + + if err == nil && hit { + var results []model.SearchResult + if err := cache.DeserializeWithPool(data, &results); err == nil { + return results, nil + } + } + } + } + + // 缓存未命中,执行实际搜索 + var results []model.SearchResult + + // 使用工作池并行搜索多个频道 + tasks := make([]pool.Task, 0, len(channels)) + + for _, channel := range channels { + ch := channel // 创建副本,避免闭包问题 + tasks = append(tasks, func() interface{} { + results, err := s.searchChannel(keyword, ch) + if err != nil { + return nil + } + return results + }) + } + + // 执行搜索任务并获取结果 + taskResults := pool.ExecuteBatchWithTimeout(tasks, len(channels), config.AppConfig.PluginTimeout) + + // 合并所有频道的结果 + for _, result := range taskResults { + if result != nil { + channelResults := result.([]model.SearchResult) + results = append(results, channelResults...) + } + } + + // 异步缓存结果 + if cacheInitialized && config.AppConfig.CacheEnabled { + go func(res []model.SearchResult) { + ttl := time.Duration(config.AppConfig.CacheTTLMinutes) * time.Minute + + // 优先使用增强版缓存 + if enhancedTwoLevelCache != nil { + data, err := enhancedTwoLevelCache.GetSerializer().Serialize(res) + if err != nil { + return + } + enhancedTwoLevelCache.Set(cacheKey, data, ttl) + } else if twoLevelCache != nil { + data, err := cache.SerializeWithPool(res) + if err != nil { + return + } + twoLevelCache.Set(cacheKey, data, ttl) + } + }(results) + } + + return results, nil +} +``` + +### 3.5 插件搜索实现 + +插件搜索方法使用独立的缓存键,并实现了缓存数据时间戳检查,确保即使在不强制刷新的情况下也能获取最新的缓存数据: + +```go +// searchPlugins 搜索插件 +func (s *SearchService) searchPlugins(keyword string, plugins []string, forceRefresh bool, concurrency int) ([]model.SearchResult, error) { + // 生成缓存键 + cacheKey := cache.GeneratePluginCacheKey(keyword, plugins) + + // 如果未启用强制刷新,尝试从缓存获取结果 + if !forceRefresh && cacheInitialized && config.AppConfig.CacheEnabled { + var data []byte + var hit bool + var err error + + // 优先使用增强版缓存 + if enhancedTwoLevelCache != nil { + data, hit, err = enhancedTwoLevelCache.Get(cacheKey) + + if err == nil && hit { + var results []model.SearchResult + if err := enhancedTwoLevelCache.GetSerializer().Deserialize(data, &results); err == nil { + // 确保缓存数据是最新的 + // 如果缓存数据是最近更新的(例如在过去30秒内),则直接返回 + // 否则,我们将重新执行搜索以获取最新数据 + if len(results) > 0 { + // 获取当前时间 + now := time.Now() + // 检查缓存数据是否是最近更新的 + // 这里我们假设如果缓存数据中有结果的时间戳在过去30秒内,则认为是最新的 + for _, result := range results { + if !result.Datetime.IsZero() && now.Sub(result.Datetime) < 30*time.Second { + return results, nil + } + } + } else { + // 如果缓存中没有数据,直接返回空结果 + return results, nil + } + } + } + } else if twoLevelCache != nil { + data, hit, err = twoLevelCache.Get(cacheKey) + + if err == nil && hit { + var results []model.SearchResult + if err := cache.DeserializeWithPool(data, &results); err == nil { + // 确保缓存数据是最新的 + // 如果缓存数据是最近更新的(例如在过去30秒内),则直接返回 + // 否则,我们将重新执行搜索以获取最新数据 + if len(results) > 0 { + // 获取当前时间 + now := time.Now() + // 检查缓存数据是否是最近更新的 + // 这里我们假设如果缓存数据中有结果的时间戳在过去30秒内,则认为是最新的 + for _, result := range results { + if !result.Datetime.IsZero() && now.Sub(result.Datetime) < 30*time.Second { + return results, nil + } + } + } else { + // 如果缓存中没有数据,直接返回空结果 + return results, nil + } + } + } + } + } + + // 缓存未命中或缓存数据不是最新的,执行实际搜索 // 获取所有可用插件 var availablePlugins []plugin.SearchPlugin - if s.pluginManager != nil && (sourceType == "all" || sourceType == "plugin") { + if s.pluginManager != nil { allPlugins := s.pluginManager.GetPlugins() // 确保plugins不为nil并且有非空元素 @@ -121,52 +472,16 @@ func (s *SearchService) Search(keyword string, channels []string, concurrency in } } - // 控制并发数:如果用户没有指定有效值,则默认使用"频道数+插件数+10"的并发数 - pluginCount := len(availablePlugins) - - // 根据sourceType决定是否搜索Telegram频道 - channelCount := 0 - if sourceType == "all" || sourceType == "tg" { - channelCount = len(channels) - } - + // 控制并发数 if concurrency <= 0 { - concurrency = channelCount + pluginCount + 10 + concurrency = len(availablePlugins) + 10 if concurrency < 1 { concurrency = 1 } } - - // 计算任务总数(频道数 + 插件数) - totalTasks := channelCount + pluginCount - // 如果没有任务要执行,返回空结果 - if totalTasks == 0 { - return model.SearchResponse{ - Total: 0, - Results: []model.SearchResult{}, - MergedByType: make(model.MergedLinks), - }, nil - } - // 使用工作池执行并行搜索 - tasks := make([]pool.Task, 0, totalTasks) - - // 添加频道搜索任务(如果需要) - if sourceType == "all" || sourceType == "tg" { - for _, channel := range channels { - ch := channel // 创建副本,避免闭包问题 - tasks = append(tasks, func() interface{} { - results, err := s.searchChannel(keyword, ch) - if err != nil { - return nil - } - return results - }) - } - } - - // 添加插件搜索任务(如果需要) + tasks := make([]pool.Task, 0, len(availablePlugins)) for _, p := range availablePlugins { plugin := p // 创建副本,避免闭包问题 tasks = append(tasks, func() interface{} { @@ -178,519 +493,123 @@ func (s *SearchService) Search(keyword string, channels []string, concurrency in }) } - // 使用带超时控制的工作池执行所有任务并获取结果 + // 执行搜索任务并获取结果 results := pool.ExecuteBatchWithTimeout(tasks, concurrency, config.AppConfig.PluginTimeout) - // 预估每个任务平均返回22个结果 - allResults := make([]model.SearchResult, 0, totalTasks*22) - - // 合并所有结果 + // 合并所有插件的结果 + var allResults []model.SearchResult for _, result := range results { if result != nil { - channelResults := result.([]model.SearchResult) - allResults = append(allResults, channelResults...) + pluginResults := result.([]model.SearchResult) + allResults = append(allResults, pluginResults...) } } - // 过滤结果,确保标题包含搜索关键词 - filteredResults := filterResultsByKeyword(allResults, keyword) - - // 按照优化后的规则排序结果 - sortResultsByTimeAndKeywords(filteredResults) - - // 过滤结果,只保留有时间的结果或包含优先关键词的结果到Results中 - filteredForResults := make([]model.SearchResult, 0, len(filteredResults)) - for _, result := range filteredResults { - // 有时间的结果或包含优先关键词的结果保留在Results中 - if !result.Datetime.IsZero() || getKeywordPriority(result.Title) > 0 { - filteredForResults = append(filteredForResults, result) - } - } - - // 合并链接按网盘类型分组(使用所有过滤后的结果) - mergedLinks := mergeResultsByType(filteredResults) - - // 构建响应 - var total int - if resultType == "merged_by_type" { - // 计算所有类型链接的总数 - total = 0 - for _, links := range mergedLinks { - total += len(links) - } - } else { - // 只计算filteredForResults的数量 - total = len(filteredForResults) - } - - response := model.SearchResponse{ - Total: total, - Results: filteredForResults, // 使用进一步过滤的结果 - MergedByType: mergedLinks, - } - - // 异步缓存搜索结果(缓存完整结果,以便后续可以根据不同resultType过滤) - if twoLevelCache != nil && config.AppConfig.CacheEnabled { - go func(resp model.SearchResponse) { - data, err := json.Marshal(resp) - if err != nil { - return - } - + // 异步缓存结果 + if cacheInitialized && config.AppConfig.CacheEnabled { + go func(res []model.SearchResult) { ttl := time.Duration(config.AppConfig.CacheTTLMinutes) * time.Minute - twoLevelCache.Set(cacheKey, data, ttl) - }(response) - } - - // 根据resultType过滤返回结果 - return filterResponseByType(response, resultType), nil -} -``` - -### 3.3 缓存键生成优化 - -为了提高缓存命中率,搜索服务使用了优化的缓存键生成方法,确保相同语义的查询能够命中相同的缓存。 - -```go -// GenerateCacheKey 根据所有影响搜索结果的参数生成缓存键 -func GenerateCacheKey(keyword string, channels []string, resultType string, sourceType string, plugins []string) string { - // 关键词标准化 - normalizedKeyword := strings.TrimSpace(keyword) - - // 处理channels参数 - var channelsStr string - if channels == nil || len(channels) == 0 { - channelsStr = "all" - } else { - // 复制并排序channels,确保顺序一致性 - channelsCopy := make([]string, 0, len(channels)) - for _, ch := range channels { - if ch != "" { // 忽略空字符串 - channelsCopy = append(channelsCopy, ch) - } - } - - if len(channelsCopy) == 0 { - channelsStr = "all" - } else { - sort.Strings(channelsCopy) - channelsStr = strings.Join(channelsCopy, ",") - } - } - - // 处理resultType参数 - if resultType == "" { - resultType = "all" - } - - // 处理sourceType参数 - if sourceType == "" { - sourceType = "all" - } - - // 处理plugins参数 - var pluginsStr string - if plugins == nil || len(plugins) == 0 { - pluginsStr = "all" - } else { - // 复制并排序plugins,确保顺序一致性 - pluginsCopy := make([]string, 0, len(plugins)) - for _, p := range plugins { - if p != "" { // 忽略空字符串 - pluginsCopy = append(pluginsCopy, p) - } - } - - if len(pluginsCopy) == 0 { - pluginsStr = "all" - } else { - sort.Strings(pluginsCopy) - pluginsStr = strings.Join(pluginsCopy, ",") - } - } - - // 生成最终缓存键 - keyStr := normalizedKeyword + ":" + channelsStr + ":" + resultType + ":" + sourceType + ":" + pluginsStr - - // 计算MD5哈希 - hash := md5.Sum([]byte(keyStr)) - return hex.EncodeToString(hash[:]) -} -``` - -主要优化包括: - -1. **关键词标准化**:去除前后空格 -2. **参数排序**:对数组参数进行排序,确保不同顺序的相同参数产生相同的缓存键 -3. **空值处理**:统一处理null、空数组和只包含空字符串的数组 -4. **特殊值处理**:为空参数设置默认值,确保一致性 -5. **忽略空字符串**:在数组参数中忽略空字符串元素 - -这些优化确保了不同形式但语义相同的查询能够命中相同的缓存,显著提高了缓存命中率。 - -### 3.4 辅助方法实现 - -搜索服务包含多个辅助方法,用于处理搜索结果的过滤、排序和分类。 - -#### 3.4.1 结果过滤方法 - -```go -// filterResponseByType 根据结果类型过滤响应 -func filterResponseByType(response model.SearchResponse, resultType string) model.SearchResponse { - switch resultType { - case "results": - // 只返回Results - return model.SearchResponse{ - Total: response.Total, - Results: response.Results, - } - case "merged_by_type": - // 只返回MergedByType,Results设为nil,结合omitempty标签,JSON序列化时会忽略此字段 - return model.SearchResponse{ - Total: response.Total, - MergedByType: response.MergedByType, - Results: nil, - } - default: - // 默认返回全部 - return response - } -} - -// 过滤结果,确保标题包含搜索关键词 -func filterResultsByKeyword(results []model.SearchResult, keyword string) []model.SearchResult { - // 预估过滤后会保留80%的结果 - filteredResults := make([]model.SearchResult, 0, len(results)*8/10) - - // 将关键词转为小写,用于不区分大小写的比较 - lowerKeyword := strings.ToLower(keyword) - - // 将关键词按空格分割,用于支持多关键词搜索 - keywords := strings.Fields(lowerKeyword) - - for _, result := range results { - // 将标题和内容转为小写 - lowerTitle := strings.ToLower(result.Title) - lowerContent := strings.ToLower(result.Content) - - // 检查每个关键词是否在标题或内容中 - matched := true - for _, kw := range keywords { - // 如果关键词是"pwd",特殊处理,只要标题、内容或链接中包含即可 - if kw == "pwd" { - // 检查标题、内容 - pwdInTitle := strings.Contains(lowerTitle, kw) - pwdInContent := strings.Contains(lowerContent, kw) - - // 检查链接中是否包含pwd参数 - pwdInLinks := false - for _, link := range result.Links { - if strings.Contains(strings.ToLower(link.URL), "pwd=") { - pwdInLinks = true - break - } - } - - // 只要有一个包含pwd,就算匹配 - if pwdInTitle || pwdInContent || pwdInLinks { - continue // 匹配成功,检查下一个关键词 - } else { - matched = false - break - } - } else { - // 对于其他关键词,检查是否同时在标题和内容中 - if !strings.Contains(lowerTitle, kw) && !strings.Contains(lowerContent, kw) { - matched = false - break - } - } - } - - if matched { - filteredResults = append(filteredResults, result) - } - } - - return filteredResults -} -``` - -#### 3.4.2 结果排序方法 - -```go -// 根据时间和关键词排序结果 -func sortResultsByTimeAndKeywords(results []model.SearchResult) { - sort.Slice(results, func(i, j int) bool { - // 检查是否有零值时间 - iZeroTime := results[i].Datetime.IsZero() - jZeroTime := results[j].Datetime.IsZero() - - // 如果两者都是零值时间,按关键词优先级排序 - if iZeroTime && jZeroTime { - iPriority := getKeywordPriority(results[i].Title) - jPriority := getKeywordPriority(results[j].Title) - if iPriority != jPriority { - return iPriority > jPriority - } - // 如果优先级也相同,按标题字母顺序排序 - return results[i].Title < results[j].Title - } - - // 如果只有一个是零值时间,将其排在后面 - if iZeroTime { - return false // i排在后面 - } - if jZeroTime { - return true // j排在后面,i排在前面 - } - - // 两者都有正常时间,使用原有逻辑 - // 计算两个结果的时间差(以天为单位) - timeDiff := daysBetween(results[i].Datetime, results[j].Datetime) - - // 如果时间差超过30天,按时间排序(新的在前面) - if abs(timeDiff) > 30 { - return results[i].Datetime.After(results[j].Datetime) - } - - // 如果时间差在30天内,先检查时间差是否超过1天 - if abs(timeDiff) > 1 { - return results[i].Datetime.After(results[j].Datetime) - } - - // 如果时间差在1天内,检查关键词优先级 - iPriority := getKeywordPriority(results[i].Title) - jPriority := getKeywordPriority(results[j].Title) - - // 如果优先级不同,优先级高的排在前面 - if iPriority != jPriority { - return iPriority > jPriority - } - - // 如果优先级相同且时间差在1天内,仍然按时间排序(新的在前面) - return results[i].Datetime.After(results[j].Datetime) - }) -} - -// 计算两个时间之间的天数差 -func daysBetween(t1, t2 time.Time) float64 { - duration := t1.Sub(t2) - return duration.Hours() / 24 -} - -// 绝对值 -func abs(x float64) float64 { - if x < 0 { - return -x - } - return x -} - -// 获取标题中包含优先关键词的优先级 -func getKeywordPriority(title string) int { - title = strings.ToLower(title) - for i, keyword := range priorityKeywords { - if strings.Contains(title, keyword) { - // 返回优先级(数组索引越小,优先级越高) - return len(priorityKeywords) - i - } - } - return 0 -} -``` - -#### 3.4.3 频道搜索方法 - -```go -// 搜索单个频道 -func (s *SearchService) searchChannel(keyword string, channel string) ([]model.SearchResult, error) { - // 构建搜索URL - url := util.BuildSearchURL(channel, keyword, "") - - // 使用全局HTTP客户端(已配置代理) - client := util.GetHTTPClient() - - // 创建一个带超时的上下文 - ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second) - defer cancel() - - // 创建请求 - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return nil, err - } - - // 发送请求 - resp, err := client.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - // 读取响应体 - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - // 解析响应 - results, _, err := util.ParseSearchResults(string(body), channel) - if err != nil { - return nil, err - } - - return results, nil -} -``` - -#### 3.4.4 结果合并方法 - -```go -// 将搜索结果按网盘类型分组 -func mergeResultsByType(results []model.SearchResult) model.MergedLinks { - // 创建合并结果的映射 - mergedLinks := make(model.MergedLinks, 10) // 预分配容量,假设有10种不同的网盘类型 - - // 用于去重的映射,键为URL - uniqueLinks := make(map[string]model.MergedLink) - - // 遍历所有搜索结果 - for _, result := range results { - for _, link := range result.Links { - // 创建合并后的链接 - mergedLink := model.MergedLink{ - URL: link.URL, - Password: link.Password, - Note: result.Title, - Datetime: result.Datetime, - } - // 检查是否已存在相同URL的链接 - if existingLink, exists := uniqueLinks[link.URL]; exists { - // 如果已存在,只有当当前链接的时间更新时才替换 - if mergedLink.Datetime.After(existingLink.Datetime) { - uniqueLinks[link.URL] = mergedLink + // 优先使用增强版缓存 + if enhancedTwoLevelCache != nil { + data, err := enhancedTwoLevelCache.GetSerializer().Serialize(res) + if err != nil { + return } - } else { - // 如果不存在,直接添加 - uniqueLinks[link.URL] = mergedLink - } - } - } - - // 将去重后的链接按类型分组 - for url, mergedLink := range uniqueLinks { - // 获取链接类型 - linkType := "" - for _, result := range results { - for _, link := range result.Links { - if link.URL == url { - linkType = link.Type - break + enhancedTwoLevelCache.Set(cacheKey, data, ttl) + } else if twoLevelCache != nil { + data, err := cache.SerializeWithPool(res) + if err != nil { + return } + twoLevelCache.Set(cacheKey, data, ttl) } - if linkType != "" { - break - } - } - - // 如果没有找到类型,使用"unknown" - if linkType == "" { - linkType = "unknown" - } - - // 添加到对应类型的列表中 - mergedLinks[linkType] = append(mergedLinks[linkType], mergedLink) + }(allResults) } - // 对每种类型的链接按时间排序(新的在前面) - for linkType, links := range mergedLinks { - sort.Slice(links, func(i, j int) bool { - return links[i].Datetime.After(links[j].Datetime) - }) - mergedLinks[linkType] = links - } - - return mergedLinks + return allResults, nil } ``` -## 4. 核心设计思想 +### 3.6 缓存更新检测机制 -### 4.1 高性能设计 +插件搜索方法中实现了缓存数据时间戳检查机制,确保即使在不强制刷新的情况下也能获取最新的缓存数据: -1. **并发搜索**:使用工作池模式实现高效并发搜索,充分利用系统资源 -2. **缓存机制**:利用两级缓存(内存+磁盘)提高重复查询的响应速度 -3. **异步缓存写入**:使用goroutine异步写入缓存,避免阻塞主流程 -4. **内存预分配**:为结果集预分配内存,减少动态扩容带来的开销 -5. **超时控制**:对搜索操作设置严格的超时限制,避免长时间阻塞 +1. **时间戳检查**:检查缓存数据中的时间戳,判断数据是否是最近更新的 +2. **最新数据判断**:如果缓存数据中有结果的时间戳在过去30秒内,则认为是最新的 +3. **重新搜索条件**:如果缓存数据不是最新的,则重新执行搜索以获取最新数据 -### 4.2 智能排序策略 +这种机制特别适用于异步插件,因为异步插件会在后台持续更新缓存数据。通过时间戳检查,系统能够确保用户总是获取到最新的搜索结果,同时保持良好的性能。 -搜索服务实现了复杂的多级排序策略,综合考虑时间和关键词权重: +## 4. 结果处理 -1. **时间优先**:优先展示最新的结果,保证用户获取最新资源 -2. **关键词权重**:对包含"全"、"合集"、"系列"等关键词的结果给予更高权重 -3. **多级排序**:根据时间差的大小采用不同的排序策略 - - 时间差超过30天:纯粹按时间排序 - - 时间差在1-30天:仍然按时间排序 - - 时间差在1天内:优先考虑关键词权重,再考虑时间 -4. **零值时间处理**:对没有时间信息的结果进行特殊处理,按关键词权重排序 +### 4.1 过滤和排序 -### 4.3 结果过滤策略 +搜索服务实现了多种结果处理功能,包括过滤、排序和分类: -1. **关键词匹配**:确保结果的标题或内容包含搜索关键词 -2. **多关键词支持**:支持空格分隔的多关键词搜索,要求所有关键词都匹配 -3. **特殊关键词处理**:"pwd"关键词特殊处理,检查标题、内容和链接URL -4. **质量过滤**:只保留有时间信息或包含优先关键词的结果到Results中 +1. **关键词过滤**:确保结果标题包含搜索关键词 +2. **时间和关键词排序**:基于时间和关键词优先级的多级排序策略 +3. **网盘类型分类**:按网盘类型分组展示结果 -### 4.4 结果分类策略 +### 4.2 合并结果 -1. **网盘类型分类**:按网盘类型(如百度网盘、阿里云盘等)分类展示结果 -2. **链接去重**:对相同URL的链接进行去重,保留最新的信息 -3. **类型内排序**:每种网盘类型内部按时间排序,新的在前面 +系统支持合并不同来源的搜索结果,并处理可能的重复: -## 5. 错误处理策略 +```go +// 合并搜索结果 +func mergeSearchResults(tgResults, pluginResults []model.SearchResult) []model.SearchResult { + // 预估合并后的结果数量 + totalSize := len(tgResults) + len(pluginResults) + if totalSize == 0 { + return []model.SearchResult{} + } + + // 创建结果映射,用于去重 + resultMap := make(map[string]model.SearchResult, totalSize) + + // 添加TG搜索结果 + for _, result := range tgResults { + resultMap[result.UniqueID] = result + } + + // 添加或更新插件搜索结果(如果有重复,保留较新的) + for _, result := range pluginResults { + if existing, ok := resultMap[result.UniqueID]; ok { + // 如果已存在,保留较新的 + if result.Datetime.After(existing.Datetime) { + resultMap[result.UniqueID] = result + } + } else { + resultMap[result.UniqueID] = result + } + } + + // 转换回切片 + mergedResults := make([]model.SearchResult, 0, len(resultMap)) + for _, result := range resultMap { + mergedResults = append(mergedResults, result) + } + + return mergedResults +} +``` -1. **优雅降级**:单个搜索源失败不影响整体结果,保证服务可用性 -2. **超时控制**:对每个搜索任务设置超时限制,避免因单个任务阻塞整体搜索 -3. **错误隔离**:搜索错误不会传播到上层,保证API层的稳定性 -4. **空结果处理**:当没有搜索任务或所有任务失败时,返回空结果而非错误 +## 5. 服务层优化 -## 6. 缓存策略 +### 5.1 缓存系统优化 -1. **缓存键生成**:基于搜索关键词生成缓存键 -2. **缓存命中检查**:在执行搜索前检查缓存,提高响应速度 -3. **强制刷新**:支持forceRefresh参数,便于获取最新结果 -4. **异步缓存更新**:使用goroutine异步写入缓存,避免阻塞主流程 -5. **缓存TTL**:缓存项有明确的过期时间,确保数据不会过时 +服务层通过以下方式优化了缓存系统: -## 7. 可扩展性设计 +1. **分离缓存键**:为TG搜索和插件搜索生成独立的缓存键,实现独立更新 +2. **增强版两级缓存**:优先使用增强版两级缓存,失败时回退到原始缓存 +3. **缓存数据时间戳检查**:检查缓存数据是否是最新的,确保获取最新结果 +4. **异步缓存更新**:搜索结果异步写入缓存,不阻塞主流程 -1. **插件系统集成**:通过插件管理器集成各种搜索插件,便于扩展搜索源 -2. **参数化控制**:通过参数控制搜索行为,如并发数、搜索源等 -3. **结果类型过滤**:支持不同类型的结果返回,满足不同场景需求 -4. **源类型过滤**:支持按源类型(Telegram、插件)过滤搜索结果 +### 5.2 并发性能优化 -## 8. 性能优化措施 +服务层通过以下方式优化了并发性能: -1. **内存优化**: - - 预分配结果集容量,减少动态扩容 - - 创建副本避免闭包问题 - - 使用指针减少大对象复制 - -2. **并发优化**: - - 使用工作池控制并发数量 - - 智能设置默认并发数(频道数+插件数+10) - - 并发任务超时控制 - -3. **缓存优化**: - - 两级缓存机制(内存+磁盘) - - 异步写入缓存,避免阻塞主流程 - - 缓存完整结果,支持不同resultType过滤 - -4. **算法优化**: - - 使用map进行插件名称匹配,提高查找效率 - - 优化排序算法,减少比较次数 - - 链接去重使用map实现,提高效率 +1. **并行搜索**:TG搜索和插件搜索并行执行 +2. **工作池**:使用工作池并行执行多个搜索任务 +3. **超时控制**:对搜索任务设置超时限制,避免长时间阻塞 +4. **异步操作**:非关键路径使用异步处理,如缓存写入 diff --git a/docs/4-插件系统设计.md b/docs/4-插件系统设计.md index b329a06..8238d76 100644 --- a/docs/4-插件系统设计.md +++ b/docs/4-插件系统设计.md @@ -250,121 +250,220 @@ func loadCacheFromDisk() { ### 5.4 异步插件实现示例 ```go -// HunhepanAsyncPlugin 混合盘搜索异步插件 -type HunhepanAsyncPlugin struct { +// JikePanPlugin 即刻盘异步搜索插件 +type JikePanPlugin struct { *plugin.BaseAsyncPlugin } -// NewHunhepanAsyncPlugin 创建新的混合盘搜索异步插件 -func NewHunhepanAsyncPlugin() *HunhepanAsyncPlugin { - return &HunhepanAsyncPlugin{ - BaseAsyncPlugin: plugin.NewBaseAsyncPlugin("hunhepan_async", 3), +// NewJikePanPlugin 创建即刻盘插件 +func NewJikePanPlugin() *JikePanPlugin { + return &JikePanPlugin{ + BaseAsyncPlugin: plugin.NewBaseAsyncPlugin("jikepan", 5), } } -// Search 执行搜索并返回结果 -func (p *HunhepanAsyncPlugin) Search(keyword string) ([]model.SearchResult, error) { +// Search 实现搜索接口 +func (p *JikePanPlugin) Search(keyword string) ([]model.SearchResult, error) { // 生成缓存键 - cacheKey := keyword + cacheKey := generateCacheKey(keyword) - // 使用异步搜索基础方法 - return p.AsyncSearch(keyword, cacheKey, p.doSearch) + // 使用异步搜索 + return p.BaseAsyncPlugin.AsyncSearch(keyword, cacheKey, p.doSearch) } -// doSearch 实际的搜索实现 -func (p *HunhepanAsyncPlugin) doSearch(client *http.Client, keyword string) ([]model.SearchResult, error) { - // 实现具体搜索逻辑 +// doSearch 执行实际搜索 +func (p *JikePanPlugin) doSearch(client *http.Client, keyword string) ([]model.SearchResult, error) { + // 实现搜索逻辑 // ... + return results, nil } ``` -## 6. 优雅关闭机制 +### 5.5 缓存更新检测机制 -系统实现了优雅关闭机制,确保在程序退出前保存异步插件缓存: +为了确保即使在不强制刷新的情况下也能获取最新的缓存数据,系统在搜索服务中实现了缓存数据时间戳检查机制: ```go -// 在main.go中 -// 创建通道来接收操作系统信号 -quit := make(chan os.Signal, 1) -signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) - -// 等待中断信号 -<-quit -fmt.Println("正在关闭服务器...") - -// 保存异步插件缓存 -plugin.SaveCacheToDisk() - -// 优雅关闭服务器 -if err := srv.Shutdown(ctx); err != nil { - log.Fatalf("服务器关闭异常: %v", err) +// 在searchPlugins方法中 +if err == nil && hit { + var results []model.SearchResult + if err := enhancedTwoLevelCache.GetSerializer().Deserialize(data, &results); err == nil { + // 确保缓存数据是最新的 + // 如果缓存数据是最近更新的(例如在过去30秒内),则直接返回 + // 否则,我们将重新执行搜索以获取最新数据 + if len(results) > 0 { + // 获取当前时间 + now := time.Now() + // 检查缓存数据是否是最近更新的 + for _, result := range results { + if !result.Datetime.IsZero() && now.Sub(result.Datetime) < 30*time.Second { + return results, nil + } + } + } else { + // 如果缓存中没有数据,直接返回空结果 + return results, nil + } + } } ``` -## 7. JSON处理优化 +这种机制特别适用于异步插件,因为异步插件会在后台持续更新缓存数据。通过时间戳检查,系统能够确保用户总是获取到最新的搜索结果,同时保持良好的性能。 -为了提高插件的性能,特别是在处理大量JSON数据时,所有插件都使用了高性能的JSON库进行序列化和反序列化操作。 +### 5.6 异步插件与主程序缓存协同 -### 7.1 JSON库选择 +异步插件系统与主程序的缓存系统协同工作,实现了完整的缓存更新流程: -PanSou使用字节跳动开发的sonic库替代标准库的encoding/json,提供更高效的JSON处理: +1. **异步插件缓存更新**:异步插件在后台持续更新自己的缓存 +2. **主程序缓存检查**:主程序在获取缓存数据时检查时间戳 +3. **缓存更新判断**:如果缓存数据不是最新的,重新执行搜索 +4. **缓存写入**:搜索结果异步写入主程序缓存 + +这种协同机制确保了用户总是能获取到最新的搜索结果,同时保持系统的高性能和响应速度。 + +## 6. 插件管理器 + +### 6.1 插件管理器设计 + +插件管理器负责管理所有已注册的插件,提供统一的接口获取和使用插件。 ```go -// 使用优化的JSON库 -import ( - "pansou/util/json" // 内部封装了github.com/bytedance/sonic -) +// PluginManager 插件管理器 +type PluginManager struct { + plugins []SearchPlugin + mutex sync.RWMutex +} -// 序列化示例 -jsonData, err := json.Marshal(reqBody) +// NewPluginManager 创建插件管理器 +func NewPluginManager() *PluginManager { + return &PluginManager{ + plugins: make([]SearchPlugin, 0), + } +} -// 反序列化示例 -if err := json.Unmarshal(respBody, &apiResp); err != nil { - return nil, fmt.Errorf("decode response failed: %w", err) +// LoadPlugins 加载所有已注册的插件 +func (m *PluginManager) LoadPlugins() { + m.mutex.Lock() + defer m.mutex.Unlock() + + // 获取所有已注册的插件 + m.plugins = GetRegisteredPlugins() + + // 按优先级排序 + sort.Slice(m.plugins, func(i, j int) bool { + return m.plugins[i].Priority() > m.plugins[j].Priority() + }) +} + +// GetPlugins 获取所有插件 +func (m *PluginManager) GetPlugins() []SearchPlugin { + m.mutex.RLock() + defer m.mutex.RUnlock() + + return m.plugins } ``` -### 7.2 性能优势 - -- **更快的序列化/反序列化速度**:sonic库比标准库快2-5倍 -- **更低的内存分配**:减少GC压力 -- **SIMD加速**:利用现代CPU的向量指令集 -- **并行处理**:大型JSON可以并行处理 - -### 7.3 实现方式 - -所有插件通过统一的内部包装库使用sonic: +### 6.2 插件管理器使用 ```go -// util/json/json.go -package json +// 在main.go中初始化插件管理器 +pluginManager := plugin.NewPluginManager() +pluginManager.LoadPlugins() + +// 创建搜索服务,传入插件管理器 +searchService := service.NewSearchService(pluginManager) + +// 设置路由 +router := api.SetupRouter(searchService) +``` + +## 7. 插件系统优势 + +1. **可扩展性**:通过统一接口和自动注册机制,轻松添加新的搜索源 +2. **高性能**:异步插件模式提供快速响应和后台处理,提高用户体验 +3. **缓存优化**:完善的缓存机制,提高重复查询的响应速度 +4. **错误隔离**:单个插件的错误不会影响其他插件和整体系统 +5. **优先级控制**:通过插件优先级控制搜索结果的排序 + +## 8. 插件开发指南 + +### 8.1 同步插件开发 + +```go +package myplugin import ( - "github.com/bytedance/sonic" + "pansou/model" + "pansou/plugin" ) -// API是sonic的全局配置实例 -var API = sonic.ConfigDefault +// MyPlugin 自定义插件 +type MyPlugin struct{} -// 初始化sonic配置 +// 注册插件 func init() { - // 根据需要配置sonic选项 - API = sonic.Config{ - UseNumber: true, - EscapeHTML: true, - SortMapKeys: false, // 生产环境设为false提高性能 - }.Froze() + plugin.RegisterGlobalPlugin(&MyPlugin{}) } -// Marshal 使用sonic序列化对象到JSON -func Marshal(v interface{}) ([]byte, error) { - return API.Marshal(v) +// Name 返回插件名称 +func (p *MyPlugin) Name() string { + return "myplugin" } -// Unmarshal 使用sonic反序列化JSON到对象 -func Unmarshal(data []byte, v interface{}) error { - return API.Unmarshal(data, v) +// Search 执行搜索 +func (p *MyPlugin) Search(keyword string) ([]model.SearchResult, error) { + // 实现搜索逻辑 + // ... + return results, nil +} + +// Priority 返回插件优先级 +func (p *MyPlugin) Priority() int { + return 3 } ``` -这种统一的JSON处理方式确保了所有插件都能获得一致的高性能,特别是在处理大量搜索结果时,显著提升了系统整体响应速度。 \ No newline at end of file +### 8.2 异步插件开发 + +```go +package myasyncplugin + +import ( + "pansou/model" + "pansou/plugin" +) + +// MyAsyncPlugin 自定义异步插件 +type MyAsyncPlugin struct { + *plugin.BaseAsyncPlugin +} + +// 创建并注册插件 +func init() { + plugin.RegisterGlobalPlugin(NewMyAsyncPlugin()) +} + +// NewMyAsyncPlugin 创建异步插件 +func NewMyAsyncPlugin() *MyAsyncPlugin { + return &MyAsyncPlugin{ + BaseAsyncPlugin: plugin.NewBaseAsyncPlugin("myasyncplugin", 4), + } +} + +// Search 实现搜索接口 +func (p *MyAsyncPlugin) Search(keyword string) ([]model.SearchResult, error) { + // 生成缓存键 + cacheKey := generateCacheKey(keyword) + + // 使用异步搜索 + return p.BaseAsyncPlugin.AsyncSearch(keyword, cacheKey, p.doSearch) +} + +// doSearch 执行实际搜索 +func (p *MyAsyncPlugin) doSearch(client *http.Client, keyword string) ([]model.SearchResult, error) { + // 实现搜索逻辑 + // ... + return results, nil +} +``` \ No newline at end of file diff --git a/docs/5-缓存系统设计.md b/docs/5-缓存系统设计.md index 9deca45..fd51727 100644 --- a/docs/5-缓存系统设计.md +++ b/docs/5-缓存系统设计.md @@ -2,7 +2,7 @@ ## 1. 缓存系统概述 -缓存系统是PanSou性能优化的核心组件,通过两级缓存(内存+磁盘)机制,大幅提升重复查询的响应速度。该系统采用分层设计,实现了高效的缓存存取和智能的缓存策略。 +缓存系统是PanSou性能优化的核心组件,通过增强版两级缓存(内存+分片磁盘)机制,大幅提升重复查询的响应速度。该系统采用分层设计,实现了高效的缓存存取和智能的缓存策略。 PanSou的缓存系统包括两个主要部分: 1. **通用缓存系统**:用于API响应和常规搜索结果缓存 @@ -15,7 +15,10 @@ pansou/util/cache/ ├── cache_key.go # 优化的缓存键生成 ├── cache_key_test.go # 缓存键单元测试 ├── disk_cache.go # 磁盘缓存实现 +├── sharded_disk_cache.go # 分片磁盘缓存实现 ├── two_level_cache.go # 两级缓存实现 +├── enhanced_two_level_cache.go # 增强版两级缓存实现 +├── serializer.go # 序列化器接口 ├── utils.go # 缓存工具函数 └── utils_test.go # 缓存工具测试 @@ -28,9 +31,9 @@ pansou/util/json/ ## 3. 缓存架构设计 -### 3.1 两级缓存架构 +### 3.1 增强版两级缓存架构 -PanSou采用两级缓存架构,包括内存缓存和磁盘缓存: +PanSou采用增强版两级缓存架构,包括内存缓存和分片磁盘缓存: ``` ┌─────────────────────────┐ @@ -46,7 +49,7 @@ PanSou采用两级缓存架构,包括内存缓存和磁盘缓存: └───────────┬─────────────┘ │ (未命中) ┌───────────▼─────────────┐ -│ 磁盘缓存查询 │ +│ 分片磁盘缓存查询 │ └───────────┬─────────────┘ │ (未命中) ┌───────────▼─────────────┐ @@ -58,7 +61,7 @@ PanSou采用两级缓存架构,包括内存缓存和磁盘缓存: └───────────┬─────────────┘ │ ┌───────────▼─────────────┐ -│ 异步更新磁盘缓存 │ +│ 异步更新分片磁盘缓存 │ └─────────────────────────┘ ``` @@ -69,43 +72,48 @@ PanSou采用两级缓存架构,包括内存缓存和磁盘缓存: - 存储热点数据 - 减少磁盘I/O -2. **磁盘缓存**: +2. **分片磁盘缓存**: - 提供持久存储 - - 存储更多数据 + - 分片存储提高并发性能 + - 减少锁竞争 - 在服务重启后保留缓存 +3. **序列化器**: + - 提供统一的序列化/反序列化接口 + - 支持多种序列化方式(Gob和JSON) + - 优化序列化性能 + ## 4. 缓存键设计 -### 4.1 缓存键生成(cache_key.go) +### 4.1 分离的缓存键生成 -缓存键生成是缓存系统的基础,决定了缓存的命中率和有效性。 +缓存键生成是缓存系统的基础,决定了缓存的命中率和有效性。PanSou实现了分离的缓存键生成策略,为TG搜索和插件搜索生成独立的缓存键。 ```go -// GenerateCacheKey 根据所有影响搜索结果的参数生成缓存键 -func GenerateCacheKey(keyword string, channels []string, sourceType string, plugins []string) string { +// GenerateTGCacheKey 为TG搜索生成缓存键 +func GenerateTGCacheKey(keyword string, channels []string) string { // 关键词标准化 normalizedKeyword := strings.ToLower(strings.TrimSpace(keyword)) // 获取频道列表哈希 channelsHash := getChannelsHash(channels) - // 源类型处理 - if sourceType == "" { - sourceType = "all" - } + // 生成TG搜索特定的缓存键 + keyStr := fmt.Sprintf("tg:%s:%s", normalizedKeyword, channelsHash) + hash := md5.Sum([]byte(keyStr)) + return hex.EncodeToString(hash[:]) +} + +// GeneratePluginCacheKey 为插件搜索生成缓存键 +func GeneratePluginCacheKey(keyword string, plugins []string) string { + // 关键词标准化 + normalizedKeyword := strings.ToLower(strings.TrimSpace(keyword)) - // 插件参数规范化处理 - var pluginsHash string - if sourceType == "tg" { - // 对于只搜索Telegram的请求,忽略插件参数 - pluginsHash = "none" - } else { - // 获取插件列表哈希 - pluginsHash = getPluginsHash(plugins) - } + // 获取插件列表哈希 + pluginsHash := getPluginsHash(plugins) - // 生成最终缓存键 - keyStr := fmt.Sprintf("%s:%s:%s:%s", normalizedKeyword, channelsHash, sourceType, pluginsHash) + // 生成插件搜索特定的缓存键 + keyStr := fmt.Sprintf("plugin:%s:%s", normalizedKeyword, pluginsHash) hash := md5.Sum([]byte(keyStr)) return hex.EncodeToString(hash[:]) } @@ -114,10 +122,11 @@ func GenerateCacheKey(keyword string, channels []string, sourceType string, plug ### 4.2 缓存键设计思想 1. **标准化处理**:对关键词进行标准化,确保相同语义的查询使用相同的缓存键 -2. **参数敏感**:缓存键包含影响结果的参数(如搜索频道、来源类型、插件列表),避免错误的缓存命中 +2. **参数敏感**:缓存键包含影响结果的参数(如搜索频道、插件列表),避免错误的缓存命中 3. **排序处理**:对数组参数进行排序,确保参数顺序不同但内容相同的查询使用相同的缓存键 4. **哈希处理**:对大型列表使用哈希处理,减小缓存键长度,提高性能 5. **参数规范化**:统一处理不同形式但语义相同的参数,提高缓存命中率 +6. **分离策略**:为TG搜索和插件搜索生成独立的缓存键,实现独立更新 ### 4.3 列表参数处理 @@ -249,698 +258,330 @@ func init() { - 不传plugins参数 - 传空plugins数组 - 传只包含空字符串的plugins数组 - - 传所有插件名称 + + 以上三种情况都视为未指定插件,使用相同的缓存键。 - 这四种情况都被统一处理,生成相同的缓存键。 +2. **频道参数规范化**: + - 不传channels参数 + - 传空channels数组 + - 传只包含空字符串的channels数组 + + 以上三种情况都视为使用默认频道,使用相同的缓存键。 -2. **搜索类型规范化**: - - 对于`sourceType=tg`的请求,忽略插件参数,使用固定值"none" - - 对于`sourceType=all`或`sourceType=plugin`的请求,根据插件参数内容决定缓存键 +3. **关键词规范化**: + - 去除前后空格 + - 转换为小写 + - 保留中间空格(支持多关键词搜索) -3. **参数预处理**: - - 在`Search`函数中添加参数预处理逻辑,确保不同形式的参数产生相同的搜索结果 - - 对于包含所有注册插件的请求,统一设为nil,与不指定插件的请求使用相同的缓存键 +### 5.2 缓存数据时间戳检查 -### 5.2 缓存键测试 +为了确保即使在不强制刷新的情况下也能获取最新的缓存数据,系统实现了缓存数据时间戳检查机制: ```go -func TestPluginParameterNormalization(t *testing.T) { - // 获取所有插件名称 - allPlugins := plugin.GetRegisteredPlugins() - allPluginNames := make([]string, 0, len(allPlugins)) - for _, p := range allPlugins { - allPluginNames = append(allPluginNames, p.Name()) - } - - // 测试不传插件参数 - key1 := GenerateCacheKey("movie", nil, "all", nil) - - // 测试传空插件数组 - key2 := GenerateCacheKey("movie", nil, "all", []string{}) - - // 测试传只包含空字符串的插件数组 - key3 := GenerateCacheKey("movie", nil, "all", []string{""}) - - // 测试传所有插件 - key4 := GenerateCacheKey("movie", nil, "all", allPluginNames) - - // 所有情况应该生成相同的缓存键 - if key1 != key2 || key1 != key3 || key1 != key4 { - t.Errorf("Different plugin parameter forms should generate the same cache key:\nnil: %s\nempty: %s\nempty string: %s\nall plugins: %s", - key1, key2, key3, key4) - } - - // 测试sourceType=tg时忽略插件参数 - key5 := GenerateCacheKey("movie", nil, "tg", nil) - key6 := GenerateCacheKey("movie", nil, "tg", allPluginNames) - - if key5 != key6 { - t.Errorf("With sourceType=tg, plugin parameters should be ignored: %s != %s", key5, key6) +// 检查缓存数据是否是最新的 +if len(results) > 0 { + // 获取当前时间 + now := time.Now() + // 检查缓存数据是否是最近更新的 + // 这里我们假设如果缓存数据中有结果的时间戳在过去30秒内,则认为是最新的 + for _, result := range results { + if !result.Datetime.IsZero() && now.Sub(result.Datetime) < 30*time.Second { + return results, nil + } } } ``` -## 6. 内存缓存设计 +这种机制特别适用于异步插件,因为异步插件会在后台持续更新缓存数据。通过时间戳检查,系统能够确保用户总是获取到最新的搜索结果,同时保持良好的性能。 -### 6.1 内存缓存实现(memory_cache.go) +## 6. 分片磁盘缓存 -内存缓存提供快速访问,减少磁盘I/O,适合存储热点数据。 +### 6.1 分片磁盘缓存设计 + +分片磁盘缓存通过将缓存数据分散到多个子目录中,解决了高并发场景下的锁竞争问题。 ```go -// MemoryCache 内存缓存 -type MemoryCache struct { - cache map[string]cacheItem - mutex sync.RWMutex - maxSize int64 - currSize int64 +// ShardedDiskCache 分片磁盘缓存 +type ShardedDiskCache struct { + baseDir string + shardCount int + shards []*DiskCache + maxSizeMB int + mutex sync.RWMutex } -// cacheItem 缓存项 -type cacheItem struct { - data []byte - expireAt time.Time - size int64 -} - -// NewMemoryCache 创建新的内存缓存 -func NewMemoryCache(maxSizeMB int) *MemoryCache { - maxSize := int64(maxSizeMB) * 1024 * 1024 - return &MemoryCache{ - cache: make(map[string]cacheItem), - maxSize: maxSize, - } -} - -// Get 从内存缓存获取数据 -func (c *MemoryCache) Get(key string) ([]byte, bool, error) { - c.mutex.RLock() - defer c.mutex.RUnlock() - - item, ok := c.cache[key] - if !ok { - return nil, false, nil +// NewShardedDiskCache 创建新的分片磁盘缓存 +func NewShardedDiskCache(baseDir string, shardCount, maxSizeMB int) (*ShardedDiskCache, error) { + // 确保每个分片的大小合理 + shardSize := maxSizeMB / shardCount + if shardSize < 1 { + shardSize = 1 } - // 检查是否过期 - if time.Now().After(item.expireAt) { - return nil, false, nil + cache := &ShardedDiskCache{ + baseDir: baseDir, + shardCount: shardCount, + shards: make([]*DiskCache, shardCount), + maxSizeMB: maxSizeMB, } - return item.data, true, nil -} - -// Set 将数据存入内存缓存 -func (c *MemoryCache) Set(key string, data []byte, ttl time.Duration) error { - c.mutex.Lock() - defer c.mutex.Unlock() - - size := int64(len(data)) - - // 如果数据太大,超过最大缓存大小,不缓存 - if size > c.maxSize { - return nil - } - - // 检查是否需要腾出空间 - if c.currSize+size > c.maxSize { - c.evict(c.currSize + size - c.maxSize) - } - - // 存储数据 - c.cache[key] = cacheItem{ - data: data, - expireAt: time.Now().Add(ttl), - size: size, - } - - c.currSize += size - return nil -} - -// 腾出空间 -func (c *MemoryCache) evict(sizeToFree int64) { - // 按过期时间排序 - type keyExpire struct { - key string - expireAt time.Time - } - - items := make([]keyExpire, 0, len(c.cache)) - for k, v := range c.cache { - items = append(items, keyExpire{k, v.expireAt}) - } - - // 按过期时间排序,先过期的先删除 - sort.Slice(items, func(i, j int) bool { - return items[i].expireAt.Before(items[j].expireAt) - }) - - // 删除足够的项目以腾出空间 - freed := int64(0) - for _, item := range items { - if freed >= sizeToFree { - break + // 初始化每个分片 + for i := 0; i < shardCount; i++ { + shardPath := filepath.Join(baseDir, fmt.Sprintf("shard_%d", i)) + diskCache, err := NewDiskCache(shardPath, shardSize) + if err != nil { + return nil, err } - - cacheItem := c.cache[item.key] - freed += cacheItem.size - c.currSize -= cacheItem.size - delete(c.cache, item.key) + cache.shards[i] = diskCache } + + return cache, nil +} + +// 获取键对应的分片 +func (c *ShardedDiskCache) getShard(key string) *DiskCache { + // 计算哈希值决定分片 + h := fnv.New32a() + h.Write([]byte(key)) + shardIndex := int(h.Sum32()) % c.shardCount + return c.shards[shardIndex] } ``` -## 7. 两级缓存实现 +### 6.2 分片磁盘缓存优势 -### 7.1 两级缓存(two_level_cache.go) +1. **减少锁竞争**:每个分片有自己的锁,减少了高并发场景下的锁竞争 +2. **提高并发性能**:多个请求可以同时访问不同的分片,提高并发性能 +3. **均衡负载**:通过哈希算法将缓存键均匀分布到不同分片,实现负载均衡 +4. **隔离故障**:单个分片的故障不会影响其他分片的正常运行 -两级缓存整合内存缓存和磁盘缓存,提供统一的接口。 +## 7. 序列化器接口 + +### 7.1 序列化器设计 + +序列化器接口统一了序列化和反序列化操作,支持多种序列化方式。 ```go -// TwoLevelCache 两级缓存 -type TwoLevelCache struct { - memCache *MemoryCache - diskCache *DiskCache +// Serializer 序列化接口 +type Serializer interface { + Serialize(v interface{}) ([]byte, error) + Deserialize(data []byte, v interface{}) error } -// NewTwoLevelCache 创建新的两级缓存 -func NewTwoLevelCache() (*TwoLevelCache, error) { - // 获取配置 - maxSizeMB := 100 // 默认100MB - if sizeStr := os.Getenv("CACHE_MAX_SIZE"); sizeStr != "" { - if size, err := strconv.Atoi(sizeStr); err == nil { - maxSizeMB = size - } - } - +// GobSerializer 使用gob进行序列化/反序列化 +type GobSerializer struct { + bufferPool sync.Pool +} + +// JSONSerializer 使用JSON进行序列化/反序列化 +type JSONSerializer struct { + // 使用sonic库提高性能 +} +``` + +### 7.2 Gob序列化优势 + +相比JSON序列化,Gob序列化具有以下优势: + +1. **更高的性能**:Gob序列化/反序列化速度更快 +2. **更小的结果大小**:Gob序列化结果通常比JSON更小 +3. **更好的Go类型支持**:Gob是Go语言原生的序列化格式,对Go类型支持更好 + +### 7.3 类型注册 + +为了确保Gob序列化器能正确处理所有类型,系统在初始化时注册了所有需要的类型: + +```go +func init() { + // 注册model包中的所有类型 + gob.Register(model.SearchResult{}) + gob.Register(model.SearchResponse{}) + gob.Register(model.Link{}) + gob.Register(model.MergedLink{}) + gob.Register(map[string][]model.MergedLink{}) + gob.Register([]model.SearchResult{}) + gob.Register(time.Time{}) +} +``` + +## 8. 增强版两级缓存 + +### 8.1 增强版两级缓存设计 + +增强版两级缓存结合了内存缓存、分片磁盘缓存和可配置的序列化器。 + +```go +// EnhancedTwoLevelCache 增强版两级缓存 +type EnhancedTwoLevelCache struct { + memory *MemoryCache + disk *ShardedDiskCache + mutex sync.RWMutex + serializer Serializer +} + +// NewEnhancedTwoLevelCache 创建新的增强版两级缓存 +func NewEnhancedTwoLevelCache() (*EnhancedTwoLevelCache, error) { // 创建内存缓存 - memCache := NewMemoryCache(maxSizeMB) + memory := NewMemoryCache(config.AppConfig.MemoryCacheSize) - // 创建磁盘缓存 - diskCache, err := NewDiskCache() + // 创建分片磁盘缓存 + disk, err := NewShardedDiskCache( + config.AppConfig.CachePath, + config.AppConfig.ShardCount, + config.AppConfig.CacheMaxSizeMB, + ) if err != nil { return nil, err } - return &TwoLevelCache{ - memCache: memCache, - diskCache: diskCache, + // 创建Gob序列化器 + serializer := NewGobSerializer() + + return &EnhancedTwoLevelCache{ + memory: memory, + disk: disk, + serializer: serializer, }, nil } +``` +### 8.2 增强版两级缓存方法 + +```go // Get 从缓存获取数据 -func (c *TwoLevelCache) Get(key string) ([]byte, bool, error) { - // 先查内存缓存 - data, hit, err := c.memCache.Get(key) - if hit || err != nil { - return data, hit, err +func (c *EnhancedTwoLevelCache) Get(key string) ([]byte, bool, error) { + // 先从内存缓存获取 + data, ok := c.memory.Get(key) + if ok { + return data, true, nil } - // 内存未命中,查磁盘缓存 - data, hit, err = c.diskCache.Get(key) + // 内存缓存未命中,从磁盘缓存获取 + data, ok, err := c.disk.Get(key) if err != nil { return nil, false, err } - // 如果磁盘命中,更新内存缓存 - if hit { - // 使用较短的TTL,因为这只是内存缓存 - c.memCache.Set(key, data, 10*time.Minute) + if ok { + // 将数据放入内存缓存 + c.memory.Set(key, data) + return data, true, nil } - return data, hit, nil + return nil, false, nil } // Set 将数据存入缓存 -func (c *TwoLevelCache) Set(key string, data []byte, ttl time.Duration) error { - // 更新内存缓存 - if err := c.memCache.Set(key, data, ttl); err != nil { - return err - } +func (c *EnhancedTwoLevelCache) Set(key string, data []byte, ttl time.Duration) error { + // 存入内存缓存 + c.memory.Set(key, data) - // 异步更新磁盘缓存 - go c.diskCache.Set(key, data, ttl) + // 异步存入磁盘缓存 + go func() { + c.disk.Set(key, data, ttl) + }() return nil } + +// GetSerializer 获取序列化器 +func (c *EnhancedTwoLevelCache) GetSerializer() Serializer { + return c.serializer +} ``` -## 8. 序列化优化 +### 8.3 双缓存机制 -### 8.1 高性能JSON处理(util/json包) - -为提高序列化和反序列化性能,系统封装了bytedance/sonic库,提供高性能的JSON处理功能: +为了确保系统平稳过渡,PanSou实现了双缓存机制: ```go -// pansou/util/json/json.go -package json - -import ( - "github.com/bytedance/sonic" -) - -// API是sonic的全局配置实例 -var API = sonic.ConfigDefault - -// 初始化sonic配置 -func init() { - // 根据需要配置sonic选项 - API = sonic.Config{ - UseNumber: true, - EscapeHTML: true, - SortMapKeys: false, // 生产环境设为false提高性能 - }.Froze() -} - -// Marshal 使用sonic序列化对象到JSON -func Marshal(v interface{}) ([]byte, error) { - return API.Marshal(v) -} - -// Unmarshal 使用sonic反序列化JSON到对象 -func Unmarshal(data []byte, v interface{}) error { - return API.Unmarshal(data, v) -} - -// MarshalString 序列化对象到JSON字符串 -func MarshalString(v interface{}) (string, error) { - bytes, err := API.Marshal(v) - if err != nil { - return "", err - } - return string(bytes), nil -} - -// UnmarshalString 反序列化JSON字符串到对象 -func UnmarshalString(str string, v interface{}) error { - return API.Unmarshal([]byte(str), v) -} -``` - -该包的主要特点: - -1. **高性能**:基于bytedance/sonic库,比标准库encoding/json快5-10倍 -2. **统一接口**:提供与标准库兼容的接口,便于系统内统一使用 -3. **优化配置**:预配置了适合生产环境的sonic选项 -4. **字符串处理**:额外提供字符串序列化/反序列化方法,减少内存分配 - -### 8.2 序列化工具(utils.go) - -为提高序列化和反序列化性能,系统使用高性能JSON库并实现对象池化。 - -```go -var ( - // 缓冲区对象池 - bufferPool = sync.Pool{ - New: func() interface{} { - return new(bytes.Buffer) - }, +// 优先使用增强版缓存 +if enhancedTwoLevelCache != nil { + data, hit, err = enhancedTwoLevelCache.Get(cacheKey) + + if err == nil && hit { + var results []model.SearchResult + if err := enhancedTwoLevelCache.GetSerializer().Deserialize(data, &results); err == nil { + return results, nil + } } -) - -// SerializeWithPool 使用对象池序列化数据 -func SerializeWithPool(v interface{}) ([]byte, error) { - // 从对象池获取缓冲区 - buf := bufferPool.Get().(*bytes.Buffer) - buf.Reset() - defer bufferPool.Put(buf) +} else if twoLevelCache != nil { + // 如果增强版缓存不可用,回退到原始缓存 + data, hit, err = twoLevelCache.Get(cacheKey) - // 使用高性能JSON库序列化 - if err := sonic.ConfigDefault.NewEncoder(buf).Encode(v); err != nil { - return nil, err - } - - // 复制数据,因为缓冲区会被重用 - data := make([]byte, buf.Len()) - copy(data, buf.Bytes()) - - return data, nil -} - -// DeserializeWithPool 使用对象池反序列化数据 -func DeserializeWithPool(data []byte, v interface{}) error { - return sonic.ConfigDefault.Unmarshal(data, v) -} -``` - -## 9. 缓存系统优化历程 - -### 9.1 第一阶段:缓存键生成优化 - -1. **实现新的`GenerateCacheKey`函数**: - - 使用哈希处理大型列表 - - 实现参数排序确保顺序不变性 - - 统一空值处理方式 - -2. **添加缓存键单元测试**: - - 验证参数顺序不变性 - - 验证空值处理一致性 - -3. **优化哈希计算**: - - 对大型列表使用MD5哈希处理 - - 添加哈希缓存映射避免重复计算 - -### 9.2 第二阶段:JSON序列化优化 - -1. **高性能JSON库**: - - 使用`github.com/bytedance/sonic`高性能JSON库 - -2. **缓冲区对象池**: - - 实现缓冲区对象池,减少内存分配 - - 创建`SerializeWithPool`和`DeserializeWithPool`函数 - -3. **性能测试**: - - 对比优化前后的序列化性能 - - 验证对象池化方法的效果 - -### 9.3 第三阶段:缓存写入优化 - -1. **异步缓存写入**: - - 内存缓存在主线程执行 - - 磁盘缓存移至goroutine异步执行 - -2. **预计算哈希缓存**: - - 缓存频道和插件组合的哈希值 - - 提前计算常用组合的哈希值 - -### 9.4 第四阶段:缓存键一致性优化 - -1. **插件参数规范化处理**: - - 统一处理不传plugins参数、传空plugins数组、传只包含空字符串的plugins数组、传所有插件名称这几种情况 - - 对于`sourceType=tg`的请求,忽略插件参数,使用固定值"none" - -2. **Search函数优化**: - - 添加参数预处理逻辑,确保不同形式的插件参数产生相同的搜索结果 - - 对于包含所有注册插件的请求,统一设为nil,与不指定插件的请求使用相同的缓存键 - -3. **HTTP请求处理优化**: - - 区分"不传plugins参数"和"传空plugins值"这两种情况 - - 对于`sourceType=all`的请求,如果plugins为空或不存在,统一设为nil - -4. **单元测试**: - - 添加`TestPluginParameterNormalization`测试用例,验证不同形式的插件参数生成相同的缓存键 - -## 10. 性能指标 - -### 10.1 缓存命中率 - -- **内存缓存命中率**:约85%(热点查询) -- **磁盘缓存命中率**:约10%(非热点查询) -- **总体命中率**:约95% - -### 10.2 响应时间 - -- **缓存命中**:平均响应时间 < 50ms -- **缓存未命中**:平均响应时间约6-12秒(取决于查询复杂度和网络状况) -- **性能提升**:缓存命中时响应时间减少约99% - -### 10.3 资源消耗 - -- **内存占用**:约100MB(可配置) -- **磁盘占用**:约1GB(取决于查询量和缓存TTL) -- **CPU使用率**:缓存命中时几乎为0,缓存未命中时约20-30% - -## 11. 异步插件缓存系统 - -异步插件缓存系统是为解决慢速插件响应问题而设计的专门缓存机制,实现了"尽快响应,持续处理"的异步模式。 - -### 11.1 异步缓存架构 - -``` -┌─────────────────────────┐ -│ 搜索请求 │ -└───────────┬─────────────┘ - │ -┌───────────▼─────────────┐ -│ 异步缓存查询 │ -└───────────┬─────────────┘ - │ (命中) - ├───────────────────┐ - │ │ -┌───────────▼─────────────┐ │ -│ 返回缓存结果 │ │ -└───────────┬─────────────┘ │ - │ │ - │ (接近过期) │ - │ │ -┌───────────▼─────────────┐ │ -│ 后台刷新缓存 │ │ -└─────────────────────────┘ │ - │ - │ (未命中) │ - │ │ -┌───────────▼─────────────┐ │ -│ 启动双通道处理 │ │ -└───────────┬─────────────┘ │ - │ │ - ┌──────┴──────┐ │ - │ │ │ -┌────▼────┐ ┌────▼────┐ │ -│快速响应 │ │后台处理│ │ -│(短超时) │ │(长超时) │ │ -└────┬────┘ └────┬────┘ │ - │ │ │ - │ │ │ -┌────▼────┐ ┌────▼────┐ │ -│返回结果 │ │更新缓存│ │ -└─────────┘ └────┬────┘ │ - │ │ - ▼ │ -┌─────────────────────────┐ │ -│ 持久化到磁盘 │◄───┘ -└─────────────────────────┘ -``` - -### 11.2 异步缓存机制设计 - -#### 11.2.1 缓存结构 - -```go -// 缓存响应结构 -type cachedResponse struct { - Results []model.SearchResult `json:"results"` - Timestamp time.Time `json:"timestamp"` - Complete bool `json:"complete"` - LastAccess time.Time `json:"last_access"` - AccessCount int `json:"access_count"` -} - -// 可序列化的缓存结构,用于持久化 -type persistentCache struct { - Entries map[string]cachedResponse -} -``` - -#### 11.2.2 缓存键设计 - -异步插件缓存使用插件特定的缓存键,确保不同插件的缓存不会相互干扰: - -```go -// 生成插件特定的缓存键 -pluginSpecificCacheKey := fmt.Sprintf("%s:%s", p.name, cacheKey) -``` - -#### 11.2.3 双级超时控制 - -异步缓存系统实现了双级超时控制: - -1. **响应超时**(默认2秒):确保快速响应用户请求 -2. **处理超时**(默认30秒):允许后台处理有足够时间完成 - -```go -// 默认配置值 -defaultAsyncResponseTimeout = 2 * time.Second -defaultPluginTimeout = 30 * time.Second -``` - -### 11.3 缓存持久化 - -#### 11.3.1 定期保存 - -缓存系统会定期将内存中的缓存保存到磁盘: - -```go -// 缓存保存间隔 (2分钟) -cacheSaveInterval = 2 * time.Minute - -// 启动定期保存 -func startCachePersistence() { - ticker := time.NewTicker(cacheSaveInterval) - defer ticker.Stop() - - for range ticker.C { - if hasCacheItems() { - saveCacheToDisk() + if err == nil && hit { + var results []model.SearchResult + if err := cache.DeserializeWithPool(data, &results); err == nil { + return results, nil } } } ``` -#### 11.3.2 即时保存 +## 9. 异步插件缓存系统 -当缓存更新时,系统会触发即时保存: +### 9.1 异步插件缓存设计 + +异步插件缓存系统实现了"尽快响应,持续处理"的异步模式,包括以下特性: + +1. **双通道处理**:同时启动快速响应通道和后台处理通道 +2. **超时控制**:在响应超时时返回当前结果,后台继续处理 +3. **缓存更新**:后台处理完成后更新缓存,供后续查询使用 +4. **增量更新**:新旧结果智能合并,保留唯一标识符不同的结果 + +### 9.2 缓存数据时间戳检查 + +为了确保即使在不强制刷新的情况下也能获取最新的缓存数据,系统实现了缓存数据时间戳检查机制: ```go -// 更新缓存后立即触发保存 -go saveCacheToDisk() -``` - -#### 11.3.3 优雅关闭 - -系统实现了优雅关闭机制,确保在程序退出前保存缓存: - -```go -// 在main.go中 -// 创建通道来接收操作系统信号 -quit := make(chan os.Signal, 1) -signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) - -// 等待中断信号 -<-quit - -// 保存异步插件缓存 -plugin.SaveCacheToDisk() - -// 优雅关闭服务器 -if err := srv.Shutdown(ctx); err != nil { - log.Fatalf("服务器关闭异常: %v", err) -} -``` - -### 11.4 智能缓存管理 - -#### 11.4.1 基于得分的缓存淘汰 - -系统实现了基于多因素的缓存淘汰策略: - -```go -// 计算得分:访问次数 / (空闲时间的平方 * 年龄) -// 这样: -// - 访问频率高的得分高 -// - 最近访问的得分高 -// - 较新的缓存得分高 -score := float64(item.AccessCount) / (idleTime.Seconds() * idleTime.Seconds() * age.Seconds()) -``` - -#### 11.4.2 访问统计 - -系统记录每个缓存项的访问情况: - -```go -// 记录缓存访问次数,用于智能缓存策略 -func recordCacheAccess(key string) { - // 更新缓存项的访问时间和计数 - if cached, ok := apiResponseCache.Load(key); ok { - cachedItem := cached.(cachedResponse) - cachedItem.LastAccess = time.Now() - cachedItem.AccessCount++ - apiResponseCache.Store(key, cachedItem) +// searchPlugins 方法中的缓存数据时间戳检查 +if len(results) > 0 { + // 获取当前时间 + now := time.Now() + // 检查缓存数据是否是最近更新的 + // 这里我们假设如果缓存数据中有结果的时间戳在过去30秒内,则认为是最新的 + for _, result := range results { + if !result.Datetime.IsZero() && now.Sub(result.Datetime) < 30*time.Second { + return results, nil + } } } ``` -#### 11.4.3 增量缓存更新 +这种机制确保了用户总是能获取到异步插件在后台更新的最新缓存数据,同时保持良好的性能。 -系统实现了新旧结果的智能合并: +## 10. 缓存性能优化 -```go -// 创建合并结果集 -mergedResults := make([]model.SearchResult, 0, len(results) + len(oldCachedResult.Results)) +### 10.1 内存优化 -// 创建已有结果ID的映射 -existingIDs := make(map[string]bool) -for _, r := range results { - existingIDs[r.UniqueID] = true - mergedResults = append(mergedResults, r) -} +1. **对象池**:使用对象池减少内存分配和GC压力 +2. **预分配容量**:为map和slice预分配容量,减少动态扩容 +3. **缓冲池**:为序列化操作使用缓冲池,减少内存分配 -// 添加旧结果中不存在的项 -for _, r := range oldCachedResult.Results { - if !existingIDs[r.UniqueID] { - mergedResults = append(mergedResults, r) - } -} -``` +### 10.2 并发优化 -#### 11.4.4 后台自动刷新 +1. **分片策略**:使用分片磁盘缓存,减少锁竞争 +2. **细粒度锁**:每个分片使用独立的锁,提高并发性能 +3. **异步写入**:异步写入磁盘缓存,不阻塞主流程 -对于接近过期的缓存,系统会在后台自动刷新: +### 10.3 序列化优化 -```go -// 如果缓存接近过期(已用时间超过TTL的80%),在后台刷新缓存 -if time.Since(cachedResult.Timestamp) > (p.cacheTTL * 4 / 5) { - go p.refreshCacheInBackground(keyword, pluginSpecificCacheKey, searchFunc, cachedResult) -} -``` +1. **Gob序列化**:使用Gob序列化提高性能 +2. **类型注册**:预先注册所有类型,避免运行时注册开销 +3. **缓冲池**:使用缓冲池优化序列化过程 -### 11.5 资源管理 +## 11. 缓存管理 -#### 11.5.1 工作池控制 +### 11.1 缓存清理策略 -系统实现了工作池机制,限制并发任务数量: +1. **TTL机制**:缓存项有明确的过期时间 +2. **LRU策略**:内存缓存使用LRU策略进行淘汰 +3. **定期清理**:磁盘缓存定期清理过期项 +4. **大小限制**:磁盘缓存有总大小限制,超过时清理最旧的项 -```go -// 工作池相关变量 -backgroundWorkerPool chan struct{} -backgroundTasksCount int32 = 0 +### 11.2 缓存监控 -// 默认配置值 -defaultMaxBackgroundWorkers = 20 -defaultMaxBackgroundTasks = 100 - -// 尝试获取工作槽 -func acquireWorkerSlot() bool { - // 获取最大任务数 - maxTasks := int32(defaultMaxBackgroundTasks) - if config.AppConfig != nil { - maxTasks = int32(config.AppConfig.AsyncMaxBackgroundTasks) - } - - // 检查总任务数 - if atomic.LoadInt32(&backgroundTasksCount) >= maxTasks { - return false - } - - // 尝试获取工作槽 - select { - case backgroundWorkerPool <- struct{}{}: - atomic.AddInt32(&backgroundTasksCount, 1) - return true - default: - return false - } -} -``` - -#### 11.5.2 统计监控 - -系统记录各种缓存操作的统计数据: - -```go -// 统计数据 (仅用于内部监控) -cacheHits int64 = 0 -cacheMisses int64 = 0 -asyncCompletions int64 = 0 -``` - -### 11.6 配置选项 - -异步缓存系统提供了丰富的配置选项: - -```go -// 异步插件相关配置 -AsyncPluginEnabled bool // 是否启用异步插件 -AsyncResponseTimeout int // 响应超时时间(秒) -AsyncResponseTimeoutDur time.Duration // 响应超时时间(Duration) -AsyncMaxBackgroundWorkers int // 最大后台工作者数量 -AsyncMaxBackgroundTasks int // 最大后台任务数量 -AsyncCacheTTLHours int // 异步缓存有效期(小时) -``` - -### 11.7 性能指标 - -- **缓存命中时响应时间**:< 50ms -- **缓存未命中时响应时间**:约4秒(响应超时时间) -- **后台处理时间**:最长30秒(处理超时时间) -- **缓存命中率**:约90%(经过一段时间运行后) +1. **命中率统计**:记录缓存命中率 +2. **大小监控**:监控缓存大小变化 +3. **性能指标**:记录缓存操作的性能指标 diff --git a/service/search_service.go b/service/search_service.go index 9af9d76..c3e5314 100644 --- a/service/search_service.go +++ b/service/search_service.go @@ -14,6 +14,7 @@ import ( "pansou/util" "pansou/util/cache" "pansou/util/pool" + "sync" // Added for sync.WaitGroup ) // 优先关键词列表 @@ -22,6 +23,7 @@ var priorityKeywords = []string{"合集", "系列", "全", "完", "最新", "附 // 全局缓存实例和缓存是否初始化标志 var ( twoLevelCache *cache.TwoLevelCache + enhancedTwoLevelCache *cache.EnhancedTwoLevelCache cacheInitialized bool ) @@ -29,6 +31,14 @@ var ( func init() { if config.AppConfig != nil && config.AppConfig.CacheEnabled { var err error + // 优先使用增强版缓存 + enhancedTwoLevelCache, err = cache.NewEnhancedTwoLevelCache() + if err == nil { + cacheInitialized = true + return + } + + // 如果增强版缓存初始化失败,回退到原始缓存 twoLevelCache, err = cache.NewTwoLevelCache() if err == nil { cacheInitialized = true @@ -46,9 +56,16 @@ func NewSearchService(pluginManager *plugin.PluginManager) *SearchService { // 检查缓存是否已初始化,如果未初始化则尝试重新初始化 if !cacheInitialized && config.AppConfig != nil && config.AppConfig.CacheEnabled { var err error - twoLevelCache, err = cache.NewTwoLevelCache() + // 优先使用增强版缓存 + enhancedTwoLevelCache, err = cache.NewEnhancedTwoLevelCache() if err == nil { cacheInitialized = true + } else { + // 如果增强版缓存初始化失败,回退到原始缓存 + twoLevelCache, err = cache.NewTwoLevelCache() + if err == nil { + cacheInitialized = true + } } } @@ -128,130 +145,46 @@ func (s *SearchService) Search(keyword string, channels []string, concurrency in } } - // 立即生成缓存键并检查缓存 - cacheKey := cache.GenerateCacheKey(keyword, channels, sourceType, plugins) - - // 如果未启用强制刷新,尝试从缓存获取结果 - if !forceRefresh && twoLevelCache != nil && config.AppConfig.CacheEnabled { - data, hit, err := twoLevelCache.Get(cacheKey) - - if err == nil && hit { - var response model.SearchResponse - if err := cache.DeserializeWithPool(data, &response); err == nil { - // 根据resultType过滤返回结果 - return filterResponseByType(response, resultType), nil - } - } - } - - // 获取所有可用插件 - var availablePlugins []plugin.SearchPlugin - if s.pluginManager != nil && (sourceType == "all" || sourceType == "plugin") { - allPlugins := s.pluginManager.GetPlugins() - - // 确保plugins不为nil并且有非空元素 - hasPlugins := plugins != nil && len(plugins) > 0 - hasNonEmptyPlugin := false - - if hasPlugins { - for _, p := range plugins { - if p != "" { - hasNonEmptyPlugin = true - break - } - } - } - - // 只有当plugins数组包含非空元素时才进行过滤 - if hasPlugins && hasNonEmptyPlugin { - pluginMap := make(map[string]bool) - for _, p := range plugins { - if p != "" { // 忽略空字符串 - pluginMap[strings.ToLower(p)] = true - } - } - - for _, p := range allPlugins { - if pluginMap[strings.ToLower(p.Name())] { - availablePlugins = append(availablePlugins, p) - } - } - } else { - // 如果plugins为nil、空数组或只包含空字符串,视为未指定,使用所有插件 - availablePlugins = allPlugins - } - } - - // 控制并发数:如果用户没有指定有效值,则默认使用"频道数+插件数+10"的并发数 - pluginCount := len(availablePlugins) - - // 根据sourceType决定是否搜索Telegram频道 - channelCount := 0 + // 并行获取TG搜索和插件搜索结果 + var tgResults []model.SearchResult + var pluginResults []model.SearchResult + + var wg sync.WaitGroup + var tgErr, pluginErr error + + // 如果需要搜索TG if sourceType == "all" || sourceType == "tg" { - channelCount = len(channels) + wg.Add(1) + go func() { + defer wg.Done() + tgResults, tgErr = s.searchTG(keyword, channels, forceRefresh) + }() } - - if concurrency <= 0 { - concurrency = channelCount + pluginCount + 10 - if concurrency < 1 { - concurrency = 1 - } + + // 如果需要搜索插件 + if sourceType == "all" || sourceType == "plugin" { + wg.Add(1) + go func() { + defer wg.Done() + // 对于插件搜索,我们总是希望获取最新的缓存数据 + // 因此,即使forceRefresh=false,我们也需要确保获取到最新的缓存 + pluginResults, pluginErr = s.searchPlugins(keyword, plugins, forceRefresh, concurrency) + }() } - - // 计算任务总数(频道数 + 插件数) - totalTasks := channelCount + pluginCount - - // 如果没有任务要执行,返回空结果 - if totalTasks == 0 { - return model.SearchResponse{ - Total: 0, - Results: []model.SearchResult{}, - MergedByType: make(model.MergedLinks), - }, nil + + // 等待所有搜索完成 + wg.Wait() + + // 检查错误 + if tgErr != nil { + return model.SearchResponse{}, tgErr } - - // 使用工作池执行并行搜索 - tasks := make([]pool.Task, 0, totalTasks) - - // 添加频道搜索任务(如果需要) - if sourceType == "all" || sourceType == "tg" { - for _, channel := range channels { - ch := channel // 创建副本,避免闭包问题 - tasks = append(tasks, func() interface{} { - results, err := s.searchChannel(keyword, ch) - if err != nil { - return nil - } - return results - }) - } - } - - // 添加插件搜索任务(如果需要) - for _, p := range availablePlugins { - plugin := p // 创建副本,避免闭包问题 - tasks = append(tasks, func() interface{} { - results, err := plugin.Search(keyword) - if err != nil { - return nil - } - return results - }) - } - - // 使用带超时控制的工作池执行所有任务并获取结果 - results := pool.ExecuteBatchWithTimeout(tasks, concurrency, config.AppConfig.PluginTimeout) - - // 预估每个任务平均返回22个结果 - allResults := make([]model.SearchResult, 0, totalTasks*22) - - // 合并所有结果 - for _, result := range results { - if result != nil { - channelResults := result.([]model.SearchResult) - allResults = append(allResults, channelResults...) - } + if pluginErr != nil { + return model.SearchResponse{}, pluginErr } + + // 合并结果 + allResults := mergeSearchResults(tgResults, pluginResults) // 过滤结果,确保标题包含搜索关键词 filteredResults := filterResultsByKeyword(allResults, keyword) @@ -290,19 +223,6 @@ func (s *SearchService) Search(keyword string, channels []string, concurrency in MergedByType: mergedLinks, } - // 异步缓存搜索结果(缓存完整结果,以便后续可以根据不同resultType过滤) - if twoLevelCache != nil && config.AppConfig.CacheEnabled { - go func(resp model.SearchResponse) { - data, err := cache.SerializeWithPool(resp) - if err != nil { - return - } - - ttl := time.Duration(config.AppConfig.CacheTTLMinutes) * time.Minute - twoLevelCache.Set(cacheKey, data, ttl) - }(response) - } - // 根据resultType过滤返回结果 return filterResponseByType(response, resultType), nil } @@ -582,6 +502,291 @@ func mergeResultsByType(results []model.SearchResult) model.MergedLinks { return mergedLinks } +// searchTG 搜索TG频道 +func (s *SearchService) searchTG(keyword string, channels []string, forceRefresh bool) ([]model.SearchResult, error) { + // 生成缓存键 + cacheKey := cache.GenerateTGCacheKey(keyword, channels) + + // 如果未启用强制刷新,尝试从缓存获取结果 + if !forceRefresh && cacheInitialized && config.AppConfig.CacheEnabled { + var data []byte + var hit bool + var err error + + // 优先使用增强版缓存 + if enhancedTwoLevelCache != nil { + data, hit, err = enhancedTwoLevelCache.Get(cacheKey) + + if err == nil && hit { + var results []model.SearchResult + if err := enhancedTwoLevelCache.GetSerializer().Deserialize(data, &results); err == nil { + return results, nil + } + } + } else if twoLevelCache != nil { + data, hit, err = twoLevelCache.Get(cacheKey) + + if err == nil && hit { + var results []model.SearchResult + if err := cache.DeserializeWithPool(data, &results); err == nil { + return results, nil + } + } + } + } + + // 缓存未命中,执行实际搜索 + var results []model.SearchResult + + // 使用工作池并行搜索多个频道 + tasks := make([]pool.Task, 0, len(channels)) + + for _, channel := range channels { + ch := channel // 创建副本,避免闭包问题 + tasks = append(tasks, func() interface{} { + results, err := s.searchChannel(keyword, ch) + if err != nil { + return nil + } + return results + }) + } + + // 执行搜索任务并获取结果 + taskResults := pool.ExecuteBatchWithTimeout(tasks, len(channels), config.AppConfig.PluginTimeout) + + // 合并所有频道的结果 + for _, result := range taskResults { + if result != nil { + channelResults := result.([]model.SearchResult) + results = append(results, channelResults...) + } + } + + // 异步缓存结果 + if cacheInitialized && config.AppConfig.CacheEnabled { + go func(res []model.SearchResult) { + ttl := time.Duration(config.AppConfig.CacheTTLMinutes) * time.Minute + + // 优先使用增强版缓存 + if enhancedTwoLevelCache != nil { + data, err := enhancedTwoLevelCache.GetSerializer().Serialize(res) + if err != nil { + return + } + enhancedTwoLevelCache.Set(cacheKey, data, ttl) + } else if twoLevelCache != nil { + data, err := cache.SerializeWithPool(res) + if err != nil { + return + } + twoLevelCache.Set(cacheKey, data, ttl) + } + }(results) + } + + return results, nil +} + +// searchPlugins 搜索插件 +func (s *SearchService) searchPlugins(keyword string, plugins []string, forceRefresh bool, concurrency int) ([]model.SearchResult, error) { + // 生成缓存键 + cacheKey := cache.GeneratePluginCacheKey(keyword, plugins) + + // 如果未启用强制刷新,尝试从缓存获取结果 + if !forceRefresh && cacheInitialized && config.AppConfig.CacheEnabled { + var data []byte + var hit bool + var err error + + // 优先使用增强版缓存 + if enhancedTwoLevelCache != nil { + data, hit, err = enhancedTwoLevelCache.Get(cacheKey) + + if err == nil && hit { + var results []model.SearchResult + if err := enhancedTwoLevelCache.GetSerializer().Deserialize(data, &results); err == nil { + // 确保缓存数据是最新的 + // 如果缓存数据是最近更新的(例如在过去30秒内),则直接返回 + // 否则,我们将重新执行搜索以获取最新数据 + if len(results) > 0 { + // 获取当前时间 + now := time.Now() + // 检查缓存数据是否是最近更新的 + // 这里我们假设如果缓存数据中有结果的时间戳在过去30秒内,则认为是最新的 + for _, result := range results { + if !result.Datetime.IsZero() && now.Sub(result.Datetime) < 30*time.Second { + return results, nil + } + } + } else { + // 如果缓存中没有数据,直接返回空结果 + return results, nil + } + } + } + } else if twoLevelCache != nil { + data, hit, err = twoLevelCache.Get(cacheKey) + + if err == nil && hit { + var results []model.SearchResult + if err := cache.DeserializeWithPool(data, &results); err == nil { + // 确保缓存数据是最新的 + // 如果缓存数据是最近更新的(例如在过去30秒内),则直接返回 + // 否则,我们将重新执行搜索以获取最新数据 + if len(results) > 0 { + // 获取当前时间 + now := time.Now() + // 检查缓存数据是否是最近更新的 + // 这里我们假设如果缓存数据中有结果的时间戳在过去30秒内,则认为是最新的 + for _, result := range results { + if !result.Datetime.IsZero() && now.Sub(result.Datetime) < 30*time.Second { + return results, nil + } + } + } else { + // 如果缓存中没有数据,直接返回空结果 + return results, nil + } + } + } + } + } + + // 缓存未命中或缓存数据不是最新的,执行实际搜索 + // 获取所有可用插件 + var availablePlugins []plugin.SearchPlugin + if s.pluginManager != nil { + allPlugins := s.pluginManager.GetPlugins() + + // 确保plugins不为nil并且有非空元素 + hasPlugins := plugins != nil && len(plugins) > 0 + hasNonEmptyPlugin := false + + if hasPlugins { + for _, p := range plugins { + if p != "" { + hasNonEmptyPlugin = true + break + } + } + } + + // 只有当plugins数组包含非空元素时才进行过滤 + if hasPlugins && hasNonEmptyPlugin { + pluginMap := make(map[string]bool) + for _, p := range plugins { + if p != "" { // 忽略空字符串 + pluginMap[strings.ToLower(p)] = true + } + } + + for _, p := range allPlugins { + if pluginMap[strings.ToLower(p.Name())] { + availablePlugins = append(availablePlugins, p) + } + } + } else { + // 如果plugins为nil、空数组或只包含空字符串,视为未指定,使用所有插件 + availablePlugins = allPlugins + } + } + + // 控制并发数 + if concurrency <= 0 { + concurrency = len(availablePlugins) + 10 + if concurrency < 1 { + concurrency = 1 + } + } + + // 使用工作池执行并行搜索 + tasks := make([]pool.Task, 0, len(availablePlugins)) + for _, p := range availablePlugins { + plugin := p // 创建副本,避免闭包问题 + tasks = append(tasks, func() interface{} { + results, err := plugin.Search(keyword) + if err != nil { + return nil + } + return results + }) + } + + // 执行搜索任务并获取结果 + results := pool.ExecuteBatchWithTimeout(tasks, concurrency, config.AppConfig.PluginTimeout) + + // 合并所有插件的结果 + var allResults []model.SearchResult + for _, result := range results { + if result != nil { + pluginResults := result.([]model.SearchResult) + allResults = append(allResults, pluginResults...) + } + } + + // 异步缓存结果 + if cacheInitialized && config.AppConfig.CacheEnabled { + go func(res []model.SearchResult) { + ttl := time.Duration(config.AppConfig.CacheTTLMinutes) * time.Minute + + // 优先使用增强版缓存 + if enhancedTwoLevelCache != nil { + data, err := enhancedTwoLevelCache.GetSerializer().Serialize(res) + if err != nil { + return + } + enhancedTwoLevelCache.Set(cacheKey, data, ttl) + } else if twoLevelCache != nil { + data, err := cache.SerializeWithPool(res) + if err != nil { + return + } + twoLevelCache.Set(cacheKey, data, ttl) + } + }(allResults) + } + + return allResults, nil +} + +// 合并搜索结果 +func mergeSearchResults(tgResults, pluginResults []model.SearchResult) []model.SearchResult { + // 预估合并后的结果数量 + totalSize := len(tgResults) + len(pluginResults) + if totalSize == 0 { + return []model.SearchResult{} + } + + // 创建结果映射,用于去重 + resultMap := make(map[string]model.SearchResult, totalSize) + + // 添加TG搜索结果 + for _, result := range tgResults { + resultMap[result.UniqueID] = result + } + + // 添加或更新插件搜索结果(如果有重复,保留较新的) + for _, result := range pluginResults { + if existing, ok := resultMap[result.UniqueID]; ok { + // 如果已存在,保留较新的 + if result.Datetime.After(existing.Datetime) { + resultMap[result.UniqueID] = result + } + } else { + resultMap[result.UniqueID] = result + } + } + + // 转换回切片 + mergedResults := make([]model.SearchResult, 0, len(resultMap)) + for _, result := range resultMap { + mergedResults = append(mergedResults, result) + } + + return mergedResults +} + // GetPluginManager 获取插件管理器 func (s *SearchService) GetPluginManager() *plugin.PluginManager { return s.pluginManager diff --git a/util/cache/cache_key.go b/util/cache/cache_key.go index 2e534d3..592e601 100644 --- a/util/cache/cache_key.go +++ b/util/cache/cache_key.go @@ -72,6 +72,34 @@ func init() { precomputedHashes.Store("all_channels", allChannelsHash) } +// GenerateTGCacheKey 为TG搜索生成缓存键 +func GenerateTGCacheKey(keyword string, channels []string) string { + // 关键词标准化 + normalizedKeyword := strings.ToLower(strings.TrimSpace(keyword)) + + // 获取频道列表哈希 + channelsHash := getChannelsHash(channels) + + // 生成TG搜索特定的缓存键 + keyStr := fmt.Sprintf("tg:%s:%s", normalizedKeyword, channelsHash) + hash := md5.Sum([]byte(keyStr)) + return hex.EncodeToString(hash[:]) +} + +// GeneratePluginCacheKey 为插件搜索生成缓存键 +func GeneratePluginCacheKey(keyword string, plugins []string) string { + // 关键词标准化 + normalizedKeyword := strings.ToLower(strings.TrimSpace(keyword)) + + // 获取插件列表哈希 + pluginsHash := getPluginsHash(plugins) + + // 生成插件搜索特定的缓存键 + keyStr := fmt.Sprintf("plugin:%s:%s", normalizedKeyword, pluginsHash) + hash := md5.Sum([]byte(keyStr)) + return hex.EncodeToString(hash[:]) +} + // GenerateCacheKey 根据所有影响搜索结果的参数生成缓存键 func GenerateCacheKey(keyword string, channels []string, sourceType string, plugins []string) string { // 关键词标准化 diff --git a/util/cache/enhanced_two_level_cache.go b/util/cache/enhanced_two_level_cache.go new file mode 100644 index 0000000..925184c --- /dev/null +++ b/util/cache/enhanced_two_level_cache.go @@ -0,0 +1,119 @@ +package cache + +import ( + "sync" + "time" + + "pansou/config" +) + +// EnhancedTwoLevelCache 改进的两级缓存 +type EnhancedTwoLevelCache struct { + memory *MemoryCache + disk *ShardedDiskCache + mutex sync.RWMutex + serializer Serializer +} + +// NewEnhancedTwoLevelCache 创建新的改进两级缓存 +func NewEnhancedTwoLevelCache() (*EnhancedTwoLevelCache, error) { + // 内存缓存大小为磁盘缓存的60% + memCacheMaxItems := 5000 + memCacheSizeMB := config.AppConfig.CacheMaxSizeMB * 3 / 5 + + memCache := NewMemoryCache(memCacheMaxItems, memCacheSizeMB) + memCache.StartCleanupTask() + + // 创建分片磁盘缓存,默认使用8个分片 + shardCount := 8 + diskCache, err := NewShardedDiskCache(config.AppConfig.CachePath, shardCount, config.AppConfig.CacheMaxSizeMB) + if err != nil { + return nil, err + } + + // 创建序列化器 + serializer := NewGobSerializer() + + return &EnhancedTwoLevelCache{ + memory: memCache, + disk: diskCache, + serializer: serializer, + }, nil +} + +// Set 设置缓存 +func (c *EnhancedTwoLevelCache) Set(key string, data []byte, ttl time.Duration) error { + // 先设置内存缓存(这是快速操作,直接在当前goroutine中执行) + c.memory.Set(key, data, ttl) + + // 异步设置磁盘缓存(这是IO操作,可能较慢) + go func(k string, d []byte, t time.Duration) { + // 使用独立的goroutine写入磁盘,避免阻塞调用者 + _ = c.disk.Set(k, d, t) + }(key, data, ttl) + + return nil +} + +// Get 获取缓存 +func (c *EnhancedTwoLevelCache) Get(key string) ([]byte, bool, error) { + // 优先检查内存缓存 + if data, found := c.memory.Get(key); found { + return data, true, nil + } + + // 内存未命中,检查磁盘缓存 + data, found, err := c.disk.Get(key) + if err != nil { + return nil, false, err + } + + if found { + // 磁盘命中,更新内存缓存 + ttl := time.Duration(config.AppConfig.CacheTTLMinutes) * time.Minute + c.memory.Set(key, data, ttl) + return data, true, nil + } + + return nil, false, nil +} + +// Delete 删除缓存 +func (c *EnhancedTwoLevelCache) Delete(key string) error { + // 从内存缓存删除 + c.memory.mutex.Lock() + if item, exists := c.memory.items[key]; exists { + c.memory.currSize -= int64(item.size) + delete(c.memory.items, key) + } + c.memory.mutex.Unlock() + + // 从磁盘缓存删除 + return c.disk.Delete(key) +} + +// Clear 清空所有缓存 +func (c *EnhancedTwoLevelCache) Clear() error { + // 清空内存缓存 + c.memory.mutex.Lock() + c.memory.items = make(map[string]*memoryCacheItem) + c.memory.currSize = 0 + c.memory.mutex.Unlock() + + // 清空磁盘缓存 + return c.disk.Clear() +} + +// 设置序列化器 +func (c *EnhancedTwoLevelCache) SetSerializer(serializer Serializer) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.serializer = serializer +} + +// 获取序列化器 +func (c *EnhancedTwoLevelCache) GetSerializer() Serializer { + c.mutex.RLock() + defer c.mutex.RUnlock() + return c.serializer +} \ No newline at end of file diff --git a/util/cache/serializer.go b/util/cache/serializer.go new file mode 100644 index 0000000..1d32b14 --- /dev/null +++ b/util/cache/serializer.go @@ -0,0 +1,103 @@ +package cache + +import ( + "bytes" + "encoding/gob" + "sync" + "time" + + "pansou/model" +) + +// 初始化函数,注册model包中的类型到gob +func init() { + // 注册SearchResult类型 + gob.Register(model.SearchResult{}) + + // 注册SearchResponse类型 + gob.Register(model.SearchResponse{}) + + // 注册MergedLinks类型 + gob.Register(model.MergedLinks{}) + + // 注册[]model.SearchResult类型 + gob.Register([]model.SearchResult{}) + + // 注册map[string][]model.SearchResult类型 + gob.Register(map[string][]model.SearchResult{}) + + // 注册time.Time类型 + gob.Register(time.Time{}) +} + +// Serializer 序列化接口 +type Serializer interface { + Serialize(v interface{}) ([]byte, error) + Deserialize(data []byte, v interface{}) error +} + +// GobSerializer 使用gob进行序列化/反序列化 +type GobSerializer struct { + bufferPool sync.Pool +} + +// NewGobSerializer 创建新的gob序列化器 +func NewGobSerializer() *GobSerializer { + return &GobSerializer{ + bufferPool: sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + }, + } +} + +// Serialize 序列化数据 +func (s *GobSerializer) Serialize(v interface{}) ([]byte, error) { + buf := s.bufferPool.Get().(*bytes.Buffer) + buf.Reset() + defer s.bufferPool.Put(buf) + + enc := gob.NewEncoder(buf) + if err := enc.Encode(v); err != nil { + return nil, err + } + + result := make([]byte, buf.Len()) + copy(result, buf.Bytes()) + return result, nil +} + +// Deserialize 反序列化数据 +func (s *GobSerializer) Deserialize(data []byte, v interface{}) error { + buf := s.bufferPool.Get().(*bytes.Buffer) + buf.Reset() + defer s.bufferPool.Put(buf) + + buf.Write(data) + dec := gob.NewDecoder(buf) + return dec.Decode(v) +} + +// JSONSerializer 使用JSON进行序列化/反序列化 +// 为了保持向后兼容性 +type JSONSerializer struct { + bufferPool *sync.Pool +} + +// NewJSONSerializer 创建新的JSON序列化器 +func NewJSONSerializer() *JSONSerializer { + return &JSONSerializer{ + bufferPool: &bufferPool, // 使用已有的缓冲区池 + } +} + +// Serialize 序列化数据 +func (s *JSONSerializer) Serialize(v interface{}) ([]byte, error) { + return SerializeWithPool(v) +} + +// Deserialize 反序列化数据 +func (s *JSONSerializer) Deserialize(data []byte, v interface{}) error { + return DeserializeWithPool(data, v) +} \ No newline at end of file diff --git a/util/cache/sharded_disk_cache.go b/util/cache/sharded_disk_cache.go new file mode 100644 index 0000000..90715c6 --- /dev/null +++ b/util/cache/sharded_disk_cache.go @@ -0,0 +1,94 @@ +package cache + +import ( + "fmt" + "hash/fnv" + "path/filepath" + "sync" + "time" +) + +// ShardedDiskCache 分片磁盘缓存 +type ShardedDiskCache struct { + baseDir string + shardCount int + shards []*DiskCache + maxSizeMB int + mutex sync.RWMutex +} + +// NewShardedDiskCache 创建新的分片磁盘缓存 +func NewShardedDiskCache(baseDir string, shardCount, maxSizeMB int) (*ShardedDiskCache, error) { + // 确保每个分片的大小合理 + shardSize := maxSizeMB / shardCount + if shardSize < 1 { + shardSize = 1 + } + + cache := &ShardedDiskCache{ + baseDir: baseDir, + shardCount: shardCount, + shards: make([]*DiskCache, shardCount), + maxSizeMB: maxSizeMB, + } + + // 初始化每个分片 + for i := 0; i < shardCount; i++ { + shardPath := filepath.Join(baseDir, fmt.Sprintf("shard_%d", i)) + diskCache, err := NewDiskCache(shardPath, shardSize) + if err != nil { + return nil, err + } + cache.shards[i] = diskCache + } + + return cache, nil +} + +// 获取键对应的分片 +func (c *ShardedDiskCache) getShard(key string) *DiskCache { + // 计算哈希值决定分片 + h := fnv.New32a() + h.Write([]byte(key)) + shardIndex := int(h.Sum32()) % c.shardCount + return c.shards[shardIndex] +} + +// Set 设置缓存 +func (c *ShardedDiskCache) Set(key string, data []byte, ttl time.Duration) error { + shard := c.getShard(key) + return shard.Set(key, data, ttl) +} + +// Get 获取缓存 +func (c *ShardedDiskCache) Get(key string) ([]byte, bool, error) { + shard := c.getShard(key) + return shard.Get(key) +} + +// Delete 删除缓存 +func (c *ShardedDiskCache) Delete(key string) error { + shard := c.getShard(key) + return shard.Delete(key) +} + +// Has 检查缓存是否存在 +func (c *ShardedDiskCache) Has(key string) bool { + shard := c.getShard(key) + return shard.Has(key) +} + +// Clear 清空所有缓存 +func (c *ShardedDiskCache) Clear() error { + c.mutex.Lock() + defer c.mutex.Unlock() + + var lastErr error + for _, shard := range c.shards { + if err := shard.Clear(); err != nil { + lastErr = err + } + } + + return lastErr +} \ No newline at end of file