Files
urldb/db/repo/telegram_channel_repository.go

157 lines
5.0 KiB
Go
Raw Permalink Normal View History

2025-09-16 00:07:02 +08:00
package repo
import (
"time"
"github.com/ctwj/urldb/db/entity"
"gorm.io/gorm"
)
type TelegramChannelRepository interface {
BaseRepository[entity.TelegramChannel]
FindActiveChannels() ([]entity.TelegramChannel, error)
FindByChatID(chatID int64) (*entity.TelegramChannel, error)
FindByChatType(chatType string) ([]entity.TelegramChannel, error)
UpdateLastPushAt(id uint, lastPushAt time.Time) error
FindDueForPush() ([]entity.TelegramChannel, error)
2025-09-21 00:11:10 +08:00
CleanupDuplicateChannels() error
2025-10-25 09:42:19 +08:00
FindActiveChannelsByTypes(chatTypes []string) ([]entity.TelegramChannel, error)
2025-09-16 00:07:02 +08:00
}
// TelegramChannelRepositoryImpl is the TelegramChannelRepository
// implementation returned by NewTelegramChannelRepository. It embeds
// BaseRepositoryImpl, whose db handle every method in this file queries through.
type TelegramChannelRepositoryImpl struct {
BaseRepositoryImpl[entity.TelegramChannel]
}
// NewTelegramChannelRepository builds a TelegramChannelRepository whose
// queries all run against the supplied gorm database handle.
func NewTelegramChannelRepository(db *gorm.DB) TelegramChannelRepository {
	repo := new(TelegramChannelRepositoryImpl)
	// db is the promoted field of the embedded BaseRepositoryImpl.
	repo.db = db
	return repo
}
// Overrides of the base repository methods.

// Create inserts the given channel record and returns any database error.
func (r *TelegramChannelRepositoryImpl) Create(entity *entity.TelegramChannel) error {
	result := r.db.Create(entity)
	return result.Error
}
// Update persists the given channel record through gorm's Save and returns
// any database error.
func (r *TelegramChannelRepositoryImpl) Update(entity *entity.TelegramChannel) error {
	result := r.db.Save(entity)
	return result.Error
}
// Delete issues a delete for the channel with the given primary key and
// returns any database error.
func (r *TelegramChannelRepositoryImpl) Delete(id uint) error {
	model := &entity.TelegramChannel{}
	return r.db.Delete(model, id).Error
}
// FindByID loads the channel with the given primary key. When the lookup
// fails it returns nil together with the query error.
func (r *TelegramChannelRepositoryImpl) FindByID(id uint) (*entity.TelegramChannel, error) {
	found := new(entity.TelegramChannel)
	if err := r.db.First(found, id).Error; err != nil {
		return nil, err
	}
	return found, nil
}
// FindAll returns every channel record, newest first (created_at descending).
func (r *TelegramChannelRepositoryImpl) FindAll() ([]entity.TelegramChannel, error) {
	var all []entity.TelegramChannel
	result := r.db.Order("created_at desc").Find(&all)
	return all, result.Error
}
// FindActiveChannels returns the channels/groups that are both active and
// push-enabled, newest first (created_at descending).
func (r *TelegramChannelRepositoryImpl) FindActiveChannels() ([]entity.TelegramChannel, error) {
	var active []entity.TelegramChannel
	query := r.db.
		Where("is_active = ? AND push_enabled = ?", true, true).
		Order("created_at desc")
	result := query.Find(&active)
	return active, result.Error
}
// FindByChatID loads the first channel/group whose chat_id equals chatID.
// When the lookup fails it returns nil together with the query error.
func (r *TelegramChannelRepositoryImpl) FindByChatID(chatID int64) (*entity.TelegramChannel, error) {
	match := new(entity.TelegramChannel)
	if err := r.db.Where("chat_id = ?", chatID).First(match).Error; err != nil {
		return nil, err
	}
	return match, nil
}
// FindByChatType returns every channel/group of the given chat type, newest
// first (created_at descending).
func (r *TelegramChannelRepositoryImpl) FindByChatType(chatType string) ([]entity.TelegramChannel, error) {
	var matches []entity.TelegramChannel
	query := r.db.
		Where("chat_type = ?", chatType).
		Order("created_at desc")
	result := query.Find(&matches)
	return matches, result.Error
}
2025-10-25 09:42:19 +08:00
// FindActiveChannelsByTypes returns the active channels/groups whose chat
// type is one of chatTypes. Unlike FindActiveChannels it does not filter on
// push_enabled and applies no explicit ordering.
func (r *TelegramChannelRepositoryImpl) FindActiveChannelsByTypes(chatTypes []string) ([]entity.TelegramChannel, error) {
	var matches []entity.TelegramChannel
	query := r.db.Where("chat_type IN (?) AND is_active = ?", chatTypes, true)
	result := query.Find(&matches)
	return matches, result.Error
}
2025-09-16 00:07:02 +08:00
// UpdateLastPushAt sets the last_push_at column of the channel with the given
// id to lastPushAt, leaving every other column untouched.
func (r *TelegramChannelRepositoryImpl) UpdateLastPushAt(id uint, lastPushAt time.Time) error {
	target := r.db.Model(&entity.TelegramChannel{}).Where("id = ?", id)
	return target.Update("last_push_at", lastPushAt).Error
}
// FindDueForPush 查找需要推送的频道/群组
func (r *TelegramChannelRepositoryImpl) FindDueForPush() ([]entity.TelegramChannel, error) {
var channels []entity.TelegramChannel
// 查找活跃、启用推送的频道,且距离上次推送已超过推送频率小时的记录
2025-09-17 14:31:12 +08:00
// 先获取所有活跃且启用推送的频道
err := r.db.Where("is_active = ? AND push_enabled = ?", true, true).Find(&channels).Error
if err != nil {
return nil, err
}
// 在内存中过滤出需要推送的频道(更可靠的跨数据库方案)
var dueChannels []entity.TelegramChannel
now := time.Now()
2025-09-21 00:11:10 +08:00
// 用于去重的map以chat_id为键
seenChatIDs := make(map[int64]bool)
2025-09-17 14:31:12 +08:00
for _, channel := range channels {
2025-09-21 00:11:10 +08:00
// 检查是否已经处理过这个chat_id去重
if seenChatIDs[channel.ChatID] {
continue
}
2025-09-17 14:31:12 +08:00
// 如果从未推送过,或者距离上次推送已超过推送频率小时
2025-09-21 00:11:10 +08:00
isDue := false
2025-09-17 14:31:12 +08:00
if channel.LastPushAt == nil {
2025-09-21 00:11:10 +08:00
isDue = true
2025-09-17 14:31:12 +08:00
} else {
2025-09-19 18:37:50 +08:00
// 计算下次推送时间:上次推送时间 + 推送频率分钟
nextPushTime := channel.LastPushAt.Add(time.Duration(channel.PushFrequency) * time.Minute)
2025-09-17 14:31:12 +08:00
if now.After(nextPushTime) {
2025-09-21 00:11:10 +08:00
isDue = true
2025-09-17 14:31:12 +08:00
}
}
2025-09-21 00:11:10 +08:00
if isDue {
dueChannels = append(dueChannels, channel)
seenChatIDs[channel.ChatID] = true // 标记此chat_id已处理
}
2025-09-17 14:31:12 +08:00
}
return dueChannels, nil
2025-09-16 00:07:02 +08:00
}
2025-09-21 00:11:10 +08:00
// CleanupDuplicateChannels removes duplicate channel rows so that, for every
// chat_id, only the row with the smallest id remains. It returns any error
// reported by the database.
//
// The statement is deliberately written in portable SQL. A multi-table
// "DELETE t1 FROM ... INNER JOIN ..." is MySQL-only syntax and is rejected by
// PostgreSQL and SQLite, whereas a single-table DELETE with a subquery is
// accepted by all three. The MIN(id) aggregation sits inside a derived table
// because MySQL otherwise refuses a subquery that reads the table being
// deleted from. Rows whose chat_id is NULL are never touched.
func (r *TelegramChannelRepositoryImpl) CleanupDuplicateChannels() error {
	const query = `
DELETE FROM telegram_channels
WHERE chat_id IS NOT NULL
  AND id NOT IN (
    SELECT min_id FROM (
      SELECT MIN(id) AS min_id
      FROM telegram_channels
      WHERE chat_id IS NOT NULL
      GROUP BY chat_id
    ) AS keepers
  )
`
	return r.db.Exec(query).Error
}