优化缓冲区策略初始化;退出时保存内存缓存到磁盘

This commit is contained in:
www.xueximeng.com
2025-08-07 13:14:35 +08:00
parent 6f0f0396d0
commit ad318390ba
10 changed files with 330 additions and 59 deletions

View File

@@ -102,10 +102,10 @@ cd pansou
| CACHE_MAX_SIZE | 最大缓存大小(MB) | `100` |
| PLUGIN_TIMEOUT | 插件超时时间(秒) | `30` |
| ASYNC_RESPONSE_TIMEOUT | 快速响应超时(秒) | `4` |
| ASYNC_LOG_ENABLED** | 异步插件详细日志 | `true` |
| ASYNC_LOG_ENABLED | 异步插件详细日志 | `true` |
| CACHE_PATH | 缓存文件路径 | `./cache` |
| SHARD_COUNT | 缓存分片数量 | `8` |
| CACHE_WRITE_STRATEGY | 缓存写入策略 | `hybrid` |
| CACHE_WRITE_STRATEGY | 缓存写入策略(immediate/hybrid) | `hybrid` |
| ENABLE_COMPRESSION | 是否启用压缩 | `false` |
| MIN_SIZE_TO_COMPRESS | 最小压缩阈值(字节) | `1024` |
| GC_PERCENT | Go GC触发百分比 | `100` |

18
main.go
View File

@@ -74,20 +74,19 @@ func initApp() {
if err := globalCacheWriteManager.Initialize(); err != nil {
log.Fatalf("缓存写入管理器初始化失败: %v", err)
}
fmt.Println("✅ 缓存写入管理器已初始化")
// 🔗 将缓存写入管理器注入到service包
// 将缓存写入管理器注入到service包
service.SetGlobalCacheWriteManager(globalCacheWriteManager)
// 🔗 设置缓存写入管理器的主缓存更新函数
// 延迟设置主缓存更新函数确保service初始化完成
go func() {
// 等待一小段时间确保service包完全初始化
time.Sleep(100 * time.Millisecond)
if mainCache := service.GetEnhancedTwoLevelCache(); mainCache != nil {
globalCacheWriteManager.SetMainCacheUpdater(func(key string, data []byte, ttl time.Duration) error {
return mainCache.SetBothLevels(key, data, ttl)
})
fmt.Println("✅ 主缓存更新函数已设置")
} else {
fmt.Println("⚠️ 主缓存实例不可用,稍后将重试设置")
}
}()
// 确保异步插件系统初始化
plugin.InitAsyncPluginSystem()
@@ -169,6 +168,11 @@ func startServer() {
}
}
// 强制同步内存缓存到磁盘
if mainCache := service.GetEnhancedTwoLevelCache(); mainCache != nil {
mainCache.FlushMemoryToDisk()
}
// 设置关闭超时时间
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

View File

@@ -3,8 +3,12 @@ package fox4k
import (
"context"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
@@ -13,23 +17,39 @@ import (
"time"
"github.com/PuerkitoBio/goquery"
"golang.org/x/net/proxy"
"pansou/model"
"pansou/plugin"
)
// 常量定义
const (
// 基础URL
BaseURL = "https://4kfox.com"
// BaseURL = "https://btnull.pro/"
// BaseURL = "https://www.4kdy.vip/"
// 搜索URL格式
SearchURL = "https://www.4kfox.com/search/%s-------------.html"
SearchURL = BaseURL + "/search/%s-------------.html"
// 分页搜索URL格式
SearchPageURL = "https://www.4kfox.com/search/%s----------%d---.html"
SearchPageURL = BaseURL + "/search/%s----------%d---.html"
// 详情页URL格式
DetailURL = "https://www.4kfox.com/video/%s.html"
DetailURL = BaseURL + "/video/%s.html"
// 默认超时时间 - 优化为更短时间
DefaultTimeout = 8 * time.Second
// 默认超时时间 - 增加超时时间避免网络慢的问题
DefaultTimeout = 15 * time.Second
// 代理配置
DefaultHTTPProxy = "http://154.219.110.34:51422"
DefaultSocks5Proxy = "socks5://154.219.110.34:51423"
// 调试开关 - 默认关闭
DebugMode = false
// 代理开关 - 默认关闭
ProxyEnabled = false
// 并发数限制 - 大幅提高并发数
MaxConcurrency = 50
@@ -113,8 +133,8 @@ type Fox4kPlugin struct {
optimizedClient *http.Client
}
// createOptimizedHTTPClient 创建优化的HTTP客户端
func createOptimizedHTTPClient() *http.Client {
// createProxyTransport 创建支持代理的传输层
func createProxyTransport(proxyURL string) (*http.Transport, error) {
transport := &http.Transport{
MaxIdleConns: MaxIdleConns,
MaxIdleConnsPerHost: MaxIdleConnsPerHost,
@@ -126,6 +146,55 @@ func createOptimizedHTTPClient() *http.Client {
ReadBufferSize: 16 * 1024,
}
if proxyURL == "" {
return transport, nil
}
if strings.HasPrefix(proxyURL, "socks5://") {
// SOCKS5代理
dialer, err := proxy.SOCKS5("tcp", strings.TrimPrefix(proxyURL, "socks5://"), nil, proxy.Direct)
if err != nil {
return nil, fmt.Errorf("创建SOCKS5代理失败: %w", err)
}
transport.Dial = dialer.Dial
debugPrintf("🔧 [Fox4k DEBUG] 使用SOCKS5代理: %s\n", proxyURL)
} else {
// HTTP代理
parsedURL, err := url.Parse(proxyURL)
if err != nil {
return nil, fmt.Errorf("解析代理URL失败: %w", err)
}
transport.Proxy = http.ProxyURL(parsedURL)
debugPrintf("🔧 [Fox4k DEBUG] 使用HTTP代理: %s\n", proxyURL)
}
return transport, nil
}
// createOptimizedHTTPClient 创建优化的HTTP客户端支持代理
func createOptimizedHTTPClient() *http.Client {
var selectedProxy string
if ProxyEnabled {
// 随机选择代理类型
proxyTypes := []string{"", DefaultHTTPProxy, DefaultSocks5Proxy}
selectedProxy = proxyTypes[rand.Intn(len(proxyTypes))]
} else {
// 代理未启用,使用直连
selectedProxy = ""
debugPrintf("🔧 [Fox4k DEBUG] 代理功能已禁用,使用直连模式\n")
}
transport, err := createProxyTransport(selectedProxy)
if err != nil {
debugPrintf("❌ [Fox4k DEBUG] 创建代理传输层失败: %v使用直连\n", err)
transport, _ = createProxyTransport("")
}
if selectedProxy == "" && ProxyEnabled {
debugPrintf("🔧 [Fox4k DEBUG] 使用直连模式\n")
}
return &http.Client{
Transport: transport,
Timeout: DefaultTimeout,
@@ -140,6 +209,13 @@ func NewFox4kPlugin() *Fox4kPlugin {
}
}
// debugPrintf 调试输出函数
func debugPrintf(format string, args ...interface{}) {
if DebugMode {
fmt.Printf(format, args...)
}
}
// 初始化插件
func init() {
plugin.RegisterGlobalPlugin(NewFox4kPlugin())
@@ -171,11 +247,27 @@ func (p *Fox4kPlugin) Search(keyword string, ext map[string]interface{}) ([]mode
// SearchWithResult 执行搜索并返回包含IsFinal标记的结果
func (p *Fox4kPlugin) SearchWithResult(keyword string, ext map[string]interface{}) (model.PluginSearchResult, error) {
return p.AsyncSearchWithResult(keyword, p.searchImpl, p.MainCacheKey, ext)
debugPrintf("🔧 [Fox4k DEBUG] SearchWithResult 开始 - keyword: %s, MainCacheKey: '%s'\n", keyword, p.MainCacheKey)
result, err := p.AsyncSearchWithResult(keyword, p.searchImpl, p.MainCacheKey, ext)
debugPrintf("🔧 [Fox4k DEBUG] SearchWithResult 完成 - 结果数: %d, IsFinal: %v, 错误: %v\n",
len(result.Results), result.IsFinal, err)
if len(result.Results) > 0 {
debugPrintf("🔧 [Fox4k DEBUG] 前3个结果示例:\n")
for i, r := range result.Results {
if i >= 3 { break }
debugPrintf(" %d. 标题: %s, 链接数: %d\n", i+1, r.Title, len(r.Links))
}
}
return result, err
}
// searchImpl 实现具体的搜索逻辑(支持分页)
func (p *Fox4kPlugin) searchImpl(client *http.Client, keyword string, ext map[string]interface{}) ([]model.SearchResult, error) {
debugPrintf("🔧 [Fox4k DEBUG] searchImpl 开始执行 - keyword: %s\n", keyword)
startTime := time.Now()
atomic.AddInt64(&searchRequests, 1)
@@ -187,7 +279,7 @@ func (p *Fox4kPlugin) searchImpl(client *http.Client, keyword string, ext map[st
encodedKeyword := url.QueryEscape(keyword)
allResults := make([]model.SearchResult, 0)
// 1. 搜索第一页
// 1. 搜索第一页,获取总页数
firstPageResults, totalPages, err := p.searchPage(client, encodedKeyword, 1)
if err != nil {
return nil, err
@@ -237,11 +329,18 @@ func (p *Fox4kPlugin) searchImpl(client *http.Client, keyword string, ext map[st
searchDuration := time.Since(startTime)
atomic.AddInt64(&totalSearchTime, int64(searchDuration))
debugPrintf("🔧 [Fox4k DEBUG] searchImpl 完成 - 原始结果: %d, 过滤后结果: %d, 耗时: %v\n",
len(allResults), len(results), searchDuration)
return results, nil
}
// searchPage 搜索指定页面
func (p *Fox4kPlugin) searchPage(client *http.Client, encodedKeyword string, page int) ([]model.SearchResult, int, error) {
debugPrintf("🔧 [Fox4k DEBUG] searchPage 开始 - 第%d页, keyword: %s\n", page, encodedKeyword)
// 1. 构建搜索URL
var searchURL string
if page == 1 {
@@ -250,6 +349,8 @@ func (p *Fox4kPlugin) searchPage(client *http.Client, encodedKeyword string, pag
searchURL = fmt.Sprintf(SearchPageURL, encodedKeyword, page)
}
debugPrintf("🔧 [Fox4k DEBUG] 构建的URL: %s\n", searchURL)
// 2. 创建带超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancel()
@@ -260,29 +361,91 @@ func (p *Fox4kPlugin) searchPage(client *http.Client, encodedKeyword string, pag
return nil, 0, fmt.Errorf("[%s] 创建请求失败: %w", p.Name(), err)
}
// 4. 设置完整的请求头
req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
// 4. 设置完整的请求头包含随机UA和IP
randomUA := getRandomUA()
randomIP := generateRandomIP()
req.Header.Set("User-Agent", randomUA)
req.Header.Set("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8")
req.Header.Set("Accept-Language", "zh-CN,zh;q=0.9,en;q=0.8")
req.Header.Set("Connection", "keep-alive")
req.Header.Set("Upgrade-Insecure-Requests", "1")
req.Header.Set("Cache-Control", "max-age=0")
req.Header.Set("Referer", "https://www.4kfox.com/")
req.Header.Set("Referer", BaseURL+"/")
req.Header.Set("X-Forwarded-For", randomIP)
req.Header.Set("X-Real-IP", randomIP)
req.Header.Set("sec-ch-ua-platform", "macOS")
debugPrintf("🔧 [Fox4k DEBUG] 使用随机UA: %s\n", randomUA)
debugPrintf("🔧 [Fox4k DEBUG] 使用随机IP: %s\n", randomIP)
// 5. 发送HTTP请求
debugPrintf("🔧 [Fox4k DEBUG] 开始发送HTTP请求到: %s\n", searchURL)
debugPrintf("🔧 [Fox4k DEBUG] 请求头信息:\n")
if DebugMode {
for key, values := range req.Header {
for _, value := range values {
debugPrintf(" %s: %s\n", key, value)
}
}
}
startTime := time.Now()
resp, err := p.doRequestWithRetry(req, client)
requestDuration := time.Since(startTime)
if err != nil {
debugPrintf("❌ [Fox4k DEBUG] HTTP请求失败 (耗时: %v): %v\n", requestDuration, err)
debugPrintf("❌ [Fox4k DEBUG] 错误类型分析:\n")
if netErr, ok := err.(*url.Error); ok {
fmt.Printf(" URL错误: %v\n", netErr.Err)
if netErr.Timeout() {
fmt.Printf(" -> 这是超时错误\n")
}
if netErr.Temporary() {
fmt.Printf(" -> 这是临时错误\n")
}
}
return nil, 0, fmt.Errorf("[%s] 第%d页搜索请求失败: %w", p.Name(), page, err)
}
defer resp.Body.Close()
debugPrintf("✅ [Fox4k DEBUG] HTTP请求成功 (耗时: %v)\n", requestDuration)
// 6. 检查状态码
debugPrintf("🔧 [Fox4k DEBUG] HTTP响应状态码: %d\n", resp.StatusCode)
if resp.StatusCode != 200 {
debugPrintf("❌ [Fox4k DEBUG] 状态码异常: %d\n", resp.StatusCode)
return nil, 0, fmt.Errorf("[%s] 第%d页请求返回状态码: %d", p.Name(), page, resp.StatusCode)
}
// 7. 解析HTML响应
doc, err := goquery.NewDocumentFromReader(resp.Body)
// 7. 读取并打印HTML响应
htmlBytes, err := io.ReadAll(resp.Body)
if err != nil {
return nil, 0, fmt.Errorf("[%s] 第%d页读取响应失败: %w", p.Name(), page, err)
}
htmlContent := string(htmlBytes)
debugPrintf("🔧 [Fox4k DEBUG] 第%d页 HTML长度: %d bytes\n", page, len(htmlContent))
// 保存HTML到文件仅在调试模式下
if DebugMode {
htmlDir := "./html"
os.MkdirAll(htmlDir, 0755)
filename := fmt.Sprintf("fox4k_page_%d_%s.html", page, strings.ReplaceAll(encodedKeyword, "%", "_"))
filepath := filepath.Join(htmlDir, filename)
err = os.WriteFile(filepath, htmlBytes, 0644)
if err != nil {
debugPrintf("❌ [Fox4k DEBUG] 保存HTML文件失败: %v\n", err)
} else {
debugPrintf("✅ [Fox4k DEBUG] HTML已保存到: %s\n", filepath)
}
}
// 解析HTML响应
doc, err := goquery.NewDocumentFromReader(strings.NewReader(htmlContent))
if err != nil {
return nil, 0, fmt.Errorf("[%s] 第%d页HTML解析失败: %w", p.Name(), page, err)
}
@@ -336,7 +499,7 @@ func (p *Fox4kPlugin) parseSearchResultItem(s *goquery.Selection) *model.SearchR
// 补全URL
if strings.HasPrefix(href, "/") {
href = "https://www.4kfox.com" + href
href = BaseURL + href
}
// 提取ID
@@ -357,7 +520,7 @@ func (p *Fox4kPlugin) parseSearchResultItem(s *goquery.Selection) *model.SearchR
imgElement := s.Find(".hl-item-thumb")
imageURL, _ := imgElement.Attr("data-original")
if imageURL != "" && strings.HasPrefix(imageURL, "/") {
imageURL = "https://www.4kfox.com" + imageURL
imageURL = BaseURL + imageURL
}
// 获取资源状态
@@ -542,7 +705,7 @@ func (p *Fox4kPlugin) getDetailInfo(id string, client *http.Client) *detailPageR
req.Header.Set("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8")
req.Header.Set("Accept-Language", "zh-CN,zh;q=0.9,en;q=0.8")
req.Header.Set("Connection", "keep-alive")
req.Header.Set("Referer", "https://www.4kfox.com/")
req.Header.Set("Referer", BaseURL+"/")
// 发送请求
resp, err := client.Do(req)
@@ -575,7 +738,7 @@ func (p *Fox4kPlugin) getDetailInfo(id string, client *http.Client) *detailPageR
imgElement := doc.Find(".hl-dc-pic .hl-item-thumb")
if imageURL, exists := imgElement.Attr("data-original"); exists && imageURL != "" {
if strings.HasPrefix(imageURL, "/") {
imageURL = "https://www.4kfox.com" + imageURL
imageURL = BaseURL + imageURL
}
detail.ImageURL = imageURL
}
@@ -829,26 +992,86 @@ func (p *Fox4kPlugin) doRequestWithRetry(req *http.Request, client *http.Client)
maxRetries := 3
var lastErr error
debugPrintf("🔄 [Fox4k DEBUG] 开始重试机制 - 最大重试次数: %d\n", maxRetries)
for i := 0; i < maxRetries; i++ {
debugPrintf("🔄 [Fox4k DEBUG] 第 %d/%d 次尝试\n", i+1, maxRetries)
if i > 0 {
// 指数退避重试
backoff := time.Duration(1<<uint(i-1)) * 200 * time.Millisecond
debugPrintf("⏳ [Fox4k DEBUG] 等待 %v 后重试\n", backoff)
time.Sleep(backoff)
}
// 克隆请求避免并发问题
reqClone := req.Clone(req.Context())
attemptStart := time.Now()
resp, err := client.Do(reqClone)
if err == nil && resp.StatusCode == 200 {
attemptDuration := time.Since(attemptStart)
debugPrintf("🔧 [Fox4k DEBUG] 第 %d 次尝试耗时: %v\n", i+1, attemptDuration)
if err != nil {
debugPrintf("❌ [Fox4k DEBUG] 第 %d 次尝试失败: %v\n", i+1, err)
lastErr = err
continue
}
debugPrintf("🔧 [Fox4k DEBUG] 第 %d 次尝试获得响应 - 状态码: %d\n", i+1, resp.StatusCode)
if resp.StatusCode == 200 {
debugPrintf("✅ [Fox4k DEBUG] 第 %d 次尝试成功!\n", i+1)
return resp, nil
}
if resp != nil {
debugPrintf("❌ [Fox4k DEBUG] 第 %d 次尝试状态码异常: %d\n", i+1, resp.StatusCode)
// 读取响应体以便调试
if resp.Body != nil {
bodyBytes, readErr := io.ReadAll(resp.Body)
resp.Body.Close()
if readErr == nil && len(bodyBytes) > 0 {
bodyPreview := string(bodyBytes)
if len(bodyPreview) > 200 {
bodyPreview = bodyPreview[:200] + "..."
}
debugPrintf("🔧 [Fox4k DEBUG] 响应体预览: %s\n", bodyPreview)
}
lastErr = err
}
lastErr = fmt.Errorf("状态码 %d", resp.StatusCode)
}
debugPrintf("❌ [Fox4k DEBUG] 所有重试都失败了!\n")
return nil, fmt.Errorf("重试 %d 次后仍然失败: %w", maxRetries, lastErr)
}
// getRandomUA 获取随机User-Agent
func getRandomUA() string {
userAgents := []string{
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/119.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
}
return userAgents[rand.Intn(len(userAgents))]
}
// generateRandomIP 生成随机IP地址
func generateRandomIP() string {
// 生成随机的私有IP地址段
segments := [][]int{
{192, 168, rand.Intn(256), rand.Intn(256)},
{10, rand.Intn(256), rand.Intn(256), rand.Intn(256)},
{172, 16 + rand.Intn(16), rand.Intn(256), rand.Intn(256)},
}
segment := segments[rand.Intn(len(segments))]
return fmt.Sprintf("%d.%d.%d.%d", segment[0], segment[1], segment[2], segment[3])
}

View File

@@ -205,12 +205,11 @@ func NewSearchService(pluginManager *plugin.PluginManager) *SearchService {
// 将主缓存注入到异步插件中
injectMainCacheToAsyncPlugins(pluginManager, enhancedTwoLevelCache)
// 🔗 确保缓存写入管理器设置了主缓存更新函数
// 确保缓存写入管理器设置了主缓存更新函数
if globalCacheWriteManager != nil && enhancedTwoLevelCache != nil {
globalCacheWriteManager.SetMainCacheUpdater(func(key string, data []byte, ttl time.Duration) error {
return enhancedTwoLevelCache.SetBothLevels(key, data, ttl)
})
fmt.Println("✅ 主缓存更新函数已设置 (在SearchService中)")
}
return &SearchService{
@@ -1286,16 +1285,12 @@ func (s *SearchService) searchPlugins(keyword string, plugins []string, forceRef
if enhancedTwoLevelCache != nil {
data, err := enhancedTwoLevelCache.GetSerializer().Serialize(res)
if err != nil {
fmt.Printf("❌ [主程序] 缓存序列化失败: %s | 错误: %v\n", key, err)
return
}
// 主程序最后更新,覆盖可能有问题的异步插件缓存
enhancedTwoLevelCache.Set(key, data, ttl)
if config.AppConfig != nil && config.AppConfig.AsyncLogEnabled {
fmt.Printf("📝 [主程序] 缓存更新完成: %s | 结果数: %d",
key, len(res))
}
// 主程序最后更新,覆盖可能有问题的异步插件缓存(同步写入确保持久化)
enhancedTwoLevelCache.SetBothLevels(key, data, ttl)
}
}(allResults, keyword, cacheKey)
}

View File

@@ -301,7 +301,7 @@ func (b *BufferStatusMonitor) Start(globalManager *GlobalBufferManager) {
return // 已经在监控中
}
fmt.Printf("🔍 [缓冲区状态监控器] 启动监控,间隔: %v\n", b.monitorInterval)
// 状态监控器启动(静默)
go b.monitoringLoop(globalManager)
go b.healthCheckLoop()

View File

@@ -366,7 +366,7 @@ func (m *DelayedBatchWriteManager) Initialize() error {
// 🔍 启动全局缓冲区监控
go m.globalBufferMonitor()
fmt.Printf("🚀 [缓存写入管理器] 初始化完成,策略: %s\n", m.strategy)
fmt.Printf("缓存写入策略: %s\n", m.strategy)
return nil
}
@@ -605,7 +605,7 @@ func (m *DelayedBatchWriteManager) Shutdown(timeout time.Duration) error {
return nil // 已经关闭
}
fmt.Println("🔄 [缓存写入管理器] 正在保存缓存数据...")
// 正在保存缓存数据(静默)
// 关闭后台处理器
close(m.shutdownChan)
@@ -641,7 +641,7 @@ func (m *DelayedBatchWriteManager) Shutdown(timeout time.Duration) error {
if err != nil {
return fmt.Errorf("数据保存失败: %v", err)
}
fmt.Println("✅ [缓存写入管理器] 缓存数据已安全保存")
// 缓存数据已安全保存(静默)
return nil
case <-ctx.Done():
return fmt.Errorf("数据保存超时")

View File

@@ -3,6 +3,7 @@ package cache
import (
"crypto/md5"
"encoding/hex"
"fmt"
"io/ioutil"
"os"
"path/filepath"
@@ -134,6 +135,11 @@ func (c *DiskCache) Set(key string, data []byte, ttl time.Duration) error {
filename := c.getFilename(key)
filePath := filepath.Join(c.path, filename)
// 确保目录存在(防止外部删除缓存目录)
if err := os.MkdirAll(c.path, 0755); err != nil {
return fmt.Errorf("创建缓存目录失败: %v", err)
}
// 写入文件
if err := ioutil.WriteFile(filePath, data, 0644); err != nil {
return err

View File

@@ -77,12 +77,8 @@ func (c *EnhancedTwoLevelCache) SetBothLevels(key string, data []byte, ttl time.
// 同步更新内存缓存
c.memory.SetWithTimestamp(key, data, ttl, now)
// 步更新磁盘缓存
go func(k string, d []byte, t time.Duration) {
_ = c.disk.Set(k, d, t)
}(key, data, ttl)
return nil
// 步更新磁盘缓存(确保数据持久化)
return c.disk.Set(key, data, ttl)
}
// SetWithFinalFlag 根据结果状态选择更新策略
@@ -147,3 +143,23 @@ func (c *EnhancedTwoLevelCache) GetSerializer() Serializer {
defer c.mutex.RUnlock()
return c.serializer
}
// FlushMemoryToDisk 将内存中的所有缓存项同步到磁盘
func (c *EnhancedTwoLevelCache) FlushMemoryToDisk() {
c.mutex.Lock()
defer c.mutex.Unlock()
// 获取所有内存缓存项
items := c.memory.GetAllItems()
if len(items) == 0 {
return
}
for key, item := range items {
ttl := time.Until(item.Expiry)
if ttl > 0 {
_ = c.disk.Set(key, item.Data, ttl)
}
}
}

View File

@@ -116,10 +116,9 @@ type GlobalBufferStats struct {
// NewGlobalBufferManager 创建全局缓冲区管理器
func NewGlobalBufferManager(strategy GlobalBufferStrategy) *GlobalBufferManager {
// 高并发优化:强制使用插件策略,避免缓冲区爆炸
// 高并发优化:静默使用插件策略,避免缓冲区爆炸
if strategy == BufferHybrid {
strategy = BufferByPlugin
fmt.Printf("⚠️ [缓冲区优化] 检测到混合策略,自动切换为插件策略以支持高并发\n")
}
manager := &GlobalBufferManager{
@@ -154,7 +153,7 @@ func (g *GlobalBufferManager) Initialize() error {
// 启动状态监控
go g.statusMonitor.Start(g)
fmt.Printf("🚀 [全局缓冲区管理器] 初始化完成,策略: %s\n", g.strategy)
// 初始化完成(静默)
return nil
}
@@ -450,8 +449,7 @@ func (g *GlobalBufferManager) Shutdown() error {
totalOperations += len(ops)
}
fmt.Printf("🔄 [全局缓冲区管理器] 关闭完成,刷新%d个缓冲区%d个操作\n",
len(flushedBuffers), totalOperations)
return nil
}

View File

@@ -348,3 +348,32 @@ func (c *ShardedMemoryCache) getDiskCacheReference() *ShardedDiskCache {
defer c.diskCacheMutex.RUnlock()
return c.diskCache
}
// MemoryCacheItem 内存缓存项
type MemoryCacheItem struct {
Data []byte
Expiry time.Time
}
// GetAllItems 获取所有未过期的缓存项
func (c *ShardedMemoryCache) GetAllItems() map[string]*MemoryCacheItem {
result := make(map[string]*MemoryCacheItem)
now := time.Now()
for _, shard := range c.shards {
shard.mutex.RLock()
for key, item := range shard.items {
if item.expiry.After(now) {
result[key] = &MemoryCacheItem{
Data: item.data,
Expiry: item.expiry,
}
}
}
shard.mutex.RUnlock()
}
return result
}