mirror of
https://github.com/OpenListTeam/OpenList.git
synced 2025-11-25 03:15:19 +08:00
feat(stream): support using temporary files as large buffer (#1399)
feat(stream): refactor StreamSectionReader to support using temporary files as large buffer
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/conf"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
@@ -151,32 +152,58 @@ func CacheFullAndHash(stream model.FileStreamer, up *model.UpdateProgress, hashT
|
||||
return tmpF, hex.EncodeToString(h.Sum(nil)), nil
|
||||
}
|
||||
|
||||
type StreamSectionReader struct {
|
||||
file model.FileStreamer
|
||||
off int64
|
||||
bufPool *pool.Pool[[]byte]
|
||||
type StreamSectionReaderIF interface {
|
||||
// 线程不安全
|
||||
GetSectionReader(off, length int64) (io.ReadSeeker, error)
|
||||
FreeSectionReader(sr io.ReadSeeker)
|
||||
// 线程不安全
|
||||
DiscardSection(off int64, length int64) error
|
||||
}
|
||||
|
||||
func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int, up *model.UpdateProgress) (*StreamSectionReader, error) {
|
||||
ss := &StreamSectionReader{file: file}
|
||||
func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int, up *model.UpdateProgress) (StreamSectionReaderIF, error) {
|
||||
if file.GetFile() != nil {
|
||||
return ss, nil
|
||||
return &cachedSectionReader{file.GetFile()}, nil
|
||||
}
|
||||
|
||||
maxBufferSize = min(maxBufferSize, int(file.GetSize()))
|
||||
if maxBufferSize > conf.MaxBufferLimit {
|
||||
_, err := file.CacheFullAndWriter(up, nil)
|
||||
f, err := os.CreateTemp(conf.Conf.TempDir, "file-*")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if f.Truncate((file.GetSize()+int64(maxBufferSize-1))/int64(maxBufferSize)*int64(maxBufferSize)) != nil {
|
||||
// fallback to full cache
|
||||
_, _ = f.Close(), os.Remove(f.Name())
|
||||
cache, err := file.CacheFullAndWriter(up, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &cachedSectionReader{cache}, nil
|
||||
}
|
||||
|
||||
ss := &fileSectionReader{Reader: file, temp: f}
|
||||
ss.bufPool = &pool.Pool[*offsetWriterWithBase]{
|
||||
New: func() *offsetWriterWithBase {
|
||||
base := ss.fileOff
|
||||
ss.fileOff += int64(maxBufferSize)
|
||||
return &offsetWriterWithBase{io.NewOffsetWriter(ss.temp, base), base}
|
||||
},
|
||||
}
|
||||
file.Add(utils.CloseFunc(func() error {
|
||||
ss.bufPool.Reset()
|
||||
return errors.Join(ss.temp.Close(), os.Remove(ss.temp.Name()))
|
||||
}))
|
||||
return ss, nil
|
||||
}
|
||||
|
||||
ss := &directSectionReader{file: file}
|
||||
if conf.MmapThreshold > 0 && maxBufferSize >= conf.MmapThreshold {
|
||||
ss.bufPool = &pool.Pool[[]byte]{
|
||||
New: func() []byte {
|
||||
buf, err := mmap.Alloc(maxBufferSize)
|
||||
if err == nil {
|
||||
file.Add(utils.CloseFunc(func() error {
|
||||
ss.file.Add(utils.CloseFunc(func() error {
|
||||
return mmap.Free(buf)
|
||||
}))
|
||||
} else {
|
||||
@@ -200,53 +227,113 @@ func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int, up *mode
|
||||
return ss, nil
|
||||
}
|
||||
|
||||
type cachedSectionReader struct {
|
||||
cache io.ReaderAt
|
||||
}
|
||||
|
||||
func (*cachedSectionReader) DiscardSection(off int64, length int64) error {
|
||||
return nil
|
||||
}
|
||||
func (s *cachedSectionReader) GetSectionReader(off, length int64) (io.ReadSeeker, error) {
|
||||
return io.NewSectionReader(s.cache, off, length), nil
|
||||
}
|
||||
func (*cachedSectionReader) FreeSectionReader(sr io.ReadSeeker) {}
|
||||
|
||||
type fileSectionReader struct {
|
||||
io.Reader
|
||||
off int64
|
||||
temp *os.File
|
||||
fileOff int64
|
||||
bufPool *pool.Pool[*offsetWriterWithBase]
|
||||
}
|
||||
|
||||
type offsetWriterWithBase struct {
|
||||
*io.OffsetWriter
|
||||
base int64
|
||||
}
|
||||
|
||||
// 线程不安全
|
||||
func (ss *StreamSectionReader) DiscardSection(off int64, length int64) error {
|
||||
if ss.file.GetFile() == nil {
|
||||
if off != ss.off {
|
||||
return fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
|
||||
}
|
||||
_, err := utils.CopyWithBufferN(io.Discard, ss.file, length)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to skip data: (expect =%d) %w", length, err)
|
||||
}
|
||||
func (ss *fileSectionReader) DiscardSection(off int64, length int64) error {
|
||||
if off != ss.off {
|
||||
return fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
|
||||
}
|
||||
_, err := utils.CopyWithBufferN(io.Discard, ss.Reader, length)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to skip data: (expect =%d) %w", length, err)
|
||||
}
|
||||
ss.off += length
|
||||
return nil
|
||||
}
|
||||
|
||||
// 线程不安全
|
||||
func (ss *StreamSectionReader) GetSectionReader(off, length int64) (*SectionReader, error) {
|
||||
var cache io.ReaderAt = ss.file.GetFile()
|
||||
var buf []byte
|
||||
if cache == nil {
|
||||
if off != ss.off {
|
||||
return nil, fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
|
||||
}
|
||||
tempBuf := ss.bufPool.Get()
|
||||
buf = tempBuf[:length]
|
||||
n, err := io.ReadFull(ss.file, buf)
|
||||
if int64(n) != length {
|
||||
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", length, n, err)
|
||||
}
|
||||
ss.off += int64(n)
|
||||
off = 0
|
||||
cache = bytes.NewReader(buf)
|
||||
}
|
||||
return &SectionReader{io.NewSectionReader(cache, off, length), buf}, nil
|
||||
type fileBufferSectionReader struct {
|
||||
io.ReadSeeker
|
||||
fileBuf *offsetWriterWithBase
|
||||
}
|
||||
|
||||
func (ss *StreamSectionReader) FreeSectionReader(sr *SectionReader) {
|
||||
if sr != nil {
|
||||
if sr.buf != nil {
|
||||
ss.bufPool.Put(sr.buf[0:cap(sr.buf)])
|
||||
sr.buf = nil
|
||||
}
|
||||
func (ss *fileSectionReader) GetSectionReader(off, length int64) (io.ReadSeeker, error) {
|
||||
if off != ss.off {
|
||||
return nil, fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
|
||||
}
|
||||
fileBuf := ss.bufPool.Get()
|
||||
_, _ = fileBuf.Seek(0, io.SeekStart)
|
||||
n, err := utils.CopyWithBufferN(fileBuf, ss.Reader, length)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", length, n, err)
|
||||
}
|
||||
ss.off += length
|
||||
return &fileBufferSectionReader{io.NewSectionReader(ss.temp, fileBuf.base, length), fileBuf}, nil
|
||||
}
|
||||
|
||||
func (ss *fileSectionReader) FreeSectionReader(rs io.ReadSeeker) {
|
||||
if sr, ok := rs.(*fileBufferSectionReader); ok {
|
||||
ss.bufPool.Put(sr.fileBuf)
|
||||
sr.fileBuf = nil
|
||||
sr.ReadSeeker = nil
|
||||
}
|
||||
}
|
||||
|
||||
type SectionReader struct {
|
||||
type directSectionReader struct {
|
||||
file model.FileStreamer
|
||||
off int64
|
||||
bufPool *pool.Pool[[]byte]
|
||||
}
|
||||
|
||||
// 线程不安全
|
||||
func (ss *directSectionReader) DiscardSection(off int64, length int64) error {
|
||||
if off != ss.off {
|
||||
return fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
|
||||
}
|
||||
_, err := utils.CopyWithBufferN(io.Discard, ss.file, length)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to skip data: (expect =%d) %w", length, err)
|
||||
}
|
||||
ss.off += length
|
||||
return nil
|
||||
}
|
||||
|
||||
type bufferSectionReader struct {
|
||||
io.ReadSeeker
|
||||
buf []byte
|
||||
}
|
||||
|
||||
// 线程不安全
|
||||
func (ss *directSectionReader) GetSectionReader(off, length int64) (io.ReadSeeker, error) {
|
||||
if off != ss.off {
|
||||
return nil, fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
|
||||
}
|
||||
tempBuf := ss.bufPool.Get()
|
||||
buf := tempBuf[:length]
|
||||
n, err := io.ReadFull(ss.file, buf)
|
||||
if int64(n) != length {
|
||||
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", length, n, err)
|
||||
}
|
||||
ss.off += int64(n)
|
||||
return &bufferSectionReader{bytes.NewReader(buf), buf}, nil
|
||||
}
|
||||
func (ss *directSectionReader) FreeSectionReader(rs io.ReadSeeker) {
|
||||
if sr, ok := rs.(*bufferSectionReader); ok {
|
||||
ss.bufPool.Put(sr.buf[0:cap(sr.buf)])
|
||||
sr.buf = nil
|
||||
sr.ReadSeeker = nil
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user