Files
urldb/task/task_processor.go

548 lines
17 KiB
Go
Raw Normal View History

2025-08-09 23:47:30 +08:00
package task
import (
"context"
"encoding/json"
"fmt"
"sync"
2025-10-28 11:07:00 +08:00
"time"
2025-08-09 23:47:30 +08:00
"github.com/ctwj/urldb/db/entity"
"github.com/ctwj/urldb/db/repo"
"github.com/ctwj/urldb/utils"
)
// TaskProcessor 任务处理器接口
type TaskProcessor interface {
Process(ctx context.Context, taskID uint, item *entity.TaskItem) error
GetTaskType() string
}
// TaskManager 任务管理器
type TaskManager struct {
processors map[string]TaskProcessor
repoMgr *repo.RepositoryManager
mu sync.RWMutex
running map[uint]context.CancelFunc // 正在运行的任务
}
// NewTaskManager 创建任务管理器
func NewTaskManager(repoMgr *repo.RepositoryManager) *TaskManager {
return &TaskManager{
processors: make(map[string]TaskProcessor),
repoMgr: repoMgr,
running: make(map[uint]context.CancelFunc),
}
}
// RegisterProcessor 注册任务处理器
func (tm *TaskManager) RegisterProcessor(processor TaskProcessor) {
tm.mu.Lock()
defer tm.mu.Unlock()
tm.processors[processor.GetTaskType()] = processor
2025-08-25 09:51:45 +08:00
utils.Debug("注册任务处理器: %s", processor.GetTaskType())
2025-08-09 23:47:30 +08:00
}
// getRegisteredProcessors 获取已注册的处理器列表(用于调试)
func (tm *TaskManager) getRegisteredProcessors() []string {
var types []string
for taskType := range tm.processors {
types = append(types, taskType)
}
return types
}
// StartTask 启动任务
func (tm *TaskManager) StartTask(taskID uint) error {
tm.mu.Lock()
defer tm.mu.Unlock()
2025-08-25 09:51:45 +08:00
utils.Debug("StartTask: 尝试启动任务 %d", taskID)
2025-08-09 23:47:30 +08:00
// 检查任务是否已在运行
if _, exists := tm.running[taskID]; exists {
2025-08-25 09:51:45 +08:00
utils.Debug("任务 %d 已在运行中", taskID)
2025-08-09 23:47:30 +08:00
return fmt.Errorf("任务 %d 已在运行中", taskID)
}
// 获取任务信息
task, err := tm.repoMgr.TaskRepository.GetByID(taskID)
if err != nil {
utils.Error("获取任务失败: %v", err)
return fmt.Errorf("获取任务失败: %v", err)
}
2025-08-25 09:51:45 +08:00
utils.Debug("StartTask: 获取到任务 %d, 类型: %s, 状态: %s", task.ID, task.Type, task.Status)
2025-08-09 23:47:30 +08:00
// 获取处理器
processor, exists := tm.processors[string(task.Type)]
if !exists {
utils.Error("未找到任务类型 %s 的处理器, 已注册的处理器: %v", task.Type, tm.getRegisteredProcessors())
return fmt.Errorf("未找到任务类型 %s 的处理器", task.Type)
}
2025-08-25 09:51:45 +08:00
utils.Debug("StartTask: 找到处理器 %s", task.Type)
2025-08-09 23:47:30 +08:00
// 创建上下文
ctx, cancel := context.WithCancel(context.Background())
tm.running[taskID] = cancel
2025-08-25 09:51:45 +08:00
utils.Debug("StartTask: 启动后台任务协程")
2025-08-09 23:47:30 +08:00
// 启动后台任务
go tm.processTask(ctx, task, processor)
2025-10-28 11:07:00 +08:00
utils.InfoWithFields(map[string]interface{}{
"task_id": taskID,
}, "StartTask: 任务 %d 启动成功", taskID)
2025-08-09 23:47:30 +08:00
return nil
}
2025-08-10 00:54:30 +08:00
// PauseTask 暂停任务
func (tm *TaskManager) PauseTask(taskID uint) error {
tm.mu.Lock()
defer tm.mu.Unlock()
utils.Info("PauseTask: 尝试暂停任务 %d", taskID)
// 检查任务是否在运行
cancel, exists := tm.running[taskID]
if !exists {
2025-08-10 13:52:41 +08:00
// 检查数据库中任务状态
task, err := tm.repoMgr.TaskRepository.GetByID(taskID)
if err != nil {
utils.Error("获取任务信息失败: %v", err)
return fmt.Errorf("获取任务信息失败: %v", err)
}
// 如果数据库中的状态是running说明服务器重启了直接更新状态
if task.Status == "running" {
utils.Info("任务 %d 在数据库中状态为running但内存中不存在可能是服务器重启直接更新状态为paused", taskID)
err := tm.repoMgr.TaskRepository.UpdateStatus(taskID, "paused")
if err != nil {
utils.Error("更新任务状态为暂停失败: %v", err)
return fmt.Errorf("更新任务状态失败: %v", err)
}
utils.Info("任务 %d 暂停成功(服务器重启恢复)", taskID)
return nil
}
2025-08-10 00:54:30 +08:00
utils.Info("任务 %d 未在运行,无法暂停", taskID)
return fmt.Errorf("任务 %d 未在运行", taskID)
}
// 停止任务类似stop但状态标记为paused
cancel()
delete(tm.running, taskID)
// 更新任务状态为暂停
err := tm.repoMgr.TaskRepository.UpdateStatus(taskID, "paused")
if err != nil {
utils.Error("更新任务状态为暂停失败: %v", err)
return fmt.Errorf("更新任务状态失败: %v", err)
}
utils.Info("任务 %d 暂停成功", taskID)
return nil
}
2025-08-09 23:47:30 +08:00
// StopTask 停止任务
func (tm *TaskManager) StopTask(taskID uint) error {
tm.mu.Lock()
defer tm.mu.Unlock()
cancel, exists := tm.running[taskID]
if !exists {
2025-08-10 13:52:41 +08:00
// 检查数据库中任务状态
task, err := tm.repoMgr.TaskRepository.GetByID(taskID)
if err != nil {
utils.Error("获取任务信息失败: %v", err)
return fmt.Errorf("获取任务信息失败: %v", err)
}
// 如果数据库中的状态是running说明服务器重启了直接更新状态
if task.Status == "running" {
utils.Info("任务 %d 在数据库中状态为running但内存中不存在可能是服务器重启直接更新状态为paused", taskID)
err := tm.repoMgr.TaskRepository.UpdateStatus(taskID, "paused")
if err != nil {
utils.Error("更新任务状态失败: %v", err)
return fmt.Errorf("更新任务状态失败: %v", err)
}
utils.Info("任务 %d 停止成功(服务器重启恢复)", taskID)
return nil
}
2025-08-09 23:47:30 +08:00
return fmt.Errorf("任务 %d 未在运行", taskID)
}
cancel()
delete(tm.running, taskID)
// 更新任务状态为暂停
err := tm.repoMgr.TaskRepository.UpdateStatus(taskID, "paused")
if err != nil {
utils.Error("更新任务状态失败: %v", err)
}
return nil
}
// processTask 处理任务
func (tm *TaskManager) processTask(ctx context.Context, task *entity.Task, processor TaskProcessor) {
2025-10-28 11:07:00 +08:00
startTime := utils.GetCurrentTime()
2025-08-09 23:47:30 +08:00
defer func() {
tm.mu.Lock()
delete(tm.running, task.ID)
tm.mu.Unlock()
2025-10-28 11:07:00 +08:00
elapsedTime := time.Since(startTime)
utils.Info("processTask: 任务 %d 处理完成,耗时: %v清理资源", task.ID, elapsedTime)
2025-08-09 23:47:30 +08:00
}()
2025-10-28 11:07:00 +08:00
utils.InfoWithFields(map[string]interface{}{
"task_id": task.ID,
"task_type": task.Type,
}, "processTask: 开始处理任务: %d, 类型: %s", task.ID, task.Type)
2025-08-09 23:47:30 +08:00
// 更新任务状态为运行中
err := tm.repoMgr.TaskRepository.UpdateStatus(task.ID, "running")
if err != nil {
utils.Error("更新任务状态失败: %v", err)
return
}
2025-09-27 16:14:43 +08:00
// 更新任务开始时间
err = tm.repoMgr.TaskRepository.UpdateStartedAt(task.ID)
if err != nil {
utils.Error("更新任务开始时间失败: %v", err)
}
2025-08-12 00:27:10 +08:00
// 获取任务项统计信息,用于计算正确的进度
2025-10-28 11:07:00 +08:00
statsStart := utils.GetCurrentTime()
2025-08-12 00:27:10 +08:00
stats, err := tm.repoMgr.TaskItemRepository.GetStatsByTaskID(task.ID)
2025-10-28 11:07:00 +08:00
statsDuration := time.Since(statsStart)
2025-08-12 00:27:10 +08:00
if err != nil {
utils.Error("获取任务项统计失败: %v", err)
stats = map[string]int{
"total": 0,
"pending": 0,
"processing": 0,
"completed": 0,
"failed": 0,
}
2025-10-28 11:07:00 +08:00
} else {
utils.Debug("获取任务项统计完成,耗时: %v", statsDuration)
2025-08-12 00:27:10 +08:00
}
2025-08-09 23:47:30 +08:00
// 获取待处理的任务项
2025-10-28 11:07:00 +08:00
itemsStart := utils.GetCurrentTime()
2025-08-09 23:47:30 +08:00
items, err := tm.repoMgr.TaskItemRepository.GetByTaskIDAndStatus(task.ID, "pending")
2025-10-28 11:07:00 +08:00
itemsDuration := time.Since(itemsStart)
2025-08-09 23:47:30 +08:00
if err != nil {
utils.Error("获取任务项失败: %v", err)
tm.markTaskFailed(task.ID, fmt.Sprintf("获取任务项失败: %v", err))
return
2025-10-28 11:07:00 +08:00
} else {
utils.Debug("获取任务项完成,数量: %d耗时: %v", len(items), itemsDuration)
2025-08-09 23:47:30 +08:00
}
2025-08-12 00:27:10 +08:00
// 计算总任务项数和已完成的项数
totalItems := stats["total"]
completedItems := stats["completed"]
initialFailedItems := stats["failed"]
processingItems := stats["processing"]
2025-08-09 23:47:30 +08:00
2025-08-12 00:27:10 +08:00
// 如果当前批次有处理中的任务项重置它们为pending状态服务器重启恢复
if processingItems > 0 {
2025-10-28 11:07:00 +08:00
utils.Info("任务 %d 发现 %d 个处理中的任务项重置为pending状态", task.ID, processingItems)
resetStart := utils.GetCurrentTime()
2025-08-12 00:27:10 +08:00
err = tm.repoMgr.TaskItemRepository.ResetProcessingItems(task.ID)
2025-10-28 11:07:00 +08:00
resetDuration := time.Since(resetStart)
2025-08-12 00:27:10 +08:00
if err != nil {
utils.Error("重置处理中任务项失败: %v", err)
2025-10-28 11:07:00 +08:00
} else {
utils.Debug("重置处理中任务项完成,耗时: %v", resetDuration)
2025-08-12 00:27:10 +08:00
}
// 重新获取待处理的任务项
items, err = tm.repoMgr.TaskItemRepository.GetByTaskIDAndStatus(task.ID, "pending")
if err != nil {
utils.Error("重新获取任务项失败: %v", err)
tm.markTaskFailed(task.ID, fmt.Sprintf("重新获取任务项失败: %v", err))
return
}
}
currentBatchItems := len(items)
processedItems := completedItems + initialFailedItems // 已经处理的项目数
successItems := completedItems
failedItems := initialFailedItems
2025-08-25 09:51:45 +08:00
utils.Debug("任务 %d 统计信息: 总计=%d, 已完成=%d, 已失败=%d, 待处理=%d",
2025-08-12 00:27:10 +08:00
task.ID, totalItems, completedItems, failedItems, currentBatchItems)
2025-08-09 23:47:30 +08:00
2025-10-28 11:07:00 +08:00
// 记录处理开始时间
batchStartTime := utils.GetCurrentTime()
for i, item := range items {
2025-08-09 23:47:30 +08:00
select {
case <-ctx.Done():
2025-08-25 09:51:45 +08:00
utils.Debug("任务 %d 被取消", task.ID)
2025-08-09 23:47:30 +08:00
return
default:
2025-10-28 11:07:00 +08:00
// 记录单个任务项处理开始时间
itemStartTime := utils.GetCurrentTime()
2025-08-09 23:47:30 +08:00
// 处理单个任务项
err := tm.processTaskItem(ctx, task.ID, item, processor)
processedItems++
2025-10-28 11:07:00 +08:00
// 记录单个任务项处理耗时
itemDuration := time.Since(itemStartTime)
2025-08-09 23:47:30 +08:00
if err != nil {
failedItems++
2025-10-28 11:07:00 +08:00
utils.ErrorWithFields(map[string]interface{}{
"task_item_id": item.ID,
"error": err.Error(),
"duration_ms": itemDuration.Milliseconds(),
}, "处理任务项 %d 失败: %v耗时: %v", item.ID, err, itemDuration)
2025-08-09 23:47:30 +08:00
} else {
successItems++
2025-10-28 11:07:00 +08:00
utils.Info("处理任务项 %d 成功,耗时: %v", item.ID, itemDuration)
2025-08-09 23:47:30 +08:00
}
2025-08-12 00:27:10 +08:00
// 更新任务进度(基于总任务项数)
if totalItems > 0 {
progress := float64(processedItems) / float64(totalItems) * 100
tm.updateTaskProgress(task.ID, progress, processedItems, successItems, failedItems)
}
2025-10-28 11:07:00 +08:00
// 每处理10个任务项记录一次批处理进度
if (i+1)%10 == 0 || i == len(items)-1 {
batchDuration := time.Since(batchStartTime)
utils.Info("任务 %d 批处理进度: 已处理 %d/%d 项,成功 %d 项,失败 %d 项,当前批处理耗时: %v",
task.ID, processedItems, totalItems, successItems, failedItems, batchDuration)
}
2025-08-09 23:47:30 +08:00
}
}
2025-10-28 11:07:00 +08:00
// 记录整个批处理耗时
batchDuration := time.Since(batchStartTime)
utils.Info("任务 %d 批处理完成: 总计 %d 项,成功 %d 项,失败 %d 项,总耗时: %v",
task.ID, len(items), successItems, failedItems, batchDuration)
2025-08-09 23:47:30 +08:00
// 任务完成
status := "completed"
message := fmt.Sprintf("任务完成,共处理 %d 项,成功 %d 项,失败 %d 项", processedItems, successItems, failedItems)
if failedItems > 0 && successItems == 0 {
status = "failed"
message = fmt.Sprintf("任务失败,共处理 %d 项,全部失败", processedItems)
} else if failedItems > 0 {
status = "partial_success"
message = fmt.Sprintf("任务部分成功,共处理 %d 项,成功 %d 项,失败 %d 项", processedItems, successItems, failedItems)
}
err = tm.repoMgr.TaskRepository.UpdateStatusAndMessage(task.ID, status, message)
if err != nil {
utils.Error("更新任务状态失败: %v", err)
}
2025-09-27 16:14:43 +08:00
// 如果任务完成,更新完成时间
if status == "completed" || status == "failed" || status == "partial_success" {
err = tm.repoMgr.TaskRepository.UpdateCompletedAt(task.ID)
if err != nil {
utils.Error("更新任务完成时间失败: %v", err)
}
}
2025-10-28 11:07:00 +08:00
utils.InfoWithFields(map[string]interface{}{
"task_id": task.ID,
"message": message,
}, "任务 %d 处理完成: %s", task.ID, message)
2025-08-09 23:47:30 +08:00
}
// processTaskItem 处理单个任务项
func (tm *TaskManager) processTaskItem(ctx context.Context, taskID uint, item *entity.TaskItem, processor TaskProcessor) error {
2025-10-28 11:07:00 +08:00
itemStartTime := utils.GetCurrentTime()
utils.Debug("开始处理任务项: %d (任务ID: %d)", item.ID, taskID)
2025-08-09 23:47:30 +08:00
// 更新任务项状态为处理中
2025-10-28 11:07:00 +08:00
updateStart := utils.GetCurrentTime()
2025-08-09 23:47:30 +08:00
err := tm.repoMgr.TaskItemRepository.UpdateStatus(item.ID, "processing")
2025-10-28 11:07:00 +08:00
updateDuration := time.Since(updateStart)
2025-08-09 23:47:30 +08:00
if err != nil {
2025-10-28 11:07:00 +08:00
utils.Error("更新任务项状态失败: %v耗时: %v", err, updateDuration)
2025-08-09 23:47:30 +08:00
return fmt.Errorf("更新任务项状态失败: %v", err)
2025-10-28 11:07:00 +08:00
} else {
utils.Debug("更新任务项状态为处理中完成,耗时: %v", updateDuration)
2025-08-09 23:47:30 +08:00
}
// 处理任务项
2025-10-28 11:07:00 +08:00
processStart := utils.GetCurrentTime()
2025-08-09 23:47:30 +08:00
err = processor.Process(ctx, taskID, item)
2025-10-28 11:07:00 +08:00
processDuration := time.Since(processStart)
2025-08-09 23:47:30 +08:00
if err != nil {
// 处理失败
2025-10-28 11:07:00 +08:00
utils.Error("处理任务项 %d 失败: %v处理耗时: %v", item.ID, err, processDuration)
2025-08-09 23:47:30 +08:00
outputData := map[string]interface{}{
"error": err.Error(),
2025-08-11 01:34:07 +08:00
"time": utils.GetCurrentTime(),
2025-10-28 11:07:00 +08:00
"duration_ms": processDuration.Milliseconds(),
2025-08-09 23:47:30 +08:00
}
outputJSON, _ := json.Marshal(outputData)
updateErr := tm.repoMgr.TaskItemRepository.UpdateStatusAndOutput(item.ID, "failed", string(outputJSON))
if updateErr != nil {
utils.Error("更新失败任务项状态失败: %v", updateErr)
}
return err
}
// 处理成功
2025-10-28 11:07:00 +08:00
utils.Info("处理任务项 %d 成功,处理耗时: %v", item.ID, processDuration)
2025-09-27 16:14:43 +08:00
// 如果处理器已经设置了 output_data比如 ExpansionProcessor则不覆盖
var outputJSON string
if item.OutputData == "" {
outputData := map[string]interface{}{
"success": true,
"time": utils.GetCurrentTime(),
2025-10-28 11:07:00 +08:00
"duration_ms": processDuration.Milliseconds(),
2025-09-27 16:14:43 +08:00
}
outputBytes, _ := json.Marshal(outputData)
outputJSON = string(outputBytes)
} else {
2025-10-28 11:07:00 +08:00
// 使用处理器设置的 output_data并添加处理时间信息
var existingOutput map[string]interface{}
if json.Unmarshal([]byte(item.OutputData), &existingOutput) == nil {
existingOutput["duration_ms"] = processDuration.Milliseconds()
outputBytes, _ := json.Marshal(existingOutput)
outputJSON = string(outputBytes)
} else {
// 如果无法解析现有输出,保留原样并添加时间信息
outputData := map[string]interface{}{
"original_output": item.OutputData,
"success": true,
"time": utils.GetCurrentTime(),
"duration_ms": processDuration.Milliseconds(),
}
outputBytes, _ := json.Marshal(outputData)
outputJSON = string(outputBytes)
}
2025-08-09 23:47:30 +08:00
}
2025-10-28 11:07:00 +08:00
updateSuccessStart := utils.GetCurrentTime()
2025-09-27 16:14:43 +08:00
err = tm.repoMgr.TaskItemRepository.UpdateStatusAndOutput(item.ID, "completed", outputJSON)
2025-10-28 11:07:00 +08:00
updateSuccessDuration := time.Since(updateSuccessStart)
2025-08-09 23:47:30 +08:00
if err != nil {
2025-10-28 11:07:00 +08:00
utils.Error("更新成功任务项状态失败: %v耗时: %v", err, updateSuccessDuration)
} else {
utils.Debug("更新成功任务项状态完成,耗时: %v", updateSuccessDuration)
2025-08-09 23:47:30 +08:00
}
2025-10-28 11:07:00 +08:00
itemDuration := time.Since(itemStartTime)
utils.Debug("任务项 %d 处理完成,总耗时: %v", item.ID, itemDuration)
2025-08-09 23:47:30 +08:00
return nil
}
// updateTaskProgress 更新任务进度
func (tm *TaskManager) updateTaskProgress(taskID uint, progress float64, processed, success, failed int) {
2025-08-10 13:52:41 +08:00
// 更新任务统计信息
err := tm.repoMgr.TaskRepository.UpdateTaskStats(taskID, processed, success, failed)
if err != nil {
utils.Error("更新任务统计信息失败: %v", err)
}
// 更新进度数据(用于兼容性)
2025-08-09 23:47:30 +08:00
progressData := map[string]interface{}{
"progress": progress,
"processed": processed,
"success": success,
"failed": failed,
2025-08-11 01:34:07 +08:00
"time": utils.GetCurrentTime(),
2025-08-09 23:47:30 +08:00
}
progressJSON, _ := json.Marshal(progressData)
2025-08-10 13:52:41 +08:00
err = tm.repoMgr.TaskRepository.UpdateProgress(taskID, progress, string(progressJSON))
2025-08-09 23:47:30 +08:00
if err != nil {
2025-08-10 13:52:41 +08:00
utils.Error("更新任务进度数据失败: %v", err)
2025-08-09 23:47:30 +08:00
}
}
// markTaskFailed 标记任务失败
func (tm *TaskManager) markTaskFailed(taskID uint, message string) {
err := tm.repoMgr.TaskRepository.UpdateStatusAndMessage(taskID, "failed", message)
if err != nil {
utils.Error("标记任务失败状态失败: %v", err)
}
2025-09-27 16:14:43 +08:00
// 更新任务完成时间
err = tm.repoMgr.TaskRepository.UpdateCompletedAt(taskID)
if err != nil {
utils.Error("更新任务完成时间失败: %v", err)
}
2025-08-09 23:47:30 +08:00
}
// GetTaskStatus 获取任务状态
func (tm *TaskManager) GetTaskStatus(taskID uint) (string, error) {
task, err := tm.repoMgr.TaskRepository.GetByID(taskID)
if err != nil {
return "", err
}
return string(task.Status), nil
}
// IsTaskRunning 检查任务是否在运行
func (tm *TaskManager) IsTaskRunning(taskID uint) bool {
tm.mu.RLock()
defer tm.mu.RUnlock()
_, exists := tm.running[taskID]
return exists
}
2025-08-10 13:52:41 +08:00
// RecoverRunningTasks 恢复运行中的任务(服务器重启后调用)
func (tm *TaskManager) RecoverRunningTasks() error {
tm.mu.Lock()
defer tm.mu.Unlock()
utils.Info("开始恢复运行中的任务...")
// 获取数据库中状态为running的任务
tasks, _, err := tm.repoMgr.TaskRepository.GetList(1, 1000, "", "running")
if err != nil {
utils.Error("获取运行中任务失败: %v", err)
return fmt.Errorf("获取运行中任务失败: %v", err)
}
recoveredCount := 0
for _, task := range tasks {
// 检查任务是否已在内存中运行
if _, exists := tm.running[task.ID]; exists {
utils.Info("任务 %d 已在内存中运行,跳过恢复", task.ID)
continue
}
// 获取处理器
processor, exists := tm.processors[string(task.Type)]
if !exists {
utils.Error("未找到任务类型 %s 的处理器,跳过恢复任务 %d", task.Type, task.ID)
// 将任务状态重置为pending避免卡在running状态
tm.repoMgr.TaskRepository.UpdateStatus(task.ID, "pending")
continue
}
// 创建上下文并恢复任务
ctx, cancel := context.WithCancel(context.Background())
tm.running[task.ID] = cancel
utils.Info("恢复任务 %d (类型: %s)", task.ID, task.Type)
go tm.processTask(ctx, task, processor)
recoveredCount++
}
utils.Info("任务恢复完成,共恢复 %d 个任务", recoveredCount)
return nil
}