From 7b377b1d541e48274830629f91c5d4eee5a34edb Mon Sep 17 00:00:00 2001 From: j2rong4cn Date: Thu, 30 Oct 2025 14:36:49 +0800 Subject: [PATCH] feat(driver): enhance Link method with size handling and add checkWriter for stream management --- internal/plugin/driver.go | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/internal/plugin/driver.go b/internal/plugin/driver.go index 0fb25e6a..0adcbba1 100644 --- a/internal/plugin/driver.go +++ b/internal/plugin/driver.go @@ -2,6 +2,7 @@ package plugin import ( "context" + stderrors "errors" "fmt" "io" "net/http" @@ -370,20 +371,26 @@ func (d *WasmDriver) Link(ctx context.Context, file model.Obj, args model.LinkAr streamManager := d.plugin.exports.StreamManager() return &model.Link{ RangeReader: stream.RateLimitRangeReaderFunc(func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) { + var size uint64 + if httpRange.Length < 0 { + size = uint64(obj.GetSize() - httpRange.Start) + } else { + size = uint64(httpRange.Length) + } r, w := io.Pipe() - streamHandle := streamManager.Add(manager_io.NewAsyncStreamForWriter(w)) + cw := &checkWriter{W: w, N: size} + streamHandle := streamManager.Add(&manager_io.Stream{ + Writer: cw, + CheckWriter: cw, + }) ctxHandle := d.plugin.exports.ContextManager().Add(ctx) type RangeSpec struct { Offset uint64 - Size witgo.Option[uint64] + Size uint64 Stream io_v_0_2.OutputStream } - var size = witgo.None[uint64]() - if httpRange.Length >= 0 { - size = witgo.Some(uint64(httpRange.Length)) - } var result witgo.Result[witgo.Unit, plugin_warp.ErrCode] param := struct { Driver uint32 @@ -391,7 +398,7 @@ func (d *WasmDriver) Link(ctx context.Context, file model.Obj, args model.LinkAr Obj *plugin_warp.Object LinkArgs plugin_warp.LinkArgs RangeSpec RangeSpec - }{d.handle, ctxHandle, obj, plugin_warp.LinkArgs{IP: args.IP, Header: headersHandle}, RangeSpec{Offset: uint64(httpRange.Start), Size: size, Stream: streamHandle}} + }{d.handle, ctxHandle, obj, plugin_warp.LinkArgs{IP: args.IP, Header: headersHandle}, RangeSpec{Offset: uint64(httpRange.Start), Size: uint64(httpRange.Length), Stream: streamHandle}} go func() { if err := d.plugin.guest.Call(ctx, PluginPrefix+"[method]driver.link-range", &result, param); err != nil { @@ -423,6 +430,23 @@ func (d *WasmDriver) Link(ctx context.Context, file model.Obj, args model.LinkAr return nil, errs.NotImplement } +type checkWriter struct { + W io.Writer + N uint64 +} + +func (c *checkWriter) Write(p []byte) (n int, err error) { + if c.N <= 0 { + return 0, stderrors.New("write limit exceeded") + } + n, err = c.W.Write(p[:min(uint64(len(p)), c.N)]) + c.N -= uint64(n) + return +} +func (c *checkWriter) CheckWrite() uint64 { + return max(c.N, 1) +} + func (d *WasmDriver) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) (model.Obj, error) { if !d.config.Capabilitys.MkdirFile { return nil, errs.NotImplement