Files
urldb/utils/scheduler.go

575 lines
17 KiB
Go
Raw Normal View History

2025-07-11 17:45:16 +08:00
package utils
import (
"log"
"strings"
2025-07-12 21:23:23 +08:00
"sync"
2025-07-11 17:45:16 +08:00
"time"
2025-07-17 14:08:52 +08:00
panutils "github.com/ctwj/panResManage/common"
commonutils "github.com/ctwj/panResManage/common/utils"
"github.com/ctwj/panResManage/db/entity"
"github.com/ctwj/panResManage/db/repo"
2025-07-11 17:45:16 +08:00
)
// Scheduler 定时任务管理器
type Scheduler struct {
2025-07-12 21:23:23 +08:00
doubanService *DoubanService
hotDramaRepo repo.HotDramaRepository
readyResourceRepo repo.ReadyResourceRepository
resourceRepo repo.ResourceRepository
systemConfigRepo repo.SystemConfigRepository
2025-07-15 12:50:24 +08:00
panRepo repo.PanRepository
2025-07-12 21:23:23 +08:00
stopChan chan bool
isRunning bool
readyResourceRunning bool
processingMutex sync.Mutex // 防止ready_resource任务重叠执行
hotDramaMutex sync.Mutex // 防止热播剧任务重叠执行
2025-07-15 12:50:24 +08:00
// 平台映射缓存
panCache map[string]*uint // serviceType -> panID
panCacheOnce sync.Once
2025-07-11 17:45:16 +08:00
}
// NewScheduler 创建新的定时任务管理器
2025-07-15 12:50:24 +08:00
func NewScheduler(hotDramaRepo repo.HotDramaRepository, readyResourceRepo repo.ReadyResourceRepository, resourceRepo repo.ResourceRepository, systemConfigRepo repo.SystemConfigRepository, panRepo repo.PanRepository) *Scheduler {
2025-07-11 17:45:16 +08:00
return &Scheduler{
2025-07-12 21:23:23 +08:00
doubanService: NewDoubanService(),
hotDramaRepo: hotDramaRepo,
readyResourceRepo: readyResourceRepo,
resourceRepo: resourceRepo,
systemConfigRepo: systemConfigRepo,
2025-07-15 12:50:24 +08:00
panRepo: panRepo,
2025-07-12 21:23:23 +08:00
stopChan: make(chan bool),
isRunning: false,
readyResourceRunning: false,
processingMutex: sync.Mutex{},
hotDramaMutex: sync.Mutex{},
2025-07-15 12:50:24 +08:00
panCache: make(map[string]*uint),
2025-07-11 17:45:16 +08:00
}
}
// StartHotDramaScheduler 启动热播剧定时任务
func (s *Scheduler) StartHotDramaScheduler() {
if s.isRunning {
log.Println("热播剧定时任务已在运行中")
return
}
s.isRunning = true
log.Println("启动热播剧定时任务")
go func() {
2025-07-15 18:44:09 +08:00
ticker := time.NewTicker(12 * time.Hour) // 每12小时执行一次
2025-07-11 17:45:16 +08:00
defer ticker.Stop()
// 立即执行一次
s.fetchHotDramaData()
for {
select {
case <-ticker.C:
2025-07-12 21:23:23 +08:00
// 使用TryLock防止任务重叠执行
if s.hotDramaMutex.TryLock() {
go func() {
defer s.hotDramaMutex.Unlock()
s.fetchHotDramaData()
}()
} else {
log.Println("上一次热播剧任务还在执行中,跳过本次执行")
}
2025-07-11 17:45:16 +08:00
case <-s.stopChan:
log.Println("停止热播剧定时任务")
return
}
}
}()
}
// StopHotDramaScheduler 停止热播剧定时任务
func (s *Scheduler) StopHotDramaScheduler() {
if !s.isRunning {
log.Println("热播剧定时任务未在运行")
return
}
s.stopChan <- true
s.isRunning = false
log.Println("已发送停止信号给热播剧定时任务")
}
// fetchHotDramaData 获取热播剧数据
func (s *Scheduler) fetchHotDramaData() {
log.Println("开始获取热播剧数据...")
2025-07-15 18:44:09 +08:00
// 直接处理电影和电视剧数据不再需要FetchHotDramaNames
s.processHotDramaNames([]string{})
2025-07-11 17:45:16 +08:00
}
// processHotDramaNames 处理热播剧名字
func (s *Scheduler) processHotDramaNames(dramaNames []string) {
log.Printf("开始处理热播剧数据,共 %d 个", len(dramaNames))
2025-07-15 18:44:09 +08:00
// 收集所有数据
var allDramas []*entity.HotDrama
// 获取电影数据
movieDramas := s.processMovieData()
allDramas = append(allDramas, movieDramas...)
// 获取电视剧数据
tvDramas := s.processTvData()
allDramas = append(allDramas, tvDramas...)
// 清空数据库
log.Printf("准备清空数据库,当前共有 %d 条数据", len(allDramas))
if err := s.hotDramaRepo.DeleteAll(); err != nil {
log.Printf("清空数据库失败: %v", err)
return
}
log.Println("数据库清空完成")
// 批量插入所有数据
if len(allDramas) > 0 {
log.Printf("开始批量插入 %d 条数据", len(allDramas))
if err := s.hotDramaRepo.BatchCreate(allDramas); err != nil {
log.Printf("批量插入数据失败: %v", err)
} else {
log.Printf("成功批量插入 %d 条数据", len(allDramas))
}
} else {
log.Println("没有数据需要插入")
}
2025-07-11 17:45:16 +08:00
log.Println("热播剧数据处理完成")
}
// processMovieData 处理电影数据
2025-07-15 18:44:09 +08:00
func (s *Scheduler) processMovieData() []*entity.HotDrama {
2025-07-11 17:45:16 +08:00
log.Println("开始处理电影数据...")
2025-07-15 18:44:09 +08:00
var movieDramas []*entity.HotDrama
// 使用GetTypePage方法获取电影数据
movieResult, err := s.doubanService.GetTypePage("热门", "全部")
2025-07-11 17:45:16 +08:00
if err != nil {
log.Printf("获取电影榜单失败: %v", err)
2025-07-15 18:44:09 +08:00
return movieDramas
2025-07-11 17:45:16 +08:00
}
if movieResult.Success && movieResult.Data != nil {
2025-07-15 18:44:09 +08:00
log.Printf("电影获取到 %d 个数据", len(movieResult.Data.Items))
2025-07-11 17:45:16 +08:00
for _, item := range movieResult.Data.Items {
drama := &entity.HotDrama{
2025-07-15 16:34:21 +08:00
Title: item.Title,
CardSubtitle: item.CardSubtitle,
EpisodesInfo: item.EpisodesInfo,
IsNew: item.IsNew,
Rating: item.Rating.Value,
RatingCount: item.Rating.Count,
Year: item.Year,
Region: item.Region,
Genres: strings.Join(item.Genres, ", "),
Directors: strings.Join(item.Directors, ", "),
Actors: strings.Join(item.Actors, ", "),
PosterURL: item.Pic.Normal,
Category: "电影",
SubType: "热门",
Source: "douban",
DoubanID: item.ID,
DoubanURI: item.URI,
2025-07-11 17:45:16 +08:00
}
2025-07-15 18:44:09 +08:00
movieDramas = append(movieDramas, drama)
log.Printf("收集电影: %s (评分: %.1f, 年份: %s, 地区: %s)",
item.Title, item.Rating.Value, item.Year, item.Region)
2025-07-11 17:45:16 +08:00
}
2025-07-15 18:44:09 +08:00
} else {
log.Printf("电影获取数据失败或为空")
2025-07-11 17:45:16 +08:00
}
2025-07-15 18:44:09 +08:00
log.Printf("电影数据处理完成,共收集 %d 条数据", len(movieDramas))
return movieDramas
2025-07-11 17:45:16 +08:00
}
// processTvData 处理电视剧数据
2025-07-15 18:44:09 +08:00
func (s *Scheduler) processTvData() []*entity.HotDrama {
2025-07-11 17:45:16 +08:00
log.Println("开始处理电视剧数据...")
2025-07-15 18:44:09 +08:00
var tvDramas []*entity.HotDrama
2025-07-11 17:45:16 +08:00
2025-07-15 18:44:09 +08:00
// 获取所有tv类型
tvTypes := s.doubanService.GetAllTvTypes()
log.Printf("获取到 %d 个tv类型: %v", len(tvTypes), tvTypes)
// 遍历每个type分别请求数据
for _, tvType := range tvTypes {
log.Printf("正在处理tv类型: %s", tvType)
2025-07-11 17:45:16 +08:00
2025-07-15 18:44:09 +08:00
// 使用GetTypePage方法请求数据
tvResult, err := s.doubanService.GetTypePage("tv", tvType)
if err != nil {
log.Printf("获取tv类型 %s 数据失败: %v", tvType, err)
continue
}
if tvResult.Success && tvResult.Data != nil {
log.Printf("tv类型 %s 获取到 %d 个数据", tvType, len(tvResult.Data.Items))
for _, item := range tvResult.Data.Items {
drama := &entity.HotDrama{
Title: item.Title,
CardSubtitle: item.CardSubtitle,
EpisodesInfo: item.EpisodesInfo,
IsNew: item.IsNew,
Rating: item.Rating.Value,
RatingCount: item.Rating.Count,
Year: item.Year,
Region: item.Region,
Genres: strings.Join(item.Genres, ", "),
Directors: strings.Join(item.Directors, ", "),
Actors: strings.Join(item.Actors, ", "),
PosterURL: item.Pic.Normal,
Category: "电视剧",
SubType: tvType, // 使用具体的tv类型
Source: "douban",
DoubanID: item.ID,
DoubanURI: item.URI,
}
tvDramas = append(tvDramas, drama)
log.Printf("收集tv类型 %s: %s (评分: %.1f, 年份: %s, 地区: %s)",
tvType, item.Title, item.Rating.Value, item.Year, item.Region)
2025-07-11 17:45:16 +08:00
}
2025-07-15 18:44:09 +08:00
} else {
log.Printf("tv类型 %s 获取数据失败或为空", tvType)
2025-07-11 17:45:16 +08:00
}
2025-07-15 18:44:09 +08:00
// 每个type请求间隔1秒避免请求过于频繁
time.Sleep(1 * time.Second)
2025-07-11 17:45:16 +08:00
}
2025-07-15 18:44:09 +08:00
log.Printf("电视剧数据处理完成,共收集 %d 条数据", len(tvDramas))
return tvDramas
2025-07-11 17:45:16 +08:00
}
// IsRunning 检查定时任务是否在运行
func (s *Scheduler) IsRunning() bool {
return s.isRunning
}
// GetHotDramaNames 手动获取热播剧名字(用于测试或手动调用)
func (s *Scheduler) GetHotDramaNames() ([]string, error) {
2025-07-15 18:44:09 +08:00
// 由于删除了FetchHotDramaNames方法返回空数组
return []string{}, nil
2025-07-11 17:45:16 +08:00
}
2025-07-12 21:23:23 +08:00
// StartReadyResourceScheduler 启动待处理资源自动处理任务
func (s *Scheduler) StartReadyResourceScheduler() {
if s.readyResourceRunning {
log.Println("待处理资源自动处理任务已在运行中")
return
}
s.readyResourceRunning = true
log.Println("启动待处理资源自动处理任务")
go func() {
2025-07-15 12:50:24 +08:00
// 获取系统配置中的间隔时间
config, err := s.systemConfigRepo.GetOrCreateDefault()
interval := 5 * time.Minute // 默认5分钟
if err == nil && config.AutoProcessInterval > 0 {
interval = time.Duration(config.AutoProcessInterval) * time.Minute
}
ticker := time.NewTicker(interval)
2025-07-12 21:23:23 +08:00
defer ticker.Stop()
2025-07-15 12:50:24 +08:00
log.Printf("待处理资源自动处理任务已启动,间隔时间: %v", interval)
2025-07-12 21:23:23 +08:00
// 立即执行一次
s.processReadyResources()
for {
select {
case <-ticker.C:
// 使用TryLock防止任务重叠执行
if s.processingMutex.TryLock() {
go func() {
defer s.processingMutex.Unlock()
s.processReadyResources()
}()
} else {
log.Println("上一次待处理资源任务还在执行中,跳过本次执行")
}
case <-s.stopChan:
log.Println("停止待处理资源自动处理任务")
return
}
}
}()
}
// StopReadyResourceScheduler 停止待处理资源自动处理任务
func (s *Scheduler) StopReadyResourceScheduler() {
if !s.readyResourceRunning {
log.Println("待处理资源自动处理任务未在运行")
return
}
s.stopChan <- true
s.readyResourceRunning = false
log.Println("已发送停止信号给待处理资源自动处理任务")
}
// processReadyResources 处理待处理资源
func (s *Scheduler) processReadyResources() {
log.Println("开始处理待处理资源...")
// 检查系统配置,确认是否启用自动处理
config, err := s.systemConfigRepo.GetOrCreateDefault()
if err != nil {
log.Printf("获取系统配置失败: %v", err)
return
}
if !config.AutoProcessReadyResources {
log.Println("自动处理待处理资源功能已禁用")
return
}
// 获取所有待处理资源
readyResources, err := s.readyResourceRepo.FindAll()
if err != nil {
log.Printf("获取待处理资源失败: %v", err)
return
}
if len(readyResources) == 0 {
log.Println("没有待处理的资源")
return
}
log.Printf("找到 %d 个待处理资源,开始处理...", len(readyResources))
processedCount := 0
2025-07-13 09:04:50 +08:00
factory := panutils.GetInstance() // 使用单例模式
2025-07-12 21:23:23 +08:00
for _, readyResource := range readyResources {
//readyResource.URL 是 查重
exits, err := s.resourceRepo.FindExists(readyResource.URL)
if err != nil {
log.Printf("查重失败: %v", err)
continue
}
if exits {
log.Printf("资源已存在: %s", readyResource.URL)
s.readyResourceRepo.Delete(readyResource.ID)
continue
}
2025-07-13 08:02:55 +08:00
if err := s.convertReadyResourceToResource(readyResource, factory); err != nil {
2025-07-12 21:23:23 +08:00
log.Printf("处理资源失败 (ID: %d): %v", readyResource.ID, err)
}
s.readyResourceRepo.Delete(readyResource.ID)
processedCount++
log.Printf("成功处理资源: %s", readyResource.URL)
}
log.Printf("待处理资源处理完成,共处理 %d 个资源", processedCount)
}
// convertReadyResourceToResource 将待处理资源转换为正式资源
2025-07-13 08:02:55 +08:00
func (s *Scheduler) convertReadyResourceToResource(readyResource entity.ReadyResource, factory *panutils.PanFactory) error {
log.Printf("开始处理资源: %s", readyResource.URL)
2025-07-13 09:04:50 +08:00
// 提取分享ID和服务类型
shareID, serviceType := panutils.ExtractShareId(readyResource.URL)
if serviceType == panutils.NotFound {
log.Printf("不支持的链接地址: %s", readyResource.URL)
return nil
}
log.Printf("检测到服务类型: %s, 分享ID: %s", serviceType.String(), shareID)
2025-07-13 09:57:36 +08:00
// 不是夸克,直接保存,
if serviceType != panutils.Quark {
// 检测是否有效
checkResult, _ := commonutils.CheckURL(readyResource.URL)
if !checkResult.Status {
log.Printf("链接无效: %s", readyResource.URL)
return nil
}
// 入库
}
2025-07-13 09:04:50 +08:00
// 准备配置
2025-07-13 08:02:55 +08:00
config := &panutils.PanConfig{
URL: readyResource.URL,
Code: "", // 可以从readyResource中获取
2025-07-13 09:57:36 +08:00
IsType: 1, // 转存并分享后的资源信息 0 转存后分享, 1 只获取基本信息
2025-07-13 08:02:55 +08:00
ExpiredType: 1, // 永久分享
AdFid: "",
Stoken: "",
}
2025-07-13 09:04:50 +08:00
// 通过工厂获取对应的网盘服务单例
2025-07-13 08:02:55 +08:00
panService, err := factory.CreatePanService(readyResource.URL, config)
if err != nil {
2025-07-13 09:04:50 +08:00
log.Printf("获取网盘服务失败: %v", err)
2025-07-13 08:02:55 +08:00
return err
}
2025-07-13 09:04:50 +08:00
// 阿里云盘特殊处理检查URL有效性
2025-07-13 09:57:36 +08:00
// if serviceType == panutils.Alipan {
// checkResult, _ := CheckURL(readyResource.URL)
// if !checkResult.Status {
// log.Printf("阿里云盘链接无效: %s", readyResource.URL)
// return nil
// }
// // 如果有标题,直接创建资源
// if readyResource.Title != nil && *readyResource.Title != "" {
// resource := &entity.Resource{
// Title: *readyResource.Title,
// Description: readyResource.Description,
// URL: readyResource.URL,
// PanID: s.determinePanID(readyResource.URL),
// IsValid: true,
// IsPublic: true,
// }
// // 如果有分类信息,尝试查找或创建分类
// if readyResource.Category != "" {
// categoryID, err := s.getOrCreateCategory(readyResource.Category)
// if err == nil {
// resource.CategoryID = &categoryID
// }
// }
// return s.resourceRepo.Create(resource)
// }
// }
2025-07-13 08:02:55 +08:00
2025-07-13 09:04:50 +08:00
// 统一处理:尝试转存获取标题
result, err := panService.Transfer(shareID)
if err != nil {
2025-07-13 09:57:36 +08:00
log.Printf("网盘信息获取失败: %v", err)
2025-07-13 09:04:50 +08:00
return err
}
2025-07-13 08:02:55 +08:00
2025-07-13 09:04:50 +08:00
if !result.Success {
2025-07-13 09:57:36 +08:00
log.Printf("网盘信息获取失败: %s", result.Message)
2025-07-13 09:04:50 +08:00
return nil
}
2025-07-13 08:02:55 +08:00
2025-07-13 09:04:50 +08:00
// 提取转存结果
if resultData, ok := result.Data.(map[string]interface{}); ok {
title := resultData["title"].(string)
shareURL := resultData["shareUrl"].(string)
// fid := resultData["fid"].(string) // 暂时未使用
// 创建资源记录
resource := &entity.Resource{
Title: title,
Description: readyResource.Description,
URL: shareURL,
2025-07-15 12:50:24 +08:00
PanID: s.getPanIDByServiceType(serviceType),
2025-07-13 09:04:50 +08:00
IsValid: true,
IsPublic: true,
}
2025-07-13 08:02:55 +08:00
2025-07-13 09:04:50 +08:00
// 如果有分类信息,尝试查找或创建分类
if readyResource.Category != "" {
categoryID, err := s.getOrCreateCategory(readyResource.Category)
if err == nil {
resource.CategoryID = &categoryID
2025-07-13 08:02:55 +08:00
}
}
2025-07-13 09:04:50 +08:00
return s.resourceRepo.Create(resource)
2025-07-12 21:23:23 +08:00
}
2025-07-13 09:04:50 +08:00
log.Printf("转存结果格式异常")
2025-07-12 21:23:23 +08:00
return nil
}
// getOrCreateCategory 获取或创建分类
func (s *Scheduler) getOrCreateCategory(categoryName string) (uint, error) {
// 这里需要实现分类的查找和创建逻辑
// 由于没有CategoryRepository的注入这里先返回0
// 你可以根据需要添加CategoryRepository的依赖
return 0, nil
}
2025-07-15 12:50:24 +08:00
// initPanCache 初始化平台映射缓存
func (s *Scheduler) initPanCache() {
s.panCacheOnce.Do(func() {
// 获取所有平台数据
pans, err := s.panRepo.FindAll()
if err != nil {
log.Printf("初始化平台缓存失败: %v", err)
return
}
// 建立 ServiceType 到 PanID 的映射
serviceTypeToPanName := map[string]string{
"quark": "quark",
"alipan": "aliyun", // 阿里云盘在数据库中的名称是 aliyun
"baidu": "baidu",
"uc": "uc",
"unknown": "other",
}
// 创建平台名称到ID的映射
panNameToID := make(map[string]*uint)
for _, pan := range pans {
panID := pan.ID
panNameToID[pan.Name] = &panID
}
// 建立 ServiceType 到 PanID 的映射
for serviceType, panName := range serviceTypeToPanName {
if panID, exists := panNameToID[panName]; exists {
s.panCache[serviceType] = panID
log.Printf("平台映射缓存: %s -> %s (ID: %d)", serviceType, panName, *panID)
} else {
log.Printf("警告: 未找到平台 %s 对应的数据库记录", panName)
}
}
// 确保有默认的 other 平台
if otherID, exists := panNameToID["other"]; exists {
s.panCache["unknown"] = otherID
}
log.Printf("平台映射缓存初始化完成,共 %d 个映射", len(s.panCache))
})
}
// getPanIDByServiceType 根据服务类型获取平台ID
func (s *Scheduler) getPanIDByServiceType(serviceType panutils.ServiceType) *uint {
s.initPanCache()
serviceTypeStr := serviceType.String()
if panID, exists := s.panCache[serviceTypeStr]; exists {
return panID
}
// 如果找不到,返回 other 平台的ID
if otherID, exists := s.panCache["other"]; exists {
log.Printf("未找到服务类型 %s 的映射,使用默认平台 other", serviceTypeStr)
return otherID
}
log.Printf("未找到服务类型 %s 的映射且没有默认平台返回nil", serviceTypeStr)
return nil
}
2025-07-12 21:23:23 +08:00
// IsReadyResourceRunning 检查待处理资源自动处理任务是否在运行
func (s *Scheduler) IsReadyResourceRunning() bool {
return s.readyResourceRunning
}