diff --git a/db/repo/task_repository.go b/db/repo/task_repository.go index 15651cd..f3a0a69 100644 --- a/db/repo/task_repository.go +++ b/db/repo/task_repository.go @@ -1,6 +1,8 @@ package repo import ( + "time" + "github.com/ctwj/urldb/db/entity" "gorm.io/gorm" ) @@ -15,6 +17,8 @@ type TaskRepository interface { UpdateProgress(id uint, progress float64, progressData string) error UpdateStatusAndMessage(id uint, status, message string) error UpdateTaskStats(id uint, processed, success, failed int) error + UpdateStartedAt(id uint) error + UpdateCompletedAt(id uint) error } // TaskRepositoryImpl 任务仓库实现 @@ -134,3 +138,15 @@ func (r *TaskRepositoryImpl) UpdateTaskStats(id uint, processed, success, failed "failed_items": failed, }).Error } + +// UpdateStartedAt 更新任务开始时间 +func (r *TaskRepositoryImpl) UpdateStartedAt(id uint) error { + now := time.Now() + return r.db.Model(&entity.Task{}).Where("id = ?", id).Update("started_at", now).Error +} + +// UpdateCompletedAt 更新任务完成时间 +func (r *TaskRepositoryImpl) UpdateCompletedAt(id uint) error { + now := time.Now() + return r.db.Model(&entity.Task{}).Where("id = ?", id).Update("completed_at", now).Error +} diff --git a/handlers/task_handler.go b/handlers/task_handler.go index a66a367..66990b6 100644 --- a/handlers/task_handler.go +++ b/handlers/task_handler.go @@ -545,3 +545,75 @@ func (h *TaskHandler) GetExpansionAccounts(c *gin.Context) { "message": "获取支持扩容账号列表成功", }) } + +// GetExpansionOutput 获取账号扩容输出数据 +func (h *TaskHandler) GetExpansionOutput(c *gin.Context) { + accountIDStr := c.Param("accountId") + accountID, err := strconv.ParseUint(accountIDStr, 10, 32) + if err != nil { + ErrorResponse(c, "无效的账号ID: "+err.Error(), http.StatusBadRequest) + return + } + + utils.Debug("获取账号扩容输出数据: 账号ID %d", accountID) + + // 获取该账号的所有扩容任务 + tasks, _, err := h.repoMgr.TaskRepository.GetList(1, 1000, "expansion", "completed") + if err != nil { + utils.Error("获取扩容任务列表失败: %v", err) + ErrorResponse(c, "获取扩容任务列表失败: "+err.Error(), http.StatusInternalServerError) + return + } + + // 查找该账号的扩容任务 + var targetTask *entity.Task + for _, task := range tasks { + if task.Config != "" { + var taskConfig map[string]interface{} + if err := json.Unmarshal([]byte(task.Config), &taskConfig); err == nil { + if configAccountID, ok := taskConfig["pan_account_id"].(float64); ok { + if uint(configAccountID) == uint(accountID) { + targetTask = task + break + } + } + } + } + } + + if targetTask == nil { + ErrorResponse(c, "该账号没有完成扩容任务", http.StatusNotFound) + return + } + + // 获取任务项,获取输出数据 + items, _, err := h.repoMgr.TaskItemRepository.GetListByTaskID(targetTask.ID, 1, 10, "completed") + if err != nil { + utils.Error("获取任务项失败: %v", err) + ErrorResponse(c, "获取任务输出数据失败: "+err.Error(), http.StatusInternalServerError) + return + } + + if len(items) == 0 { + ErrorResponse(c, "任务项不存在", http.StatusNotFound) + return + } + + // 返回第一个完成的任务项的输出数据 + taskItem := items[0] + var outputData map[string]interface{} + if taskItem.OutputData != "" { + if err := json.Unmarshal([]byte(taskItem.OutputData), &outputData); err != nil { + utils.Error("解析输出数据失败: %v", err) + ErrorResponse(c, "解析输出数据失败: "+err.Error(), http.StatusInternalServerError) + return + } + } + + SuccessResponse(c, gin.H{ + "task_id": targetTask.ID, + "account_id": accountID, + "output_data": outputData, + "message": "获取扩容输出数据成功", + }) +} diff --git a/task/expansion_processor.go b/task/expansion_processor.go index 9de8472..6e9142a 100644 --- a/task/expansion_processor.go +++ b/task/expansion_processor.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "math/rand" - "net/http" "time" pan "github.com/ctwj/urldb/common" @@ -295,7 +294,7 @@ func (ep *ExpansionProcessor) performExpansion(ctx context.Context, panAccountID } // 执行转存 - saveURL, err := ep.transferResource(ctx, service, resource) + saveURL, err := ep.transferResource(ctx, service, resource, *account) if err != nil { utils.Error("转存资源失败: %s, 错误: %v", resource.Title, err) totalFailed++ @@ -347,12 +346,14 @@ func (ep *ExpansionProcessor) getResourcesByHot( // getResourcesFromInternalDB 根据 HotDrama 的title 获取数据库中资源,并且资源的类型和 account 的资源类型一致 func (ep *ExpansionProcessor) getResourcesFromInternalDB(HotDrama *entity.HotDrama, account entity.Cks, service pan.PanService) (*entity.Resource, error) { - // 获取账号对应的平台ID - panIDInt, err := ep.repoMgr.PanRepository.FindIdByServiceType(account.ServiceType) - if err != nil { - return nil, fmt.Errorf("获取平台ID失败: %v", err) - } - panID := uint(panIDInt) + // 修改配置 isType = 1 只检测,不转存 + service.UpdateConfig(&pan.PanConfig{ + URL: "", + ExpiredType: 0, + IsType: 1, + Cookie: account.Ck, + }) + panID := account.PanID // 1. 搜索标题 params := map[string]interface{}{ @@ -413,29 +414,9 @@ func (ep *ExpansionProcessor) getHotResources(category string) ([]*entity.HotDra // getResourcesFromThirdPartyAPI 从第三方API获取资源 func (ep *ExpansionProcessor) getResourcesFromThirdPartyAPI(resource *entity.HotDrama, apiURL string) (*entity.Resource, error) { // 构建API请求URL,添加分类参数 - requestURL := fmt.Sprintf("%s?category=%s&limit=20", apiURL, resource) + // requestURL := fmt.Sprintf("%s?category=%s&limit=20", apiURL, resource) - // 发送HTTP请求 - client := &http.Client{Timeout: 30 * time.Second} - resp, err := client.Get(requestURL) - if err != nil { - return nil, fmt.Errorf("请求第三方API失败: %v", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("第三方API返回错误状态码: %d", resp.StatusCode) - } - - // 解析响应数据(假设API返回JSON格式的资源列表) - var apiResponse struct { - Data []*entity.HotDrama `json:"data"` - } - - err = json.NewDecoder(resp.Body).Decode(&apiResponse) - if err != nil { - return nil, fmt.Errorf("解析第三方API响应失败: %v", err) - } + // TODO 使用第三方API接口,请求资源 return nil, nil } @@ -463,7 +444,15 @@ func (ep *ExpansionProcessor) checkStorageSpace(service pan.PanService, ck *stri } // transferResource 执行单个资源的转存 -func (ep *ExpansionProcessor) transferResource(ctx context.Context, service pan.PanService, res *entity.Resource) (string, error) { +func (ep *ExpansionProcessor) transferResource(ctx context.Context, service pan.PanService, res *entity.Resource, account entity.Cks) (string, error) { + // 修改配置 isType = 0 转存 + service.UpdateConfig(&pan.PanConfig{ + URL: "", + ExpiredType: 0, + IsType: 0, + Cookie: account.Ck, + }) + // 如果没有URL,跳过转存 if res.URL == "" { return "", fmt.Errorf("资源 %s 没有有效的URL", res.URL) @@ -510,35 +499,35 @@ func (ep *ExpansionProcessor) transferResource(ctx context.Context, service pan. } // recordTransferredResource 记录转存成功的资源 -func (ep *ExpansionProcessor) recordTransferredResource(drama *entity.HotDrama, accountID uint, saveURL string) error { - // 获取夸克网盘的平台ID - panIDInt, err := ep.repoMgr.PanRepository.FindIdByServiceType("quark") - if err != nil { - utils.Error("获取夸克网盘平台ID失败: %v", err) - return err - } +// func (ep *ExpansionProcessor) recordTransferredResource(drama *entity.HotDrama, accountID uint, saveURL string) error { +// // 获取夸克网盘的平台ID +// panIDInt, err := ep.repoMgr.PanRepository.FindIdByServiceType("quark") +// if err != nil { +// utils.Error("获取夸克网盘平台ID失败: %v", err) +// return err +// } - // 转换为uint - panID := uint(panIDInt) +// // 转换为uint +// panID := uint(panIDInt) - // 创建资源记录 - resource := &entity.Resource{ - Title: drama.Title, - URL: drama.PosterURL, - SaveURL: saveURL, - PanID: &panID, - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - IsValid: true, - IsPublic: false, // 扩容资源默认不公开 - } +// // 创建资源记录 +// resource := &entity.Resource{ +// Title: drama.Title, +// URL: drama.PosterURL, +// SaveURL: saveURL, +// PanID: &panID, +// CreatedAt: time.Now(), +// UpdatedAt: time.Now(), +// IsValid: true, +// IsPublic: false, // 扩容资源默认不公开 +// } - // 保存到数据库 - err = ep.repoMgr.ResourceRepository.Create(resource) - if err != nil { - return fmt.Errorf("保存资源记录失败: %v", err) - } +// // 保存到数据库 +// err = ep.repoMgr.ResourceRepository.Create(resource) +// if err != nil { +// return fmt.Errorf("保存资源记录失败: %v", err) +// } - utils.Info("成功记录转存资源: %s (ID: %d)", drama.Title, resource.ID) - return nil -} +// utils.Info("成功记录转存资源: %s (ID: %d)", drama.Title, resource.ID) +// return nil +// } diff --git a/task/task_processor.go b/task/task_processor.go index 59ae04f..c3a9437 100644 --- a/task/task_processor.go +++ b/task/task_processor.go @@ -201,6 +201,12 @@ func (tm *TaskManager) processTask(ctx context.Context, task *entity.Task, proce return } + // 更新任务开始时间 + err = tm.repoMgr.TaskRepository.UpdateStartedAt(task.ID) + if err != nil { + utils.Error("更新任务开始时间失败: %v", err) + } + // 获取任务项统计信息,用于计算正确的进度 stats, err := tm.repoMgr.TaskItemRepository.GetStatsByTaskID(task.ID) if err != nil { @@ -294,6 +300,14 @@ func (tm *TaskManager) processTask(ctx context.Context, task *entity.Task, proce utils.Error("更新任务状态失败: %v", err) } + // 如果任务完成,更新完成时间 + if status == "completed" || status == "failed" || status == "partial_success" { + err = tm.repoMgr.TaskRepository.UpdateCompletedAt(task.ID) + if err != nil { + utils.Error("更新任务完成时间失败: %v", err) + } + } + utils.Info("任务 %d 处理完成: %s", task.ID, message) } @@ -324,13 +338,21 @@ func (tm *TaskManager) processTaskItem(ctx context.Context, taskID uint, item *e } // 处理成功 - outputData := map[string]interface{}{ - "success": true, - "time": utils.GetCurrentTime(), + // 如果处理器已经设置了 output_data(比如 ExpansionProcessor),则不覆盖 + var outputJSON string + if item.OutputData == "" { + outputData := map[string]interface{}{ + "success": true, + "time": utils.GetCurrentTime(), + } + outputBytes, _ := json.Marshal(outputData) + outputJSON = string(outputBytes) + } else { + // 使用处理器设置的 output_data + outputJSON = item.OutputData } - outputJSON, _ := json.Marshal(outputData) - err = tm.repoMgr.TaskItemRepository.UpdateStatusAndOutput(item.ID, "completed", string(outputJSON)) + err = tm.repoMgr.TaskItemRepository.UpdateStatusAndOutput(item.ID, "completed", outputJSON) if err != nil { utils.Error("更新成功任务项状态失败: %v", err) } @@ -369,6 +391,12 @@ func (tm *TaskManager) markTaskFailed(taskID uint, message string) { if err != nil { utils.Error("标记任务失败状态失败: %v", err) } + + // 更新任务完成时间 + err = tm.repoMgr.TaskRepository.UpdateCompletedAt(taskID) + if err != nil { + utils.Error("更新任务完成时间失败: %v", err) + } } // GetTaskStatus 获取任务状态 diff --git a/web/composables/useApi.ts b/web/composables/useApi.ts index 93d2610..e2f504d 100644 --- a/web/composables/useApi.ts +++ b/web/composables/useApi.ts @@ -253,7 +253,8 @@ export const useTaskApi = () => { const pauseTask = (id: number) => useApiFetch(`/tasks/${id}/pause`, { method: 'POST' }).then(parseApiResponse) const deleteTask = (id: number) => useApiFetch(`/tasks/${id}`, { method: 'DELETE' }).then(parseApiResponse) const getTaskItems = (id: number, params?: any) => useApiFetch(`/tasks/${id}/items`, { params }).then(parseApiResponse) - return { createBatchTransferTask, createExpansionTask, getExpansionAccounts, getTasks, getTaskStatus, startTask, stopTask, pauseTask, deleteTask, getTaskItems } + const getExpansionOutput = (accountId: number) => useApiFetch(`/tasks/expansion/accounts/${accountId}/output`).then(parseApiResponse) + return { createBatchTransferTask, createExpansionTask, getExpansionAccounts, getTasks, getTaskStatus, startTask, stopTask, pauseTask, deleteTask, getTaskItems, getExpansionOutput } } // 日志函数:只在开发环境打印 diff --git a/web/pages/admin/accounts-expansion.vue b/web/pages/admin/accounts-expansion.vue index 991bc6d..def729b 100644 --- a/web/pages/admin/accounts-expansion.vue +++ b/web/pages/admin/accounts-expansion.vue @@ -113,6 +113,19 @@ {{ item.expanded ? '已扩容' : '扩容' }} + + + + + 提取 + @@ -218,6 +231,62 @@ + + + + +
+ +
+ + + +
+ + +
+ +
+ + + + 仅链接 + + + + 标题|链接 + + + + 标题/n链接 + + + + +
+ + +
+ +
+ + +
+ 复制 + 关闭 +
+
+
+
+