mirror of
https://github.com/OpenListTeam/OpenList.git
synced 2025-11-24 19:12:56 +08:00
feat(fs): Add skipExisting option to move and copy, merge option to copy (#1556)
* fix(fs): Add skipExisting option to move and copy. * feat(fs): Add merge option to copy. * feat(fs): Code smell. * feat(fs): Code smell.
This commit is contained in:
@@ -24,14 +24,17 @@ type taskType uint8
|
||||
func (t taskType) String() string {
|
||||
if t == 0 {
|
||||
return "copy"
|
||||
} else {
|
||||
} else if t == 1 {
|
||||
return "move"
|
||||
} else {
|
||||
return "merge"
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
copy taskType = iota
|
||||
move
|
||||
merge
|
||||
)
|
||||
|
||||
type FileTransferTask struct {
|
||||
@@ -67,7 +70,7 @@ func (t *FileTransferTask) Run() error {
|
||||
return t.RunWithNextTaskCallback(func(nextTask *FileTransferTask) error {
|
||||
nextTask.groupID = t.groupID
|
||||
task_group.TransferCoordinator.AddTask(t.groupID, nil)
|
||||
if t.TaskType == copy {
|
||||
if t.TaskType == copy || t.TaskType == merge {
|
||||
CopyTaskManager.Add(nextTask)
|
||||
} else {
|
||||
MoveTaskManager.Add(nextTask)
|
||||
@@ -109,7 +112,7 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str
|
||||
}
|
||||
|
||||
if srcStorage.GetStorage() == dstStorage.GetStorage() {
|
||||
if taskType == copy {
|
||||
if taskType == copy || taskType == merge {
|
||||
err = op.Copy(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...)
|
||||
if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.NotSupport) {
|
||||
return nil, err
|
||||
@@ -161,7 +164,7 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str
|
||||
t.Creator, _ = ctx.Value(conf.UserKey).(*model.User)
|
||||
t.ApiUrl = common.GetApiUrl(ctx)
|
||||
t.groupID = dstDirPath
|
||||
if taskType == copy {
|
||||
if taskType == copy || taskType == merge {
|
||||
task_group.TransferCoordinator.AddTask(dstDirPath, nil)
|
||||
CopyTaskManager.Add(t)
|
||||
} else {
|
||||
@@ -177,6 +180,7 @@ func (t *FileTransferTask) RunWithNextTaskCallback(f func(nextTask *FileTransfer
|
||||
if err != nil {
|
||||
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcActualPath)
|
||||
}
|
||||
|
||||
if srcObj.IsDir() {
|
||||
t.Status = "src object is dir, listing objs"
|
||||
objs, err := op.List(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.ListArgs{})
|
||||
@@ -184,17 +188,34 @@ func (t *FileTransferTask) RunWithNextTaskCallback(f func(nextTask *FileTransfer
|
||||
return errors.WithMessagef(err, "failed list src [%s] objs", t.SrcActualPath)
|
||||
}
|
||||
dstActualPath := stdpath.Join(t.DstActualPath, srcObj.GetName())
|
||||
if t.TaskType == copy {
|
||||
if t.TaskType == copy || t.TaskType == merge {
|
||||
if t.Ctx().Value(conf.NoTaskKey) != nil {
|
||||
defer op.Cache.DeleteDirectory(t.DstStorage, dstActualPath)
|
||||
} else {
|
||||
task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstActualPath))
|
||||
}
|
||||
}
|
||||
|
||||
existedObjs := make(map[string]bool)
|
||||
if t.TaskType == merge {
|
||||
dstObjs, _ := op.List(t.Ctx(), t.DstStorage, dstActualPath, model.ListArgs{})
|
||||
for _, obj := range dstObjs {
|
||||
if !obj.IsDir() {
|
||||
existedObjs[obj.GetName()] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, obj := range objs {
|
||||
if utils.IsCanceled(t.Ctx()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if t.TaskType == merge && !obj.IsDir() && existedObjs[obj.GetName()] {
|
||||
// skip existed file
|
||||
continue
|
||||
}
|
||||
|
||||
err = f(&FileTransferTask{
|
||||
TaskType: t.TaskType,
|
||||
TaskData: TaskData{
|
||||
|
||||
@@ -84,6 +84,14 @@ func Copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool)
|
||||
return res, err
|
||||
}
|
||||
|
||||
func Merge(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) {
|
||||
res, err := transfer(ctx, merge, srcObjPath, dstDirPath, lazyCache...)
|
||||
if err != nil {
|
||||
log.Errorf("failed merge %s to %s: %+v", srcObjPath, dstDirPath, err)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
func Rename(ctx context.Context, srcPath, dstName string, lazyCache ...bool) error {
|
||||
err := rename(ctx, srcPath, dstName, lazyCache...)
|
||||
if err != nil {
|
||||
|
||||
@@ -57,10 +57,12 @@ func FsMkdir(c *gin.Context) {
|
||||
}
|
||||
|
||||
type MoveCopyReq struct {
|
||||
SrcDir string `json:"src_dir"`
|
||||
DstDir string `json:"dst_dir"`
|
||||
Names []string `json:"names"`
|
||||
Overwrite bool `json:"overwrite"`
|
||||
SrcDir string `json:"src_dir"`
|
||||
DstDir string `json:"dst_dir"`
|
||||
Names []string `json:"names"`
|
||||
Overwrite bool `json:"overwrite"`
|
||||
SkipExisting bool `json:"skip_existing"`
|
||||
Merge bool `json:"merge"`
|
||||
}
|
||||
|
||||
func FsMove(c *gin.Context) {
|
||||
@@ -89,20 +91,25 @@ func FsMove(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
var validNames []string
|
||||
if !req.Overwrite {
|
||||
for _, name := range req.Names {
|
||||
if res, _ := fs.Get(c.Request.Context(), stdpath.Join(dstDir, name), &fs.GetArgs{NoLog: true}); res != nil {
|
||||
if res, _ := fs.Get(c.Request.Context(), stdpath.Join(dstDir, name), &fs.GetArgs{NoLog: true}); res != nil && !req.SkipExisting {
|
||||
common.ErrorStrResp(c, fmt.Sprintf("file [%s] exists", name), 403)
|
||||
return
|
||||
} else if res == nil {
|
||||
validNames = append(validNames, name)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
validNames = req.Names
|
||||
}
|
||||
|
||||
// Create all tasks immediately without any synchronous validation
|
||||
// All validation will be done asynchronously in the background
|
||||
var addedTasks []task.TaskExtensionInfo
|
||||
for i, name := range req.Names {
|
||||
t, err := fs.Move(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(req.Names) > i+1)
|
||||
for i, name := range validNames {
|
||||
t, err := fs.Move(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(validNames) > i+1)
|
||||
if t != nil {
|
||||
addedTasks = append(addedTasks, t)
|
||||
}
|
||||
@@ -151,20 +158,34 @@ func FsCopy(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
var validNames []string
|
||||
if !req.Overwrite {
|
||||
for _, name := range req.Names {
|
||||
if res, _ := fs.Get(c.Request.Context(), stdpath.Join(dstDir, name), &fs.GetArgs{NoLog: true}); res != nil {
|
||||
common.ErrorStrResp(c, fmt.Sprintf("file [%s] exists", name), 403)
|
||||
return
|
||||
if !req.SkipExisting && !req.Merge {
|
||||
common.ErrorStrResp(c, fmt.Sprintf("file [%s] exists", name), 403)
|
||||
return
|
||||
} else if req.Merge && res.IsDir() {
|
||||
validNames = append(validNames, name)
|
||||
}
|
||||
} else {
|
||||
validNames = append(validNames, name)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
validNames = req.Names
|
||||
}
|
||||
|
||||
// Create all tasks immediately without any synchronous validation
|
||||
// All validation will be done asynchronously in the background
|
||||
var addedTasks []task.TaskExtensionInfo
|
||||
for i, name := range req.Names {
|
||||
t, err := fs.Copy(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(req.Names) > i+1)
|
||||
for i, name := range validNames {
|
||||
var t task.TaskExtensionInfo
|
||||
if req.Merge {
|
||||
t, err = fs.Merge(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(validNames) > i+1)
|
||||
} else {
|
||||
t, err = fs.Copy(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(validNames) > i+1)
|
||||
}
|
||||
if t != nil {
|
||||
addedTasks = append(addedTasks, t)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user