Files
OpenList/internal/task_group/transfer.go
ILoveScratch febbcd6027 feat(cache): improve cache management (#1339)
* feat(cache): improve cache management

* feat(disk-usage): add cache

* feat(disk-usage): add refresh

* fix(disk-usage): cache with ttl

* feat(cache): implement KeyedCache and TypedCache for improved caching mechanism

* fix(copy): update object retrieval to use Get instead of GetUnwrap

* refactor(cache): simplify DirectoryCache structure and improve object management

* fix(cache): correct cache entry initialization and key deletion logic in TypedCache

* refactor(driver): remove GetObjInfo interface and simplify Link function logic
https://github.com/OpenListTeam/OpenList/pull/888/files#r2430925783

* fix(link): optimize link retrieval and caching logic

* refactor(cache): consolidate cache management and improve directory cache handling

* fix(cache): add cache control based on storage configuration in List function

* .

* refactor: replace fmt.Sprintf with strconv for integer conversions

* refactor(cache): enhance cache entry management with Expirable interface

* fix(cache): improve link reference acquisition logic to handle expiration

* refactor: replace OnlyLinkMFile with NoLinkSF in driver configurations and logic

* refactor(link): enhance link caching logic with dynamic type keys based on IP and User-Agent

* feat(drivers): add LinkCacheType to driver configurations for enhanced caching

* refactor(cache): streamline directory object management in cache operations

* refactor(cache): remove unnecessary 'dirty' field from CacheEntry structure

* refactor(cache): replace 'dirty' field with bitwise flags

* refactor(io): 调高SyncClosers.AcquireReference的优先级

* refactor(link): 优化链接获取逻辑,增加重

* refactor(link): 添加RequireReference字段以增强链接管理

* refactor(link): 移除MFile字段,改用RangeReader

* refactor: 移除不必要的NoLinkSF字段

* refactor(cache): 修改目录缓存的脏标志定义和更新逻辑

* feat(cache): add expiration gc

---------

Co-authored-by: KirCute <951206789@qq.com>
Co-authored-by: KirCute <kircute@foxmail.com>
Co-authored-by: j2rong4cn <j2rong@qq.com>
2025-10-18 21:47:18 +08:00

103 lines
3.0 KiB
Go

package task_group
import (
"context"
"fmt"
"path"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
type SrcPathToRemove string
// ActualPath
type DstPathToRefresh string
func RefreshAndRemove(dstPath string, payloads ...any) {
dstStorage, dstActualPath, err := op.GetStorageAndActualPath(dstPath)
if err != nil {
log.Error(errors.WithMessage(err, "failed get dst storage"))
return
}
_, dstNeedRefresh := dstStorage.(driver.Put)
if dstNeedRefresh {
op.Cache.DeleteDirectory(dstStorage, dstActualPath)
}
var ctx context.Context
for _, payload := range payloads {
switch p := payload.(type) {
case DstPathToRefresh:
if dstNeedRefresh {
op.Cache.DeleteDirectory(dstStorage, string(p))
}
case SrcPathToRemove:
if ctx == nil {
ctx = context.Background()
}
srcStorage, srcActualPath, err := op.GetStorageAndActualPath(string(p))
if err != nil {
log.Error(errors.WithMessage(err, "failed get src storage"))
continue
}
err = verifyAndRemove(ctx, srcStorage, dstStorage, srcActualPath, dstActualPath, dstNeedRefresh)
if err != nil {
log.Error(err)
}
}
}
}
func verifyAndRemove(ctx context.Context, srcStorage, dstStorage driver.Driver, srcPath, dstPath string, refresh bool) error {
srcObj, err := op.Get(ctx, srcStorage, srcPath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", path.Join(srcStorage.GetStorage().MountPath, srcPath))
}
dstObjPath := path.Join(dstPath, srcObj.GetName())
dstObj, err := op.Get(ctx, dstStorage, dstObjPath)
if err != nil {
return errors.WithMessagef(err, "failed get dst [%s] file", path.Join(dstStorage.GetStorage().MountPath, dstObjPath))
}
if !dstObj.IsDir() {
err = op.Remove(ctx, srcStorage, srcPath)
if err != nil {
return fmt.Errorf("failed remove %s: %+v", path.Join(srcStorage.GetStorage().MountPath, srcPath), err)
}
return nil
}
// Verify directory
srcObjs, err := op.List(ctx, srcStorage, srcPath, model.ListArgs{})
if err != nil {
return errors.WithMessagef(err, "failed list src [%s] objs", path.Join(srcStorage.GetStorage().MountPath, srcPath))
}
if refresh {
op.Cache.DeleteDirectory(dstStorage, dstObjPath)
}
hasErr := false
for _, obj := range srcObjs {
srcSubPath := path.Join(srcPath, obj.GetName())
err := verifyAndRemove(ctx, srcStorage, dstStorage, srcSubPath, dstObjPath, refresh)
if err != nil {
log.Error(err)
hasErr = true
}
}
if hasErr {
return errors.Errorf("some subitems of [%s] failed to verify and remove", path.Join(srcStorage.GetStorage().MountPath, srcPath))
}
err = op.Remove(ctx, srcStorage, srcPath)
if err != nil {
return fmt.Errorf("failed remove %s: %+v", path.Join(srcStorage.GetStorage().MountPath, srcPath), err)
}
return nil
}
var TransferCoordinator *TaskGroupCoordinator = NewTaskGroupCoordinator("RefreshAndRemove", RefreshAndRemove)