mirror of
https://github.com/OpenListTeam/OpenList.git
synced 2025-11-25 03:15:19 +08:00
Compare commits
13 Commits
0857478516
...
plugin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6417f71527 | ||
|
|
ae93fb0479 | ||
|
|
ce3f8e36c1 | ||
|
|
33f1fbc9fb | ||
|
|
fbc4d6d3f8 | ||
|
|
834248b9e4 | ||
|
|
9235c7dff1 | ||
|
|
7b377b1d54 | ||
|
|
d312db3db1 | ||
|
|
7e1358e686 | ||
|
|
62e381a764 | ||
|
|
bbc328d589 | ||
|
|
5780db293a |
@@ -17,6 +17,7 @@ func Init() {
|
||||
bootstrap.Log()
|
||||
bootstrap.InitDB()
|
||||
data.InitData()
|
||||
bootstrap.InitPlugins()
|
||||
bootstrap.InitStreamLimit()
|
||||
bootstrap.InitIndex()
|
||||
bootstrap.InitUpgradePatch()
|
||||
|
||||
6
go.mod
6
go.mod
@@ -9,6 +9,7 @@ require (
|
||||
github.com/OpenListTeam/sftpd-openlist v1.0.1
|
||||
github.com/OpenListTeam/tache v0.2.0
|
||||
github.com/OpenListTeam/times v0.1.0
|
||||
github.com/OpenListTeam/wazero-wasip2 v0.0.0-20251015145605-cd3a2c9131d9
|
||||
github.com/OpenListTeam/wopan-sdk-go v0.1.5
|
||||
github.com/ProtonMail/go-crypto v1.3.0
|
||||
github.com/SheltonZhu/115driver v1.1.1
|
||||
@@ -63,6 +64,7 @@ require (
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/t3rm1n4l/go-mega v0.0.0-20241213151442-a19cff0ec7b5
|
||||
github.com/tchap/go-patricia/v2 v2.3.3
|
||||
github.com/tetratelabs/wazero v1.9.0
|
||||
github.com/u2takey/ffmpeg-go v0.5.0
|
||||
github.com/upyun/go-sdk/v3 v3.0.4
|
||||
github.com/winfsp/cgofuse v1.6.0
|
||||
@@ -192,7 +194,7 @@ require (
|
||||
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc // indirect
|
||||
github.com/bytedance/sonic v1.13.3 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/coreos/go-semver v0.3.1 // indirect
|
||||
github.com/coreos/go-semver v0.3.1
|
||||
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
|
||||
@@ -303,3 +305,5 @@ replace github.com/ProtonMail/go-proton-api => github.com/henrybear327/go-proton
|
||||
replace github.com/cronokirby/saferith => github.com/Da3zKi7/saferith v0.33.0-fixed
|
||||
|
||||
// replace github.com/OpenListTeam/115-sdk-go => ../../OpenListTeam/115-sdk-go
|
||||
|
||||
replace google.golang.org/genproto => google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822
|
||||
|
||||
4
go.sum
4
go.sum
@@ -53,6 +53,8 @@ github.com/OpenListTeam/tache v0.2.0 h1:Q4MjuyECn0CZCf1ZF91JaVaZTaps1mOTAm8bFj8s
|
||||
github.com/OpenListTeam/tache v0.2.0/go.mod h1:qmnZ/VpY2DUlmjg3UoDeNFy/LRqrw0biN3hYEEGc/+A=
|
||||
github.com/OpenListTeam/times v0.1.0 h1:qknxw+qj5CYKgXAwydA102UEpPcpU8TYNGRmwRyPYpg=
|
||||
github.com/OpenListTeam/times v0.1.0/go.mod h1:Jx7qen5NCYzKk2w14YuvU48YYMcPa1P9a+EJePC15Pc=
|
||||
github.com/OpenListTeam/wazero-wasip2 v0.0.0-20251015145605-cd3a2c9131d9 h1:yddTD9Fxh6bLMLmG0hSR7Eh6XkoK0RMlE4N1e6/+Iy8=
|
||||
github.com/OpenListTeam/wazero-wasip2 v0.0.0-20251015145605-cd3a2c9131d9/go.mod h1:+BpydPG2cUQHYFwH3/lVmvXyMl/zxHW+XM+XTSzqu2Q=
|
||||
github.com/OpenListTeam/wopan-sdk-go v0.1.5 h1:iKKcVzIqBgtGDbn0QbdWrCazSGxXFmYFyrnFBG+U8dI=
|
||||
github.com/OpenListTeam/wopan-sdk-go v0.1.5/go.mod h1:otynv0CgSNUClPpUgZ44qCZGcMRe0dc83Pkk65xAunI=
|
||||
github.com/ProtonMail/bcrypt v0.0.0-20210511135022-227b4adcab57/go.mod h1:HecWFHognK8GfRDGnFQbW/LiV7A3MX3gZVs45vk5h8I=
|
||||
@@ -688,6 +690,8 @@ github.com/taruti/bytepool v0.0.0-20160310082835-5e3a9ea56543 h1:6Y51mutOvRGRx6K
|
||||
github.com/taruti/bytepool v0.0.0-20160310082835-5e3a9ea56543/go.mod h1:jpwqYA8KUVEvSUJHkCXsnBRJCSKP1BMa81QZ6kvRpow=
|
||||
github.com/tchap/go-patricia/v2 v2.3.3 h1:xfNEsODumaEcCcY3gI0hYPZ/PcpVv5ju6RMAhgwZDDc=
|
||||
github.com/tchap/go-patricia/v2 v2.3.3/go.mod h1:VZRHKAb53DLaG+nA9EaYYiaEx6YztwDlLElMsnSHD4k=
|
||||
github.com/tetratelabs/wazero v1.9.0 h1:IcZ56OuxrtaEz8UYNRHBrUa9bYeX9oVY93KspZZBf/I=
|
||||
github.com/tetratelabs/wazero v1.9.0/go.mod h1:TSbcXCfFP0L2FGkRPxHphadXPjo1T6W+CseNNY7EkjM=
|
||||
github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8Ol49K4=
|
||||
github.com/tklauser/go-sysconf v0.3.15/go.mod h1:Dmjwr6tYFIseJw7a3dRLJfsHAMXZ3nEnL/aZY+0IuI4=
|
||||
github.com/tklauser/numcpus v0.10.0 h1:18njr6LDBk1zuna922MgdjQuJFjrdppsZG60sHGfjso=
|
||||
|
||||
24
internal/alloc/alloc_other.go
Normal file
24
internal/alloc/alloc_other.go
Normal file
@@ -0,0 +1,24 @@
|
||||
//go:build !unix && !windows
|
||||
|
||||
package alloc // import "github.com/ncruces/go-sqlite3/internal/alloc"
|
||||
|
||||
import "github.com/tetratelabs/wazero/experimental"
|
||||
|
||||
func NewMemory(cap, max uint64) experimental.LinearMemory {
|
||||
return &sliceMemory{make([]byte, 0, cap)}
|
||||
}
|
||||
|
||||
type sliceMemory struct {
|
||||
buf []byte
|
||||
}
|
||||
|
||||
func (b *sliceMemory) Free() {}
|
||||
|
||||
func (b *sliceMemory) Reallocate(size uint64) []byte {
|
||||
if cap := uint64(cap(b.buf)); size > cap {
|
||||
b.buf = append(b.buf[:cap], make([]byte, size-cap)...)
|
||||
} else {
|
||||
b.buf = b.buf[:size]
|
||||
}
|
||||
return b.buf
|
||||
}
|
||||
14
internal/alloc/alloc_test.go
Normal file
14
internal/alloc/alloc_test.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package alloc_test // import "github.com/ncruces/go-sqlite3/internal/alloc"
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/alloc"
|
||||
)
|
||||
|
||||
func TestVirtual(t *testing.T) {
|
||||
defer func() { _ = recover() }()
|
||||
alloc.NewMemory(math.MaxInt+2, math.MaxInt+2)
|
||||
t.Error("want panic")
|
||||
}
|
||||
75
internal/alloc/alloc_unix.go
Normal file
75
internal/alloc/alloc_unix.go
Normal file
@@ -0,0 +1,75 @@
|
||||
//go:build unix
|
||||
|
||||
package alloc // import "github.com/ncruces/go-sqlite3/internal/alloc"
|
||||
|
||||
import (
|
||||
"math"
|
||||
|
||||
"github.com/tetratelabs/wazero/experimental"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
func NewMemory(cap, max uint64) experimental.LinearMemory {
|
||||
// Round up to the page size.
|
||||
rnd := uint64(unix.Getpagesize() - 1)
|
||||
res := (max + rnd) &^ rnd
|
||||
|
||||
if res > math.MaxInt {
|
||||
// This ensures int(res) overflows to a negative value,
|
||||
// and unix.Mmap returns EINVAL.
|
||||
res = math.MaxUint64
|
||||
}
|
||||
|
||||
com := res
|
||||
prot := unix.PROT_READ | unix.PROT_WRITE
|
||||
if cap < max { // Commit memory only if cap=max.
|
||||
com = 0
|
||||
prot = unix.PROT_NONE
|
||||
}
|
||||
|
||||
// Reserve res bytes of address space, to ensure we won't need to move it.
|
||||
// A protected, private, anonymous mapping should not commit memory.
|
||||
b, err := unix.Mmap(-1, 0, int(res), prot, unix.MAP_PRIVATE|unix.MAP_ANON)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return &mmappedMemory{buf: b[:com]}
|
||||
}
|
||||
|
||||
// The slice covers the entire mmapped memory:
|
||||
// - len(buf) is the already committed memory,
|
||||
// - cap(buf) is the reserved address space.
|
||||
type mmappedMemory struct {
|
||||
buf []byte
|
||||
}
|
||||
|
||||
func (m *mmappedMemory) Reallocate(size uint64) []byte {
|
||||
com := uint64(len(m.buf))
|
||||
res := uint64(cap(m.buf))
|
||||
if com < size && size <= res {
|
||||
// Grow geometrically, round up to the page size.
|
||||
rnd := uint64(unix.Getpagesize() - 1)
|
||||
new := com + com>>3
|
||||
new = min(max(size, new), res)
|
||||
new = (new + rnd) &^ rnd
|
||||
|
||||
// Commit additional memory up to new bytes.
|
||||
err := unix.Mprotect(m.buf[com:new], unix.PROT_READ|unix.PROT_WRITE)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
m.buf = m.buf[:new] // Update committed memory.
|
||||
}
|
||||
// Limit returned capacity because bytes beyond
|
||||
// len(m.buf) have not yet been committed.
|
||||
return m.buf[:size:len(m.buf)]
|
||||
}
|
||||
|
||||
func (m *mmappedMemory) Free() {
|
||||
err := unix.Munmap(m.buf[:cap(m.buf)])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
m.buf = nil
|
||||
}
|
||||
76
internal/alloc/alloc_windows.go
Normal file
76
internal/alloc/alloc_windows.go
Normal file
@@ -0,0 +1,76 @@
|
||||
package alloc // import "github.com/ncruces/go-sqlite3/internal/alloc"
|
||||
|
||||
import (
|
||||
"math"
|
||||
"unsafe"
|
||||
|
||||
"github.com/tetratelabs/wazero/experimental"
|
||||
"golang.org/x/sys/windows"
|
||||
)
|
||||
|
||||
func NewMemory(cap, max uint64) experimental.LinearMemory {
|
||||
// Round up to the page size.
|
||||
rnd := uint64(windows.Getpagesize() - 1)
|
||||
res := (max + rnd) &^ rnd
|
||||
|
||||
if res > math.MaxInt {
|
||||
// This ensures uintptr(res) overflows to a large value,
|
||||
// and windows.VirtualAlloc returns an error.
|
||||
res = math.MaxUint64
|
||||
}
|
||||
|
||||
com := res
|
||||
kind := windows.MEM_COMMIT
|
||||
if cap < max { // Commit memory only if cap=max.
|
||||
com = 0
|
||||
kind = windows.MEM_RESERVE
|
||||
}
|
||||
|
||||
// Reserve res bytes of address space, to ensure we won't need to move it.
|
||||
r, err := windows.VirtualAlloc(0, uintptr(res), uint32(kind), windows.PAGE_READWRITE)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
buf := unsafe.Slice((*byte)(unsafe.Pointer(r)), int(max))
|
||||
mem := virtualMemory{addr: r, buf: buf[:com:res]}
|
||||
return &mem
|
||||
}
|
||||
|
||||
// The slice covers the entire mmapped memory:
|
||||
// - len(buf) is the already committed memory,
|
||||
// - cap(buf) is the reserved address space.
|
||||
type virtualMemory struct {
|
||||
buf []byte
|
||||
addr uintptr
|
||||
}
|
||||
|
||||
func (m *virtualMemory) Reallocate(size uint64) []byte {
|
||||
com := uint64(len(m.buf))
|
||||
res := uint64(cap(m.buf))
|
||||
if com < size && size <= res {
|
||||
// Grow geometrically, round up to the page size.
|
||||
rnd := uint64(windows.Getpagesize() - 1)
|
||||
new := com + com>>3
|
||||
new = min(max(size, new), res)
|
||||
new = (new + rnd) &^ rnd
|
||||
|
||||
// Commit additional memory up to new bytes.
|
||||
_, err := windows.VirtualAlloc(m.addr, uintptr(new), windows.MEM_COMMIT, windows.PAGE_READWRITE)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
m.buf = m.buf[:new] // Update committed memory.
|
||||
}
|
||||
// Limit returned capacity because bytes beyond
|
||||
// len(m.buf) have not yet been committed.
|
||||
return m.buf[:size:len(m.buf)]
|
||||
}
|
||||
|
||||
func (m *virtualMemory) Free() {
|
||||
err := windows.VirtualFree(m.addr, 0, windows.MEM_RELEASE)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
m.addr = 0
|
||||
}
|
||||
23
internal/bootstrap/plugin.go
Normal file
23
internal/bootstrap/plugin.go
Normal file
@@ -0,0 +1,23 @@
|
||||
// internal/bootstrap/plugin.go
|
||||
package bootstrap
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/cmd/flags"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/plugin"
|
||||
)
|
||||
|
||||
// InitPlugins 初始化插件管理器
|
||||
func InitPlugins() {
|
||||
// 2. 创建并初始化 Manager
|
||||
// "data" 目录应从配置中获取
|
||||
manager, err := plugin.NewManager(context.Background(), flags.DataDir)
|
||||
if err != nil {
|
||||
// 在启动时,如果插件系统失败,应该 panic
|
||||
panic(fmt.Sprintf("Failed to initialize plugin manager: %v", err))
|
||||
}
|
||||
|
||||
plugin.PluginManager = manager
|
||||
}
|
||||
@@ -12,7 +12,7 @@ var db *gorm.DB
|
||||
|
||||
func Init(d *gorm.DB) {
|
||||
db = d
|
||||
err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode), new(model.TaskItem), new(model.SSHPublicKey), new(model.SharingDB))
|
||||
err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode), new(model.TaskItem), new(model.SSHPublicKey), new(model.SharingDB), new(model.Plugin))
|
||||
if err != nil {
|
||||
log.Fatalf("failed migrate database: %s", err.Error())
|
||||
}
|
||||
|
||||
47
internal/db/plugin.go
Normal file
47
internal/db/plugin.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// CreatePlugin 在数据库中插入一条新的插件记录
|
||||
// 如果记录已存在,则会更新它 (Upsert)
|
||||
func CreatePlugin(ctx context.Context, plugin *model.Plugin) error {
|
||||
return db.WithContext(ctx).Save(plugin).Error
|
||||
}
|
||||
|
||||
// GetPluginByID 从数据库中根据 ID 查询单个插件
|
||||
func GetPluginByID(ctx context.Context, id string) (*model.Plugin, error) {
|
||||
var plugin model.Plugin
|
||||
err := db.WithContext(ctx).First(&plugin, "id = ?", id).Error
|
||||
if err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
return nil, nil // 返回 nil, nil 表示未找到
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return &plugin, nil
|
||||
}
|
||||
|
||||
// GetAllPlugins 从数据库中获取所有已安装的插件
|
||||
func GetAllPlugins(ctx context.Context) ([]*model.Plugin, error) {
|
||||
var plugins []*model.Plugin
|
||||
err := db.WithContext(ctx).Find(&plugins).Error
|
||||
return plugins, err
|
||||
}
|
||||
|
||||
// DeletePluginByID 从数据库中根据 ID 删除一个插件
|
||||
func DeletePluginByID(ctx context.Context, id string) error {
|
||||
return db.WithContext(ctx).Delete(&model.Plugin{}, "id = ?", id).Error
|
||||
}
|
||||
|
||||
// UpdatePluginStatus 更新指定插件的状态和消息
|
||||
func UpdatePluginStatus(ctx context.Context, pluginID string, status model.PluginStatus, message string) error {
|
||||
return db.WithContext(ctx).Model(&model.Plugin{}).Where("id = ?", pluginID).Updates(map[string]interface{}{
|
||||
"status": status,
|
||||
"message": message,
|
||||
}).Error
|
||||
}
|
||||
@@ -19,6 +19,10 @@ type Info struct {
|
||||
Config Config `json:"config"`
|
||||
}
|
||||
|
||||
type IGetItem interface {
|
||||
GetItems() []Item
|
||||
}
|
||||
|
||||
type IRootPath interface {
|
||||
GetRootPath() string
|
||||
}
|
||||
|
||||
42
internal/model/plugin.go
Normal file
42
internal/model/plugin.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package model
|
||||
|
||||
import "time"
|
||||
|
||||
// PluginStatus 定义了插件的几种可能状态
|
||||
type PluginStatus string
|
||||
|
||||
const (
|
||||
// StatusActive 表示插件已成功加载并正在运行
|
||||
StatusActive PluginStatus = "active"
|
||||
// StatusInactive 表示插件已安装但未加载(例如,等待重启)
|
||||
StatusInactive PluginStatus = "inactive"
|
||||
// StatusError 表示插件在加载或运行时遇到错误
|
||||
StatusError PluginStatus = "error"
|
||||
)
|
||||
|
||||
type Plugin struct {
|
||||
// 插件的唯一标识符,例如 "com.openlist.driver.s3"
|
||||
// 这是主键
|
||||
ID string `gorm:"primaryKey" json:"id"`
|
||||
|
||||
// --- 来自插件元数据 ---
|
||||
Name string `json:"name"`
|
||||
Version string `json:"version"`
|
||||
Author string `json:"author"`
|
||||
Description string `gorm:"type:text" json:"description"`
|
||||
IconURL string `json:"icon_url"`
|
||||
|
||||
// --- 管理器需要的信息 ---
|
||||
// 插件的下载源地址
|
||||
SourceURL string `json:"source_url"`
|
||||
// Wasm 文件在本地的存储路径
|
||||
WasmPath string `json:"wasm_path"`
|
||||
|
||||
// 新增状态字段
|
||||
Status PluginStatus `gorm:"default:'inactive'" json:"status"`
|
||||
Message string `gorm:"type:text" json:"message"` // 用于存储错误信息
|
||||
|
||||
// --- GORM 自动管理字段 ---
|
||||
CreatedAt time.Time `json:"-"`
|
||||
UpdatedAt time.Time `json:"-"`
|
||||
}
|
||||
@@ -15,12 +15,27 @@ type DriverConstructor func() driver.Driver
|
||||
var driverMap = map[string]DriverConstructor{}
|
||||
var driverInfoMap = map[string]driver.Info{}
|
||||
|
||||
func RegisterDriver(driver DriverConstructor) {
|
||||
func RegisterDriver(driver DriverConstructor) error {
|
||||
// log.Infof("register driver: [%s]", config.Name)
|
||||
tempDriver := driver()
|
||||
if tempDriver == nil {
|
||||
return errors.New("register driver is null")
|
||||
}
|
||||
tempConfig := tempDriver.Config()
|
||||
|
||||
if driverMap[tempConfig.Name] != nil {
|
||||
return errors.New("driver is registered")
|
||||
}
|
||||
registerDriverItems(tempConfig, tempDriver.GetAddition())
|
||||
driverMap[tempConfig.Name] = driver
|
||||
return nil
|
||||
}
|
||||
|
||||
func UnRegisterDriver(driver DriverConstructor) {
|
||||
if tempDriver := driver(); tempDriver != nil {
|
||||
tempConfig := tempDriver.Config()
|
||||
delete(driverMap, tempConfig.Name)
|
||||
}
|
||||
}
|
||||
|
||||
func GetDriver(name string) (DriverConstructor, error) {
|
||||
@@ -45,12 +60,18 @@ func GetDriverInfoMap() map[string]driver.Info {
|
||||
|
||||
func registerDriverItems(config driver.Config, addition driver.Additional) {
|
||||
// log.Debugf("addition of %s: %+v", config.Name, addition)
|
||||
tAddition := reflect.TypeOf(addition)
|
||||
for tAddition.Kind() == reflect.Pointer {
|
||||
tAddition = tAddition.Elem()
|
||||
var additionalItems []driver.Item
|
||||
if v, ok := addition.(driver.IGetItem); ok {
|
||||
additionalItems = v.GetItems()
|
||||
} else {
|
||||
tAddition := reflect.TypeOf(addition)
|
||||
for tAddition.Kind() == reflect.Pointer {
|
||||
tAddition = tAddition.Elem()
|
||||
}
|
||||
additionalItems = getAdditionalItems(tAddition, config.DefaultRoot)
|
||||
}
|
||||
|
||||
mainItems := getMainItems(config)
|
||||
additionalItems := getAdditionalItems(tAddition, config.DefaultRoot)
|
||||
driverInfoMap[config.Name] = driver.Info{
|
||||
Common: mainItems,
|
||||
Additional: additionalItems,
|
||||
|
||||
909
internal/plugin/driver.go
Normal file
909
internal/plugin/driver.go
Normal file
@@ -0,0 +1,909 @@
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"context"
|
||||
stderrors "errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/alloc"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
plugin_warp "github.com/OpenListTeam/OpenList/v4/internal/plugin/warp"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/stream"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
pool "github.com/jolestar/go-commons-pool/v2"
|
||||
|
||||
manager_io "github.com/OpenListTeam/wazero-wasip2/manager/io"
|
||||
io_v_0_2 "github.com/OpenListTeam/wazero-wasip2/wasip2/io/v0_2"
|
||||
witgo "github.com/OpenListTeam/wazero-wasip2/wit-go"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/tetratelabs/wazero"
|
||||
"github.com/tetratelabs/wazero/api"
|
||||
"github.com/tetratelabs/wazero/experimental"
|
||||
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
|
||||
)
|
||||
|
||||
var PluginPrefix = "openlist:plugin-driver/exports@0.1.0#"
|
||||
|
||||
// DriverPlugin 是*插件*管理器 (每个 .wasm 文件一个)
|
||||
// 它管理共享的 wazero 资源
|
||||
type DriverPlugin struct {
|
||||
plugin *PluginInfo
|
||||
runtime wazero.Runtime // 共享的 wazero 运行时
|
||||
compiledModule wazero.CompiledModule // 共享的已编译模块
|
||||
host *DriverHost // 注册的 wasi host 资源, 这里的self.driver始终为nil
|
||||
}
|
||||
|
||||
// WasmInstance 代表池中的一个可重用对象
|
||||
// 它包含一个活动的 WASM 实例及其宿主/Guest API
|
||||
type WasmInstance struct {
|
||||
instance api.Module
|
||||
exports *DriverHost
|
||||
guest *witgo.Host
|
||||
}
|
||||
|
||||
// 内部函数,用于动态调用 Guest 以获取属性
|
||||
func (d *WasmInstance) GetProperties(ctx context.Context) (plugin_warp.DriverProps, error) {
|
||||
var propertiesResult plugin_warp.DriverProps
|
||||
err := d.guest.Call(ctx, PluginPrefix+"get-properties", &propertiesResult)
|
||||
if err != nil {
|
||||
return plugin_warp.DriverProps{}, err
|
||||
}
|
||||
return propertiesResult, nil
|
||||
}
|
||||
|
||||
// 内部函数,用于动态调用 Guest 以获取表单
|
||||
func (d *WasmInstance) GetFormMeta(ctx context.Context) ([]plugin_warp.FormField, error) {
|
||||
var formMeta []plugin_warp.FormField
|
||||
err := d.guest.Call(ctx, PluginPrefix+"get-form-meta", &formMeta)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return formMeta, nil
|
||||
}
|
||||
|
||||
func (i *WasmInstance) Close() error {
|
||||
return i.instance.Close(context.Background())
|
||||
// exports 借用WasmDriver的资源这里不销毁
|
||||
}
|
||||
|
||||
// 用于创建和管理 WasmInstance
|
||||
type driverPoolFactory struct {
|
||||
ctx context.Context
|
||||
driver *WasmDriver // 指向 WasmDriver (状态持有者)
|
||||
compiledModule wazero.CompiledModule // 共享的模块
|
||||
runtime wazero.Runtime // 共享的运行时
|
||||
host *DriverHost
|
||||
}
|
||||
|
||||
func (f *driverPoolFactory) makeObject(ctx context.Context) (*WasmInstance, error) {
|
||||
// 1. 配置模块
|
||||
moduleConfig := wazero.NewModuleConfig().
|
||||
WithFS(os.DirFS("/")).
|
||||
WithStartFunctions("_initialize").
|
||||
WithStdout(os.Stdout).
|
||||
WithStderr(os.Stderr).
|
||||
WithStdin(os.Stdin).
|
||||
// WithSysNanosleep().
|
||||
// WithSysNanotime().
|
||||
// WithSysWalltime().
|
||||
WithOsyield(func() {
|
||||
runtime.Gosched()
|
||||
}).
|
||||
WithName(f.driver.plugin.plugin.ID)
|
||||
|
||||
instanceCtx := experimental.WithMemoryAllocator(f.ctx, experimental.MemoryAllocatorFunc(alloc.NewMemory))
|
||||
|
||||
// 2. 实例化共享的已编译模块
|
||||
instance, err := f.runtime.InstantiateModule(instanceCtx, f.compiledModule, moduleConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to instantiate module: %w", err)
|
||||
}
|
||||
|
||||
// 3. 创建 Guest API
|
||||
guest, err := witgo.NewHost(instance)
|
||||
if err != nil {
|
||||
instance.Close(ctx)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 5. 组装 WasmInstance
|
||||
wasmInstance := &WasmInstance{
|
||||
instance: instance,
|
||||
exports: f.host,
|
||||
guest: guest,
|
||||
}
|
||||
return wasmInstance, nil
|
||||
}
|
||||
|
||||
// MakeObject 创建一个新的 WasmInstance 并将其放入池中
|
||||
func (f *driverPoolFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
|
||||
wasmInstance, err := f.makeObject(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 设置Host端句柄,用于配置获取等host端方法
|
||||
if err := wasmInstance.guest.Call(ctx, PluginPrefix+"set-handle", nil, uint32(f.driver.ID)); err != nil {
|
||||
// 这里就不返回错误了,避免大量栈数据
|
||||
log.Errorln(err)
|
||||
wasmInstance.Close()
|
||||
return nil, errors.New("Internal error in plugin")
|
||||
}
|
||||
|
||||
// 调用实例的初始化方法
|
||||
ctxHandle := f.host.ContextManager().Add(ctx)
|
||||
defer f.host.ContextManager().Remove(ctxHandle)
|
||||
|
||||
var result witgo.Result[witgo.Unit, plugin_warp.ErrCode]
|
||||
if err := wasmInstance.guest.Call(ctx, PluginPrefix+"init", &result, ctxHandle); err != nil {
|
||||
// 这里就不返回错误了,避免大量栈数据
|
||||
log.Errorln(err)
|
||||
wasmInstance.Close()
|
||||
return nil, errors.New("Internal error in plugin")
|
||||
}
|
||||
if result.Err != nil {
|
||||
wasmInstance.Close()
|
||||
return nil, result.Err.ToError()
|
||||
}
|
||||
|
||||
return pool.NewPooledObject(wasmInstance), nil
|
||||
}
|
||||
|
||||
// DestroyObject 销毁池中的 WasmInstance
|
||||
func (f *driverPoolFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
|
||||
instance := object.Object.(*WasmInstance)
|
||||
log.Debugf("Destroying pooled WASM instance for plugin: %s", f.driver.Storage.MountPath)
|
||||
|
||||
var err error
|
||||
// 4. 调用实例的销毁化方法
|
||||
ctxHandle := instance.exports.ContextManager().Add(ctx)
|
||||
defer instance.exports.ContextManager().Remove(ctxHandle)
|
||||
var result witgo.Result[witgo.Unit, plugin_warp.ErrCode]
|
||||
if err = instance.guest.Call(ctx, PluginPrefix+"drop", &result, ctxHandle); err != nil {
|
||||
// 这里就不返回错误了,避免大量栈数据
|
||||
log.Errorln(err)
|
||||
err = errors.New("Internal error in plugin")
|
||||
} else if result.Err != nil {
|
||||
err = result.Err.ToError()
|
||||
}
|
||||
|
||||
return stderrors.Join(err, instance.Close())
|
||||
}
|
||||
|
||||
// ValidateObject 验证实例是否仍然有效
|
||||
func (f *driverPoolFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {
|
||||
instance := object.Object.(*WasmInstance)
|
||||
return instance.instance != nil && !instance.instance.IsClosed()
|
||||
}
|
||||
|
||||
// ActivateObject 在借用时调用
|
||||
func (f *driverPoolFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// PassivateObject 在归还时调用
|
||||
func (f *driverPoolFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// WasmDriver 是*驱动*实例 (每个挂载点一个)
|
||||
// 它管理池和*状态*
|
||||
type WasmDriver struct {
|
||||
model.Storage
|
||||
flag uint32
|
||||
|
||||
plugin *DriverPlugin
|
||||
|
||||
host *DriverHost
|
||||
pool *pool.ObjectPool
|
||||
|
||||
config plugin_warp.DriverProps
|
||||
additional plugin_warp.Additional
|
||||
}
|
||||
|
||||
// NewDriverPlugin
|
||||
// 创建插件管理器
|
||||
func NewDriverPlugin(ctx context.Context, plugin *PluginInfo) (*DriverPlugin, error) {
|
||||
wasmBytes, err := os.ReadFile(plugin.WasmPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read wasm file '%s': %w", plugin.WasmPath, err)
|
||||
}
|
||||
|
||||
// 1. 创建共享的 wazero 运行时
|
||||
rt := wazero.NewRuntime(ctx)
|
||||
|
||||
// 2. 注册 wasip1/wasip2 资源
|
||||
wasi_snapshot_preview1.MustInstantiate(ctx, rt)
|
||||
host := NewDriverHost()
|
||||
if err := host.Instantiate(ctx, rt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 3. 编译共享的模块
|
||||
compiledModule, err := rt.CompileModule(ctx, wasmBytes)
|
||||
if err != nil {
|
||||
rt.Close(ctx)
|
||||
return nil, fmt.Errorf("failed to compile wasm module for plugin '%s': %w", plugin.ID, err)
|
||||
}
|
||||
|
||||
// 4. 创建 DriverPlugin 实例(管理器)
|
||||
driverPlugin := &DriverPlugin{
|
||||
plugin: plugin,
|
||||
runtime: rt,
|
||||
compiledModule: compiledModule,
|
||||
host: host,
|
||||
}
|
||||
return driverPlugin, nil
|
||||
}
|
||||
|
||||
// Close 关闭共享的 wazero 运行时
|
||||
func (dp *DriverPlugin) Close(ctx context.Context) error {
|
||||
log.Infof("Closing plugin runtime for: %s", dp.plugin.ID)
|
||||
if dp.runtime != nil {
|
||||
return dp.runtime.Close(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewWasmDriver
|
||||
// 创建*驱动实例* (每个挂载一个)
|
||||
func (dp *DriverPlugin) NewWasmDriver() (driver.Driver, error) {
|
||||
ctx := context.Background() // Factory/Pool context
|
||||
|
||||
// 1. 创建 WasmDriver 实例 (状态持有者)
|
||||
driver := &WasmDriver{
|
||||
plugin: dp, // 指向共享资源的管理器
|
||||
host: dp.host,
|
||||
}
|
||||
|
||||
type WasmDirverWarp struct {
|
||||
*WasmDriver
|
||||
}
|
||||
driverWarp := &WasmDirverWarp{driver}
|
||||
runtime.SetFinalizer(driverWarp, func(driver *WasmDirverWarp) {
|
||||
dp.host.driver.Remove(uint32(driver.ID))
|
||||
})
|
||||
|
||||
// 3. 创建池工厂
|
||||
factory := &driverPoolFactory{
|
||||
ctx: ctx,
|
||||
driver: driver,
|
||||
compiledModule: dp.compiledModule,
|
||||
runtime: dp.runtime,
|
||||
host: dp.host,
|
||||
}
|
||||
|
||||
// 4. 配置并创建池
|
||||
poolConfig := pool.NewDefaultPoolConfig()
|
||||
poolConfig.MaxIdle = 2
|
||||
poolConfig.MaxTotal = 8
|
||||
poolConfig.TestOnBorrow = true
|
||||
poolConfig.BlockWhenExhausted = true
|
||||
driver.pool = pool.NewObjectPool(ctx, factory, poolConfig)
|
||||
|
||||
// 5. 首次获取插件信息
|
||||
initConfig := func() error {
|
||||
instance, err := factory.makeObject(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer instance.Close()
|
||||
|
||||
props, err := instance.GetProperties(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to refresh properties: %w", err)
|
||||
}
|
||||
driver.config = props
|
||||
|
||||
forms, err := instance.GetFormMeta(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to refresh forms: %w", err)
|
||||
}
|
||||
driver.additional.Forms = forms
|
||||
return nil
|
||||
}
|
||||
if err := initConfig(); err != nil {
|
||||
driver.Close(ctx) // 构造失败,关闭池
|
||||
return nil, err
|
||||
}
|
||||
return driverWarp, nil
|
||||
}
|
||||
|
||||
// Close (在 WasmDriver 上) 关闭此*实例*的池
|
||||
func (d *WasmDriver) Close(ctx context.Context) error {
|
||||
log.Infof("Closing pool for driver: %s", d.MountPath)
|
||||
if d.pool != nil {
|
||||
d.pool.Close(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleError 处理 wasm 驱动返回的错误
|
||||
func (d *WasmDriver) handleError(errcode *plugin_warp.ErrCode) error {
|
||||
if errcode != nil {
|
||||
err := errcode.ToError()
|
||||
if errcode.Unauthorized != nil && d.Status == op.WORK {
|
||||
if atomic.CompareAndSwapUint32(&d.flag, 0, 1) {
|
||||
d.Status = err.Error()
|
||||
op.MustSaveDriverStorage(d)
|
||||
atomic.StoreUint32(&d.flag, 0)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// // 内部函数,用于动态调用 Guest 以获取属性
|
||||
// func (d *WasmDriver) getProperties(ctx context.Context) (plugin_warp.DriverProps, error) {
|
||||
// obj, err := d.pool.BorrowObject(ctx)
|
||||
// if err != nil {
|
||||
// return plugin_warp.DriverProps{}, fmt.Errorf("failed to borrow wasm instance: %w", err)
|
||||
// }
|
||||
// instance := obj.(*WasmInstance)
|
||||
// defer d.pool.ReturnObject(ctx, obj)
|
||||
|
||||
// return instance.GetProperties(ctx)
|
||||
// }
|
||||
|
||||
// // 内部函数,用于动态调用 Guest 以获取表单
|
||||
// func (d *WasmDriver) getFormMeta(ctx context.Context) ([]plugin_warp.FormField, error) {
|
||||
// obj, err := d.pool.BorrowObject(ctx)
|
||||
// if err != nil {
|
||||
// return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
|
||||
// }
|
||||
// instance := obj.(*WasmInstance)
|
||||
// defer d.pool.ReturnObject(ctx, obj)
|
||||
|
||||
// return instance.GetFormMeta(ctx)
|
||||
// }
|
||||
|
||||
// Config 返回缓存的配置
|
||||
func (d *WasmDriver) Config() driver.Config {
|
||||
// props, err := d.getProperties(context.Background())
|
||||
// if err != nil {
|
||||
// log.Errorf("failed to get properties: %s", err)
|
||||
// return d.config.ToConfig()
|
||||
// }
|
||||
|
||||
// d.config = props
|
||||
return d.config.ToConfig()
|
||||
}
|
||||
|
||||
func (d *WasmDriver) GetAddition() driver.Additional {
|
||||
// newFormMeta, err := d.getFormMeta(context.Background())
|
||||
// if err != nil {
|
||||
// log.Errorf("failed to get form meta: %s", err)
|
||||
// return &d.additional
|
||||
// }
|
||||
// d.additional.Forms = newFormMeta
|
||||
return &d.additional
|
||||
}
|
||||
|
||||
// Init 初始化驱动
|
||||
func (d *WasmDriver) Init(ctx context.Context) error {
|
||||
log.Debugf("Re-initializing pool for plugin %s by clearing idle.", d.MountPath)
|
||||
d.pool.Clear(ctx)
|
||||
|
||||
// 注册
|
||||
d.host.driver.Set(uint32(d.ID), d)
|
||||
|
||||
obj, err := d.pool.BorrowObject(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to pre-warm pool after re-init: %w", err)
|
||||
}
|
||||
d.pool.ReturnObject(ctx, obj)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Drop 销毁驱动 (由 Guest 调用)
|
||||
func (d *WasmDriver) Drop(ctx context.Context) error {
|
||||
log.Infof("Guest triggered Drop, closing pool for driver: %s", d.MountPath)
|
||||
return d.Close(ctx)
|
||||
}
|
||||
|
||||
func (d *WasmDriver) GetRoot(ctx context.Context) (model.Obj, error) {
|
||||
if !d.config.Capabilitys.ListFile {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
obj, err := d.pool.BorrowObject(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
|
||||
}
|
||||
instance := obj.(*WasmInstance)
|
||||
defer d.pool.ReturnObject(ctx, obj)
|
||||
|
||||
ctxHandle := instance.exports.ContextManager().Add(ctx)
|
||||
defer instance.exports.ContextManager().Remove(ctxHandle)
|
||||
|
||||
var result witgo.Result[plugin_warp.Object, plugin_warp.ErrCode]
|
||||
err = instance.guest.Call(ctx, PluginPrefix+"get-root", &result, ctxHandle)
|
||||
if err != nil {
|
||||
if errors.Is(err, witgo.ErrNotExportFunc) {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
// 这里就不返回错误了,避免大量栈数据
|
||||
log.Errorln(err)
|
||||
return nil, errors.New("Internal error in plugin")
|
||||
}
|
||||
|
||||
if result.Err != nil {
|
||||
return nil, d.handleError(result.Err)
|
||||
}
|
||||
|
||||
return result.Ok, nil
|
||||
}
|
||||
|
||||
// GetFile 获取文件信息
|
||||
func (d *WasmDriver) Get(ctx context.Context, path string) (model.Obj, error) {
|
||||
if !d.config.Capabilitys.GetFile {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
obj, err := d.pool.BorrowObject(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
|
||||
}
|
||||
instance := obj.(*WasmInstance)
|
||||
defer d.pool.ReturnObject(ctx, obj)
|
||||
|
||||
ctxHandle := instance.exports.ContextManager().Add(ctx)
|
||||
defer instance.exports.ContextManager().Remove(ctxHandle)
|
||||
|
||||
var result witgo.Result[plugin_warp.Object, plugin_warp.ErrCode]
|
||||
err = instance.guest.Call(ctx, PluginPrefix+"get-file", &result, ctxHandle, path)
|
||||
if err != nil {
|
||||
if errors.Is(err, witgo.ErrNotExportFunc) {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
// 这里就不返回错误了,避免大量栈数据
|
||||
log.Errorln(err)
|
||||
return nil, errors.New("Internal error in plugin")
|
||||
}
|
||||
if result.Err != nil {
|
||||
return nil, d.handleError(result.Err)
|
||||
}
|
||||
|
||||
return result.Ok, nil
|
||||
}
|
||||
|
||||
// List 列出文件
|
||||
func (d *WasmDriver) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]model.Obj, error) {
|
||||
if !d.config.Capabilitys.ListFile {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
obj, err := d.pool.BorrowObject(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
|
||||
}
|
||||
instance := obj.(*WasmInstance)
|
||||
defer d.pool.ReturnObject(ctx, obj)
|
||||
|
||||
ctxHandle := instance.exports.ContextManager().Add(ctx)
|
||||
defer instance.exports.ContextManager().Remove(ctxHandle)
|
||||
|
||||
robj := dir.(*plugin_warp.Object)
|
||||
var result witgo.Result[[]plugin_warp.Object, plugin_warp.ErrCode]
|
||||
|
||||
param := struct {
|
||||
Handle plugin_warp.Context
|
||||
Obj *plugin_warp.Object
|
||||
}{ctxHandle, robj}
|
||||
err = instance.guest.Call(ctx, PluginPrefix+"list-files", &result, param)
|
||||
if err != nil {
|
||||
if errors.Is(err, witgo.ErrNotExportFunc) {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
// 这里就不返回错误了,避免大量栈数据
|
||||
log.Errorln(err)
|
||||
return nil, errors.New("Internal error in plugin")
|
||||
}
|
||||
|
||||
if result.Err != nil {
|
||||
return nil, d.handleError(result.Err)
|
||||
}
|
||||
return utils.MustSliceConvert(*result.Ok, func(o plugin_warp.Object) model.Obj { return &o }), nil
|
||||
}
|
||||
|
||||
// Link 获取文件直链或读取流
|
||||
func (d *WasmDriver) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
|
||||
if !d.config.Capabilitys.LinkFile {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
// 这部分资源全由Host端管理
|
||||
// TODO: 或许应该把创建的Stream生命周期一同绑定到此处结束,防止忘记关闭导致的资源泄漏
|
||||
|
||||
pobj, err := d.pool.BorrowObject(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
|
||||
}
|
||||
instance := pobj.(*WasmInstance)
|
||||
defer d.pool.ReturnObject(ctx, pobj)
|
||||
|
||||
ctxHandle := instance.exports.ContextManager().Add(ctx)
|
||||
defer instance.exports.ContextManager().Remove(ctxHandle)
|
||||
headersHandle := instance.exports.HTTPManager().Fields.Add(args.Header)
|
||||
defer instance.exports.HTTPManager().Fields.Remove(headersHandle)
|
||||
|
||||
obj := file.(*plugin_warp.Object)
|
||||
|
||||
var result witgo.Result[plugin_warp.LinkResult, plugin_warp.ErrCode]
|
||||
|
||||
param := struct {
|
||||
Handle plugin_warp.Context
|
||||
Obj *plugin_warp.Object
|
||||
LinkArgs plugin_warp.LinkArgs
|
||||
}{ctxHandle, obj, plugin_warp.LinkArgs{IP: args.IP, Header: headersHandle}}
|
||||
err = instance.guest.Call(ctx, PluginPrefix+"link-file", &result, param)
|
||||
if err != nil {
|
||||
if errors.Is(err, witgo.ErrNotExportFunc) {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
// 这里就不返回错误了,避免大量栈数据
|
||||
log.Errorln(err)
|
||||
return nil, errors.New("Internal error in plugin")
|
||||
}
|
||||
if result.Err != nil {
|
||||
return nil, d.handleError(result.Err)
|
||||
}
|
||||
|
||||
// 覆盖旧的Obj
|
||||
if result.Ok.File.IsSome() {
|
||||
*obj = *result.Ok.File.Some
|
||||
}
|
||||
|
||||
if result.Ok.Resource.Direct != nil {
|
||||
direct := result.Ok.Resource.Direct
|
||||
header, _ := instance.exports.HTTPManager().Fields.Pop(direct.Header)
|
||||
link := &model.Link{URL: direct.Url, Header: http.Header(header)}
|
||||
if direct.Expiratcion.IsSome() {
|
||||
exp := direct.Expiratcion.Some.ToDuration()
|
||||
link.Expiration = &exp
|
||||
}
|
||||
return link, nil
|
||||
}
|
||||
|
||||
if result.Ok.Resource.RangeStream != nil {
|
||||
fileSize := obj.GetSize()
|
||||
return &model.Link{
|
||||
RangeReader: stream.RateLimitRangeReaderFunc(func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
|
||||
var size uint64
|
||||
if httpRange.Length < 0 || httpRange.Start+httpRange.Length > fileSize {
|
||||
size = uint64(fileSize - httpRange.Start)
|
||||
} else {
|
||||
size = uint64(httpRange.Length)
|
||||
}
|
||||
|
||||
pobj, err := d.pool.BorrowObject(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
instance := pobj.(*WasmInstance)
|
||||
|
||||
r, w := io.Pipe()
|
||||
cw := &checkWriter{W: w, N: size}
|
||||
streamHandle := instance.exports.StreamManager().Add(&manager_io.Stream{
|
||||
Writer: cw,
|
||||
CheckWriter: cw,
|
||||
})
|
||||
ctxHandle := instance.exports.ContextManager().Add(ctx)
|
||||
|
||||
type RangeSpec struct {
|
||||
Offset uint64
|
||||
Size uint64
|
||||
Stream io_v_0_2.OutputStream
|
||||
}
|
||||
|
||||
var result witgo.Result[witgo.Unit, plugin_warp.ErrCode]
|
||||
param := struct {
|
||||
Handle plugin_warp.Context
|
||||
Obj *plugin_warp.Object
|
||||
LinkArgs plugin_warp.LinkArgs
|
||||
RangeSpec RangeSpec
|
||||
}{ctxHandle, obj, plugin_warp.LinkArgs{IP: args.IP, Header: headersHandle}, RangeSpec{Offset: uint64(httpRange.Start), Size: size, Stream: streamHandle}}
|
||||
|
||||
go func() {
|
||||
defer d.pool.ReturnObject(ctx, instance)
|
||||
defer instance.exports.ContextManager().Remove(ctxHandle)
|
||||
|
||||
if err := instance.guest.Call(ctx, PluginPrefix+"link-range", &result, param); err != nil {
|
||||
if errors.Is(err, witgo.ErrNotExportFunc) {
|
||||
w.CloseWithError(errs.NotImplement)
|
||||
return
|
||||
}
|
||||
// 这里就不返回错误了,避免大量栈数据
|
||||
log.Errorln(err)
|
||||
w.CloseWithError(err)
|
||||
return
|
||||
}
|
||||
|
||||
if result.Err != nil {
|
||||
w.CloseWithError(d.handleError(result.Err))
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
return utils.NewReadCloser(r, func() error {
|
||||
instance.exports.StreamManager().Remove(streamHandle)
|
||||
return r.Close()
|
||||
}), nil
|
||||
}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
type checkWriter struct {
|
||||
W io.Writer
|
||||
N uint64
|
||||
}
|
||||
|
||||
func (c *checkWriter) Write(p []byte) (n int, err error) {
|
||||
if c.N <= 0 {
|
||||
return 0, stderrors.New("write limit exceeded")
|
||||
}
|
||||
n, err = c.W.Write(p[:min(uint64(len(p)), c.N)])
|
||||
c.N -= uint64(n)
|
||||
return
|
||||
}
|
||||
func (c *checkWriter) CheckWrite() uint64 {
|
||||
return max(c.N, 1)
|
||||
}
|
||||
|
||||
func (d *WasmDriver) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) (model.Obj, error) {
|
||||
if !d.config.Capabilitys.MkdirFile {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
obj, err := d.pool.BorrowObject(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
|
||||
}
|
||||
instance := obj.(*WasmInstance)
|
||||
defer d.pool.ReturnObject(ctx, obj)
|
||||
|
||||
ctxHandle := instance.exports.ContextManager().Add(ctx)
|
||||
defer instance.exports.ContextManager().Remove(ctxHandle)
|
||||
|
||||
robj := parentDir.(*plugin_warp.Object)
|
||||
var result witgo.Result[witgo.Option[plugin_warp.Object], plugin_warp.ErrCode]
|
||||
|
||||
if err := instance.guest.Call(ctx, PluginPrefix+"make-dir", &result, ctxHandle, robj, dirName); err != nil {
|
||||
if errors.Is(err, witgo.ErrNotExportFunc) {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
// 这里就不返回错误了,避免大量栈数据
|
||||
log.Errorln(err)
|
||||
return nil, errors.New("Internal error in plugin")
|
||||
}
|
||||
|
||||
if result.Err != nil {
|
||||
return nil, d.handleError(result.Err)
|
||||
}
|
||||
|
||||
return result.Ok.Some, nil
|
||||
}
|
||||
|
||||
func (d *WasmDriver) Rename(ctx context.Context, srcObj model.Obj, newName string) (model.Obj, error) {
|
||||
if !d.config.Capabilitys.RenameFile {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
obj, err := d.pool.BorrowObject(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
|
||||
}
|
||||
instance := obj.(*WasmInstance)
|
||||
defer d.pool.ReturnObject(ctx, obj)
|
||||
|
||||
ctxHandle := instance.exports.ContextManager().Add(ctx)
|
||||
defer instance.exports.ContextManager().Remove(ctxHandle)
|
||||
|
||||
robj := srcObj.(*plugin_warp.Object)
|
||||
var result witgo.Result[witgo.Option[plugin_warp.Object], plugin_warp.ErrCode]
|
||||
|
||||
err = instance.guest.Call(ctx, PluginPrefix+"rename-file", &result, ctxHandle, robj, newName)
|
||||
if err != nil {
|
||||
if errors.Is(err, witgo.ErrNotExportFunc) {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
// 这里就不返回错误了,避免大量栈数据
|
||||
log.Errorln(err)
|
||||
return nil, errors.New("Internal error in plugin")
|
||||
}
|
||||
|
||||
if result.Err != nil {
|
||||
return nil, d.handleError(result.Err)
|
||||
}
|
||||
|
||||
return result.Ok.Some, nil
|
||||
}
|
||||
|
||||
func (d *WasmDriver) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) {
|
||||
if !d.config.Capabilitys.MoveFile {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
obj, err := d.pool.BorrowObject(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
|
||||
}
|
||||
instance := obj.(*WasmInstance)
|
||||
defer d.pool.ReturnObject(ctx, obj)
|
||||
|
||||
ctxHandle := instance.exports.ContextManager().Add(ctx)
|
||||
defer instance.exports.ContextManager().Remove(ctxHandle)
|
||||
|
||||
srcobj := srcObj.(*plugin_warp.Object)
|
||||
dstobj := dstDir.(*plugin_warp.Object)
|
||||
|
||||
var result witgo.Result[witgo.Option[plugin_warp.Object], plugin_warp.ErrCode]
|
||||
|
||||
err = instance.guest.Call(ctx, PluginPrefix+"move-file", &result, ctxHandle, srcobj, dstobj)
|
||||
if err != nil {
|
||||
if errors.Is(err, witgo.ErrNotExportFunc) {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
// 这里就不返回错误了,避免大量栈数据
|
||||
log.Errorln(err)
|
||||
return nil, errors.New("Internal error in plugin")
|
||||
}
|
||||
|
||||
if result.Err != nil {
|
||||
return nil, d.handleError(result.Err)
|
||||
}
|
||||
|
||||
return result.Ok.Some, nil
|
||||
}
|
||||
|
||||
func (d *WasmDriver) Remove(ctx context.Context, srcObj model.Obj) error {
|
||||
if !d.config.Capabilitys.RemoveFile {
|
||||
return errs.NotImplement
|
||||
}
|
||||
|
||||
obj, err := d.pool.BorrowObject(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to borrow wasm instance: %w", err)
|
||||
}
|
||||
instance := obj.(*WasmInstance)
|
||||
defer d.pool.ReturnObject(ctx, obj)
|
||||
|
||||
ctxHandle := instance.exports.ContextManager().Add(ctx)
|
||||
defer instance.exports.ContextManager().Remove(ctxHandle)
|
||||
|
||||
srcobj := srcObj.(*plugin_warp.Object)
|
||||
|
||||
var result witgo.Result[witgo.Unit, plugin_warp.ErrCode]
|
||||
|
||||
err = instance.guest.Call(ctx, PluginPrefix+"remove-file", &result, ctxHandle, srcobj)
|
||||
if err != nil {
|
||||
if errors.Is(err, witgo.ErrNotExportFunc) {
|
||||
return errs.NotImplement
|
||||
}
|
||||
// 这里就不返回错误了,避免大量栈数据
|
||||
log.Errorln(err)
|
||||
return errors.New("Internal error in plugin")
|
||||
}
|
||||
|
||||
if result.Err != nil {
|
||||
return d.handleError(result.Err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *WasmDriver) Copy(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) {
|
||||
if !d.config.Capabilitys.CopyFile {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
obj, err := d.pool.BorrowObject(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
|
||||
}
|
||||
instance := obj.(*WasmInstance)
|
||||
defer d.pool.ReturnObject(ctx, obj)
|
||||
|
||||
ctxHandle := instance.exports.ContextManager().Add(ctx)
|
||||
defer instance.exports.ContextManager().Remove(ctxHandle)
|
||||
|
||||
srcobj := srcObj.(*plugin_warp.Object)
|
||||
dstobj := dstDir.(*plugin_warp.Object)
|
||||
|
||||
var result witgo.Result[witgo.Option[plugin_warp.Object], plugin_warp.ErrCode]
|
||||
|
||||
err = instance.guest.Call(ctx, PluginPrefix+"copy-file", &result, ctxHandle, srcobj, dstobj)
|
||||
if err != nil {
|
||||
if errors.Is(err, witgo.ErrNotExportFunc) {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
// 这里就不返回错误了,避免大量栈数据
|
||||
log.Errorln(err)
|
||||
return nil, errors.New("Internal error in plugin")
|
||||
}
|
||||
|
||||
if result.Err != nil {
|
||||
return nil, d.handleError(result.Err)
|
||||
}
|
||||
|
||||
return result.Ok.Some, nil
|
||||
}
|
||||
|
||||
func (d *WasmDriver) Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) {
|
||||
if !d.config.Capabilitys.UploadFile {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
|
||||
obj, err := d.pool.BorrowObject(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to borrow wasm instance: %w", err)
|
||||
}
|
||||
instance := obj.(*WasmInstance)
|
||||
defer d.pool.ReturnObject(ctx, obj)
|
||||
|
||||
ctxHandle := instance.exports.ContextManager().Add(ctx)
|
||||
defer instance.exports.ContextManager().Remove(ctxHandle)
|
||||
|
||||
stream := instance.exports.uploads.Add(&plugin_warp.UploadReadableType{FileStreamer: file, UpdateProgress: up})
|
||||
defer instance.exports.uploads.Remove(stream)
|
||||
|
||||
dstobj := dstDir.(*plugin_warp.Object)
|
||||
|
||||
var result witgo.Result[witgo.Option[plugin_warp.Object], plugin_warp.ErrCode]
|
||||
|
||||
exist := witgo.None[plugin_warp.Object]()
|
||||
if file.GetExist() != nil {
|
||||
exist = witgo.Some(plugin_warp.ConvertObjToObject(file.GetExist()))
|
||||
}
|
||||
|
||||
uploadReq := &plugin_warp.UploadRequest{
|
||||
Target: plugin_warp.ConvertObjToObject(file),
|
||||
Content: stream,
|
||||
Exist: exist,
|
||||
}
|
||||
|
||||
err = instance.guest.Call(ctx, PluginPrefix+"upload-file", &result, ctxHandle, dstobj, uploadReq)
|
||||
if err != nil {
|
||||
if errors.Is(err, witgo.ErrNotExportFunc) {
|
||||
return nil, errs.NotImplement
|
||||
}
|
||||
// 这里就不返回错误了,避免大量栈数据
|
||||
log.Errorln(err)
|
||||
return nil, errors.New("Internal error in plugin")
|
||||
}
|
||||
|
||||
if result.Err != nil {
|
||||
return nil, d.handleError(result.Err)
|
||||
}
|
||||
|
||||
return result.Ok.Some, nil
|
||||
}
|
||||
|
||||
var _ driver.Meta = (*WasmDriver)(nil)
|
||||
var _ driver.Reader = (*WasmDriver)(nil)
|
||||
var _ driver.Getter = (*WasmDriver)(nil)
|
||||
var _ driver.GetRooter = (*WasmDriver)(nil)
|
||||
var _ driver.MkdirResult = (*WasmDriver)(nil)
|
||||
var _ driver.RenameResult = (*WasmDriver)(nil)
|
||||
var _ driver.MoveResult = (*WasmDriver)(nil)
|
||||
var _ driver.Remove = (*WasmDriver)(nil)
|
||||
var _ driver.CopyResult = (*WasmDriver)(nil)
|
||||
var _ driver.PutResult = (*WasmDriver)(nil)
|
||||
284
internal/plugin/host.go
Normal file
284
internal/plugin/host.go
Normal file
@@ -0,0 +1,284 @@
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"maps"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tetratelabs/wazero"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
plugin_warp "github.com/OpenListTeam/OpenList/v4/internal/plugin/warp"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/stream"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
|
||||
manager_io "github.com/OpenListTeam/wazero-wasip2/manager/io"
|
||||
"github.com/OpenListTeam/wazero-wasip2/wasip2"
|
||||
wasi_clocks "github.com/OpenListTeam/wazero-wasip2/wasip2/clocks"
|
||||
wasi_filesystem "github.com/OpenListTeam/wazero-wasip2/wasip2/filesystem"
|
||||
wasi_http "github.com/OpenListTeam/wazero-wasip2/wasip2/http"
|
||||
wasi_io "github.com/OpenListTeam/wazero-wasip2/wasip2/io"
|
||||
io_v0_2 "github.com/OpenListTeam/wazero-wasip2/wasip2/io/v0_2"
|
||||
wasi_random "github.com/OpenListTeam/wazero-wasip2/wasip2/random"
|
||||
wasi_sockets "github.com/OpenListTeam/wazero-wasip2/wasip2/sockets"
|
||||
witgo "github.com/OpenListTeam/wazero-wasip2/wit-go"
|
||||
)
|
||||
|
||||
type DriverHost struct {
|
||||
*wasip2.Host
|
||||
contexts *plugin_warp.ContextManaget
|
||||
uploads *plugin_warp.UploadReadableManager
|
||||
|
||||
driver *witgo.ResourceManager[*WasmDriver]
|
||||
}
|
||||
|
||||
func NewDriverHost() *DriverHost {
|
||||
waspi2_host := wasip2.NewHost(
|
||||
wasi_io.Module("0.2.2"),
|
||||
wasi_filesystem.Module("0.2.2"),
|
||||
wasi_random.Module("0.2.2"),
|
||||
wasi_clocks.Module("0.2.2"),
|
||||
wasi_sockets.Module("0.2.0"),
|
||||
wasi_http.Module("0.2.0"),
|
||||
)
|
||||
return &DriverHost{
|
||||
Host: waspi2_host,
|
||||
contexts: plugin_warp.NewContextManager(),
|
||||
uploads: plugin_warp.NewUploadManager(),
|
||||
driver: witgo.NewResourceManager[*WasmDriver](nil),
|
||||
}
|
||||
}
|
||||
|
||||
func (host *DriverHost) Instantiate(ctx context.Context, rt wazero.Runtime) error {
|
||||
if err := host.Host.Instantiate(ctx, rt); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
module := rt.NewHostModuleBuilder("openlist:plugin-driver/host@0.1.0")
|
||||
exports := witgo.NewExporter(module)
|
||||
|
||||
exports.Export("log", host.Log)
|
||||
exports.Export("load-config", host.LoadConfig)
|
||||
exports.Export("save-config", host.SaveConfig)
|
||||
if _, err := exports.Instantiate(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
moduleType := rt.NewHostModuleBuilder("openlist:plugin-driver/types@0.1.0")
|
||||
exportsType := witgo.NewExporter(moduleType)
|
||||
exportsType.Export("[resource-drop]cancellable", host.DropContext)
|
||||
exportsType.Export("[method]cancellable.subscribe", host.Subscribe)
|
||||
|
||||
exportsType.Export("[resource-drop]readable", host.DropReadable)
|
||||
exportsType.Export("[method]readable.streams", host.Stream)
|
||||
exportsType.Export("[method]readable.peek", host.StreamPeek)
|
||||
exportsType.Export("[method]readable.chunks", host.Chunks)
|
||||
exportsType.Export("[method]readable.next-chunk", host.NextChunk)
|
||||
exportsType.Export("[method]readable.chunk-reset", host.ChunkReset)
|
||||
exportsType.Export("[method]readable.get-hasher", host.GetHasher)
|
||||
exportsType.Export("[method]readable.update-progress", host.UpdateProgress)
|
||||
if _, err := exportsType.Instantiate(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (host *DriverHost) ContextManager() *plugin_warp.ContextManaget {
|
||||
return host.contexts
|
||||
}
|
||||
|
||||
func (host *DriverHost) UploadManager() *plugin_warp.UploadReadableManager {
|
||||
return host.uploads
|
||||
}
|
||||
|
||||
func (host *DriverHost) DropReadable(this plugin_warp.UploadReadable) {
|
||||
host.uploads.Remove(this)
|
||||
}
|
||||
|
||||
func (host *DriverHost) DropContext(this plugin_warp.Context) {
|
||||
host.contexts.Remove(this)
|
||||
}
|
||||
|
||||
// log: func(level: log-level, message: string);
|
||||
func (host *DriverHost) Log(level plugin_warp.LogLevel, message string) {
|
||||
if level.Debug != nil {
|
||||
log.Debugln(message)
|
||||
} else if level.Error != nil {
|
||||
log.Errorln(message)
|
||||
} else if level.Info != nil {
|
||||
log.Infoln(message)
|
||||
} else if level.Warn != nil {
|
||||
log.Warnln(message)
|
||||
} else {
|
||||
log.Traceln(message)
|
||||
}
|
||||
}
|
||||
|
||||
// load-config: func(driver: u32) -> result<list<u8>, string>;
|
||||
func (host *DriverHost) LoadConfig(driverHandle uint32) witgo.Result[[]byte, string] {
|
||||
driver, ok := host.driver.Get(driverHandle)
|
||||
if !ok || driver == nil {
|
||||
return witgo.Err[[]byte]("host.driver is null, loading timing too early")
|
||||
}
|
||||
return witgo.Ok[[]byte, string](driver.additional.Bytes())
|
||||
}
|
||||
|
||||
// save-config: func(driver: u32, config: list<u8>) -> result<_, string>;
|
||||
func (host *DriverHost) SaveConfig(driverHandle uint32, config []byte) witgo.Result[witgo.Unit, string] {
|
||||
driver, ok := host.driver.Get(driverHandle)
|
||||
if !ok || driver == nil {
|
||||
return witgo.Err[witgo.Unit]("host.driver is null, loading timing too early")
|
||||
}
|
||||
|
||||
driver.additional.SetBytes(config)
|
||||
op.MustSaveDriverStorage(driver)
|
||||
return witgo.Ok[witgo.Unit, string](witgo.Unit{})
|
||||
}
|
||||
|
||||
// streams: func() -> result<input-stream, string>;
|
||||
func (host *DriverHost) Stream(this plugin_warp.UploadReadable) witgo.Result[io_v0_2.InputStream, string] {
|
||||
upload, ok := host.uploads.Get(this)
|
||||
if !ok {
|
||||
return witgo.Err[io_v0_2.InputStream]("UploadReadable::Stream: ErrorCodeBadDescriptor")
|
||||
}
|
||||
if upload.StreamConsume {
|
||||
return witgo.Err[io_v0_2.InputStream]("UploadReadable::Stream: StreamConsume")
|
||||
}
|
||||
|
||||
upload.StreamConsume = true
|
||||
streamHandle := host.StreamManager().Add(&manager_io.Stream{Reader: upload, Seeker: upload.GetFile()})
|
||||
return witgo.Ok[io_v0_2.InputStream, string](streamHandle)
|
||||
}
|
||||
|
||||
// peek: func(offset: u64, len: u64) -> result<input-stream, string>;
|
||||
func (host *DriverHost) StreamPeek(this plugin_warp.UploadReadable, offset uint64, len uint64) witgo.Result[io_v0_2.InputStream, string] {
|
||||
upload, ok := host.uploads.Get(this)
|
||||
if !ok {
|
||||
return witgo.Err[io_v0_2.InputStream]("UploadReadable::StreamPeek: ErrorCodeBadDescriptor")
|
||||
}
|
||||
if upload.StreamConsume {
|
||||
return witgo.Err[io_v0_2.InputStream]("UploadReadable::StreamPeek: StreamConsume")
|
||||
}
|
||||
|
||||
peekReader, err := upload.RangeRead(http_range.Range{Start: int64(offset), Length: int64(len)})
|
||||
if err != nil {
|
||||
return witgo.Err[io_v0_2.InputStream](err.Error())
|
||||
}
|
||||
seeker, _ := peekReader.(io.Seeker)
|
||||
streamHandle := host.StreamManager().Add(&manager_io.Stream{Reader: peekReader, Seeker: seeker})
|
||||
return witgo.Ok[io_v0_2.InputStream, string](streamHandle)
|
||||
}
|
||||
|
||||
// chunks: func(len: u32) -> result<u32, string>;
|
||||
func (host *DriverHost) Chunks(this plugin_warp.UploadReadable, len uint32) witgo.Result[uint32, string] {
|
||||
upload, ok := host.uploads.Get(this)
|
||||
if !ok {
|
||||
return witgo.Err[uint32]("UploadReadable::Chunks: ErrorCodeBadDescriptor")
|
||||
}
|
||||
if upload.StreamConsume {
|
||||
return witgo.Err[uint32]("UploadReadable::Chunks: StreamConsume")
|
||||
}
|
||||
if upload.SectionReader != nil {
|
||||
return witgo.Err[uint32]("UploadReadable::Chunks: Already exist chunk reader")
|
||||
}
|
||||
|
||||
ss, err := stream.NewStreamSectionReader(upload, int(len), &upload.UpdateProgress)
|
||||
if err != nil {
|
||||
return witgo.Err[uint32](err.Error())
|
||||
}
|
||||
chunkSize := int64(len)
|
||||
upload.SectionReader = &plugin_warp.StreamSectionReader{StreamSectionReaderIF: ss, CunketSize: chunkSize}
|
||||
return witgo.Ok[uint32, string](uint32((upload.GetSize() + chunkSize - 1) / chunkSize))
|
||||
}
|
||||
|
||||
// next-chunk: func() -> result<input-stream, string>;
|
||||
func (host *DriverHost) NextChunk(this plugin_warp.UploadReadable) witgo.Result[io_v0_2.InputStream, string] {
|
||||
upload, ok := host.uploads.Get(this)
|
||||
if !ok {
|
||||
return witgo.Err[io_v0_2.InputStream]("UploadReadable::NextChunk: ErrorCodeBadDescriptor")
|
||||
}
|
||||
if upload.SectionReader == nil {
|
||||
return witgo.Err[io_v0_2.InputStream]("UploadReadable::NextChunk: No chunk reader")
|
||||
}
|
||||
|
||||
chunkSize := min(upload.SectionReader.CunketSize, upload.GetSize()-upload.SectionReader.Offset)
|
||||
sr, err := upload.SectionReader.GetSectionReader(upload.SectionReader.Offset, chunkSize)
|
||||
if err != nil {
|
||||
return witgo.Err[io_v0_2.InputStream](err.Error())
|
||||
}
|
||||
upload.SectionReader.Offset += chunkSize
|
||||
streamHandle := host.StreamManager().Add(&manager_io.Stream{Reader: sr, Seeker: sr, Closer: utils.CloseFunc(func() error {
|
||||
upload.SectionReader.FreeSectionReader(sr)
|
||||
return nil
|
||||
})})
|
||||
return witgo.Ok[io_v0_2.InputStream, string](streamHandle)
|
||||
}
|
||||
|
||||
// chunk-reset: func(chunk: input-stream) -> result<_, string>;
|
||||
func (host *DriverHost) ChunkReset(this plugin_warp.UploadReadable, chunk io_v0_2.InputStream) witgo.Result[witgo.Unit, string] {
|
||||
stream, ok := host.StreamManager().Get(chunk)
|
||||
if !ok {
|
||||
return witgo.Err[witgo.Unit]("UploadReadable::ChunkReset: ErrorCodeBadDescriptor")
|
||||
}
|
||||
if stream.Seeker == nil {
|
||||
return witgo.Err[witgo.Unit]("UploadReadable::ChunkReset: Not Seeker")
|
||||
}
|
||||
_, err := stream.Seeker.Seek(0, io.SeekStart)
|
||||
if err != nil {
|
||||
return witgo.Err[witgo.Unit](err.Error())
|
||||
}
|
||||
return witgo.Ok[witgo.Unit, string](witgo.Unit{})
|
||||
}
|
||||
|
||||
// get-hasher: func(hashs: list<hash-alg>) -> result<list<hash-info>, string>;
|
||||
func (host *DriverHost) GetHasher(this plugin_warp.UploadReadable, hashs []plugin_warp.HashAlg) witgo.Result[[]plugin_warp.HashInfo, string] {
|
||||
upload, ok := host.uploads.Get(this)
|
||||
if !ok {
|
||||
return witgo.Err[[]plugin_warp.HashInfo]("UploadReadable: ErrorCodeBadDescriptor")
|
||||
}
|
||||
|
||||
resultHashs := plugin_warp.HashInfoConvert2(upload.GetHash(), hashs)
|
||||
if resultHashs != nil {
|
||||
return witgo.Ok[[]plugin_warp.HashInfo, string](resultHashs)
|
||||
}
|
||||
|
||||
if upload.StreamConsume {
|
||||
return witgo.Err[[]plugin_warp.HashInfo]("UploadReadable: StreamConsume")
|
||||
}
|
||||
|
||||
// 无法从obj中获取需要的hash,或者获取的hash不完整。
|
||||
// 需要缓存整个文件并进行hash计算
|
||||
hashTypes := plugin_warp.HashAlgConverts(hashs)
|
||||
|
||||
hashers := utils.NewMultiHasher(hashTypes)
|
||||
if _, err := upload.CacheFullAndWriter(&upload.UpdateProgress, hashers); err != nil {
|
||||
return witgo.Err[[]plugin_warp.HashInfo](err.Error())
|
||||
}
|
||||
|
||||
maps.Copy(upload.GetHash().Export(), hashers.GetHashInfo().Export())
|
||||
|
||||
return witgo.Ok[[]plugin_warp.HashInfo, string](plugin_warp.HashInfoConvert(*hashers.GetHashInfo()))
|
||||
}
|
||||
|
||||
// update-progress: func(progress: f64);
|
||||
func (host *DriverHost) UpdateProgress(this plugin_warp.UploadReadable, progress float64) {
|
||||
upload, ok := host.uploads.Get(this)
|
||||
if ok {
|
||||
upload.UpdateProgress(progress)
|
||||
}
|
||||
}
|
||||
|
||||
// resource cancellable { subscribe: func() -> pollable; }
|
||||
func (host *DriverHost) Subscribe(this plugin_warp.Context) io_v0_2.Pollable {
|
||||
poll := host.Host.PollManager()
|
||||
|
||||
ctx, ok := host.contexts.Get(this)
|
||||
if !ok {
|
||||
return poll.Add(manager_io.ReadyPollable)
|
||||
}
|
||||
|
||||
return poll.Add(&plugin_warp.ContextPollable{Context: ctx})
|
||||
}
|
||||
650
internal/plugin/manager.go
Normal file
650
internal/plugin/manager.go
Normal file
@@ -0,0 +1,650 @@
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/db"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
)
|
||||
|
||||
var (
|
||||
PluginManager *Manager
|
||||
)
|
||||
|
||||
// PluginInfo 只包含从数据库加载的插件元数据。
|
||||
type PluginInfo struct {
|
||||
*model.Plugin
|
||||
handler PluginHandler // 缓存与此插件匹配的处理器
|
||||
driver *DriverPlugin // 缓存已创建的驱动插件实例
|
||||
}
|
||||
|
||||
// PluginHandler 定义了处理特定类型插件的接口
|
||||
type PluginHandler interface {
|
||||
// Prefix 返回此处理器能处理的插件ID前缀
|
||||
Prefix() string
|
||||
// Register 注册一个插件
|
||||
Register(ctx context.Context, plugin *PluginInfo) error
|
||||
// Unregister 注销一个插件
|
||||
Unregister(ctx context.Context, plugin *PluginInfo) error
|
||||
}
|
||||
|
||||
// Manager 负责管理插件的生命周期(安装、卸载、加载元数据)。
|
||||
type Manager struct {
|
||||
sync.RWMutex
|
||||
plugins map[string]*PluginInfo // Key: 插件 ID
|
||||
pluginDir string
|
||||
httpClient *http.Client
|
||||
handlers []PluginHandler // 插件处理器列表
|
||||
}
|
||||
|
||||
// NewManager 创建一个新的、轻量级的插件管理器。
|
||||
func NewManager(ctx context.Context, dataDir string) (*Manager, error) {
|
||||
pluginDir := filepath.Join(dataDir, "plugins")
|
||||
if err := os.MkdirAll(pluginDir, 0755); err != nil {
|
||||
return nil, fmt.Errorf("failed to create plugin directory: %w", err)
|
||||
}
|
||||
|
||||
m := &Manager{
|
||||
plugins: make(map[string]*PluginInfo),
|
||||
pluginDir: pluginDir,
|
||||
httpClient: &http.Client{},
|
||||
// 在这里注册所有支持的插件处理器
|
||||
handlers: []PluginHandler{
|
||||
&DriverPluginHandler{}, // 注册驱动插件处理器
|
||||
// 未来可以添加 newThemePluginHandler(), newOtherPluginHandler() 等
|
||||
},
|
||||
}
|
||||
|
||||
if err := m.loadPluginsFromDB(ctx); err != nil {
|
||||
return nil, fmt.Errorf("failed to load plugins from database: %w", err)
|
||||
}
|
||||
|
||||
// 在 NewManager 中直接调用 RegisterAll,确保启动时所有插件都被注册
|
||||
m.RegisterAll(ctx)
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// loadPluginsFromDB 在启动时仅从数据库加载插件元数据。
|
||||
func (m *Manager) loadPluginsFromDB(ctx context.Context) error {
|
||||
storedPlugins, err := db.GetAllPlugins(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Found %d installed plugins in the database.", len(storedPlugins))
|
||||
for _, p := range storedPlugins {
|
||||
if _, err := os.Stat(p.WasmPath); os.IsNotExist(err) {
|
||||
log.Warnf("Plugin '%s' found in database but its wasm file is missing at %s. Skipping.", p.ID, p.WasmPath)
|
||||
continue
|
||||
}
|
||||
pluginInfo := &PluginInfo{Plugin: p}
|
||||
// 为插件找到匹配的处理器
|
||||
for _, h := range m.handlers {
|
||||
if strings.HasPrefix(p.ID, h.Prefix()) {
|
||||
pluginInfo.handler = h
|
||||
break
|
||||
}
|
||||
}
|
||||
if pluginInfo.handler == nil {
|
||||
log.Warnf("No handler found for plugin type with ID '%s'. Skipping registration.", p.ID)
|
||||
}
|
||||
m.plugins[p.ID] = pluginInfo
|
||||
log.Infof("Loaded plugin metadata: %s (v%s)", p.Name, p.Version)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RegisterAll 遍历所有已加载的插件,并使用对应的处理器进行注册。
|
||||
func (m *Manager) RegisterAll(ctx context.Context) {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
log.Infof("Registering all loaded plugins...")
|
||||
for id, pluginInfo := range m.plugins {
|
||||
if pluginInfo.handler != nil {
|
||||
if err := pluginInfo.handler.Register(ctx, pluginInfo); err != nil {
|
||||
// 注册失败,更新数据库状态
|
||||
log.Errorf("Failed to register plugin '%s': %v", id, err)
|
||||
pluginInfo.Status = model.StatusError
|
||||
pluginInfo.Message = err.Error()
|
||||
// 更新数据库
|
||||
if err := db.UpdatePluginStatus(ctx, id, model.StatusError, err.Error()); err != nil {
|
||||
log.Errorf("Failed to update status for plugin '%s' in database: %v", id, err)
|
||||
}
|
||||
} else {
|
||||
// 注册成功,更新状态
|
||||
pluginInfo.Status = model.StatusActive
|
||||
pluginInfo.Message = ""
|
||||
if err := db.UpdatePluginStatus(ctx, id, model.StatusActive, ""); err != nil {
|
||||
log.Errorf("Failed to update status for plugin '%s' in database: %v", id, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Install 根据源字符串的格式自动选择安装方式。
|
||||
func (m *Manager) Install(ctx context.Context, source string) (*PluginInfo, error) {
|
||||
if strings.HasSuffix(source, ".zip") {
|
||||
log.Infof("Installing plugin from archive URL: %s", source)
|
||||
return m.InstallFromArchiveURL(ctx, source)
|
||||
}
|
||||
if strings.HasPrefix(source, "https://github.com/") {
|
||||
log.Infof("Installing plugin from GitHub repository: %s", source)
|
||||
return m.InstallFromGitHub(ctx, source)
|
||||
}
|
||||
// 默认认为是本地文件系统路径
|
||||
log.Infof("Installing plugin from local path: %s", source)
|
||||
return m.InstallFromLocal(ctx, source, "")
|
||||
}
|
||||
|
||||
// InstallFromLocal 从本地清单和 Wasm 文件安装插件。
|
||||
// manifestPath 是必需的,wasmPath 是可选的(如果为空,则在 manifestPath 相同目录下查找 .wasm 文件)。
|
||||
func (m *Manager) InstallFromLocal(ctx context.Context, manifestPath string, wasmPath string) (*PluginInfo, error) {
|
||||
manifestBytes, err := os.ReadFile(manifestPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read manifest file '%s': %w", manifestPath, err)
|
||||
}
|
||||
|
||||
if wasmPath == "" {
|
||||
wasmPath = strings.TrimSuffix(manifestPath, filepath.Ext(manifestPath)) + ".wasm"
|
||||
}
|
||||
|
||||
wasmBytes, err := os.ReadFile(wasmPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read wasm file at '%s': %w", wasmPath, err)
|
||||
}
|
||||
|
||||
return m.install(ctx, manifestBytes, wasmBytes, "local:"+manifestPath)
|
||||
}
|
||||
|
||||
// InstallFromUpload 从一个上传的文件流 (io.Reader) 安装插件。
|
||||
func (m *Manager) InstallFromUpload(ctx context.Context, fileReader io.Reader, originalFileName string) (*PluginInfo, error) {
|
||||
// 1. 将上传的文件内容保存到一个临时文件中
|
||||
tmpFile, err := os.CreateTemp("", "plugin-upload-*.zip")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create temporary file for upload: %w", err)
|
||||
}
|
||||
defer os.Remove(tmpFile.Name())
|
||||
|
||||
_, err = io.Copy(tmpFile, fileReader)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to save uploaded file to temporary location: %w", err)
|
||||
}
|
||||
// 必须关闭文件,以便 zip.OpenReader 能够读取它
|
||||
tmpFile.Close()
|
||||
|
||||
// 2. 从这个临时的 zip 文件中提取 manifest 和 wasm
|
||||
manifestBytes, wasmBytes, err := extractPluginFromZip(tmpFile.Name())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to extract plugin from uploaded archive: %w", err)
|
||||
}
|
||||
|
||||
// 3. 调用核心安装逻辑,使用 "upload:[filename]" 作为来源标识
|
||||
return m.install(ctx, manifestBytes, wasmBytes, "upload:"+originalFileName)
|
||||
}
|
||||
|
||||
// InstallFromArchiveURL 从一个 zip 压缩包的 URL 安装插件。
|
||||
func (m *Manager) InstallFromArchiveURL(ctx context.Context, url string) (*PluginInfo, error) {
|
||||
tmpFile, err := downloadTempFile(m.httpClient, url)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to download archive from %s: %w", url, err)
|
||||
}
|
||||
defer os.Remove(tmpFile.Name())
|
||||
|
||||
manifestBytes, wasmBytes, err := extractPluginFromZip(tmpFile.Name())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to extract plugin from archive '%s': %w", url, err)
|
||||
}
|
||||
|
||||
return m.install(ctx, manifestBytes, wasmBytes, url)
|
||||
}
|
||||
|
||||
// InstallFromGitHub 从 GitHub 仓库的最新 release 安装插件。
|
||||
func (m *Manager) InstallFromGitHub(ctx context.Context, repoURL string) (*PluginInfo, error) {
|
||||
repoURL = strings.TrimSuffix(repoURL, ".git")
|
||||
parts := strings.Split(strings.TrimPrefix(repoURL, "https://github.com/"), "/")
|
||||
if len(parts) < 2 {
|
||||
return nil, fmt.Errorf("invalid github repo URL format: %s", repoURL)
|
||||
}
|
||||
owner, repo := parts[0], parts[1]
|
||||
|
||||
// 1. 获取最新 release 信息
|
||||
apiURL := fmt.Sprintf("https://api.github.com/repos/%s/%s/releases/latest", owner, repo)
|
||||
log.Infof("Fetching latest release from GitHub API: %s", apiURL)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", apiURL, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Accept", "application/vnd.github.v3+json")
|
||||
|
||||
resp, err := m.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to call GitHub API: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("GitHub API returned non-200 status: %s", resp.Status)
|
||||
}
|
||||
|
||||
var release struct {
|
||||
Assets []struct {
|
||||
Name string `json:"name"`
|
||||
DownloadURL string `json:"browser_download_url"`
|
||||
} `json:"assets"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(resp.Body).Decode(&release); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse GitHub API response: %w", err)
|
||||
}
|
||||
|
||||
// 2. 查找包含插件的 zip 资产
|
||||
var assetURL string
|
||||
for _, asset := range release.Assets {
|
||||
// 寻找第一个 .zip 文件作为目标
|
||||
if strings.HasSuffix(asset.Name, ".zip") {
|
||||
assetURL = asset.DownloadURL
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if assetURL == "" {
|
||||
return nil, fmt.Errorf("no .zip asset found in the latest release of %s/%s", owner, repo)
|
||||
}
|
||||
|
||||
log.Infof("Found release asset to download: %s", assetURL)
|
||||
return m.InstallFromArchiveURL(ctx, assetURL)
|
||||
}
|
||||
|
||||
// install 是安装插件的核心逻辑
|
||||
func (m *Manager) install(ctx context.Context, manifestBytes []byte, wasmBytes []byte, sourceURL string) (*PluginInfo, error) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
var meta model.Plugin
|
||||
if err := json.Unmarshal(manifestBytes, &meta); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse plugin manifest: %w", err)
|
||||
}
|
||||
if meta.ID == "" || meta.Name == "" || meta.Version == "" {
|
||||
return nil, fmt.Errorf("plugin manifest is missing required fields (id, name, version)")
|
||||
}
|
||||
|
||||
// 1. 查找匹配的处理器并检查插件类型
|
||||
var handler PluginHandler
|
||||
for _, h := range m.handlers {
|
||||
if strings.HasPrefix(meta.ID, h.Prefix()) {
|
||||
handler = h
|
||||
break
|
||||
}
|
||||
}
|
||||
if handler == nil {
|
||||
return nil, fmt.Errorf("unsupported plugin type for ID '%s'", meta.ID)
|
||||
}
|
||||
|
||||
if _, exists := m.plugins[meta.ID]; exists {
|
||||
return nil, fmt.Errorf("plugin with id '%s' already exists", meta.ID)
|
||||
}
|
||||
|
||||
fileName := formatPluginFileName(meta.Author, meta.ID)
|
||||
wasmPath := filepath.Join(m.pluginDir, fileName)
|
||||
if err := os.WriteFile(wasmPath, wasmBytes, 0644); err != nil {
|
||||
return nil, fmt.Errorf("failed to save wasm file: %w", err)
|
||||
}
|
||||
|
||||
pluginModel := &model.Plugin{
|
||||
ID: meta.ID,
|
||||
Name: meta.Name,
|
||||
Version: meta.Version,
|
||||
Author: meta.Author,
|
||||
Description: meta.Description,
|
||||
IconURL: meta.IconURL,
|
||||
SourceURL: sourceURL,
|
||||
WasmPath: wasmPath,
|
||||
}
|
||||
|
||||
// 先存入数据库,初始状态为 'inactive'
|
||||
if err := db.CreatePlugin(ctx, pluginModel); err != nil {
|
||||
os.Remove(wasmPath)
|
||||
return nil, fmt.Errorf("failed to save plugin metadata to database: %w", err)
|
||||
}
|
||||
log.Infof("Plugin '%s' metadata saved to database with status: inactive.", pluginModel.ID)
|
||||
|
||||
pluginInfo := &PluginInfo{Plugin: pluginModel, handler: handler}
|
||||
m.plugins[pluginInfo.ID] = pluginInfo
|
||||
|
||||
// 使用找到的处理器进行注册
|
||||
if err := handler.Register(ctx, pluginInfo); err != nil {
|
||||
// 注册失败,更新数据库状态
|
||||
log.Errorf("Failed to register newly installed plugin '%s': %v", pluginInfo.ID, err)
|
||||
pluginInfo.Status = model.StatusError
|
||||
pluginInfo.Message = err.Error()
|
||||
if dbErr := db.UpdatePluginStatus(ctx, pluginInfo.ID, model.StatusError, err.Error()); dbErr != nil {
|
||||
log.Errorf("Failed to update error status for plugin '%s' in database: %v", pluginInfo.ID, dbErr)
|
||||
}
|
||||
} else {
|
||||
// 注册成功,更新状态
|
||||
pluginInfo.Status = model.StatusActive
|
||||
pluginInfo.Message = ""
|
||||
if dbErr := db.UpdatePluginStatus(ctx, pluginInfo.ID, model.StatusActive, ""); dbErr != nil {
|
||||
log.Errorf("Failed to update active status for plugin '%s' in database: %v", pluginInfo.ID, dbErr)
|
||||
}
|
||||
}
|
||||
|
||||
return pluginInfo, nil
|
||||
}
|
||||
|
||||
// Uninstall 卸载一个插件
|
||||
func (m *Manager) Uninstall(ctx context.Context, pluginID string) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
plugin, ok := m.plugins[pluginID]
|
||||
if !ok {
|
||||
return fmt.Errorf("plugin with ID '%s' not found", pluginID)
|
||||
}
|
||||
|
||||
// 1. 使用对应的处理器进行注销
|
||||
if plugin.handler != nil {
|
||||
if err := plugin.handler.Unregister(ctx, plugin); err != nil {
|
||||
// 即便注销失败,也要继续删除流程
|
||||
log.Warnf("Failed to unregister plugin '%s', but continuing with uninstallation: %v", pluginID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// 2. 关闭插件内部资源 (如果 driver 实例存在)
|
||||
if plugin.driver != nil {
|
||||
if err := plugin.driver.Close(ctx); err != nil {
|
||||
log.Warnf("Error closing driver resources for plugin %s: %v", pluginID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// 3. 从数据库删除
|
||||
if err := db.DeletePluginByID(ctx, pluginID); err != nil {
|
||||
return fmt.Errorf("failed to delete plugin '%s' from database: %w", pluginID, err)
|
||||
}
|
||||
|
||||
// 4. 删除文件
|
||||
if err := os.Remove(plugin.WasmPath); err != nil && !os.IsNotExist(err) {
|
||||
log.Warnf("Failed to remove wasm file %s, but database entry was removed: %v", plugin.WasmPath, err)
|
||||
}
|
||||
|
||||
// 5. 从内存中删除
|
||||
delete(m.plugins, pluginID)
|
||||
log.Infof("Plugin '%s' has been successfully uninstalled.", pluginID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// CheckForUpdate 检查单个指定插件的更新。
|
||||
// 如果有可用更新,则返回新版本号;否则返回空字符串。
|
||||
func (m *Manager) CheckForUpdate(ctx context.Context, pluginID string) (string, error) {
|
||||
m.RLock()
|
||||
plugin, ok := m.plugins[pluginID]
|
||||
m.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return "", fmt.Errorf("plugin with ID '%s' not found", pluginID)
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(plugin.SourceURL, "https://github.com/") {
|
||||
return "", fmt.Errorf("only plugins installed from GitHub can be checked for updates")
|
||||
}
|
||||
|
||||
latestVersionStr, err := m.getLatestGitHubVersionTag(ctx, plugin.SourceURL)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to check for updates for plugin '%s': %w", pluginID, err)
|
||||
}
|
||||
|
||||
latestVersion, err := semver.NewVersion(latestVersionStr)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("invalid latest version format '%s' for plugin '%s': %w", latestVersionStr, pluginID, err)
|
||||
}
|
||||
|
||||
currentVersion, err := semver.NewVersion(plugin.Version)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("invalid current version format '%s' for plugin '%s': %w", plugin.Version, pluginID, err)
|
||||
}
|
||||
|
||||
if latestVersion.Compare(*currentVersion) > 0 {
|
||||
return latestVersion.String(), nil
|
||||
}
|
||||
|
||||
// 没有可用更新
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// CheckForUpdates 检查所有已安装插件的更新。
|
||||
func (m *Manager) CheckForUpdates(ctx context.Context) (map[string]string, error) {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
updatesAvailable := make(map[string]string)
|
||||
|
||||
for id, plugin := range m.plugins {
|
||||
if !strings.HasPrefix(plugin.SourceURL, "https://github.com/") {
|
||||
continue // 只支持检查来自 GitHub 的插件
|
||||
}
|
||||
|
||||
latestVersionStr, err := m.getLatestGitHubVersionTag(ctx, plugin.SourceURL)
|
||||
if err != nil {
|
||||
log.Warnf("Failed to check for updates for plugin '%s': %v", id, err)
|
||||
continue
|
||||
}
|
||||
|
||||
latestVersion, err := semver.NewVersion(latestVersionStr)
|
||||
if err != nil {
|
||||
log.Warnf("Invalid latest version format '%s' for plugin '%s': %v", latestVersionStr, id, err)
|
||||
continue
|
||||
}
|
||||
|
||||
currentVersion, err := semver.NewVersion(plugin.Version)
|
||||
if err != nil {
|
||||
log.Warnf("Invalid current version format '%s' for plugin '%s': %v", plugin.Version, id, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// 使用 Compare 方法进行比较
|
||||
if latestVersion.Compare(*currentVersion) > 0 {
|
||||
updatesAvailable[id] = latestVersion.String()
|
||||
log.Infof("Update available for plugin '%s': %s -> %s", id, currentVersion.String(), latestVersion.String())
|
||||
}
|
||||
}
|
||||
|
||||
return updatesAvailable, nil
|
||||
}
|
||||
|
||||
// Update 更新指定的插件到最新版本。
|
||||
func (m *Manager) Update(ctx context.Context, pluginID string) (*PluginInfo, error) {
|
||||
m.Lock()
|
||||
plugin, ok := m.plugins[pluginID]
|
||||
m.Unlock() // 提前解锁
|
||||
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("plugin with ID '%s' not found", pluginID)
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(plugin.SourceURL, "https://github.com/") {
|
||||
return nil, fmt.Errorf("only plugins installed from GitHub can be updated automatically")
|
||||
}
|
||||
|
||||
log.Infof("Updating plugin '%s' from %s", pluginID, plugin.SourceURL)
|
||||
|
||||
// 先卸载旧版本
|
||||
if err := m.Uninstall(ctx, pluginID); err != nil {
|
||||
return nil, fmt.Errorf("failed to uninstall old version of plugin '%s' during update: %w", pluginID, err)
|
||||
}
|
||||
|
||||
// 重新从 GitHub 安装
|
||||
return m.Install(ctx, plugin.SourceURL)
|
||||
}
|
||||
|
||||
// getLatestGitHubVersionTag 从 GitHub API 获取最新的 release tag 字符串。
|
||||
func (m *Manager) getLatestGitHubVersionTag(ctx context.Context, repoURL string) (string, error) {
|
||||
// 规范化 URL 并解析 owner/repo
|
||||
repoURL = strings.TrimSuffix(repoURL, ".git")
|
||||
parts := strings.Split(strings.TrimPrefix(repoURL, "https://github.com/"), "/")
|
||||
if len(parts) < 2 {
|
||||
return "", fmt.Errorf("invalid github repo URL format: %s", repoURL)
|
||||
}
|
||||
owner, repo := parts[0], parts[1]
|
||||
|
||||
// 构建 API URL
|
||||
apiURL := fmt.Sprintf("https://api.github.com/repos/%s/%s/releases/latest", owner, repo)
|
||||
|
||||
// 创建带上下文的 HTTP 请求
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", apiURL, nil)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create request for GitHub API: %w", err)
|
||||
}
|
||||
// 根据 GitHub API v3 的要求设置 Accept header
|
||||
req.Header.Set("Accept", "application/vnd.github.v3+json")
|
||||
|
||||
// 执行请求
|
||||
resp, err := m.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to call GitHub API at %s: %w", apiURL, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// 检查响应状态码
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
// 读取响应体以获取更详细的错误信息
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return "", fmt.Errorf("GitHub API returned non-200 status: %s, body: %s", resp.Status, string(body))
|
||||
}
|
||||
|
||||
// 定义一个结构体来仅解析我们需要的字段 (tag_name)
|
||||
var release struct {
|
||||
TagName string `json:"tag_name"`
|
||||
}
|
||||
|
||||
// 解析 JSON 响应
|
||||
if err := json.NewDecoder(resp.Body).Decode(&release); err != nil {
|
||||
return "", fmt.Errorf("failed to parse GitHub API response: %w", err)
|
||||
}
|
||||
|
||||
if release.TagName == "" {
|
||||
return "", errors.New("no tag_name found in the latest release")
|
||||
}
|
||||
|
||||
return release.TagName, nil
|
||||
}
|
||||
|
||||
// --- 辅助函数 ---
|
||||
|
||||
// extractPluginFromZip 从 zip 文件中提取 plugin.json 和 .wasm 文件
|
||||
func extractPluginFromZip(zipPath string) ([]byte, []byte, error) {
|
||||
r, err := zip.OpenReader(zipPath)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
var manifestBytes, wasmBytes []byte
|
||||
|
||||
for _, f := range r.File {
|
||||
// 忽略目录和非插件文件
|
||||
if f.FileInfo().IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
baseName := filepath.Base(f.Name)
|
||||
if baseName == "plugin.json" {
|
||||
rc, err := f.Open()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
manifestBytes, err = io.ReadAll(rc)
|
||||
rc.Close()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
} else if strings.HasSuffix(baseName, ".wasm") {
|
||||
rc, err := f.Open()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
wasmBytes, err = io.ReadAll(rc)
|
||||
rc.Close()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if manifestBytes == nil {
|
||||
return nil, nil, errors.New("manifest 'plugin.json' not found in archive")
|
||||
}
|
||||
if wasmBytes == nil {
|
||||
return nil, nil, errors.New("no .wasm file found in archive")
|
||||
}
|
||||
|
||||
return manifestBytes, wasmBytes, nil
|
||||
}
|
||||
|
||||
// downloadTempFile 将文件从 URL 下载到临时目录
|
||||
func downloadTempFile(client *http.Client, url string) (*os.File, error) {
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("server returned status: %s", resp.Status)
|
||||
}
|
||||
|
||||
tmpFile, err := os.CreateTemp("", "plugin-download-*.zip")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = io.Copy(tmpFile, resp.Body)
|
||||
if err != nil {
|
||||
tmpFile.Close()
|
||||
os.Remove(tmpFile.Name())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 确保内容写入磁盘
|
||||
if err := tmpFile.Sync(); err != nil {
|
||||
tmpFile.Close()
|
||||
os.Remove(tmpFile.Name())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tmpFile.Close()
|
||||
return tmpFile, nil
|
||||
}
|
||||
|
||||
var nonAlphanumericRegex = regexp.MustCompile(`[^a-zA-Z0-9_.-]+`)
|
||||
|
||||
func sanitize(s string) string {
|
||||
if s == "" {
|
||||
return "unknown"
|
||||
}
|
||||
return nonAlphanumericRegex.ReplaceAllString(s, "_")
|
||||
}
|
||||
|
||||
func formatPluginFileName(author, id string) string {
|
||||
return fmt.Sprintf("%s-%s.wasm", sanitize(author), sanitize(id))
|
||||
}
|
||||
70
internal/plugin/manager_driver.go
Normal file
70
internal/plugin/manager_driver.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// --- 驱动插件处理器 ---
|
||||
|
||||
// DriverPluginHandler 实现了 PluginHandler 接口,专门处理驱动插件
|
||||
type DriverPluginHandler struct{}
|
||||
|
||||
func (h *DriverPluginHandler) Prefix() string {
|
||||
return "openlist.driver."
|
||||
}
|
||||
|
||||
func (h *DriverPluginHandler) Register(ctx context.Context, plugin *PluginInfo) error {
|
||||
if plugin.driver != nil {
|
||||
return nil // 已经注册过了
|
||||
}
|
||||
|
||||
var err error
|
||||
plugin.driver, err = NewDriverPlugin(ctx, plugin)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load driver plugin err: %w", err)
|
||||
}
|
||||
|
||||
err = op.RegisterDriver(func() driver.Driver {
|
||||
tempDriver, err := plugin.driver.NewWasmDriver()
|
||||
if err != nil {
|
||||
log.Errorf("deferred load driver plugin err: %v", err)
|
||||
return nil
|
||||
}
|
||||
return tempDriver
|
||||
})
|
||||
if err != nil {
|
||||
// 如果注册失败,关闭运行时
|
||||
plugin.driver.Close(ctx)
|
||||
return fmt.Errorf("failed to register driver in op: %w", err)
|
||||
}
|
||||
|
||||
log.Infof("Successfully registered driver for plugin: %s", plugin.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *DriverPluginHandler) Unregister(ctx context.Context, plugin *PluginInfo) error {
|
||||
if plugin.driver == nil {
|
||||
log.Errorf("plugin.driver is nil during unregister for plugin '%s', cannot get config", plugin.ID)
|
||||
return fmt.Errorf("plugin.driver instance not found, cannot properly unregister from op")
|
||||
}
|
||||
|
||||
op.UnRegisterDriver(func() driver.Driver {
|
||||
tempDriver, err := plugin.driver.NewWasmDriver()
|
||||
if err != nil {
|
||||
log.Warnf("Failed to create temp driver for unregister: %v", err)
|
||||
return nil
|
||||
}
|
||||
return tempDriver
|
||||
})
|
||||
|
||||
if err := plugin.driver.Close(ctx); err != nil {
|
||||
log.Warnf("Error closing driver plugin runtime for %s: %v", plugin.ID, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
44
internal/plugin/warp/context.go
Normal file
44
internal/plugin/warp/context.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package plugin_warp
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
witgo "github.com/OpenListTeam/wazero-wasip2/wit-go"
|
||||
)
|
||||
|
||||
type ContextManaget = witgo.ResourceManager[context.Context]
|
||||
type Context = uint32
|
||||
|
||||
func NewContextManager() *ContextManaget {
|
||||
return witgo.NewResourceManager[context.Context](nil)
|
||||
}
|
||||
|
||||
type ContextPollable struct {
|
||||
context.Context
|
||||
}
|
||||
|
||||
func (c *ContextPollable) IsReady() bool {
|
||||
select {
|
||||
case <-c.Done():
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Block 阻塞直到 Pollable 就绪。
|
||||
func (c *ContextPollable) Block() {
|
||||
<-c.Done()
|
||||
}
|
||||
|
||||
func (*ContextPollable) SetReady() {
|
||||
|
||||
}
|
||||
|
||||
func (ContextPollable) Close() {
|
||||
|
||||
}
|
||||
|
||||
func (c *ContextPollable) Channel() <-chan struct{} {
|
||||
return c.Done()
|
||||
}
|
||||
45
internal/plugin/warp/errors.go
Normal file
45
internal/plugin/warp/errors.go
Normal file
@@ -0,0 +1,45 @@
|
||||
package plugin_warp
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
||||
)
|
||||
|
||||
type ErrCode struct {
|
||||
InvalidHandle *struct{} `wit:"case(0)"`
|
||||
// 表示功能未实现。
|
||||
NotImplemented *struct{} `wit:"case(1)"`
|
||||
// 表示功能不支持。
|
||||
NotSupport *struct{} `wit:"case(2)"`
|
||||
// 表示资源未找到。
|
||||
NotFound *struct{} `wit:"case(3)"`
|
||||
// 表示路径是文件而非目录。
|
||||
NotFolder *struct{} `wit:"case(4)"`
|
||||
// 表示路径是目录而非文件。
|
||||
NotFile *struct{} `wit:"case(5)"`
|
||||
// 包含描述信息的通用错误。
|
||||
Generic *string `wit:"case(6)"`
|
||||
// 授权失效,此时驱动处于无法自动恢复的状态
|
||||
Unauthorized *string `wit:"case(7)"`
|
||||
}
|
||||
|
||||
func (e ErrCode) ToError() error {
|
||||
if e.InvalidHandle != nil {
|
||||
return errs.StorageNotFound
|
||||
} else if e.NotImplemented != nil {
|
||||
return errs.NotImplement
|
||||
} else if e.NotSupport != nil {
|
||||
return errs.NotSupport
|
||||
} else if e.NotFound != nil {
|
||||
return errs.ObjectNotFound
|
||||
} else if e.NotFile != nil {
|
||||
return errs.NotFile
|
||||
} else if e.NotFolder != nil {
|
||||
return errs.NotFolder
|
||||
} else if e.Unauthorized != nil {
|
||||
return errors.New(*e.Unauthorized)
|
||||
}
|
||||
|
||||
return errors.New(*e.Generic)
|
||||
}
|
||||
98
internal/plugin/warp/object.go
Normal file
98
internal/plugin/warp/object.go
Normal file
@@ -0,0 +1,98 @@
|
||||
package plugin_warp
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
clocks "github.com/OpenListTeam/wazero-wasip2/wasip2/clocks/v0_2"
|
||||
witgo "github.com/OpenListTeam/wazero-wasip2/wit-go"
|
||||
)
|
||||
|
||||
type Object struct {
|
||||
// 对象的绝对路径。
|
||||
Path string
|
||||
// 对象的id信息
|
||||
ID string
|
||||
// 对象的名称。
|
||||
Name string
|
||||
// 对象的大小(字节)。
|
||||
Size int64
|
||||
// 是否为目录。
|
||||
IsFolder bool
|
||||
// 创建时间戳
|
||||
Created clocks.Duration
|
||||
// 修改时间戳
|
||||
Modified clocks.Duration
|
||||
// 缩略图链接。
|
||||
Thumbnail witgo.Option[string]
|
||||
// 文件的哈希信息列表。
|
||||
Hashes []HashInfo
|
||||
// 用于存储驱动特定的、非标准的元数据。
|
||||
Extra [][2]string
|
||||
}
|
||||
|
||||
func (o *Object) GetName() string {
|
||||
return o.Name
|
||||
}
|
||||
|
||||
func (o *Object) GetSize() int64 {
|
||||
return o.Size
|
||||
}
|
||||
|
||||
func (o *Object) ModTime() time.Time {
|
||||
return o.Modified.ToTime()
|
||||
}
|
||||
func (o *Object) CreateTime() time.Time {
|
||||
if o.Created == 0 {
|
||||
return o.ModTime()
|
||||
}
|
||||
return o.Created.ToTime()
|
||||
}
|
||||
|
||||
func (o *Object) IsDir() bool {
|
||||
return o.IsFolder
|
||||
}
|
||||
|
||||
func (o *Object) GetID() string {
|
||||
return o.ID
|
||||
}
|
||||
|
||||
func (o *Object) GetPath() string {
|
||||
return o.Path
|
||||
}
|
||||
|
||||
func (o *Object) SetPath(path string) {
|
||||
o.Path = path
|
||||
}
|
||||
|
||||
func (o *Object) GetHash() utils.HashInfo {
|
||||
return HashInfoConvert3(o.Hashes)
|
||||
}
|
||||
|
||||
func (o *Object) Thumb() string {
|
||||
return o.Thumbnail.UnwrapOr("")
|
||||
}
|
||||
|
||||
var _ model.Obj = (*Object)(nil)
|
||||
var _ model.Thumb = (*Object)(nil)
|
||||
var _ model.SetPath = (*Object)(nil)
|
||||
|
||||
func ConvertObjToObject(obj model.Obj) Object {
|
||||
|
||||
thumbnail := witgo.None[string]()
|
||||
if t, ok := obj.(model.Thumb); ok {
|
||||
thumbnail = witgo.Some(t.Thumb())
|
||||
}
|
||||
return Object{
|
||||
Path: obj.GetPath(),
|
||||
ID: obj.GetID(),
|
||||
Name: obj.GetName(),
|
||||
Size: obj.GetSize(),
|
||||
IsFolder: obj.IsDir(),
|
||||
Created: clocks.Duration(obj.CreateTime().UnixNano()),
|
||||
Modified: clocks.Duration(obj.ModTime().UnixNano()),
|
||||
Thumbnail: thumbnail,
|
||||
Hashes: HashInfoConvert(obj.GetHash()),
|
||||
}
|
||||
}
|
||||
198
internal/plugin/warp/types.go
Normal file
198
internal/plugin/warp/types.go
Normal file
@@ -0,0 +1,198 @@
|
||||
package plugin_warp
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/conf"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
clocks "github.com/OpenListTeam/wazero-wasip2/wasip2/clocks/v0_2"
|
||||
wasi_http "github.com/OpenListTeam/wazero-wasip2/wasip2/http/v0_2"
|
||||
witgo "github.com/OpenListTeam/wazero-wasip2/wit-go"
|
||||
)
|
||||
|
||||
type LogLevel struct {
|
||||
Debug *struct{} `wit:"case(0)"`
|
||||
Info *struct{} `wit:"case(1)"`
|
||||
Warn *struct{} `wit:"case(2)"`
|
||||
Error *struct{} `wit:"case(3)"`
|
||||
}
|
||||
|
||||
type HashAlg struct {
|
||||
MD5 *struct{} `wit:"case(0)"`
|
||||
SHA1 *struct{} `wit:"case(1)"`
|
||||
SHA256 *struct{} `wit:"case(2)"`
|
||||
GCID *struct{} `wit:"case(3)"`
|
||||
}
|
||||
|
||||
type HashInfo struct {
|
||||
Alg HashAlg
|
||||
Val string
|
||||
}
|
||||
|
||||
type LinkResult struct {
|
||||
File witgo.Option[Object]
|
||||
Resource LinkResource
|
||||
}
|
||||
type LinkResource struct {
|
||||
Direct *struct {
|
||||
Url string
|
||||
Header wasi_http.Headers
|
||||
Expiratcion witgo.Option[clocks.Duration]
|
||||
} `wit:"case(0)"`
|
||||
RangeStream *struct{} `wit:"case(1)"`
|
||||
}
|
||||
|
||||
type Capability struct {
|
||||
GetFile bool
|
||||
ListFile bool
|
||||
LinkFile bool
|
||||
MkdirFile bool
|
||||
RenameFile bool
|
||||
MoveFile bool
|
||||
RemoveFile bool
|
||||
CopyFile bool
|
||||
UploadFile bool
|
||||
}
|
||||
|
||||
func (Capability) IsFlags() {}
|
||||
|
||||
type DriverProps struct {
|
||||
Name string
|
||||
|
||||
OnlyProxy bool
|
||||
NoCache bool
|
||||
|
||||
Alert string
|
||||
|
||||
NoOverwriteUpload bool
|
||||
ProxyRange bool
|
||||
|
||||
// 网盘能力标记
|
||||
Capabilitys Capability
|
||||
}
|
||||
|
||||
func (c DriverProps) ToConfig() driver.Config {
|
||||
return driver.Config{
|
||||
Name: c.Name,
|
||||
LocalSort: true,
|
||||
OnlyProxy: c.OnlyProxy,
|
||||
NoCache: c.NoCache,
|
||||
NoUpload: !c.Capabilitys.UploadFile,
|
||||
|
||||
CheckStatus: true,
|
||||
Alert: c.Alert,
|
||||
|
||||
NoOverwriteUpload: c.NoOverwriteUpload,
|
||||
ProxyRangeOption: c.ProxyRange,
|
||||
}
|
||||
}
|
||||
|
||||
type FormField struct {
|
||||
// 字段的唯一标识符(键)。
|
||||
Name string
|
||||
// 显示给用户的标签。
|
||||
Label string
|
||||
// 字段的输入类型,用于 UI 渲染。
|
||||
Kind FieldKind
|
||||
// 是否必填
|
||||
Required bool
|
||||
// 字段的帮助或提示信息。
|
||||
Help string
|
||||
}
|
||||
|
||||
type FieldKind struct {
|
||||
String *string `wit:"case(0)"`
|
||||
Password *string `wit:"case(1)"`
|
||||
Number *float64 `wit:"case(2)"`
|
||||
Boolean *bool `wit:"case(3)"`
|
||||
Text *string `wit:"case(4)"`
|
||||
Select *[]string `wit:"case(5)"`
|
||||
}
|
||||
|
||||
type Additional struct {
|
||||
Json []byte
|
||||
Forms []FormField
|
||||
}
|
||||
|
||||
func NewAdditional(forms []FormField) Additional {
|
||||
return Additional{
|
||||
Forms: forms,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Additional) String() string {
|
||||
return string(m.Json)
|
||||
}
|
||||
func (m *Additional) SetString(config string) {
|
||||
m.Json = []byte(config)
|
||||
}
|
||||
|
||||
func (m *Additional) Bytes() []byte {
|
||||
return m.Json
|
||||
}
|
||||
|
||||
func (m *Additional) SetBytes(config []byte) {
|
||||
m.Json = config
|
||||
}
|
||||
|
||||
// MarshalJSON returns m as the JSON encoding of m.
|
||||
func (m Additional) MarshalJSON() ([]byte, error) {
|
||||
return m.Json, nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON sets *m to a copy of data.
|
||||
func (m *Additional) UnmarshalJSON(data []byte) error {
|
||||
if m == nil {
|
||||
return errors.New("json.RawMessage: UnmarshalJSON on nil pointer")
|
||||
}
|
||||
m.Json = slices.Clone(data)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (addit *Additional) GetItems() []driver.Item {
|
||||
return utils.MustSliceConvert(addit.Forms, func(item FormField) driver.Item {
|
||||
var typ string
|
||||
var def string
|
||||
var opts string
|
||||
if item.Kind.Boolean != nil {
|
||||
typ = conf.TypeBool
|
||||
def = fmt.Sprintf("%t", *item.Kind.Boolean)
|
||||
} else if item.Kind.Password != nil {
|
||||
typ = conf.TypeString
|
||||
def = *item.Kind.Password
|
||||
} else if item.Kind.Number != nil {
|
||||
typ = conf.TypeNumber
|
||||
def = fmt.Sprintf("%f", *item.Kind.Number)
|
||||
} else if item.Kind.Select != nil {
|
||||
typ = conf.TypeSelect
|
||||
if len(*item.Kind.Select) > 0 {
|
||||
def = (*item.Kind.Select)[0]
|
||||
opts = strings.Join((*item.Kind.Select), ",")
|
||||
}
|
||||
} else if item.Kind.String != nil {
|
||||
typ = conf.TypeString
|
||||
def = *item.Kind.String
|
||||
} else if item.Kind.Text != nil {
|
||||
typ = conf.TypeText
|
||||
def = *item.Kind.Text
|
||||
}
|
||||
|
||||
return driver.Item{
|
||||
Name: item.Name,
|
||||
Type: typ,
|
||||
Default: def,
|
||||
Options: opts,
|
||||
Required: item.Required,
|
||||
Help: item.Help,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
type LinkArgs struct {
|
||||
IP string
|
||||
Header wasi_http.Headers
|
||||
}
|
||||
105
internal/plugin/warp/upload.go
Normal file
105
internal/plugin/warp/upload.go
Normal file
@@ -0,0 +1,105 @@
|
||||
package plugin_warp
|
||||
|
||||
import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/driver"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/stream"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
hash_extend "github.com/OpenListTeam/OpenList/v4/pkg/utils/hash"
|
||||
witgo "github.com/OpenListTeam/wazero-wasip2/wit-go"
|
||||
)
|
||||
|
||||
type UploadRequest struct {
|
||||
Target Object
|
||||
// 指向宿主端文件内容的句柄
|
||||
// 由host控制释放
|
||||
Content UploadReadable
|
||||
// 如果是覆盖上传,宿主会提供被覆盖文件的原始对象数据
|
||||
Exist witgo.Option[Object]
|
||||
}
|
||||
|
||||
type UploadReadableType struct {
|
||||
model.FileStreamer
|
||||
StreamConsume bool
|
||||
UpdateProgress driver.UpdateProgress
|
||||
SectionReader *StreamSectionReader
|
||||
}
|
||||
|
||||
type StreamSectionReader struct {
|
||||
stream.StreamSectionReaderIF
|
||||
Offset int64
|
||||
CunketSize int64
|
||||
}
|
||||
|
||||
type UploadReadableManager = witgo.ResourceManager[*UploadReadableType]
|
||||
type UploadReadable = uint32
|
||||
|
||||
func NewUploadManager() *UploadReadableManager {
|
||||
return witgo.NewResourceManager[*UploadReadableType](nil)
|
||||
}
|
||||
|
||||
func HashTypeConvert(typ *utils.HashType) HashAlg {
|
||||
switch typ {
|
||||
case utils.MD5:
|
||||
return HashAlg{MD5: &struct{}{}}
|
||||
case utils.SHA1:
|
||||
return HashAlg{SHA1: &struct{}{}}
|
||||
case utils.SHA256:
|
||||
return HashAlg{SHA256: &struct{}{}}
|
||||
case hash_extend.GCID:
|
||||
return HashAlg{GCID: &struct{}{}}
|
||||
}
|
||||
panic("plase add hash convert")
|
||||
}
|
||||
func HashAlgConvert(hash HashAlg) *utils.HashType {
|
||||
if hash.MD5 != nil {
|
||||
return utils.MD5
|
||||
} else if hash.SHA1 != nil {
|
||||
return utils.SHA1
|
||||
} else if hash.SHA256 != nil {
|
||||
return utils.SHA256
|
||||
} else if hash.GCID != nil {
|
||||
return hash_extend.GCID
|
||||
}
|
||||
panic("plase add hash convert")
|
||||
}
|
||||
func HashAlgConverts(HashAlgs []HashAlg) []*utils.HashType {
|
||||
hashTypes := make([]*utils.HashType, 0, len(HashAlgs))
|
||||
for _, needHash := range HashAlgs {
|
||||
hashTypes = append(hashTypes, HashAlgConvert(needHash))
|
||||
}
|
||||
return hashTypes
|
||||
}
|
||||
|
||||
func HashInfoConvert(hashInfo utils.HashInfo) []HashInfo {
|
||||
result := make([]HashInfo, 0, 4)
|
||||
for hash, val := range hashInfo.All() {
|
||||
if hash.Width != len(val) {
|
||||
continue
|
||||
}
|
||||
result = append(result, HashInfo{Alg: HashTypeConvert(hash), Val: val})
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func HashInfoConvert2(hashInfo utils.HashInfo, needHashs []HashAlg) []HashInfo {
|
||||
resultHashs := make([]HashInfo, 0, len(needHashs))
|
||||
|
||||
for _, needHash := range needHashs {
|
||||
hashType := HashAlgConvert(needHash)
|
||||
hash := hashInfo.GetHash(hashType)
|
||||
if hashType.Width != len(hash) {
|
||||
return nil
|
||||
}
|
||||
resultHashs = append(resultHashs, HashInfo{Alg: needHash, Val: hash})
|
||||
}
|
||||
return resultHashs
|
||||
}
|
||||
func HashInfoConvert3(hashInfo []HashInfo) utils.HashInfo {
|
||||
newHashInfo := make(map[*utils.HashType]string, len(hashInfo))
|
||||
for _, hashInfo := range hashInfo {
|
||||
newHashInfo[HashAlgConvert(hashInfo.Alg)] = hashInfo.Val
|
||||
}
|
||||
return utils.NewHashInfoByMap(newHashInfo)
|
||||
}
|
||||
245
server/handles/plugin.go
Normal file
245
server/handles/plugin.go
Normal file
@@ -0,0 +1,245 @@
|
||||
package handles
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/db"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/plugin"
|
||||
"github.com/OpenListTeam/OpenList/v4/server/common"
|
||||
"github.com/gin-gonic/gin"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// InstallPluginReq 定义了安装插件 API 的请求体结构
|
||||
type InstallPluginReq struct {
|
||||
// Source 是插件的来源地址,可以是:
|
||||
// 1. GitHub 仓库 URL (e.g., "https://github.com/user/repo")
|
||||
// 2. Zip 压缩包 URL (e.g., "https://example.com/plugin.zip")
|
||||
// 3. 本地 manifest 文件路径 (e.g., "/path/to/plugin.json")
|
||||
Source string `json:"source" binding:"required"`
|
||||
}
|
||||
|
||||
// PluginIDReq 定义了需要插件 ID 的通用请求体结构
|
||||
type PluginIDReq struct {
|
||||
ID string `json:"id" binding:"required"`
|
||||
}
|
||||
|
||||
// --- API 处理器 ---
|
||||
|
||||
// ListPlugins godoc
|
||||
// @Summary List all installed plugins
|
||||
// @Description Get a list of all plugins that are currently installed.
|
||||
// @Tags plugin
|
||||
// @Produce json
|
||||
// @Success 200 {object} common.Resp{data=[]model.Plugin} "A list of installed plugins"
|
||||
// @Failure 500 {object} common.Resp "Internal server error"
|
||||
// @Router /api/plugin/list [get]
|
||||
func ListPlugins(c *gin.Context) {
|
||||
// 直接从数据库获取最新的插件列表,确保状态是最新的
|
||||
plugins, err := db.GetAllPlugins(c.Request.Context())
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get all plugins from database: %v", err)
|
||||
common.ErrorResp(c, err, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
common.SuccessResp(c, plugins)
|
||||
}
|
||||
|
||||
// InstallPlugin godoc
|
||||
// @Summary Install a new plugin
|
||||
// @Description Install a plugin from a source URL (GitHub, Zip) or a local path.
|
||||
// @Tags plugin
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param req body InstallPluginReq true "Plugin source"
|
||||
// @Success 200 {object} common.Resp{data=model.Plugin} "Plugin installed successfully"
|
||||
// @Failure 400 {object} common.Resp "Bad request"
|
||||
// @Failure 500 {object} common.Resp "Internal server error"
|
||||
// @Router /api/plugin/install [post]
|
||||
func InstallPlugin(c *gin.Context) {
|
||||
var req InstallPluginReq
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
common.ErrorResp(c, err, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("Attempting to install plugin from source: %s", req.Source)
|
||||
|
||||
pluginInfo, err := plugin.PluginManager.Install(c.Request.Context(), req.Source)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to install plugin from source '%s': %v", req.Source, err)
|
||||
common.ErrorResp(c, err, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("Successfully installed plugin: %s (v%s)", pluginInfo.Name, pluginInfo.Version)
|
||||
common.SuccessResp(c, pluginInfo.Plugin)
|
||||
}
|
||||
|
||||
// InstallPluginFromUpload godoc
|
||||
// @Summary Install a plugin from an uploaded zip file
|
||||
// @Description Upload a .zip file containing plugin.json and a .wasm file to install a new plugin.
|
||||
// @Tags plugin
|
||||
// @Accept multipart/form-data
|
||||
// @Produce json
|
||||
// @Param file formData file true "The plugin zip file to upload"
|
||||
// @Success 200 {object} common.Resp{data=model.Plugin} "Plugin installed successfully"
|
||||
// @Failure 400 {object} common.Resp "Bad request (e.g., no file uploaded)"
|
||||
// @Failure 500 {object} common.Resp "Internal server error"
|
||||
// @Router /api/plugin/upload [post]
|
||||
func InstallPluginFromUpload(c *gin.Context) {
|
||||
// "file" 必须是前端上传文件时使用的表单字段名 (form field name)
|
||||
file, err := c.FormFile("file")
|
||||
if err != nil {
|
||||
common.ErrorResp(c, fmt.Errorf("failed to get 'file' from form: %w", err), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("Attempting to install plugin from uploaded file: %s", file.Filename)
|
||||
|
||||
// 打开上传的文件以获取 io.Reader
|
||||
f, err := file.Open()
|
||||
if err != nil {
|
||||
common.ErrorResp(c, fmt.Errorf("failed to open uploaded file: %w", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// 调用管理器的 InstallFromUpload 方法
|
||||
pluginInfo, err := plugin.PluginManager.InstallFromUpload(c.Request.Context(), f, file.Filename)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to install plugin from uploaded file '%s': %v", file.Filename, err)
|
||||
common.ErrorResp(c, err, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("Successfully installed plugin from upload: %s (v%s)", pluginInfo.Name, pluginInfo.Version)
|
||||
common.SuccessResp(c, pluginInfo.Plugin)
|
||||
}
|
||||
|
||||
// UninstallPlugin godoc
|
||||
// @Summary Uninstall a plugin
|
||||
// @Description Uninstall a plugin by its ID.
|
||||
// @Tags plugin
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param req body PluginIDReq true "Plugin ID to uninstall"
|
||||
// @Success 200 {object} common.Resp "Plugin uninstalled successfully"
|
||||
// @Failure 400 {object} common.Resp "Bad request"
|
||||
// @Failure 500 {object} common.Resp "Internal server error"
|
||||
// @Router /api/plugin/uninstall [post]
|
||||
func UninstallPlugin(c *gin.Context) {
|
||||
var req PluginIDReq
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
common.ErrorResp(c, err, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("Attempting to uninstall plugin with ID: %s", req.ID)
|
||||
|
||||
if err := plugin.PluginManager.Uninstall(c.Request.Context(), req.ID); err != nil {
|
||||
log.Errorf("Failed to uninstall plugin '%s': %v", req.ID, err)
|
||||
common.ErrorResp(c, err, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("Successfully uninstalled plugin: %s", req.ID)
|
||||
common.SuccessResp(c, "Plugin uninstalled successfully")
|
||||
}
|
||||
|
||||
// CheckForUpdates godoc
|
||||
// @Summary Check for plugin updates
|
||||
// @Description Checks all installed plugins from GitHub for available updates.
|
||||
// @Tags plugin
|
||||
// @Produce json
|
||||
// @Success 200 {object} common.Resp{data=map[string]string} "A map of plugins with available updates (id: new_version)"
|
||||
// @Failure 500 {object} common.Resp "Internal server error"
|
||||
// @Router /api/plugin/updates/check [get]
|
||||
func CheckForUpdates(c *gin.Context) {
|
||||
log.Info("Checking for plugin updates...")
|
||||
|
||||
updates, err := plugin.PluginManager.CheckForUpdates(c.Request.Context())
|
||||
if err != nil {
|
||||
log.Errorf("Failed to check for plugin updates: %v", err)
|
||||
common.ErrorResp(c, err, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("Found %d available plugin updates.", len(updates))
|
||||
common.SuccessResp(c, updates)
|
||||
}
|
||||
|
||||
// UpdatePlugin godoc
|
||||
// @Summary Update a plugin
|
||||
// @Description Update a specific plugin to its latest version. The plugin must have been installed from GitHub.
|
||||
// @Tags plugin
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param req body PluginIDReq true "Plugin ID to update"
|
||||
// @Success 200 {object} common.Resp{data=model.Plugin} "Plugin updated successfully"
|
||||
// @Failure 400 {object} common.Resp "Bad request"
|
||||
// @Failure 500 {object} common.Resp "Internal server error"
|
||||
// @Router /api/plugin/update [post]
|
||||
func UpdatePlugin(c *gin.Context) {
|
||||
var req PluginIDReq
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
common.ErrorResp(c, err, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("Attempting to update plugin with ID: %s", req.ID)
|
||||
|
||||
updatedPluginInfo, err := plugin.PluginManager.Update(c.Request.Context(), req.ID)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to update plugin '%s': %v", req.ID, err)
|
||||
common.ErrorResp(c, err, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("Successfully updated plugin: %s", req.ID)
|
||||
common.SuccessResp(c, updatedPluginInfo.Plugin)
|
||||
}
|
||||
|
||||
// internal/server/handles/plugin.go
|
||||
|
||||
// CheckForUpdateSingle godoc
|
||||
// @Summary Check for a single plugin update
|
||||
// @Description Checks a specific plugin for an available update.
|
||||
// @Tags plugin
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param req body PluginIDReq true "Plugin ID to check"
|
||||
// @Success 200 {object} common.Resp{data=map[string]string} "A map containing the new version if an update is available (e.g., {\"new_version\": \"1.1.0\"})"
|
||||
// @Failure 400 {object} common.Resp "Bad request"
|
||||
// @Failure 404 {object} common.Resp "Plugin not found or not eligible for update"
|
||||
// @Failure 500 {object} common.Resp "Internal server error"
|
||||
// @Router /api/plugin/updates/check_one [post]
|
||||
func CheckForUpdateSingle(c *gin.Context) {
|
||||
var req PluginIDReq
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
common.ErrorResp(c, err, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("Checking for update for plugin: %s", req.ID)
|
||||
|
||||
newVersion, err := plugin.PluginManager.CheckForUpdate(c.Request.Context(), req.ID)
|
||||
if err != nil {
|
||||
// 区分是插件找不到还是检查过程出错
|
||||
if strings.Contains(err.Error(), "not found") {
|
||||
common.ErrorResp(c, err, http.StatusNotFound)
|
||||
} else {
|
||||
common.ErrorResp(c, err, http.StatusInternalServerError)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
response := make(map[string]string)
|
||||
if newVersion != "" {
|
||||
response["new_version"] = newVersion
|
||||
}
|
||||
|
||||
common.SuccessResp(c, response)
|
||||
}
|
||||
@@ -166,6 +166,23 @@ func admin(g *gin.RouterGroup) {
|
||||
setting.POST("/set_thunderx", handles.SetThunderX)
|
||||
setting.POST("/set_thunder_browser", handles.SetThunderBrowser)
|
||||
|
||||
// 添加插件管理 API 路由组
|
||||
plugin := g.Group("/plugin")
|
||||
{
|
||||
plugin.GET("/list", handles.ListPlugins)
|
||||
plugin.POST("/install", handles.InstallPlugin)
|
||||
plugin.POST("/upload", handles.InstallPluginFromUpload)
|
||||
plugin.POST("/uninstall", handles.UninstallPlugin)
|
||||
plugin.POST("/update", handles.UpdatePlugin)
|
||||
|
||||
// 将检查更新的路由放在一个子组中,更符合 RESTful 风格
|
||||
updates := plugin.Group("/updates")
|
||||
{
|
||||
updates.GET("/check", handles.CheckForUpdates)
|
||||
updates.POST("/check_one", handles.CheckForUpdateSingle)
|
||||
}
|
||||
}
|
||||
|
||||
// retain /admin/task API to ensure compatibility with legacy automation scripts
|
||||
_task(g.Group("/task"))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user