fix: 修复资源自动处理的问题

This commit is contained in:
Kerwin
2025-08-25 09:51:45 +08:00
parent 0d54dffa19
commit 002267e436
10 changed files with 175 additions and 97 deletions

View File

@@ -509,6 +509,7 @@ func (r *ResourceRepositoryImpl) FindUnsyncedToMeilisearch(page, limit int) ([]e
Where("synced_to_meilisearch = ?", false).
Preload("Category").
Preload("Pan").
Preload("Tags"). // 添加Tags预加载
Order("updated_at DESC")
// 获取总数

View File

@@ -50,7 +50,7 @@ func (h *TaskHandler) CreateBatchTransferTask(c *gin.Context) {
return
}
utils.Info("创建批量转存任务: %s资源数量: %d选择账号数量: %d", req.Title, len(req.Resources), len(req.SelectedAccounts))
utils.Debug("创建批量转存任务: %s资源数量: %d选择账号数量: %d", req.Title, len(req.Resources), len(req.SelectedAccounts))
// 构建任务配置
taskConfig := map[string]interface{}{
@@ -105,7 +105,7 @@ func (h *TaskHandler) CreateBatchTransferTask(c *gin.Context) {
}
}
utils.Info("批量转存任务创建完成: %d, 共 %d 项", newTask.ID, len(req.Resources))
utils.Debug("批量转存任务创建完成: %d, 共 %d 项", newTask.ID, len(req.Resources))
SuccessResponse(c, gin.H{
"task_id": newTask.ID,
@@ -123,8 +123,6 @@ func (h *TaskHandler) StartTask(c *gin.Context) {
return
}
utils.Info("启动任务: %d", taskID)
err = h.taskManager.StartTask(uint(taskID))
if err != nil {
utils.Error("启动任务失败: %v", err)
@@ -132,6 +130,8 @@ func (h *TaskHandler) StartTask(c *gin.Context) {
return
}
utils.Debug("启动任务: %d", taskID)
SuccessResponse(c, gin.H{
"message": "任务启动成功",
})
@@ -146,8 +146,6 @@ func (h *TaskHandler) StopTask(c *gin.Context) {
return
}
utils.Info("停止任务: %d", taskID)
err = h.taskManager.StopTask(uint(taskID))
if err != nil {
utils.Error("停止任务失败: %v", err)
@@ -155,6 +153,8 @@ func (h *TaskHandler) StopTask(c *gin.Context) {
return
}
utils.Debug("停止任务: %d", taskID)
SuccessResponse(c, gin.H{
"message": "任务停止成功",
})
@@ -169,8 +169,6 @@ func (h *TaskHandler) PauseTask(c *gin.Context) {
return
}
utils.Info("暂停任务: %d", taskID)
err = h.taskManager.PauseTask(uint(taskID))
if err != nil {
utils.Error("暂停任务失败: %v", err)
@@ -178,6 +176,8 @@ func (h *TaskHandler) PauseTask(c *gin.Context) {
return
}
utils.Debug("暂停任务: %d", taskID)
SuccessResponse(c, gin.H{
"message": "任务暂停成功",
})
@@ -234,13 +234,25 @@ func (h *TaskHandler) GetTaskStatus(c *gin.Context) {
// GetTasks 获取任务列表
func (h *TaskHandler) GetTasks(c *gin.Context) {
page, _ := strconv.Atoi(c.DefaultQuery("page", "1"))
pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "10"))
taskType := c.Query("task_type")
// 获取查询参数
pageStr := c.DefaultQuery("page", "1")
pageSizeStr := c.DefaultQuery("pageSize", "10")
taskType := c.Query("taskType")
status := c.Query("status")
page, err := strconv.Atoi(pageStr)
if err != nil || page < 1 {
page = 1
}
pageSize, err := strconv.Atoi(pageSizeStr)
if err != nil || pageSize < 1 || pageSize > 100 {
pageSize = 10
}
utils.Debug("GetTasks: 获取任务列表 page=%d, pageSize=%d, taskType=%s, status=%s", page, pageSize, taskType, status)
// 获取任务列表
tasks, total, err := h.repoMgr.TaskRepository.GetList(page, pageSize, taskType, status)
if err != nil {
utils.Error("获取任务列表失败: %v", err)
@@ -250,17 +262,17 @@ func (h *TaskHandler) GetTasks(c *gin.Context) {
utils.Debug("GetTasks: 从数据库获取到 %d 个任务", len(tasks))
// 为每个任务添加运行状态
var result []gin.H
// 获取任务运行状态
var taskList []gin.H
for _, task := range tasks {
isRunning := h.taskManager.IsTaskRunning(task.ID)
utils.Debug("GetTasks: 任务 %d (%s) 数据库状态: %s, TaskManager运行状态: %v", task.ID, task.Title, task.Status, isRunning)
result = append(result, gin.H{
taskList = append(taskList, gin.H{
"id": task.ID,
"title": task.Title,
"description": task.Description,
"task_type": task.Type,
"type": task.Type,
"status": task.Status,
"total_items": task.TotalItems,
"processed_items": task.ProcessedItems,
@@ -273,10 +285,11 @@ func (h *TaskHandler) GetTasks(c *gin.Context) {
}
SuccessResponse(c, gin.H{
"items": result,
"total": total,
"page": page,
"size": pageSize,
"tasks": taskList,
"total": total,
"page": page,
"page_size": pageSize,
"total_pages": (total + int64(pageSize) - 1) / int64(pageSize),
})
}
@@ -348,7 +361,7 @@ func (h *TaskHandler) DeleteTask(c *gin.Context) {
// 检查任务是否在运行
if h.taskManager.IsTaskRunning(uint(taskID)) {
ErrorResponse(c, "任务正在运行,请先停止任务", http.StatusBadRequest)
ErrorResponse(c, "任务正在运行中,无法删除", http.StatusBadRequest)
return
}
@@ -368,7 +381,8 @@ func (h *TaskHandler) DeleteTask(c *gin.Context) {
return
}
utils.Info("任务删除成功: %d", taskID)
utils.Debug("任务删除成功: %d", taskID)
SuccessResponse(c, gin.H{
"message": "任务删除成功",
})

26
main.go
View File

@@ -7,6 +7,7 @@ import (
"strings"
"github.com/ctwj/urldb/db"
"github.com/ctwj/urldb/db/entity"
"github.com/ctwj/urldb/db/repo"
"github.com/ctwj/urldb/handlers"
"github.com/ctwj/urldb/middleware"
@@ -125,6 +126,31 @@ func main() {
// 设置全局调度器的Meilisearch管理器
scheduler.SetGlobalMeilisearchManager(meilisearchManager)
// 初始化并启动调度器
globalScheduler := scheduler.GetGlobalScheduler(
repoManager.HotDramaRepository,
repoManager.ReadyResourceRepository,
repoManager.ResourceRepository,
repoManager.SystemConfigRepository,
repoManager.PanRepository,
repoManager.CksRepository,
repoManager.TagRepository,
repoManager.CategoryRepository,
)
// 根据系统配置启动相应的调度任务
autoFetchHotDrama, _ := repoManager.SystemConfigRepository.GetConfigBool(entity.ConfigKeyAutoFetchHotDramaEnabled)
autoProcessReadyResources, _ := repoManager.SystemConfigRepository.GetConfigBool(entity.ConfigKeyAutoProcessReadyResources)
autoTransferEnabled, _ := repoManager.SystemConfigRepository.GetConfigBool(entity.ConfigKeyAutoTransferEnabled)
globalScheduler.UpdateSchedulerStatusWithAutoTransfer(
autoFetchHotDrama,
autoProcessReadyResources,
autoTransferEnabled,
)
utils.Info("调度器初始化完成")
// 设置公开API中间件的Repository管理器
middleware.SetRepositoryManager(repoManager)

View File

@@ -27,8 +27,8 @@ type Claims struct {
func AuthMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
authHeader := c.GetHeader("Authorization")
utils.Info("AuthMiddleware - 收到请求: %s %s", c.Request.Method, c.Request.URL.Path)
utils.Info("AuthMiddleware - Authorization头: %s", authHeader)
// utils.Info("AuthMiddleware - 收到请求: %s %s", c.Request.Method, c.Request.URL.Path)
// utils.Info("AuthMiddleware - Authorization头: %s", authHeader)
if authHeader == "" {
utils.Error("AuthMiddleware - 未提供认证令牌")
@@ -39,24 +39,24 @@ func AuthMiddleware() gin.HandlerFunc {
// 检查Bearer前缀
if !strings.HasPrefix(authHeader, "Bearer ") {
utils.Error("AuthMiddleware - 无效的认证格式: %s", authHeader)
// utils.Error("AuthMiddleware - 无效的认证格式: %s", authHeader)
c.JSON(http.StatusUnauthorized, gin.H{"error": "无效的认证格式"})
c.Abort()
return
}
tokenString := strings.TrimPrefix(authHeader, "Bearer ")
utils.Info("AuthMiddleware - 解析令牌: %s", tokenString[:10]+"...")
// utils.Info("AuthMiddleware - 解析令牌: %s", tokenString[:10]+"...")
claims, err := parseToken(tokenString)
if err != nil {
utils.Error("AuthMiddleware - 令牌解析失败: %v", err)
// utils.Error("AuthMiddleware - 令牌解析失败: %v", err)
c.JSON(http.StatusUnauthorized, gin.H{"error": "无效的令牌"})
c.Abort()
return
}
utils.Info("AuthMiddleware - 令牌验证成功,用户: %s, 角色: %s", claims.Username, claims.Role)
// utils.Info("AuthMiddleware - 令牌验证成功,用户: %s, 角色: %s", claims.Username, claims.Role)
// 将用户信息存储到上下文中
c.Set("user_id", claims.UserID)
@@ -72,13 +72,13 @@ func AdminMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
role, exists := c.Get("role")
if !exists {
c.JSON(http.StatusUnauthorized, gin.H{"error": "未认证"})
// c.JSON(http.StatusUnauthorized, gin.H{"error": "未认证"})
c.Abort()
return
}
if role != "admin" {
c.JSON(http.StatusForbidden, gin.H{"error": "需要管理员权限"})
// c.JSON(http.StatusForbidden, gin.H{"error": "需要管理员权限"})
c.Abort()
return
}
@@ -106,23 +106,23 @@ func GenerateToken(user *entity.User) (string, error) {
// parseToken 解析JWT令牌
func parseToken(tokenString string) (*Claims, error) {
utils.Info("parseToken - 开始解析令牌")
// utils.Info("parseToken - 开始解析令牌")
token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
return jwtSecret, nil
})
if err != nil {
utils.Error("parseToken - JWT解析失败: %v", err)
// utils.Error("parseToken - JWT解析失败: %v", err)
return nil, err
}
if claims, ok := token.Claims.(*Claims); ok && token.Valid {
utils.Info("parseToken - 令牌解析成功用户ID: %d", claims.UserID)
// utils.Info("parseToken - 令牌解析成功用户ID: %d", claims.UserID)
return claims, nil
}
utils.Error("parseToken - 令牌无效或签名错误")
// utils.Error("parseToken - 令牌无效或签名错误")
return nil, jwt.ErrSignatureInvalid
}

View File

@@ -47,12 +47,12 @@ func (gs *GlobalScheduler) StartHotDramaScheduler() {
defer gs.mutex.Unlock()
if gs.manager.IsHotDramaRunning() {
utils.Info("热播剧定时任务已在运行中")
utils.Debug("热播剧定时任务已在运行中")
return
}
gs.manager.StartHotDramaScheduler()
utils.Info("全局调度器已启动热播剧定时任务")
utils.Debug("全局调度器已启动热播剧定时任务")
}
// StopHotDramaScheduler 停止热播剧定时任务
@@ -61,12 +61,12 @@ func (gs *GlobalScheduler) StopHotDramaScheduler() {
defer gs.mutex.Unlock()
if !gs.manager.IsHotDramaRunning() {
utils.Info("热播剧定时任务未在运行")
utils.Debug("热播剧定时任务未在运行")
return
}
gs.manager.StopHotDramaScheduler()
utils.Info("全局调度器已停止热播剧定时任务")
utils.Debug("全局调度器已停止热播剧定时任务")
}
// IsHotDramaSchedulerRunning 检查热播剧定时任务是否在运行
@@ -87,12 +87,12 @@ func (gs *GlobalScheduler) StartReadyResourceScheduler() {
defer gs.mutex.Unlock()
if gs.manager.IsReadyResourceRunning() {
utils.Info("待处理资源自动处理任务已在运行中")
utils.Debug("待处理资源自动处理任务已在运行中")
return
}
gs.manager.StartReadyResourceScheduler()
utils.Info("全局调度器已启动待处理资源自动处理任务")
utils.Debug("全局调度器已启动待处理资源自动处理任务")
}
// StopReadyResourceScheduler 停止待处理资源自动处理任务
@@ -101,12 +101,12 @@ func (gs *GlobalScheduler) StopReadyResourceScheduler() {
defer gs.mutex.Unlock()
if !gs.manager.IsReadyResourceRunning() {
utils.Info("待处理资源自动处理任务未在运行")
utils.Debug("待处理资源自动处理任务未在运行")
return
}
gs.manager.StopReadyResourceScheduler()
utils.Info("全局调度器已停止待处理资源自动处理任务")
utils.Debug("全局调度器已停止待处理资源自动处理任务")
}
// IsReadyResourceRunning 检查待处理资源自动处理任务是否在运行
@@ -122,12 +122,12 @@ func (gs *GlobalScheduler) StartAutoTransferScheduler() {
defer gs.mutex.Unlock()
if gs.manager.IsAutoTransferRunning() {
utils.Info("自动转存定时任务已在运行中")
utils.Debug("自动转存定时任务已在运行中")
return
}
gs.manager.StartAutoTransferScheduler()
utils.Info("全局调度器已启动自动转存定时任务")
utils.Debug("全局调度器已启动自动转存定时任务")
}
// StopAutoTransferScheduler 停止自动转存定时任务
@@ -136,12 +136,12 @@ func (gs *GlobalScheduler) StopAutoTransferScheduler() {
defer gs.mutex.Unlock()
if !gs.manager.IsAutoTransferRunning() {
utils.Info("自动转存定时任务未在运行")
utils.Debug("自动转存定时任务未在运行")
return
}
gs.manager.StopAutoTransferScheduler()
utils.Info("全局调度器已停止自动转存定时任务")
utils.Debug("全局调度器已停止自动转存定时任务")
}
// IsAutoTransferRunning 检查自动转存定时任务是否在运行

View File

@@ -51,34 +51,34 @@ func NewManager(
// StartAll 启动所有调度任务
func (m *Manager) StartAll() {
utils.Info("启动所有调度任务")
utils.Debug("启动所有调度任务")
// 启动热播剧调度任务
m.hotDramaScheduler.Start()
// 启动热播剧定时任务
m.StartHotDramaScheduler()
// 启动待处理资源调度任务
m.readyResourceScheduler.Start()
// 启动待处理资源自动处理任务
m.StartReadyResourceScheduler()
// 启动自动转存调度任务
m.autoTransferScheduler.Start()
// 启动自动转存定时任务
m.StartAutoTransferScheduler()
utils.Info("所有调度任务已启动")
utils.Debug("所有调度任务已启动")
}
// StopAll 停止所有调度任务
func (m *Manager) StopAll() {
utils.Info("停止所有调度任务")
utils.Debug("停止所有调度任务")
// 停止热播剧调度任务
m.hotDramaScheduler.Stop()
// 停止热播剧定时任务
m.StopHotDramaScheduler()
// 停止待处理资源调度任务
m.readyResourceScheduler.Stop()
// 停止待处理资源自动处理任务
m.StopReadyResourceScheduler()
// 停止自动转存调度任务
m.autoTransferScheduler.Stop()
// 停止自动转存定时任务
m.StopAutoTransferScheduler()
utils.Info("所有调度任务已停止")
utils.Debug("所有调度任务已停止")
}
// StartHotDramaScheduler 启动热播剧调度任务

View File

@@ -31,7 +31,7 @@ func NewReadyResourceScheduler(base *BaseScheduler) *ReadyResourceScheduler {
// Start 启动待处理资源定时任务
func (r *ReadyResourceScheduler) Start() {
if r.readyResourceRunning {
utils.Info("待处理资源自动处理任务已在运行中")
utils.Debug("待处理资源自动处理任务已在运行中")
return
}
@@ -63,7 +63,7 @@ func (r *ReadyResourceScheduler) Start() {
r.processReadyResources()
}()
} else {
utils.Info("上一次待处理资源任务还在执行中,跳过本次执行")
utils.Debug("上一次待处理资源任务还在执行中,跳过本次执行")
}
case <-r.GetStopChan():
utils.Info("停止待处理资源自动处理任务")
@@ -76,7 +76,7 @@ func (r *ReadyResourceScheduler) Start() {
// Stop 停止待处理资源定时任务
func (r *ReadyResourceScheduler) Stop() {
if !r.readyResourceRunning {
utils.Info("待处理资源自动处理任务未在运行")
utils.Debug("待处理资源自动处理任务未在运行")
return
}
@@ -92,7 +92,7 @@ func (r *ReadyResourceScheduler) IsReadyResourceRunning() bool {
// processReadyResources 处理待处理资源
func (r *ReadyResourceScheduler) processReadyResources() {
utils.Info("开始处理待处理资源...")
utils.Debug("开始处理待处理资源...")
// 检查系统配置,确认是否启用自动处理
autoProcess, err := r.systemConfigRepo.GetConfigBool(entity.ConfigKeyAutoProcessReadyResources)
@@ -102,7 +102,7 @@ func (r *ReadyResourceScheduler) processReadyResources() {
}
if !autoProcess {
utils.Info("自动处理待处理资源功能已禁用")
utils.Debug("自动处理待处理资源功能已禁用")
return
}
@@ -115,11 +115,11 @@ func (r *ReadyResourceScheduler) processReadyResources() {
}
if len(readyResources) == 0 {
utils.Info("没有待处理的资源")
utils.Debug("没有待处理的资源")
return
}
utils.Info(fmt.Sprintf("找到 %d 个待处理资源,开始处理...", len(readyResources)))
utils.Debug(fmt.Sprintf("找到 %d 个待处理资源,开始处理...", len(readyResources)))
processedCount := 0
factory := panutils.GetInstance() // 使用单例模式
@@ -132,7 +132,7 @@ func (r *ReadyResourceScheduler) processReadyResources() {
continue
}
if exits {
utils.Info(fmt.Sprintf("资源已存在: %s", readyResource.URL))
utils.Debug(fmt.Sprintf("资源已存在: %s", readyResource.URL))
r.readyResourceRepo.Delete(readyResource.ID)
continue
}
@@ -146,7 +146,7 @@ func (r *ReadyResourceScheduler) processReadyResources() {
if updateErr := r.readyResourceRepo.Update(&readyResource); updateErr != nil {
utils.Error(fmt.Sprintf("更新错误信息失败 (ID: %d): %v", readyResource.ID, updateErr))
} else {
utils.Info(fmt.Sprintf("已保存错误信息到资源 (ID: %d): %s", readyResource.ID, err.Error()))
utils.Debug(fmt.Sprintf("已保存错误信息到资源 (ID: %d): %s", readyResource.ID, err.Error()))
}
// 处理失败后删除资源,避免重复处理
@@ -155,11 +155,13 @@ func (r *ReadyResourceScheduler) processReadyResources() {
// 处理成功删除readyResource
r.readyResourceRepo.Delete(readyResource.ID)
processedCount++
utils.Info(fmt.Sprintf("成功处理资源: %s", readyResource.URL))
utils.Debug(fmt.Sprintf("成功处理资源: %s", readyResource.URL))
}
}
utils.Info(fmt.Sprintf("待处理资源处理完成,共处理 %d 个资源", processedCount))
if processedCount > 0 {
utils.Info(fmt.Sprintf("待处理资源处理完成,共处理 %d 个资源", processedCount))
}
}
// convertReadyResourceToResource 将待处理资源转换为正式资源
@@ -343,16 +345,28 @@ func (r *ReadyResourceScheduler) convertReadyResourceToResource(readyResource en
}
// 同步到Meilisearch
if globalMeilisearchManager != nil && globalMeilisearchManager.IsEnabled() {
go func() {
if err := globalMeilisearchManager.SyncResourceToMeilisearch(resource); err != nil {
utils.Error("同步资源到Meilisearch失败: %v", err)
} else {
utils.Info(fmt.Sprintf("资源已同步到Meilisearch: %s", resource.URL))
}
}()
utils.Debug(fmt.Sprintf("准备同步资源到Meilisearch - 资源ID: %d, URL: %s", resource.ID, resource.URL))
utils.Debug(fmt.Sprintf("globalMeilisearchManager: %v", globalMeilisearchManager != nil))
if globalMeilisearchManager != nil {
utils.Debug(fmt.Sprintf("Meilisearch管理器已初始化检查启用状态"))
isEnabled := globalMeilisearchManager.IsEnabled()
utils.Debug(fmt.Sprintf("Meilisearch启用状态: %v", isEnabled))
if isEnabled {
utils.Debug(fmt.Sprintf("Meilisearch已启用开始同步资源"))
go func() {
if err := globalMeilisearchManager.SyncResourceToMeilisearch(resource); err != nil {
utils.Error("同步资源到Meilisearch失败: %v", err)
} else {
utils.Info(fmt.Sprintf("资源已同步到Meilisearch: %s", resource.URL))
}
}()
} else {
utils.Debug("Meilisearch未启用跳过同步")
}
} else {
utils.Debug("Meilisearch未启用或未初始化,跳过同步")
utils.Debug("Meilisearch管理器未初始化,跳过同步")
}
return nil

View File

@@ -221,8 +221,23 @@ func (m *MeilisearchManager) GetStatusWithHealthCheck() (MeilisearchStatus, erro
// SyncResourceToMeilisearch 同步资源到Meilisearch
func (m *MeilisearchManager) SyncResourceToMeilisearch(resource *entity.Resource) error {
utils.Debug(fmt.Sprintf("开始同步资源到Meilisearch - 资源ID: %d, URL: %s", resource.ID, resource.URL))
if m.service == nil || !m.service.IsEnabled() {
return nil
utils.Debug("Meilisearch服务未初始化或未启用")
return fmt.Errorf("Meilisearch服务未初始化或未启用")
}
// 先进行健康检查
if err := m.service.HealthCheck(); err != nil {
utils.Error(fmt.Sprintf("Meilisearch健康检查失败: %v", err))
return fmt.Errorf("Meilisearch健康检查失败: %v", err)
}
// 确保索引存在
if err := m.service.CreateIndex(); err != nil {
utils.Error(fmt.Sprintf("创建Meilisearch索引失败: %v", err))
return fmt.Errorf("创建Meilisearch索引失败: %v", err)
}
doc := m.convertResourceToDocument(resource)

View File

@@ -197,27 +197,35 @@ func (m *MeilisearchService) UpdateIndexSettings() error {
// BatchAddDocuments 批量添加文档
func (m *MeilisearchService) BatchAddDocuments(docs []MeilisearchDocument) error {
utils.Debug(fmt.Sprintf("开始批量添加文档到Meilisearch - 文档数量: %d", len(docs)))
if !m.enabled {
return nil
utils.Debug("Meilisearch未启用跳过批量添加")
return fmt.Errorf("Meilisearch未启用")
}
if len(docs) == 0 {
utils.Debug("文档列表为空,跳过批量添加")
return nil
}
// 转换为interface{}切片
var documents []interface{}
for _, doc := range docs {
for i, doc := range docs {
utils.Debug(fmt.Sprintf("转换文档 %d - ID: %d, 标题: %s", i+1, doc.ID, doc.Title))
documents = append(documents, doc)
}
utils.Debug(fmt.Sprintf("开始调用Meilisearch API添加 %d 个文档", len(documents)))
// 批量添加文档
_, err := m.index.AddDocuments(documents, nil)
if err != nil {
return fmt.Errorf("批量添加文档失败: %v", err)
utils.Error(fmt.Sprintf("Meilisearch批量添加文档失败: %v", err))
return fmt.Errorf("Meilisearch批量添加文档失败: %v", err)
}
utils.Debug("批量添加 %d 个文档到Meilisearch成功", len(docs))
utils.Debug(fmt.Sprintf("成功批量添加 %d 个文档到Meilisearch", len(docs)))
return nil
}

View File

@@ -39,7 +39,7 @@ func (tm *TaskManager) RegisterProcessor(processor TaskProcessor) {
tm.mu.Lock()
defer tm.mu.Unlock()
tm.processors[processor.GetTaskType()] = processor
utils.Info("注册任务处理器: %s", processor.GetTaskType())
utils.Debug("注册任务处理器: %s", processor.GetTaskType())
}
// getRegisteredProcessors 获取已注册的处理器列表(用于调试)
@@ -56,11 +56,11 @@ func (tm *TaskManager) StartTask(taskID uint) error {
tm.mu.Lock()
defer tm.mu.Unlock()
utils.Info("StartTask: 尝试启动任务 %d", taskID)
utils.Debug("StartTask: 尝试启动任务 %d", taskID)
// 检查任务是否已在运行
if _, exists := tm.running[taskID]; exists {
utils.Info("任务 %d 已在运行中", taskID)
utils.Debug("任务 %d 已在运行中", taskID)
return fmt.Errorf("任务 %d 已在运行中", taskID)
}
@@ -71,7 +71,7 @@ func (tm *TaskManager) StartTask(taskID uint) error {
return fmt.Errorf("获取任务失败: %v", err)
}
utils.Info("StartTask: 获取到任务 %d, 类型: %s, 状态: %s", task.ID, task.Type, task.Status)
utils.Debug("StartTask: 获取到任务 %d, 类型: %s, 状态: %s", task.ID, task.Type, task.Status)
// 获取处理器
processor, exists := tm.processors[string(task.Type)]
@@ -80,13 +80,13 @@ func (tm *TaskManager) StartTask(taskID uint) error {
return fmt.Errorf("未找到任务类型 %s 的处理器", task.Type)
}
utils.Info("StartTask: 找到处理器 %s", task.Type)
utils.Debug("StartTask: 找到处理器 %s", task.Type)
// 创建上下文
ctx, cancel := context.WithCancel(context.Background())
tm.running[taskID] = cancel
utils.Info("StartTask: 启动后台任务协程")
utils.Debug("StartTask: 启动后台任务协程")
// 启动后台任务
go tm.processTask(ctx, task, processor)
@@ -189,10 +189,10 @@ func (tm *TaskManager) processTask(ctx context.Context, task *entity.Task, proce
tm.mu.Lock()
delete(tm.running, task.ID)
tm.mu.Unlock()
utils.Info("processTask: 任务 %d 处理完成,清理资源", task.ID)
utils.Debug("processTask: 任务 %d 处理完成,清理资源", task.ID)
}()
utils.Info("processTask: 开始处理任务: %d, 类型: %s", task.ID, task.Type)
utils.Debug("processTask: 开始处理任务: %d, 类型: %s", task.ID, task.Type)
// 更新任务状态为运行中
err := tm.repoMgr.TaskRepository.UpdateStatus(task.ID, "running")
@@ -230,7 +230,7 @@ func (tm *TaskManager) processTask(ctx context.Context, task *entity.Task, proce
// 如果当前批次有处理中的任务项重置它们为pending状态服务器重启恢复
if processingItems > 0 {
utils.Info("任务 %d 发现 %d 个处理中的任务项重置为pending状态", task.ID, processingItems)
utils.Debug("任务 %d 发现 %d 个处理中的任务项重置为pending状态", task.ID, processingItems)
err = tm.repoMgr.TaskItemRepository.ResetProcessingItems(task.ID)
if err != nil {
utils.Error("重置处理中任务项失败: %v", err)
@@ -249,13 +249,13 @@ func (tm *TaskManager) processTask(ctx context.Context, task *entity.Task, proce
successItems := completedItems
failedItems := initialFailedItems
utils.Info("任务 %d 统计信息: 总计=%d, 已完成=%d, 已失败=%d, 待处理=%d",
utils.Debug("任务 %d 统计信息: 总计=%d, 已完成=%d, 已失败=%d, 待处理=%d",
task.ID, totalItems, completedItems, failedItems, currentBatchItems)
for _, item := range items {
select {
case <-ctx.Done():
utils.Info("任务 %d 被取消", task.ID)
utils.Debug("任务 %d 被取消", task.ID)
return
default:
// 处理单个任务项