OpenList/internal/stream/util.go

package stream

import (
	"bytes"
	"context"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"net/http"

	"github.com/OpenListTeam/OpenList/v4/internal/conf"
	"github.com/OpenListTeam/OpenList/v4/internal/model"
	"github.com/OpenListTeam/OpenList/v4/internal/net"
	"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
	"github.com/OpenListTeam/OpenList/v4/pkg/pool"
	"github.com/OpenListTeam/OpenList/v4/pkg/utils"
	"github.com/rclone/rclone/lib/mmap"
	log "github.com/sirupsen/logrus"
)
// RangeReaderFunc adapts a plain function to the model.RangeReaderIF interface.
type RangeReaderFunc func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error)

func (f RangeReaderFunc) RangeRead(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
	return f(ctx, httpRange)
}
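
// Usage sketch (illustrative): any closure with the matching signature can be
// converted in place; data here is a hypothetical in-memory byte slice.
//
//	var rr model.RangeReaderIF = RangeReaderFunc(func(ctx context.Context, r http_range.Range) (io.ReadCloser, error) {
//		return io.NopCloser(bytes.NewReader(data[r.Start : r.Start+r.Length])), nil
//	})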
// GetRangeReaderFromLink builds a model.RangeReaderIF from a link, preferring
// an in-memory file, then a custom range reader, then plain HTTP range requests.
func GetRangeReaderFromLink(size int64, link *model.Link) (model.RangeReaderIF, error) {
	if link.MFile != nil {
		return GetRangeReaderFromMFile(size, link.MFile), nil
	}
	if link.Concurrency > 0 || link.PartSize > 0 {
		down := net.NewDownloader(func(d *net.Downloader) {
			d.Concurrency = link.Concurrency
			d.PartSize = link.PartSize
		})
		var rangeReader RangeReaderFunc = func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
			var req *net.HttpRequestParams
			if link.RangeReader != nil {
				req = &net.HttpRequestParams{
					Range: httpRange,
					Size:  size,
				}
			} else {
				requestHeader, _ := ctx.Value(conf.RequestHeaderKey).(http.Header)
				header := net.ProcessHeader(requestHeader, link.Header)
				req = &net.HttpRequestParams{
					Range:     httpRange,
					Size:      size,
					URL:       link.URL,
					HeaderRef: header,
				}
			}
			return down.Download(ctx, req)
		}
		if link.RangeReader != nil {
			down.HttpClient = net.GetRangeReaderHttpRequestFunc(link.RangeReader)
			return rangeReader, nil
		}
		return RateLimitRangeReaderFunc(rangeReader), nil
	}
	if link.RangeReader != nil {
		return link.RangeReader, nil
	}
	if len(link.URL) == 0 {
		return nil, errors.New("invalid link: must have at least one of MFile, URL, or RangeReader")
	}
	rangeReader := func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
		if httpRange.Length < 0 || httpRange.Start+httpRange.Length > size {
			httpRange.Length = size - httpRange.Start
		}
		requestHeader, _ := ctx.Value(conf.RequestHeaderKey).(http.Header)
		header := net.ProcessHeader(requestHeader, link.Header)
		header = http_range.ApplyRangeToHttpHeader(httpRange, header)

		response, err := net.RequestHttp(ctx, "GET", header, link.URL)
		if err != nil {
			if _, ok := errors.Unwrap(err).(net.HttpStatusCodeError); ok {
				return nil, err
			}
			return nil, fmt.Errorf("http request failure, err: %w", err)
		}
		if httpRange.Start == 0 && (httpRange.Length == -1 || httpRange.Length == size) || response.StatusCode == http.StatusPartialContent ||
			checkContentRange(&response.Header, httpRange.Start) {
			return response.Body, nil
		} else if response.StatusCode == http.StatusOK {
			log.Warnf("remote http server does not support range requests, expect low performance!")
			readCloser, err := net.GetRangedHttpReader(response.Body, httpRange.Start, httpRange.Length)
			if err != nil {
				return nil, err
			}
			return readCloser, nil
		}
		return response.Body, nil
	}
	return RateLimitRangeReaderFunc(rangeReader), nil
}
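
// Usage sketch (illustrative): build a range reader from a URL-only link and
// fetch one byte range; size, ctx and dst are placeholders.
//
//	rr, err := GetRangeReaderFromLink(size, &model.Link{URL: "https://example.com/file"})
//	if err != nil {
//		return err
//	}
//	rc, err := rr.RangeRead(ctx, http_range.Range{Start: 0, Length: 1024})
//	if err != nil {
//		return err
//	}
//	defer rc.Close()
//	_, err = io.Copy(dst, rc)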
// GetRangeReaderFromMFile returns a RangeReaderIF whose RangeRead returns an
// io.ReadCloser that preserves file's signature.
func GetRangeReaderFromMFile(size int64, file model.File) model.RangeReaderIF {
	return &model.FileRangeReader{
		RangeReaderIF: RangeReaderFunc(func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
			length := httpRange.Length
			if length < 0 || httpRange.Start+length > size {
				length = size - httpRange.Start
			}
			return &model.FileCloser{File: io.NewSectionReader(file, httpRange.Start, length)}, nil
		}),
	}
}
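
// Usage sketch (illustrative, assuming *os.File satisfies model.File; the
// path is hypothetical):
//
//	f, err := os.Open("/tmp/cache.bin")
//	if err != nil {
//		return err
//	}
//	rr := GetRangeReaderFromMFile(size, f)
//	rc, err := rr.RangeRead(ctx, http_range.Range{Start: 128, Length: 512})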
// checkContentRange works around remote servers (e.g. 139 Cloud) that do not
// properly return the 206 HTTP status code: it treats the response as ranged
// when the Content-Range header starts at the requested offset.
func checkContentRange(header *http.Header, offset int64) bool {
	start, _, err := http_range.ParseContentRange(header.Get("Content-Range"))
	if err != nil {
		log.Warnf("exception trying to parse Content-Range, will ignore, err=%s", err)
	}
	return start == offset
}
// ReaderWithCtx wraps an io.Reader so that each Read first checks whether Ctx
// has been canceled.
type ReaderWithCtx struct {
	io.Reader
	Ctx context.Context
}

func (r *ReaderWithCtx) Read(p []byte) (n int, err error) {
	if utils.IsCanceled(r.Ctx) {
		return 0, r.Ctx.Err()
	}
	return r.Reader.Read(p)
}

func (r *ReaderWithCtx) Close() error {
	if c, ok := r.Reader.(io.Closer); ok {
		return c.Close()
	}
	return nil
}
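
// Usage sketch (illustrative): wrap any io.Reader so reads stop once ctx is
// canceled; resp.Body is a placeholder source.
//
//	r := &ReaderWithCtx{Reader: resp.Body, Ctx: ctx}
//	defer r.Close()
//	_, err := io.Copy(io.Discard, r) // returns ctx.Err() after cancellation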
// CacheFullAndHash caches the stream in full while hashing it in a single
// pass, returning the cache file and the hex-encoded digest.
func CacheFullAndHash(stream model.FileStreamer, up *model.UpdateProgress, hashType *utils.HashType, hashParams ...any) (model.File, string, error) {
	h := hashType.NewFunc(hashParams...)
	tmpF, err := stream.CacheFullAndWriter(up, h)
	if err != nil {
		return nil, "", err
	}
	return tmpF, hex.EncodeToString(h.Sum(nil)), nil
}
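
// Usage sketch (illustrative): cache a stream and obtain its digest in one
// pass; utils.MD5 as the hash type is an assumption for illustration.
//
//	cache, md5sum, err := CacheFullAndHash(fileStream, &up, utils.MD5)
//	if err != nil {
//		return err
//	}
//	log.Debugf("cached %d bytes, md5=%s", fileStream.GetSize(), md5sum)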
// StreamSectionReader serves sequential sections of a FileStreamer, buffering
// uncached streams through a reusable buffer pool.
type StreamSectionReader struct {
	file    model.FileStreamer
	off     int64
	bufPool *pool.Pool[[]byte]
}

// NewStreamSectionReader prepares sectioned reads over file. If the stream is
// already backed by a cache file, sections are served from it directly. If the
// requested buffer would exceed conf.MaxBufferLimit, the whole stream is
// cached up front instead. Otherwise buffers come from a pool, mmap-backed
// once maxBufferSize reaches conf.MmapThreshold.
func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int, up *model.UpdateProgress) (*StreamSectionReader, error) {
	ss := &StreamSectionReader{file: file}
	if file.GetFile() != nil {
		return ss, nil
	}
	maxBufferSize = min(maxBufferSize, int(file.GetSize()))
	if maxBufferSize > conf.MaxBufferLimit {
		_, err := file.CacheFullAndWriter(up, nil)
		if err != nil {
			return nil, err
		}
		return ss, nil
	}
	if conf.MmapThreshold > 0 && maxBufferSize >= conf.MmapThreshold {
		ss.bufPool = &pool.Pool[[]byte]{
			New: func() []byte {
				buf, err := mmap.Alloc(maxBufferSize)
				if err == nil {
					file.Add(utils.CloseFunc(func() error {
						return mmap.Free(buf)
					}))
				} else {
					buf = make([]byte, maxBufferSize)
				}
				return buf
			},
		}
	} else {
		ss.bufPool = &pool.Pool[[]byte]{
			New: func() []byte {
				return make([]byte, maxBufferSize)
			},
		}
	}
	file.Add(utils.CloseFunc(func() error {
		ss.bufPool.Reset()
		return nil
	}))
	return ss, nil
}
// DiscardSection skips length bytes at off without buffering them. Not
// thread-safe.
func (ss *StreamSectionReader) DiscardSection(off int64, length int64) error {
	if ss.file.GetFile() == nil {
		if off != ss.off {
			return fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
		}
		_, err := utils.CopyWithBufferN(io.Discard, ss.file, length)
		if err != nil {
			return fmt.Errorf("failed to skip data: (expect =%d) %w", length, err)
		}
	}
	ss.off += length
	return nil
}
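
// Usage sketch (illustrative): in a resumable upload, skip chunks the remote
// already holds before reading the next needed section; the bitmap check is
// hypothetical.
//
//	if alreadyUploaded(i) {
//		if err := ss.DiscardSection(off, length); err != nil {
//			return err
//		}
//		continue
//	}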
// GetSectionReader returns a SectionReader covering [off, off+length). When
// the stream is not backed by a cache file, sections must be requested at
// strictly increasing, contiguous offsets. Not thread-safe.
func (ss *StreamSectionReader) GetSectionReader(off, length int64) (*SectionReader, error) {
	var cache io.ReaderAt = ss.file.GetFile()
	var buf []byte
	if cache == nil {
		if off != ss.off {
			return nil, fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
		}
		tempBuf := ss.bufPool.Get()
		buf = tempBuf[:length]
		n, err := io.ReadFull(ss.file, buf)
		if int64(n) != length {
			return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", length, n, err)
		}
		ss.off += int64(n)
		off = 0
		cache = bytes.NewReader(buf)
	}
	return &SectionReader{io.NewSectionReader(cache, off, length), buf}, nil
}
// FreeSectionReader returns sr's buffer to the pool and detaches its reader.
func (ss *StreamSectionReader) FreeSectionReader(sr *SectionReader) {
	if sr != nil {
		if sr.buf != nil {
			ss.bufPool.Put(sr.buf[0:cap(sr.buf)])
			sr.buf = nil
		}
		sr.ReadSeeker = nil
	}
}

type SectionReader struct {
	io.ReadSeeker
	buf []byte
}