Compare commits

...

19 Commits

Author SHA1 Message Date
Seven
72206ac9f6 feat(strm): keep local download file (#1707) 2025-11-25 18:08:48 +08:00
ShenLin
62dedb2a2e fix(pkg/aria2): use pointer receivers for Call methods (#1706) 2025-11-25 13:25:42 +08:00
ShenLin
7189c5b461 chore(pkg/aria2): simplify context cancellation handling in RPC calls (#1705) 2025-11-25 13:09:04 +08:00
ShenLin
1a445f9d3f chore(archive): fix struct literal uses unkeyed fields (#1704) 2025-11-25 13:05:36 +08:00
ShenLin
aa22884079 fix(search): fix duplicated variable init (#1703) 2025-11-25 12:23:25 +08:00
ImoutoHeaven
316d4caf37 feat(search): Add task queue for Meilisearch to prevent race conditions (#1423)
* Add task queue for Meilisearch to prevent race conditions

- Implement TaskQueueManager for async index operations
- Queue update tasks and process them in batches every 30 seconds
- Check pending task status before executing new operations
- Optimize batch indexing and deletion logic
- Fix type assertion bug in buildSearchDocumentFromResults

* fix(search): re-enqueue skipped tasks to prevent task loss

When tasks are skipped due to pending dependencies, they are now
re-enqueued if not already in queue. This prevents task loss while
avoiding overwriting newer snapshots for the same parent.

* fix(copilot-comment): Invoke Stop() & err of SliceConvert

---------

Co-authored-by: ImoutoHeaven <noreply@imoutoheaven.org>
Co-authored-by: jyxjjj <773933146@qq.com>
2025-11-25 11:38:27 +08:00
KirCute
60a489eb68 feat(archive): support non-overwrite decompress (#1701) 2025-11-25 10:27:29 +08:00
varg1714
b22e211044 feat(fs): Add skipExisting option to move and copy, merge option to copy (#1556)
* fix(fs): Add skipExisting option to move and copy.

* feat(fs): Add merge option to copy.

* feat(fs): Code smell.

* feat(fs): Code smell.
2025-11-24 14:20:24 +08:00
KirCute
ca401b9af9 fix(local): assign non-CoW copy requests to the task module (#1669)
* fix(local): assign non-CoW copy requests to the task module

* fix build

* fix cross device
2025-11-24 14:14:53 +08:00
jenfonro
addce8b691 feat(baidu_netdisk): Add shard upload timeout setting (#1682)
add timeout
2025-11-24 14:14:34 +08:00
Seven
42fc841dc1 feat(strm): custom path prefixes (#1697)
fix(strm): custom path prefixes

Signed-off-by: ShenLin <773933146@qq.com>
Co-authored-by: ShenLin <773933146@qq.com>
2025-11-24 14:11:59 +08:00
varg1714
4c0916b64b fix(strm): fix the name and type issue (#1630)
* fix(strm): fix the name and type issue

* fix(strm): update version
2025-11-24 14:05:49 +08:00
VXTLS
3989d35abd fix(misskey): folderId format validation and root directory handling (#1647)
fix(misskey): Fix folderId format validation and root directory handling
2025-11-21 12:18:54 +08:00
KirCute
72e2ae1f14 feat(fs): support manually trigger objs update hook (#1620)
* feat(fs): support manually trigger objs update hook

* fix: support driver internal copy & move case

* fix

* fix: apply suggestions of Copilot
2025-11-21 12:18:20 +08:00
Seven
3e37f575d8 fix(openlist_driver): ensure UA is correctly propagated (#1679) 2025-11-21 12:13:41 +08:00
MoYan
c0d480366d fix(driver/123): initialize Platform field (#1644)
* fix(driver/123): initialization the Platform field

Signed-off-by: MoYan <1561515308@qq.com>

* Fix formatting of Platform field in Pan123

Signed-off-by: MoYan <1561515308@qq.com>

---------

Signed-off-by: MoYan <1561515308@qq.com>
2025-11-14 18:49:44 +08:00
Copilot
9de7561154 feat(upload): add optional system file filtering for uploads (#1634) 2025-11-14 14:45:39 +08:00
MadDogOwner
0866b9075f fix(link): correct link cache mode bitwise comparison (#1635)
* fix(link): correct link cache mode bitwise comparison

Signed-off-by: MadDogOwner <xiaoran@xrgzs.top>

* refactor(link): use explicit flag equality for link cache mode bitmask checks

Signed-off-by: MadDogOwner <xiaoran@xrgzs.top>

---------

Signed-off-by: MadDogOwner <xiaoran@xrgzs.top>
2025-11-13 13:52:33 +08:00
KirCute
055696f576 feat(s3): support frontend direct upload (#1631)
* feat(s3): support frontend direct upload

* feat(s3): support custom direct upload host

* fix: apply suggestions of Copilot
2025-11-13 13:22:17 +08:00
61 changed files with 1295 additions and 236 deletions

View File

@@ -28,6 +28,7 @@ func init() {
return &Pan123{
Addition: Addition{
UploadThread: 3,
Platform: "web",
},
}
})

View File

@@ -5,7 +5,6 @@ import (
"errors"
stdpath "path"
"strings"
"sync"
"time"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
@@ -17,9 +16,15 @@ import (
log "github.com/sirupsen/logrus"
)
type detailWithIndex struct {
idx int
val *model.StorageDetails
}
func (d *Alias) listRoot(ctx context.Context, withDetails, refresh bool) []model.Obj {
var objs []model.Obj
var wg sync.WaitGroup
detailsChan := make(chan detailWithIndex, len(d.pathMap))
workerCount := 0
for _, k := range d.rootOrder {
obj := model.Object{
Name: k,
@@ -47,22 +52,26 @@ func (d *Alias) listRoot(ctx context.Context, withDetails, refresh bool) []model
DriverName: remoteDriver.Config().Name,
},
}
wg.Add(1)
go func() {
defer wg.Done()
c, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
details, e := op.GetStorageDetails(c, remoteDriver, refresh)
workerCount++
go func(dri driver.Driver, i int) {
details, e := op.GetStorageDetails(ctx, dri, refresh)
if e != nil {
if !errors.Is(e, errs.NotImplement) && !errors.Is(e, errs.StorageNotInit) {
log.Errorf("failed get %s storage details: %+v", remoteDriver.GetStorage().MountPath, e)
log.Errorf("failed get %s storage details: %+v", dri.GetStorage().MountPath, e)
}
return
}
objs[idx].(*model.ObjStorageDetails).StorageDetails = details
}()
detailsChan <- detailWithIndex{idx: i, val: details}
}(remoteDriver, idx)
}
for workerCount > 0 {
select {
case r := <-detailsChan:
objs[r.idx].(*model.ObjStorageDetails).StorageDetails = r.val
workerCount--
case <-time.After(time.Second):
workerCount = 0
}
}
wg.Wait()
return objs
}

View File

@@ -58,8 +58,13 @@ func (d *BaiduNetdisk) GetAddition() driver.Additional {
}
func (d *BaiduNetdisk) Init(ctx context.Context) error {
timeout := DEFAULT_UPLOAD_SLICE_TIMEOUT
if d.UploadSliceTimeout > 0 {
timeout = time.Second * time.Duration(d.UploadSliceTimeout)
}
d.upClient = base.NewRestyClient().
SetTimeout(UPLOAD_TIMEOUT).
SetTimeout(timeout).
SetRetryCount(UPLOAD_RETRY_COUNT).
SetRetryWaitTime(UPLOAD_RETRY_WAIT_TIME).
SetRetryMaxWaitTime(UPLOAD_RETRY_MAX_WAIT_TIME)

View File

@@ -19,6 +19,7 @@ type Addition struct {
AccessToken string
RefreshToken string `json:"refresh_token" required:"true"`
UploadThread string `json:"upload_thread" default:"3" help:"1<=thread<=32"`
UploadSliceTimeout int `json:"upload_timeout" type:"number" default:"60" help:"per-slice upload timeout in seconds"`
UploadAPI string `json:"upload_api" default:"https://d.pcs.baidu.com"`
UseDynamicUploadAPI bool `json:"use_dynamic_upload_api" default:"true" help:"dynamically get upload api domain, when enabled, the 'Upload API' setting will be used as a fallback if failed to get"`
CustomUploadPartSize int64 `json:"custom_upload_part_size" type:"number" default:"0" help:"0 for auto"`
@@ -27,12 +28,12 @@ type Addition struct {
}
const (
UPLOAD_FALLBACK_API = "https://d.pcs.baidu.com" // 备用上传地址
UPLOAD_URL_EXPIRE_TIME = time.Minute * 60 // 上传地址有效期(分钟)
UPLOAD_TIMEOUT = time.Minute * 30 // 上传请求超时时间
UPLOAD_RETRY_COUNT = 3
UPLOAD_RETRY_WAIT_TIME = time.Second * 1
UPLOAD_RETRY_MAX_WAIT_TIME = time.Second * 5
UPLOAD_FALLBACK_API = "https://d.pcs.baidu.com" // 备用上传地址
UPLOAD_URL_EXPIRE_TIME = time.Minute * 60 // 上传地址有效期(分钟)
DEFAULT_UPLOAD_SLICE_TIMEOUT = time.Second * 60 // 上传分片请求默认超时时间
UPLOAD_RETRY_COUNT = 3
UPLOAD_RETRY_WAIT_TIME = time.Second * 1
UPLOAD_RETRY_MAX_WAIT_TIME = time.Second * 5
)
var config = driver.Config{

View File

@@ -0,0 +1,16 @@
//go:build !windows && !plan9 && !netbsd && !aix && !illumos && !solaris && !js
package local
import (
"os"
"path/filepath"
"syscall"
)
func copyNamedPipe(dstPath string, mode os.FileMode, dirMode os.FileMode) error {
if err := os.MkdirAll(filepath.Dir(dstPath), dirMode); err != nil {
return err
}
return syscall.Mkfifo(dstPath, uint32(mode))
}

View File

@@ -0,0 +1,9 @@
//go:build windows || plan9 || netbsd || aix || illumos || solaris || js
package local
import "os"
func copyNamedPipe(_ string, _, _ os.FileMode) error {
return nil
}

View File

@@ -23,7 +23,6 @@ import (
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/OpenListTeam/OpenList/v4/server/common"
"github.com/OpenListTeam/times"
cp "github.com/otiai10/copy"
log "github.com/sirupsen/logrus"
_ "golang.org/x/image/webp"
)
@@ -297,16 +296,9 @@ func (d *Local) Move(ctx context.Context, srcObj, dstDir model.Obj) error {
return fmt.Errorf("the destination folder is a subfolder of the source folder")
}
err := os.Rename(srcPath, dstPath)
if err != nil && strings.Contains(err.Error(), "invalid cross-device link") {
// 跨设备移动,先复制再删除
if err := d.Copy(ctx, srcObj, dstDir); err != nil {
return err
}
// 复制成功后直接删除源文件/文件夹
if srcObj.IsDir() {
return os.RemoveAll(srcObj.GetPath())
}
return os.Remove(srcObj.GetPath())
if isCrossDeviceError(err) {
// 跨设备移动,变更为移动任务
return errs.NotImplement
}
if err == nil {
srcParent := filepath.Dir(srcPath)
@@ -347,15 +339,14 @@ func (d *Local) Copy(_ context.Context, srcObj, dstDir model.Obj) error {
if utils.IsSubPath(srcPath, dstPath) {
return fmt.Errorf("the destination folder is a subfolder of the source folder")
}
// Copy using otiai10/copy to perform more secure & efficient copy
err := cp.Copy(srcPath, dstPath, cp.Options{
Sync: true, // Sync file to disk after copy, may have performance penalty in filesystem such as ZFS
PreserveTimes: true,
PreserveOwner: true,
})
info, err := os.Lstat(srcPath)
if err != nil {
return err
}
// 复制regular文件会返回errs.NotImplement, 转为复制任务
if err = d.tryCopy(srcPath, dstPath, info); err != nil {
return err
}
if d.directoryMap.Has(filepath.Dir(dstPath)) {
d.directoryMap.UpdateDirSize(filepath.Dir(dstPath))

View File

@@ -3,6 +3,7 @@ package local
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/fs"
"os"
@@ -14,7 +15,9 @@ import (
"strings"
"sync"
"github.com/KarpelesLab/reflink"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/disintegration/imaging"
@@ -148,7 +151,7 @@ func (d *Local) getThumb(file model.Obj) (*bytes.Buffer, *string, error) {
return nil, nil, err
}
if d.ThumbCacheFolder != "" {
err = os.WriteFile(filepath.Join(d.ThumbCacheFolder, thumbName), buf.Bytes(), 0666)
err = os.WriteFile(filepath.Join(d.ThumbCacheFolder, thumbName), buf.Bytes(), 0o666)
if err != nil {
return nil, nil, err
}
@@ -405,3 +408,79 @@ func (m *DirectoryMap) DeleteDirNode(dirname string) error {
return nil
}
func (d *Local) tryCopy(srcPath, dstPath string, info os.FileInfo) error {
if info.Mode()&os.ModeDevice != 0 {
return errors.New("cannot copy a device")
} else if info.Mode()&os.ModeSymlink != 0 {
return d.copySymlink(srcPath, dstPath)
} else if info.Mode()&os.ModeNamedPipe != 0 {
return copyNamedPipe(dstPath, info.Mode(), os.FileMode(d.mkdirPerm))
} else if info.IsDir() {
return d.recurAndTryCopy(srcPath, dstPath)
} else {
return tryReflinkCopy(srcPath, dstPath)
}
}
func (d *Local) copySymlink(srcPath, dstPath string) error {
linkOrig, err := os.Readlink(srcPath)
if err != nil {
return err
}
dstDir := filepath.Dir(dstPath)
if !filepath.IsAbs(linkOrig) {
srcDir := filepath.Dir(srcPath)
rel, err := filepath.Rel(dstDir, srcDir)
if err != nil {
rel, err = filepath.Abs(srcDir)
}
if err != nil {
return err
}
linkOrig = filepath.Clean(filepath.Join(rel, linkOrig))
}
err = os.MkdirAll(dstDir, os.FileMode(d.mkdirPerm))
if err != nil {
return err
}
return os.Symlink(linkOrig, dstPath)
}
func (d *Local) recurAndTryCopy(srcPath, dstPath string) error {
err := os.MkdirAll(dstPath, os.FileMode(d.mkdirPerm))
if err != nil {
return err
}
files, err := readDir(srcPath)
if err != nil {
return err
}
for _, f := range files {
if !f.IsDir() {
sp := filepath.Join(srcPath, f.Name())
dp := filepath.Join(dstPath, f.Name())
if err = d.tryCopy(sp, dp, f); err != nil {
return err
}
}
}
for _, f := range files {
if f.IsDir() {
sp := filepath.Join(srcPath, f.Name())
dp := filepath.Join(dstPath, f.Name())
if err = d.recurAndTryCopy(sp, dp); err != nil {
return err
}
}
}
return nil
}
func tryReflinkCopy(srcPath, dstPath string) error {
err := reflink.Always(srcPath, dstPath)
if errors.Is(err, reflink.ErrReflinkUnsupported) || errors.Is(err, reflink.ErrReflinkFailed) || isCrossDeviceError(err) {
return errs.NotImplement
}
return err
}

View File

@@ -3,11 +3,13 @@
package local
import (
"errors"
"io/fs"
"strings"
"syscall"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"golang.org/x/sys/unix"
)
func isHidden(f fs.FileInfo, _ string) bool {
@@ -27,3 +29,7 @@ func getDiskUsage(path string) (model.DiskUsage, error) {
FreeSpace: free,
}, nil
}
func isCrossDeviceError(err error) bool {
return errors.Is(err, unix.EXDEV)
}

View File

@@ -49,3 +49,7 @@ func getDiskUsage(path string) (model.DiskUsage, error) {
FreeSpace: freeBytes,
}, nil
}
func isCrossDeviceError(err error) bool {
return errors.Is(err, windows.ERROR_NOT_SAME_DEVICE)
}

View File

@@ -57,18 +57,22 @@ func setBody(body interface{}) base.ReqCallback {
}
func handleFolderId(dir model.Obj) interface{} {
if dir.GetID() == "" {
return nil
if isRootFolder(dir) {
return nil // Root folder doesn't need folderId
}
return dir.GetID()
}
func isRootFolder(dir model.Obj) bool {
return dir.GetID() == ""
}
// API layer methods
func (d *Misskey) getFiles(dir model.Obj) ([]model.Obj, error) {
var files []MFile
var body map[string]string
if dir.GetPath() != "/" {
if !isRootFolder(dir) {
body = map[string]string{"folderId": dir.GetID()}
} else {
body = map[string]string{}
@@ -85,7 +89,7 @@ func (d *Misskey) getFiles(dir model.Obj) ([]model.Obj, error) {
func (d *Misskey) getFolders(dir model.Obj) ([]model.Obj, error) {
var folders []MFolder
var body map[string]string
if dir.GetPath() != "/" {
if !isRootFolder(dir) {
body = map[string]string{"folderId": dir.GetID()}
} else {
body = map[string]string{}
@@ -197,16 +201,24 @@ func (d *Misskey) put(ctx context.Context, dstDir model.Obj, stream model.FileSt
Reader: stream,
UpdateProgress: up,
})
// Build form data, only add folderId if not root folder
formData := map[string]string{
"name": stream.GetName(),
"comment": "",
"isSensitive": "false",
"force": "false",
}
folderId := handleFolderId(dstDir)
if folderId != nil {
formData["folderId"] = folderId.(string)
}
req := base.RestyClient.R().
SetContext(ctx).
SetFileReader("file", stream.GetName(), reader).
SetFormData(map[string]string{
"folderId": handleFolderId(dstDir).(string),
"name": stream.GetName(),
"comment": "",
"isSensitive": "false",
"force": "false",
}).
SetFormData(formData).
SetResult(&file).
SetAuthToken(d.AccessToken)

View File

@@ -7,19 +7,19 @@ import (
type Addition struct {
driver.RootPath
Region string `json:"region" type:"select" required:"true" options:"global,cn,us,de" default:"global"`
IsSharepoint bool `json:"is_sharepoint"`
UseOnlineAPI bool `json:"use_online_api" default:"true"`
APIAddress string `json:"api_url_address" default:"https://api.oplist.org/onedrive/renewapi"`
ClientID string `json:"client_id"`
ClientSecret string `json:"client_secret"`
RedirectUri string `json:"redirect_uri" required:"true" default:"https://api.oplist.org/onedrive/callback"`
RefreshToken string `json:"refresh_token" required:"true"`
SiteId string `json:"site_id"`
ChunkSize int64 `json:"chunk_size" type:"number" default:"5"`
CustomHost string `json:"custom_host" help:"Custom host for onedrive download link"`
DisableDiskUsage bool `json:"disable_disk_usage" default:"false"`
EnableDirectUpload bool `json:"enable_direct_upload" default:"false" help:"Enable direct upload from client to OneDrive"`
Region string `json:"region" type:"select" required:"true" options:"global,cn,us,de" default:"global"`
IsSharepoint bool `json:"is_sharepoint"`
UseOnlineAPI bool `json:"use_online_api" default:"true"`
APIAddress string `json:"api_url_address" default:"https://api.oplist.org/onedrive/renewapi"`
ClientID string `json:"client_id"`
ClientSecret string `json:"client_secret"`
RedirectUri string `json:"redirect_uri" required:"true" default:"https://api.oplist.org/onedrive/callback"`
RefreshToken string `json:"refresh_token" required:"true"`
SiteId string `json:"site_id"`
ChunkSize int64 `json:"chunk_size" type:"number" default:"5"`
CustomHost string `json:"custom_host" help:"Custom host for onedrive download link"`
DisableDiskUsage bool `json:"disable_disk_usage" default:"false"`
EnableDirectUpload bool `json:"enable_direct_upload" default:"false" help:"Enable direct upload from client to OneDrive"`
}
var config = driver.Config{

View File

@@ -117,7 +117,7 @@ func (d *OpenList) Link(ctx context.Context, file model.Obj, args model.LinkArgs
if d.PassUAToUpsteam {
userAgent := args.Header.Get("user-agent")
if userAgent != "" {
headers["User-Agent"] = base.UserAgent
headers["User-Agent"] = userAgent
}
}
// if PassIPToUpsteam is true, then pass the ip address to the upstream
@@ -360,6 +360,7 @@ func (d *OpenList) ArchiveDecompress(ctx context.Context, srcObj, dstDir model.O
Name: []string{name},
PutIntoNewDir: args.PutIntoNewDir,
SrcDir: dir,
Overwrite: args.Overwrite,
})
})
return err

View File

@@ -167,4 +167,5 @@ type DecompressReq struct {
Name []string `json:"name"`
PutIntoNewDir bool `json:"put_into_new_dir"`
SrcDir string `json:"src_dir"`
Overwrite bool `json:"overwrite"`
}

View File

@@ -217,11 +217,10 @@ func (d *QuarkOrUC) GetDetails(ctx context.Context) (*model.StorageDetails, erro
if err != nil {
return nil, err
}
used := memberInfo.Data.UseCapacity
total := memberInfo.Data.TotalCapacity
return &model.StorageDetails{
DiskUsage: model.DiskUsage{
TotalSpace: memberInfo.Data.TotalCapacity,
FreeSpace: memberInfo.Data.TotalCapacity - memberInfo.Data.UseCapacity,
},
DiskUsage: driver.DiskUsageFromUsedAndTotal(used, total),
}, nil
}

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/pkg/cron"
@@ -24,9 +25,10 @@ import (
type S3 struct {
model.Storage
Addition
Session *session.Session
client *s3.S3
linkClient *s3.S3
Session *session.Session
client *s3.S3
linkClient *s3.S3
directUploadClient *s3.S3
config driver.Config
cron *cron.Cron
@@ -52,16 +54,18 @@ func (d *S3) Init(ctx context.Context) error {
if err != nil {
log.Errorln("Doge init session error:", err)
}
d.client = d.getClient(false)
d.linkClient = d.getClient(true)
d.client = d.getClient(ClientTypeNormal)
d.linkClient = d.getClient(ClientTypeLink)
d.directUploadClient = d.getClient(ClientTypeDirectUpload)
})
}
err := d.initSession()
if err != nil {
return err
}
d.client = d.getClient(false)
d.linkClient = d.getClient(true)
d.client = d.getClient(ClientTypeNormal)
d.linkClient = d.getClient(ClientTypeLink)
d.directUploadClient = d.getClient(ClientTypeDirectUpload)
return nil
}
@@ -210,4 +214,33 @@ func (d *S3) Put(ctx context.Context, dstDir model.Obj, s model.FileStreamer, up
return err
}
func (d *S3) GetDirectUploadTools() []string {
if !d.EnableDirectUpload {
return nil
}
return []string{"HttpDirect"}
}
func (d *S3) GetDirectUploadInfo(ctx context.Context, _ string, dstDir model.Obj, fileName string, _ int64) (any, error) {
if !d.EnableDirectUpload {
return nil, errs.NotImplement
}
path := getKey(stdpath.Join(dstDir.GetPath(), fileName), false)
req, _ := d.directUploadClient.PutObjectRequest(&s3.PutObjectInput{
Bucket: &d.Bucket,
Key: &path,
})
if req == nil {
return nil, fmt.Errorf("failed to create PutObject request")
}
link, err := req.Presign(time.Hour * time.Duration(d.SignURLExpire))
if err != nil {
return nil, err
}
return &model.HttpDirectUploadInfo{
UploadURL: link,
Method: "PUT",
}, nil
}
var _ driver.Driver = (*S3)(nil)

View File

@@ -21,6 +21,8 @@ type Addition struct {
ListObjectVersion string `json:"list_object_version" type:"select" options:"v1,v2" default:"v1"`
RemoveBucket bool `json:"remove_bucket" help:"Remove bucket name from path when using custom host."`
AddFilenameToDisposition bool `json:"add_filename_to_disposition" help:"Add filename to Content-Disposition header."`
EnableDirectUpload bool `json:"enable_direct_upload" default:"false"`
DirectUploadHost string `json:"direct_upload_host" required:"false"`
}
func init() {

View File

@@ -41,9 +41,15 @@ func (d *S3) initSession() error {
return err
}
func (d *S3) getClient(link bool) *s3.S3 {
const (
ClientTypeNormal = iota
ClientTypeLink
ClientTypeDirectUpload
)
func (d *S3) getClient(clientType int) *s3.S3 {
client := s3.New(d.Session)
if link && d.CustomHost != "" {
if clientType == ClientTypeLink && d.CustomHost != "" {
client.Handlers.Build.PushBack(func(r *request.Request) {
if r.HTTPRequest.Method != http.MethodGet {
return
@@ -58,6 +64,20 @@ func (d *S3) getClient(link bool) *s3.S3 {
}
})
}
if clientType == ClientTypeDirectUpload && d.DirectUploadHost != "" {
client.Handlers.Build.PushBack(func(r *request.Request) {
if r.HTTPRequest.Method != http.MethodPut {
return
}
split := strings.SplitN(d.DirectUploadHost, "://", 2)
if utils.SliceContains([]string{"http", "https"}, split[0]) {
r.HTTPRequest.URL.Scheme = split[0]
r.HTTPRequest.URL.Host = split[1]
} else {
r.HTTPRequest.URL.Host = d.DirectUploadHost
}
})
}
return client
}

View File

@@ -96,7 +96,7 @@ func (d *Strm) Init(ctx context.Context) error {
}
}
if d.Version != 3 {
if d.Version != 5 {
types := strings.Split("mp4,mkv,flv,avi,wmv,ts,rmvb,webm,mp3,flac,aac,wav,ogg,m4a,wma,alac", ",")
for _, ext := range types {
if _, ok := d.supportSuffix[ext]; !ok {
@@ -109,12 +109,13 @@ func (d *Strm) Init(ctx context.Context) error {
types = strings.Split("ass,srt,vtt,sub,strm", ",")
for _, ext := range types {
if _, ok := d.downloadSuffix[ext]; !ok {
d.supportSuffix[ext] = struct{}{}
d.downloadSuffix[ext] = struct{}{}
downloadTypes = append(downloadTypes, ext)
}
}
d.DownloadFileTypes = strings.Join(downloadTypes, ",")
d.Version = 3
d.PathPrefix = "/d"
d.Version = 5
}
return nil
}

View File

@@ -27,7 +27,7 @@ func UpdateLocalStrm(ctx context.Context, path string, objs []model.Obj) {
localPath := stdpath.Join(localParentPath, obj.GetName())
generateStrm(ctx, driver, obj, localPath)
}
deleteExtraFiles(localParentPath, objs)
deleteExtraFiles(driver, localParentPath, objs)
}
_ = strmTrie.VisitPrefixes(patricia.Prefix(path), func(needPathPrefix patricia.Prefix, item patricia.Item) error {
@@ -129,7 +129,7 @@ func generateStrm(ctx context.Context, driver *Strm, obj model.Obj, localPath st
}
}
func deleteExtraFiles(localPath string, objs []model.Obj) {
func deleteExtraFiles(driver *Strm, localPath string, objs []model.Obj) {
localFiles, err := getLocalFiles(localPath)
if err != nil {
log.Errorf("Failed to read local files from %s: %v", localPath, err)
@@ -137,15 +137,29 @@ func deleteExtraFiles(localPath string, objs []model.Obj) {
}
objsSet := make(map[string]struct{})
objsBaseNameSet := make(map[string]struct{})
for _, obj := range objs {
if obj.IsDir() {
continue
}
objsSet[stdpath.Join(localPath, obj.GetName())] = struct{}{}
objName := obj.GetName()
objsSet[stdpath.Join(localPath, objName)] = struct{}{}
objBaseName := strings.TrimSuffix(objName, utils.SourceExt(objName))
objsBaseNameSet[stdpath.Join(localPath, objBaseName[:len(objBaseName)-1])] = struct{}{}
}
for _, localFile := range localFiles {
if _, exists := objsSet[localFile]; !exists {
ext := utils.Ext(localFile)
localFileName := stdpath.Base(localFile)
localFileBaseName := strings.TrimSuffix(localFile, utils.SourceExt(localFileName))
_, nameExists := objsBaseNameSet[localFileBaseName[:len(localFileBaseName)-1]]
_, downloadFile := driver.downloadSuffix[ext]
if driver.KeepLocalDownloadFile && nameExists && downloadFile {
continue
}
err := os.Remove(localFile)
if err != nil {
log.Errorf("Failed to delete file: %s, error: %v\n", localFile, err)

View File

@@ -6,15 +6,17 @@ import (
)
type Addition struct {
Paths string `json:"paths" required:"true" type:"text"`
SiteUrl string `json:"siteUrl" type:"text" required:"false" help:"The prefix URL of the strm file"`
DownloadFileTypes string `json:"downloadFileTypes" type:"text" default:"ass,srt,vtt,sub,strm" required:"false" help:"Files need to download with strm (usally subtitles)"`
FilterFileTypes string `json:"filterFileTypes" type:"text" default:"mp4,mkv,flv,avi,wmv,ts,rmvb,webm,mp3,flac,aac,wav,ogg,m4a,wma,alac" required:"false" help:"Supports suffix name of strm file"`
EncodePath bool `json:"encodePath" default:"true" required:"true" help:"encode the path in the strm file"`
WithoutUrl bool `json:"withoutUrl" default:"false" help:"strm file content without URL prefix"`
SaveStrmToLocal bool `json:"SaveStrmToLocal" default:"false" help:"save strm file locally"`
SaveStrmLocalPath string `json:"SaveStrmLocalPath" type:"text" help:"save strm file local path"`
Version int
Paths string `json:"paths" required:"true" type:"text"`
SiteUrl string `json:"siteUrl" type:"text" required:"false" help:"The prefix URL of the strm file"`
PathPrefix string `json:"PathPrefix" type:"text" required:"false" default:"/d" help:"Path prefix"`
DownloadFileTypes string `json:"downloadFileTypes" type:"text" default:"ass,srt,vtt,sub,strm" required:"false" help:"Files need to download with strm (usally subtitles)"`
FilterFileTypes string `json:"filterFileTypes" type:"text" default:"mp4,mkv,flv,avi,wmv,ts,rmvb,webm,mp3,flac,aac,wav,ogg,m4a,wma,alac" required:"false" help:"Supports suffix name of strm file"`
EncodePath bool `json:"encodePath" default:"true" required:"true" help:"encode the path in the strm file"`
WithoutUrl bool `json:"withoutUrl" default:"false" help:"strm file content without URL prefix"`
SaveStrmToLocal bool `json:"SaveStrmToLocal" default:"false" help:"save strm file locally"`
SaveStrmLocalPath string `json:"SaveStrmLocalPath" type:"text" help:"save strm file local path"`
KeepLocalDownloadFile bool `json:"KeepLocalDownloadFile" default:"false" help:"keep local download files"`
Version int
}
var config = driver.Config{

View File

@@ -3,7 +3,6 @@ package strm
import (
"context"
"fmt"
stdpath "path"
"strings"
@@ -69,11 +68,12 @@ func (d *Strm) convert2strmObjs(ctx context.Context, reqPath string, objs []mode
if !obj.IsDir() {
path = stdpath.Join(reqPath, obj.GetName())
ext := strings.ToLower(utils.Ext(name))
sourceExt := utils.SourceExt(name)
if _, ok := d.downloadSuffix[ext]; ok {
size = obj.GetSize()
} else if _, ok := d.supportSuffix[ext]; ok {
id = "strm"
name = strings.TrimSuffix(name, ext) + "strm"
name = strings.TrimSuffix(name, sourceExt) + "strm"
size = int64(len(d.getLink(ctx, path)))
} else {
continue
@@ -111,6 +111,13 @@ func (d *Strm) getLink(ctx context.Context, path string) string {
signPath := sign.Sign(path)
finalPath = fmt.Sprintf("%s?sign=%s", finalPath, signPath)
}
pathPrefix := d.PathPrefix
if len(pathPrefix) > 0 {
finalPath = stdpath.Join(pathPrefix, finalPath)
}
if !strings.HasPrefix(finalPath, "/") {
finalPath = "/" + finalPath
}
if d.WithoutUrl {
return finalPath
}
@@ -120,10 +127,7 @@ func (d *Strm) getLink(ctx context.Context, path string) string {
} else {
apiUrl = common.GetApiUrl(ctx)
}
if !strings.HasPrefix(finalPath, "/") {
finalPath = "/" + finalPath
}
return fmt.Sprintf("%s/d%s",
return fmt.Sprintf("%s%s",
apiUrl,
finalPath)
}

View File

@@ -20,6 +20,7 @@ var config = driver.Config{
LocalSort: true,
NoCache: true,
CheckStatus: true,
OnlyIndices: true,
}
func init() {

3
go.mod
View File

@@ -5,6 +5,7 @@ go 1.23.4
require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2
github.com/KarpelesLab/reflink v1.0.2
github.com/KirCute/zip v1.0.1
github.com/OpenListTeam/go-cache v0.1.0
github.com/OpenListTeam/sftpd-openlist v1.0.1
@@ -114,7 +115,6 @@ require (
github.com/minio/minlz v1.0.0 // indirect
github.com/minio/xxml v0.0.3 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/otiai10/mint v1.6.3 // indirect
github.com/quic-go/qpack v0.5.1 // indirect
github.com/relvacode/iso8601 v1.6.0 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
@@ -256,7 +256,6 @@ require (
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-multistream v0.4.1 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/otiai10/copy v1.14.1
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect

6
go.sum
View File

@@ -39,6 +39,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Da3zKi7/saferith v0.33.0-fixed h1:fnIWTk7EP9mZAICf7aQjeoAwpfrlCrkOvqmi6CbWdTk=
github.com/Da3zKi7/saferith v0.33.0-fixed/go.mod h1:QKJhjoqUtBsXCAVEjw38mFqoi7DebT7kthcD7UzbnoA=
github.com/KarpelesLab/reflink v1.0.2 h1:hQ1aM3TmjU2kTNUx5p/HaobDoADYk+a6AuEinG4Cv88=
github.com/KarpelesLab/reflink v1.0.2/go.mod h1:WGkTOKNjd1FsJKBw3mu4JvrPEDJyJJ+JPtxBkbPoCok=
github.com/KirCute/zip v1.0.1 h1:L/tVZglOiDVKDi9Ud+fN49htgKdQ3Z0H80iX8OZk13c=
github.com/KirCute/zip v1.0.1/go.mod h1:xhF7dCB+Bjvy+5a56lenYCKBsH+gxDNPZSy5Cp+nlXk=
github.com/Masterminds/semver/v3 v3.2.0 h1:3MEsd0SM6jqZojhjLWWeBY+Kcjy9i6MQAeY7YgDP83g=
@@ -585,10 +587,6 @@ github.com/ncw/swift/v2 v2.0.4 h1:hHWVFxn5/YaTWAASmn4qyq2p6OyP/Hm3vMLzkjEqR7w=
github.com/ncw/swift/v2 v2.0.4/go.mod h1:cbAO76/ZwcFrFlHdXPjaqWZ9R7Hdar7HpjRXBfbjigk=
github.com/nwaples/rardecode/v2 v2.1.1 h1:OJaYalXdliBUXPmC8CZGQ7oZDxzX1/5mQmgn0/GASew=
github.com/nwaples/rardecode/v2 v2.1.1/go.mod h1:7uz379lSxPe6j9nvzxUZ+n7mnJNgjsRNb6IbvGVHRmw=
github.com/otiai10/copy v1.14.1 h1:5/7E6qsUMBaH5AnQ0sSLzzTg1oTECmcCmT6lvF45Na8=
github.com/otiai10/copy v1.14.1/go.mod h1:oQwrEDDOci3IM8dJF0d8+jnbfPDllW6vUjNc3DoZm9I=
github.com/otiai10/mint v1.6.3 h1:87qsV/aw1F5as1eH1zS/yqHY85ANKVMgkDrf9rcxbQs=
github.com/otiai10/mint v1.6.3/go.mod h1:MJm72SBthJjz8qhefc4z1PYEieWmy8Bku7CjcAqyUSM=
github.com/panjf2000/ants/v2 v2.4.2/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=

View File

@@ -22,7 +22,7 @@ func (RarDecoder) AcceptedExtensions() []string {
func (RarDecoder) AcceptedMultipartExtensions() map[string]tool.MultipartExtension {
return map[string]tool.MultipartExtension{
".part1.rar": {regexp.MustCompile("^.*\\.part(\\d+)\\.rar$"), 2},
".part1.rar": {PartFileFormat: regexp.MustCompile(`^.*\.part(\d+)\.rar$`), SecondPartIndex: 2},
}
}

View File

@@ -19,7 +19,7 @@ func (SevenZip) AcceptedExtensions() []string {
func (SevenZip) AcceptedMultipartExtensions() map[string]tool.MultipartExtension {
return map[string]tool.MultipartExtension{
".7z.001": {regexp.MustCompile("^.*\\.7z\\.(\\d+)$"), 2},
".7z.001": {PartFileFormat: regexp.MustCompile(`^.*\.7z\.(\d+)$`), SecondPartIndex: 2},
}
}

View File

@@ -22,8 +22,8 @@ func (z *Zip) AcceptedExtensions() []string {
func (z *Zip) AcceptedMultipartExtensions() map[string]tool.MultipartExtension {
return map[string]tool.MultipartExtension{
".zip": {regexp.MustCompile("^.*\\.z(\\d+)$"), 1},
".zip.001": {regexp.MustCompile("^.*\\.zip\\.(\\d+)$"), 2},
".zip": {PartFileFormat: regexp.MustCompile(`^.*\.z(\d+)$`), SecondPartIndex: 1},
".zip.001": {PartFileFormat: regexp.MustCompile(`^.*\.zip\.(\d+)$`), SecondPartIndex: 2},
}
}
@@ -135,6 +135,6 @@ var _ tool.Tool = (*Zip)(nil)
func init() {
tool.RegisterTool(&Zip{
traditionalSecondPartRegExp: regexp.MustCompile("^.*\\.z0*1$"),
traditionalSecondPartRegExp: regexp.MustCompile(`^.*\.z0*1$`),
})
}

View File

@@ -177,6 +177,9 @@ func InitialSettings() []model.SettingItem {
{Key: conf.ShareArchivePreview, Value: "false", Type: conf.TypeBool, Group: model.GLOBAL, Flag: model.PUBLIC},
{Key: conf.ShareForceProxy, Value: "true", Type: conf.TypeBool, Group: model.GLOBAL, Flag: model.PRIVATE},
{Key: conf.ShareSummaryContent, Value: "@{{creator}} shared {{#each files}}{{#if @first}}\"{{filename this}}\"{{/if}}{{#if @last}}{{#unless (eq @index 0)}} and {{@index}} more files{{/unless}}{{/if}}{{/each}} from {{site_title}}: {{base_url}}/@s/{{id}}{{#if pwd}} , the share code is {{pwd}}{{/if}}{{#if expires}}, please access before {{dateLocaleString expires}}.{{/if}}", Type: conf.TypeText, Group: model.GLOBAL, Flag: model.PUBLIC},
{Key: conf.HandleHookAfterWriting, Value: "false", Type: conf.TypeBool, Group: model.GLOBAL, Flag: model.PRIVATE},
{Key: conf.HandleHookRateLimit, Value: "0", Type: conf.TypeNumber, Group: model.GLOBAL, Flag: model.PRIVATE},
{Key: conf.IgnoreSystemFiles, Value: "false", Type: conf.TypeBool, Group: model.GLOBAL, Flag: model.PRIVATE, Help: `When enabled, ignores common system files during upload (.DS_Store, desktop.ini, Thumbs.db, and files starting with ._)`},
// single settings
{Key: conf.Token, Value: token, Type: conf.TypeString, Group: model.SINGLE, Flag: model.PRIVATE},

View File

@@ -56,6 +56,9 @@ const (
ShareArchivePreview = "share_archive_preview"
ShareForceProxy = "share_force_proxy"
ShareSummaryContent = "share_summary_content"
HandleHookAfterWriting = "handle_hook_after_writing"
HandleHookRateLimit = "handle_hook_rate_limit"
IgnoreSystemFiles = "ignore_system_files"
// index
SearchIndex = "search_index"

View File

@@ -22,6 +22,8 @@ type Config struct {
// - LinkCacheNone: no extra info added to cache key (default)
// - flags (OR-able) can add more attributes to cache key (IP, UA, ...)
LinkCacheMode `json:"-"`
// if the driver only store indices of files (e.g. UrlTree)
OnlyIndices bool `json:"only_indices"`
}
type LinkCacheMode int8

View File

@@ -11,6 +11,7 @@ var (
ObjectAlreadyExists = errors.New("object already exists")
NotFolder = errors.New("not a folder")
NotFile = errors.New("not a file")
IgnoredSystemFile = errors.New("system file upload ignored")
)
func IsObjectNotFound(err error) bool {

View File

@@ -125,6 +125,7 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
DstActualPath: t.DstActualPath,
dstStorage: t.DstStorage,
DstStorageMp: t.DstStorageMp,
overwrite: t.Overwrite,
}
return uploadTask, nil
}
@@ -142,6 +143,7 @@ type ArchiveContentUploadTask struct {
DstStorageMp string
finalized bool
groupID string
overwrite bool
}
func (t *ArchiveContentUploadTask) GetName() string {
@@ -232,6 +234,7 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTask *Arch
dstStorage: t.dstStorage,
DstStorageMp: t.DstStorageMp,
groupID: t.groupID,
overwrite: t.overwrite,
})
if err != nil {
es = stderrors.Join(es, err)
@@ -241,6 +244,12 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTask *Arch
return es
}
} else {
if !t.overwrite {
dstPath := stdpath.Join(t.DstActualPath, t.ObjName)
if res, _ := op.Get(t.Ctx(), t.dstStorage, dstPath); res != nil {
return errs.ObjectAlreadyExists
}
}
file, err := os.Open(t.FilePath)
if err != nil {
return err

View File

@@ -24,14 +24,17 @@ type taskType uint8
func (t taskType) String() string {
if t == 0 {
return "copy"
} else {
} else if t == 1 {
return "move"
} else {
return "merge"
}
}
const (
copy taskType = iota
move
merge
)
type FileTransferTask struct {
@@ -67,7 +70,7 @@ func (t *FileTransferTask) Run() error {
return t.RunWithNextTaskCallback(func(nextTask *FileTransferTask) error {
nextTask.groupID = t.groupID
task_group.TransferCoordinator.AddTask(t.groupID, nil)
if t.TaskType == copy {
if t.TaskType == copy || t.TaskType == merge {
CopyTaskManager.Add(nextTask)
} else {
MoveTaskManager.Add(nextTask)
@@ -109,7 +112,7 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str
}
if srcStorage.GetStorage() == dstStorage.GetStorage() {
if taskType == copy {
if taskType == copy || taskType == merge {
err = op.Copy(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...)
if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.NotSupport) {
return nil, err
@@ -161,7 +164,7 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str
t.Creator, _ = ctx.Value(conf.UserKey).(*model.User)
t.ApiUrl = common.GetApiUrl(ctx)
t.groupID = dstDirPath
if taskType == copy {
if taskType == copy || taskType == merge {
task_group.TransferCoordinator.AddTask(dstDirPath, nil)
CopyTaskManager.Add(t)
} else {
@@ -177,6 +180,7 @@ func (t *FileTransferTask) RunWithNextTaskCallback(f func(nextTask *FileTransfer
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcActualPath)
}
if srcObj.IsDir() {
t.Status = "src object is dir, listing objs"
objs, err := op.List(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.ListArgs{})
@@ -184,17 +188,34 @@ func (t *FileTransferTask) RunWithNextTaskCallback(f func(nextTask *FileTransfer
return errors.WithMessagef(err, "failed list src [%s] objs", t.SrcActualPath)
}
dstActualPath := stdpath.Join(t.DstActualPath, srcObj.GetName())
if t.TaskType == copy {
if t.TaskType == copy || t.TaskType == merge {
if t.Ctx().Value(conf.NoTaskKey) != nil {
defer op.Cache.DeleteDirectory(t.DstStorage, dstActualPath)
} else {
task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstActualPath))
}
}
existedObjs := make(map[string]bool)
if t.TaskType == merge {
dstObjs, _ := op.List(t.Ctx(), t.DstStorage, dstActualPath, model.ListArgs{})
for _, obj := range dstObjs {
if !obj.IsDir() {
existedObjs[obj.GetName()] = true
}
}
}
for _, obj := range objs {
if utils.IsCanceled(t.Ctx()) {
return nil
}
if t.TaskType == merge && !obj.IsDir() && existedObjs[obj.GetName()] {
// skip existed file
continue
}
err = f(&FileTransferTask{
TaskType: t.TaskType,
TaskData: TaskData{

View File

@@ -84,6 +84,14 @@ func Copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool)
return res, err
}
func Merge(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) {
res, err := transfer(ctx, merge, srcObjPath, dstDirPath, lazyCache...)
if err != nil {
log.Errorf("failed merge %s to %s: %+v", srcObjPath, dstDirPath, err)
}
return res, err
}
func Rename(ctx context.Context, srcPath, dstName string, lazyCache ...bool) error {
err := rename(ctx, srcPath, dstName, lazyCache...)
if err != nil {

View File

@@ -77,6 +77,7 @@ type ArchiveDecompressArgs struct {
ArchiveInnerArgs
CacheFull bool
PutIntoNewDir bool
Overwrite bool
}
type SharingListArgs struct {

View File

@@ -11,6 +11,7 @@ import (
"github.com/OpenListTeam/OpenList/v4/internal/archive/tool"
"github.com/OpenListTeam/OpenList/v4/internal/cache"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
@@ -20,10 +21,13 @@ import (
gocache "github.com/OpenListTeam/go-cache"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)
var archiveMetaCache = gocache.NewMemCache(gocache.WithShards[*model.ArchiveMetaProvider](64))
var archiveMetaG singleflight.Group[*model.ArchiveMetaProvider]
var (
archiveMetaCache = gocache.NewMemCache(gocache.WithShards[*model.ArchiveMetaProvider](64))
archiveMetaG singleflight.Group[*model.ArchiveMetaProvider]
)
func GetArchiveMeta(ctx context.Context, storage driver.Driver, path string, args model.ArchiveMetaArgs) (*model.ArchiveMetaProvider, error) {
if storage.Config().CheckStatus && storage.GetStorage().Status != WORK {
@@ -196,8 +200,10 @@ func getArchiveMeta(ctx context.Context, storage driver.Driver, path string, arg
return obj, archiveMetaProvider, err
}
var archiveListCache = gocache.NewMemCache(gocache.WithShards[[]model.Obj](64))
var archiveListG singleflight.Group[[]model.Obj]
var (
archiveListCache = gocache.NewMemCache(gocache.WithShards[[]model.Obj](64))
archiveListG singleflight.Group[[]model.Obj]
)
func ListArchive(ctx context.Context, storage driver.Driver, path string, args model.ArchiveListArgs) ([]model.Obj, error) {
if storage.Config().CheckStatus && storage.GetStorage().Status != WORK {
@@ -397,8 +403,10 @@ type objWithLink struct {
obj model.Obj
}
var extractCache = cache.NewKeyedCache[*objWithLink](5 * time.Minute)
var extractG = singleflight.Group[*objWithLink]{}
var (
extractCache = cache.NewKeyedCache[*objWithLink](5 * time.Minute)
extractG = singleflight.Group[*objWithLink]{}
)
func DriverExtract(ctx context.Context, storage driver.Driver, path string, args model.ArchiveInnerArgs) (*model.Link, model.Obj, error) {
if storage.Config().CheckStatus && storage.GetStorage().Status != WORK {
@@ -506,9 +514,9 @@ func ArchiveDecompress(ctx context.Context, storage driver.Driver, srcPath, dstD
return errors.WithMessage(err, "failed to get dst dir")
}
var newObjs []model.Obj
switch s := storage.(type) {
case driver.ArchiveDecompressResult:
var newObjs []model.Obj
newObjs, err = s.ArchiveDecompress(ctx, srcObj, dstDir, args)
if err == nil {
if len(newObjs) > 0 {
@@ -527,5 +535,31 @@ func ArchiveDecompress(ctx context.Context, storage driver.Driver, srcPath, dstD
default:
return errs.NotImplement
}
if !utils.IsBool(lazyCache...) && err == nil && needHandleObjsUpdateHook() {
onlyList := false
targetPath := dstDirPath
if newObjs != nil && len(newObjs) == 1 && newObjs[0].IsDir() {
targetPath = stdpath.Join(dstDirPath, newObjs[0].GetName())
} else if newObjs != nil && len(newObjs) == 1 && !newObjs[0].IsDir() {
onlyList = true
} else if args.PutIntoNewDir {
targetPath = stdpath.Join(dstDirPath, strings.TrimSuffix(srcObj.GetName(), stdpath.Ext(srcObj.GetName())))
} else if innerBase := stdpath.Base(args.InnerPath); innerBase != "." && innerBase != "/" {
targetPath = stdpath.Join(dstDirPath, innerBase)
dstObj, e := GetUnwrap(ctx, storage, targetPath)
onlyList = e != nil || !dstObj.IsDir()
}
if onlyList {
go List(context.Background(), storage, dstDirPath, model.ListArgs{Refresh: true})
} else {
var limiter *rate.Limiter
if l, _ := GetSettingItemByKey(conf.HandleHookRateLimit); l != nil {
if f, e := strconv.ParseFloat(l.Value, 64); e == nil && f > .0 {
limiter = rate.NewLimiter(rate.Limit(f), 1)
}
}
go RecursivelyListStorage(context.Background(), storage, targetPath, limiter, nil)
}
}
return errors.WithStack(err)
}

View File

@@ -2,10 +2,11 @@ package op
import (
"context"
stderrors "errors"
stdpath "path"
"strconv"
"time"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
@@ -14,6 +15,7 @@ import (
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)
var listG singleflight.Group[[]model.Obj]
@@ -173,10 +175,10 @@ func Link(ctx context.Context, storage driver.Driver, path string, args model.Li
mode = storage.(driver.LinkCacheModeResolver).ResolveLinkCacheMode(path)
}
typeKey := args.Type
if mode&driver.LinkCacheIP == 1 {
if mode&driver.LinkCacheIP == driver.LinkCacheIP {
typeKey += "/" + args.IP
}
if mode&driver.LinkCacheUA == 1 {
if mode&driver.LinkCacheUA == driver.LinkCacheUA {
typeKey += "/" + args.Header.Get("User-Agent")
}
key := Key(storage, path)
@@ -310,7 +312,7 @@ func Move(ctx context.Context, storage driver.Driver, srcPath, dstDirPath string
srcDirPath := stdpath.Dir(srcPath)
dstDirPath = utils.FixAndCleanPath(dstDirPath)
if dstDirPath == srcDirPath {
return stderrors.New("move in place")
return errors.New("move in place")
}
srcRawObj, err := Get(ctx, storage, srcPath)
if err != nil {
@@ -343,8 +345,24 @@ func Move(ctx context.Context, storage driver.Driver, srcPath, dstDirPath string
}
}
default:
return errs.NotImplement
err = errs.NotImplement
}
if !utils.IsBool(lazyCache...) && err == nil && needHandleObjsUpdateHook() {
if !srcObj.IsDir() {
go List(context.Background(), storage, dstDirPath, model.ListArgs{Refresh: true})
} else {
targetPath := stdpath.Join(dstDirPath, srcObj.GetName())
var limiter *rate.Limiter
if l, _ := GetSettingItemByKey(conf.HandleHookRateLimit); l != nil {
if f, e := strconv.ParseFloat(l.Value, 64); e == nil && f > .0 {
limiter = rate.NewLimiter(rate.Limit(f), 1)
}
}
go RecursivelyListStorage(context.Background(), storage, targetPath, limiter, nil)
}
}
return errors.WithStack(err)
}
@@ -397,7 +415,7 @@ func Copy(ctx context.Context, storage driver.Driver, srcPath, dstDirPath string
srcPath = utils.FixAndCleanPath(srcPath)
dstDirPath = utils.FixAndCleanPath(dstDirPath)
if dstDirPath == stdpath.Dir(srcPath) {
return stderrors.New("copy in place")
return errors.New("copy in place")
}
srcRawObj, err := Get(ctx, storage, srcPath)
if err != nil {
@@ -428,8 +446,24 @@ func Copy(ctx context.Context, storage driver.Driver, srcPath, dstDirPath string
}
}
default:
return errs.NotImplement
err = errs.NotImplement
}
if !utils.IsBool(lazyCache...) && err == nil && needHandleObjsUpdateHook() {
if !srcObj.IsDir() {
go List(context.Background(), storage, dstDirPath, model.ListArgs{Refresh: true})
} else {
targetPath := stdpath.Join(dstDirPath, srcObj.GetName())
var limiter *rate.Limiter
if l, _ := GetSettingItemByKey(conf.HandleHookRateLimit); l != nil {
if f, e := strconv.ParseFloat(l.Value, 64); e == nil && f > .0 {
limiter = rate.NewLimiter(rate.Limit(f), 1)
}
}
go RecursivelyListStorage(context.Background(), storage, targetPath, limiter, nil)
}
}
return errors.WithStack(err)
}
@@ -475,7 +509,7 @@ func Put(ctx context.Context, storage driver.Driver, dstDirPath string, file mod
return errors.WithMessagef(errs.StorageNotInit, "storage status: %s", storage.GetStorage().Status)
}
// UrlTree PUT
if storage.GetStorage().Driver == "UrlTree" {
if storage.Config().OnlyIndices {
var link string
dstDirPath, link = urlTreeSplitLineFormPath(stdpath.Join(dstDirPath, file.GetName()))
file = &stream.FileStream{Obj: &model.Object{Name: link}}
@@ -557,6 +591,9 @@ func Put(ctx context.Context, storage driver.Driver, dstDirPath string, file mod
err = Remove(ctx, storage, tempPath)
}
}
if !utils.IsBool(lazyCache...) && err == nil && needHandleObjsUpdateHook() {
go List(context.Background(), storage, dstDirPath, model.ListArgs{Refresh: true})
}
return errors.WithStack(err)
}
@@ -601,6 +638,9 @@ func PutURL(ctx context.Context, storage driver.Driver, dstDirPath, dstName, url
default:
return errors.WithStack(errs.NotImplement)
}
if !utils.IsBool(lazyCache...) && err == nil && needHandleObjsUpdateHook() {
go List(context.Background(), storage, dstDirPath, model.ListArgs{Refresh: true})
}
log.Debugf("put url [%s](%s) done", dstName, url)
return errors.WithStack(err)
}
@@ -644,3 +684,8 @@ func GetDirectUploadInfo(ctx context.Context, tool string, storage driver.Driver
}
return info, nil
}
func needHandleObjsUpdateHook() bool {
needHandle, _ := GetSettingItemByKey(conf.HandleHookAfterWriting)
return needHandle != nil && (needHandle.Value == "true" || needHandle.Value == "1")
}

View File

@@ -0,0 +1,125 @@
package op
import (
"context"
stdpath "path"
"sync"
"sync/atomic"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)
var (
ManualScanCancel = atomic.Pointer[context.CancelFunc]{}
ScannedCount = atomic.Uint64{}
)
func ManualScanRunning() bool {
return ManualScanCancel.Load() != nil
}
func BeginManualScan(rawPath string, limit float64) error {
rawPath = utils.FixAndCleanPath(rawPath)
ctx, cancel := context.WithCancel(context.Background())
if !ManualScanCancel.CompareAndSwap(nil, &cancel) {
cancel()
return errors.New("manual scan is running, please try later")
}
ScannedCount.Store(0)
go func() {
defer func() { (*ManualScanCancel.Swap(nil))() }()
err := RecursivelyList(ctx, rawPath, rate.Limit(limit), &ScannedCount)
if err != nil {
log.Errorf("failed recursively list: %v", err)
}
}()
return nil
}
func StopManualScan() {
c := ManualScanCancel.Load()
if c != nil {
(*c)()
}
}
func RecursivelyList(ctx context.Context, rawPath string, limit rate.Limit, counter *atomic.Uint64) error {
storage, actualPath, err := GetStorageAndActualPath(rawPath)
if err != nil && !errors.Is(err, errs.StorageNotFound) {
return err
} else if err == nil {
var limiter *rate.Limiter
if limit > .0 {
limiter = rate.NewLimiter(limit, 1)
}
RecursivelyListStorage(ctx, storage, actualPath, limiter, counter)
} else {
var wg sync.WaitGroup
recursivelyListVirtual(ctx, rawPath, limit, counter, &wg)
wg.Wait()
}
return nil
}
func recursivelyListVirtual(ctx context.Context, rawPath string, limit rate.Limit, counter *atomic.Uint64, wg *sync.WaitGroup) {
objs := GetStorageVirtualFilesByPath(rawPath)
if counter != nil {
counter.Add(uint64(len(objs)))
}
for _, obj := range objs {
if utils.IsCanceled(ctx) {
return
}
nextPath := stdpath.Join(rawPath, obj.GetName())
storage, actualPath, err := GetStorageAndActualPath(nextPath)
if err != nil && !errors.Is(err, errs.StorageNotFound) {
log.Errorf("error recursively list: failed get storage [%s]: %v", nextPath, err)
} else if err == nil {
var limiter *rate.Limiter
if limit > .0 {
limiter = rate.NewLimiter(limit, 1)
}
wg.Add(1)
go func() {
defer wg.Done()
RecursivelyListStorage(ctx, storage, actualPath, limiter, counter)
}()
} else {
recursivelyListVirtual(ctx, nextPath, limit, counter, wg)
}
}
}
func RecursivelyListStorage(ctx context.Context, storage driver.Driver, actualPath string, limiter *rate.Limiter, counter *atomic.Uint64) {
objs, err := List(ctx, storage, actualPath, model.ListArgs{Refresh: true})
if err != nil {
if !errors.Is(err, context.Canceled) {
log.Errorf("error recursively list: failed list (%s)[%s]: %v", storage.GetStorage().MountPath, actualPath, err)
}
return
}
if counter != nil {
counter.Add(uint64(len(objs)))
}
for _, obj := range objs {
if utils.IsCanceled(ctx) {
return
}
if !obj.IsDir() {
continue
}
if limiter != nil {
if err = limiter.Wait(ctx); err != nil {
return
}
}
nextPath := stdpath.Join(actualPath, obj.GetName())
RecursivelyListStorage(ctx, storage, nextPath, limiter, counter)
}
}

View File

@@ -358,16 +358,21 @@ func GetStorageVirtualFilesWithDetailsByPath(ctx context.Context, prefix string,
DriverName: d.Config().Name,
},
}
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
details, err := GetStorageDetails(timeoutCtx, d, refresh)
if err != nil {
if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.StorageNotInit) {
log.Errorf("failed get %s storage details: %+v", d.GetStorage().MountPath, err)
resultChan := make(chan *model.StorageDetails, 1)
go func(dri driver.Driver) {
details, err := GetStorageDetails(ctx, dri, refresh)
if err != nil {
if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.StorageNotInit) {
log.Errorf("failed get %s storage details: %+v", dri.GetStorage().MountPath, err)
}
}
return ret
resultChan <- details
}(d)
select {
case r := <-resultChan:
ret.StorageDetails = r
case <-time.After(time.Second):
}
ret.StorageDetails = details
return ret
})
}

View File

@@ -215,6 +215,16 @@ func Update(ctx context.Context, parent string, objs []model.Obj) {
if !progress.IsDone {
return
}
// Use task queue for Meilisearch to avoid race conditions with async indexing
if msInstance, ok := instance.(interface {
EnqueueUpdate(parent string, objs []model.Obj)
}); ok {
// Enqueue task for async processing (diff calculation happens at consumption time)
msInstance.EnqueueUpdate(parent, objs)
return
}
nodes, err := instance.Get(ctx, parent)
if err != nil {
log.Errorf("update search index error while get nodes: %+v", err)
@@ -241,27 +251,23 @@ func Update(ctx context.Context, parent string, objs []model.Obj) {
}
}
}
// collect files and folders to add in batch
var toAddObjs []ObjWithParent
for i := range objs {
if toAdd.Contains(objs[i].GetName()) {
if !objs[i].IsDir() {
log.Debugf("add index: %s", path.Join(parent, objs[i].GetName()))
err = Index(ctx, parent, objs[i])
if err != nil {
log.Errorf("update search index error while index new node: %+v", err)
return
}
} else {
// build index if it's a folder
dir := path.Join(parent, objs[i].GetName())
err = BuildIndex(ctx,
[]string{dir},
conf.SlicesMap[conf.IgnorePaths],
setting.GetInt(conf.MaxIndexDepth, 20)-strings.Count(dir, "/"), false)
if err != nil {
log.Errorf("update search index error while build index: %+v", err)
return
}
}
log.Debugf("add index: %s", path.Join(parent, objs[i].GetName()))
toAddObjs = append(toAddObjs, ObjWithParent{
Parent: parent,
Obj: objs[i],
})
}
}
// batch index all files and folders at once
if len(toAddObjs) > 0 {
err = BatchIndex(ctx, toAddObjs)
if err != nil {
log.Errorf("update search index error while batch index new nodes: %+v", err)
return
}
}
}

View File

@@ -91,6 +91,11 @@ func init() {
return nil, err
}
}
// Initialize and start task queue manager
m.taskQueue = NewTaskQueueManager(&m)
m.taskQueue.Start()
return &m, nil
})
}

View File

@@ -33,6 +33,7 @@ type Meilisearch struct {
IndexUid string
FilterableAttributes []string
SearchableAttributes []string
taskQueue *TaskQueueManager
}
func (m *Meilisearch) Config() searcher.Config {
@@ -82,14 +83,17 @@ func (m *Meilisearch) Index(ctx context.Context, node model.SearchNode) error {
}
func (m *Meilisearch) BatchIndex(ctx context.Context, nodes []model.SearchNode) error {
documents, _ := utils.SliceConvert(nodes, func(src model.SearchNode) (*searchDocument, error) {
documents, err := utils.SliceConvert(nodes, func(src model.SearchNode) (*searchDocument, error) {
parentHash := hashPath(src.Parent)
nodePath := path.Join(src.Parent, src.Name)
nodePathHash := hashPath(nodePath)
parentPaths := utils.GetPathHierarchy(src.Parent)
parentPathHashes, _ := utils.SliceConvert(parentPaths, func(parentPath string) (string, error) {
parentPathHashes, err := utils.SliceConvert(parentPaths, func(parentPath string) (string, error) {
return hashPath(parentPath), nil
})
if err != nil {
return nil, err
}
return &searchDocument{
ID: nodePathHash,
@@ -98,9 +102,12 @@ func (m *Meilisearch) BatchIndex(ctx context.Context, nodes []model.SearchNode)
SearchNode: src,
}, nil
})
if err != nil {
return err
}
// max up to 10,000 documents per batch to reduce error rate while uploading over the Internet
_, err := m.Client.Index(m.IndexUid).AddDocumentsInBatchesWithContext(ctx, documents, 10000)
_, err = m.Client.Index(m.IndexUid).AddDocumentsInBatchesWithContext(ctx, documents, 10000)
if err != nil {
return err
}
@@ -203,6 +210,9 @@ func (m *Meilisearch) Del(ctx context.Context, prefix string) error {
}
func (m *Meilisearch) Release(ctx context.Context) error {
if m.taskQueue != nil {
m.taskQueue.Stop()
}
return nil
}
@@ -219,3 +229,115 @@ func (m *Meilisearch) getTaskStatus(ctx context.Context, taskUID int64) (meilise
}
return forTask.Status, nil
}
// EnqueueUpdate enqueues an update task to the task queue
func (m *Meilisearch) EnqueueUpdate(parent string, objs []model.Obj) {
if m.taskQueue == nil {
return
}
m.taskQueue.Enqueue(parent, objs)
}
// batchIndexWithTaskUID indexes documents and returns all taskUIDs
func (m *Meilisearch) batchIndexWithTaskUID(ctx context.Context, nodes []model.SearchNode) ([]int64, error) {
if len(nodes) == 0 {
return nil, nil
}
documents, err := utils.SliceConvert(nodes, func(src model.SearchNode) (*searchDocument, error) {
parentHash := hashPath(src.Parent)
nodePath := path.Join(src.Parent, src.Name)
nodePathHash := hashPath(nodePath)
parentPaths := utils.GetPathHierarchy(src.Parent)
parentPathHashes, err := utils.SliceConvert(parentPaths, func(parentPath string) (string, error) {
return hashPath(parentPath), nil
})
if err != nil {
return nil, err
}
return &searchDocument{
ID: nodePathHash,
ParentHash: parentHash,
ParentPathHashes: parentPathHashes,
SearchNode: src,
}, nil
})
if err != nil {
return nil, err
}
// max up to 10,000 documents per batch to reduce error rate while uploading over the Internet
tasks, err := m.Client.Index(m.IndexUid).AddDocumentsInBatchesWithContext(ctx, documents, 10000)
if err != nil {
return nil, err
}
// Return all task UIDs
taskUIDs := make([]int64, 0, len(tasks))
for _, task := range tasks {
taskUIDs = append(taskUIDs, task.TaskUID)
}
return taskUIDs, nil
}
// batchDeleteWithTaskUID deletes documents and returns all taskUIDs
func (m *Meilisearch) batchDeleteWithTaskUID(ctx context.Context, paths []string) ([]int64, error) {
if len(paths) == 0 {
return nil, nil
}
// Deduplicate paths first
pathSet := make(map[string]struct{})
uniquePaths := make([]string, 0, len(paths))
for _, p := range paths {
p = utils.FixAndCleanPath(p)
if _, exists := pathSet[p]; !exists {
pathSet[p] = struct{}{}
uniquePaths = append(uniquePaths, p)
}
}
const batchSize = 100 // max paths per batch to avoid filter length limits
var taskUIDs []int64
// Process in batches to avoid filter length limits
for i := 0; i < len(uniquePaths); i += batchSize {
end := i + batchSize
if end > len(uniquePaths) {
end = len(uniquePaths)
}
batch := uniquePaths[i:end]
// Build combined filter to delete all children in one request
// Format: parent_path_hashes = 'hash1' OR parent_path_hashes = 'hash2' OR ...
var filters []string
for _, p := range batch {
pathHash := hashPath(p)
filters = append(filters, fmt.Sprintf("parent_path_hashes = '%s'", pathHash))
}
if len(filters) > 0 {
combinedFilter := strings.Join(filters, " OR ")
// Delete all children for all paths in one request
task, err := m.Client.Index(m.IndexUid).DeleteDocumentsByFilterWithContext(ctx, combinedFilter)
if err != nil {
return nil, err
}
taskUIDs = append(taskUIDs, task.TaskUID)
}
// Convert paths to document IDs and batch delete
documentIDs := make([]string, 0, len(batch))
for _, p := range batch {
documentIDs = append(documentIDs, hashPath(p))
}
// Use batch delete API
task, err := m.Client.Index(m.IndexUid).DeleteDocumentsWithContext(ctx, documentIDs)
if err != nil {
return nil, err
}
taskUIDs = append(taskUIDs, task.TaskUID)
}
return taskUIDs, nil
}

View File

@@ -0,0 +1,265 @@
package meilisearch
import (
"context"
"path"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/op"
mapset "github.com/deckarep/golang-set/v2"
log "github.com/sirupsen/logrus"
)
// QueuedTask represents a task in the queue
type QueuedTask struct {
Parent string
Objs []model.Obj // current file system state
Depth int // path depth for sorting
EnqueueAt time.Time // enqueue time
}
// TaskQueueManager manages the task queue for async index operations
type TaskQueueManager struct {
queue map[string]*QueuedTask // parent -> task
pendingTasks map[string][]int64 // parent -> all submitted taskUIDs
mu sync.RWMutex
ticker *time.Ticker
stopCh chan struct{}
m *Meilisearch
consuming atomic.Bool // flag to prevent concurrent consumption
}
// NewTaskQueueManager creates a new task queue manager
func NewTaskQueueManager(m *Meilisearch) *TaskQueueManager {
return &TaskQueueManager{
queue: make(map[string]*QueuedTask),
pendingTasks: make(map[string][]int64),
stopCh: make(chan struct{}),
m: m,
}
}
// calculateDepth calculates the depth of a path
func calculateDepth(path string) int {
if path == "/" {
return 0
}
return strings.Count(strings.Trim(path, "/"), "/") + 1
}
// Enqueue enqueues a task with current file system state
func (tqm *TaskQueueManager) Enqueue(parent string, objs []model.Obj) {
tqm.mu.Lock()
defer tqm.mu.Unlock()
// deduplicate: overwrite existing task with the same parent
tqm.queue[parent] = &QueuedTask{
Parent: parent,
Objs: objs,
Depth: calculateDepth(parent),
EnqueueAt: time.Now(),
}
log.Debugf("enqueued update task for parent: %s, depth: %d, objs: %d", parent, calculateDepth(parent), len(objs))
}
// Start starts the task queue consumer
func (tqm *TaskQueueManager) Start() {
tqm.ticker = time.NewTicker(30 * time.Second)
go func() {
for {
select {
case <-tqm.ticker.C:
tqm.consume()
case <-tqm.stopCh:
log.Info("task queue manager stopped")
return
}
}
}()
log.Info("task queue manager started, will consume every 30 seconds")
}
// Stop stops the task queue consumer
func (tqm *TaskQueueManager) Stop() {
if tqm.ticker != nil {
tqm.ticker.Stop()
}
close(tqm.stopCh)
}
// consume processes all tasks in the queue
func (tqm *TaskQueueManager) consume() {
// Prevent concurrent consumption
if !tqm.consuming.CompareAndSwap(false, true) {
log.Warn("previous consume still running, skip this round")
return
}
defer tqm.consuming.Store(false)
tqm.mu.Lock()
// extract all tasks
tasks := make([]*QueuedTask, 0, len(tqm.queue))
for _, task := range tqm.queue {
tasks = append(tasks, task)
}
// clear queue
tqm.queue = make(map[string]*QueuedTask)
tqm.mu.Unlock()
if len(tasks) == 0 {
return
}
log.Infof("consuming task queue: %d tasks", len(tasks))
// sort tasks: shallow paths first, then by enqueue time
sort.Slice(tasks, func(i, j int) bool {
if tasks[i].Depth != tasks[j].Depth {
return tasks[i].Depth < tasks[j].Depth
}
return tasks[i].EnqueueAt.Before(tasks[j].EnqueueAt)
})
ctx := context.Background()
// execute tasks in order
for _, task := range tasks {
// Check if there are pending tasks for this parent
tqm.mu.RLock()
pendingTaskUIDs, hasPending := tqm.pendingTasks[task.Parent]
tqm.mu.RUnlock()
if hasPending && len(pendingTaskUIDs) > 0 {
// Check all pending task statuses
allCompleted := true
for _, taskUID := range pendingTaskUIDs {
taskStatus, err := tqm.m.getTaskStatus(ctx, taskUID)
if err != nil {
log.Errorf("failed to get task status for parent %s (taskUID: %d): %v", task.Parent, taskUID, err)
// If we can't get status, assume it's done and continue checking
continue
}
// Check if task is still running
if taskStatus == "enqueued" || taskStatus == "processing" {
log.Warnf("skipping task for parent %s: previous task %d still %s", task.Parent, taskUID, taskStatus)
allCompleted = false
break // No need to check remaining tasks
}
}
if !allCompleted {
// Re-enqueue the task if not already in queue (avoid overwriting newer snapshots)
tqm.mu.Lock()
if _, exists := tqm.queue[task.Parent]; !exists {
tqm.queue[task.Parent] = task
log.Debugf("re-enqueued skipped task for parent %s due to pending tasks", task.Parent)
} else {
log.Debugf("skipped task for parent %s not re-enqueued (newer task already in queue)", task.Parent)
}
tqm.mu.Unlock()
continue // Skip this task, some previous tasks are still running
}
// All tasks are in terminal state, remove from pending
log.Debugf("all previous tasks for parent %s are completed, proceeding with new task", task.Parent)
tqm.mu.Lock()
delete(tqm.pendingTasks, task.Parent)
tqm.mu.Unlock()
}
// Execute the task
tqm.executeTask(ctx, task)
}
log.Infof("task queue consumption completed")
}
// executeTask executes a single task
func (tqm *TaskQueueManager) executeTask(ctx context.Context, task *QueuedTask) {
parent := task.Parent
currentObjs := task.Objs
// Query index to get old state
nodes, err := tqm.m.Get(ctx, parent)
if err != nil {
log.Errorf("failed to get indexed nodes for parent %s: %v", parent, err)
return
}
// Calculate diff based on current index state
now := mapset.NewSet[string]()
for i := range currentObjs {
now.Add(currentObjs[i].GetName())
}
old := mapset.NewSet[string]()
for i := range nodes {
old.Add(nodes[i].Name)
}
toDelete := old.Difference(now)
toAdd := now.Difference(old)
// Collect paths to delete
var pathsToDelete []string
for i := range nodes {
if toDelete.Contains(nodes[i].Name) && !op.HasStorage(path.Join(parent, nodes[i].Name)) {
pathsToDelete = append(pathsToDelete, path.Join(parent, nodes[i].Name))
}
}
var allTaskUIDs []int64
// Execute delete first
if len(pathsToDelete) > 0 {
log.Debugf("executing delete for parent %s: %d paths", parent, len(pathsToDelete))
taskUIDs, err := tqm.m.batchDeleteWithTaskUID(ctx, pathsToDelete)
if err != nil {
log.Errorf("failed to batch delete for parent %s: %v", parent, err)
// Continue to add even if delete fails
} else {
allTaskUIDs = append(allTaskUIDs, taskUIDs...)
}
}
// Collect objects to add
var nodesToAdd []model.SearchNode
for i := range currentObjs {
if toAdd.Contains(currentObjs[i].GetName()) {
log.Debugf("will add index: %s", path.Join(parent, currentObjs[i].GetName()))
nodesToAdd = append(nodesToAdd, model.SearchNode{
Parent: parent,
Name: currentObjs[i].GetName(),
IsDir: currentObjs[i].IsDir(),
Size: currentObjs[i].GetSize(),
})
}
}
// Execute add
if len(nodesToAdd) > 0 {
log.Debugf("executing add for parent %s: %d nodes", parent, len(nodesToAdd))
taskUIDs, err := tqm.m.batchIndexWithTaskUID(ctx, nodesToAdd)
if err != nil {
log.Errorf("failed to batch index for parent %s: %v", parent, err)
} else {
allTaskUIDs = append(allTaskUIDs, taskUIDs...)
}
}
// Record all task UIDs for this parent
if len(allTaskUIDs) > 0 {
tqm.mu.Lock()
tqm.pendingTasks[parent] = allTaskUIDs
tqm.mu.Unlock()
log.Debugf("recorded %d taskUIDs for parent %s", len(allTaskUIDs), parent)
}
}

View File

@@ -1,7 +1,6 @@
package meilisearch
import (
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
)
@@ -13,16 +12,16 @@ func hashPath(path string) string {
}
func buildSearchDocumentFromResults(results map[string]any) *searchDocument {
searchNode := model.SearchNode{}
document := &searchDocument{
SearchNode: searchNode,
}
document := &searchDocument{}
// use assertion test to avoid panic
searchNode.Parent, _ = results["parent"].(string)
searchNode.Name, _ = results["name"].(string)
searchNode.IsDir, _ = results["is_dir"].(bool)
searchNode.Size, _ = results["size"].(int64)
document.SearchNode.Parent, _ = results["parent"].(string)
document.SearchNode.Name, _ = results["name"].(string)
document.SearchNode.IsDir, _ = results["is_dir"].(bool)
// JSON numbers are typically float64, not int64
if size, ok := results["size"].(float64); ok {
document.SearchNode.Size = int64(size)
}
document.ID, _ = results["id"].(string)
document.ParentHash, _ = results["parent_hash"].(string)

View File

@@ -28,3 +28,11 @@ func GetInt(key string, defaultVal int) int {
func GetBool(key string) bool {
return GetStr(key) == "true" || GetStr(key) == "1"
}
func GetFloat(key string, defaultVal float64) float64 {
f, err := strconv.ParseFloat(GetStr(key), 64)
if err != nil {
return defaultVal
}
return f
}

View File

@@ -5,11 +5,14 @@ import (
"fmt"
"path"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/internal/setting"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)
type SrcPathToRemove string
@@ -27,12 +30,31 @@ func RefreshAndRemove(dstPath string, payloads ...any) {
if dstNeedRefresh {
op.Cache.DeleteDirectory(dstStorage, dstActualPath)
}
dstNeedHandleHook := setting.GetBool(conf.HandleHookAfterWriting)
dstHandleHookLimit := setting.GetFloat(conf.HandleHookRateLimit, .0)
var listLimiter *rate.Limiter
if dstNeedRefresh && dstNeedHandleHook && dstHandleHookLimit > .0 {
listLimiter = rate.NewLimiter(rate.Limit(dstHandleHookLimit), 1)
}
var ctx context.Context
for _, payload := range payloads {
switch p := payload.(type) {
case DstPathToRefresh:
if dstNeedRefresh {
op.Cache.DeleteDirectory(dstStorage, string(p))
if dstNeedHandleHook {
if ctx == nil {
ctx = context.Background()
}
if listLimiter != nil {
_ = listLimiter.Wait(ctx)
}
_, e := op.List(ctx, dstStorage, string(p), model.ListArgs{Refresh: true})
if e != nil {
log.Errorf("failed handle objs update hook: %v", e)
}
} else {
op.Cache.DeleteDirectory(dstStorage, string(p))
}
}
case SrcPathToRemove:
if ctx == nil {

View File

@@ -69,14 +69,11 @@ func (h *httpCaller) setNotifier(ctx context.Context, u url.URL, notifier Notifi
go func() {
defer h.wg.Done()
defer conn.Close()
select {
case <-ctx.Done():
conn.SetWriteDeadline(time.Now().Add(time.Second))
if err := conn.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
log.Printf("sending websocket close message: %v", err)
}
return
<-ctx.Done()
conn.SetWriteDeadline(time.Now().Add(time.Second))
if err := conn.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
log.Printf("sending websocket close message: %v", err)
}
}()
h.wg.Add(1)
@@ -120,7 +117,7 @@ func (h *httpCaller) setNotifier(ctx context.Context, u url.URL, notifier Notifi
return
}
func (h httpCaller) Call(method string, params, reply interface{}) (err error) {
func (h *httpCaller) Call(method string, params, reply interface{}) (err error) {
payload, err := EncodeClientRequest(method, params)
if err != nil {
return
@@ -236,7 +233,7 @@ func (w *websocketCaller) Close() (err error) {
return
}
func (w websocketCaller) Call(method string, params, reply interface{}) (err error) {
func (w *websocketCaller) Call(method string, params, reply interface{}) (err error) {
ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
defer cancel()
select {
@@ -251,11 +248,9 @@ func (w websocketCaller) Call(method string, params, reply interface{}) (err err
return errors.New("sending channel blocking")
}
select {
case <-ctx.Done():
if err := ctx.Err(); err == context.DeadlineExceeded {
return err
}
<-ctx.Done()
if err := ctx.Err(); err == context.DeadlineExceeded {
return err
}
return
}

View File

@@ -185,3 +185,20 @@ const (
GB
TB
)
// IsSystemFile checks if a filename is a common system file that should be ignored
// Returns true for files like .DS_Store, desktop.ini, Thumbs.db, and Apple Double files (._*)
func IsSystemFile(filename string) bool {
// Common system files
switch filename {
case ".DS_Store", "desktop.ini", "Thumbs.db":
return true
}
// Apple Double files (._*)
if strings.HasPrefix(filename, "._") {
return true
}
return false
}

42
pkg/utils/file_test.go Normal file
View File

@@ -0,0 +1,42 @@
package utils
import (
"testing"
)
func TestIsSystemFile(t *testing.T) {
testCases := []struct {
filename string
expected bool
}{
// System files that should be filtered
{".DS_Store", true},
{"desktop.ini", true},
{"Thumbs.db", true},
{"._test.txt", true},
{"._", true},
{"._somefile", true},
{"._folder_name", true},
// Regular files that should not be filtered
{"test.txt", false},
{"file.pdf", false},
{"document.docx", false},
{".gitignore", false},
{".env", false},
{"_underscore.txt", false},
{"normal_file.txt", false},
{"", false},
{".hidden", false},
{"..special", false},
}
for _, tc := range testCases {
t.Run(tc.filename, func(t *testing.T) {
result := IsSystemFile(tc.filename)
if result != tc.expected {
t.Errorf("IsSystemFile(%q) = %v, want %v", tc.filename, result, tc.expected)
}
})
}
}

View File

@@ -44,11 +44,15 @@ func IsSubPath(path string, subPath string) bool {
}
func Ext(path string) string {
return strings.ToLower(SourceExt(path))
}
func SourceExt(path string) string {
ext := stdpath.Ext(path)
if len(ext) > 0 && ext[0] == '.' {
ext = ext[1:]
}
return strings.ToLower(ext)
return ext
}
func EncodePath(path string, all ...bool) string {

View File

@@ -15,7 +15,9 @@ import (
"github.com/OpenListTeam/OpenList/v4/internal/fs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/internal/setting"
"github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/OpenListTeam/OpenList/v4/server/common"
ftpserver "github.com/fclairamb/ftpserverlib"
"github.com/pkg/errors"
@@ -49,6 +51,11 @@ func OpenUpload(ctx context.Context, path string, trunc bool) (*FileUploadProxy,
if err != nil {
return nil, err
}
// Check if system file should be ignored
_, name := stdpath.Split(path)
if setting.GetBool(conf.IgnoreSystemFiles) && utils.IsSystemFile(name) {
return nil, errs.IgnoredSystemFile
}
tmpFile, err := os.CreateTemp(conf.Conf.TempDir, "file-*")
if err != nil {
return nil, err
@@ -150,6 +157,11 @@ func OpenUploadWithLength(ctx context.Context, path string, trunc bool, length i
if err != nil {
return nil, err
}
// Check if system file should be ignored
_, name := stdpath.Split(path)
if setting.GetBool(conf.IgnoreSystemFiles) && utils.IsSystemFile(name) {
return nil, errs.IgnoredSystemFile
}
if trunc {
_ = fs.Remove(ctx, path)
}

View File

@@ -1,7 +1,6 @@
package handles
import (
"encoding/json"
"fmt"
"io"
stdpath "path"
@@ -229,30 +228,15 @@ func FsArchiveList(c *gin.Context, req *ArchiveListReq, user *model.User) {
})
}
type StringOrArray []string
func (s *StringOrArray) UnmarshalJSON(data []byte) error {
var value string
if err := json.Unmarshal(data, &value); err == nil {
*s = []string{value}
return nil
}
var sliceValue []string
if err := json.Unmarshal(data, &sliceValue); err != nil {
return err
}
*s = sliceValue
return nil
}
type ArchiveDecompressReq struct {
SrcDir string `json:"src_dir" form:"src_dir"`
DstDir string `json:"dst_dir" form:"dst_dir"`
Name StringOrArray `json:"name" form:"name"`
ArchivePass string `json:"archive_pass" form:"archive_pass"`
InnerPath string `json:"inner_path" form:"inner_path"`
CacheFull bool `json:"cache_full" form:"cache_full"`
PutIntoNewDir bool `json:"put_into_new_dir" form:"put_into_new_dir"`
SrcDir string `json:"src_dir" form:"src_dir"`
DstDir string `json:"dst_dir" form:"dst_dir"`
Name []string `json:"name" form:"name"`
ArchivePass string `json:"archive_pass" form:"archive_pass"`
InnerPath string `json:"inner_path" form:"inner_path"`
CacheFull bool `json:"cache_full" form:"cache_full"`
PutIntoNewDir bool `json:"put_into_new_dir" form:"put_into_new_dir"`
Overwrite bool `json:"overwrite" form:"overwrite"`
}
func FsArchiveDecompress(c *gin.Context) {
@@ -295,6 +279,7 @@ func FsArchiveDecompress(c *gin.Context) {
},
CacheFull: req.CacheFull,
PutIntoNewDir: req.PutIntoNewDir,
Overwrite: req.Overwrite,
})
if e != nil {
if errors.Is(e, errs.WrongArchivePassword) {

View File

@@ -57,10 +57,12 @@ func FsMkdir(c *gin.Context) {
}
type MoveCopyReq struct {
SrcDir string `json:"src_dir"`
DstDir string `json:"dst_dir"`
Names []string `json:"names"`
Overwrite bool `json:"overwrite"`
SrcDir string `json:"src_dir"`
DstDir string `json:"dst_dir"`
Names []string `json:"names"`
Overwrite bool `json:"overwrite"`
SkipExisting bool `json:"skip_existing"`
Merge bool `json:"merge"`
}
func FsMove(c *gin.Context) {
@@ -89,20 +91,25 @@ func FsMove(c *gin.Context) {
return
}
var validNames []string
if !req.Overwrite {
for _, name := range req.Names {
if res, _ := fs.Get(c.Request.Context(), stdpath.Join(dstDir, name), &fs.GetArgs{NoLog: true}); res != nil {
if res, _ := fs.Get(c.Request.Context(), stdpath.Join(dstDir, name), &fs.GetArgs{NoLog: true}); res != nil && !req.SkipExisting {
common.ErrorStrResp(c, fmt.Sprintf("file [%s] exists", name), 403)
return
} else if res == nil {
validNames = append(validNames, name)
}
}
} else {
validNames = req.Names
}
// Create all tasks immediately without any synchronous validation
// All validation will be done asynchronously in the background
var addedTasks []task.TaskExtensionInfo
for i, name := range req.Names {
t, err := fs.Move(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(req.Names) > i+1)
for i, name := range validNames {
t, err := fs.Move(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(validNames) > i+1)
if t != nil {
addedTasks = append(addedTasks, t)
}
@@ -151,20 +158,34 @@ func FsCopy(c *gin.Context) {
return
}
var validNames []string
if !req.Overwrite {
for _, name := range req.Names {
if res, _ := fs.Get(c.Request.Context(), stdpath.Join(dstDir, name), &fs.GetArgs{NoLog: true}); res != nil {
common.ErrorStrResp(c, fmt.Sprintf("file [%s] exists", name), 403)
return
if !req.SkipExisting && !req.Merge {
common.ErrorStrResp(c, fmt.Sprintf("file [%s] exists", name), 403)
return
} else if req.Merge && res.IsDir() {
validNames = append(validNames, name)
}
} else {
validNames = append(validNames, name)
}
}
} else {
validNames = req.Names
}
// Create all tasks immediately without any synchronous validation
// All validation will be done asynchronously in the background
var addedTasks []task.TaskExtensionInfo
for i, name := range req.Names {
t, err := fs.Copy(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(req.Names) > i+1)
for i, name := range validNames {
var t task.TaskExtensionInfo
if req.Merge {
t, err = fs.Merge(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(validNames) > i+1)
} else {
t, err = fs.Copy(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(validNames) > i+1)
}
if t != nil {
addedTasks = append(addedTasks, t)
}

View File

@@ -8,8 +8,10 @@ import (
"time"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/fs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/setting"
"github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/internal/task"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
@@ -28,6 +30,14 @@ func getLastModified(c *gin.Context) time.Time {
return lastModified
}
// shouldIgnoreSystemFile checks if the filename should be ignored based on settings
func shouldIgnoreSystemFile(filename string) bool {
if setting.GetBool(conf.IgnoreSystemFiles) {
return utils.IsSystemFile(filename)
}
return false
}
func FsStream(c *gin.Context) {
defer func() {
if n, _ := io.ReadFull(c.Request.Body, []byte{0}); n == 1 {
@@ -56,6 +66,11 @@ func FsStream(c *gin.Context) {
}
}
dir, name := stdpath.Split(path)
// Check if system file should be ignored
if shouldIgnoreSystemFile(name) {
common.ErrorStrResp(c, errs.IgnoredSystemFile.Error(), 403)
return
}
// 如果请求头 Content-Length 和 X-File-Size 都没有,则 size=-1表示未知大小的流式上传
size := c.Request.ContentLength
if size < 0 {
@@ -160,6 +175,11 @@ func FsForm(c *gin.Context) {
}
defer f.Close()
dir, name := stdpath.Split(path)
// Check if system file should be ignored
if shouldIgnoreSystemFile(name) {
common.ErrorStrResp(c, errs.IgnoredSystemFile.Error(), 403)
return
}
h := make(map[*utils.HashType]string)
if md5 := c.GetHeader("X-File-Md5"); md5 != "" {
h[utils.MD5] = md5

47
server/handles/scan.go Normal file
View File

@@ -0,0 +1,47 @@
package handles
import (
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/server/common"
"github.com/gin-gonic/gin"
)
type ManualScanReq struct {
Path string `json:"path"`
Limit float64 `json:"limit"`
}
func StartManualScan(c *gin.Context) {
var req ManualScanReq
if err := c.ShouldBind(&req); err != nil {
common.ErrorResp(c, err, 400)
return
}
if err := op.BeginManualScan(req.Path, req.Limit); err != nil {
common.ErrorResp(c, err, 400)
return
}
common.SuccessResp(c)
}
func StopManualScan(c *gin.Context) {
if !op.ManualScanRunning() {
common.ErrorStrResp(c, "manual scan is not running", 400)
return
}
op.StopManualScan()
common.SuccessResp(c)
}
type ManualScanResp struct {
ObjCount uint64 `json:"obj_count"`
IsDone bool `json:"is_done"`
}
func GetManualScanProgress(c *gin.Context) {
ret := ManualScanResp{
ObjCount: op.ScannedCount.Load(),
IsDone: !op.ManualScanRunning(),
}
common.SuccessResp(c, ret)
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"strconv"
"sync"
"time"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
@@ -24,9 +23,15 @@ type StorageResp struct {
MountDetails *model.StorageDetails `json:"mount_details,omitempty"`
}
func makeStorageResp(c *gin.Context, storages []model.Storage) []*StorageResp {
type detailWithIndex struct {
idx int
val *model.StorageDetails
}
func makeStorageResp(ctx *gin.Context, storages []model.Storage) []*StorageResp {
ret := make([]*StorageResp, len(storages))
var wg sync.WaitGroup
detailsChan := make(chan detailWithIndex, len(storages))
workerCount := 0
for i, s := range storages {
ret[i] = &StorageResp{
Storage: s,
@@ -43,22 +48,26 @@ func makeStorageResp(c *gin.Context, storages []model.Storage) []*StorageResp {
if !ok {
continue
}
wg.Add(1)
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(c, time.Second*3)
defer cancel()
details, err := op.GetStorageDetails(ctx, d)
if err != nil {
if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.StorageNotInit) {
log.Errorf("failed get %s details: %+v", s.MountPath, err)
workerCount++
go func(dri driver.Driver, idx int) {
details, e := op.GetStorageDetails(ctx, dri)
if e != nil {
if !errors.Is(e, errs.NotImplement) && !errors.Is(e, errs.StorageNotInit) {
log.Errorf("failed get %s details: %+v", dri.GetStorage().MountPath, e)
}
return
}
ret[i].MountDetails = details
}()
detailsChan <- detailWithIndex{idx: idx, val: details}
}(d, i)
}
for workerCount > 0 {
select {
case r := <-detailsChan:
ret[r.idx].MountDetails = r.val
workerCount--
case <-time.After(time.Second * 3):
workerCount = 0
}
}
wg.Wait()
return ret
}

View File

@@ -11,7 +11,7 @@ import (
func SearchIndex(c *gin.Context) {
mode := setting.GetStr(conf.SearchIndex)
if mode == "none" {
common.ErrorResp(c, errs.SearchNotAvailable, 500)
common.ErrorResp(c, errs.SearchNotAvailable, 404)
c.Abort()
} else {
c.Next()

View File

@@ -179,6 +179,11 @@ func admin(g *gin.RouterGroup) {
index.POST("/stop", middlewares.SearchIndex, handles.StopIndex)
index.POST("/clear", middlewares.SearchIndex, handles.ClearIndex)
index.GET("/progress", middlewares.SearchIndex, handles.GetProgress)
scan := g.Group("/scan")
scan.POST("/start", handles.StartManualScan)
scan.POST("/stop", handles.StopManualScan)
scan.GET("/progress", handles.GetManualScanProgress)
}
func fsAndShare(g *gin.RouterGroup) {

View File

@@ -19,6 +19,7 @@ import (
"github.com/OpenListTeam/OpenList/v4/internal/fs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/internal/setting"
"github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
@@ -286,6 +287,10 @@ func (b *s3Backend) PutObject(
Modified: ti,
Ctime: time.Now(),
}
// Check if system file should be ignored
if setting.GetBool(conf.IgnoreSystemFiles) && utils.IsSystemFile(obj.Name) {
return result, errs.IgnoredSystemFile
}
stream := &stream.FileStream{
Obj: &obj,
Reader: input,

View File

@@ -20,6 +20,7 @@ import (
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/net"
"github.com/OpenListTeam/OpenList/v4/internal/setting"
"github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
@@ -358,6 +359,10 @@ func (h *Handler) handlePut(w http.ResponseWriter, r *http.Request) (status int,
Modified: h.getModTime(r),
Ctime: h.getCreateTime(r),
}
// Check if system file should be ignored
if setting.GetBool(conf.IgnoreSystemFiles) && utils.IsSystemFile(obj.Name) {
return http.StatusForbidden, errs.IgnoredSystemFile
}
fsStream := &stream.FileStream{
Obj: &obj,
Reader: r.Body,