feat(plugin): Adapt to the new stateless driver API

This commit is contained in:
foxxorcat
2025-11-06 02:15:03 +08:00
parent ae93fb0479
commit 6417f71527
2 changed files with 450 additions and 222 deletions

View File

@@ -21,6 +21,8 @@ import (
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
log "github.com/sirupsen/logrus"
pool "github.com/jolestar/go-commons-pool/v2"
manager_io "github.com/OpenListTeam/wazero-wasip2/manager/io"
io_v_0_2 "github.com/OpenListTeam/wazero-wasip2/wasip2/io/v0_2"
witgo "github.com/OpenListTeam/wazero-wasip2/wit-go"
@@ -35,123 +37,301 @@ import (
var PluginPrefix = "openlist:plugin-driver/exports@0.1.0#"
// DriverPlugin 是*插件*管理器 (每个 .wasm 文件一个)
// 它管理共享的 wazero 资源
type DriverPlugin struct {
runtime wazero.Runtime
plugin *PluginInfo
runtime wazero.Runtime // 共享的 wazero 运行时
compiledModule wazero.CompiledModule // 共享的已编译模块
host *DriverHost // 注册的 wasi host 资源, 这里的self.driver始终为nil
}
// WasmInstance 代表池中的一个可重用对象
// 它包含一个活动的 WASM 实例及其宿主/Guest API
type WasmInstance struct {
instance api.Module
exports *DriverHost
guest *witgo.Host
}
// 内部函数,用于动态调用 Guest 以获取属性
func (d *WasmInstance) GetProperties(ctx context.Context) (plugin_warp.DriverProps, error) {
var propertiesResult plugin_warp.DriverProps
err := d.guest.Call(ctx, PluginPrefix+"get-properties", &propertiesResult)
if err != nil {
return plugin_warp.DriverProps{}, err
}
return propertiesResult, nil
}
// 内部函数,用于动态调用 Guest 以获取表单
func (d *WasmInstance) GetFormMeta(ctx context.Context) ([]plugin_warp.FormField, error) {
var formMeta []plugin_warp.FormField
err := d.guest.Call(ctx, PluginPrefix+"get-form-meta", &formMeta)
if err != nil {
return nil, err
}
return formMeta, nil
}
func (i *WasmInstance) Close() error {
return i.instance.Close(context.Background())
// exports 借用WasmDriver的资源这里不销毁
}
// 用于创建和管理 WasmInstance
type driverPoolFactory struct {
ctx context.Context
driver *WasmDriver // 指向 WasmDriver (状态持有者)
compiledModule wazero.CompiledModule // 共享的模块
runtime wazero.Runtime // 共享的运行时
host *DriverHost
}
func (f *driverPoolFactory) makeObject(ctx context.Context) (*WasmInstance, error) {
// 1. 配置模块
moduleConfig := wazero.NewModuleConfig().
WithFS(os.DirFS("/")).
WithStartFunctions("_initialize").
WithStdout(os.Stdout).
WithStderr(os.Stderr).
WithStdin(os.Stdin).
// WithSysNanosleep().
// WithSysNanotime().
// WithSysWalltime().
WithOsyield(func() {
runtime.Gosched()
}).
WithName(f.driver.plugin.plugin.ID)
instanceCtx := experimental.WithMemoryAllocator(f.ctx, experimental.MemoryAllocatorFunc(alloc.NewMemory))
// 2. 实例化共享的已编译模块
instance, err := f.runtime.InstantiateModule(instanceCtx, f.compiledModule, moduleConfig)
if err != nil {
return nil, fmt.Errorf("failed to instantiate module: %w", err)
}
// 3. 创建 Guest API
guest, err := witgo.NewHost(instance)
if err != nil {
instance.Close(ctx)
return nil, err
}
// 5. 组装 WasmInstance
wasmInstance := &WasmInstance{
instance: instance,
exports: f.host,
guest: guest,
}
return wasmInstance, nil
}
// MakeObject 创建一个新的 WasmInstance 并将其放入池中
func (f *driverPoolFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
wasmInstance, err := f.makeObject(ctx)
if err != nil {
return nil, err
}
// 设置Host端句柄用于配置获取等host端方法
if err := wasmInstance.guest.Call(ctx, PluginPrefix+"set-handle", nil, uint32(f.driver.ID)); err != nil {
// 这里就不返回错误了,避免大量栈数据
log.Errorln(err)
wasmInstance.Close()
return nil, errors.New("Internal error in plugin")
}
// 调用实例的初始化方法
ctxHandle := f.host.ContextManager().Add(ctx)
defer f.host.ContextManager().Remove(ctxHandle)
var result witgo.Result[witgo.Unit, plugin_warp.ErrCode]
if err := wasmInstance.guest.Call(ctx, PluginPrefix+"init", &result, ctxHandle); err != nil {
// 这里就不返回错误了,避免大量栈数据
log.Errorln(err)
wasmInstance.Close()
return nil, errors.New("Internal error in plugin")
}
if result.Err != nil {
wasmInstance.Close()
return nil, result.Err.ToError()
}
return pool.NewPooledObject(wasmInstance), nil
}
// DestroyObject 销毁池中的 WasmInstance
func (f *driverPoolFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
instance := object.Object.(*WasmInstance)
log.Debugf("Destroying pooled WASM instance for plugin: %s", f.driver.Storage.MountPath)
var err error
// 4. 调用实例的销毁化方法
ctxHandle := instance.exports.ContextManager().Add(ctx)
defer instance.exports.ContextManager().Remove(ctxHandle)
var result witgo.Result[witgo.Unit, plugin_warp.ErrCode]
if err = instance.guest.Call(ctx, PluginPrefix+"drop", &result, ctxHandle); err != nil {
// 这里就不返回错误了,避免大量栈数据
log.Errorln(err)
err = errors.New("Internal error in plugin")
} else if result.Err != nil {
err = result.Err.ToError()
}
return stderrors.Join(err, instance.Close())
}
// ValidateObject 验证实例是否仍然有效
func (f *driverPoolFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {
instance := object.Object.(*WasmInstance)
return instance.instance != nil && !instance.instance.IsClosed()
}
// ActivateObject 在借用时调用
func (f *driverPoolFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {
return nil
}
// PassivateObject 在归还时调用
func (f *driverPoolFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {
return nil
}
// WasmDriver 是*驱动*实例 (每个挂载点一个)
// 它管理池和*状态*
type WasmDriver struct {
model.Storage
flag uint32
plugin *DriverPlugin
host *DriverHost
pool *pool.ObjectPool
config plugin_warp.DriverProps
additional plugin_warp.Additional
}
// NewDriverPlugin
// 创建插件管理器
func NewDriverPlugin(ctx context.Context, plugin *PluginInfo) (*DriverPlugin, error) {
wasmBytes, err := os.ReadFile(plugin.WasmPath)
if err != nil {
return nil, fmt.Errorf("failed to read wasm file '%s': %w", plugin.WasmPath, err)
}
// 初始化 Wazero 运行时,并导入 WASI
// 1. 创建共享的 wazero 运行时
rt := wazero.NewRuntime(ctx)
wasi_snapshot_preview1.MustInstantiate(ctx, rt)
driverHost := NewDriverHost()
if err := driverHost.Instantiate(ctx, rt); err != nil {
rt.Close(ctx)
log.Fatalf("编译模块失败: %v", err)
// 2. 注册 wasip1/wasip2 资源
wasi_snapshot_preview1.MustInstantiate(ctx, rt)
host := NewDriverHost()
if err := host.Instantiate(ctx, rt); err != nil {
return nil, err
}
// 3. 编译共享的模块
compiledModule, err := rt.CompileModule(ctx, wasmBytes)
if err != nil {
rt.Close(ctx)
return nil, fmt.Errorf("failed to compile wasm module for plugin '%s': %w", plugin.ID, err)
}
// 为这个驱动实例创建一个独立的模块配置,特别是文件系统
moduleConfig := wazero.NewModuleConfig().
WithFS(os.DirFS("/")). // 示例:可以根据需要配置虚拟文件系统
WithStartFunctions("_initialize").
WithStdout(os.Stdout).
WithStderr(os.Stderr).
WithStdin(os.Stdin).
WithSysNanosleep().
WithSysNanotime().
WithSysWalltime().
WithOsyield(func() {
runtime.Gosched()
}).
WithName(plugin.ID)
ctx = experimental.WithMemoryAllocator(ctx, experimental.MemoryAllocatorFunc(alloc.NewMemory))
// 实例化模块,同时注入 Host API
instance, err := rt.InstantiateModule(ctx, compiledModule, moduleConfig)
if err != nil {
rt.Close(ctx)
return nil, fmt.Errorf("failed to instantiate module: %w", err)
// 4. 创建 DriverPlugin 实例(管理器)
driverPlugin := &DriverPlugin{
plugin: plugin,
runtime: rt,
compiledModule: compiledModule,
host: host,
}
guest, err := witgo.NewHost(instance)
if err != nil {
rt.Close(ctx)
return nil, err
}
driver := &DriverPlugin{
runtime: rt,
instance: instance,
exports: driverHost,
guest: guest,
}
return driver, nil
return driverPlugin, nil
}
func (d *DriverPlugin) Close(ctx context.Context) error {
return d.runtime.Close(ctx)
// Close 关闭共享的 wazero 运行时
func (dp *DriverPlugin) Close(ctx context.Context) error {
log.Infof("Closing plugin runtime for: %s", dp.plugin.ID)
if dp.runtime != nil {
return dp.runtime.Close(ctx)
}
return nil
}
func (d *DriverPlugin) NewWasmDriver() (driver.Driver, error) {
var driverHandle uint32
err := d.guest.Call(context.Background(), PluginPrefix+"[constructor]driver", &driverHandle)
if err != nil {
return nil, err
}
// NewWasmDriver
// 创建*驱动实例* (每个挂载一个)
func (dp *DriverPlugin) NewWasmDriver() (driver.Driver, error) {
ctx := context.Background() // Factory/Pool context
// 1. 创建 WasmDriver 实例 (状态持有者)
driver := &WasmDriver{
plugin: d,
handle: driverHandle,
}
if driver.config, err = driver.GetProperties(); err != nil {
return nil, err
}
if driver.additional.Forms, err = driver.GetFormMeta(); err != nil {
return nil, err
plugin: dp, // 指向共享资源的管理器
host: dp.host,
}
type WasmDirverWarp struct {
*WasmDriver
}
driverWarp := &WasmDirverWarp{driver}
driver.plugin.exports.driver.Set(driver.handle, driver)
runtime.SetFinalizer(driverWarp, func(driver *WasmDirverWarp) {
log.Infof("runtime.SetFinalizer: %s => %d", driver.config.Name, driver.handle)
driver.plugin.guest.Call(context.Background(), PluginPrefix+"[dtor]driver", driver.handle)
driver.plugin.exports.driver.Remove(driver.handle)
dp.host.driver.Remove(uint32(driver.ID))
})
// 3. 创建池工厂
factory := &driverPoolFactory{
ctx: ctx,
driver: driver,
compiledModule: dp.compiledModule,
runtime: dp.runtime,
host: dp.host,
}
// 4. 配置并创建池
poolConfig := pool.NewDefaultPoolConfig()
poolConfig.MaxIdle = 2
poolConfig.MaxTotal = 8
poolConfig.TestOnBorrow = true
poolConfig.BlockWhenExhausted = true
driver.pool = pool.NewObjectPool(ctx, factory, poolConfig)
// 5. 首次获取插件信息
initConfig := func() error {
instance, err := factory.makeObject(ctx)
if err != nil {
return err
}
defer instance.Close()
props, err := instance.GetProperties(ctx)
if err != nil {
return fmt.Errorf("failed to refresh properties: %w", err)
}
driver.config = props
forms, err := instance.GetFormMeta(ctx)
if err != nil {
return fmt.Errorf("failed to refresh forms: %w", err)
}
driver.additional.Forms = forms
return nil
}
if err := initConfig(); err != nil {
driver.Close(ctx) // 构造失败,关闭池
return nil, err
}
return driverWarp, nil
}
// WasmDriver 实现了 Driver 接口,并代理调用到 Wasm 模块
type WasmDriver struct {
model.Storage
flag uint32
plugin *DriverPlugin
handle uint32
config plugin_warp.DriverProps
additional plugin_warp.Additional
// Close (在 WasmDriver 上) 关闭此*实例*的池
func (d *WasmDriver) Close(ctx context.Context) error {
log.Infof("Closing pool for driver: %s", d.MountPath)
if d.pool != nil {
d.pool.Close(ctx)
}
return nil
}
// 处理wasm驱动返回的错误当错误类型为Unauthorized将驱动状态设置为nowork避免无效时继续访问
// handleError 处理 wasm 驱动返回的错误
func (d *WasmDriver) handleError(errcode *plugin_warp.ErrCode) error {
if errcode != nil {
err := errcode.ToError()
@@ -168,80 +348,91 @@ func (d *WasmDriver) handleError(errcode *plugin_warp.ErrCode) error {
return nil
}
func (d *WasmDriver) GetProperties() (plugin_warp.DriverProps, error) {
var propertiesResult plugin_warp.DriverProps
err := d.plugin.guest.Call(context.Background(), PluginPrefix+"[method]driver.get-properties", &propertiesResult, d.handle)
return propertiesResult, err
}
// // 内部函数,用于动态调用 Guest 以获取属性
// func (d *WasmDriver) getProperties(ctx context.Context) (plugin_warp.DriverProps, error) {
// obj, err := d.pool.BorrowObject(ctx)
// if err != nil {
// return plugin_warp.DriverProps{}, fmt.Errorf("failed to borrow wasm instance: %w", err)
// }
// instance := obj.(*WasmInstance)
// defer d.pool.ReturnObject(ctx, obj)
func (d *WasmDriver) GetFormMeta() ([]plugin_warp.FormField, error) {
var formMeta []plugin_warp.FormField
err := d.plugin.guest.Call(context.Background(), PluginPrefix+"[method]driver.get-form-meta", &formMeta, d.handle)
return formMeta, err
}
// return instance.GetProperties(ctx)
// }
// // 内部函数,用于动态调用 Guest 以获取表单
// func (d *WasmDriver) getFormMeta(ctx context.Context) ([]plugin_warp.FormField, error) {
// obj, err := d.pool.BorrowObject(ctx)
// if err != nil {
// return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
// }
// instance := obj.(*WasmInstance)
// defer d.pool.ReturnObject(ctx, obj)
// return instance.GetFormMeta(ctx)
// }
// Config 返回缓存的配置
func (d *WasmDriver) Config() driver.Config {
newconfig, err := d.GetProperties()
if err != nil {
panic(err)
}
d.config = newconfig
// props, err := d.getProperties(context.Background())
// if err != nil {
// log.Errorf("failed to get properties: %s", err)
// return d.config.ToConfig()
// }
// d.config = props
return d.config.ToConfig()
}
func (d *WasmDriver) GetAddition() driver.Additional {
newFormMeta, err := d.GetFormMeta()
if err != nil {
panic(err)
}
d.additional.Forms = newFormMeta
// newFormMeta, err := d.getFormMeta(context.Background())
// if err != nil {
// log.Errorf("failed to get form meta: %s", err)
// return &d.additional
// }
// d.additional.Forms = newFormMeta
return &d.additional
}
// Init 初始化驱动
func (d *WasmDriver) Init(ctx context.Context) error {
ctxHandle := d.plugin.exports.ContextManager().Add(ctx)
defer d.plugin.exports.ContextManager().Remove(ctxHandle)
log.Debugf("Re-initializing pool for plugin %s by clearing idle.", d.MountPath)
d.pool.Clear(ctx)
var result witgo.Result[witgo.Unit, plugin_warp.ErrCode]
if err := d.plugin.guest.Call(ctx, PluginPrefix+"[method]driver.init", &result, d.handle, ctxHandle); err != nil {
// 这里就不返回错误了,避免大量栈数据
log.Errorln(err)
return errors.New("Internal error in plugin")
}
if result.Err != nil {
return result.Err.ToError()
}
// 注册
d.host.driver.Set(uint32(d.ID), d)
obj, err := d.pool.BorrowObject(ctx)
if err != nil {
return fmt.Errorf("failed to pre-warm pool after re-init: %w", err)
}
d.pool.ReturnObject(ctx, obj)
return nil
}
// Drop 销毁驱动
// Drop 销毁驱动 (由 Guest 调用)
func (d *WasmDriver) Drop(ctx context.Context) error {
ctxHandle := d.plugin.exports.ContextManager().Add(ctx)
defer d.plugin.exports.ContextManager().Remove(ctxHandle)
var result witgo.Result[witgo.Unit, plugin_warp.ErrCode]
if err := d.plugin.guest.Call(ctx, PluginPrefix+"[method]driver.drop", &result, d.handle, ctxHandle); err != nil {
// 这里就不返回错误了,避免大量栈数据
log.Errorln(err)
return errors.New("Internal error in plugin")
}
if result.Err != nil {
return result.Err.ToError()
}
return nil
log.Infof("Guest triggered Drop, closing pool for driver: %s", d.MountPath)
return d.Close(ctx)
}
func (d *WasmDriver) GetRoot(ctx context.Context) (model.Obj, error) {
if !d.config.Capabilitys.ListFile {
return nil, errs.NotImplement
}
ctxHandle := d.plugin.exports.ContextManager().Add(ctx)
defer d.plugin.exports.ContextManager().Remove(ctxHandle)
obj, err := d.pool.BorrowObject(ctx)
if err != nil {
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
}
instance := obj.(*WasmInstance)
defer d.pool.ReturnObject(ctx, obj)
ctxHandle := instance.exports.ContextManager().Add(ctx)
defer instance.exports.ContextManager().Remove(ctxHandle)
var result witgo.Result[plugin_warp.Object, plugin_warp.ErrCode]
err := d.plugin.guest.Call(ctx, PluginPrefix+"[method]driver.get-root", &result, d.handle, ctxHandle)
err = instance.guest.Call(ctx, PluginPrefix+"get-root", &result, ctxHandle)
if err != nil {
if errors.Is(err, witgo.ErrNotExportFunc) {
return nil, errs.NotImplement
@@ -264,11 +455,18 @@ func (d *WasmDriver) Get(ctx context.Context, path string) (model.Obj, error) {
return nil, errs.NotImplement
}
ctxHandle := d.plugin.exports.ContextManager().Add(ctx)
defer d.plugin.exports.ContextManager().Remove(ctxHandle)
obj, err := d.pool.BorrowObject(ctx)
if err != nil {
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
}
instance := obj.(*WasmInstance)
defer d.pool.ReturnObject(ctx, obj)
ctxHandle := instance.exports.ContextManager().Add(ctx)
defer instance.exports.ContextManager().Remove(ctxHandle)
var result witgo.Result[plugin_warp.Object, plugin_warp.ErrCode]
err := d.plugin.guest.Call(ctx, PluginPrefix+"[method]driver.get-file", &result, d.handle, ctxHandle, path)
err = instance.guest.Call(ctx, PluginPrefix+"get-file", &result, ctxHandle, path)
if err != nil {
if errors.Is(err, witgo.ErrNotExportFunc) {
return nil, errs.NotImplement
@@ -290,19 +488,24 @@ func (d *WasmDriver) List(ctx context.Context, dir model.Obj, args model.ListArg
return nil, errs.NotImplement
}
ctxHandle := d.plugin.exports.ContextManager().Add(ctx)
defer d.plugin.exports.ContextManager().Remove(ctxHandle)
obj, err := d.pool.BorrowObject(ctx)
if err != nil {
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
}
instance := obj.(*WasmInstance)
defer d.pool.ReturnObject(ctx, obj)
obj := dir.(*plugin_warp.Object)
ctxHandle := instance.exports.ContextManager().Add(ctx)
defer instance.exports.ContextManager().Remove(ctxHandle)
robj := dir.(*plugin_warp.Object)
var result witgo.Result[[]plugin_warp.Object, plugin_warp.ErrCode]
param := struct {
Driver uint32
Handle plugin_warp.Context
Obj *plugin_warp.Object
}{d.handle, ctxHandle, obj}
err := d.plugin.guest.Call(ctx, PluginPrefix+"[method]driver.list-files", &result, param)
}{ctxHandle, robj}
err = instance.guest.Call(ctx, PluginPrefix+"list-files", &result, param)
if err != nil {
if errors.Is(err, witgo.ErrNotExportFunc) {
return nil, errs.NotImplement
@@ -326,22 +529,29 @@ func (d *WasmDriver) Link(ctx context.Context, file model.Obj, args model.LinkAr
// 这部分资源全由Host端管理
// TODO: 或许应该把创建的Stream生命周期一同绑定到此处结束防止忘记关闭导致的资源泄漏
ctxHandle := d.plugin.exports.ContextManager().Add(ctx)
defer d.plugin.exports.ContextManager().Remove(ctxHandle)
headersHandle := d.plugin.exports.HTTPManager().Fields.Add(args.Header)
defer d.plugin.exports.HTTPManager().Fields.Remove(headersHandle)
pobj, err := d.pool.BorrowObject(ctx)
if err != nil {
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
}
instance := pobj.(*WasmInstance)
defer d.pool.ReturnObject(ctx, pobj)
ctxHandle := instance.exports.ContextManager().Add(ctx)
defer instance.exports.ContextManager().Remove(ctxHandle)
headersHandle := instance.exports.HTTPManager().Fields.Add(args.Header)
defer instance.exports.HTTPManager().Fields.Remove(headersHandle)
obj := file.(*plugin_warp.Object)
var result witgo.Result[plugin_warp.LinkResult, plugin_warp.ErrCode]
param := struct {
Driver uint32
Handle plugin_warp.Context
Obj *plugin_warp.Object
LinkArgs plugin_warp.LinkArgs
}{d.handle, ctxHandle, obj, plugin_warp.LinkArgs{IP: args.IP, Header: headersHandle}}
err := d.plugin.guest.Call(ctx, PluginPrefix+"[method]driver.link-file", &result, param)
}{ctxHandle, obj, plugin_warp.LinkArgs{IP: args.IP, Header: headersHandle}}
err = instance.guest.Call(ctx, PluginPrefix+"link-file", &result, param)
if err != nil {
if errors.Is(err, witgo.ErrNotExportFunc) {
return nil, errs.NotImplement
@@ -361,7 +571,7 @@ func (d *WasmDriver) Link(ctx context.Context, file model.Obj, args model.LinkAr
if result.Ok.Resource.Direct != nil {
direct := result.Ok.Resource.Direct
header, _ := d.plugin.exports.HTTPManager().Fields.Pop(direct.Header)
header, _ := instance.exports.HTTPManager().Fields.Pop(direct.Header)
link := &model.Link{URL: direct.Url, Header: http.Header(header)}
if direct.Expiratcion.IsSome() {
exp := direct.Expiratcion.Some.ToDuration()
@@ -371,7 +581,6 @@ func (d *WasmDriver) Link(ctx context.Context, file model.Obj, args model.LinkAr
}
if result.Ok.Resource.RangeStream != nil {
streamManager := d.plugin.exports.StreamManager()
fileSize := obj.GetSize()
return &model.Link{
RangeReader: stream.RateLimitRangeReaderFunc(func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
@@ -381,13 +590,20 @@ func (d *WasmDriver) Link(ctx context.Context, file model.Obj, args model.LinkAr
} else {
size = uint64(httpRange.Length)
}
pobj, err := d.pool.BorrowObject(ctx)
if err != nil {
return nil, err
}
instance := pobj.(*WasmInstance)
r, w := io.Pipe()
cw := &checkWriter{W: w, N: size}
streamHandle := streamManager.Add(&manager_io.Stream{
streamHandle := instance.exports.StreamManager().Add(&manager_io.Stream{
Writer: cw,
CheckWriter: cw,
})
ctxHandle := d.plugin.exports.ContextManager().Add(ctx)
ctxHandle := instance.exports.ContextManager().Add(ctx)
type RangeSpec struct {
Offset uint64
@@ -397,15 +613,17 @@ func (d *WasmDriver) Link(ctx context.Context, file model.Obj, args model.LinkAr
var result witgo.Result[witgo.Unit, plugin_warp.ErrCode]
param := struct {
Driver uint32
Handle plugin_warp.Context
Obj *plugin_warp.Object
LinkArgs plugin_warp.LinkArgs
RangeSpec RangeSpec
}{d.handle, ctxHandle, obj, plugin_warp.LinkArgs{IP: args.IP, Header: headersHandle}, RangeSpec{Offset: uint64(httpRange.Start), Size: size, Stream: streamHandle}}
}{ctxHandle, obj, plugin_warp.LinkArgs{IP: args.IP, Header: headersHandle}, RangeSpec{Offset: uint64(httpRange.Start), Size: size, Stream: streamHandle}}
go func() {
if err := d.plugin.guest.Call(ctx, PluginPrefix+"[method]driver.link-range", &result, param); err != nil {
defer d.pool.ReturnObject(ctx, instance)
defer instance.exports.ContextManager().Remove(ctxHandle)
if err := instance.guest.Call(ctx, PluginPrefix+"link-range", &result, param); err != nil {
if errors.Is(err, witgo.ErrNotExportFunc) {
w.CloseWithError(errs.NotImplement)
return
@@ -423,8 +641,7 @@ func (d *WasmDriver) Link(ctx context.Context, file model.Obj, args model.LinkAr
}()
return utils.NewReadCloser(r, func() error {
d.plugin.exports.ContextManager().Remove(ctxHandle)
streamManager.Remove(streamHandle)
instance.exports.StreamManager().Remove(streamHandle)
return r.Close()
}), nil
}),
@@ -456,21 +673,20 @@ func (d *WasmDriver) MakeDir(ctx context.Context, parentDir model.Obj, dirName s
return nil, errs.NotImplement
}
ctxHandle := d.plugin.exports.ContextManager().Add(ctx)
defer d.plugin.exports.ContextManager().Remove(ctxHandle)
obj, err := d.pool.BorrowObject(ctx)
if err != nil {
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
}
instance := obj.(*WasmInstance)
defer d.pool.ReturnObject(ctx, obj)
obj := parentDir.(*plugin_warp.Object)
ctxHandle := instance.exports.ContextManager().Add(ctx)
defer instance.exports.ContextManager().Remove(ctxHandle)
robj := parentDir.(*plugin_warp.Object)
var result witgo.Result[witgo.Option[plugin_warp.Object], plugin_warp.ErrCode]
params := struct {
Driver uint32
Handle plugin_warp.Context
Obj *plugin_warp.Object
DirName string
}{d.handle, ctxHandle, obj, dirName}
if err := d.plugin.guest.Call(ctx, PluginPrefix+"[method]driver.make-dir", &result, params); err != nil {
if err := instance.guest.Call(ctx, PluginPrefix+"make-dir", &result, ctxHandle, robj, dirName); err != nil {
if errors.Is(err, witgo.ErrNotExportFunc) {
return nil, errs.NotImplement
}
@@ -491,20 +707,20 @@ func (d *WasmDriver) Rename(ctx context.Context, srcObj model.Obj, newName strin
return nil, errs.NotImplement
}
ctxHandle := d.plugin.exports.ContextManager().Add(ctx)
defer d.plugin.exports.ContextManager().Remove(ctxHandle)
obj, err := d.pool.BorrowObject(ctx)
if err != nil {
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
}
instance := obj.(*WasmInstance)
defer d.pool.ReturnObject(ctx, obj)
obj := srcObj.(*plugin_warp.Object)
ctxHandle := instance.exports.ContextManager().Add(ctx)
defer instance.exports.ContextManager().Remove(ctxHandle)
robj := srcObj.(*plugin_warp.Object)
var result witgo.Result[witgo.Option[plugin_warp.Object], plugin_warp.ErrCode]
params := struct {
Driver uint32
Handle plugin_warp.Context
Obj *plugin_warp.Object
NewName string
}{d.handle, ctxHandle, obj, newName}
err := d.plugin.guest.Call(ctx, PluginPrefix+"[method]driver.rename-file", &result, params)
err = instance.guest.Call(ctx, PluginPrefix+"rename-file", &result, ctxHandle, robj, newName)
if err != nil {
if errors.Is(err, witgo.ErrNotExportFunc) {
return nil, errs.NotImplement
@@ -526,21 +742,22 @@ func (d *WasmDriver) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.
return nil, errs.NotImplement
}
ctxHandle := d.plugin.exports.ContextManager().Add(ctx)
defer d.plugin.exports.ContextManager().Remove(ctxHandle)
obj, err := d.pool.BorrowObject(ctx)
if err != nil {
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
}
instance := obj.(*WasmInstance)
defer d.pool.ReturnObject(ctx, obj)
ctxHandle := instance.exports.ContextManager().Add(ctx)
defer instance.exports.ContextManager().Remove(ctxHandle)
srcobj := srcObj.(*plugin_warp.Object)
dstobj := dstDir.(*plugin_warp.Object)
var result witgo.Result[witgo.Option[plugin_warp.Object], plugin_warp.ErrCode]
params := struct {
Driver uint32
Handle plugin_warp.Context
SrcObj *plugin_warp.Object
DstObj *plugin_warp.Object
}{d.handle, ctxHandle, srcobj, dstobj}
err := d.plugin.guest.Call(ctx, PluginPrefix+"[method]driver.move-file", &result, params)
err = instance.guest.Call(ctx, PluginPrefix+"move-file", &result, ctxHandle, srcobj, dstobj)
if err != nil {
if errors.Is(err, witgo.ErrNotExportFunc) {
return nil, errs.NotImplement
@@ -562,19 +779,21 @@ func (d *WasmDriver) Remove(ctx context.Context, srcObj model.Obj) error {
return errs.NotImplement
}
ctxHandle := d.plugin.exports.ContextManager().Add(ctx)
defer d.plugin.exports.ContextManager().Remove(ctxHandle)
obj, err := d.pool.BorrowObject(ctx)
if err != nil {
return fmt.Errorf("failed to borrow wasm instance: %w", err)
}
instance := obj.(*WasmInstance)
defer d.pool.ReturnObject(ctx, obj)
ctxHandle := instance.exports.ContextManager().Add(ctx)
defer instance.exports.ContextManager().Remove(ctxHandle)
srcobj := srcObj.(*plugin_warp.Object)
var result witgo.Result[witgo.Unit, plugin_warp.ErrCode]
params := struct {
Driver uint32
Handle plugin_warp.Context
SrcObj *plugin_warp.Object
}{d.handle, ctxHandle, srcobj}
err := d.plugin.guest.Call(ctx, PluginPrefix+"[method]driver.remove-file", &result, params)
err = instance.guest.Call(ctx, PluginPrefix+"remove-file", &result, ctxHandle, srcobj)
if err != nil {
if errors.Is(err, witgo.ErrNotExportFunc) {
return errs.NotImplement
@@ -596,21 +815,22 @@ func (d *WasmDriver) Copy(ctx context.Context, srcObj, dstDir model.Obj) (model.
return nil, errs.NotImplement
}
ctxHandle := d.plugin.exports.ContextManager().Add(ctx)
defer d.plugin.exports.ContextManager().Remove(ctxHandle)
obj, err := d.pool.BorrowObject(ctx)
if err != nil {
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
}
instance := obj.(*WasmInstance)
defer d.pool.ReturnObject(ctx, obj)
ctxHandle := instance.exports.ContextManager().Add(ctx)
defer instance.exports.ContextManager().Remove(ctxHandle)
srcobj := srcObj.(*plugin_warp.Object)
dstobj := dstDir.(*plugin_warp.Object)
var result witgo.Result[witgo.Option[plugin_warp.Object], plugin_warp.ErrCode]
params := struct {
Driver uint32
Handle plugin_warp.Context
SrcObj *plugin_warp.Object
DstObj *plugin_warp.Object
}{d.handle, ctxHandle, srcobj, dstobj}
err := d.plugin.guest.Call(ctx, PluginPrefix+"[method]driver.copy-file", &result, params)
err = instance.guest.Call(ctx, PluginPrefix+"copy-file", &result, ctxHandle, srcobj, dstobj)
if err != nil {
if errors.Is(err, witgo.ErrNotExportFunc) {
return nil, errs.NotImplement
@@ -632,11 +852,18 @@ func (d *WasmDriver) Put(ctx context.Context, dstDir model.Obj, file model.FileS
return nil, errs.NotImplement
}
ctxHandle := d.plugin.exports.ContextManager().Add(ctx)
defer d.plugin.exports.ContextManager().Remove(ctxHandle)
obj, err := d.pool.BorrowObject(ctx)
if err != nil {
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
}
instance := obj.(*WasmInstance)
defer d.pool.ReturnObject(ctx, obj)
stream := d.plugin.exports.uploads.Add(&plugin_warp.UploadReadableType{FileStreamer: file, UpdateProgress: up})
defer d.plugin.exports.uploads.Remove(stream)
ctxHandle := instance.exports.ContextManager().Add(ctx)
defer instance.exports.ContextManager().Remove(ctxHandle)
stream := instance.exports.uploads.Add(&plugin_warp.UploadReadableType{FileStreamer: file, UpdateProgress: up})
defer instance.exports.uploads.Remove(stream)
dstobj := dstDir.(*plugin_warp.Object)
@@ -647,17 +874,13 @@ func (d *WasmDriver) Put(ctx context.Context, dstDir model.Obj, file model.FileS
exist = witgo.Some(plugin_warp.ConvertObjToObject(file.GetExist()))
}
params := struct {
Driver uint32
Handle plugin_warp.Context
DstObj *plugin_warp.Object
Upload *plugin_warp.UploadRequest
}{d.handle, ctxHandle, dstobj, &plugin_warp.UploadRequest{
uploadReq := &plugin_warp.UploadRequest{
Target: plugin_warp.ConvertObjToObject(file),
Content: stream,
Exist: exist,
}}
err := d.plugin.guest.Call(ctx, PluginPrefix+"[method]driver.upload-file", &result, params)
}
err = instance.guest.Call(ctx, PluginPrefix+"upload-file", &result, ctxHandle, dstobj, uploadReq)
if err != nil {
if errors.Is(err, witgo.ErrNotExportFunc) {
return nil, errs.NotImplement

View File

@@ -30,13 +30,16 @@ func (h *DriverPluginHandler) Register(ctx context.Context, plugin *PluginInfo)
}
err = op.RegisterDriver(func() driver.Driver {
driver, err := plugin.driver.NewWasmDriver()
tempDriver, err := plugin.driver.NewWasmDriver()
if err != nil {
log.Errorf("deferred load driver plugin err: %v", err)
return nil
}
return driver
return tempDriver
})
if err != nil {
// 如果注册失败,关闭运行时
plugin.driver.Close(ctx)
return fmt.Errorf("failed to register driver in op: %w", err)
}
@@ -45,21 +48,23 @@ func (h *DriverPluginHandler) Register(ctx context.Context, plugin *PluginInfo)
}
func (h *DriverPluginHandler) Unregister(ctx context.Context, plugin *PluginInfo) error {
// 遵循用户提供的模式,传递一个工厂函数来注销
if plugin.driver == nil {
log.Errorf("plugin.driver is nil during unregister for plugin '%s', cannot get config", plugin.ID)
return fmt.Errorf("plugin.driver instance not found, cannot properly unregister from op")
}
op.UnRegisterDriver(func() driver.Driver {
if plugin.driver == nil {
// 如果 driver 实例不存在,尝试临时创建一个用于获取元数据
// 注意:这可能因插件的复杂性而失败
tempDriver, err := NewDriverPlugin(ctx, plugin)
if err != nil {
log.Errorf("failed to create temporary driver for unregistering plugin '%s': %v", plugin.ID, err)
return nil
}
d, _ := tempDriver.NewWasmDriver()
return d
tempDriver, err := plugin.driver.NewWasmDriver()
if err != nil {
log.Warnf("Failed to create temp driver for unregister: %v", err)
return nil
}
d, _ := plugin.driver.NewWasmDriver()
return d
return tempDriver
})
if err := plugin.driver.Close(ctx); err != nil {
log.Warnf("Error closing driver plugin runtime for %s: %v", plugin.ID, err)
}
return nil
}