From b22e211044680c3bf2e925a0b843793622e9cad5 Mon Sep 17 00:00:00 2001 From: varg1714 <51254954+varg1714@users.noreply.github.com> Date: Mon, 24 Nov 2025 14:20:24 +0800 Subject: [PATCH] feat(fs): Add skipExisting option to move and copy, merge option to copy (#1556) * fix(fs): Add skipExisting option to move and copy. * feat(fs): Add merge option to copy. * feat(fs): Code smell. * feat(fs): Code smell. --- internal/fs/copy_move.go | 31 ++++++++++++++++++++++----- internal/fs/fs.go | 8 +++++++ server/handles/fsmanage.go | 43 ++++++++++++++++++++++++++++---------- 3 files changed, 66 insertions(+), 16 deletions(-) diff --git a/internal/fs/copy_move.go b/internal/fs/copy_move.go index e9b70856..57fd7bdb 100644 --- a/internal/fs/copy_move.go +++ b/internal/fs/copy_move.go @@ -24,14 +24,17 @@ type taskType uint8 func (t taskType) String() string { if t == 0 { return "copy" - } else { + } else if t == 1 { return "move" + } else { + return "merge" } } const ( copy taskType = iota move + merge ) type FileTransferTask struct { @@ -67,7 +70,7 @@ func (t *FileTransferTask) Run() error { return t.RunWithNextTaskCallback(func(nextTask *FileTransferTask) error { nextTask.groupID = t.groupID task_group.TransferCoordinator.AddTask(t.groupID, nil) - if t.TaskType == copy { + if t.TaskType == copy || t.TaskType == merge { CopyTaskManager.Add(nextTask) } else { MoveTaskManager.Add(nextTask) @@ -109,7 +112,7 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str } if srcStorage.GetStorage() == dstStorage.GetStorage() { - if taskType == copy { + if taskType == copy || taskType == merge { err = op.Copy(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...) if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.NotSupport) { return nil, err @@ -161,7 +164,7 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str t.Creator, _ = ctx.Value(conf.UserKey).(*model.User) t.ApiUrl = common.GetApiUrl(ctx) t.groupID = dstDirPath - if taskType == copy { + if taskType == copy || taskType == merge { task_group.TransferCoordinator.AddTask(dstDirPath, nil) CopyTaskManager.Add(t) } else { @@ -177,6 +180,7 @@ func (t *FileTransferTask) RunWithNextTaskCallback(f func(nextTask *FileTransfer if err != nil { return errors.WithMessagef(err, "failed get src [%s] file", t.SrcActualPath) } + if srcObj.IsDir() { t.Status = "src object is dir, listing objs" objs, err := op.List(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.ListArgs{}) @@ -184,17 +188,34 @@ func (t *FileTransferTask) RunWithNextTaskCallback(f func(nextTask *FileTransfer return errors.WithMessagef(err, "failed list src [%s] objs", t.SrcActualPath) } dstActualPath := stdpath.Join(t.DstActualPath, srcObj.GetName()) - if t.TaskType == copy { + if t.TaskType == copy || t.TaskType == merge { if t.Ctx().Value(conf.NoTaskKey) != nil { defer op.Cache.DeleteDirectory(t.DstStorage, dstActualPath) } else { task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstActualPath)) } } + + existedObjs := make(map[string]bool) + if t.TaskType == merge { + dstObjs, _ := op.List(t.Ctx(), t.DstStorage, dstActualPath, model.ListArgs{}) + for _, obj := range dstObjs { + if !obj.IsDir() { + existedObjs[obj.GetName()] = true + } + } + } + for _, obj := range objs { if utils.IsCanceled(t.Ctx()) { return nil } + + if t.TaskType == merge && !obj.IsDir() && existedObjs[obj.GetName()] { + // skip existed file + continue + } + err = f(&FileTransferTask{ TaskType: t.TaskType, TaskData: TaskData{ diff --git a/internal/fs/fs.go b/internal/fs/fs.go index 9e02f629..85b7f540 100644 --- a/internal/fs/fs.go +++ b/internal/fs/fs.go @@ -84,6 +84,14 @@ func Copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) return res, err } +func Merge(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) { + res, err := transfer(ctx, merge, srcObjPath, dstDirPath, lazyCache...) + if err != nil { + log.Errorf("failed merge %s to %s: %+v", srcObjPath, dstDirPath, err) + } + return res, err +} + func Rename(ctx context.Context, srcPath, dstName string, lazyCache ...bool) error { err := rename(ctx, srcPath, dstName, lazyCache...) if err != nil { diff --git a/server/handles/fsmanage.go b/server/handles/fsmanage.go index 3fe86726..8247fa8c 100644 --- a/server/handles/fsmanage.go +++ b/server/handles/fsmanage.go @@ -57,10 +57,12 @@ func FsMkdir(c *gin.Context) { } type MoveCopyReq struct { - SrcDir string `json:"src_dir"` - DstDir string `json:"dst_dir"` - Names []string `json:"names"` - Overwrite bool `json:"overwrite"` + SrcDir string `json:"src_dir"` + DstDir string `json:"dst_dir"` + Names []string `json:"names"` + Overwrite bool `json:"overwrite"` + SkipExisting bool `json:"skip_existing"` + Merge bool `json:"merge"` } func FsMove(c *gin.Context) { @@ -89,20 +91,25 @@ func FsMove(c *gin.Context) { return } + var validNames []string if !req.Overwrite { for _, name := range req.Names { - if res, _ := fs.Get(c.Request.Context(), stdpath.Join(dstDir, name), &fs.GetArgs{NoLog: true}); res != nil { + if res, _ := fs.Get(c.Request.Context(), stdpath.Join(dstDir, name), &fs.GetArgs{NoLog: true}); res != nil && !req.SkipExisting { common.ErrorStrResp(c, fmt.Sprintf("file [%s] exists", name), 403) return + } else if res == nil { + validNames = append(validNames, name) } } + } else { + validNames = req.Names } // Create all tasks immediately without any synchronous validation // All validation will be done asynchronously in the background var addedTasks []task.TaskExtensionInfo - for i, name := range req.Names { - t, err := fs.Move(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(req.Names) > i+1) + for i, name := range validNames { + t, err := fs.Move(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(validNames) > i+1) if t != nil { addedTasks = append(addedTasks, t) } @@ -151,20 +158,34 @@ func FsCopy(c *gin.Context) { return } + var validNames []string if !req.Overwrite { for _, name := range req.Names { if res, _ := fs.Get(c.Request.Context(), stdpath.Join(dstDir, name), &fs.GetArgs{NoLog: true}); res != nil { - common.ErrorStrResp(c, fmt.Sprintf("file [%s] exists", name), 403) - return + if !req.SkipExisting && !req.Merge { + common.ErrorStrResp(c, fmt.Sprintf("file [%s] exists", name), 403) + return + } else if req.Merge && res.IsDir() { + validNames = append(validNames, name) + } + } else { + validNames = append(validNames, name) } } + } else { + validNames = req.Names } // Create all tasks immediately without any synchronous validation // All validation will be done asynchronously in the background var addedTasks []task.TaskExtensionInfo - for i, name := range req.Names { - t, err := fs.Copy(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(req.Names) > i+1) + for i, name := range validNames { + var t task.TaskExtensionInfo + if req.Merge { + t, err = fs.Merge(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(validNames) > i+1) + } else { + t, err = fs.Copy(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(validNames) > i+1) + } if t != nil { addedTasks = append(addedTasks, t) }