mirror of
https://github.com/OpenListTeam/OpenList.git
synced 2025-11-25 03:15:19 +08:00
feat(plugin): add chunk handling methods to UploadReadable interface
This commit is contained in:
@@ -2,6 +2,7 @@ package plugin
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"maps"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
@@ -9,6 +10,7 @@ import (
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
plugin_warp "github.com/OpenListTeam/OpenList/v4/internal/plugin/warp"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/stream"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
|
||||
@@ -72,6 +74,9 @@ func (host *DriverHost) Instantiate(ctx context.Context, rt wazero.Runtime) erro
|
||||
exportsType.Export("[resource-drop]readable", host.DropReadable)
|
||||
exportsType.Export("[method]readable.streams", host.Stream)
|
||||
exportsType.Export("[method]readable.peek", host.StreamPeek)
|
||||
exportsType.Export("[method]readable.chunks", host.Chunks)
|
||||
exportsType.Export("[method]readable.next-chunk", host.NextChunk)
|
||||
exportsType.Export("[method]readable.chunk-reset", host.ChunkReset)
|
||||
exportsType.Export("[method]readable.get-hasher", host.GetHasher)
|
||||
exportsType.Export("[method]readable.update-progress", host.UpdateProgress)
|
||||
if _, err := exportsType.Instantiate(ctx); err != nil {
|
||||
@@ -168,6 +173,66 @@ func (host *DriverHost) StreamPeek(this plugin_warp.UploadReadable, offset uint6
|
||||
return witgo.Ok[io_v0_2.OutputStream, string](streamHandle)
|
||||
}
|
||||
|
||||
// chunks: func(len: u32) -> result<u32, string>;
|
||||
func (host *DriverHost) Chunks(this plugin_warp.UploadReadable, len uint32) witgo.Result[uint32, string] {
|
||||
upload, ok := host.uploads.Get(this)
|
||||
if !ok {
|
||||
return witgo.Err[uint32]("UploadReadable::Chunks: ErrorCodeBadDescriptor")
|
||||
}
|
||||
if upload.StreamConsume {
|
||||
return witgo.Err[uint32]("UploadReadable::Chunks: StreamConsume")
|
||||
}
|
||||
if upload.SectionReader != nil {
|
||||
return witgo.Err[uint32]("UploadReadable::Chunks: Already exist chunk reader")
|
||||
}
|
||||
ss, err := stream.NewStreamSectionReader(upload, int(len), &upload.UpdateProgress)
|
||||
if err != nil {
|
||||
return witgo.Err[uint32](err.Error())
|
||||
}
|
||||
chunkSize := int64(len)
|
||||
upload.SectionReader = &plugin_warp.StreamSectionReader{StreamSectionReaderIF: ss, CunketSize: chunkSize}
|
||||
return witgo.Ok[uint32, string](uint32((upload.GetSize() + chunkSize - 1) / chunkSize))
|
||||
}
|
||||
|
||||
// next-chunk: func() -> result<input-stream, string>;
|
||||
func (host *DriverHost) NextChunk(this plugin_warp.UploadReadable) witgo.Result[io_v0_2.InputStream, string] {
|
||||
upload, ok := host.uploads.Get(this)
|
||||
if !ok {
|
||||
return witgo.Err[io_v0_2.InputStream]("UploadReadable::NextChunk: ErrorCodeBadDescriptor")
|
||||
}
|
||||
if upload.SectionReader == nil {
|
||||
return witgo.Err[io_v0_2.InputStream]("UploadReadable::NextChunk: No chunk reader")
|
||||
}
|
||||
|
||||
chunkSize := min(upload.SectionReader.CunketSize, upload.GetSize()-upload.SectionReader.Offset)
|
||||
sr, err := upload.SectionReader.GetSectionReader(upload.SectionReader.Offset, chunkSize)
|
||||
if err != nil {
|
||||
return witgo.Err[io_v0_2.InputStream](err.Error())
|
||||
}
|
||||
upload.SectionReader.Offset += chunkSize
|
||||
streamHandle := host.StreamManager().Add(&manager_io.Stream{Reader: sr, Seeker: sr, Closer: utils.CloseFunc(func() error {
|
||||
upload.SectionReader.FreeSectionReader(sr)
|
||||
return nil
|
||||
})})
|
||||
return witgo.Ok[io_v0_2.InputStream, string](streamHandle)
|
||||
}
|
||||
|
||||
// chunk-reset: func(chunk: input-stream) -> result<_, string>;
|
||||
func (host *DriverHost) ChunkReset(this plugin_warp.UploadReadable, chunk io_v0_2.InputStream) witgo.Result[witgo.Unit, string] {
|
||||
stream, ok := host.StreamManager().Get(chunk)
|
||||
if !ok {
|
||||
return witgo.Err[witgo.Unit]("UploadReadable::ChunkReset: ErrorCodeBadDescriptor")
|
||||
}
|
||||
if stream.Seeker == nil {
|
||||
return witgo.Err[witgo.Unit]("UploadReadable::ChunkReset: Not Seeker")
|
||||
}
|
||||
_, err := stream.Seeker.Seek(0, io.SeekStart)
|
||||
if err != nil {
|
||||
return witgo.Err[witgo.Unit](err.Error())
|
||||
}
|
||||
return witgo.Ok[witgo.Unit, string](witgo.Unit{})
|
||||
}
|
||||
|
||||
// get-hasher: func(hashs: list<hash-alg>) -> result<hash-info, string>;
|
||||
func (host *DriverHost) GetHasher(this plugin_warp.UploadReadable, hashs []plugin_warp.HashAlg) witgo.Result[[]plugin_warp.HashInfo, string] {
|
||||
upload, ok := host.uploads.Get(this)
|
||||
|
||||
@@ -3,6 +3,7 @@ package plugin_warp
|
||||
import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/stream"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
hash_extend "github.com/OpenListTeam/OpenList/v4/pkg/utils/hash"
|
||||
witgo "github.com/OpenListTeam/wazero-wasip2/wit-go"
|
||||
@@ -21,6 +22,13 @@ type UploadReadableType struct {
|
||||
model.FileStreamer
|
||||
StreamConsume bool
|
||||
UpdateProgress driver.UpdateProgress
|
||||
SectionReader *StreamSectionReader
|
||||
}
|
||||
|
||||
type StreamSectionReader struct {
|
||||
stream.StreamSectionReaderIF
|
||||
Offset int64
|
||||
CunketSize int64
|
||||
}
|
||||
|
||||
type UploadReadableManager = witgo.ResourceManager[*UploadReadableType]
|
||||
|
||||
Reference in New Issue
Block a user