fix(fs):fix retry task after restart (#1467)

* fix retry task after restart

* fix: initialize SrcStorage and DstStorage in tasks to prevent nil pointer dereference

* feat: implement storage load signal mechanism for improved synchronization

* fix:  update StoragesLoaded logic

* refactor: reorganize storage loading logic and improve synchronization handling

---------

Co-authored-by: j2rong4cn <j2rong@qq.com>
This commit is contained in:
jenfonro
2025-10-18 20:52:02 +08:00
committed by GitHub
parent 14d2b8290a
commit 549e60136b
9 changed files with 91 additions and 13 deletions

View File

@@ -25,6 +25,6 @@ func LoadStorages() {
storages[i].MountPath, storages[i].Driver, storages[i].Order)
}
}
conf.StoragesLoaded = true
conf.SendStoragesLoadedSignal()
}(storages)
}

View File

@@ -3,6 +3,7 @@ package conf
import (
"net/url"
"regexp"
"sync"
)
var (
@@ -23,8 +24,6 @@ var FilenameCharMap = make(map[string]string)
var PrivacyReg []*regexp.Regexp
var (
// StoragesLoaded loaded success if empty
StoragesLoaded = false
// 单个Buffer最大限制
MaxBufferLimit = 16 * 1024 * 1024
// 超过该阈值的Buffer将使用 mmap 分配,可主动释放内存
@@ -35,3 +34,39 @@ var (
ManageHtml string
IndexHtml string
)
var (
// StoragesLoaded loaded success if empty
StoragesLoaded = false
storagesLoadMu sync.RWMutex
storagesLoadSignal chan struct{} = make(chan struct{})
)
func StoragesLoadSignal() <-chan struct{} {
storagesLoadMu.RLock()
ch := storagesLoadSignal
storagesLoadMu.RUnlock()
return ch
}
func SendStoragesLoadedSignal() {
storagesLoadMu.Lock()
select {
case <-storagesLoadSignal:
// already closed
default:
StoragesLoaded = true
close(storagesLoadSignal)
}
storagesLoadMu.Unlock()
}
func ResetStoragesLoadSignal() {
storagesLoadMu.Lock()
select {
case <-storagesLoadSignal:
StoragesLoaded = false
storagesLoadSignal = make(chan struct{})
default:
// not closed -> nothing to do
}
storagesLoadMu.Unlock()
}

View File

@@ -1,6 +1,7 @@
package db
import (
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/pkg/errors"
)
@@ -30,6 +31,7 @@ func GetTaskDataFunc(type_s string, enabled bool) func() ([]byte, error) {
return nil
}
return func() ([]byte, error) {
<-conf.StoragesLoadSignal()
return []byte(task.PersistData), nil
}
}

View File

@@ -41,6 +41,18 @@ func (t *ArchiveDownloadTask) Run() error {
if err := t.ReinitCtx(); err != nil {
return err
}
if t.SrcStorage == nil {
if srcStorage, _, err := op.GetStorageAndActualPath(t.SrcStorageMp); err == nil {
t.SrcStorage = srcStorage
} else {
return err
}
if dstStorage, _, err := op.GetStorageAndActualPath(t.DstStorageMp); err == nil {
t.DstStorage = dstStorage
} else {
return err
}
}
t.ClearEndTime()
t.SetStartTime(time.Now())
defer func() { t.SetEndTime(time.Now()) }()

View File

@@ -48,6 +48,19 @@ func (t *FileTransferTask) Run() error {
if err := t.ReinitCtx(); err != nil {
return err
}
if t.SrcStorage == nil {
if srcStorage, _, err := op.GetStorageAndActualPath(t.SrcStorageMp); err == nil {
t.SrcStorage = srcStorage
} else {
return err
}
if dstStorage, _, err := op.GetStorageAndActualPath(t.DstStorageMp); err == nil {
t.DstStorage = dstStorage
} else {
return err
}
}
t.ClearEndTime()
t.SetStartTime(time.Now())
defer func() { t.SetEndTime(time.Now()) }()

View File

@@ -34,6 +34,20 @@ func (t *TransferTask) Run() error {
if err := t.ReinitCtx(); err != nil {
return err
}
if t.SrcStorage == nil && t.SrcStorageMp != "" {
if srcStorage, _, err := op.GetStorageAndActualPath(t.SrcStorageMp); err == nil {
t.SrcStorage = srcStorage
} else {
return err
}
if t.DstStorage == nil {
if dstStorage, _, err := op.GetStorageAndActualPath(t.DstStorageMp); err == nil {
t.DstStorage = dstStorage
} else {
return err
}
}
}
t.ClearEndTime()
t.SetStartTime(time.Now())
defer func() { t.SetEndTime(time.Now()) }()
@@ -64,9 +78,8 @@ func (t *TransferTask) Run() error {
return op.Put(t.Ctx(), t.DstStorage, t.DstActualPath, s, t.SetProgress)
}
return transferStdPath(t)
} else {
return transferObjPath(t)
}
return transferObjPath(t)
}
func (t *TransferTask) GetName() string {

View File

@@ -14,7 +14,7 @@ type TaskExtension struct {
Creator *model.User
startTime *time.Time
endTime *time.Time
totalBytes int64
TotalBytes int64
ApiUrl string
}
@@ -58,11 +58,11 @@ func (t *TaskExtension) ClearEndTime() {
}
func (t *TaskExtension) SetTotalBytes(totalBytes int64) {
t.totalBytes = totalBytes
t.TotalBytes = totalBytes
}
func (t *TaskExtension) GetTotalBytes() int64 {
return t.totalBytes
return t.TotalBytes
}
func (t *TaskExtension) ReinitCtx() error {

View File

@@ -175,7 +175,7 @@ func LoadAllStorages(c *gin.Context) {
common.ErrorResp(c, err, 500, true)
return
}
conf.StoragesLoaded = false
conf.ResetStoragesLoadSignal()
go func(storages []model.Storage) {
for _, storage := range storages {
storageDriver, err := op.GetStorageByMountPath(storage.MountPath)
@@ -195,7 +195,7 @@ func LoadAllStorages(c *gin.Context) {
log.Infof("success load storage: [%s], driver: [%s]",
storage.MountPath, storage.Driver)
}
conf.StoragesLoaded = true
conf.SendStoragesLoadedSignal()
}(storages)
common.SuccessResp(c)
}

View File

@@ -22,9 +22,12 @@ func StoragesLoaded(c *gin.Context) {
return
}
}
common.ErrorStrResp(c, "Loading storage, please wait", 500)
c.Abort()
return
select {
case <-conf.StoragesLoadSignal():
case <-c.Request.Context().Done():
c.Abort()
return
}
}
common.GinWithValue(c,
conf.ApiUrlKey, common.GetApiUrlFromRequest(c.Request),