diff --git a/db/repo/resource_repository.go b/db/repo/resource_repository.go index a7d2d73..2f8fcb6 100644 --- a/db/repo/resource_repository.go +++ b/db/repo/resource_repository.go @@ -5,7 +5,6 @@ import ( "time" "github.com/ctwj/urldb/db/entity" - "gorm.io/gorm" ) @@ -32,7 +31,9 @@ type ResourceRepository interface { FindExists(url string, excludeID ...uint) (bool, error) BatchFindByURLs(urls []string) ([]entity.Resource, error) GetResourcesForTransfer(panID uint, sinceTime time.Time, limit int) ([]*entity.Resource, error) - CreateResourceTag(resourceID, tagID uint) error + GetByURL(url string) (*entity.Resource, error) + UpdateSaveURL(id uint, saveURL string) error + CreateResourceTag(resourceTag *entity.ResourceTag) error } // ResourceRepositoryImpl Resource的Repository实现 @@ -432,11 +433,22 @@ func (r *ResourceRepositoryImpl) GetResourcesForTransfer(panID uint, sinceTime t return resources, nil } -// CreateResourceTag 创建资源与标签的关联 -func (r *ResourceRepositoryImpl) CreateResourceTag(resourceID, tagID uint) error { - resourceTag := &entity.ResourceTag{ - ResourceID: resourceID, - TagID: tagID, +// GetByURL 根据URL获取资源 +func (r *ResourceRepositoryImpl) GetByURL(url string) (*entity.Resource, error) { + var resource entity.Resource + err := r.GetDB().Where("url = ?", url).First(&resource).Error + if err != nil { + return nil, err } + return &resource, nil +} + +// UpdateSaveURL 更新资源的转存链接 +func (r *ResourceRepositoryImpl) UpdateSaveURL(id uint, saveURL string) error { + return r.GetDB().Model(&entity.Resource{}).Where("id = ?", id).Update("save_url", saveURL).Error +} + +// CreateResourceTag 创建资源与标签的关联 +func (r *ResourceRepositoryImpl) CreateResourceTag(resourceTag *entity.ResourceTag) error { return r.GetDB().Create(resourceTag).Error } diff --git a/db/repo/task_item_repository.go b/db/repo/task_item_repository.go index b4b382e..25796af 100644 --- a/db/repo/task_item_repository.go +++ b/db/repo/task_item_repository.go @@ -7,52 +7,82 @@ import ( // TaskItemRepository 任务项仓库接口 type TaskItemRepository interface { - BaseRepository[entity.TaskItem] - FindByTaskID(taskID uint) ([]entity.TaskItem, error) - FindByTaskIDWithPagination(taskID uint, page, pageSize int) ([]entity.TaskItem, int64, error) - UpdateStatus(id uint, status entity.TaskItemStatus, errorMsg string) error - UpdateSuccess(id uint, outputData string) error - GetPendingItemsByTaskID(taskID uint) ([]entity.TaskItem, error) - BatchCreate(items []entity.TaskItem) error + GetByID(id uint) (*entity.TaskItem, error) + Create(item *entity.TaskItem) error + Delete(id uint) error + DeleteByTaskID(taskID uint) error + GetByTaskIDAndStatus(taskID uint, status string) ([]*entity.TaskItem, error) + GetListByTaskID(taskID uint, page, pageSize int, status string) ([]*entity.TaskItem, int64, error) + UpdateStatus(id uint, status string) error + UpdateStatusAndOutput(id uint, status, outputData string) error + GetStatsByTaskID(taskID uint) (map[string]int, error) } // TaskItemRepositoryImpl 任务项仓库实现 type TaskItemRepositoryImpl struct { - BaseRepositoryImpl[entity.TaskItem] + db *gorm.DB } // NewTaskItemRepository 创建任务项仓库 func NewTaskItemRepository(db *gorm.DB) TaskItemRepository { return &TaskItemRepositoryImpl{ - BaseRepositoryImpl: BaseRepositoryImpl[entity.TaskItem]{db: db}, + db: db, } } -// FindByTaskID 根据任务ID查找所有任务项 -func (r *TaskItemRepositoryImpl) FindByTaskID(taskID uint) ([]entity.TaskItem, error) { - var items []entity.TaskItem - err := r.db.Where("task_id = ?", taskID).Order("created_at ASC").Find(&items).Error +// GetByID 根据ID获取任务项 +func (r *TaskItemRepositoryImpl) GetByID(id uint) (*entity.TaskItem, error) { + var item entity.TaskItem + err := r.db.First(&item, id).Error + if err != nil { + return nil, err + } + return &item, nil +} + +// Create 创建任务项 +func (r *TaskItemRepositoryImpl) Create(item *entity.TaskItem) error { + return r.db.Create(item).Error +} + +// Delete 删除任务项 +func (r *TaskItemRepositoryImpl) Delete(id uint) error { + return r.db.Delete(&entity.TaskItem{}, id).Error +} + +// DeleteByTaskID 根据任务ID删除所有任务项 +func (r *TaskItemRepositoryImpl) DeleteByTaskID(taskID uint) error { + return r.db.Where("task_id = ?", taskID).Delete(&entity.TaskItem{}).Error +} + +// GetByTaskIDAndStatus 根据任务ID和状态获取任务项 +func (r *TaskItemRepositoryImpl) GetByTaskIDAndStatus(taskID uint, status string) ([]*entity.TaskItem, error) { + var items []*entity.TaskItem + err := r.db.Where("task_id = ? AND status = ?", taskID, status).Order("id ASC").Find(&items).Error return items, err } -// FindByTaskIDWithPagination 根据任务ID分页查找任务项 -func (r *TaskItemRepositoryImpl) FindByTaskIDWithPagination(taskID uint, page, pageSize int) ([]entity.TaskItem, int64, error) { - var items []entity.TaskItem +// GetListByTaskID 根据任务ID分页获取任务项 +func (r *TaskItemRepositoryImpl) GetListByTaskID(taskID uint, page, pageSize int, status string) ([]*entity.TaskItem, int64, error) { + var items []*entity.TaskItem var total int64 + query := r.db.Model(&entity.TaskItem{}).Where("task_id = ?", taskID) + + // 添加状态过滤 + if status != "" { + query = query.Where("status = ?", status) + } + // 获取总数 - err := r.db.Model(&entity.TaskItem{}).Where("task_id = ?", taskID).Count(&total).Error + err := query.Count(&total).Error if err != nil { return nil, 0, err } // 分页查询 offset := (page - 1) * pageSize - err = r.db.Where("task_id = ?", taskID). - Order("created_at ASC"). - Offset(offset). - Limit(pageSize). - Find(&items).Error + err = query.Offset(offset).Limit(pageSize).Order("item_index ASC").Find(&items).Error if err != nil { return nil, 0, err } @@ -61,43 +91,47 @@ func (r *TaskItemRepositoryImpl) FindByTaskIDWithPagination(taskID uint, page, p } // UpdateStatus 更新任务项状态 -func (r *TaskItemRepositoryImpl) UpdateStatus(id uint, status entity.TaskItemStatus, errorMsg string) error { - updates := map[string]interface{}{ - "status": status, - } - - if errorMsg != "" { - updates["error_message"] = errorMsg - } - - if status != entity.TaskItemStatusPending { - updates["processed_at"] = gorm.Expr("CURRENT_TIMESTAMP") - } - - return r.db.Model(&entity.TaskItem{}).Where("id = ?", id).Updates(updates).Error +func (r *TaskItemRepositoryImpl) UpdateStatus(id uint, status string) error { + return r.db.Model(&entity.TaskItem{}).Where("id = ?", id).Update("status", status).Error } -// UpdateSuccess 更新任务项为成功状态 -func (r *TaskItemRepositoryImpl) UpdateSuccess(id uint, outputData string) error { - updates := map[string]interface{}{ - "status": entity.TaskItemStatusSuccess, - "output_data": outputData, - "processed_at": gorm.Expr("CURRENT_TIMESTAMP"), +// UpdateStatusAndOutput 更新任务项状态和输出数据 +func (r *TaskItemRepositoryImpl) UpdateStatusAndOutput(id uint, status, outputData string) error { + return r.db.Model(&entity.TaskItem{}).Where("id = ?", id).Updates(map[string]interface{}{ + "status": status, + "output_data": outputData, + }).Error +} + +// GetStatsByTaskID 获取任务项统计信息 +func (r *TaskItemRepositoryImpl) GetStatsByTaskID(taskID uint) (map[string]int, error) { + var results []struct { + Status string + Count int } - return r.db.Model(&entity.TaskItem{}).Where("id = ?", id).Updates(updates).Error -} + err := r.db.Model(&entity.TaskItem{}). + Select("status, count(*) as count"). + Where("task_id = ?", taskID). + Group("status"). + Find(&results).Error -// GetPendingItemsByTaskID 获取任务的待处理项目 -func (r *TaskItemRepositoryImpl) GetPendingItemsByTaskID(taskID uint) ([]entity.TaskItem, error) { - var items []entity.TaskItem - err := r.db.Where("task_id = ? AND status = ?", taskID, entity.TaskItemStatusPending). - Order("created_at ASC"). - Find(&items).Error - return items, err -} + if err != nil { + return nil, err + } -// BatchCreate 批量创建任务项 -func (r *TaskItemRepositoryImpl) BatchCreate(items []entity.TaskItem) error { - return r.db.CreateInBatches(items, 100).Error + stats := map[string]int{ + "total": 0, + "pending": 0, + "processing": 0, + "completed": 0, + "failed": 0, + } + + for _, result := range results { + stats[result.Status] = result.Count + stats["total"] += result.Count + } + + return stats, nil } diff --git a/db/repo/task_repository.go b/db/repo/task_repository.go index 024244d..8b8f5b9 100644 --- a/db/repo/task_repository.go +++ b/db/repo/task_repository.go @@ -7,50 +7,71 @@ import ( // TaskRepository 任务仓库接口 type TaskRepository interface { - BaseRepository[entity.Task] - FindWithItems(id uint) (*entity.Task, error) - FindWithPagination(page, pageSize int) ([]entity.Task, int64, error) - UpdateProgress(id uint, processed, success, failed int) error - UpdateStatus(id uint, status entity.TaskStatus) error - GetRunningTasks() ([]entity.Task, error) + GetByID(id uint) (*entity.Task, error) + Create(task *entity.Task) error + Delete(id uint) error + GetList(page, pageSize int, taskType, status string) ([]*entity.Task, int64, error) + UpdateStatus(id uint, status string) error + UpdateProgress(id uint, progress float64, progressData string) error + UpdateStatusAndMessage(id uint, status, message string) error } // TaskRepositoryImpl 任务仓库实现 type TaskRepositoryImpl struct { - BaseRepositoryImpl[entity.Task] + db *gorm.DB } // NewTaskRepository 创建任务仓库 func NewTaskRepository(db *gorm.DB) TaskRepository { return &TaskRepositoryImpl{ - BaseRepositoryImpl: BaseRepositoryImpl[entity.Task]{db: db}, + db: db, } } -// FindWithItems 查找任务及其所有项目 -func (r *TaskRepositoryImpl) FindWithItems(id uint) (*entity.Task, error) { +// GetByID 根据ID获取任务 +func (r *TaskRepositoryImpl) GetByID(id uint) (*entity.Task, error) { var task entity.Task - err := r.db.Preload("TaskItems").First(&task, id).Error + err := r.db.First(&task, id).Error if err != nil { return nil, err } return &task, nil } -// FindWithPagination 分页查询任务 -func (r *TaskRepositoryImpl) FindWithPagination(page, pageSize int) ([]entity.Task, int64, error) { - var tasks []entity.Task +// Create 创建任务 +func (r *TaskRepositoryImpl) Create(task *entity.Task) error { + return r.db.Create(task).Error +} + +// Delete 删除任务 +func (r *TaskRepositoryImpl) Delete(id uint) error { + return r.db.Delete(&entity.Task{}, id).Error +} + +// GetList 获取任务列表 +func (r *TaskRepositoryImpl) GetList(page, pageSize int, taskType, status string) ([]*entity.Task, int64, error) { + var tasks []*entity.Task var total int64 + query := r.db.Model(&entity.Task{}) + + // 添加过滤条件 + if taskType != "" { + query = query.Where("task_type = ?", taskType) + } + if status != "" { + query = query.Where("status = ?", status) + } + // 获取总数 - err := r.db.Model(&entity.Task{}).Count(&total).Error + err := query.Count(&total).Error if err != nil { return nil, 0, err } // 分页查询 offset := (page - 1) * pageSize - err = r.db.Order("created_at DESC").Offset(offset).Limit(pageSize).Find(&tasks).Error + err = query.Offset(offset).Limit(pageSize).Order("created_at DESC").Find(&tasks).Error if err != nil { return nil, 0, err } @@ -58,40 +79,23 @@ func (r *TaskRepositoryImpl) FindWithPagination(page, pageSize int) ([]entity.Ta return tasks, total, nil } +// UpdateStatus 更新任务状态 +func (r *TaskRepositoryImpl) UpdateStatus(id uint, status string) error { + return r.db.Model(&entity.Task{}).Where("id = ?", id).Update("status", status).Error +} + // UpdateProgress 更新任务进度 -func (r *TaskRepositoryImpl) UpdateProgress(id uint, processed, success, failed int) error { +func (r *TaskRepositoryImpl) UpdateProgress(id uint, progress float64, progressData string) error { return r.db.Model(&entity.Task{}).Where("id = ?", id).Updates(map[string]interface{}{ - "processed_items": processed, - "success_items": success, - "failed_items": failed, + "progress": progress, + "progress_data": progressData, }).Error } -// UpdateStatus 更新任务状态 -func (r *TaskRepositoryImpl) UpdateStatus(id uint, status entity.TaskStatus) error { - updates := map[string]interface{}{ - "status": status, - } - - // 如果状态为运行中,设置开始时间 - if status == entity.TaskStatusRunning { - updates["started_at"] = gorm.Expr("CURRENT_TIMESTAMP") - } - - // 如果状态为完成或失败,设置完成时间 - if status == entity.TaskStatusCompleted || status == entity.TaskStatusFailed { - updates["completed_at"] = gorm.Expr("CURRENT_TIMESTAMP") - } - - return r.db.Model(&entity.Task{}).Where("id = ?", id).Updates(updates).Error -} - -// GetRunningTasks 获取正在运行的任务 -func (r *TaskRepositoryImpl) GetRunningTasks() ([]entity.Task, error) { - var tasks []entity.Task - err := r.db.Where("status IN ?", []entity.TaskStatus{ - entity.TaskStatusRunning, - entity.TaskStatusPending, - }).Find(&tasks).Error - return tasks, err +// UpdateStatusAndMessage 更新任务状态和消息 +func (r *TaskRepositoryImpl) UpdateStatusAndMessage(id uint, status, message string) error { + return r.db.Model(&entity.Task{}).Where("id = ?", id).Updates(map[string]interface{}{ + "status": status, + "message": message, + }).Error } diff --git a/handlers/task_handler.go b/handlers/task_handler.go new file mode 100644 index 0000000..8d4fd1a --- /dev/null +++ b/handlers/task_handler.go @@ -0,0 +1,340 @@ +package handlers + +import ( + "encoding/json" + "net/http" + "strconv" + "time" + + "github.com/ctwj/urldb/db/entity" + "github.com/ctwj/urldb/db/repo" + "github.com/ctwj/urldb/task" + "github.com/ctwj/urldb/utils" + + "github.com/gin-gonic/gin" +) + +// TaskHandler 任务处理器 +type TaskHandler struct { + repoMgr *repo.RepositoryManager + taskManager *task.TaskManager +} + +// NewTaskHandler 创建任务处理器 +func NewTaskHandler(repoMgr *repo.RepositoryManager, taskManager *task.TaskManager) *TaskHandler { + return &TaskHandler{ + repoMgr: repoMgr, + taskManager: taskManager, + } +} + +// 批量转存任务资源项 +type BatchTransferResource struct { + Title string `json:"title" binding:"required"` + URL string `json:"url" binding:"required"` + CategoryID uint `json:"category_id,omitempty"` + Tags []uint `json:"tags,omitempty"` +} + +// CreateBatchTransferTask 创建批量转存任务 +func (h *TaskHandler) CreateBatchTransferTask(c *gin.Context) { + var req struct { + Title string `json:"title" binding:"required"` + Description string `json:"description"` + Resources []BatchTransferResource `json:"resources" binding:"required,min=1"` + } + + if err := c.ShouldBindJSON(&req); err != nil { + ErrorResponse(c, "参数错误: "+err.Error(), http.StatusBadRequest) + return + } + + utils.Info("创建批量转存任务: %s,资源数量: %d", req.Title, len(req.Resources)) + + // 创建任务 + newTask := &entity.Task{ + Title: req.Title, + Description: req.Description, + Type: "transfer", + Status: "pending", + TotalItems: len(req.Resources), + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + err := h.repoMgr.TaskRepository.Create(newTask) + if err != nil { + utils.Error("创建任务失败: %v", err) + ErrorResponse(c, "创建任务失败: "+err.Error(), http.StatusInternalServerError) + return + } + + // 创建任务项 + for _, resource := range req.Resources { + // 构建转存输入数据 + transferInput := task.TransferInput{ + Title: resource.Title, + URL: resource.URL, + CategoryID: resource.CategoryID, + Tags: resource.Tags, + } + + inputJSON, _ := json.Marshal(transferInput) + + taskItem := &entity.TaskItem{ + TaskID: newTask.ID, + Status: "pending", + InputData: string(inputJSON), + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + err = h.repoMgr.TaskItemRepository.Create(taskItem) + if err != nil { + utils.Error("创建任务项失败: %v", err) + // 继续创建其他任务项 + } + } + + utils.Info("批量转存任务创建完成: %d, 共 %d 项", newTask.ID, len(req.Resources)) + + SuccessResponse(c, gin.H{ + "task_id": newTask.ID, + "total_items": len(req.Resources), + "message": "任务创建成功", + }) +} + +// StartTask 启动任务 +func (h *TaskHandler) StartTask(c *gin.Context) { + taskIDStr := c.Param("id") + taskID, err := strconv.ParseUint(taskIDStr, 10, 32) + if err != nil { + ErrorResponse(c, "无效的任务ID: "+err.Error(), http.StatusBadRequest) + return + } + + utils.Info("启动任务: %d", taskID) + + err = h.taskManager.StartTask(uint(taskID)) + if err != nil { + utils.Error("启动任务失败: %v", err) + ErrorResponse(c, "启动任务失败: "+err.Error(), http.StatusInternalServerError) + return + } + + SuccessResponse(c, gin.H{ + "message": "任务启动成功", + }) +} + +// StopTask 停止任务 +func (h *TaskHandler) StopTask(c *gin.Context) { + taskIDStr := c.Param("id") + taskID, err := strconv.ParseUint(taskIDStr, 10, 32) + if err != nil { + ErrorResponse(c, "无效的任务ID: "+err.Error(), http.StatusBadRequest) + return + } + + utils.Info("停止任务: %d", taskID) + + err = h.taskManager.StopTask(uint(taskID)) + if err != nil { + utils.Error("停止任务失败: %v", err) + ErrorResponse(c, "停止任务失败: "+err.Error(), http.StatusInternalServerError) + return + } + + SuccessResponse(c, gin.H{ + "message": "任务停止成功", + }) +} + +// GetTaskStatus 获取任务状态 +func (h *TaskHandler) GetTaskStatus(c *gin.Context) { + taskIDStr := c.Param("id") + taskID, err := strconv.ParseUint(taskIDStr, 10, 32) + if err != nil { + ErrorResponse(c, "无效的任务ID: "+err.Error(), http.StatusBadRequest) + return + } + + // 获取任务详情 + task, err := h.repoMgr.TaskRepository.GetByID(uint(taskID)) + if err != nil { + ErrorResponse(c, "任务不存在: "+err.Error(), http.StatusNotFound) + return + } + + // 获取任务项统计 + stats, err := h.repoMgr.TaskItemRepository.GetStatsByTaskID(uint(taskID)) + if err != nil { + utils.Error("获取任务项统计失败: %v", err) + stats = map[string]int{ + "total": 0, + "pending": 0, + "processing": 0, + "completed": 0, + "failed": 0, + } + } + + // 检查任务是否在运行 + isRunning := h.taskManager.IsTaskRunning(uint(taskID)) + + SuccessResponse(c, gin.H{ + "id": task.ID, + "title": task.Title, + "description": task.Description, + "task_type": task.Type, + "status": task.Status, + "total_items": task.TotalItems, + "is_running": isRunning, + "stats": stats, + "created_at": task.CreatedAt, + "updated_at": task.UpdatedAt, + }) +} + +// GetTasks 获取任务列表 +func (h *TaskHandler) GetTasks(c *gin.Context) { + page, _ := strconv.Atoi(c.DefaultQuery("page", "1")) + pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "10")) + taskType := c.Query("task_type") + status := c.Query("status") + + utils.Info("GetTasks: 获取任务列表 page=%d, pageSize=%d, taskType=%s, status=%s", page, pageSize, taskType, status) + + tasks, total, err := h.repoMgr.TaskRepository.GetList(page, pageSize, taskType, status) + if err != nil { + utils.Error("获取任务列表失败: %v", err) + ErrorResponse(c, "获取任务列表失败: "+err.Error(), http.StatusInternalServerError) + return + } + + utils.Info("GetTasks: 从数据库获取到 %d 个任务", len(tasks)) + + // 为每个任务添加运行状态 + var result []gin.H + for _, task := range tasks { + isRunning := h.taskManager.IsTaskRunning(task.ID) + utils.Info("GetTasks: 任务 %d (%s) 数据库状态: %s, TaskManager运行状态: %v", task.ID, task.Title, task.Status, isRunning) + + result = append(result, gin.H{ + "id": task.ID, + "title": task.Title, + "description": task.Description, + "task_type": task.Type, + "status": task.Status, + "total_items": task.TotalItems, + "processed_items": task.ProcessedItems, + "success_items": task.SuccessItems, + "failed_items": task.FailedItems, + "is_running": isRunning, + "created_at": task.CreatedAt, + "updated_at": task.UpdatedAt, + }) + } + + SuccessResponse(c, gin.H{ + "items": result, + "total": total, + "page": page, + "size": pageSize, + }) +} + +// GetTaskItems 获取任务项列表 +func (h *TaskHandler) GetTaskItems(c *gin.Context) { + taskIDStr := c.Param("id") + taskID, err := strconv.ParseUint(taskIDStr, 10, 32) + if err != nil { + ErrorResponse(c, "无效的任务ID: "+err.Error(), http.StatusBadRequest) + return + } + + page, _ := strconv.Atoi(c.DefaultQuery("page", "1")) + pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "20")) + status := c.Query("status") + + items, total, err := h.repoMgr.TaskItemRepository.GetListByTaskID(uint(taskID), page, pageSize, status) + if err != nil { + utils.Error("获取任务项列表失败: %v", err) + ErrorResponse(c, "获取任务项列表失败: "+err.Error(), http.StatusInternalServerError) + return + } + + // 解析输入和输出数据 + var result []gin.H + for _, item := range items { + itemData := gin.H{ + "id": item.ID, + "status": item.Status, + "created_at": item.CreatedAt, + "updated_at": item.UpdatedAt, + } + + // 解析输入数据 + if item.InputData != "" { + var inputData map[string]interface{} + if err := json.Unmarshal([]byte(item.InputData), &inputData); err == nil { + itemData["input"] = inputData + } + } + + // 解析输出数据 + if item.OutputData != "" { + var outputData map[string]interface{} + if err := json.Unmarshal([]byte(item.OutputData), &outputData); err == nil { + itemData["output"] = outputData + } + } + + result = append(result, itemData) + } + + SuccessResponse(c, gin.H{ + "items": result, + "total": total, + "page": page, + "size": pageSize, + }) +} + +// DeleteTask 删除任务 +func (h *TaskHandler) DeleteTask(c *gin.Context) { + taskIDStr := c.Param("id") + taskID, err := strconv.ParseUint(taskIDStr, 10, 32) + if err != nil { + ErrorResponse(c, "无效的任务ID: "+err.Error(), http.StatusBadRequest) + return + } + + // 检查任务是否在运行 + if h.taskManager.IsTaskRunning(uint(taskID)) { + ErrorResponse(c, "任务正在运行,请先停止任务", http.StatusBadRequest) + return + } + + // 删除任务项 + err = h.repoMgr.TaskItemRepository.DeleteByTaskID(uint(taskID)) + if err != nil { + utils.Error("删除任务项失败: %v", err) + ErrorResponse(c, "删除任务项失败: "+err.Error(), http.StatusInternalServerError) + return + } + + // 删除任务 + err = h.repoMgr.TaskRepository.Delete(uint(taskID)) + if err != nil { + utils.Error("删除任务失败: %v", err) + ErrorResponse(c, "删除任务失败: "+err.Error(), http.StatusInternalServerError) + return + } + + utils.Info("任务删除成功: %d", taskID) + SuccessResponse(c, gin.H{ + "message": "任务删除成功", + }) +} diff --git a/main.go b/main.go index 4c0f7df..73b72e0 100644 --- a/main.go +++ b/main.go @@ -4,14 +4,12 @@ import ( "log" "os" - "github.com/ctwj/urldb/scheduler" - "github.com/ctwj/urldb/utils" - "github.com/ctwj/urldb/db" - "github.com/ctwj/urldb/db/entity" "github.com/ctwj/urldb/db/repo" "github.com/ctwj/urldb/handlers" "github.com/ctwj/urldb/middleware" + "github.com/ctwj/urldb/task" + "github.com/ctwj/urldb/utils" "github.com/gin-contrib/cors" "github.com/gin-gonic/gin" @@ -70,56 +68,14 @@ func main() { // 创建Repository管理器 repoManager := repo.NewRepositoryManager(db.DB) - // 创建全局调度器 - scheduler := scheduler.GetGlobalScheduler( - repoManager.HotDramaRepository, - repoManager.ReadyResourceRepository, - repoManager.ResourceRepository, - repoManager.SystemConfigRepository, - repoManager.PanRepository, - repoManager.CksRepository, - repoManager.TagRepository, - repoManager.CategoryRepository, - ) + // 创建任务管理器 + taskManager := task.NewTaskManager(repoManager) - // 确保默认配置存在 - // _, err := repoManager.SystemConfigRepository.GetOrCreateDefault() - // if err != nil { - // utils.Error("初始化默认配置失败: %v", err) - // } else { - // utils.Info("默认配置初始化完成") - // } + // 注册转存任务处理器 + transferProcessor := task.NewTransferProcessor(repoManager) + taskManager.RegisterProcessor(transferProcessor) - // 检查系统配置,决定是否启动各种自动任务 - autoProcessReadyResources, err := repoManager.SystemConfigRepository.GetConfigBool(entity.ConfigKeyAutoProcessReadyResources) - if err != nil { - utils.Error("获取自动处理待处理资源配置失败: %v", err) - } else if autoProcessReadyResources { - scheduler.StartReadyResourceScheduler() - utils.Info("已启动待处理资源自动处理任务") - } else { - utils.Info("系统配置中自动处理待处理资源功能已禁用,跳过启动定时任务") - } - - autoFetchHotDramaEnabled, err := repoManager.SystemConfigRepository.GetConfigBool(entity.ConfigKeyAutoFetchHotDramaEnabled) - if err != nil { - utils.Error("获取自动拉取热播剧配置失败: %v", err) - } else if autoFetchHotDramaEnabled { - scheduler.StartHotDramaScheduler() - utils.Info("已启动热播剧自动拉取任务") - } else { - utils.Info("系统配置中自动拉取热播剧功能已禁用,跳过启动定时任务") - } - - // autoTransferEnabled, err := repoManager.SystemConfigRepository.GetConfigBool(entity.ConfigKeyAutoTransferEnabled) - // if err != nil { - // utils.Error("获取自动转存配置失败: %v", err) - // } else if autoTransferEnabled { - // scheduler.StartAutoTransferScheduler() - // utils.Info("已启动自动转存任务") - // } else { - // utils.Info("系统配置中自动转存功能已禁用,跳过启动定时任务") - // } + utils.Info("任务管理器初始化完成") // 创建Gin实例 r := gin.Default() @@ -140,6 +96,9 @@ func main() { // 创建公开API处理器 publicAPIHandler := handlers.NewPublicAPIHandler() + // 创建任务处理器 + taskHandler := handlers.NewTaskHandler(repoManager, taskManager) + // API路由 api := r.Group("/api") { @@ -255,22 +214,14 @@ func main() { api.PUT("/hot-dramas/:id", middleware.AuthMiddleware(), middleware.AdminMiddleware(), handlers.UpdateHotDrama) api.DELETE("/hot-dramas/:id", middleware.AuthMiddleware(), middleware.AdminMiddleware(), handlers.DeleteHotDrama) - // 调度器管理路由(查询接口无需认证) - api.GET("/scheduler/status", handlers.GetSchedulerStatus) - api.GET("/scheduler/hot-drama/names", handlers.FetchHotDramaNames) - api.POST("/scheduler/hot-drama/start", middleware.AuthMiddleware(), middleware.AdminMiddleware(), handlers.StartHotDramaScheduler) - api.POST("/scheduler/hot-drama/stop", middleware.AuthMiddleware(), middleware.AdminMiddleware(), handlers.StopHotDramaScheduler) - api.POST("/scheduler/hot-drama/trigger", middleware.AuthMiddleware(), middleware.AdminMiddleware(), handlers.TriggerHotDramaScheduler) - - // 待处理资源自动处理管理路由 - api.POST("/scheduler/ready-resource/start", middleware.AuthMiddleware(), middleware.AdminMiddleware(), handlers.StartReadyResourceScheduler) - api.POST("/scheduler/ready-resource/stop", middleware.AuthMiddleware(), middleware.AdminMiddleware(), handlers.StopReadyResourceScheduler) - api.POST("/scheduler/ready-resource/trigger", middleware.AuthMiddleware(), middleware.AdminMiddleware(), handlers.TriggerReadyResourceScheduler) - - // 自动转存管理路由 - api.POST("/scheduler/auto-transfer/start", middleware.AuthMiddleware(), middleware.AdminMiddleware(), handlers.StartAutoTransferScheduler) - api.POST("/scheduler/auto-transfer/stop", middleware.AuthMiddleware(), middleware.AdminMiddleware(), handlers.StopAutoTransferScheduler) - api.POST("/scheduler/auto-transfer/trigger", middleware.AuthMiddleware(), middleware.AdminMiddleware(), handlers.TriggerAutoTransferScheduler) + // 任务管理路由 + api.POST("/tasks/transfer", middleware.AuthMiddleware(), middleware.AdminMiddleware(), taskHandler.CreateBatchTransferTask) + api.GET("/tasks", middleware.AuthMiddleware(), middleware.AdminMiddleware(), taskHandler.GetTasks) + api.GET("/tasks/:id", middleware.AuthMiddleware(), middleware.AdminMiddleware(), taskHandler.GetTaskStatus) + api.POST("/tasks/:id/start", middleware.AuthMiddleware(), middleware.AdminMiddleware(), taskHandler.StartTask) + api.POST("/tasks/:id/stop", middleware.AuthMiddleware(), middleware.AdminMiddleware(), taskHandler.StopTask) + api.DELETE("/tasks/:id", middleware.AuthMiddleware(), middleware.AdminMiddleware(), taskHandler.DeleteTask) + api.GET("/tasks/:id/items", middleware.AuthMiddleware(), middleware.AdminMiddleware(), taskHandler.GetTaskItems) // 版本管理路由 api.GET("/version", handlers.GetVersion) diff --git a/scheduler/ready_resource.go b/scheduler/ready_resource.go index fb771e7..e261521 100644 --- a/scheduler/ready_resource.go +++ b/scheduler/ready_resource.go @@ -324,7 +324,11 @@ func (r *ReadyResourceScheduler) convertReadyResourceToResource(readyResource en // 创建资源标签关联 for _, tagID := range tagIDs { - err = r.resourceRepo.CreateResourceTag(resource.ID, tagID) + resourceTag := &entity.ResourceTag{ + ResourceID: resource.ID, + TagID: tagID, + } + err = r.resourceRepo.CreateResourceTag(resourceTag) if err != nil { utils.Error(fmt.Sprintf("创建资源标签关联失败: %v", err)) } diff --git a/task/task_processor.go b/task/task_processor.go new file mode 100644 index 0000000..de0cb3a --- /dev/null +++ b/task/task_processor.go @@ -0,0 +1,278 @@ +package task + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/ctwj/urldb/db/entity" + "github.com/ctwj/urldb/db/repo" + "github.com/ctwj/urldb/utils" +) + +// TaskProcessor 任务处理器接口 +type TaskProcessor interface { + Process(ctx context.Context, taskID uint, item *entity.TaskItem) error + GetTaskType() string +} + +// TaskManager 任务管理器 +type TaskManager struct { + processors map[string]TaskProcessor + repoMgr *repo.RepositoryManager + mu sync.RWMutex + running map[uint]context.CancelFunc // 正在运行的任务 +} + +// NewTaskManager 创建任务管理器 +func NewTaskManager(repoMgr *repo.RepositoryManager) *TaskManager { + return &TaskManager{ + processors: make(map[string]TaskProcessor), + repoMgr: repoMgr, + running: make(map[uint]context.CancelFunc), + } +} + +// RegisterProcessor 注册任务处理器 +func (tm *TaskManager) RegisterProcessor(processor TaskProcessor) { + tm.mu.Lock() + defer tm.mu.Unlock() + tm.processors[processor.GetTaskType()] = processor + utils.Info("注册任务处理器: %s", processor.GetTaskType()) +} + +// getRegisteredProcessors 获取已注册的处理器列表(用于调试) +func (tm *TaskManager) getRegisteredProcessors() []string { + var types []string + for taskType := range tm.processors { + types = append(types, taskType) + } + return types +} + +// StartTask 启动任务 +func (tm *TaskManager) StartTask(taskID uint) error { + tm.mu.Lock() + defer tm.mu.Unlock() + + utils.Info("StartTask: 尝试启动任务 %d", taskID) + + // 检查任务是否已在运行 + if _, exists := tm.running[taskID]; exists { + utils.Info("任务 %d 已在运行中", taskID) + return fmt.Errorf("任务 %d 已在运行中", taskID) + } + + // 获取任务信息 + task, err := tm.repoMgr.TaskRepository.GetByID(taskID) + if err != nil { + utils.Error("获取任务失败: %v", err) + return fmt.Errorf("获取任务失败: %v", err) + } + + utils.Info("StartTask: 获取到任务 %d, 类型: %s, 状态: %s", task.ID, task.Type, task.Status) + + // 获取处理器 + processor, exists := tm.processors[string(task.Type)] + if !exists { + utils.Error("未找到任务类型 %s 的处理器, 已注册的处理器: %v", task.Type, tm.getRegisteredProcessors()) + return fmt.Errorf("未找到任务类型 %s 的处理器", task.Type) + } + + utils.Info("StartTask: 找到处理器 %s", task.Type) + + // 创建上下文 + ctx, cancel := context.WithCancel(context.Background()) + tm.running[taskID] = cancel + + utils.Info("StartTask: 启动后台任务协程") + // 启动后台任务 + go tm.processTask(ctx, task, processor) + + utils.Info("StartTask: 任务 %d 启动成功", taskID) + return nil +} + +// StopTask 停止任务 +func (tm *TaskManager) StopTask(taskID uint) error { + tm.mu.Lock() + defer tm.mu.Unlock() + + cancel, exists := tm.running[taskID] + if !exists { + return fmt.Errorf("任务 %d 未在运行", taskID) + } + + cancel() + delete(tm.running, taskID) + + // 更新任务状态为暂停 + err := tm.repoMgr.TaskRepository.UpdateStatus(taskID, "paused") + if err != nil { + utils.Error("更新任务状态失败: %v", err) + } + + return nil +} + +// processTask 处理任务 +func (tm *TaskManager) processTask(ctx context.Context, task *entity.Task, processor TaskProcessor) { + defer func() { + tm.mu.Lock() + delete(tm.running, task.ID) + tm.mu.Unlock() + utils.Info("processTask: 任务 %d 处理完成,清理资源", task.ID) + }() + + utils.Info("processTask: 开始处理任务: %d, 类型: %s", task.ID, task.Type) + + // 更新任务状态为运行中 + err := tm.repoMgr.TaskRepository.UpdateStatus(task.ID, "running") + if err != nil { + utils.Error("更新任务状态失败: %v", err) + return + } + + // 获取待处理的任务项 + items, err := tm.repoMgr.TaskItemRepository.GetByTaskIDAndStatus(task.ID, "pending") + if err != nil { + utils.Error("获取任务项失败: %v", err) + tm.markTaskFailed(task.ID, fmt.Sprintf("获取任务项失败: %v", err)) + return + } + + totalItems := len(items) + processedItems := 0 + successItems := 0 + failedItems := 0 + + utils.Info("任务 %d 共有 %d 个待处理项", task.ID, totalItems) + + for _, item := range items { + select { + case <-ctx.Done(): + utils.Info("任务 %d 被取消", task.ID) + return + default: + // 处理单个任务项 + err := tm.processTaskItem(ctx, task.ID, item, processor) + processedItems++ + + if err != nil { + failedItems++ + utils.Error("处理任务项 %d 失败: %v", item.ID, err) + } else { + successItems++ + } + + // 更新任务进度 + progress := float64(processedItems) / float64(totalItems) * 100 + tm.updateTaskProgress(task.ID, progress, processedItems, successItems, failedItems) + } + } + + // 任务完成 + status := "completed" + message := fmt.Sprintf("任务完成,共处理 %d 项,成功 %d 项,失败 %d 项", processedItems, successItems, failedItems) + + if failedItems > 0 && successItems == 0 { + status = "failed" + message = fmt.Sprintf("任务失败,共处理 %d 项,全部失败", processedItems) + } else if failedItems > 0 { + status = "partial_success" + message = fmt.Sprintf("任务部分成功,共处理 %d 项,成功 %d 项,失败 %d 项", processedItems, successItems, failedItems) + } + + err = tm.repoMgr.TaskRepository.UpdateStatusAndMessage(task.ID, status, message) + if err != nil { + utils.Error("更新任务状态失败: %v", err) + } + + utils.Info("任务 %d 处理完成: %s", task.ID, message) +} + +// processTaskItem 处理单个任务项 +func (tm *TaskManager) processTaskItem(ctx context.Context, taskID uint, item *entity.TaskItem, processor TaskProcessor) error { + // 更新任务项状态为处理中 + err := tm.repoMgr.TaskItemRepository.UpdateStatus(item.ID, "processing") + if err != nil { + return fmt.Errorf("更新任务项状态失败: %v", err) + } + + // 处理任务项 + err = processor.Process(ctx, taskID, item) + + if err != nil { + // 处理失败 + outputData := map[string]interface{}{ + "error": err.Error(), + "time": time.Now(), + } + outputJSON, _ := json.Marshal(outputData) + + updateErr := tm.repoMgr.TaskItemRepository.UpdateStatusAndOutput(item.ID, "failed", string(outputJSON)) + if updateErr != nil { + utils.Error("更新失败任务项状态失败: %v", updateErr) + } + return err + } + + // 处理成功 + outputData := map[string]interface{}{ + "success": true, + "time": time.Now(), + } + outputJSON, _ := json.Marshal(outputData) + + err = tm.repoMgr.TaskItemRepository.UpdateStatusAndOutput(item.ID, "completed", string(outputJSON)) + if err != nil { + utils.Error("更新成功任务项状态失败: %v", err) + } + + return nil +} + +// updateTaskProgress 更新任务进度 +func (tm *TaskManager) updateTaskProgress(taskID uint, progress float64, processed, success, failed int) { + progressData := map[string]interface{}{ + "progress": progress, + "processed": processed, + "success": success, + "failed": failed, + "time": time.Now(), + } + + progressJSON, _ := json.Marshal(progressData) + + err := tm.repoMgr.TaskRepository.UpdateProgress(taskID, progress, string(progressJSON)) + if err != nil { + utils.Error("更新任务进度失败: %v", err) + } +} + +// markTaskFailed 标记任务失败 +func (tm *TaskManager) markTaskFailed(taskID uint, message string) { + err := tm.repoMgr.TaskRepository.UpdateStatusAndMessage(taskID, "failed", message) + if err != nil { + utils.Error("标记任务失败状态失败: %v", err) + } +} + +// GetTaskStatus 获取任务状态 +func (tm *TaskManager) GetTaskStatus(taskID uint) (string, error) { + task, err := tm.repoMgr.TaskRepository.GetByID(taskID) + if err != nil { + return "", err + } + return string(task.Status), nil +} + +// IsTaskRunning 检查任务是否在运行 +func (tm *TaskManager) IsTaskRunning(taskID uint) bool { + tm.mu.RLock() + defer tm.mu.RUnlock() + _, exists := tm.running[taskID] + return exists +} diff --git a/task/transfer_processor.go b/task/transfer_processor.go new file mode 100644 index 0000000..ba9c94d --- /dev/null +++ b/task/transfer_processor.go @@ -0,0 +1,258 @@ +package task + +import ( + "context" + "encoding/json" + "fmt" + "regexp" + "strings" + "time" + + "github.com/ctwj/urldb/db/entity" + "github.com/ctwj/urldb/db/repo" + "github.com/ctwj/urldb/utils" +) + +// TransferProcessor 转存任务处理器 +type TransferProcessor struct { + repoMgr *repo.RepositoryManager +} + +// NewTransferProcessor 创建转存任务处理器 +func NewTransferProcessor(repoMgr *repo.RepositoryManager) *TransferProcessor { + return &TransferProcessor{ + repoMgr: repoMgr, + } +} + +// GetTaskType 获取任务类型 +func (tp *TransferProcessor) GetTaskType() string { + return "transfer" +} + +// TransferInput 转存任务输入数据结构 +type TransferInput struct { + Title string `json:"title"` + URL string `json:"url"` + CategoryID uint `json:"category_id"` + Tags []uint `json:"tags"` +} + +// TransferOutput 转存任务输出数据结构 +type TransferOutput struct { + ResourceID uint `json:"resource_id,omitempty"` + SaveURL string `json:"save_url,omitempty"` + Error string `json:"error,omitempty"` + Success bool `json:"success"` + Time string `json:"time"` +} + +// Process 处理转存任务项 +func (tp *TransferProcessor) Process(ctx context.Context, taskID uint, item *entity.TaskItem) error { + utils.Info("开始处理转存任务项: %d", item.ID) + + // 解析输入数据 + var input TransferInput + if err := json.Unmarshal([]byte(item.InputData), &input); err != nil { + return fmt.Errorf("解析输入数据失败: %v", err) + } + + // 验证输入数据 + if err := tp.validateInput(&input); err != nil { + return fmt.Errorf("输入数据验证失败: %v", err) + } + + // 检查资源是否已存在 + exists, existingResource, err := tp.checkResourceExists(input.URL) + if err != nil { + utils.Error("检查资源是否存在失败: %v", err) + } + + if exists { + // 资源已存在,更新输出数据 + output := TransferOutput{ + ResourceID: existingResource.ID, + SaveURL: existingResource.SaveURL, + Success: true, + Time: time.Now().Format("2006-01-02 15:04:05"), + } + + outputJSON, _ := json.Marshal(output) + item.OutputData = string(outputJSON) + + utils.Info("资源已存在,跳过转存: %s", input.Title) + return nil + } + + // 执行转存操作 + resourceID, saveURL, err := tp.performTransfer(ctx, &input) + if err != nil { + return fmt.Errorf("转存失败: %v", err) + } + + // 更新输出数据 + output := TransferOutput{ + ResourceID: resourceID, + SaveURL: saveURL, + Success: true, + Time: time.Now().Format("2006-01-02 15:04:05"), + } + + outputJSON, _ := json.Marshal(output) + item.OutputData = string(outputJSON) + + utils.Info("转存任务项处理完成: %d, 资源ID: %d", item.ID, resourceID) + return nil +} + +// validateInput 验证输入数据 +func (tp *TransferProcessor) validateInput(input *TransferInput) error { + if strings.TrimSpace(input.Title) == "" { + return fmt.Errorf("标题不能为空") + } + + if strings.TrimSpace(input.URL) == "" { + return fmt.Errorf("链接不能为空") + } + + // 验证URL格式 + if !tp.isValidURL(input.URL) { + return fmt.Errorf("链接格式不正确") + } + + return nil +} + +// isValidURL 验证URL格式 +func (tp *TransferProcessor) isValidURL(url string) bool { + // 简单的URL验证,可以根据需要扩展 + quarkPattern := `https://pan\.quark\.cn/s/[a-zA-Z0-9]+` + matched, _ := regexp.MatchString(quarkPattern, url) + return matched +} + +// checkResourceExists 检查资源是否已存在 +func (tp *TransferProcessor) checkResourceExists(url string) (bool, *entity.Resource, error) { + // 根据URL查找资源 + resource, err := tp.repoMgr.ResourceRepository.GetByURL(url) + if err != nil { + // 如果是未找到记录的错误,则表示资源不存在 + if strings.Contains(err.Error(), "record not found") { + return false, nil, nil + } + return false, nil, err + } + + return true, resource, nil +} + +// performTransfer 执行转存操作 +func (tp *TransferProcessor) performTransfer(ctx context.Context, input *TransferInput) (uint, string, error) { + // 解析URL获取分享信息 + shareInfo, err := tp.parseShareURL(input.URL) + if err != nil { + return 0, "", fmt.Errorf("解析分享链接失败: %v", err) + } + + // 创建资源记录 + var categoryID *uint + if input.CategoryID != 0 { + categoryID = &input.CategoryID + } + + resource := &entity.Resource{ + Title: input.Title, + URL: input.URL, + CategoryID: categoryID, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + // 保存资源到数据库 + err = tp.repoMgr.ResourceRepository.Create(resource) + if err != nil { + return 0, "", fmt.Errorf("保存资源失败: %v", err) + } + + // 添加标签关联 + if len(input.Tags) > 0 { + err = tp.addResourceTags(resource.ID, input.Tags) + if err != nil { + utils.Error("添加资源标签失败: %v", err) + } + } + + // 执行实际转存操作 + saveURL, err := tp.transferToCloud(ctx, shareInfo) + if err != nil { + utils.Error("云端转存失败: %v", err) + // 转存失败但资源已创建,返回原始URL + return resource.ID, input.URL, nil + } + + // 更新资源的转存链接 + if saveURL != "" { + err = tp.repoMgr.ResourceRepository.UpdateSaveURL(resource.ID, saveURL) + if err != nil { + utils.Error("更新转存链接失败: %v", err) + } + } + + return resource.ID, saveURL, nil +} + +// ShareInfo 分享信息结构 +type ShareInfo struct { + PanType string + ShareID string + URL string +} + +// parseShareURL 解析分享链接 +func (tp *TransferProcessor) parseShareURL(url string) (*ShareInfo, error) { + // 解析夸克网盘链接 + quarkPattern := `https://pan\.quark\.cn/s/([a-zA-Z0-9]+)` + re := regexp.MustCompile(quarkPattern) + matches := re.FindStringSubmatch(url) + + if len(matches) >= 2 { + return &ShareInfo{ + PanType: "quark", + ShareID: matches[1], + URL: url, + }, nil + } + + return nil, fmt.Errorf("不支持的分享链接格式: %s", url) +} + +// addResourceTags 添加资源标签 +func (tp *TransferProcessor) addResourceTags(resourceID uint, tagIDs []uint) error { + for _, tagID := range tagIDs { + // 创建资源标签关联 + resourceTag := &entity.ResourceTag{ + ResourceID: resourceID, + TagID: tagID, + } + + err := tp.repoMgr.ResourceRepository.CreateResourceTag(resourceTag) + if err != nil { + return fmt.Errorf("创建资源标签关联失败: %v", err) + } + } + return nil +} + +// transferToCloud 执行云端转存 +func (tp *TransferProcessor) transferToCloud(ctx context.Context, shareInfo *ShareInfo) (string, error) { + // 检查是否启用自动转存 + autoTransferEnabled, err := tp.repoMgr.SystemConfigRepository.GetConfigBool("auto_transfer") + if err != nil || !autoTransferEnabled { + utils.Info("自动转存未启用,跳过云端转存") + return "", nil + } + + // TODO: 实现云端转存逻辑 + utils.Info("云端转存功能暂未实现,跳过转存: %s", shareInfo.ShareID) + return "", nil +} diff --git a/web/components/Admin/ManualBatchTransfer.vue b/web/components/Admin/ManualBatchTransfer.vue index 908b094..762bd05 100644 --- a/web/components/Admin/ManualBatchTransfer.vue +++ b/web/components/Admin/ManualBatchTransfer.vue @@ -1,15 +1,8 @@ \ No newline at end of file diff --git a/web/components/Admin/NewHeader.vue b/web/components/Admin/NewHeader.vue index 616f70e..11a167f 100644 --- a/web/components/Admin/NewHeader.vue +++ b/web/components/Admin/NewHeader.vue @@ -15,11 +15,41 @@ -
+
+
系统正常
+ + +
+
+ + 自动处理{{ autoProcessEnabled ? '已开启' : '已关闭' }} + +
+ + +
+
+ + 自动转存{{ autoTransferEnabled ? '已开启' : '已关闭' }} + +
+ + +
+
+ + + + +
@@ -45,7 +75,67 @@ \ No newline at end of file diff --git a/web/stores/task.ts b/web/stores/task.ts new file mode 100644 index 0000000..591da9d --- /dev/null +++ b/web/stores/task.ts @@ -0,0 +1,238 @@ +import { defineStore } from 'pinia' +import { ref, computed } from 'vue' +import { useTaskApi } from '~/composables/useApi' + +export interface TaskStats { + total: number + running: number + pending: number + completed: number + failed: number + paused: number +} + +export interface TaskInfo { + id: number + title: string + type: string + status: string + total_items: number + processed_items?: number + success_items?: number + failed_items?: number + created_at: string + updated_at: string + is_running?: boolean // 任务是否在TaskManager中运行 +} + +export const useTaskStore = defineStore('task', () => { + const taskApi = useTaskApi() + + // 任务统计信息 + const taskStats = ref({ + total: 0, + running: 0, + pending: 0, + completed: 0, + failed: 0, + paused: 0 + }) + + // 正在运行的任务列表 + const runningTasks = ref([]) + + // 未完成的任务列表(pending + running + paused) + const incompleteTasks = ref([]) + + // 更新定时器 + let updateInterval: NodeJS.Timeout | null = null + + // 计算属性:是否有活跃任务 + const hasActiveTasks = computed(() => { + return taskStats.value.running > 0 || taskStats.value.pending > 0 || taskStats.value.paused > 0 + }) + + // 计算属性:活跃任务总数 + const activeTaskCount = computed(() => { + return taskStats.value.running + taskStats.value.pending + taskStats.value.paused + }) + + // 计算属性:正在运行的任务数 + const runningTaskCount = computed(() => { + return taskStats.value.running + }) + + // 获取任务统计信息 + const fetchTaskStats = async () => { + try { + const response = await taskApi.getTasks() as any + console.log('原始任务API响应:', response) + + // 处理API响应格式 + let tasks: TaskInfo[] = [] + if (response && response.items && Array.isArray(response.items)) { + tasks = response.items + } else if (Array.isArray(response)) { + tasks = response + } + + console.log('解析后的任务列表:', tasks) + + if (tasks && tasks.length >= 0) { + // 重置统计 + const stats: TaskStats = { + total: tasks.length, + running: 0, + pending: 0, + completed: 0, + failed: 0, + paused: 0 + } + + const running: TaskInfo[] = [] + const incomplete: TaskInfo[] = [] + + // 统计各种状态的任务 + tasks.forEach((task: TaskInfo) => { + console.log('处理任务:', task.id, '状态:', task.status, '是否运行中:', task.is_running) + + // 如果任务标记为运行中,优先使用running状态 + let currentStatus = task.status + if (task.is_running) { + currentStatus = 'running' + } + + switch (currentStatus) { + case 'running': + stats.running++ + running.push(task) + incomplete.push(task) + break + case 'pending': + stats.pending++ + incomplete.push(task) + break + case 'completed': + stats.completed++ + break + case 'failed': + stats.failed++ + break + case 'paused': + stats.paused++ + incomplete.push(task) + break + } + }) + + // 更新状态 + taskStats.value = stats + runningTasks.value = running + incompleteTasks.value = incomplete + + console.log('任务统计更新:', stats) + console.log('运行中的任务:', running) + console.log('未完成的任务:', incomplete) + } + } catch (error) { + console.error('获取任务统计失败:', error) + } + } + + // 开始定时更新 + const startAutoUpdate = () => { + if (updateInterval) { + clearInterval(updateInterval) + } + + // 立即执行一次 + fetchTaskStats() + + // 每5秒更新一次 + updateInterval = setInterval(() => { + fetchTaskStats() + }, 5000) + + console.log('任务状态自动更新已启动') + } + + // 停止定时更新 + const stopAutoUpdate = () => { + if (updateInterval) { + clearInterval(updateInterval) + updateInterval = null + console.log('任务状态自动更新已停止') + } + } + + // 获取特定任务的详细状态 + const getTaskStatus = async (taskId: number) => { + try { + const status = await taskApi.getTaskStatus(taskId) + return status + } catch (error) { + console.error('获取任务状态失败:', error) + return null + } + } + + // 启动任务 + const startTask = async (taskId: number) => { + try { + await taskApi.startTask(taskId) + // 立即更新状态 + await fetchTaskStats() + return true + } catch (error) { + console.error('启动任务失败:', error) + return false + } + } + + // 停止任务 + const stopTask = async (taskId: number) => { + try { + await taskApi.stopTask(taskId) + // 立即更新状态 + await fetchTaskStats() + return true + } catch (error) { + console.error('停止任务失败:', error) + return false + } + } + + // 删除任务 + const deleteTask = async (taskId: number) => { + try { + await taskApi.deleteTask(taskId) + // 立即更新状态 + await fetchTaskStats() + return true + } catch (error) { + console.error('删除任务失败:', error) + return false + } + } + + return { + // 状态 + taskStats, + runningTasks, + incompleteTasks, + + // 计算属性 + hasActiveTasks, + activeTaskCount, + runningTaskCount, + + // 方法 + fetchTaskStats, + startAutoUpdate, + stopAutoUpdate, + getTaskStatus, + startTask, + stopTask, + deleteTask + } +}) \ No newline at end of file