Files
urldb/services/meilisearch_manager.go

828 lines
23 KiB
Go
Raw Normal View History

2025-08-20 15:03:14 +08:00
package services
import (
"fmt"
"strconv"
"sync"
"time"
"github.com/ctwj/urldb/db/entity"
"github.com/ctwj/urldb/db/repo"
"github.com/ctwj/urldb/utils"
)
// MeilisearchManager Meilisearch管理器
type MeilisearchManager struct {
service *MeilisearchService
repoMgr *repo.RepositoryManager
configRepo repo.SystemConfigRepository
mutex sync.RWMutex
status MeilisearchStatus
stopChan chan struct{}
isRunning bool
// 同步进度控制
syncMutex sync.RWMutex
syncProgress SyncProgress
isSyncing bool
syncStopChan chan struct{}
}
// SyncProgress 同步进度
type SyncProgress struct {
IsRunning bool `json:"is_running"`
TotalCount int64 `json:"total_count"`
ProcessedCount int64 `json:"processed_count"`
SyncedCount int64 `json:"synced_count"`
FailedCount int64 `json:"failed_count"`
StartTime time.Time `json:"start_time"`
EstimatedTime string `json:"estimated_time"`
CurrentBatch int `json:"current_batch"`
TotalBatches int `json:"total_batches"`
ErrorMessage string `json:"error_message"`
}
// MeilisearchStatus Meilisearch状态
type MeilisearchStatus struct {
Enabled bool `json:"enabled"`
Healthy bool `json:"healthy"`
LastCheck time.Time `json:"last_check"`
ErrorCount int `json:"error_count"`
LastError string `json:"last_error"`
DocumentCount int64 `json:"document_count"`
}
// NewMeilisearchManager 创建Meilisearch管理器
func NewMeilisearchManager(repoMgr *repo.RepositoryManager) *MeilisearchManager {
return &MeilisearchManager{
repoMgr: repoMgr,
stopChan: make(chan struct{}),
syncStopChan: make(chan struct{}),
status: MeilisearchStatus{
Enabled: false,
Healthy: false,
LastCheck: time.Now(),
},
}
}
// Initialize 初始化Meilisearch服务
func (m *MeilisearchManager) Initialize() error {
m.mutex.Lock()
defer m.mutex.Unlock()
// 设置configRepo
m.configRepo = m.repoMgr.SystemConfigRepository
// 获取配置
enabled, err := m.configRepo.GetConfigBool(entity.ConfigKeyMeilisearchEnabled)
if err != nil {
utils.Error("获取Meilisearch启用状态失败: %v", err)
return err
}
if !enabled {
utils.Debug("Meilisearch未启用清理服务状态")
m.status.Enabled = false
m.service = nil
// 停止监控循环
if m.stopChan != nil {
close(m.stopChan)
m.stopChan = make(chan struct{})
}
return nil
}
host, err := m.configRepo.GetConfigValue(entity.ConfigKeyMeilisearchHost)
if err != nil {
utils.Error("获取Meilisearch主机配置失败: %v", err)
return err
}
port, err := m.configRepo.GetConfigValue(entity.ConfigKeyMeilisearchPort)
if err != nil {
utils.Error("获取Meilisearch端口配置失败: %v", err)
return err
}
masterKey, err := m.configRepo.GetConfigValue(entity.ConfigKeyMeilisearchMasterKey)
if err != nil {
utils.Error("获取Meilisearch主密钥配置失败: %v", err)
return err
}
indexName, err := m.configRepo.GetConfigValue(entity.ConfigKeyMeilisearchIndexName)
if err != nil {
utils.Error("获取Meilisearch索引名配置失败: %v", err)
return err
}
m.service = NewMeilisearchService(host, port, masterKey, indexName, enabled)
m.status.Enabled = enabled
// 如果启用,创建索引并更新设置
if enabled {
utils.Debug("Meilisearch已启用创建索引并更新设置")
// 创建索引
if err := m.service.CreateIndex(); err != nil {
utils.Error("创建Meilisearch索引失败: %v", err)
}
// 更新索引设置
if err := m.service.UpdateIndexSettings(); err != nil {
utils.Error("更新Meilisearch索引设置失败: %v", err)
}
// 立即进行一次健康检查
go func() {
m.checkHealth()
// 启动监控
go m.monitorLoop()
}()
} else {
utils.Debug("Meilisearch未启用")
}
utils.Debug("Meilisearch服务初始化完成")
return nil
}
// IsEnabled 检查是否启用
func (m *MeilisearchManager) IsEnabled() bool {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.status.Enabled
}
// ReloadConfig 重新加载配置
func (m *MeilisearchManager) ReloadConfig() error {
utils.Debug("重新加载Meilisearch配置")
return m.Initialize()
}
// GetService 获取Meilisearch服务
func (m *MeilisearchManager) GetService() *MeilisearchService {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.service
}
// GetStatus 获取状态
func (m *MeilisearchManager) GetStatus() (MeilisearchStatus, error) {
m.mutex.RLock()
defer m.mutex.RUnlock()
utils.Debug("获取Meilisearch状态 - 启用状态: %v, 健康状态: %v, 服务实例: %v", m.status.Enabled, m.status.Healthy, m.service != nil)
if m.service != nil && m.service.IsEnabled() {
utils.Debug("Meilisearch服务已初始化且启用尝试获取索引统计")
// 获取索引统计
stats, err := m.service.GetIndexStats()
if err != nil {
utils.Error("获取Meilisearch索引统计失败: %v", err)
// 即使获取统计失败,也返回当前状态
} else {
utils.Debug("Meilisearch索引统计: %+v", stats)
// 更新文档数量
if count, ok := stats["numberOfDocuments"].(float64); ok {
m.status.DocumentCount = int64(count)
utils.Debug("文档数量 (float64): %d", int64(count))
} else if count, ok := stats["numberOfDocuments"].(int64); ok {
m.status.DocumentCount = count
utils.Debug("文档数量 (int64): %d", count)
} else if count, ok := stats["numberOfDocuments"].(int); ok {
m.status.DocumentCount = int64(count)
utils.Debug("文档数量 (int): %d", int64(count))
} else {
utils.Error("无法解析文档数量,类型: %T, 值: %v", stats["numberOfDocuments"], stats["numberOfDocuments"])
}
// 不更新启用状态,保持配置中的状态
// 启用状态应该由配置控制,而不是由服务状态控制
}
} else {
utils.Debug("Meilisearch服务未初始化或未启用 - service: %v, enabled: %v", m.service != nil, m.service != nil && m.service.IsEnabled())
}
return m.status, nil
}
// GetStatusWithHealthCheck 获取状态并同时进行健康检查
func (m *MeilisearchManager) GetStatusWithHealthCheck() (MeilisearchStatus, error) {
// 先进行健康检查
m.checkHealth()
// 然后获取状态
return m.GetStatus()
}
// SyncResourceToMeilisearch 同步资源到Meilisearch
func (m *MeilisearchManager) SyncResourceToMeilisearch(resource *entity.Resource) error {
2025-08-25 09:51:45 +08:00
utils.Debug(fmt.Sprintf("开始同步资源到Meilisearch - 资源ID: %d, URL: %s", resource.ID, resource.URL))
2025-08-20 15:03:14 +08:00
if m.service == nil || !m.service.IsEnabled() {
2025-08-25 09:51:45 +08:00
utils.Debug("Meilisearch服务未初始化或未启用")
return fmt.Errorf("Meilisearch服务未初始化或未启用")
}
// 先进行健康检查
if err := m.service.HealthCheck(); err != nil {
utils.Error(fmt.Sprintf("Meilisearch健康检查失败: %v", err))
return fmt.Errorf("Meilisearch健康检查失败: %v", err)
}
// 确保索引存在
if err := m.service.CreateIndex(); err != nil {
utils.Error(fmt.Sprintf("创建Meilisearch索引失败: %v", err))
return fmt.Errorf("创建Meilisearch索引失败: %v", err)
2025-08-20 15:03:14 +08:00
}
2025-10-09 17:52:49 +08:00
// 重新加载资源及其关联数据确保Tags被正确加载
resourcesWithRelations, err := m.repoMgr.ResourceRepository.FindByIDs([]uint{resource.ID})
if err != nil {
utils.Error(fmt.Sprintf("重新加载资源失败: %v", err))
return fmt.Errorf("重新加载资源失败: %v", err)
}
if len(resourcesWithRelations) == 0 {
utils.Error(fmt.Sprintf("资源未找到: %d", resource.ID))
return fmt.Errorf("资源未找到: %d", resource.ID)
}
resourceWithRelations := resourcesWithRelations[0]
doc := m.convertResourceToDocument(&resourceWithRelations)
// 添加调试日志,记录标签数量
utils.Debug(fmt.Sprintf("资源ID %d 的标签数量: %d", resource.ID, len(resourceWithRelations.Tags)))
for i, tag := range resourceWithRelations.Tags {
utils.Debug(fmt.Sprintf(" 标签 %d: ID=%d, Name=%s", i+1, tag.ID, tag.Name))
}
// 验证转换后的文档
utils.Debug(fmt.Sprintf("转换后的文档标签数量: %d", len(doc.Tags)))
if len(doc.Tags) > 0 {
utils.Debug(fmt.Sprintf("转换后的文档标签内容: %v", doc.Tags))
}
err = m.service.BatchAddDocuments([]MeilisearchDocument{doc})
2025-08-20 15:03:14 +08:00
if err != nil {
return err
}
// 标记为已同步
return m.repoMgr.ResourceRepository.MarkAsSyncedToMeilisearch([]uint{resource.ID})
}
// SyncAllResources 同步所有资源
func (m *MeilisearchManager) SyncAllResources() (int, error) {
if m.service == nil || !m.service.IsEnabled() {
return 0, fmt.Errorf("Meilisearch未启用")
}
// 检查是否已经在同步中
m.syncMutex.Lock()
if m.isSyncing {
m.syncMutex.Unlock()
return 0, fmt.Errorf("同步操作正在进行中")
}
// 初始化同步状态
m.isSyncing = true
m.syncProgress = SyncProgress{
IsRunning: true,
TotalCount: 0,
ProcessedCount: 0,
SyncedCount: 0,
FailedCount: 0,
StartTime: time.Now(),
CurrentBatch: 0,
TotalBatches: 0,
ErrorMessage: "",
}
// 重新创建停止通道
m.syncStopChan = make(chan struct{})
m.syncMutex.Unlock()
// 在goroutine中执行同步避免阻塞
go func() {
defer func() {
m.syncMutex.Lock()
m.isSyncing = false
m.syncProgress.IsRunning = false
m.syncMutex.Unlock()
}()
m.syncAllResourcesInternal()
}()
return 0, nil
}
// DebugGetAllDocuments 调试:获取所有文档
func (m *MeilisearchManager) DebugGetAllDocuments() error {
if m.service == nil || !m.service.IsEnabled() {
return fmt.Errorf("Meilisearch未启用")
}
utils.Debug("开始调试获取Meilisearch中的所有文档")
_, err := m.service.GetAllDocuments()
if err != nil {
utils.Error("调试获取所有文档失败: %v", err)
return err
}
utils.Debug("调试完成:已获取所有文档")
return nil
}
// syncAllResourcesInternal 内部同步方法
func (m *MeilisearchManager) syncAllResourcesInternal() {
// 健康检查
if err := m.service.HealthCheck(); err != nil {
m.updateSyncProgress("", "", fmt.Sprintf("Meilisearch不可用: %v", err))
return
}
// 创建索引
if err := m.service.CreateIndex(); err != nil {
m.updateSyncProgress("", "", fmt.Sprintf("创建索引失败: %v", err))
return
}
utils.Debug("开始同步所有资源到Meilisearch...")
// 获取总资源数量
totalCount, err := m.repoMgr.ResourceRepository.CountUnsyncedToMeilisearch()
if err != nil {
m.updateSyncProgress("", "", fmt.Sprintf("获取资源总数失败: %v", err))
return
}
// 分批处理
batchSize := 100
totalBatches := int((totalCount + int64(batchSize) - 1) / int64(batchSize))
// 更新总数量和总批次
m.syncMutex.Lock()
m.syncProgress.TotalCount = totalCount
m.syncProgress.TotalBatches = totalBatches
m.syncMutex.Unlock()
offset := 0
totalSynced := 0
currentBatch := 0
// 预加载所有分类和平台数据到缓存
categoryCache := make(map[uint]string)
panCache := make(map[uint]string)
// 获取所有分类
categories, err := m.repoMgr.CategoryRepository.FindAll()
if err == nil {
for _, category := range categories {
categoryCache[category.ID] = category.Name
}
}
// 获取所有平台
pans, err := m.repoMgr.PanRepository.FindAll()
if err == nil {
for _, pan := range pans {
panCache[pan.ID] = pan.Name
}
}
for {
// 检查是否需要停止
select {
case <-m.syncStopChan:
utils.Debug("同步操作被停止")
return
default:
}
currentBatch++
// 获取一批资源在goroutine中执行避免阻塞
resourcesChan := make(chan []entity.Resource, 1)
errChan := make(chan error, 1)
go func() {
// 直接查询未同步的资源,不使用分页
resources, _, err := m.repoMgr.ResourceRepository.FindUnsyncedToMeilisearch(1, batchSize)
if err != nil {
errChan <- err
return
}
resourcesChan <- resources
}()
// 等待数据库查询结果或停止信号(添加超时)
select {
case resources := <-resourcesChan:
if len(resources) == 0 {
utils.Info("资源同步完成,总共同步 %d 个资源", totalSynced)
return
}
// 检查是否需要停止
select {
case <-m.syncStopChan:
utils.Debug("同步操作被停止")
return
default:
}
2025-10-09 17:52:49 +08:00
// 转换为Meilisearch文档确保Tags被正确加载
2025-08-20 15:03:14 +08:00
var docs []MeilisearchDocument
for _, resource := range resources {
2025-10-09 17:52:49 +08:00
utils.Debug(fmt.Sprintf("批量同步开始处理资源 %d标签数量: %d", resource.ID, len(resource.Tags)))
// 使用带缓存的转换方法但传入的资源已经预加载了Tags数据
2025-08-20 15:03:14 +08:00
doc := m.convertResourceToDocumentWithCache(&resource, categoryCache, panCache)
docs = append(docs, doc)
2025-10-09 17:52:49 +08:00
utils.Debug(fmt.Sprintf("批量同步资源 %d 处理完成,最终标签数量: %d", resource.ID, len(doc.Tags)))
2025-08-20 15:03:14 +08:00
}
// 检查是否需要停止
select {
case <-m.syncStopChan:
utils.Debug("同步操作被停止")
return
default:
}
// 批量添加到Meilisearch在goroutine中执行避免阻塞
meilisearchErrChan := make(chan error, 1)
go func() {
err := m.service.BatchAddDocuments(docs)
meilisearchErrChan <- err
}()
// 等待Meilisearch操作结果或停止信号添加超时
select {
case err := <-meilisearchErrChan:
if err != nil {
m.updateSyncProgress("", "", fmt.Sprintf("批量添加文档失败: %v", err))
return
}
case <-time.After(60 * time.Second): // 60秒超时
m.updateSyncProgress("", "", "Meilisearch操作超时")
utils.Error("Meilisearch操作超时")
return
case <-m.syncStopChan:
utils.Debug("同步操作被停止")
return
}
// 检查是否需要停止
select {
case <-m.syncStopChan:
utils.Debug("同步操作被停止")
return
default:
}
// 标记为已同步在goroutine中执行避免阻塞
var resourceIDs []uint
for _, resource := range resources {
resourceIDs = append(resourceIDs, resource.ID)
}
markErrChan := make(chan error, 1)
go func() {
err := m.repoMgr.ResourceRepository.MarkAsSyncedToMeilisearch(resourceIDs)
markErrChan <- err
}()
// 等待标记操作结果或停止信号(添加超时)
select {
case err := <-markErrChan:
if err != nil {
utils.Error("标记资源同步状态失败: %v", err)
}
case <-time.After(30 * time.Second): // 30秒超时
utils.Error("标记资源同步状态超时")
case <-m.syncStopChan:
utils.Debug("同步操作被停止")
return
}
totalSynced += len(docs)
offset += len(resources)
// 更新进度
m.updateSyncProgress(fmt.Sprintf("%d", totalSynced), fmt.Sprintf("%d", currentBatch), "")
utils.Debug("已同步 %d 个资源到Meilisearch (批次 %d/%d)", totalSynced, currentBatch, totalBatches)
// 检查是否已经同步完所有资源
if len(resources) == 0 {
utils.Info("资源同步完成,总共同步 %d 个资源", totalSynced)
return
}
case <-time.After(30 * time.Second): // 30秒超时
m.updateSyncProgress("", "", "数据库查询超时")
utils.Error("数据库查询超时")
return
case err := <-errChan:
m.updateSyncProgress("", "", fmt.Sprintf("获取资源失败: %v", err))
return
case <-m.syncStopChan:
utils.Info("同步操作被停止")
return
}
// 避免过于频繁的请求
time.Sleep(100 * time.Millisecond)
}
utils.Info("资源同步完成,总共同步 %d 个资源", totalSynced)
}
// updateSyncProgress 更新同步进度
func (m *MeilisearchManager) updateSyncProgress(syncedCount, currentBatch, errorMessage string) {
m.syncMutex.Lock()
defer m.syncMutex.Unlock()
if syncedCount != "" {
if count, err := strconv.ParseInt(syncedCount, 10, 64); err == nil {
m.syncProgress.SyncedCount = count
}
}
if currentBatch != "" {
if batch, err := strconv.Atoi(currentBatch); err == nil {
m.syncProgress.CurrentBatch = batch
}
}
if errorMessage != "" {
m.syncProgress.ErrorMessage = errorMessage
m.syncProgress.IsRunning = false
}
// 计算预估时间
if m.syncProgress.SyncedCount > 0 {
elapsed := time.Since(m.syncProgress.StartTime)
rate := float64(m.syncProgress.SyncedCount) / elapsed.Seconds()
if rate > 0 {
remaining := float64(m.syncProgress.TotalCount-m.syncProgress.SyncedCount) / rate
m.syncProgress.EstimatedTime = fmt.Sprintf("%.0f秒", remaining)
}
}
}
// GetUnsyncedCount 获取未同步资源数量
func (m *MeilisearchManager) GetUnsyncedCount() (int64, error) {
// 直接查询未同步的资源数量
return m.repoMgr.ResourceRepository.CountUnsyncedToMeilisearch()
}
// GetUnsyncedResources 获取未同步的资源
func (m *MeilisearchManager) GetUnsyncedResources(page, pageSize int) ([]entity.Resource, int64, error) {
// 查询未同步到Meilisearch的资源
return m.repoMgr.ResourceRepository.FindUnsyncedToMeilisearch(page, pageSize)
}
// GetSyncedResources 获取已同步的资源
func (m *MeilisearchManager) GetSyncedResources(page, pageSize int) ([]entity.Resource, int64, error) {
// 查询已同步到Meilisearch的资源
return m.repoMgr.ResourceRepository.FindSyncedToMeilisearch(page, pageSize)
}
// GetAllResources 获取所有资源
func (m *MeilisearchManager) GetAllResources(page, pageSize int) ([]entity.Resource, int64, error) {
// 查询所有资源
return m.repoMgr.ResourceRepository.FindAllWithPagination(page, pageSize)
}
// GetSyncProgress 获取同步进度
func (m *MeilisearchManager) GetSyncProgress() SyncProgress {
m.syncMutex.RLock()
defer m.syncMutex.RUnlock()
return m.syncProgress
}
// StopSync 停止同步
func (m *MeilisearchManager) StopSync() {
m.syncMutex.Lock()
defer m.syncMutex.Unlock()
if m.isSyncing {
// 发送停止信号
select {
case <-m.syncStopChan:
// 通道已经关闭,不需要再次关闭
default:
close(m.syncStopChan)
}
m.isSyncing = false
m.syncProgress.IsRunning = false
m.syncProgress.ErrorMessage = "同步已停止"
utils.Debug("同步操作已停止")
}
}
// ClearIndex 清空索引
func (m *MeilisearchManager) ClearIndex() error {
if m.service == nil || !m.service.IsEnabled() {
return fmt.Errorf("Meilisearch未启用")
}
// 清空Meilisearch索引
if err := m.service.ClearIndex(); err != nil {
return err
}
// 标记所有资源为未同步
return m.repoMgr.ResourceRepository.MarkAllAsUnsyncedToMeilisearch()
}
// convertResourceToDocument 转换资源为搜索文档
func (m *MeilisearchManager) convertResourceToDocument(resource *entity.Resource) MeilisearchDocument {
// 获取关联数据
var categoryName string
if resource.CategoryID != nil {
category, err := m.repoMgr.CategoryRepository.FindByID(*resource.CategoryID)
if err == nil {
categoryName = category.Name
}
}
var panName string
if resource.PanID != nil {
pan, err := m.repoMgr.PanRepository.FindByID(*resource.PanID)
if err == nil {
panName = pan.Name
}
}
// 获取标签 - 从关联的Tags字段获取
var tagNames []string
2025-10-09 17:52:49 +08:00
if len(resource.Tags) > 0 {
utils.Debug(fmt.Sprintf("处理资源 %d 的 %d 个标签", resource.ID, len(resource.Tags)))
for i, tag := range resource.Tags {
if tag.Name != "" {
utils.Debug(fmt.Sprintf("标签 %d: ID=%d, Name='%s'", i+1, tag.ID, tag.Name))
tagNames = append(tagNames, tag.Name)
} else {
utils.Debug(fmt.Sprintf("标签 %d: ID=%d, Name为空跳过", i+1, tag.ID))
}
2025-08-20 15:03:14 +08:00
}
2025-10-09 17:52:49 +08:00
} else {
utils.Debug(fmt.Sprintf("资源 %d 没有关联的标签", resource.ID))
2025-08-20 15:03:14 +08:00
}
2025-10-09 17:52:49 +08:00
utils.Debug(fmt.Sprintf("资源 %d 最终标签数量: %d", resource.ID, len(tagNames)))
2025-08-20 15:03:14 +08:00
return MeilisearchDocument{
ID: resource.ID,
Title: resource.Title,
Description: resource.Description,
URL: resource.URL,
SaveURL: resource.SaveURL,
FileSize: resource.FileSize,
Key: resource.Key,
Category: categoryName,
Tags: tagNames,
PanName: panName,
PanID: resource.PanID,
Author: resource.Author,
2025-10-14 16:37:11 +08:00
Cover: resource.Cover,
2025-08-20 15:03:14 +08:00
CreatedAt: resource.CreatedAt,
UpdatedAt: resource.UpdatedAt,
}
}
// convertResourceToDocumentWithCache 转换资源为搜索文档(使用缓存)
func (m *MeilisearchManager) convertResourceToDocumentWithCache(resource *entity.Resource, categoryCache map[uint]string, panCache map[uint]string) MeilisearchDocument {
// 从缓存获取关联数据
var categoryName string
if resource.CategoryID != nil {
if name, exists := categoryCache[*resource.CategoryID]; exists {
categoryName = name
}
}
var panName string
if resource.PanID != nil {
if name, exists := panCache[*resource.PanID]; exists {
panName = name
}
}
// 获取标签 - 从关联的Tags字段获取
var tagNames []string
2025-10-09 17:52:49 +08:00
if len(resource.Tags) > 0 {
utils.Debug(fmt.Sprintf("批量同步处理资源 %d 的 %d 个标签", resource.ID, len(resource.Tags)))
for i, tag := range resource.Tags {
if tag.Name != "" {
utils.Debug(fmt.Sprintf("批量同步标签 %d: ID=%d, Name='%s'", i+1, tag.ID, tag.Name))
tagNames = append(tagNames, tag.Name)
} else {
utils.Debug(fmt.Sprintf("批量同步标签 %d: ID=%d, Name为空跳过", i+1, tag.ID))
}
2025-08-20 15:03:14 +08:00
}
2025-10-09 17:52:49 +08:00
} else {
utils.Debug(fmt.Sprintf("批量同步资源 %d 没有关联的标签", resource.ID))
2025-08-20 15:03:14 +08:00
}
2025-10-09 17:52:49 +08:00
utils.Debug(fmt.Sprintf("批量同步资源 %d 最终标签数量: %d", resource.ID, len(tagNames)))
2025-08-20 15:03:14 +08:00
return MeilisearchDocument{
ID: resource.ID,
Title: resource.Title,
Description: resource.Description,
URL: resource.URL,
SaveURL: resource.SaveURL,
FileSize: resource.FileSize,
Key: resource.Key,
Category: categoryName,
Tags: tagNames,
PanName: panName,
PanID: resource.PanID,
Author: resource.Author,
2025-10-14 16:37:11 +08:00
Cover: resource.Cover,
2025-08-20 15:03:14 +08:00
CreatedAt: resource.CreatedAt,
UpdatedAt: resource.UpdatedAt,
}
}
// monitorLoop 监控循环
func (m *MeilisearchManager) monitorLoop() {
if m.isRunning {
return
}
m.isRunning = true
ticker := time.NewTicker(30 * time.Second) // 每30秒检查一次
defer ticker.Stop()
for {
select {
case <-ticker.C:
m.checkHealth()
case <-m.stopChan:
return
}
}
}
// checkHealth 检查健康状态
func (m *MeilisearchManager) checkHealth() {
m.mutex.Lock()
defer m.mutex.Unlock()
m.status.LastCheck = time.Now()
utils.Debug("开始健康检查 - 服务实例: %v, 启用状态: %v", m.service != nil, m.service != nil && m.service.IsEnabled())
if m.service == nil || !m.service.IsEnabled() {
utils.Debug("Meilisearch服务未初始化或未启用")
m.status.Healthy = false
m.status.LastError = "Meilisearch未启用"
return
}
utils.Debug("开始检查Meilisearch健康状态")
if err := m.service.HealthCheck(); err != nil {
m.status.Healthy = false
m.status.ErrorCount++
m.status.LastError = err.Error()
utils.Error("Meilisearch健康检查失败: %v", err)
} else {
m.status.Healthy = true
m.status.ErrorCount = 0
m.status.LastError = ""
utils.Debug("Meilisearch健康检查成功")
// 健康检查通过后,更新文档数量
if stats, err := m.service.GetIndexStats(); err == nil {
if count, ok := stats["numberOfDocuments"].(float64); ok {
m.status.DocumentCount = int64(count)
} else if count, ok := stats["numberOfDocuments"].(int64); ok {
m.status.DocumentCount = count
} else if count, ok := stats["numberOfDocuments"].(int); ok {
m.status.DocumentCount = int64(count)
}
}
}
}
// Stop 停止监控
func (m *MeilisearchManager) Stop() {
if !m.isRunning {
return
}
close(m.stopChan)
m.isRunning = false
utils.Debug("Meilisearch监控服务已停止")
}