update: 更新扩容功能

This commit is contained in:
Kerwin
2025-09-26 17:25:30 +08:00
parent 8be837fcbf
commit 193ed24316
4 changed files with 130 additions and 34 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"time"
@@ -36,12 +37,19 @@ type ExpansionInput struct {
DataSource map[string]interface{} `json:"data_source,omitempty"`
}
// TransferredResource 转存成功的资源信息
type TransferredResource struct {
Title string `json:"title"`
URL string `json:"url"`
}
// ExpansionOutput 扩容任务输出数据结构
type ExpansionOutput struct {
Success bool `json:"success"`
Message string `json:"message"`
Error string `json:"error,omitempty"`
Time string `json:"time"`
Success bool `json:"success"`
Message string `json:"message"`
Error string `json:"error,omitempty"`
Time string `json:"time"`
TransferredResources []TransferredResource `json:"transferred_resources,omitempty"`
}
// Process 处理扩容任务项
@@ -98,7 +106,8 @@ func (ep *ExpansionProcessor) Process(ctx context.Context, taskID uint, item *en
}
// 执行扩容操作(传入数据源)
if err := ep.performExpansion(ctx, input.PanAccountID, input.DataSource); err != nil {
transferred, err := ep.performExpansion(ctx, input.PanAccountID, input.DataSource)
if err != nil {
output := ExpansionOutput{
Success: false,
Message: "扩容失败",
@@ -115,9 +124,10 @@ func (ep *ExpansionProcessor) Process(ctx context.Context, taskID uint, item *en
// 扩容成功
output := ExpansionOutput{
Success: true,
Message: "扩容成功",
Time: utils.GetCurrentTimeString(),
Success: true,
Message: "扩容成功",
Time: utils.GetCurrentTimeString(),
TransferredResources: transferred,
}
outputJSON, _ := json.Marshal(output)
@@ -181,13 +191,16 @@ func (ep *ExpansionProcessor) checkAccountType(panAccountID uint) error {
}
// performExpansion 执行扩容操作
func (ep *ExpansionProcessor) performExpansion(ctx context.Context, panAccountID uint, dataSource map[string]interface{}) error {
func (ep *ExpansionProcessor) performExpansion(ctx context.Context, panAccountID uint, dataSource map[string]interface{}) ([]TransferredResource, error) {
rand.Seed(time.Now().UnixNano())
utils.Info("执行扩容操作账号ID: %d, 数据源: %v", panAccountID, dataSource)
transferred := []TransferredResource{}
// 获取账号信息
account, err := ep.repoMgr.CksRepository.FindByID(panAccountID)
if err != nil {
return fmt.Errorf("获取账号信息失败: %v", err)
return nil, fmt.Errorf("获取账号信息失败: %v", err)
}
// 创建网盘服务工厂
@@ -199,7 +212,7 @@ func (ep *ExpansionProcessor) performExpansion(ctx context.Context, panAccountID
Cookie: account.Ck,
})
if err != nil {
return fmt.Errorf("创建网盘服务失败: %v", err)
return nil, fmt.Errorf("创建网盘服务失败: %v", err)
}
service.SetCKSRepository(ep.repoMgr.CksRepository, *account)
@@ -234,7 +247,7 @@ func (ep *ExpansionProcessor) performExpansion(ctx context.Context, panAccountID
utils.Info("开始处理分类: %s", category)
// 获取该分类的资源
resources, err := ep.getResourcesForCategory(category, dataSourceType, thirdPartyURL)
resources, err := ep.getHotResources(category)
if err != nil {
utils.Error("获取分类 %s 的资源失败: %v", category, err)
continue
@@ -260,12 +273,25 @@ func (ep *ExpansionProcessor) performExpansion(ctx context.Context, panAccountID
hasSpace, err := ep.checkStorageSpace(service, &account.Ck)
if err != nil {
utils.Error("检查存储空间失败: %v", err)
return fmt.Errorf("检查存储空间失败: %v", err)
return transferred, fmt.Errorf("检查存储空间失败: %v", err)
}
if !hasSpace {
utils.Info("存储空间不足,停止扩容")
return fmt.Errorf("存储空间不足,无法继续扩容")
utils.Info("存储空间不足,停止扩容,但保存已转存的资源")
// 存储空间不足时,停止继续转存,但返回已转存的资源作为成功结果
break
}
// 获取资源 , dataSourceType, thirdPartyURL
resource, err := ep.getResourcesByHot(resource, dataSourceType, thirdPartyURL, *account, service)
if resource == nil || err != nil {
if resource != nil {
utils.Error("获取资源失败: %s, 错误: %v", resource.Title, err)
} else {
utils.Error("获取资源失败, 错误: %v", err)
}
totalFailed++
continue
}
// 执行转存
@@ -276,12 +302,15 @@ func (ep *ExpansionProcessor) performExpansion(ctx context.Context, panAccountID
continue
}
// 记录转存成功的资源
err = ep.recordTransferredResource(resource, account.ID, saveURL)
if err != nil {
utils.Error("记录转存资源失败: %v", err)
// 不影响扩容过程,继续处理
}
// 随机休眠1-3秒避免请求过于频繁
sleepDuration := time.Duration(rand.Intn(3)+1) * time.Second
time.Sleep(sleepDuration)
// 保存转存结果到任务输出
transferred = append(transferred, TransferredResource{
Title: resource.Title,
URL: saveURL,
})
totalTransferred++
transferredCount++
@@ -297,22 +326,67 @@ func (ep *ExpansionProcessor) performExpansion(ctx context.Context, panAccountID
}
utils.Info("扩容完成,总共转存: %d 个资源,失败: %d 个资源", totalTransferred, totalFailed)
return nil
return transferred, nil
}
// getResourcesForCategory 获取指定分类的资源
func (ep *ExpansionProcessor) getResourcesForCategory(category, dataSourceType, thirdPartyURL string) ([]*entity.HotDrama, error) {
func (ep *ExpansionProcessor) getResourcesByHot(
resource *entity.HotDrama, dataSourceType,
thirdPartyURL string,
entity entity.Cks,
service pan.PanService,
) (*entity.Resource, error) {
if dataSourceType == "third-party" && thirdPartyURL != "" {
// 从第三方API获取资源
return ep.getResourcesFromThirdPartyAPI(category, thirdPartyURL)
return ep.getResourcesFromThirdPartyAPI(resource, thirdPartyURL)
}
// 从内部数据库获取资源
return ep.getResourcesFromInternalDB(category)
return ep.getResourcesFromInternalDB(resource, entity, service)
}
// getResourcesFromInternalDB 根据 HotDrama 的title 获取数据库中资源,并且资源的类型和 account 的资源类型一致
func (ep *ExpansionProcessor) getResourcesFromInternalDB(HotDrama *entity.HotDrama, account entity.Cks, service pan.PanService) (*entity.Resource, error) {
// 获取账号对应的平台ID
panIDInt, err := ep.repoMgr.PanRepository.FindIdByServiceType(account.ServiceType)
if err != nil {
return nil, fmt.Errorf("获取平台ID失败: %v", err)
}
panID := uint(panIDInt)
// 1. 搜索标题
params := map[string]interface{}{
"search": HotDrama.Title,
"pan_id": panID,
"is_valid": true,
"page": 1,
"page_size": 10,
}
resources, _, err := ep.repoMgr.ResourceRepository.SearchWithFilters(params)
if err != nil {
return nil, fmt.Errorf("搜索资源失败: %v", err)
}
// 检查结果是否有效,通过服务验证
for _, res := range resources {
if res.IsValid && res.URL != "" {
// 使用服务验证资源是否可转存
shareID, _ := pan.ExtractShareId(res.URL)
if shareID != "" {
result, err := service.Transfer(shareID)
if err == nil && result != nil && result.Success {
return &res, nil
}
}
}
}
// 3. 没有有效资源,返回错误信息
return nil, fmt.Errorf("未找到有效的资源")
}
// getResourcesFromInternalDB 从内部数据库获取资源
func (ep *ExpansionProcessor) getResourcesFromInternalDB(category string) ([]*entity.HotDrama, error) {
func (ep *ExpansionProcessor) getHotResources(category string) ([]*entity.HotDrama, error) {
// 获取该分类下sub_type为"排行"的资源
dramas, _, err := ep.repoMgr.HotDramaRepository.FindByCategoryAndSubType(category, "排行", 1, 20)
if err != nil {
@@ -337,9 +411,9 @@ func (ep *ExpansionProcessor) getResourcesFromInternalDB(category string) ([]*en
}
// getResourcesFromThirdPartyAPI 从第三方API获取资源
func (ep *ExpansionProcessor) getResourcesFromThirdPartyAPI(category, apiURL string) ([]*entity.HotDrama, error) {
func (ep *ExpansionProcessor) getResourcesFromThirdPartyAPI(resource *entity.HotDrama, apiURL string) (*entity.Resource, error) {
// 构建API请求URL添加分类参数
requestURL := fmt.Sprintf("%s?category=%s&limit=20", apiURL, category)
requestURL := fmt.Sprintf("%s?category=%s&limit=20", apiURL, resource)
// 发送HTTP请求
client := &http.Client{Timeout: 30 * time.Second}
@@ -363,7 +437,7 @@ func (ep *ExpansionProcessor) getResourcesFromThirdPartyAPI(category, apiURL str
return nil, fmt.Errorf("解析第三方API响应失败: %v", err)
}
return apiResponse.Data, nil
return nil, nil
}
// checkStorageSpace 检查存储空间是否足够
@@ -389,16 +463,16 @@ func (ep *ExpansionProcessor) checkStorageSpace(service pan.PanService, ck *stri
}
// transferResource 执行单个资源的转存
func (ep *ExpansionProcessor) transferResource(ctx context.Context, service pan.PanService, drama *entity.HotDrama) (string, error) {
func (ep *ExpansionProcessor) transferResource(ctx context.Context, service pan.PanService, res *entity.Resource) (string, error) {
// 如果没有URL跳过转存
if drama.PosterURL == "" {
return "", fmt.Errorf("资源 %s 没有有效的URL", drama.Title)
if res.URL == "" {
return "", fmt.Errorf("资源 %s 没有有效的URL", res.URL)
}
// 提取分享ID
shareID, _ := pan.ExtractShareId(drama.PosterURL)
shareID, _ := pan.ExtractShareId(res.URL)
if shareID == "" {
return "", fmt.Errorf("无法从URL %s 提取分享ID", drama.PosterURL)
return "", fmt.Errorf("无法从URL %s 提取分享ID", res.URL)
}
// 执行转存