feat(alias): support parallel write (#69)

* feat(alias): support parallel write

* fix(alias): lack `err` in `errors.Join()`
This commit is contained in:
KirCute
2025-06-17 00:13:01 +08:00
committed by GitHub
parent 2e2cec05fd
commit 5a4649c929
3 changed files with 94 additions and 19 deletions

View File

@@ -3,6 +3,7 @@ package alias
import (
"context"
"errors"
"io"
stdpath "path"
"strings"
@@ -10,6 +11,7 @@ import (
"github.com/OpenListTeam/OpenList/internal/errs"
"github.com/OpenListTeam/OpenList/internal/fs"
"github.com/OpenListTeam/OpenList/internal/model"
"github.com/OpenListTeam/OpenList/internal/stream"
"github.com/OpenListTeam/OpenList/pkg/utils"
)
@@ -133,7 +135,10 @@ func (d *Alias) MakeDir(ctx context.Context, parentDir model.Obj, dirName string
}
reqPath, err := d.getReqPath(ctx, parentDir, true)
if err == nil {
return fs.MakeDir(ctx, stdpath.Join(*reqPath, dirName))
for _, path := range reqPath {
err = errors.Join(err, fs.MakeDir(ctx, stdpath.Join(*path, dirName)))
}
return err
}
if errs.IsNotImplement(err) {
return errors.New("same-name dirs cannot make sub-dir")
@@ -159,7 +164,14 @@ func (d *Alias) Move(ctx context.Context, srcObj, dstDir model.Obj) error {
if err != nil {
return err
}
return fs.Move(ctx, *srcPath, *dstPath)
if len(srcPath) == len(dstPath) {
for i := range srcPath {
err = errors.Join(err, fs.Move(ctx, *srcPath[i], *dstPath[i]))
}
return err
} else {
return errors.New("parallel paths mismatch")
}
}
func (d *Alias) Rename(ctx context.Context, srcObj model.Obj, newName string) error {
@@ -168,7 +180,10 @@ func (d *Alias) Rename(ctx context.Context, srcObj model.Obj, newName string) er
}
reqPath, err := d.getReqPath(ctx, srcObj, false)
if err == nil {
return fs.Rename(ctx, *reqPath, newName)
for _, path := range reqPath {
err = errors.Join(err, fs.Rename(ctx, *path, newName))
}
return err
}
if errs.IsNotImplement(err) {
return errors.New("same-name files cannot be Rename")
@@ -194,8 +209,21 @@ func (d *Alias) Copy(ctx context.Context, srcObj, dstDir model.Obj) error {
if err != nil {
return err
}
_, err = fs.Copy(ctx, *srcPath, *dstPath)
return err
if len(srcPath) == len(dstPath) {
for i := range srcPath {
_, e := fs.Copy(ctx, *srcPath[i], *dstPath[i])
err = errors.Join(err, e)
}
return err
} else if len(srcPath) == 1 || !d.ProtectSameName {
for _, path := range dstPath {
_, e := fs.Copy(ctx, *srcPath[0], *path)
err = errors.Join(err, e)
}
return err
} else {
return errors.New("parallel paths mismatch")
}
}
func (d *Alias) Remove(ctx context.Context, obj model.Obj) error {
@@ -204,7 +232,10 @@ func (d *Alias) Remove(ctx context.Context, obj model.Obj) error {
}
reqPath, err := d.getReqPath(ctx, obj, false)
if err == nil {
return fs.Remove(ctx, *reqPath)
for _, path := range reqPath {
err = errors.Join(err, fs.Remove(ctx, *path))
}
return err
}
if errs.IsNotImplement(err) {
return errors.New("same-name files cannot be Delete")
@@ -218,7 +249,28 @@ func (d *Alias) Put(ctx context.Context, dstDir model.Obj, s model.FileStreamer,
}
reqPath, err := d.getReqPath(ctx, dstDir, true)
if err == nil {
return fs.PutDirectly(ctx, *reqPath, s)
if len(reqPath) == 1 {
return fs.PutDirectly(ctx, *reqPath[0], s)
} else {
defer s.Close()
file, err := s.CacheFullInTempFile()
if err != nil {
return err
}
for _, path := range reqPath {
err = errors.Join(err, fs.PutDirectly(ctx, *path, &stream.FileStream{
Obj: s,
Mimetype: s.GetMimetype(),
WebPutAsTask: s.NeedStore(),
Reader: file,
}))
_, e := file.Seek(0, io.SeekStart)
if e != nil {
return errors.Join(err, e)
}
}
return err
}
}
if errs.IsNotImplement(err) {
return errors.New("same-name dirs cannot be Put")
@@ -232,7 +284,10 @@ func (d *Alias) PutURL(ctx context.Context, dstDir model.Obj, name, url string)
}
reqPath, err := d.getReqPath(ctx, dstDir, true)
if err == nil {
return fs.PutURL(ctx, *reqPath, name, url)
for _, path := range reqPath {
err = errors.Join(err, fs.PutURL(ctx, *path, name, url))
}
return err
}
if errs.IsNotImplement(err) {
return errors.New("same-name files cannot offline download")
@@ -314,8 +369,21 @@ func (d *Alias) ArchiveDecompress(ctx context.Context, srcObj, dstDir model.Obj,
if err != nil {
return err
}
_, err = fs.ArchiveDecompress(ctx, *srcPath, *dstPath, args)
return err
if len(srcPath) == len(dstPath) {
for i := range srcPath {
_, e := fs.ArchiveDecompress(ctx, *srcPath[i], *dstPath[i], args)
err = errors.Join(err, e)
}
return err
} else if len(srcPath) == 1 || !d.ProtectSameName {
for _, path := range dstPath {
_, e := fs.ArchiveDecompress(ctx, *srcPath[0], *path, args)
err = errors.Join(err, e)
}
return err
} else {
return errors.New("parallel paths mismatch")
}
}
var _ driver.Driver = (*Alias)(nil)

View File

@@ -11,6 +11,7 @@ type Addition struct {
// define other
Paths string `json:"paths" required:"true" type:"text"`
ProtectSameName bool `json:"protect_same_name" default:"true" required:"false" help:"Protects same-name files from Delete or Rename"`
ParallelWrite bool `json:"parallel_write" type:"bool" default:"false"`
DownloadConcurrency int `json:"download_concurrency" default:"0" required:"false" type:"number" help:"Need to enable proxy"`
DownloadPartSize int `json:"download_part_size" default:"0" type:"number" required:"false" help:"Need to enable proxy. Unit: KB"`
Writable bool `json:"writable" type:"bool" default:"false"`

View File

@@ -127,33 +127,39 @@ func (d *Alias) link(ctx context.Context, dst, sub string, args model.LinkArgs)
return link, err
}
func (d *Alias) getReqPath(ctx context.Context, obj model.Obj, isParent bool) (*string, error) {
func (d *Alias) getReqPath(ctx context.Context, obj model.Obj, isParent bool) ([]*string, error) {
root, sub := d.getRootAndPath(obj.GetPath())
if sub == "" && !isParent {
return nil, errs.NotSupport
}
dsts, ok := d.pathMap[root]
all := true
if !ok {
return nil, errs.ObjectNotFound
}
var reqPath *string
var reqPath []*string
for _, dst := range dsts {
path := stdpath.Join(dst, sub)
_, err := fs.Get(ctx, path, &fs.GetArgs{NoLog: true})
if err != nil {
all = false
if d.ProtectSameName && d.ParallelWrite && len(reqPath) >= 2 {
return nil, errs.NotImplement
}
continue
}
if !d.ProtectSameName {
return &path, nil
if !d.ProtectSameName && !d.ParallelWrite {
return []*string{&path}, nil
}
if ok {
ok = false
} else {
reqPath = append(reqPath, &path)
if d.ProtectSameName && !d.ParallelWrite && len(reqPath) >= 2 {
return nil, errs.NotImplement
}
if d.ProtectSameName && d.ParallelWrite && len(reqPath) >= 2 && !all {
return nil, errs.NotImplement
}
reqPath = &path
}
if reqPath == nil {
if len(reqPath) == 0 {
return nil, errs.ObjectNotFound
}
return reqPath, nil